diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp index 8bd07f252..c57a77310 100644 --- a/src/threading/ipc_channel.cpp +++ b/src/threading/ipc_channel.cpp @@ -20,8 +20,6 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "ipc_channel.h" #include "debug.h" -#include "exceptions.h" -#include "porting.h" #include #include #include @@ -30,9 +28,22 @@ with this program; if not, write to the Free Software Foundation, Inc., #include #include #include -#if defined(__i386__) || defined(__x86_64__) -#include #endif + +#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) && (defined(__i386__) || defined(__x86_64__)) +#include + +#define HAVE_BUSY_WAIT 1 + +[[maybe_unused]] +static void busy_wait(int n) noexcept +{ + for (int i = 0; i < n; i++) + _mm_pause(); +} + +#else +#define HAVE_BUSY_WAIT 0 #endif IPCChannelBuffer::IPCChannelBuffer() @@ -91,17 +102,7 @@ static void post(HANDLE sem) FATAL_ERROR("ReleaseSemaphore failed unexpectedly"); } -#else - -#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) - -#if defined(__i386__) || defined(__x86_64__) -static void busy_wait(int n) noexcept -{ - for (int i = 0; i < n; i++) - _mm_pause(); -} -#endif // defined(__i386__) || defined(__x86_64__) +#elif defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) static int futex(std::atomic *uaddr, int futex_op, u32 val, const struct timespec *timeout, u32 *uaddr2, u32 val3) noexcept @@ -109,13 +110,10 @@ static int futex(std::atomic *uaddr, int futex_op, u32 val, return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); } -#endif // defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) - -// timeout: relative on linux, and absolute on other posix +// timeout is relative // returns false on timeout static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept { -#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) // try busy waiting for (int i = 0; i < 100; i++) { // posted? @@ -126,10 +124,10 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept buf->futex.store(0, std::memory_order_relaxed); return true; } -#if defined(__i386__) || defined(__x86_64__) +#if HAVE_BUSY_WAIT busy_wait(40); #else - break; // Busy wait not implemented + break; #endif } // wait with futex @@ -151,7 +149,27 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept } } } +} + +static void post(IPCChannelBuffer *buf) noexcept +{ + if (buf->futex.exchange(1, std::memory_order_acq_rel) == 2) { + // 2 means reader needs to be notified + int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0); + if (s == -1) { + std::string errmsg = std::string("FUTEX_WAKE failed unexpectedly: ") + + std::strerror(errno); + FATAL_ERROR(errmsg.c_str()); + } + } +} + #elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + +// timeout is absolute +// returns false on timeout +static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept +{ bool timed_out = false; pthread_mutex_lock(&buf->mutex); if (!buf->posted) { @@ -163,30 +181,38 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept buf->posted = false; pthread_mutex_unlock(&buf->mutex); return !timed_out; -#endif } static void post(IPCChannelBuffer *buf) noexcept { -#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) - if (buf->futex.exchange(1, std::memory_order_acq_rel) == 2) { - // 2 means reader needs to be notified - int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0); - if (s == -1) { - std::string errmsg = std::string("FUTEX_WAKE failed unexpectedly: ") - + std::strerror(errno); - FATAL_ERROR(errmsg.c_str()); - } - } -#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) pthread_mutex_lock(&buf->mutex); buf->posted = true; pthread_cond_broadcast(&buf->cond); pthread_mutex_unlock(&buf->mutex); -#endif } -#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) +#endif + +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) +static bool wait_in(IPCChannelEnd::Dir *dir, DWORD timeout) +{ + return wait(dir->sem_in, timeout); +} +#else +static bool wait_in(IPCChannelEnd::Dir *dir, const struct timespec *timeout) +{ + return wait(dir->buf_in, timeout); +} +#endif + +static void post_out(IPCChannelEnd::Dir *dir) +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + post(dir->sem_out); +#else + post(dir->buf_out); +#endif +} #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) static DWORD get_timeout(int timeout_ms) @@ -226,9 +252,9 @@ IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr resource #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) HANDLE sem_a = resources->data.sem_a; HANDLE sem_b = resources->data.sem_b; - return IPCChannelEnd(std::move(resources), &shared->a, &shared->b, sem_a, sem_b); + return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b, sem_a, sem_b}); #else - return IPCChannelEnd(std::move(resources), &shared->a, &shared->b); + return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b}); #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) } @@ -238,22 +264,18 @@ IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr resource #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) HANDLE sem_a = resources->data.sem_a; HANDLE sem_b = resources->data.sem_b; - return IPCChannelEnd(std::move(resources), &shared->b, &shared->a, sem_b, sem_a); + return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a, sem_b, sem_a}); #else - return IPCChannelEnd(std::move(resources), &shared->b, &shared->a); + return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a}); #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) } void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept { - write_once(&m_out->size, size); + write_once(&m_dir.buf_out->size, size); if (size != 0) - memcpy(m_out->data, data, size); -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - post(m_sem_out); -#else - post(m_out); -#endif + memcpy(m_dir.buf_out->data, data, size); + post_out(&m_dir); } bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept @@ -261,31 +283,21 @@ bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noe #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) DWORD timeout = get_timeout(timeout_ms); #else - struct timespec timeout; - struct timespec *timeoutp = set_timespec(&timeout, timeout_ms); + struct timespec timeout_s; + struct timespec *timeout = set_timespec(&timeout_s, timeout_ms); #endif - write_once(&m_out->size, size); + write_once(&m_dir.buf_out->size, size); do { - memcpy(m_out->data, data, IPC_CHANNEL_MSG_SIZE); -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - post(m_sem_out); - if (!wait(m_sem_in, timeout)) + memcpy(m_dir.buf_out->data, data, IPC_CHANNEL_MSG_SIZE); + post_out(&m_dir); + if (!wait_in(&m_dir, timeout)) // TODO: always relative timeout, or always absolute return false; -#else - post(m_out); - if (!wait(m_in, timeoutp)) - return false; -#endif size -= IPC_CHANNEL_MSG_SIZE; data = (u8 *)data + IPC_CHANNEL_MSG_SIZE; } while (size > IPC_CHANNEL_MSG_SIZE); if (size != 0) - memcpy(m_out->data, data, size); -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - post(m_sem_out); -#else - post(m_out); -#endif + memcpy(m_dir.buf_out->data, data, size); + post_out(&m_dir); return true; } @@ -293,15 +305,13 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept { #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) DWORD timeout = get_timeout(timeout_ms); - if (!wait(m_sem_in, timeout)) - return false; #else - struct timespec timeout; - struct timespec *timeoutp = set_timespec(&timeout, timeout_ms); - if (!wait(m_in, timeoutp)) - return false; + struct timespec timeout_s; + struct timespec *timeout = set_timespec(&timeout_s, timeout_ms); #endif - size_t size = read_once(&m_in->size); + if (!wait_in(&m_dir, timeout)) + return false; + size_t size = read_once(&m_dir.buf_in->size); m_recv_size = size; // Note about memcpy: If the other thread is evil, it might change the contents // of the memory while it's memcopied. We're assuming here that memcpy doesn't @@ -309,7 +319,7 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept if (size <= IPC_CHANNEL_MSG_SIZE) { // small msg // (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE) - memcpy(m_large_recv.data(), m_in->data, size); + memcpy(m_large_recv.data(), m_dir.buf_in->data, size); } else { // large msg try { @@ -322,20 +332,14 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept } u8 *recv_data = m_large_recv.data(); do { - memcpy(recv_data, m_in->data, IPC_CHANNEL_MSG_SIZE); + memcpy(recv_data, m_dir.buf_in->data, IPC_CHANNEL_MSG_SIZE); size -= IPC_CHANNEL_MSG_SIZE; recv_data += IPC_CHANNEL_MSG_SIZE; -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - post(m_sem_out); - if (!wait(m_sem_in, timeout)) + post_out(&m_dir); + if (!wait_in(&m_dir, timeout)) return false; -#else - post(m_out); - if (!wait(m_in, timeoutp)) - return false; -#endif } while (size > IPC_CHANNEL_MSG_SIZE); - memcpy(recv_data, m_in->data, size); + memcpy(recv_data, m_dir.buf_in->data, size); } return true; } diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h index 843ff41e9..c701947e4 100644 --- a/src/threading/ipc_channel.h +++ b/src/threading/ipc_channel.h @@ -23,8 +23,6 @@ with this program; if not, write to the Free Software Foundation, Inc., #include "irrlichttypes.h" #include "util/basic_macros.h" #include -#include -#include #include #include @@ -85,6 +83,7 @@ struct IPCChannelBuffer ~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process }; +// Data in shared memory struct IPCChannelShared { // Both ends unmap, but last deleter also deletes shared resources. @@ -94,6 +93,18 @@ struct IPCChannelShared IPCChannelBuffer b{}; }; +struct IPCChannelDirection +{ + IPCChannelBuffer *buf_in; + IPCChannelBuffer *buf_out; +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + HANDLE sem_in; + HANDLE sem_out; +#endif +}; + +// Each end holds this. One is A, one is B. +// Implementors of this struct decide how to allocate buffers (i.e. malloc or mmap). struct IPCChannelResources { // new struct, because the win32 #if is annoying @@ -115,7 +126,7 @@ struct IPCChannelResources data = data_; } - // Used for data_ that is already managed by a IPCChannelResources (grab() + // Used for data_ that is already managed by an IPCChannelResources (grab() // semantics) bool setSecond(Data data_) { @@ -159,6 +170,17 @@ struct IPCChannelResources class IPCChannelEnd { public: + // Direction. References into IPCChannelResources. + struct Dir + { + IPCChannelBuffer *buf_in = nullptr; + IPCChannelBuffer *buf_out = nullptr; +#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + HANDLE sem_in; + HANDLE sem_out; +#endif + }; + IPCChannelEnd() = default; static IPCChannelEnd makeA(std::unique_ptr resources); @@ -217,23 +239,9 @@ public: inline size_t getRecvSize() const noexcept { return m_recv_size; } private: -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - IPCChannelEnd( - std::unique_ptr resources, - IPCChannelBuffer *in, IPCChannelBuffer *out, - HANDLE sem_in, HANDLE sem_out) : - m_resources(std::move(resources)), - m_in(in), m_out(out), - m_sem_in(sem_in), m_sem_out(sem_out) + IPCChannelEnd(std::unique_ptr resources, Dir dir) : + m_resources(std::move(resources)), m_dir(dir) {} -#else - IPCChannelEnd( - std::unique_ptr resources, - IPCChannelBuffer *in, IPCChannelBuffer *out) : - m_resources(std::move(resources)), - m_in(in), m_out(out) - {} -#endif // TODO: u8 *, or string_view? void sendSmall(const void *data, size_t size) noexcept; @@ -242,12 +250,7 @@ private: bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; std::unique_ptr m_resources; - IPCChannelBuffer *m_in = nullptr; - IPCChannelBuffer *m_out = nullptr; -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - HANDLE m_sem_in; - HANDLE m_sem_out; -#endif + Dir m_dir; size_t m_recv_size = 0; // we always copy from the shared buffer into this // (this buffer only grows)