]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stconn: Add mux-to-mux fast-forward support
authorChristopher Faulet <cfaulet@haproxy.com>
Thu, 3 Aug 2023 15:51:58 +0000 (17:51 +0200)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 17 Oct 2023 16:51:13 +0000 (18:51 +0200)
Now the kernel splicing support was removed, we can add mux-to-mux
fast-forward support. Of course, the splicing support will be reintroduced
in the muxes themselves but this will be transparent.

Changes are mainly located into sc_conn_recv() and sc_conn_send().

src/stconn.c

index 8af16deb975e64eedf0e16b8f55192e31e50d0ed..8a00b18cc5df66f17507ef59a3769b9be7399501 100644 (file)
@@ -806,7 +806,7 @@ static void sc_app_chk_snd_conn(struct stconn *sc)
            !sc_ep_test(sc, SE_FL_WAIT_DATA))       /* not waiting for data */
                return;
 
-       if (!(sc->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(sc_oc(sc)))
+       if (!(sc->wait_event.events & SUB_RETRY_SEND))
                sc_conn_send(sc);
 
        if (sc_ep_test(sc, SE_FL_ERROR | SE_FL_ERR_PENDING) || sc_is_conn_error(sc)) {
@@ -1090,9 +1090,9 @@ static void sc_notify(struct stconn *sc)
         * alone because an HTTP parser might need more data to complete the
         * parsing.
         */
-       if (!channel_is_empty(ic) &&
-           sc_ep_test(sco, SE_FL_WAIT_DATA) &&
-           (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0 || sc_ep_have_ff_data(sco))) {
+       if (sc_ep_have_ff_data(sc_opposite(sc)) ||
+           (!channel_is_empty(ic) && sc_ep_test(sco, SE_FL_WAIT_DATA) &&
+            (!(sc->flags & SC_FL_SND_EXP_MORE) || c_full(ic) || ci_data(ic) == 0))) {
                int new_len, last_len;
 
                last_len = co_data(ic) + sc_ep_ff_data(sco);
@@ -1258,14 +1258,51 @@ static int sc_conn_recv(struct stconn *sc)
                ic->flags &= ~(CF_STREAMER | CF_STREAMER_FAST);
        }
 
-       if (sc_ep_have_ff_data(sc_opposite(sc)) && ic->to_forward &&
-           !(flags & CO_RFL_BUF_FLUSH) && sc_ep_test(sc, SE_FL_MAY_FASTFWD)) {
-               /* don't break splicing by reading, but still call rcv_buf()
-                * to pass the flag.
-                */
-               goto done_recv;
+#if defined(USE_LINUX_SPLICE)
+       /* Detect if the splicing is possible depending on the stream policy */
+       if ((global.tune.options & GTUNE_USE_SPLICE) &&
+           (ic->to_forward >= MIN_SPLICE_FORWARD) &&
+           ((!(sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_REQ)) ||
+            ((sc->flags & SC_FL_ISBACK) && ((strm_fe(__sc_strm(sc))->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_RTR)) ||
+            ((ic->flags & CF_STREAMER_FAST) && ((strm_sess(__sc_strm(sc))->fe->options2|__sc_strm(sc)->be->options2) & PR_O2_SPLIC_AUT))))
+               flags |= CO_RFL_MAY_SPLICE;
+#endif
+
+       /* First, let's see if we may fast-forward data from a side to the other
+        * one without using the channel buffer.
+        */
+       if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward) {
+               if (c_data(ic)) {
+                       /* We're embarrassed, there are already data pending in
+                        * the buffer and we don't want to have them at two
+                        * locations at a time. Let's indicate we need some
+                        * place and ask the consumer to hurry.
+                        */
+                       flags |= CO_RFL_BUF_FLUSH;
+                       goto abort_fastfwd;
+               }
+               ret = conn->mux->fastfwd(sc, ic->to_forward, flags);
+               if (ret < 0)
+                       goto abort_fastfwd;
+               else if (ret > 0) {
+                       if (ic->to_forward != CHN_INFINITE_FORWARD)
+                               ic->to_forward -= ret;
+                       ic->total += ret;
+                       cur_read += ret;
+                       ic->flags |= CF_READ_EVENT;
+               }
+
+               if (sc_ep_test(sc, SE_FL_EOS | SE_FL_ERROR))
+                       goto end_recv;
+
+               if (sc_ep_test(sc, SE_FL_WANT_ROOM))
+                       sc_need_room(sc, -1);
+
+               if (sc_ep_test(sc, SE_FL_MAY_FASTFWD) && ic->to_forward)
+                       goto done_recv;
        }
 
+ abort_fastfwd:
        /* now we'll need a input buffer for the stream */
        if (!sc_alloc_ibuf(sc, &(__sc_strm(sc)->buffer_wait)))
                goto end_recv;
@@ -1397,7 +1434,9 @@ static int sc_conn_recv(struct stconn *sc)
        } /* while !flags */
 
  done_recv:
-       if (cur_read) {
+       if (!cur_read)
+               se_have_no_more_data(sc->sedesc);
+       else {
                if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) &&
                    (cur_read <= ic->buf.size / 2)) {
                        ic->xfer_large = 0;
@@ -1461,7 +1500,8 @@ static int sc_conn_recv(struct stconn *sc)
                sc->flags |= SC_FL_ERROR;
                ret = 1;
        }
-       else if (!(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) &&
+       else if (!cur_read &&
+                !(sc->flags & (SC_FL_WONT_READ|SC_FL_NEED_BUFF|SC_FL_NEED_ROOM)) &&
                 !(sc->flags & (SC_FL_EOS|SC_FL_ABRT_DONE))) {
                /* Subscribe to receive events if we're blocking on I/O */
                conn->mux->subscribe(sc, SUB_RETRY_RECV, &sc->wait_event);
@@ -1539,6 +1579,30 @@ static int sc_conn_send(struct stconn *sc)
        if (!conn->mux)
                return 0;
 
+       if (sc_ep_have_ff_data(sc)) {
+               unsigned int send_flag = 0;
+
+               if ((!(sc->flags & (SC_FL_SND_ASAP|SC_FL_SND_NEVERWAIT)) &&
+                    ((oc->to_forward && oc->to_forward != CHN_INFINITE_FORWARD) ||
+                     (sc->flags & SC_FL_SND_EXP_MORE) ||
+                     (IS_HTX_STRM(s) &&
+                      (!(sco->flags & (SC_FL_EOI|SC_FL_EOS|SC_FL_ABRT_DONE)) && htx_expect_more(htxbuf(&oc->buf)))))) ||
+                   ((oc->flags & CF_ISRESP) &&
+                    (oc->flags & CF_AUTO_CLOSE) &&
+                    (sc->flags & SC_FL_SHUT_WANTED)))
+                       send_flag |= CO_SFL_MSG_MORE;
+
+               if (oc->flags & CF_STREAMER)
+                       send_flag |= CO_SFL_STREAMER;
+
+               ret = conn->mux->resume_fastfwd(sc, send_flag);
+               if (ret > 0)
+                       did_send = 1;
+
+               if (sc_ep_have_ff_data(sc))
+                       goto end;
+       }
+
        /* At this point, the pipe is empty, but we may still have data pending
         * in the normal buffer.
         */
@@ -1637,7 +1701,18 @@ static int sc_conn_send(struct stconn *sc)
                return 1;
        }
 
-       if (!channel_is_empty(oc)) {
+       /* FIXME: Must be reviewed for FF */
+       if (channel_is_empty(oc)) {
+               /* If fast-forwarding is blocked, unblock it now to check for
+                * receive on the other side
+                */
+               if (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED) {
+                       sc->sedesc->iobuf.flags &= ~IOBUF_FL_FF_BLOCKED;
+                       sc_have_room(sco);
+                       did_send = 1;
+               }
+       }
+       else {
                /* We couldn't send all of our data, let the mux know we'd like to send more */
                conn->mux->subscribe(sc, SUB_RETRY_SEND, &sc->wait_event);
        }
@@ -1777,7 +1852,7 @@ struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state)
        if (!sc_conn(sc))
                return t;
 
-       if (!(sc->wait_event.events & SUB_RETRY_SEND) && !channel_is_empty(sc_oc(sc)))
+       if (!(sc->wait_event.events & SUB_RETRY_SEND) && (!channel_is_empty(sc_oc(sc)) || (sc->sedesc->iobuf.flags & IOBUF_FL_FF_BLOCKED)))
                ret = sc_conn_send(sc);
        if (!(sc->wait_event.events & SUB_RETRY_RECV))
                ret |= sc_conn_recv(sc);