1
0
Fork 0
mirror of https://github.com/luanti-org/luanti.git synced 2025-08-11 17:51:04 +00:00

IPCChannelStuff -> IPCChannelResources

I think it should be simpler now.
This commit is contained in:
Desour 2024-03-02 01:38:49 +01:00
parent 6fca066a26
commit 157f22ef95
3 changed files with 110 additions and 59 deletions

View file

@ -215,27 +215,27 @@ static inline T read_once(const volatile T *var)
return *var; return *var;
} }
IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr<IPCChannelStuff> stuff) IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr<IPCChannelResources> resources)
{ {
IPCChannelShared *shared = stuff->getShared(); IPCChannelShared *shared = resources->data.shared;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a = stuff->getSemA(); HANDLE sem_a = resources->data.sem_a;
HANDLE sem_b = stuff->getSemB(); HANDLE sem_b = resources->data.sem_b;
return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b, sem_a, sem_b); return IPCChannelEnd(std::move(resources), &shared->a, &shared->b, sem_a, sem_b);
#else #else
return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b); return IPCChannelEnd(std::move(resources), &shared->a, &shared->b);
#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
} }
IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr<IPCChannelStuff> stuff) IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr<IPCChannelResources> resources)
{ {
IPCChannelShared *shared = stuff->getShared(); IPCChannelShared *shared = resources->data.shared;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a = stuff->getSemA(); HANDLE sem_a = resources->data.sem_a;
HANDLE sem_b = stuff->getSemB(); HANDLE sem_b = resources->data.sem_b;
return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a, sem_b, sem_a); return IPCChannelEnd(std::move(resources), &shared->b, &shared->a, sem_b, sem_a);
#else #else
return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a); return IPCChannelEnd(std::move(resources), &shared->b, &shared->a);
#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
} }

View file

@ -21,6 +21,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#pragma once #pragma once
#include "irrlichttypes.h" #include "irrlichttypes.h"
#include "util/basic_macros.h"
#include <memory> #include <memory>
#include <string> #include <string>
#include <type_traits> #include <type_traits>
@ -87,20 +88,73 @@ struct IPCChannelBuffer
struct IPCChannelShared struct IPCChannelShared
{ {
IPCChannelBuffer a; // Both ends unmap, but last deleter also deletes shared resources.
IPCChannelBuffer b; std::atomic<u32> refcount{1};
IPCChannelBuffer a{};
IPCChannelBuffer b{};
}; };
// opaque owner for the shared mem and stuff struct IPCChannelResources
// users have to implement this
struct IPCChannelStuff
{ {
virtual ~IPCChannelStuff() = default; // new struct, because the win32 #if is annoying
virtual IPCChannelShared *getShared() = 0; struct Data
{
IPCChannelShared *shared = nullptr;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
virtual HANDLE getSemA() = 0; HANDLE sem_a;
virtual HANDLE getSemB() = 0; HANDLE sem_b;
#endif #endif
};
Data data;
// Used for previously unmanaged data_ (move semantics)
void setFirst(Data data_)
{
data = data_;
}
// Used for data_ that is already managed by a IPCChannelResources (grab()
// semantics)
bool setSecond(Data data_)
{
if (data_.shared->refcount.fetch_add(1) == 0) {
// other end dead, can't use resources
data_.shared->refcount.fetch_sub(1);
return false;
}
data = data_;
return true;
}
virtual void cleanupLast() noexcept = 0;
virtual void cleanupNotLast() noexcept = 0;
void cleanup() noexcept
{
if (!data.shared) {
// No owned resources. Maybe setSecond failed.
return;
}
if (data.shared->refcount.fetch_sub(1) == 1) {
// We are last, we clean up.
cleanupLast();
} else {
// We are not responsible for cleanup.
// Note: Shared resources may already be invalid by now.
cleanupNotLast();
}
}
IPCChannelResources() = default;
DISABLE_CLASS_COPY(IPCChannelResources)
// Child should call cleanup().
// (Parent destructor can not do this, because when it's called the child is
// already dead.)
virtual ~IPCChannelResources() = default;
}; };
class IPCChannelEnd class IPCChannelEnd
@ -108,10 +162,10 @@ class IPCChannelEnd
public: public:
IPCChannelEnd() = default; IPCChannelEnd() = default;
static IPCChannelEnd makeA(std::unique_ptr<IPCChannelStuff> stuff); static IPCChannelEnd makeA(std::unique_ptr<IPCChannelResources> resources);
static IPCChannelEnd makeB(std::unique_ptr<IPCChannelStuff> stuff); static IPCChannelEnd makeB(std::unique_ptr<IPCChannelResources> resources);
// If send, recv, or exchange return false (=timeout), stop using the channel. // If send, recv, or exchange return false (=timeout), stop using the channel. <--- TODO:why?
// Note: timeouts may be for receiving any response, not a whole message. // Note: timeouts may be for receiving any response, not a whole message.
bool send(const void *data, size_t size, int timeout_ms = -1) noexcept bool send(const void *data, size_t size, int timeout_ms = -1) noexcept
@ -138,18 +192,18 @@ public:
private: private:
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
IPCChannelEnd( IPCChannelEnd(
std::unique_ptr<IPCChannelStuff> stuff, std::unique_ptr<IPCChannelResources> resources,
IPCChannelBuffer *in, IPCChannelBuffer *out, IPCChannelBuffer *in, IPCChannelBuffer *out,
HANDLE sem_in, HANDLE sem_out) : HANDLE sem_in, HANDLE sem_out) :
m_stuff(std::move(stuff)), m_resources(std::move(resources)),
m_in(in), m_out(out), m_in(in), m_out(out),
m_sem_in(sem_in), m_sem_out(sem_out) m_sem_in(sem_in), m_sem_out(sem_out)
{} {}
#else #else
IPCChannelEnd( IPCChannelEnd(
std::unique_ptr<IPCChannelStuff> stuff, std::unique_ptr<IPCChannelResources> resources,
IPCChannelBuffer *in, IPCChannelBuffer *out) : IPCChannelBuffer *in, IPCChannelBuffer *out) :
m_stuff(std::move(stuff)), m_resources(std::move(resources)),
m_in(in), m_out(out) m_in(in), m_out(out)
{} {}
#endif #endif
@ -159,7 +213,7 @@ private:
// returns false on timeout // returns false on timeout
bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept;
std::unique_ptr<IPCChannelStuff> m_stuff; std::unique_ptr<IPCChannelResources> m_resources;
IPCChannelBuffer *m_in = nullptr; IPCChannelBuffer *m_in = nullptr;
IPCChannelBuffer *m_out = nullptr; IPCChannelBuffer *m_out = nullptr;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)

