]> git.ipfire.org Git - thirdparty/xfsprogs-dev.git/commitdiff
libfrog: split workqueue destroy functions
authorDarrick J. Wong <darrick.wong@oracle.com>
Tue, 15 Oct 2019 16:54:37 +0000 (12:54 -0400)
committerEric Sandeen <sandeen@sandeen.net>
Tue, 15 Oct 2019 16:54:37 +0000 (12:54 -0400)
Split the workqueue destroy function into two parts -- one to signal all
the threads to exit and wait for them, and a second one that actually
destroys all the memory associated with the workqueue.  This mean we can
report latent workqueue errors independent of the freeing function.

Signed-off-by: Darrick J. Wong <darrick.wong@oracle.com>
Reviewed-by: Eric Sandeen <sandeen@redhat.com>
Signed-off-by: Eric Sandeen <sandeen@sandeen.net>
libfrog/workqueue.c
libfrog/workqueue.h
repair/threads.c
scrub/fscounters.c
scrub/inodes.c
scrub/phase2.c
scrub/phase4.c
scrub/read_verify.c
scrub/spacemap.c
scrub/vfs.c

index 48038363ebd08da4b5b0d5bdc7e064913b197f28..07f11a7b993256dbceef08c7f1c6a465c47e25fe 100644 (file)
@@ -82,6 +82,7 @@ workqueue_create(
                goto out_mutex;
        }
        wq->terminate = false;
