/** @name Base conf (`fr_kafka_conf_t`)
*
- * Lifecycle, lazy-init + talloc sentinel, and the FR_CONF_FUNC parsers for
+ * Lifecycle, lazy-init + talloc sentinel, and the FR_CONF_PAIR_GLOBAL parsers for
* the top-level `kafka { ... }` section.
*
* @{
/** @name Topic conf (`fr_kafka_topic_conf_t` + `fr_kafka_topic_t`)
*
- * Per-topic lifecycle, FR_CONF_FUNC parsers for entries inside a declared
+ * Per-topic lifecycle, FR_CONF_PAIR_GLOBAL parsers for entries inside a declared
* topic subsection, and the subsection hook that indexes each declared
* topic onto `fr_kafka_conf_t.topics`.
*
};
static conf_parser_t const kafka_sasl_oauth_config[] = {
- { FR_CONF_FUNC("oauthbearer_conf", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("oauthbearer_conf", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.oauthbearer.config", .empty_default = true }},
- { FR_CONF_FUNC("unsecure_jwt", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("unsecure_jwt", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.sasl.oauthbearer.unsecure.jwt" }},
CONF_PARSER_TERMINATOR
/*
* Service principal
*/
- { FR_CONF_FUNC("service_name", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("service_name", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.service.name" }},
/*
* Principal
*/
- { FR_CONF_FUNC("principal", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("principal", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.principal" }},
/*
* knit cmd
*/
- { FR_CONF_FUNC("kinit_cmd", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("kinit_cmd", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.cmd" }},
/*
* keytab
*/
- { FR_CONF_FUNC("keytab", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("keytab", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.kinit.keytab", .empty_default = true }},
/*
* How long between key refreshes
*/
- { FR_CONF_FUNC("refresh_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("refresh_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.kerberos.min.time.before.relogin" }},
CONF_PARSER_TERMINATOR
/*
* SASL mechanism
*/
- { FR_CONF_FUNC("mech", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("mech", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.mechanism" }},
/*
* Static SASL username
*/
- { FR_CONF_FUNC("username", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("username", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.username", .empty_default = true }},
/*
* Static SASL password
*/
- { FR_CONF_FUNC("password", FR_TYPE_STRING, CONF_FLAG_SECRET, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("password", FR_TYPE_STRING, CONF_FLAG_SECRET, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sasl.password", .empty_default = true }},
{ FR_CONF_SUBSECTION_GLOBAL("kerberos", 0, kafka_sasl_kerberos_config) },
/*
* Cipher suite list in OpenSSL's format
*/
- { FR_CONF_FUNC("cipher_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("cipher_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.cipher.suites", .empty_default = true }},
/*
* Curves list in OpenSSL's format
*/
- { FR_CONF_FUNC("curve_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("curve_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.curves.list", .empty_default = true }},
/*
* Curves list in OpenSSL's format
*/
- { FR_CONF_FUNC("sigalg_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("sigalg_list", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.sigalgs.list", .empty_default = true }},
/*
* Sets the full path to a CA certificate (used to validate
* the certificate the server presents).
*/
- { FR_CONF_FUNC("ca_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("ca_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.ca.location", .empty_default = true }},
/*
* Location of the CRL file.
*/
- { FR_CONF_FUNC("crl_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("crl_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.crl.location", .empty_default = true }},
/*
* Sets the path to the public certificate file we present
* to the servers.
*/
- { FR_CONF_FUNC("certificate_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("certificate_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.certificate.location", .empty_default = true }},
/*
* Sets the path to the private key for our public
* certificate.
*/
- { FR_CONF_FUNC("private_key_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("private_key_file", FR_TYPE_STRING, CONF_FLAG_FILE_READABLE, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.key.location", .empty_default = true }},
/*
* Enable or disable certificate validation
*/
- { FR_CONF_FUNC("require_cert", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("require_cert", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.ssl.certificate.verification" }},
- { FR_CONF_FUNC("check_cert_cn", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("check_cert_cn", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "ssl.endpoint.identification.algorithm",
.mapping = kafka_check_cert_cn_table,
.mapping_len = &kafka_check_cert_cn_table_len }},
/*
* Socket timeout
*/
- { FR_CONF_FUNC("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.timeout.ms" }},
/*
* Close broker connections after this period.
*/
- { FR_CONF_FUNC("idle_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("idle_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "connections.max.idle.ms" }},
/*
* Maximum requests in flight (per connection).
*/
- { FR_CONF_FUNC("max_requests_in_flight", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("max_requests_in_flight", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "max.in.flight.requests.per.connection" }},
/*
* Socket send buffer.
*/
- { FR_CONF_FUNC("send_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("send_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.send.buffer.bytes" }},
/*
* Socket recv buffer.
*/
- { FR_CONF_FUNC("recv_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("recv_buff", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.receive.buffer.bytes" }},
/*
* If true, send TCP keepalives
*/
- { FR_CONF_FUNC("keepalive", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("keepalive", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.keepalive.enable" }},
/*
* If true, disable nagle algorithm
*/
- { FR_CONF_FUNC("nodelay", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("nodelay", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.nagle.disable" }},
/*
* How long the DNS resolver cache is valid for
*/
- { FR_CONF_FUNC("resolver_cache_ttl", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("resolver_cache_ttl", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.ttl" }},
/*
* Should we use A records, AAAA records or either
* when resolving broker addresses
*/
- { FR_CONF_FUNC("resolver_addr_family", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("resolver_addr_family", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.address.family" }},
/*
* How many failures before we reconnect the connection
*/
- { FR_CONF_FUNC("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("reconnection_failure_count", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "socket.max.fails" }},
/*
* Initial time to wait before reconnecting.
*/
- { FR_CONF_FUNC("reconnection_delay_initial", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("reconnection_delay_initial", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.ms" }},
/*
* Max time to wait before reconnecting.
*/
- { FR_CONF_FUNC("reconnection_delay_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("reconnection_delay_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "reconnect.backoff.max.ms" }},
CONF_PARSER_TERMINATOR
/*
* Request the API version from connected brokers
*/
- { FR_CONF_FUNC("request", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("request", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request" }},
/*
* How long to wait for a version response.
*/
- { FR_CONF_FUNC("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.request.timeout.ms" }},
/*
* How long to wait before retrying a version request.
*/
- { FR_CONF_FUNC("retry_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("retry_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "api.version.fallback.ms" }},
/*
* Default version to use if the version request fails.
*/
- { FR_CONF_FUNC("default", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("default", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "broker.version.fallback" }},
CONF_PARSER_TERMINATOR
/*
* Interval between attempts to refresh metadata from brokers
*/
- { FR_CONF_FUNC("refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.interval.ms" }},
/*
* Interval between attempts to refresh metadata from brokers
*/
- { FR_CONF_FUNC("max_age", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("max_age", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.max.age.ms" }},
/*
* Used when a topic loses its leader
*/
- { FR_CONF_FUNC("fast_refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("fast_refresh_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.fast.interval.ms" }},
/*
* Used when a topic loses its leader to prevent spurious metadata changes
*/
- { FR_CONF_FUNC("max_propagation", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("max_propagation", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.propagation.max.ms" }},
/*
* Use sparse metadata requests which use less bandwidth maps
*/
- { FR_CONF_FUNC("refresh_sparse", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("refresh_sparse", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.metadata.refresh.sparse" }},
/*
* List of topics to ignore
*/
- { FR_CONF_FUNC("blacklist", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("blacklist", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "topic.blacklist", .string_sep = ",", .empty_default = true }},
CONF_PARSER_TERMINATOR
* the names here so the raw-passthrough catch-all below
* doesn't try to hand them to rd_kafka_topic_conf_set.
*/
- { FR_CONF_FUNC("value", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
- { FR_CONF_FUNC("key", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
+ { FR_CONF_PAIR_GLOBAL("value", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
+ { FR_CONF_PAIR_GLOBAL("key", FR_TYPE_STRING, 0, kafka_noop_parse, NULL) },
/*
* This field indicates the number of acknowledgements the leader
* broker must receive from ISR brokers before responding to the request.
*/
- { FR_CONF_FUNC("request_required_acks", FR_TYPE_INT16, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("request_required_acks", FR_TYPE_INT16, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "request.required.acks" }},
/*
* medium The ack timeout of the producer request in milliseconds
*/
- { FR_CONF_FUNC("request_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("request_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "request.timeout.ms" }},
/*
* Local message timeout
*/
- { FR_CONF_FUNC("message_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("message_timeout", FR_TYPE_TIME_DELTA, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "message.timeout.ms" }},
/*
* Partitioning strategy
*/
- { FR_CONF_FUNC("partitioner", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("partitioner", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "partitioner" }},
/*
* compression codec to use for compressing message sets.
*/
- { FR_CONF_FUNC("compression_type", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("compression_type", FR_TYPE_STRING, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }},
/*
* compression level to use
*/
- { FR_CONF_FUNC("compression_level", FR_TYPE_INT8, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("compression_level", FR_TYPE_INT8, 0, kafka_topic_config_parse, kafka_topic_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.level" }},
/*
/*
* Group consumer is a member of
*/
- { FR_CONF_FUNC("id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "group.id" }},
/*
* A unique identifier of the consumer instance provided by the end user
*/
- { FR_CONF_FUNC("instance_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("instance_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "group.instance.id" }},
/*
* Range or roundrobin
*/
- { FR_CONF_FUNC("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("partition_assignment_strategy", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "partition.assignment.strategy" }},
/*
* Client group session and failure detection timeout.
*/
- { FR_CONF_FUNC("session_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("session_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "session.timeout.ms" }},
/*
* Group session keepalive heartbeat interval.
*/
- { FR_CONF_FUNC("heartbeat_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("heartbeat_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "heartbeat.interval.ms" }},
/*
* How often to query for the current client group coordinator
*/
- { FR_CONF_FUNC("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("coordinator_query_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "coordinator.query.interval.ms" }},
*
* High numbers may starve the worker thread
*/
- { FR_CONF_FUNC("max_messages_per_cycle", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("max_messages_per_cycle", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "consume.callback.max.messages" }},
/*
* Action to take when there is no initial offset
* in offset store or the desired offset is out of range.
*/
- { FR_CONF_FUNC("auto_offset_reset", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
+ { FR_CONF_PAIR_GLOBAL("auto_offset_reset", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt),
.uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.offset.reset" }},
/*
typedef struct {
rd_kafka_conf_t *conf;
- fr_time_delta_t flush_timeout; //!< How long to wait for in-flight messages to drain
- //!< on producer teardown. Also doubles as the consumer's
- //!< close timeout. Needed by every user of the library, so
- //!< it lives here rather than being duplicated per-module.
-
fr_rb_tree_t *topics; //!< Declared topics, keyed by name. Populated during
//!< config parsing by the per-topic hook on the `topic { }`
//!< subsection; use `kafka_topic_lookup` to query.
/** Generic librdkafka-property parser used by `KAFKA_BASE_PRODUCER_CONFIG` entries.
*
- * Exposed so the macro's FR_CONF_FUNC entries can reference it from any TU.
+ * Exposed so the macro's FR_CONF_PAIR_GLOBAL entries can reference it from any TU.
*/
int kafka_config_parse(TALLOC_CTX *ctx, void *out, void *base, CONF_ITEM *ci, conf_parser_t const *rule);
*/
#define KAFKA_BASE_CONFIG \
/* Initial list of brokers. librdkafka only needs one it can reach. */ \
- { FR_CONF_FUNC("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("server", FR_TYPE_STRING, CONF_FLAG_REQUIRED | CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "metadata.broker.list", .string_sep = "," }}, \
/* Identifier sent with each request to brokers. */ \
- { FR_CONF_FUNC("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("client_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "client.id" }}, \
/* Rack identifier for rack-aware fetch-from-follower. */ \
- { FR_CONF_FUNC("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("rack_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "client.rack" }}, \
/* Max size of a message the broker will accept. */ \
- { FR_CONF_FUNC("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("request_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "message.max.bytes" }}, \
/* Max size of a message copied into librdkafka's send buffer. */ \
- { FR_CONF_FUNC("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("request_copy_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "message.copy.max.bytes" }}, \
/* Max size of a response from a broker. */ \
- { FR_CONF_FUNC("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("response_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "receive.message.max.bytes" }}, \
/* Compile-time features to enable (comma-separated). */ \
- { FR_CONF_FUNC("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("feature", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "builtin.features", .string_sep = "," }}, \
/* Comma-separated list of debug contexts to enable. */ \
- { FR_CONF_FUNC("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("debug", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "debug", .string_sep = "," }}, \
/* Semicolon-separated plugin library paths to load. */ \
- { FR_CONF_FUNC("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
+ { FR_CONF_PAIR_GLOBAL("plugin", FR_TYPE_STRING, CONF_FLAG_MULTI, kafka_config_parse, NULL), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "plugin.library.paths", .string_sep = ";" }}, \
{ FR_CONF_SUBSECTION_GLOBAL("metadata", 0, kafka_metadata_config) }, \
{ FR_CONF_SUBSECTION_GLOBAL("version", 0, kafka_version_config) }, \
/* Escape-hatch for librdkafka client properties we don't enumerate. \
* Contents are fed verbatim to rd_kafka_conf_set - no type dispatch, \
* the user writes librdkafka-native units (e.g. "500" for ms). */ \
- { FR_CONF_SUBSECTION_GLOBAL("properties", 0, kafka_base_properties_config) }, \
- /* How long producer teardown / consumer close wait for in-flight messages \
- * to drain before tearing down the handle. Lost on expiry. */ \
- { FR_CONF_OFFSET("flush_timeout", fr_kafka_conf_t, flush_timeout), .dflt = "5s" }
+ { FR_CONF_SUBSECTION_GLOBAL("properties", 0, kafka_base_properties_config) }
/** Producer-only delta: librdkafka producer tuning + declared topics.
*
*/
#define KAFKA_PRODUCER_CONFIG \
/* Enables the transactional producer. */ \
- { FR_CONF_FUNC("transactional_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("transactional_id", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "transactional.id", .empty_default = true }}, \
/* Maximum time the transaction coordinator will wait for a status update \
* from the producer before proactively aborting the transaction. */ \
- { FR_CONF_FUNC("transaction_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("transaction_timeout", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "transaction.timeout.ms" }}, \
/* Ensures exactly-once, in-order delivery per partition. \
* Requires acks=all semantics broker-side. */ \
- { FR_CONF_FUNC("idempotence", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("idempotence", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.idempotence" }}, \
/* Fail any error that would cause a gap in the produced message series. */ \
- { FR_CONF_FUNC("gapless_guarantee", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("gapless_guarantee", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.gapless.guarantee" }}, \
/* Max number of messages buffered across all topics/partitions. \
* Produce fails synchronously (QUEUE_FULL) once hit. */ \
- { FR_CONF_FUNC("queue_max_messages", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("queue_max_messages", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.messages" }}, \
/* Max total size of buffered messages (bytes, scaled from kbytes). */ \
- { FR_CONF_FUNC("queue_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("queue_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.kbytes", .size_scale = 1024 }}, \
/* Linger time before a batch is sent, for producer-side batching. */ \
- { FR_CONF_FUNC("queue_max_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("queue_max_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.max.ms" }}, \
/* Max number of retries per failed send. */ \
- { FR_CONF_FUNC("message_retry_max", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("message_retry_max", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "message.send.max.retries" }}, \
/* Backoff between retries of a protocol request. */ \
- { FR_CONF_FUNC("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("message_retry_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "retry.backoff.ms" }}, \
/* Outstanding-request threshold at which the accumulator backpressures. */ \
- { FR_CONF_FUNC("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("backpressure_threshold", FR_TYPE_UINT32, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "queue.buffering.backpressure.threshold" }}, \
/* Compression codec: none, gzip, snappy, lz4, zstd. */ \
- { FR_CONF_FUNC("compression_type", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("compression_type", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "compression.type" }}, \
/* Max size (bytes) of all messages batched into one MessageSet. */ \
- { FR_CONF_FUNC("batch_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("batch_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "batch.size" }}, \
/* Delay before reassigning sticky partitions per topic. */ \
- { FR_CONF_FUNC("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("sticky_partition_delay", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "sticky.partitioning.linger.ms" }}, \
/* Declared topics. Topic-level conf is stashed on each subsection via \
* cf_data_add; the subcs_size is a dummy because cf_parse asserts on \
/* Consumer-group membership config (id, instance_id, session_timeout, ...). */ \
{ FR_CONF_SUBSECTION_GLOBAL("group", 0, kafka_consumer_group_config) }, \
/* Max allowed time between calls to consume messages. */ \
- { FR_CONF_FUNC("max_poll_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("max_poll_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "max.poll.interval.ms" }}, \
/* Whether offsets are committed automatically. */ \
- { FR_CONF_FUNC("auto_commit", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("auto_commit", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.commit" }}, \
/* Interval between auto commits. */ \
- { FR_CONF_FUNC("auto_commit_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("auto_commit_interval", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "auto.commit.interval.ms" }}, \
/* Automatically store the offset of the last message handed to the application. */ \
- { FR_CONF_FUNC("auto_offset_store", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("auto_offset_store", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "enable.auto.offset.store" }}, \
/* Min number of messages per topic+partition librdkafka keeps locally. */ \
- { FR_CONF_FUNC("queued_messages_min", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("queued_messages_min", FR_TYPE_UINT64, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.min.messages" }}, \
/* Max total size of pre-fetched messages in the local consumer queue. */ \
- { FR_CONF_FUNC("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("queued_messages_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "queued.max.messages.kbytes", .size_scale = 1024 }}, \
/* Max time the broker may wait to fill the Fetch response. */ \
- { FR_CONF_FUNC("fetch_wait_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("fetch_wait_max", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.wait.max.ms" }}, \
/* Initial per-partition fetch size. */ \
- { FR_CONF_FUNC("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("fetch_message_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.message.max.bytes" }}, \
/* Max bytes per topic+partition in a Fetch response. */ \
- { FR_CONF_FUNC("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("fetch_partition_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "max.partition.fetch.bytes" }}, \
/* Max total data a broker may return for a single Fetch request. */ \
- { FR_CONF_FUNC("fetch_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("fetch_max_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.max.bytes" }}, \
/* Min bytes the broker responds with. */ \
- { FR_CONF_FUNC("fetch_min_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("fetch_min_size", FR_TYPE_SIZE, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.min.bytes" }}, \
/* How long to wait before retrying a fetch after an error. */ \
- { FR_CONF_FUNC("fetch_error_backoff", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("fetch_error_backoff", FR_TYPE_TIME_DELTA, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "fetch.error.backoff.ms" }}, \
/* How to read messages written transactionally (read_committed / read_uncommitted). */ \
- { FR_CONF_FUNC("isolation_level", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("isolation_level", FR_TYPE_STRING, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "isolation.level" }}, \
/* Verify CRC32 of every consumed message. */ \
- { FR_CONF_FUNC("check_crcs", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("check_crcs", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "check.crcs" }}, \
/* Allow automatic topic creation when subscribing to an unknown topic. */ \
- { FR_CONF_FUNC("auto_create_topic", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
+ { FR_CONF_PAIR_GLOBAL("auto_create_topic", FR_TYPE_BOOL, 0, kafka_config_parse, kafka_config_dflt), \
.uctx = &(fr_kafka_conf_ctx_t){ .property = "allow.auto.create.topics" }}, \
/* Declared subscription topics; same layering as the producer form. */ \
{ FR_CONF_SUBSECTION_GLOBAL("topic", 0, kafka_base_consumer_topics_config) }