#define RLM_KAFKA_PROP_SET(CONF, PROP, VALUE, BUF_ERRSTR) \
do { \
if (rd_kafka_conf_set(CONF, PROP, VALUE, BUF_ERRSTR, sizeof(BUF_ERRSTR)) != RD_KAFKA_CONF_OK ) \
- ERROR("Error setting global property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR); \
+ ERROR("Error setting Kafka global property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR); \
} while (0)
#define RLM_KAFKA_TOPIC_PROP_SET(CONF, PROP, VALUE, BUF_ERRSTR) \
do { \
if (rd_kafka_topic_conf_set(CONF, PROP, VALUE, BUF_ERRSTR, sizeof(BUF_ERRSTR)) != RD_KAFKA_CONF_OK ) \
- ERROR("Error setting topic property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR); \
+ ERROR("Error setting Kafka topic property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR); \
} while (0)
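+
+/*
+ *	Both macros use sizeof(BUF_ERRSTR), so the error buffer passed in must be
+ *	an array in scope rather than a char * parameter.  A minimal usage sketch
+ *	(the property shown is illustrative):
+ *
+ *		char errstr[512];
+ *
+ *		RLM_KAFKA_PROP_SET(kconf, "socket.keepalive.enable", "true", errstr);
+ */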
typedef struct rlm_kafka_section_config {
+
CONF_SECTION *cs;
char const *reference;
char const *key;
+
+ /*
+ * Topic handle to avoid rbtree lookups for section-based calls
+ *
+ */
+ rd_kafka_topic_t *rkt;
+
} rlm_kafka_section_config_t;
+typedef struct rlm_kafka_rkt_by_name {
+
+ const char *name;
+ rd_kafka_topic_t *rkt;
+
+ /*
+ * Only one entry is the "owner" for a topic, and all others are
+ * references to it (having ref = true)
+ */
+ bool ref;
+
+} rlm_kafka_rkt_by_name_t;
+
typedef struct rlm_kafka_t {
char const *name;
bool async;
char const *bootstrap;
- char const *topic;
char const *schema;
char const *stats_filename;
FILE *stats_file;
rd_kafka_t *rk;
- rd_kafka_topic_t *rkt;
+
+ rbtree_t *rkt_by_name_tree;
rlm_kafka_section_config_t authorize;
rlm_kafka_section_config_t postauth;
*/
static const CONF_PARSER module_config[] = {
{ "bootstrap-servers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_REQUIRED, rlm_kafka_t, bootstrap), NULL },
- { "topic", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_REQUIRED, rlm_kafka_t, topic), NULL },
{ "asynchronous", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, rlm_kafka_t, async), "no" },
{ "global-config", FR_CONF_POINTER(PW_TYPE_SUBSECTION, NULL), (void const*) global_config },
{ "topic-config", FR_CONF_POINTER(PW_TYPE_SUBSECTION, NULL), (void const*) topic_config },
if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) break;
- RERROR("Queue full: %s. Produce attempt %d/%d\n", rd_kafka_err2str(err), attempt, RETRIES + 1);
+ RERROR("Kafka queue full: %s. Produce attempt %d/%d\n",
+ rd_kafka_err2str(err), attempt, RETRIES + 1);
rd_kafka_poll(inst->rk, 1000);
}
}
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
- RERROR("Failed to produce to topic: %s: %s\n", inst->topic, rd_kafka_err2str(err));
+ RERROR("Failed to produce to Kafka topic: %s: %s\n",
+ rd_kafka_topic_name(rkt), rd_kafka_err2str(err));
return -1;
}
#undef NO_DELIVERY_REPORT
/*
- * Either "%{kafka:&Key-Attr-Ref <message data>}" or "%{kafka:<space><message data>}"
+ * Format is either:
+ * - "%{kafka:<topic> &Key-Attr-Ref <message data>}", or
+	 *   - "%{kafka:<topic> <space><message data>}" (if no key)
*
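+	 * For example (the topic name "radius-log" is illustrative):
+	 *   - "%{kafka:radius-log &Acct-Session-Id %{Acct-Status-Type} for %{User-Name}}"
+	 *   - "%{kafka:radius-log  accounting stop received}"   (note the extra space when no key is given)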
*/
static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, char *out, UNUSED size_t outlen)
{
- rlm_kafka_t *inst = instance;
- char const *p = fmt;
- uint8_t const *key;
- ssize_t key_len = 0;
- char *expanded = NULL;
+ rlm_kafka_t *inst = instance;
+ char const *p = fmt;
+ ssize_t key_len = 0;
+ char *expanded = NULL;
+ char buf[256];
+ rlm_kafka_rkt_by_name_t *entry, my_topic;
union {
const uint8_t *key_const;
char* const key_unconst;
} k;
- key = k.key_const = NULL;
+ k.key_const = NULL;
*out = '\0';
- if (*p == '&') {
- char key_ref[256];
+ /*
+ * Extract and lookup the topic.
+ *
+ */
+ p = strchr(fmt, ' ');
+ if (!p || *fmt == ' ' || *(p+1) == '\0') {
+ REDEBUG("Kafka xlat must begin with a topic followed by the payload");
+ return -1;
+ }
+ if ((size_t)(p - fmt) >= sizeof(buf)) {
+ REDEBUG("Insufficient space to store Kafka topic name, needed %zu bytes, have %zu bytes",
+ (p - fmt) + 1, sizeof(buf));
+ return -1;
+ }
+ strlcpy(buf, fmt, (p - fmt) + 1);
+ p++;
+ my_topic.name = buf;
+ entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
+ if (!entry || !entry->rkt) {
+		RWARN("No topic-config section exists for Kafka topic \"%s\"", buf);
+ return -1;
+ }
+
+ /*
+ * Extract the key, if there is one, otherwise expect a space.
+ *
+ */
+	if (*p == '&') {
+		char const *key_start = p;	/* The key ref now starts after the topic, not at fmt */
+
-		p = strchr(fmt, ' ');
+		p = strchr(key_start, ' ');
		if (!p) {
			REDEBUG("Key attribute form requires a message after the key (&Key-Attr-Ref <message data>)");
			return -1;
		}
-		if ((size_t)(p - fmt) >= sizeof(key_ref)) {
+		if ((size_t)(p - key_start) >= sizeof(buf)) {
			REDEBUG("Insufficient space to store key attribute ref, needed %zu bytes, have %zu bytes",
-				(p - fmt) + 1, sizeof(key_ref));
+				(p - key_start) + 1, sizeof(buf));
			return -1;
		}
-		strlcpy(key_ref, fmt, (p - fmt) + 1);
+		strlcpy(buf, key_start, (p - key_start) + 1);
-		key_len = xlat_fmt_to_ref(&key, request, key_ref);
+		key_len = xlat_fmt_to_ref(&k.key_const, request, buf);
		if (key_len < 0) return -1;
-		RDEBUG3("message key=%.*s\n", (int)key_len, key);
+		RDEBUG3("message key=%.*s\n", (int)key_len, k.key_const);
} else if (*p != ' ') {
/*
* Require a space to disambiguate data starting with "&"
*
*/
- REDEBUG("Must begin with an attribute reference or a space");
+ REDEBUG("Kafka payload must begin with an attribute reference or a space");
return -1;
}
p++;
+ /*
+ * The remainder is the message.
+ *
+ */
if (radius_axlat(&expanded, request, p, NULL, NULL) < 0) {
REDEBUG("Message expansion failed");
return -1;
}
- kafka_produce(inst, request, inst->rkt, k.key_unconst, key_len, expanded, strlen(expanded), inst->async);
+ kafka_produce(inst, request, entry->rkt, k.key_unconst, key_len, expanded, strlen(expanded), inst->async);
talloc_free(expanded);
return 0;
}
+static int rkt_by_name_cmp(void const *one, void const *two)
+{
+ rlm_kafka_rkt_by_name_t const *a = (rlm_kafka_rkt_by_name_t const *) one;
+ rlm_kafka_rkt_by_name_t const *b = (rlm_kafka_rkt_by_name_t const *) two;
+
+ return strcmp(a->name, b->name);
+}
+
+static int destruct_entry(rlm_kafka_rkt_by_name_t *entry)
+{
+ /*
+ * Destroy rkt only if we are the owner (not a reference)
+ *
+ */
+ if (!entry->ref && entry->rkt)
+ rd_kafka_topic_destroy(entry->rkt);
+ entry->rkt = NULL;
+ return 0;
+}
+
+static void free_rkt_by_name_entry(void *data)
+{
+ rlm_kafka_rkt_by_name_t *entry = (rlm_kafka_rkt_by_name_t *) data;
+ talloc_free(entry);
+}
+
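+/*
+ *	instantiate_topic() consumes one named "topic-config" subsection.  A sketch
+ *	of the configuration it expects (broker list, names and properties here are
+ *	illustrative):
+ *
+ *	kafka {
+ *		bootstrap-servers = "kafka1:9092,kafka2:9092"
+ *
+ *		topic-config accounting {
+ *			request.required.acks = -1
+ *		}
+ *
+ *		#  Reuse the topic created for "accounting" in post-auth
+ *		topic-config post-auth {
+ *			reference = "accounting"
+ *		}
+ *	}
+ *
+ *	Any other pair in a section is passed verbatim to librdkafka as a topic
+ *	property.  Each section creates (or references) a single rd_kafka_topic_t
+ *	and inserts it into inst->rkt_by_name_tree, keyed on the section's
+ *	instance name.
+ */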
+static int instantiate_topic(CONF_SECTION *cs, rlm_kafka_t *inst)
+{
+	CONF_PAIR *cp;
+	rd_kafka_topic_conf_t *tconf;
+	rd_kafka_topic_t *rkt;
+	bool ref = false;
+	rlm_kafka_rkt_by_name_t *entry = NULL;
+	char const *name = cf_section_name2(cs);
+	char errstr[512];	/* Must be an array: the RLM_KAFKA_*_PROP_SET macros take sizeof() of it */
+
+ /*
+ * Short circuit for when we are given a reference to an existing topic
+ *
+ */
+ cp = cf_pair_find_next(cs, NULL, NULL);
+ if (cp) {
+ char const *attr = cf_pair_attr(cp);
+ char const *value = cf_pair_value(cp);
+
+ if (strcmp(attr, "reference") == 0) {
+ rlm_kafka_rkt_by_name_t my_topic;
+
+ my_topic.name = value;
+ entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
+ if (!entry || !entry->rkt) {
+ ERROR("Couldn't reference Kafka topic \"%s\" for \"%s\"",
+ value, cf_section_name2(cs));
+ return -1;
+ }
+ if (cf_pair_find_next(cs, cp, NULL)) {
+ ERROR("A reference for another Kafka topic must be the only attribute");
+ return -1;
+ }
+ DEBUG3("Kafka topic \"%s\" configured as a reference to \"%s\"",
+ cf_section_name2(cs), value);
+ rkt = entry->rkt;
+ ref = true;
+ goto finalise;
+ }
+ }
+
+ /*
+ * Configuration for the new topic
+ *
+ */
+ tconf = rd_kafka_topic_conf_new();
+
+ /*
+ * When synchronous, don't block for longer than a typical request timeout.
+ *
+ * Can be overridden by message.timeout.ms in the topic conf_section
+ */
+ if (!inst->async)
+ RLM_KAFKA_TOPIC_PROP_SET(tconf, "message.timeout.ms", "30000", errstr);
+
+ /*
+ * Set topic properties from the topic conf_section
+ */
+ cp = NULL;
+ do {
+
+ cp = cf_pair_find_next(cs, cp, NULL);
+ if (cp) {
+ char const *attr = cf_pair_attr(cp);
+ char const *value = cf_pair_value(cp);
+ if (strcmp(attr, "name") == 0) { /* Override section name */
+ name = value;
+ continue;
+ } else if (strcmp(attr, "reference") == 0) {
+ ERROR("A reference for another Kafka topic must be the only attribute");
+ rd_kafka_topic_conf_destroy(tconf);
+ return -1;
+ }
+ RLM_KAFKA_TOPIC_PROP_SET(tconf, attr, value, errstr);
+ }
+ } while (cp != NULL);
+
+ /*
+ * Show the topic configurations for debugging
+ */
+ if (rad_debug_lvl >= L_DBG_LVL_3) {
+ size_t cnt, i;
+ const char **arr;
+
+ DEBUG3("Configuration for Kafka topic \"%s\":", name);
+ for (i = 0, arr = rd_kafka_topic_conf_dump(tconf, &cnt); i < cnt; i += 2)
+ DEBUG3("\t%s = %s", arr[i], arr[i + 1]);
+		rd_kafka_conf_dump_free(arr, cnt);	/* The dump array is allocated by librdkafka */
+	}
+
+ /*
+ * And create the topic according to the configuration.
+ *
+ * Upon success, the rkt assumes responsibility for tconf
+ *
+ */
+ rkt = rd_kafka_topic_new(inst->rk, name, tconf);
+ if (!rkt) {
+ ERROR("Failed to create Kafka topic \"%s\"", name);
+ rd_kafka_topic_conf_destroy(tconf);
+ return -1;
+ }
+
+finalise:
+
+ /*
+ * Finally insert the entry into the rbtree.
+ *
+ */
+	entry = talloc_zero(NULL, rlm_kafka_rkt_by_name_t);
+	if (!entry) {
+		if (!ref) rd_kafka_topic_destroy(rkt);
+		return -1;
+	}
+
+	/*
+	 *	Populate rkt/ref before arming the destructor so that cleanup on
+	 *	failure destroys the topic handle only when this entry owns it.
+	 */
+	entry->rkt = rkt;
+	entry->ref = ref;
+	talloc_set_destructor(entry, destruct_entry);
+
+	entry->name = talloc_strdup(entry, cf_section_name2(cs));
+	if (!entry->name) {
+	fail:
+		talloc_free(entry);
+		return -1;
+	}
+
+ if (!rbtree_insert(inst->rkt_by_name_tree, entry))
+ goto fail;
+
+	DEBUG("Registered Kafka topic \"%s\"", name);
+
+ return 0;
+
+}
+
+static inline void set_section_rkt(rlm_kafka_t *inst, rlm_kafka_section_config_t *section)
+{
+ rlm_kafka_rkt_by_name_t *entry, my_topic;
+
+	if (!section->cs) {
+		section->rkt = NULL;	/* Section absent from the module configuration */
+		return;
+	}
+
+	my_topic.name = cf_section_name1(section->cs);
+ entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
+ section->rkt = entry && entry->rkt ? entry->rkt : NULL;
+}
+
static int mod_instantiate(CONF_SECTION *conf, void *instance)
{
rlm_kafka_t *inst = instance;
rd_kafka_conf_t *kconf;
- rd_kafka_topic_conf_t *tconf;
char errstr[512];
CONF_PAIR *cp = NULL;
-
- /*
- * Capture a self-reference for the sections
- */
- inst->authorize.cs = cf_section_sub_find(conf, "authorize");
- inst->postauth.cs = cf_section_sub_find(conf, "post-auth");
- inst->accounting.cs = cf_section_sub_find(conf, "accounting");
+ CONF_SECTION *cs;
/*
* Configuration for the global producer
rd_kafka_conf_set_opaque(kconf, inst);
- DEBUG3("Registering logging callback");
+ DEBUG3("Registering Kafka logging callback");
rd_kafka_conf_set_log_cb(kconf, log_cb);
- DEBUG3("Registering delivery report callback");
+ DEBUG3("Registering Kafka delivery report callback");
rd_kafka_conf_set_dr_msg_cb(kconf, dr_msg_cb);
if (inst->stats_filename) {
- DEBUG3("Opening statistics file for writing: %s", inst->stats_filename);
+ DEBUG3("Opening Kafka statistics file for writing: %s", inst->stats_filename);
inst->stats_file = fopen(inst->stats_filename, "a");
if (!inst->stats_file) {
- ERROR("Error opening statistics file: %s", inst->stats_filename);
+ ERROR("Error opening Kafka statistics file: %s", inst->stats_filename);
/* Carry on, just don't log stats */
} else {
- DEBUG3("Registering statistics callback");
+ DEBUG3("Registering Kafka statistics callback");
rd_kafka_conf_set_stats_cb(kconf, stats_cb);
}
}
*/
inst->rk = rd_kafka_new(RD_KAFKA_PRODUCER, kconf, errstr, sizeof(errstr));
if (!inst->rk) {
- ERROR("Failed to create new producer: %s\n", errstr);
+ ERROR("Failed to create new Kafka producer: %s\n", errstr);
rd_kafka_conf_destroy(kconf);
return -1;
}
/*
- * Configuration for the topic
+ * Instantiate a topic for each named topic-config section
*
*/
- tconf = rd_kafka_topic_conf_new();
+ inst->rkt_by_name_tree = rbtree_create(instance, rkt_by_name_cmp, free_rkt_by_name_entry, 0);
+	if (!inst->rkt_by_name_tree) {
+		rd_kafka_destroy(inst->rk);
+		return -1;
+	}
- /*
- * When synchronous, don't block for longer than a typical request timeout.
- *
- * Can be overridden by message.timeout.ms in the topic conf_section
- */
- if (!inst->async)
- RLM_KAFKA_TOPIC_PROP_SET(tconf, "message.timeout.ms", "30000", errstr);
+ for (cs = cf_subsection_find_next(conf, NULL, "topic-config");
+ cs != NULL;
+ cs = cf_subsection_find_next(conf, cs, "topic-config")) {
- /*
- * Set topic properties from the topic conf_section
- */
- do {
- CONF_SECTION *tc = cf_section_sub_find(conf, "topic-config");
+ if (!cf_section_name2(cs)) {
+ WARN("Ignoring unnamed Kafka topic-config");
+ continue;
+ }
- cp = cf_pair_find_next(tc, cp, NULL);
- if (cp) {
- char const *attr = cf_pair_attr(cp);
- char const *value = cf_pair_value(cp);
- RLM_KAFKA_TOPIC_PROP_SET(tconf, attr, value, errstr);
+		if (instantiate_topic(cs, inst) != 0) {
+			ERROR("Failed to instantiate Kafka topic for \"%s\"\n",
+				cf_section_name2(cs));
+ rbtree_free(inst->rkt_by_name_tree);
+ rd_kafka_destroy(inst->rk);
+ return -1;
}
- } while (cp != NULL);
+
+ }
/*
- * Show the topic configurations for debugging
+ * Capture a self-reference for the sections
*/
- if (rad_debug_lvl >= L_DBG_LVL_3) {
- size_t cnt, i;
- const char **arr;
-
- DEBUG3("Topic configuration:");
- for (i = 0, arr = rd_kafka_topic_conf_dump(tconf, &cnt); i < cnt; i += 2)
- DEBUG3("\t%s = %s", arr[i], arr[i + 1]);
- }
+ inst->authorize.cs = cf_section_sub_find(conf, "authorize");
+ inst->postauth.cs = cf_section_sub_find(conf, "post-auth");
+ inst->accounting.cs = cf_section_sub_find(conf, "accounting");
/*
- * And create the topic according to the configuration
+ * Set the rkt for each section, where such configuration exists
*/
- inst->rkt = rd_kafka_topic_new(inst->rk, inst->topic, tconf);
- if (!inst->rkt) {
- ERROR("Failed to create new topic: %s\n", errstr);
- rd_kafka_topic_conf_destroy(tconf);
- rd_kafka_destroy(inst->rk);
- return -1;
- }
+ set_section_rkt(inst, &inst->authorize);
+ set_section_rkt(inst, &inst->postauth);
+ set_section_rkt(inst, &inst->accounting);
return 0;
}
rlm_kafka_t *inst = instance;
if (inst->stats_file) {
- DEBUG3("Closing statistics file");
+ DEBUG3("Closing Kafka statistics file");
fclose(inst->stats_file);
}
- DEBUG3("Flushing");
+ DEBUG3("Flushing Kafka queues");
if ((err = rd_kafka_flush(inst->rk, 10*1000)) == RD_KAFKA_RESP_ERR__TIMED_OUT)
ERROR("Flush failed: %s\n", rd_kafka_err2str(err));
- rd_kafka_topic_destroy(inst->rkt);
+ rbtree_free(inst->rkt_by_name_tree);
+
rd_kafka_destroy(inst->rk);
return 0;
static rlm_rcode_t CC_HINT(nonnull) kafka_common(void *instance, REQUEST *request, rlm_kafka_section_config_t *section)
{
- rlm_kafka_t *inst = instance;
- char *key = NULL;
- char *message = NULL;
- CONF_ITEM *item;
- CONF_PAIR *cp;
- const char *schema;
- rlm_rcode_t ret = RLM_MODULE_OK;
- char path[MAX_STRING_LEN];
- char *p = path;
+ rlm_kafka_t *inst = instance;
+ char *key = NULL;
+ char *message = NULL;
+ CONF_ITEM *item;
+ CONF_PAIR *cp;
+ const char *schema;
+ rlm_rcode_t ret = RLM_MODULE_OK;
+ char path[MAX_STRING_LEN];
+ char *p = path;
+
+ if (!section->rkt) {
+		RWARN("No Kafka topic is configured for the %s section",
+		      cf_section_name1(section->cs));
+ return RLM_MODULE_NOOP;
+ }
if (section->reference[0] != '.') {
*p++ = '.';
return RLM_MODULE_FAIL;
}
- if (kafka_produce(inst, request, inst->rkt,
+ if (kafka_produce(inst, request, section->rkt,
key, strlen(key), message, strlen(message),
inst->async
) != 0)