]> git.ipfire.org Git - thirdparty/tor.git/commitdiff
Rework of worker threads' start and exit + slight changes in cleanup code
authorWaldemar Zimpel <w.zimpel@dev.utilizer.de>
Tue, 12 Nov 2024 23:58:30 +0000 (00:58 +0100)
committerDavid Goulet <dgoulet@torproject.org>
Tue, 3 Dec 2024 14:20:15 +0000 (09:20 -0500)
See issue #40991

changes/ticket40991 [new file with mode: 0644]
src/lib/evloop/workqueue.c

diff --git a/changes/ticket40991 b/changes/ticket40991
new file mode 100644 (file)
index 0000000..3ee26ef
--- /dev/null
@@ -0,0 +1,4 @@
+  o Minor bugfixes (threads, memory):
+    - Rework start and exit of worker threads.
+      Improvements in cleanup of resources used by threads.
+      Fixes bug 40991; bugfix on 0.4.8.13.
index 17ab44e3ab08cbe00ed98be2ae5f4032ddbb9468..e61c838b22e53abd4eadff576ac1fbb52a1e5a2e 100644 (file)
@@ -78,6 +78,8 @@ struct threadpool_t {
 
   /** Number of elements in threads. */
   int n_threads;
+  /** Number of elements to be created in threads. */
+  int n_threads_max;
   /** Mutex to protect all the above fields. */
   tor_mutex_t lock;
 
@@ -88,6 +90,11 @@ struct threadpool_t {
   void *(*new_thread_state_fn)(void*);
   void (*free_thread_state_fn)(void*);
   void *new_thread_state_arg;
+
+  /** Used for signalling the worker threads to exit. */
+  int exit;
+  /** Mutex for controlling worker threads' startup and exit. */
+  tor_mutex_t control_lock;
 };
 
 /** Used to put a workqueue_priority_t value into a bitfield. */
@@ -270,13 +277,25 @@ worker_thread_extract_next_work(workerthread_t *thread)
 static void
 worker_thread_main(void *thread_)
 {
+  static int n_worker_threads_running = 0;
   workerthread_t *thread = thread_;
   threadpool_t *pool = thread->in_pool;
   workqueue_entry_t *work;
   workqueue_reply_t result;
 
   tor_mutex_acquire(&pool->lock);
+  log_debug(LD_GENERAL, "Worker thread (TID: %lu) has started.",
+            tor_get_thread_id());
+
+  if (++n_worker_threads_running == pool->n_threads_max)
+    /* Let the main thread know, the last worker thread has started. */
+    tor_cond_signal_one(&pool->condition);
+
   while (1) {
+    /* Exit thread when signaled to exit */
+    if (pool->exit)
+      break;
+
     /* lock must be held at this point. */
     while (worker_thread_has_work(thread)) {
       /* lock must be held at this point. */
@@ -291,7 +310,7 @@ worker_thread_main(void *thread_)
         workqueue_reply_t r = update_fn(thread->state, arg);
 
         if (r != WQ_RPL_REPLY) {
-          return;
+          break;
         }
 
         tor_mutex_acquire(&pool->lock);
@@ -311,7 +330,7 @@ worker_thread_main(void *thread_)
 
       /* We may need to exit the thread. */
       if (result != WQ_RPL_REPLY) {
-        return;
+        break;
       }
       tor_mutex_acquire(&pool->lock);
     }
@@ -325,6 +344,14 @@ worker_thread_main(void *thread_)
       log_warn(LD_GENERAL, "Fail tor_cond_wait.");
     }
   }
+
+  log_debug(LD_GENERAL, "Worker thread (TID: %lu) has exited.",
+            tor_get_thread_id());
+
+  if (--n_worker_threads_running == 0)
+    tor_mutex_release(&pool->control_lock);
+
+  tor_mutex_release(&pool->lock);
 }
 
 /** Put a reply on the reply queue.  The reply must not currently be on
@@ -522,6 +549,9 @@ threadpool_start_threads(threadpool_t *pool, int n)
     pool->threads = tor_reallocarray(pool->threads,
                                      sizeof(workerthread_t*), n);
 
+  pool->n_threads_max = n;
+  log_debug(LD_GENERAL, "Starting worker threads...");
+
   while (pool->n_threads < n) {
     /* For half of our threads, we'll choose lower priorities permissively;
      * for the other half, we'll stick more strictly to higher priorities.
@@ -543,9 +573,36 @@ threadpool_start_threads(threadpool_t *pool, int n)
     thr->index = pool->n_threads;
     pool->threads[pool->n_threads++] = thr;
   }
+
   tor_mutex_release(&pool->lock);
+  tor_mutex_acquire(&pool->control_lock);
+
+  struct timeval tv = {.tv_sec = 30, .tv_usec = 0};
+
+  /* Wait for the last launched thread to confirm us, it has started.
+   * Wait max 30 seconds */
+  switch (tor_cond_wait(&pool->condition, &pool->control_lock, &tv)) {
+  case -1:
+    log_warn(LD_GENERAL, "Failed to confirm worker threads' start up.");
+    goto error;
+  case 1:
+    log_warn(LD_GENERAL, "Failed to confirm worker threads' "
+                         "start up after timeout.");
+    goto error;
+  case 0:
+    log_debug(LD_GENERAL, "Starting worker threads finished.");
+    break;
+  default:;
+  }
+
+  /* On success control lock stays locked. This is required for the
+   * main thread to wait for the worker threads to exit on shutdown. */
 
   return 0;
