From: Joel Rosdahl Date: Tue, 28 Oct 2025 20:09:01 +0000 (+0100) Subject: feat: Support enqueuing ThreadPool tasks from worker threads X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=181549309c3cf2c93bad94e92feaf2bc652b0eb3;p=thirdparty%2Fccache.git feat: Support enqueuing ThreadPool tasks from worker threads --- diff --git a/src/ccache/util/threadpool.cpp b/src/ccache/util/threadpool.cpp index fc645ce5..78c56ed1 100644 --- a/src/ccache/util/threadpool.cpp +++ b/src/ccache/util/threadpool.cpp @@ -24,6 +24,27 @@ namespace util { +namespace { + +// Identifies which thread pool (if any) the current thread is executing a +// worker task for. Used to make enqueue() safe when called from worker threads +// by avoiding blocking on a full queue, which can otherwise deadlock if all +// workers try to enqueue while the queue is full. +thread_local ThreadPool* t_current_pool = nullptr; + +void +execute_task(const std::function& task, const char* context) +{ + try { + task(); + } catch (const std::exception& e) { + LOG("Thread pool {} task failed: {}", context, e.what()); + } catch (...) { + LOG("Thread pool {} task failed with unknown exception", context); + } +} +} // namespace + ThreadPool::ThreadPool(size_t number_of_threads, size_t task_queue_max_size) : m_task_queue_max_size(task_queue_max_size) { @@ -42,19 +63,39 @@ ThreadPool::~ThreadPool() noexcept void ThreadPool::enqueue(std::function function) { + // Fast path for worker threads: avoid blocking on a full queue to prevent + // deadlocks (all workers waiting inside enqueue() means no one can pop). + std::function inline_task; { std::unique_lock lock(m_mutex); - if (!m_shutting_down && m_task_queue.size() >= m_task_queue_max_size) { - m_producer_cv.wait(lock, [this] { - return m_shutting_down || m_task_queue.size() < m_task_queue_max_size; - }); - } + if (m_shutting_down) { return; } - m_task_queue.emplace(std::move(function)); + + // If called from a worker thread belonging to this pool and the queue is + // full, execute the task inline instead of blocking. + if (t_current_pool == this + && m_task_queue.size() >= m_task_queue_max_size) { + inline_task = std::move(function); + } else { + if (m_task_queue.size() >= m_task_queue_max_size) { + m_producer_cv.wait(lock, [this] { + return m_shutting_down || m_task_queue.size() < m_task_queue_max_size; + }); + } + if (m_shutting_down) { + return; + } + m_task_queue.emplace(std::move(function)); + // Wake a worker to process the enqueued task. + m_worker_cv.notify_one(); + } + } + + if (inline_task) { + execute_task(inline_task, "inline"); } - m_worker_cv.notify_one(); } void @@ -80,6 +121,9 @@ ThreadPool::shut_down() noexcept void ThreadPool::worker_thread_main() { + // Mark the current thread as a worker of this pool for the duration of its + // lifetime to allow enqueue() to detect re-entrancy safely. + t_current_pool = this; while (true) { std::function task; @@ -92,16 +136,13 @@ ThreadPool::worker_thread_main() } task = std::move(m_task_queue.front()); m_task_queue.pop(); + // Notify any threads blocked in enqueue() that space is now available. + // Notifying while holding the lock ensures the waiting thread sees the + // updated queue size immediately upon waking. + m_producer_cv.notify_one(); } - m_producer_cv.notify_one(); - try { - task(); - } catch (const std::exception& e) { - LOG("Thread pool task failed: {}", e.what()); - } catch (...) { - LOG_RAW("Thread pool task failed with unknown exception"); - } + execute_task(task, "worker"); } }