diff --git a/Dockerfile b/Dockerfile index c9a9848c5..0bddf0d91 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,7 +5,7 @@ ENV LUAJIT_VERSION v2.1 RUN apk add --no-cache git build-base cmake curl-dev zlib-dev zstd-dev \ sqlite-dev postgresql-dev hiredis-dev leveldb-dev \ - gmp-dev jsoncpp-dev ninja ca-certificates + gmp-dev jsoncpp-dev linux-headers ninja ca-certificates WORKDIR /usr/src/ RUN git clone --recursive https://github.com/jupp0r/prometheus-cpp && \ diff --git a/doc/compiling/linux.md b/doc/compiling/linux.md index 7984d34d7..1f6b6cf10 100644 --- a/doc/compiling/linux.md +++ b/doc/compiling/linux.md @@ -22,7 +22,7 @@ For Debian/Ubuntu users: - sudo apt install g++ make libc6-dev cmake libpng-dev libjpeg-dev libgl1-mesa-dev libsqlite3-dev libogg-dev libvorbis-dev libopenal-dev libcurl4-gnutls-dev libfreetype6-dev zlib1g-dev libgmp-dev libjsoncpp-dev libzstd-dev libluajit-5.1-dev gettext libsdl2-dev + sudo apt install g++ make libc6-dev cmake linux-libc-dev libpng-dev libjpeg-dev libgl1-mesa-dev libsqlite3-dev libogg-dev libvorbis-dev libopenal-dev libcurl4-gnutls-dev libfreetype6-dev zlib1g-dev libgmp-dev libjsoncpp-dev libzstd-dev libluajit-5.1-dev gettext libsdl2-dev For Fedora users: @@ -34,11 +34,11 @@ For openSUSE users: For Arch users: - sudo pacman -S --needed base-devel libcurl-gnutls cmake libpng libjpeg-turbo sqlite libogg libvorbis openal freetype2 jsoncpp gmp luajit leveldb ncurses zstd gettext sdl2 + sudo pacman -S --needed base-devel libcurl-gnutls cmake linux-api-headers libpng libjpeg-turbo sqlite libogg libvorbis openal freetype2 jsoncpp gmp luajit leveldb ncurses zstd gettext sdl2 For Alpine users: - sudo apk add build-base cmake libpng-dev jpeg-dev mesa-dev sqlite-dev libogg-dev libvorbis-dev openal-soft-dev curl-dev freetype-dev zlib-dev gmp-dev jsoncpp-dev luajit-dev zstd-dev gettext sdl2-dev + sudo apk add build-base cmake linux-headers libpng-dev jpeg-dev mesa-dev sqlite-dev libogg-dev libvorbis-dev openal-soft-dev curl-dev freetype-dev zlib-dev gmp-dev jsoncpp-dev luajit-dev zstd-dev gettext sdl2-dev For Void users: diff --git a/src/benchmark/CMakeLists.txt b/src/benchmark/CMakeLists.txt index bf2bf15db..ea3fbd5c5 100644 --- a/src/benchmark/CMakeLists.txt +++ b/src/benchmark/CMakeLists.txt @@ -1,11 +1,12 @@ set (BENCHMARK_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/benchmark.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_activeobjectmgr.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_ipc_channel.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_lighting.cpp - ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_serialize.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_mapblock.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_map.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_mapmodify.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_serialize.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_sha.cpp PARENT_SCOPE) diff --git a/src/benchmark/benchmark_ipc_channel.cpp b/src/benchmark/benchmark_ipc_channel.cpp new file mode 100644 index 000000000..a0d25e177 --- /dev/null +++ b/src/benchmark/benchmark_ipc_channel.cpp @@ -0,0 +1,46 @@ +// SPDX-FileCopyrightText: 2024 Luanti authors +// +// SPDX-License-Identifier: LGPL-2.1-or-later + +#include "catch.h" +#include "threading/ipc_channel.h" +#include + +TEST_CASE("benchmark_ipc_channel") +{ + auto end_a_thread_b_p = make_test_ipc_channel([](IPCChannelEnd end_b) { + // echos back messages. stops if "" is sent + while (true) { + end_b.recv(); + end_b.send(end_b.getRecvData(), end_b.getRecvSize()); + if (end_b.getRecvSize() == 0) + break; + } + }); + // Can't use structured bindings before C++20, because of lamda captures below. + auto end_a = std::move(end_a_thread_b_p.first); + auto thread_b = std::move(end_a_thread_b_p.second); + + BENCHMARK("simple_call_1", i) { + u8 buf[16] = {}; + buf[i & 0xf] = i; + end_a.exchange(buf, 16); + return reinterpret_cast(end_a.getRecvData())[i & 0xf]; + }; + + BENCHMARK("simple_call_1000", i) { + u8 buf[16] = {}; + buf[i & 0xf] = i; + for (int k = 0; k < 1000; ++k) { + buf[0] = k & 0xff; + end_a.exchange(buf, 16); + } + return reinterpret_cast(end_a.getRecvData())[i & 0xf]; + }; + + // stop thread_b + end_a.exchange(nullptr, 0); + REQUIRE(end_a.getRecvSize() == 0); + + thread_b.join(); +} diff --git a/src/threading/CMakeLists.txt b/src/threading/CMakeLists.txt index 6771b715f..030d37f28 100644 --- a/src/threading/CMakeLists.txt +++ b/src/threading/CMakeLists.txt @@ -2,5 +2,6 @@ set(threading_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/event.cpp ${CMAKE_CURRENT_SOURCE_DIR}/thread.cpp ${CMAKE_CURRENT_SOURCE_DIR}/semaphore.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/ipc_channel.cpp PARENT_SCOPE) diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp new file mode 100644 index 000000000..cebac56fa --- /dev/null +++ b/src/threading/ipc_channel.cpp @@ -0,0 +1,386 @@ +// SPDX-FileCopyrightText: 2024 Luanti authors +// +// SPDX-License-Identifier: LGPL-2.1-or-later + +#include "ipc_channel.h" +#include "debug.h" +#include "exceptions.h" +#include "porting.h" +#include +#include +#include + +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) +#include +#include +#include +#endif + +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) && (defined(__i386__) || defined(__x86_64__)) +#include + +#define HAVE_BUSY_WAIT 1 + +[[maybe_unused]] +static void busy_wait(int n) noexcept +{ + for (int i = 0; i < n; i++) + _mm_pause(); +} + +#else +#define HAVE_BUSY_WAIT 0 +#endif + +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + +// returns false on timeout +static bool wait(HANDLE sem, DWORD timeout) +{ + return WaitForSingleObject(sem, timeout) == WAIT_OBJECT_0; +} + +static void post(HANDLE sem) +{ + if (!ReleaseSemaphore(sem, 1, nullptr)) + FATAL_ERROR("ReleaseSemaphore failed unexpectedly"); +} + +#elif defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) + +static int futex(std::atomic *uaddr, int futex_op, u32 val, + const struct timespec *timeout, u32 *uaddr2, u32 val3) noexcept +{ + return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); +} + +// timeout is relative +// returns false on timeout +static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept +{ + // try busy waiting + for (int i = 0; i < 100; i++) { + // posted? + if (buf->futex.load(std::memory_order_acquire) == 1) { + // yes + // reset it. (relaxed ordering is sufficient, because the other thread + // does not need to see the side effects we did before unposting) + buf->futex.store(0, std::memory_order_relaxed); + return true; + } +#if HAVE_BUSY_WAIT + busy_wait(40); +#else + break; +#endif + } + // wait with futex + while (true) { + // write 2 to show that we're futexing + if (buf->futex.exchange(2, std::memory_order_acquire) == 1) { + // it was posted in the meantime + buf->futex.store(0, std::memory_order_relaxed); + return true; + } + int s = futex(&buf->futex, FUTEX_WAIT, 2, timeout, nullptr, 0); + if (s == -1) { + if (errno == ETIMEDOUT) { + return false; + } else if (errno != EAGAIN && errno != EINTR) { + std::string errmsg = std::string("FUTEX_WAIT failed unexpectedly: ") + + std::strerror(errno); + FATAL_ERROR(errmsg.c_str()); + } + } + } +} + +static void post(IPCChannelBuffer *buf) noexcept +{ + if (buf->futex.exchange(1, std::memory_order_release) == 2) { + // 2 means reader needs to be notified + int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0); + if (s == -1) { + std::string errmsg = std::string("FUTEX_WAKE failed unexpectedly: ") + + std::strerror(errno); + FATAL_ERROR(errmsg.c_str()); + } + } +} + +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + +// timeout is absolute (using cond_clockid) +// returns false on timeout +static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept +{ + bool timed_out = false; + pthread_mutex_lock(&buf->mutex); + while (!buf->posted) { + if (timeout) { + auto err = pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout); + if (err == ETIMEDOUT) { + timed_out = true; + break; + } + FATAL_ERROR_IF(err != 0 && err != EINTR, "pthread_cond_timedwait failed"); + } else { + pthread_cond_wait(&buf->cond, &buf->mutex); + } + } + buf->posted = false; + pthread_mutex_unlock(&buf->mutex); + return !timed_out; +} + +static void post(IPCChannelBuffer *buf) noexcept +{ + pthread_mutex_lock(&buf->mutex); + buf->posted = true; + pthread_mutex_unlock(&buf->mutex); + pthread_cond_broadcast(&buf->cond); +} + +#endif + +// timeout_ms_abs: absolute timeout (using porting::getTimeMs()), or 0 for no timeout +// returns false on timeout +static bool wait_in(IPCChannelEnd::Dir *dir, u64 timeout_ms_abs) +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + // Relative time + DWORD timeout = INFINITE; + if (timeout_ms_abs > 0) { + u64 tnow = porting::getTimeMs(); + if (tnow > timeout_ms_abs) + return false; + timeout = (DWORD)(timeout_ms_abs - tnow); + } + return wait(dir->sem_in, timeout); + +#else + struct timespec timeout; + struct timespec *timeoutp = nullptr; + if (timeout_ms_abs > 0) { + u64 tnow = porting::getTimeMs(); + if (tnow > timeout_ms_abs) + return false; + u64 timeout_ms_rel = timeout_ms_abs - tnow; +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) + // Relative time + timeout.tv_sec = 0; + timeout.tv_nsec = 0; +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + // Absolute time + FATAL_ERROR_IF(clock_gettime(CLOCK_REALTIME, &timeout) < 0, + "clock_gettime failed"); +#endif + timeout.tv_sec += timeout_ms_rel / 1000; + timeout.tv_nsec += timeout_ms_rel % 1000 * 1000'000L; + // tv_nsec must be smaller than 1 sec, or else pthread_cond_timedwait fails + if (timeout.tv_nsec >= 1000'000'000L) { + timeout.tv_nsec -= 1000'000'000L; + timeout.tv_sec += 1; + } + timeoutp = &timeout; + } + + return wait(dir->buf_in, timeoutp); +#endif +} + +static void post_out(IPCChannelEnd::Dir *dir) +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + post(dir->sem_out); +#else + post(dir->buf_out); +#endif +} + +template +static void write_once(volatile T *var, const T val) +{ + *var = val; +} + +template +static T read_once(const volatile T *var) +{ + return *var; +} + +IPCChannelBuffer::IPCChannelBuffer() +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + pthread_condattr_t condattr; + pthread_mutexattr_t mutexattr; + if (pthread_condattr_init(&condattr) != 0) + goto error_condattr_init; + if (pthread_mutexattr_init(&mutexattr) != 0) + goto error_mutexattr_init; + if (pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED) != 0) + goto error_condattr_setpshared; + if (pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED) != 0) + goto error_mutexattr_setpshared; + if (pthread_cond_init(&cond, &condattr) != 0) + goto error_cond_init; + if (pthread_mutex_init(&mutex, &mutexattr) != 0) + goto error_mutex_init; + pthread_mutexattr_destroy(&mutexattr); + pthread_condattr_destroy(&condattr); + return; + +error_mutex_init: + pthread_cond_destroy(&cond); +error_cond_init: +error_mutexattr_setpshared: +error_condattr_setpshared: + pthread_mutexattr_destroy(&mutexattr); +error_mutexattr_init: + pthread_condattr_destroy(&condattr); +error_condattr_init: + throw BaseException("Unable to initialize IPCChannelBuffer"); +#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) +} + +IPCChannelBuffer::~IPCChannelBuffer() +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&cond); +#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) +} + +IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr resources) +{ + IPCChannelShared *shared = resources->data.shared; +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + HANDLE sem_a = resources->data.sem_a; + HANDLE sem_b = resources->data.sem_b; + return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b, sem_a, sem_b}); +#else + return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b}); +#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) +} + +IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr resources) +{ + IPCChannelShared *shared = resources->data.shared; +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + HANDLE sem_a = resources->data.sem_a; + HANDLE sem_b = resources->data.sem_b; + return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a, sem_b, sem_a}); +#else + return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a}); +#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) +} + +void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept +{ + write_once(&m_dir.buf_out->size, size); + + if (size != 0) + memcpy(m_dir.buf_out->data, data, size); + + post_out(&m_dir); +} + +bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept +{ + u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms; + + write_once(&m_dir.buf_out->size, size); + + do { + memcpy(m_dir.buf_out->data, data, IPC_CHANNEL_MSG_SIZE); + post_out(&m_dir); + + if (!wait_in(&m_dir, timeout_ms_abs)) + return false; + + size -= IPC_CHANNEL_MSG_SIZE; + data = reinterpret_cast(data) + IPC_CHANNEL_MSG_SIZE; + } while (size > IPC_CHANNEL_MSG_SIZE); + + if (size != 0) + memcpy(m_dir.buf_out->data, data, size); + post_out(&m_dir); + + return true; +} + +bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept +{ + // Note about memcpy: If the other thread is evil, it might change the contents + // of the memory while it's memcopied. We're assuming here that memcpy doesn't + // cause vulnerabilities due to this. + + u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms; + + if (!wait_in(&m_dir, timeout_ms_abs)) + return false; + + size_t size = read_once(&m_dir.buf_in->size); + m_recv_size = size; + + if (size <= IPC_CHANNEL_MSG_SIZE) { + // small msg + // (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE) + if (size != 0) + memcpy(m_large_recv.data(), m_dir.buf_in->data, size); + + } else { + // large msg + try { + m_large_recv.resize(size); + } catch (...) { + // it's ok for us if an attacker wants to make us abort + std::string errmsg = "std::vector::resize failed, size was: " + + std::to_string(size); + FATAL_ERROR(errmsg.c_str()); + } + u8 *recv_data = m_large_recv.data(); + do { + memcpy(recv_data, m_dir.buf_in->data, IPC_CHANNEL_MSG_SIZE); + size -= IPC_CHANNEL_MSG_SIZE; + recv_data += IPC_CHANNEL_MSG_SIZE; + post_out(&m_dir); + if (!wait_in(&m_dir, timeout_ms_abs)) + return false; + } while (size > IPC_CHANNEL_MSG_SIZE); + if (size != 0) + memcpy(recv_data, m_dir.buf_in->data, size); + } + return true; +} + +std::pair make_test_ipc_channel( + const std::function &fun) +{ + auto resource_data = [] { + auto shared = new IPCChannelShared(); + +#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 + HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); + HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr); + FATAL_ERROR_IF(!sem_a || !sem_b, "CreateSemaphoreA failed"); + + return IPCChannelResources::Data{shared, sem_a, sem_b}; +#else + return IPCChannelResources::Data{shared}; +#endif + }(); + + std::thread thread_b([=] { + auto resources_second = IPCChannelResourcesSingleProcess::makeSecond(resource_data); + IPCChannelEnd end_b = IPCChannelEnd::makeB(std::move(resources_second)); + + fun(std::move(end_b)); + }); + + auto resources_first = IPCChannelResourcesSingleProcess::makeFirst(resource_data); + IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first)); + + return {std::move(end_a), std::move(thread_b)}; +} diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h new file mode 100644 index 000000000..71bb2b2f6 --- /dev/null +++ b/src/threading/ipc_channel.h @@ -0,0 +1,276 @@ +// SPDX-FileCopyrightText: 2024 Luanti authors +// +// SPDX-License-Identifier: LGPL-2.1-or-later + +#pragma once + +#include "irrlichttypes.h" +#include "util/basic_macros.h" +#include +#include +#include +#include +#include + +#if defined(_WIN32) +#define IPC_CHANNEL_IMPLEMENTATION_WIN32 +#elif defined(__linux__) +#define IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX +#else +#define IPC_CHANNEL_IMPLEMENTATION_POSIX +#endif + +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) +#include +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) +#include +#endif + +/* + An IPC channel is used for synchronous communication between two processes. + Sending two messages in succession from one end is not allowed; messages + must alternate back and forth. + + IPCChannelShared is situated in shared memory and is used by both ends of + the channel. + + There are currently 3 implementations for synchronisation: + * win32: uses win32 semaphore + * linux: uses futex, and does busy waiting if on x86/x86_64 + * other posix: uses posix mutex and condition variable +*/ + +constexpr size_t IPC_CHANNEL_MSG_SIZE = 0x2000; + +struct IPCChannelBuffer +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) + // possible values: + // 0: futex is not posted. reader will check value before blocking => no + // notify needed when posting + // 1: futex is posted + // 2: futex is not posted. reader is waiting with futex syscall, and needs + // to be notified + std::atomic futex{0}; + +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + pthread_cond_t cond; + pthread_mutex_t mutex; + bool posted = false; // protected by mutex +#endif + + // Note: If the other side isn't acting cooperatively, they might write to + // this at any times. So we must make sure to copy out the data once, and + // only access that copy. + size_t size = 0; + u8 data[IPC_CHANNEL_MSG_SIZE] = {}; + + IPCChannelBuffer(); + DISABLE_CLASS_COPY(IPCChannelBuffer) + ~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process +}; + +// Data in shared memory +struct IPCChannelShared +{ + // Both ends unmap, but last deleter also deletes shared resources. + std::atomic refcount{1}; + + IPCChannelBuffer a{}; + IPCChannelBuffer b{}; +}; + +// Interface for managing the shared resources. +// Implementors decide whether to use malloc or mmap. +struct IPCChannelResources +{ + // new struct, because the win32 #if is annoying + struct Data + { + IPCChannelShared *shared = nullptr; + +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + HANDLE sem_a; + HANDLE sem_b; +#endif + }; + + Data data; + + IPCChannelResources() = default; + DISABLE_CLASS_COPY(IPCChannelResources) + + // Child should call cleanup(). + // (Parent destructor can not do this, because when it's called the child is + // already dead.) + virtual ~IPCChannelResources() = default; + +protected: + // Used for previously unmanaged data_ (move semantics) + void setFirst(Data data_) + { + data = data_; + } + + // Used for data_ that is already managed by an IPCChannelResources (grab() + // semantics) + bool setSecond(Data data_) + { + if (data_.shared->refcount.fetch_add(1) == 0) { + // other end dead, can't use resources + data_.shared->refcount.fetch_sub(1); + return false; + } + data = data_; + return true; + } + + virtual void cleanupLast() noexcept = 0; + virtual void cleanupNotLast() noexcept = 0; + + void cleanup() noexcept + { + if (!data.shared) { + // No owned resources. Maybe setSecond failed. + return; + } + if (data.shared->refcount.fetch_sub(1) == 1) { + // We are last, we clean up. + cleanupLast(); + } else { + // We are not responsible for cleanup. + // Note: Shared resources may already be invalid by now. + cleanupNotLast(); + } + } +}; + +class IPCChannelEnd +{ +public: + // Direction. References into IPCChannelResources. + struct Dir + { + IPCChannelBuffer *buf_in = nullptr; + IPCChannelBuffer *buf_out = nullptr; +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + HANDLE sem_in; + HANDLE sem_out; +#endif + }; + + // Unusable empty end + IPCChannelEnd() = default; + + // Construct end A or end B from resources + static IPCChannelEnd makeA(std::unique_ptr resources); + static IPCChannelEnd makeB(std::unique_ptr resources); + + // Note: Timeouts may be for receiving any response, not a whole message. + // Therefore, if a timeout occurs, stop using the channel. + + // Returns false on timeout + [[nodiscard]] + bool sendWithTimeout(const void *data, size_t size, int timeout_ms) noexcept + { + if (size <= IPC_CHANNEL_MSG_SIZE) { + sendSmall(data, size); + return true; + } else { + return sendLarge(data, size, timeout_ms); + } + } + + // Same as above + void send(const void *data, size_t size) noexcept + { + (void)sendWithTimeout(data, size, -1); + } + + // Returns false on timeout. + // Otherwise returns true, and data is available via getRecvData(). + [[nodiscard]] + bool recvWithTimeout(int timeout_ms) noexcept; + + // Same as above + void recv() noexcept + { + (void)recvWithTimeout(-1); + } + + // Returns false on timeout + // Otherwise returns true, and data is available via getRecvData(). + [[nodiscard]] + bool exchangeWithTimeout(const void *data, size_t size, int timeout_ms) noexcept + { + return sendWithTimeout(data, size, timeout_ms) + && recvWithTimeout(timeout_ms); + } + + // Same as above + void exchange(const void *data, size_t size) noexcept + { + (void)exchangeWithTimeout(data, size, -1); + } + + // Get the content of the last received message + const void *getRecvData() const noexcept { return m_large_recv.data(); } + size_t getRecvSize() const noexcept { return m_recv_size; } + +private: + IPCChannelEnd(std::unique_ptr resources, Dir dir) : + m_resources(std::move(resources)), m_dir(dir) + {} + + void sendSmall(const void *data, size_t size) noexcept; + + // returns false on timeout + bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; + + std::unique_ptr m_resources; + Dir m_dir; + size_t m_recv_size = 0; + // we always copy from the shared buffer into this + // (this buffer only grows) + std::vector m_large_recv = std::vector(IPC_CHANNEL_MSG_SIZE); +}; + +// For testing purposes +struct IPCChannelResourcesSingleProcess final : public IPCChannelResources +{ + void cleanupLast() noexcept override + { + delete data.shared; +#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 + CloseHandle(data.sem_b); + CloseHandle(data.sem_a); +#endif + } + + void cleanupNotLast() noexcept override + { + // nothing to do (i.e. no unmapping needed) + } + + ~IPCChannelResourcesSingleProcess() override { cleanup(); } + + static std::unique_ptr makeFirst(Data data) + { + auto ret = std::make_unique(); + ret->setFirst(data); + return ret; + } + + static std::unique_ptr makeSecond(Data data) + { + auto ret = std::make_unique(); + ret->setSecond(data); + return ret; + } +}; + +// For testing +// Returns one end and a thread holding the other end. The thread will execute +// fun, and pass it the other end. +std::pair make_test_ipc_channel( + const std::function &fun); diff --git a/src/unittest/test_threading.cpp b/src/unittest/test_threading.cpp index 77842964b..51d757e5e 100644 --- a/src/unittest/test_threading.cpp +++ b/src/unittest/test_threading.cpp @@ -6,6 +6,7 @@ #include #include +#include "threading/ipc_channel.h" #include "threading/semaphore.h" #include "threading/thread.h" @@ -19,6 +20,7 @@ public: void testStartStopWait(); void testAtomicSemaphoreThread(); void testTLS(); + void testIPCChannel(); }; static TestThreading g_test_instance; @@ -28,6 +30,7 @@ void TestThreading::runTests(IGameDef *gamedef) TEST(testStartStopWait); TEST(testAtomicSemaphoreThread); TEST(testTLS); + TEST(testIPCChannel); } class SimpleTestThread : public Thread { @@ -227,3 +230,42 @@ void TestThreading::testTLS() } } } + +void TestThreading::testIPCChannel() +{ + auto [end_a, thread_b] = make_test_ipc_channel([](IPCChannelEnd end_b) { + // echos back messages. stops if "" is sent + while (true) { + UASSERT(end_b.recvWithTimeout(-1)); + UASSERT(end_b.sendWithTimeout(end_b.getRecvData(), end_b.getRecvSize(), -1)); + if (end_b.getRecvSize() == 0) + break; + } + }); + + u8 buf1[20000] = {}; + for (int i = sizeof(buf1); i > 0; i -= 100) { + buf1[i - 1] = 123; + UASSERT(end_a.exchangeWithTimeout(buf1, i, -1)); + UASSERTEQ(int, end_a.getRecvSize(), i); + UASSERTEQ(int, reinterpret_cast(end_a.getRecvData())[i - 1], 123); + } + + u8 buf2[IPC_CHANNEL_MSG_SIZE * 3 + 10]; + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3 + 10); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 2); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE - 1); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE + 1); + end_a.exchange(buf2, 1); + + // stop thread_b + UASSERT(end_a.exchangeWithTimeout(nullptr, 0, -1)); + UASSERTEQ(int, end_a.getRecvSize(), 0); + + thread_b.join(); + + // other side dead ==> should time out + UASSERT(!end_a.exchangeWithTimeout(nullptr, 0, 200)); +}