1
0
Fork 0
mirror of https://github.com/luanti-org/luanti.git synced 2025-06-27 16:36:03 +00:00
This commit is contained in:
DS 2025-06-27 12:36:29 +03:00 committed by GitHub
commit 8f10a947e3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 757 additions and 5 deletions

View file

@ -5,7 +5,7 @@ ENV LUAJIT_VERSION v2.1
RUN apk add --no-cache git build-base cmake curl-dev zlib-dev zstd-dev \ RUN apk add --no-cache git build-base cmake curl-dev zlib-dev zstd-dev \
sqlite-dev postgresql-dev hiredis-dev leveldb-dev \ sqlite-dev postgresql-dev hiredis-dev leveldb-dev \
gmp-dev jsoncpp-dev ninja ca-certificates gmp-dev jsoncpp-dev linux-headers ninja ca-certificates
WORKDIR /usr/src/ WORKDIR /usr/src/
RUN git clone --recursive https://github.com/jupp0r/prometheus-cpp && \ RUN git clone --recursive https://github.com/jupp0r/prometheus-cpp && \

View file

@ -22,7 +22,7 @@
For Debian/Ubuntu users: For Debian/Ubuntu users:
sudo apt install g++ make libc6-dev cmake libpng-dev libjpeg-dev libgl1-mesa-dev libsqlite3-dev libogg-dev libvorbis-dev libopenal-dev libcurl4-gnutls-dev libfreetype6-dev zlib1g-dev libgmp-dev libjsoncpp-dev libzstd-dev libluajit-5.1-dev gettext libsdl2-dev sudo apt install g++ make libc6-dev cmake linux-libc-dev libpng-dev libjpeg-dev libgl1-mesa-dev libsqlite3-dev libogg-dev libvorbis-dev libopenal-dev libcurl4-gnutls-dev libfreetype6-dev zlib1g-dev libgmp-dev libjsoncpp-dev libzstd-dev libluajit-5.1-dev gettext libsdl2-dev
For Fedora users: For Fedora users:
@ -34,11 +34,11 @@ For openSUSE users:
For Arch users: For Arch users:
sudo pacman -S --needed base-devel libcurl-gnutls cmake libpng libjpeg-turbo sqlite libogg libvorbis openal freetype2 jsoncpp gmp luajit leveldb ncurses zstd gettext sdl2 sudo pacman -S --needed base-devel libcurl-gnutls cmake linux-api-headers libpng libjpeg-turbo sqlite libogg libvorbis openal freetype2 jsoncpp gmp luajit leveldb ncurses zstd gettext sdl2
For Alpine users: For Alpine users:
sudo apk add build-base cmake libpng-dev jpeg-dev mesa-dev sqlite-dev libogg-dev libvorbis-dev openal-soft-dev curl-dev freetype-dev zlib-dev gmp-dev jsoncpp-dev luajit-dev zstd-dev gettext sdl2-dev sudo apk add build-base cmake linux-headers libpng-dev jpeg-dev mesa-dev sqlite-dev libogg-dev libvorbis-dev openal-soft-dev curl-dev freetype-dev zlib-dev gmp-dev jsoncpp-dev luajit-dev zstd-dev gettext sdl2-dev
For Void users: For Void users:

View file

@ -1,11 +1,12 @@
set (BENCHMARK_SRCS set (BENCHMARK_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/benchmark.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_activeobjectmgr.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_activeobjectmgr.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_ipc_channel.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_lighting.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_lighting.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_serialize.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_mapblock.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_mapblock.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_map.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_map.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_mapmodify.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_mapmodify.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_serialize.cpp
${CMAKE_CURRENT_SOURCE_DIR}/benchmark_sha.cpp ${CMAKE_CURRENT_SOURCE_DIR}/benchmark_sha.cpp
PARENT_SCOPE) PARENT_SCOPE)

View file

