]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
BUG/MEDIUM: stream: make stream_shutdown() async-safe
authorWilly Tarreau <w@1wt.eu>
Thu, 26 Sep 2024 16:30:36 +0000 (18:30 +0200)
committerWilly Tarreau <w@1wt.eu>
Fri, 27 Sep 2024 10:15:41 +0000 (12:15 +0200)
The solution found in commit b500e84e24 ("BUG/MINOR: server: shut down
streams under thread isolation") to deal with inter-thread stream
shutdown doesn't work fine because there exists code paths involving
a server lock which can then deadlock on thread_isolate(). A better
solution then consists in deferring the shutdown to the stream itself
and just wake it up for that.

The only thing is that TASK_WOKEN_OTHER is a bit too generic and we
need to pass at least 2 types of events (SF_ERR_DOWN and SF_ERR_KILLED),
so we're now leveraging the new TASK_F_UEVT1 and _UEVT2 flags on the
task's state to convey these info. The caller only needs to wake the
task up with these flags set, and the stream handler will then finish
the job locally using stream_shutdown_self().

This needs to be carefully backported to all branches affected by the
dequeuing issue and containing any of the 5541d4995d ("BUG/MEDIUM:
queue: deal with a rare TOCTOU in assign_server_and_queue()"), and/or
b11495652e ("BUG/MEDIUM: queue: implement a flag to check for the
dequeuing").

include/haproxy/stream.h
src/stream.c

index 6781249e89389733610fb421b82636efc533cef3..e806a2a6f09e260b3e55601ec490c96b06cbd2dd 100644 (file)
@@ -64,8 +64,8 @@ void stream_free(struct stream *s);
 int stream_upgrade_from_sc(struct stconn *sc, struct buffer *input);
 int stream_set_http_mode(struct stream *s, const struct mux_proto_list *mux_proto);
 
-/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
-void stream_shutdown(struct stream *stream, int why);
+/* shutdown the stream from itself */
+void stream_shutdown_self(struct stream *stream, int why);
 void stream_dump_and_crash(enum obj_type *obj, int rate);
 void strm_dump_to_buffer(struct buffer *buf, const struct stream *strm, const char *pfx, uint32_t anon_key);
 
@@ -385,6 +385,22 @@ static inline int stream_check_conn_timeout(struct stream *s)
        return 0;
 }
 
+/* Wake a stream up for shutdown by sending it an event. The stream must be
+ * locked one way or another so that it cannot leave (i.e. when inspecting
+ * a locked list or under thread isolation). Process_stream() will recognize
+ * the message and complete the job. <why> only supports SF_ERR_DOWN (mapped
+ * to UEVT1) and SF_ERR_KILLED (mapped to UEVT2). Other values will just
+ * trigger TASK_WOKEN_OTHER. The stream handler will first call function
+ * stream_shutdown_self() on wakeup to complete the notification.
+ */
+static inline void stream_shutdown(struct stream *s, int why)
+{
+       task_wakeup(s->task, TASK_WOKEN_OTHER |
+                   ((why == SF_ERR_DOWN) ? TASK_F_UEVT1 :
+                    (why == SF_ERR_KILLED) ? TASK_F_UEVT2 :
+                    0));
+}
+
 int stream_set_timeout(struct stream *s, enum act_timeout_name name, int timeout);
 void stream_retnclose(struct stream *s, const struct buffer *msg);
 void sess_set_term_flags(struct stream *s);
index 9b48e7d78977d3d4e069a67721b3bb1b05ca863e..f689edd768ec5f885a9e7a830917236ae671efe8 100644 (file)
@@ -1722,6 +1722,12 @@ void stream_update_timings(struct task *t, uint64_t lat, uint64_t cpu)
  * nothing too many times, the request and response buffers flags are monitored
  * and each function is called only if at least another function has changed at
  * least one flag it is interested in.
+ *
+ * This task handler understands a few wake up reasons:
+ *  - TASK_WOKEN_MSG forces analysers to be re-evaluated
+ *  - TASK_WOKEN_OTHER+TASK_F_UEVT1 shuts the stream down on server down
+ *  - TASK_WOKEN_OTHER+TASK_F_UEVT2 shuts the stream down on active kill
+ *  - TASK_WOKEN_OTHER alone has no effect
  */
 struct task *process_stream(struct task *t, void *context, unsigned int state)
 {
@@ -1742,6 +1748,11 @@ struct task *process_stream(struct task *t, void *context, unsigned int state)
        activity[tid].stream_calls++;
        stream_cond_update_cpu_latency(s);
 
+       if ((state & TASK_WOKEN_OTHER) && (state & (TASK_F_UEVT1 | TASK_F_UEVT2))) {
+               /* that an instant kill message, the reason is in _UEVT* */
+               stream_shutdown_self(s, (state & TASK_F_UEVT2) ? SF_ERR_KILLED : SF_ERR_DOWN);
+       }
+
        req = &s->req;
        res = &s->res;
 
@@ -2776,8 +2787,12 @@ void default_srv_error(struct stream *s, struct stconn *sc)
                s->flags |= fin;
 }
 
-/* kill a stream and set the termination flags to <why> (one of SF_ERR_*) */
-void stream_shutdown(struct stream *stream, int why)
+/* shutdown the stream from itself. It's also possible for another one from the
+ * same thread but then an explicit wakeup will be needed since this function
+ * does not perform it. <why> is a set of SF_ERR_* flags to pass as the cause
+ * for shutting down.
+ */
+void stream_shutdown_self(struct stream *stream, int why)
 {
        if (stream->scb->flags & (SC_FL_SHUT_DONE|SC_FL_SHUT_WANTED))
                return;
@@ -2787,7 +2802,6 @@ void stream_shutdown(struct stream *stream, int why)
        stream->task->nice = 1024;
        if (!(stream->flags & SF_ERR_MASK))
                stream->flags |= why;
-       task_wakeup(stream->task, TASK_WOKEN_OTHER);
 }
 
 /* dumps an error message for type <type> at ptr <ptr> related to stream <s>,