]> git.ipfire.org Git - thirdparty/tor.git/commitdiff
Re: Coverity report Oct 31st, 2024 (Issue #40991)
authorexcurso <w.zimpel@dev.utilizer.de>
Thu, 27 Mar 2025 14:29:11 +0000 (14:29 +0000)
committerDavid Goulet <dgoulet@torproject.org>
Thu, 27 Mar 2025 14:29:11 +0000 (14:29 +0000)
changes/ticket40991 [new file with mode: 0644]
src/lib/evloop/workqueue.c

diff --git a/changes/ticket40991 b/changes/ticket40991
new file mode 100644 (file)
index 0000000..c8826cf
--- /dev/null
@@ -0,0 +1,4 @@
+  o Minor bugfixes (threads, memory):
+    - Rework start and exit of worker threads.
+    - Improvements in cleanup of resources used by threads.
+      Fixes bug 40991; bugfix on 0.4.8.13-dev.
index 17ab44e3ab08cbe00ed98be2ae5f4032ddbb9468..59a6cf13cdc18a845b8259dd9944cd5ff1f34424 100644 (file)
@@ -78,6 +78,8 @@ struct threadpool_t {
 
   /** Number of elements in threads. */
   int n_threads;
+  /** Number of elements to be created in threads. */
+  int n_threads_max;
   /** Mutex to protect all the above fields. */
   tor_mutex_t lock;
 
@@ -88,6 +90,11 @@ struct threadpool_t {
   void *(*new_thread_state_fn)(void*);
   void (*free_thread_state_fn)(void*);
   void *new_thread_state_arg;
+
+  /** Used for signalling the worker threads to exit. */
+  int exit;
+  /** Mutex for controlling worker threads' startup and exit. */
+  tor_mutex_t control_lock;
 };
 
 /** Used to put a workqueue_priority_t value into a bitfield. */
@@ -270,13 +277,34 @@ worker_thread_extract_next_work(workerthread_t *thread)
 static void
 worker_thread_main(void *thread_)
 {
+  static int n_worker_threads_running = 0;
   workerthread_t *thread = thread_;
   threadpool_t *pool = thread->in_pool;
   workqueue_entry_t *work;
   workqueue_reply_t result;
 
+  tor_mutex_acquire(&pool->control_lock);
+  log_debug(LD_GENERAL, "Worker thread %u/%u has started [TID: %lu].",
+            n_worker_threads_running + 1, pool->n_threads_max,
+            tor_get_thread_id());
+
+  if (++n_worker_threads_running == pool->n_threads_max)
+    tor_cond_signal_one(&pool->condition);
+
+  tor_mutex_release(&pool->control_lock);
+
+  /* Wait until all worker threads have started.
+   * pool->lock must be prelocked here. */
   tor_mutex_acquire(&pool->lock);
+
+  log_debug(LD_GENERAL, "Worker thread has entered the work loop [TID: %lu].",
+            tor_get_thread_id());
+
   while (1) {
+    /* Exit thread when signaled to exit */
+    if (pool->exit)
+      goto exit;
+
     /* lock must be held at this point. */
     while (worker_thread_has_work(thread)) {
       /* lock must be held at this point. */
@@ -290,11 +318,12 @@ worker_thread_main(void *thread_)
 
         workqueue_reply_t r = update_fn(thread->state, arg);
 
-        if (r != WQ_RPL_REPLY) {
-          return;
-        }
-
         tor_mutex_acquire(&pool->lock);
+
+        /* We may need to exit the thread. */
+        if (r != WQ_RPL_REPLY)
+          goto exit;
+
         continue;
       }
       work = worker_thread_extract_next_work(thread);
@@ -309,11 +338,11 @@ worker_thread_main(void *thread_)
       /* Queue the reply for the main thread. */
       queue_reply(thread->reply_queue, work);
 
-      /* We may need to exit the thread. */
-      if (result != WQ_RPL_REPLY) {
-        return;
-      }
       tor_mutex_acquire(&pool->lock);
+
+      /* We may need to exit the thread. */
+      if (result != WQ_RPL_REPLY)
+        goto exit;
     }
     /* At this point the lock is held, and there is no work in this thread's
      * queue. */
@@ -325,6 +354,19 @@ worker_thread_main(void *thread_)
       log_warn(LD_GENERAL, "Fail tor_cond_wait.");
     }
   }
+
+exit:
+  /* At this point pool->lock must be held */
+
+  log_debug(LD_GENERAL, "Worker thread %u/%u has exited [TID: %lu].",
+            pool->n_threads_max - n_worker_threads_running + 1,
+            pool->n_threads_max, tor_get_thread_id());
+
+  if (--n_worker_threads_running == 0)
+    /* Let the main thread know, the last worker thread has exited. */
+    tor_mutex_release(&pool->control_lock);
+
+  tor_mutex_release(&pool->lock);
 }
 
 /** Put a reply on the reply queue.  The reply must not currently be on
@@ -516,12 +558,17 @@ threadpool_start_threads(threadpool_t *pool, int n)
   if (n > MAX_THREADS)
     n = MAX_THREADS;
 
+  tor_mutex_acquire(&pool->control_lock);
   tor_mutex_acquire(&pool->lock);
 
   if (pool->n_threads < n)
     pool->threads = tor_reallocarray(pool->threads,
                                      sizeof(workerthread_t*), n);
 
+  int status = 0;
+  pool->n_threads_max = n;
+  log_debug(LD_GENERAL, "Starting worker threads...");
+
   while (pool->n_threads < n) {
     /* For half of our threads, we'll choose lower priorities permissively;
      * for the other half, we'll stick more strictly to higher priorities.
@@ -536,16 +583,80 @@ threadpool_start_threads(threadpool_t *pool, int n)
       //LCOV_EXCL_START
       tor_assert_nonfatal_unreached();
       pool->free_thread_state_fn(state);
-      tor_mutex_release(&pool->lock);
-      return -1;
+      status = -1;
+      goto check_status;
       //LCOV_EXCL_STOP
     }
     thr->index = pool->n_threads;
     pool->threads[pool->n_threads++] = thr;
   }
+
+  struct timeval tv = {.tv_sec = 30, .tv_usec = 0};
+
+  /* Wait for the last launched thread to confirm us, it has started.
+   * Wait max 30 seconds */
+  status = tor_cond_wait(&pool->condition, &pool->control_lock, &tv);
+
+check_status:
+  switch (status) {
+  case 0:
+    log_debug(LD_GENERAL, "Starting worker threads finished.");
+    break;
+  case -1:
+    log_warn(LD_GENERAL, "Failed to confirm worker threads' start up.");
+    break;
+  case 1:
+    log_warn(LD_GENERAL, "Failed to confirm worker threads' "
+                         "start up after timeout.");
+    FALLTHROUGH;
+  default:
+    status = -1;
+  }
+
+  log_debug(LD_GENERAL, "Signaled the worker threads to enter the work loop.");
+
+  /* If we had an error, let the worker threads (if any) exit directly. */
+  if (status != 0) {
+    pool->exit = 1;
+    log_debug(LD_GENERAL, "Signaled the worker threads to exit...");
+  }
+
+  /* Let worker threads enter the work loop. */
   tor_mutex_release(&pool->lock);
 
