From cb99ce0f99ae012086d7afe1c6b550272d4f96e7 Mon Sep 17 00:00:00 2001 From: Desour Date: Mon, 7 Oct 2024 13:18:49 +0200 Subject: [PATCH] some more little fixes and improvements --- src/benchmark/benchmark_ipc_channel.cpp | 2 +- src/threading/ipc_channel.cpp | 39 +++++++++--------- src/threading/ipc_channel.h | 54 ++++++++++++++----------- src/unittest/test_threading.cpp | 23 +++++++---- 4 files changed, 68 insertions(+), 50 deletions(-) diff --git a/src/benchmark/benchmark_ipc_channel.cpp b/src/benchmark/benchmark_ipc_channel.cpp index b59ffe572..a8bcfad5e 100644 --- a/src/benchmark/benchmark_ipc_channel.cpp +++ b/src/benchmark/benchmark_ipc_channel.cpp @@ -25,7 +25,7 @@ TEST_CASE("benchmark_ipc_channel") { auto end_a_thread_b_p = make_test_ipc_channel([](IPCChannelEnd end_b) { // echos back messages. stops if "" is sent - for (;;) { + while (true) { end_b.recv(); end_b.send(end_b.getRecvData(), end_b.getRecvSize()); if (end_b.getRecvSize() == 0) diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp index 73ef7b4d7..4b0f37a54 100644 --- a/src/threading/ipc_channel.cpp +++ b/src/threading/ipc_channel.cpp @@ -80,7 +80,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept if (buf->futex.load(std::memory_order_acquire) == 1) { // yes // reset it. (relaxed ordering is sufficient, because the other thread - // does not need to see the side effects we did before writing 0) + // does not need to see the side effects we did before unposting) buf->futex.store(0, std::memory_order_relaxed); return true; } @@ -93,7 +93,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept // wait with futex while (true) { // write 2 to show that we're futexing - if (buf->futex.exchange(2, std::memory_order_acq_rel) == 1) { + if (buf->futex.exchange(2, std::memory_order_acquire) == 1) { // it was posted in the meantime buf->futex.store(0, std::memory_order_relaxed); return true; @@ -113,7 +113,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept static void post(IPCChannelBuffer *buf) noexcept { - if (buf->futex.exchange(1, std::memory_order_acq_rel) == 2) { + if (buf->futex.exchange(1, std::memory_order_release) == 2) { // 2 means reader needs to be notified int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0); if (s == -1) { @@ -130,17 +130,18 @@ static void post(IPCChannelBuffer *buf) noexcept // returns false on timeout static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept { - bool timed_out = false; pthread_mutex_lock(&buf->mutex); - if (!buf->posted) { - if (timeout) - timed_out = pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT; - else + while (!buf->posted) { + if (timeout) { + if (pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT) + return false; + } else { pthread_cond_wait(&buf->cond, &buf->mutex); + } } buf->posted = false; pthread_mutex_unlock(&buf->mutex); - return !timed_out; + return true; } static void post(IPCChannelBuffer *buf) noexcept @@ -172,18 +173,19 @@ static bool wait_in(IPCChannelEnd::Dir *dir, u64 timeout_ms_abs) struct timespec timeout; struct timespec *timeoutp = nullptr; if (timeout_ms_abs > 0) { - // Relative time u64 tnow = porting::getTimeMs(); if (tnow > timeout_ms_abs) return false; u64 timeout_ms_rel = timeout_ms_abs - tnow; #if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) + // Relative time timeout.tv_sec = 0; timeout.tv_nsec = 0; #elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) // Absolute time, relative to cond_clockid FATAL_ERROR_IF(clock_gettime(dir->buf_in->cond_clockid, &timeout) < 0, "clock_gettime failed"); + // prevent overflow if (timeout.tv_nsec >= 1000'000'000L) { timeout.tv_nsec -= 1000'000'000L; timeout.tv_sec += 1; @@ -339,7 +341,8 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept if (size <= IPC_CHANNEL_MSG_SIZE) { // small msg // (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE) - memcpy(m_large_recv.data(), m_dir.buf_in->data, size); + if (size != 0) + memcpy(m_large_recv.data(), m_dir.buf_in->data, size); } else { // large msg @@ -360,7 +363,8 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept if (!wait_in(&m_dir, timeout_ms_abs)) return false; } while (size > IPC_CHANNEL_MSG_SIZE); - memcpy(recv_data, m_dir.buf_in->data, size); + if (size != 0) + memcpy(recv_data, m_dir.buf_in->data, size); } return true; } @@ -382,18 +386,15 @@ std::pair make_test_ipc_channel( #endif }(); - auto resources_first = std::make_unique(); - resources_first->setFirst(resource_data); - - IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first)); - std::thread thread_b([=] { - auto resources_second = std::make_unique(); - resources_second->setSecond(resource_data); + auto resources_second = IPCChannelResourcesSingleProcess::makeSecond(resource_data); IPCChannelEnd end_b = IPCChannelEnd::makeB(std::move(resources_second)); fun(std::move(end_b)); }); + auto resources_first = IPCChannelResourcesSingleProcess::makeFirst(resource_data); + IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first)); + return {std::move(end_a), std::move(thread_b)}; } diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h index 68a570ed8..09acd09ec 100644 --- a/src/threading/ipc_channel.h +++ b/src/threading/ipc_channel.h @@ -56,7 +56,7 @@ with this program; if not, write to the Free Software Foundation, Inc., * other posix: uses posix mutex and condition variable */ -#define IPC_CHANNEL_MSG_SIZE 0x2000U +constexpr size_t IPC_CHANNEL_MSG_SIZE = 0x2000; struct IPCChannelBuffer { @@ -83,6 +83,7 @@ struct IPCChannelBuffer u8 data[IPC_CHANNEL_MSG_SIZE] = {}; IPCChannelBuffer(); + DISABLE_CLASS_COPY(IPCChannelBuffer) ~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process }; @@ -96,18 +97,8 @@ struct IPCChannelShared IPCChannelBuffer b{}; }; -struct IPCChannelDirection -{ - IPCChannelBuffer *buf_in; - IPCChannelBuffer *buf_out; -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - HANDLE sem_in; - HANDLE sem_out; -#endif -}; - -// Each end holds this. One is A, one is B. -// Implementors of this struct decide how to allocate buffers (i.e. malloc or mmap). +// Interface for managing the shared resources. +// Implementors decide whether to use malloc or mmap. struct IPCChannelResources { // new struct, because the win32 #if is annoying @@ -123,6 +114,15 @@ struct IPCChannelResources Data data; + IPCChannelResources() = default; + DISABLE_CLASS_COPY(IPCChannelResources) + + // Child should call cleanup(). + // (Parent destructor can not do this, because when it's called the child is + // already dead.) + virtual ~IPCChannelResources() = default; + +protected: // Used for previously unmanaged data_ (move semantics) void setFirst(Data data_) { @@ -160,14 +160,6 @@ struct IPCChannelResources cleanupNotLast(); } } - - IPCChannelResources() = default; - DISABLE_CLASS_COPY(IPCChannelResources) - - // Child should call cleanup(). - // (Parent destructor can not do this, because when it's called the child is - // already dead.) - virtual ~IPCChannelResources() = default; }; class IPCChannelEnd @@ -184,13 +176,15 @@ public: #endif }; + // Unusable empty end IPCChannelEnd() = default; + // Construct end A or end B from resources static IPCChannelEnd makeA(std::unique_ptr resources); static IPCChannelEnd makeB(std::unique_ptr resources); - // Note: timeouts may be for receiving any response, not a whole message. - // If send, recv, or exchange return false (=timeout), stop using the channel. + // Note: Timeouts may be for receiving any response, not a whole message. + // Therefore, if a timeout occurs, stop using the channel. // Returns false on timeout [[nodiscard]] @@ -276,6 +270,20 @@ struct IPCChannelResourcesSingleProcess final : public IPCChannelResources } ~IPCChannelResourcesSingleProcess() override { cleanup(); } + + static std::unique_ptr makeFirst(Data data) + { + auto ret = std::make_unique(); + ret->setFirst(data); + return ret; + } + + static std::unique_ptr makeSecond(Data data) + { + auto ret = std::make_unique(); + ret->setSecond(data); + return ret; + } }; // For testing diff --git a/src/unittest/test_threading.cpp b/src/unittest/test_threading.cpp index 18b519838..51d757e5e 100644 --- a/src/unittest/test_threading.cpp +++ b/src/unittest/test_threading.cpp @@ -235,7 +235,7 @@ void TestThreading::testIPCChannel() { auto [end_a, thread_b] = make_test_ipc_channel([](IPCChannelEnd end_b) { // echos back messages. stops if "" is sent - for (;;) { + while (true) { UASSERT(end_b.recvWithTimeout(-1)); UASSERT(end_b.sendWithTimeout(end_b.getRecvData(), end_b.getRecvSize(), -1)); if (end_b.getRecvSize() == 0) @@ -243,20 +243,29 @@ void TestThreading::testIPCChannel() } }); - u8 buf[20000] = {}; - for (int i = sizeof(buf); i > 0; i -= 100) { - buf[i - 1] = 123; - UASSERT(end_a.exchangeWithTimeout(buf, i, -1)); + u8 buf1[20000] = {}; + for (int i = sizeof(buf1); i > 0; i -= 100) { + buf1[i - 1] = 123; + UASSERT(end_a.exchangeWithTimeout(buf1, i, -1)); UASSERTEQ(int, end_a.getRecvSize(), i); UASSERTEQ(int, reinterpret_cast(end_a.getRecvData())[i - 1], 123); } + u8 buf2[IPC_CHANNEL_MSG_SIZE * 3 + 10]; + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3 + 10); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 2); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE - 1); + end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE + 1); + end_a.exchange(buf2, 1); + // stop thread_b - UASSERT(end_a.exchangeWithTimeout(buf, 0, -1)); + UASSERT(end_a.exchangeWithTimeout(nullptr, 0, -1)); UASSERTEQ(int, end_a.getRecvSize(), 0); thread_b.join(); // other side dead ==> should time out - UASSERT(!end_a.exchangeWithTimeout(buf, 0, 200)); + UASSERT(!end_a.exchangeWithTimeout(nullptr, 0, 200)); }