fr_kafka_conf_t kconf; //!< parsed producer conf - MUST be first
fr_time_delta_t flush_timeout; //!< How long `thread_detach` waits for in-flight
//!< messages to drain before tearing down the producer.
+ char const *log_prefix; //!< pre-rendered `"rlm_kafka (<instance>)"`, used by
+ //!< librdkafka's log_cb which fires from internal
+ //!< threads with no mctx in scope. Built once in
+ //!< mod_instantiate so we don't reformat per line.
} rlm_kafka_t;
/** Per-thread topic handle.
ERROR("%s", rd_kafka_err2name(err), reason ? reason : "<UNKNOWN ERROR>");
}
+/** librdkafka log callback - bridge internal library messages into the server log
+ *
+ * Called from librdkafka's internal threads (no request context, no mctx in
+ * scope), so we pull the pre-rendered log prefix off the producer's opaque
+ * pointer (the `rlm_kafka_thread_t` we attached at thread_instantiate).
+ * Which librdkafka categories are actually emitted is controlled by the
+ * top-level `debug` config knob.
+ *
+ * @param[in] rk producer handle. `rd_kafka_opaque(rk)` is the
+ * `rlm_kafka_thread_t` set during thread_instantiate.
+ * @param[in] level syslog-style severity (0 emerg .. 7 debug).
+ * @param[in] fac librdkafka facility / category, e.g. `BROKER`, `MSG`.
+ * @param[in] buf pre-formatted message body.
+ */
+static void _kafka_log_cb(rd_kafka_t const *rk, int level, char const *fac, char const *buf)
+{
+ rlm_kafka_thread_t *t = talloc_get_type_abort(rd_kafka_opaque(rk), rlm_kafka_thread_t);
+
+ switch (level) {
+ case 0: /* LOG_EMERG */
+ case 1: /* LOG_ALERT */
+ case 2: /* LOG_CRIT */
+ case 3: /* LOG_ERR */
+ ERROR("%s - %s: %s", t->inst->log_prefix, fac, buf);
+ break;
+
+ case 4: /* LOG_WARNING */
+ WARN("%s - %s: %s", t->inst->log_prefix, fac, buf);
+ break;
+
+ case 5: /* LOG_NOTICE */
+ case 6: /* LOG_INFO */
+ INFO("%s - %s: %s", t->inst->log_prefix, fac, buf);
+ break;
+
+ default: /* LOG_DEBUG and anything else */
+ DEBUG("%s - %s: %s", t->inst->log_prefix, fac, buf);
+ break;
+ }
+}
+
/** Drain every byte currently pending on the self-pipe read end.
*
* librdkafka suppresses subsequent writes until the queue has been served,
CONF_PARSER_TERMINATOR
};
+/** Module-instance setup
+ *
+ * Pre-render the log prefix for librdkafka's log_cb (which fires from
+ * internal threads where no mctx is in scope), then register the cb on
+ * the shared producer conf. Every per-thread `rd_kafka_conf_dup()` in
+ * `mod_thread_instantiate` inherits this registration, so all producers
+ * feed through the same bridge into the server log.
+ *
+ * @param[in] mctx module-instance ctx.
+ * @return 0 on success, -1 on error.
+ */
+static int mod_instantiate(module_inst_ctx_t const *mctx)
+{
+ rlm_kafka_t *inst = talloc_get_type_abort(mctx->mi->data, rlm_kafka_t);
+
+ MEM(inst->log_prefix = talloc_typed_asprintf(inst, "rlm_kafka (%s)", mctx->mi->name));
+ rd_kafka_conf_set_log_cb(inst->kconf.conf, _kafka_log_cb);
+
+ return 0;
+}
+
/** Bootstrap-phase setup.
*
* Just registers the `%kafka.produce()` xlat. Topic declarations are
.thread_inst_size = sizeof(rlm_kafka_thread_t),
.config = module_config,
.bootstrap = mod_bootstrap,
+ .instantiate = mod_instantiate,
.thread_instantiate = mod_thread_instantiate,
.thread_detach = mod_thread_detach
},