1
0
Fork 0
mirror of https://github.com/luanti-org/luanti.git synced 2025-08-11 17:51:04 +00:00

timeout_ms_abs

This commit is contained in:
Desour 2024-10-05 12:18:24 +02:00
parent d6dd5b4d4f
commit 37358bd8fe

View file

@ -47,48 +47,6 @@ static void busy_wait(int n) noexcept
#define HAVE_BUSY_WAIT 0 #define HAVE_BUSY_WAIT 0
#endif #endif
IPCChannelBuffer::IPCChannelBuffer()
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_condattr_t condattr;
pthread_mutexattr_t mutexattr;
if (pthread_condattr_init(&condattr) != 0)
goto error_condattr_init;
if (pthread_mutexattr_init(&mutexattr) != 0)
goto error_mutexattr_init;
if (pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED) != 0)
goto error_condattr_setpshared;
if (pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED) != 0)
goto error_mutexattr_setpshared;
if (pthread_cond_init(&cond, &condattr) != 0)
goto error_cond_init;
if (pthread_mutex_init(&mutex, &mutexattr) != 0)
goto error_mutex_init;
pthread_mutexattr_destroy(&mutexattr);
pthread_condattr_destroy(&condattr);
return;
error_mutex_init:
pthread_cond_destroy(&cond);
error_cond_init:
error_mutexattr_setpshared:
error_condattr_setpshared:
pthread_mutexattr_destroy(&mutexattr);
error_mutexattr_init:
pthread_condattr_destroy(&condattr);
error_condattr_init:
throw BaseException("Unable to initialize IPCChannelBuffer");
#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
}
IPCChannelBuffer::~IPCChannelBuffer()
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
}
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
// returns false on timeout // returns false on timeout
@ -194,28 +152,37 @@ static void post(IPCChannelBuffer *buf) noexcept
#endif #endif
static bool wait_in(IPCChannelEnd::Dir *dir, int timeout_ms, u64 t0) // timeout_ms_abs: absolute timeout (using porting::getTimeMs()), or 0 for no timeout
// returns false on timeout
static bool wait_in(IPCChannelEnd::Dir *dir, u64 timeout_ms_abs)
{ {
#if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32)
// Relative time
DWORD timeout = INFINITE; DWORD timeout = INFINITE;
if (timeout_ms >= 0) { if (timeout_ms_abs > 0) {
timeout = (DWORD)timeout_ms; u64 tnow = porting::getTimeMs();
timeout_msu -= porting::getTimeMs() - t0; // Relative time if (tnow > timeout_ms_abs)
return false;
timeout = (DWORD)(timeout_ms_abs - tnow);
} }
return wait(dir->sem_in, timeout); return wait(dir->sem_in, timeout);
#else #else
struct timespec timeout; struct timespec timeout;
struct timespec *timeoutp = nullptr; struct timespec *timeoutp = nullptr;
if (timeout_ms >= 0) { if (timeout_ms_abs > 0) {
u64 timeout_msu = timeout_ms;
#if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) #if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX)
timeout_msu -= porting::getTimeMs() - t0; // Relative time // Relative time
u64 tnow = porting::getTimeMs();
if (tnow > timeout_ms_abs)
return false;
u64 timeout_ms = timeout_ms_abs - tnow;
#elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) #elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
timeout_msu += t0; // Absolute time // Absolute time
u64 timeout_msu = timeout_ms_abs;
#endif #endif
timeout.tv_sec = timeout_msu / 1000; timeout.tv_sec = timeout_ms / 1000;
timeout.tv_nsec = timeout_msu % 1000 * 1000000UL; timeout.tv_nsec = timeout_ms % 1000 * 1000000UL;
timeoutp = &timeout; timeoutp = &timeout;
} }
@ -244,6 +211,48 @@ static inline T read_once(const volatile T *var)
return *var; return *var;
} }
IPCChannelBuffer::IPCChannelBuffer()
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_condattr_t condattr;
pthread_mutexattr_t mutexattr;
if (pthread_condattr_init(&condattr) != 0)
goto error_condattr_init;
if (pthread_mutexattr_init(&mutexattr) != 0)
goto error_mutexattr_init;
if (pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED) != 0)
goto error_condattr_setpshared;
if (pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED) != 0)
goto error_mutexattr_setpshared;
if (pthread_cond_init(&cond, &condattr) != 0)
goto error_cond_init;
if (pthread_mutex_init(&mutex, &mutexattr) != 0)
goto error_mutex_init;
pthread_mutexattr_destroy(&mutexattr);
pthread_condattr_destroy(&condattr);
return;
error_mutex_init:
pthread_cond_destroy(&cond);
error_cond_init:
error_mutexattr_setpshared:
error_condattr_setpshared:
pthread_mutexattr_destroy(&mutexattr);
error_mutexattr_init:
pthread_condattr_destroy(&condattr);
error_condattr_init:
throw BaseException("Unable to initialize IPCChannelBuffer");
#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
}
IPCChannelBuffer::~IPCChannelBuffer()
{
#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX)
}
IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr<IPCChannelResources> resources) IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr<IPCChannelResources> resources)
{ {
IPCChannelShared *shared = resources->data.shared; IPCChannelShared *shared = resources->data.shared;
@ -271,43 +280,56 @@ IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr<IPCChannelResources> resource
void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept
{ {
write_once(&m_dir.buf_out->size, size); write_once(&m_dir.buf_out->size, size);
if (size != 0) if (size != 0)
memcpy(m_dir.buf_out->data, data, size); memcpy(m_dir.buf_out->data, data, size);
post_out(&m_dir); post_out(&m_dir);
} }
bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept
{ {
u64 t0 = porting::getTimeMs(); u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms;
write_once(&m_dir.buf_out->size, size); write_once(&m_dir.buf_out->size, size);
do { do {
memcpy(m_dir.buf_out->data, data, IPC_CHANNEL_MSG_SIZE); memcpy(m_dir.buf_out->data, data, IPC_CHANNEL_MSG_SIZE);
post_out(&m_dir); post_out(&m_dir);
if (!wait_in(&m_dir, timeout_ms, t0))
if (!wait_in(&m_dir, timeout_ms_abs))
return false; return false;
size -= IPC_CHANNEL_MSG_SIZE; size -= IPC_CHANNEL_MSG_SIZE;
data = (u8 *)data + IPC_CHANNEL_MSG_SIZE; data = (u8 *)data + IPC_CHANNEL_MSG_SIZE;
} while (size > IPC_CHANNEL_MSG_SIZE); } while (size > IPC_CHANNEL_MSG_SIZE);
if (size != 0) if (size != 0)
memcpy(m_dir.buf_out->data, data, size); memcpy(m_dir.buf_out->data, data, size);
post_out(&m_dir); post_out(&m_dir);
return true; return true;
} }
bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
{ {
u64 t0 = porting::getTimeMs();
if (!wait_in(&m_dir, timeout_ms, t0))
return false;
size_t size = read_once(&m_dir.buf_in->size);
m_recv_size = size;
// Note about memcpy: If the other thread is evil, it might change the contents // Note about memcpy: If the other thread is evil, it might change the contents
// of the memory while it's memcopied. We're assuming here that memcpy doesn't // of the memory while it's memcopied. We're assuming here that memcpy doesn't
// cause vulnerabilities due to this. // cause vulnerabilities due to this.
u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms;
if (!wait_in(&m_dir, timeout_ms_abs))
return false;
size_t size = read_once(&m_dir.buf_in->size);
m_recv_size = size;
if (size <= IPC_CHANNEL_MSG_SIZE) { if (size <= IPC_CHANNEL_MSG_SIZE) {
// small msg // small msg
// (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE) // (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE)
memcpy(m_large_recv.data(), m_dir.buf_in->data, size); memcpy(m_large_recv.data(), m_dir.buf_in->data, size);
} else { } else {
// large msg // large msg
try { try {
@ -324,7 +346,7 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept
size -= IPC_CHANNEL_MSG_SIZE; size -= IPC_CHANNEL_MSG_SIZE;
recv_data += IPC_CHANNEL_MSG_SIZE; recv_data += IPC_CHANNEL_MSG_SIZE;
post_out(&m_dir); post_out(&m_dir);
if (!wait_in(&m_dir, timeout_ms, t0)) if (!wait_in(&m_dir, timeout_ms_abs))
return false; return false;
} while (size > IPC_CHANNEL_MSG_SIZE); } while (size > IPC_CHANNEL_MSG_SIZE);
memcpy(recv_data, m_dir.buf_in->data, size); memcpy(recv_data, m_dir.buf_in->data, size);