+       wq->terminated = false;
 
        for (i = 0; i < nr_workers; i++) {
                err = pthread_create(&wq->threads[i], NULL, workqueue_thread,
@@ -119,6 +120,8 @@ workqueue_add(
        struct workqueue_item   *wi;
        int                     ret;
 
+       assert(!wq->terminated);
+
        if (wq->thread_count == 0) {
                func(wq, index, arg);
                return 0;
@@ -157,22 +160,42 @@ out_item:
 
 /*
  * Wait for all pending work items to be processed and tear down the
- * workqueue.
+ * workqueue thread pool.
  */
-void
-workqueue_destroy(
+int
+workqueue_terminate(
        struct workqueue        *wq)
 {
        unsigned int            i;
+       int                     ret;
+
+       pthread_mutex_lock(&wq->lock);
+       wq->terminate = true;
+       pthread_mutex_unlock(&wq->lock);
+
+       ret = pthread_cond_broadcast(&wq->wakeup);
+       if (ret)
+               return ret;
+
+       for (i = 0; i < wq->thread_count; i++) {
+               ret = pthread_join(wq->threads[i], NULL);
+               if (ret)
+                       return ret;
+       }
 
        pthread_mutex_lock(&wq->lock);
-       wq->terminate = 1;
+       wq->terminated = true;
        pthread_mutex_unlock(&wq->lock);
 
-       pthread_cond_broadcast(&wq->wakeup);
+       return 0;
+}
 
-       for (i = 0; i < wq->thread_count; i++)
-               pthread_join(wq->threads[i], NULL);
+/* Tear down the workqueue. */
+void
+workqueue_destroy(
+       struct workqueue        *wq)
+{
+       assert(wq->terminated);
 
        free(wq->threads);
        pthread_mutex_destroy(&wq->lock);
index a1f3a57c3d8dd66cef12d01cc86e6e10a8ad5d4c..a56d1cf14081be54cd4bb37b136cb5fe40cc03ce 100644 (file)
@@ -30,12 +30,14 @@ struct workqueue {
        unsigned int            item_count;
        unsigned int            thread_count;
        bool                    terminate;
+       bool                    terminated;
 };
 
 int workqueue_create(struct workqueue *wq, void *wq_ctx,
                unsigned int nr_workers);
 int workqueue_add(struct workqueue *wq, workqueue_func_t fn,
                uint32_t index, void *arg);
+int workqueue_terminate(struct workqueue *wq);
 void workqueue_destroy(struct workqueue *wq);
 
 #endif /* __LIBFROG_WORKQUEUE_H__ */
index d21909208262f32f5f499b06475f42eef85a3f6e..9b7241e3d1482976e577da3c28c7f84d2dd95523 100644 (file)
@@ -56,5 +56,11 @@ void
 destroy_work_queue(
        struct workqueue        *wq)
 {
+       int                     err;
+
+       err = workqueue_terminate(wq);
+       if (err)
+               do_error(_("cannot terminate worker item, error = [%d] %s\n"),
+                               err, strerror(err));
        workqueue_destroy(wq);
 }
index 669c5ab0cf09f70e8e459371b1ffe6ebabeed6b2..98aa3826e7b143e820e06d596a5c620635f6eddf 100644 (file)
@@ -102,7 +102,7 @@ xfs_count_all_inodes(
        struct xfs_count_inodes *ci;
        xfs_agnumber_t          agno;
        struct workqueue        wq;
-       bool                    moveon;
+       bool                    moveon = true;
        int                     ret;
 
        ci = calloc(1, sizeof(struct xfs_count_inodes) +
@@ -126,8 +126,17 @@ xfs_count_all_inodes(
                        break;
                }
        }
+
+       ret = workqueue_terminate(&wq);
+       if (ret) {
+               moveon = false;
+               str_liberror(ctx, ret, _("finishing icount work"));
+       }
        workqueue_destroy(&wq);
 
+       if (!moveon)
+               goto out_free;
+
        for (agno = 0; agno < ctx->mnt.fsgeom.agcount; agno++)
                *count += ci->counters[agno];
        moveon = ci->moveon;
index 74280ad6946add837a6429fc564aa05be4e7263e..91632b550b48c0f4eedf27aea4bd6530100df15f 100644 (file)
@@ -256,6 +256,11 @@ xfs_scan_all_inodes(
                }
        }
 
+       ret = workqueue_terminate(&wq);
+       if (ret) {
+               si.moveon = false;
+               str_liberror(ctx, ret, _("finishing bulkstat work"));
+       }
        workqueue_destroy(&wq);
 
        return si.moveon;
index 1d2244a4c68ce698ad1d173084f861ae415d813c..d92b7e29765373d8417a23120b1859b4869c47c0 100644 (file)
@@ -161,6 +161,11 @@ xfs_scan_metadata(
        }
 
 out:
+       ret = workqueue_terminate(&wq);
+       if (ret) {
+               moveon = false;
+               str_liberror(ctx, ret, _("finishing scrub work"));
+       }
        workqueue_destroy(&wq);
        return moveon;
 }
index 903da6d2f6619f703780b222bc104306169d22d0..eb30c189756e900621836cb335ff3234784519f6 100644 (file)
@@ -90,6 +90,12 @@ xfs_process_action_items(
                if (!moveon)
                        break;
        }
+
+       ret = workqueue_terminate(&wq);
+       if (ret) {
+               moveon = false;
+               str_liberror(ctx, ret, _("finishing repair work"));
+       }
        workqueue_destroy(&wq);
 
        pthread_mutex_lock(&ctx->lock);
index ff4d3572f223d78b196e9172afced9e931064b02..bb8f09a85f7fade597d19bbf342d91b0219e5adc 100644 (file)
@@ -120,6 +120,7 @@ void
 read_verify_pool_flush(
        struct read_verify_pool         *rvp)
 {
+       workqueue_terminate(&rvp->wq);
        workqueue_destroy(&rvp->wq);
 }
 
index 4258e318cf73579c4d702c28b4525c63e3684993..91e8badb7926dd4b3b838a338df01e8fc593abf3 100644 (file)
@@ -230,6 +230,11 @@ xfs_scan_all_spacemaps(
                }
        }
 out:
+       ret = workqueue_terminate(&wq);
+       if (ret) {
+               sbx.moveon = false;
+               str_liberror(ctx, ret, _("finishing fsmap work"));
+       }
        workqueue_destroy(&wq);
 
        return sbx.moveon;
index 0cff2e3f0801ed8d5ce429b91b4d2fbe83477e4d..49d689af8f506f48c32d8765e3f4f82c6aadd40d 100644 (file)
@@ -250,6 +250,11 @@ scan_fs_tree(
        assert(sft.nr_dirs == 0);
        pthread_mutex_unlock(&sft.lock);
 
+       ret = workqueue_terminate(&wq);
+       if (ret) {
+               sft.moveon = false;
+               str_liberror(ctx, ret, _("finishing directory scan work"));
+       }
 out_wq:
        workqueue_destroy(&wq);
        return sft.moveon;