1
0
Fork 0
mirror of https://github.com/luanti-org/luanti.git synced 2025-08-11 17:51:04 +00:00

some more little fixes and improvements

This commit is contained in:
Desour 2024-10-07 13:18:49 +02:00
parent 75467e3695
commit cb99ce0f99
4 changed files with 68 additions and 50 deletions

View file

@ -25,7 +25,7 @@ TEST_CASE("benchmark_ipc_channel")
{
auto end_a_thread_b_p = make_test_ipc_channel([](IPCChannelEnd end_b) {
// echos back messages. stops if "" is sent
for (;;) {
while (true) {
end_b.recv();
end_b.send(end_b.getRecvData(), end_b.getRecvSize());
if (end_b.getRecvSize() == 0)

View file

@ -80,7 +80,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
if (buf->futex.load(std::memory_order_acquire) == 1) {
// yes
// reset it. (relaxed ordering is sufficient, because the other thread
// does not need to see the side effects we did before writing 0)
// does not need to see the side effects we did before unposting)
buf->futex.store(0, std::memory_order_relaxed);
return true;
}
@ -93,7 +93,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
// wait with futex
while (true) {
// write 2 to show that we're futexing
if (buf->futex.exchange(2, std::memory_order_acq_rel) == 1) {
if (buf->futex.exchange(2, std::memory_order_acquire) == 1) {
// it was posted in the meantime
buf->futex.store(0, std::memory_order_relaxed);
return true;
@ -113,7 +113,7 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
static void post(IPCChannelBuffer *buf) noexcept
{
if (buf->futex.exchange(1, std::memory_order_acq_rel) == 2) {
if (buf->futex.exchange(1, std::memory_order_release) == 2) {
// 2 means reader needs to be notified
int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0);
if (s == -1) {
@ -130,17 +130,18 @@ static void post(IPCChannelBuffer *buf) noexcept
// returns false on timeout
static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
{
bool timed_out = false;
pthread_mutex_lock(&buf->mutex);
if (!buf->posted) {
if (timeout)
timed_out = pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT;
else
while (!buf->posted) {
if (timeout) {
if (pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT)
return false;
} else {
pthread_cond_wait(&buf->cond, &buf->mutex);
}
}
buf->posted = false;
pthread_mutex_unlock(&buf->mutex);
return !timed_out;
return true;
}
static void post(IPCChannelBuffer *buf) noexcept
@ -172,18 +173,19 @@ static bool wait_in(IPCChannelEnd::Dir *dir, u64 timeout_ms_abs)
struct timespec timeout;
struct timespec *timeoutp = nullptr;
if (timeout_ms_abs > 0) {
// Relative time
u64 tnow = porting::getTimeMs();
if (tnow > timeout_ms_abs)
return false;
u64 timeout_ms_rel = timeout_ms_abs - tnow;
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
// Relative time
timeout.tv_sec = 0;
timeout.tv_nsec = 0;
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
// Absolute time, relative to cond_clockid
FATAL_ERROR_IF(clock_gettime(dir->buf_in->cond_clockid, &timeout) < 0,
"clock_gettime failed");
// prevent overflow
if (timeout.tv_nsec >= 1000'000'000L) {
timeout.tv_nsec -= 1000'000'000L;
timeout.tv_sec += 1;
@ -339,7 +341,8 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
if (size <= IPC_CHANNEL_MSG_SIZE) {
// small msg
// (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE)
memcpy(m_large_recv.data(), m_dir.buf_in->data, size);
if (size != 0)
memcpy(m_large_recv.data(), m_dir.buf_in->data, size);
} else {
// large msg
@ -360,7 +363,8 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
if (!wait_in(&m_dir, timeout_ms_abs))
return false;
} while (size > IPC_CHANNEL_MSG_SIZE);
memcpy(recv_data, m_dir.buf_in->data, size);
if (size != 0)
memcpy(recv_data, m_dir.buf_in->data, size);
}
return true;
}
@ -382,18 +386,15 @@ std::pair<IPCChannelEnd, std::thread> make_test_ipc_channel(
#endif
}();
auto resources_first = std::make_unique<IPCChannelResourcesSingleProcess>();
resources_first->setFirst(resource_data);
IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first));
std::thread thread_b([=] {
auto resources_second = std::make_unique<IPCChannelResourcesSingleProcess>();
resources_second->setSecond(resource_data);
auto resources_second = IPCChannelResourcesSingleProcess::makeSecond(resource_data);
IPCChannelEnd end_b = IPCChannelEnd::makeB(std::move(resources_second));
fun(std::move(end_b));
});
auto resources_first = IPCChannelResourcesSingleProcess::makeFirst(resource_data);
IPCChannelEnd end_a = IPCChannelEnd::makeA(std::move(resources_first));
return {std::move(end_a), std::move(thread_b)};
}

View file

@ -56,7 +56,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
* other posix: uses posix mutex and condition variable
*/
#define IPC_CHANNEL_MSG_SIZE 0x2000U
constexpr size_t IPC_CHANNEL_MSG_SIZE = 0x2000;
struct IPCChannelBuffer
{
@ -83,6 +83,7 @@ struct IPCChannelBuffer
u8 data[IPC_CHANNEL_MSG_SIZE] = {};
IPCChannelBuffer();
DISABLE_CLASS_COPY(IPCChannelBuffer)
~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process
};
@ -96,18 +97,8 @@ struct IPCChannelShared
IPCChannelBuffer b{};
};
struct IPCChannelDirection
{
IPCChannelBuffer *buf_in;
IPCChannelBuffer *buf_out;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_in;
HANDLE sem_out;
#endif
};
// Each end holds this. One is A, one is B.
// Implementors of this struct decide how to allocate buffers (i.e. malloc or mmap).
// Interface for managing the shared resources.
// Implementors decide whether to use malloc or mmap.
struct IPCChannelResources
{
// new struct, because the win32 #if is annoying
@ -123,6 +114,15 @@ struct IPCChannelResources
Data data;
IPCChannelResources() = default;
DISABLE_CLASS_COPY(IPCChannelResources)
// Child should call cleanup().
// (Parent destructor can not do this, because when it's called the child is
// already dead.)
virtual ~IPCChannelResources() = default;
protected:
// Used for previously unmanaged data_ (move semantics)
void setFirst(Data data_)
{
@ -160,14 +160,6 @@ struct IPCChannelResources
cleanupNotLast();
}
}
IPCChannelResources() = default;
DISABLE_CLASS_COPY(IPCChannelResources)
// Child should call cleanup().
// (Parent destructor can not do this, because when it's called the child is
// already dead.)
virtual ~IPCChannelResources() = default;
};
class IPCChannelEnd
@ -184,13 +176,15 @@ public:
#endif
};
// Unusable empty end
IPCChannelEnd() = default;
// Construct end A or end B from resources
static IPCChannelEnd makeA(std::unique_ptr<IPCChannelResources> resources);
static IPCChannelEnd makeB(std::unique_ptr<IPCChannelResources> resources);
// Note: timeouts may be for receiving any response, not a whole message.
// If send, recv, or exchange return false (=timeout), stop using the channel.
// Note: Timeouts may be for receiving any response, not a whole message.
// Therefore, if a timeout occurs, stop using the channel.
// Returns false on timeout
[[nodiscard]]
@ -276,6 +270,20 @@ struct IPCChannelResourcesSingleProcess final : public IPCChannelResources
}
~IPCChannelResourcesSingleProcess() override { cleanup(); }
static std::unique_ptr<IPCChannelResourcesSingleProcess> makeFirst(Data data)
{
auto ret = std::make_unique<IPCChannelResourcesSingleProcess>();
ret->setFirst(data);
return ret;
}
static std::unique_ptr<IPCChannelResourcesSingleProcess> makeSecond(Data data)
{
auto ret = std::make_unique<IPCChannelResourcesSingleProcess>();
ret->setSecond(data);
return ret;
}
};
// For testing

View file

@ -235,7 +235,7 @@ void TestThreading::testIPCChannel()
{
auto [end_a, thread_b] = make_test_ipc_channel([](IPCChannelEnd end_b) {
// echos back messages. stops if "" is sent
for (;;) {
while (true) {
UASSERT(end_b.recvWithTimeout(-1));
UASSERT(end_b.sendWithTimeout(end_b.getRecvData(), end_b.getRecvSize(), -1));
if (end_b.getRecvSize() == 0)
@ -243,20 +243,29 @@ void TestThreading::testIPCChannel()
}
});
u8 buf[20000] = {};
for (int i = sizeof(buf); i > 0; i -= 100) {
buf[i - 1] = 123;
UASSERT(end_a.exchangeWithTimeout(buf, i, -1));
u8 buf1[20000] = {};
for (int i = sizeof(buf1); i > 0; i -= 100) {
buf1[i - 1] = 123;
UASSERT(end_a.exchangeWithTimeout(buf1, i, -1));
UASSERTEQ(int, end_a.getRecvSize(), i);
UASSERTEQ(int, reinterpret_cast<const u8 *>(end_a.getRecvData())[i - 1], 123);
}
u8 buf2[IPC_CHANNEL_MSG_SIZE * 3 + 10];
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3 + 10);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 3);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE * 2);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE - 1);
end_a.exchange(buf2, IPC_CHANNEL_MSG_SIZE + 1);
end_a.exchange(buf2, 1);
// stop thread_b
UASSERT(end_a.exchangeWithTimeout(buf, 0, -1));
UASSERT(end_a.exchangeWithTimeout(nullptr, 0, -1));
UASSERTEQ(int, end_a.getRecvSize(), 0);
thread_b.join();
// other side dead ==> should time out
UASSERT(!end_a.exchangeWithTimeout(buf, 0, 200));
UASSERT(!end_a.exchangeWithTimeout(nullptr, 0, 200));
}