]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream-int: make use of si_rx_chan_{rdy,blk} to control the stream-int from...
authorWilly Tarreau <w@1wt.eu>
Wed, 14 Nov 2018 16:10:36 +0000 (17:10 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 18 Nov 2018 20:41:49 +0000 (21:41 +0100)
The channel can disable reading from the stream-interface using various
methods, such as :
  - CF_DONT_READ
  - !channel_may_recv()
  - and possibly others

Till now this was done by mangling SI_FL_RX_WAIT_EP which is not
appropriate at all since it's not the stream interface which decides
whether it wants to deliver data or not. Some places were also wrongly
relying on SI_FL_RXBLK_ROOM since it was the only other alternative,
but it's not suitable for CF_DONT_READ.

Let's use the SI_FL_RXBLK_CHAN flag for this instead. It will properly
prevent the stream interface from being woken up and reads from
subscribing to more receipt without being accidently removed. It is
automatically reset if CF_DONT_READ is not set in stream_int_notify().

The code is not trivial because it splits the logic between everything
related to buffer contents (channel_is_empty(), CF_WRITE_PARTIAL, etc)
and buffer policy (CF_DONT_READ). Also it now needs to decide timeouts
based on any blocking flag and not just SI_FL_RXBLK_ROOM anymore.

It looks like this patch has caused a minor performance degradation on
connection rate, which possibly deserves being investigated deeper as
the test conditions are uncertain (e.g. slightly more subscribe calls?).

include/proto/stream_interface.h
src/stream_interface.c

index e7293d747b874d234643aa8912dad8b742f05ba6..d1c022c263fb2a2869c516711ae4c990616af73f 100644 (file)
@@ -278,6 +278,18 @@ static inline void si_rx_endp_done(struct stream_interface *si)
        si->flags |=  SI_FL_RX_WAIT_EP;
 }
 
+/* Tell a stream interface the input channel is OK with it sending it some data */
+static inline void si_rx_chan_rdy(struct stream_interface *si)
+{
+       si->flags &= ~SI_FL_RXBLK_CHAN;
+}
+
+/* Tell a stream interface the input channel is not OK with it sending it some data */
+static inline void si_rx_chan_blk(struct stream_interface *si)
+{
+       si->flags |=  SI_FL_RXBLK_CHAN;
+}
+
 /* The stream interface just got the input buffer it was waiting for */
 static inline void si_rx_buff_rdy(struct stream_interface *si)
 {
index 80412e7c10a6d0c551f562c4a0ed5facbcf7c2a0..c349aac2e26551ff879bb2ce7106a7b02882d524 100644 (file)
@@ -481,10 +481,13 @@ void stream_int_notify(struct stream_interface *si)
        }
 
        if ((sio->flags & SI_FL_RXBLK_ROOM) &&
-           ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
-            channel_is_empty(oc))) {
+           ((oc->flags & CF_WRITE_PARTIAL) || channel_is_empty(oc)))
                sio->flags &= ~SI_FL_RXBLK_ROOM;
-       }
+
+       if (oc->flags & CF_DONT_READ)
+               si_rx_chan_blk(sio);
+       else
+               si_rx_chan_rdy(sio);
 
        /* Notify the other side when we've injected data into the IC that
         * needs to be forwarded. We can do fast-forwarding as soon as there
@@ -519,13 +522,16 @@ void stream_int_notify(struct stream_interface *si)
                        si->flags &= ~SI_FL_RXBLK_ROOM;
        }
 
+       if (!(ic->flags & CF_DONT_READ))
+               si_rx_chan_rdy(si);
+
        si_chk_rcv(si);
        si_chk_rcv(sio);
 
-       if (si->flags & SI_FL_RXBLK_ROOM) {
+       if (si_rx_blocked(si)) {
                ic->rex = TICK_ETERNITY;
        }
-       else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL) {
+       else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL)) == CF_READ_PARTIAL) {
                /* we must re-enable reading if si_chk_snd() has freed some space */
                if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
                        ic->rex = tick_add_ifset(now_ms, ic->rto);
@@ -725,11 +731,8 @@ struct task *si_cs_io_cb(struct task *t, void *ctx, unsigned short state)
 
        if (!(si->wait_event.wait_reason & SUB_CAN_SEND) && !channel_is_empty(si_oc(si)))
                ret = si_cs_send(cs);
-       if (!(si->wait_event.wait_reason & SUB_CAN_RECV)) {
+       if (!(si->wait_event.wait_reason & SUB_CAN_RECV))
                ret |= si_cs_recv(cs);
-               if (!(si_ic(si)->flags & (CF_SHUTR|CF_DONT_READ)))
-                       si->flags &= ~SI_FL_RX_WAIT_EP;
-       }
        if (ret != 0)
                si_cs_process(cs);
 
@@ -751,10 +754,14 @@ void stream_int_update(struct stream_interface *si)
 
        if (!(ic->flags & CF_SHUTR)) {
                /* Read not closed, update FD status and timeout for reads */
-               if ((ic->flags & CF_DONT_READ) || !channel_is_empty(ic)) {
+               if (ic->flags & CF_DONT_READ)
+                       si_rx_chan_blk(si);
+               else
+                       si_rx_chan_rdy(si);
+
+               if (!channel_is_empty(ic)) {
                        /* stop reading, imposed by channel's policy or contents */
                        si_cant_put(si);
-                       ic->rex = TICK_ETERNITY;
                }
                else {
                        /* (re)start reading and update timeout. Note: we don't recompute the timeout
@@ -763,9 +770,12 @@ void stream_int_update(struct stream_interface *si)
                         * have updated it if there has been a completed I/O.
                         */
                        si->flags &= ~SI_FL_RXBLK_ROOM;
-                       if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
-                               ic->rex = tick_add_ifset(now_ms, ic->rto);
                }
+               if (si->flags & SI_FL_RXBLK_ANY & ~SI_FL_RX_WAIT_EP)
+                       ic->rex = TICK_ETERNITY;
+               else if (!(ic->flags & CF_READ_NOEXP) && !tick_isset(ic->rex))
+                       ic->rex = tick_add_ifset(now_ms, ic->rto);
+
                si_chk_rcv(si);
        }
        else
@@ -1254,7 +1264,7 @@ int si_cs_recv(struct conn_stream *cs)
 
                if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
                        /* we're stopped by the channel's policy */
-                       si_cant_put(si);
+                       si_rx_chan_blk(si);
                        break;
                }
 
@@ -1269,7 +1279,7 @@ int si_cs_recv(struct conn_stream *cs)
                         */
                        if (ic->flags & CF_STREAMER) {
                                /* we're stopped by the channel's policy */
-                               si_cant_put(si);
+                               si_rx_chan_blk(si);
                                break;
                        }
 
@@ -1278,7 +1288,7 @@ int si_cs_recv(struct conn_stream *cs)
                         */
                        if (ret >= global.tune.recv_enough) {
                                /* we're stopped by the channel's policy */
-                               si_cant_put(si);
+                               si_rx_chan_blk(si);
                                break;
                        }
                }
@@ -1286,7 +1296,7 @@ int si_cs_recv(struct conn_stream *cs)
                /* if we are waiting for more space, don't try to read more data
                 * right now.
                 */
-               if (si->flags & SI_FL_RXBLK_ROOM)
+               if (si_rx_blocked(si))
                        break;
        } /* while !flags */