View file

@ -236,53 +236,50 @@ void TestThreading::testTLS()
void TestThreading::testIPCChannel() void TestThreading::testIPCChannel()
{ {
struct Stuff struct IPCChannelResourcesSingleProcess final : public IPCChannelResources
{ {
IPCChannelShared shared{}; void cleanupLast() noexcept override
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a;
HANDLE sem_b;
#endif
Stuff()
{ {
delete data.shared;
#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 #ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32
HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); CloseHandle(data.sem_b);
UASSERT(sem_a != INVALID_HANDLE_VALUE); CloseHandle(data.sem_a);
HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr);
UASSERT(sem_b != INVALID_HANDLE_VALUE);
#endif #endif
} }
~Stuff() void cleanupNotLast() noexcept override
{ {
#ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32 // nothing to do (i.e. no unmapping needed)
CloseHandle(sem_b);
CloseHandle(sem_a);
#endif
} }
~IPCChannelResourcesSingleProcess() override { cleanup(); }
}; };
struct IPCChannelStuffSingleProcess final : public IPCChannelStuff auto resource_data = [] {
{ auto shared = new IPCChannelShared();
std::shared_ptr<Stuff> stuff;
IPCChannelStuffSingleProcess(std::shared_ptr<Stuff> stuff) : stuff(std::move(stuff)) {} #ifdef IPC_CHANNEL_IMPLEMENTATION_WIN32
~IPCChannelStuffSingleProcess() override = default; HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr);
UASSERT(sem_a != INVALID_HANDLE_VALUE);
IPCChannelShared *getShared() override { return &stuff->shared; } HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr);
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) UASSERT(sem_b != INVALID_HANDLE_VALUE);
HANDLE getSemA() override { return stuff->sem_a; }
HANDLE getSemB() override { return stuff->sem_b; } return IPCChannelResources::Data{shared, sem_a, sem_b};
#else
return IPCChannelResources::Data{shared};
#endif #endif
}; }();
auto stuff = std::make_shared<Stuff>(); auto resources_first = std::make_unique<IPCChannelResourcesSingleProcess>();
resources_first->setFirst(resource_data);
IPCChannelEnd end_a = IPCChannelEnd::makeA(std::make_unique<IPCChannelStuffSingleProcess>(stuff)); IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first));
std::thread thread_b([=] { std::thread thread_b([=] {
IPCChannelEnd end_b = IPCChannelEnd::makeB(std::make_unique<IPCChannelStuffSingleProcess>(stuff)); auto resources_second = std::make_unique<IPCChannelResourcesSingleProcess>();
resources_second->setSecond(resource_data);
IPCChannelEnd end_b = IPCChannelEnd::makeB(std::move(resources_second));
for (;;) { for (;;) {
UASSERT(end_b.recv()); UASSERT(end_b.recv());