]>
Commit | Line | Data |
---|---|---|
959ef981 | 1 | // SPDX-License-Identifier: GPL-2.0+ |
f434fd95 DW |
2 | /* |
3 | * Copyright (C) 2017 Oracle. All Rights Reserved. | |
f434fd95 | 4 | * Author: Darrick J. Wong <darrick.wong@oracle.com> |
f434fd95 DW |
5 | */ |
6 | #include <pthread.h> | |
7 | #include <signal.h> | |
8 | #include <stdlib.h> | |
9 | #include <string.h> | |
10 | #include <stdint.h> | |
11 | #include <stdbool.h> | |
12 | #include <errno.h> | |
13 | #include <assert.h> | |
14 | #include "workqueue.h" | |
15 | ||
16 | /* Main processing thread */ | |
17 | static void * | |
18 | workqueue_thread(void *arg) | |
19 | { | |
20 | struct workqueue *wq = arg; | |
21 | struct workqueue_item *wi; | |
22 | ||
23 | /* | |
24 | * Loop pulling work from the passed in work queue. | |
25 | * Check for notification to exit after every chunk of work. | |
26 | */ | |
27 | while (1) { | |
28 | pthread_mutex_lock(&wq->lock); | |
29 | ||
30 | /* | |
31 | * Wait for work. | |
32 | */ | |
33 | while (wq->next_item == NULL && !wq->terminate) { | |
34 | assert(wq->item_count == 0); | |
35 | pthread_cond_wait(&wq->wakeup, &wq->lock); | |
36 | } | |
37 | if (wq->next_item == NULL && wq->terminate) { | |
38 | pthread_mutex_unlock(&wq->lock); | |
39 | break; | |
40 | } | |
41 | ||
42 | /* | |
43 | * Dequeue work from the head of the list. | |
44 | */ | |
45 | assert(wq->item_count > 0); | |
46 | wi = wq->next_item; | |
47 | wq->next_item = wi->next; | |
48 | wq->item_count--; | |
49 | ||
50 | pthread_mutex_unlock(&wq->lock); | |
51 | ||
52 | (wi->function)(wi->queue, wi->index, wi->arg); | |
53 | free(wi); | |
54 | } | |
55 | ||
56 | return NULL; | |
57 | } | |
58 | ||
59 | /* Allocate a work queue and threads. */ | |
60 | int | |
61 | workqueue_create( | |
62 | struct workqueue *wq, | |
63 | void *wq_ctx, | |
64 | unsigned int nr_workers) | |
65 | { | |
66 | unsigned int i; | |
67 | int err = 0; | |
68 | ||
69 | memset(wq, 0, sizeof(*wq)); | |
70 | pthread_cond_init(&wq->wakeup, NULL); | |
71 | pthread_mutex_init(&wq->lock, NULL); | |
72 | ||
73 | wq->wq_ctx = wq_ctx; | |
74 | wq->thread_count = nr_workers; | |
75 | wq->threads = malloc(nr_workers * sizeof(pthread_t)); | |
76 | wq->terminate = false; | |
77 | ||
78 | for (i = 0; i < nr_workers; i++) { | |
79 | err = pthread_create(&wq->threads[i], NULL, workqueue_thread, | |
80 | wq); | |
81 | if (err) | |
82 | break; | |
83 | } | |
84 | ||
85 | if (err) | |
86 | workqueue_destroy(wq); | |
87 | return err; | |
88 | } | |
89 | ||
90 | /* | |
91 | * Create a work item consisting of a function and some arguments and | |
92 | * schedule the work item to be run via the thread pool. | |
93 | */ | |
94 | int | |
95 | workqueue_add( | |
96 | struct workqueue *wq, | |
97 | workqueue_func_t func, | |
98 | uint32_t index, | |
99 | void *arg) | |
100 | { | |
101 | struct workqueue_item *wi; | |
102 | ||
103 | if (wq->thread_count == 0) { | |
104 | func(wq, index, arg); | |
105 | return 0; | |
106 | } | |
107 | ||
108 | wi = malloc(sizeof(struct workqueue_item)); | |
109 | if (wi == NULL) | |
110 | return ENOMEM; | |
111 | ||
112 | wi->function = func; | |
113 | wi->index = index; | |
114 | wi->arg = arg; | |
115 | wi->queue = wq; | |
116 | wi->next = NULL; | |
117 | ||
118 | /* Now queue the new work structure to the work queue. */ | |
119 | pthread_mutex_lock(&wq->lock); | |
120 | if (wq->next_item == NULL) { | |
121 | wq->next_item = wi; | |
122 | assert(wq->item_count == 0); | |
123 | pthread_cond_signal(&wq->wakeup); | |
124 | } else { | |
125 | wq->last_item->next = wi; | |
126 | } | |
127 | wq->last_item = wi; | |
128 | wq->item_count++; | |
129 | pthread_mutex_unlock(&wq->lock); | |
130 | ||
131 | return 0; | |
132 | } | |
133 | ||
134 | /* | |
135 | * Wait for all pending work items to be processed and tear down the | |
136 | * workqueue. | |
137 | */ | |
138 | void | |
139 | workqueue_destroy( | |
140 | struct workqueue *wq) | |
141 | { | |
142 | unsigned int i; | |
143 | ||
144 | pthread_mutex_lock(&wq->lock); | |
145 | wq->terminate = 1; | |
146 | pthread_mutex_unlock(&wq->lock); | |
147 | ||
148 | pthread_cond_broadcast(&wq->wakeup); | |
149 | ||
150 | for (i = 0; i < wq->thread_count; i++) | |
151 | pthread_join(wq->threads[i], NULL); | |
152 | ||
153 | free(wq->threads); | |
154 | pthread_mutex_destroy(&wq->lock); | |
155 | pthread_cond_destroy(&wq->wakeup); | |
156 | memset(wq, 0, sizeof(*wq)); | |
157 | } |