]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream: Limit number of synchronous send per stream wakeup
authorChristopher Faulet <cfaulet@haproxy.com>
Tue, 3 Feb 2026 06:49:21 +0000 (07:49 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Wed, 18 Feb 2026 12:26:21 +0000 (13:26 +0100)
It is not a bug fix, because there is no way to hit the issue for now. But
there is nothing preventing a loop of synchronous sends in process_stream().
Indead, when a synchronous send is successfully performed, we restart the
SCs evaluation and at the end another synchronous send is attempted. So with
an endpoint consuming data bit by bit or with a filter fowarding few bytes
at each call, it is possible to loop for a while in process_stream().

Because it is not expected, we now limit the number of synchronous send per
wakeup to two calls. In a nominal case, it should never be more. This commit
is mandatory to be able to handle large buffers on channels

There is no reason to backport this commit except if the large buffers
support on channels are backported.

include/haproxy/sc_strm.h
src/stconn.c
src/stream.c

index bac877dc1a22cd8d2644c236bb26acddf93b0726..c0e91201d8fbd013f9940b16e1876ae4d6e6ab95 100644 (file)
@@ -38,7 +38,7 @@ void sc_update_tx(struct stconn *sc);
 
 struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state);
 int sc_conn_sync_recv(struct stconn *sc);
-void sc_conn_sync_send(struct stconn *sc);
+int sc_conn_sync_send(struct stconn *sc);
 
 int sc_applet_sync_recv(struct stconn *sc);
 void sc_applet_sync_send(struct stconn *sc);
@@ -408,10 +408,15 @@ static inline int sc_sync_recv(struct stconn *sc)
 /* Perform a synchronous send using the right version, depending the endpoing is
  * a connection or an applet.
  */
-static inline void sc_sync_send(struct stconn *sc)
+static inline int sc_sync_send(struct stconn *sc, unsigned cnt)
 {
-       if (sc_ep_test(sc, SE_FL_T_MUX))
-               sc_conn_sync_send(sc);
+       if (!sc_ep_test(sc, SE_FL_T_MUX))
+               return 0;
+       if (cnt >= 2 && co_data(sc_oc(sc))) {
+               task_wakeup(__sc_strm(sc)->task, TASK_WOKEN_MSG);
+               return 0;
+       }
+       return sc_conn_sync_send(sc);
 }
 
 /* Combines both sc_update_rx() and sc_update_tx() at once */
index 9d54dbff43c7ec41cb7957dfbb1028b54b0c8a9d..c4a96d4fef3d55d8cb78dc74f47885984ae1b18d 100644 (file)
@@ -1801,27 +1801,30 @@ int sc_conn_send(struct stconn *sc)
  * flag are cleared prior to the attempt, and will possibly be updated in case
  * of success.
  */
-void sc_conn_sync_send(struct stconn *sc)
+int sc_conn_sync_send(struct stconn *sc)
 {
        struct channel *oc = sc_oc(sc);
+       int did_send = 0;
 
        oc->flags &= ~CF_WRITE_EVENT;
 
        if (sc->flags & SC_FL_SHUT_DONE)
-               return;
+               goto end;
 
        if (!co_data(oc))
-               return;
+               goto end;
 
        if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST))
-               return;
+               goto end;
 
        if (!sc_mux_ops(sc))
-               return;
+               goto end;
 
-       sc_conn_send(sc);
+       did_send = sc_conn_send(sc);
        if (oc->flags & CF_WRITE_EVENT)
                oc->flags |= CF_WAKE_ONCE;
+  end:
+       return did_send;
 }
 
 /* Called by I/O handlers after completion.. It propagates
index 8913773010e345ef539d836fc71bd02f0d859356..873207a4a9dc5351d12406f35df53e2996b0459a 100644 (file)
@@ -1828,6 +1828,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        struct channel *req, *res;
        struct stconn *scf, *scb;
        unsigned int rate;
+       unsigned int scf_send_cnt, scb_send_cnt;
 
        activity[tid].stream_calls++;
        stream_cond_update_cpu_latency(s);
@@ -1855,6 +1856,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        /* Keep a copy of SC flags */
        scf_flags = scf->flags;
        scb_flags = scb->flags;
+       scf_send_cnt = scb_send_cnt = 0;
 
        /* update pending events */
        s->pending_events |= stream_map_task_state(state);
@@ -2488,7 +2490,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        }
 
        /* Let's see if we can send the pending request now */
-       sc_sync_send(scb);
+       sc_sync_send(scb, scb_send_cnt++);
 
        /*
         * Now forward all shutdown requests between both sides of the request buffer
@@ -2598,7 +2600,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        scf_flags = (scf_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED));
 
        /* Let's see if we can send the pending response now */
-       sc_sync_send(scf);
+       sc_sync_send(scf, scf_send_cnt++);
 
        /*
         * Now forward all shutdown requests between both sides of the buffer