]> git.ipfire.org Git - thirdparty/freeradius-server.git/commitdiff
rlm_kafka: drop wordy type and local names
authorArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 01:15:43 +0000 (21:15 -0400)
committerArran Cudbard-Bell <a.cudbardb@freeradius.org>
Wed, 22 Apr 2026 03:48:38 +0000 (23:48 -0400)
Strip the redundant noise from the type names now that the module is
settled:

    kafka_produce_ctx_t     -> rlm_kafka_msg_ctx_t
    rlm_kafka_produce_env_t -> rlm_kafka_env_t
    rlm_kafka_topic_handle_t -> rlm_kafka_topic_t

Plus the topic handle's rd_kafka_topic_t field and the loop locals in
kafka_topic_thread_handles (rkt/h/tc -> kt/topic_t/ktc).  Pure rename,
no behaviour change.

src/modules/rlm_kafka/rlm_kafka.c

index 1acff527bbfe0366e19f86ac8daeed17ed838a41..a7bd243fddc9cce529dd1b8da2c6afa5bebe2323 100644 (file)
@@ -84,9 +84,9 @@ typedef struct {
  */
 typedef struct {
        char const              *name;
-       rd_kafka_topic_t        *rkt;
+       rd_kafka_topic_t        *kt;
        fr_rb_node_t            node;
-} rlm_kafka_topic_handle_t;
+} rlm_kafka_topic_t;
 
 typedef struct rlm_kafka_thread_s rlm_kafka_thread_t;
 
@@ -104,7 +104,7 @@ typedef struct {
        rd_kafka_resp_err_t     err;            //!< stashed by dr_msg_cb for resume
        int32_t                 partition;
        int64_t                 offset;
-} kafka_produce_ctx_t;
+} rlm_kafka_msg_ctx_t;
 
 struct rlm_kafka_thread_s {
        rlm_kafka_t const       *inst;
@@ -116,9 +116,9 @@ struct rlm_kafka_thread_s {
        int                     wake_pipe[2];   //!< self-pipe, [0]=read, [1]=write
        fr_event_fd_t           *ev_fd;         //!< event for wake_pipe[0]
 
-       fr_rb_tree_t            *topics;        //!< rlm_kafka_topic_handle_t per declared topic
+       fr_rb_tree_t            *topics;        //!< rlm_kafka_topic_t per declared topic
 
-       fr_dlist_head_t         inflight;       //!< outstanding kafka_produce_ctx_t
+       fr_dlist_head_t         inflight;       //!< outstanding rlm_kafka_msg_ctx_t
 };
 
 /** Call env for `kafka.produce.<topic>`.
@@ -131,9 +131,9 @@ typedef struct {
        char const              *topic; //!< resolved topic name (validated at parse time)
        fr_value_box_t          *key;   //!< optional message key
        fr_value_box_t          *value; //!< message payload
-} rlm_kafka_produce_env_t;
+} rlm_kafka_env_t;
 
-/** @param[in] a  rlm_kafka_topic_handle_t.
+/** @param[in] a  rlm_kafka_topic_t.
  *  @param[in] b  same.
  *  @return `strcmp` ordering of `a->name` and `b->name`. */
 static int8_t topic_name_cmp(void const *a, void const *b)
@@ -144,9 +144,9 @@ static int8_t topic_name_cmp(void const *a, void const *b)
 }
 
 /** Destructor for per-thread topic handles.  Releases the rd_kafka_topic_t. */
-static int _topic_handle_free(rlm_kafka_topic_handle_t *h)
+static int _topic_free(rlm_kafka_topic_t *h)
 {
-       if (h->rkt) rd_kafka_topic_destroy(h->rkt);
+       if (h->kt) rd_kafka_topic_destroy(h->kt);
        return 0;
 }
 
@@ -158,11 +158,11 @@ static int _topic_handle_free(rlm_kafka_topic_handle_t *h)
  */
 static rd_kafka_topic_t *kafka_thread_topic(rlm_kafka_thread_t *t, char const *name)
 {
-       rlm_kafka_topic_handle_t        key = { .name = name };
-       rlm_kafka_topic_handle_t        *h;
+       rlm_kafka_topic_t       key = { .name = name };
+       rlm_kafka_topic_t       *h;
 
        h = fr_rb_find(t->topics, &key);
-       return h ? h->rkt : NULL;
+       return h ? h->kt : NULL;
 }
 
 /** Broker-level error callback (connection failures etc).
@@ -239,7 +239,7 @@ static void _kafka_fd_error(UNUSED fr_event_list_t *el, int fd, UNUSED int flags
  * @param[in] pctx     produce context with the stashed error.
  * @return an `rlm_rcode_t` summarising the outcome.
  */
