]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/commitdiff
libfrog: fix overly sleep workqueues
authorDarrick J. Wong <djwong@kernel.org>
Tue, 12 Sep 2023 19:39:41 +0000 (12:39 -0700)
committerCarlos Maiolino <cem@kernel.org>
Thu, 5 Oct 2023 12:57:20 +0000 (14:57 +0200)
I discovered the following bad behavior in the workqueue code when I
noticed that xfs_scrub was running single-threaded despite having 4
virtual CPUs allocated to the VM.  I observed this sequence:

Thread 1 WQ1 WQ2...N
workqueue_create
<start up>
pthread_cond_wait
<start up>
pthread_cond_wait
workqueue_add
next_item == NULL
pthread_cond_signal

workqueue_add
next_item != NULL
<do not pthread_cond_signal>

<receives wakeup>
<run first item>

workqueue_add
next_item != NULL
<do not pthread_cond_signal>

<run second item>
<run third item>
pthread_cond_wait

workqueue_terminate
pthread_cond_broadcast
<receives wakeup>
<nothing to do, exits>
<wakes up again>
<nothing to do, exits>

Notice how threads WQ2...N are completely idle while WQ1 ends up doing
all the work!  That wasn't the point of a worker pool!  Observe that
thread 1 manages to queue two work items before WQ1 pulls the first item
off the queue.  When thread 1 queues the third item, it sees that
next_item is not NULL, so it doesn't wake a worker.  If thread 1 queues
all the N work that it has before WQ1 empties the queue, then none of
the other thread get woken up.

Fix this by maintaining a count of the number of active threads, and
using that to wake either the sole idle thread, or all the threads if
there are many that are idle.  This dramatically improves startup
behavior of the workqueue and eliminates the collapse case.

Signed-off-by: Darrick J. Wong <djwong@kernel.org>
Reviewed-by: Carlos Maiolino <cmaiolino@redhat.com>
Signed-off-by: Carlos Maiolino <cem@kernel.org>
libfrog/workqueue.c
libfrog/workqueue.h

index 702a53e2f3c0a3e7f5ad3456ec741c347f65abf1..db5b3f68bc54280e10681177543eeee71e8fac69 100644 (file)
@@ -26,8 +26,8 @@ workqueue_thread(void *arg)
         * Check for notification to exit after every chunk of work.
         */
        rcu_register_thread();
+       pthread_mutex_lock(&wq->lock);
        while (1) {
-               pthread_mutex_lock(&wq->lock);
 
                /*
                 * Wait for work.
@@ -36,10 +36,8 @@ workqueue_thread(void *arg)
                        assert(wq->item_count == 0);
                        pthread_cond_wait(&wq->wakeup, &wq->lock);
                }
-               if (wq->next_item == NULL && wq->terminate) {
-                       pthread_mutex_unlock(&wq->lock);
+               if (wq->next_item == NULL && wq->terminate)
                        break;
-               }
 
                /*
                 *  Dequeue work from the head of the list. If the queue was
@@ -57,11 +55,16 @@ workqueue_thread(void *arg)
                        /* more work, wake up another worker */
                        pthread_cond_signal(&wq->wakeup);
                }
+               wq->active_threads++;
                pthread_mutex_unlock(&wq->lock);
 
                (wi->function)(wi->queue, wi->index, wi->arg);
                free(wi);
+
+               pthread_mutex_lock(&wq->lock);
+               wq->active_threads--;
        }
+       pthread_mutex_unlock(&wq->lock);
        rcu_unregister_thread();
 
        return NULL;
@@ -170,12 +173,6 @@ workqueue_add(
 restart:
        if (wq->next_item == NULL) {
                assert(wq->item_count == 0);
-               ret = -pthread_cond_signal(&wq->wakeup);
-               if (ret) {
-                       pthread_mutex_unlock(&wq->lock);
-                       free(wi);
-                       return ret;
-               }
                wq->next_item = wi;
        } else {
                /* throttle on a full queue if configured */
@@ -192,6 +189,23 @@ restart:
        }
        wq->last_item = wi;
        wq->item_count++;
+
+       if (wq->active_threads == wq->thread_count - 1) {
+               /* One thread is idle, wake it */
+               ret = -pthread_cond_signal(&wq->wakeup);
+               if (ret) {
+                       pthread_mutex_unlock(&wq->lock);
+                       return ret;
+               }
+       } else if (wq->active_threads < wq->thread_count) {
+               /* Multiple threads are idle, wake everyone */
+               ret = -pthread_cond_broadcast(&wq->wakeup);
+               if (ret) {
+                       pthread_mutex_unlock(&wq->lock);
+                       return ret;
+               }
+       }
+
        pthread_mutex_unlock(&wq->lock);
 
        return 0;
index a9c108d0e66a3bd3efc77c5b669f4b672a42090f..edbe12fabab4ffaeac77ac3fe3bc27e776c2baa2 100644 (file)
@@ -29,6 +29,7 @@ struct workqueue {
        pthread_cond_t          wakeup;
        unsigned int            item_count;
        unsigned int            thread_count;
+       unsigned int            active_threads;
        bool                    terminate;
        bool                    terminated;
        int                     max_queued;