@ -0,0 +1,46 @@
// SPDX-FileCopyrightText: 2024 Luanti authors
//
// SPDX-License-Identifier: LGPL-2.1-or-later
#include "catch.h"
#include "threading/ipc_channel.h"
#include <thread>
TEST_CASE("benchmark_ipc_channel")
{
auto end_a_thread_b_p = make_test_ipc_channel([](IPCChannelEnd end_b) {
// echos back messages. stops if "" is sent
while (true) {
end_b.recv();
end_b.send(end_b.getRecvData(), end_b.getRecvSize());
if (end_b.getRecvSize() == 0)
break;
}
});
// Can't use structured bindings before C++20, because of lamda captures below.
auto end_a = std::move(end_a_thread_b_p.first);
auto thread_b = std::move(end_a_thread_b_p.second);
BENCHMARK("simple_call_1", i) {
u8 buf[16] = {};
buf[i & 0xf] = i;
end_a.exchange(buf, 16);
return reinterpret_cast<const u8 *>(end_a.getRecvData())[i & 0xf];
};
BENCHMARK("simple_call_1000", i) {
u8 buf[16] = {};
buf[i & 0xf] = i;
for (int k = 0; k < 1000; ++k) {
buf[0] = k & 0xff;
end_a.exchange(buf, 16);
}
return reinterpret_cast<const u8 *>(end_a.getRecvData())[i & 0xf];
};
// stop thread_b
end_a.exchange(nullptr, 0);
REQUIRE(end_a.getRecvSize() == 0);
thread_b.join();
}

View file

@ -2,5 +2,6 @@ set(threading_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/event.cpp ${CMAKE_CURRENT_SOURCE_DIR}/event.cpp
${CMAKE_CURRENT_SOURCE_DIR}/thread.cpp ${CMAKE_CURRENT_SOURCE_DIR}/thread.cpp
${CMAKE_CURRENT_SOURCE_DIR}/semaphore.cpp ${CMAKE_CURRENT_SOURCE_DIR}/semaphore.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ipc_channel.cpp
PARENT_SCOPE) PARENT_SCOPE)

View file

