1
0
Fork 0
mirror of https://github.com/luanti-org/luanti.git synced 2025-08-11 17:51:04 +00:00
luanti/src/threading/ipc_channel.h

174 lines
4.9 KiB
C
Raw Normal View History

2023-05-19 01:09:21 +02:00
/*
Minetest
Copyright (C) 2022 DS
2023-05-19 01:09:21 +02:00
Copyright (C) 2022 TurkeyMcMac, Jude Melton-Houghton <jwmhjwmh@gmail.com>
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as published by
the Free Software Foundation; either version 2.1 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along
with this program; if not, write to the Free Software Foundation, Inc.,
51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#pragma once
#include "irrlichttypes.h"
#include <memory>
2023-05-19 01:09:21 +02:00
#include <string>
#include <type_traits>
#include <vector>
2023-05-19 01:09:21 +02:00
// Select the synchronisation backend from the target platform. Exactly one of
// the IPC_CHANNEL_IMPLEMENTATION_* macros is defined after this chain:
// * _WIN32 defined     -> IPC_CHANNEL_IMPLEMENTATION_WIN32
// * __linux__ defined  -> IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX
// * anything else      -> IPC_CHANNEL_IMPLEMENTATION_POSIX
#if defined(_WIN32)
#define IPC_CHANNEL_IMPLEMENTATION_WIN32
#elif defined(__linux__)
#define IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX
#else
#define IPC_CHANNEL_IMPLEMENTATION_POSIX
#endif
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
#include <windows.h>
#elif defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
#include <atomic>
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
2023-05-19 01:09:21 +02:00
#include <pthread.h>
#endif
/*
An IPC channel is used for synchronous communication between two processes.
Sending two messages in succession from one end is not allowed; messages
must alternate back and forth.
IPCChannelShared is situated in shared memory and is used by both ends of
the channel.
There are currently 3 implementations for synchronisation:
* win32: uses win32 semaphores
* linux: uses a futex, and does busy waiting if on x86/x86_64
* other posix: uses a posix mutex and condition variable
*/
// Number of u8 elements (0x2000 = 8192) in IPCChannelBuffer::data.
// IPCChannelEnd::send() uses sendSmall() for messages up to this size and
// sendLarge() for anything bigger.
#define IPC_CHANNEL_MSG_SIZE 0x2000U
2023-05-19 01:09:21 +02:00
// One message slot of an IPC channel, together with the state that the
// selected synchronisation implementation keeps next to the data.
// IPCChannelShared holds two of these (members a and b).
// On win32 the buffer itself carries no synchronisation state; the semaphores
// are handed out by IPCChannelStuff::getSemA()/getSemB() instead.
struct IPCChannelBuffer
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
	// possible values:
	// 0: futex is not posted. reader will check value before blocking => no
	//    notify needed when posting
	// 1: futex is posted
	// 2: futex is not posted. reader is waiting with futex syscall, and needs
	//    to be notified
	std::atomic<u32> futex{0};

#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
	pthread_cond_t cond;
	pthread_mutex_t mutex;
	bool posted = false; // protected by mutex
#endif

	// Note: If the other side isn't acting cooperatively, they might write to
	// this at any time. So we must make sure to copy out the data once, and
	// only access that copy.
	size_t size = 0;
	u8 data[IPC_CHANNEL_MSG_SIZE] = {};

	IPCChannelBuffer();
	~IPCChannelBuffer(); // Note: only destruct once, i.e. in one process

	// Not copyable. A copy of a pthread mutex / condition variable must not be
	// used for synchronisation, and the destructor must run only once. The
	// futex variant is already non-copyable because of std::atomic; deleting
	// the copy operations explicitly makes all variants behave the same.
	IPCChannelBuffer(const IPCChannelBuffer &) = delete;
	IPCChannelBuffer &operator=(const IPCChannelBuffer &) = delete;
};
// The data shared by both ends of a channel: two message buffers.
// This header does not show which end reads from or writes to which buffer;
// presumably makeA()/makeB() of IPCChannelEnd assign a and b as in/out in
// opposite ways -- confirm in the implementation file.
// Keep the member order: both processes must agree on the layout.
struct IPCChannelShared
{
IPCChannelBuffer a;
IPCChannelBuffer b;
};
// Opaque owner of the shared memory and any other resources a channel needs.
// Users of the channel have to implement this interface; an IPCChannelEnd
// keeps its IPCChannelStuff in a std::unique_ptr for as long as it exists.
struct IPCChannelStuff
{
// Virtual so that implementations are destroyed correctly through the
// base-class pointer held by IPCChannelEnd.
virtual ~IPCChannelStuff() = default;
// Returns the IPCChannelShared instance used by both ends of the channel.
virtual IPCChannelShared *getShared() = 0;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
// win32 only: the two semaphore handles used for synchronisation.
// NOTE(review): presumably A/B correspond to buffers a/b of
// IPCChannelShared -- confirm in the implementation file.
virtual HANDLE getSemA() = 0;
virtual HANDLE getSemB() = 0;
#endif
};
2023-05-19 01:09:21 +02:00
// One end of an IPC channel.
// Holds the resource owner (IPCChannelStuff), pointers to an incoming and an
// outgoing IPCChannelBuffer, and a private buffer that received messages are
// copied into. The class is move-only because it owns a std::unique_ptr.
class IPCChannelEnd
{
public:
	// Creates an end that is not attached to any channel: no resource owner,
	// null buffer pointers (and null semaphore handles on win32).
	// NOTE(review): presumably such an object must be replaced by the result
	// of makeA()/makeB() before send/recv are called -- confirm with callers.
	IPCChannelEnd() = default;

