]> git.ipfire.org Git - thirdparty/linux.git/commitdiff
io_uring: switch normal task_work to a mpscq
authorJens Axboe <axboe@kernel.dk>
Thu, 11 Jun 2026 16:13:22 +0000 (10:13 -0600)
committerJens Axboe <axboe@kernel.dk>
Sat, 13 Jun 2026 12:27:11 +0000 (06:27 -0600)
Like the local task_work list, the normal (tctx) task_work list is an
llist, and hence needs the O(n) llist_reverse_order() pass before
running entries in queue order. On top of that, capped runs - sqpoll
processing IORING_TW_CAP_ENTRIES_VALUE entries at a time - need the
claimed-but-unprocessed leftovers carried in a separate retry_list,
as they can't be pushed back to the shared list.

Switch tctx->task_list to a mpscq, like what was done for the
DEFER_TASKRUN paths as well.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
include/linux/io_uring_types.h
io_uring/sqpoll.c
io_uring/tctx.c
io_uring/tw.c
io_uring/tw.h

index 3e07c7059d7b277e074d71472700cd3671701ee3..f511e96865b6c66d5cb794c3a78c7e9296d15e67 100644 (file)
@@ -131,6 +131,11 @@ struct io_uring_task {
        const struct io_ring_ctx        *last;
        struct task_struct              *task;
        struct io_wq                    *io_wq;
+       /*
+        * Consumer cursor for ->task_list. Only popped by the task itself,
+        * or by ->fallback_work once the task can no longer run task_work.
+        */
+       struct llist_node               *task_head;
        struct file                     *registered_rings[IO_RINGFD_REG_MAX];
 
        struct xarray                   xa;
@@ -139,8 +144,13 @@ struct io_uring_task {
        atomic_t                        inflight_tracked;
        struct percpu_counter           inflight;
 
+       /* drains ->task_list once the task can no longer run task_work */
+       struct work_struct              fallback_work;
+
        struct { /* task_work */
-               struct llist_head       task_list;
+               struct mpscq            task_list;
+               /* BIT(0) guards adding tw only once */
+               unsigned long           tw_pending;
                struct callback_head    task_work;
        } ____cacheline_aligned_in_smp;
 };
index 46c12afec73ec57ddc60f9b98e723c9e8d74e05a..2460bd6052665d88f524836fb34addbfd4281619 100644 (file)
@@ -260,39 +260,29 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd)
 }
 
 /*
- * Run task_work, processing the retry_list first. The retry_list holds
- * entries that we passed on in the previous run, if we had more task_work
- * than we were asked to process. Newly queued task_work isn't run until the
- * retry list has been fully processed.
+ * Run task_work, processing no more than max_entries at a time. If more
+ * than that is pending, it simply stays on the queue for the next run.
  */
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(int max_entries)
 {
        struct io_uring_task *tctx = current->io_uring;
        unsigned int count = 0;
 
-       if (*retry_list) {
-               *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
-               if (count >= max_entries)
-                       goto out;
-               max_entries -= count;
-       }
-       *retry_list = tctx_task_work_run(tctx, max_entries, &count);
-out:
+       tctx_task_work_run(tctx, max_entries, &count);
        if (task_work_pending(current))
                task_work_run();
        return count;
 }
 
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(void)
 {
        struct io_uring_task *tctx = current->io_uring;
 
-       return retry_list || !llist_empty(&tctx->task_list);
+       return !mpscq_empty(&tctx->task_list);
 }
 
 static int io_sq_thread(void *data)
 {
-       struct llist_node *retry_list = NULL;
        struct io_sq_data *sqd = data;
        struct io_ring_ctx *ctx;
        unsigned long timeout = 0;
@@ -347,7 +337,7 @@ static int io_sq_thread(void *data)
                        if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
                                sqt_spin = true;
                }
-               if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
+               if (io_sq_tw(IORING_TW_CAP_ENTRIES_VALUE))
                        sqt_spin = true;
 
                list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -372,7 +362,7 @@ static int io_sq_thread(void *data)
                }
 
                prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
-               if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
+               if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending()) {
                        bool needs_sched = true;
 
                        list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
@@ -411,8 +401,8 @@ static int io_sq_thread(void *data)
                timeout = jiffies + sqd->sq_thread_idle;
        }
 