@ -0,0 +1,386 @@
// SPDX-FileCopyrightText: 2024 Luanti authors
//
// SPDX-License-Identifier: LGPL-2.1-or-later
#include "ipc_channel.h"
#include "debug.h"
#include "exceptions.h"
#include "porting.h"
#include <cerrno>
#include <utility>
#include <cstring>
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
#include <linux/futex.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#endif
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) && (defined(__i386__) || defined(__x86_64__))
#include <immintrin.h>
#define HAVE_BUSY_WAIT 1
[[maybe_unused]]
static void busy_wait(int n) noexcept
{
for (int i = 0; i < n; i++)
_mm_pause();
}
#else
#define HAVE_BUSY_WAIT 0
#endif
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
// returns false on timeout
static bool wait(HANDLE sem, DWORD timeout)
{
return WaitForSingleObject(sem, timeout) == WAIT_OBJECT_0;
}
static void post(HANDLE sem)
{
if (!ReleaseSemaphore(sem, 1, nullptr))
FATAL_ERROR("ReleaseSemaphore failed unexpectedly");
}
#elif defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
static int futex(std::atomic<u32> *uaddr, int futex_op, u32 val,
const struct timespec *timeout, u32 *uaddr2, u32 val3) noexcept
{
return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
}
// timeout is relative
// returns false on timeout
static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
{
// try busy waiting
for (int i = 0; i < 100; i++) {
// posted?
if (buf->futex.load(std::memory_order_acquire) == 1) {
// yes
// reset it. (relaxed ordering is sufficient, because the other thread
// does not need to see the side effects we did before unposting)
buf->futex.store(0, std::memory_order_relaxed);
return true;
}
#if HAVE_BUSY_WAIT
busy_wait(40);
#else
break;
#endif
}
// wait with futex
while (true) {
// write 2 to show that we're futexing
if (buf->futex.exchange(2, std::memory_order_acquire) == 1) {
// it was posted in the meantime
buf->futex.store(0, std::memory_order_relaxed);
return true;
}
int s = futex(&buf->futex, FUTEX_WAIT, 2, timeout, nullptr, 0);
if (s == -1) {
if (errno == ETIMEDOUT) {
return false;
} else if (errno != EAGAIN && errno != EINTR) {
std::string errmsg = std::string("FUTEX_WAIT failed unexpectedly: ")
+ std::strerror(errno);
FATAL_ERROR(errmsg.c_str());
}
}
}
}
static void post(IPCChannelBuffer *buf) noexcept
{
if (buf->futex.exchange(1, std::memory_order_release) == 2) {
// 2 means reader needs to be notified
int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0);
if (s == -1) {
std::string errmsg = std::string("FUTEX_WAKE failed unexpectedly: ")
+ std::strerror(errno);
FATAL_ERROR(errmsg.c_str());
}
}
}
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
// timeout is absolute (using cond_clockid)
// returns false on timeout
static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
{
bool timed_out = false;
pthread_mutex_lock(&buf->mutex);
while (!buf->posted) {
if (timeout) {
auto err = pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout);
if (err == ETIMEDOUT) {
timed_out = true;
break;
}
FATAL_ERROR_IF(err != 0 && err != EINTR, "pthread_cond_timedwait failed");
} else {
pthread_cond_wait(&buf->cond, &buf->mutex);
}
}
buf->posted = false;
pthread_mutex_unlock(&buf->mutex);
return !timed_out;
}
static void post(IPCChannelBuffer *buf) noexcept
{
pthread_mutex_lock(&buf->mutex);
buf->posted = true;
pthread_mutex_unlock(&buf->mutex);
pthread_cond_broadcast(&buf->cond);
}
#endif
// timeout_ms_abs: absolute timeout (using porting::getTimeMs()), or 0 for no timeout
// returns false on timeout
static bool wait_in(IPCChannelEnd::Dir *dir, u64 timeout_ms_abs)
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
// Relative time
DWORD timeout = INFINITE;
if (timeout_ms_abs > 0) {
u64 tnow = porting::getTimeMs();
if (tnow > timeout_ms_abs)
return false;
timeout = (DWORD)(timeout_ms_abs - tnow);
}
return wait(dir->sem_in, timeout);
#else
struct timespec timeout;
struct timespec *timeoutp = nullptr;
if (timeout_ms_abs > 0) {
u64 tnow = porting::getTimeMs();
if (tnow > timeout_ms_abs)
return false;
u64 timeout_ms_rel = timeout_ms_abs - tnow;
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
// Relative time
timeout.tv_sec = 0;
timeout.tv_nsec = 0;
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
// Absolute time
FATAL_ERROR_IF(clock_gettime(CLOCK_REALTIME, &timeout) < 0,
"clock_gettime failed");
#endif
timeout.tv_sec += timeout_ms_rel / 1000;
timeout.tv_nsec += timeout_ms_rel % 1000 * 1000'000L;
// tv_nsec must be smaller than 1 sec, or else pthread_cond_timedwait fails
if (timeout.tv_nsec >= 1000'000'000L) {
timeout.tv_nsec -= 1000'000'000L;
timeout.tv_sec += 1;
}
timeoutp = &timeout;
}
return wait(dir->buf_in, timeoutp);
#endif
}
static void post_out(IPCChannelEnd::Dir *dir)
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
post(dir->sem_out);
#else
post(dir->buf_out);
#endif
}
template <typename T>
static void write_once(volatile T *var, const T val)
{
*var = val;
}
template <typename T>
static T read_once(const volatile T *var)
{
return *var;
}
IPCChannelBuffer::IPCChannelBuffer()
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_condattr_t condattr;
pthread_mutexattr_t mutexattr;
if (pthread_condattr_init(&condattr) != 0)
goto error_condattr_init;
if (pthread_mutexattr_init(&mutexattr) != 0)
goto error_mutexattr_init;
if (pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED) != 0)
goto error_condattr_setpshared;
if (pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED) != 0)
goto error_mutexattr_setpshared;
if (pthread_cond_init(&cond, &condattr) != 0)
goto error_cond_init;
if (pthread_mutex_init(&mutex, &mutexattr) != 0)
goto error_mutex_init;
pthread_mutexattr_destroy(&mutexattr);
pthread_condattr_destroy(&condattr);
return;
error_mutex_init:
pthread_cond_destroy(&cond);
error_cond_init:
error_mutexattr_setpshared:
error_condattr_setpshared:
pthread_mutexattr_destroy(&mutexattr);
error_mutexattr_init:
pthread_condattr_destroy(&condattr);
error_condattr_init:
throw BaseException("Unable to initialize IPCChannelBuffer");
#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
}
IPCChannelBuffer::~IPCChannelBuffer()
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
}
IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr<IPCChannelResources> resources)
{
IPCChannelShared *shared = resources->data.shared;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a = resources->data.sem_a;
HANDLE sem_b = resources->data.sem_b;
return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b, sem_a, sem_b});
#else
return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b});
#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
}
IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr<IPCChannelResources> resources)
{
IPCChannelShared *shared = resources->data.shared;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a = resources->data.sem_a;
HANDLE sem_b = resources->data.sem_b;
return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a, sem_b, sem_a});
#else
return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a});
#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
}
void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept
{
write_once(&m_dir.buf_out->size, size);
if (size != 0)
memcpy(m_dir.buf_out->data, data, size);
post_out(&m_dir);
}
bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept
{
u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms;
write_once(&m_dir.buf_out->size, size);
do {
memcpy(m_dir.buf_out->data, data, IPC_CHANNEL_MSG_SIZE);
post_out(&m_dir);
if (!wait_in(&m_dir, timeout_ms_abs))
return false;
size -= IPC_CHANNEL_MSG_SIZE;
data = reinterpret_cast<const u8 *>(data) + IPC_CHANNEL_MSG_SIZE;
} while (size > IPC_CHANNEL_MSG_SIZE);
if (size != 0)
memcpy(m_dir.buf_out->data, data, size);
post_out(&m_dir);
return true;
}
bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
{
// Note about memcpy: If the other thread is evil, it might change the contents
// of the memory while it's memcopied. We're assuming here that memcpy doesn't
// cause vulnerabilities due to this.
u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms;
if (!wait_in(&m_dir, timeout_ms_abs))
return false;
size_t size = read_once(&m_dir.buf_in->size);
m_recv_size = size;
if (size <= IPC_CHANNEL_MSG_SIZE) {
// small msg
// (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE)
if (size != 0)
memcpy(m_large_recv.data(), m_dir.buf_in->data, size);
} else {
// large msg
try {
m_large_recv.resize(size);
} catch (...) {
// it's ok for us if an attacker wants to make us abort
std::string errmsg = "std::vector::resize failed, size was: "
+ std::to_string(size);
FATAL_ERROR(errmsg.c_str());
}
u8 *recv_data = m_large_recv.data();
do {
memcpy(recv_data, m_dir.buf_in->data, IPC_CHANNEL_MSG_SIZE);
size -= IPC_CHANNEL_MSG_SIZE;
recv_data += IPC_CHANNEL_MSG_SIZE;
post_out(&m_dir);
if (!wait_in(&m_dir, timeout_ms_abs))
return false;
} while (size > IPC_CHANNEL_MSG_SIZE);
if (size != 0)
memcpy(recv_data, m_dir.buf_in->data, size);
}
return true;
}
std::pair<IPCChannelEnd, std::thread> make_test_ipc_channel(
const std::function<void(IPCChannelEnd)> &fun)
{
auto resource_data = [] {
auto shared = new IPCChannelShared();
#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32
HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr);
HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr);
FATAL_ERROR_IF(!sem_a || !sem_b, "CreateSemaphoreA failed");
return IPCChannelResources::Data{shared, sem_a, sem_b};
#else
return IPCChannelResources::Data{shared};
#endif
}();
std::thread thread_b([=] {
auto resources_second = IPCChannelResourcesSingleProcess::makeSecond(resource_data);
IPCChannelEnd end_b = IPCChannelEnd::makeB(std::move(resources_second));
fun(std::move(end_b));
});
auto resources_first = IPCChannelResourcesSingleProcess::makeFirst(resource_data);
IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first));
return {std::move(end_a), std::move(thread_b)};
}