	// Create end A / end B of a channel, taking ownership of stuff.
	// (Defined out of line.)
	static IPCChannelEnd makeA(std::unique_ptr<IPCChannelStuff> stuff);
	static IPCChannelEnd makeB(std::unique_ptr<IPCChannelStuff> stuff);

	// If send, recv, or exchange return false (=timeout), stop using the channel.
	// Note: timeouts may be for receiving any response, not a whole message.

	// Send size bytes starting at data.
	// Messages of at most IPC_CHANNEL_MSG_SIZE bytes go through sendSmall(),
	// which has no timeout, so timeout_ms is ignored and true is returned.
	// Larger messages go through sendLarge(), which returns false on timeout.
	// NOTE(review): timeout_ms = -1 presumably means "wait forever" -- confirm
	// in the implementation file.
	bool send(const void *data, size_t size, int timeout_ms = -1) noexcept
	{
		if (size <= IPC_CHANNEL_MSG_SIZE) {
			sendSmall(data, size);
			return true;
		}
		return sendLarge(data, size, timeout_ms);
	}

	// Receive a message; returns false on timeout. (Defined out of line.)
	bool recv(int timeout_ms = -1) noexcept;

	// Send a message and then wait for the reply. recv() is only attempted
	// if send() succeeded; the same timeout_ms is passed to both steps.
	bool exchange(const void *data, size_t size, int timeout_ms = -1) noexcept
	{
		return send(data, size, timeout_ms) && recv(timeout_ms);
	}

	// Get the content of the last received message.
	// The pointer refers to this object's private receive buffer.
	// NOTE(review): since that buffer may grow, the pointer is presumably
	// only valid until the next recv() -- confirm in the implementation file.
	const void *getRecvData() const noexcept { return m_large_recv.data(); }
	// Get the size of the last received message.
	size_t getRecvSize() const noexcept { return m_recv_size; }

private:
	// Used by makeA()/makeB() to build an attached end.
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
	IPCChannelEnd(
			std::unique_ptr<IPCChannelStuff> stuff,
			IPCChannelBuffer *in, IPCChannelBuffer *out,
			HANDLE sem_in, HANDLE sem_out) :
		m_stuff(std::move(stuff)),
		m_in(in), m_out(out),
		m_sem_in(sem_in), m_sem_out(sem_out)
	{}
#else
	IPCChannelEnd(
			std::unique_ptr<IPCChannelStuff> stuff,
			IPCChannelBuffer *in, IPCChannelBuffer *out) :
		m_stuff(std::move(stuff)),
		m_in(in), m_out(out)
	{}
#endif

	void sendSmall(const void *data, size_t size) noexcept;

	// returns false on timeout
	bool sendLarge(const void *data, size_t size, int timeout_ms) noexcept;

	// Keeps the shared memory (and other resources) alive while this end exists.
	std::unique_ptr<IPCChannelStuff> m_stuff;
	IPCChannelBuffer *m_in = nullptr;
	IPCChannelBuffer *m_out = nullptr;
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
	// Initialised so that a default-constructed end does not hold
	// indeterminate handle values.
	HANDLE m_sem_in = nullptr;
	HANDLE m_sem_out = nullptr;
#endif
	size_t m_recv_size = 0;
	// we always copy from the shared buffer into this
	// (this buffer only grows)
	std::vector<u8> m_large_recv = std::vector<u8>(IPC_CHANNEL_MSG_SIZE);
};