]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
rlm_kafka: accept a key as the middle xlat argument
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 17:53:59 +0000 (13:53 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 17:53:59 +0000 (13:53 -0400)
%kafka.produce now takes (topic, key, value) instead of (topic, value),
so xlat callers can pick a partition the same way the method form does
via a declared topic `key = ...`.  Zero-length octets (the literal
empty string, or an attribute that expands to nothing) mean "no key"
on the wire - librdkafka falls back to its configured partitioner.

Updated existing xlat tests to pass an explicit '' key, and
xlat.unlang now covers the non-empty case too: produce to
freeradius-test-xlat-alt with a `"xlat-key"` key and assert it
round-trips byte-for-byte through the broker.

src/modules/rlm_kafka/rlm_kafka.c
src/tests/modules/kafka/base.unlang
src/tests/modules/kafka/binary.unlang
src/tests/modules/kafka/invalid.unlang
src/tests/modules/kafka/race.unlang
src/tests/modules/kafka/unreachable.unlang
src/tests/modules/kafka/xlat.unlang

index b20ae1d1298f3b7919d11855b1e8cc05bb06d774..2de67b5aefcd608d8fbfd764609385dc751b0897 100644 (file)
@@ -744,11 +744,12 @@ static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t *
 
 static xlat_arg_parser_t const kafka_xlat_produce_args[] = {
        { .required = true, .concat = true, .type = FR_TYPE_STRING },   /* topic */
+       { .required = true, .concat = true, .type = FR_TYPE_OCTETS },   /* key (zero-length octets = no key on the wire) */
        { .required = true, .concat = true, .type = FR_TYPE_OCTETS },   /* value */
        XLAT_ARG_PARSER_TERMINATOR
 };
 
-/** `%kafka.produce(topic, value)` - runtime-named produce
+/** `%kafka.produce(topic, key, value)` - runtime-named produce
  *
  * Unlike the @ref mod_produce method form (which resolves topics at
  * config-parse time), the xlat takes the topic name as a runtime
@@ -756,12 +757,19 @@ static xlat_arg_parser_t const kafka_xlat_produce_args[] = {
  *
  * @code
  *     send Accounting-Response {
- *         if (!%kafka.produce('accounting', %json.encode(&request.[*]))) {
+ *         if (!%kafka.produce('accounting', %{Acct-Session-Id}, %json.encode(&request.[*]))) {
  *             reject
  *         }
  *     }
  * @endcode
  *
+ * `key` is optional: pass an empty string (or an unset attribute) to
+ * produce without a key - librdkafka then uses the configured
+ * partitioner to spread records across partitions.  When a non-empty
+ * key is supplied, librdkafka hashes it to pick a partition, so
+ * records with the same key end up on the same partition and preserve
+ * per-key produce order on the consumer side.
+ *
  * Returns a bool: `true` on successful delivery, `false` on failure.
  * The topic must have been declared in the module config (unknown
  * topics fail the xlat) so librdkafka per-topic settings continue to
@@ -778,9 +786,12 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d
        rlm_kafka_thread_t                      *t = talloc_get_type_abort(xctx->mctx->thread, rlm_kafka_thread_t);
        rlm_kafka_xlat_thread_inst_t const      *t_inst = xctx->thread;
        fr_value_box_t                          *topic_vb = fr_value_box_list_head(in);
-       fr_value_box_t                          *value_vb = fr_value_box_list_next(in, topic_vb);
+       fr_value_box_t                          *key_vb   = fr_value_box_list_next(in, topic_vb);
+       fr_value_box_t                          *value_vb = fr_value_box_list_next(in, key_vb);
        rd_kafka_topic_t                        *topic;
        rlm_kafka_msg_ctx_t                     *pctx;
+       uint8_t const                           *key = NULL;
+       size_t                                  key_len = 0;
 
        /*
         *      Fast path: a literal topic argument was pre-resolved to
@@ -793,8 +804,19 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d
                return XLAT_ACTION_FAIL;
        }
 
+       /*
+        *      Zero-length octets (e.g. `''` or an attribute expanding
+        *      to nothing) map to "no key" on the wire - librdkafka then
+        *      uses the configured partitioner instead of key-hash
+        *      partitioning.
+        */
+       if (key_vb->vb_length > 0) {
+               key = key_vb->vb_octets;
+               key_len = key_vb->vb_length;
+       }
+
        pctx = kafka_produce_enqueue(t, request, topic,
-                                    NULL, 0,
+                                    key, key_len,
                                     value_vb->vb_octets, value_vb->vb_length);
        if (unlikely(!pctx)) return XLAT_ACTION_FAIL;
 
index b0ef264f0875e619632714ef6a91618d02f0e25a..fa4ecfb16064bfb4de2b488a70b16dac60c7f06e 100644 (file)
@@ -10,7 +10,7 @@
 %exec('/bin/sh', '-c', '"$KAFKA_SUBSCRIBE" freeradius-test-base $ENV{OUTPUT_DIR}kafka-base.json 1')
 
 control.Tmp-String-0 := "hello from rlm_kafka base test"
-if (!%kafka.produce('freeradius-test-base', control.Tmp-String-0)) {
+if (!%kafka.produce('freeradius-test-base', '', control.Tmp-String-0)) {
        test_fail
 }
 
index 428b44aa9b711974e3fbf1ec2241707320bc6ab1..30d01012590ffddbf42053fbe4b0988e09106aa9 100644 (file)
@@ -12,7 +12,7 @@
 #  control so the xlat path exercises an attribute reference.
 #
 control.Tmp-Octets-0 := 0x48656c6c6f00576f726c6400ff00ff
-if (!%kafka.produce('freeradius-test-binary', control.Tmp-Octets-0)) {
+if (!%kafka.produce('freeradius-test-binary', '', control.Tmp-Octets-0)) {
        test_fail
 }
 
@@ -27,7 +27,7 @@ if (!ok) {
 #
 #  Empty payload via the xlat form.
 #
-if (!%kafka.produce('freeradius-test-binary', "")) {
+if (!%kafka.produce('freeradius-test-binary', '', "")) {
        test_fail
 }
 
index 99f3e2a5c4c6c6fd808640ce37db7ca805ba4499..467297c2257599c3f18fd6c8d3a5b02db5b4d201 100644 (file)
@@ -8,7 +8,7 @@
 #
 #  Empty string payload.
 #
-if (!%kafka.produce('freeradius-test-invalid', "")) {
+if (!%kafka.produce('freeradius-test-invalid', '', "")) {
        test_fail
 }
 
@@ -22,7 +22,7 @@ if (!%kafka.produce('freeradius-test-invalid', "")) {
 #  sixteen 1024-byte binary classes.
 #
 control.Tmp-Octets-4 := (octets)%str.rand("1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b")
-if (!%kafka.produce('freeradius-test-invalid', control.Tmp-Octets-4)) {
+if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-Octets-4)) {
        test_fail
 }
 
@@ -31,7 +31,7 @@ if (!%kafka.produce('freeradius-test-invalid', control.Tmp-Octets-4)) {
 #  string, all valid bytes.
 #
 control.Tmp-String-1 := "line1\nline2\ttabbed\r\n\"quoted\""
-if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-1)) {
+if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-1)) {
        test_fail
 }
 
@@ -39,7 +39,7 @@ if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-1)) {
 #  UTF-8 / high-bit characters.
 #
 control.Tmp-String-2 := "héllo — wörld 🚀"
-if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-2)) {
+if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-2)) {
        test_fail
 }
 
