From: Arran Cudbard-Bell Date: Wed, 22 Apr 2026 01:16:29 +0000 (-0400) Subject: rlm_kafka: delegate per-topic value/key parsing to call_env framework X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=3e138b7c840ae2fd96d128b365c5fdf2fc84bbef;p=thirdparty%2Ffreeradius-server.git rlm_kafka: delegate per-topic value/key parsing to call_env framework The topic-subsection callback was walking value and key by hand with cf_pair_find + call_env_parse_pair + call_env_parsed_add + call_env_parsed_set_tmpl, reimplementing what call_env_parse() already does when given a rules array pointed at a CONF_SECTION. Replace the bespoke walker with a single static topic_env[] array and a recursive call_env_parse() against the topic's subsection. The framework handles pair lookup, tmpl compilation, offset writes, and required/nullable enforcement. --- diff --git a/src/modules/rlm_kafka/rlm_kafka.c b/src/modules/rlm_kafka/rlm_kafka.c index a7bd243fddc..4e0d400244e 100644 --- a/src/modules/rlm_kafka/rlm_kafka.c +++ b/src/modules/rlm_kafka/rlm_kafka.c @@ -315,37 +315,35 @@ rlm_kafka_msg_ctx_t *kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *req return pctx; } -/** Stand-alone rules for per-topic `value` and `key` tmpls. +/** Per-topic call_env rules, applied against the `topic ` subsection. * - * Reused by @ref _kafka_topic_env_parse when it emits synthetic call_env - * entries; also carries the offsets so the framework writes expanded - * value boxes into the right fields on `rlm_kafka_env_t`. + * Invoked recursively from @ref _kafka_topic_env_parse via `call_env_parse()` + * so the framework handles pair lookup / tmpl compilation / offset writes + * for us. */ -static call_env_parser_t const rlm_kafka_value_rule = { - FR_CALL_ENV_OFFSET("value", FR_TYPE_STRING, CALL_ENV_FLAG_REQUIRED | CALL_ENV_FLAG_CONCAT, - rlm_kafka_env_t, value) -}; -static call_env_parser_t const rlm_kafka_key_rule = { - FR_CALL_ENV_OFFSET("key", FR_TYPE_STRING, CALL_ENV_FLAG_CONCAT | CALL_ENV_FLAG_NULLABLE, - rlm_kafka_env_t, key) +static call_env_parser_t const topic_env[] = { + { FR_CALL_ENV_OFFSET("value", FR_TYPE_STRING, CALL_ENV_FLAG_REQUIRED | CALL_ENV_FLAG_CONCAT, + rlm_kafka_env_t, value) }, + { FR_CALL_ENV_OFFSET("key", FR_TYPE_STRING, CALL_ENV_FLAG_CONCAT | CALL_ENV_FLAG_NULLABLE, + rlm_kafka_env_t, key) }, + CALL_ENV_TERMINATOR }; -/** Resolve the topic name from the method's second identifier, and pull - * the topic's `value` / `key` tmpls out of its subsection. +/** Resolve the topic named in the method's second identifier, then hand its + * subsection back to the call_env framework for per-topic `value` / `key` + * tmpl parsing. * * Invocations look like `kafka.produce.`. We: * - * 1. Validate the topic against the declared-topic list in the module - * CONF_SECTION. Unknown topics fail here so typos surface at - * startup instead of at first produce. - * 2. Emit a synthetic call_env entry for the topic name. - * 3. Parse the `value` pair inside the topic's subsection (required) - * and emit a tmpl entry for it. - * 4. Parse the optional `key` pair the same way if present. + * 1. Validate the topic against the declared-topic tree. Unknown topics + * fail here so typos surface at startup instead of at first produce. + * 2. Emit a synthetic call_env entry carrying the topic name. + * 3. Recurse into `call_env_parse()` with @ref topic_env pointed at the + * topic's CONF_SECTION - the framework walks `value` and `key` for us. * - * Per-topic `value` and `key` means each declared topic carries its - * own payload template; operators can publish different shapes to - * different topics from one module instance. + * Per-topic `value` and `key` means each declared topic carries its own + * payload template; operators can publish different shapes to different + * topics from one module instance. */ static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out, tmpl_rules_t const *t_rules, CONF_ITEM *ci, @@ -353,9 +351,7 @@ static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out, { rlm_kafka_t const *inst = talloc_get_type_abort_const(cec->mi->data, rlm_kafka_t); fr_kafka_topic_t *topic; - CONF_PAIR *cp; call_env_parsed_t *parsed; - tmpl_t *parsed_tmpl; char const *topic_name = cec->asked->name2; if (!topic_name || !*topic_name) { @@ -387,31 +383,10 @@ static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out, call_env_parsed_set_data(parsed, talloc_strdup(ctx, topic_name)); /* - * Value: required. Parse its tmpl from the topic section. + * Framework walks `value` / `key` inside the topic subsection + * according to topic_env. */ - cp = cf_pair_find(topic->cs, "value"); - if (!cp) { - cf_log_err(cf_section_to_item(topic->cs), - "Kafka topic '%s' is missing the required 'value' setting", topic_name); - return -1; - } - if (call_env_parse_pair(ctx, &parsed_tmpl, t_rules, cf_pair_to_item(cp), - cec, &rlm_kafka_value_rule) < 0) return -1; - MEM(parsed = call_env_parsed_add(ctx, out, &rlm_kafka_value_rule)); - call_env_parsed_set_tmpl(parsed, parsed_tmpl); - - /* - * Key: optional. - */ - cp = cf_pair_find(topic->cs, "key"); - if (cp) { - if (call_env_parse_pair(ctx, &parsed_tmpl, t_rules, cf_pair_to_item(cp), - cec, &rlm_kafka_key_rule) < 0) return -1; - MEM(parsed = call_env_parsed_add(ctx, out, &rlm_kafka_key_rule)); - call_env_parsed_set_tmpl(parsed, parsed_tmpl); - } - - return 0; + return call_env_parse(ctx, out, "kafka", t_rules, topic->cs, cec, topic_env); } static const call_env_method_t rlm_kafka_produce_env = {