1
0
Fork 0
mirror of https://github.com/luanti-org/luanti.git synced 2025-08-06 17:41:04 +00:00

Add IPC channel (for sscsm)

This commit is contained in:
Jude Melton-Houghton 2023-05-19 01:09:21 +02:00 committed by Desour
parent ba62808fe8
commit bda818840e
4 changed files with 486 additions and 0 deletions

View file

@ -2,5 +2,6 @@ set(threading_SRCS
${CMAKE_CURRENT_SOURCE_DIR}/event.cpp
${CMAKE_CURRENT_SOURCE_DIR}/thread.cpp
${CMAKE_CURRENT_SOURCE_DIR}/semaphore.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ipc_channel.cpp
PARENT_SCOPE)

View file

@ -0,0 +1,281 @@
/*
Minetest
Copyright (C) 2022 Desour <vorunbekannt75@web.de>
Copyright (C) 2022 TurkeyMcMac, Jude Melton-Houghton <jwmhjwmh@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#include "ipc_channel.h"
#include "debug.h"
#include "exceptions.h"
#include "porting.h"
#include <errno.h>
#include <utility>
#if defined(__linux__)
#include <linux/futex.h>
#include <string.h>
#include <sys/syscall.h>
#include <sys/wait.h>
#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
#endif
#endif
IPCChannelBuffer::IPCChannelBuffer()
{
#if !defined(__linux__) && !defined(_WIN32)
pthread_condattr_t condattr;
pthread_mutexattr_t mutexattr;
if (pthread_condattr_init(&condattr) != 0)
goto error_condattr_init;
if (pthread_mutexattr_init(&mutexattr) != 0)
goto error_mutexattr_init;
if (pthread_condattr_setpshared(&condattr, 1) != 0)
goto error_condattr_setpshared;
if (pthread_mutexattr_setpshared(&mutexattr, 1) != 0)
goto error_mutexattr_setpshared;
if (pthread_cond_init(&cond, &condattr) != 0)
goto error_cond_init;
if (pthread_mutex_init(&mutex, &mutexattr) != 0)
goto error_mutex_init;
pthread_mutexattr_destroy(&mutexattr);
pthread_condattr_destroy(&condattr);
return;
error_mutex_init:
pthread_cond_destroy(&cond);
error_cond_init:
error_mutexattr_setpshared:
error_condattr_setpshared:
pthread_mutexattr_destroy(&mutexattr);
error_mutexattr_init:
pthread_condattr_destroy(&condattr);
error_condattr_init:
throw BaseException("Unable to initialize IPCChannelBuffer");
#endif // !defined(__linux__) && !defined(_WIN32)
}
IPCChannelBuffer::~IPCChannelBuffer()
{
#if !defined(__linux__) && !defined(_WIN32)
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
#endif // !defined(__linux__) && !defined(_WIN32)
}
#if defined(_WIN32)
static bool wait(HANDLE sem, DWORD timeout)
{
return WaitForSingleObject(sem, timeout) == WAIT_OBJECT_0;
}
static void post(HANDLE sem)
{
if (!ReleaseSemaphore(sem, 1, nullptr))
FATAL_ERROR("ReleaseSemaphore failed unexpectedly");
}
#else
#if defined(__linux__)
#if defined(__i386__) || defined(__x86_64__)
static void busy_wait(int n) noexcept
{
for (int i = 0; i < n; i++)
_mm_pause();
}
#endif // defined(__i386__) || defined(__x86_64__)
static int futex(std::atomic<u32> *uaddr, int futex_op, u32 val,
const struct timespec *timeout, u32 *uaddr2, u32 val3) noexcept
{
return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
}
#endif // defined(__linux__)
static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
{
#if defined(__linux__)
// try busy waiting
for (int i = 0; i < 100; i++) {
// posted?
if (buf->futex.exchange(0) == 1)
return true; // yes
#if defined(__i386__) || defined(__x86_64__)
busy_wait(40);
#else
break; // Busy wait not implemented
#endif
}
// wait with futex
while (true) {
// write 2 to show that we're futexing
if (buf->futex.exchange(2) == 1) {
// futex was posted => change 2 to 0 (or 1 to 1)
buf->futex.fetch_and(1);
return true;
}
int s = futex(&buf->futex, FUTEX_WAIT, 2, timeout, nullptr, 0);
if (s == -1 && errno != EAGAIN)
return false;
}
#else
bool timed_out = false;
pthread_mutex_lock(&buf->mutex);
if (!buf->posted) {
if (timeout)
timed_out = pthread_cond_timedwait(&buf->cond, &buf->mutex, timeout) == ETIMEDOUT;
else
pthread_cond_wait(&buf->cond, &buf->mutex);
}
buf->posted = false;
pthread_mutex_unlock(&buf->mutex);
return !timed_out;
#endif // !defined(__linux__)
}
static void post(IPCChannelBuffer *buf) noexcept
{
#if defined(__linux__)
if (buf->futex.exchange(1) == 2) {
int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0);
if (s == -1)
FATAL_ERROR("FUTEX_WAKE failed unexpectedly");
}
#else
pthread_mutex_lock(&buf->mutex);
buf->posted = true;
pthread_cond_broadcast(&buf->cond);
pthread_mutex_unlock(&buf->mutex);
#endif // !defined(__linux__)
}
#endif // !defined(_WIN32)
#if defined(_WIN32)
static DWORD get_timeout(int timeout_ms)
{
return timeout_ms < 0 ? INFINITE : (DWORD)timeout_ms;
}
#else
static struct timespec *set_timespec(struct timespec *ts, int ms)
{
if (ms < 0)
return nullptr;
u64 msu = ms;
#if !defined(__linux__)
msu += porting::getTimeMs(); // Absolute time
#endif
ts->tv_sec = msu / 1000;
ts->tv_nsec = msu % 1000 * 1000000UL;
return ts;
}
#endif // !defined(_WIN32)
bool IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept
{
m_out->size = size;
memcpy(m_out->data, data, size);
#if defined(_WIN32)
post(m_sem_out);
#else
post(m_out);
#endif
return true;
}
bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept
{
#if defined(_WIN32)
DWORD timeout = get_timeout(timeout_ms);
#else
struct timespec timeout, *timeoutp = set_timespec(&timeout, timeout_ms);
#endif
m_out->size = size;
do {
memcpy(m_out->data, data, IPC_CHANNEL_MSG_SIZE);
#if defined(_WIN32)
post(m_sem_out);
#else
post(m_out);
#endif
#if defined(_WIN32)
if (!wait(m_sem_in, timeout))
#else
if (!wait(m_in, timeoutp))
#endif
return false;
size -= IPC_CHANNEL_MSG_SIZE;
data = (u8 *)data + IPC_CHANNEL_MSG_SIZE;
} while (size > IPC_CHANNEL_MSG_SIZE);
memcpy(m_out->data, data, size);
#if defined(_WIN32)
post(m_sem_out);
#else
post(m_out);
#endif
return true;
}
bool IPCChannelEnd::recv(int timeout_ms) noexcept
{
#if defined(_WIN32)
DWORD timeout = get_timeout(timeout_ms);
#else
struct timespec timeout, *timeoutp = set_timespec(&timeout, timeout_ms);
#endif
#if defined(_WIN32)
if (!wait(m_sem_in, timeout))
#else
if (!wait(m_in, timeoutp))
#endif
return false;
size_t size = m_in->size;
if (size <= IPC_CHANNEL_MSG_SIZE) {
m_recv_size = size;
m_recv_data = m_in->data;
} else {
try {
m_large_recv.resize(size);
} catch (...) {
return false;
}
u8 *recv_data = m_large_recv.data();
m_recv_size = size;
m_recv_data = recv_data;
do {
memcpy(recv_data, m_in->data, IPC_CHANNEL_MSG_SIZE);
size -= IPC_CHANNEL_MSG_SIZE;
recv_data += IPC_CHANNEL_MSG_SIZE;
#if defined(_WIN32)
post(m_sem_out);
#else
post(m_out);
#endif
#if defined(_WIN32)
if (!wait(m_sem_in, timeout))
#else
if (!wait(m_in, timeoutp))
#endif
return false;
} while (size > IPC_CHANNEL_MSG_SIZE);
memcpy(recv_data, m_in->data, size);
}
return true;
}

143
src/threading/ipc_channel.h Normal file
View file

@ -0,0 +1,143 @@
/*
Minetest
Copyright (C) 2022 Desour <vorunbekannt75@web.de>
Copyright (C) 2022 TurkeyMcMac, Jude Melton-Houghton <jwmhjwmh@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include "irrlichttypes.h"
#include <string>
#include <type_traits>
#include <vector>
#if defined(_WIN32)
#include <windows.h>
#elif defined(__linux__)
#include <atomic>
#else
#include <pthread.h>
#endif
/*
An IPC channel is used for synchronous communication between two processes.
Sending two messages in succession from one end is not allowed; messages
must alternate back and forth.
IPCChannelShared is situated in shared memory and is used by both ends of
the channel.
*/
#define IPC_CHANNEL_MSG_SIZE 8192U
struct IPCChannelBuffer
{
#if !defined(_WIN32)
#if defined(__linux__)
std::atomic<u32> futex = ATOMIC_VAR_INIT(0U);
#else
pthread_cond_t cond;
pthread_mutex_t mutex;
// TODO: use atomic?
bool posted = false;
#endif
#endif // !defined(_WIN32)
size_t size;
u8 data[IPC_CHANNEL_MSG_SIZE];
IPCChannelBuffer();
~IPCChannelBuffer();
};
struct IPCChannelShared
{
IPCChannelBuffer a;
IPCChannelBuffer b;
};
class IPCChannelEnd
{
public:
IPCChannelEnd() = default;
#if defined(_WIN32)
static IPCChannelEnd makeA(IPCChannelShared *shared, HANDLE sem_a, HANDLE sem_b)
{
return IPCChannelEnd(&shared->a, &shared->b, sem_a, sem_b);
}
static IPCChannelEnd makeB(IPCChannelShared *shared, HANDLE sem_a, HANDLE sem_b)
{
return IPCChannelEnd(&shared->b, &shared->a, sem_b, sem_a);
}
#else
static IPCChannelEnd makeA(IPCChannelShared *shared)
{
return IPCChannelEnd(&shared->a, &shared->b);
}
static IPCChannelEnd makeB(IPCChannelShared *shared)
{
return IPCChannelEnd(&shared->b, &shared->a);
}
#endif // !defined(_WIN32)
// If send, recv, or exchange return false, stop using the channel.
// Note: timeouts may be for receiving any response, not a whole message.
bool send(const void *data, size_t size, int timeout_ms = -1) noexcept
{
if (size <= IPC_CHANNEL_MSG_SIZE) {
return sendSmall(data, size);
} else {
return sendLarge(data, size, timeout_ms);
}
}
bool recv(int timeout_ms = -1) noexcept;
bool exchange(const void *data, size_t size, int timeout_ms = -1) noexcept
{
return send(data, size, timeout_ms) && recv(timeout_ms);
}
// Get information about the last received message
inline const void *getRecvData() const noexcept { return m_recv_data; }
inline size_t getRecvSize() const noexcept { return m_recv_size; }
private:
#if defined(_WIN32)
IPCChannelEnd(IPCChannelBuffer *in, IPCChannelBuffer *out, HANDLE sem_in, HANDLE sem_out):
m_in(in), m_out(out), m_sem_in(sem_in), m_sem_out(sem_out)
{}
#else
IPCChannelEnd(IPCChannelBuffer *in, IPCChannelBuffer *out): m_in(in), m_out(out) {}
#endif
bool sendSmall(const void *data, size_t size) noexcept;
bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept;
IPCChannelBuffer *m_in = nullptr;
IPCChannelBuffer *m_out = nullptr;
#if defined(_WIN32)
HANDLE m_sem_in;
HANDLE m_sem_out;
#endif
const void *m_recv_data;
size_t m_recv_size;
std::vector<u8> m_large_recv;
};

