]>
Commit | Line | Data |
---|---|---|
959ef981 | 1 | // SPDX-License-Identifier: GPL-2.0+ |
f434fd95 DW |
2 | /* |
3 | * Copyright (C) 2017 Oracle. All Rights Reserved. | |
f434fd95 | 4 | * Author: Darrick J. Wong <darrick.wong@oracle.com> |
f434fd95 DW |
5 | */ |
6 | #include <pthread.h> | |
7 | #include <signal.h> | |
8 | #include <stdlib.h> | |
9 | #include <string.h> | |
10 | #include <stdint.h> | |
11 | #include <stdbool.h> | |
12 | #include <errno.h> | |
13 | #include <assert.h> | |
14 | #include "workqueue.h" | |
15 | ||
16 | /* Main processing thread */ | |
17 | static void * | |
18 | workqueue_thread(void *arg) | |
19 | { | |
20 | struct workqueue *wq = arg; | |
21 | struct workqueue_item *wi; | |
22 | ||
23 | /* | |
24 | * Loop pulling work from the passed in work queue. | |
25 | * Check for notification to exit after every chunk of work. | |
26 | */ | |
27 | while (1) { | |
28 | pthread_mutex_lock(&wq->lock); | |
29 | ||
30 | /* | |
31 | * Wait for work. | |
32 | */ | |
33 | while (wq->next_item == NULL && !wq->terminate) { | |
34 | assert(wq->item_count == 0); | |
35 | pthread_cond_wait(&wq->wakeup, &wq->lock); | |
36 | } | |
37 | if (wq->next_item == NULL && wq->terminate) { | |
38 | pthread_mutex_unlock(&wq->lock); | |
39 | break; | |
40 | } | |
41 | ||
42 | /* | |
43 | * Dequeue work from the head of the list. | |
44 | */ | |
45 | assert(wq->item_count > 0); | |
46 | wi = wq->next_item; | |
47 | wq->next_item = wi->next; | |
48 | wq->item_count--; | |
49 | ||
50 | pthread_mutex_unlock(&wq->lock); | |
51 | ||
52 | (wi->function)(wi->queue, wi->index, wi->arg); | |
53 | free(wi); | |
54 | } | |
55 | ||
56 | return NULL; | |
57 | } | |
58 | ||
59 | /* Allocate a work queue and threads. */ | |
60 | int | |
61 | workqueue_create( | |
62 | struct workqueue *wq, | |
63 | void *wq_ctx, | |
64 | unsigned int nr_workers) | |
65 | { | |
66 | unsigned int i; | |
67 | int err = 0; | |
68 | ||
69 | memset(wq, 0, sizeof(*wq)); | |
bfd9b38b DW |
70 | err = pthread_cond_init(&wq->wakeup, NULL); |
71 | if (err) | |
72 | return err; | |
73 | err = pthread_mutex_init(&wq->lock, NULL); | |
74 | if (err) | |
75 | goto out_cond; | |
f434fd95 DW |
76 | |
77 | wq->wq_ctx = wq_ctx; | |
78 | wq->thread_count = nr_workers; | |
79 | wq->threads = malloc(nr_workers * sizeof(pthread_t)); | |
bfd9b38b DW |
80 | if (!wq->threads) { |
81 | err = errno; | |
82 | goto out_mutex; | |
83 | } | |
f434fd95 | 84 | wq->terminate = false; |
71296cf8 | 85 | wq->terminated = false; |
f434fd95 DW |
86 | |
87 | for (i = 0; i < nr_workers; i++) { | |
88 | err = pthread_create(&wq->threads[i], NULL, workqueue_thread, | |
89 | wq); | |
90 | if (err) | |
91 | break; | |
92 | } | |
93 | ||
bfd9b38b DW |
94 | /* |
95 | * If we encounter errors here, we have to signal and then wait for all | |
96 | * the threads that may have been started running before we can destroy | |
97 | * the workqueue. | |
98 | */ | |
f434fd95 DW |
99 | if (err) |
100 | workqueue_destroy(wq); | |
101 | return err; | |
bfd9b38b DW |
102 | out_mutex: |
103 | pthread_mutex_destroy(&wq->lock); | |
104 | out_cond: | |
105 | pthread_cond_destroy(&wq->wakeup); | |
106 | return err; | |
f434fd95 DW |
107 | } |
108 | ||
109 | /* | |
110 | * Create a work item consisting of a function and some arguments and | |
111 | * schedule the work item to be run via the thread pool. | |
112 | */ | |
113 | int | |
114 | workqueue_add( | |
115 | struct workqueue *wq, | |
116 | workqueue_func_t func, | |
117 | uint32_t index, | |
118 | void *arg) | |
119 | { | |
120 | struct workqueue_item *wi; | |
bfd9b38b | 121 | int ret; |
f434fd95 | 122 | |
71296cf8 DW |
123 | assert(!wq->terminated); |
124 | ||
f434fd95 DW |
125 | if (wq->thread_count == 0) { |
126 | func(wq, index, arg); | |
127 | return 0; | |
128 | } | |
129 | ||
130 | wi = malloc(sizeof(struct workqueue_item)); | |
9d57cbfc DW |
131 | if (!wi) |
132 | return errno; | |
f434fd95 DW |
133 | |
134 | wi->function = func; | |
135 | wi->index = index; | |
136 | wi->arg = arg; | |
137 | wi->queue = wq; | |
138 | wi->next = NULL; | |
139 | ||
140 | /* Now queue the new work structure to the work queue. */ | |
141 | pthread_mutex_lock(&wq->lock); | |
142 | if (wq->next_item == NULL) { | |
f434fd95 | 143 | assert(wq->item_count == 0); |
bfd9b38b DW |
144 | ret = pthread_cond_signal(&wq->wakeup); |
145 | if (ret) | |
146 | goto out_item; | |
147 | wq->next_item = wi; | |
f434fd95 DW |
148 | } else { |
149 | wq->last_item->next = wi; | |
150 | } | |
151 | wq->last_item = wi; | |
152 | wq->item_count++; | |
153 | pthread_mutex_unlock(&wq->lock); | |
154 | ||
155 | return 0; | |
bfd9b38b DW |
156 | out_item: |
157 | free(wi); | |
158 | return ret; | |
f434fd95 DW |
159 | } |
160 | ||
161 | /* | |
162 | * Wait for all pending work items to be processed and tear down the | |
71296cf8 | 163 | * workqueue thread pool. |
f434fd95 | 164 | */ |
71296cf8 DW |
165 | int |
166 | workqueue_terminate( | |
f434fd95 DW |
167 | struct workqueue *wq) |
168 | { | |
169 | unsigned int i; | |
71296cf8 DW |
170 | int ret; |
171 | ||
172 | pthread_mutex_lock(&wq->lock); | |
173 | wq->terminate = true; | |
174 | pthread_mutex_unlock(&wq->lock); | |
175 | ||
176 | ret = pthread_cond_broadcast(&wq->wakeup); | |
177 | if (ret) | |
178 | return ret; | |
179 | ||
180 | for (i = 0; i < wq->thread_count; i++) { | |
181 | ret = pthread_join(wq->threads[i], NULL); | |
182 | if (ret) | |
183 | return ret; | |
184 | } | |
f434fd95 DW |
185 | |
186 | pthread_mutex_lock(&wq->lock); | |
71296cf8 | 187 | wq->terminated = true; |
f434fd95 DW |
188 | pthread_mutex_unlock(&wq->lock); |
189 | ||
71296cf8 DW |
190 | return 0; |
191 | } | |
f434fd95 | 192 | |
71296cf8 DW |
193 | /* Tear down the workqueue. */ |
194 | void | |
195 | workqueue_destroy( | |
196 | struct workqueue *wq) | |
197 | { | |
198 | assert(wq->terminated); | |
f434fd95 DW |
199 | |
200 | free(wq->threads); | |
201 | pthread_mutex_destroy(&wq->lock); | |
202 | pthread_cond_destroy(&wq->wakeup); | |
203 | memset(wq, 0, sizeof(*wq)); | |
204 | } |