#include <freeradius-devel/modules.h>
#include <freeradius-devel/rad_assert.h>
#include <librdkafka/rdkafka.h>
+#include <ctype.h>
#define LOG_PREFIX "rlm_kafka"
CONF_SECTION *cs;
char const *reference;
char const *key;
+ char const *headers;
/*
* Topic handle to avoid rbtree lookups for section-based calls
static const CONF_PARSER authorize_config[] = {
{ "reference", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, authorize.reference), ".message" },
{ "key", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, authorize.key), NULL},
+ { "headers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, authorize.headers), NULL},
CONF_PARSER_TERMINATOR
};
static const CONF_PARSER postauth_config[] = {
{ "reference", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, postauth.reference), ".message" },
{ "key", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, postauth.key), NULL},
+ { "headers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, postauth.headers), NULL},
CONF_PARSER_TERMINATOR
};
static const CONF_PARSER accounting_config[] = {
{ "reference", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, accounting.reference), ".message" },
{ "key", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, accounting.key), NULL},
+ { "headers", FR_CONF_OFFSET(PW_TYPE_STRING | PW_TYPE_XLAT, rlm_kafka_t, accounting.headers), NULL},
CONF_PARSER_TERMINATOR
};
#define RETRIES 2
static int kafka_produce(rlm_kafka_t *inst, UNUSED REQUEST *request, rd_kafka_topic_t *rkt,
char* const key, const size_t key_len, char* const message, const size_t len,
- const bool async) {
+ rd_kafka_headers_t *hdrs, const bool async) {
rd_kafka_resp_err_t status = NO_DELIVERY_REPORT;
rd_kafka_resp_err_t err;
RD_KAFKA_V_MSGFLAGS(async ? RD_KAFKA_MSG_F_COPY : 0),
RD_KAFKA_V_KEY(key, key_len),
RD_KAFKA_V_VALUE(message, len),
+ RD_KAFKA_V_HEADERS(hdrs),
RD_KAFKA_V_OPAQUE(async ? NULL : &status),
RD_KAFKA_V_END
);
rd_kafka_poll(inst->rk, 1000);
}
+ /*
+ * If rd_kafka_producev() failed then we still own any headers.
+ *
+ */
+ if (err != RD_KAFKA_RESP_ERR_NO_ERROR && hdrs)
+ rd_kafka_headers_destroy(hdrs);
+
/*
* If the delivery is specified as synchronous and we did not
* encounter an immediate error when producing to the queue then
}
#undef NO_DELIVERY_REPORT
+static int create_headers(REQUEST *request, const char *in, rd_kafka_headers_t **out)
+{
+ rd_kafka_headers_t *hdrs = NULL;
+ const char *p;
+ VALUE_PAIR *header_vps = NULL, *vps, *vp;
+ int num_vps;
+ vp_cursor_t cursor;
+
+ if (!in)
+ return 0;
+
+ /*
+ * Decode the headers string to derive a set of VPs from which to
+ * create Kafka headers
+ *
+ */
+ p = in;
+
+ while (isspace((uint8_t) *p)) p++;
+ if (*p == '\0') return -1;
+
+ while (*p) {
+ bool negate = false;
+ vp_tmpl_t *vpt = NULL;
+ ssize_t slen;
+
+ while (isspace((uint8_t) *p)) p++;
+
+ if (*p == '\0') break;
+
+ /* Check if we should be removing attributes */
+ if (*p == '!') {
+ p++;
+ negate = true;
+ }
+
+ if (*p == '\0') {
+ /* May happen e.g. with '!' on its own at the end */
+ REMARKER(in, (p - in), "Missing attribute name");
+ error:
+ fr_pair_list_free(&header_vps);
+ talloc_free(vpt);
+ return -1;
+ }
+
+ /* Decode next attr template */
+ slen = tmpl_afrom_attr_substr(request, &vpt, p, REQUEST_CURRENT, PAIR_LIST_REQUEST, false, false);
+
+ if (slen <= 0) {
+ REMARKER(in, (p - in) - slen, fr_strerror());
+ goto error;
+ }
+
+ /*
+ * Get attributes from the template.
+ * Missing attribute isn't an error (so -1, not 0).
+ */
+ if (tmpl_copy_vps(request, &vps, request, vpt) < -1) {
+ REDEBUG("Error copying attributes");
+ goto error;
+ }
+
+ if (negate) {
+ /* Remove all template attributes from header list */
+ for (vp = vps; vp; vp = vp->next)
+ fr_pair_delete_by_da(&header_vps, vp->da);
+
+ fr_pair_list_free(&vps);
+ } else {
+ /* Add template VPs to header list */
+ fr_pair_add(&header_vps, vps);
+ }
+
+ TALLOC_FREE(vpt);
+
+ /* Jump forward to next attr */
+ p += slen;
+
+ if (*p != '\0' && !isspace((uint8_t)*p)) {
+ REMARKER(in, (p - in), "Missing whitespace");
+ goto error;
+ }
+
+ }
+
+ /*
+ * Create the Kafka headers for the derived VPs
+ *
+ */
+ for (vp = header_vps, num_vps = 0; vp; vp = vp->next, num_vps++);
+ if (num_vps == 0) return 0;
+
+ hdrs = rd_kafka_headers_new(num_vps);
+ for (vp = fr_cursor_init(&cursor, &header_vps);
+ vp;
+ vp = fr_cursor_next(&cursor)) {
+ rd_kafka_resp_err_t err;
+ const char *attr = vp->da->name;
+ char *value;
+
+ value = vp_aprints_value(NULL, vp, '\0');
+ err = rd_kafka_header_add(hdrs, attr, -1, value, -1);
+ talloc_free(value);
+ if (err) {
+ rd_kafka_headers_destroy(hdrs);
+ return -1;
+ }
+ }
+
+ *out = hdrs;
+ return 0;
+
+}
+
/*
- * Format is either:
- * - "%{kafka:<topic> &Key-Attr-Ref <message data>}", or
- * - "%{kafka:<topic> <message data>}" (if no key)
+ * Format is one of the following:
+ *
+ * - "%{kafka:<topic> (<header-list>) &Key-Attr-Ref <message data>}"
+ * - "%{kafka:<topic> (<header-list>) <message data>}" (no key => space)
+ * - "%{kafka:<topic> &Key-Attr-Ref <message data>}"
+ * - "%{kafka:<topic> <message data>}" (no key => space)
*
*/
static ssize_t kafka_xlat(void *instance, REQUEST *request, char const *fmt, char *out, UNUSED size_t outlen)
{
rlm_kafka_t *inst = instance;
- char const *p = fmt;
+ char const *p = fmt, *q;
ssize_t key_len = 0;
char *expanded = NULL;
+ char *headers = NULL;
char buf[256];
rlm_kafka_rkt_by_name_t *entry, my_topic;
+ rd_kafka_headers_t *hdrs = NULL;
union {
const uint8_t *key_const;
*/
p = strchr(fmt, ' ');
if (!p || *fmt == ' ' || *(p+1) == '\0') {
- REDEBUG("Kafka xlat must begin with a topic followed by the payload");
+ REDEBUG("Kafka xlat must begin with a topic, optionally followed by headers, then the payload");
+error:
+ talloc_free(expanded);
+ if (hdrs) rd_kafka_headers_destroy(hdrs);
return -1;
}
if ((size_t)(p - fmt) >= sizeof(buf)) {
REDEBUG("Insufficient space to store Kafka topic name, needed %zu bytes, have %zu bytes",
(p - fmt) + 1, sizeof(buf));
- return -1;
+ goto error;
}
strlcpy(buf, fmt, (p - fmt) + 1);
p++;
entry = rbtree_finddata(inst->rkt_by_name_tree, &my_topic);
if (!entry || !entry->rkt) {
RWARN("No configuration section exists for kafka topic \"%s\"", buf);
- return -1;
+ goto error;
+ }
+
+ /*
+ * Extract the header specification, and generate the headers
+ *
+ */
+ q = p;
+ if (*p == '(') {
+ p = strchr(p, ')');
+ if (!p) {
+ REDEBUG("Header list is missing closing parenthesis)");
+ goto error;
+ }
+ MEM(headers = talloc_strndup(NULL, q + 1, p - q - 1));
+ if (*headers && create_headers(request, headers, &hdrs) < 0) {
+ RDEBUG3("Failed to create headers");
+ talloc_free(headers);
+ goto error;
+ }
+ talloc_free(headers);
+ p++;
+ if (*p != ' ') {
+ REDEBUG("Kafka xlat must begin with a topic, optionally followed by headers, then the payload");
+ goto error;
+ }
+ p++;
}
/*
* Extract the key, if there is one, otherwise expect a space.
*
*/
+ q = p;
if (*p == '&') {
- p = strchr(fmt, ' ');
+ p = strchr(p, ' ');
if (!p) {
- REDEBUG("Key attribute form requires a message after the key (&Key-Attr-Ref <message data>)");
- return -1;
+ REDEBUG("Key attribute form requires a message after the key (... &Key-Attr-Ref <message data>)");
+ goto error;
}
- if ((size_t)(p - fmt) >= sizeof(buf)) {
+ if ((size_t)(p - q) >= sizeof(buf)) {
REDEBUG("Insufficient space to store key attribute ref, needed %zu bytes, have %zu bytes",
- (p - fmt) + 1, sizeof(buf));
- return -1;
+ (p - q) + 1, sizeof(buf));
+ goto error;
}
- strlcpy(buf, fmt, (p - fmt) + 1);
+ strlcpy(buf, q, (p - q) + 1);
key_len = xlat_fmt_to_ref(&k.key_const, request, buf);
- if (key_len < 0) return -1;
+ if (key_len < 0) goto error;
RDEBUG3("message key=%.*s\n", (int)key_len, k.key_const);
} else if (*p != ' ') {
*
*/
REDEBUG("Kafka payload must begin with an attribute reference or a space");
- return -1;
+ goto error;
}
p++;
*/
if (radius_axlat(&expanded, request, p, NULL, NULL) < 0) {
REDEBUG("Message expansion failed");
- return -1;
+ goto error;
}
- kafka_produce(inst, request, entry->rkt, k.key_unconst, key_len, expanded, strlen(expanded), inst->async);
+ kafka_produce(inst, request, entry->rkt, k.key_unconst, key_len,
+ expanded, strlen(expanded), hdrs, inst->async);
talloc_free(expanded);
static int mod_detach(UNUSED void *instance)
{
- rd_kafka_resp_err_t err;
- rlm_kafka_t *inst = instance;
+ rd_kafka_resp_err_t err;
+ rlm_kafka_t *inst = instance;
if (inst->stats_file) {
DEBUG3("Closing Kafka statistics file");
rlm_kafka_t *inst = instance;
char *key = NULL;
char *message = NULL;
+ rd_kafka_headers_t *hdrs = NULL;
CONF_ITEM *item;
CONF_PAIR *cp;
const char *schema;
return RLM_MODULE_FAIL;
}
- if (radius_axlat(&key, request, section->key, NULL, NULL) < 0) {
- RDEBUG3("Failed to expand key");
- talloc_free(message);
- return RLM_MODULE_FAIL;
+ if (section->key) {
+ if (radius_axlat(&key, request, section->key, NULL, NULL) < 0) {
+ RDEBUG3("Failed to expand key");
+ talloc_free(message);
+ return RLM_MODULE_FAIL;
+ }
+ }
+
+ if (section->headers) {
+ if (create_headers(request, section->headers, &hdrs) < 0) {
+ RDEBUG3("Failed to create headers");
+ talloc_free(message);
+ return RLM_MODULE_FAIL;
+ }
}
if (kafka_produce(inst, request, section->rkt,
- key, strlen(key), message, strlen(message),
- inst->async
+ key, key ? strlen(key) : 0,
+ message, strlen(message),
+ hdrs, inst->async
) != 0)
ret = RLM_MODULE_FAIL;