]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
BUG/MEDIUM: stream-int: change the way buffer room is requested by a stream-int
authorWilly Tarreau <w@1wt.eu>
Mon, 12 Nov 2018 15:11:08 +0000 (16:11 +0100)
committerWilly Tarreau <w@1wt.eu>
Mon, 12 Nov 2018 17:58:45 +0000 (18:58 +0100)
Subsequent to the recent stream-int updates, we started to consider that
SI_FL_WANT_PUT needs to be set when receipt is enabled, but this is wrong
and results in 100% CPU when an HTTP client stays idle after a keep-alive
request because the stream-int has nothing to provide and nothing to send.

In fact just like for applets this flag should reflect the continuation
of an attempt. So it's si_cs_recv() which should set the flag, and clear
it if it has nothing more to provide. This function is called the first
time in process_stream()), and called again during transfers, so it will
always be up to date during stream_int_update() and stream_int_notify().

As a special case, it should also be set when a connection switches to
the established state. And we should absolutely refrain from calling
si_cs_recv() to re-enable reading, normally just setting this flag
(from within the stream-int's handler or prior to calling si_chk_rcv())
is expected to be OK.

A corner case remains where it was observed that in stream_int_notify() we
can sometimes be called with an empty output channel with SI_FL_WAIT_ROOM
and no CF_WRITE_PARTIAL, so there's no way to detect that we should
re-enable receiving. It's easy to also take care of this condition
there for the time it takes to figure if this situation is expected
or not.

Now it becomes more obvious that relying on a single flag to request
room (or on two flags to arbiter activity) is not workable given the
autonomy of both sides. The mux_h2 has taught us that blocking flags
are much more reliable, require much less condition and are much easier
to deal with. That's probably something to consider quickly in this
area.

No backport is needed.

src/stream.c
src/stream_interface.c

index 8e115e595ef9fa02ab90e34d03d7a5975c056ef6..fe07d4de529181a8250501a0b5761213c7b4a25a 100644 (file)
@@ -673,6 +673,7 @@ static int sess_update_st_con_tcp(struct stream *s)
        /* OK, this means that a connection succeeded. The caller will be
         * responsible for handling the transition from CON to EST.
         */
+       si_want_put(si);
        si->state    = SI_ST_EST;
        si->err_type = SI_ET_NONE;
        return 1;
index 65d2c6120c74d16e84e5d4335a1a672bcf68413a..1999589b8b52f658b2ebc7039c7ff95b6e86e76c 100644 (file)
@@ -475,12 +475,13 @@ void stream_int_notify(struct stream_interface *si)
                if (!(si->flags & SI_FL_INDEP_STR))
                        if (tick_isset(ic->rex))
                                ic->rex = tick_add_ifset(now_ms, ic->rto);
+       }
 
-               if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
-                          (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) {
-                       si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM;
-                       si_chk_rcv(si_opposite(si));
-               }
+       if ((si_opposite(si)->flags & SI_FL_WAIT_ROOM) &&
+           ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL ||
+            channel_is_empty(oc))) {
+               si_opposite(si)->flags &= ~SI_FL_WAIT_ROOM;
+               si_chk_rcv(si_opposite(si));
        }
 
        /* Notify the other side when we've injected data into the IC that
@@ -745,13 +746,10 @@ void stream_int_update(struct stream_interface *si)
        struct channel *oc = si_oc(si);
 
        if (!(ic->flags & CF_SHUTR)) {
-               if (!(ic->flags & CF_DONT_READ))
-                       si_want_put(si);
-
                /* Read not closed, update FD status and timeout for reads */
                if ((ic->flags & CF_DONT_READ) || co_data(ic)) {
-                       /* stop reading */
-                       si_stop_put(si);
+                       /* stop reading, imposed by channel's policy or contents */
+                       si_cant_put(si);
                        ic->rex = TICK_ETERNITY;
                }
                else {
@@ -846,14 +844,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
        }
 
        /* it's time to try to receive */
-       if (!(req->flags & (CF_SHUTR|CF_DONT_READ)))
-               si_want_put(si_f);
-
        si_chk_rcv(si_f);
-
-       if (!(res->flags & (CF_SHUTR|CF_DONT_READ)))
-               si_want_put(si_b);
-
        si_chk_rcv(si_b);
 
        /* let's recompute both sides states */
@@ -1133,9 +1124,6 @@ int si_cs_recv(struct conn_stream *cs)
        if (si->wait_event.wait_reason & SUB_CAN_RECV)
                return 0;
 
-       /* by default nothing to deliver */
-       si_stop_put(si);
-
        /* maybe we were called immediately after an asynchronous shutr */
        if (ic->flags & CF_SHUTR)
                return 1;
@@ -1144,6 +1132,9 @@ int si_cs_recv(struct conn_stream *cs)
        if (cs->flags & CS_FL_EOS)
                goto out_shutdown_r;
 
+       /* start by claiming we'll want to receive and change our mind later if needed */
+       si_want_put(si);
+
        if ((ic->flags & (CF_STREAMER | CF_STREAMER_FAST)) && !co_data(ic) &&
            global.tune.idle_timer &&
            (unsigned short)(now_ms - ic->last_read) >= global.tune.idle_timer) {
@@ -1235,8 +1226,10 @@ int si_cs_recv(struct conn_stream *cs)
                if (cs->flags & CS_FL_RCV_MORE)
                        si_cant_put(si);
 
-               if (ret <= 0)
+               if (ret <= 0) {
+                       si_stop_put(si);
                        break;
+               }
 
                cur_read += ret;
 
@@ -1254,8 +1247,11 @@ int si_cs_recv(struct conn_stream *cs)
                ic->flags |= CF_READ_PARTIAL;
                ic->total += ret;
 
-               if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0)
+               if ((ic->flags & CF_READ_DONTWAIT) || --read_poll <= 0) {
+                       /* we're stopped by the channel's policy */
+                       si_cant_put(si);
                        break;
+               }
 
                /* if too many bytes were missing from last read, it means that
                 * it's pointless trying to read again because the system does
@@ -1266,14 +1262,20 @@ int si_cs_recv(struct conn_stream *cs)
                         * have exhausted system buffers. It's not worth trying
                         * again.
                         */
-                       if (ic->flags & CF_STREAMER)
+                       if (ic->flags & CF_STREAMER) {
+                               /* we're stopped by the channel's policy */
+                               si_cant_put(si);
                                break;
+                       }
 
                        /* if we read a large block smaller than what we requested,
                         * it's almost certain we'll never get anything more.
                         */
-                       if (ret >= global.tune.recv_enough)
+                       if (ret >= global.tune.recv_enough) {
+                               /* we're stopped by the channel's policy */
+                               si_cant_put(si);
                                break;
+                       }
                }
 
                /* if we are waiting for more space, don't try to read more data