]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blame - libfrog/workqueue.c
Merge branch 'libxfs-4.15-sync' into for-next
[thirdparty/xfsprogs-dev.git] / libfrog / workqueue.c
CommitLineData
f434fd95
DW
1/*
2 * Copyright (C) 2017 Oracle. All Rights Reserved.
3 *
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License
8 * as published by the Free Software Foundation; either version 2
9 * of the License, or (at your option) any later version.
10 *
11 * This program is distributed in the hope that it would be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write the Free Software Foundation,
18 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.
19 *
20 * This code was adapted from repair/threads.c, which (at the time)
21 * did not contain a copyright statement.
22 */
23#include <pthread.h>
24#include <signal.h>
25#include <stdlib.h>
26#include <string.h>
27#include <stdint.h>
28#include <stdbool.h>
29#include <errno.h>
30#include <assert.h>
31#include "workqueue.h"
32
33/* Main processing thread */
34static void *
35workqueue_thread(void *arg)
36{
37 struct workqueue *wq = arg;
38 struct workqueue_item *wi;
39
40 /*
41 * Loop pulling work from the passed in work queue.
42 * Check for notification to exit after every chunk of work.
43 */
44 while (1) {
45 pthread_mutex_lock(&wq->lock);
46
47 /*
48 * Wait for work.
49 */
50 while (wq->next_item == NULL && !wq->terminate) {
51 assert(wq->item_count == 0);
52 pthread_cond_wait(&wq->wakeup, &wq->lock);
53 }
54 if (wq->next_item == NULL && wq->terminate) {
55 pthread_mutex_unlock(&wq->lock);
56 break;
57 }
58
59 /*
60 * Dequeue work from the head of the list.
61 */
62 assert(wq->item_count > 0);
63 wi = wq->next_item;
64 wq->next_item = wi->next;
65 wq->item_count--;
66
67 pthread_mutex_unlock(&wq->lock);
68
69 (wi->function)(wi->queue, wi->index, wi->arg);
70 free(wi);
71 }
72
73 return NULL;
74}
75
76/* Allocate a work queue and threads. */
77int
78workqueue_create(
79 struct workqueue *wq,
80 void *wq_ctx,
81 unsigned int nr_workers)
82{
83 unsigned int i;
84 int err = 0;
85
86 memset(wq, 0, sizeof(*wq));
87 pthread_cond_init(&wq->wakeup, NULL);
88 pthread_mutex_init(&wq->lock, NULL);
89
90 wq->wq_ctx = wq_ctx;
91 wq->thread_count = nr_workers;
92 wq->threads = malloc(nr_workers * sizeof(pthread_t));
93 wq->terminate = false;
94
95 for (i = 0; i < nr_workers; i++) {
96 err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
97 wq);
98 if (err)
99 break;
100 }
101
102 if (err)
103 workqueue_destroy(wq);
104 return err;
105}
106
107/*
108 * Create a work item consisting of a function and some arguments and
109 * schedule the work item to be run via the thread pool.
110 */
111int
112workqueue_add(
113 struct workqueue *wq,
114 workqueue_func_t func,
115 uint32_t index,
116 void *arg)
117{
118 struct workqueue_item *wi;
119
120 if (wq->thread_count == 0) {
121 func(wq, index, arg);
122 return 0;
123 }
124
125 wi = malloc(sizeof(struct workqueue_item));
126 if (wi == NULL)
127 return ENOMEM;
128
129 wi->function = func;
130 wi->index = index;
131 wi->arg = arg;
132 wi->queue = wq;
133 wi->next = NULL;
134
135 /* Now queue the new work structure to the work queue. */
136 pthread_mutex_lock(&wq->lock);
137 if (wq->next_item == NULL) {
138 wq->next_item = wi;
139 assert(wq->item_count == 0);
140 pthread_cond_signal(&wq->wakeup);
141 } else {
142 wq->last_item->next = wi;
143 }
144 wq->last_item = wi;
145 wq->item_count++;
146 pthread_mutex_unlock(&wq->lock);
147
148 return 0;
149}
150
151/*
152 * Wait for all pending work items to be processed and tear down the
153 * workqueue.
154 */
155void
156workqueue_destroy(
157 struct workqueue *wq)
158{
159 unsigned int i;
160
161 pthread_mutex_lock(&wq->lock);
162 wq->terminate = 1;
163 pthread_mutex_unlock(&wq->lock);
164
165 pthread_cond_broadcast(&wq->wakeup);
166
167 for (i = 0; i < wq->thread_count; i++)
168 pthread_join(wq->threads[i], NULL);
169
170 free(wq->threads);
171 pthread_mutex_destroy(&wq->lock);
172 pthread_cond_destroy(&wq->wakeup);
173 memset(wq, 0, sizeof(*wq));
174}