diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp index 082c20461..202a91ecb 100644 --- a/src/threading/ipc_channel.cpp +++ b/src/threading/ipc_channel.cpp @@ -1,6 +1,6 @@ /* Minetest -Copyright (C) 2022 Desour +Copyright (C) 2022 DS Copyright (C) 2022 TurkeyMcMac, Jude Melton-Houghton This program is free software; you can redistribute it and/or modify @@ -188,6 +188,37 @@ static struct timespec *set_timespec(struct timespec *ts, int ms) } #endif // !defined(_WIN32) +#if defined(_WIN32) +IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr stuff) +{ + IPCChannelShared *shared = stuff->getShared(); + HANDLE sem_a = stuff->getSemA(); + HANDLE sem_b = stuff->getSemB(); + return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b, sem_a, sem_b); +} + +IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr stuff) +{ + IPCChannelShared *shared = stuff->getShared(); + HANDLE sem_a = stuff->getSemA(); + HANDLE sem_b = stuff->getSemB(); + return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a, sem_b, sem_a); +} + +#else // defined(_WIN32) +IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr stuff) +{ + IPCChannelShared *shared = stuff->getShared(); + return IPCChannelEnd(std::move(stuff), &shared->a, &shared->b); +} + +IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr stuff) +{ + IPCChannelShared *shared = stuff->getShared(); + return IPCChannelEnd(std::move(stuff), &shared->b, &shared->a); +} +#endif // !defined(_WIN32) + bool IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept { m_out->size = size; diff --git a/src/threading/ipc_channel.h b/src/threading/ipc_channel.h index 3b1a44d29..b16b24da2 100644 --- a/src/threading/ipc_channel.h +++ b/src/threading/ipc_channel.h @@ -1,6 +1,6 @@ /* Minetest -Copyright (C) 2022 Desour +Copyright (C) 2022 DS Copyright (C) 2022 TurkeyMcMac, Jude Melton-Houghton This program is free software; you can redistribute it and/or modify @@ -21,9 +21,11 @@ with this program; if not, write to the Free Software Foundation, Inc., #pragma once #include "irrlichttypes.h" +#include #include #include #include + #if defined(_WIN32) #include #elif defined(__linux__) @@ -47,7 +49,7 @@ struct IPCChannelBuffer { #if !defined(_WIN32) #if defined(__linux__) - std::atomic futex = ATOMIC_VAR_INIT(0U); + std::atomic futex{0}; #else pthread_cond_t cond; pthread_mutex_t mutex; @@ -56,7 +58,7 @@ struct IPCChannelBuffer #endif #endif // !defined(_WIN32) size_t size; - u8 data[IPC_CHANNEL_MSG_SIZE]; + u8 data[IPC_CHANNEL_MSG_SIZE]; //TODO: volatile? IPCChannelBuffer(); ~IPCChannelBuffer(); @@ -68,32 +70,25 @@ struct IPCChannelShared IPCChannelBuffer b; }; +// opaque owner for the shared mem and stuff +// users have to implement this +struct IPCChannelStuff +{ + virtual ~IPCChannelStuff() = default; + virtual IPCChannelShared *getShared() = 0; +#ifdef _WIN32 + virtual HANDLE getSemA() = 0; + virtual HANDLE getSemB() = 0; +#endif +}; + class IPCChannelEnd { public: IPCChannelEnd() = default; -#if defined(_WIN32) - static IPCChannelEnd makeA(IPCChannelShared *shared, HANDLE sem_a, HANDLE sem_b) - { - return IPCChannelEnd(&shared->a, &shared->b, sem_a, sem_b); - } - - static IPCChannelEnd makeB(IPCChannelShared *shared, HANDLE sem_a, HANDLE sem_b) - { - return IPCChannelEnd(&shared->b, &shared->a, sem_b, sem_a); - } -#else - static IPCChannelEnd makeA(IPCChannelShared *shared) - { - return IPCChannelEnd(&shared->a, &shared->b); - } - - static IPCChannelEnd makeB(IPCChannelShared *shared) - { - return IPCChannelEnd(&shared->b, &shared->a); - } -#endif // !defined(_WIN32) + static IPCChannelEnd makeA(std::unique_ptr stuff); + static IPCChannelEnd makeB(std::unique_ptr stuff); // If send, recv, or exchange return false, stop using the channel. // Note: timeouts may be for receiving any response, not a whole message. @@ -120,17 +115,28 @@ public: private: #if defined(_WIN32) - IPCChannelEnd(IPCChannelBuffer *in, IPCChannelBuffer *out, HANDLE sem_in, HANDLE sem_out): - m_in(in), m_out(out), m_sem_in(sem_in), m_sem_out(sem_out) + IPCChannelEnd( + std::unique_ptr stuff, + IPCChannelBuffer *in, IPCChannelBuffer *out, + HANDLE sem_in, HANDLE sem_out) : + m_stuff(std::move(stuff)), + m_in(in), m_out(out), + m_sem_in(sem_in), m_sem_out(sem_out) {} #else - IPCChannelEnd(IPCChannelBuffer *in, IPCChannelBuffer *out): m_in(in), m_out(out) {} + IPCChannelEnd( + std::unique_ptr stuff, + IPCChannelBuffer *in, IPCChannelBuffer *out) : + m_stuff(std::move(stuff)), + m_in(in), m_out(out) + {} #endif bool sendSmall(const void *data, size_t size) noexcept; bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; + std::unique_ptr m_stuff; IPCChannelBuffer *m_in = nullptr; IPCChannelBuffer *m_out = nullptr; #if defined(_WIN32) diff --git a/src/unittest/test_threading.cpp b/src/unittest/test_threading.cpp index 71742e6c1..ed56abe61 100644 --- a/src/unittest/test_threading.cpp +++ b/src/unittest/test_threading.cpp @@ -236,28 +236,53 @@ void TestThreading::testTLS() void TestThreading::testIPCChannel() { + struct Stuff + { + IPCChannelShared shared{}; #if defined(_WIN32) - HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); - UASSERT(sem_a != INVALID_HANDLE_VALUE); - - HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr); - UASSERT(sem_b != INVALID_HANDLE_VALUE); + HANDLE sem_a; + HANDLE sem_b; #endif + Stuff() + { +#ifdef _WIN32 + HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr); + UASSERT(sem_a != INVALID_HANDLE_VALUE); - IPCChannelShared shared, *sharedp = &shared; + HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr); + UASSERT(sem_b != INVALID_HANDLE_VALUE); +#endif + } + ~Stuff() + { +#ifdef _WIN32 + CloseHandle(sem_b); + CloseHandle(sem_a); +#endif + } + }; + + struct IPCChannelStuffSingleProcess final : public IPCChannelStuff + { + std::shared_ptr stuff; + + IPCChannelStuffSingleProcess(std::shared_ptr stuff) : stuff(std::move(stuff)) {} + ~IPCChannelStuffSingleProcess() override = default; + + IPCChannelShared *getShared() override { return &stuff->shared; } #if defined(_WIN32) - IPCChannelEnd end_a = IPCChannelEnd::makeA(sharedp, sem_a, sem_b); -#else - IPCChannelEnd end_a = IPCChannelEnd::makeA(sharedp); + HANDLE getSemA() override { return stuff->sem_a; } + HANDLE getSemB() override { return stuff->sem_b; } #endif + }; + + auto stuff = std::make_shared(); + + IPCChannelEnd end_a = IPCChannelEnd::makeA(std::make_unique(stuff)); std::thread thread_b([=] { -#if defined(_WIN32) - IPCChannelEnd end_b = IPCChannelEnd::makeB(sharedp, sem_a, sem_b); -#else - IPCChannelEnd end_b = IPCChannelEnd::makeB(sharedp); -#endif + IPCChannelEnd end_b = IPCChannelEnd::makeB(std::make_unique(stuff)); for (;;) { end_b.recv(); @@ -281,10 +306,4 @@ void TestThreading::testIPCChannel() thread_b.join(); UASSERT(!end_a.exchange(buf, 0, 1000)); - -#if defined(_WIN32) - CloseHandle(sem_b); - - CloseHandle(sem_a); -#endif }