namespace util {
+namespace {
+
+// Identifies which thread pool (if any) the current thread is executing a
+// worker task for. Used to make enqueue() safe when called from worker threads
+// by avoiding blocking on a full queue, which can otherwise deadlock if all
+// workers try to enqueue while the queue is full.
+thread_local ThreadPool* t_current_pool = nullptr;
+
+void
+execute_task(const std::function<void()>& task, const char* context)
+{
+ try {
+ task();
+ } catch (const std::exception& e) {
+ LOG("Thread pool {} task failed: {}", context, e.what());
+ } catch (...) {
+ LOG("Thread pool {} task failed with unknown exception", context);
+ }
+}
+} // namespace
+
ThreadPool::ThreadPool(size_t number_of_threads, size_t task_queue_max_size)
: m_task_queue_max_size(task_queue_max_size)
{
void
ThreadPool::enqueue(std::function<void()> function)
{
+ // Fast path for worker threads: avoid blocking on a full queue to prevent
+ // deadlocks (all workers waiting inside enqueue() means no one can pop).
+ std::function<void()> inline_task;
{
std::unique_lock<std::mutex> lock(m_mutex);
- if (!m_shutting_down && m_task_queue.size() >= m_task_queue_max_size) {
- m_producer_cv.wait(lock, [this] {
- return m_shutting_down || m_task_queue.size() < m_task_queue_max_size;
- });
- }
+
if (m_shutting_down) {
return;
}
- m_task_queue.emplace(std::move(function));
+
+ // If called from a worker thread belonging to this pool and the queue is
+ // full, execute the task inline instead of blocking.
+ if (t_current_pool == this
+ && m_task_queue.size() >= m_task_queue_max_size) {
+ inline_task = std::move(function);
+ } else {
+ if (m_task_queue.size() >= m_task_queue_max_size) {
+ m_producer_cv.wait(lock, [this] {
+ return m_shutting_down || m_task_queue.size() < m_task_queue_max_size;
+ });
+ }
+ if (m_shutting_down) {
+ return;
+ }
+ m_task_queue.emplace(std::move(function));
+ // Wake a worker to process the enqueued task.
+ m_worker_cv.notify_one();
+ }
+ }
+
+ if (inline_task) {
+ execute_task(inline_task, "inline");
}
- m_worker_cv.notify_one();
}
void
void
ThreadPool::worker_thread_main()
{
+ // Mark the current thread as a worker of this pool for the duration of its
+ // lifetime to allow enqueue() to detect re-entrancy safely.
+ t_current_pool = this;
while (true) {
std::function<void()> task;
}
task = std::move(m_task_queue.front());
m_task_queue.pop();
+ // Notify any threads blocked in enqueue() that space is now available.
+ // Notifying while holding the lock ensures the waiting thread sees the
+ // updated queue size immediately upon waking.
+ m_producer_cv.notify_one();
}
- m_producer_cv.notify_one();
- try {
- task();
- } catch (const std::exception& e) {
- LOG("Thread pool task failed: {}", e.what());
- } catch (...) {
- LOG_RAW("Thread pool task failed with unknown exception");
- }
+ execute_task(task, "worker");
}
}