]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
Add NO_OUTPUT plug to conf parser, rename FR_CONF_FUNC to FR_CONF_PAIR_GLOBAL to...
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 01:12:06 +0000 (21:12 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 03:48:38 +0000 (23:48 -0400)
src/lib/kafka/base.c
src/lib/kafka/base.h
src/lib/server/cf_parse.c
src/lib/server/cf_parse.h

index 7e8ef74a43df666a8ea15a373fd0fbfdcd85fd0e..29ad3d45685fe7e2473ad8b8297172d022a322e3 100644 (file)
@@ -231,7 +231,7 @@ static int kafka_noop_parse(UNUSED TALLOC_CTX *ctx, UNUSED void *out, UNUSED voi
 
 /** @name Base conf (`fr_kafka_conf_t`)
  *
- * Lifecycle, lazy-init + talloc sentinel, and the FR_CONF_FUNC parsers for
+ * Lifecycle, lazy-init + talloc sentinel, and the FR_CONF_PAIR_GLOBAL parsers for
  * the top-level `kafka { ... }` section.
  *
  * @{
@@ -471,7 +471,7 @@ int kafka_config_raw_parse(TALLOC_CTX *ctx, UNUSED void *out, void *base,
 
 /** @name Topic conf (`fr_kafka_topic_conf_t` + `fr_kafka_topic_t`)
  *
- * Per-topic lifecycle, FR_CONF_FUNC parsers for entries inside a declared
+ * Per-topic lifecycle, FR_CONF_PAIR_GLOBAL parsers for entries inside a declared
  * topic subsection, and the subsection hook that indexes each declared
  * topic onto `fr_kafka_conf_t.topics`.
  *
@@ -719,10 +719,10 @@ conf_parser_t const kafka_base_topic_properties_config[] = {
 };
 
 static conf_parser_t const kafka_sasl_oauth_config[] = {
-       { FR_CONF_FUNC("oauthbearer_conf", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("oauthbearer_conf", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.oauthbearer.config", .empty_default = true }},
 
-       { FR_CONF_FUNC("unsecure_jwt", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("unsecure_jwt", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.sasl.oauthbearer.unsecure.jwt" }},
 
        CONF_PARSER_TERMINATOR
@@ -732,31 +732,31 @@ static conf_parser_t const kafka_sasl_kerberos_config[] = {
        /*
         *      Service principal
         */
-       { FR_CONF_FUNC("service_name", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("service_name", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.service.name" }},
 
        /*
         *      Principal
         */
-       { FR_CONF_FUNC("principal", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("principal", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.principal" }},
 
        /*
         *      knit cmd
         */
-       { FR_CONF_FUNC("kinit_cmd", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("kinit_cmd", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.cmd" }},
 
        /*
         *      keytab
         */
-       { FR_CONF_FUNC("keytab", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("keytab", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.keytab", .empty_default = true }},
 
        /*
         *      How long between key refreshes
         */
-       { FR_CONF_FUNC("refresh_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("refresh_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.min.time.before.relogin" }},
 
        CONF_PARSER_TERMINATOR
@@ -766,19 +766,19 @@ conf_parser_t const kafka_sasl_config[] = {
        /*
         *      SASL mechanism
         */
-       { FR_CONF_FUNC("mech", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("mech", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.mechanism" }},
 
        /*
         *      Static SASL username
         */
-       { FR_CONF_FUNC("username", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("username", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.username", .empty_default = true }},
 
        /*
         *      Static SASL password
         */
-       { FR_CONF_FUNC("password", FR_TYPE_STRING, CONF_FLAG_SECRET, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("password", FR_TYPE_STRING, CONF_FLAG_SECRET, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.password", .empty_default = true }},
 
        { FR_CONF_SUBSECTION_GLOBAL("kerberos", 0, kafka_sasl_kerberos_config) },
@@ -800,55 +800,55 @@ conf_parser_t const kafka_tls_config[] = {
        /*
         *      Cipher suite list in OpenSSL's format
         */
-       { FR_CONF_FUNC("cipher_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("cipher_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.cipher.suites", .empty_default = true }},
 
        /*
         *      Curves list in OpenSSL's format
         */
-       { FR_CONF_FUNC("curve_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("curve_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.curves.list", .empty_default = true }},
 
        /*
         *      Curves list in OpenSSL's format
         */
-       { FR_CONF_FUNC("sigalg_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("sigalg_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.sigalgs.list", .empty_default = true }},
 
        /*
         *      Sets the full path to a CA certificate (used to validate
         *      the certificate the server presents).
         */
-       { FR_CONF_FUNC("ca_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("ca_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.ca.location", .empty_default = true }},
 
        /*
         *      Location of the CRL file.
         */
-       { FR_CONF_FUNC("crl_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("crl_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.crl.location", .empty_default = true }},
 
        /*
         *      Sets the path to the public certificate file we present
         *      to the servers.
         */
-       { FR_CONF_FUNC("certificate_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("certificate_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.certificate.location", .empty_default = true }},
 
        /*
         *      Sets the path to the private key for our public
         *      certificate.
         */
-       { FR_CONF_FUNC("private_key_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("private_key_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.key.location", .empty_default = true }},
 
        /*
         *      Enable or disable certificate validation
         */
-       { FR_CONF_FUNC("require_cert", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("require_cert", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.ssl.certificate.verification" }},
 
-       { FR_CONF_FUNC("check_cert_cn", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("check_cert_cn", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.endpoint.identification.algorithm",
                                          .mapping = kafka_check_cert_cn_table,
                                          .mapping_len = &kafka_check_cert_cn_table_len }},
@@ -859,74 +859,74 @@ conf_parser_t const kafka_connection_config[] = {
        /*
         *      Socket timeout
         */
-       { FR_CONF_FUNC("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.timeout.ms" }},
 
        /*
         *      Close broker connections after this period.
         */
-       { FR_CONF_FUNC("idle_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("idle_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "connections.max.idle.ms" }},
 
        /*
         *      Maximum requests in flight (per connection).
         */
-       { FR_CONF_FUNC("max_requests_in_flight", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("max_requests_in_flight", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.in.flight.requests.per.connection" }},
 
        /*
         *      Socket send buffer.
         */
-       { FR_CONF_FUNC("send_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("send_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.send.buffer.bytes" }},
 
        /*
         *      Socket recv buffer.
         */
-       { FR_CONF_FUNC("recv_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("recv_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.receive.buffer.bytes" }},
 
        /*
         *      If true, send TCP keepalives
         */
-       { FR_CONF_FUNC("keepalive", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("keepalive", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.keepalive.enable" }},
 
        /*
         *      If true, disable nagle algorithm
         */
-       { FR_CONF_FUNC("nodelay", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("nodelay", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.nagle.disable" }},
 
        /*
         *      How long the DNS resolver cache is valid for
         */
-       { FR_CONF_FUNC("resolver_cache_ttl", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("resolver_cache_ttl", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.ttl" }},
 
        /*
         *      Should we use A records, AAAA records or either
         *      when resolving broker addresses
         */
-       { FR_CONF_FUNC("resolver_addr_family", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("resolver_addr_family", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.family" }},
 
        /*
         *      How many failures before we reconnect the connection
         */
-       { FR_CONF_FUNC("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.max.fails" }},
 
        /*
         *      Initial time to wait before reconnecting.
         */
-       { FR_CONF_FUNC("reconnection_delay_initial", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("reconnection_delay_initial", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.ms" }},
 
        /*
         *      Max time to wait before reconnecting.
         */
-       { FR_CONF_FUNC("reconnection_delay_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("reconnection_delay_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.max.ms" }},
 
        CONF_PARSER_TERMINATOR
@@ -936,25 +936,25 @@ conf_parser_t const kafka_version_config[] = {
        /*
         *      Request the API version from connected brokers
         */
-       { FR_CONF_FUNC("request", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("request", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request" }},
 
        /*
         *      How long to wait for a version response.
         */
-       { FR_CONF_FUNC("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request.timeout.ms" }},
 
        /*
         *      How long to wait before retrying a version request.
         */
-       { FR_CONF_FUNC("retry_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("retry_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.fallback.ms" }},
 
        /*
         *      Default version to use if the version request fails.
         */
-       { FR_CONF_FUNC("default", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("default", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.version.fallback" }},
 
        CONF_PARSER_TERMINATOR
@@ -964,37 +964,37 @@ conf_parser_t const kafka_metadata_config[] = {
        /*
         *      Interval between attempts to refresh metadata from brokers
         */
-       { FR_CONF_FUNC("refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.interval.ms" }},
 
        /*
         *      Interval between attempts to refresh metadata from brokers
         */
-       { FR_CONF_FUNC("max_age", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("max_age", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.max.age.ms" }},
 
        /*
         *       Used when a topic loses its leader
         */
-       { FR_CONF_FUNC("fast_refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("fast_refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.fast.interval.ms" }},
 
        /*
         *       Used when a topic loses its leader to prevent spurious metadata changes
         */
-       { FR_CONF_FUNC("max_propagation", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("max_propagation", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.propagation.max.ms" }},
 
        /*
         *      Use sparse metadata requests which use less bandwidth maps
         */
-       { FR_CONF_FUNC("refresh_sparse", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("refresh_sparse", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.sparse" }},
 
        /*
         *      List of topics to ignore
         */
-       { FR_CONF_FUNC("blacklist", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("blacklist", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.blacklist", .string_sep = ",", .empty_default = true }},
 
        CONF_PARSER_TERMINATOR
@@ -1011,44 +1011,44 @@ static conf_parser_t const kafka_base_producer_topic_config[] = {
         *      the names here so the raw-passthrough catch-all below
         *      doesn't try to hand them to rd_kafka_topic_conf_set.
         */
-       { FR_CONF_FUNC("value", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
-       { FR_CONF_FUNC("key", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
+       { FR_CONF_PAIR_GLOBAL("value", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
+       { FR_CONF_PAIR_GLOBAL("key", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
 
        /*
         *      This field indicates the number of acknowledgements the leader
         *      broker must receive from ISR brokers before responding to the request.
         */
-       { FR_CONF_FUNC("request_required_acks", FR_TYPE_INT16, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("request_required_acks", FR_TYPE_INT16, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.required.acks" }},
 
        /*
         *      medium  The ack timeout of the producer request in milliseconds
         */
-       { FR_CONF_FUNC("request_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("request_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "request.timeout.ms" }},
 
        /*
         *      Local message timeout
         */
-       { FR_CONF_FUNC("message_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("message_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.timeout.ms" }},
 
        /*
         *      Partitioning strategy
         */
-       { FR_CONF_FUNC("partitioner", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("partitioner", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "partitioner" }},
 
        /*
         *      compression codec to use for compressing message sets.
         */
-       { FR_CONF_FUNC("compression_type", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("compression_type", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
 
        /*
         *      compression level to use
         */
-       { FR_CONF_FUNC("compression_level", FR_TYPE_INT8, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("compression_level", FR_TYPE_INT8, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.level" }},
 
        /*
@@ -1093,37 +1093,37 @@ conf_parser_t const kafka_consumer_group_config[] = {
        /*
         *      Group consumer is a member of
         */
-       { FR_CONF_FUNC("id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.id" }},
 
        /*
         *      A unique identifier of the consumer instance provided by the end user
         */
-       { FR_CONF_FUNC("instance_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("instance_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "group.instance.id" }},
 
        /*
         *      Range or roundrobin
         */
-       { FR_CONF_FUNC("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "partition.assignment.strategy" }},
 
        /*
         *      Client group session and failure detection timeout.
         */
-       { FR_CONF_FUNC("session_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("session_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "session.timeout.ms" }},
 
        /*
         *      Group session keepalive heartbeat interval.
         */
-       { FR_CONF_FUNC("heartbeat_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("heartbeat_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "heartbeat.interval.ms" }},
 
        /*
         *      How often to query for the current client group coordinator
         */
-       { FR_CONF_FUNC("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "coordinator.query.interval.ms" }},
 
 
@@ -1136,14 +1136,14 @@ conf_parser_t const kafka_base_consumer_topic_config[] = {
         *
         *      High numbers may starve the worker thread
         */
-       { FR_CONF_FUNC("max_messages_per_cycle", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("max_messages_per_cycle", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "consume.callback.max.messages" }},
 
        /*
         *      Action to take when there is no initial offset
         *      in offset store or the desired offset is out of range.
         */
-       { FR_CONF_FUNC("auto_offset_reset", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+       { FR_CONF_PAIR_GLOBAL("auto_offset_reset", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.offset.reset" }},
 
        /*
index 83a3a27f57391809b43b18a90341f7bb325e7dcb..5b72a153c386b2b221ff5bc39adfcb8ab82fe463 100644 (file)
@@ -48,11 +48,6 @@ typedef struct fr_kafka_topic_s fr_kafka_topic_t;
 typedef struct {
        rd_kafka_conf_t         *conf;
 
-       fr_time_delta_t         flush_timeout;  //!< How long to wait for in-flight messages to drain
-                                               //!< on producer teardown.  Also doubles as the consumer's
-                                               //!< close timeout.  Needed by every user of the library, so
-                                               //!< it lives here rather than being duplicated per-module.
-
        fr_rb_tree_t            *topics;        //!< Declared topics, keyed by name.  Populated during
                                                //!< config parsing by the per-topic hook on the `topic { }`
                                                //!< subsection; use `kafka_topic_lookup` to query.
@@ -94,7 +89,7 @@ typedef struct {
 
 /** Generic librdkafka-property parser used by `KAFKA_BASE_PRODUCER_CONFIG` entries.
  *
- * Exposed so the macro's FR_CONF_FUNC entries can reference it from any TU.
+ * Exposed so the macro's FR_CONF_PAIR_GLOBAL entries can reference it from any TU.
  */
 int kafka_config_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule);
 
@@ -159,31 +154,31 @@ extern conf_parser_t const kafka_base_topic_properties_config[];
  */
 #define KAFKA_BASE_CONFIG \
        /* Initial list of brokers. librdkafka only needs one it can reach. */ \
-       { FR_CONF_FUNC("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
        /* Identifier sent with each request to brokers. */ \
-       { FR_CONF_FUNC("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
        /* Rack identifier for rack-aware fetch-from-follower. */ \
-       { FR_CONF_FUNC("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
        /* Max size of a message the broker will accept. */ \
-       { FR_CONF_FUNC("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
        /* Max size of a message copied into librdkafka's send buffer. */ \
-       { FR_CONF_FUNC("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
        /* Max size of a response from a broker. */ \
-       { FR_CONF_FUNC("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
        /* Compile-time features to enable (comma-separated). */ \
-       { FR_CONF_FUNC("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
        /* Comma-separated list of debug contexts to enable. */ \
-       { FR_CONF_FUNC("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
        /* Semicolon-separated plugin library paths to load. */ \
-       { FR_CONF_FUNC("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
+       { FR_CONF_PAIR_GLOBAL("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
        { FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
        { FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
@@ -193,10 +188,7 @@ extern conf_parser_t const kafka_base_topic_properties_config[];
        /* Escape-hatch for librdkafka client properties we don't enumerate. \
         * Contents are fed verbatim to rd_kafka_conf_set - no type dispatch, \
         * the user writes librdkafka-native units (e.g. "500" for ms). */ \
-       { FR_CONF_SUBSECTION_GLOBAL("properties", 0, kafka_base_properties_config) }, \
-       /* How long producer teardown / consumer close wait for in-flight messages \
-        * to drain before tearing down the handle.  Lost on expiry. */ \
-       { FR_CONF_OFFSET("flush_timeout", fr_kafka_conf_t, flush_timeout), .dflt = "5s" }
+       { FR_CONF_SUBSECTION_GLOBAL("properties", 0, kafka_base_properties_config) }
 
 /** Producer-only delta: librdkafka producer tuning + declared topics.
  *
@@ -215,46 +207,46 @@ extern conf_parser_t const kafka_base_topic_properties_config[];
  */
 #define KAFKA_PRODUCER_CONFIG \
        /* Enables the transactional producer. */ \
-       { FR_CONF_FUNC("transactional_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("transactional_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "transactional.id", .empty_default = true }}, \
        /* Maximum time the transaction coordinator will wait for a status update \
         * from the producer before proactively aborting the transaction. */ \
-       { FR_CONF_FUNC("transaction_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("transaction_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "transaction.timeout.ms" }}, \
        /* Ensures exactly-once, in-order delivery per partition. \
         * Requires acks=all semantics broker-side. */ \
-       { FR_CONF_FUNC("idempotence", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("idempotence", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.idempotence" }}, \
        /* Fail any error that would cause a gap in the produced message series. */ \
-       { FR_CONF_FUNC("gapless_guarantee", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("gapless_guarantee", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.gapless.guarantee" }}, \
        /* Max number of messages buffered across all topics/partitions. \
         * Produce fails synchronously (QUEUE_FULL) once hit. */ \
-       { FR_CONF_FUNC("queue_max_messages", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("queue_max_messages", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.messages" }}, \
        /* Max total size of buffered messages (bytes, scaled from kbytes). */ \
-       { FR_CONF_FUNC("queue_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("queue_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.kbytes", .size_scale = 1024 }}, \
        /* Linger time before a batch is sent, for producer-side batching. */ \
-       { FR_CONF_FUNC("queue_max_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("queue_max_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.ms" }}, \
        /* Max number of retries per failed send. */ \
-       { FR_CONF_FUNC("message_retry_max", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("message_retry_max", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "message.send.max.retries" }}, \
        /* Backoff between retries of a protocol request. */ \
-       { FR_CONF_FUNC("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "retry.backoff.ms" }}, \
        /* Outstanding-request threshold at which the accumulator backpressures. */ \
-       { FR_CONF_FUNC("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.backpressure.threshold" }}, \
        /* Compression codec: none, gzip, snappy, lz4, zstd. */ \
-       { FR_CONF_FUNC("compression_type", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("compression_type", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }}, \
        /* Max size (bytes) of all messages batched into one MessageSet. */ \
-       { FR_CONF_FUNC("batch_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("batch_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "batch.size" }}, \
        /* Delay before reassigning sticky partitions per topic. */ \
-       { FR_CONF_FUNC("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "sticky.partitioning.linger.ms" }}, \
        /* Declared topics.  Topic-level conf is stashed on each subsection via \
         * cf_data_add; the subcs_size is a dummy because cf_parse asserts on \
@@ -279,49 +271,49 @@ extern conf_parser_t const kafka_base_topic_properties_config[];
        /* Consumer-group membership config (id, instance_id, session_timeout, ...). */ \
        { FR_CONF_SUBSECTION_GLOBAL("group", 0, kafka_consumer_group_config) }, \
        /* Max allowed time between calls to consume messages. */ \
-       { FR_CONF_FUNC("max_poll_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("max_poll_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.poll.interval.ms" }}, \
        /* Whether offsets are committed automatically. */ \
-       { FR_CONF_FUNC("auto_commit", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("auto_commit", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.commit" }}, \
        /* Interval between auto commits. */ \
-       { FR_CONF_FUNC("auto_commit_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("auto_commit_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.commit.interval.ms" }}, \
        /* Automatically store the offset of the last message handed to the application. */ \
-       { FR_CONF_FUNC("auto_offset_store", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("auto_offset_store", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.offset.store" }}, \
        /* Min number of messages per topic+partition librdkafka keeps locally. */ \
-       { FR_CONF_FUNC("queued_messages_min", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("queued_messages_min", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.min.messages" }}, \
        /* Max total size of pre-fetched messages in the local consumer queue. */ \
-       { FR_CONF_FUNC("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.max.messages.kbytes", .size_scale = 1024 }}, \
        /* Max time the broker may wait to fill the Fetch response. */ \
-       { FR_CONF_FUNC("fetch_wait_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("fetch_wait_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.wait.max.ms" }}, \
        /* Initial per-partition fetch size. */ \
-       { FR_CONF_FUNC("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.message.max.bytes" }}, \
        /* Max bytes per topic+partition in a Fetch response. */ \
-       { FR_CONF_FUNC("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "max.partition.fetch.bytes" }}, \
        /* Max total data a broker may return for a single Fetch request. */ \
-       { FR_CONF_FUNC("fetch_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("fetch_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.max.bytes" }}, \
        /* Min bytes the broker responds with. */ \
-       { FR_CONF_FUNC("fetch_min_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("fetch_min_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.min.bytes" }}, \
        /* How long to wait before retrying a fetch after an error. */ \
-       { FR_CONF_FUNC("fetch_error_backoff", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("fetch_error_backoff", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.error.backoff.ms" }}, \
        /* How to read messages written transactionally (read_committed / read_uncommitted). */ \
-       { FR_CONF_FUNC("isolation_level", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("isolation_level", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "isolation.level" }}, \
        /* Verify CRC32 of every consumed message. */ \
-       { FR_CONF_FUNC("check_crcs", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("check_crcs", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "check.crcs" }}, \
        /* Allow automatic topic creation when subscribing to an unknown topic. */ \
-       { FR_CONF_FUNC("auto_create_topic", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+       { FR_CONF_PAIR_GLOBAL("auto_create_topic", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
          .uctx = &(fr_kafka_conf_ctx_t){ .property = "allow.auto.create.topics" }}, \
        /* Declared subscription topics; same layering as the producer form. */ \
        { FR_CONF_SUBSECTION_GLOBAL("topic", 0, kafka_base_consumer_topics_config) }
index cf841aebcab9b29d6c7de944acfbe75b9512a74c..31269338f748344cbb9e87c2be7f77247bae60dd 100644 (file)
@@ -912,12 +912,10 @@ static int cf_section_parse_init(CONF_SECTION *cs, void *base, conf_parser_t con
        }
 
        /*
-        *      FR_CONF_FUNC rules own their own output routing via the
-        *      parse function - there's no guaranteed offset into `base`
-        *      for us to NULL out.  The func decides where the result
-        *      lives.
+        *      CONF_FLAG_NO_OUTPUT means the rule has no framework-managed
+        *      output slot - nothing to NULL-init.
         */
-       if (rule->func) return 0;
+       if (rule->flags & CONF_FLAG_NO_OUTPUT) return 0;
 
        if (rule->data) {
                *(char **) rule->data = NULL;
@@ -992,6 +990,8 @@ static int cf_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_SECT
        bool const              skip_parsed = (rule->name1 == CF_IDENT_ANY) &&
                                              !(rule->flags & CONF_FLAG_ALWAYS_PARSE);
 
+       bool const              no_output = (rule->flags & CONF_FLAG_NO_OUTPUT);
+
        uint8_t                 **array = NULL;
 
        fr_assert(rule->flags & CONF_FLAG_SUBSECTION);
@@ -1028,7 +1028,7 @@ static int cf_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_SECT
                 */
                if (!subcs_size) return cf_section_parse(ctx, out, subcs);
 
-               if (out) {
+               if (out && !no_output) {
                        MEM(buff = talloc_zero_array(ctx, uint8_t, subcs_size));
                        if (rule->subcs_type) talloc_set_name_const(buff, rule->subcs_type);
                }
@@ -1039,7 +1039,7 @@ static int cf_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_SECT
                        return ret;
                }
 
-               if (out) *((uint8_t **)out) = buff;
+               if (out && !no_output) *((uint8_t **)out) = buff;
 
                return 0;
        }
@@ -1058,7 +1058,7 @@ static int cf_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_SECT
        /*
         *      Allocate an array to hold the subsections
         */
-       if (out) {
+       if (out && !no_output) {
                MEM(array = talloc_zero_array(ctx, uint8_t *, count));
                if (rule->subcs_type) talloc_set_name(array, "%s *", rule->subcs_type);
        }
@@ -1109,7 +1109,7 @@ static int cf_subsection_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_SECT
                }
        }
 
-       if (out) *((uint8_t ***)out) = array;
+       if (out && !no_output) *((uint8_t ***)out) = array;
 
        return 0;
 }
@@ -1133,7 +1133,16 @@ static int cf_section_parse_rule(TALLOC_CTX *ctx, void *base, CONF_SECTION *cs,
 
        if (rule->data) {
                data = rule->data; /* prefer this. */
-       } else if (base) {
+       } else if (base &&
+                  (!(rule->flags & CONF_FLAG_NO_OUTPUT) || (rule->flags & CONF_FLAG_SUBSECTION))) {
+               /*
+                *      CONF_FLAG_NO_OUTPUT on a pair rule means leave
+                *      data NULL so the framework won't write through
+                *      base+offset.  For subsections it only suppresses
+                *      the end-of-MULTI array write (handled inside
+                *      cf_subsection_parse) - we still need to pass
+                *      `base` through so nested rules can address into it.
+                */
                data = ((uint8_t *)base) + rule->offset;
        }
 
@@ -1406,7 +1415,7 @@ int cf_section_parse_pass2(void *base, CONF_SECTION *cs)
                         *      Select base by whether this is a nested struct,
                         *      or a pointer to another struct.
                         */
-                       if (!base) {
+                       if (!base || (flags & CONF_FLAG_NO_OUTPUT)) {
                                subcs_base = NULL;
                        } else if (multi) {
                                size_t          j, len;
@@ -1437,7 +1446,7 @@ int cf_section_parse_pass2(void *base, CONF_SECTION *cs)
                 *      Figure out which data we need to fix.
                 */
                data = rule->data; /* prefer this. */
-               if (!data && base) data = ((char *)base) + rule->offset;
+               if (!data && base && !(rule->flags & CONF_FLAG_NO_OUTPUT)) data = ((char *)base) + rule->offset;
                if (!data) continue;
 
                /*
index 82ee0a797f68ac6729b78727d5f3088ab4ca0ecc..7cfc2c6266b4ce27ff16a0ceaaf80158b2df1259 100644 (file)
@@ -382,10 +382,10 @@ _Generic(&(_ct), \
  * @param[in] _func            to use to record value.
  * @param[in] _dflt_func       to use to get defaults from a 3rd party library.
  */
-#  define FR_CONF_FUNC(_name, _type, _flags, _func, _dflt_func) \
+#  define FR_CONF_PAIR_GLOBAL(_name, _type, _flags, _func, _dflt_func) \
        .name1 = _name, \
        .type = (_type), \
-       .flags = (_flags), \
+       .flags = CONF_FLAG_NO_OUTPUT | (_flags), \
        .func = _func, \
        .dflt_func = _dflt_func
 
@@ -397,7 +397,7 @@ _Generic(&(_ct), \
  */
 #  define FR_CONF_SUBSECTION_GLOBAL(_name, _flags, _subcs) \
        .name1 = _name, \
-       .flags = CONF_FLAG_SUBSECTION | (_flags), \
+       .flags = CONF_FLAG_SUBSECTION | CONF_FLAG_NO_OUTPUT | (_flags), \
        .subcs = _subcs
 
 /** conf_parser_t entry which raises an error if a matching CONF_PAIR is found
@@ -457,10 +457,17 @@ typedef enum CC_HINT(flag_enum) {
        CONF_FLAG_REF                   = (1 << 25),                    //!< reference another conf_parser_t inline in this one
        CONF_FLAG_OPTIONAL              = (1 << 26),                    //!< subsection is pushed only if a non-optional matching one is pushed
        CONF_FLAG_ALWAYS_PARSE          = (1 << 27),                    //!< Run this rule even against items already marked
-                                                                               ///< parsed.  Useful for CF_IDENT_ANY observer rules that
-                                                                               ///< want to see every item, not just the leftovers, or for
-                                                                               ///< a second-pass rule that needs to re-examine something
-                                                                               ///< an earlier rule already claimed.
+                                                                       ///< parsed.  Useful for CF_IDENT_ANY observer rules that
+                                                                       ///< want to see every item, not just the leftovers, or for
+                                                                       ///< a second-pass rule that needs to re-examine something
+                                                                       ///< an earlier rule already claimed.
+       CONF_FLAG_NO_OUTPUT             = (1 << 28),                    //!< Rule has no framework-managed output destination.
+                                                                       ///< The parse function owns its own side effects - the
+                                                                       ///< framework will not pre-zero `base + offset` for
+                                                                       ///< strings, nor write a MULTI array pointer through
+                                                                       ///< that offset.  Set automatically by `FR_CONF_PAIR_GLOBAL`
+                                                                       ///< so its default `offset = 0` doesn't clobber the
+                                                                       ///< first field of `base`.
 } conf_parser_flags_t;
 DIAG_ON(attributes)