]>
git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - libfrog/workqueue.c
48038363ebd08da4b5b0d5bdc7e064913b197f28
1 // SPDX-License-Identifier: GPL-2.0+
3 * Copyright (C) 2017 Oracle. All Rights Reserved.
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
14 #include "workqueue.h"
16 /* Main processing thread */
18 workqueue_thread(void *arg
)
20 struct workqueue
*wq
= arg
;
21 struct workqueue_item
*wi
;
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
28 pthread_mutex_lock(&wq
->lock
);
33 while (wq
->next_item
== NULL
&& !wq
->terminate
) {
34 assert(wq
->item_count
== 0);
35 pthread_cond_wait(&wq
->wakeup
, &wq
->lock
);
37 if (wq
->next_item
== NULL
&& wq
->terminate
) {
38 pthread_mutex_unlock(&wq
->lock
);
43 * Dequeue work from the head of the list.
45 assert(wq
->item_count
> 0);
47 wq
->next_item
= wi
->next
;
50 pthread_mutex_unlock(&wq
->lock
);
52 (wi
->function
)(wi
->queue
, wi
->index
, wi
->arg
);
59 /* Allocate a work queue and threads. */
64 unsigned int nr_workers
)
69 memset(wq
, 0, sizeof(*wq
));
70 err
= pthread_cond_init(&wq
->wakeup
, NULL
);
73 err
= pthread_mutex_init(&wq
->lock
, NULL
);
78 wq
->thread_count
= nr_workers
;
79 wq
->threads
= malloc(nr_workers
* sizeof(pthread_t
));
84 wq
->terminate
= false;
86 for (i
= 0; i
< nr_workers
; i
++) {
87 err
= pthread_create(&wq
->threads
[i
], NULL
, workqueue_thread
,
94 * If we encounter errors here, we have to signal and then wait for all
95 * the threads that may have been started running before we can destroy
99 workqueue_destroy(wq
);
102 pthread_mutex_destroy(&wq
->lock
);
104 pthread_cond_destroy(&wq
->wakeup
);
109 * Create a work item consisting of a function and some arguments and
110 * schedule the work item to be run via the thread pool.
114 struct workqueue
*wq
,
115 workqueue_func_t func
,
119 struct workqueue_item
*wi
;
122 if (wq
->thread_count
== 0) {
123 func(wq
, index
, arg
);
127 wi
= malloc(sizeof(struct workqueue_item
));
137 /* Now queue the new work structure to the work queue. */
138 pthread_mutex_lock(&wq
->lock
);
139 if (wq
->next_item
== NULL
) {
140 assert(wq
->item_count
== 0);
141 ret
= pthread_cond_signal(&wq
->wakeup
);
146 wq
->last_item
->next
= wi
;
150 pthread_mutex_unlock(&wq
->lock
);
159 * Wait for all pending work items to be processed and tear down the
164 struct workqueue
*wq
)
168 pthread_mutex_lock(&wq
->lock
);
170 pthread_mutex_unlock(&wq
->lock
);
172 pthread_cond_broadcast(&wq
->wakeup
);
174 for (i
= 0; i
< wq
->thread_count
; i
++)
175 pthread_join(wq
->threads
[i
], NULL
);
178 pthread_mutex_destroy(&wq
->lock
);
179 pthread_cond_destroy(&wq
->wakeup
);
180 memset(wq
, 0, sizeof(*wq
));