From ffad24d4d1871d7818a7066dfc55e76a44915a23 Mon Sep 17 00:00:00 2001 From: Arran Cudbard-Bell Date: Wed, 22 Apr 2026 13:12:50 -0400 Subject: [PATCH] rlm_kafka: debug-build sanity check that dr / error cbs run on-thread Capture pthread_self() into rlm_kafka_thread_t at thread_instantiate, then assert the same tid in _kafka_delivery_report_cb and _kafka_error_cb. librdkafka only wakes us via the main queue (which we poll from the worker's event loop), so a cross-thread hit would mean an event slipped a different path and the no-lock handling of the inflight list is unsafe. Field and assertions are #ifndef NDEBUG so release builds carry neither the extra tid nor the check - fr_assert(_x) expands to nothing under NDEBUG so the missing field doesn't matter. --- src/modules/rlm_kafka/rlm_kafka.c | 34 ++++++++++++++++++++++++++++++- 1 file changed, 33 insertions(+), 1 deletion(-) diff --git a/src/modules/rlm_kafka/rlm_kafka.c b/src/modules/rlm_kafka/rlm_kafka.c index 17f22f17ae8..47e88b2137a 100644 --- a/src/modules/rlm_kafka/rlm_kafka.c +++ b/src/modules/rlm_kafka/rlm_kafka.c @@ -129,6 +129,16 @@ struct rlm_kafka_thread_s { fr_rb_tree_t *topics; //!< rlm_kafka_topic_t per declared topic fr_dlist_head_t inflight; //!< outstanding rlm_kafka_msg_ctx_t + +#ifndef NDEBUG + pthread_t worker_tid; //!< pthread_self() captured at thread_instantiate. + //!< Debug-build sanity check: the delivery-report and + //!< error callbacks must run on this thread, never + //!< cross-thread - a cross-thread hit would mean + //!< librdkafka dispatched through something other than + //!< our polled main queue and invalidate the no-lock + //!< assumption around the inflight list. +#endif }; /** Call env for `kafka.produce.` @@ -184,8 +194,18 @@ static rd_kafka_topic_t *kafka_thread_topic(rlm_kafka_thread_t *t, char const *n * @param[in] reason human-readable description. * @param[in] uctx thread instance pointer we passed to rd_kafka_conf_set_opaque(). */ -static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, UNUSED void *uctx) +static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, void *uctx) { + rlm_kafka_thread_t *t = talloc_get_type_abort(uctx, rlm_kafka_thread_t); + + /* + * librdkafka dispatches error events via our polled main + * queue, so this must fire on the worker thread that + * called rd_kafka_poll - otherwise the no-lock assumption + * around our per-thread state would be unsafe. + */ + fr_assert(pthread_equal(pthread_self(), t->worker_tid) != 0); + ERROR("%s", rd_kafka_err2name(err), reason ? reason : ""); } @@ -804,6 +824,14 @@ static void _kafka_delivery_report_cb(UNUSED rd_kafka_t *rk, rd_kafka_message_t if (!msg->_private) return; pctx = talloc_get_type_abort(msg->_private, rlm_kafka_msg_ctx_t); + /* + * DR dispatch must happen on the thread that owns the + * producer - librdkafka is only allowed to wake us via + * the polled main queue, so a cross-thread hit here would + * invalidate the no-lock handling of the inflight list. + */ + fr_assert(pthread_equal(pthread_self(), pctx->t->worker_tid) != 0); + fr_dlist_remove(&pctx->t->inflight, pctx); if (unlikely(!pctx->request)) { @@ -959,6 +987,10 @@ static int mod_thread_instantiate(module_thread_inst_ctx_t const *mctx) t->el = mctx->el; t->wake_pipe[0] = t->wake_pipe[1] = -1; +#ifndef NDEBUG + t->worker_tid = pthread_self(); +#endif + fr_dlist_talloc_init(&t->inflight, rlm_kafka_msg_ctx_t, entry); /* -- 2.47.3