]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: applet: Add support for zero-copy forwarding from an applet
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 23 Jan 2024 06:56:34 +0000 (07:56 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 7 Feb 2024 14:04:01 +0000 (15:04 +0100)
Thanks to this patch, it is possible to an applet to directly send data to
the opposite endpoint. To do so, it must implement <fastfwd> appctx callback
function and set SE_FL_MAY_FASTFWD flag.

Everything will be handled by appctx_fastfwd() function. The applet is only
responsible to transfer data. If it sets <to_forward> value, it is used to
limit the amount of data to forward.

include/haproxy/applet.h
src/applet.c
src/stconn.c

index dc9224b79e3872cdc1358142e359b2806b6b91ce..2f84a1168402773c211f6acd739af29655b96c7b 100644 (file)
@@ -51,6 +51,7 @@ void appctx_free(struct appctx *appctx);
 
 size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
 size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags);
+int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags);
 
 static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc)
 {
index 4ddb0da3bfdc922f7fdf7795e79e47cad0e25db6..28341838cab05d2e2e2a0ff3b3401ec33e568881 100644 (file)
@@ -23,6 +23,7 @@
 #include <haproxy/stream.h>
 #include <haproxy/task.h>
 #include <haproxy/trace.h>
+#include <haproxy/xref.h>
 
 unsigned int nb_applets = 0;
 
@@ -594,6 +595,75 @@ size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsig
        return ret;
 }
 
+int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags)
+{
+       struct appctx *appctx = __sc_appctx(sc);
+       struct xref *peer;
+       struct sedesc *sdo = NULL;
+       unsigned int len;
+       int ret = 0;
+
+       TRACE_ENTER(APPLET_EV_RECV, appctx);
+
+       /* TODO: outbuf must be empty. Find a better way to handle that but for now just return -1 */
+       if (b_data(&appctx->outbuf)) {
+               TRACE_STATE("Output buffer not empty, cannot fast-forward data", APPLET_EV_RECV, appctx);
+               return -1;
+       }
+
+       peer = xref_get_peer_and_lock(&appctx->sedesc->xref);
+       if (!peer) {
+               TRACE_STATE("Opposite endpoint not available yet", APPLET_EV_RECV, appctx);
+               goto end;
+       }
+       sdo = container_of(peer, struct sedesc, xref);
+       xref_unlock(&appctx->sedesc->xref, peer);
+
+       if (appctx->to_forward && count > appctx->to_forward)
+               count = appctx->to_forward;
+
+       len = se_nego_ff(sdo, &BUF_NULL, count, 0);
+       if (sdo->iobuf.flags & IOBUF_FL_NO_FF) {
+               sc_ep_clr(sc, SE_FL_MAY_FASTFWD);
+               TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", APPLET_EV_RECV, appctx);
+               goto end;
+       }
+       if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+               sc_ep_set(sc, /* SE_FL_RCV_MORE |  */SE_FL_WANT_ROOM);
+               TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+               goto end;
+       }
+
+       b_add(sdo->iobuf.buf, sdo->iobuf.offset);
+       ret = appctx->applet->fastfwd(appctx, sdo->iobuf.buf, len, 0);
+       b_sub(sdo->iobuf.buf, sdo->iobuf.offset);
+       sdo->iobuf.data += ret;
+
+       if (applet_fl_test(appctx, APPCTX_FL_EOI)) {
+               se_fl_set(appctx->sedesc, SE_FL_EOI);
+               sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can
+                                                  *       forward the EOI the to consumer side
+                                                  */
+               TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+       }
+       if (applet_fl_test(appctx, APPCTX_FL_EOS)) {
+               se_fl_set(appctx->sedesc, SE_FL_EOS);
+               TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+       }
+       if (applet_fl_test(appctx, APPCTX_FL_ERROR)) {
+               se_fl_set(appctx->sedesc, SE_FL_ERROR);
+               TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx);
+       }
+       /* else */
+       /*      applet_have_more_data(appctx); */
+
+       se_done_ff(sdo);
+
+end:
+       TRACE_LEAVE(APPLET_EV_RECV, appctx);
+       return ret;
+}
+
 /* Default applet handler */
 struct task *task_run_applet(struct task *t, void *context, unsigned int state)
 {
index 729003602436127841f1e8b7ae26ea2fc45f048d..3817e7b4d25cb92396ddf442f7366333f8cd379d 100644 (file)
@@ -1904,8 +1904,42 @@ int sc_applet_recv(struct stconn *sc)
 
        channel_check_idletimer(ic);
 
-       /* TODO: Handle fastfwd here be implement callback function first ! */
+       /* First, let's see if we may fast-forward data from a side to the other
+        * one without using the channel buffer.
+        */
+       if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) &&
+           sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) {
+               if (channel_data(ic)) {
+                       /* We're embarrassed, there are already data pending in
+                        * the buffer and we don't want to have them at two
+                        * locations at a time. Let's indicate we need some
+                        * place and ask the consumer to hurry.
+                        */
+                       flags |= CO_RFL_BUF_FLUSH;
+                       goto abort_fastfwd;
+               }
+               ret = appctx_fastfwd(sc, ic->to_forward, flags);
+               if (ret < 0)
+                       goto abort_fastfwd;
+               else if (ret > 0) {
+                       if (ic->to_forward != CHN_INFINITE_FORWARD)
+                               ic->to_forward -= ret;
+                       ic->total += ret;
+                       cur_read += ret;
+                       ic->flags |= CF_READ_EVENT;
+               }
 
+               if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
+                       goto end_recv;
+
+               if (sc_ep_test(sc, SE_FL_WANT_ROOM))
+                       sc_need_room(sc, -1);
+
+               if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward)
+                       goto done_recv;
+       }
+
+ abort_fastfwd:
        if (!sc_alloc_ibuf(sc, &appctx->buffer_wait))
                goto end_recv;
 
@@ -2108,6 +2142,9 @@ int sc_applet_send(struct stconn *sc)
        if (se_fl_test(sc->sedesc, SE_FL_ORPHAN))
                return 0;
 
+       /* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */
+       BUG_ON(sc_ep_have_ff_data(sc));
+
        if (co_data(oc)) {
                ret = appctx->applet->snd_buf(sc, &oc->buf, co_data(oc), 0);
                if (ret > 0) {