flush_timeout = 5s
}
+
+#
+# A second kafka instance pointed at a dead address so the unreachable
+# and race tests can exercise the produce failure paths without
+# interfering with the broker the other tests rely on. Port 1 is
+# reserved (tcpmux) and virtually never bound, so librdkafka sees
+# immediate ECONNREFUSED on every connection attempt. Each topic's
+# `message.timeout.ms` caps how long librdkafka will retry before the
+# DR arrives as `_MSG_TIMED_OUT`; keep it short so the tests finish
+# quickly but long enough to cover a handful of reconnect cycles.
+#
+kafka kafka_unreachable {
+ server = "127.0.0.1:1"
+
+ topic {
+ #
+ # Topic used by the unreachable test. With no broker to
+ # talk to, a produce on this topic fails with a
+ # delivery-report timeout after 1000ms (1s).
+ #
+ freeradius-test-unreachable {
+ properties {
+ message.timeout.ms = "1000"
+ }
+ }
+
+ #
+ # Shorter timeout used by the race test, so the unlang
+ # `timeout` and librdkafka's `message.timeout.ms` expire
+ # close enough to each other that dr_msg_cb and
+ # kafka_xlat_produce_signal interleave unpredictably.
+ # The 300ms here is meant to match the `timeout 300ms`
+ # wrapper in the race test; change both together.
+ #
+ freeradius-test-race {
+ properties {
+ message.timeout.ms = "300"
+ }
+ }
+ }
+
+ flush_timeout = 1s
+}
--- /dev/null
+#
+# Input packet: a minimal Access-Request
+#
+Packet-Type = Access-Request
+User-Name = 'test'
+
+#
+# Expected answer: the test must finish with an Access-Accept
+#
+Packet-Type == Access-Accept
--- /dev/null
+#
+# Race dr_msg_cb against kafka_xlat_produce_signal.
+#
+# The `freeradius-test-race` topic is configured with
+# `message.timeout.ms = 300`. We wrap a batch of concurrent produces
+# in `timeout 300ms`, so the unlang cancel signal and librdkafka's
+# DR-timeout fire at approximately the same wall-clock moment. Both
+# paths run on the same worker thread's event loop, but the ordering
+# between them - "DR first, then cancel" vs "cancel first, then DR"
+# vs "DR marks runnable, cancel fires before resume gets dispatched"
+# - is determined entirely by epoll/kqueue timing.
+#
+# Running several produces concurrently raises the odds that each
+# interleaving is hit at least once across runs. Any UAF, double-free,
+# or lost pctx surfaces here under ASAN/TSAN. The only behavioural
+# assertion is that we reach test_pass without the worker wedging or
+# tripping an invariant check in the interpreter.
+#
+redundant {
+ #
+ # Eight produces in flight at once, all on the race topic and
+ # all bounded by the same 300ms unlang timeout.
+ #
+ timeout 300ms {
+ parallel {
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 1") }
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 2") }
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 3") }
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 4") }
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 5") }
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 6") }
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 7") }
+ group { %kafka_unreachable.produce('freeradius-test-race', "race 8") }
+ }
+ }
+ #
+ # Fallback child: `redundant` moves on to this group when the
+ # timeout section above fails, so the section as a whole still
+ # returns ok and processing continues to test_pass.
+ #
+ group {
+ ok
+ }
+}
+
+#
+# Reaching this point without the worker wedging is the assertion.
+#
+test_pass
--- /dev/null
+#
+# Input packet: a minimal Access-Request
+#
+Packet-Type = Access-Request
+User-Name = 'test'
+
+#
+# Expected answer: the test must finish with an Access-Accept
+#
+Packet-Type == Access-Accept
--- /dev/null
+#
+# Exercise the two error paths a produce can take when the broker is
+# dead: cancellation mid-flight, and delivery-report timeout.
+#
+# The `kafka_unreachable` instance in module.conf points at 127.0.0.1:1
+# and its `freeradius-test-unreachable` topic sets `message.timeout.ms`
+# to 1000, so every produce here is guaranteed to fail - the only
+# question is whether it fails via the signal path or via a natural DR.
+#
+
+#
+# Cancellation path.
+#
+# `parallel` starts two branches concurrently. Branch 1 calls
+# `%kafka_unreachable.produce()` which yields waiting for a DR that
+# won't arrive inside the timeout. Branch 2 immediately calls
+# `%cancel(0)` to terminate itself; combined with the wrapping
+# `timeout 200ms`, branch 1 gets an FR_SIGNAL_CANCEL delivered to
+# kafka_produce_signal long before the 1s `message.timeout.ms` fires.
+#
+# The signal handler must detach ctx->request without freeing the
+# ctx - librdkafka still owns the opaque and will call dr_msg_cb
+# when the message eventually times out. At that point dr_msg_cb
+# sees ctx->request == NULL and silently frees. No crash, no resume
+# into a freed request.
+#
+# If the worker survives this cleanly, the subsequent produces in
+# this test file will run on a healthy event loop - that's the real
+# assertion here, not anything about the rcode of the cancelled call.
+#
+redundant {
+ timeout 200ms {
+ parallel {
+ #
+ # Branch 1: yields inside the produce until it is
+ # cancelled; no DR can arrive within 200ms.
+ #
+ redundant {
+ %kafka_unreachable.produce('freeradius-test-unreachable', "cancelled mid-flight")
+ }
+ #
+ # Branch 2: cancels itself straight away.
+ #
+ group {
+ %cancel(0)
+ }
+ }
+ }
+ #
+ # Fallback child so the cancelled section does not fail the
+ # test; the rcode of the cancelled call is deliberately not
+ # asserted.
+ #
+ group {
+ ok
+ }
+}
+
+#
+# Delivery-report failure path.
+#
+# No timeout wrapper this time: let the produce run to completion so
+# the DR actually fires. librdkafka can't reach the broker, retries
+# until message.timeout.ms (1s), then delivers a DR with
+# RD_KAFKA_RESP_ERR__MSG_TIMED_OUT. kafka_produce_resume translates
+# that to a false return from the xlat.
+#
+if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) {
+ #
+ # The produce is expected to return false; a true result
+ # is a test failure.
+ #
+ test_fail
+}
+
+#
+# And one more, to prove the worker's produce path is still healthy
+# after both the cancel and the natural failure above. It is expected
+# to fail the same way, so a true result is again a test failure.
+#
+if (%kafka_unreachable.produce('freeradius-test-unreachable', "also doomed")) {
+ test_fail
+}
+
+#
+# Reached only if neither produce above reported success.
+#
+test_pass