]>
git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - libfrog/workqueue.c
1 // SPDX-License-Identifier: GPL-2.0+
3 * Copyright (C) 2017 Oracle. All Rights Reserved.
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
14 #include "workqueue.h"
16 /* Main processing thread */
18 workqueue_thread(void *arg
)
20 struct workqueue
*wq
= arg
;
21 struct workqueue_item
*wi
;
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
28 pthread_mutex_lock(&wq
->lock
);
33 while (wq
->next_item
== NULL
&& !wq
->terminate
) {
34 assert(wq
->item_count
== 0);
35 pthread_cond_wait(&wq
->wakeup
, &wq
->lock
);
37 if (wq
->next_item
== NULL
&& wq
->terminate
) {
38 pthread_mutex_unlock(&wq
->lock
);
43 * Dequeue work from the head of the list.
45 assert(wq
->item_count
> 0);
47 wq
->next_item
= wi
->next
;
50 pthread_mutex_unlock(&wq
->lock
);
52 (wi
->function
)(wi
->queue
, wi
->index
, wi
->arg
);
59 /* Allocate a work queue and threads. */
64 unsigned int nr_workers
)
69 memset(wq
, 0, sizeof(*wq
));
70 pthread_cond_init(&wq
->wakeup
, NULL
);
71 pthread_mutex_init(&wq
->lock
, NULL
);
74 wq
->thread_count
= nr_workers
;
75 wq
->threads
= malloc(nr_workers
* sizeof(pthread_t
));
76 wq
->terminate
= false;
78 for (i
= 0; i
< nr_workers
; i
++) {
79 err
= pthread_create(&wq
->threads
[i
], NULL
, workqueue_thread
,
86 workqueue_destroy(wq
);
91 * Create a work item consisting of a function and some arguments and
92 * schedule the work item to be run via the thread pool.
97 workqueue_func_t func
,
101 struct workqueue_item
*wi
;
103 if (wq
->thread_count
== 0) {
104 func(wq
, index
, arg
);
108 wi
= malloc(sizeof(struct workqueue_item
));
118 /* Now queue the new work structure to the work queue. */
119 pthread_mutex_lock(&wq
->lock
);
120 if (wq
->next_item
== NULL
) {
122 assert(wq
->item_count
== 0);
123 pthread_cond_signal(&wq
->wakeup
);
125 wq
->last_item
->next
= wi
;
129 pthread_mutex_unlock(&wq
->lock
);
135 * Wait for all pending work items to be processed and tear down the
140 struct workqueue
*wq
)
144 pthread_mutex_lock(&wq
->lock
);
146 pthread_mutex_unlock(&wq
->lock
);
148 pthread_cond_broadcast(&wq
->wakeup
);
150 for (i
= 0; i
< wq
->thread_count
; i
++)
151 pthread_join(wq
->threads
[i
], NULL
);
154 pthread_mutex_destroy(&wq
->lock
);
155 pthread_cond_destroy(&wq
->wakeup
);
156 memset(wq
, 0, sizeof(*wq
));