]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blame - repair/threads.c
progs: clean up libxfs.h includes
[thirdparty/xfsprogs-dev.git] / repair / threads.c
CommitLineData
b08338d7 1#include "xfs/libxfs.h"
2556c98b
BN
2#include <pthread.h>
3#include <signal.h>
3b6ac903
MV
4#include "threads.h"
5#include "err_protos.h"
6#include "protos.h"
2556c98b 7#include "globals.h"
3b6ac903 8
2556c98b
BN
9static void *
10worker_thread(void *arg)
3b6ac903 11{
2556c98b
BN
12 work_queue_t *wq;
13 work_item_t *wi;
3b6ac903 14
2556c98b 15 wq = (work_queue_t*)arg;
3b6ac903 16
2556c98b
BN
17 /*
18 * Loop pulling work from the passed in work queue.
19 * Check for notification to exit after every chunk of work.
20 */
21 while (1) {
22 pthread_mutex_lock(&wq->lock);
3b6ac903 23
2556c98b
BN
24 /*
25 * Wait for work.
26 */
27 while (wq->next_item == NULL && !wq->terminate) {
28 ASSERT(wq->item_count == 0);
29 pthread_cond_wait(&wq->wakeup, &wq->lock);
30 }
31 if (wq->next_item == NULL && wq->terminate) {
32 pthread_mutex_unlock(&wq->lock);
33 break;
34 }
3b6ac903 35
2556c98b
BN
36 /*
37 * Dequeue work from the head of the list.
38 */
39 ASSERT(wq->item_count > 0);
40 wi = wq->next_item;
41 wq->next_item = wi->next;
42 wq->item_count--;
3b6ac903 43
2556c98b 44 pthread_mutex_unlock(&wq->lock);
3b6ac903 45
2556c98b
BN
46 (wi->function)(wi->queue, wi->agno, wi->arg);
47 free(wi);
3b6ac903 48 }
3b6ac903 49
2556c98b 50 return NULL;
3b6ac903
MV
51}
52
53void
54thread_init(void)
55{
3b6ac903
MV
56 sigset_t blocked;
57
3b6ac903
MV
58 /*
59 * block delivery of progress report signal to all threads
60 */
61 sigemptyset(&blocked);
62 sigaddset(&blocked, SIGHUP);
63 sigaddset(&blocked, SIGALRM);
64 pthread_sigmask(SIG_BLOCK, &blocked, NULL);
3b6ac903
MV
65}
66
2556c98b
BN
67
68void
69create_work_queue(
70 work_queue_t *wq,
71 xfs_mount_t *mp,
72 int nworkers)
3b6ac903 73{
2556c98b
BN
74 int err;
75 int i;
3b6ac903 76
2556c98b 77 memset(wq, 0, sizeof(work_queue_t));
3b6ac903 78
2556c98b
BN
79 pthread_cond_init(&wq->wakeup, NULL);
80 pthread_mutex_init(&wq->lock, NULL);
3b6ac903 81
2556c98b
BN
82 wq->mp = mp;
83 wq->thread_count = nworkers;
84 wq->threads = malloc(nworkers * sizeof(pthread_t));
85 wq->terminate = 0;
3b6ac903 86
2556c98b
BN
87 for (i = 0; i < nworkers; i++) {
88 err = pthread_create(&wq->threads[i], NULL, worker_thread, wq);
89 if (err != 0) {
90 do_error(_("cannot create worker threads, error = [%d] %s\n"),
91 err, strerror(err));
3b6ac903 92 }
3b6ac903 93 }
2556c98b 94
3b6ac903
MV
95}
96
2556c98b
BN
97void
98queue_work(
99 work_queue_t *wq,
100 work_func_t func,
101 xfs_agnumber_t agno,
102 void *arg)
3b6ac903 103{
2556c98b 104 work_item_t *wi;
3b6ac903 105
2556c98b
BN
106 wi = (work_item_t *)malloc(sizeof(work_item_t));
107 if (wi == NULL)
108 do_error(_("cannot allocate worker item, error = [%d] %s\n"),
109 errno, strerror(errno));
110
111 wi->function = func;
112 wi->agno = agno;
113 wi->arg = arg;
114 wi->queue = wq;
115 wi->next = NULL;
3b6ac903
MV
116
117 /*
118 * Now queue the new work structure to the work queue.
119 */
2556c98b
BN
120 pthread_mutex_lock(&wq->lock);
121 if (wq->next_item == NULL) {
122 wq->next_item = wi;
123 ASSERT(wq->item_count == 0);
124 pthread_cond_signal(&wq->wakeup);
3b6ac903 125 } else {
2556c98b 126 wq->last_item->next = wi;
3b6ac903 127 }
2556c98b
BN
128 wq->last_item = wi;
129 wq->item_count++;
130 pthread_mutex_unlock(&wq->lock);
3b6ac903
MV
131}
132
133void
2556c98b
BN
134destroy_work_queue(
135 work_queue_t *wq)
3b6ac903 136{
2556c98b 137 int i;
3b6ac903 138
2556c98b
BN
139 pthread_mutex_lock(&wq->lock);
140 wq->terminate = 1;
141 pthread_mutex_unlock(&wq->lock);
3b6ac903 142
2556c98b
BN
143 pthread_cond_broadcast(&wq->wakeup);
144
145 for (i = 0; i < wq->thread_count; i++)
146 pthread_join(wq->threads[i], NULL);
3b6ac903 147
2556c98b
BN
148 free(wq->threads);
149 pthread_mutex_destroy(&wq->lock);
150 pthread_cond_destroy(&wq->wakeup);
3b6ac903 151}