]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: sink: Update the sink applets to use their own buffers
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 22 Jul 2025 16:46:38 +0000 (18:46 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Thu, 24 Jul 2025 10:06:49 +0000 (12:06 +0200)
Thanks to this patch, the sink applets is now using their own buffers.
.rcv_buf and .snd_buf callback functions are now defined to use the default
raw functions. The applet API is now used and any dependencies on the
stream-connectors and the channels were removed.

src/sink.c

index eef10006d051c78fd416b5401920bf2a94f82f5f..a98d806c45b2d8ef437462fe8de598a24a67a7a3 100644 (file)
@@ -419,7 +419,6 @@ void sink_setup_proxy(struct proxy *px)
 static void _sink_forward_io_handler(struct appctx *appctx,
                                      ssize_t (*msg_handler)(void *ctx, struct ist v1, struct ist v2, size_t ofs, size_t len, char delim))
 {
-       struct stconn *sc = appctx_sc(appctx);
        struct sink_forward_target *sft = appctx->svcctx;
        struct sink *sink = sft->sink;
        struct ring *ring = sink->ctx.ring;
@@ -427,9 +426,8 @@ static void _sink_forward_io_handler(struct appctx *appctx,
        size_t processed;
        int ret = 0;
 
-       if (unlikely(se_fl_test(appctx->sedesc, (SE_FL_EOS|SE_FL_ERROR)))) {
+       if (unlikely(applet_fl_test(appctx, APPCTX_FL_EOS|APPCTX_FL_ERROR)))
                goto out;
-       }
 
        /* if stopping was requested, close immediately */
        if (unlikely(stopping))
@@ -438,9 +436,13 @@ static void _sink_forward_io_handler(struct appctx *appctx,
        /* if the connection is not established, inform the stream that we want
         * to be notified whenever the connection completes.
         */
-       if (sc_opposite(sc)->state < SC_ST_EST) {
+       if (se_fl_test(appctx->sedesc, SE_FL_APPLET_NEED_CONN)) {
                applet_need_more_data(appctx);
-               se_need_remote_conn(appctx->sedesc);
+               applet_have_more_data(appctx);
+               goto out;
+       }
+
+       if (!applet_get_outbuf(appctx)) {
                applet_have_more_data(appctx);
                goto out;
        }
@@ -489,7 +491,7 @@ static void _sink_forward_io_handler(struct appctx *appctx,
 
 out:
        /* always drain data from server */
-       co_skip(sc_oc(sc), sc_oc(sc)->output);
+       applet_reset_input(appctx);
        return;
 
 soft_close:
@@ -497,7 +499,9 @@ soft_close:
         * soft_close will result in the port staying in TIME_WAIT state:
         * don't abuse from soft_close!
         */
-       se_fl_set(appctx->sedesc, SE_FL_EOS|SE_FL_EOI);
+       applet_set_eoi(appctx);
+       applet_set_eos(appctx);
+
        /* if required, hard_close could be achieve by using SE_FL_EOS|SE_FL_ERROR
         * flag combination: RST will be sent, TIME_WAIT will be avoided as if
         * we performed a normal close with NOLINGER flag set
@@ -568,6 +572,7 @@ static int sink_forward_session_init(struct appctx *appctx)
        s->do_log = NULL;
        s->uniq_id = 0;
 
+       se_need_remote_conn(appctx->sedesc);
        applet_expect_no_data(appctx);
 
        HA_SPIN_UNLOCK(SFT_LOCK, &sft->lock);
@@ -598,6 +603,8 @@ static struct applet sink_forward_applet = {
        .obj_type = OBJ_TYPE_APPLET,
        .name = "<SINKFWD>", /* used for logging */
        .fct = sink_forward_io_handler,
+       .rcv_buf = appctx_raw_rcv_buf,
+       .snd_buf = appctx_raw_snd_buf,
        .init = sink_forward_session_init,
        .release = sink_forward_session_release,
 };
@@ -606,6 +613,8 @@ static struct applet sink_forward_oc_applet = {
        .obj_type = OBJ_TYPE_APPLET,
        .name = "<SINKFWDOC>", /* used for logging */
        .fct = sink_forward_oc_io_handler,
+       .rcv_buf = appctx_raw_rcv_buf,
+       .snd_buf = appctx_raw_snd_buf,
        .init = sink_forward_session_init,
        .release = sink_forward_session_release,
 };