]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
pthreadpool: split out a pthreadpool_stop() from pthreadpool_destroy()
authorStefan Metzmacher <metze@samba.org>
Wed, 25 Apr 2018 12:03:30 +0000 (14:03 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 12 Jul 2018 12:25:19 +0000 (14:25 +0200)
This can be used in combination with pthreadpool_cancel_job() to
implement a multi step shutdown of the pool.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
lib/pthreadpool/pthreadpool.c
lib/pthreadpool/pthreadpool.h
lib/pthreadpool/pthreadpool_sync.c

index 1ef6dccee6214133cb7d91e47763ddf778d3cc2e..610cfb02f154866b8ac7d598680e4d0aeb3b9593 100644 (file)
@@ -71,9 +71,16 @@ struct pthreadpool {
        void *signal_fn_private_data;
 
        /*
-        * indicator to worker threads that they should shut down
+        * indicator to worker threads to stop processing further jobs
+        * and exit.
         */
-       bool shutdown;
+       bool stopped;
+
+       /*
+        * indicator to the last worker thread to free the pool
+        * resources.
+        */
+       bool destroyed;
 
        /*
         * maximum number of threads
@@ -169,7 +176,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                return ret;
        }
 
-       pool->shutdown = false;
+       pool->stopped = false;
+       pool->destroyed = false;
        pool->num_threads = 0;
        pool->max_threads = max_threads;
        pool->num_idle = 0;
@@ -198,6 +206,10 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 
 size_t pthreadpool_max_threads(struct pthreadpool *pool)
 {
+       if (pool->stopped) {
+               return 0;
+       }
+
        return pool->max_threads;
 }
 
@@ -207,8 +219,18 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
        int unlock_res;
        size_t ret;
 
+       if (pool->stopped) {
+               return 0;
+       }
+
        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
+               return res;
+       }
+
+       if (pool->stopped) {
+               unlock_res = pthread_mutex_unlock(&pool->mutex);
+               assert(unlock_res == 0);
                return 0;
        }
 
@@ -378,11 +400,33 @@ static int pthreadpool_free(struct pthreadpool *pool)
 }
 
 /*
- * Destroy a thread pool. Wake up all idle threads for exit. The last
- * one will free the pool.
+ * Stop a thread pool. Wake up all idle threads for exit.
  */
 
-int pthreadpool_destroy(struct pthreadpool *pool)
+static int pthreadpool_stop_locked(struct pthreadpool *pool)
+{
+       int ret;
+
+       pool->stopped = true;
+
+       if (pool->num_threads == 0) {
+               return 0;
+       }
+
+       /*
+        * We have active threads, tell them to finish.
+        */
+
+       ret = pthread_cond_broadcast(&pool->condvar);
+
+       return ret;
+}
+
+/*
+ * Stop a thread pool. Wake up all idle threads for exit.
+ */
+
+int pthreadpool_stop(struct pthreadpool *pool)
 {
        int ret, ret1;
 
@@ -391,34 +435,50 @@ int pthreadpool_destroy(struct pthreadpool *pool)
                return ret;
        }
 
-       if (pool->shutdown) {
-               ret = pthread_mutex_unlock(&pool->mutex);
-               assert(ret == 0);
-               return EBUSY;
+       if (!pool->stopped) {
+               ret = pthreadpool_stop_locked(pool);
        }
 
-       pool->shutdown = true;
+       ret1 = pthread_mutex_unlock(&pool->mutex);
+       assert(ret1 == 0);
 
-       if (pool->num_threads == 0) {
-               ret = pthread_mutex_unlock(&pool->mutex);
-               assert(ret == 0);
+       return ret;
+}
+
+/*
+ * Destroy a thread pool. Wake up all idle threads for exit. The last
+ * one will free the pool.
+ */
+
+int pthreadpool_destroy(struct pthreadpool *pool)
+{
+       int ret, ret1;
+       bool free_it;
+
+       assert(!pool->destroyed);
 
-               ret = pthreadpool_free(pool);
+       ret = pthread_mutex_lock(&pool->mutex);
+       if (ret != 0) {
                return ret;
        }
 
-       /*
-        * We have active threads, tell them to finish.
-        */
+       pool->destroyed = true;
 
-       ret = pthread_cond_broadcast(&pool->condvar);
+       if (!pool->stopped) {
+               ret = pthreadpool_stop_locked(pool);
+       }
+
+       free_it = (pool->num_threads == 0);
 
        ret1 = pthread_mutex_unlock(&pool->mutex);
        assert(ret1 == 0);
 
+       if (free_it) {
+               pthreadpool_free(pool);
+       }
+
        return ret;
 }
-
 /*
  * Prepare for pthread_exit(), pool->mutex must be locked and will be
  * unlocked here. This is a bit of a layering violation, but here we
@@ -431,7 +491,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 
        pool->num_threads -= 1;
 
-       free_it = (pool->shutdown && (pool->num_threads == 0));
+       free_it = (pool->destroyed && (pool->num_threads == 0));
 
        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);
@@ -444,7 +504,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 static bool pthreadpool_get_job(struct pthreadpool *p,
                                struct pthreadpool_job *job)
 {
-       if (p->shutdown) {
+       if (p->stopped) {
                return false;
        }
 
@@ -527,7 +587,7 @@ static void *pthreadpool_server(void *arg)
                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 1;
 
-               while ((pool->num_jobs == 0) && !pool->shutdown) {
+               while ((pool->num_jobs == 0) && !pool->stopped) {
 
                        pool->num_idle += 1;
                        res = pthread_cond_timedwait(
@@ -605,9 +665,9 @@ static void *pthreadpool_server(void *arg)
                        }
                }
 
-               if (pool->shutdown) {
+               if (pool->stopped) {
                        /*
-                        * we're asked to shut down, so exit
+                        * we're asked to stop processing jobs, so exit
                         */
                        pthreadpool_server_exit(pool);
                        return NULL;
@@ -666,12 +726,14 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
        int res;
        int unlock_res;
 
+       assert(!pool->destroyed);
+
        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
        }
 
-       if (pool->shutdown) {
+       if (pool->stopped) {
                /*
                 * Protect against the pool being shut down while
                 * trying to add a job
@@ -761,6 +823,8 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
        size_t i, j;
        size_t num = 0;
 
+       assert(!pool->destroyed);
+
        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                return res;
index dd1f9718b2364ff20dcafe6babf71766fc9f6d32..b4733580e07bbc7ed5150de88242302e5c52877c 100644 (file)
@@ -71,9 +71,31 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool);
  */
 size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
 
+/**
+ * @brief Stop a pthreadpool
+ *
+ * Stop a pthreadpool. If jobs are submitted, but not yet active in
+ * a thread, they won't get executed. If a job has already been
+ * submitted to a thread, the job function will continue running, and
+ * the signal function might still be called.
+ *
+ * This allows a multi step shutdown using pthreadpool_stop(),
+ * pthreadpool_cancel_job() and pthreadpool_destroy().
+ *
+ * @param[in]  pool            The pool to stop
+ * @return                     success: 0, failure: errno
+ *
+ * @see pthreadpool_cancel_job()
+ * @see pthreadpool_destroy()
+ */
+int pthreadpool_stop(struct pthreadpool *pool);
+
 /**
  * @brief Destroy a pthreadpool
  *
+ * This basically implies pthreadpool_stop() if the pool
+ * isn't already stopped.
+ *
  * Destroy a pthreadpool. If jobs are submitted, but not yet active in
  * a thread, they won't get executed. If a job has already been
  * submitted to a thread, the job function will continue running, and
@@ -84,6 +106,8 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
  *
  * @param[in]  pool            The pool to destroy
  * @return                     success: 0, failure: errno
+ *
+ * @see pthreadpool_stop()
  */
 int pthreadpool_destroy(struct pthreadpool *pool);
 
@@ -125,6 +149,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
  * @return                     The number of canceled jobs
  *
  * @see pthreadpool_add_job()
+ * @see pthreadpool_stop()
+ * @see pthreadpool_destroy()
  */
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data);
index 837abac54d7c1ddcd1506e85f570b7ff48846676..48e6a0ddb6049db90617c598f48271ce95486680 100644 (file)
@@ -22,6 +22,8 @@
 #include "pthreadpool.h"
 
 struct pthreadpool {
+       bool stopped;
+
        /*
         * Indicate job completion
         */
@@ -45,6 +47,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
        if (pool == NULL) {
                return ENOMEM;
        }
+       pool->stopped = false;
        pool->signal_fn = signal_fn;
        pool->signal_fn_private_data = signal_fn_private_data;
 
@@ -65,6 +68,10 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data)
 {
+       if (pool->stopped) {
+               return EINVAL;
+       }
+
        fn(private_data);
 
        return pool->signal_fn(job_id, fn, private_data,
@@ -77,6 +84,12 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
        return 0;
 }
 
+int pthreadpool_stop(struct pthreadpool *pool)
+{
+       pool->stopped = true;
+       return 0;
+}
+
 int pthreadpool_destroy(struct pthreadpool *pool)
 {
        free(pool);