rlm_kafka: Split topics by section; support custom topics via xlat
author     Terry Burton <tez@terryburton.co.uk>
           Thu, 27 Feb 2025 00:11:49 +0000 (00:11 +0000)
committer  Matthew Newton <matthew-git@newtoncomputing.co.uk>
           Thu, 1 May 2025 15:57:48 +0000 (16:57 +0100)
Add support for topic references and for setting the published topic name in
section-based topic configurations.
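
In outline, a topic-config section may now be given a name, may publish under an
overridden name, or may reference another topic section. As a minimal sketch
assembled from the examples in this commit (the section and topic names simply
mirror those examples), the resulting module configuration takes this shape:

    kafka {
        bootstrap-servers = "localhost:9092"

        #  Addressable from the xlat as "%{kafka:custom-topic ...}"
        topic-config custom-topic {
            message.timeout.ms = 1000
        }

        #  Used automatically by calls from an accounting section;
        #  reuses the topic above instead of creating a new one
        topic-config accounting {
            reference = custom-topic
        }

        #  Used by calls from a post-auth section, but published
        #  under a different topic name
        topic-config post-auth {
            name = "radius.auth_result"
        }
    }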

raddb/mods-available/kafka
raddb/mods-available/kafka_async
src/modules/rlm_kafka/rlm_kafka.c
src/tests/modules/kafka/module.conf
src/tests/modules/kafka/xlat.unlang

diff --git a/raddb/mods-available/kafka b/raddb/mods-available/kafka
index 7d4a115b96e0ef932f697db8b3762fcc56bc9e61..bfbc557202c71b35dfe242ad5a7c796e33037601 100644
@@ -1,6 +1,5 @@
 kafka {
        bootstrap-servers = "localhost:9092"
-       topic = "radius"
 
        #
        #  This initialises the default, synchronous instance of the kafka
@@ -28,12 +27,23 @@ kafka {
                #ssl.ca.location = "${cadir}"
                #sasl.username = "kafkauser"
                #sasl.password = "kafkapass"
-               # ...
+               #  ...
                #
 
        }
 
-       topic-config {
+
+       #
+       #  A user-defined topic that other topic sections may reference, and that
+       #  an xlat may address by its name (which can be overridden), in either form:
+       #
+       #    - "%{kafka:custom-topic &Key-Attr-Ref Remainder is the message}"
+       #    - "%{kafka:custom-topic  Remainder is the message}", if there is no key
+       #
+       #  The attribute referred to by &Key-Attr-Ref holds the Kafka message's key.
+       #  Note the two spaces before the message when there is no key.
+       #
+       topic-config custom-topic {
 
                #
                #  The message durability strategy: Number of acks required by all
@@ -43,7 +53,7 @@ kafka {
 
                #
                #  How long a message can remain undelivered in the local
-               #  in-memory queue, with the  before it is timed out.
+               #  in-memory queue before it is timed out.
                #
                #  For synchronous delivery, the default local queue is set to
                #  25 sec. It should be some value less than max_request_time
@@ -64,6 +74,41 @@ kafka {
 
        }
 
+       #
+       #  Topic used by calls from an accounting section
+       #
+       topic-config accounting {
+
+               #
+               #  You may either provide topic configuration parameters
+               #  or reference a topic declared earlier in this file in
+               #  order to reuse it.
+               #
+               #reference = custom-topic
+
+       }
+
+       #
+       #  Topic used by calls from an authorize section
+       #
+       topic-config authorize {
+
+               #
+               #  Used to publish under a topic name other than the name of
+               #  the section from which it was called.
+               #
+               #name = "radius.auth_request"
+
+       }
+
+       #
+       #  Topic used by calls from a post-auth section
+       #
+       topic-config post-auth {
+               #name = "radius.auth_result"
+               #request.required.acks = 2
+       }
+
        #
        #  Uncomment to generate producer statistics
        #
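
As a usage illustration of the xlat forms documented in the new comments above,
a minimal unlang sketch (the payload text is hypothetical; it assumes the
default instance name "kafka" and the "custom-topic" section shown in this file):

    #  Keyed message: the value of &User-Name becomes the Kafka message key
    "%{kafka:custom-topic &User-Name Accepted login for %{User-Name}}"

    #  Unkeyed message: note the two spaces after the topic name
    "%{kafka:custom-topic  Plain message body}"
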
diff --git a/raddb/mods-available/kafka_async b/raddb/mods-available/kafka_async
index 745cb0e3d019c1a5f5a6afcf96e5baea43d7459b..b25f7432fc356bb2d0ef0c7c89fd901db06ef121 100644
@@ -1,6 +1,5 @@
 kafka kafka_async {
        bootstrap-servers = "localhost:9092"
-       topic = "radius"
 
        #
        #  This initialises an asynchronous instance of the kafka module.
@@ -58,7 +57,17 @@ kafka kafka_async {
 
        }
 
-       topic-config {
+       #
+       #  A user-defined topic that other topic sections may reference, and that
+       #  an xlat may address by its name (which can be overridden), in either form:
+       #
+       #    - "%{kafka_async:custom-topic &Key-Attr-Ref Remainder is the message}"
+       #    - "%{kafka_async:custom-topic  Remainder is the message}", if there is no key
+       #
+       #  The attribute referred to by &Key-Attr-Ref holds the Kafka message's key.
+       #  Note the two spaces before the message when there is no key.
+       #
+       topic-config custom-topic {
 
                #
                #  The message durability strategy: Number of acks required by all
@@ -90,8 +99,41 @@ kafka kafka_async {
        }
 
        #
-       #  Uncomment to generate producer statistics
+       #  Topic used by calls from an accounting section
+       #
+       topic-config accounting {
+
+               #
+               #  You may either provide topic configuration parameters
+               #  or reference a topic declared earlier in this file in
+               #  order to reuse it.
+               #
+               #reference = custom-topic
+
+       }
+
+       #
+       #  Topic used by calls from an authorize section
        #
+       topic-config authorize {
+
+               #
+               #  Used to publish under a topic name other than the name of
+               #  the section from which it was called.
+               #
+               #name = "radius.auth_request"
+
+       }
+
+       #
+       #  Topic used by calls from a post-auth section
+       #
+       topic-config post-auth {
+               #name = "radius.auth_result"
+               #request.required.acks = 2
+       }
+
+       #
+       #  Uncomment to generate producer statistics
+       #
 #      statistics {
 #              file = /tmp/kafka_async_stats.json
 #      }
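
The asynchronous instance works the same way but is addressed by its instance
name. A sketch, assuming the instance keeps its default name "kafka_async" and
the "custom-topic" section above (the post-auth placement is illustrative):

    post-auth {
        #  Fire-and-forget publish via the asynchronous instance
        "%{kafka_async:custom-topic &User-Name Auth result for %{User-Name}}"
    }
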
diff --git a/src/modules/rlm_kafka/rlm_kafka.c b/src/modules/rlm_kafka/rlm_kafka.c
index 8c1b1c5a5a1573b6371427cdd9b5008cbe7e9cd6..3f7d96596832bcb15fcd7d54d25de38d7db4e69a 100644
@@ -35,21 +35,42 @@ RCSID("$Id$")
 #define RLM_KAFKA_PROP_SET(CONF, PROP, VALUE, BUF_ERRSTR)                                                              \
        do {                                                                                                            \
                if (rd_kafka_conf_set(CONF, PROP, VALUE, BUF_ERRSTR, sizeof(BUF_ERRSTR)) != RD_KAFKA_CONF_OK )          \
-                       ERROR("Error setting global property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR);                \
+                       ERROR("Error setting Kafka global property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR);          \
        } while (0)
 
 #define RLM_KAFKA_TOPIC_PROP_SET(CONF, PROP, VALUE, BUF_ERRSTR)                                                                \
        do {                                                                                                            \
                if (rd_kafka_topic_conf_set(CONF, PROP, VALUE, BUF_ERRSTR, sizeof(BUF_ERRSTR)) != RD_KAFKA_CONF_OK )    \
-                       ERROR("Error setting topic property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR);                 \
+                       ERROR("Error setting Kafka topic property: '%s=%s' : %s\n", PROP, VALUE, BUF_ERRSTR);           \
        } while (0)
 
 typedef struct rlm_kafka_section_config {
+
        CONF_SECTION *cs;
        char const *reference;
        char const *key;
+
+       /*
+        *  Topic handle to avoid rbtree lookups for section-based calls
+        *
+        */
+       rd_kafka_topic_t *rkt;
+
 } rlm_kafka_section_config_t;
 
+typedef struct rlm_kafka_rkt_by_name {
+
+       const char *name;
+       rd_kafka_topic_t *rkt;
+
+       /*
+        *  Only one entry is the "owner" for a topic, and all others are
+        *  references to it (having ref = true)
+        */
+       bool ref;
+
+} rlm_kafka_rkt_by_name_t;
+
 typedef struct rlm_kafka_t {
 
        char const *name;
@@ -57,14 +78,14 @@ typedef struct rlm_kafka_t {
        bool async;
 
        char const *bootstrap;
-       char const *topic;
        char const *schema;
 
        char const *stats_filename;
        FILE *stats_file;
 
        rd_kafka_t *rk;
-       rd_kafka_topic_t *rkt;
+
+       rbtree_t *rkt_by_name_tree;
 
        rlm_kafka_section_config_t authorize;
        rlm_kafka_section_config_t postauth;
@@ -118,7 +139,6 @@ static const CONF_PARSER stats_config[] = {
  */
 static const CONF_PARSER module_config[] = {
        { "bootstrap-servers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_REQUIRED, rlm_kafka_t, bootstrap), NULL },
-       { "topic", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_REQUIRED, rlm_kafka_t, topic), NULL },
        { "asynchronous", FR_CONF_OFFSET(PW_TYPE_BOOLEAN, rlm_kafka_t, async), "no" },
        { "global-config", FR_CONF_POINTER(PW_TYPE_SUBSECTION, NULL), (void const*) global_config },
        { "topic-config", FR_CONF_POINTER(PW_TYPE_SUBSECTION, NULL), (void const*) topic_config },
@@ -276,7 +296,8 @@ static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_to
 
                if (err != RD_KAFKA_RESP_ERR__QUEUE_FULL) break;
 
-               RERROR("Queue full: %s. Produce attempt %d/%d\n", rd_kafka_err2str(err), attempt, RETRIES + 1);
+               RERROR("Kafka queue full: %s. Produce attempt %d/%d\n",
+                      rd_kafka_err2str(err), attempt, RETRIES + 1);
                rd_kafka_poll(inst->rk, 1000);
        }
 
@@ -302,7 +323,8 @@ static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_to
        }
 
        if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
-               RERROR("Failed to produce to topic: %s: %s\n", inst->topic, rd_kafka_err2str(err));
+               RERROR("Failed to produce to Kafka topic: %s: %s\n",
+                      rd_kafka_topic_name(rkt), rd_kafka_err2str(err));
                return -1;
        }
 
@@ -312,62 +334,95 @@ static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_to
 #undef NO_DELIVERY_REPORT
 
 /*
- *  Either "%{kafka:&Key-Attr-Ref <message data>}" or "%{kafka:<space><message data>}"
+ *  Format is either:
+ *    - "%{kafka:<topic> &Key-Attr-Ref <message data>}", or
+ *    - "%{kafka:<topic>  <message data>}" (if no key)
  *
  */
 static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, char *out, UNUSED size_t outlen)
 {
-       rlm_kafka_t     *inst = instance;
-       char const      *p = fmt;
-       uint8_t const   *key;
-       ssize_t         key_len = 0;
-       char            *expanded = NULL;
+       rlm_kafka_t             *inst = instance;
+       char const              *p = fmt;
+       ssize_t                 key_len = 0;
+       char                    *expanded = NULL;
+       char                    buf[256];
+       rlm_kafka_rkt_by_name_t *entry, my_topic;
 
        union {
                const uint8_t *key_const;
                char* const key_unconst;
        } k;
-       key = k.key_const = NULL;
+       k.key_const = NULL;
 
        *out = '\0';
 
-       if (*p == '&') {
-               char    key_ref[256];
+       /*
+        *  Extract and lookup the topic.
+        *
+        */
+       p = strchr(fmt, ' ');
+       if (!p || *fmt == ' ' || *(p+1) == '\0') {
+               REDEBUG("Kafka xlat must begin with a topic followed by the payload");
+               return -1;
+       }
+       if ((size_t)(p - fmt) >= sizeof(buf)) {
+               REDEBUG("Insufficient space to store Kafka topic name, needed %zu bytes, have %zu bytes",
+                       (p - fmt) + 1, sizeof(buf));
+               return -1;
+       }
+       strlcpy(buf, fmt, (p - fmt) + 1);
+       fmt = ++p;      /* parse the key and message relative to the remainder */
 
+       my_topic.name = buf;
+       entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
+       if (!entry || !entry->rkt) {
+               RWARN("No configuration section exists for Kafka topic \"%s\"", buf);
+               return -1;
+       }
+
+       /*
+        *  Extract the key, if there is one, otherwise expect a space.
+        *
+        */
+       if (*p == '&') {
                p = strchr(fmt, ' ');
                if (!p) {
                        REDEBUG("Key attribute form requires a message after the key (&Key-Attr-Ref <message data>)");
                        return -1;
                }
 
-               if ((size_t)(p - fmt) >= sizeof(key_ref)) {
+               if ((size_t)(p - fmt) >= sizeof(buf)) {
                        REDEBUG("Insufficient space to store key attribute ref, needed %zu bytes, have %zu bytes",
-                               (p - fmt) + 1, sizeof(key_ref));
+                               (p - fmt) + 1, sizeof(buf));
                        return -1;
                }
-               strlcpy(key_ref, fmt, (p - fmt) + 1);
+               strlcpy(buf, fmt, (p - fmt) + 1);
 
-               key_len = xlat_fmt_to_ref(&key, request, key_ref);
+               key_len = xlat_fmt_to_ref(&k.key_const, request, buf);
                if (key_len < 0) return -1;
 
-               RDEBUG3("message key=%.*s\n", (int)key_len, key);
+               RDEBUG3("message key=%.*s\n", (int)key_len, k.key_const);
        } else if (*p != ' ') {
                /*
                 * Require a space to disambiguate data starting with "&"
                 *
                 */
-               REDEBUG("Must begin with an attribute reference or a space");
+               REDEBUG("Kafka payload must begin with an attribute reference or a space");
                return -1;
        }
 
        p++;
 
+       /*
+        *  The remainder is the message.
+        *
+        */
        if (radius_axlat(&expanded, request, p, NULL, NULL) < 0) {
                REDEBUG("Message expansion failed");
                return -1;
        }
 
-       kafka_produce(inst, request, inst->rkt, k.key_unconst, key_len, expanded, strlen(expanded), inst->async);
+       kafka_produce(inst, request, entry->rkt, k.key_unconst, key_len, expanded, strlen(expanded), inst->async);
 
        talloc_free(expanded);
 
@@ -388,20 +443,176 @@ static int mod_bootstrap(CONF_SECTION *conf, void *instance)
        return 0;
 }
 
+static int rkt_by_name_cmp(void const *one, void const *two)
+{
+       rlm_kafka_rkt_by_name_t const *a = (rlm_kafka_rkt_by_name_t const *) one;
+       rlm_kafka_rkt_by_name_t const *b = (rlm_kafka_rkt_by_name_t const *) two;
+
+       return strcmp(a->name, b->name);
+}
+
+static int destruct_entry(rlm_kafka_rkt_by_name_t *entry) {
+       /*
+        *  Destroy rkt only if we are the owner (not a reference)
+        *
+        */
+       if (!entry->ref && entry->rkt)
+               rd_kafka_topic_destroy(entry->rkt);
+       entry->rkt = NULL;
+       return 0;
+}
+
+static void free_rkt_by_name_entry(void *data)
+{
+       rlm_kafka_rkt_by_name_t *entry = (rlm_kafka_rkt_by_name_t *) data;
+       talloc_free(entry);
+}
+
+static int instantiate_topic(CONF_SECTION *cs, rlm_kafka_t *inst, char *errstr) {
+
+       CONF_PAIR               *cp;
+       rd_kafka_topic_conf_t   *tconf;
+       rd_kafka_topic_t        *rkt;
+       bool                    ref = false;
+       rlm_kafka_rkt_by_name_t *entry = NULL;
+       const char              *name = cf_section_name2(cs);
+
+       /*
+        *  Short circuit for when we are given a reference to an existing topic
+        *
+        */
+       cp = cf_pair_find_next(cs, NULL, NULL);
+       if (cp) {
+               char const *attr = cf_pair_attr(cp);
+               char const *value = cf_pair_value(cp);
+
+               if (strcmp(attr, "reference") == 0) {
+                       rlm_kafka_rkt_by_name_t my_topic;
+
+                       my_topic.name = value;
+                       entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
+                       if (!entry || !entry->rkt) {
+                               ERROR("Couldn't reference Kafka topic \"%s\" for \"%s\"",
+                                     value, cf_section_name2(cs));
+                               return -1;
+                       }
+                       if (cf_pair_find_next(cs, cp, NULL)) {
+                               ERROR("A reference for another Kafka topic must be the only attribute");
+                               return -1;
+                       }
+                       DEBUG3("Kafka topic \"%s\" configured as a reference to \"%s\"",
+                              cf_section_name2(cs), value);
+                       rkt = entry->rkt;
+                       ref = true;
+                       goto finalise;
+               }
+       }
+
+       /*
+        *  Configuration for the new topic
+        *
+        */
+       tconf = rd_kafka_topic_conf_new();
+
+       /*
+        *  When synchronous, don't block for longer than a typical request timeout.
+        *
+        *  Can be overridden by message.timeout.ms in the topic conf_section
+        */
+       if (!inst->async)
+               RLM_KAFKA_TOPIC_PROP_SET(tconf, "message.timeout.ms", "30000", errstr);
+
+       /*
+        *  Set topic properties from the topic conf_section
+        */
+       cp = NULL;
+       do {
+
+               cp = cf_pair_find_next(cs, cp, NULL);
+               if (cp) {
+                       char const *attr = cf_pair_attr(cp);
+                       char const *value = cf_pair_value(cp);
+                       if (strcmp(attr, "name") == 0) {  /* Override section name */
+                               name = value;
+                               continue;
+                       } else if (strcmp(attr, "reference") == 0) {
+                               ERROR("A reference for another Kafka topic must be the only attribute");
+                               rd_kafka_topic_conf_destroy(tconf);
+                               return -1;
+                       }
+                       RLM_KAFKA_TOPIC_PROP_SET(tconf, attr, value, errstr);
+               }
+       } while (cp != NULL);
+
+       /*
+        *  Show the topic configurations for debugging
+        */
+       if (rad_debug_lvl >= L_DBG_LVL_3) {
+               size_t          cnt, i;
+               const char      **arr;
+
+               DEBUG3("Configuration for Kafka topic \"%s\":", name);
+               for (i = 0, arr = rd_kafka_topic_conf_dump(tconf, &cnt); i < cnt; i += 2)
+                       DEBUG3("\t%s = %s", arr[i], arr[i + 1]);
+       }
+
+       /*
+        *  And create the topic according to the configuration.
+        *
+        *  Upon success, the rkt assumes responsibility for tconf
+        *
+        */
+       rkt = rd_kafka_topic_new(inst->rk, name, tconf);
+       if (!rkt) {
+               ERROR("Failed to create Kafka topic \"%s\"", name);
+               rd_kafka_topic_conf_destroy(tconf);
+               return -1;
+       }
+
+finalise:
+
+       /*
+        *  Finally insert the entry into the rbtree.
+        *
+        */
+       entry = talloc_zero(NULL, rlm_kafka_rkt_by_name_t);
+       if (!entry)
+               return -1;
+       talloc_set_destructor(entry, destruct_entry);
+       entry->name = talloc_strdup(entry, cf_section_name2(cs));
+       if (!entry->name) {
+       fail:
+               talloc_free(entry);
+               return -1;
+       }
+       entry->rkt = rkt;
+       entry->ref = ref;
+
+       if (!rbtree_insert(inst->rkt_by_name_tree, entry))
+               goto fail;
+
+       DEBUG("Created Kafka topic for \"%s\"", name);
+
+       return 0;
+
+}
+
+static inline void set_section_rkt(rlm_kafka_t *inst, rlm_kafka_section_config_t *section)
+{
+       rlm_kafka_rkt_by_name_t *entry, my_topic;
+
+       /* A section absent from the module configuration has no topic */
+       if (!section->cs) {
+               section->rkt = NULL;
+               return;
+       }
+
+       my_topic.name = cf_section_name1(section->cs);
+       entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
+       section->rkt = entry && entry->rkt ? entry->rkt : NULL;
+}
+
 static int mod_instantiate(CONF_SECTION *conf, void *instance)
 {
        rlm_kafka_t             *inst = instance;
        rd_kafka_conf_t         *kconf;
-       rd_kafka_topic_conf_t   *tconf;
        char                    errstr[512];
        CONF_PAIR               *cp = NULL;
-
-       /*
-        *  Capture a self-reference for the sections
-        */
-       inst->authorize.cs = cf_section_sub_find(conf, "authorize");
-       inst->postauth.cs = cf_section_sub_find(conf, "post-auth");
-       inst->accounting.cs = cf_section_sub_find(conf, "accounting");
+       CONF_SECTION            *cs;
 
        /*
         *  Configuration for the global producer
@@ -410,20 +621,20 @@ static int mod_instantiate(CONF_SECTION *conf, void *instance)
 
        rd_kafka_conf_set_opaque(kconf, inst);
 
-       DEBUG3("Registering logging callback");
+       DEBUG3("Registering Kafka logging callback");
        rd_kafka_conf_set_log_cb(kconf, log_cb);
 
-       DEBUG3("Registering delivery report callback");
+       DEBUG3("Registering Kafka delivery report callback");
        rd_kafka_conf_set_dr_msg_cb(kconf, dr_msg_cb);
 
        if (inst->stats_filename) {
-               DEBUG3("Opening statistics file for writing: %s", inst->stats_filename);
+               DEBUG3("Opening Kafka statistics file for writing: %s", inst->stats_filename);
                inst->stats_file = fopen(inst->stats_filename, "a");
                if (!inst->stats_file) {
-                       ERROR("Error opening statistics file: %s", inst->stats_filename);
+                       ERROR("Error opening Kafka statistics file: %s", inst->stats_filename);
                        /* Carry on, just don't log stats */
                } else {
-                       DEBUG3("Registering statistics callback");
+                       DEBUG3("Registering Kafka statistics callback");
                        rd_kafka_conf_set_stats_cb(kconf, stats_cb);
                }
        }
@@ -492,61 +703,50 @@ static int mod_instantiate(CONF_SECTION *conf, void *instance)
         */
        inst->rk = rd_kafka_new(RD_KAFKA_PRODUCER, kconf, errstr, sizeof(errstr));
        if (!inst->rk) {
-               ERROR("Failed to create new producer: %s\n", errstr);
+               ERROR("Failed to create new Kafka producer: %s\n", errstr);
                rd_kafka_conf_destroy(kconf);
                return -1;
        }
 
        /*
-        *  Configuration for the topic
+        *  Instantiate a topic for each named topic-config section
         *
         */
-       tconf = rd_kafka_topic_conf_new();
+       inst->rkt_by_name_tree = rbtree_create(instance, rkt_by_name_cmp, free_rkt_by_name_entry, 0);
+       if (!inst->rkt_by_name_tree) return -1;
 
-       /*
-        *  When synchronous, don't block for longer than a typical request timeout.
-        *
-        *  Can be overridden by message.timeout.ms in the topic conf_section
-        */
-       if (!inst->async)
-               RLM_KAFKA_TOPIC_PROP_SET(tconf, "message.timeout.ms", "30000", errstr);
+       for (cs = cf_subsection_find_next(conf, NULL, "topic-config");
+               cs != NULL;
+               cs = cf_subsection_find_next(conf, cs, "topic-config")) {
 
-       /*
-        *  Set topic properties from the topic conf_section
-        */
-       do {
-               CONF_SECTION    *tc = cf_section_sub_find(conf, "topic-config");
+               if (!cf_section_name2(cs)) {
+                       WARN("Ignoring unnamed Kafka topic-config");
+                       continue;
+               }
 
-               cp = cf_pair_find_next(tc, cp, NULL);
-               if (cp) {
-                       char const *attr = cf_pair_attr(cp);
-                       char const *value = cf_pair_value(cp);
-                       RLM_KAFKA_TOPIC_PROP_SET(tconf, attr, value, errstr);
+               if (instantiate_topic(cs, inst, errstr) != 0) {
+                       ERROR("Failed to instantiate new Kafka topic for %s\n",
+                             cf_section_name2(cs));
+                       rbtree_free(inst->rkt_by_name_tree);
+                       rd_kafka_destroy(inst->rk);
+                       return -1;
                }
-       } while (cp != NULL);
+
+       }
 
        /*
-        *  Show the topic configurations for debugging
+        *  Capture a self-reference for the sections
         */
-       if (rad_debug_lvl >= L_DBG_LVL_3) {
-               size_t          cnt, i;
-               const char      **arr;
-
-               DEBUG3("Topic configuration:");
-               for (i = 0, arr = rd_kafka_topic_conf_dump(tconf, &cnt); i < cnt; i += 2)
-                       DEBUG3("\t%s = %s", arr[i], arr[i + 1]);
-       }
+       inst->authorize.cs = cf_section_sub_find(conf, "authorize");
+       inst->postauth.cs = cf_section_sub_find(conf, "post-auth");
+       inst->accounting.cs = cf_section_sub_find(conf, "accounting");
 
        /*
-        *  And create the topic according to the configuration
+        *  Set the rkt for each section, where such configuration exists
         */
-       inst->rkt = rd_kafka_topic_new(inst->rk, inst->topic, tconf);
-       if (!inst->rkt) {
-               ERROR("Failed to create new topic: %s\n", errstr);
-               rd_kafka_topic_conf_destroy(tconf);
-               rd_kafka_destroy(inst->rk);
-               return -1;
-       }
+       set_section_rkt(inst, &inst->authorize);
+       set_section_rkt(inst, &inst->postauth);
+       set_section_rkt(inst, &inst->accounting);
 
        return 0;
 }
@@ -557,15 +757,16 @@ static int mod_detach(UNUSED void *instance)
        rlm_kafka_t *inst = instance;
 
        if (inst->stats_file) {
-               DEBUG3("Closing statistics file");
+               DEBUG3("Closing Kafka statistics file");
                fclose(inst->stats_file);
        }
 
-       DEBUG3("Flushing");
+       DEBUG3("Flushing Kafka queues");
        if ((err = rd_kafka_flush(inst->rk, 10*1000)) == RD_KAFKA_RESP_ERR__TIMED_OUT)
                ERROR("Flush failed: %s\n", rd_kafka_err2str(err));
 
-       rd_kafka_topic_destroy(inst->rkt);
+       rbtree_free(inst->rkt_by_name_tree);
+
        rd_kafka_destroy(inst->rk);
 
        return 0;
@@ -577,15 +778,21 @@ static int mod_detach(UNUSED void *instance)
 static rlm_rcode_t CC_HINT(nonnull) kafka_common(void *instance, REQUEST *request, rlm_kafka_section_config_t *section)
 {
 
-       rlm_kafka_t     *inst = instance;
-       char            *key = NULL;
-       char            *message = NULL;
-       CONF_ITEM       *item;
-       CONF_PAIR       *cp;
-       const char      *schema;
-       rlm_rcode_t     ret = RLM_MODULE_OK;
-       char            path[MAX_STRING_LEN];
-       char            *p = path;
+       rlm_kafka_t             *inst = instance;
+       char                    *key = NULL;
+       char                    *message = NULL;
+       CONF_ITEM               *item;
+       CONF_PAIR               *cp;
+       const char              *schema;
+       rlm_rcode_t             ret = RLM_MODULE_OK;
+       char                    path[MAX_STRING_LEN];
+       char                    *p = path;
+
+       if (!section->rkt) {
+               RWARN("No configuration exists for Kafka topic for %s section",
+                       cf_section_name1(section->cs));
+               return RLM_MODULE_NOOP;
+       }
 
        if (section->reference[0] != '.') {
                *p++ = '.';
@@ -619,7 +826,7 @@ static rlm_rcode_t CC_HINT(nonnull) kafka_common(void *instance, REQUEST *reques
                return RLM_MODULE_FAIL;
        }
 
-       if (kafka_produce(inst, request, inst->rkt,
+       if (kafka_produce(inst, request, section->rkt,
                          key, strlen(key), message, strlen(message),
                          inst->async
                         ) != 0)
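
With the per-section topic handles introduced above, a section-based call now
publishes to the topic-config whose name matches that section ("authorize",
"accounting" or "post-auth"), resolved once at instantiation time. A sketch of
the corresponding placement in a virtual server, assuming a typical
sites-enabled layout (the server itself is not part of this commit):

    server default {
        authorize {
            kafka    # publishes to the "authorize" topic, or its overridden name
        }
        accounting {
            kafka    # publishes to the "accounting" topic, or whatever it references
        }
        post-auth {
            kafka    # publishes to the "post-auth" topic
        }
    }

If no topic-config section exists for the calling section, the call now returns
noop with a warning instead of publishing.
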
diff --git a/src/tests/modules/kafka/module.conf b/src/tests/modules/kafka/module.conf
index 791624c28a2e6de7291cfd9b5f922eb1401aa68f..8e66908f8f67cfe93a0ae339fec815edbd07e1ae 100644
@@ -10,10 +10,31 @@ kafka {
        topic = "radius"
 
        global-config {
-               test.mock.num.brokers = 3
+               test.mock.num.brokers = 1
        }
 
-       topic-config {
+       topic-config custom-topic {
+               message.timeout.ms = 1000
+       }
+
+       topic-config accounting {
+               request.required.acks = 1
+       }
+
+       topic-config authorize {
+               name = "radius.request"
+       }
+
+       topic-config post-auth {
+               reference = authorize
+       }
+
+       accounting {
+               key = "%{Acct-Unique-Session-Id}"
+               reference = "messages.%{%{Acct-Status-Type}:-unknown}"
+               messages {
+                       Start = "%{json_encode:&request:[*]}"
+               }
        }
 
        authorize {
@@ -29,12 +50,4 @@ kafka {
                }
        }
 
-       accounting {
-               key = "%{Acct-Unique-Session-Id}"
-               reference = "messages.%{%{Acct-Status-Type}:-unknown}"
-               messages {
-                       Start = "%{json_encode:&request:[*]}"
-               }
-       }
-
 }
diff --git a/src/tests/modules/kafka/xlat.unlang b/src/tests/modules/kafka/xlat.unlang
index ed34e4a4cee29aca540c6d331e25b95b94022219..de3b9668198470eff597121d8f63ad93edcf288c 100644
 #
-#  Bare message
+#  Call from accounting, configured as a regular topic
+#
+kafka.accounting
+if (&Module-Failure-Message) {
+       test_fail
+}
+
+
+#
+#  Call from authorize, configured with a custom published name
+#
+kafka.authorize
+if (&Module-Failure-Message) {
+       test_fail
+}
+
+
+#
+#  Call from post-auth, configured as a reference to the authorize topic
+#
+kafka.post-auth
+if (&Module-Failure-Message) {
+       test_fail
+}
+
+
+#
+#  Message with key
 #
 update {
        &Module-Failure-Message !* ANY
 }
 
-"%{kafka: Test message}"
+"%{kafka:custom-topic &User-Name Test mmessage}"
+if (&Module-Failure-Message) {
+       test_fail
+}
+
+
+#
+#  Bare message, with no key
+#
+update {
+       &Module-Failure-Message !* ANY
+}
 
+"%{kafka:custom-topic  Test message}"
 if (&Module-Failure-Message) {
        test_fail
 }
 
 
 #
-#  Message with key
+#  An empty message with no key is useless, but still acceptable to Kafka
 #
 update {
        &Module-Failure-Message !* ANY
 }
 
-"%{kafka:&User-Name Test mmessage}"
+"%{kafka:custom-topic  }"
 if (&Module-Failure-Message) {
        test_fail
 }
 
 
 #
-#  Nonsense
+#  Missing topic
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka: &User-Name Topic is missing here}"
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic followed by the payload")) {
+       test_fail
+}
+
+
+#
+#  Lone topic
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic}"
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic followed by the payload")) {
+       test_fail
+}
+
+
+#
+#  No payload
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic }"
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic followed by the payload")) {
+       test_fail
+}
+
+
+#
+#  Nonsense payload
 #
 update {
        &Module-Failure-Message !* ANY
 }
 
-"%{kafka:Test message not beginning with a space}"
-if (!(&Module-Failure-Message[*] == "Must begin with an attribute reference or a space")) {
+"%{kafka:custom-topic Test message not preceeded by Key-Attr-Ref or two spaces}"
+if (!(&Module-Failure-Message[*] == "Kafka payload must begin with an attribute reference or a space")) {
        test_fail
 }