From: Christopher Faulet Date: Tue, 23 Jan 2024 06:56:34 +0000 (+0100) Subject: MEDIUM: applet: Add support for zero-copy forwarding from an applet X-Git-Tag: v3.0-dev3~54 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=39b6f5b04cbb8f3c1ae54faa8cf6113a6ff8baf4;p=thirdparty%2Fhaproxy.git MEDIUM: applet: Add support for zero-copy forwarding from an applet Thanks to this patch, it is possible to an applet to directly send data to the opposite endpoint. To do so, it must implement appctx callback function and set SE_FL_MAY_FASTFWD flag. Everything will be handled by appctx_fastfwd() function. The applet is only responsible to transfer data. If it sets value, it is used to limit the amount of data to forward. --- diff --git a/include/haproxy/applet.h b/include/haproxy/applet.h index dc9224b79e..2f84a11684 100644 --- a/include/haproxy/applet.h +++ b/include/haproxy/applet.h @@ -51,6 +51,7 @@ void appctx_free(struct appctx *appctx); size_t appctx_rcv_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags); size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsigned int flags); +int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags); static inline struct appctx *appctx_new_here(struct applet *applet, struct sedesc *sedesc) { diff --git a/src/applet.c b/src/applet.c index 4ddb0da3bf..28341838ca 100644 --- a/src/applet.c +++ b/src/applet.c @@ -23,6 +23,7 @@ #include #include #include +#include unsigned int nb_applets = 0; @@ -594,6 +595,75 @@ size_t appctx_snd_buf(struct stconn *sc, struct buffer *buf, size_t count, unsig return ret; } +int appctx_fastfwd(struct stconn *sc, unsigned int count, unsigned int flags) +{ + struct appctx *appctx = __sc_appctx(sc); + struct xref *peer; + struct sedesc *sdo = NULL; + unsigned int len; + int ret = 0; + + TRACE_ENTER(APPLET_EV_RECV, appctx); + + /* TODO: outbuf must be empty. Find a better way to handle that but for now just return -1 */ + if (b_data(&appctx->outbuf)) { + TRACE_STATE("Output buffer not empty, cannot fast-forward data", APPLET_EV_RECV, appctx); + return -1; + } + + peer = xref_get_peer_and_lock(&appctx->sedesc->xref); + if (!peer) { + TRACE_STATE("Opposite endpoint not available yet", APPLET_EV_RECV, appctx); + goto end; + } + sdo = container_of(peer, struct sedesc, xref); + xref_unlock(&appctx->sedesc->xref, peer); + + if (appctx->to_forward && count > appctx->to_forward) + count = appctx->to_forward; + + len = se_nego_ff(sdo, &BUF_NULL, count, 0); + if (sdo->iobuf.flags & IOBUF_FL_NO_FF) { + sc_ep_clr(sc, SE_FL_MAY_FASTFWD); + TRACE_DEVEL("Fast-forwarding not supported by opposite endpoint, disable it", APPLET_EV_RECV, appctx); + goto end; + } + if (sdo->iobuf.flags & IOBUF_FL_FF_BLOCKED) { + sc_ep_set(sc, /* SE_FL_RCV_MORE | */SE_FL_WANT_ROOM); + TRACE_STATE("waiting for more room", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + goto end; + } + + b_add(sdo->iobuf.buf, sdo->iobuf.offset); + ret = appctx->applet->fastfwd(appctx, sdo->iobuf.buf, len, 0); + b_sub(sdo->iobuf.buf, sdo->iobuf.offset); + sdo->iobuf.data += ret; + + if (applet_fl_test(appctx, APPCTX_FL_EOI)) { + se_fl_set(appctx->sedesc, SE_FL_EOI); + sdo->iobuf.flags |= IOBUF_FL_EOI; /* TODO: it may be good to have a flag to be sure we can + * forward the EOI the to consumer side + */ + TRACE_STATE("report EOI to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_EOS)) { + se_fl_set(appctx->sedesc, SE_FL_EOS); + TRACE_STATE("report EOS to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + if (applet_fl_test(appctx, APPCTX_FL_ERROR)) { + se_fl_set(appctx->sedesc, SE_FL_ERROR); + TRACE_STATE("report ERROR to SE", APPLET_EV_RECV|APPLET_EV_BLK, appctx); + } + /* else */ + /* applet_have_more_data(appctx); */ + + se_done_ff(sdo); + +end: + TRACE_LEAVE(APPLET_EV_RECV, appctx); + return ret; +} + /* Default applet handler */ struct task *task_run_applet(struct task *t, void *context, unsigned int state) { diff --git a/src/stconn.c b/src/stconn.c index 7290036024..3817e7b4d2 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -1904,8 +1904,42 @@ int sc_applet_recv(struct stconn *sc) channel_check_idletimer(ic); - /* TODO: Handle fastfwd here be implement callback function first ! */ + /* First, let's see if we may fast-forward data from a side to the other + * one without using the channel buffer. + */ + if (!(global.tune.no_zero_copy_fwd & NO_ZERO_COPY_FWD) && + sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) { + if (channel_data(ic)) { + /* We're embarrassed, there are already data pending in + * the buffer and we don't want to have them at two + * locations at a time. Let's indicate we need some + * place and ask the consumer to hurry. + */ + flags |= CO_RFL_BUF_FLUSH; + goto abort_fastfwd; + } + ret = appctx_fastfwd(sc, ic->to_forward, flags); + if (ret < 0) + goto abort_fastfwd; + else if (ret > 0) { + if (ic->to_forward != CHN_INFINITE_FORWARD) + ic->to_forward -= ret; + ic->total += ret; + cur_read += ret; + ic->flags |= CF_READ_EVENT; + } + if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR)) + goto end_recv; + + if (sc_ep_test(sc, SE_FL_WANT_ROOM)) + sc_need_room(sc, -1); + + if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) + goto done_recv; + } + + abort_fastfwd: if (!sc_alloc_ibuf(sc, &appctx->buffer_wait)) goto end_recv; @@ -2108,6 +2142,9 @@ int sc_applet_send(struct stconn *sc) if (se_fl_test(sc->sedesc, SE_FL_ORPHAN)) return 0; + /* TODO: Splicing is not supported, so it is not possible to have FF data stuck into the I/O buf */ + BUG_ON(sc_ep_have_ff_data(sc)); + if (co_data(oc)) { ret = appctx->applet->snd_buf(sc, &oc->buf, co_data(oc), 0); if (ret > 0) {