View file

@ -6,6 +6,10 @@
#include <atomic>
#include <iostream>
#if defined(_WIN32)
#include <windows.h>
#endif
#include "threading/ipc_channel.h"
#include "threading/semaphore.h"
#include "threading/thread.h"
@ -19,6 +23,7 @@ public:
void testStartStopWait();
void testAtomicSemaphoreThread();
void testTLS();
void testIPCChannel();
};
static TestThreading g_test_instance;
@ -28,6 +33,7 @@ void TestThreading::runTests(IGameDef *gamedef)
TEST(testStartStopWait);
TEST(testAtomicSemaphoreThread);
TEST(testTLS);
TEST(testIPCChannel);
}
class SimpleTestThread : public Thread {
@ -227,3 +233,58 @@ void TestThreading::testTLS()
}
}
}
void TestThreading::testIPCChannel()
{
#if defined(_WIN32)
HANDLE sem_a = CreateSemaphoreA(nullptr, 0, 1, nullptr);
UASSERT(sem_a != INVALID_HANDLE_VALUE);
HANDLE sem_b = CreateSemaphoreA(nullptr, 0, 1, nullptr);
UASSERT(sem_b != INVALID_HANDLE_VALUE);
#endif
IPCChannelShared shared, *sharedp = &shared;
#if defined(_WIN32)
IPCChannelEnd end_a = IPCChannelEnd::makeA(sharedp, sem_a, sem_b);
#else
IPCChannelEnd end_a = IPCChannelEnd::makeA(sharedp);
#endif
std::thread thread_b([=] {
#if defined(_WIN32)
IPCChannelEnd end_b = IPCChannelEnd::makeB(sharedp, sem_a, sem_b);
#else
IPCChannelEnd end_b = IPCChannelEnd::makeB(sharedp);
#endif
for (;;) {
end_b.recv();
end_b.send(end_b.getRecvData(), end_b.getRecvSize());
if (end_b.getRecvSize() == 0)
break;
}
});
char buf[20000] = {};
for (int i = sizeof(buf); i > 0; i -= 1000) {
buf[i - 1] = 123;
end_a.exchange(buf, i);
UASSERTEQ(int, end_a.getRecvSize(), i);
UASSERTEQ(int, ((const char *)end_a.getRecvData())[i - 1], 123);
}
end_a.exchange(buf, 0);
UASSERTEQ(int, end_a.getRecvSize(), 0);
thread_b.join();
UASSERT(!end_a.exchange(buf, 0, 1000));
#if defined(_WIN32)
CloseHandle(sem_b);
CloseHandle(sem_a);
#endif
}