From: Willy Tarreau Date: Fri, 9 Nov 2018 13:59:25 +0000 (+0100) Subject: MEDIUM: stream-int: make stream_int_update() aware of the lower layers X-Git-Tag: v1.9-dev6~5 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=bf89ff3db8be1a8f87de305c144467bbc2503036;p=thirdparty%2Fhaproxy.git MEDIUM: stream-int: make stream_int_update() aware of the lower layers It's far from being clean, but at least it allows to resync both CS and applets from the same place, taking into account the fact that CS are processed synchronously for the send side while appletx are processed outside of the process_stream() loop. The arrangement is optimised to minimize the amount of iteration by handling send first, then updating the SI_FL_WAIT_ROOM flags and only then dealing with si_chk_rcv() on both sides. The SI_FL_WANT_PUT flag is set if needed before calling si_chk_rcv() since this is done prior to calling stream_int_update(). Now there's no risk that stream_int_notify() is called anymore during such operations, thus we cannot have any spurious wake-up anymore. The case where a successful send() could complete a pending connect() is handled by taking any stream-int state changes into account at the call place, which is normal since process_stream() is designed to iterate till stabilisation. Doing this solves most of the remaining inconsistencies between CS and applets. --- diff --git a/src/stream.c b/src/stream.c index b4acc3f758..8e115e595e 100644 --- a/src/stream.c +++ b/src/stream.c @@ -2476,7 +2476,8 @@ redo: si_update_both(si_f, si_b); - if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS || + if (si_f->state == SI_ST_DIS || si_f->state != si_f->prev_state || + si_b->state == SI_ST_DIS || si_b->state != si_b->prev_state || (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)) goto redo; diff --git a/src/stream_interface.c b/src/stream_interface.c index 8d223fabf9..4788a20ab9 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -817,13 +817,7 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b { struct channel *req = si_ic(si_f); struct channel *res = si_oc(si_f); - - /* let's recompute both sides states */ - if (si_f->state == SI_ST_EST) - stream_int_update(si_f); - - if (si_b->state == SI_ST_EST) - stream_int_update(si_b); + struct conn_stream *cs; req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED|CF_WRITE_NULL|CF_WRITE_PARTIAL); @@ -834,11 +828,60 @@ void si_update_both(struct stream_interface *si_f, struct stream_interface *si_b si_f->prev_state = si_f->state; si_b->prev_state = si_b->state; - if (si_f->ops->update && si_f->state == SI_ST_EST) - si_f->ops->update(si_f); + /* front stream-int */ + cs = objt_cs(si_f->end); + if (cs && + si_f->state == SI_ST_EST && + !(res->flags & CF_SHUTW) && /* Write not closed */ + !channel_is_empty(res) && + !(cs->flags & CS_FL_ERROR) && + !(cs->conn->flags & CO_FL_ERROR)) { + if (si_cs_send(cs)) + si_b->flags &= ~SI_FL_WAIT_ROOM; + } + + /* back stream-int */ + cs = objt_cs(si_b->end); + if (cs && + (si_b->state == SI_ST_EST || si_b->state == SI_ST_CON) && + !(req->flags & CF_SHUTW) && /* Write not closed */ + !channel_is_empty(req) && + !(cs->flags & CS_FL_ERROR) && + !(cs->conn->flags & CO_FL_ERROR)) { + if (si_cs_send(cs)) + si_f->flags &= ~SI_FL_WAIT_ROOM; + } + + /* it's time to try to receive */ + if (!(req->flags & (CF_SHUTR|CF_DONT_READ))) + si_want_put(si_f); + + si_chk_rcv(si_f); + + if (!(res->flags & (CF_SHUTR|CF_DONT_READ))) + si_want_put(si_b); - if (si_b->ops->update && (si_b->state == SI_ST_EST || si_b->state == SI_ST_CON)) - si_b->ops->update(si_b); + si_chk_rcv(si_b); + + /* let's recompute both sides states */ + if (si_f->state == SI_ST_EST) + stream_int_update(si_f); + + if (si_b->state == SI_ST_EST) + stream_int_update(si_b); + + /* stream ints are processed outside of process_stream() and must be + * handled at the latest moment. + */ + if (obj_type(si_f->end) == OBJ_TYPE_APPCTX && + (((si_f->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) || + ((si_f->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))) + appctx_wakeup(si_appctx(si_f)); + + if (obj_type(si_b->end) == OBJ_TYPE_APPCTX && + (((si_b->flags & (SI_FL_WANT_PUT|SI_FL_WAIT_ROOM)) == SI_FL_WANT_PUT) || + ((si_b->flags & (SI_FL_WANT_GET|SI_FL_WAIT_DATA)) == SI_FL_WANT_GET))) + appctx_wakeup(si_appctx(si_b)); } /* Updates the active status of a connection outside of the connection handler