From b8e3b0a18d59b4f52b4ecb7ae61cef0b8b2402a0 Mon Sep 17 00:00:00 2001 From: Willy Tarreau Date: Thu, 26 Sep 2024 18:30:36 +0200 Subject: [PATCH] BUG/MEDIUM: stream: make stream_shutdown() async-safe The solution found in commit b500e84e24 ("BUG/MINOR: server: shut down streams under thread isolation") to deal with inter-thread stream shutdown doesn't work fine because there exists code paths involving a server lock which can then deadlock on thread_isolate(). A better solution then consists in deferring the shutdown to the stream itself and just wake it up for that. The only thing is that TASK_WOKEN_OTHER is a bit too generic and we need to pass at least 2 types of events (SF_ERR_DOWN and SF_ERR_KILLED), so we're now leveraging the new TASK_F_UEVT1 and _UEVT2 flags on the task's state to convey these info. The caller only needs to wake the task up with these flags set, and the stream handler will then finish the job locally using stream_shutdown_self(). This needs to be carefully backported to all branches affected by the dequeuing issue and containing any of the 5541d4995d ("BUG/MEDIUM: queue: deal with a rare TOCTOU in assign_server_and_queue()"), and/or b11495652e ("BUG/MEDIUM: queue: implement a flag to check for the dequeuing"). --- include/haproxy/stream.h | 20 ++++++++++++++++++-- src/stream.c | 20 +++++++++++++++++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/include/haproxy/stream.h b/include/haproxy/stream.h index 6781249e89..e806a2a6f0 100644 --- a/include/haproxy/stream.h +++ b/include/haproxy/stream.h @@ -64,8 +64,8 @@ void stream_free(struct stream *s); int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input); int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_proto); -/* kill a stream and set the termination flags to (one of SF_ERR_*) */ -void stream_shutdown(struct stream *stream, int why); +/* shutdown the stream from itself */ +void stream_shutdown_self(struct stream *stream, int why); void stream_dump_and_crash(enum obj_type *obj, int rate); void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const char *pfx, uint32_t anon_key); @@ -385,6 +385,22 @@ static inline int stream_check_conn_timeout(struct stream *s) return 0; } +/* Wake a stream up for shutdown by sending it an event. The stream must be + * locked one way or another so that it cannot leave (i.e. when inspecting + * a locked list or under thread isolation). Process_stream() will recognize + * the message and complete the job. only supports SF_ERR_DOWN (mapped + * to UEVT1) and SF_ERR_KILLED (mapped to UEVT2). Other values will just + * trigger TASK_WOKEN_OTHER. The stream handler will first call function + * stream_shutdown_self() on wakeup to complete the notification. + */ +static inline void stream_shutdown(struct stream *s, int why) +{ + task_wakeup(s->task, TASK_WOKEN_OTHER | + ((why == SF_ERR_DOWN) ? TASK_F_UEVT1 : + (why == SF_ERR_KILLED) ? TASK_F_UEVT2 : + 0)); +} + int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout); void stream_retnclose(struct stream *s, const struct buffer *msg); void sess_set_term_flags(struct stream *s); diff --git a/src/stream.c b/src/stream.c index 9b48e7d789..f689edd768 100644 --- a/src/stream.c +++ b/src/stream.c @@ -1722,6 +1722,12 @@ void stream_update_timings(struct task *t, uint64_t lat, uint64_t cpu) * nothing too many times, the request and response buffers flags are monitored * and each function is called only if at least another function has changed at * least one flag it is interested in. + * + * This task handler understands a few wake up reasons: + * - TASK_WOKEN_MSG forces analysers to be re-evaluated + * - TASK_WOKEN_OTHER+TASK_F_UEVT1 shuts the stream down on server down + * - TASK_WOKEN_OTHER+TASK_F_UEVT2 shuts the stream down on active kill + * - TASK_WOKEN_OTHER alone has no effect */ struct task *process_stream(struct task *t, void *context, unsigned int state) { @@ -1742,6 +1748,11 @@ struct task *process_stream(struct task *t, void *context, unsigned int state) activity[tid].stream_calls++; stream_cond_update_cpu_latency(s); + if ((state & TASK_WOKEN_OTHER) && (state & (TASK_F_UEVT1 | TASK_F_UEVT2))) { + /* that an instant kill message, the reason is in _UEVT* */ + stream_shutdown_self(s, (state & TASK_F_UEVT2) ? SF_ERR_KILLED : SF_ERR_DOWN); + } + req = &s->req; res = &s->res; @@ -2776,8 +2787,12 @@ void default_srv_error(struct stream *s, struct stconn *sc) s->flags |= fin; } -/* kill a stream and set the termination flags to (one of SF_ERR_*) */ -void stream_shutdown(struct stream *stream, int why) +/* shutdown the stream from itself. It's also possible for another one from the + * same thread but then an explicit wakeup will be needed since this function + * does not perform it. is a set of SF_ERR_* flags to pass as the cause + * for shutting down. + */ +void stream_shutdown_self(struct stream *stream, int why) { if (stream->scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED)) return; @@ -2787,7 +2802,6 @@ void stream_shutdown(struct stream *stream, int why) stream->task->nice = 1024; if (!(stream->flags & SF_ERR_MASK)) stream->flags |= why; - task_wakeup(stream->task, TASK_WOKEN_OTHER); } /* dumps an error message for type at ptr related to stream , -- 2.39.5