1
0
Fork 0
mirror of https://github.com/luanti-org/luanti.git synced 2025-08-11 17:51:04 +00:00

improve code readbility

This commit is contained in:
Desour 2024-10-05 11:43:01 +02:00
parent 47c6f94e87
commit 9b6244c6c2
2 changed files with 114 additions and 107 deletions

View file

@ -20,8 +20,6 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "ipc_channel.h" #include "ipc_channel.h"
#include "debug.h" #include "debug.h"
#include "exceptions.h"
#include "porting.h"
#include <cerrno> #include <cerrno>
#include <utility> #include <utility>
#include <cstring> #include <cstring>
@ -30,9 +28,22 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include <linux/futex.h> #include <linux/futex.h>
#include <sys/syscall.h> #include <sys/syscall.h>
#include <sys/wait.h> #include <sys/wait.h>
#if defined(__i386__) || defined(__x86_64__)
#include <immintrin.h>
#endif #endif
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) && (defined(__i386__) || defined(__x86_64__))
#include <immintrin.h>
#define HAVE_BUSY_WAIT 1
[[maybe_unused]]
static void busy_wait(int n) noexcept
{
for (int i = 0; i < n; i++)
_mm_pause();
}
#else
#define HAVE_BUSY_WAIT 0
#endif #endif
IPCChannelBuffer::IPCChannelBuffer() IPCChannelBuffer::IPCChannelBuffer()
@ -91,17 +102,7 @@ static void post(HANDLE sem)
FATAL_ERROR("ReleaseSemaphore failed unexpectedly"); FATAL_ERROR("ReleaseSemaphore failed unexpectedly");
} }
#else #elif defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
#if defined(__i386__) || defined(__x86_64__)
static void busy_wait(int n) noexcept
{
for (int i = 0; i < n; i++)
_mm_pause();
}
#endif // defined(__i386__) || defined(__x86_64__)
static int futex(std::atomic<u32> *uaddr, int futex_op, u32 val, static int futex(std::atomic<u32> *uaddr, int futex_op, u32 val,
const struct timespec *timeout, u32 *uaddr2, u32 val3) noexcept const struct timespec *timeout, u32 *uaddr2, u32 val3) noexcept
@ -109,13 +110,10 @@ static int futex(std::atomic<u32> *uaddr, int futex_op, u32 val,
return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3); return syscall(SYS_futex, uaddr, futex_op, val, timeout, uaddr2, val3);
} }
#endif // defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) // timeout is relative
// timeout: relative on linux, and absolute on other posix
// returns false on timeout // returns false on timeout
static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
{ {
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
// try busy waiting // try busy waiting
for (int i = 0; i < 100; i++) { for (int i = 0; i < 100; i++) {
// posted? // posted?
@ -126,10 +124,10 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
buf->futex.store(0, std::memory_order_relaxed); buf->futex.store(0, std::memory_order_relaxed);
return true; return true;
} }
#if defined(__i386__) || defined(__x86_64__) #if HAVE_BUSY_WAIT
busy_wait(40); busy_wait(40);
#else #else
break; // Busy wait not implemented break;
#endif #endif
} }
// wait with futex // wait with futex
@ -151,7 +149,27 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
} }
} }
} }
}
static void post(IPCChannelBuffer *buf) noexcept
{
if (buf->futex.exchange(1, std::memory_order_acq_rel) == 2) {
// 2 means reader needs to be notified
int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0);
if (s == -1) {
std::string errmsg = std::string("FUTEX_WAKE failed unexpectedly: ")
+ std::strerror(errno);
FATAL_ERROR(errmsg.c_str());
}
}
}
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) #elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
// timeout is absolute
// returns false on timeout
static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
{
bool timed_out = false; bool timed_out = false;
pthread_mutex_lock(&buf->mutex); pthread_mutex_lock(&buf->mutex);
if (!buf->posted) { if (!buf->posted) {
@ -163,30 +181,38 @@ static bool wait(IPCChannelBuffer *buf, const struct timespec *timeout) noexcept
buf->posted = false; buf->posted = false;
pthread_mutex_unlock(&buf->mutex); pthread_mutex_unlock(&buf->mutex);
return !timed_out; return !timed_out;
#endif
} }
static void post(IPCChannelBuffer *buf) noexcept static void post(IPCChannelBuffer *buf) noexcept
{ {
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
if (buf->futex.exchange(1, std::memory_order_acq_rel) == 2) {
// 2 means reader needs to be notified
int s = futex(&buf->futex, FUTEX_WAKE, 1, nullptr, nullptr, 0);
if (s == -1) {
std::string errmsg = std::string("FUTEX_WAKE failed unexpectedly: ")
+ std::strerror(errno);
FATAL_ERROR(errmsg.c_str());
}
}
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_mutex_lock(&buf->mutex); pthread_mutex_lock(&buf->mutex);
buf->posted = true; buf->posted = true;
pthread_cond_broadcast(&buf->cond); pthread_cond_broadcast(&buf->cond);
pthread_mutex_unlock(&buf->mutex); pthread_mutex_unlock(&buf->mutex);
#endif
} }
#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #endif
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
static bool wait_in(IPCChannelEnd::Dir *dir, DWORD timeout)
{
return wait(dir->sem_in, timeout);
}
#else
static bool wait_in(IPCChannelEnd::Dir *dir, const struct timespec *timeout)
{
return wait(dir->buf_in, timeout);
}
#endif
static void post_out(IPCChannelEnd::Dir *dir)
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
post(dir->sem_out);
#else
post(dir->buf_out);
#endif
}
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
static DWORD get_timeout(int timeout_ms) static DWORD get_timeout(int timeout_ms)
@ -226,9 +252,9 @@ IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr<IPCChannelResources> resource
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a = resources->data.sem_a; HANDLE sem_a = resources->data.sem_a;
HANDLE sem_b = resources->data.sem_b; HANDLE sem_b = resources->data.sem_b;
return IPCChannelEnd(std::move(resources), &shared->a, &shared->b, sem_a, sem_b); return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b, sem_a, sem_b});
#else #else
return IPCChannelEnd(std::move(resources), &shared->a, &shared->b); return IPCChannelEnd(std::move(resources), Dir{&shared->a, &shared->b});
#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
} }
@ -238,22 +264,18 @@ IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr<IPCChannelResources> resource
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_a = resources->data.sem_a; HANDLE sem_a = resources->data.sem_a;
HANDLE sem_b = resources->data.sem_b; HANDLE sem_b = resources->data.sem_b;
return IPCChannelEnd(std::move(resources), &shared->b, &shared->a, sem_b, sem_a); return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a, sem_b, sem_a});
#else #else
return IPCChannelEnd(std::move(resources), &shared->b, &shared->a); return IPCChannelEnd(std::move(resources), Dir{&shared->b, &shared->a});
#endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #endif // !defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
} }
void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept
{ {
write_once(&m_out->size, size); write_once(&m_dir.buf_out->size, size);
if (size != 0) if (size != 0)
memcpy(m_out->data, data, size); memcpy(m_dir.buf_out->data, data, size);
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post_out(&m_dir);
post(m_sem_out);
#else
post(m_out);
#endif
} }
bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept
@ -261,31 +283,21 @@ bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noe
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
DWORD timeout = get_timeout(timeout_ms); DWORD timeout = get_timeout(timeout_ms);
#else #else
struct timespec timeout; struct timespec timeout_s;
struct timespec *timeoutp = set_timespec(&timeout, timeout_ms); struct timespec *timeout = set_timespec(&timeout_s, timeout_ms);
#endif #endif
write_once(&m_out->size, size); write_once(&m_dir.buf_out->size, size);
do { do {
memcpy(m_out->data, data, IPC_CHANNEL_MSG_SIZE); memcpy(m_dir.buf_out->data, data, IPC_CHANNEL_MSG_SIZE);
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post_out(&m_dir);
post(m_sem_out); if (!wait_in(&m_dir, timeout)) // TODO: always relative timeout, or always absolute
if (!wait(m_sem_in, timeout))
return false; return false;
#else
post(m_out);
if (!wait(m_in, timeoutp))
return false;
#endif
size -= IPC_CHANNEL_MSG_SIZE; size -= IPC_CHANNEL_MSG_SIZE;
data = (u8 *)data + IPC_CHANNEL_MSG_SIZE; data = (u8 *)data + IPC_CHANNEL_MSG_SIZE;
} while (size > IPC_CHANNEL_MSG_SIZE); } while (size > IPC_CHANNEL_MSG_SIZE);
if (size != 0) if (size != 0)
memcpy(m_out->data, data, size); memcpy(m_dir.buf_out->data, data, size);
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post_out(&m_dir);
post(m_sem_out);
#else
post(m_out);
#endif
return true; return true;
} }
@ -293,15 +305,13 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
{ {
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
DWORD timeout = get_timeout(timeout_ms); DWORD timeout = get_timeout(timeout_ms);
if (!wait(m_sem_in, timeout))
return false;
#else #else
struct timespec timeout; struct timespec timeout_s;
struct timespec *timeoutp = set_timespec(&timeout, timeout_ms); struct timespec *timeout = set_timespec(&timeout_s, timeout_ms);
if (!wait(m_in, timeoutp))
return false;
#endif #endif
size_t size = read_once(&m_in->size); if (!wait_in(&m_dir, timeout))
return false;
size_t size = read_once(&m_dir.buf_in->size);
m_recv_size = size; m_recv_size = size;
// Note about memcpy: If the other thread is evil, it might change the contents // Note about memcpy: If the other thread is evil, it might change the contents
// of the memory while it's memcopied. We're assuming here that memcpy doesn't // of the memory while it's memcopied. We're assuming here that memcpy doesn't
@ -309,7 +319,7 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
if (size <= IPC_CHANNEL_MSG_SIZE) { if (size <= IPC_CHANNEL_MSG_SIZE) {
// small msg // small msg
// (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE) // (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE)
memcpy(m_large_recv.data(), m_in->data, size); memcpy(m_large_recv.data(), m_dir.buf_in->data, size);
} else { } else {
// large msg // large msg
try { try {
@ -322,20 +332,14 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
} }
u8 *recv_data = m_large_recv.data(); u8 *recv_data = m_large_recv.data();
do { do {
memcpy(recv_data, m_in->data, IPC_CHANNEL_MSG_SIZE); memcpy(recv_data, m_dir.buf_in->data, IPC_CHANNEL_MSG_SIZE);
size -= IPC_CHANNEL_MSG_SIZE; size -= IPC_CHANNEL_MSG_SIZE;
recv_data += IPC_CHANNEL_MSG_SIZE; recv_data += IPC_CHANNEL_MSG_SIZE;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) post_out(&m_dir);
post(m_sem_out); if (!wait_in(&m_dir, timeout))
if (!wait(m_sem_in, timeout))
return false; return false;
#else
post(m_out);
if (!wait(m_in, timeoutp))
return false;
#endif
} while (size > IPC_CHANNEL_MSG_SIZE); } while (size > IPC_CHANNEL_MSG_SIZE);
memcpy(recv_data, m_in->data, size); memcpy(recv_data, m_dir.buf_in->data, size);
} }
return true; return true;
} }

