*/
typedef struct {
char const *name;
- rd_kafka_topic_t *rkt;
+ rd_kafka_topic_t *kt;
fr_rb_node_t node;
-} rlm_kafka_topic_handle_t;
+} rlm_kafka_topic_t;
typedef struct rlm_kafka_thread_s rlm_kafka_thread_t;
rd_kafka_resp_err_t err; //!< stashed by dr_msg_cb for resume
int32_t partition;
int64_t offset;
-} kafka_produce_ctx_t;
+} rlm_kafka_msg_ctx_t;
struct rlm_kafka_thread_s {
rlm_kafka_t const *inst;
int wake_pipe[2]; //!< self-pipe, [0]=read, [1]=write
fr_event_fd_t *ev_fd; //!< event for wake_pipe[0]
- fr_rb_tree_t *topics; //!< rlm_kafka_topic_handle_t per declared topic
+ fr_rb_tree_t *topics; //!< rlm_kafka_topic_t per declared topic
- fr_dlist_head_t inflight; //!< outstanding kafka_produce_ctx_t
+ fr_dlist_head_t inflight; //!< outstanding rlm_kafka_msg_ctx_t
};
/** Call env for `kafka.produce.<topic>`.
char const *topic; //!< resolved topic name (validated at parse time)
fr_value_box_t *key; //!< optional message key
fr_value_box_t *value; //!< message payload
-} rlm_kafka_produce_env_t;
+} rlm_kafka_env_t;
-/** @param[in] a rlm_kafka_topic_handle_t.
+/** @param[in] a rlm_kafka_topic_t.
* @param[in] b same.
* @return `strcmp` ordering of `a->name` and `b->name`. */
static int8_t topic_name_cmp(void const *a, void const *b)
}
/** Destructor for per-thread topic handles. Releases the rd_kafka_topic_t. */
-static int _topic_handle_free(rlm_kafka_topic_handle_t *h)
+static int _topic_free(rlm_kafka_topic_t *h)
{
- if (h->rkt) rd_kafka_topic_destroy(h->rkt);
+ if (h->kt) rd_kafka_topic_destroy(h->kt);
return 0;
}
*/
static rd_kafka_topic_t *kafka_thread_topic(rlm_kafka_thread_t *t, char const *name)
{
- rlm_kafka_topic_handle_t key = { .name = name };
- rlm_kafka_topic_handle_t *h;
+ rlm_kafka_topic_t key = { .name = name };
+ rlm_kafka_topic_t *h;
h = fr_rb_find(t->topics, &key);
- return h ? h->rkt : NULL;
+ return h ? h->kt : NULL;
}
/** Broker-level error callback (connection failures etc).
* @param[in] pctx produce context with the stashed error.
* @return an `rlm_rcode_t` summarising the outcome.
*/
-static rlm_rcode_t kafka_err_to_rcode(request_t *request, kafka_produce_ctx_t const *pctx)
+static rlm_rcode_t kafka_err_to_rcode(request_t *request, rlm_kafka_msg_ctx_t const *pctx)
{
switch (pctx->err) {
case RD_KAFKA_RESP_ERR_NO_ERROR:
* @param[in] key_len length of `key`, 0 if `key` is `NULL`.
* @param[in] value message payload.
* @param[in] value_len length of `value`.
- * @return the kafka_produce_ctx_t tracking the request, or `NULL` on failure.
+ * @return the rlm_kafka_msg_ctx_t tracking the request, or `NULL` on failure.
*/
static inline CC_HINT(always_inline)
-kafka_produce_ctx_t *kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *request,
+rlm_kafka_msg_ctx_t *kafka_produce_enqueue(rlm_kafka_thread_t *t, request_t *request,
rd_kafka_topic_t *topic,
uint8_t const *key, size_t key_len,
uint8_t const *value, size_t value_len)
{
- kafka_produce_ctx_t *pctx;
+ rlm_kafka_msg_ctx_t *pctx;
- MEM(pctx = talloc(t, kafka_produce_ctx_t));
- *pctx = (kafka_produce_ctx_t) {
+ MEM(pctx = talloc(t, rlm_kafka_msg_ctx_t));
+ *pctx = (rlm_kafka_msg_ctx_t) {
.t = t,
.request = request,
.err = RD_KAFKA_RESP_ERR_NO_ERROR,
*
* Reused by @ref _kafka_topic_env_parse when it emits synthetic call_env
* entries; also carries the offsets so the framework writes expanded
- * value boxes into the right fields on `rlm_kafka_produce_env_t`.
+ * value boxes into the right fields on `rlm_kafka_env_t`.
*/
static call_env_parser_t const rlm_kafka_value_rule = {
FR_CALL_ENV_OFFSET("value", FR_TYPE_STRING, CALL_ENV_FLAG_REQUIRED | CALL_ENV_FLAG_CONCAT,
- rlm_kafka_produce_env_t, value)
+ rlm_kafka_env_t, value)
};
static call_env_parser_t const rlm_kafka_key_rule = {
FR_CALL_ENV_OFFSET("key", FR_TYPE_STRING, CALL_ENV_FLAG_CONCAT | CALL_ENV_FLAG_NULLABLE,
- rlm_kafka_produce_env_t, key)
+ rlm_kafka_env_t, key)
};
/** Resolve the topic name from the method's second identifier, and pull
.flags = CALL_ENV_FLAG_PARSE_ONLY,
.pair = {
.parsed = {
- .offset = offsetof(rlm_kafka_produce_env_t, topic),
+ .offset = offsetof(rlm_kafka_env_t, topic),
.type = CALL_ENV_PARSE_TYPE_VOID
}
}
}
static const call_env_method_t rlm_kafka_produce_env = {
- FR_CALL_ENV_METHOD_OUT(rlm_kafka_produce_env_t),
+ FR_CALL_ENV_METHOD_OUT(rlm_kafka_env_t),
.env = (call_env_parser_t[]) {
{ FR_CALL_ENV_SUBSECTION_FUNC(CF_IDENT_ANY, CF_IDENT_ANY,
CALL_ENV_FLAG_PARSE_MISSING, _kafka_topic_env_parse) },
/** Resume a yielded module method after its delivery report has arrived.
*
* Runs on the same worker as the originating produce (per-thread
- * producer), with `mctx->rctx` being the @ref kafka_produce_ctx_t the
+ * producer), with `mctx->rctx` being the @ref rlm_kafka_msg_ctx_t the
* method stashed on yield. Translates the dr_msg_cb-populated error
* into an rcode, frees the pctx, and hands control back to unlang.
*
*/
static unlang_action_t mod_resume(unlang_result_t *p_result, module_ctx_t const *mctx, request_t *request)
{
- kafka_produce_ctx_t *pctx = talloc_get_type_abort(mctx->rctx, kafka_produce_ctx_t);
+ rlm_kafka_msg_ctx_t *pctx = talloc_get_type_abort(mctx->rctx, rlm_kafka_msg_ctx_t);
rlm_rcode_t rcode = kafka_err_to_rcode(request, pctx);
talloc_free(pctx);
*/
static void mod_signal(module_ctx_t const *mctx, request_t *request, UNUSED fr_signal_t action)
{
- kafka_produce_ctx_t *pctx = talloc_get_type_abort(mctx->rctx, kafka_produce_ctx_t);
+ rlm_kafka_msg_ctx_t *pctx = talloc_get_type_abort(mctx->rctx, rlm_kafka_msg_ctx_t);
RDEBUG2("Cancellation signal received - detaching delivery report");
pctx->request = NULL;
*
* @param[out] p_result UNUSED (resume writes the real rcode).
* @param[in] mctx module ctx (mctx->thread is the rlm_kafka_thread_t,
- * mctx->env_data is the rlm_kafka_produce_env_t).
+ * mctx->env_data is the rlm_kafka_env_t).
* @param[in] request the request being handled.
* @return yielded on success, UNLANG_ACTION_FAIL if the produce couldn't
* even be enqueued.
module_ctx_t const *mctx, request_t *request)
{
rlm_kafka_thread_t *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
- rlm_kafka_produce_env_t *env = talloc_get_type_abort(mctx->env_data, rlm_kafka_produce_env_t);
+ rlm_kafka_env_t *env = talloc_get_type_abort(mctx->env_data, rlm_kafka_env_t);
rd_kafka_topic_t *topic;
- kafka_produce_ctx_t *pctx;
+ rlm_kafka_msg_ctx_t *pctx;
uint8_t const *key = NULL;
size_t key_len = 0;
*
* @param[in] xctx_ctx talloc context for the returned value box.
* @param[in] out cursor to append the result to.
- * @param[in] xctx xlat ctx, rctx points at the kafka_produce_ctx_t.
+ * @param[in] xctx xlat ctx, rctx points at the rlm_kafka_msg_ctx_t.
* @param[in] request associated request (for logging).
* @param[in] in UNUSED (original args).
*/
xlat_ctx_t const *xctx,
request_t *request, UNUSED fr_value_box_list_t *in)
{
- kafka_produce_ctx_t *pctx = talloc_get_type_abort(xctx->rctx, kafka_produce_ctx_t);
+ rlm_kafka_msg_ctx_t *pctx = talloc_get_type_abort(xctx->rctx, rlm_kafka_msg_ctx_t);
fr_value_box_t *vb;
bool delivered = (pctx->err == RD_KAFKA_RESP_ERR_NO_ERROR);
* rather than trying to resume a cancelled request. dr_msg_cb owns
* the free.
*
- * @param[in] xctx xlat ctx (xctx->rctx is the kafka_produce_ctx_t).
+ * @param[in] xctx xlat ctx (xctx->rctx is the rlm_kafka_msg_ctx_t).
* @param[in] request UNUSED.
* @param[in] action UNUSED (we mask off everything except CANCEL).
*/
static void kafka_xlat_produce_signal(xlat_ctx_t const *xctx, UNUSED request_t *request, UNUSED fr_signal_t action)
{
- kafka_produce_ctx_t *pctx = talloc_get_type_abort(xctx->rctx, kafka_produce_ctx_t);
+ rlm_kafka_msg_ctx_t *pctx = talloc_get_type_abort(xctx->rctx, rlm_kafka_msg_ctx_t);
pctx->request = NULL;
}
fr_value_box_t *topic_vb = fr_value_box_list_head(in);
fr_value_box_t *value_vb = fr_value_box_list_next(in, topic_vb);
rd_kafka_topic_t *topic;
- kafka_produce_ctx_t *pctx;
+ rlm_kafka_msg_ctx_t *pctx;
if (!topic_vb || !value_vb) {
REDEBUG("kafka.produce xlat requires (topic, value)");
*/
static void _kafka_delivery_report_cb(UNUSED rd_kafka_t *rk, rd_kafka_message_t const *msg, UNUSED void *opaque)
{
- kafka_produce_ctx_t *pctx = msg->_private;
+ rlm_kafka_msg_ctx_t *pctx = msg->_private;
if (!pctx) return;
*/
static int kafka_topic_thread_handles(rlm_kafka_thread_t *t)
{
- t->topics = fr_rb_inline_talloc_alloc(t, rlm_kafka_topic_handle_t, node, topic_name_cmp, NULL);
+ t->topics = fr_rb_inline_talloc_alloc(t, rlm_kafka_topic_t, node, topic_name_cmp, NULL);
if (!t->topics) {
ERROR("rlm_kafka: failed to allocate topic handle tree");
return -1;
if (!t->inst->kconf.topics) return 0;
fr_rb_inorder_foreach(t->inst->kconf.topics, fr_kafka_topic_t, topic) {
- rlm_kafka_topic_handle_t *h;
- rd_kafka_topic_conf_t *tc;
-
- tc = rd_kafka_topic_conf_dup(topic->conf->rdtc);
- MEM(h = talloc_zero(t->topics, rlm_kafka_topic_handle_t));
- MEM(h->name = talloc_strdup(h, topic->name));
- h->rkt = rd_kafka_topic_new(t->rk, h->name, tc);
- if (!h->rkt) {
+ rlm_kafka_topic_t *topic_t;
+ rd_kafka_topic_conf_t *ktc;
+
+ ktc = rd_kafka_topic_conf_dup(topic->conf->rdtc);
+ MEM(topic_t = talloc_zero(t->topics, rlm_kafka_topic_t));
+ MEM(topic_t->name = talloc_strdup(topic_t, topic->name));
+ topic_t->kt = rd_kafka_topic_new(t->rk, topic_t->name, ktc);
+ if (!topic_t->kt) {
/* librdkafka consumes tc only on success */
- rd_kafka_topic_conf_destroy(tc);
+ rd_kafka_topic_conf_destroy(ktc);
ERROR("rlm_kafka: rd_kafka_topic_new('%s') failed: %s",
- h->name, rd_kafka_err2str(rd_kafka_last_error()));
- talloc_free(h);
+ topic_t->name, rd_kafka_err2str(rd_kafka_last_error()));
+ talloc_free(topic_t);
return -1;
}
- talloc_set_destructor(h, _topic_handle_free);
+ talloc_set_destructor(topic_t, _topic_free);
- if (!fr_rb_insert(t->topics, h)) {
- talloc_free(h);
- ERROR("rlm_kafka: duplicate topic handle '%s'", h->name);
+ if (!fr_rb_insert(t->topics, topic_t)) {
+ talloc_free(topic_t);
+ ERROR("rlm_kafka: duplicate topic handle '%s'", topic_t->name);
return -1;
}
}
static int mod_thread_detach(module_thread_inst_ctx_t const *mctx)
{
rlm_kafka_thread_t *t = talloc_get_type_abort(mctx->thread, rlm_kafka_thread_t);
- kafka_produce_ctx_t *pctx;
+ rlm_kafka_msg_ctx_t *pctx;
/*
* Detach all in-flight requests. After this, dr_msg_cb will
t->el = mctx->el;
t->wake_pipe[0] = t->wake_pipe[1] = -1;
- fr_dlist_talloc_init(&t->inflight, kafka_produce_ctx_t, entry);
+ fr_dlist_talloc_init(&t->inflight, rlm_kafka_msg_ctx_t, entry);
/*
* rd_kafka_new consumes the conf, so dup it.