]>
git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - libfrog/workqueue.c
2 * Copyright (C) 2017 Oracle. All Rights Reserved.
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
11 * This program is distributed in the hope that it would be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
20 * This code was adapted from repair/threads.c, which (at the time)
21 * did not contain a copyright statement.
31 #include "workqueue.h"
33 /* Main processing thread */
35 workqueue_thread(void *arg
)
37 struct workqueue
*wq
= arg
;
38 struct workqueue_item
*wi
;
41 * Loop pulling work from the passed in work queue.
42 * Check for notification to exit after every chunk of work.
45 pthread_mutex_lock(&wq
->lock
);
50 while (wq
->next_item
== NULL
&& !wq
->terminate
) {
51 assert(wq
->item_count
== 0);
52 pthread_cond_wait(&wq
->wakeup
, &wq
->lock
);
54 if (wq
->next_item
== NULL
&& wq
->terminate
) {
55 pthread_mutex_unlock(&wq
->lock
);
60 * Dequeue work from the head of the list.
62 assert(wq
->item_count
> 0);
64 wq
->next_item
= wi
->next
;
67 pthread_mutex_unlock(&wq
->lock
);
69 (wi
->function
)(wi
->queue
, wi
->index
, wi
->arg
);
76 /* Allocate a work queue and threads. */
81 unsigned int nr_workers
)
86 memset(wq
, 0, sizeof(*wq
));
87 pthread_cond_init(&wq
->wakeup
, NULL
);
88 pthread_mutex_init(&wq
->lock
, NULL
);
91 wq
->thread_count
= nr_workers
;
92 wq
->threads
= malloc(nr_workers
* sizeof(pthread_t
));
93 wq
->terminate
= false;
95 for (i
= 0; i
< nr_workers
; i
++) {
96 err
= pthread_create(&wq
->threads
[i
], NULL
, workqueue_thread
,
103 workqueue_destroy(wq
);
108 * Create a work item consisting of a function and some arguments and
109 * schedule the work item to be run via the thread pool.
113 struct workqueue
*wq
,
114 workqueue_func_t func
,
118 struct workqueue_item
*wi
;
120 if (wq
->thread_count
== 0) {
121 func(wq
, index
, arg
);
125 wi
= malloc(sizeof(struct workqueue_item
));
135 /* Now queue the new work structure to the work queue. */
136 pthread_mutex_lock(&wq
->lock
);
137 if (wq
->next_item
== NULL
) {
139 assert(wq
->item_count
== 0);
140 pthread_cond_signal(&wq
->wakeup
);
142 wq
->last_item
->next
= wi
;
146 pthread_mutex_unlock(&wq
->lock
);
152 * Wait for all pending work items to be processed and tear down the
157 struct workqueue
*wq
)
161 pthread_mutex_lock(&wq
->lock
);
163 pthread_mutex_unlock(&wq
->lock
);
165 pthread_cond_broadcast(&wq
->wakeup
);
167 for (i
= 0; i
< wq
->thread_count
; i
++)
168 pthread_join(wq
->threads
[i
], NULL
);
171 pthread_mutex_destroy(&wq
->lock
);
172 pthread_cond_destroy(&wq
->wakeup
);
173 memset(wq
, 0, sizeof(*wq
));