]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
rlm_kafka: debug-build sanity check that dr / error cbs run on-thread
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 17:12:50 +0000 (13:12 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 17:12:50 +0000 (13:12 -0400)
Capture pthread_self() into rlm_kafka_thread_t at thread_instantiate,
then assert the same tid in _kafka_delivery_report_cb and
_kafka_error_cb.  librdkafka only wakes us via the main queue (which
we poll from the worker's event loop), so a cross-thread hit would
mean an event slipped a different path and the no-lock handling of
the inflight list is unsafe.

Field and assertions are #ifndef NDEBUG so release builds carry
neither the extra tid nor the check - fr_assert(_x) expands to
nothing under NDEBUG so the missing field doesn't matter.

src/modules/rlm_kafka/rlm_kafka.c

index 17f22f17ae874be565e0109a7fcb1176a4467d0b..47e88b2137ab3808aaea227a83e65b8e5aa5749a 100644 (file)
@@ -129,6 +129,16 @@ struct rlm_kafka_thread_s {
        fr_rb_tree_t            *topics;        //!< rlm_kafka_topic_t per declared topic
 
        fr_dlist_head_t         inflight;       //!< outstanding rlm_kafka_msg_ctx_t
+
+#ifndef NDEBUG
+       pthread_t               worker_tid;     //!< pthread_self() captured at thread_instantiate.
+                                               //!< Debug-build sanity check: the delivery-report and
+                                               //!< error callbacks must run on this thread, never
+                                               //!< cross-thread - a cross-thread hit would mean
+                                               //!< librdkafka dispatched through something other than
+                                               //!< our polled main queue and invalidate the no-lock
+                                               //!< assumption around the inflight list.
+#endif
 };
 
 /** Call env for `kafka.produce.<topic>`
@@ -184,8 +194,18 @@ static rd_kafka_topic_t *kafka_thread_topic(rlm_kafka_thread_t *t, char const *n
  * @param[in] reason human-readable description.
  * @param[in] uctx   thread instance pointer we passed to rd_kafka_conf_set_opaque().
  */
-static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, UNUSED void *uctx)
+static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, void *uctx)
 {
+       rlm_kafka_thread_t      *t = talloc_get_type_abort(uctx, rlm_kafka_thread_t);
+
+       /*
+        *      librdkafka dispatches error events via our polled main
+        *      queue, so this must fire on the worker thread that
+        *      called rd_kafka_poll - otherwise the no-lock assumption
+        *      around our per-thread state would be unsafe.
+        */
+       fr_assert(pthread_equal(pthread_self(), t->worker_tid) != 0);
+
        ERROR("%s", rd_kafka_err2name(err), reason ? reason : "<UNKNOWN ERROR>");
 }
 
@@ -804,6 +824,14 @@ static void _kafka_delivery_report_cb(UNUSED rd_kafka_t *rk, rd_kafka_message_t
        if (!msg->_private) return;
        pctx = talloc_get_type_abort(msg->_private, rlm_kafka_msg_ctx_t);
 
+       /*
+        *      DR dispatch must happen on the thread that owns the
+        *      producer - librdkafka is only allowed to wake us via
+        *      the polled main queue, so a cross-thread hit here would
+        *      invalidate the no-lock handling of the inflight list.
+        */
+       fr_assert(pthread_equal(pthread_self(), pctx->t->worker_tid) != 0);
+
        fr_dlist_remove(&pctx->t->inflight, pctx);
 
        if (unlikely(!pctx->request)) {
@@ -959,6 +987,10 @@ static int mod_thread_instantiate(module_thread_inst_ctx_t const *mctx)
        t->el = mctx->el;
        t->wake_pipe[0] = t->wake_pipe[1] = -1;
 
+#ifndef NDEBUG
+       t->worker_tid = pthread_self();
+#endif
+
        fr_dlist_talloc_init(&t->inflight, rlm_kafka_msg_ctx_t, entry);
 
        /*