276
src/threading/ipc_channel.h Normal file
View file

@ -0,0 +1,276 @@
// SPDX-FileCopyrightText: 2024 Luanti authors
//
// SPDX-License-Identifier: LGPL-2.1-or-later
#pragma once
#include "irrlichttypes.h"
#include "util/basic_macros.h"
#include <memory>
#include <vector>
#include <atomic>
#include <thread>
#include <functional>
#if defined(_WIN32)
#define IPC_CHANNEL_IMPLEMENTATION_WIN32
#elif defined(__linux__)
#define IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX
#else
#define IPC_CHANNEL_IMPLEMENTATION_POSIX
#endif
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
#include <windows.h>
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
#include <pthread.h>
#endif
/*
An IPC channel is used for synchronous communication between two processes.
Sending two messages in succession from one end is not allowed; messages
must alternate back and forth.
IPCChannelShared is situated in shared memory and is used by both ends of
the channel.
There are currently 3 implementations for synchronisation:
* win32: uses win32 semaphore
* linux: uses futex, and does busy waiting if on x86/x86_64
* other posix: uses posix mutex and condition variable
*/
constexpr size_t IPC_CHANNEL_MSG_SIZE = 0x2000;
struct IPCChannelBuffer
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
// possible values:
// 0: futex is not posted. reader will check value before blocking => no
// notify needed when posting
// 1: futex is posted
// 2: futex is not posted. reader is waiting with futex syscall, and needs
// to be notified
std::atomic<u32> futex{0};
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_cond_t cond;
pthread_mutex_t mutex;
bool posted = false; // protected by mutex
#endif
// Note: If the other side isn't acting cooperatively, they might write to
// this at any times. So we must make sure to copy out the data once, and
// only access that copy.
size_t size = 0;
u8 data[IPC_CHANNEL_MSG_SIZE] = {};
IPCChannelBuffer();
DISABLE_CLASS_COPY(IPCChannelBuffer)
~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process
};
// Data in shared memory
struct IPCChannelShared
{
// Both ends unmap, but last deleter also deletes shared resources.
std::atomic<u32> refcount{1};
IPCChannelBuffer a{};
IPCChannelBuffer b{};
};
// Interface for managing the shared resources.
// Implementors decide whether to use malloc or mmap.
struct IPCChannelResources
{
// new struct, because the win32 #if is annoying
struct Data
{
IPCChannelShared *shared = nullptr;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a;
HANDLE sem_b;
#endif
};
Data data;
IPCChannelResources() = default;
DISABLE_CLASS_COPY(IPCChannelResources)
// Child should call cleanup().
// (Parent destructor can not do this, because when it's called the child is
// already dead.)
virtual ~IPCChannelResources() = default;
protected:
// Used for previously unmanaged data_ (move semantics)
void setFirst(Data data_)
{
data = data_;
}
// Used for data_ that is already managed by an IPCChannelResources (grab()
// semantics)
bool setSecond(Data data_)
{
if (data_.shared->refcount.fetch_add(1) == 0) {
// other end dead, can't use resources
data_.shared->refcount.fetch_sub(1);
return false;
}
data = data_;
return true;
}
virtual void cleanupLast() noexcept = 0;
virtual void cleanupNotLast() noexcept = 0;
void cleanup() noexcept
{
if (!data.shared) {
// No owned resources. Maybe setSecond failed.
return;
}
if (data.shared->refcount.fetch_sub(1) == 1) {
// We are last, we clean up.
cleanupLast();
} else {
// We are not responsible for cleanup.
// Note: Shared resources may already be invalid by now.
cleanupNotLast();
}
}
};
class IPCChannelEnd
{
public:
// Direction. References into IPCChannelResources.
struct Dir
{
IPCChannelBuffer *buf_in = nullptr;
IPCChannelBuffer *buf_out = nullptr;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_in;
HANDLE sem_out;
#endif
};
// Unusable empty end
IPCChannelEnd() = default;
// Construct end A or end B from resources
static IPCChannelEnd makeA(std::unique_ptr<IPCChannelResources> resources);
static IPCChannelEnd makeB(std::unique_ptr<IPCChannelResources> resources);
// Note: Timeouts may be for receiving any response, not a whole message.
// Therefore, if a timeout occurs, stop using the channel.
// Returns false on timeout
[[nodiscard]]
bool sendWithTimeout(const void *data, size_t size, int timeout_ms) noexcept
{
if (size <= IPC_CHANNEL_MSG_SIZE) {
sendSmall(data, size);
return true;
} else {
return sendLarge(data, size, timeout_ms);
}
}
// Same as above
void send(const void *data, size_t size) noexcept
{
(void)sendWithTimeout(data, size, -1);
}
// Returns false on timeout.
// Otherwise returns true, and data is available via getRecvData().
[[nodiscard]]
bool recvWithTimeout(int timeout_ms) noexcept;
// Same as above
void recv() noexcept
{
(void)recvWithTimeout(-1);
}
// Returns false on timeout
// Otherwise returns true, and data is available via getRecvData().
[[nodiscard]]
bool exchangeWithTimeout(const void *data, size_t size, int timeout_ms) noexcept
{
return sendWithTimeout(data, size, timeout_ms)
&& recvWithTimeout(timeout_ms);
}
// Same as above
void exchange(const void *data, size_t size) noexcept
{
(void)exchangeWithTimeout(data, size, -1);
}
// Get the content of the last received message
const void *getRecvData() const noexcept { return m_large_recv.data(); }
size_t getRecvSize() const noexcept { return m_recv_size; }
private:
IPCChannelEnd(std::unique_ptr<IPCChannelResources> resources, Dir dir) :
m_resources(std::move(resources)), m_dir(dir)
{}
void sendSmall(const void *data, size_t size) noexcept;
// returns false on timeout
bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept;
std::unique_ptr<IPCChannelResources> m_resources;
Dir m_dir;
size_t m_recv_size = 0;
// we always copy from the shared buffer into this
// (this buffer only grows)
std::vector<u8> m_large_recv = std::vector<u8>(IPC_CHANNEL_MSG_SIZE);
};
// For testing purposes
struct IPCChannelResourcesSingleProcess final : public IPCChannelResources
{
void cleanupLast() noexcept override
{
delete data.shared;
#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32
CloseHandle(data.sem_b);
CloseHandle(data.sem_a);
#endif
}
void cleanupNotLast() noexcept override
{
// nothing to do (i.e. no unmapping needed)
}
~IPCChannelResourcesSingleProcess() override { cleanup(); }
static std::unique_ptr<IPCChannelResourcesSingleProcess> makeFirst(Data data)
{
auto ret = std::make_unique<IPCChannelResourcesSingleProcess>();
ret->setFirst(data);
return ret;
}
static std::unique_ptr<IPCChannelResourcesSingleProcess> makeSecond(Data data)
{
auto ret = std::make_unique<IPCChannelResourcesSingleProcess>();
ret->setSecond(data);
return ret;
}
};
// For testing
// Returns one end and a thread holding the other end. The thread will execute
// fun, and pass it the other end.
std::pair<IPCChannelEnd, std::thread> make_test_ipc_channel(
const std::function<void(IPCChannelEnd)> &fun);

