STRM_ENTITY_WREQ_BODY = 0x0003,
};
+/* All possible stream events handled by process_stream(). First ones are mapped
+ * from TASK_WOKEN_*.
+ */
+enum {
+ STRM_EVT_NONE = 0x00000000, /* No events */
+ STRM_EVT_TIMER = 0x00000001, /* A timer has expired */
+ STRM_EVT_MSG = 0x00000002, /* A message event was triggered */
+ STRM_EVT_SHUT_SRV_DOWN = 0x00000004, /* Must be shut because the selected server became available */
+ STRM_EVT_SHUT_SRV_UP = 0x00000008, /* Must be shut because a preferred server became available */
+ STRM_EVT_KILLED = 0x00000010, /* Must be shut for external reason */
+};
+
/* This function is used to report flags in debugging tools. Please reflect
* below any single-bit flag addition above in the same order via the
* __APPEND_FLAG macro. The new end of the buffer is returned.
struct http_txn *txn; /* current HTTP transaction being processed. Should become a list. */
struct task *task; /* the task associated with this stream */
- unsigned int pending_events; /* the pending events not yet processed by the stream.
- * This is a bit field of TASK_WOKEN_* */
+ unsigned int pending_events; /* the pending events not yet processed by the stream but handled by process_stream() */
+ unsigned int new_events; /* the new events added since the previous wakeup (never seen by process_stream()). It is atomic field */
int conn_retries; /* number of connect retries performed */
unsigned int conn_exp; /* wake up time for connect, queue, turn-around, ... */
unsigned int conn_err_type; /* first error detected, one of STRM_ET_* */
0));
}
+/* Map task states to stream events. TASK_WOKEN_* and TASK_F_UEVT* are mapped on
+ * STRM_EVT_*. Not all states/flags are mapped, only those explicitly used by
+ * the stream.
+ */
+static inline unsigned int stream_map_task_state(unsigned int state)
+{
+ return ((state & TASK_WOKEN_TIMER) ? STRM_EVT_TIMER : 0) |
+ ((state & TASK_WOKEN_MSG) ? STRM_EVT_MSG : 0) |
+ ((state & TASK_F_UEVT1) ? STRM_EVT_SHUT_SRV_DOWN : 0) |
+ ((state & TASK_F_UEVT3) ? STRM_EVT_SHUT_SRV_UP : 0) |
+ ((state & TASK_F_UEVT2) ? STRM_EVT_KILLED : 0) |
+ 0;
+}
+
+
int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
void stream_retnclose(struct stream *s, const struct buffer *msg);
void sess_set_term_flags(struct stream *s);
goto out_fail_alloc;
s->task = t;
- s->pending_events = 0;
+ s->pending_events = s->new_events = STRM_EVT_NONE;
s->conn_retries = 0;
s->max_retries = 0;
s->conn_exp = TICK_ETERNITY;
activity[tid].stream_calls++;
stream_cond_update_cpu_latency(s);
- if ((state & TASK_WOKEN_OTHER) && (state & (TASK_F_UEVT1 | TASK_F_UEVT2 | TASK_F_UEVT3))) {
+ /* update pending events */
+ s->pending_events |= stream_map_task_state(state);
+ s->pending_events |= HA_ATOMIC_XCHG(&s->new_events, STRM_EVT_NONE);
+
+ if (s->pending_events & (STRM_EVT_SHUT_SRV_DOWN|STRM_EVT_SHUT_SRV_UP|STRM_EVT_KILLED)) {
/* that an instant kill message, the reason is in _UEVT* */
- stream_shutdown_self(s, (state & TASK_F_UEVT3) ? SF_ERR_UP : (state & TASK_F_UEVT2) ? SF_ERR_KILLED : SF_ERR_DOWN);
+ stream_shutdown_self(s, ((s->pending_events & STRM_EVT_SHUT_SRV_DOWN) ? SF_ERR_DOWN :
+ (s->pending_events & STRM_EVT_SHUT_SRV_UP) ? SF_ERR_UP:
+ SF_ERR_KILLED));
}
req = &s->req;
scf_flags = scf->flags;
scb_flags = scb->flags;
- /* update pending events */
- s->pending_events |= (state & TASK_WOKEN_ANY);
-
/* 1a: Check for low level timeouts if needed. We just set a flag on
* stream connectors when their timeouts have expired.
*/
- if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
+ if (unlikely(s->pending_events & STRM_EVT_TIMER)) {
stream_handle_timeouts(s);
/* Once in a while we're woken up because the task expires. But
- * this does not necessarily mean that a timeout has been reached.
- * So let's not run a whole stream processing if only an expiration
- * timeout needs to be refreshed.
+ * this does not necessarily mean that a timeout has been
+ * reached. So let's not run a whole stream processing if only
+ * an expiration timeout needs to be refreshed. To do so, we
+ * must be sure only the TIMER event was triggered and not
+ * error/timeout/abort/shut occurred. on both sides.
*/
if (!((scf->flags | scb->flags) & (SC_FL_ERROR|SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_SHUT_DONE)) &&
!((req->flags | res->flags) & (CF_READ_EVENT|CF_READ_TIMEOUT|CF_WRITE_EVENT|CF_WRITE_TIMEOUT)) &&
!(s->flags & SF_CONN_EXP) &&
- ((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
+ (s->pending_events == STRM_EVT_TIMER)) {
scf->flags &= ~SC_FL_DONT_WAKE;
scb->flags &= ~SC_FL_DONT_WAKE;
goto update_exp_and_leave;
(req->analysers && (scb->flags & SC_FL_SHUT_DONE)) ||
scf->state != rq_prod_last ||
scb->state != rq_cons_last ||
- s->pending_events & TASK_WOKEN_MSG) {
+ s->pending_events & STRM_EVT_MSG) {
unsigned int scf_flags_ana = scf->flags;
unsigned int scb_flags_ana = scb->flags;
(res->analysers && (scf->flags & SC_FL_SHUT_DONE)) ||
scf->state != rp_cons_last ||
scb->state != rp_prod_last ||
- s->pending_events & TASK_WOKEN_MSG) {
+ s->pending_events & STRM_EVT_MSG) {
unsigned int scb_flags_ana = scb->flags;
unsigned int scf_flags_ana = scf->flags;
stream_update_both_sc(s);
/* Reset pending events now */
- s->pending_events = 0;
+ s->pending_events = STRM_EVT_NONE;
update_exp_and_leave:
/* Note: please ensure that if you branch here you disable SC_FL_DONT_WAKE */
goto resync_stconns;
}
leave:
- s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
+ s->pending_events &= ~STRM_EVT_TIMER;
stream_release_buffers(s);
DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);