]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
rlm_kafka: own flush_timeout as a module-level setting
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 01:15:13 +0000 (21:15 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 03:48:38 +0000 (23:48 -0400)
flush_timeout controls how long thread_detach waits for in-flight
messages to drain, which is a policy decision that belongs to the
module using the kafka base library rather than to the library
itself.  Move the CONF_PARSER entry into rlm_kafka's module_config
and store the value on rlm_kafka_t.

src/modules/rlm_kafka/rlm_kafka.c

index 5349d6ec183433b3896cc5290a9572d09283ac94..1acff527bbfe0366e19f86ac8daeed17ed838a41 100644 (file)
@@ -73,6 +73,8 @@ USES_APPLE_DEPRECATED_API
  */
 typedef struct {
        fr_kafka_conf_t                 kconf;          //!< parsed producer conf - MUST be first
+       fr_time_delta_t                 flush_timeout;  //!< How long `thread_detach` waits for in-flight
+                                                       //!< messages to drain before tearing down the producer.
 } rlm_kafka_t;
 
 /** Per-thread topic handle.
@@ -872,7 +874,7 @@ static int mod_thread_detach(module_thread_inst_ctx_t const *mctx)
        if (t->rk) {
                rd_kafka_resp_err_t     ferr;
 
-               ferr = rd_kafka_flush(t->rk, fr_time_delta_to_msec(t->inst->kconf.flush_timeout));
+               ferr = rd_kafka_flush(t->rk, fr_time_delta_to_msec(t->inst->flush_timeout));
                if (ferr != RD_KAFKA_RESP_ERR_NO_ERROR) {
                        WARN("rlm_kafka: flush timed out - %d messages remain in queue",
                             rd_kafka_outq_len(t->rk));
@@ -1006,6 +1008,14 @@ static conf_parser_t const module_config[] = {
        KAFKA_BASE_CONFIG,
        KAFKA_PRODUCER_CONFIG,
 
+       /*
+        *      How long to wait for in-flight messages to drain when a
+        *      worker tears down its producer handle.  Module-level (not
+        *      a librdkafka property) so we own the CONF_PARSER entry
+        *      rather than the kafka base library.
+        */
+       { FR_CONF_OFFSET("flush_timeout", rlm_kafka_t, flush_timeout), .dflt = "5s" },
+
        CONF_PARSER_TERMINATOR
 };