return pctx;
}
-/** Stand-alone rules for per-topic `value` and `key` tmpls.
+/** Per-topic call_env rules, applied against the `topic <name>` subsection.
*
- * Reused by @ref _kafka_topic_env_parse when it emits synthetic call_env
- * entries; also carries the offsets so the framework writes expanded
- * value boxes into the right fields on `rlm_kafka_env_t`.
+ * Invoked recursively from @ref _kafka_topic_env_parse via `call_env_parse()`
+ * so the framework handles pair lookup / tmpl compilation / offset writes
+ * for us.
*/
-static call_env_parser_t const rlm_kafka_value_rule = {
- FR_CALL_ENV_OFFSET("value", FR_TYPE_STRING, CALL_ENV_FLAG_REQUIRED | CALL_ENV_FLAG_CONCAT,
- rlm_kafka_env_t, value)
-};
-static call_env_parser_t const rlm_kafka_key_rule = {
- FR_CALL_ENV_OFFSET("key", FR_TYPE_STRING, CALL_ENV_FLAG_CONCAT | CALL_ENV_FLAG_NULLABLE,
- rlm_kafka_env_t, key)
+static call_env_parser_t const topic_env[] = {
+ { FR_CALL_ENV_OFFSET("value", FR_TYPE_STRING, CALL_ENV_FLAG_REQUIRED | CALL_ENV_FLAG_CONCAT,
+ rlm_kafka_env_t, value) },
+ { FR_CALL_ENV_OFFSET("key", FR_TYPE_STRING, CALL_ENV_FLAG_CONCAT | CALL_ENV_FLAG_NULLABLE,
+ rlm_kafka_env_t, key) },
+ CALL_ENV_TERMINATOR
};
-/** Resolve the topic name from the method's second identifier, and pull
- * the topic's `value` / `key` tmpls out of its subsection.
+/** Resolve the topic named in the method's second identifier, then hand its
+ * subsection back to the call_env framework for per-topic `value` / `key`
+ * tmpl parsing.
*
* Invocations look like `kafka.produce.<topic_name>`. We:
*
- * 1. Validate the topic against the declared-topic list in the module
- * CONF_SECTION. Unknown topics fail here so typos surface at
- * startup instead of at first produce.
- * 2. Emit a synthetic call_env entry for the topic name.
- * 3. Parse the `value` pair inside the topic's subsection (required)
- * and emit a tmpl entry for it.
- * 4. Parse the optional `key` pair the same way if present.
+ * 1. Validate the topic against the declared-topic tree. Unknown topics
+ * fail here so typos surface at startup instead of at first produce.
+ * 2. Emit a synthetic call_env entry carrying the topic name.
+ * 3. Recurse into `call_env_parse()` with @ref topic_env pointed at the
+ * topic's CONF_SECTION - the framework walks `value` and `key` for us.
*
- * Per-topic `value` and `key` means each declared topic carries its
- * own payload template; operators can publish different shapes to
- * different topics from one module instance.
+ * Per-topic `value` and `key` means each declared topic carries its own
+ * payload template; operators can publish different shapes to different
+ * topics from one module instance.
*/
static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out,
tmpl_rules_t const *t_rules, CONF_ITEM *ci,
{
rlm_kafka_t const *inst = talloc_get_type_abort_const(cec->mi->data, rlm_kafka_t);
fr_kafka_topic_t *topic;
- CONF_PAIR *cp;
call_env_parsed_t *parsed;
- tmpl_t *parsed_tmpl;
char const *topic_name = cec->asked->name2;
if (!topic_name || !*topic_name) {
call_env_parsed_set_data(parsed, talloc_strdup(ctx, topic_name));
/*
- * Value: required. Parse its tmpl from the topic section.
+ * Framework walks `value` / `key` inside the topic subsection
+ * according to topic_env.
*/
- cp = cf_pair_find(topic->cs, "value");
- if (!cp) {
- cf_log_err(cf_section_to_item(topic->cs),
- "Kafka topic '%s' is missing the required 'value' setting", topic_name);
- return -1;
- }
- if (call_env_parse_pair(ctx, &parsed_tmpl, t_rules, cf_pair_to_item(cp),
- cec, &rlm_kafka_value_rule) < 0) return -1;
- MEM(parsed = call_env_parsed_add(ctx, out, &rlm_kafka_value_rule));
- call_env_parsed_set_tmpl(parsed, parsed_tmpl);
-
- /*
- * Key: optional.
- */
- cp = cf_pair_find(topic->cs, "key");
- if (cp) {
- if (call_env_parse_pair(ctx, &parsed_tmpl, t_rules, cf_pair_to_item(cp),
- cec, &rlm_kafka_key_rule) < 0) return -1;
- MEM(parsed = call_env_parsed_add(ctx, out, &rlm_kafka_key_rule));
- call_env_parsed_set_tmpl(parsed, parsed_tmpl);
- }
-
- return 0;
+ return call_env_parse(ctx, out, "kafka", t_rules, topic->cs, cec, topic_env);
}
static const call_env_method_t rlm_kafka_produce_env = {