]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: stream: Map task wake up reasons to dedicated stream events
authorChristopher Faulet <cfaulet@haproxy.com>
Mon, 27 Jan 2025 17:38:40 +0000 (18:38 +0100)
committerChristopher Faulet <cfaulet@haproxy.com>
Tue, 28 Jan 2025 13:53:37 +0000 (14:53 +0100)
To fix thread-safety issues when a stream must be shut, three new task
states were added. These states are generic (UEVT1, UEVT2 and UEVT3), the
task callback function is responsible to know what to do with them. However,
it is not really scalable.

The best is to use an atomic field in the stream structure itself to deal
with these dedicated events. There is already the "pending_events" field
that save wake up reasons (TASK_WOKEN_*) to not loose them if
process_stream() is interrupted before it had a chance to handle them.

So the idea is to introduce a new field to handle streams dedicated events
and merged them with the task's wake up reasons used by the stream. This
means a mapping must be performed between some task wake up reasons and
streams events. Note that not all task wake up reasons will be mapped.

In this patch, the "new_events" field is introduced. It is an atomic
bit-field. Streams events (STRM_EVT_*) are also introduced to map the task
wake up reasons used by process_stream(). Only TASK_WOKEN_TIMER and
TASK_WOKEN_MSG are mapped, in addition to TASK_F_UEVT* flags. In
process_stream(), "pending_events" field is now filled with new stream
events and the mapping of the wake up reasons.

addons/ot/src/filter.c
include/haproxy/stream-t.h
include/haproxy/stream.h
src/flt_spoe.c
src/stream.c

index cf67fd20766a5189f14461986d5a78a97f1b8965..20d61d9813f9052da98dabf9d81f41cc0f179b91 100644 (file)
@@ -718,7 +718,7 @@ static void flt_ot_check_timeouts(struct stream *s, struct filter *f)
        if (flt_ot_is_disabled(f FLT_OT_DBG_ARGS(, -1)))
                FLT_OT_RETURN();
 
-       s->pending_events |= TASK_WOKEN_MSG;
+       s->pending_events |= STRM_EVT_MSG;
 
        flt_ot_return_void(f, &err);
 
index 5da8101ed1959731821a507d091159878e18432a..0211adaaed80dd44ea8d25e4acc84d80a7773835 100644 (file)
@@ -167,6 +167,18 @@ enum {
        STRM_ENTITY_WREQ_BODY = 0x0003,
 };
 
+/* All possible stream events handled by process_stream(). First ones are mapped
+ * from TASK_WOKEN_*.
+ */
+enum {
+       STRM_EVT_NONE          = 0x00000000, /* No events */
+       STRM_EVT_TIMER         = 0x00000001, /* A timer has expired */
+       STRM_EVT_MSG           = 0x00000002, /* A message event was triggered  */
+       STRM_EVT_SHUT_SRV_DOWN = 0x00000004, /* Must be shut because the selected server became available */
+       STRM_EVT_SHUT_SRV_UP   = 0x00000008, /* Must be shut because a preferred server became available */
+       STRM_EVT_KILLED        = 0x00000010, /* Must be shut for external reason */
+};
+
 /* This function is used to report flags in debugging tools. Please reflect
  * below any single-bit flag addition above in the same order via the
  * __APPEND_FLAG macro. The new end of the buffer is returned.
@@ -241,8 +253,8 @@ struct stream {
        struct http_txn *txn;           /* current HTTP transaction being processed. Should become a list. */
 
        struct task *task;              /* the task associated with this stream */
-       unsigned int pending_events;    /* the pending events not yet processed by the stream.
-                                        * This is a bit field of TASK_WOKEN_* */
+       unsigned int pending_events;    /* the pending events not yet processed by the stream but handled by process_stream() */
+       unsigned int new_events;        /* the new events added since the previous wakeup (never seen by process_stream()). It is atomic field */
        int conn_retries;               /* number of connect retries performed */
        unsigned int conn_exp;          /* wake up time for connect, queue, turn-around, ... */
        unsigned int conn_err_type;     /* first error detected, one of STRM_ET_* */
index a14a4b17a966b61c94c704787d59ea2c050fb6bf..3ca799912a3fccb00052b63472e66aeefe056da6 100644 (file)
@@ -403,6 +403,21 @@ static inline void stream_shutdown(struct stream *s, int why)
                     0));
 }
 
+/* Map task states to stream events. TASK_WOKEN_* and TASK_F_UEVT* are mapped on
+ * STRM_EVT_*. Not all states/flags are mapped, only those explicitly used by
+ * the stream.
+ */
+static inline unsigned int stream_map_task_state(unsigned int state)
+{
+       return ((state & TASK_WOKEN_TIMER) ? STRM_EVT_TIMER : 0)         |
+               ((state & TASK_WOKEN_MSG)  ? STRM_EVT_MSG : 0)           |
+               ((state & TASK_F_UEVT1)    ? STRM_EVT_SHUT_SRV_DOWN : 0) |
+               ((state & TASK_F_UEVT3)    ? STRM_EVT_SHUT_SRV_UP : 0)   |
+               ((state & TASK_F_UEVT2)    ? STRM_EVT_KILLED : 0)        |
+               0;
+}
+
+
 int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
 void stream_retnclose(struct stream *s, const struct buffer *msg);
 void sess_set_term_flags(struct stream *s);
index 8f81e5494b0b846b70d7072a891b3eda848cd238..7aa354dcbf7af64eabacba3a9410edbb942abd93 100644 (file)
@@ -1346,7 +1346,7 @@ static void spoe_check_timeouts(struct stream *s, struct filter *filter)
        struct spoe_context *ctx = filter->ctx;
 
        if (tick_is_expired(ctx->process_exp, now_ms))
