From: Willy Tarreau Date: Thu, 25 Oct 2018 09:06:57 +0000 (+0200) Subject: MEDIUM: stream-int: make si_update() synchronize flag changes before the I/O X-Git-Tag: v1.9-dev5~30 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=85f890174af1dee74051f4e7015b8e8807b3ddef;p=thirdparty%2Fhaproxy.git MEDIUM: stream-int: make si_update() synchronize flag changes before the I/O With the new synchronous si_cs_send() at the end of process_stream(), we're seeing re-appear the I/O layer specific part of the stream interface which is supposed to deal with I/O event subscription. The only difference is that now we subscribe to I/Os only after having attempted (and failed) them. This patch brings a cleanup in this by reintroducing stream_int_update_conn() with the send code from process_stream(). However this alone would not be enough because the flags which are cleared afterwards would result in the loss of the possible events (write events only at the moment). So the flags clearing and stream-int state updates are also performed inside si_update() between the generic code and the I/O specific code. This definitely makes sense as after this call we can simply check again for channel and SI flag changes and decide to loop once again or not. --- diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index f3cf5e0e90..4e400399b0 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -357,11 +357,23 @@ static inline void si_shutw(struct stream_interface *si) si->ops->shutw(si); } -/* Updates the stream interface and timers, then updates the data layer below */ +/* Updates the stream interface and timers, to complete the work after the + * analysers, then clears the relevant channel flags, and the errors and + * expirations, then updates the data layer below. This will ensure that any + * synchronous update performed at the data layer will be reflected in the + * channel flags and/or stream-interface. + */ static inline void si_update(struct stream_interface *si) { - stream_int_update(si); - if (si->ops->update) + if (si->state == SI_ST_EST) + stream_int_update(si); + + si_ic(si)->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_READ_ATTACHED); + si_oc(si)->flags &= ~(CF_WRITE_NULL|CF_WRITE_PARTIAL); + si->flags &= ~(SI_FL_ERR|SI_FL_EXP); + si->prev_state = si->state; + + if (si->ops->update && (si->state == SI_ST_CON || si->state == SI_ST_EST)) si->ops->update(si); } diff --git a/src/stream.c b/src/stream.c index 4e2db45afc..f33713dc9c 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1659,7 +1659,6 @@ struct task *process_stream(struct task *t, void *context, unsigned short state) struct channel *req, *res; struct stream_interface *si_f, *si_b; struct conn_stream *cs; - int ret; activity[tid].stream++; @@ -2447,37 +2446,12 @@ redo: if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED)) stream_process_counters(s); - cs = objt_cs(si_f->end); - ret = 0; - if (cs && !(cs->conn->flags & CO_FL_ERROR) && - !(cs->flags & CS_FL_ERROR) && - !(si_oc(si_f)->flags & CF_SHUTW) && - !(si_f->wait_event.wait_reason & SUB_CAN_SEND) && - co_data(si_oc(si_f))) - ret = si_cs_send(cs); - cs = objt_cs(si_b->end); - if (cs && !(cs->conn->flags & CO_FL_ERROR) && - !(cs->flags & CS_FL_ERROR) && - !(si_oc(si_b)->flags & CF_SHUTW) && - !(si_b->wait_event.wait_reason & SUB_CAN_SEND) && - co_data(si_oc(si_b))) - ret |= si_cs_send(cs); - - if (ret) - goto redo; - - if (si_f->state == SI_ST_EST) - si_update(si_f); + si_update(si_f); + si_update(si_b); - if (si_b->state == SI_ST_EST) - si_update(si_b); - - req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED); - res->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED); - si_f->prev_state = si_f->state; - si_b->prev_state = si_b->state; - si_f->flags &= ~(SI_FL_ERR|SI_FL_EXP); - si_b->flags &= ~(SI_FL_ERR|SI_FL_EXP); + if (si_f->state == SI_ST_DIS || si_b->state == SI_ST_DIS || + (((req->flags ^ rqf_last) | (res->flags ^ rpf_last)) & CF_MASK_ANALYSER)) + goto redo; /* Trick: if a request is being waiting for the server to respond, * and if we know the server can timeout, we don't want the timeout diff --git a/src/stream_interface.c b/src/stream_interface.c index 8574b4ca30..08814cf399 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -66,6 +66,7 @@ struct si_ops si_embedded_ops = { /* stream-interface operations for connections */ struct si_ops si_conn_ops = { + .update = stream_int_update_conn, .chk_rcv = stream_int_chk_rcv_conn, .chk_snd = stream_int_chk_snd_conn, .shutr = stream_int_shutr_conn, @@ -798,6 +799,33 @@ void stream_int_update(struct stream_interface *si) } } +/* Updates the active status of a connection outside of the connection handler + * based on the channel's flags and the stream interface's flags. It needs to + * be called once after the channels' flags have settled down and the stream + * has been updated. It is not designed to be called from within the connection + * handler itself. + */ +void stream_int_update_conn(struct stream_interface *si) +{ + struct channel *ic = si_ic(si); + struct channel *oc = si_oc(si); + struct conn_stream *cs = __objt_cs(si->end); + + if (!(ic->flags & CF_SHUTR)) { + /* Read not closed, it doesn't seem we have to do anything here */ + } + + if (!(oc->flags & CF_SHUTW)) { + /* Write not closed */ + if (!channel_is_empty(oc) && + !(cs->conn->flags & CO_FL_ERROR) && + !(cs->flags & CS_FL_ERROR) && + !(oc->flags & CF_SHUTW) && + !(si->wait_event.wait_reason & SUB_CAN_SEND)) + si_cs_send(cs); + } +} + /* * This function performs a shutdown-read on a stream interface attached to * a connection in a connected or init state (it does nothing for other