View file

@ -23,8 +23,6 @@ with this program; if not, write to the Free Software Foundation, Inc.,
#include "irrlichttypes.h" #include "irrlichttypes.h"
#include "util/basic_macros.h" #include "util/basic_macros.h"
#include <memory> #include <memory>
#include <string>
#include <type_traits>
#include <vector> #include <vector>
#include <atomic> #include <atomic>
@ -85,6 +83,7 @@ struct IPCChannelBuffer
~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process ~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process
}; };
// Data in shared memory
struct IPCChannelShared struct IPCChannelShared
{ {
// Both ends unmap, but last deleter also deletes shared resources. // Both ends unmap, but last deleter also deletes shared resources.
@ -94,6 +93,18 @@ struct IPCChannelShared
IPCChannelBuffer b{}; IPCChannelBuffer b{};
}; };
struct IPCChannelDirection
{
IPCChannelBuffer *buf_in;
IPCChannelBuffer *buf_out;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_in;
HANDLE sem_out;
#endif
};
// Each end holds this. One is A, one is B.
// Implementors of this struct decide how to allocate buffers (i.e. malloc or mmap).
struct IPCChannelResources struct IPCChannelResources
{ {
// new struct, because the win32 #if is annoying // new struct, because the win32 #if is annoying
@ -115,7 +126,7 @@ struct IPCChannelResources
data = data_; data = data_;
} }
// Used for data_ that is already managed by a IPCChannelResources (grab() // Used for data_ that is already managed by an IPCChannelResources (grab()
// semantics) // semantics)
bool setSecond(Data data_) bool setSecond(Data data_)
{ {
@ -159,6 +170,17 @@ struct IPCChannelResources
class IPCChannelEnd class IPCChannelEnd
{ {
public: public:
// Direction. References into IPCChannelResources.
struct Dir
{
IPCChannelBuffer *buf_in = nullptr;
IPCChannelBuffer *buf_out = nullptr;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE sem_in;
HANDLE sem_out;
#endif
};
IPCChannelEnd() = default; IPCChannelEnd() = default;
static IPCChannelEnd makeA(std::unique_ptr<IPCChannelResources> resources); static IPCChannelEnd makeA(std::unique_ptr<IPCChannelResources> resources);
@ -217,23 +239,9 @@ public:
inline size_t getRecvSize() const noexcept { return m_recv_size; } inline size_t getRecvSize() const noexcept { return m_recv_size; }
private: private:
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) IPCChannelEnd(std::unique_ptr<IPCChannelResources> resources, Dir dir) :
IPCChannelEnd( m_resources(std::move(resources)), m_dir(dir)
std::unique_ptr<IPCChannelResources> resources,
IPCChannelBuffer *in, IPCChannelBuffer *out,
HANDLE sem_in, HANDLE sem_out) :
m_resources(std::move(resources)),
m_in(in), m_out(out),
m_sem_in(sem_in), m_sem_out(sem_out)
{} {}
#else
IPCChannelEnd(
std::unique_ptr<IPCChannelResources> resources,
IPCChannelBuffer *in, IPCChannelBuffer *out) :
m_resources(std::move(resources)),
m_in(in), m_out(out)
{}
#endif
// TODO: u8 *, or string_view? // TODO: u8 *, or string_view?
void sendSmall(const void *data, size_t size) noexcept; void sendSmall(const void *data, size_t size) noexcept;
@ -242,12 +250,7 @@ private:
bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept; bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept;
std::unique_ptr<IPCChannelResources> m_resources; std::unique_ptr<IPCChannelResources> m_resources;
IPCChannelBuffer *m_in = nullptr; Dir m_dir;
IPCChannelBuffer *m_out = nullptr;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
HANDLE m_sem_in;
HANDLE m_sem_out;
#endif
size_t m_recv_size = 0; size_t m_recv_size = 0;
// we always copy from the shared buffer into this // we always copy from the shared buffer into this
// (this buffer only grows) // (this buffer only grows)