-static rlm_rcode_t kafka_err_to_rcode(request_t *request, kafka_produce_ctx_t const *pctx)
+static rlm_rcode_t kafka_err_to_rcode(request_t *request, rlm_kafka_msg_ctx_t const *pctx)
 {
        switch (pctx->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
@@ -280,18 +280,18 @@ static rlm_rcode_t kafka_err_to_rcode(request_t *request, kafka_produce_ctx_t co
  * @param[in] key_len   length of `key`, 0 if `key` is `NULL`.
  * @param[in] value     message payload.
  * @param[in] value_len length of `value`.
- * @return the kafka_produce_ctx_t tracking the request, or `NULL` on failure.
+ * @return the rlm_kafka_msg_ctx_t tracking the request, or `NULL` on failure.
  */
 static inline CC_HINT(always_inline)
-kafka_produce_ctx_t *kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *request,
+rlm_kafka_msg_ctx_t *kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *request,
                                           rd_kafka_topic_t *topic,
                                           uint8_t const *key, size_t key_len,
                                           uint8_t const *value, size_t value_len)
 {
-       kafka_produce_ctx_t     *pctx;
+       rlm_kafka_msg_ctx_t     *pctx;
 
-       MEM(pctx = talloc(t, kafka_produce_ctx_t));
-       *pctx = (kafka_produce_ctx_t) {
+       MEM(pctx = talloc(t, rlm_kafka_msg_ctx_t));
+       *pctx = (rlm_kafka_msg_ctx_t) {
                .t = t,
                .request = request,
                .err = RD_KAFKA_RESP_ERR_NO_ERROR,
@@ -319,15 +319,15 @@ kafka_produce_ctx_t *kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *req
  *
  * Reused by @ref _kafka_topic_env_parse when it emits synthetic call_env
  * entries; also carries the offsets so the framework writes expanded
- * value boxes into the right fields on `rlm_kafka_produce_env_t`.
+ * value boxes into the right fields on `rlm_kafka_env_t`.
  */
 static call_env_parser_t const rlm_kafka_value_rule = {
        FR_CALL_ENV_OFFSET("value", FR_TYPE_STRING, CALL_ENV_FLAG_REQUIRED | CALL_ENV_FLAG_CONCAT,
-                          rlm_kafka_produce_env_t, value)
+                          rlm_kafka_env_t, value)
 };
 static call_env_parser_t const rlm_kafka_key_rule = {
        FR_CALL_ENV_OFFSET("key", FR_TYPE_STRING, CALL_ENV_FLAG_CONCAT | CALL_ENV_FLAG_NULLABLE,
-                          rlm_kafka_produce_env_t, key)
+                          rlm_kafka_env_t, key)
 };
 
 /** Resolve the topic name from the method's second identifier, and pull
@@ -379,7 +379,7 @@ static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out,
                                                .flags = CALL_ENV_FLAG_PARSE_ONLY,
                                                .pair = {
                                                        .parsed = {
-                                                               .offset = offsetof(rlm_kafka_produce_env_t, topic),
+                                                               .offset = offsetof(rlm_kafka_env_t, topic),
                                                                .type = CALL_ENV_PARSE_TYPE_VOID
                                                        }
                                                }
@@ -415,7 +415,7 @@ static int _kafka_topic_env_parse(TALLOC_CTX *ctx, call_env_parsed_head_t *out,
 }
 
 static const call_env_method_t rlm_kafka_produce_env = {
-       FR_CALL_ENV_METHOD_OUT(rlm_kafka_produce_env_t),
+       FR_CALL_ENV_METHOD_OUT(rlm_kafka_env_t),
        .env = (call_env_parser_t[]) {
                { FR_CALL_ENV_SUBSECTION_FUNC(CF_IDENT_ANY, CF_IDENT_ANY,
                                              CALL_ENV_FLAG_PARSE_MISSING, _kafka_topic_env_parse) },
@@ -426,7 +426,7 @@ static const call_env_method_t rlm_kafka_produce_env = {
 /** Resume a yielded module method after its delivery report has arrived.
  *
  * Runs on the same worker as the originating produce (per-thread
- * producer), with `mctx->rctx` being the @ref kafka_produce_ctx_t the
+ * producer), with `mctx->rctx` being the @ref rlm_kafka_msg_ctx_t the
  * method stashed on yield.  Translates the dr_msg_cb-populated error
  * into an rcode, frees the pctx, and hands control back to unlang.
  *
@@ -437,7 +437,7 @@ static const call_env_method_t rlm_kafka_produce_env = {
  */
 static unlang_action_t mod_resume(unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request)
 {
-       kafka_produce_ctx_t     *pctx = talloc_get_type_abort(mctx->rctx, kafka_produce_ctx_t);
+       rlm_kafka_msg_ctx_t     *pctx = talloc_get_type_abort(mctx->rctx, rlm_kafka_msg_ctx_t);
        rlm_rcode_t             rcode = kafka_err_to_rcode(request, pctx);
 
        talloc_free(pctx);
@@ -460,7 +460,7 @@ static unlang_action_t mod_resume(unlang_result_t *p_result, module_ctx_t const
  */
 static void mod_signal(module_ctx_t const *mctx, request_t *request, UNUSED fr_signal_t action)
 {
-       kafka_produce_ctx_t     *pctx = talloc_get_type_abort(mctx->rctx, kafka_produce_ctx_t);
+       rlm_kafka_msg_ctx_t     *pctx = talloc_get_type_abort(mctx->rctx, rlm_kafka_msg_ctx_t);
 
        RDEBUG2("Cancellation signal received - detaching delivery report");
        pctx->request = NULL;
@@ -502,7 +502,7 @@ static void mod_signal(module_ctx_t const *mctx, request_t *request, UNUSED fr_s
  *
  * @param[out] p_result UNUSED (resume writes the real rcode).
  * @param[in] mctx      module ctx (mctx->thread is the rlm_kafka_thread_t,
- *                      mctx->env_data is the rlm_kafka_produce_env_t).
+ *                      mctx->env_data is the rlm_kafka_env_t).
  * @param[in] request   the request being handled.
  * @return yielded on success, UNLANG_ACTION_FAIL if the produce couldn't
  *         even be enqueued.
@@ -511,9 +511,9 @@ static unlang_action_t CC_HINT(nonnull) mod_produce(UNUSED unlang_result_t *p_re
                                                    module_ctx_t const *mctx, request_t *request)
 {
        rlm_kafka_thread_t              *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
-       rlm_kafka_produce_env_t         *env = talloc_get_type_abort(mctx->env_data, rlm_kafka_produce_env_t);
+       rlm_kafka_env_t         *env = talloc_get_type_abort(mctx->env_data, rlm_kafka_env_t);
        rd_kafka_topic_t                *topic;
-       kafka_produce_ctx_t             *pctx;
+       rlm_kafka_msg_ctx_t             *pctx;
 
        uint8_t const                   *key = NULL;
        size_t                          key_len = 0;
@@ -656,7 +656,7 @@ static int kafka_xlat_thread_instantiate(xlat_thread_inst_ctx_t const *xctx)
  *
  * @param[in] xctx_ctx talloc context for the returned value box.
  * @param[in] out      cursor to append the result to.
- * @param[in] xctx     xlat ctx, rctx points at the kafka_produce_ctx_t.
+ * @param[in] xctx     xlat ctx, rctx points at the rlm_kafka_msg_ctx_t.
  * @param[in] request  associated request (for logging).
  * @param[in] in       UNUSED (original args).
  */
@@ -664,7 +664,7 @@ static xlat_action_t kafka_xlat_produce_resume(TALLOC_CTX *xctx_ctx, fr_dcursor_
                                               xlat_ctx_t const *xctx,
                                               request_t *request, UNUSED fr_value_box_list_t *in)
 {
-       kafka_produce_ctx_t     *pctx = talloc_get_type_abort(xctx->rctx, kafka_produce_ctx_t);
+       rlm_kafka_msg_ctx_t     *pctx = talloc_get_type_abort(xctx->rctx, rlm_kafka_msg_ctx_t);
        fr_value_box_t          *vb;
        bool                    delivered = (pctx->err == RD_KAFKA_RESP_ERR_NO_ERROR);
 
@@ -685,13 +685,13 @@ static xlat_action_t kafka_xlat_produce_resume(TALLOC_CTX *xctx_ctx, fr_dcursor_
  * rather than trying to resume a cancelled request.  dr_msg_cb owns
  * the free.
  *
- * @param[in] xctx    xlat ctx (xctx->rctx is the kafka_produce_ctx_t).
+ * @param[in] xctx    xlat ctx (xctx->rctx is the rlm_kafka_msg_ctx_t).
  * @param[in] request UNUSED.
  * @param[in] action  UNUSED (we mask off everything except CANCEL).
  */
 static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t *request, UNUSED fr_signal_t action)
 {
-       kafka_produce_ctx_t     *pctx = talloc_get_type_abort(xctx->rctx, kafka_produce_ctx_t);
+       rlm_kafka_msg_ctx_t     *pctx = talloc_get_type_abort(xctx->rctx, rlm_kafka_msg_ctx_t);
        pctx->request = NULL;
 }
 
@@ -727,7 +727,7 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d
        fr_value_box_t                          *topic_vb = fr_value_box_list_head(in);
        fr_value_box_t                          *value_vb = fr_value_box_list_next(in, topic_vb);
        rd_kafka_topic_t                        *topic;
-       kafka_produce_ctx_t                     *pctx;
+       rlm_kafka_msg_ctx_t                     *pctx;
 
        if (!topic_vb || !value_vb) {
                REDEBUG("kafka.produce xlat requires (topic, value)");
@@ -768,7 +768,7 @@ static xlat_action_t kafka_xlat_produce(UNUSED TALLOC_CTX *xctx_ctx, UNUSED fr_d
  */
 static void _kafka_delivery_report_cb(UNUSED rd_kafka_t *rk, rd_kafka_message_t const *msg, UNUSED void *opaque)
 {
-       kafka_produce_ctx_t     *pctx = msg->_private;
+       rlm_kafka_msg_ctx_t     *pctx = msg->_private;
 
        if (!pctx) return;
 
@@ -796,7 +796,7 @@ static void _kafka_delivery_report_cb(UNUSED rd_kafka_t *rk, rd_kafka_message_t
  */
 static int kafka_topic_thread_handles(rlm_kafka_thread_t *t)
 {
-       t->topics = fr_rb_inline_talloc_alloc(t, rlm_kafka_topic_handle_t, node, topic_name_cmp, NULL);
+       t->topics = fr_rb_inline_talloc_alloc(t, rlm_kafka_topic_t, node, topic_name_cmp, NULL);
        if (!t->topics) {
                ERROR("rlm_kafka: failed to allocate topic handle tree");
                return -1;
@@ -805,26 +805,26 @@ static int kafka_topic_thread_handles(rlm_kafka_thread_t *t)
        if (!t->inst->kconf.topics) return 0;
 
        fr_rb_inorder_foreach(t->inst->kconf.topics, fr_kafka_topic_t, topic) {
-               rlm_kafka_topic_handle_t        *h;
-               rd_kafka_topic_conf_t           *tc;
-
-               tc = rd_kafka_topic_conf_dup(topic->conf->rdtc);
-               MEM(h = talloc_zero(t->topics, rlm_kafka_topic_handle_t));
-               MEM(h->name = talloc_strdup(h, topic->name));
-               h->rkt = rd_kafka_topic_new(t->rk, h->name, tc);
-               if (!h->rkt) {
+               rlm_kafka_topic_t       *topic_t;
+               rd_kafka_topic_conf_t   *ktc;
+
+               ktc = rd_kafka_topic_conf_dup(topic->conf->rdtc);
+               MEM(topic_t = talloc_zero(t->topics, rlm_kafka_topic_t));
+               MEM(topic_t->name = talloc_strdup(topic_t, topic->name));
+               topic_t->kt = rd_kafka_topic_new(t->rk, topic_t->name, ktc);
+               if (!topic_t->kt) {
                        /* librdkafka consumes tc only on success */
-                       rd_kafka_topic_conf_destroy(tc);
+                       rd_kafka_topic_conf_destroy(ktc);
                        ERROR("rlm_kafka: rd_kafka_topic_new('%s') failed: %s",
-                             h->name, rd_kafka_err2str(rd_kafka_last_error()));
-                       talloc_free(h);
+                             topic_t->name, rd_kafka_err2str(rd_kafka_last_error()));
+                       talloc_free(topic_t);
                        return -1;
                }
-               talloc_set_destructor(h, _topic_handle_free);
+               talloc_set_destructor(topic_t, _topic_free);
 
-               if (!fr_rb_insert(t->topics, h)) {
-                       talloc_free(h);
-                       ERROR("rlm_kafka: duplicate topic handle '%s'", h->name);
+               if (!fr_rb_insert(t->topics, topic_t)) {
+                       talloc_free(topic_t);
+                       ERROR("rlm_kafka: duplicate topic handle '%s'", topic_t->name);
                        return -1;
                }
        }
@@ -849,7 +849,7 @@ static int kafka_topic_thread_handles(rlm_kafka_thread_t *t)
 static int mod_thread_detach(module_thread_inst_ctx_t const *mctx)
 {
        rlm_kafka_thread_t      *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
-       kafka_produce_ctx_t     *pctx;
+       rlm_kafka_msg_ctx_t     *pctx;
 
        /*
         *      Detach all in-flight requests.  After this, dr_msg_cb will
@@ -932,7 +932,7 @@ static int mod_thread_instantiate(module_thread_inst_ctx_t const *mctx)
        t->el = mctx->el;
        t->wake_pipe[0] = t->wake_pipe[1] = -1;
 
-       fr_dlist_talloc_init(&t->inflight, kafka_produce_ctx_t, entry);
+       fr_dlist_talloc_init(&t->inflight, rlm_kafka_msg_ctx_t, entry);
 
        /*
         *      rd_kafka_new consumes the conf, so dup it.