]>
git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - libfrog/workqueue.c
1 // SPDX-License-Identifier: GPL-2.0+
3 * Copyright (C) 2017 Oracle. All Rights Reserved.
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
14 #include "workqueue.h"
16 /* Main processing thread */
18 workqueue_thread(void *arg
)
20 struct workqueue
*wq
= arg
;
21 struct workqueue_item
*wi
;
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
28 pthread_mutex_lock(&wq
->lock
);
33 while (wq
->next_item
== NULL
&& !wq
->terminate
) {
34 assert(wq
->item_count
== 0);
35 pthread_cond_wait(&wq
->wakeup
, &wq
->lock
);
37 if (wq
->next_item
== NULL
&& wq
->terminate
) {
38 pthread_mutex_unlock(&wq
->lock
);
43 * Dequeue work from the head of the list.
45 assert(wq
->item_count
> 0);
47 wq
->next_item
= wi
->next
;
50 pthread_mutex_unlock(&wq
->lock
);
52 (wi
->function
)(wi
->queue
, wi
->index
, wi
->arg
);
59 /* Allocate a work queue and threads. Returns zero or negative error code. */
64 unsigned int nr_workers
)
69 memset(wq
, 0, sizeof(*wq
));
70 err
= -pthread_cond_init(&wq
->wakeup
, NULL
);
73 err
= -pthread_mutex_init(&wq
->lock
, NULL
);
78 wq
->thread_count
= nr_workers
;
79 wq
->threads
= malloc(nr_workers
* sizeof(pthread_t
));
84 wq
->terminate
= false;
85 wq
->terminated
= false;
87 for (i
= 0; i
< nr_workers
; i
++) {
88 err
= -pthread_create(&wq
->threads
[i
], NULL
, workqueue_thread
,
95 * If we encounter errors here, we have to signal and then wait for all
96 * the threads that may have been started running before we can destroy
100 workqueue_destroy(wq
);
103 pthread_mutex_destroy(&wq
->lock
);
105 pthread_cond_destroy(&wq
->wakeup
);
110 * Create a work item consisting of a function and some arguments and schedule
111 * the work item to be run via the thread pool. Returns zero or a negative
116 struct workqueue
*wq
,
117 workqueue_func_t func
,
121 struct workqueue_item
*wi
;
124 assert(!wq
->terminated
);
126 if (wq
->thread_count
== 0) {
127 func(wq
, index
, arg
);
131 wi
= malloc(sizeof(struct workqueue_item
));
141 /* Now queue the new work structure to the work queue. */
142 pthread_mutex_lock(&wq
->lock
);
143 if (wq
->next_item
== NULL
) {
144 assert(wq
->item_count
== 0);
145 ret
= -pthread_cond_signal(&wq
->wakeup
);
147 pthread_mutex_unlock(&wq
->lock
);
153 wq
->last_item
->next
= wi
;
157 pthread_mutex_unlock(&wq
->lock
);
163 * Wait for all pending work items to be processed and tear down the
164 * workqueue thread pool. Returns zero or a negative error code.
168 struct workqueue
*wq
)
173 pthread_mutex_lock(&wq
->lock
);
174 wq
->terminate
= true;
175 pthread_mutex_unlock(&wq
->lock
);
177 ret
= -pthread_cond_broadcast(&wq
->wakeup
);
181 for (i
= 0; i
< wq
->thread_count
; i
++) {
182 ret
= -pthread_join(wq
->threads
[i
], NULL
);
187 pthread_mutex_lock(&wq
->lock
);
188 wq
->terminated
= true;
189 pthread_mutex_unlock(&wq
->lock
);
194 /* Tear down the workqueue. */
197 struct workqueue
*wq
)
199 assert(wq
->terminated
);
202 pthread_mutex_destroy(&wq
->lock
);
203 pthread_cond_destroy(&wq
->wakeup
);
204 memset(wq
, 0, sizeof(*wq
));