-       if (retry_list)
-               io_sq_tw(&retry_list, UINT_MAX);
+       if (io_sq_tw_pending())
+               io_sq_tw(UINT_MAX);
 
        io_uring_cancel_generic(true, sqd);
        rcu_assign_pointer(sqd->thread, NULL);
index 6af62ca9baba3720e5926126a470bce257e792b5..8c6abc332b6ca77944f4c8ed3015dc1e171d2a94 100644 (file)
@@ -103,7 +103,8 @@ __cold struct io_uring_task *io_uring_alloc_task_context(struct task_struct *tas
        init_waitqueue_head(&tctx->wait);
        atomic_set(&tctx->in_cancel, 0);
        atomic_set(&tctx->inflight_tracked, 0);
-       init_llist_head(&tctx->task_list);
+       mpscq_init(&tctx->task_list, &tctx->task_head);
+       INIT_WORK(&tctx->fallback_work, io_tctx_fallback_work);
        init_task_work(&tctx->task_work, tctx_task_work);
        return tctx;
 }
index 79feeb4f671a117a18f885eea83077c536090e56..485d4c094a9ffc11da8527b963fd64fb13662ec2 100644 (file)
@@ -46,46 +46,6 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw)
        percpu_ref_put(&ctx->refs);
 }
 
-/*
- * Run queued task_work, returning the number of entries processed in *count.
- * If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
- */
-struct llist_node *io_handle_tw_list(struct llist_node *node,
-                                    unsigned int *count,
-                                    unsigned int max_entries)
-{
-       struct io_ring_ctx *ctx = NULL;
-       struct io_tw_state ts = { };
-
-       do {
-               struct llist_node *next = node->next;
-               struct io_kiocb *req = container_of(node, struct io_kiocb,
-                                                   io_task_work.node);
-
-               if (req->ctx != ctx) {
-                       ctx_flush_and_put(ctx, ts);
-                       ctx = req->ctx;
-                       mutex_lock(&ctx->uring_lock);
-                       percpu_ref_get(&ctx->refs);
-                       ts.cancel = io_should_terminate_tw(ctx);
-               }
-               INDIRECT_CALL_2(req->io_task_work.func,
-                               io_poll_task_func, io_req_rw_complete,
-                               (struct io_tw_req){req}, ts);
-               node = next;
-               (*count)++;
-               if (unlikely(need_resched())) {
-                       ctx_flush_and_put(ctx, ts);
-                       ctx = NULL;
-                       cond_resched();
-               }
-       } while (node && *count < max_entries);
-
-       ctx_flush_and_put(ctx, ts);
-       return node;
-}
-
 static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
 {
        struct io_ring_ctx *last_ctx = NULL;
@@ -114,43 +74,109 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
        }
 }
 
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+void io_tctx_fallback_work(struct work_struct *work)
 {
-       struct llist_node *node = llist_del_all(&tctx->task_list);
+       struct io_uring_task *tctx = container_of(work, struct io_uring_task,
+                                                 fallback_work);
+       struct llist_node *node, *first = NULL, **tail = &first;
+
+       /* see tctx_task_work() - a set bit must always have a run coming */
+       clear_bit(0, &tctx->tw_pending);
+       smp_mb__after_atomic();
+
+       while (!mpscq_empty(&tctx->task_list)) {
+               node = mpscq_pop(&tctx->task_list, &tctx->task_head);
+               if (!node) {
+                       /* a producer is mid-push, wait for it to link */
+                       cond_resched();
+                       continue;
+               }
+               *tail = node;
+               tail = &node->next;
+       }
+       *tail = NULL;
+       __io_fallback_tw(first, false);
+       put_task_struct(tctx->task);
+}
 