index 8693b01a7621210072879a40bd2ed21406a66e7e..d666464578c859d4f9510b6ce4dfc8553a2ff188 100644 (file)
 redundant {
        timeout 300ms {
                parallel {
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 1") }
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 2") }
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 3") }
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 4") }
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 5") }
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 6") }
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 7") }
-                       group { %kafka_unreachable.produce('freeradius-test-race', "race 8") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 1") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 2") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 3") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 4") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 5") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 6") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 7") }
+                       group { %kafka_unreachable.produce('freeradius-test-race', '', "race 8") }
                }
        }
        group {
index caf0af249e6614d92d7cdeae094abcba0af1f1e9..1d162a786c36985b863551c07dbe93e8de598b56 100644 (file)
@@ -32,7 +32,7 @@ redundant {
        timeout 200ms {
                parallel {
                        redundant {
-                               %kafka_unreachable.produce('freeradius-test-unreachable', "cancelled mid-flight")
+                               %kafka_unreachable.produce('freeradius-test-unreachable', '', "cancelled mid-flight")
                        }
                        group {
                                %cancel(0)
@@ -53,7 +53,7 @@ redundant {
 #  RD_KAFKA_RESP_ERR__MSG_TIMED_OUT.  kafka_produce_resume translates
 #  that to a false return from the xlat.
 #
-if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) {
+if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "doomed")) {
        test_fail
 }
 
@@ -61,7 +61,7 @@ if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) {
 #  And one more, to prove the worker's produce path is still healthy
 #  after both the cancel and the natural failure above.
 #
-if (%kafka_unreachable.produce('freeradius-test-unreachable', "also doomed")) {
+if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "also doomed")) {
        test_fail
 }
 
index 836fa659a07e500be8867e7cdcb3eb0d88c31ec9..7095739bf7ebb813e61256ee0477a1baca06fcf1 100644 (file)
@@ -1,5 +1,5 @@
 #
-#  Exercise the %kafka.produce(topic, value) xlat across several
+#  Exercise the %kafka.produce(topic, key, value) xlat across several
 #  produce cycles.  Each produce yields waiting for a delivery report;
 #  the resume path translates it to a bool (true = delivered).
 #
@@ -15,7 +15,7 @@
 #  before being handed to librdkafka.
 #
 control.Tmp-String-0 := "user=%{User-Name},packet=%{Packet-Type}"
-if (!%kafka.produce('freeradius-test-xlat', control.Tmp-String-0)) {
+if (!%kafka.produce('freeradius-test-xlat', '', control.Tmp-String-0)) {
        test_fail
 }
 
@@ -26,23 +26,25 @@ if (!%kafka.produce('freeradius-test-xlat', control.Tmp-String-0)) {
 #  without dropping any.  Unrolled because the unlang parser here doesn't
 #  accept `while` as a loop construct.
 #
-if (!%kafka.produce('freeradius-test-xlat', "sequential 1")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 2")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 3")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 4")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 5")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 1")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 2")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 3")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 4")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 5")) { test_fail }
 
 #
-#  Produce to a distinct topic to exercise a second topic handle cache entry.
+#  Produce to a distinct topic to exercise a second topic handle cache
+#  entry, and pass a non-empty key so we also cover the xlat key path.
+#  Same key bytes should round-trip unaltered on the consumer side.
 #
-if (!%kafka.produce('freeradius-test-xlat-alt', "alternate topic")) {
+if (!%kafka.produce('freeradius-test-xlat-alt', "xlat-key", "alternate topic")) {
        test_fail
 }
 
 #
 #  Empty value - librdkafka accepts zero-length payloads.
 #
-if (!%kafka.produce('freeradius-test-xlat', "")) {
+if (!%kafka.produce('freeradius-test-xlat', '', "")) {
        test_fail
 }
 
@@ -94,15 +96,19 @@ control.Tmp-Octets-6 := %base64.decode(control.Tmp-String-7)
 if (%length(control.Tmp-Octets-6) != 0) { test_fail }
 
 #
-#  Alternate topic: one record.
+#  Alternate topic: one record, value + key.
 #
 timeout 10s {
        map json %exec('/bin/sh', '-c', '"$KAFKA_WAIT_AND_CLEANUP" $ENV{OUTPUT_DIR}kafka-xlat-alt.json 1') {
                control.Tmp-String-8 := '$[0].value'
+               control.Tmp-String-9 := '$[0].key'
        }
 }
 
 control.Tmp-Octets-7 := %base64.decode(control.Tmp-String-8)
 if ((string)control.Tmp-Octets-7 != "alternate topic") { test_fail }
 
+control.Tmp-Octets-8 := %base64.decode(control.Tmp-String-9)
+if ((string)control.Tmp-Octets-8 != "xlat-key") { test_fail }
+
 test_pass