]>
Commit | Line | Data |
---|---|---|
b08338d7 | 1 | #include "xfs/libxfs.h" |
2556c98b BN |
2 | #include <pthread.h> |
3 | #include <signal.h> | |
3b6ac903 MV |
4 | #include "threads.h" |
5 | #include "err_protos.h" | |
6 | #include "protos.h" | |
2556c98b | 7 | #include "globals.h" |
3b6ac903 | 8 | |
2556c98b BN |
9 | static void * |
10 | worker_thread(void *arg) | |
3b6ac903 | 11 | { |
2556c98b BN |
12 | work_queue_t *wq; |
13 | work_item_t *wi; | |
3b6ac903 | 14 | |
2556c98b | 15 | wq = (work_queue_t*)arg; |
3b6ac903 | 16 | |
2556c98b BN |
17 | /* |
18 | * Loop pulling work from the passed in work queue. | |
19 | * Check for notification to exit after every chunk of work. | |
20 | */ | |
21 | while (1) { | |
22 | pthread_mutex_lock(&wq->lock); | |
3b6ac903 | 23 | |
2556c98b BN |
24 | /* |
25 | * Wait for work. | |
26 | */ | |
27 | while (wq->next_item == NULL && !wq->terminate) { | |
28 | ASSERT(wq->item_count == 0); | |
29 | pthread_cond_wait(&wq->wakeup, &wq->lock); | |
30 | } | |
31 | if (wq->next_item == NULL && wq->terminate) { | |
32 | pthread_mutex_unlock(&wq->lock); | |
33 | break; | |
34 | } | |
3b6ac903 | 35 | |
2556c98b BN |
36 | /* |
37 | * Dequeue work from the head of the list. | |
38 | */ | |
39 | ASSERT(wq->item_count > 0); | |
40 | wi = wq->next_item; | |
41 | wq->next_item = wi->next; | |
42 | wq->item_count--; | |
3b6ac903 | 43 | |
2556c98b | 44 | pthread_mutex_unlock(&wq->lock); |
3b6ac903 | 45 | |
2556c98b BN |
46 | (wi->function)(wi->queue, wi->agno, wi->arg); |
47 | free(wi); | |
3b6ac903 | 48 | } |
3b6ac903 | 49 | |
2556c98b | 50 | return NULL; |
3b6ac903 MV |
51 | } |
52 | ||
53 | void | |
54 | thread_init(void) | |
55 | { | |
3b6ac903 MV |
56 | sigset_t blocked; |
57 | ||
3b6ac903 MV |
58 | /* |
59 | * block delivery of progress report signal to all threads | |
60 | */ | |
61 | sigemptyset(&blocked); | |
62 | sigaddset(&blocked, SIGHUP); | |
63 | sigaddset(&blocked, SIGALRM); | |
64 | pthread_sigmask(SIG_BLOCK, &blocked, NULL); | |
3b6ac903 MV |
65 | } |
66 | ||
2556c98b BN |
67 | |
68 | void | |
69 | create_work_queue( | |
70 | work_queue_t *wq, | |
71 | xfs_mount_t *mp, | |
72 | int nworkers) | |
3b6ac903 | 73 | { |
2556c98b BN |
74 | int err; |
75 | int i; | |
3b6ac903 | 76 | |
2556c98b | 77 | memset(wq, 0, sizeof(work_queue_t)); |
3b6ac903 | 78 | |
2556c98b BN |
79 | pthread_cond_init(&wq->wakeup, NULL); |
80 | pthread_mutex_init(&wq->lock, NULL); | |
3b6ac903 | 81 | |
2556c98b BN |
82 | wq->mp = mp; |
83 | wq->thread_count = nworkers; | |
84 | wq->threads = malloc(nworkers * sizeof(pthread_t)); | |
85 | wq->terminate = 0; | |
3b6ac903 | 86 | |
2556c98b BN |
87 | for (i = 0; i < nworkers; i++) { |
88 | err = pthread_create(&wq->threads[i], NULL, worker_thread, wq); | |
89 | if (err != 0) { | |
90 | do_error(_("cannot create worker threads, error = [%d] %s\n"), | |
91 | err, strerror(err)); | |
3b6ac903 | 92 | } |
3b6ac903 | 93 | } |
2556c98b | 94 | |
3b6ac903 MV |
95 | } |
96 | ||
2556c98b BN |
97 | void |
98 | queue_work( | |
99 | work_queue_t *wq, | |
100 | work_func_t func, | |
101 | xfs_agnumber_t agno, | |
102 | void *arg) | |
3b6ac903 | 103 | { |
2556c98b | 104 | work_item_t *wi; |
3b6ac903 | 105 | |
2556c98b BN |
106 | wi = (work_item_t *)malloc(sizeof(work_item_t)); |
107 | if (wi == NULL) | |
108 | do_error(_("cannot allocate worker item, error = [%d] %s\n"), | |
109 | errno, strerror(errno)); | |
110 | ||
111 | wi->function = func; | |
112 | wi->agno = agno; | |
113 | wi->arg = arg; | |
114 | wi->queue = wq; | |
115 | wi->next = NULL; | |
3b6ac903 MV |
116 | |
117 | /* | |
118 | * Now queue the new work structure to the work queue. | |
119 | */ | |
2556c98b BN |
120 | pthread_mutex_lock(&wq->lock); |
121 | if (wq->next_item == NULL) { | |
122 | wq->next_item = wi; | |
123 | ASSERT(wq->item_count == 0); | |
124 | pthread_cond_signal(&wq->wakeup); | |
3b6ac903 | 125 | } else { |
2556c98b | 126 | wq->last_item->next = wi; |
3b6ac903 | 127 | } |
2556c98b BN |
128 | wq->last_item = wi; |
129 | wq->item_count++; | |
130 | pthread_mutex_unlock(&wq->lock); | |
3b6ac903 MV |
131 | } |
132 | ||
133 | void | |
2556c98b BN |
134 | destroy_work_queue( |
135 | work_queue_t *wq) | |
3b6ac903 | 136 | { |
2556c98b | 137 | int i; |
3b6ac903 | 138 | |
2556c98b BN |
139 | pthread_mutex_lock(&wq->lock); |
140 | wq->terminate = 1; | |
141 | pthread_mutex_unlock(&wq->lock); | |
3b6ac903 | 142 | |
2556c98b BN |
143 | pthread_cond_broadcast(&wq->wakeup); |
144 | ||
145 | for (i = 0; i < wq->thread_count; i++) | |
146 | pthread_join(wq->threads[i], NULL); | |
3b6ac903 | 147 | |
2556c98b BN |
148 | free(wq->threads); |
149 | pthread_mutex_destroy(&wq->lock); | |
150 | pthread_cond_destroy(&wq->wakeup); | |
3b6ac903 | 151 | } |