diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp index 1ea61e7ff..982e3831e 100644 --- a/src/threading/ipc_channel.cpp +++ b/src/threading/ipc_channel.cpp @@ -22,11 +22,12 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "debug.h" #include "exceptions.h" #include "porting.h" -#include +#include #include -#if defined(__linux__) + +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) #include -#include +#include #include #include #if defined(__i386__) || defined(__x86_64__) @@ -36,7 +37,7 @@ with this program; if not, write to the Free Software Foundation, Inc., IPCChannelBuffer::IPCChannelBuffer() { -#if !defined(__linux__) && !defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) pthread_condattr_t condattr; pthread_mutexattr_t mutexattr; if (pthread_condattr_init(&condattr) != 0) @@ -65,18 +66,18 @@ error_mutexattr_init: pthread_condattr_destroy(&condattr); error_condattr_init: throw BaseException("Unable to initialize IPCChannelBuffer"); -#endif // !defined(__linux__) && !defined(_WIN32) +#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) } IPCChannelBuffer::~IPCChannelBuffer() { -#if !defined(__linux__) && !defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) pthread_mutex_destroy(&mutex); pthread_cond_destroy(&cond); -#endif // !defined(__linux__) && !defined(_WIN32) +#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) } -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) // returns false on timeout static bool wait(HANDLE sem, DWORD timeout) @@ -92,7 +93,7 @@ static void post(HANDLE sem) #else -#if defined(__linux__) +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) #if defined(__i386__) || defined(__x86_64__) static void busy_wait(int n) noexcept @@ -108,13 +109,13 @@ static int futex(std::atomic *uaddr, int futex_op, u32 val, return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); } -#endif // defined(__linux__) +#endif // defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) // timeout: relative on linux, and absolute on other posix // returns false on timeout static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept { -#if defined(__linux__) +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) // try busy waiting for (int i = 0; i < 100; i++) { // posted? @@ -145,7 +146,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept } } } -#else +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) bool timed_out = false; pthread_mutex_lock(&buf->mutex); if (!buf->posted) { @@ -157,12 +158,12 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept buf->posted = false; pthread_mutex_unlock(&buf->mutex); return !timed_out; -#endif // !defined(__linux__) +#endif } static void post(IPCChannelBuffer *buf) noexcept { -#if defined(__linux__) +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) if (buf->futex.exchange(1) == 2) { // 2 means reader needs to be notified int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0); @@ -172,35 +173,35 @@ static void post(IPCChannelBuffer *buf) noexcept FATAL_ERROR(errmsg.c_str()); } } -#else +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) pthread_mutex_lock(&buf->mutex); buf->posted = true; pthread_cond_broadcast(&buf->cond); pthread_mutex_unlock(&buf->mutex); -#endif // !defined(__linux__) +#endif } -#endif // !defined(_WIN32) +#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) static DWORD get_timeout(int timeout_ms) { return timeout_ms < 0 ? INFINITE : (DWORD)timeout_ms; } -#else +#elif defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) || defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) static struct timespec *set_timespec(struct timespec *ts, int ms) { if (ms < 0) return nullptr; u64 msu = ms; -#if !defined(__linux__) +#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) msu += porting::getTimeMs(); // Absolute time #endif ts->tv_sec = msu / 1000; ts->tv_nsec = msu % 1000 * 1000000UL; return ts; } -#endif // !defined(_WIN32) +#endif template static inline void write_once(volatile T *var, const T val) @@ -217,32 +218,32 @@ static inline T read_once(const volatile T *var) IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr stuff) { IPCChannelShared *shared = stuff->getShared(); -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) HANDLE sem_a = stuff->getSemA(); HANDLE sem_b = stuff->getSemB(); return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b, sem_a, sem_b); #else return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b); -#endif // !defined(_WIN32) +#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) } IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr stuff) { IPCChannelShared *shared = stuff->getShared(); -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) HANDLE sem_a = stuff->getSemA(); HANDLE sem_b = stuff->getSemB(); return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a, sem_b, sem_a); #else return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a); -#endif // !defined(_WIN32) +#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) } void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept { write_once(&m_out->size, size); memcpy(m_out->data, data, size); -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post(m_sem_out); #else post(m_out); @@ -251,7 +252,7 @@ void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept { -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) DWORD timeout = get_timeout(timeout_ms); #else struct timespec timeout; @@ -260,12 +261,12 @@ bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noe write_once(&m_out->size, size); do { memcpy(m_out->data, data, IPC_CHANNEL_MSG_SIZE); -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post(m_sem_out); #else post(m_out); #endif -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) if (!wait(m_sem_in, timeout)) #else if (!wait(m_in, timeoutp)) @@ -275,7 +276,7 @@ bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noe data = (u8 *)data + IPC_CHANNEL_MSG_SIZE; } while (size > IPC_CHANNEL_MSG_SIZE); memcpy(m_out->data, data, size); -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post(m_sem_out); #else post(m_out); @@ -285,13 +286,13 @@ bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noe bool IPCChannelEnd::recv(int timeout_ms) noexcept { -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) DWORD timeout = get_timeout(timeout_ms); #else struct timespec timeout; struct timespec *timeoutp = set_timespec(&timeout, timeout_ms); #endif -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) if (!wait(m_sem_in, timeout)) #else if (!wait(m_in, timeoutp)) @@ -316,12 +317,12 @@ bool IPCChannelEnd::recv(int timeout_ms) noexcept memcpy(recv_data, m_in->data, IPC_CHANNEL_MSG_SIZE); size -= IPC_CHANNEL_MSG_SIZE; recv_data += IPC_CHANNEL_MSG_SIZE; -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post(m_sem_out); #else post(m_out); #endif -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) if (!wait(m_sem_in, timeout)) #else if (!wait(m_in, timeoutp)) diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h index 8247eddb5..a6ea47303 100644 --- a/src/threading/ipc_channel.h +++ b/src/threading/ipc_channel.h @@ -27,10 +27,18 @@ with this program; if not, write to the Free Software Foundation, Inc., #include #if defined(_WIN32) -#include +#define IPC_CHANNEL_IMPLEMENTATION_WIN32 #elif defined(__linux__) -#include +#define IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX #else +#define IPC_CHANNEL_IMPLEMENTATION_POSIX +#endif + +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) +#include +#elif defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) +#include +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) #include #endif @@ -52,8 +60,7 @@ with this program; if not, write to the Free Software Foundation, Inc., struct IPCChannelBuffer { -#if !defined(_WIN32) -#if defined(__linux__) +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) // possible values: // 0: futex is not posted. reader will check value before blocking => no // notify needed when posting @@ -61,12 +68,13 @@ struct IPCChannelBuffer // 2: futex is not posted. reader is waiting with futex syscall, and needs // to be notified std::atomic futex{0}; -#else + +#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) pthread_cond_t cond; pthread_mutex_t mutex; bool posted = false; // protected by mutex #endif -#endif // !defined(_WIN32) + // Note: If the other side isn't acting cooperatively, they might write to // this at any times. So we must make sure to copy out the data once, and // only access that copy. @@ -89,7 +97,7 @@ struct IPCChannelStuff { virtual ~IPCChannelStuff() = default; virtual IPCChannelShared *getShared() = 0; -#ifdef _WIN32 +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) virtual HANDLE getSemA() = 0; virtual HANDLE getSemB() = 0; #endif @@ -128,7 +136,7 @@ public: inline size_t getRecvSize() const noexcept { return m_recv_size; } private: -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) IPCChannelEnd( std::unique_ptr stuff, IPCChannelBuffer *in, IPCChannelBuffer *out, @@ -154,7 +162,7 @@ private: std::unique_ptr m_stuff; IPCChannelBuffer *m_in = nullptr; IPCChannelBuffer *m_out = nullptr; -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) HANDLE m_sem_in; HANDLE m_sem_out; #endif diff --git a/src/unittest/test_threading.cpp b/src/unittest/test_threading.cpp index 28bde44ee..9b6f4e532 100644 --- a/src/unittest/test_threading.cpp +++ b/src/unittest/test_threading.cpp @@ -6,7 +6,7 @@ #include #include -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) || defined(_WIN32) #include #endif #include "threading/ipc_channel.h" @@ -239,13 +239,13 @@ void TestThreading::testIPCChannel() struct Stuff { IPCChannelShared shared{}; -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) HANDLE sem_a; HANDLE sem_b; #endif Stuff() { -#ifdef _WIN32 +#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); UASSERT(sem_a != INVALID_HANDLE_VALUE); @@ -256,7 +256,7 @@ void TestThreading::testIPCChannel() ~Stuff() { -#ifdef _WIN32 +#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 CloseHandle(sem_b); CloseHandle(sem_a); #endif @@ -271,7 +271,7 @@ void TestThreading::testIPCChannel() ~IPCChannelStuffSingleProcess() override = default; IPCChannelShared *getShared() override { return &stuff->shared; } -#if defined(_WIN32) +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) HANDLE getSemA() override { return stuff->sem_a; } HANDLE getSemB() override { return stuff->sem_b; } #endif