int err = 0;
memset(wq, 0, sizeof(*wq));
- pthread_cond_init(&wq->wakeup, NULL);
- pthread_mutex_init(&wq->lock, NULL);
+ err = pthread_cond_init(&wq->wakeup, NULL);
+ if (err)
+ return err;
+ err = pthread_mutex_init(&wq->lock, NULL);
+ if (err)
+ goto out_cond;
wq->wq_ctx = wq_ctx;
wq->thread_count = nr_workers;
wq->threads = malloc(nr_workers * sizeof(pthread_t));
+ if (!wq->threads) {
+ err = errno;
+ goto out_mutex;
+ }
wq->terminate = false;
+ wq->terminated = false;
for (i = 0; i < nr_workers; i++) {
err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
break;
}
+ /*
+ * If we encounter errors here, we have to signal and then wait for all
+ * the threads that may have been started running before we can destroy
+ * the workqueue.
+ */
if (err)
workqueue_destroy(wq);
return err;
+out_mutex:
+ pthread_mutex_destroy(&wq->lock);
+out_cond:
+ pthread_cond_destroy(&wq->wakeup);
+ return err;
}
/*
void *arg)
{
struct workqueue_item *wi;
+ int ret;
+
+ assert(!wq->terminated);
if (wq->thread_count == 0) {
func(wq, index, arg);
/* Now queue the new work structure to the work queue. */
pthread_mutex_lock(&wq->lock);
if (wq->next_item == NULL) {
- wq->next_item = wi;
assert(wq->item_count == 0);
- pthread_cond_signal(&wq->wakeup);
+ ret = pthread_cond_signal(&wq->wakeup);
+ if (ret) {
+ pthread_mutex_unlock(&wq->lock);
+ free(wi);
+ return ret;
+ }
+ wq->next_item = wi;
} else {
wq->last_item->next = wi;
}
/*
* Wait for all pending work items to be processed and tear down the
- * workqueue.
+ * workqueue thread pool.
*/
-void
-workqueue_destroy(
+int
+workqueue_terminate(
struct workqueue *wq)
{
unsigned int i;
+ int ret;
+
+ pthread_mutex_lock(&wq->lock);
+ wq->terminate = true;
+ pthread_mutex_unlock(&wq->lock);
+
+ ret = pthread_cond_broadcast(&wq->wakeup);
+ if (ret)
+ return ret;
+
+ for (i = 0; i < wq->thread_count; i++) {
+ ret = pthread_join(wq->threads[i], NULL);
+ if (ret)
+ return ret;
+ }
pthread_mutex_lock(&wq->lock);
- wq->terminate = 1;
+ wq->terminated = true;
pthread_mutex_unlock(&wq->lock);
- pthread_cond_broadcast(&wq->wakeup);
+ return 0;
+}
- for (i = 0; i < wq->thread_count; i++)
- pthread_join(wq->threads[i], NULL);
+/* Tear down the workqueue. */
+void
+workqueue_destroy(
+ struct workqueue *wq)
+{
+ assert(wq->terminated);
free(wq->threads);
pthread_mutex_destroy(&wq->lock);