]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
Add support for Kafka headers, via sections and xlats
authorTerry Burton <tez@terryburton.co.uk>
Thu, 3 Apr 2025 18:40:47 +0000 (19:40 +0100)
committerMatthew Newton <matthew-git@newtoncomputing.co.uk>
Thu, 1 May 2025 15:57:52 +0000 (16:57 +0100)
raddb/mods-available/kafka
raddb/mods-available/kafka_async
raddb/mods-config/kafka/messages-json.conf
src/modules/rlm_kafka/rlm_kafka.c
src/tests/modules/kafka/xlat.unlang

index bfbc557202c71b35dfe242ad5a7c796e33037601..115a8b0628a5351531dc777d47d3eba7c90ec63a 100644 (file)
@@ -32,18 +32,25 @@ kafka {
 
        }
 
-
-        #
-        #  A user-defined topic called by reference or by using its name (which may
-        #  be changed) in an xlat of either form:
-        #
-        #    - "%{kafka:custom-topic &Key-Attr-Ref Remainder is the message}"
-        #    - "%{kafka:custom-topic  Remainder is the message}", if no key
-        #
-        #  The attribute referred to by &Key-Attr-Ref holds the Kafka message's key.
-        #  Notice two spaces leading the message in case there is no key.
-        #
-        topic-config custom-topic {
+       #
+       #  Example of a user-defined topic called by reference from another
+       #  section, or by using its name (which may be changed) in an xlat of
+       #  either form:
+       #
+       #    - "%{kafka:custom-topic (<header-list>) &Key-Attr-Ref Remainder is the message}"
+       #    - "%{kafka:custom-topic (<header-list>)  Remainder is the message}", if no key
+       #    - "%{kafka:custom-topic &Key-Attr-Ref Remainder is the message}"
+       #    - "%{kafka:custom-topic  Remainder is the message}", if no key
+       #
+       #  The attribute referred to by the optional &Key-Attr-Ref holds the
+       #  Kafka message's key. Notice that it is replaced with a mandatory
+       #  space if there is no key.
+       #
+       #  <header-list> has the same format as the json_encode xlat. For
+       #  example `&request[*] !&User-Password` creates Kafka headers from all
+       #  attributes in the RADIUS request, except for `User-Password`.
+       #
+       topic-config custom-topic {
 
                #
                #  The message durabily strategy: Number of acks required by all
index b25f7432fc356bb2d0ef0c7c89fd901db06ef121..f7bb454ce0c53817b75cb0b2cda3aec9f9e2f1a3 100644 (file)
@@ -58,14 +58,22 @@ kafka kafka_async {
        }
 
        #
-       #  A user-defined topic called by reference or by using its name (which may
-       #  be changed) in an xlat of either form:
+       #  Example of a user-defined topic called by reference from another
+       #  section, or by using its name (which may be changed) in an xlat of
+       #  either form:
        #
+       #    - "%{kafka_async:custom-topic (<header-list>) &Key-Attr-Ref Remainder is the message}"
+       #    - "%{kafka_async:custom-topic (<header-list>)  Remainder is the message}", if no key
        #    - "%{kafka_async:custom-topic &Key-Attr-Ref Remainder is the message}"
        #    - "%{kafka_async:custom-topic  Remainder is the message}", if no key
        #
-       #  The attribute referred to by &Key-Attr-Ref holds the Kafka message's key.
-       #  Notice two spaces leading the message in case there is no key.
+       #  The attribute referred to by the optional &Key-Attr-Ref holds the
+       #  Kafka message's key. Notice that it is replaced with a mandatory
+       #  space if there is no key.
+       #
+       #  <header-list> has the same format as the json_encode xlat. For
+       #  example `&request[*] !&User-Password` creates Kafka headers from all
+       #  attributes in the RADIUS request, except for `User-Password`.
        #
        topic-config custom-topic {
 
index 16ebfaf7d988f32067fae27de5059c077550da88..9e39f1d1a24407f5e2afcb59f42e0db1880eaa25 100644 (file)
@@ -1,11 +1,23 @@
 #
-#  To use the json_encode xlats in the schemas below, you must first enable the
-#  json module, and may probably want to set json.encode.output_mode = object_simple
+#  key:       An optional key for the message.
+#
+#  headers:   An optional specification for which attributes should
+#             be set as Kafka headers, with their values encoded as strings.
+#
+#  message:   The message itself.
+#
+#  reference: Expanded to a configuration item reference that contains the
+#             message.
+#
+#  To use the json_encode xlats in the schemas examples below, you must first
+#  enable the json module, and may probably want to set
+#  json.encode.output_mode = object_simple
 #
 
 authorize {
 
        key = "%{User-Name}"
+       headers = "&Calling-Station-Id"
        message = "%{json_encode:&request:[*]}"
 
 }
@@ -13,7 +25,7 @@ authorize {
 post-auth {
 
        key = "%{User-Name}"
-
+       headers = "&reply:Framed-IP-Address"
        reference = "messages.%{%{reply:Packet-Type}:-default}"
 
        messages {
@@ -29,6 +41,7 @@ post-auth {
 accounting {
 
        key = "%{Acct-Unique-Session-Id}"
+       headers = "&Acct-Status-Type &NAS-Identifier"
 
        reference = "messages.%{%{Acct-Status-Type}:-default}"
 
index 3f7d96596832bcb15fcd7d54d25de38d7db4e69a..b957309172cb37e0a515023f7a7a5cfdd41d0359 100644 (file)
@@ -29,6 +29,7 @@ RCSID("$Id$")
 #include <freeradius-devel/modules.h>
 #include <freeradius-devel/rad_assert.h>
 #include <librdkafka/rdkafka.h>
+#include <ctype.h>
 
 #define LOG_PREFIX "rlm_kafka"
 
@@ -49,6 +50,7 @@ typedef struct rlm_kafka_section_config {
        CONF_SECTION *cs;
        char const *reference;
        char const *key;
+       char const *headers;
 
        /*
         *  Topic handle to avoid rbtree lookups for section-based calls
@@ -104,18 +106,21 @@ static const CONF_PARSER topic_config[] = {
 static const CONF_PARSER authorize_config[] = {
        { "reference", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, authorize.reference), ".message" },
        { "key", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, authorize.key), NULL},
+       { "headers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, authorize.headers), NULL},
        CONF_PARSER_TERMINATOR
 };
 
 static const CONF_PARSER postauth_config[] = {
        { "reference", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, postauth.reference), ".message" },
        { "key", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, postauth.key), NULL},
+       { "headers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, postauth.headers), NULL},
        CONF_PARSER_TERMINATOR
 };
 
 static const CONF_PARSER accounting_config[] = {
        { "reference", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, accounting.reference), ".message" },
        { "key", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, accounting.key), NULL},
+       { "headers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, accounting.headers), NULL},
        CONF_PARSER_TERMINATOR
 };
 
@@ -260,7 +265,7 @@ static void log_cb(const rd_kafka_t *rk, int level, UNUSED const char *facility,
 #define RETRIES 2
 static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_topic_t *rkt,
                         char* const key, const size_t key_len, char* const message, const size_t len,
-                        const bool async) {
+                        rd_kafka_headers_t *hdrs, const bool async) {
 
        rd_kafka_resp_err_t     status = NO_DELIVERY_REPORT;
        rd_kafka_resp_err_t     err;
@@ -290,6 +295,7 @@ static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_to
                                        RD_KAFKA_V_MSGFLAGS(async ? RD_KAFKA_MSG_F_COPY : 0),
                                        RD_KAFKA_V_KEY(key, key_len),
                                        RD_KAFKA_V_VALUE(message, len),
+                                       RD_KAFKA_V_HEADERS(hdrs),
                                        RD_KAFKA_V_OPAQUE(async ? NULL : &status),
                                        RD_KAFKA_V_END
                                       );
@@ -301,6 +307,13 @@ static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_to
                rd_kafka_poll(inst->rk, 1000);
        }
 
+       /*
+        *  If rd_kafka_producev() failed then we still own any headers.
+        *
+        */
+       if (err != RD_KAFKA_RESP_ERR_NO_ERROR && hdrs)
+               rd_kafka_headers_destroy(hdrs);
+
        /*
         *  If the delivery is specified as synchronous and we did not
         *  encounter an immediate error when producing to the queue then
@@ -333,20 +346,139 @@ static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_to
 }
 #undef NO_DELIVERY_REPORT
 
+static int create_headers(REQUEST *request, const char *in, rd_kafka_headers_t **out)
+{
+       rd_kafka_headers_t      *hdrs = NULL;
+       const char              *p;
+       VALUE_PAIR              *header_vps = NULL, *vps, *vp;
+       int                     num_vps;
+       vp_cursor_t             cursor;
+
+       if (!in)
+               return 0;
+
+       /*
+        *  Decode the headers string to derive a set of VPs from which to
+        *  create Kafka headers
+        *
+        */
+       p = in;
+
+       while (isspace((uint8_t) *p)) p++;
+       if (*p == '\0') return -1;
+
+       while (*p) {
+               bool            negate = false;
+               vp_tmpl_t       *vpt = NULL;
+               ssize_t         slen;
+
+               while (isspace((uint8_t) *p)) p++;
+
+               if (*p == '\0') break;
+
+               /* Check if we should be removing attributes */
+               if (*p == '!') {
+                       p++;
+                       negate = true;
+               }
+
+               if (*p == '\0') {
+                       /* May happen e.g. with '!' on its own at the end */
+                       REMARKER(in, (p - in), "Missing attribute name");
+               error:
+                       fr_pair_list_free(&header_vps);
+                       talloc_free(vpt);
+                       return -1;
+               }
+
+               /* Decode next attr template */
+               slen = tmpl_afrom_attr_substr(request, &vpt, p, REQUEST_CURRENT, PAIR_LIST_REQUEST, false, false);
+
+               if (slen <= 0) {
+                       REMARKER(in, (p - in) - slen, fr_strerror());
+                       goto error;
+               }
+
+               /*
+                * Get attributes from the template.
+                * Missing attribute isn't an error (so -1, not 0).
+                */
+               if (tmpl_copy_vps(request, &vps, request, vpt) < -1) {
+                       REDEBUG("Error copying attributes");
+                       goto error;
+               }
+
+               if (negate) {
+                       /* Remove all template attributes from header list */
+                       for (vp = vps; vp; vp = vp->next)
+                               fr_pair_delete_by_da(&header_vps, vp->da);
+
+                       fr_pair_list_free(&vps);
+               } else {
+                       /* Add template VPs to header list */
+                       fr_pair_add(&header_vps, vps);
+               }
+
+               TALLOC_FREE(vpt);
+
+               /* Jump forward to next attr */
+               p += slen;
+
+               if (*p != '\0' && !isspace((uint8_t)*p)) {
+                       REMARKER(in, (p - in), "Missing whitespace");
+                       goto error;
+               }
+
+       }
+
+       /*
+        *  Create the Kafka headers for the derived VPs
+        *
+        */
+       for (vp = header_vps, num_vps = 0; vp; vp = vp->next, num_vps++);
+       if (num_vps == 0) return 0;
+
+       hdrs = rd_kafka_headers_new(num_vps);
+       for (vp = fr_cursor_init(&cursor, &header_vps);
+            vp;
+            vp = fr_cursor_next(&cursor)) {
+               rd_kafka_resp_err_t err;
+               const char *attr = vp->da->name;
+               char *value;
+
+               value = vp_aprints_value(NULL, vp, '\0');
+               err = rd_kafka_header_add(hdrs, attr, -1, value, -1);
+               talloc_free(value);
+               if (err) {
+                       rd_kafka_headers_destroy(hdrs);
+                       return -1;
+               }
+       }
+
+       *out = hdrs;
+       return 0;
+
+}
+
 /*
- *  Format is either:
- *    - "%{kafka:<topic> &Key-Attr-Ref <message data>}", or
- *    - "%{kafka:<topic>  <message data>}" (if no key)
+ *  Format is one of the following:
+ *
+ *  - "%{kafka:<topic> (<header-list>) &Key-Attr-Ref <message data>}"
+ *  - "%{kafka:<topic> (<header-list>)  <message data>}"  (no key => space)
+ *  - "%{kafka:<topic> &Key-Attr-Ref <message data>}"
+ *  - "%{kafka:<topic>  <message data>}"                  (no key => space)
  *
  */
 static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, char *out, UNUSED size_t outlen)
 {
        rlm_kafka_t             *inst = instance;
-       char const              *p = fmt;
+       char const              *p = fmt, *q;
        ssize_t                 key_len = 0;
        char                    *expanded = NULL;
+       char                    *headers = NULL;
        char                    buf[256];
        rlm_kafka_rkt_by_name_t *entry, my_topic;
+       rd_kafka_headers_t      *hdrs = NULL;
 
        union {
                const uint8_t *key_const;
@@ -362,13 +494,16 @@ static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, cha
         */
        p = strchr(fmt, ' ');
         if (!p || *fmt == ' ' || *(p+1) == '\0') {
-               REDEBUG("Kafka xlat must begin with a topic followed by the payload");
+               REDEBUG("Kafka xlat must begin with a topic, optionally followed by headers, then the payload");
+error:
+               talloc_free(expanded);
+               if (hdrs) rd_kafka_headers_destroy(hdrs);
                return -1;
        }
        if ((size_t)(p - fmt) >= sizeof(buf)) {
                REDEBUG("Insufficient space to store Kafka topic name, needed %zu bytes, have %zu bytes",
                        (p - fmt) + 1, sizeof(buf));
-               return -1;
+               goto error;
        }
        strlcpy(buf, fmt, (p - fmt) + 1);
        p++;
@@ -377,29 +512,56 @@ static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, cha
        entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
        if (!entry || !entry->rkt) {
                RWARN("No configuration section exists for kafka topic \"%s\"", buf);
-               return -1;
+               goto error;
+       }
+
+       /*
+        *  Extract the header specification, and generate the headers
+        *
+        */
+       q = p;
+       if (*p == '(') {
+               p = strchr(p, ')');
+               if (!p) {
+                       REDEBUG("Header list is missing closing parenthesis)");
+                       goto error;
+               }
+               MEM(headers = talloc_strndup(NULL, q + 1, p - q - 1));
+               if (*headers && create_headers(request, headers, &hdrs) < 0) {
+                       RDEBUG3("Failed to create headers");
+                       talloc_free(headers);
+                       goto error;
+               }
+               talloc_free(headers);
+               p++;
+               if (*p != ' ') {
+                       REDEBUG("Kafka xlat must begin with a topic, optionally followed by headers, then the payload");
+                       goto error;
+               }
+               p++;
        }
 
        /*
         *  Extract the key, if there is one, otherwise expect a space.
         *
         */
+       q = p;
        if (*p == '&') {
-               p = strchr(fmt, ' ');
+               p = strchr(p, ' ');
                if (!p) {
-                       REDEBUG("Key attribute form requires a message after the key (&Key-Attr-Ref <message data>)");
-                       return -1;
+                       REDEBUG("Key attribute form requires a message after the key (... &Key-Attr-Ref <message data>)");
+                       goto error;
                }
 
-               if ((size_t)(p - fmt) >= sizeof(buf)) {
+               if ((size_t)(p - q) >= sizeof(buf)) {
                        REDEBUG("Insufficient space to store key attribute ref, needed %zu bytes, have %zu bytes",
-                               (p - fmt) + 1, sizeof(buf));
-                       return -1;
+                               (p - q) + 1, sizeof(buf));
+                       goto error;
                }
-               strlcpy(buf, fmt, (p - fmt) + 1);
+               strlcpy(buf, q, (p - q) + 1);
 
                key_len = xlat_fmt_to_ref(&k.key_const, request, buf);
-               if (key_len < 0) return -1;
+               if (key_len < 0) goto error;
 
                RDEBUG3("message key=%.*s\n", (int)key_len, k.key_const);
        } else if (*p != ' ') {
@@ -408,7 +570,7 @@ static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, cha
                 *
                 */
                REDEBUG("Kafka payload must begin with an attribute reference or a space");
-               return -1;
+               goto error;
        }
 
        p++;
@@ -419,10 +581,11 @@ static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, cha
         */
        if (radius_axlat(&expanded, request, p, NULL, NULL) < 0) {
                REDEBUG("Message expansion failed");
-               return -1;
+               goto error;
        }
 
-       kafka_produce(inst, request, entry->rkt, k.key_unconst, key_len, expanded, strlen(expanded), inst->async);
+       kafka_produce(inst, request, entry->rkt, k.key_unconst, key_len,
+                     expanded, strlen(expanded), hdrs, inst->async);
 
        talloc_free(expanded);
 
@@ -753,8 +916,8 @@ static int mod_instantiate(CONF_SECTION *conf, void *instance)
 
 static int mod_detach(UNUSED void *instance)
 {
-       rd_kafka_resp_err_t err;
-       rlm_kafka_t *inst = instance;
+       rd_kafka_resp_err_t     err;
+       rlm_kafka_t             *inst = instance;
 
        if (inst->stats_file) {
                DEBUG3("Closing Kafka statistics file");
@@ -781,6 +944,7 @@ static rlm_rcode_t CC_HINT(nonnull) kafka_common(void *instance, REQUEST *reques
        rlm_kafka_t             *inst = instance;
        char                    *key = NULL;
        char                    *message = NULL;
+       rd_kafka_headers_t      *hdrs = NULL;
        CONF_ITEM               *item;
        CONF_PAIR               *cp;
        const char              *schema;
@@ -820,15 +984,26 @@ static rlm_rcode_t CC_HINT(nonnull) kafka_common(void *instance, REQUEST *reques
                return RLM_MODULE_FAIL;
        }
 
-       if (radius_axlat(&key, request, section->key, NULL, NULL) < 0) {
-               RDEBUG3("Failed to expand key");
-               talloc_free(message);
-               return RLM_MODULE_FAIL;
+       if (section->key) {
+               if (radius_axlat(&key, request, section->key, NULL, NULL) < 0) {
+                       RDEBUG3("Failed to expand key");
+                       talloc_free(message);
+                       return RLM_MODULE_FAIL;
+               }
+       }
+
+       if (section->headers) {
+               if (create_headers(request, section->headers, &hdrs) < 0) {
+                       RDEBUG3("Failed to create headers");
+                       talloc_free(message);
+                       return RLM_MODULE_FAIL;
+               }
        }
 
        if (kafka_produce(inst, request, section->rkt,
-                         key, strlen(key), message, strlen(message),
-                         inst->async
+                         key, key ? strlen(key) : 0,
+                         message, strlen(message),
+                         hdrs, inst->async
                         ) != 0)
                ret = RLM_MODULE_FAIL;
 
index de3b9668198470eff597121d8f63ad93edcf288c..da24c407a746650f00218df0a9cb50f323989b3b 100644 (file)
@@ -38,6 +38,19 @@ if (&Module-Failure-Message) {
 }
 
 
+#
+#  Message with key and headers
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic (&NAS-IP-Address) &User-Name Test mmessage}"
+if (&Module-Failure-Message) {
+       test_fail
+}
+
+
 #
 #  Bare message, with no key
 #
@@ -51,6 +64,19 @@ if (&Module-Failure-Message) {
 }
 
 
+#
+#  Bare message with headers, but no key
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic (&NAS-IP-Address)  Test message}"
+if (&Module-Failure-Message) {
+       test_fail
+}
+
+
 #
 #  No key and empty message is useless, but still acceptable to Kafka
 #
@@ -72,7 +98,7 @@ update {
 }
 
 "%{kafka: &User-Name Topic is missing here}"
-if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic followed by the payload")) {
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic, optionally followed by headers, then the payload")) {
        test_fail
 }
 
@@ -85,20 +111,59 @@ update {
 }
 
 "%{kafka:custom-topic}"
-if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic followed by the payload")) {
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic, optionally followed by headers, then the payload")) {
+       test_fail
+}
+
+
+#
+#  Lone topic and headers
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic (&NAS-IP-Address)}"
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic, optionally followed by headers, then the payload")) {
        test_fail
 }
 
 
 #
-#  No payload
+#  Headers missing closing parenthesis
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic (&NAS-IP-Address  Test message}"
+if (!(&Module-Failure-Message[*] == "Header list is missing closing parenthesis)")) {
+       test_fail
+}
+
+
+#
+#  Topic with no payload
 #
 update {
        &Module-Failure-Message !* ANY
 }
 
 "%{kafka:custom-topic }"
-if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic followed by the payload")) {
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic, optionally followed by headers, then the payload")) {
+       test_fail
+}
+
+
+#
+#  Topic and headers with no payload
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic (&NAS-IP-Address)}"
+if (!(&Module-Failure-Message[*] == "Kafka xlat must begin with a topic, optionally followed by headers, then the payload")) {
        test_fail
 }
 
@@ -116,4 +181,17 @@ if (!(&Module-Failure-Message[*] == "Kafka payload must begin with an attribute
 }
 
 
+#
+#  Nonsense payload, with headers
+#
+update {
+       &Module-Failure-Message !* ANY
+}
+
+"%{kafka:custom-topic (&NAS-IP-Address) Test message not preceeded by Key-Attr-Ref or two spaces}"
+if (!(&Module-Failure-Message[*] == "Kafka payload must begin with an attribute reference or a space")) {
+       test_fail
+}
+
+
 test_pass