if ((sess->fe->options & PR_O_CONTSTATS) && (s->flags & SF_BE_ASSIGNED))
stream_process_counters(s);
- if (si_f->state == SI_ST_EST && obj_type(si_f->end) != OBJ_TYPE_APPCTX)
+ if (si_f->state == SI_ST_EST)
si_update(si_f);
- if (si_b->state == SI_ST_EST && obj_type(si_b->end) != OBJ_TYPE_APPCTX)
+ if (si_b->state == SI_ST_EST)
si_update(si_b);
req->flags &= ~(CF_READ_NULL|CF_READ_PARTIAL|CF_WRITE_NULL|CF_WRITE_PARTIAL|CF_READ_ATTACHED);
req->rex = TICK_ETERNITY;
}
- /* When any of the stream interfaces is attached to an applet,
- * we have to call it here. Note that this one may wake the
- * task up again. If at least one applet was called, the current
- * task might have been woken up, in which case we don't want it
- * to be requeued to the wait queue but rather to the run queue
- * to run ASAP. The bitwise "or" in the condition ensures that
- * both functions are always called and that we wake up if at
- * least one did something.
- */
- if ((si_applet_call(si_b) | si_applet_call(si_f)) != 0) {
- if (task_in_rq(t)) {
- t->expire = TICK_ETERNITY;
- stream_release_buffers(s);
- return t;
- }
- }
-
update_exp_and_leave:
t->expire = tick_first(tick_first(req->rex, req->wex),
tick_first(res->rex, res->wex));
stream_release_buffers(si_strm(si));
}
-/* default update function for applets, to be used at the end of the i/o handler */
-static void stream_int_update_applet(struct stream_interface *si)
+/* updates the timers and flags of a stream interface attached to an applet.
+ * it's called from the upper layers after the buffers/channels have been
+ * updated.
+ */
+void stream_int_update_applet(struct stream_interface *si)
{
- int old_flags = si->flags;
struct channel *ic = si_ic(si);
struct channel *oc = si_oc(si);
- DPRINTF(stderr, "%s: si=%p, si->state=%d ic->flags=%08x oc->flags=%08x\n",
- __FUNCTION__,
- si, si->state, ic->flags, oc->flags);
-
- if (si->state != SI_ST_EST)
- return;
-
- if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == CF_SHUTW_NOW &&
- channel_is_empty(oc))
- si_shutw(si);
-
- if ((oc->flags & (CF_SHUTW|CF_SHUTW_NOW)) == 0 && channel_may_recv(oc))
- si->flags |= SI_FL_WAIT_DATA;
-
- /* we're almost sure that we need some space if the buffer is not
- * empty, even if it's not full, because the applets can't fill it.
- */
- if ((ic->flags & (CF_SHUTR|CF_DONT_READ)) == 0 && !channel_is_empty(ic))
- si->flags |= SI_FL_WAIT_ROOM;
-
- if (oc->flags & CF_WRITE_ACTIVITY) {
- if (tick_isset(oc->wex))
- oc->wex = tick_add_ifset(now_ms, oc->wto);
- }
-
- if (ic->flags & CF_READ_ACTIVITY ||
- (oc->flags & CF_WRITE_ACTIVITY && !(si->flags & SI_FL_INDEP_STR))) {
- if (tick_isset(ic->rex))
- ic->rex = tick_add_ifset(now_ms, ic->rto);
- }
-
- /* save flags to detect changes */
- old_flags = si->flags;
- if (likely((oc->flags & (CF_SHUTW|CF_WRITE_PARTIAL|CF_DONT_READ)) == CF_WRITE_PARTIAL &&
- channel_may_recv(oc) &&
- (si_opposite(si)->flags & SI_FL_WAIT_ROOM)))
- si_chk_rcv(si_opposite(si));
-
- if (((ic->flags & CF_READ_PARTIAL) && !channel_is_empty(ic)) &&
- (ic->pipe /* always try to send spliced data */ ||
- (ic->buf->i == 0 && (si_opposite(si)->flags & SI_FL_WAIT_DATA)))) {
- si_chk_snd(si_opposite(si));
- /* check if the consumer has freed some space */
- if (channel_may_recv(ic) && !ic->pipe)
+ /* Check if we need to close the read side */
+ if (!(ic->flags & CF_SHUTR)) {
+ /* Read not closed, update FD status and timeout for reads */
+ if ((ic->flags & CF_DONT_READ) || !channel_may_recv(ic)) {
+ /* stop reading */
+ if (!(si->flags & SI_FL_WAIT_ROOM)) {
+ if (!(ic->flags & CF_DONT_READ)) /* full */
+ si->flags |= SI_FL_WAIT_ROOM;
+ ic->rex = TICK_ETERNITY;
+ }
+ }
+ else {
+ /* (re)start reading and update timeout. Note: we don't recompute the timeout
+ * everytime we get here, otherwise it would risk never to expire. We only
+ * update it if is was not yet set. The stream socket handler will already
+ * have updated it if there has been a completed I/O.
+ */
si->flags &= ~SI_FL_WAIT_ROOM;
+ if (!(ic->flags & (CF_READ_NOEXP|CF_DONT_READ)) && !tick_isset(ic->rex))
+ ic->rex = tick_add_ifset(now_ms, ic->rto);
+ }
}
- /* Note that we're trying to wake up in two conditions here :
- * - special event, which needs the holder task attention
- * - status indicating that the applet can go on working. This
- * is rather hard because we might be blocking on output and
- * don't want to wake up on input and vice-versa. The idea is
- * to only rely on the changes the chk_* might have performed.
- */
- if (/* check stream interface changes */
- ((old_flags & ~si->flags) & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) ||
-
- /* changes on the production side */
- (ic->flags & (CF_READ_NULL|CF_READ_ERROR)) ||
- si->state != SI_ST_EST ||
- (si->flags & SI_FL_ERR) ||
- ((ic->flags & CF_READ_PARTIAL) &&
- (!ic->to_forward || si_opposite(si)->state != SI_ST_EST)) ||
-
- /* changes on the consumption side */
- (oc->flags & (CF_WRITE_NULL|CF_WRITE_ERROR)) ||
- ((oc->flags & CF_WRITE_ACTIVITY) &&
- ((oc->flags & CF_SHUTW) ||
- ((oc->flags & CF_WAKE_WRITE) &&
- (si_opposite(si)->state != SI_ST_EST ||
- (channel_is_empty(oc) && !oc->to_forward)))))) {
- if (!(si->flags & SI_FL_DONT_WAKE))
- task_wakeup(si_task(si), TASK_WOKEN_IO);
+ /* Check if we need to close the write side */
+ if (!(oc->flags & CF_SHUTW)) {
+ /* Write not closed, update FD status and timeout for writes */
+ if (channel_is_empty(oc)) {
+ /* stop writing */
+ if (!(si->flags & SI_FL_WAIT_DATA)) {
+ if ((oc->flags & CF_SHUTW_NOW) == 0)
+ si->flags |= SI_FL_WAIT_DATA;
+ oc->wex = TICK_ETERNITY;
+ }
+ }
+ else {
+ /* (re)start writing and update timeout. Note: we don't recompute the timeout
+ * everytime we get here, otherwise it would risk never to expire. We only
+ * update it if is was not yet set. The stream socket handler will already
+ * have updated it if there has been a completed I/O.
+ */
+ si->flags &= ~SI_FL_WAIT_DATA;
+ if (!tick_isset(oc->wex)) {
+ oc->wex = tick_add_ifset(now_ms, oc->wto);
+ if (tick_isset(ic->rex) && !(si->flags & SI_FL_INDEP_STR)) {
+ /* Note: depending on the protocol, we don't know if we're waiting
+ * for incoming data or not. So in order to prevent the socket from
+ * expiring read timeouts during writes, we refresh the read timeout,
+ * except if it was already infinite or if we have explicitly setup
+ * independent streams.
+ */
+ ic->rex = tick_add_ifset(now_ms, ic->rto);
+ }
+ }
+ }
}
- if (ic->flags & CF_READ_ACTIVITY)
- ic->flags &= ~CF_READ_DONTWAIT;
+
+ if (!(si->flags & (SI_FL_WAIT_ROOM|SI_FL_WAIT_DATA)) &&
+ !(ic->flags & CF_DONT_READ) &&
+ (!(ic->flags & CF_SHUTR) || !(oc->flags & CF_SHUTW)))
+ appctx_wakeup(si_appctx(si));
}
/*