]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - libfrog/workqueue.c
48038363ebd08da4b5b0d5bdc7e064913b197f28
[thirdparty/xfsprogs-dev.git] / libfrog / workqueue.c
1 // SPDX-License-Identifier: GPL-2.0+
2 /*
3 * Copyright (C) 2017 Oracle. All Rights Reserved.
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
5 */
6 #include <pthread.h>
7 #include <signal.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdint.h>
11 #include <stdbool.h>
12 #include <errno.h>
13 #include <assert.h>
14 #include "workqueue.h"
15
16 /* Main processing thread */
17 static void *
18 workqueue_thread(void *arg)
19 {
20 struct workqueue *wq = arg;
21 struct workqueue_item *wi;
22
23 /*
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
26 */
27 while (1) {
28 pthread_mutex_lock(&wq->lock);
29
30 /*
31 * Wait for work.
32 */
33 while (wq->next_item == NULL && !wq->terminate) {
34 assert(wq->item_count == 0);
35 pthread_cond_wait(&wq->wakeup, &wq->lock);
36 }
37 if (wq->next_item == NULL && wq->terminate) {
38 pthread_mutex_unlock(&wq->lock);
39 break;
40 }
41
42 /*
43 * Dequeue work from the head of the list.
44 */
45 assert(wq->item_count > 0);
46 wi = wq->next_item;
47 wq->next_item = wi->next;
48 wq->item_count--;
49
50 pthread_mutex_unlock(&wq->lock);
51
52 (wi->function)(wi->queue, wi->index, wi->arg);
53 free(wi);
54 }
55
56 return NULL;
57 }
58
59 /* Allocate a work queue and threads. */
60 int
61 workqueue_create(
62 struct workqueue *wq,
63 void *wq_ctx,
64 unsigned int nr_workers)
65 {
66 unsigned int i;
67 int err = 0;
68
69 memset(wq, 0, sizeof(*wq));
70 err = pthread_cond_init(&wq->wakeup, NULL);
71 if (err)
72 return err;
73 err = pthread_mutex_init(&wq->lock, NULL);
74 if (err)
75 goto out_cond;
76
77 wq->wq_ctx = wq_ctx;
78 wq->thread_count = nr_workers;
79 wq->threads = malloc(nr_workers * sizeof(pthread_t));
80 if (!wq->threads) {
81 err = errno;
82 goto out_mutex;
83 }
84 wq->terminate = false;
85
86 for (i = 0; i < nr_workers; i++) {
87 err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
88 wq);
89 if (err)
90 break;
91 }
92
93 /*
94 * If we encounter errors here, we have to signal and then wait for all
95 * the threads that may have been started running before we can destroy
96 * the workqueue.
97 */
98 if (err)
99 workqueue_destroy(wq);
100 return err;
101 out_mutex:
102 pthread_mutex_destroy(&wq->lock);
103 out_cond:
104 pthread_cond_destroy(&wq->wakeup);
105 return err;
106 }
107
108 /*
109 * Create a work item consisting of a function and some arguments and
110 * schedule the work item to be run via the thread pool.
111 */
112 int
113 workqueue_add(
114 struct workqueue *wq,
115 workqueue_func_t func,
116 uint32_t index,
117 void *arg)
118 {
119 struct workqueue_item *wi;
120 int ret;
121
122 if (wq->thread_count == 0) {
123 func(wq, index, arg);
124 return 0;
125 }
126
127 wi = malloc(sizeof(struct workqueue_item));
128 if (!wi)
129 return errno;
130
131 wi->function = func;
132 wi->index = index;
133 wi->arg = arg;
134 wi->queue = wq;
135 wi->next = NULL;
136
137 /* Now queue the new work structure to the work queue. */
138 pthread_mutex_lock(&wq->lock);
139 if (wq->next_item == NULL) {
140 assert(wq->item_count == 0);
141 ret = pthread_cond_signal(&wq->wakeup);
142 if (ret)
143 goto out_item;
144 wq->next_item = wi;
145 } else {
146 wq->last_item->next = wi;
147 }
148 wq->last_item = wi;
149 wq->item_count++;
150 pthread_mutex_unlock(&wq->lock);
151
152 return 0;
153 out_item:
154 free(wi);
155 return ret;
156 }
157
158 /*
159 * Wait for all pending work items to be processed and tear down the
160 * workqueue.
161 */
162 void
163 workqueue_destroy(
164 struct workqueue *wq)
165 {
166 unsigned int i;
167
168 pthread_mutex_lock(&wq->lock);
169 wq->terminate = 1;
170 pthread_mutex_unlock(&wq->lock);
171
172 pthread_cond_broadcast(&wq->wakeup);
173
174 for (i = 0; i < wq->thread_count; i++)
175 pthread_join(wq->threads[i], NULL);
176
177 free(wq->threads);
178 pthread_mutex_destroy(&wq->lock);
179 pthread_cond_destroy(&wq->wakeup);
180 memset(wq, 0, sizeof(*wq));
181 }