From: Darrick J. Wong Date: Tue, 12 Sep 2023 19:39:41 +0000 (-0700) Subject: libfrog: fix overly sleep workqueues X-Git-Tag: v6.5.0~7 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=12838bda12e6693dee72118f9187bf64201aac02;p=thirdparty%2Fxfsprogs-dev.git libfrog: fix overly sleep workqueues I discovered the following bad behavior in the workqueue code when I noticed that xfs_scrub was running single-threaded despite having 4 virtual CPUs allocated to the VM. I observed this sequence: Thread 1 WQ1 WQ2...N workqueue_create pthread_cond_wait pthread_cond_wait workqueue_add next_item == NULL pthread_cond_signal workqueue_add next_item != NULL workqueue_add next_item != NULL pthread_cond_wait workqueue_terminate pthread_cond_broadcast Notice how threads WQ2...N are completely idle while WQ1 ends up doing all the work! That wasn't the point of a worker pool! Observe that thread 1 manages to queue two work items before WQ1 pulls the first item off the queue. When thread 1 queues the third item, it sees that next_item is not NULL, so it doesn't wake a worker. If thread 1 queues all the N work that it has before WQ1 empties the queue, then none of the other thread get woken up. Fix this by maintaining a count of the number of active threads, and using that to wake either the sole idle thread, or all the threads if there are many that are idle. This dramatically improves startup behavior of the workqueue and eliminates the collapse case. Signed-off-by: Darrick J. Wong Reviewed-by: Carlos Maiolino Signed-off-by: Carlos Maiolino --- diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c index 702a53e2f..db5b3f68b 100644 --- a/libfrog/workqueue.c +++ b/libfrog/workqueue.c @@ -26,8 +26,8 @@ workqueue_thread(void *arg) * Check for notification to exit after every chunk of work. */ rcu_register_thread(); + pthread_mutex_lock(&wq->lock); while (1) { - pthread_mutex_lock(&wq->lock); /* * Wait for work. @@ -36,10 +36,8 @@ workqueue_thread(void *arg) assert(wq->item_count == 0); pthread_cond_wait(&wq->wakeup, &wq->lock); } - if (wq->next_item == NULL && wq->terminate) { - pthread_mutex_unlock(&wq->lock); + if (wq->next_item == NULL && wq->terminate) break; - } /* * Dequeue work from the head of the list. If the queue was @@ -57,11 +55,16 @@ workqueue_thread(void *arg) /* more work, wake up another worker */ pthread_cond_signal(&wq->wakeup); } + wq->active_threads++; pthread_mutex_unlock(&wq->lock); (wi->function)(wi->queue, wi->index, wi->arg); free(wi); + + pthread_mutex_lock(&wq->lock); + wq->active_threads--; } + pthread_mutex_unlock(&wq->lock); rcu_unregister_thread(); return NULL; @@ -170,12 +173,6 @@ workqueue_add( restart: if (wq->next_item == NULL) { assert(wq->item_count == 0); - ret = -pthread_cond_signal(&wq->wakeup); - if (ret) { - pthread_mutex_unlock(&wq->lock); - free(wi); - return ret; - } wq->next_item = wi; } else { /* throttle on a full queue if configured */ @@ -192,6 +189,23 @@ restart: } wq->last_item = wi; wq->item_count++; + + if (wq->active_threads == wq->thread_count - 1) { + /* One thread is idle, wake it */ + ret = -pthread_cond_signal(&wq->wakeup); + if (ret) { + pthread_mutex_unlock(&wq->lock); + return ret; + } + } else if (wq->active_threads < wq->thread_count) { + /* Multiple threads are idle, wake everyone */ + ret = -pthread_cond_broadcast(&wq->wakeup); + if (ret) { + pthread_mutex_unlock(&wq->lock); + return ret; + } + } + pthread_mutex_unlock(&wq->lock); return 0; diff --git a/libfrog/workqueue.h b/libfrog/workqueue.h index a9c108d0e..edbe12fab 100644 --- a/libfrog/workqueue.h +++ b/libfrog/workqueue.h @@ -29,6 +29,7 @@ struct workqueue { pthread_cond_t wakeup; unsigned int item_count; unsigned int thread_count; + unsigned int active_threads; bool terminate; bool terminated; int max_queued;