From: Arran Cudbard-Bell Date: Wed, 22 Apr 2026 17:53:59 +0000 (-0400) Subject: rlm_kafka: accept a key as the middle xlat argument X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=30867b7535d4745bb00cfbc0aaa055b7eb2aa2b4;p=thirdparty%2Ffreeradius-server.git rlm_kafka: accept a key as the middle xlat argument %kafka.produce now takes (topic, key, value) instead of (topic, value), so xlat callers can pick a partition the same way the method form does via a declared topic `key = ...`. Zero-length octets (the literal empty string, or an attribute that expands to nothing) mean "no key" on the wire - librdkafka falls back to its configured partitioner. Updated existing xlat tests to pass an explicit '' key, and xlat.unlang now covers the non-empty case too: produce to freeradius-test-xlat-alt with a `"xlat-key"` key and assert it round-trips byte-for-byte through the broker. --- diff --git a/src/modules/rlm_kafka/rlm_kafka.c b/src/modules/rlm_kafka/rlm_kafka.c index b20ae1d1298..2de67b5aefc 100644 --- a/src/modules/rlm_kafka/rlm_kafka.c +++ b/src/modules/rlm_kafka/rlm_kafka.c @@ -744,11 +744,12 @@ static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t * static xlat_arg_parser_t const kafka_xlat_produce_args[] = { { .required = true, .concat = true, .type = FR_TYPE_STRING }, /* topic */ + { .required = true, .concat = true, .type = FR_TYPE_OCTETS }, /* key (zero-length octets = no key on the wire) */ { .required = true, .concat = true, .type = FR_TYPE_OCTETS }, /* value */ XLAT_ARG_PARSER_TERMINATOR }; -/** `%kafka.produce(topic, value)` - runtime-named produce +/** `%kafka.produce(topic, key, value)` - runtime-named produce * * Unlike the @ref mod_produce method form (which resolves topics at * config-parse time), the xlat takes the topic name as a runtime @@ -756,12 +757,19 @@ static xlat_arg_parser_t const kafka_xlat_produce_args[] = { * * @code * send Accounting-Response { - * if (!%kafka.produce('accounting', %json.encode(&request.[*]))) { + * if (!%kafka.produce('accounting', %{Acct-Session-Id}, %json.encode(&request.[*]))) { * reject * } * } * @endcode * + * `key` is optional: pass an empty string (or an unset attribute) to + * produce without a key - librdkafka then uses the configured + * partitioner to spread records across partitions. When a non-empty + * key is supplied, librdkafka hashes it to pick a partition, so + * records with the same key end up on the same partition and preserve + * per-key produce order on the consumer side. + * * Returns a bool: `true` on successful delivery, `false` on failure. * The topic must have been declared in the module config (unknown * topics fail the xlat) so librdkafka per-topic settings continue to @@ -778,9 +786,12 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d rlm_kafka_thread_t *t = talloc_get_type_abort(xctx->mctx->thread, rlm_kafka_thread_t); rlm_kafka_xlat_thread_inst_t const *t_inst = xctx->thread; fr_value_box_t *topic_vb = fr_value_box_list_head(in); - fr_value_box_t *value_vb = fr_value_box_list_next(in, topic_vb); + fr_value_box_t *key_vb = fr_value_box_list_next(in, topic_vb); + fr_value_box_t *value_vb = fr_value_box_list_next(in, key_vb); rd_kafka_topic_t *topic; rlm_kafka_msg_ctx_t *pctx; + uint8_t const *key = NULL; + size_t key_len = 0; /* * Fast path: a literal topic argument was pre-resolved to @@ -793,8 +804,19 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d return XLAT_ACTION_FAIL; } + /* + * Zero-length octets (e.g. `''` or an attribute expanding + * to nothing) map to "no key" on the wire - librdkafka then + * uses the configured partitioner instead of key-hash + * partitioning. + */ + if (key_vb->vb_length > 0) { + key = key_vb->vb_octets; + key_len = key_vb->vb_length; + } + pctx = kafka_produce_enqueue(t, request, topic, - NULL, 0, + key, key_len, value_vb->vb_octets, value_vb->vb_length); if (unlikely(!pctx)) return XLAT_ACTION_FAIL; diff --git a/src/tests/modules/kafka/base.unlang b/src/tests/modules/kafka/base.unlang index b0ef264f087..fa4ecfb1606 100644 --- a/src/tests/modules/kafka/base.unlang +++ b/src/tests/modules/kafka/base.unlang @@ -10,7 +10,7 @@ %exec('/bin/sh', '-c', '"$KAFKA_SUBSCRIBE" freeradius-test-base $ENV{OUTPUT_DIR}kafka-base.json 1') control.Tmp-String-0 := "hello from rlm_kafka base test" -if (!%kafka.produce('freeradius-test-base', control.Tmp-String-0)) { +if (!%kafka.produce('freeradius-test-base', '', control.Tmp-String-0)) { test_fail } diff --git a/src/tests/modules/kafka/binary.unlang b/src/tests/modules/kafka/binary.unlang index 428b44aa9b7..30d01012590 100644 --- a/src/tests/modules/kafka/binary.unlang +++ b/src/tests/modules/kafka/binary.unlang @@ -12,7 +12,7 @@ # control so the xlat path exercises an attribute reference. # control.Tmp-Octets-0 := 0x48656c6c6f00576f726c6400ff00ff -if (!%kafka.produce('freeradius-test-binary', control.Tmp-Octets-0)) { +if (!%kafka.produce('freeradius-test-binary', '', control.Tmp-Octets-0)) { test_fail } @@ -27,7 +27,7 @@ if (!ok) { # # Empty payload via the xlat form. # -if (!%kafka.produce('freeradius-test-binary', "")) { +if (!%kafka.produce('freeradius-test-binary', '', "")) { test_fail } diff --git a/src/tests/modules/kafka/invalid.unlang b/src/tests/modules/kafka/invalid.unlang index 99f3e2a5c4c..467297c2257 100644 --- a/src/tests/modules/kafka/invalid.unlang +++ b/src/tests/modules/kafka/invalid.unlang @@ -8,7 +8,7 @@ # # Empty string payload. # -if (!%kafka.produce('freeradius-test-invalid', "")) { +if (!%kafka.produce('freeradius-test-invalid', '', "")) { test_fail } @@ -22,7 +22,7 @@ if (!%kafka.produce('freeradius-test-invalid', "")) { # sixteen 1024-byte binary classes. # control.Tmp-Octets-4 := (octets)%str.rand("1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b") -if (!%kafka.produce('freeradius-test-invalid', control.Tmp-Octets-4)) { +if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-Octets-4)) { test_fail } @@ -31,7 +31,7 @@ if (!%kafka.produce('freeradius-test-invalid', control.Tmp-Octets-4)) { # string, all valid bytes. # control.Tmp-String-1 := "line1\nline2\ttabbed\r\n\"quoted\"" -if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-1)) { +if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-1)) { test_fail } @@ -39,7 +39,7 @@ if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-1)) { # UTF-8 / high-bit characters. # control.Tmp-String-2 := "héllo — wörld 🚀" -if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-2)) { +if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-2)) { test_fail } diff --git a/src/tests/modules/kafka/race.unlang b/src/tests/modules/kafka/race.unlang index 8693b01a762..d666464578c 100644 --- a/src/tests/modules/kafka/race.unlang +++ b/src/tests/modules/kafka/race.unlang @@ -19,14 +19,14 @@ redundant { timeout 300ms { parallel { - group { %kafka_unreachable.produce('freeradius-test-race', "race 1") } - group { %kafka_unreachable.produce('freeradius-test-race', "race 2") } - group { %kafka_unreachable.produce('freeradius-test-race', "race 3") } - group { %kafka_unreachable.produce('freeradius-test-race', "race 4") } - group { %kafka_unreachable.produce('freeradius-test-race', "race 5") } - group { %kafka_unreachable.produce('freeradius-test-race', "race 6") } - group { %kafka_unreachable.produce('freeradius-test-race', "race 7") } - group { %kafka_unreachable.produce('freeradius-test-race', "race 8") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 1") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 2") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 3") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 4") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 5") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 6") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 7") } + group { %kafka_unreachable.produce('freeradius-test-race', '', "race 8") } } } group { diff --git a/src/tests/modules/kafka/unreachable.unlang b/src/tests/modules/kafka/unreachable.unlang index caf0af249e6..1d162a786c3 100644 --- a/src/tests/modules/kafka/unreachable.unlang +++ b/src/tests/modules/kafka/unreachable.unlang @@ -32,7 +32,7 @@ redundant { timeout 200ms { parallel { redundant { - %kafka_unreachable.produce('freeradius-test-unreachable', "cancelled mid-flight") + %kafka_unreachable.produce('freeradius-test-unreachable', '', "cancelled mid-flight") } group { %cancel(0) @@ -53,7 +53,7 @@ redundant { # RD_KAFKA_RESP_ERR__MSG_TIMED_OUT. kafka_produce_resume translates # that to a false return from the xlat. # -if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) { +if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "doomed")) { test_fail } @@ -61,7 +61,7 @@ if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) { # And one more, to prove the worker's produce path is still healthy # after both the cancel and the natural failure above. # -if (%kafka_unreachable.produce('freeradius-test-unreachable', "also doomed")) { +if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "also doomed")) { test_fail } diff --git a/src/tests/modules/kafka/xlat.unlang b/src/tests/modules/kafka/xlat.unlang index 836fa659a07..7095739bf7e 100644 --- a/src/tests/modules/kafka/xlat.unlang +++ b/src/tests/modules/kafka/xlat.unlang @@ -1,5 +1,5 @@ # -# Exercise the %kafka.produce(topic, value) xlat across several +# Exercise the %kafka.produce(topic, key, value) xlat across several # produce cycles. Each produce yields waiting for a delivery report; # the resume path translates it to a bool (true = delivered). # @@ -15,7 +15,7 @@ # before being handed to librdkafka. # control.Tmp-String-0 := "user=%{User-Name},packet=%{Packet-Type}" -if (!%kafka.produce('freeradius-test-xlat', control.Tmp-String-0)) { +if (!%kafka.produce('freeradius-test-xlat', '', control.Tmp-String-0)) { test_fail } @@ -26,23 +26,25 @@ if (!%kafka.produce('freeradius-test-xlat', control.Tmp-String-0)) { # without dropping any. Unrolled because the unlang parser here doesn't # accept `while` as a loop construct. # -if (!%kafka.produce('freeradius-test-xlat', "sequential 1")) { test_fail } -if (!%kafka.produce('freeradius-test-xlat', "sequential 2")) { test_fail } -if (!%kafka.produce('freeradius-test-xlat', "sequential 3")) { test_fail } -if (!%kafka.produce('freeradius-test-xlat', "sequential 4")) { test_fail } -if (!%kafka.produce('freeradius-test-xlat', "sequential 5")) { test_fail } +if (!%kafka.produce('freeradius-test-xlat', '', "sequential 1")) { test_fail } +if (!%kafka.produce('freeradius-test-xlat', '', "sequential 2")) { test_fail } +if (!%kafka.produce('freeradius-test-xlat', '', "sequential 3")) { test_fail } +if (!%kafka.produce('freeradius-test-xlat', '', "sequential 4")) { test_fail } +if (!%kafka.produce('freeradius-test-xlat', '', "sequential 5")) { test_fail } # -# Produce to a distinct topic to exercise a second topic handle cache entry. +# Produce to a distinct topic to exercise a second topic handle cache +# entry, and pass a non-empty key so we also cover the xlat key path. +# Same key bytes should round-trip unaltered on the consumer side. # -if (!%kafka.produce('freeradius-test-xlat-alt', "alternate topic")) { +if (!%kafka.produce('freeradius-test-xlat-alt', "xlat-key", "alternate topic")) { test_fail } # # Empty value - librdkafka accepts zero-length payloads. # -if (!%kafka.produce('freeradius-test-xlat', "")) { +if (!%kafka.produce('freeradius-test-xlat', '', "")) { test_fail } @@ -94,15 +96,19 @@ control.Tmp-Octets-6 := %base64.decode(control.Tmp-String-7) if (%length(control.Tmp-Octets-6) != 0) { test_fail } # -# Alternate topic: one record. +# Alternate topic: one record, value + key. # timeout 10s { map json %exec('/bin/sh', '-c', '"$KAFKA_WAIT_AND_CLEANUP" $ENV{OUTPUT_DIR}kafka-xlat-alt.json 1') { control.Tmp-String-8 := '$[0].value' + control.Tmp-String-9 := '$[0].key' } } control.Tmp-Octets-7 := %base64.decode(control.Tmp-String-8) if ((string)control.Tmp-Octets-7 != "alternate topic") { test_fail } +control.Tmp-Octets-8 := %base64.decode(control.Tmp-String-9) +if ((string)control.Tmp-Octets-8 != "xlat-key") { test_fail } + test_pass