]>
Commit | Line | Data |
---|---|---|
959ef981 | 1 | // SPDX-License-Identifier: GPL-2.0+ |
f434fd95 DW |
2 | /* |
3 | * Copyright (C) 2017 Oracle. All Rights Reserved. | |
f434fd95 | 4 | * Author: Darrick J. Wong <darrick.wong@oracle.com> |
f434fd95 DW |
5 | */ |
6 | #include <pthread.h> | |
7 | #include <signal.h> | |
8 | #include <stdlib.h> | |
9 | #include <string.h> | |
10 | #include <stdint.h> | |
11 | #include <stdbool.h> | |
12 | #include <errno.h> | |
13 | #include <assert.h> | |
14 | #include "workqueue.h" | |
15 | ||
16 | /* Main processing thread */ | |
17 | static void * | |
18 | workqueue_thread(void *arg) | |
19 | { | |
20 | struct workqueue *wq = arg; | |
21 | struct workqueue_item *wi; | |
22 | ||
23 | /* | |
24 | * Loop pulling work from the passed in work queue. | |
25 | * Check for notification to exit after every chunk of work. | |
26 | */ | |
27 | while (1) { | |
28 | pthread_mutex_lock(&wq->lock); | |
29 | ||
30 | /* | |
31 | * Wait for work. | |
32 | */ | |
33 | while (wq->next_item == NULL && !wq->terminate) { | |
34 | assert(wq->item_count == 0); | |
35 | pthread_cond_wait(&wq->wakeup, &wq->lock); | |
36 | } | |
37 | if (wq->next_item == NULL && wq->terminate) { | |
38 | pthread_mutex_unlock(&wq->lock); | |
39 | break; | |
40 | } | |
41 | ||
42 | /* | |
43 | * Dequeue work from the head of the list. | |
44 | */ | |
45 | assert(wq->item_count > 0); | |
46 | wi = wq->next_item; | |
47 | wq->next_item = wi->next; | |
48 | wq->item_count--; | |
49 | ||
50 | pthread_mutex_unlock(&wq->lock); | |
51 | ||
52 | (wi->function)(wi->queue, wi->index, wi->arg); | |
53 | free(wi); | |
54 | } | |
55 | ||
56 | return NULL; | |
57 | } | |
58 | ||
baed134d | 59 | /* Allocate a work queue and threads. Returns zero or negative error code. */ |
f434fd95 DW |
60 | int |
61 | workqueue_create( | |
62 | struct workqueue *wq, | |
63 | void *wq_ctx, | |
64 | unsigned int nr_workers) | |
65 | { | |
66 | unsigned int i; | |
67 | int err = 0; | |
68 | ||
69 | memset(wq, 0, sizeof(*wq)); | |
baed134d | 70 | err = -pthread_cond_init(&wq->wakeup, NULL); |
bfd9b38b DW |
71 | if (err) |
72 | return err; | |
baed134d | 73 | err = -pthread_mutex_init(&wq->lock, NULL); |
bfd9b38b DW |
74 | if (err) |
75 | goto out_cond; | |
f434fd95 DW |
76 | |
77 | wq->wq_ctx = wq_ctx; | |
78 | wq->thread_count = nr_workers; | |
79 | wq->threads = malloc(nr_workers * sizeof(pthread_t)); | |
bfd9b38b | 80 | if (!wq->threads) { |
baed134d | 81 | err = -errno; |
bfd9b38b DW |
82 | goto out_mutex; |
83 | } | |
f434fd95 | 84 | wq->terminate = false; |
71296cf8 | 85 | wq->terminated = false; |
f434fd95 DW |
86 | |
87 | for (i = 0; i < nr_workers; i++) { | |
baed134d | 88 | err = -pthread_create(&wq->threads[i], NULL, workqueue_thread, |
f434fd95 DW |
89 | wq); |
90 | if (err) | |
91 | break; | |
92 | } | |
93 | ||
bfd9b38b DW |
94 | /* |
95 | * If we encounter errors here, we have to signal and then wait for all | |
96 | * the threads that may have been started running before we can destroy | |
97 | * the workqueue. | |
98 | */ | |
f434fd95 DW |
99 | if (err) |
100 | workqueue_destroy(wq); | |
101 | return err; | |
bfd9b38b DW |
102 | out_mutex: |
103 | pthread_mutex_destroy(&wq->lock); | |
104 | out_cond: | |
105 | pthread_cond_destroy(&wq->wakeup); | |
106 | return err; | |
f434fd95 DW |
107 | } |
108 | ||
109 | /* | |
baed134d DW |
110 | * Create a work item consisting of a function and some arguments and schedule |
111 | * the work item to be run via the thread pool. Returns zero or a negative | |
112 | * error code. | |
f434fd95 DW |
113 | */ |
114 | int | |
115 | workqueue_add( | |
116 | struct workqueue *wq, | |
117 | workqueue_func_t func, | |
118 | uint32_t index, | |
119 | void *arg) | |
120 | { | |
121 | struct workqueue_item *wi; | |
bfd9b38b | 122 | int ret; |
f434fd95 | 123 | |
71296cf8 DW |
124 | assert(!wq->terminated); |
125 | ||
f434fd95 DW |
126 | if (wq->thread_count == 0) { |
127 | func(wq, index, arg); | |
128 | return 0; | |
129 | } | |
130 | ||
131 | wi = malloc(sizeof(struct workqueue_item)); | |
9d57cbfc | 132 | if (!wi) |
baed134d | 133 | return -errno; |
f434fd95 DW |
134 | |
135 | wi->function = func; | |
136 | wi->index = index; | |
137 | wi->arg = arg; | |
138 | wi->queue = wq; | |
139 | wi->next = NULL; | |
140 | ||
141 | /* Now queue the new work structure to the work queue. */ | |
142 | pthread_mutex_lock(&wq->lock); | |
143 | if (wq->next_item == NULL) { | |
f434fd95 | 144 | assert(wq->item_count == 0); |
baed134d | 145 | ret = -pthread_cond_signal(&wq->wakeup); |
a57cc320 DW |
146 | if (ret) { |
147 | pthread_mutex_unlock(&wq->lock); | |
148 | free(wi); | |
149 | return ret; | |
150 | } | |
bfd9b38b | 151 | wq->next_item = wi; |
f434fd95 DW |
152 | } else { |
153 | wq->last_item->next = wi; | |
154 | } | |
155 | wq->last_item = wi; | |
156 | wq->item_count++; | |
157 | pthread_mutex_unlock(&wq->lock); | |
158 | ||
159 | return 0; | |
160 | } | |
161 | ||
162 | /* | |
163 | * Wait for all pending work items to be processed and tear down the | |
baed134d | 164 | * workqueue thread pool. Returns zero or a negative error code. |
f434fd95 | 165 | */ |
71296cf8 DW |
166 | int |
167 | workqueue_terminate( | |
f434fd95 DW |
168 | struct workqueue *wq) |
169 | { | |
170 | unsigned int i; | |
71296cf8 DW |
171 | int ret; |
172 | ||
173 | pthread_mutex_lock(&wq->lock); | |
174 | wq->terminate = true; | |
175 | pthread_mutex_unlock(&wq->lock); | |
176 | ||
baed134d | 177 | ret = -pthread_cond_broadcast(&wq->wakeup); |
71296cf8 DW |
178 | if (ret) |
179 | return ret; | |
180 | ||
181 | for (i = 0; i < wq->thread_count; i++) { | |
baed134d | 182 | ret = -pthread_join(wq->threads[i], NULL); |
71296cf8 DW |
183 | if (ret) |
184 | return ret; | |
185 | } | |
f434fd95 DW |
186 | |
187 | pthread_mutex_lock(&wq->lock); | |
71296cf8 | 188 | wq->terminated = true; |
f434fd95 DW |
189 | pthread_mutex_unlock(&wq->lock); |
190 | ||
71296cf8 DW |
191 | return 0; |
192 | } | |
f434fd95 | 193 | |
71296cf8 DW |
194 | /* Tear down the workqueue. */ |
195 | void | |
196 | workqueue_destroy( | |
197 | struct workqueue *wq) | |
198 | { | |
199 | assert(wq->terminated); | |
f434fd95 DW |
200 | |
201 | free(wq->threads); | |
202 | pthread_mutex_destroy(&wq->lock); | |
203 | pthread_cond_destroy(&wq->wakeup); | |
204 | memset(wq, 0, sizeof(*wq)); | |
205 | } |