]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stconn/channel: Move pipes used for the splicing in the SE descriptors
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 22 Jun 2023 09:39:29 +0000 (11:39 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 17 Oct 2023 16:51:13 +0000 (18:51 +0200)
The pipes used to put data when the kernel splicing is in used are moved in
the SE descriptors. For now, it is just a simple remplacement but there is a
major difference with the pipes in the channel. The data are pushed in the
consumer's pipe while it was pushed in the producer's pipe. So it means the
request data are now pushed in the pipe of the backend SE descriptor and
response data are pushed in the pipe of the frontend SE descriptor.

The idea is to hide the pipe from the channel/SC side and to be able to
handle fast-forwading in pipe but also in buffer. To do so, the pipe is
inside a new entity, called iobuf. This entity will be extended.

include/haproxy/channel-t.h
include/haproxy/channel.h
include/haproxy/stconn-t.h
src/applet.c
src/stconn.c
src/stream.c

index f876d910182753019a33e6ac1023ec128ee8cf77..8629f5f1e9522f79f50a46039425a55010928994 100644 (file)
@@ -202,7 +202,6 @@ struct channel {
        unsigned int flags;             /* CF_* */
        unsigned int analysers;         /* bit field indicating what to do on the channel */
        struct buffer buf;              /* buffer attached to the channel, always present but may move */
-       struct pipe *pipe;              /* non-NULL only when data present */
        size_t output;                  /* part of buffer which is to be forwarded */
        unsigned int to_forward;        /* number of bytes to forward after out without a wake-up */
        unsigned short last_read;       /* 16 lower bits of last read date (max pause=65s) */
index 36199b17250fa5d2041f7a4154cba900d098fd6c..154017bdf11d1be36c9c3a71d01f1c6f87393a9a 100644 (file)
@@ -323,7 +323,6 @@ static inline void channel_init(struct channel *chn)
        chn->last_read = now_ms;
        chn->xfer_small = chn->xfer_large = 0;
        chn->total = 0;
-       chn->pipe = NULL;
        chn->analysers = 0;
        chn->flags = 0;
        chn->output = 0;
@@ -404,13 +403,13 @@ static inline void channel_htx_forward_forever(struct channel *chn, struct htx *
 /*********************************************************************/
 
 /* Reports non-zero if the channel is empty, which means both its
- * buffer and pipe are empty. The construct looks strange but is
- * jump-less and much more efficient on both 32 and 64-bit than
- * the boolean test.
+ * buffer and pipe on the opposite SE are empty. The construct looks
+ * strange but is jump-less and much more efficient on both 32 and
+ * 64-bit than the boolean test.
  */
 static inline unsigned int channel_is_empty(const struct channel *c)
 {
-       return !(co_data(c) | (long)c->pipe);
+       return !(co_data(c) | (long)chn_cons(c)->sedesc->iobuf.pipe);
 }
 
 /* Returns non-zero if the channel is rewritable, which means that the buffer
index e507e1e945582372c03ae23eeabfb32c22fb2dcf..1d9888dc50068c4e464a89a6f18381aa325f77a5 100644 (file)
 
 #include <haproxy/obj_type-t.h>
 #include <haproxy/connection-t.h>
+#include <haproxy/pipe-t.h>
 #include <haproxy/show_flags-t.h>
 #include <haproxy/xref-t.h>
 
+enum iobuf_flags {
+       IOBUF_FL_NONE             = 0x00000000, /* For initialization purposes */
+};
+
+struct iobuf {
+       struct pipe *pipe;     /* non-NULL only when data present */
+       unsigned int flags;
+};
+
 /* Stream Endpoint Flags.
  * Please also update the se_show_flags() function below in case of changes.
  */
@@ -246,11 +256,13 @@ struct stconn;
 
  * <fsb> should be updated when the first send of a series is blocked and reset
  *       when a successful send is reported.
+ *
  */
 struct sedesc {
        void *se;                  /* the stream endpoint, i.e. the mux stream or the appctx */
        struct connection *conn;   /* the connection for connection-based streams */
        struct stconn *sc;         /* the stream connector we're attached to, or NULL */
+       struct iobuf iobuf;        /* contains data forwarded by the other side and that must be sent by the stream endpoint */
        unsigned int flags;        /* SE_FL_* */
        unsigned int lra;          /* the last read activity */
        unsigned int fsb;          /* the first send blocked */
index cdcbc2556fdbd3024bb8530611d4b48e8f5de54e..95fc9032358f22f67e4d9e24f93a29854fdf71c7 100644 (file)
@@ -383,7 +383,7 @@ int appctx_buf_available(void *arg)
        sc_have_buff(sc);
 
        /* was already allocated another way ? if so, don't take this one */
-       if (c_size(sc_ic(sc)) || sc_ic(sc)->pipe)
+       if (c_size(sc_ic(sc)) || sc_opposite(sc)->sedesc->iobuf.pipe)
                return 0;
 
        /* allocation possible now ? */
index fbfebc7a599ef03541afedaf740965944f564c53..89face2672fa732acf26f834ac0d8f0eb8b2a37d 100644 (file)
@@ -97,6 +97,9 @@ void sedesc_init(struct sedesc *sedesc)
        sedesc->fsb = TICK_ETERNITY;
        sedesc->xref.peer = NULL;
        se_fl_setall(sedesc, SE_FL_NONE);
+
+       sedesc->iobuf.pipe = NULL;
+       sedesc->iobuf.flags = IOBUF_FL_NONE;
 }
 
 /* Tries to alloc an endpoint and initialize it. Returns NULL on failure. */
@@ -117,7 +120,11 @@ struct sedesc *sedesc_new()
  */
 void sedesc_free(struct sedesc *sedesc)
 {
-       pool_free(pool_head_sedesc, sedesc);
+       if (sedesc) {
+               if (sedesc->iobuf.pipe)
+                       put_pipe(sedesc->iobuf.pipe);
+               pool_free(pool_head_sedesc, sedesc);
+       }
 }
 
 /* Tries to allocate a new stconn and initialize its main fields. On
@@ -622,9 +629,7 @@ static void sc_app_shut(struct stconn *sc)
 /* default chk_rcv function for scheduled tasks */
 static void sc_app_chk_rcv(struct stconn *sc)
 {
-       struct channel *ic = sc_ic(sc);
-
-       if (ic->pipe) {
+       if (sc_opposite(sc)->sedesc->iobuf.pipe) {
                /* stop reading */
                sc_need_room(sc, -1);
        }
@@ -795,7 +800,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc)
        if (unlikely(channel_is_empty(oc)))  /* called with nothing to send ! */
                return;
 
-       if (!oc->pipe &&                          /* spliced data wants to be forwarded ASAP */
+       if (!sc->sedesc->iobuf.pipe &&                    /* spliced data wants to be forwarded ASAP */
            !sc_ep_test(sc, SE_FL_WAIT_DATA))       /* not waiting for data */
                return;
 
@@ -939,11 +944,9 @@ static void sc_app_shut_applet(struct stconn *sc)
 /* chk_rcv function for applets */
 static void sc_app_chk_rcv_applet(struct stconn *sc)
 {
-       struct channel *ic = sc_ic(sc);
-
        BUG_ON(!sc_appctx(sc));
 
-       if (!ic->pipe) {
+       if (!sc_opposite(sc)->sedesc->iobuf.pipe) {
                /* (re)start reading */
                appctx_wakeup(__sc_appctx(sc));
        }
@@ -1087,18 +1090,18 @@ static void sc_notify(struct stconn *sc)
         */
        if (!channel_is_empty(ic) &&
            sc_ep_test(sco, SE_FL_WAIT_DATA) &&
-           (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
+           (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sco->sedesc->iobuf.pipe)) {
                int new_len, last_len;
 
                last_len = co_data(ic);
-               if (ic->pipe)
-                       last_len += ic->pipe->data;
+               if (sco->sedesc->iobuf.pipe)
+                       last_len += sco->sedesc->iobuf.pipe->data;
 
                sc_chk_snd(sco);
 
                new_len = co_data(ic);
-               if (ic->pipe)
-                       new_len += ic->pipe->data;
+               if (sco->sedesc->iobuf.pipe)
+                       new_len += sco->sedesc->iobuf.pipe->data;
 
                /* check if the consumer has freed some space either in the
                 * buffer or in the pipe.
@@ -1263,7 +1266,7 @@ static int sc_conn_recv(struct stconn *sc)
         * using a buffer.
         */
        if (sc_ep_test(sc, SE_FL_MAY_SPLICE) &&
-           (ic->pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
+           (sc_opposite(sc)->sedesc->iobuf.pipe || ic->to_forward >= MIN_SPLICE_FORWARD) &&
            ic->flags & CF_KERN_SPLICING) {
                if (c_data(ic)) {
                        /* We're embarrassed, there are already data pending in
@@ -1275,14 +1278,14 @@ static int sc_conn_recv(struct stconn *sc)
                        goto abort_splice;
                }
 
-               if (unlikely(ic->pipe == NULL)) {
-                       if (pipes_used >= global.maxpipes || !(ic->pipe = get_pipe())) {
+               if (unlikely(sc_opposite(sc)->sedesc->iobuf.pipe == NULL)) {
+                       if (pipes_used >= global.maxpipes || !(sc_opposite(sc)->sedesc->iobuf.pipe = get_pipe())) {
                                ic->flags &= ~CF_KERN_SPLICING;
                                goto abort_splice;
                        }
                }
 
-               ret = conn->mux->rcv_pipe(sc, ic->pipe, ic->to_forward);
+               ret = conn->mux->rcv_pipe(sc, sc_opposite(sc)->sedesc->iobuf.pipe, ic->to_forward);
                if (ret < 0) {
                        /* splice not supported on this end, let's disable it */
                        ic->flags &= ~CF_KERN_SPLICING;
@@ -1312,12 +1315,13 @@ static int sc_conn_recv(struct stconn *sc)
        }
 
  abort_splice:
-       if (ic->pipe && unlikely(!ic->pipe->data)) {
-               put_pipe(ic->pipe);
-               ic->pipe = NULL;
+       if (sc_opposite(sc)->sedesc->iobuf.pipe && unlikely(!sc_opposite(sc)->sedesc->iobuf.pipe->data)) {
+               put_pipe(sc_opposite(sc)->sedesc->iobuf.pipe);
+               sc_opposite(sc)->sedesc->iobuf.pipe = NULL;
        }
 
-       if (ic->pipe && ic->to_forward && !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) {
+       if (sc_opposite(sc)->sedesc->iobuf.pipe && ic->to_forward &&
+           !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_SPLICE)) {
                /* don't break splicing by reading, but still call rcv_buf()
                 * to pass the flag.
                 */
@@ -1597,17 +1601,17 @@ static int sc_conn_send(struct stconn *sc)
        if (!conn->mux)
                return 0;
 
-       if (oc->pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
-               ret = conn->mux->snd_pipe(sc, oc->pipe);
+       if (sc->sedesc->iobuf.pipe && conn->xprt->snd_pipe && conn->mux->snd_pipe) {
+               ret = conn->mux->snd_pipe(sc, sc->sedesc->iobuf.pipe);
                if (ret > 0)
                        did_send = 1;
 
-               if (!oc->pipe->data) {
-                       put_pipe(oc->pipe);
-                       oc->pipe = NULL;
+               if (!sc->sedesc->iobuf.pipe->data) {
+                       put_pipe(sc->sedesc->iobuf.pipe);
+                       sc->sedesc->iobuf.pipe = NULL;
                }
 
-               if (oc->pipe)
+               if (sc->sedesc->iobuf.pipe)
                        goto end;
        }
 
index 45a0ad766da566de77f34aa466bf4e31b7dae0e9..4c300a33616e37d7c9fd32d0fe8cd433817e8c3e 100644 (file)
@@ -320,10 +320,10 @@ int stream_buf_available(void *arg)
 {
        struct stream *s = arg;
 
-       if (!s->req.buf.size && !s->req.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
+       if (!s->req.buf.size && !s->scb->sedesc->iobuf.pipe && s->scf->flags & SC_FL_NEED_BUFF &&
            b_alloc(&s->req.buf))
                sc_have_buff(s->scf);
-       else if (!s->res.buf.size && !s->res.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
+       else if (!s->res.buf.size && !s->scf->sedesc->iobuf.pipe && s->scb->flags & SC_FL_NEED_BUFF &&
                 b_alloc(&s->res.buf))
                sc_have_buff(s->scb);
        else
@@ -631,12 +631,6 @@ void stream_free(struct stream *s)
                sess_change_server(s, NULL);
        }
 
-       if (s->req.pipe)
-               put_pipe(s->req.pipe);
-
-       if (s->res.pipe)
-               put_pipe(s->res.pipe);
-
        /* We may still be present in the buffer wait queue */
        if (LIST_INLIST(&s->buffer_wait.list))
                LIST_DEL_INIT(&s->buffer_wait.list);
@@ -3419,7 +3413,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch
                     pfx,
                     &strm->req,
                     strm->req.flags, strm->req.analysers,
-                    strm->req.pipe ? strm->req.pipe->data : 0,
+                    strm->scb->sedesc->iobuf.pipe ? strm->scb->sedesc->iobuf.pipe->data : 0,
                     strm->req.to_forward, strm->req.total,
                     pfx,
                     strm->req.analyse_exp ?
@@ -3452,7 +3446,7 @@ void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const ch
                     pfx,
                     &strm->res,
                     strm->res.flags, strm->res.analysers,
-                    strm->res.pipe ? strm->res.pipe->data : 0,
+                    strm->scf->sedesc->iobuf.pipe ? strm->scf->sedesc->iobuf.pipe->data : 0,
                     strm->res.to_forward, strm->res.total,
                     pfx,
                     strm->res.analyse_exp ?