From 5f13c0aa4863c814e7c5264bb1edccc106e24882 Mon Sep 17 00:00:00 2001 From: Desour Date: Thu, 8 Jun 2023 01:45:26 +0200 Subject: [PATCH] add some comments, and improve error handling --- src/threading/ipc_channel.cpp | 57 +++++++++++++++++++++-------------- src/threading/ipc_channel.h | 30 ++++++++++++------ 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp index 202a91ecb..113a7f701 100644 --- a/src/threading/ipc_channel.cpp +++ b/src/threading/ipc_channel.cpp @@ -78,6 +78,7 @@ IPCChannelBuffer::~IPCChannelBuffer() #if defined(_WIN32) +// returns false on timeout static bool wait(HANDLE sem, DWORD timeout) { return WaitForSingleObject(sem, timeout) == WAIT_OBJECT_0; @@ -109,6 +110,8 @@ static int futex(std::atomic *uaddr, int futex_op, u32 val, #endif // defined(__linux__) +// timeout: relative on linux, and absolute on other posix +// returns false on timeout static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept { #if defined(__linux__) @@ -132,8 +135,15 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept return true; } int s = futex(&buf->futex, FUTEX_WAIT, 2, timeout, nullptr, 0); - if (s == -1 && errno != EAGAIN) - return false; + if (s == -1) { + if (errno == ETIMEDOUT) { + return false; + } else if (errno != EAGAIN && errno != EINTR) { + std::string errmsg = std::string("FUTEX_WAIT failed unexpectedly: ") + + std::strerror(errno); + FATAL_ERROR(errmsg.c_str()); + } + } } #else bool timed_out = false; @@ -154,9 +164,13 @@ static void post(IPCChannelBuffer *buf) noexcept { #if defined(__linux__) if (buf->futex.exchange(1) == 2) { + // 2 means reader needs to be notified int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0); - if (s == -1) - FATAL_ERROR("FUTEX_WAKE failed unexpectedly"); + if (s == -1) { + std::string errmsg = std::string("FUTEX_WAKE failed unexpectedly: ") + + std::strerror(errno); + FATAL_ERROR(errmsg.c_str()); + } } #else pthread_mutex_lock(&buf->mutex); @@ -188,38 +202,31 @@ static struct timespec *set_timespec(struct timespec *ts, int ms) } #endif // !defined(_WIN32) -#if defined(_WIN32) IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr stuff) { IPCChannelShared *shared = stuff->getShared(); +#if defined(_WIN32) HANDLE sem_a = stuff->getSemA(); HANDLE sem_b = stuff->getSemB(); return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b, sem_a, sem_b); +#else + return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b); +#endif // !defined(_WIN32) } IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr stuff) { IPCChannelShared *shared = stuff->getShared(); +#if defined(_WIN32) HANDLE sem_a = stuff->getSemA(); HANDLE sem_b = stuff->getSemB(); return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a, sem_b, sem_a); -} - -#else // defined(_WIN32) -IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr stuff) -{ - IPCChannelShared *shared = stuff->getShared(); - return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b); -} - -IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr stuff) -{ - IPCChannelShared *shared = stuff->getShared(); +#else return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a); -} #endif // !defined(_WIN32) +} -bool IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept +void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept { m_out->size = size; memcpy(m_out->data, data, size); @@ -228,7 +235,6 @@ bool IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept #else post(m_out); #endif - return true; } bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept @@ -236,7 +242,8 @@ bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noe #if defined(_WIN32) DWORD timeout = get_timeout(timeout_ms); #else - struct timespec timeout, *timeoutp = set_timespec(&timeout, timeout_ms); + struct timespec timeout; + struct timespec *timeoutp = set_timespec(&timeout, timeout_ms); #endif m_out->size = size; do { @@ -269,7 +276,8 @@ bool IPCChannelEnd::recv(int timeout_ms) noexcept #if defined(_WIN32) DWORD timeout = get_timeout(timeout_ms); #else - struct timespec timeout, *timeoutp = set_timespec(&timeout, timeout_ms); + struct timespec timeout; + struct timespec *timeoutp = set_timespec(&timeout, timeout_ms); #endif #if defined(_WIN32) if (!wait(m_sem_in, timeout)) @@ -285,7 +293,10 @@ bool IPCChannelEnd::recv(int timeout_ms) noexcept try { m_large_recv.resize(size); } catch (...) { - return false; + // it's ok for us if an attacker wants to make us abort + std::string errmsg = std::string("std::vector::resize failed, size was: ") + + std::to_string(size); + FATAL_ERROR(errmsg.c_str()); } u8 *recv_data = m_large_recv.data(); m_recv_size = size; diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h index b16b24da2..b085d286a 100644 --- a/src/threading/ipc_channel.h +++ b/src/threading/ipc_channel.h @@ -41,6 +41,11 @@ with this program; if not, write to the Free Software Foundation, Inc., IPCChannelShared is situated in shared memory and is used by both ends of the channel. + + There are currently 3 implementations for synchronisation: + * win32: uses win32 semaphore + * linux: uses futex, and does busy waiting if on x86/x86_64 + * other posix: uses posix mutex and condition variable */ #define IPC_CHANNEL_MSG_SIZE 8192U @@ -49,16 +54,21 @@ struct IPCChannelBuffer { #if !defined(_WIN32) #if defined(__linux__) + // possible values: + // 0: futex is not posted. reader will check value before blocking => no + // notify needed when posting + // 1: futex is posted + // 2: futex is not posted. reader is waiting with futex syscall, and needs + // to be notified std::atomic futex{0}; #else pthread_cond_t cond; pthread_mutex_t mutex; - // TODO: use atomic? - bool posted = false; + bool posted = false; // protected by mutex #endif #endif // !defined(_WIN32) size_t size; - u8 data[IPC_CHANNEL_MSG_SIZE]; //TODO: volatile? + u8 data[IPC_CHANNEL_MSG_SIZE]; IPCChannelBuffer(); ~IPCChannelBuffer(); @@ -90,13 +100,14 @@ public: static IPCChannelEnd makeA(std::unique_ptr stuff); static IPCChannelEnd makeB(std::unique_ptr stuff); - // If send, recv, or exchange return false, stop using the channel. + // If send, recv, or exchange return false (=timeout), stop using the channel. // Note: timeouts may be for receiving any response, not a whole message. bool send(const void *data, size_t size, int timeout_ms = -1) noexcept { if (size <= IPC_CHANNEL_MSG_SIZE) { - return sendSmall(data, size); + sendSmall(data, size); + return true; } else { return sendLarge(data, size, timeout_ms); } @@ -109,7 +120,7 @@ public: return send(data, size, timeout_ms) && recv(timeout_ms); } - // Get information about the last received message + // Get the content of the last received message inline const void *getRecvData() const noexcept { return m_recv_data; } inline size_t getRecvSize() const noexcept { return m_recv_size; } @@ -132,8 +143,9 @@ private: {} #endif - bool sendSmall(const void *data, size_t size) noexcept; + void sendSmall(const void *data, size_t size) noexcept; + // returns false on timeout bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; std::unique_ptr m_stuff; @@ -143,7 +155,7 @@ private: HANDLE m_sem_in; HANDLE m_sem_out; #endif - const void *m_recv_data; - size_t m_recv_size; + const void *m_recv_data = nullptr; + size_t m_recv_size = 0; std::vector m_large_recv; };