diff --git a/src/threading/ipc_channel.cpp b/src/threading/ipc_channel.cpp index 2e5eafe2f..922dde34c 100644 --- a/src/threading/ipc_channel.cpp +++ b/src/threading/ipc_channel.cpp @@ -47,48 +47,6 @@ static void busy_wait(int n) noexcept #define HAVE_BUSY_WAIT 0 #endif -IPCChannelBuffer::IPCChannelBuffer() -{ -#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) - pthread_condattr_t condattr; - pthread_mutexattr_t mutexattr; - if (pthread_condattr_init(&condattr) != 0) - goto error_condattr_init; - if (pthread_mutexattr_init(&mutexattr) != 0) - goto error_mutexattr_init; - if (pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED) != 0) - goto error_condattr_setpshared; - if (pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED) != 0) - goto error_mutexattr_setpshared; - if (pthread_cond_init(&cond, &condattr) != 0) - goto error_cond_init; - if (pthread_mutex_init(&mutex, &mutexattr) != 0) - goto error_mutex_init; - pthread_mutexattr_destroy(&mutexattr); - pthread_condattr_destroy(&condattr); - return; - -error_mutex_init: - pthread_cond_destroy(&cond); -error_cond_init: -error_mutexattr_setpshared: -error_condattr_setpshared: - pthread_mutexattr_destroy(&mutexattr); -error_mutexattr_init: - pthread_condattr_destroy(&condattr); -error_condattr_init: - throw BaseException("Unable to initialize IPCChannelBuffer"); -#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) -} - -IPCChannelBuffer::~IPCChannelBuffer() -{ -#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) - pthread_mutex_destroy(&mutex); - pthread_cond_destroy(&cond); -#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) -} - #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) // returns false on timeout @@ -194,28 +152,37 @@ static void post(IPCChannelBuffer *buf) noexcept #endif -static bool wait_in(IPCChannelEnd::Dir *dir, int timeout_ms, u64 t0) +// timeout_ms_abs: absolute timeout (using porting::getTimeMs()), or 0 for no timeout +// returns false on timeout +static bool wait_in(IPCChannelEnd::Dir *dir, u64 timeout_ms_abs) { #if defined(IPC_CHANNEL_IMPLEMENTATION_WIN32) + // Relative time DWORD timeout = INFINITE; - if (timeout_ms >= 0) { - timeout = (DWORD)timeout_ms; - timeout_msu -= porting::getTimeMs() - t0; // Relative time + if (timeout_ms_abs > 0) { + u64 tnow = porting::getTimeMs(); + if (tnow > timeout_ms_abs) + return false; + timeout = (DWORD)(timeout_ms_abs - tnow); } return wait(dir->sem_in, timeout); #else struct timespec timeout; struct timespec *timeoutp = nullptr; - if (timeout_ms >= 0) { - u64 timeout_msu = timeout_ms; + if (timeout_ms_abs > 0) { #if defined(IPC_CHANNEL_IMPLEMENTATION_LINUX_FUTEX) - timeout_msu -= porting::getTimeMs() - t0; // Relative time + // Relative time + u64 tnow = porting::getTimeMs(); + if (tnow > timeout_ms_abs) + return false; + u64 timeout_ms = timeout_ms_abs - tnow; #elif defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) - timeout_msu += t0; // Absolute time + // Absolute time + u64 timeout_msu = timeout_ms_abs; #endif - timeout.tv_sec = timeout_msu / 1000; - timeout.tv_nsec = timeout_msu % 1000 * 1000000UL; + timeout.tv_sec = timeout_ms / 1000; + timeout.tv_nsec = timeout_ms % 1000 * 1000000UL; timeoutp = &timeout; } @@ -244,6 +211,48 @@ static inline T read_once(const volatile T *var) return *var; } +IPCChannelBuffer::IPCChannelBuffer() +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + pthread_condattr_t condattr; + pthread_mutexattr_t mutexattr; + if (pthread_condattr_init(&condattr) != 0) + goto error_condattr_init; + if (pthread_mutexattr_init(&mutexattr) != 0) + goto error_mutexattr_init; + if (pthread_condattr_setpshared(&condattr, PTHREAD_PROCESS_SHARED) != 0) + goto error_condattr_setpshared; + if (pthread_mutexattr_setpshared(&mutexattr, PTHREAD_PROCESS_SHARED) != 0) + goto error_mutexattr_setpshared; + if (pthread_cond_init(&cond, &condattr) != 0) + goto error_cond_init; + if (pthread_mutex_init(&mutex, &mutexattr) != 0) + goto error_mutex_init; + pthread_mutexattr_destroy(&mutexattr); + pthread_condattr_destroy(&condattr); + return; + +error_mutex_init: + pthread_cond_destroy(&cond); +error_cond_init: +error_mutexattr_setpshared: +error_condattr_setpshared: + pthread_mutexattr_destroy(&mutexattr); +error_mutexattr_init: + pthread_condattr_destroy(&condattr); +error_condattr_init: + throw BaseException("Unable to initialize IPCChannelBuffer"); +#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) +} + +IPCChannelBuffer::~IPCChannelBuffer() +{ +#if defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) + pthread_mutex_destroy(&mutex); + pthread_cond_destroy(&cond); +#endif // defined(IPC_CHANNEL_IMPLEMENTATION_POSIX) +} + IPCChannelEnd IPCChannelEnd::makeA(std::unique_ptr resources) { IPCChannelShared *shared = resources->data.shared; @@ -271,43 +280,56 @@ IPCChannelEnd IPCChannelEnd::makeB(std::unique_ptr resource void IPCChannelEnd::sendSmall(const void *data, size_t size) noexcept { write_once(&m_dir.buf_out->size, size); + if (size != 0) memcpy(m_dir.buf_out->data, data, size); + post_out(&m_dir); } bool IPCChannelEnd::sendLarge(const void *data, size_t size, int timeout_ms) noexcept { - u64 t0 = porting::getTimeMs(); + u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms; + write_once(&m_dir.buf_out->size, size); + do { memcpy(m_dir.buf_out->data, data, IPC_CHANNEL_MSG_SIZE); post_out(&m_dir); - if (!wait_in(&m_dir, timeout_ms, t0)) + + if (!wait_in(&m_dir, timeout_ms_abs)) return false; + size -= IPC_CHANNEL_MSG_SIZE; data = (u8 *)data + IPC_CHANNEL_MSG_SIZE; } while (size > IPC_CHANNEL_MSG_SIZE); + if (size != 0) memcpy(m_dir.buf_out->data, data, size); post_out(&m_dir); + return true; } bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept { - u64 t0 = porting::getTimeMs(); - if (!wait_in(&m_dir, timeout_ms, t0)) - return false; - size_t size = read_once(&m_dir.buf_in->size); - m_recv_size = size; // Note about memcpy: If the other thread is evil, it might change the contents // of the memory while it's memcopied. We're assuming here that memcpy doesn't // cause vulnerabilities due to this. + + u64 timeout_ms_abs = timeout_ms < 0 ? 0 : porting::getTimeMs() + timeout_ms; + + if (!wait_in(&m_dir, timeout_ms_abs)) + return false; + + size_t size = read_once(&m_dir.buf_in->size); + m_recv_size = size; + if (size <= IPC_CHANNEL_MSG_SIZE) { // small msg // (m_large_recv.size() is always >= IPC_CHANNEL_MSG_SIZE) memcpy(m_large_recv.data(), m_dir.buf_in->data, size); + } else { // large msg try { @@ -324,7 +346,7 @@ bool IPCChannelEnd::recvWithTimeout(int timeout_ms) noexcept size -= IPC_CHANNEL_MSG_SIZE; recv_data += IPC_CHANNEL_MSG_SIZE; post_out(&m_dir); - if (!wait_in(&m_dir, timeout_ms, t0)) + if (!wait_in(&m_dir, timeout_ms_abs)) return false; } while (size > IPC_CHANNEL_MSG_SIZE); memcpy(recv_data, m_dir.buf_in->data, size);