]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream-int: unconditionally call si_chk_rcv() in update and notify
authorWilly Tarreau <w@1wt.eu>
Thu, 15 Nov 2018 06:46:57 +0000 (07:46 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 18 Nov 2018 20:41:49 +0000 (21:41 +0100)
For a long time, stream_int_update() and stream_int_notify() used to only
conditionally call si_chk_rcv() based on state change detection. This
detection is not reliable and quite complex. With the new blocked flags
that si_chk_rcv() checks, it's much more reliable to always call the
function to take into account recent changes,and let it decide if it needs
to wake something up or not.

This also removes the calls to si_chk_rcv() that were performed in
si_update_both() since these ones are systematically performed in
stream_int_update() after updating the Rx flags.

src/stream_interface.c

index df2adb3aa1fb144ca55d517a604ac9ad59dacc8e..80412e7c10a6d0c551f562c4a0ed5facbcf7c2a0 100644 (file)
@@ -448,6 +448,7 @@ void stream_int_notify(struct stream_interface *si)
 {
        struct channel *ic = si_ic(si);
        struct channel *oc = si_oc(si);
+       struct stream_interface *sio = si_opposite(si);
 
        /* process consumer side */
        if (channel_is_empty(oc)) {
@@ -479,11 +480,10 @@ void stream_int_notify(struct stream_interface *si)
                                ic->rex = tick_add_ifset(now_ms, ic->rto);
        }
 
-       if ((si_opposite(si)->flags & SI_FL_RXBLK_ROOM) &&
+       if ((sio->flags & SI_FL_RXBLK_ROOM) &&
            ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
             channel_is_empty(oc))) {
-               si_opposite(si)->flags &= ~SI_FL_RXBLK_ROOM;
-               si_chk_rcv(si_opposite(si));
+               sio->flags &= ~SI_FL_RXBLK_ROOM;
        }
 
        /* Notify the other side when we've injected data into the IC that
@@ -498,7 +498,7 @@ void stream_int_notify(struct stream_interface *si)
         * an HTTP parser might need more data to complete the parsing.
         */
        if (!channel_is_empty(ic) &&
-           (si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
+           (sio->flags & SI_FL_WAIT_DATA) &&
            (!(ic->flags & CF_EXPECT_MORE) || c_full(ic) || ci_data(ic) == 0 || ic->pipe)) {
                int new_len, last_len;
 
@@ -506,7 +506,7 @@ void stream_int_notify(struct stream_interface *si)
                if (ic->pipe)
                        last_len += ic->pipe->data;
 
-               si_chk_snd(si_opposite(si));
+               si_chk_snd(sio);
 
                new_len = co_data(ic);
                if (ic->pipe)
@@ -515,12 +515,13 @@ void stream_int_notify(struct stream_interface *si)
                /* check if the consumer has freed some space either in the
                 * buffer or in the pipe.
                 */
-               if (new_len < last_len) {
+               if (new_len < last_len)
                        si->flags &= ~SI_FL_RXBLK_ROOM;
-                       si_chk_rcv(si);
-               }
        }
 
+       si_chk_rcv(si);
+       si_chk_rcv(sio);
+
        if (si->flags & SI_FL_RXBLK_ROOM) {
                ic->rex = TICK_ETERNITY;
        }
@@ -536,14 +537,14 @@ void stream_int_notify(struct stream_interface *si)
            (si->state != SI_ST_EST && si->state != SI_ST_CON) ||
            (si->flags & SI_FL_ERR) ||
            ((ic->flags & CF_READ_PARTIAL) &&
-            (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
+            (!ic->to_forward || sio->state != SI_ST_EST)) ||
 
            /* changes on the consumption side */
            (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
            ((oc->flags & CF_WRITE_ACTIVITY) &&
             ((oc->flags & CF_SHUTW) ||
              ((oc->flags & CF_WAKE_WRITE) &&
-              (si_opposite(si)->state != SI_ST_EST ||
+              (sio->state != SI_ST_EST ||
                (channel_is_empty(oc) && !oc->to_forward)))))) {
                task_wakeup(si_task(si), TASK_WOKEN_IO);
        }
@@ -762,10 +763,10 @@ void stream_int_update(struct stream_interface *si)
                         * have updated it if there has been a completed I/O.
                         */
                        si->flags &= ~SI_FL_RXBLK_ROOM;
-                       si_chk_rcv(si);
                        if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
                                ic->rex = tick_add_ifset(now_ms, ic->rto);
                }
+               si_chk_rcv(si);
        }
        else
                si_rx_shut_blk(si);
@@ -848,10 +849,6 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
                        si_f->flags &= ~SI_FL_RXBLK_ROOM;
        }
 
-       /* it's time to try to receive */
-       si_chk_rcv(si_f);
-       si_chk_rcv(si_b);
-
        /* let's recompute both sides states */
        if (si_f->state == SI_ST_EST)
                stream_int_update(si_f);