]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/commitdiff
libfrog: create a threaded workqueue
authorDarrick J. Wong <darrick.wong@oracle.com>
Wed, 6 Dec 2017 15:17:08 +0000 (09:17 -0600)
committerEric Sandeen <sandeen@redhat.com>
Wed, 6 Dec 2017 15:17:08 +0000 (09:17 -0600)
Create a thread pool that queues and runs discrete work items.  This
will be a namespaced version of the pool in repair/threads.c; a
subsequent patch will switch repair over.  xfs_scrub will use the
generic thread pool.

Signed-off-by: Darrick J. Wong <darrick.wong@oracle.com>
Reviewed-by: Eric Sandeen <sandeen@redhat.com>
Signed-off-by: Eric Sandeen <sandeen@sandeen.net>
include/workqueue.h [new file with mode: 0644]
libfrog/Makefile
libfrog/workqueue.c [new file with mode: 0644]

diff --git a/include/workqueue.h b/include/workqueue.h
new file mode 100644 (file)
index 0000000..b4b3541
--- /dev/null
@@ -0,0 +1,55 @@
+/*
+ * Copyright (C) 2017 Oracle.  All Rights Reserved.
+ *
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it would be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write the Free Software Foundation,
+ * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA.
+ *
+ * This code was adapted from repair/threads.h.
+ */
+#ifndef        _WORKQUEUE_H_
+#define        _WORKQUEUE_H_
+
+struct workqueue;
+
+typedef void workqueue_func_t(struct workqueue *wq, uint32_t index, void *arg);
+
+struct workqueue_item {
+       struct workqueue        *queue;
+       struct workqueue_item   *next;
+       workqueue_func_t        *function;
+       void                    *arg;
+       uint32_t                index;
+};
+
+struct workqueue {
+       void                    *wq_ctx;
+       pthread_t               *threads;
+       struct workqueue_item   *next_item;
+       struct workqueue_item   *last_item;
+       pthread_mutex_t         lock;
+       pthread_cond_t          wakeup;
+       unsigned int            item_count;
+       unsigned int            thread_count;
+       bool                    terminate;
+};
+
+int workqueue_create(struct workqueue *wq, void *wq_ctx,
+               unsigned int nr_workers);
+int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
+               uint32_t index, void *arg);
+void workqueue_destroy(struct workqueue *wq);
+
+#endif /* _WORKQUEUE_H_ */
index 3fd42a4359610d2591707dac1cd26e6e248a871f..9a4362136d22cae028ed4c563509dadb6fbfba6c 100644 (file)
@@ -14,7 +14,8 @@ CFILES = \
 avl64.c \
 list_sort.c \
 radix-tree.c \
-util.c
+util.c \
+workqueue.c
 
 default: ltdepend $(LTLIBRARY)
 
diff --git a/libfrog/workqueue.c b/libfrog/workqueue.c
new file mode 100644 (file)
index 0000000..66e8075
--- /dev/null
@@ -0,0 +1,174 @@
+/*
+ * Copyright (C) 2017 Oracle.  All Rights Reserved.
+ *
+ * Author: Darrick J. Wong <darrick.wong@oracle.com>
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it would be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write the Free Software Foundation,
+ * Inc.,  51 Franklin St, Fifth Floor, Boston, MA  02110-1301, USA.
+ *
+ * This code was adapted from repair/threads.c, which (at the time)
+ * did not contain a copyright statement.
+ */
+#include <pthread.h>
+#include <signal.h>
+#include <stdlib.h>
+#include <string.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <errno.h>
+#include <assert.h>
+#include "workqueue.h"
+
+/* Main processing thread */
+static void *
+workqueue_thread(void *arg)
+{
+       struct workqueue        *wq = arg;
+       struct workqueue_item   *wi;
+
+       /*
+        * Loop pulling work from the passed in work queue.
+        * Check for notification to exit after every chunk of work.
+        */
+       while (1) {
+               pthread_mutex_lock(&wq->lock);
+
+               /*
+                * Wait for work.
+                */
+               while (wq->next_item == NULL && !wq->terminate) {
+                       assert(wq->item_count == 0);
+                       pthread_cond_wait(&wq->wakeup, &wq->lock);
+               }
+               if (wq->next_item == NULL && wq->terminate) {
+                       pthread_mutex_unlock(&wq->lock);
+                       break;
+               }
+
+               /*
+                *  Dequeue work from the head of the list.
+                */
+               assert(wq->item_count > 0);
+               wi = wq->next_item;
+               wq->next_item = wi->next;
+               wq->item_count--;
+
+               pthread_mutex_unlock(&wq->lock);
+
+               (wi->function)(wi->queue, wi->index, wi->arg);
+               free(wi);
+       }
+
+       return NULL;
+}
+
+/* Allocate a work queue and threads. */
+int
+workqueue_create(
+       struct workqueue        *wq,
+       void                    *wq_ctx,
+       unsigned int            nr_workers)
+{
+       unsigned int            i;
+       int                     err = 0;
+
+       memset(wq, 0, sizeof(*wq));
+       pthread_cond_init(&wq->wakeup, NULL);
+       pthread_mutex_init(&wq->lock, NULL);
+
+       wq->wq_ctx = wq_ctx;
+       wq->thread_count = nr_workers;
+       wq->threads = malloc(nr_workers * sizeof(pthread_t));
+       wq->terminate = false;
+
+       for (i = 0; i < nr_workers; i++) {
+               err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
+                               wq);
+               if (err)
+                       break;
+       }
+
+       if (err)
+               workqueue_destroy(wq);
+       return err;
+}
+
+/*
+ * Create a work item consisting of a function and some arguments and
+ * schedule the work item to be run via the thread pool.
+ */
+int
+workqueue_add(
+       struct workqueue        *wq,
+       workqueue_func_t        func,
+       uint32_t                index,
+       void                    *arg)
+{
+       struct workqueue_item   *wi;
+
+       if (wq->thread_count == 0) {
+               func(wq, index, arg);
+               return 0;
+       }
+
+       wi = malloc(sizeof(struct workqueue_item));
+       if (wi == NULL)
+               return ENOMEM;
+
+       wi->function = func;
+       wi->index = index;
+       wi->arg = arg;
+       wi->queue = wq;
+       wi->next = NULL;
+
+       /* Now queue the new work structure to the work queue. */
+       pthread_mutex_lock(&wq->lock);
+       if (wq->next_item == NULL) {
+               wq->next_item = wi;
+               assert(wq->item_count == 0);
+               pthread_cond_signal(&wq->wakeup);
+       } else {
+               wq->last_item->next = wi;
+       }
+       wq->last_item = wi;
+       wq->item_count++;
+       pthread_mutex_unlock(&wq->lock);
+
+       return 0;
+}
+
+/*
+ * Wait for all pending work items to be processed and tear down the
+ * workqueue.
+ */
+void
+workqueue_destroy(
+       struct workqueue        *wq)
+{
+       unsigned int            i;
+
+       pthread_mutex_lock(&wq->lock);
+       wq->terminate = 1;
+       pthread_mutex_unlock(&wq->lock);
+
+       pthread_cond_broadcast(&wq->wakeup);
+
+       for (i = 0; i < wq->thread_count; i++)
+               pthread_join(wq->threads[i], NULL);
+
+       free(wq->threads);
+       pthread_mutex_destroy(&wq->lock);
+       pthread_cond_destroy(&wq->wakeup);
+       memset(wq, 0, sizeof(*wq));
+}