From: Olivier Houchard Date: Fri, 31 Aug 2018 15:29:12 +0000 (+0200) Subject: MEDIUM: stream_interface: Make recv() subscribe when more data is needed. X-Git-Tag: v1.9-dev2~29 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=f653528dc1dc001c8bdd126c7364371eedd78bff;p=thirdparty%2Fhaproxy.git MEDIUM: stream_interface: Make recv() subscribe when more data is needed. Refactor the code so that si_cs_recv() subscribes to receive events. --- diff --git a/src/stream_interface.c b/src/stream_interface.c index 46e57d453a..e5ddee68f1 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -51,10 +51,10 @@ static void stream_int_shutr_applet(struct stream_interface *si); static void stream_int_shutw_applet(struct stream_interface *si); static void stream_int_chk_rcv_applet(struct stream_interface *si); static void stream_int_chk_snd_applet(struct stream_interface *si); -static void si_cs_recv(struct conn_stream *cs); +static int si_cs_recv(struct conn_stream *cs); static int si_cs_wake_cb(struct conn_stream *cs); static int si_idle_conn_wake_cb(struct conn_stream *cs); -static struct task * si_cs_send(struct conn_stream *cs); +static int si_cs_send(struct conn_stream *cs); /* stream-interface operations for embedded tasks */ struct si_ops si_embedded_ops = { @@ -631,7 +631,7 @@ static int si_cs_wake_cb(struct conn_stream *cs) * caller to commit polling changes. The caller should check conn->flags * for errors. */ -static struct task * si_cs_send(struct conn_stream *cs) +static int si_cs_send(struct conn_stream *cs) { struct connection *conn = cs->conn; struct stream_interface *si = cs->data; @@ -641,21 +641,21 @@ static struct task * si_cs_send(struct conn_stream *cs) /* We're already waiting to be able to send, give up */ if (si->wait_list.wait_reason & SUB_CAN_SEND) - return NULL; + return 0; if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return NULL; + return 0; if (conn->flags & CO_FL_HANDSHAKE) { /* a handshake was requested */ /* Schedule ourself to be woken up once the handshake is done */ conn->xprt->subscribe(conn, SUB_CAN_SEND, &si->wait_list); - return NULL; + return 0; } /* we might have been called just after an asynchronous shutw */ if (si_oc(si)->flags & CF_SHUTW) - return NULL; + return 0; /* ensure it's only set if a write attempt has succeeded */ oc->flags &= ~CF_WRITE_PARTIAL; @@ -673,7 +673,7 @@ static struct task * si_cs_send(struct conn_stream *cs) } if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return NULL; + return 0; } /* At this point, the pipe is empty, but we may still have data pending @@ -753,20 +753,24 @@ wake_others: } } - return NULL; + return did_send; } struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state) { struct stream_interface *si = ctx; struct conn_stream *cs = objt_cs(si->end); + int ret = 0; if (!cs) return NULL; - if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) { - si_cs_send(cs); + if (!(si->wait_list.wait_reason & SUB_CAN_SEND)) + ret = si_cs_send(cs); + if (!(si->wait_list.wait_reason & SUB_CAN_RECV)) + ret |= si_cs_recv(cs); + if (ret != 0) si_cs_wake_cb(cs); - } + return (NULL); } @@ -1138,12 +1142,12 @@ static void stream_int_chk_snd_conn(struct stream_interface *si) * into the buffer from the connection. It iterates over the mux layer's * rcv_buf function. */ -static void si_cs_recv(struct conn_stream *cs) +static int si_cs_recv(struct conn_stream *cs) { struct connection *conn = cs->conn; struct stream_interface *si = cs->data; struct channel *ic = si_ic(si); - int ret, max, cur_read; + int ret, max, cur_read = 0; int read_poll = MAX_READ_POLL_LOOPS; /* stop immediately on errors. Note that we DON'T want to stop on @@ -1153,18 +1157,22 @@ static void si_cs_recv(struct conn_stream *cs) * which rejects it before reading it all. */ if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return; + return 0; + + /* If another call to si_cs_recv() failed, and we subscribed to + * recv events already, give up now. + */ + if (si->wait_list.wait_reason & SUB_CAN_RECV) + return 0; /* maybe we were called immediately after an asynchronous shutr */ if (ic->flags & CF_SHUTR) - return; + return 0; /* stop here if we reached the end of data */ if (cs->flags & CS_FL_EOS) goto out_shutdown_r; - cur_read = 0; - if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) && global.tune.idle_timer && (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) { @@ -1218,7 +1226,7 @@ static void si_cs_recv(struct conn_stream *cs) goto out_shutdown_r; if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return; + return cur_read != 0; if (conn->flags & CO_FL_WAIT_ROOM) { /* the pipe is full or we have read enough data that it @@ -1377,13 +1385,16 @@ static void si_cs_recv(struct conn_stream *cs) end_recv: if (conn->flags & CO_FL_ERROR || cs->flags & CS_FL_ERROR) - return; + return cur_read != 0; if (cs->flags & CS_FL_EOS) /* connection closed */ goto out_shutdown_r; - return; + /* Subscribe to receive events */ + conn->mux->subscribe(cs, SUB_CAN_RECV, &si->wait_list); + + return cur_read != 0; out_shutdown_r: /* we received a shutdown */ @@ -1391,7 +1402,7 @@ static void si_cs_recv(struct conn_stream *cs) if (ic->flags & CF_AUTO_CLOSE) channel_shutw_now(ic); stream_sock_read0(si); - return; + return cur_read != 0; } /*