fr_rb_tree_t *topics; //!< rlm_kafka_topic_t per declared topic
fr_dlist_head_t inflight; //!< outstanding rlm_kafka_msg_ctx_t
+
+#ifndef NDEBUG
+ pthread_t worker_tid; //!< pthread_self() captured at thread_instantiate.
+ //!< Debug-build sanity check: the delivery-report and
+ //!< error callbacks must run on this thread, never
+ //!< cross-thread - a cross-thread hit would mean
+ //!< librdkafka dispatched through something other than
+ //!< our polled main queue and invalidate the no-lock
+ //!< assumption around the inflight list.
+#endif
};
/** Call env for `kafka.produce.<topic>`
* @param[in] reason human-readable description.
* @param[in] uctx thread instance pointer we passed to rd_kafka_conf_set_opaque().
*/
-static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, UNUSED void *uctx)
+static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason, void *uctx)
{
+ rlm_kafka_thread_t *t = talloc_get_type_abort(uctx, rlm_kafka_thread_t);
+
+ /*
+ * librdkafka dispatches error events via our polled main
+ * queue, so this must fire on the worker thread that
+ * called rd_kafka_poll - otherwise the no-lock assumption
+ * around our per-thread state would be unsafe.
+ */
+ fr_assert(pthread_equal(pthread_self(), t->worker_tid) != 0);
+
ERROR("%s", rd_kafka_err2name(err), reason ? reason : "<UNKNOWN ERROR>");
}
if (!msg->_private) return;
pctx = talloc_get_type_abort(msg->_private, rlm_kafka_msg_ctx_t);
+ /*
+ * DR dispatch must happen on the thread that owns the
+ * producer - librdkafka is only allowed to wake us via
+ * the polled main queue, so a cross-thread hit here would
+ * invalidate the no-lock handling of the inflight list.
+ */
+ fr_assert(pthread_equal(pthread_self(), pctx->t->worker_tid) != 0);
+
fr_dlist_remove(&pctx->t->inflight, pctx);
if (unlikely(!pctx->request)) {
t->el = mctx->el;
t->wake_pipe[0] = t->wake_pipe[1] = -1;
+#ifndef NDEBUG
+ t->worker_tid = pthread_self();
+#endif
+
fr_dlist_talloc_init(&t->inflight, rlm_kafka_msg_ctx_t, entry);
/*