]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
kafka: Basic unreachable test, and cancellation race
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 16:10:07 +0000 (12:10 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 16:10:07 +0000 (12:10 -0400)
src/tests/modules/kafka/module.conf
src/tests/modules/kafka/race.attrs [new file with mode: 0644]
src/tests/modules/kafka/race.unlang [new file with mode: 0644]
src/tests/modules/kafka/unreachable.attrs [new file with mode: 0644]
src/tests/modules/kafka/unreachable.unlang [new file with mode: 0644]

index 98b13c3ca2dea5992896fec174e6ab2b4d83467b..9f1ee6d98f60fcc39d5b28c360864aadccee2b9c 100644 (file)
@@ -85,3 +85,39 @@ kafka {
 
        flush_timeout = 5s
 }
+
+#
+#  A second kafka instance pointed at a dead address so the unreachable
+#  test can exercise the delivery-report failure path without
+#  interfering with the broker the other tests rely on.  Port 1 is
+#  reserved (tcpmux) and virtually never bound, so librdkafka sees
+#  immediate ECONNREFUSED on every produce attempt.  `message_timeout`
+#  caps how long librdkafka will retry before the DR arrives as
+#  `_MSG_TIMED_OUT`; keep it short so the test finishes quickly but
+#  long enough to cover a handful of reconnect cycles.
+#
+kafka kafka_unreachable {
+       server = "127.0.0.1:1"
+
+       topic {
+               freeradius-test-unreachable {
+                       properties {
+                               message.timeout.ms = "1000"
+                       }
+               }
+
+               #
+               #  Shorter timeout used by the race test, so the unlang
+               #  `timeout` and librdkafka's `message.timeout.ms` expire
+               #  close enough to each other that dr_msg_cb and
+               #  kafka_xlat_produce_signal interleave unpredictably.
+               #
+               freeradius-test-race {
+                       properties {
+                               message.timeout.ms = "300"
+                       }
+               }
+       }
+
+       flush_timeout = 1s
+}
diff --git a/src/tests/modules/kafka/race.attrs b/src/tests/modules/kafka/race.attrs
new file mode 100644 (file)
index 0000000..a4ee6fb
--- /dev/null
@@ -0,0 +1,10 @@
+#
+#  Input packet
+#
+Packet-Type = Access-Request
+User-Name = 'test'
+
+#
+#  Expected answer
+#
+Packet-Type == Access-Accept
diff --git a/src/tests/modules/kafka/race.unlang b/src/tests/modules/kafka/race.unlang
new file mode 100644 (file)
index 0000000..8693b01
--- /dev/null
@@ -0,0 +1,37 @@
+#
+#  Race dr_msg_cb against kafka_xlat_produce_signal.
+#
+#  The `freeradius-test-race` topic is configured with
+#  `message.timeout.ms = 300`.  We wrap a batch of concurrent produces
+#  in `timeout 300ms`, so the unlang cancel signal and librdkafka's
+#  DR-timeout fire at approximately the same wall-clock moment.  Both
+#  paths run on the same worker thread's event loop, but the ordering
+#  between them - "DR first, then cancel" vs "cancel first, then DR"
+#  vs "DR marks runnable, cancel fires before resume gets dispatched"
+#  - is determined entirely by epoll/kqueue timing.
+#
+#  Running enough concurrent produces here means at least one of them
+#  hits each interleaving.  Any UAF, double-free, or lost pctx
+#  surfaces here under ASAN/TSAN.  The only behavioural assertion is
+#  that we reach test_pass without the worker wedging or tripping an
+#  invariant check in the interpreter.
+#
+redundant {
+       timeout 300ms {
+               parallel {
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 1") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 2") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 3") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 4") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 5") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 6") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 7") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', "race 8") }
+               }
+       }
+       group {
+               ok
+       }
+}
+
+test_pass
diff --git a/src/tests/modules/kafka/unreachable.attrs b/src/tests/modules/kafka/unreachable.attrs
new file mode 100644 (file)
index 0000000..a4ee6fb
--- /dev/null
@@ -0,0 +1,10 @@
+#
+#  Input packet
+#
+Packet-Type = Access-Request
+User-Name = 'test'
+
+#
+#  Expected answer
+#
+Packet-Type == Access-Accept
diff --git a/src/tests/modules/kafka/unreachable.unlang b/src/tests/modules/kafka/unreachable.unlang
new file mode 100644 (file)
index 0000000..caf0af2
--- /dev/null
@@ -0,0 +1,68 @@
+#
+#  Exercise the two error paths a produce can take when the broker is
+#  dead: cancellation mid-flight, and delivery-report timeout.
+#
+#  The `kafka_unreachable` module instance in module.conf points at
+#  127.0.0.1:1 with a 1s message_timeout, so every produce here is
+#  guaranteed to fail - the only question is whether it fails via the
+#  signal path or via a natural DR.
+#
+
+#
+#  Cancellation path.
+#
+#  `parallel` starts two branches concurrently.  Branch 1 calls
+#  `%kafka_unreachable.produce()` which yields waiting for a DR that
+#  won't arrive inside the timeout.  Branch 2 immediately calls
+#  `%cancel(0)` to terminate itself; combined with the wrapping
+#  `timeout 200ms`, branch 1 gets an FR_SIGNAL_CANCEL delivered to
+#  kafka_produce_signal long before the 1s message_timeout fires.
+#
+#  The signal handler must detach ctx->request without freeing the
+#  ctx - librdkafka still owns the opaque and will call dr_msg_cb
+#  when the message eventually times out.  At that point dr_msg_cb
+#  sees ctx->request == NULL and silently frees.  No crash, no resume
+#  into a freed request.
+#
+#  If the worker survives this cleanly, the subsequent produces in
+#  this test file will run on a healthy event loop - that's the real
+#  assertion here, not anything about the rcode of the cancelled call.
+#
+redundant {
+       timeout 200ms {
+               parallel {
+                       redundant {
+                               %kafka_unreachable.produce('freeradius-test-unreachable', "cancelled mid-flight")
+                       }
+                       group {
+                               %cancel(0)
+                       }
+               }
+       }
+       group {
+               ok
+       }
+}
+
+#
+#  Delivery-report failure path.
+#
+#  No timeout wrapper this time: let the produce run to completion so
+#  the DR actually fires.  librdkafka can't reach the broker, retries
+#  until message.timeout.ms (1s), then delivers a DR with
+#  RD_KAFKA_RESP_ERR__MSG_TIMED_OUT.  kafka_produce_resume translates
+#  that to a false return from the xlat.
+#
+if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) {
+       test_fail
+}
+
+#
+#  And one more, to prove the worker's produce path is still healthy
+#  after both the cancel and the natural failure above.
+#
+if (%kafka_unreachable.produce('freeradius-test-unreachable', "also doomed")) {
+       test_fail
+}
+
+test_pass