From 53b7150357b0ca3bfd4af784e349b6f06131baab Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Tue, 3 Feb 2026 07:49:21 +0100 Subject: [PATCH] MEDIUM: stream: Limit number of synchronous send per stream wakeup It is not a bug fix, because there is no way to hit the issue for now. But there is nothing preventing a loop of synchronous sends in process_stream(). Indead, when a synchronous send is successfully performed, we restart the SCs evaluation and at the end another synchronous send is attempted. So with an endpoint consuming data bit by bit or with a filter fowarding few bytes at each call, it is possible to loop for a while in process_stream(). Because it is not expected, we now limit the number of synchronous send per wakeup to two calls. In a nominal case, it should never be more. This commit is mandatory to be able to handle large buffers on channels There is no reason to backport this commit except if the large buffers support on channels are backported. --- include/haproxy/sc_strm.h | 13 +++++++++---- src/stconn.c | 15 +++++++++------ src/stream.c | 6 ++++-- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/include/haproxy/sc_strm.h b/include/haproxy/sc_strm.h index bac877dc1..c0e91201d 100644 --- a/include/haproxy/sc_strm.h +++ b/include/haproxy/sc_strm.h @@ -38,7 +38,7 @@ void sc_update_tx(struct stconn *sc); struct task *sc_conn_io_cb(struct task *t, void *ctx, unsigned int state); int sc_conn_sync_recv(struct stconn *sc); -void sc_conn_sync_send(struct stconn *sc); +int sc_conn_sync_send(struct stconn *sc); int sc_applet_sync_recv(struct stconn *sc); void sc_applet_sync_send(struct stconn *sc); @@ -408,10 +408,15 @@ static inline int sc_sync_recv(struct stconn *sc) /* Perform a synchronous send using the right version, depending the endpoing is * a connection or an applet. */ -static inline void sc_sync_send(struct stconn *sc) +static inline int sc_sync_send(struct stconn *sc, unsigned cnt) { - if (sc_ep_test(sc, SE_FL_T_MUX)) - sc_conn_sync_send(sc); + if (!sc_ep_test(sc, SE_FL_T_MUX)) + return 0; + if (cnt >= 2 && co_data(sc_oc(sc))) { + task_wakeup(__sc_strm(sc)->task, TASK_WOKEN_MSG); + return 0; + } + return sc_conn_sync_send(sc); } /* Combines both sc_update_rx() and sc_update_tx() at once */ diff --git a/src/stconn.c b/src/stconn.c index 9d54dbff4..c4a96d4fe 100644 --- a/src/stconn.c +++ b/src/stconn.c @@ -1801,27 +1801,30 @@ int sc_conn_send(struct stconn *sc) * flag are cleared prior to the attempt, and will possibly be updated in case * of success. */ -void sc_conn_sync_send(struct stconn *sc) +int sc_conn_sync_send(struct stconn *sc) { struct channel *oc = sc_oc(sc); + int did_send = 0; oc->flags &= ~CF_WRITE_EVENT; if (sc->flags & SC_FL_SHUT_DONE) - return; + goto end; if (!co_data(oc)) - return; + goto end; if (!sc_state_in(sc->state, SC_SB_CON|SC_SB_RDY|SC_SB_EST)) - return; + goto end; if (!sc_mux_ops(sc)) - return; + goto end; - sc_conn_send(sc); + did_send = sc_conn_send(sc); if (oc->flags & CF_WRITE_EVENT) oc->flags |= CF_WAKE_ONCE; + end: + return did_send; } /* Called by I/O handlers after completion.. It propagates diff --git a/src/stream.c b/src/stream.c index 891377301..873207a4a 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1828,6 +1828,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) struct channel *req, *res; struct stconn *scf, *scb; unsigned int rate; + unsigned int scf_send_cnt, scb_send_cnt; activity[tid].stream_calls++; stream_cond_update_cpu_latency(s); @@ -1855,6 +1856,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) /* Keep a copy of SC flags */ scf_flags = scf->flags; scb_flags = scb->flags; + scf_send_cnt = scb_send_cnt = 0; /* update pending events */ s->pending_events |= stream_map_task_state(state); @@ -2488,7 +2490,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) } /* Let's see if we can send the pending request now */ - sc_sync_send(scb); + sc_sync_send(scb, scb_send_cnt++); /* * Now forward all shutdown requests between both sides of the request buffer @@ -2598,7 +2600,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) scf_flags = (scf_flags & ~(SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) | (scf->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)); /* Let's see if we can send the pending response now */ - sc_sync_send(scf); + sc_sync_send(scf, scf_send_cnt++); /* * Now forward all shutdown requests between both sides of the buffer -- 2.47.3