]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - libfrog/workqueue.c
libfrog: fix bitmap error communication problems
[thirdparty/xfsprogs-dev.git] / libfrog / workqueue.c
1 // SPDX-License-Identifier: GPL-2.0+
2 /*
3 * Copyright (C) 2017 Oracle. All Rights Reserved.
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
5 */
6 #include <pthread.h>
7 #include <signal.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdint.h>
11 #include <stdbool.h>
12 #include <errno.h>
13 #include <assert.h>
14 #include "workqueue.h"
15
16 /* Main processing thread */
17 static void *
18 workqueue_thread(void *arg)
19 {
20 struct workqueue *wq = arg;
21 struct workqueue_item *wi;
22
23 /*
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
26 */
27 while (1) {
28 pthread_mutex_lock(&wq->lock);
29
30 /*
31 * Wait for work.
32 */
33 while (wq->next_item == NULL && !wq->terminate) {
34 assert(wq->item_count == 0);
35 pthread_cond_wait(&wq->wakeup, &wq->lock);
36 }
37 if (wq->next_item == NULL && wq->terminate) {
38 pthread_mutex_unlock(&wq->lock);
39 break;
40 }
41
42 /*
43 * Dequeue work from the head of the list.
44 */
45 assert(wq->item_count > 0);
46 wi = wq->next_item;
47 wq->next_item = wi->next;
48 wq->item_count--;
49
50 pthread_mutex_unlock(&wq->lock);
51
52 (wi->function)(wi->queue, wi->index, wi->arg);
53 free(wi);
54 }
55
56 return NULL;
57 }
58
59 /* Allocate a work queue and threads. */
60 int
61 workqueue_create(
62 struct workqueue *wq,
63 void *wq_ctx,
64 unsigned int nr_workers)
65 {
66 unsigned int i;
67 int err = 0;
68
69 memset(wq, 0, sizeof(*wq));
70 err = pthread_cond_init(&wq->wakeup, NULL);
71 if (err)
72 return err;
73 err = pthread_mutex_init(&wq->lock, NULL);
74 if (err)
75 goto out_cond;
76
77 wq->wq_ctx = wq_ctx;
78 wq->thread_count = nr_workers;
79 wq->threads = malloc(nr_workers * sizeof(pthread_t));
80 if (!wq->threads) {
81 err = errno;
82 goto out_mutex;
83 }
84 wq->terminate = false;
85 wq->terminated = false;
86
87 for (i = 0; i < nr_workers; i++) {
88 err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
89 wq);
90 if (err)
91 break;
92 }
93
94 /*
95 * If we encounter errors here, we have to signal and then wait for all
96 * the threads that may have been started running before we can destroy
97 * the workqueue.
98 */
99 if (err)
100 workqueue_destroy(wq);
101 return err;
102 out_mutex:
103 pthread_mutex_destroy(&wq->lock);
104 out_cond:
105 pthread_cond_destroy(&wq->wakeup);
106 return err;
107 }
108
109 /*
110 * Create a work item consisting of a function and some arguments and
111 * schedule the work item to be run via the thread pool.
112 */
113 int
114 workqueue_add(
115 struct workqueue *wq,
116 workqueue_func_t func,
117 uint32_t index,
118 void *arg)
119 {
120 struct workqueue_item *wi;
121 int ret;
122
123 assert(!wq->terminated);
124
125 if (wq->thread_count == 0) {
126 func(wq, index, arg);
127 return 0;
128 }
129
130 wi = malloc(sizeof(struct workqueue_item));
131 if (!wi)
132 return errno;
133
134 wi->function = func;
135 wi->index = index;
136 wi->arg = arg;
137 wi->queue = wq;
138 wi->next = NULL;
139
140 /* Now queue the new work structure to the work queue. */
141 pthread_mutex_lock(&wq->lock);
142 if (wq->next_item == NULL) {
143 assert(wq->item_count == 0);
144 ret = pthread_cond_signal(&wq->wakeup);
145 if (ret)
146 goto out_item;
147 wq->next_item = wi;
148 } else {
149 wq->last_item->next = wi;
150 }
151 wq->last_item = wi;
152 wq->item_count++;
153 pthread_mutex_unlock(&wq->lock);
154
155 return 0;
156 out_item:
157 free(wi);
158 return ret;
159 }
160
161 /*
162 * Wait for all pending work items to be processed and tear down the
163 * workqueue thread pool.
164 */
165 int
166 workqueue_terminate(
167 struct workqueue *wq)
168 {
169 unsigned int i;
170 int ret;
171
172 pthread_mutex_lock(&wq->lock);
173 wq->terminate = true;
174 pthread_mutex_unlock(&wq->lock);
175
176 ret = pthread_cond_broadcast(&wq->wakeup);
177 if (ret)
178 return ret;
179
180 for (i = 0; i < wq->thread_count; i++) {
181 ret = pthread_join(wq->threads[i], NULL);
182 if (ret)
183 return ret;
184 }
185
186 pthread_mutex_lock(&wq->lock);
187 wq->terminated = true;
188 pthread_mutex_unlock(&wq->lock);
189
190 return 0;
191 }
192
193 /* Tear down the workqueue. */
194 void
195 workqueue_destroy(
196 struct workqueue *wq)
197 {
198 assert(wq->terminated);
199
200 free(wq->threads);
201 pthread_mutex_destroy(&wq->lock);
202 pthread_cond_destroy(&wq->wakeup);
203 memset(wq, 0, sizeof(*wq));
204 }