From b3e0de46ce54de066d329253625d734b01134aa0 Mon Sep 17 00:00:00 2001 From: Christopher Faulet Date: Thu, 11 Oct 2018 13:54:13 +0200 Subject: [PATCH] MEDIUM: stream-int: Rely only on SI_FL_WAIT_ROOM to stop data receipt This flag is set on the stream interface when we should wait for more space in the channel's buffer to store more incoming data. This means we should wait some outgoing data are sent before retrying to receive more data. But in stream interface functions, at many places, instead of checking this flag, we use the function channel_may_recv to know if we can (re)start reading. This currently works but it is not really consistent. And, it works because only raw data are stored in buffers. But it will be a problem when we start to store structured data in buffers. So to avoid any problems with futur implementations, we now rely only on SI_FL_WAIT_ROOM. The function channel_may_recv can still be called, but only when we are sure to handle raw data (for instance in functions ci_put*). To do so, among other things, we must be sure to unset SI_FL_WAIT_ROOM and offer an opportunity to call chk_rcv() on a stream interface when some data are sent on the other end, which is now granted by the previous patch series. --- src/stream_interface.c | 48 ++++++++++++++++-------------------------- 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/src/stream_interface.c b/src/stream_interface.c index 29a76e44f5..46fc9ebf1c 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -246,7 +246,7 @@ static void stream_int_chk_rcv(struct stream_interface *si) __FUNCTION__, si, si->state, ic->flags, si_oc(si)->flags); - if (!channel_may_recv(ic) || ic->pipe) { + if (ic->pipe) { /* stop reading */ si->flags |= SI_FL_WAIT_ROOM; } @@ -460,7 +460,7 @@ void stream_int_notify(struct stream_interface *si) /* indicate that we may be waiting for data from the output channel or * we're about to close and can't expect more data if SHUTW_NOW is there. */ - if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc)) + if (!(oc->flags & (CF_SHUTW|CF_SHUTW_NOW))) si->flags |= SI_FL_WAIT_DATA; else if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) si->flags &= ~SI_FL_WAIT_DATA; @@ -477,8 +477,7 @@ void stream_int_notify(struct stream_interface *si) ic->rex = tick_add_ifset(now_ms, ic->rto); if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && - channel_may_recv(oc) && - (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) { + (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) { si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM; si_chk_rcv(si_opposite(si)); } @@ -513,7 +512,7 @@ void stream_int_notify(struct stream_interface *si) /* check if the consumer has freed some space either in the * buffer or in the pipe. */ - if (channel_may_recv(ic) && new_len < last_len) { + if (new_len < last_len) { si->flags &= ~SI_FL_WAIT_ROOM; si_chk_rcv(si); } @@ -522,8 +521,7 @@ void stream_int_notify(struct stream_interface *si) if (si->flags & SI_FL_WAIT_ROOM) { ic->rex = TICK_ETERNITY; } - else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && - channel_may_recv(ic)) { + else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL) { /* we must re-enable reading if si_chk_snd() has freed some space */ if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex)) ic->rex = tick_add_ifset(now_ms, ic->rto); @@ -751,15 +749,12 @@ void stream_int_update(struct stream_interface *si) si_want_put(si); /* Read not closed, update FD status and timeout for reads */ - if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) { + if ((ic->flags & CF_DONT_READ) || co_data(ic)) { /* stop reading */ - if (!(si->flags & SI_FL_WAIT_ROOM)) { - if (!(ic->flags & CF_DONT_READ)) /* full */ - si_cant_put(si); - ic->rex = TICK_ETERNITY; - } + si_stop_put(si); + ic->rex = TICK_ETERNITY; } - else if (!(si->flags & SI_FL_WAIT_ROOM) || !co_data(ic)) { + else { /* (re)start reading and update timeout. Note: we don't recompute the timeout * everytime we get here, otherwise it would risk never to expire. We only * update it if is was not yet set. The stream socket handler will already @@ -1011,16 +1006,8 @@ static void stream_int_shutw_conn(struct stream_interface *si) */ static void stream_int_chk_rcv_conn(struct stream_interface *si) { - struct channel *ic = si_ic(si); - - if (!channel_may_recv(ic)) { - /* stop reading */ - si->flags |= SI_FL_WAIT_ROOM; - } - else { - /* (re)start reading */ - tasklet_wakeup(si->wait_event.task); - } + /* (re)start reading */ + tasklet_wakeup(si->wait_event.task); } @@ -1269,11 +1256,6 @@ int si_cs_recv(struct conn_stream *cs) ic->flags |= CF_READ_PARTIAL; ic->total += ret; - if (!channel_may_recv(ic)) { - si_cant_put(si); - break; - } - if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) break; @@ -1295,6 +1277,12 @@ int si_cs_recv(struct conn_stream *cs) if (ret >= global.tune.recv_enough) break; } + + /* if we are waiting for more space, don't try to read more data + * right now. + */ + if (si->flags & SI_FL_WAIT_ROOM) + break; } /* while !flags */ if (cur_read) { @@ -1537,7 +1525,7 @@ static void stream_int_chk_rcv_applet(struct stream_interface *si) __FUNCTION__, si, si->state, ic->flags, si_oc(si)->flags); - if (channel_may_recv(ic) && !ic->pipe) { + if (!ic->pipe) { /* (re)start reading */ appctx_wakeup(si_appctx(si)); } -- 2.39.5