View file

@ -6,6 +6,7 @@
#include <atomic> #include <atomic>
#include <iostream> #include <iostream>
#include "threading/ipc_channel.h"
#include "threading/semaphore.h" #include "threading/semaphore.h"
#include "threading/thread.h" #include "threading/thread.h"
@ -19,6 +20,7 @@ public:
void testStartStopWait(); void testStartStopWait();
void testAtomicSemaphoreThread(); void testAtomicSemaphoreThread();
void testTLS(); void testTLS();
void testIPCChannel();
}; };
static TestThreading g_test_instance; static TestThreading g_test_instance;
@ -28,6 +30,7 @@ void TestThreading::runTests(IGameDef *gamedef)
TEST(testStartStopWait); TEST(testStartStopWait);
TEST(testAtomicSemaphoreThread); TEST(testAtomicSemaphoreThread);
TEST(testTLS); TEST(testTLS);
TEST(testIPCChannel);
} }
class SimpleTestThread : public Thread { class SimpleTestThread : public Thread {
@ -227,3 +230,42 @@ void TestThreading::testTLS()
} }
} }
} }
void TestThreading::testIPCChannel()
{
auto [end_a, thread_b] = make_test_ipc_channel([](IPCChannelEnd end_b) {
// echos back messages. stops if "" is sent
while (true) {
UASSERT(end_b.recvWithTimeout(-1));
UASSERT(end_b.sendWithTimeout(end_b.getRecvData(), end_b.getRecvSize(), -1));
if (end_b.getRecvSize() == 0)
break;
}
});
u8 buf1[20000] = {};
for (int i = sizeof(buf1); i > 0; i -= 100) {
buf1[i - 1] = 123;
UASSERT(end_a.exchangeWithTimeout(buf1, i, -1));
UASSERTEQ(int, end_a.getRecvSize(), i);
UASSERTEQ(int, reinterpret_cast<const u8 *>(end_a.getRecvData())[i - 1], 123);
}
u8 buf2[IPC_CHANNEL_MSG_SIZE * 3 + 10];
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3 + 10);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 2);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE - 1);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE + 1);
end_a.exchange(buf2, 1);
// stop thread_b
UASSERT(end_a.exchangeWithTimeout(nullptr, 0, -1));
UASSERTEQ(int, end_a.getRecvSize(), 0);
thread_b.join();
// other side dead ==> should time out
UASSERT(!end_a.exchangeWithTimeout(nullptr, 0, 200));
}