From 157f22ef95eea4075a8dc5c301d4785d3301bab1 Mon Sep 17 00:00:00 2001 From: Desour Date: Sat, 2 Mar 2024 01:38:49 +0100 Subject: [PATCH] IPCChannelStuff -> IPCChannelResources I think it should be simpler now. --- src/threading/ipc_channel.cpp | 24 ++++----- src/threading/ipc_channel.h | 88 ++++++++++++++++++++++++++------- src/unittest/test_threading.cpp | 57 ++++++++++----------- 3 files changed, 110 insertions(+), 59 deletions(-) diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp index dce4ed602..30460a650 100644 --- a/src/threading/ipc_channel.cpp +++ b/src/threading/ipc_channel.cpp @@ -215,27 +215,27 @@ static inline T read_once(const volatile T *var) return *var; } -IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr stuff) +IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr resources) { - IPCChannelShared *shared = stuff->getShared(); + IPCChannelShared *shared = resources->data.shared; #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - HANDLE sem_a = stuff->getSemA(); - HANDLE sem_b = stuff->getSemB(); - return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b, sem_a, sem_b); + HANDLE sem_a = resources->data.sem_a; + HANDLE sem_b = resources->data.sem_b; + return IPCChannelEnd(std::move(resources), &shared->a, &shared->b, sem_a, sem_b); #else - return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b); + return IPCChannelEnd(std::move(resources), &shared->a, &shared->b); #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) } -IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr stuff) +IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr resources) { - IPCChannelShared *shared = stuff->getShared(); + IPCChannelShared *shared = resources->data.shared; #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - HANDLE sem_a = stuff->getSemA(); - HANDLE sem_b = stuff->getSemB(); - return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a, sem_b, sem_a); + HANDLE sem_a = resources->data.sem_a; + HANDLE sem_b = resources->data.sem_b; + return IPCChannelEnd(std::move(resources), &shared->b, &shared->a, sem_b, sem_a); #else - return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a); + return IPCChannelEnd(std::move(resources), &shared->b, &shared->a); #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) } diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h index 5c68173ea..5b0a381e4 100644 --- a/src/threading/ipc_channel.h +++ b/src/threading/ipc_channel.h @@ -21,6 +21,7 @@ with this program; if not, write to the Free Software Foundation, Inc., #pragma once #include "irrlichttypes.h" +#include "util/basic_macros.h" #include #include #include @@ -87,20 +88,73 @@ struct IPCChannelBuffer struct IPCChannelShared { - IPCChannelBuffer a; - IPCChannelBuffer b; + // Both ends unmap, but last deleter also deletes shared resources. + std::atomic refcount{1}; + + IPCChannelBuffer a{}; + IPCChannelBuffer b{}; }; -// opaque owner for the shared mem and stuff -// users have to implement this -struct IPCChannelStuff +struct IPCChannelResources { - virtual ~IPCChannelStuff() = default; - virtual IPCChannelShared *getShared() = 0; + // new struct, because the win32 #if is annoying + struct Data + { + IPCChannelShared *shared = nullptr; + #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - virtual HANDLE getSemA() = 0; - virtual HANDLE getSemB() = 0; + HANDLE sem_a; + HANDLE sem_b; #endif + }; + + Data data; + + // Used for previously unmanaged data_ (move semantics) + void setFirst(Data data_) + { + data = data_; + } + + // Used for data_ that is already managed by a IPCChannelResources (grab() + // semantics) + bool setSecond(Data data_) + { + if (data_.shared->refcount.fetch_add(1) == 0) { + // other end dead, can't use resources + data_.shared->refcount.fetch_sub(1); + return false; + } + data = data_; + return true; + } + + virtual void cleanupLast() noexcept = 0; + virtual void cleanupNotLast() noexcept = 0; + + void cleanup() noexcept + { + if (!data.shared) { + // No owned resources. Maybe setSecond failed. + return; + } + if (data.shared->refcount.fetch_sub(1) == 1) { + // We are last, we clean up. + cleanupLast(); + } else { + // We are not responsible for cleanup. + // Note: Shared resources may already be invalid by now. + cleanupNotLast(); + } + } + + IPCChannelResources() = default; + DISABLE_CLASS_COPY(IPCChannelResources) + + // Child should call cleanup(). + // (Parent destructor can not do this, because when it's called the child is + // already dead.) + virtual ~IPCChannelResources() = default; }; class IPCChannelEnd @@ -108,10 +162,10 @@ class IPCChannelEnd public: IPCChannelEnd() = default; - static IPCChannelEnd makeA(std::unique_ptr stuff); - static IPCChannelEnd makeB(std::unique_ptr stuff); + static IPCChannelEnd makeA(std::unique_ptr resources); + static IPCChannelEnd makeB(std::unique_ptr resources); - // If send, recv, or exchange return false (=timeout), stop using the channel. + // If send, recv, or exchange return false (=timeout), stop using the channel. <--- TODO:why? // Note: timeouts may be for receiving any response, not a whole message. bool send(const void *data, size_t size, int timeout_ms = -1) noexcept @@ -138,18 +192,18 @@ public: private: #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) IPCChannelEnd( - std::unique_ptr stuff, + std::unique_ptr resources, IPCChannelBuffer *in, IPCChannelBuffer *out, HANDLE sem_in, HANDLE sem_out) : - m_stuff(std::move(stuff)), + m_resources(std::move(resources)), m_in(in), m_out(out), m_sem_in(sem_in), m_sem_out(sem_out) {} #else IPCChannelEnd( - std::unique_ptr stuff, + std::unique_ptr resources, IPCChannelBuffer *in, IPCChannelBuffer *out) : - m_stuff(std::move(stuff)), + m_resources(std::move(resources)), m_in(in), m_out(out) {} #endif @@ -159,7 +213,7 @@ private: // returns false on timeout bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; - std::unique_ptr m_stuff; + std::unique_ptr m_resources; IPCChannelBuffer *m_in = nullptr; IPCChannelBuffer *m_out = nullptr; #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) diff --git a/src/unittest/test_threading.cpp b/src/unittest/test_threading.cpp index e84fe8a24..a8950d114 100644 --- a/src/unittest/test_threading.cpp +++ b/src/unittest/test_threading.cpp @@ -236,53 +236,50 @@ void TestThreading::testTLS() void TestThreading::testIPCChannel() { - struct Stuff + struct IPCChannelResourcesSingleProcess final : public IPCChannelResources { - IPCChannelShared shared{}; -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - HANDLE sem_a; - HANDLE sem_b; -#endif - Stuff() + void cleanupLast() noexcept override { + delete data.shared; #ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 - HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); - UASSERT(sem_a != INVALID_HANDLE_VALUE); - - HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr); - UASSERT(sem_b != INVALID_HANDLE_VALUE); + CloseHandle(data.sem_b); + CloseHandle(data.sem_a); #endif } - ~Stuff() + void cleanupNotLast() noexcept override { -#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 - CloseHandle(sem_b); - CloseHandle(sem_a); -#endif + // nothing to do (i.e. no unmapping needed) } + + ~IPCChannelResourcesSingleProcess() override { cleanup(); } }; - struct IPCChannelStuffSingleProcess final : public IPCChannelStuff - { - std::shared_ptr stuff; + auto resource_data = [] { + auto shared = new IPCChannelShared(); - IPCChannelStuffSingleProcess(std::shared_ptr stuff) : stuff(std::move(stuff)) {} - ~IPCChannelStuffSingleProcess() override = default; +#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 + HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); + UASSERT(sem_a != INVALID_HANDLE_VALUE); - IPCChannelShared *getShared() override { return &stuff->shared; } -#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) - HANDLE getSemA() override { return stuff->sem_a; } - HANDLE getSemB() override { return stuff->sem_b; } + HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr); + UASSERT(sem_b != INVALID_HANDLE_VALUE); + + return IPCChannelResources::Data{shared, sem_a, sem_b}; +#else + return IPCChannelResources::Data{shared}; #endif - }; + }(); - auto stuff = std::make_shared(); + auto resources_first = std::make_unique(); + resources_first->setFirst(resource_data); - IPCChannelEnd end_a = IPCChannelEnd::makeA(std::make_unique(stuff)); + IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first)); std::thread thread_b([=] { - IPCChannelEnd end_b = IPCChannelEnd::makeB(std::make_unique(stuff)); + auto resources_second = std::make_unique(); + resources_second->setSecond(resource_data); + IPCChannelEnd end_b = IPCChannelEnd::makeB(std::move(resources_second)); for (;;) { UASSERT(end_b.recv());