]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blame - libfrog/workqueue.c
libxfs: return flush failures
[thirdparty/xfsprogs-dev.git] / libfrog / workqueue.c
CommitLineData
// SPDX-License-Identifier: GPL-2.0+
/*
 * Copyright (C) 2017 Oracle. All Rights Reserved.
 * Author: Darrick J. Wong <darrick.wong@oracle.com>
 */
6#include <pthread.h>
7#include <signal.h>
8#include <stdlib.h>
9#include <string.h>
10#include <stdint.h>
11#include <stdbool.h>
12#include <errno.h>
13#include <assert.h>
14#include "workqueue.h"
15
16/* Main processing thread */
17static void *
18workqueue_thread(void *arg)
19{
20 struct workqueue *wq = arg;
21 struct workqueue_item *wi;
22
23 /*
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
26 */
27 while (1) {
28 pthread_mutex_lock(&wq->lock);
29
30 /*
31 * Wait for work.
32 */
33 while (wq->next_item == NULL && !wq->terminate) {
34 assert(wq->item_count == 0);
35 pthread_cond_wait(&wq->wakeup, &wq->lock);
36 }
37 if (wq->next_item == NULL && wq->terminate) {
38 pthread_mutex_unlock(&wq->lock);
39 break;
40 }
41
42 /*
43 * Dequeue work from the head of the list.
44 */
45 assert(wq->item_count > 0);
46 wi = wq->next_item;
47 wq->next_item = wi->next;
48 wq->item_count--;
49
50 pthread_mutex_unlock(&wq->lock);
51
52 (wi->function)(wi->queue, wi->index, wi->arg);
53 free(wi);
54 }
55
56 return NULL;
57}
58
baed134d 59/* Allocate a work queue and threads. Returns zero or negative error code. */
f434fd95
DW
60int
61workqueue_create(
62 struct workqueue *wq,
63 void *wq_ctx,
64 unsigned int nr_workers)
65{
66 unsigned int i;
67 int err = 0;
68
69 memset(wq, 0, sizeof(*wq));
baed134d 70 err = -pthread_cond_init(&wq->wakeup, NULL);
bfd9b38b
DW
71 if (err)
72 return err;
baed134d 73 err = -pthread_mutex_init(&wq->lock, NULL);
bfd9b38b
DW
74 if (err)
75 goto out_cond;
f434fd95
DW
76
77 wq->wq_ctx = wq_ctx;
78 wq->thread_count = nr_workers;
79 wq->threads = malloc(nr_workers * sizeof(pthread_t));
bfd9b38b 80 if (!wq->threads) {
baed134d 81 err = -errno;
bfd9b38b
DW
82 goto out_mutex;
83 }
f434fd95 84 wq->terminate = false;
71296cf8 85 wq->terminated = false;
f434fd95
DW
86
87 for (i = 0; i < nr_workers; i++) {
baed134d 88 err = -pthread_create(&wq->threads[i], NULL, workqueue_thread,
f434fd95
DW
89 wq);
90 if (err)
91 break;
92 }
93
bfd9b38b
DW
94 /*
95 * If we encounter errors here, we have to signal and then wait for all
96 * the threads that may have been started running before we can destroy
97 * the workqueue.
98 */
f434fd95
DW
99 if (err)
100 workqueue_destroy(wq);
101 return err;
bfd9b38b
DW
102out_mutex:
103 pthread_mutex_destroy(&wq->lock);
104out_cond:
105 pthread_cond_destroy(&wq->wakeup);
106 return err;
f434fd95
DW
107}
108
109/*
baed134d
DW
110 * Create a work item consisting of a function and some arguments and schedule
111 * the work item to be run via the thread pool. Returns zero or a negative
112 * error code.
f434fd95
DW
113 */
114int
115workqueue_add(
116 struct workqueue *wq,
117 workqueue_func_t func,
118 uint32_t index,
119 void *arg)
120{
121 struct workqueue_item *wi;
bfd9b38b 122 int ret;
f434fd95 123
71296cf8
DW
124 assert(!wq->terminated);
125
f434fd95
DW
126 if (wq->thread_count == 0) {
127 func(wq, index, arg);
128 return 0;
129 }
130
131 wi = malloc(sizeof(struct workqueue_item));
9d57cbfc 132 if (!wi)
baed134d 133 return -errno;
f434fd95
DW
134
135 wi->function = func;
136 wi->index = index;
137 wi->arg = arg;
138 wi->queue = wq;
139 wi->next = NULL;
140
141 /* Now queue the new work structure to the work queue. */
142 pthread_mutex_lock(&wq->lock);
143 if (wq->next_item == NULL) {
f434fd95 144 assert(wq->item_count == 0);
baed134d 145 ret = -pthread_cond_signal(&wq->wakeup);
a57cc320
DW
146 if (ret) {
147 pthread_mutex_unlock(&wq->lock);
148 free(wi);
149 return ret;
150 }
bfd9b38b 151 wq->next_item = wi;
f434fd95
DW
152 } else {
153 wq->last_item->next = wi;
154 }
155 wq->last_item = wi;
156 wq->item_count++;
157 pthread_mutex_unlock(&wq->lock);
158
159 return 0;
160}
161
162/*
163 * Wait for all pending work items to be processed and tear down the
baed134d 164 * workqueue thread pool. Returns zero or a negative error code.
f434fd95 165 */
71296cf8
DW
166int
167workqueue_terminate(
f434fd95
DW
168 struct workqueue *wq)
169{
170 unsigned int i;
71296cf8
DW
171 int ret;
172
173 pthread_mutex_lock(&wq->lock);
174 wq->terminate = true;
175 pthread_mutex_unlock(&wq->lock);
176
baed134d 177 ret = -pthread_cond_broadcast(&wq->wakeup);
71296cf8
DW
178 if (ret)
179 return ret;
180
181 for (i = 0; i < wq->thread_count; i++) {
baed134d 182 ret = -pthread_join(wq->threads[i], NULL);
71296cf8
DW
183 if (ret)
184 return ret;
185 }
f434fd95
DW
186
187 pthread_mutex_lock(&wq->lock);
71296cf8 188 wq->terminated = true;
f434fd95
DW
189 pthread_mutex_unlock(&wq->lock);
190
71296cf8
DW
191 return 0;
192}
f434fd95 193
71296cf8
DW
194/* Tear down the workqueue. */
195void
196workqueue_destroy(
197 struct workqueue *wq)
198{
199 assert(wq->terminated);
f434fd95
DW
200
201 free(wq->threads);
202 pthread_mutex_destroy(&wq->lock);
203 pthread_cond_destroy(&wq->wakeup);
204 memset(wq, 0, sizeof(*wq));
205}