* Check for notification to exit after every chunk of work.
*/
rcu_register_thread();
+ pthread_mutex_lock(&wq->lock);
while (1) {
- pthread_mutex_lock(&wq->lock);
/*
* Wait for work.
assert(wq->item_count == 0);
pthread_cond_wait(&wq->wakeup, &wq->lock);
}
- if (wq->next_item == NULL && wq->terminate) {
- pthread_mutex_unlock(&wq->lock);
+ if (wq->next_item == NULL && wq->terminate)
break;
- }
/*
* Dequeue work from the head of the list. If the queue was
/* more work, wake up another worker */
pthread_cond_signal(&wq->wakeup);
}
+ wq->active_threads++;
pthread_mutex_unlock(&wq->lock);
(wi->function)(wi->queue, wi->index, wi->arg);
free(wi);
+
+ pthread_mutex_lock(&wq->lock);
+ wq->active_threads--;
}
+ pthread_mutex_unlock(&wq->lock);
rcu_unregister_thread();
return NULL;
restart:
if (wq->next_item == NULL) {
assert(wq->item_count == 0);
- ret = -pthread_cond_signal(&wq->wakeup);
- if (ret) {
- pthread_mutex_unlock(&wq->lock);
- free(wi);
- return ret;
- }
wq->next_item = wi;
} else {
/* throttle on a full queue if configured */
}
wq->last_item = wi;
wq->item_count++;
+
+ if (wq->active_threads == wq->thread_count - 1) {
+ /* One thread is idle, wake it */
+ ret = -pthread_cond_signal(&wq->wakeup);
+ if (ret) {
+ pthread_mutex_unlock(&wq->lock);
+ return ret;
+ }
+ } else if (wq->active_threads < wq->thread_count) {
+ /* Multiple threads are idle, wake everyone */
+ ret = -pthread_cond_broadcast(&wq->wakeup);
+ if (ret) {
+ pthread_mutex_unlock(&wq->lock);
+ return ret;
+ }
+ }
+
pthread_mutex_unlock(&wq->lock);
return 0;