]> git.ipfire.org Git - thirdparty/ccache.git/commitdiff
feat: Support enqueuing ThreadPool tasks from worker threads
authorJoel Rosdahl <joel@rosdahl.net>
Tue, 28 Oct 2025 20:09:01 +0000 (21:09 +0100)
committerJoel Rosdahl <joel@rosdahl.net>
Wed, 12 Nov 2025 20:06:24 +0000 (21:06 +0100)
src/ccache/util/threadpool.cpp

index fc645ce5f589ac103fbb32567cc4a2479c21d6e4..78c56ed1a717a06277a09b7e813fa2107a019b71 100644 (file)
 
 namespace util {
 
+namespace {
+
+// Identifies which thread pool (if any) the current thread is executing a
+// worker task for. Used to make enqueue() safe when called from worker threads
+// by avoiding blocking on a full queue, which can otherwise deadlock if all
+// workers try to enqueue while the queue is full.
+thread_local ThreadPool* t_current_pool = nullptr;
+
+void
+execute_task(const std::function<void()>& task, const char* context)
+{
+  try {
+    task();
+  } catch (const std::exception& e) {
+    LOG("Thread pool {} task failed: {}", context, e.what());
+  } catch (...) {
+    LOG("Thread pool {} task failed with unknown exception", context);
+  }
+}
+} // namespace
+
 ThreadPool::ThreadPool(size_t number_of_threads, size_t task_queue_max_size)
   : m_task_queue_max_size(task_queue_max_size)
 {
@@ -42,19 +63,39 @@ ThreadPool::~ThreadPool() noexcept
 void
 ThreadPool::enqueue(std::function<void()> function)
 {
+  // Fast path for worker threads: avoid blocking on a full queue to prevent
+  // deadlocks (all workers waiting inside enqueue() means no one can pop).
+  std::function<void()> inline_task;
   {
     std::unique_lock<std::mutex> lock(m_mutex);
-    if (!m_shutting_down && m_task_queue.size() >= m_task_queue_max_size) {
-      m_producer_cv.wait(lock, [this] {
-        return m_shutting_down || m_task_queue.size() < m_task_queue_max_size;
-      });
-    }
+
     if (m_shutting_down) {
       return;
     }
-    m_task_queue.emplace(std::move(function));
+
+    // If called from a worker thread belonging to this pool and the queue is
+    // full, execute the task inline instead of blocking.
+    if (t_current_pool == this
+        && m_task_queue.size() >= m_task_queue_max_size) {
+      inline_task = std::move(function);
+    } else {
+      if (m_task_queue.size() >= m_task_queue_max_size) {
+        m_producer_cv.wait(lock, [this] {
+          return m_shutting_down || m_task_queue.size() < m_task_queue_max_size;
+        });
+      }
+      if (m_shutting_down) {
+        return;
+      }
+      m_task_queue.emplace(std::move(function));
+      // Wake a worker to process the enqueued task.
+      m_worker_cv.notify_one();
+    }
+  }
+
+  if (inline_task) {
+    execute_task(inline_task, "inline");
   }
-  m_worker_cv.notify_one();
 }
 
 void
@@ -80,6 +121,9 @@ ThreadPool::shut_down() noexcept
 void
 ThreadPool::worker_thread_main()
 {
+  // Mark the current thread as a worker of this pool for the duration of its
+  // lifetime to allow enqueue() to detect re-entrancy safely.
+  t_current_pool = this;
   while (true) {
     std::function<void()> task;
 
@@ -92,16 +136,13 @@ ThreadPool::worker_thread_main()
       }
       task = std::move(m_task_queue.front());
       m_task_queue.pop();
+      // Notify any threads blocked in enqueue() that space is now available.
+      // Notifying while holding the lock ensures the waiting thread sees the
+      // updated queue size immediately upon waking.
+      m_producer_cv.notify_one();
     }
 
-    m_producer_cv.notify_one();
-    try {
-      task();
-    } catch (const std::exception& e) {
-      LOG("Thread pool task failed: {}", e.what());
-    } catch (...) {
-      LOG_RAW("Thread pool task failed with unknown exception");
-    }
+    execute_task(task, "worker");
   }
 }