-  return 0;
+  /* pool->control_lock stays locked. This is required for the main thread
+   * to wait for the worker threads to exit on shutdown. */
+
+  return status;
+}
+
+/** Stop all worker threads */
+static void
+threadpool_stop_threads(threadpool_t *pool)
+{
+  tor_mutex_acquire(&pool->lock);
+
+  if (pool->exit == 0) {
+    /* Signal the worker threads to exit */
+    pool->exit = 1;
+    /* If worker threads are waiting for work, let them continue to exit */
+    tor_cond_signal_all(&pool->condition);
+
+    log_debug(LD_GENERAL, "Signaled worker threads to exit. "
+                          "Waiting for them to exit...");
+  }
+
+  tor_mutex_release(&pool->lock);
+
+  /* Wait until all worker threads have exited.
+   * pool->control_lock must be prelocked here. */
+  tor_mutex_acquire(&pool->control_lock);
+  /* Unlock required, else main thread hangs on mutex uninit. */
+  tor_mutex_release(&pool->control_lock);
+
+  /* If this message appears in the log before all threads have confirmed
+   * their exit, then pool->control_lock wasn't prelocked for some reason. */
+  log_debug(LD_GENERAL, "All worker threads have exited.");
 }
 
 /**
@@ -566,6 +677,9 @@ threadpool_new(int n_threads,
   pool = tor_malloc_zero(sizeof(threadpool_t));
   tor_mutex_init_nonrecursive(&pool->lock);
   tor_cond_init(&pool->condition);
+  tor_mutex_init_nonrecursive(&pool->control_lock);
+  pool->exit = 0;
+
   unsigned i;
   for (i = WORKQUEUE_PRIORITY_FIRST; i <= WORKQUEUE_PRIORITY_LAST; ++i) {
     TOR_TAILQ_INIT(&pool->work[i]);
@@ -579,8 +693,6 @@ threadpool_new(int n_threads,
   if (threadpool_start_threads(pool, n_threads) < 0) {
     //LCOV_EXCL_START
     tor_assert_nonfatal_unreached();
-    tor_cond_uninit(&pool->condition);
-    tor_mutex_uninit(&pool->lock);
     threadpool_free(pool);
     return NULL;
     //LCOV_EXCL_STOP
@@ -598,6 +710,14 @@ threadpool_free_(threadpool_t *pool)
   if (!pool)
     return;
 
+  threadpool_stop_threads(pool);
+
+  log_debug(LD_GENERAL, "Beginning to clean up...");
+
+  tor_cond_uninit(&pool->condition);
+  tor_mutex_uninit(&pool->lock);
+  tor_mutex_uninit(&pool->control_lock);
+
   if (pool->threads) {
     for (int i = 0; i != pool->n_threads; ++i)
       workerthread_free(pool->threads[i]);
@@ -605,21 +725,35 @@ threadpool_free_(threadpool_t *pool)
     tor_free(pool->threads);
   }
 
-  if (pool->update_args)
-    pool->free_update_arg_fn(pool->update_args);
+  if (pool->update_args) {
+    if (!pool->free_update_arg_fn)
+      log_warn(LD_GENERAL, "Freeing pool->update_args not possible. "
+                           "pool->free_update_arg_fn is not set.");
+    else
+      pool->free_update_arg_fn(pool->update_args);
+  }
 
   if (pool->reply_event) {
-    tor_event_del(pool->reply_event);
-    tor_event_free(pool->reply_event);
+    if (tor_event_del(pool->reply_event) == -1)
+      log_warn(LD_GENERAL, "libevent error: deleting reply event failed.");
+    else
+      tor_event_free(pool->reply_event);
   }
 
   if (pool->reply_queue)
     replyqueue_free(pool->reply_queue);
 
-  if (pool->new_thread_state_arg)
-    pool->free_thread_state_fn(pool->new_thread_state_arg);
+  if (pool->new_thread_state_arg) {
+    if (!pool->free_thread_state_fn)
+      log_warn(LD_GENERAL, "Freeing pool->new_thread_state_arg not possible. "
+                           "pool->free_thread_state_fn is not set.");
+    else
+      pool->free_thread_state_fn(pool->new_thread_state_arg);
+  }
 
   tor_free(pool);
+
+  log_debug(LD_GENERAL, "Cleanup finished.");
 }
 
 /** Return the reply queue associated with a given thread pool. */