static xlat_arg_parser_t const kafka_xlat_produce_args[] = {
{ .required = true, .concat = true, .type = FR_TYPE_STRING }, /* topic */
+ { .required = true, .concat = true, .type = FR_TYPE_OCTETS }, /* key (zero-length octets = no key on the wire) */
{ .required = true, .concat = true, .type = FR_TYPE_OCTETS }, /* value */
XLAT_ARG_PARSER_TERMINATOR
};
-/** `%kafka.produce(topic, value)` - runtime-named produce
+/** `%kafka.produce(topic, key, value)` - runtime-named produce
*
* Unlike the @ref mod_produce method form (which resolves topics at
* config-parse time), the xlat takes the topic name as a runtime
*
* @code
* send Accounting-Response {
- * if (!%kafka.produce('accounting', %json.encode(&request.[*]))) {
+ * if (!%kafka.produce('accounting', %{Acct-Session-Id}, %json.encode(&request.[*]))) {
* reject
* }
* }
* @endcode
*
+ * `key` is optional: pass an empty string (or an unset attribute) to
+ * produce without a key - librdkafka then uses the configured
+ * partitioner to spread records across partitions. When a non-empty
+ * key is supplied, librdkafka hashes it to pick a partition, so
+ * records with the same key end up on the same partition and preserve
+ * per-key produce order on the consumer side.
+ *
* Returns a bool: `true` on successful delivery, `false` on failure.
* The topic must have been declared in the module config (unknown
* topics fail the xlat) so librdkafka per-topic settings continue to
rlm_kafka_thread_t *t = talloc_get_type_abort(xctx->mctx->thread, rlm_kafka_thread_t);
rlm_kafka_xlat_thread_inst_t const *t_inst = xctx->thread;
fr_value_box_t *topic_vb = fr_value_box_list_head(in);
- fr_value_box_t *value_vb = fr_value_box_list_next(in, topic_vb);
+ fr_value_box_t *key_vb = fr_value_box_list_next(in, topic_vb);
+ fr_value_box_t *value_vb = fr_value_box_list_next(in, key_vb);
rd_kafka_topic_t *topic;
rlm_kafka_msg_ctx_t *pctx;
+ uint8_t const *key = NULL;
+ size_t key_len = 0;
/*
* Fast path: a literal topic argument was pre-resolved to
return XLAT_ACTION_FAIL;
}
+ /*
+ * Zero-length octets (e.g. `''` or an attribute expanding
+ * to nothing) map to "no key" on the wire - librdkafka then
+ * uses the configured partitioner instead of key-hash
+ * partitioning.
+ */
+ if (key_vb->vb_length > 0) {
+ key = key_vb->vb_octets;
+ key_len = key_vb->vb_length;
+ }
+
pctx = kafka_produce_enqueue(t, request, topic,
- NULL, 0,
+ key, key_len,
value_vb->vb_octets, value_vb->vb_length);
if (unlikely(!pctx)) return XLAT_ACTION_FAIL;
%exec('/bin/sh', '-c', '"$KAFKA_SUBSCRIBE" freeradius-test-base $ENV{OUTPUT_DIR}kafka-base.json 1')
control.Tmp-String-0 := "hello from rlm_kafka base test"
-if (!%kafka.produce('freeradius-test-base', control.Tmp-String-0)) {
+if (!%kafka.produce('freeradius-test-base', '', control.Tmp-String-0)) {
test_fail
}
# control so the xlat path exercises an attribute reference.
#
control.Tmp-Octets-0 := 0x48656c6c6f00576f726c6400ff00ff
-if (!%kafka.produce('freeradius-test-binary', control.Tmp-Octets-0)) {
+if (!%kafka.produce('freeradius-test-binary', '', control.Tmp-Octets-0)) {
test_fail
}
#
# Empty payload via the xlat form.
#
-if (!%kafka.produce('freeradius-test-binary', "")) {
+if (!%kafka.produce('freeradius-test-binary', '', "")) {
test_fail
}
#
# Empty string payload.
#
-if (!%kafka.produce('freeradius-test-invalid', "")) {
+if (!%kafka.produce('freeradius-test-invalid', '', "")) {
test_fail
}
# sixteen 1024-byte binary classes.
#
control.Tmp-Octets-4 := (octets)%str.rand("1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b1024b")
-if (!%kafka.produce('freeradius-test-invalid', control.Tmp-Octets-4)) {
+if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-Octets-4)) {
test_fail
}
# string, all valid bytes.
#
control.Tmp-String-1 := "line1\nline2\ttabbed\r\n\"quoted\""
-if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-1)) {
+if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-1)) {
test_fail
}
# UTF-8 / high-bit characters.
#
control.Tmp-String-2 := "héllo — wörld 🚀"
-if (!%kafka.produce('freeradius-test-invalid', control.Tmp-String-2)) {
+if (!%kafka.produce('freeradius-test-invalid', '', control.Tmp-String-2)) {
test_fail
}
redundant {
timeout 300ms {
parallel {
- group { %kafka_unreachable.produce('freeradius-test-race', "race 1") }
- group { %kafka_unreachable.produce('freeradius-test-race', "race 2") }
- group { %kafka_unreachable.produce('freeradius-test-race', "race 3") }
- group { %kafka_unreachable.produce('freeradius-test-race', "race 4") }
- group { %kafka_unreachable.produce('freeradius-test-race', "race 5") }
- group { %kafka_unreachable.produce('freeradius-test-race', "race 6") }
- group { %kafka_unreachable.produce('freeradius-test-race', "race 7") }
- group { %kafka_unreachable.produce('freeradius-test-race', "race 8") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 1") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 2") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 3") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 4") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 5") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 6") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 7") }
+ group { %kafka_unreachable.produce('freeradius-test-race', '', "race 8") }
}
}
group {
timeout 200ms {
parallel {
redundant {
- %kafka_unreachable.produce('freeradius-test-unreachable', "cancelled mid-flight")
+ %kafka_unreachable.produce('freeradius-test-unreachable', '', "cancelled mid-flight")
}
group {
%cancel(0)
# RD_KAFKA_RESP_ERR__MSG_TIMED_OUT. kafka_produce_resume translates
# that to a false return from the xlat.
#
-if (%kafka_unreachable.produce('freeradius-test-unreachable', "doomed")) {
+if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "doomed")) {
test_fail
}
# And one more, to prove the worker's produce path is still healthy
# after both the cancel and the natural failure above.
#
-if (%kafka_unreachable.produce('freeradius-test-unreachable', "also doomed")) {
+if (%kafka_unreachable.produce('freeradius-test-unreachable', '', "also doomed")) {
test_fail
}
#
-# Exercise the %kafka.produce(topic, value) xlat across several
+# Exercise the %kafka.produce(topic, key, value) xlat across several
# produce cycles. Each produce yields waiting for a delivery report;
# the resume path translates it to a bool (true = delivered).
#
# before being handed to librdkafka.
#
control.Tmp-String-0 := "user=%{User-Name},packet=%{Packet-Type}"
-if (!%kafka.produce('freeradius-test-xlat', control.Tmp-String-0)) {
+if (!%kafka.produce('freeradius-test-xlat', '', control.Tmp-String-0)) {
test_fail
}
# without dropping any. Unrolled because the unlang parser here doesn't
# accept `while` as a loop construct.
#
-if (!%kafka.produce('freeradius-test-xlat', "sequential 1")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 2")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 3")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 4")) { test_fail }
-if (!%kafka.produce('freeradius-test-xlat', "sequential 5")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 1")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 2")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 3")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 4")) { test_fail }
+if (!%kafka.produce('freeradius-test-xlat', '', "sequential 5")) { test_fail }
#
-# Produce to a distinct topic to exercise a second topic handle cache entry.
+# Produce to a distinct topic to exercise a second topic handle cache
+# entry, and pass a non-empty key so we also cover the xlat key path.
+# Same key bytes should round-trip unaltered on the consumer side.
#
-if (!%kafka.produce('freeradius-test-xlat-alt', "alternate topic")) {
+if (!%kafka.produce('freeradius-test-xlat-alt', "xlat-key", "alternate topic")) {
test_fail
}
#
# Empty value - librdkafka accepts zero-length payloads.
#
-if (!%kafka.produce('freeradius-test-xlat', "")) {
+if (!%kafka.produce('freeradius-test-xlat', '', "")) {
test_fail
}
if (%length(control.Tmp-Octets-6) != 0) { test_fail }
#
-# Alternate topic: one record.
+# Alternate topic: one record, value + key.
#
timeout 10s {
map json %exec('/bin/sh', '-c', '"$KAFKA_WAIT_AND_CLEANUP" $ENV{OUTPUT_DIR}kafka-xlat-alt.json 1') {
control.Tmp-String-8 := '$[0].value'
+ control.Tmp-String-9 := '$[0].key'
}
}
control.Tmp-Octets-7 := %base64.decode(control.Tmp-String-8)
if ((string)control.Tmp-Octets-7 != "alternate topic") { test_fail }
+control.Tmp-Octets-8 := %base64.decode(control.Tmp-String-9)
+if ((string)control.Tmp-Octets-8 != "xlat-key") { test_fail }
+
test_pass