+
+error:
+  tor_mutex_release(&pool->control_lock);
+  return -1;
 }
 
 /**
@@ -566,6 +623,9 @@ threadpool_new(int n_threads,
   pool = tor_malloc_zero(sizeof(threadpool_t));
   tor_mutex_init_nonrecursive(&pool->lock);
   tor_cond_init(&pool->condition);
+  tor_mutex_init_nonrecursive(&pool->control_lock);
+  pool->exit = 0;
+
   unsigned i;
   for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
     TOR_TAILQ_INIT(&pool->work[i]);
@@ -579,8 +639,6 @@ threadpool_new(int n_threads,
   if (threadpool_start_threads(pool, n_threads) < 0) {
     //LCOV_EXCL_START
     tor_assert_nonfatal_unreached();
-    tor_cond_uninit(&pool->condition);
-    tor_mutex_uninit(&pool->lock);
     threadpool_free(pool);
     return NULL;
     //LCOV_EXCL_STOP
@@ -598,6 +656,32 @@ threadpool_free_(threadpool_t *pool)
   if (!pool)
     return;
 
+  tor_mutex_acquire(&pool->lock);
+  /* Signal the worker threads to exit */
+  pool->exit = 1;
+  /* If worker threads are waiting for work, let them continue to exit */
+  tor_cond_signal_all(&pool->condition);
+
+  log_debug(LD_GENERAL, "Signaled worker threads to exit. "
+                        "Waiting for them to exit...");
+  tor_mutex_release(&pool->lock);
+
+  /* Wait until all worker threads have exited.
+   * pool->control_lock must be prelocked here. */
+  tor_mutex_acquire(&pool->control_lock);
+  /* Unlock required, else main thread hangs on
+   * tor_mutex_uninit(&pool->control_lock) */
+  tor_mutex_release(&pool->control_lock);
+
+  /* If this message appears in the log before all threads have confirmed
+   * their exit, then pool->control_lock wasn't prelocked for some reason. */
+  log_debug(LD_GENERAL, "All worker threads have exited. "
+                        "Beginning to clean up...");
+
+  tor_cond_uninit(&pool->condition);
+  tor_mutex_uninit(&pool->lock);
+  tor_mutex_uninit(&pool->control_lock);
+
   if (pool->threads) {
     for (int i = 0; i != pool->n_threads; ++i)
       workerthread_free(pool->threads[i]);
@@ -605,21 +689,35 @@ threadpool_free_(threadpool_t *pool)
     tor_free(pool->threads);
   }
 
-  if (pool->update_args)
-    pool->free_update_arg_fn(pool->update_args);
+  if (pool->update_args) {
+    if (!pool->free_update_arg_fn)
+      log_warn(LD_GENERAL, "Freeing pool->update_args not possible. "
+                           "pool->free_update_arg_fn is not set.");
+    else
+      pool->free_update_arg_fn(pool->update_args);
+  }
 
   if (pool->reply_event) {
-    tor_event_del(pool->reply_event);
-    tor_event_free(pool->reply_event);
+    if (tor_event_del(pool->reply_event) == -1)
+      log_warn(LD_GENERAL, "libevent error: deleting reply event failed.");
+    else
+      tor_event_free(pool->reply_event);
   }
 
   if (pool->reply_queue)
     replyqueue_free(pool->reply_queue);
 
-  if (pool->new_thread_state_arg)
-    pool->free_thread_state_fn(pool->new_thread_state_arg);
+  if (pool->new_thread_state_arg) {
+    if (!pool->free_thread_state_fn)
+      log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. "
+                           "pool->free_thread_state_fn is not set.");
+    else
+      pool->free_thread_state_fn(pool->new_thread_state_arg);
+  }
 
   tor_free(pool);
+
+  log_debug(LD_GENERAL, "Cleanup finished.");
 }
 
 /** Return the reply queue associated with a given thread pool. */