-       __io_fallback_tw(node, sync);
+static void io_fallback_tw(struct io_uring_task *tctx)
+{
+       /*
+        * The task ref both keeps ->task valid and, as __io_uring_free() is
+        * only called when the task itself is freed, ensures the tctx (and
+        * the queued work) stay around until the drain has run.
+        */
+       get_task_struct(tctx->task);
+       if (!queue_work(system_unbound_wq, &tctx->fallback_work))
+               put_task_struct(tctx->task);
 }
 
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
-                                     unsigned int max_entries,
-                                     unsigned int *count)
+/*
+ * Run queued task_work, processing no more than max_entries, with the number
+ * of entries processed added to *count. If more entries than max_entries are
+ * available, the remainder simply stay on the queue for the next run.
+ */
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
+                       unsigned int *count)
 {
-       struct llist_node *node;
+       struct io_ring_ctx *ctx = NULL;
+       struct io_tw_state ts = { };
 
-       node = llist_del_all(&tctx->task_list);
-       if (node) {
-               node = llist_reverse_order(node);
-               node = io_handle_tw_list(node, count, max_entries);
+       while (*count < max_entries) {
+               struct llist_node *node = mpscq_pop(&tctx->task_list,
+                                                   &tctx->task_head);
+               struct io_kiocb *req;
+
+               if (!node) {
+                       if (mpscq_empty(&tctx->task_list))
+                               break;
+                       /*
+                        * A producer has published a node but hasn't
+                        * linked it into the queue yet (see mpscq_pop()).
+                        * Give it a chance to finish rather than spinning,
+                        * and don't sit on the ctx lock while doing so.
+                        */
+                       ctx_flush_and_put(ctx, ts);
+                       ctx = NULL;
+                       cond_resched();
+                       continue;
+               }
+               req = container_of(node, struct io_kiocb, io_task_work.node);
+               if (req->ctx != ctx) {
+                       ctx_flush_and_put(ctx, ts);
+                       ctx = req->ctx;
+                       mutex_lock(&ctx->uring_lock);
+                       percpu_ref_get(&ctx->refs);
+                       ts.cancel = io_should_terminate_tw(ctx);
+               }
+               INDIRECT_CALL_2(req->io_task_work.func,
+                               io_poll_task_func, io_req_rw_complete,
+                               (struct io_tw_req){req}, ts);
+               (*count)++;
+               if (unlikely(need_resched())) {
+                       ctx_flush_and_put(ctx, ts);
+                       ctx = NULL;
+                       cond_resched();
+               }
        }
+       ctx_flush_and_put(ctx, ts);
 
        /* relaxed read is enough as only the task itself sets ->in_cancel */
        if (unlikely(atomic_read(&tctx->in_cancel)))
                io_uring_drop_tctx_refs(current);
 
        trace_io_uring_task_work_run(tctx, *count);
-       return node;
 }
 
 void tctx_task_work(struct callback_head *cb)
 {
        struct io_uring_task *tctx;
-       struct llist_node *ret;
        unsigned int count = 0;
 
        tctx = container_of(cb, struct io_uring_task, task_work);
-       ret = tctx_task_work_run(tctx, UINT_MAX, &count);
-       /* can't happen */
-       WARN_ON_ONCE(ret);
+       clear_bit(0, &tctx->tw_pending);
+       smp_mb__after_atomic();
+       tctx_task_work_run(tctx, UINT_MAX, &count);
 }
 
 /*
@@ -228,7 +254,7 @@ void io_req_normal_work_add(struct io_kiocb *req)
        struct io_ring_ctx *ctx = req->ctx;
 
        /* task_work already pending, we're done */
-       if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+       if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
                return;
 
        /*
@@ -244,10 +270,14 @@ void io_req_normal_work_add(struct io_kiocb *req)
                return;
        }
 
+       /* task_work must only be added once */
+       if (test_and_set_bit(0, &tctx->tw_pending))
+               return;
+
        if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
                return;
 
-       io_fallback_tw(tctx, false);
+       io_fallback_tw(tctx);
 }
 
 void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)
index f42db5fdbdede9cf1c1dca777cd754106334fc08..387e52004da8032b4db6f44e25a31a785bfb2e90 100644 (file)
@@ -25,8 +25,8 @@ static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx)
 }
 
 void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags);
-struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries);
 void tctx_task_work(struct callback_head *cb);
+void io_tctx_fallback_work(struct work_struct *work);
 int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events);
 int io_run_task_work_sig(struct io_ring_ctx *ctx);
 
@@ -36,7 +36,7 @@ int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events);
 
 void io_req_local_work_add(struct io_kiocb *req, unsigned flags);
 void io_req_normal_work_add(struct io_kiocb *req);
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count);
 
 static inline void __io_req_task_work_add(struct io_kiocb *req, unsigned flags)
 {