]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream-int: make stream_int_update() aware of the lower layers
authorWilly Tarreau <w@1wt.eu>
Fri, 9 Nov 2018 13:59:25 +0000 (14:59 +0100)
committerWilly Tarreau <w@1wt.eu>
Sun, 11 Nov 2018 09:18:37 +0000 (10:18 +0100)
It's far from being clean, but at least it allows to resync both CS and
applets from the same place, taking into account the fact that CS are
processed synchronously for the send side while appletx are processed
outside of the process_stream() loop. The arrangement is optimised to
minimize the amount of iteration by handling send first, then updating
the SI_FL_WAIT_ROOM flags and only then dealing with si_chk_rcv() on
both sides. The SI_FL_WANT_PUT flag is set if needed before calling
si_chk_rcv() since this is done prior to calling stream_int_update().

Now there's no risk that stream_int_notify() is called anymore during
such operations, thus we cannot have any spurious wake-up anymore. The
case where a successful send() could complete a pending connect() is
handled by taking any stream-int state changes into account at the
call place, which is normal since process_stream() is designed to
iterate till stabilisation.

Doing this solves most of the remaining inconsistencies between CS and
applets.

src/stream.c
src/stream_interface.c

index b4acc3f758e3545b398fae6096305478a3813158..8e115e595ef9fa02ab90e34d03d7a5975c056ef6 100644 (file)
@@ -2476,7 +2476,8 @@ redo:
 
                si_update_both(si_f, si_b);
 
-               if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS ||
+               if (si_f->state == SI_ST_DIS || si_f->state != si_f->prev_state ||
+                   si_b->state == SI_ST_DIS || si_b->state != si_b->prev_state ||
                    (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER))
                        goto redo;
 
index 8d223fabf948166ef83066340bea71903377e81a..4788a20ab9dbc19ed4cd9eba67cad0f0e8acc8af 100644 (file)
@@ -817,13 +817,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
 {
        struct channel *req = si_ic(si_f);
        struct channel *res = si_oc(si_f);
-
-       /* let's recompute both sides states */
-       if (si_f->state == SI_ST_EST)
-               stream_int_update(si_f);
-
-       if (si_b->state == SI_ST_EST)
-               stream_int_update(si_b);
+       struct conn_stream *cs;
 
        req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
        res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL);
@@ -834,11 +828,60 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b
        si_f->prev_state = si_f->state;
        si_b->prev_state = si_b->state;
 
-       if (si_f->ops->update && si_f->state == SI_ST_EST)
-               si_f->ops->update(si_f);
+       /* front stream-int */
+       cs = objt_cs(si_f->end);
+       if (cs &&
+           si_f->state == SI_ST_EST &&
+           !(res->flags & CF_SHUTW) && /* Write not closed */
+           !channel_is_empty(res) &&
+           !(cs->flags & CS_FL_ERROR) &&
+           !(cs->conn->flags & CO_FL_ERROR)) {
+               if (si_cs_send(cs))
+                       si_b->flags &= ~SI_FL_WAIT_ROOM;
+       }
+
+       /* back stream-int */
+       cs = objt_cs(si_b->end);
+       if (cs &&
+           (si_b->state == SI_ST_EST || si_b->state == SI_ST_CON) &&
+           !(req->flags & CF_SHUTW) && /* Write not closed */
+           !channel_is_empty(req) &&
+           !(cs->flags & CS_FL_ERROR) &&
+           !(cs->conn->flags & CO_FL_ERROR)) {
+               if (si_cs_send(cs))
+                       si_f->flags &= ~SI_FL_WAIT_ROOM;
+       }
+
+       /* it's time to try to receive */
+       if (!(req->flags & (CF_SHUTR|CF_DONT_READ)))
+               si_want_put(si_f);
+
+       si_chk_rcv(si_f);
+
+       if (!(res->flags & (CF_SHUTR|CF_DONT_READ)))
+               si_want_put(si_b);
 
-       if (si_b->ops->update && (si_b->state == SI_ST_EST || si_b->state == SI_ST_CON))
-               si_b->ops->update(si_b);
+       si_chk_rcv(si_b);
+
+       /* let's recompute both sides states */
+       if (si_f->state == SI_ST_EST)
+               stream_int_update(si_f);
+
+       if (si_b->state == SI_ST_EST)
+               stream_int_update(si_b);
+
+       /* stream ints are processed outside of process_stream() and must be
+        * handled at the latest moment.
+        */
+       if (obj_type(si_f->end) == OBJ_TYPE_APPCTX &&
+           (((si_f->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
+            ((si_f->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)))
+               appctx_wakeup(si_appctx(si_f));
+
+       if (obj_type(si_b->end) == OBJ_TYPE_APPCTX &&
+           (((si_b->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) ||
+            ((si_b->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET)))
+               appctx_wakeup(si_appctx(si_b));
 }
 
 /* Updates the active status of a connection outside of the connection handler