diff --git a/src/threading/CMakeLists.txt b/src/threading/CMakeLists.txt index 6771b715f..030d37f28 100644 --- a/src/threading/CMakeLists.txt +++ b/src/threading/CMakeLists.txt @@ -2,5 +2,6 @@ set(threading_SRCS ${CMAKE_CURRENT_SOURCE_DIR}/event.cpp ${CMAKE_CURRENT_SOURCE_DIR}/thread.cpp ${CMAKE_CURRENT_SOURCE_DIR}/semaphore.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/ipc_channel.cpp PARENT_SCOPE) diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp new file mode 100644 index 000000000..082c20461 --- /dev/null +++ b/src/threading/ipc_channel.cpp @@ -0,0 +1,281 @@ +/* +Minetest +Copyright (C) 2022 Desour +Copyright (C) 2022 TurkeyMcMac, Jude Melton-Houghton + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#include "ipc_channel.h" +#include "debug.h" +#include "exceptions.h" +#include "porting.h" +#include +#include +#if defined(__linux__) +#include +#include +#include +#include +#if defined(__i386__) || defined(__x86_64__) +#include +#endif +#endif + +IPCChannelBuffer::IPCChannelBuffer() +{ +#if !defined(__linux__) && !defined(_WIN32) + pthread_condattr_t condattr; + pthread_mutexattr_t mutexattr; + if (pthread_condattr_init(&condattr) != 0) + goto error_condattr_init; + if (pthread_mutexattr_init(&mutexattr) != 0) + goto error_mutexattr_init; + if (pthread_condattr_setpshared(&condattr, 1) != 0) + goto error_condattr_setpshared; + if (pthread_mutexattr_setpshared(&mutexattr, 1) != 0) + goto error_mutexattr_setpshared; + if (pthread_cond_init(&cond, &condattr) != 0) + goto error_cond_init; + if (pthread_mutex_init(&mutex, &mutexattr) != 0) + goto error_mutex_init; + pthread_mutexattr_destroy(&mutexattr); + pthread_condattr_destroy(&condattr); + return; + +error_mutex_init: + pthread_cond_destroy(&cond); +error_cond_init: +error_mutexattr_setpshared: +error_condattr_setpshared: + pthread_mutexattr_destroy(&mutexattr); +error_mutexattr_init: + pthread_condattr_destroy(&condattr); +error_condattr_init: + throw BaseException("Unable to initialize IPCChannelBuffer"); +#endif // !defined(__linux__) && !defined(_WIN32) +} + +IPCChannelBuffer::~IPCChannelBuffer() +{ +#if !defined(__linux__) && !defined(_WIN32) + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&cond); +#endif // !defined(__linux__) && !defined(_WIN32) +} + +#if defined(_WIN32) + +static bool wait(HANDLE sem, DWORD timeout) +{ + return WaitForSingleObject(sem, timeout) == WAIT_OBJECT_0; +} + +static void post(HANDLE sem) +{ + if (!ReleaseSemaphore(sem, 1, nullptr)) + FATAL_ERROR("ReleaseSemaphore failed unexpectedly"); +} + +#else + +#if defined(__linux__) + +#if defined(__i386__) || defined(__x86_64__) +static void busy_wait(int n) noexcept +{ + for (int i = 0; i < n; i++) + _mm_pause(); +} +#endif // defined(__i386__) || defined(__x86_64__) + +static int futex(std::atomic *uaddr, int futex_op, u32 val, + const struct timespec *timeout, u32 *uaddr2, u32 val3) noexcept +{ + return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); +} + +#endif // defined(__linux__) + +static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept +{ +#if defined(__linux__) + // try busy waiting + for (int i = 0; i < 100; i++) { + // posted? + if (buf->futex.exchange(0) == 1) + return true; // yes +#if defined(__i386__) || defined(__x86_64__) + busy_wait(40); +#else + break; // Busy wait not implemented +#endif + } + // wait with futex + while (true) { + // write 2 to show that we're futexing + if (buf->futex.exchange(2) == 1) { + // futex was posted => change 2 to 0 (or 1 to 1) + buf->futex.fetch_and(1); + return true; + } + int s = futex(&buf->futex, FUTEX_WAIT, 2, timeout, nullptr, 0); + if (s == -1 && errno != EAGAIN) + return false; + } +#else + bool timed_out = false; + pthread_mutex_lock(&buf->mutex); + if (!buf->posted) { + if (timeout) + timed_out = pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT; + else + pthread_cond_wait(&buf->cond, &buf->mutex); + } + buf->posted = false; + pthread_mutex_unlock(&buf->mutex); + return !timed_out; +#endif // !defined(__linux__) +} + +static void post(IPCChannelBuffer *buf) noexcept +{ +#if defined(__linux__) + if (buf->futex.exchange(1) == 2) { + int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0); + if (s == -1) + FATAL_ERROR("FUTEX_WAKE failed unexpectedly"); + } +#else + pthread_mutex_lock(&buf->mutex); + buf->posted = true; + pthread_cond_broadcast(&buf->cond); + pthread_mutex_unlock(&buf->mutex); +#endif // !defined(__linux__) +} + +#endif // !defined(_WIN32) + +#if defined(_WIN32) +static DWORD get_timeout(int timeout_ms) +{ + return timeout_ms < 0 ? INFINITE : (DWORD)timeout_ms; +} +#else +static struct timespec *set_timespec(struct timespec *ts, int ms) +{ + if (ms < 0) + return nullptr; + u64 msu = ms; +#if !defined(__linux__) + msu += porting::getTimeMs(); // Absolute time +#endif + ts->tv_sec = msu / 1000; + ts->tv_nsec = msu % 1000 * 1000000UL; + return ts; +} +#endif // !defined(_WIN32) + +bool IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept +{ + m_out->size = size; + memcpy(m_out->data, data, size); +#if defined(_WIN32) + post(m_sem_out); +#else + post(m_out); +#endif + return true; +} + +bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept +{ +#if defined(_WIN32) + DWORD timeout = get_timeout(timeout_ms); +#else + struct timespec timeout, *timeoutp = set_timespec(&timeout, timeout_ms); +#endif + m_out->size = size; + do { + memcpy(m_out->data, data, IPC_CHANNEL_MSG_SIZE); +#if defined(_WIN32) + post(m_sem_out); +#else + post(m_out); +#endif +#if defined(_WIN32) + if (!wait(m_sem_in, timeout)) +#else + if (!wait(m_in, timeoutp)) +#endif + return false; + size -= IPC_CHANNEL_MSG_SIZE; + data = (u8 *)data + IPC_CHANNEL_MSG_SIZE; + } while (size > IPC_CHANNEL_MSG_SIZE); + memcpy(m_out->data, data, size); +#if defined(_WIN32) + post(m_sem_out); +#else + post(m_out); +#endif + return true; +} + +bool IPCChannelEnd::recv(int timeout_ms) noexcept +{ +#if defined(_WIN32) + DWORD timeout = get_timeout(timeout_ms); +#else + struct timespec timeout, *timeoutp = set_timespec(&timeout, timeout_ms); +#endif +#if defined(_WIN32) + if (!wait(m_sem_in, timeout)) +#else + if (!wait(m_in, timeoutp)) +#endif + return false; + size_t size = m_in->size; + if (size <= IPC_CHANNEL_MSG_SIZE) { + m_recv_size = size; + m_recv_data = m_in->data; + } else { + try { + m_large_recv.resize(size); + } catch (...) { + return false; + } + u8 *recv_data = m_large_recv.data(); + m_recv_size = size; + m_recv_data = recv_data; + do { + memcpy(recv_data, m_in->data, IPC_CHANNEL_MSG_SIZE); + size -= IPC_CHANNEL_MSG_SIZE; + recv_data += IPC_CHANNEL_MSG_SIZE; +#if defined(_WIN32) + post(m_sem_out); +#else + post(m_out); +#endif +#if defined(_WIN32) + if (!wait(m_sem_in, timeout)) +#else + if (!wait(m_in, timeoutp)) +#endif + return false; + } while (size > IPC_CHANNEL_MSG_SIZE); + memcpy(recv_data, m_in->data, size); + } + return true; +} diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h new file mode 100644 index 000000000..3b1a44d29 --- /dev/null +++ b/src/threading/ipc_channel.h @@ -0,0 +1,143 @@ +/* +Minetest +Copyright (C) 2022 Desour +Copyright (C) 2022 TurkeyMcMac, Jude Melton-Houghton + +This program is free software; you can redistribute it and/or modify +it under the terms of the GNU Lesser General Public License as published by +the Free Software Foundation; either version 2.1 of the License, or +(at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Lesser General Public License for more details. + +You should have received a copy of the GNU Lesser General Public License along +with this program; if not, write to the Free Software Foundation, Inc., +51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. +*/ + +#pragma once + +#include "irrlichttypes.h" +#include +#include +#include +#if defined(_WIN32) +#include +#elif defined(__linux__) +#include +#else +#include +#endif + +/* + An IPC channel is used for synchronous communication between two processes. + Sending two messages in succession from one end is not allowed; messages + must alternate back and forth. + + IPCChannelShared is situated in shared memory and is used by both ends of + the channel. +*/ + +#define IPC_CHANNEL_MSG_SIZE 8192U + +struct IPCChannelBuffer +{ +#if !defined(_WIN32) +#if defined(__linux__) + std::atomic futex = ATOMIC_VAR_INIT(0U); +#else + pthread_cond_t cond; + pthread_mutex_t mutex; + // TODO: use atomic? + bool posted = false; +#endif +#endif // !defined(_WIN32) + size_t size; + u8 data[IPC_CHANNEL_MSG_SIZE]; + + IPCChannelBuffer(); + ~IPCChannelBuffer(); +}; + +struct IPCChannelShared +{ + IPCChannelBuffer a; + IPCChannelBuffer b; +}; + +class IPCChannelEnd +{ +public: + IPCChannelEnd() = default; + +#if defined(_WIN32) + static IPCChannelEnd makeA(IPCChannelShared *shared, HANDLE sem_a, HANDLE sem_b) + { + return IPCChannelEnd(&shared->a, &shared->b, sem_a, sem_b); + } + + static IPCChannelEnd makeB(IPCChannelShared *shared, HANDLE sem_a, HANDLE sem_b) + { + return IPCChannelEnd(&shared->b, &shared->a, sem_b, sem_a); + } +#else + static IPCChannelEnd makeA(IPCChannelShared *shared) + { + return IPCChannelEnd(&shared->a, &shared->b); + } + + static IPCChannelEnd makeB(IPCChannelShared *shared) + { + return IPCChannelEnd(&shared->b, &shared->a); + } +#endif // !defined(_WIN32) + + // If send, recv, or exchange return false, stop using the channel. + // Note: timeouts may be for receiving any response, not a whole message. + + bool send(const void *data, size_t size, int timeout_ms = -1) noexcept + { + if (size <= IPC_CHANNEL_MSG_SIZE) { + return sendSmall(data, size); + } else { + return sendLarge(data, size, timeout_ms); + } + } + + bool recv(int timeout_ms = -1) noexcept; + + bool exchange(const void *data, size_t size, int timeout_ms = -1) noexcept + { + return send(data, size, timeout_ms) && recv(timeout_ms); + } + + // Get information about the last received message + inline const void *getRecvData() const noexcept { return m_recv_data; } + inline size_t getRecvSize() const noexcept { return m_recv_size; } + +private: +#if defined(_WIN32) + IPCChannelEnd(IPCChannelBuffer *in, IPCChannelBuffer *out, HANDLE sem_in, HANDLE sem_out): + m_in(in), m_out(out), m_sem_in(sem_in), m_sem_out(sem_out) + {} +#else + IPCChannelEnd(IPCChannelBuffer *in, IPCChannelBuffer *out): m_in(in), m_out(out) {} +#endif + + bool sendSmall(const void *data, size_t size) noexcept; + + bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; + + IPCChannelBuffer *m_in = nullptr; + IPCChannelBuffer *m_out = nullptr; +#if defined(_WIN32) + HANDLE m_sem_in; + HANDLE m_sem_out; +#endif + const void *m_recv_data; + size_t m_recv_size; + std::vector m_large_recv; +}; diff --git a/src/unittest/test_threading.cpp b/src/unittest/test_threading.cpp index 77842964b..71742e6c1 100644 --- a/src/unittest/test_threading.cpp +++ b/src/unittest/test_threading.cpp @@ -6,6 +6,10 @@ #include #include +#if defined(_WIN32) +#include +#endif +#include "threading/ipc_channel.h" #include "threading/semaphore.h" #include "threading/thread.h" @@ -19,6 +23,7 @@ public: void testStartStopWait(); void testAtomicSemaphoreThread(); void testTLS(); + void testIPCChannel(); }; static TestThreading g_test_instance; @@ -28,6 +33,7 @@ void TestThreading::runTests(IGameDef *gamedef) TEST(testStartStopWait); TEST(testAtomicSemaphoreThread); TEST(testTLS); + TEST(testIPCChannel); } class SimpleTestThread : public Thread { @@ -227,3 +233,58 @@ void TestThreading::testTLS() } } } + +void TestThreading::testIPCChannel() +{ +#if defined(_WIN32) + HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); + UASSERT(sem_a != INVALID_HANDLE_VALUE); + + HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr); + UASSERT(sem_b != INVALID_HANDLE_VALUE); +#endif + + IPCChannelShared shared, *sharedp = &shared; + +#if defined(_WIN32) + IPCChannelEnd end_a = IPCChannelEnd::makeA(sharedp, sem_a, sem_b); +#else + IPCChannelEnd end_a = IPCChannelEnd::makeA(sharedp); +#endif + + std::thread thread_b([=] { +#if defined(_WIN32) + IPCChannelEnd end_b = IPCChannelEnd::makeB(sharedp, sem_a, sem_b); +#else + IPCChannelEnd end_b = IPCChannelEnd::makeB(sharedp); +#endif + + for (;;) { + end_b.recv(); + end_b.send(end_b.getRecvData(), end_b.getRecvSize()); + if (end_b.getRecvSize() == 0) + break; + } + }); + + char buf[20000] = {}; + for (int i = sizeof(buf); i > 0; i -= 1000) { + buf[i - 1] = 123; + end_a.exchange(buf, i); + UASSERTEQ(int, end_a.getRecvSize(), i); + UASSERTEQ(int, ((const char *)end_a.getRecvData())[i - 1], 123); + } + + end_a.exchange(buf, 0); + UASSERTEQ(int, end_a.getRecvSize(), 0); + + thread_b.join(); + + UASSERT(!end_a.exchange(buf, 0, 1000)); + +#if defined(_WIN32) + CloseHandle(sem_b); + + CloseHandle(sem_a); +#endif +}