]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blob - libfrog/workqueue.c
libfrog: fix workqueue error communication problems
[thirdparty/xfsprogs-dev.git] / libfrog / workqueue.c
1 // SPDX-License-Identifier: GPL-2.0+
2 /*
3 * Copyright (C) 2017 Oracle. All Rights Reserved.
4 * Author: Darrick J. Wong <darrick.wong@oracle.com>
5 */
6 #include <pthread.h>
7 #include <signal.h>
8 #include <stdlib.h>
9 #include <string.h>
10 #include <stdint.h>
11 #include <stdbool.h>
12 #include <errno.h>
13 #include <assert.h>
14 #include "workqueue.h"
15
16 /* Main processing thread */
17 static void *
18 workqueue_thread(void *arg)
19 {
20 struct workqueue *wq = arg;
21 struct workqueue_item *wi;
22
23 /*
24 * Loop pulling work from the passed in work queue.
25 * Check for notification to exit after every chunk of work.
26 */
27 while (1) {
28 pthread_mutex_lock(&wq->lock);
29
30 /*
31 * Wait for work.
32 */
33 while (wq->next_item == NULL && !wq->terminate) {
34 assert(wq->item_count == 0);
35 pthread_cond_wait(&wq->wakeup, &wq->lock);
36 }
37 if (wq->next_item == NULL && wq->terminate) {
38 pthread_mutex_unlock(&wq->lock);
39 break;
40 }
41
42 /*
43 * Dequeue work from the head of the list.
44 */
45 assert(wq->item_count > 0);
46 wi = wq->next_item;
47 wq->next_item = wi->next;
48 wq->item_count--;
49
50 pthread_mutex_unlock(&wq->lock);
51
52 (wi->function)(wi->queue, wi->index, wi->arg);
53 free(wi);
54 }
55
56 return NULL;
57 }
58
59 /* Allocate a work queue and threads. */
60 int
61 workqueue_create(
62 struct workqueue *wq,
63 void *wq_ctx,
64 unsigned int nr_workers)
65 {
66 unsigned int i;
67 int err = 0;
68
69 memset(wq, 0, sizeof(*wq));
70 pthread_cond_init(&wq->wakeup, NULL);
71 pthread_mutex_init(&wq->lock, NULL);
72
73 wq->wq_ctx = wq_ctx;
74 wq->thread_count = nr_workers;
75 wq->threads = malloc(nr_workers * sizeof(pthread_t));
76 wq->terminate = false;
77
78 for (i = 0; i < nr_workers; i++) {
79 err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
80 wq);
81 if (err)
82 break;
83 }
84
85 if (err)
86 workqueue_destroy(wq);
87 return err;
88 }
89
90 /*
91 * Create a work item consisting of a function and some arguments and
92 * schedule the work item to be run via the thread pool.
93 */
94 int
95 workqueue_add(
96 struct workqueue *wq,
97 workqueue_func_t func,
98 uint32_t index,
99 void *arg)
100 {
101 struct workqueue_item *wi;
102
103 if (wq->thread_count == 0) {
104 func(wq, index, arg);
105 return 0;
106 }
107
108 wi = malloc(sizeof(struct workqueue_item));
109 if (!wi)
110 return errno;
111
112 wi->function = func;
113 wi->index = index;
114 wi->arg = arg;
115 wi->queue = wq;
116 wi->next = NULL;
117
118 /* Now queue the new work structure to the work queue. */
119 pthread_mutex_lock(&wq->lock);
120 if (wq->next_item == NULL) {
121 wq->next_item = wi;
122 assert(wq->item_count == 0);
123 pthread_cond_signal(&wq->wakeup);
124 } else {
125 wq->last_item->next = wi;
126 }
127 wq->last_item = wi;
128 wq->item_count++;
129 pthread_mutex_unlock(&wq->lock);
130
131 return 0;
132 }
133
134 /*
135 * Wait for all pending work items to be processed and tear down the
136 * workqueue.
137 */
138 void
139 workqueue_destroy(
140 struct workqueue *wq)
141 {
142 unsigned int i;
143
144 pthread_mutex_lock(&wq->lock);
145 wq->terminate = 1;
146 pthread_mutex_unlock(&wq->lock);
147
148 pthread_cond_broadcast(&wq->wakeup);
149
150 for (i = 0; i < wq->thread_count; i++)
151 pthread_join(wq->threads[i], NULL);
152
153 free(wq->threads);
154 pthread_mutex_destroy(&wq->lock);
155 pthread_cond_destroy(&wq->wakeup);
156 memset(wq, 0, sizeof(*wq));
157 }