-               s->pending_events |= TASK_WOKEN_MSG;
+               s->pending_events |= STRM_EVT_MSG;
 }
 
 /* Called when we are ready to filter data on a channel */
index 0961ec9add823aea1ff4a531e04b672124c699a4..f5a3f806e8000bb2db6a4b97c4ed514299ded5e4 100644 (file)
@@ -432,7 +432,7 @@ struct stream *stream_new(struct session *sess, struct stconn *sc, struct buffer
                goto out_fail_alloc;
 
        s->task = t;
-       s->pending_events = 0;
+       s->pending_events = s->new_events = STRM_EVT_NONE;
        s->conn_retries = 0;
        s->max_retries = 0;
        s->conn_exp = TICK_ETERNITY;
@@ -1729,9 +1729,15 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        activity[tid].stream_calls++;
        stream_cond_update_cpu_latency(s);
 
-       if ((state & TASK_WOKEN_OTHER) && (state & (TASK_F_UEVT1 | TASK_F_UEVT2 | TASK_F_UEVT3))) {
+       /* update pending events */
+       s->pending_events |= stream_map_task_state(state);
+       s->pending_events |= HA_ATOMIC_XCHG(&s->new_events, STRM_EVT_NONE);
+
+       if (s->pending_events & (STRM_EVT_SHUT_SRV_DOWN|STRM_EVT_SHUT_SRV_UP|STRM_EVT_KILLED)) {
                /* that an instant kill message, the reason is in _UEVT* */
-               stream_shutdown_self(s, (state & TASK_F_UEVT3) ? SF_ERR_UP : (state & TASK_F_UEVT2) ? SF_ERR_KILLED : SF_ERR_DOWN);
+               stream_shutdown_self(s, ((s->pending_events & STRM_EVT_SHUT_SRV_DOWN) ? SF_ERR_DOWN :
+                                        (s->pending_events & STRM_EVT_SHUT_SRV_UP) ? SF_ERR_UP:
+                                        SF_ERR_KILLED));
        }
 
        req = &s->req;
@@ -1774,24 +1780,23 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        scf_flags = scf->flags;
        scb_flags = scb->flags;
 
-       /* update pending events */
-       s->pending_events |= (state & TASK_WOKEN_ANY);
-
        /* 1a: Check for low level timeouts if needed. We just set a flag on
         * stream connectors when their timeouts have expired.
         */
-       if (unlikely(s->pending_events & TASK_WOKEN_TIMER)) {
+       if (unlikely(s->pending_events & STRM_EVT_TIMER)) {
                stream_handle_timeouts(s);
 
                /* Once in a while we're woken up because the task expires. But
-                * this does not necessarily mean that a timeout has been reached.
-                * So let's not run a whole stream processing if only an expiration
-                * timeout needs to be refreshed.
+                * this does not necessarily mean that a timeout has been
+                * reached.  So let's not run a whole stream processing if only
+                * an expiration timeout needs to be refreshed. To do so, we
+                * must be sure only the TIMER event was triggered and not
+                * error/timeout/abort/shut occurred. on both sides.
                 */
                if (!((scf->flags | scb->flags) & (SC_FL_ERROR|SC_FL_EOS|SC_FL_ABRT_DONE|SC_FL_SHUT_DONE)) &&
                    !((req->flags | res->flags) & (CF_READ_EVENT|CF_READ_TIMEOUT|CF_WRITE_EVENT|CF_WRITE_TIMEOUT)) &&
                    !(s->flags & SF_CONN_EXP) &&
-                   ((s->pending_events & TASK_WOKEN_ANY) == TASK_WOKEN_TIMER)) {
+                   (s->pending_events  == STRM_EVT_TIMER)) {
                        scf->flags &= ~SC_FL_DONT_WAKE;
                        scb->flags &= ~SC_FL_DONT_WAKE;
                        goto update_exp_and_leave;
@@ -1952,7 +1957,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
            (req->analysers && (scb->flags & SC_FL_SHUT_DONE)) ||
            scf->state != rq_prod_last ||
            scb->state != rq_cons_last ||
-           s->pending_events & TASK_WOKEN_MSG) {
+           s->pending_events & STRM_EVT_MSG) {
                unsigned int scf_flags_ana = scf->flags;
                unsigned int scb_flags_ana = scb->flags;
 
@@ -2058,7 +2063,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
            (res->analysers && (scf->flags & SC_FL_SHUT_DONE)) ||
            scf->state != rp_cons_last ||
            scb->state != rp_prod_last ||
-           s->pending_events & TASK_WOKEN_MSG) {
+           s->pending_events & STRM_EVT_MSG) {
                unsigned int scb_flags_ana = scb->flags;
                unsigned int scf_flags_ana = scf->flags;
 
@@ -2524,7 +2529,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
                stream_update_both_sc(s);
 
                /* Reset pending events now */
-               s->pending_events = 0;
+               s->pending_events = STRM_EVT_NONE;
 
        update_exp_and_leave:
                /* Note: please ensure that if you branch here you disable SC_FL_DONT_WAKE */
@@ -2554,7 +2559,7 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
                        goto resync_stconns;
                }
        leave:
-               s->pending_events &= ~(TASK_WOKEN_TIMER | TASK_WOKEN_RES);
+               s->pending_events &= ~STRM_EVT_TIMER;
                stream_release_buffers(s);
 
                DBG_TRACE_DEVEL("queuing", STRM_EV_STRM_PROC, s);