]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/commitdiff
workqueue: bound maximum queue depth
authorDave Chinner <dchinner@redhat.com>
Thu, 15 Apr 2021 19:44:49 +0000 (15:44 -0400)
committerEric Sandeen <sandeen@sandeen.net>
Thu, 15 Apr 2021 19:44:49 +0000 (15:44 -0400)
Existing users of workqueues have bound maximum queue depths in
their external algorithms (e.g. prefetch counts). For parallelising
work that doesn't have an external bound, allow workqueues to
throttle incoming requests at a maximum bound. Bounded workqueues
also need to distribute work over all worker threads themselves as
there is no external bounding or worker function throttling
provided.

Existing callers are not throttled and retain direct control of
worker threads, only users of the new create interface will be
throttled and concurrency managed.

Reviewed-by: Darrick J. Wong <djwong@kernel.org>
Signed-off-by: Dave Chinner <dchinner@redhat.com>
Signed-off-by: Gao Xiang <hsiangkao@redhat.com>
Signed-off-by: Eric Sandeen <sandeen@sandeen.net>
libfrog/workqueue.c
libfrog/workqueue.h

index fe3de42893793c7f953faf7ff2834bfa56a00e4f..8c1a163e145f421d7d5ed743fe947807ddc25346 100644 (file)
@@ -40,13 +40,21 @@ workqueue_thread(void *arg)
                }
 
                /*
-                *  Dequeue work from the head of the list.
+                *  Dequeue work from the head of the list. If the queue was
+                *  full then send a wakeup if we're configured to do so.
                 */
                assert(wq->item_count > 0);
+               if (wq->max_queued)
+                       pthread_cond_broadcast(&wq->queue_full);
+
                wi = wq->next_item;
                wq->next_item = wi->next;
                wq->item_count--;
 
+               if (wq->max_queued && wq->next_item) {
+                       /* more work, wake up another worker */
+                       pthread_cond_signal(&wq->wakeup);
+               }
                pthread_mutex_unlock(&wq->lock);
 
                (wi->function)(wi->queue, wi->index, wi->arg);
@@ -58,10 +66,11 @@ workqueue_thread(void *arg)
 
 /* Allocate a work queue and threads.  Returns zero or negative error code. */
 int
-workqueue_create(
+workqueue_create_bound(
        struct workqueue        *wq,
        void                    *wq_ctx,
-       unsigned int            nr_workers)
+       unsigned int            nr_workers,
+       unsigned int            max_queue)
 {
        unsigned int            i;
        int                     err = 0;
@@ -70,12 +79,16 @@ workqueue_create(
        err = -pthread_cond_init(&wq->wakeup, NULL);
        if (err)
                return err;
+       err = -pthread_cond_init(&wq->queue_full, NULL);
+       if (err)
+               goto out_wake;
        err = -pthread_mutex_init(&wq->lock, NULL);
        if (err)
                goto out_cond;
 
        wq->wq_ctx = wq_ctx;
        wq->thread_count = nr_workers;
+       wq->max_queued = max_queue;
        wq->threads = malloc(nr_workers * sizeof(pthread_t));
        if (!wq->threads) {
                err = -errno;
@@ -102,10 +115,21 @@ workqueue_create(
 out_mutex:
        pthread_mutex_destroy(&wq->lock);
 out_cond:
+       pthread_cond_destroy(&wq->queue_full);
+out_wake:
        pthread_cond_destroy(&wq->wakeup);
        return err;
 }
 
+int
+workqueue_create(
+       struct workqueue        *wq,
+       void                    *wq_ctx,
+       unsigned int            nr_workers)
+{
+       return workqueue_create_bound(wq, wq_ctx, nr_workers, 0);
+}
+
 /*
  * Create a work item consisting of a function and some arguments and schedule
  * the work item to be run via the thread pool.  Returns zero or a negative
@@ -140,6 +164,7 @@ workqueue_add(
 
        /* Now queue the new work structure to the work queue. */
        pthread_mutex_lock(&wq->lock);
+restart:
        if (wq->next_item == NULL) {
                assert(wq->item_count == 0);
                ret = -pthread_cond_signal(&wq->wakeup);
@@ -150,6 +175,16 @@ workqueue_add(
                }
                wq->next_item = wi;
        } else {
+               /* throttle on a full queue if configured */
+               if (wq->max_queued && wq->item_count == wq->max_queued) {
+                       pthread_cond_wait(&wq->queue_full, &wq->lock);
+                       /*
+                        * Queue might be empty or even still full by the time
+                        * we get the lock back, so restart the lookup so we do
+                        * the right thing with the current state of the queue.
+                        */
+                       goto restart;
+               }
                wq->last_item->next = wi;
        }
        wq->last_item = wi;
@@ -201,5 +236,6 @@ workqueue_destroy(
        free(wq->threads);
        pthread_mutex_destroy(&wq->lock);
        pthread_cond_destroy(&wq->wakeup);
+       pthread_cond_destroy(&wq->queue_full);
        memset(wq, 0, sizeof(*wq));
 }
index a56d1cf14081be54cd4bb37b136cb5fe40cc03ce..a9c108d0e66a3bd3efc77c5b669f4b672a42090f 100644 (file)
@@ -31,10 +31,14 @@ struct workqueue {
        unsigned int            thread_count;
        bool                    terminate;
        bool                    terminated;
+       int                     max_queued;
+       pthread_cond_t          queue_full;
 };
 
 int workqueue_create(struct workqueue *wq, void *wq_ctx,
                unsigned int nr_workers);
+int workqueue_create_bound(struct workqueue *wq, void *wq_ctx,
+               unsigned int nr_workers, unsigned int max_queue);
 int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
                uint32_t index, void *arg);
 int workqueue_terminate(struct workqueue *wq);