]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
rlm_kafka: bridge librdkafka log output into the server log
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 16:38:03 +0000 (12:38 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 16:38:03 +0000 (12:38 -0400)
Register an rd_kafka_conf_set_log_cb on the shared producer conf in
mod_instantiate.  Every per-thread rd_kafka_conf_dup inherits it, so
broker errors, protocol traces, and any categories enabled via the
top-level `debug` knob now feed through the server's ERROR / WARN /
INFO / DEBUG macros with a pre-rendered "rlm_kafka (<instance>)"
prefix instead of going to librdkafka's default stderr sink.

Callback runs on librdkafka's internal threads so no mctx is in
scope; we stash the prefix on rlm_kafka_t at instantiate time and
reach for it via the producer's opaque (rlm_kafka_thread_t) on each
line.

src/modules/rlm_kafka/rlm_kafka.c

index e304b24831194447bbe38b74cae46e4f821eb7ce..998abc4b1513b8d749eb52192be1e4fc6c3c063b 100644 (file)
@@ -76,6 +76,10 @@ typedef struct {
        fr_kafka_conf_t                 kconf;          //!< parsed producer conf - MUST be first
        fr_time_delta_t                 flush_timeout;  //!< How long `thread_detach` waits for in-flight
                                                        //!< messages to drain before tearing down the producer.
+       char const                      *log_prefix;    //!< pre-rendered `"rlm_kafka (<instance>)"`, used by
+                                                       //!< librdkafka's log_cb which fires from internal
+                                                       //!< threads with no mctx in scope.  Built once in
+                                                       //!< mod_instantiate so we don't reformat per line.
 } rlm_kafka_t;
 
 /** Per-thread topic handle.
@@ -180,6 +184,47 @@ static void _kafka_error_cb(UNUSED rd_kafka_t *rk, int err, char const *reason,
        ERROR("%s", rd_kafka_err2name(err), reason ? reason : "<UNKNOWN ERROR>");
 }
 
+/** librdkafka log callback - bridge internal library messages into the server log
+ *
+ * Called from librdkafka's internal threads (no request context, no mctx in
+ * scope), so we pull the pre-rendered log prefix off the producer's opaque
+ * pointer (the `rlm_kafka_thread_t` we attached at thread_instantiate).
+ * Which librdkafka categories are actually emitted is controlled by the
+ * top-level `debug` config knob.
+ *
+ * @param[in] rk    producer handle.  `rd_kafka_opaque(rk)` is the
+ *                  `rlm_kafka_thread_t` set during thread_instantiate.
+ * @param[in] level syslog-style severity (0 emerg .. 7 debug).
+ * @param[in] fac   librdkafka facility / category, e.g. `BROKER`, `MSG`.
+ * @param[in] buf   pre-formatted message body.
+ */
+static void _kafka_log_cb(rd_kafka_t const *rk, int level, char const *fac, char const *buf)
+{
+       rlm_kafka_thread_t      *t = talloc_get_type_abort(rd_kafka_opaque(rk), rlm_kafka_thread_t);
+
+       switch (level) {
+       case 0:         /* LOG_EMERG   */
+       case 1:         /* LOG_ALERT   */
+       case 2:         /* LOG_CRIT    */
+       case 3:         /* LOG_ERR     */
+               ERROR("%s - %s: %s", t->inst->log_prefix, fac, buf);
+               break;
+
+       case 4:         /* LOG_WARNING */
+               WARN("%s - %s: %s", t->inst->log_prefix, fac, buf);
+               break;
+
+       case 5:         /* LOG_NOTICE  */
+       case 6:         /* LOG_INFO    */
+               INFO("%s - %s: %s", t->inst->log_prefix, fac, buf);
+               break;
+
+       default:        /* LOG_DEBUG and anything else */
+               DEBUG("%s - %s: %s", t->inst->log_prefix, fac, buf);
+               break;
+       }
+}
+
 /** Drain every byte currently pending on the self-pipe read end.
  *
  * librdkafka suppresses subsequent writes until the queue has been served,
@@ -990,6 +1035,27 @@ static conf_parser_t const module_config[] = {
        CONF_PARSER_TERMINATOR
 };
 
+/** Module-instance setup
+ *
+ * Pre-render the log prefix for librdkafka's log_cb (which fires from
+ * internal threads where no mctx is in scope), then register the cb on
+ * the shared producer conf.  Every per-thread `rd_kafka_conf_dup()` in
+ * `mod_thread_instantiate` inherits this registration, so all producers
+ * feed through the same bridge into the server log.
+ *
+ * @param[in] mctx module-instance ctx.
+ * @return 0 on success, -1 on error.
+ */
+static int mod_instantiate(module_inst_ctx_t const *mctx)
+{
+       rlm_kafka_t     *inst = talloc_get_type_abort(mctx->mi->data, rlm_kafka_t);
+
+       MEM(inst->log_prefix = talloc_typed_asprintf(inst, "rlm_kafka (%s)", mctx->mi->name));
+       rd_kafka_conf_set_log_cb(inst->kconf.conf, _kafka_log_cb);
+
+       return 0;
+}
+
 /** Bootstrap-phase setup.
  *
  * Just registers the `%kafka.produce()` xlat.  Topic declarations are
@@ -1023,6 +1089,7 @@ module_rlm_t rlm_kafka = {
                .thread_inst_size       = sizeof(rlm_kafka_thread_t),
                .config                 = module_config,
                .bootstrap              = mod_bootstrap,
+               .instantiate            = mod_instantiate,
                .thread_instantiate     = mod_thread_instantiate,
                .thread_detach          = mod_thread_detach
        },