*/
typedef struct {
fr_kafka_conf_t kconf; //!< parsed producer conf - MUST be first
+ fr_time_delta_t flush_timeout; //!< How long `thread_detach` waits for in-flight
+ //!< messages to drain before tearing down the producer.
} rlm_kafka_t;
/** Per-thread topic handle.
if (t->rk) {
rd_kafka_resp_err_t ferr;
- ferr = rd_kafka_flush(t->rk, fr_time_delta_to_msec(t->inst->kconf.flush_timeout));
+ ferr = rd_kafka_flush(t->rk, fr_time_delta_to_msec(t->inst->flush_timeout));
if (ferr != RD_KAFKA_RESP_ERR_NO_ERROR) {
WARN("rlm_kafka: flush timed out - %d messages remain in queue",
rd_kafka_outq_len(t->rk));
KAFKA_BASE_CONFIG,
KAFKA_PRODUCER_CONFIG,
+ /*
+ * How long to wait for in-flight messages to drain when a
+ * worker tears down its producer handle. Module-level (not
+ * a librdkafka property) so we own the CONF_PARSER entry
+ * rather than the kafka base library.
+ */
+ { FR_CONF_OFFSET("flush_timeout", rlm_kafka_t, flush_timeout), .dflt = "5s" },
+
CONF_PARSER_TERMINATOR
};