rlm_kafka: delegate per-topic value/key parsing to call_env framework
The topic-subsection callback was walking value and key by hand with
cf_pair_find + call_env_parse_pair + call_env_parsed_add +
call_env_parsed_set_tmpl, reimplementing what call_env_parse() already
does when given a rules array pointed at a CONF_SECTION.
Replace the bespoke walker with a single static topic_env[] array and
a recursive call_env_parse() against the topic's subsection. The
framework handles pair lookup, tmpl compilation, offset writes, and
required/nullable enforcement.
Also rename the topic handle's rd_kafka_topic_t field and the loop
locals in kafka_topic_thread_handles (rkt/h/tc -> kt/topic_t/ktc).
Pure rename, no behaviour change.
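The table-driven approach described above can be sketched in plain C.
This is an illustration only: pair_t, section_t, rule_t, topic_env_t,
topic_rules, pair_find() and rules_parse() are hypothetical stand-ins,
not the FreeRADIUS CONF_SECTION or call_env types, and tmpl compilation
is left out. It shows only the three jobs the message attributes to the
framework: pair lookup, offset writes, and required enforcement.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins, NOT the FreeRADIUS CONF_SECTION / call_env types. */
typedef struct { char const *name; char const *value; } pair_t;
typedef struct { pair_t const *pairs; size_t num_pairs; } section_t;

typedef struct {
	char const *name;	/* pair to look up in the section */
	size_t      offset;	/* where to store the result in the output struct */
	bool        required;	/* fail the parse if the pair is missing */
} rule_t;

/* Output struct the rules write into (hypothetical). */
typedef struct { char const *key; char const *value; } topic_env_t;

/* One static rules array replaces the hand-written per-field walker. */
static rule_t const topic_rules[] = {
	{ "value", offsetof(topic_env_t, value), true  },
	{ "key",   offsetof(topic_env_t, key),   false },
};

/* Linear search for a named pair; returns its value or NULL. */
static char const *pair_find(section_t const *cs, char const *name)
{
	for (size_t i = 0; i < cs->num_pairs; i++) {
		if (strcmp(cs->pairs[i].name, name) == 0) return cs->pairs[i].value;
	}
	return NULL;
}

/* Apply every rule to the section. Returns 0 on success, -1 if a required pair is missing. */
static int rules_parse(void *out, section_t const *cs, rule_t const *rules, size_t num_rules)
{
	assert(out && cs && rules);

	for (size_t i = 0; i < num_rules; i++) {
		char const *v = pair_find(cs, rules[i].name);

		if (!v && rules[i].required) return -1;

		/* Write the pointer at the offset named by the rule (NULL if optional and absent). */
		memcpy((char *)out + rules[i].offset, &v, sizeof(v));
	}
	return 0;
}
```

Adding a field in this style means adding one row to the rules array
rather than another lookup/parse/store sequence in the callback.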
rlm_kafka: own flush_timeout as a module-level setting
flush_timeout controls how long thread_detach waits for in-flight
messages to drain, which is a policy decision that belongs to the
module using the kafka base library rather than to the library
itself. Move the CONF_PARSER entry into rlm_kafka's module_config
and store the value on rlm_kafka_t.
rlm_kafka: implement async producer with event-triggered I/O
Replaces the stub with a full async Kafka producer.
* Per-worker rd_kafka_t: each worker thread owns its own producer handle
so delivery report callbacks always run on the worker that initiated
the produce. No cross-thread wakeups, no resume races.
* Self-pipe I/O: librdkafka's main queue is wired to a self-pipe via
rd_kafka_queue_io_event_enable(). The pipe's read end sits in the
worker's event loop; kafka_fd_readable() drains it then polls the
producer with a zero-timeout rd_kafka_poll() loop to dispatch DRs
and broker errors.
* Module surface: kafka.produce method (topic/key/value/headers
call_env) plus %kafka.produce(topic, value) xlat. Both yield waiting
for the delivery report and resume with a rcode that distinguishes
transient failure (fail), permanent rejection (reject), timeout
(timeout), and success.
* Cancellation: signal handler detaches request from the in-flight
ctx rather than trying to recall the message (librdkafka has no
cancel API). dr_msg_cb sees request == NULL and silently frees.
Safe against dr_msg_cb racing because both fire on the same worker
thread.
* Shutdown: thread_detach flushes outstanding produces within the
configured flush_timeout, then drains any queued DRs before tearing
down the producer. Inflight ctxs whose DRs never arrive are walked
and their requests nulled out as a belt-and-suspenders.
The build system skips the module automatically, via the existing
src/lib/kafka/all.mk include, if librdkafka isn't available.
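The self-pipe half of the I/O bullet above can be sketched without
librdkafka. self_pipe_open() and self_pipe_drain() are hypothetical
helper names, not functions from the module. In the real module the
write end would be the fd handed to rd_kafka_queue_io_event_enable(),
and the drain would be followed by the zero-timeout rd_kafka_poll()
loop the message describes; neither call appears here.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Create a pipe with both ends non-blocking. Returns 0 on success, -1 on error. */
static int self_pipe_open(int fds[2])
{
	assert(fds);

	if (pipe(fds) < 0) return -1;

	for (int i = 0; i < 2; i++) {
		int flags = fcntl(fds[i], F_GETFL, 0);

		if ((flags < 0) || (fcntl(fds[i], F_SETFL, flags | O_NONBLOCK) < 0)) {
			close(fds[0]);
			close(fds[1]);
			return -1;
		}
	}
	return 0;
}

/*
 * Read the pipe until it is empty, so the event loop does not fire
 * again for wakeups that have already been seen.
 * Returns the number of bytes drained, or -1 on a real error.
 */
static ssize_t self_pipe_drain(int fd)
{
	char	buf[64];
	ssize_t	total = 0;

	for (;;) {
		ssize_t n = read(fd, buf, sizeof(buf));

		if (n > 0) {
			total += n;
			continue;
		}
		if (n == 0) return total;			/* write end closed */
		if (errno == EINTR) continue;			/* interrupted, retry */
		if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) return total;	/* pipe is empty */
		return -1;
	}
}
```

Draining before polling matters because several wakeup bytes can
accumulate while the worker is busy; one readable event then covers
all of them.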
lib/kafka: expose fr_kafka_conf_t and accessors to modules
Promote kafka_conf_from_cs() and kafka_topic_conf_from_cs() from static
inline to extern, and move the fr_kafka_conf_t / fr_kafka_topic_conf_t
typedefs into the public header so modules can parse kafka config
subsections and hand the resulting rd_kafka_conf_t to rd_kafka_new().
Also fixes an sbuff terminator construction bug in kafka_config_dflt()
where FR_SBUFF_TERM() was being applied to a runtime pointer (sizeof
produces pointer size, not string length), and adds subcs_size metadata
to the topic multi-subsection so cf_parse doesn't assert.
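The sizeof pitfall behind the terminator bug can be reproduced in a few
lines of C. LITERAL_LEN() is a hypothetical macro written for this
sketch, not the definition of FR_SBUFF_TERM(); it only shares the
property that it computes a length with sizeof and is therefore valid
for string literals and arrays but not for runtime pointers.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical macro: length of a string literal, computed at compile time. */
#define LITERAL_LEN(_s) (sizeof(_s) - 1)

/* Correct use: the argument is a literal, so sizeof sees the whole array. */
static size_t len_from_literal(void)
{
	return LITERAL_LEN("bootstrap.servers");
}

/* The buggy pattern: the argument is a pointer, so sizeof yields the pointer size. */
static size_t len_from_pointer_buggy(char const *s)
{
	return LITERAL_LEN(s);
}

/* One possible fix for runtime strings: measure the string itself. */
static size_t len_from_pointer_fixed(char const *s)
{
	assert(s);
	return strlen(s);
}
```

With a literal the macro gives the string length; with a pointer it
gives sizeof(char const *) - 1 regardless of what the pointer refers to.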
Nick Porter [Thu, 9 Apr 2026 13:05:43 +0000 (14:05 +0100)]
Only standard modules register xlats with their own name
Without this, if a virtual server, for example, has the same name as a
module which registers an xlat in its name, then during server
shutdown, removing the process module for the virtual server attempts
to unregister an xlat it doesn't own, which leads to a seg fault.
Nick Porter [Thu, 9 Apr 2026 13:55:16 +0000 (14:55 +0100)]
Parent coord_pair_reg off the list of registrations
As with coord_reg, the entry component of the registration will change
as additional modules register coord_pair, which conflicts with module
instance data protection.
Nick Porter [Wed, 8 Apr 2026 10:55:46 +0000 (11:55 +0100)]
Parent coordinator registrations off the list of registrations
When more than one module registers a coordinator, the "previous"
registration changes when the new one is added to the list. If the
registration is parented off the module instance data then that gets
protected - so a seg fault happens when the second registration is
added. Parenting the registration off the list removes this issue.
Alan T. DeKok [Sat, 4 Apr 2026 15:57:19 +0000 (11:57 -0400)]
add assume() macro
which is a hint to the compiler that a variable can have a
particular value. It's not an assertion, but it lets the compiler
know that it can make more optimizations based on the given
assumption.
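One common way to build such a hint, assuming a GCC-compatible
compiler, is shown below. This is a sketch and not necessarily the
definition the commit adds; bucket() is a hypothetical caller included
only to show usage.

```c
#include <assert.h>

/*
 * Hypothetical definition: if the condition is false the code path is
 * declared unreachable, so the optimiser may treat the condition as
 * always true. Unlike assert(), nothing is checked at run time, and
 * violating the assumption is undefined behaviour.
 */
#if defined(__GNUC__) || defined(__clang__)
#  define assume(_x) do { if (!(_x)) __builtin_unreachable(); } while (0)
#else
#  define assume(_x) ((void)0)	/* no hint available, compile to nothing */
#endif

/* Hypothetical caller: the compiler may rely on num_buckets being non-zero. */
static unsigned int bucket(unsigned int hash, unsigned int num_buckets)
{
	assume(num_buckets > 0);
	return hash % num_buckets;
}
```

Because the hint is unchecked, it suits conditions already guaranteed
by the caller; assert() remains the right tool where the condition
should be verified.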