]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/blobdiff - libfrog/workqueue.c
libfrog: fix workqueue_add error out
[thirdparty/xfsprogs-dev.git] / libfrog / workqueue.c
index a806da3e63fa8a9f83651519e4ee67e60e01c519..a93bba3d30a4c7bfd619ca1aad972e9ff30f1aa9 100644 (file)
@@ -67,13 +67,22 @@ workqueue_create(
        int                     err = 0;
 
        memset(wq, 0, sizeof(*wq));
-       pthread_cond_init(&wq->wakeup, NULL);
-       pthread_mutex_init(&wq->lock, NULL);
+       err = pthread_cond_init(&wq->wakeup, NULL);
+       if (err)
+               return err;
+       err = pthread_mutex_init(&wq->lock, NULL);
+       if (err)
+               goto out_cond;
 
        wq->wq_ctx = wq_ctx;
        wq->thread_count = nr_workers;
        wq->threads = malloc(nr_workers * sizeof(pthread_t));
+       if (!wq->threads) {
+               err = errno;
+               goto out_mutex;
+       }
        wq->terminate = false;
+       wq->terminated = false;
 
        for (i = 0; i < nr_workers; i++) {
                err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
@@ -82,9 +91,19 @@ workqueue_create(
                        break;
        }
 
+       /*
+        * If we encounter errors here, we have to signal and then wait for all
+        * the threads that may have been started running before we can destroy
+        * the workqueue.
+        */
        if (err)
                workqueue_destroy(wq);
        return err;
+out_mutex:
+       pthread_mutex_destroy(&wq->lock);
+out_cond:
+       pthread_cond_destroy(&wq->wakeup);
+       return err;
 }
 
 /*
@@ -99,6 +118,9 @@ workqueue_add(
        void                    *arg)
 {
        struct workqueue_item   *wi;
+       int                     ret;
+
+       assert(!wq->terminated);
 
        if (wq->thread_count == 0) {
                func(wq, index, arg);
@@ -118,9 +140,14 @@ workqueue_add(
        /* Now queue the new work structure to the work queue. */
        pthread_mutex_lock(&wq->lock);
        if (wq->next_item == NULL) {
-               wq->next_item = wi;
                assert(wq->item_count == 0);
-               pthread_cond_signal(&wq->wakeup);
+               ret = pthread_cond_signal(&wq->wakeup);
+               if (ret) {
+                       pthread_mutex_unlock(&wq->lock);
+                       free(wi);
+                       return ret;
+               }
+               wq->next_item = wi;
        } else {
                wq->last_item->next = wi;
        }
@@ -133,22 +160,42 @@ workqueue_add(
 
 /*
  * Wait for all pending work items to be processed and tear down the
- * workqueue.
+ * workqueue thread pool.
  */
-void
-workqueue_destroy(
+int
+workqueue_terminate(
        struct workqueue        *wq)
 {
        unsigned int            i;
+       int                     ret;
+
+       pthread_mutex_lock(&wq->lock);
+       wq->terminate = true;
+       pthread_mutex_unlock(&wq->lock);
+
+       ret = pthread_cond_broadcast(&wq->wakeup);
+       if (ret)
+               return ret;
+
+       for (i = 0; i < wq->thread_count; i++) {
+               ret = pthread_join(wq->threads[i], NULL);
+               if (ret)
+                       return ret;
+       }
 
        pthread_mutex_lock(&wq->lock);
-       wq->terminate = 1;
+       wq->terminated = true;
        pthread_mutex_unlock(&wq->lock);
 
-       pthread_cond_broadcast(&wq->wakeup);
+       return 0;
+}
 
-       for (i = 0; i < wq->thread_count; i++)
-               pthread_join(wq->threads[i], NULL);
+/* Tear down the workqueue. */
+void
+workqueue_destroy(
+       struct workqueue        *wq)
+{
+       assert(wq->terminated);
 
        free(wq->threads);
        pthread_mutex_destroy(&wq->lock);