diff --git a/src/script/cpp_api/s_async.cpp b/src/script/cpp_api/s_async.cpp index 4cb46f6bb..982fb825e 100644 --- a/src/script/cpp_api/s_async.cpp +++ b/src/script/cpp_api/s_async.cpp @@ -24,6 +24,11 @@ extern "C" { #endif #include "lua_api/l_base.h" +// if a job is waiting for this duration, an additional thread will be spawned +static constexpr int AUTOSCALE_DELAY_MS = 1000; +// if jobs are waiting for this duration, a warning is printed +static constexpr int STUCK_DELAY_MS = 11500; + /******************************************************************************/ AsyncEngine::~AsyncEngine() { @@ -156,6 +161,7 @@ void AsyncEngine::step(lua_State *L) { stepJobResults(L); stepAutoscale(); + stepStuckWarning(); } void AsyncEngine::stepJobResults(lua_State *L) @@ -203,11 +209,9 @@ void AsyncEngine::stepAutoscale() if (autoscaleTimer && porting::getTimeMs() >= autoscaleTimer) { autoscaleTimer = 0; // Determine overlap with previous snapshot - unsigned int n = 0; - for (const auto &it : jobQueue) - n += autoscaleSeenJobs.count(it.id); - autoscaleSeenJobs.clear(); - infostream << "AsyncEngine: " << n << " jobs were still waiting after 1s" << std::endl; + size_t n = compareJobs(autoscaleSeenJobs); + infostream << "AsyncEngine: " << n << " jobs were still waiting after " + << AUTOSCALE_DELAY_MS << "ms, adding more threads." << std::endl; // Start this many new threads while (workerThreads.size() < autoscaleMaxWorkers && n > 0) { addWorkerThread(); @@ -216,13 +220,34 @@ void AsyncEngine::stepAutoscale() return; } - // 1) Check if there's anything in the queue + // 1) Check queue contents if (!autoscaleTimer && !jobQueue.empty()) { - // Take a snapshot of all jobs we have seen - for (const auto &it : jobQueue) - autoscaleSeenJobs.emplace(it.id); - // and set a timer for 1 second - autoscaleTimer = porting::getTimeMs() + 1000; + autoscaleSeenJobs.clear(); + snapshotJobs(autoscaleSeenJobs); + autoscaleTimer = porting::getTimeMs() + AUTOSCALE_DELAY_MS; + } +} + +void AsyncEngine::stepStuckWarning() +{ + MutexAutoLock autolock(jobQueueMutex); + + // 2) If the timer elapsed, check again + if (stuckTimer && porting::getTimeMs() >= stuckTimer) { + stuckTimer = 0; + size_t n = compareJobs(stuckSeenJobs); + if (n > 0) { + warningstream << "AsyncEngine: " << n << " jobs seem to be stuck in queue" + " (" << workerThreads.size() << " workers active)" << std::endl; + } + // fallthrough + } + + // 1) Check queue contents + if (!stuckTimer && !jobQueue.empty()) { + stuckSeenJobs.clear(); + snapshotJobs(stuckSeenJobs); + stuckTimer = porting::getTimeMs() + STUCK_DELAY_MS; } } diff --git a/src/script/cpp_api/s_async.h b/src/script/cpp_api/s_async.h index d2a6913ef..1b6743dea 100644 --- a/src/script/cpp_api/s_async.h +++ b/src/script/cpp_api/s_async.h @@ -138,6 +138,11 @@ protected: */ void stepAutoscale(); + /** + * Print warning message if too many jobs are stuck + */ + void stepStuckWarning(); + /** * Initialize environment with current registred functions * this function adds all functions registred by registerFunction to the @@ -149,6 +154,21 @@ protected: bool prepareEnvironment(lua_State* L, int top); private: + template + inline void snapshotJobs(T &to) + { + for (const auto &it : jobQueue) + to.emplace(it.id); + } + template + inline size_t compareJobs(const T &from) + { + size_t overlap = 0; + for (const auto &it : jobQueue) + overlap += from.count(it.id); + return overlap; + } + // Variable locking the engine against further modification bool initDone = false; @@ -158,6 +178,9 @@ private: u64 autoscaleTimer = 0; std::unordered_set autoscaleSeenJobs; + u64 stuckTimer = 0; + std::unordered_set stuckSeenJobs; + // Only set for the server async environment (duh) Server *server = nullptr;