}
-/* Callback to be used by connection I/O handlers upon completion. It differs from
- * the update function in that it is designed to be called by lower layers after I/O
- * events have been completed. It will also try to wake the associated task up if
- * an important event requires special handling. It relies on the connection handler
- * to commit any polling updates. The function always returns 0.
+/* Callback to be used by connection I/O handlers upon completion. It propagates
+ * connection flags to the stream interface, updates the stream (which may or
+ * may not take this opportunity to try to forward data), then update the
+ * connection's polling based on the channels and stream interface's final
+ * states. The function always returns 0.
*/
static int si_conn_wake_cb(struct connection *conn)
{
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
- DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
- __FUNCTION__,
- si, si->state, ic->flags, oc->flags);
-
+ /* First step, report to the stream-int what was detected at the
+ * connection layer : errors and connection establishment.
+ */
if (conn->flags & CO_FL_ERROR)
si->flags |= SI_FL_ERR;
- /* check for recent connection establishment */
if (unlikely(!(conn->flags & (CO_FL_WAIT_L4_CONN | CO_FL_WAIT_L6_CONN | CO_FL_CONNECTED)))) {
si->exp = TICK_ETERNITY;
oc->flags |= CF_WRITE_NULL;
}
- /* process consumer side */
- if (channel_is_empty(oc)) {
- if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
- (si->state == SI_ST_EST))
- stream_int_shutw_conn(si);
- __conn_data_stop_send(conn);
- oc->wex = TICK_ETERNITY;
- }
-
- if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
- si->flags |= SI_FL_WAIT_DATA;
-
- if (oc->flags & CF_WRITE_ACTIVITY) {
- /* update timeouts if we have written something */
- if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
- !channel_is_empty(oc))
- if (tick_isset(oc->wex))
- oc->wex = tick_add_ifset(now_ms, oc->wto);
-
- if (!(si->flags & SI_FL_INDEP_STR))
- if (tick_isset(ic->rex))
- ic->rex = tick_add_ifset(now_ms, ic->rto);
-
- if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
- channel_may_recv(oc) &&
- (si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
- si_chk_rcv(si_opposite(si));
- }
-
- /* Notify the other side when we've injected data into the IC that
- * needs to be forwarded. We can do fast-forwarding as soon as there
- * are output data, but we avoid doing this if some of the data are
- * not yet scheduled for being forwarded, because it is very likely
- * that it will be done again immediately afterwards once the following
- * data are parsed (eg: HTTP chunking). We only SI_FL_WAIT_ROOM once
- * we've emptied *some* of the output buffer, and not just when there
- * is available room, because applets are often forced to stop before
- * the buffer is full. We must not stop based on input data alone because
- * an HTTP parser might need more data to complete the parsing.
+ /* Second step : update the stream-int and channels, try to forward any
+ * pending data, then possibly wake the stream up based on the new
+ * stream-int status.
*/
- if (!channel_is_empty(ic) &&
- (si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
- (ic->buf->i == 0 || ic->pipe)) {
- int new_len, last_len;
-
- last_len = ic->buf->o;
- if (ic->pipe)
- last_len += ic->pipe->data;
-
- si_chk_snd(si_opposite(si));
+ stream_int_notify(si);
- new_len = ic->buf->o;
- if (ic->pipe)
- new_len += ic->pipe->data;
+ /* Third step : update the connection's polling status based on what
+ * was done above (eg: maybe some buffers got emptied).
+ */
+ if (channel_is_empty(oc))
+ __conn_data_stop_send(conn);
- /* check if the consumer has freed some space either in the
- * buffer or in the pipe.
- */
- if (channel_may_recv(ic) && new_len < last_len)
- si->flags &= ~SI_FL_WAIT_ROOM;
- }
if (si->flags & SI_FL_WAIT_ROOM) {
__conn_data_stop_recv(conn);
- ic->rex = TICK_ETERNITY;
}
else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
channel_may_recv(ic)) {
- /* we must re-enable reading if si_chk_snd() has freed some space */
__conn_data_want_recv(conn);
- if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
- ic->rex = tick_add_ifset(now_ms, ic->rto);
- }
-
- /* wake the task up only when needed */
- if (/* changes on the production side */
- (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
- si->state != SI_ST_EST ||
- (si->flags & SI_FL_ERR) ||
- ((ic->flags & CF_READ_PARTIAL) &&
- (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
-
- /* changes on the consumption side */
- (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
- ((oc->flags & CF_WRITE_ACTIVITY) &&
- ((oc->flags & CF_SHUTW) ||
- ((oc->flags & CF_WAKE_WRITE) &&
- (si_opposite(si)->state != SI_ST_EST ||
- (channel_is_empty(oc) && !oc->to_forward)))))) {
- task_wakeup(si_task(si), TASK_WOKEN_IO);
}
- if (ic->flags & CF_READ_ACTIVITY)
- ic->flags &= ~CF_READ_DONTWAIT;
-
- stream_release_buffers(si_strm(si));
return 0;
}
return;
}
-/* notifies the stream interface that the applet has completed its work */
+/* Callback to be used by applet handlers upon completion. It updates the stream
+ * (which may or may not take this opportunity to try to forward data), then
+ * may disable the applet's based on the channels and stream interface's final
+ * states.
+ */
void si_applet_done(struct stream_interface *si)
{
- struct channel *ic = si_ic(si);
- struct channel *oc = si_oc(si);
-
- /* process consumer side */
- if (channel_is_empty(oc)) {
- if (((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW) &&
- (si->state == SI_ST_EST))
- stream_int_shutw_applet(si);
- oc->wex = TICK_ETERNITY;
- }
-
- /* indicate that we may be waiting for data from the output channel */
- if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
- si->flags |= SI_FL_WAIT_DATA;
-
- /* update OC timeouts and wake the other side up if it's waiting for room */
- if (oc->flags & CF_WRITE_ACTIVITY) {
- if ((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL)) == CF_WRITE_PARTIAL &&
- !channel_is_empty(oc))
- if (tick_isset(oc->wex))
- oc->wex = tick_add_ifset(now_ms, oc->wto);
-
- if (!(si->flags & SI_FL_INDEP_STR))
- if (tick_isset(ic->rex))
- ic->rex = tick_add_ifset(now_ms, ic->rto);
-
- if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
- channel_may_recv(oc) &&
- (si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
- si_chk_rcv(si_opposite(si));
- }
-
- /* Notify the other side when we've injected data into the IC that
- * needs to be forwarded. We can do fast-forwarding as soon as there
- * are output data, but we avoid doing this for partial buffers,
- * because it is very likely that it will be done again immediately
- * afterwards once the following data are parsed (eg: HTTP chunking).
- * We only remove SI_FL_WAIT_ROOM once we've emptied the whole output
- * buffer, because applets are often forced to stop before the buffer
- * is full. We must not stop based on input data alone because an HTTP
- * parser might need more data to complete the parsing.
- */
- if (!channel_is_empty(ic) &&
- (si_opposite(si)->flags & SI_FL_WAIT_DATA) &&
- (si_ib(si)->i == 0 || ic->pipe)) {
- si_chk_snd(si_opposite(si));
- if (channel_is_empty(ic))
- si->flags &= ~SI_FL_WAIT_ROOM;
- }
-
- if (si->flags & SI_FL_WAIT_ROOM) {
- ic->rex = TICK_ETERNITY;
- }
- else if ((ic->flags & (CF_SHUTR|CF_READ_PARTIAL|CF_DONT_READ)) == CF_READ_PARTIAL &&
- channel_may_recv(ic)) {
- /* we must re-enable reading if si_chk_snd() has freed some space */
- if (!(ic->flags & CF_READ_NOEXP) && tick_isset(ic->rex))
- ic->rex = tick_add_ifset(now_ms, ic->rto);
- }
-
- /* wake the task up only when needed */
- if (/* changes on the production side */
- (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
- si->state != SI_ST_EST ||
- (si->flags & SI_FL_ERR) ||
- ((ic->flags & CF_READ_PARTIAL) &&
- (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
-
- /* changes on the consumption side */
- (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
- ((oc->flags & CF_WRITE_ACTIVITY) &&
- ((oc->flags & CF_SHUTW) ||
- ((oc->flags & CF_WAKE_WRITE) &&
- (si_opposite(si)->state != SI_ST_EST ||
- (channel_is_empty(oc) && !oc->to_forward)))))) {
- task_wakeup(si_task(si), TASK_WOKEN_IO);
- }
-
- if (ic->flags & CF_READ_ACTIVITY)
- ic->flags &= ~CF_READ_DONTWAIT;
-
- stream_release_buffers(si_strm(si));
+ /* update the stream-int, channels, and possibly wake the stream up */
+ stream_int_notify(si);
/* Get away from the active list if we can't work anymore.
* We also do that if the main task has already scheduled, because it