rlm_kafka: unbox async opaques with talloc_get_type_abort
xctx->inst / xctx->thread / msg->_private are all void pointers that
we know at the call-site should point back at our typed talloc chunks.
Using talloc_get_type_abort turns a subtle type-system hole into a
loud abort at the exact callsite if the opaque ever gets crossed over,
instead of a mystery crash deep in the function body.
The DR callback keeps its NULL-guard first - a NULL opaque is the
documented signal for fire-and-forget produces, so it's part of the
contract, not an error worth asserting on.
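Here is a self-contained sketch of the pattern. talloc_get_type_abort() compares a chunk's recorded type name against the expected type before returning the pointer. To avoid a libtalloc dependency, this example uses chunk_hdr_t, get_type_abort() and GET_TYPE_ABORT() as hypothetical stand-ins for that check. dr_cb() is a simplified delivery-report callback, not the module's real one.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Hypothetical stand-in for a talloc chunk header: each typed
 * allocation records its C type name, much as talloc stores the
 * chunk name that talloc_get_type_abort() compares against.
 */
typedef struct {
	const char	*type_name;
} chunk_hdr_t;

typedef struct {
	chunk_hdr_t	hdr;		/* first member, so the cast to chunk_hdr_t * is valid */
	int		pending;
} thread_ctx_t;

/* Abort loudly at the call site if the opaque is not the expected type. */
static void *get_type_abort(void *opaque, const char *want, const char *where)
{
	chunk_hdr_t *hdr = opaque;

	if (!opaque || strcmp(hdr->type_name, want) != 0) {
		fprintf(stderr, "%s: expected %s, got %s\n", where, want,
			opaque ? hdr->type_name : "NULL");
		abort();
	}
	return opaque;
}

#define GET_TYPE_ABORT(_ptr, _type) \
	((_type *)get_type_abort((_ptr), #_type, __func__))

/*
 * NULL is a legitimate fire-and-forget signal, so it is checked
 * before unboxing. Returns 1 if the report was attributed to a
 * thread ctx, 0 if it was fire-and-forget.
 */
static int dr_cb(void *opaque)
{
	thread_ctx_t *t;

	if (!opaque) return 0;

	t = GET_TYPE_ABORT(opaque, thread_ctx_t);
	t->pending--;
	return 1;
}
```

Passing any other header-carrying type to dr_cb() stops the process inside dr_cb() with a message naming the mismatch. The failure is not deferred to the first field access.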
rlm_kafka: bridge librdkafka log output into the server log
Register an rd_kafka_conf_set_log_cb on the shared producer conf in
mod_instantiate. Every per-thread rd_kafka_conf_dup inherits it, so
broker errors, protocol traces, and any categories enabled via the
top-level `debug` knob now feed through the server's ERROR / WARN /
INFO / DEBUG macros with a pre-rendered "rlm_kafka (<instance>)"
prefix instead of going to librdkafka's default stderr sink.
The callback runs on librdkafka's internal threads, so no mctx is in
scope; we stash the prefix on rlm_kafka_t at instantiate time and
reach for it via the producer's opaque (rlm_kafka_thread_t) on each
line.
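librdkafka passes the log callback a syslog level. The callback signature is `void (*)(const rd_kafka_t *rk, int level, const char *fac, const char *buf)`. Most of the bridge therefore comes down to folding eight syslog levels into four server macros. The sketch below assumes one plausible folding. srv_log_lvl_t, kafka_level_to_srv() and render_prefix() are hypothetical, and the module's actual mapping may differ.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <syslog.h>	/* LOG_EMERG (0) .. LOG_DEBUG (7) */

/* Hypothetical severities standing in for ERROR / WARN / INFO / DEBUG. */
typedef enum {
	SRV_LOG_ERROR,
	SRV_LOG_WARN,
	SRV_LOG_INFO,
	SRV_LOG_DEBUG
} srv_log_lvl_t;

/* Assumed folding of syslog levels onto the four server log macros. */
static srv_log_lvl_t kafka_level_to_srv(int level)
{
	if (level <= LOG_ERR) return SRV_LOG_ERROR;	/* EMERG, ALERT, CRIT, ERR */
	if (level == LOG_WARNING) return SRV_LOG_WARN;
	if (level <= LOG_INFO) return SRV_LOG_INFO;	/* NOTICE, INFO */
	return SRV_LOG_DEBUG;
}

/* Render the "rlm_kafka (<instance>)" prefix once, at instantiate time. */
static int render_prefix(char *buf, size_t len, const char *instance)
{
	return snprintf(buf, len, "rlm_kafka (%s)", instance);
}
```

Rendering the prefix once keeps the per-line work on librdkafka's threads down to a level lookup and a single formatted write.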
tests/multi-server: put kafka-produce back in the CI short suite
The client-cache fix in b56deabae7 ("Create different client cache
lists for TCP and UDP") addresses the underlying reason the
kafka-produce test was timing out in CI - the load-generator's own
Status-Server client lookup was failing the same way as
proxy-multihop-accept's. Rename short.test.yml back to
short.ci.test.yml so it runs under test.multi-server.ci again.
tests/multi-server: drop kafka-produce from the CI short suite
The kafka-produce multi-server test runs green locally against a
docker-compose redpanda, but on the shared GitHub Actions runners the
redpanda container fails to reach a healthy state reliably, even with
Redpanda's own dev-container preset. Rather than leave a perennially
red CI job, rename short.ci.test.yml to short.test.yml so the test
runs from `make test.multi-server` but not from
`test.multi-server.ci`. Local runs and future dedicated kafka
infrastructure can still exercise it.
CI rounds 1-2 showed two different redpanda startup failures:
- In the ci.yml / ci-sanitizers.yml `services:` stanzas, seastar
exits EINVAL on the self-hosted runners because GitHub Actions
services give us no way to override the default command line.
There's no good workaround via `options:`, so drop the redpanda
service container, remove the kafka_test_server passthrough, and
let the kafka module tests skip cleanly (the existing
kafka_require_test_server gate already handles the empty-server
case). The multi-server kafka-produce test still exercises the
full produce path end-to-end.
- The multi-server docker-compose had a hand-rolled list of redpanda
start flags (`--overprovisioned --smp=1 ... --unsafe-bypass-fsync`)
which still failed to boot on the shared runner. Replace it with
Redpanda's official single-node preset `--mode dev-container`,
which sets the right seastar probing and IO defaults for a
containerised CI environment, and keep only the overrides we
actually need on top (smp/memory/node-id and the advertised
listener address).
ci: pull redpanda from its own registry and give it a longer health window
Two things tripped up the first CI run on developer/arr2036:
- The ci.yml and ci-sanitizers.yml workflows pulled redpanda through
the FreeRADIUS internal docker mirror (docker.internal.networkradius.com),
but redpandadata/redpanda is not mirrored there, so every job with a
redpanda service container died at 'Initialize containers'. Pull
directly from docker.redpanda.com, matching what the multi-server
docker-compose already does.
- The multi-server redpanda service had a 60s start window and 30s of
retries. On a busy self-hosted runner that's marginal - we saw
kafka-produce-short_ci fail the compose-up health gate. Bump the
start_period to 120s and extend retries so we allow up to ~4 minutes
for the broker to come up.
tests/kafka: cover method dispatch forms, keys, binary and edge payloads
Extend the kafka test harness to exercise:
- method form with explicit name2 (kafka.produce / send / recv .topic)
- method form with implicit name2 (bare 'kafka' inside a recv section
that matches a topic named after the packet type)
- keyed produces (deterministic partitioning by key attr)
- binary payloads with embedded NULs and high-bit bytes
- edge-case xlat inputs (empty string, 16 KiB payload, embedded
control characters, UTF-8 / multibyte)
Topics referenced by the new tests are declared in module.conf so
unknown-topic handling continues to fail at parse time.
rlm_kafka: tighten error handling and xlat register ctx
A grab-bag of cleanups surfaced while refactoring the module:
- Trust MEM() for talloc-only failures in kafka_topic_thread_handles
(rb_tree_alloc, rd_kafka_topic_conf_dup).
- Duplicate topic handle is now an fr_cond_assert_msg - it cannot
happen if the declared-topic tree was built correctly.
- Drop the redundant kafka.produce xlat NULL-arg guard; the xlat
arg parser already enforces required=true.
- Register the produce xlat against mi->boot rather than mi->data -
xlat registration happens in bootstrap, before mi->data is
allocated and mprotected.
- Move xlat_arg_parser_t below the signal callback so the xlat
function and its args sit together.
- Drop the 'rlm_kafka:' prefix from logger calls; the logging layer
already tags module-originated messages.
Kafka payloads and keys are opaque byte strings on the wire. Typing
value/key as FR_TYPE_STRING worked accidentally because fr_value_box_t
carries an explicit length, but it invited NUL-termination or UTF-8
assumptions to creep in from intermediate tmpl expansion.
Switch both the per-topic call_env rules and the xlat value argument
to FR_TYPE_OCTETS, and read .vb_octets instead of .vb_strvalue on the
produce paths. As a bonus, an integer-typed key attribute now
serialises in network byte order - matching what other Kafka clients
do - so the same numeric key hashes to the same partition regardless
of producer.
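The sketch below shows what network byte order means for a 32-bit key. It is independent of FreeRADIUS's own value-box casting, and key_from_uint32() is hypothetical. The same integer always yields the same four key bytes. A partitioner that hashes the key bytes therefore sees identical input from every producer, whatever the host endianness.

```c
#include <arpa/inet.h>	/* htonl */
#include <assert.h>
#include <stdint.h>
#include <string.h>

/* Serialise a 32-bit key big-endian (network byte order) into out. */
static size_t key_from_uint32(uint8_t out[4], uint32_t key)
{
	uint32_t be = htonl(key);

	memcpy(out, &be, sizeof(be));
	return sizeof(be);
}
```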
rlm_kafka: delegate per-topic value/key parsing to call_env framework
The topic-subsection callback was walking value and key by hand with
cf_pair_find + call_env_parse_pair + call_env_parsed_add +
call_env_parsed_set_tmpl, reimplementing what call_env_parse() already
does when given a rules array pointed at a CONF_SECTION.
Replace the bespoke walker with a single static topic_env[] array and
a recursive call_env_parse() against the topic's subsection. The
framework handles pair lookup, tmpl compilation, offset writes, and
required/nullable enforcement.
Also renames the topic handle's rd_kafka_topic_t field and the loop
locals in kafka_topic_thread_handles (rkt/h/tc -> kt/topic_t/ktc).
Pure rename, no behaviour change.
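The table-driven approach can be sketched in plain C. This is not the call_env API. env_rule_t, pair_t, topic_env_t and env_parse() are hypothetical stand-ins for the idea the framework implements. A static rules array records where each parsed pair lands (via offsetof()) and whether it is required, and one generic walker replaces per-field code.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical parsed form of one topic subsection. */
typedef struct {
	const char	*value;
	const char	*key;
} topic_env_t;

/* One rule: pair name, destination offset, and whether it is mandatory. */
typedef struct {
	const char	*name;
	size_t		offset;
	int		required;
} env_rule_t;

/* Stand-in for a CONF_PAIR. */
typedef struct {
	const char	*attr;
	const char	*val;
} pair_t;

static const env_rule_t topic_env[] = {
	{ "value", offsetof(topic_env_t, value), 1 },
	{ "key",   offsetof(topic_env_t, key),   0 },
	{ NULL, 0, 0 }
};

/* Walk the rules, writing each found pair at its offset; -1 if a required pair is missing. */
static int env_parse(void *out, const env_rule_t *rules, const pair_t *pairs, size_t npairs)
{
	for (const env_rule_t *r = rules; r->name; r++) {
		const char **dst = (const char **)((char *)out + r->offset);

		*dst = NULL;
		for (size_t i = 0; i < npairs; i++) {
			if (strcmp(pairs[i].attr, r->name) == 0) {
				*dst = pairs[i].val;
				break;
			}
		}
		if (!*dst && r->required) return -1;
	}
	return 0;
}
```

Adding a field means adding one row to topic_env[]. The walker itself does not change.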
rlm_kafka: own flush_timeout as a module-level setting
flush_timeout controls how long thread_detach waits for in-flight
messages to drain, which is a policy decision that belongs to the
module using the kafka base library rather than to the library
itself. Move the CONF_PARSER entry into rlm_kafka's module_config
and store the value on rlm_kafka_t.
rlm_kafka: implement async producer with event-triggered I/O
Replaces the stub with a full async Kafka producer.
* Per-worker rd_kafka_t: each worker thread owns its own producer handle
so delivery report callbacks always run on the worker that initiated
the produce. No cross-thread wakeups, no resume races.
* Self-pipe I/O: librdkafka's main queue is wired to a self-pipe via
rd_kafka_queue_io_event_enable(). The pipe's read end sits in the
worker's event loop; kafka_fd_readable() drains it then polls the
producer with a zero-timeout rd_kafka_poll() loop to dispatch DRs
and broker errors.
* Module surface: kafka.produce method (topic/key/value/headers
call_env) plus %kafka.produce(topic, value) xlat. Both yield waiting
for the delivery report and resume with an rcode that distinguishes
transient failure (fail), permanent rejection (reject), timeout
(timeout), and success.
* Cancellation: signal handler detaches request from the in-flight
ctx rather than trying to recall the message (librdkafka has no
cancel API). dr_msg_cb sees request == NULL and silently frees.
Safe against dr_msg_cb racing because both fire on the same worker
thread.
* Shutdown: thread_detach flushes outstanding produces within the
configured flush_timeout, then drains any queued DRs before tearing
down the producer. In-flight ctxs whose DRs never arrive are walked
and their requests nulled out as a belt-and-suspenders measure.
If librdkafka isn't available, the module is automatically excluded
from the build via the existing src/lib/kafka/all.mk include.
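The self-pipe wake-up can be sketched with plain POSIX calls. rd_kafka_queue_io_event_enable() makes librdkafka write its payload to the given fd whenever an element is enqueued on a previously empty queue. The worker's event loop watches the read end, and the readable handler drains every pending byte before polling. self_pipe_open() and self_pipe_drain() are hypothetical helpers, not the module's code.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Create the pipe and make the read end non-blocking so draining stops at EAGAIN. */
static int self_pipe_open(int fds[2])
{
	int flags;

	if (pipe(fds) < 0) return -1;
	flags = fcntl(fds[0], F_GETFL);
	if (flags < 0 || fcntl(fds[0], F_SETFL, flags | O_NONBLOCK) < 0) return -1;
	return 0;
}

/* Consume every queued wake-up byte; returns the count, or -1 on a real error. */
static ssize_t self_pipe_drain(int rfd)
{
	char	buf[64];
	ssize_t	total = 0;

	for (;;) {
		ssize_t n = read(rfd, buf, sizeof(buf));

		if (n > 0) { total += n; continue; }
		if (n == 0) return total;			/* write end closed */
		if (errno == EINTR) continue;
		if (errno == EAGAIN || errno == EWOULDBLOCK) return total;
		return -1;
	}
}
```

In the module, the drain would be followed by a zero-timeout loop such as `while (rd_kafka_poll(rk, 0) > 0);`, so every queued DR and error is dispatched before control returns to the event loop. Both steps run on the worker that owns the producer, so no locking is needed.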
lib/kafka: expose fr_kafka_conf_t and accessors to modules
Promote kafka_conf_from_cs() and kafka_topic_conf_from_cs() from static
inline to extern, and move the fr_kafka_conf_t / fr_kafka_topic_conf_t
typedefs into the public header so modules can parse kafka config
subsections and hand the resulting rd_kafka_conf_t to rd_kafka_new().
Also fixes an sbuff terminator construction bug in kafka_config_dflt()
where FR_SBUFF_TERM() was being applied to a runtime pointer (sizeof
produces pointer size, not string length), and adds subcs_size metadata
to the topic multi-subsection so cf_parse doesn't assert.
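The terminator bug is the classic sizeof-on-a-pointer trap. The sketch assumes a macro that, like helpers built for string literals, derives the length from sizeof. LITERAL_LEN(), runtime_term and term_len() are hypothetical. Measuring with strlen() is one way to handle runtime strings, not necessarily the fix applied in kafka_config_dflt().

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* sizeof a char array includes the trailing NUL, hence the -1. */
#define LITERAL_LEN(_s)	(sizeof(_s) - 1)

static const char *runtime_term = "\r\n";	/* a pointer, not an array */

/* Runtime strings must be measured, not sized. */
static size_t term_len(const char *s)
{
	return strlen(s);
}
```

LITERAL_LEN("\r\n") gives 2, but LITERAL_LEN(runtime_term) gives sizeof(char *) - 1, which is 7 on a typical 64-bit build.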
Nick Porter [Thu, 9 Apr 2026 13:05:43 +0000 (14:05 +0100)]
Only standard modules register xlats with their own name
Without this, if a virtual server (for example) has the same name as a
module that registers an xlat under its own name, then during server
shutdown, removing the virtual server's process module attempts to
unregister an xlat it doesn't own, which leads to a seg fault.
Nick Porter [Thu, 9 Apr 2026 13:55:16 +0000 (14:55 +0100)]
Parent coord_pair_reg off the list of registrations
As with coord_reg, the entry component of the registration will change
as additional modules register coord_pair, which conflicts with module
instance data protection.
Nick Porter [Wed, 8 Apr 2026 10:55:46 +0000 (11:55 +0100)]
Parent coordinator registrations off the list of registrations
When more than one module registers a coordinator, the "previous"
registration changes when the new one is added to the list. If the
registration is parented off the module instance data, that memory is
then protected, so a seg fault occurs when the second registration is
added. Parenting the registration off the list removes this issue.
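A minimal intrusive list shows why the previous registration changes. reg_t, reg_list_t and reg_append() are hypothetical and not the server's list API. Appending a new entry stores into the old tail's next pointer. That store faults if the old entry sits in memory that has since been mprotected.

```c
#include <assert.h>
#include <stddef.h>

typedef struct reg_s reg_t;
struct reg_s {
	reg_t		*prev;
	reg_t		*next;
	const char	*module;
};

typedef struct {
	reg_t	*head;
	reg_t	*tail;
} reg_list_t;

/* Append reg; note the write into the existing tail entry. */
static void reg_append(reg_list_t *list, reg_t *reg)
{
	reg->next = NULL;
	reg->prev = list->tail;
	if (list->tail) {
		list->tail->next = reg;		/* mutates an earlier registration */
	} else {
		list->head = reg;
	}
	list->tail = reg;
}
```

When entries are owned by the list's own context, that write always targets writable memory, whichever module's instance data is protected.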
Alan T. DeKok [Sat, 4 Apr 2026 15:57:19 +0000 (11:57 -0400)]
add assume() macro
which tells the compiler that a condition always holds (for example,
that a variable has a particular value). It's not an assertion and
nothing is checked at runtime, but the compiler may use the
assumption to optimise more aggressively.
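Below is a sketch of one common way to define such a macro with GCC and clang, assuming __builtin_unreachable() is available. The server's actual definition may differ; MSVC, for instance, provides __assume. sum_u8() is a hypothetical caller. If the condition is ever false at runtime, behaviour is undefined, which is exactly what licenses the optimiser to rely on it.

```c
#include <assert.h>
#include <stddef.h>

#if defined(__GNUC__) || defined(__clang__)
/* Reaching __builtin_unreachable() is UB, so the compiler may treat _x as true. */
#  define assume(_x)	do { if (!(_x)) __builtin_unreachable(); } while (0)
#else
#  define assume(_x)	((void)0)
#endif

/* Sum a buffer whose length the caller guarantees is a multiple of 4. */
static unsigned sum_u8(const unsigned char *p, size_t len)
{
	unsigned total = 0;

	assume(len % 4 == 0);	/* may let the compiler skip remainder handling when unrolling */
	for (size_t i = 0; i < len; i++) total += p[i];
	return total;
}
```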