]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blame - libfrog/workqueue.c
libfrog: fix bitmap error communication problems
[thirdparty/xfsprogs-dev.git] / libfrog / workqueue.c
CommitLineData
959ef981 1// SPDX-License-Identifier: GPL-2.0+
f434fd95
DW
2/*
3 * Copyright (C) 2017 Oracle. All Rights Reserved.
f434fd95 4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
f434fd95
DW
5 */
6#include <pthread.h>
7#include <signal.h>
8#include <stdlib.h>
9#include <string.h>
10#include <stdint.h>
11#include <stdbool.h>
12#include <errno.h>
13#include <assert.h>
14#include "workqueue.h"
15
16/* Main processing thread */
17static void *
18workqueue_thread(void *arg)
19{
20 struct workqueue *wq = arg;
21 struct workqueue_item *wi;
22
23 /*
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
26 */
27 while (1) {
28 pthread_mutex_lock(&wq->lock);
29
30 /*
31 * Wait for work.
32 */
33 while (wq->next_item == NULL && !wq->terminate) {
34 assert(wq->item_count == 0);
35 pthread_cond_wait(&wq->wakeup, &wq->lock);
36 }
37 if (wq->next_item == NULL && wq->terminate) {
38 pthread_mutex_unlock(&wq->lock);
39 break;
40 }
41
42 /*
43 * Dequeue work from the head of the list.
44 */
45 assert(wq->item_count > 0);
46 wi = wq->next_item;
47 wq->next_item = wi->next;
48 wq->item_count--;
49
50 pthread_mutex_unlock(&wq->lock);
51
52 (wi->function)(wi->queue, wi->index, wi->arg);
53 free(wi);
54 }
55
56 return NULL;
57}
58
59/* Allocate a work queue and threads. */
60int
61workqueue_create(
62 struct workqueue *wq,
63 void *wq_ctx,
64 unsigned int nr_workers)
65{
66 unsigned int i;
67 int err = 0;
68
69 memset(wq, 0, sizeof(*wq));
bfd9b38b
DW
70 err = pthread_cond_init(&wq->wakeup, NULL);
71 if (err)
72 return err;
73 err = pthread_mutex_init(&wq->lock, NULL);
74 if (err)
75 goto out_cond;
f434fd95
DW
76
77 wq->wq_ctx = wq_ctx;
78 wq->thread_count = nr_workers;
79 wq->threads = malloc(nr_workers * sizeof(pthread_t));
bfd9b38b
DW
80 if (!wq->threads) {
81 err = errno;
82 goto out_mutex;
83 }
f434fd95 84 wq->terminate = false;
71296cf8 85 wq->terminated = false;
f434fd95
DW
86
87 for (i = 0; i < nr_workers; i++) {
88 err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
89 wq);
90 if (err)
91 break;
92 }
93
bfd9b38b
DW
94 /*
95 * If we encounter errors here, we have to signal and then wait for all
96 * the threads that may have been started running before we can destroy
97 * the workqueue.
98 */
f434fd95
DW
99 if (err)
100 workqueue_destroy(wq);
101 return err;
bfd9b38b
DW
102out_mutex:
103 pthread_mutex_destroy(&wq->lock);
104out_cond:
105 pthread_cond_destroy(&wq->wakeup);
106 return err;
f434fd95
DW
107}
108
109/*
110 * Create a work item consisting of a function and some arguments and
111 * schedule the work item to be run via the thread pool.
112 */
113int
114workqueue_add(
115 struct workqueue *wq,
116 workqueue_func_t func,
117 uint32_t index,
118 void *arg)
119{
120 struct workqueue_item *wi;
bfd9b38b 121 int ret;
f434fd95 122
71296cf8
DW
123 assert(!wq->terminated);
124
f434fd95
DW
125 if (wq->thread_count == 0) {
126 func(wq, index, arg);
127 return 0;
128 }
129
130 wi = malloc(sizeof(struct workqueue_item));
9d57cbfc
DW
131 if (!wi)
132 return errno;
f434fd95
DW
133
134 wi->function = func;
135 wi->index = index;
136 wi->arg = arg;
137 wi->queue = wq;
138 wi->next = NULL;
139
140 /* Now queue the new work structure to the work queue. */
141 pthread_mutex_lock(&wq->lock);
142 if (wq->next_item == NULL) {
f434fd95 143 assert(wq->item_count == 0);
bfd9b38b
DW
144 ret = pthread_cond_signal(&wq->wakeup);
145 if (ret)
146 goto out_item;
147 wq->next_item = wi;
f434fd95
DW
148 } else {
149 wq->last_item->next = wi;
150 }
151 wq->last_item = wi;
152 wq->item_count++;
153 pthread_mutex_unlock(&wq->lock);
154
155 return 0;
bfd9b38b
DW
156out_item:
157 free(wi);
158 return ret;
f434fd95
DW
159}
160
161/*
162 * Wait for all pending work items to be processed and tear down the
71296cf8 163 * workqueue thread pool.
f434fd95 164 */
71296cf8
DW
165int
166workqueue_terminate(
f434fd95
DW
167 struct workqueue *wq)
168{
169 unsigned int i;
71296cf8
DW
170 int ret;
171
172 pthread_mutex_lock(&wq->lock);
173 wq->terminate = true;
174 pthread_mutex_unlock(&wq->lock);
175
176 ret = pthread_cond_broadcast(&wq->wakeup);
177 if (ret)
178 return ret;
179
180 for (i = 0; i < wq->thread_count; i++) {
181 ret = pthread_join(wq->threads[i], NULL);
182 if (ret)
183 return ret;
184 }
f434fd95
DW
185
186 pthread_mutex_lock(&wq->lock);
71296cf8 187 wq->terminated = true;
f434fd95
DW
188 pthread_mutex_unlock(&wq->lock);
189
71296cf8
DW
190 return 0;
191}
f434fd95 192
71296cf8
DW
193/* Tear down the workqueue. */
194void
195workqueue_destroy(
196 struct workqueue *wq)
197{
198 assert(wq->terminated);
f434fd95
DW
199
200 free(wq->threads);
201 pthread_mutex_destroy(&wq->lock);
202 pthread_cond_destroy(&wq->wakeup);
203 memset(wq, 0, sizeof(*wq));
204}