From: Willy Tarreau Date: Wed, 23 Sep 2015 16:40:09 +0000 (+0200) Subject: MINOR: stream-int: implement the stream_int_notify() function X-Git-Tag: v1.6-dev6~76 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=615f28bec1aace96133550ba0e4895a463196968;p=thirdparty%2Fhaproxy.git MINOR: stream-int: implement the stream_int_notify() function stream_int_notify() was taken from the common part between si_conn_wake_cb() and si_applet_done(). It is designed to report activity to a stream from outside its handler. It'll generally be used by lower layers to report I/O completion but may also be used by remote streams if the buffer processing is shared. --- diff --git a/include/proto/stream_interface.h b/include/proto/stream_interface.h index d1d75baabb..22d05be805 100644 --- a/include/proto/stream_interface.h +++ b/include/proto/stream_interface.h @@ -51,6 +51,7 @@ void si_applet_done(struct stream_interface *si); void stream_int_update(struct stream_interface *si); void stream_int_update_conn(struct stream_interface *si); void stream_int_update_applet(struct stream_interface *si); +void stream_int_notify(struct stream_interface *si); /* returns the channel which receives data from this stream interface (input channel) */ static inline struct channel *si_ic(struct stream_interface *si) diff --git a/src/stream_interface.c b/src/stream_interface.c index 1da5248e00..dbc481f799 100644 --- a/src/stream_interface.c +++ b/src/stream_interface.c @@ -519,6 +519,115 @@ static int si_idle_conn_wake_cb(struct connection *conn) return 0; } +/* This function is the equivalent to stream_int_update() except that it's + * designed to be called from outside the stream handlers, typically the lower + * layers (applets, connections) after I/O completion. After updating the stream + * interface and timeouts, it will try to forward what can be forwarded, then to + * wake the associated task up if an important event requires special handling. + * It should not be called from within the stream itself, stream_int_update() + * is designed for this. + */ +void stream_int_notify(struct stream_interface *si) +{ + struct channel *ic = si_ic(si); + struct channel *oc = si_oc(si); + + /* process consumer side */ + if (channel_is_empty(oc)) { + if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) && + (si->state == SI_ST_EST)) + si_shutw(si); + oc->wex = TICK_ETERNITY; + } + + /* indicate that we may be waiting for data from the output channel */ + if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc)) + si->flags |= SI_FL_WAIT_DATA; + + /* update OC timeouts and wake the other side up if it's waiting for room */ + if (oc->flags & CF_WRITE_ACTIVITY) { + if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL && + !channel_is_empty(oc)) + if (tick_isset(oc->wex)) + oc->wex = tick_add_ifset(now_ms, oc->wto); + + if (!(si->flags & SI_FL_INDEP_STR)) + if (tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + + if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL && + channel_may_recv(oc) && + (si_opposite(si)->flags & SI_FL_WAIT_ROOM))) + si_chk_rcv(si_opposite(si)); + } + + /* Notify the other side when we've injected data into the IC that + * needs to be forwarded. We can do fast-forwarding as soon as there + * are output data, but we avoid doing this if some of the data are + * not yet scheduled for being forwarded, because it is very likely + * that it will be done again immediately afterwards once the following + * data are parsed (eg: HTTP chunking). We only SI_FL_WAIT_ROOM once + * we've emptied *some* of the output buffer, and not just when there + * is available room, because applets are often forced to stop before + * the buffer is full. We must not stop based on input data alone because + * an HTTP parser might need more data to complete the parsing. + */ + if (!channel_is_empty(ic) && + (si_opposite(si)->flags & SI_FL_WAIT_DATA) && + (ic->buf->i == 0 || ic->pipe)) { + int new_len, last_len; + + last_len = ic->buf->o; + if (ic->pipe) + last_len += ic->pipe->data; + + si_chk_snd(si_opposite(si)); + + new_len = ic->buf->o; + if (ic->pipe) + new_len += ic->pipe->data; + + /* check if the consumer has freed some space either in the + * buffer or in the pipe. + */ + if (channel_may_recv(ic) && new_len < last_len) + si->flags &= ~SI_FL_WAIT_ROOM; + } + + if (si->flags & SI_FL_WAIT_ROOM) { + ic->rex = TICK_ETERNITY; + } + else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL && + channel_may_recv(ic)) { + /* we must re-enable reading if si_chk_snd() has freed some space */ + if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex)) + ic->rex = tick_add_ifset(now_ms, ic->rto); + } + + /* wake the task up only when needed */ + if (/* changes on the production side */ + (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) || + si->state != SI_ST_EST || + (si->flags & SI_FL_ERR) || + ((ic->flags & CF_READ_PARTIAL) && + (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) || + + /* changes on the consumption side */ + (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) || + ((oc->flags & CF_WRITE_ACTIVITY) && + ((oc->flags & CF_SHUTW) || + ((oc->flags & CF_WAKE_WRITE) && + (si_opposite(si)->state != SI_ST_EST || + (channel_is_empty(oc) && !oc->to_forward)))))) { + task_wakeup(si_task(si), TASK_WOKEN_IO); + } + if (ic->flags & CF_READ_ACTIVITY) + ic->flags &= ~CF_READ_DONTWAIT; + + stream_release_buffers(si_strm(si)); +} + + /* Callback to be used by connection I/O handlers upon completion. It differs from * the update function in that it is designed to be called by lower layers after I/O * events have been completed. It will also try to wake the associated task up if