From: Jens Axboe Date: Thu, 11 Jun 2026 16:13:22 +0000 (-0600) Subject: io_uring: switch normal task_work to a mpscq X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=de7341ffe49ed30a1d75b254ac8c731b057247bf;p=thirdparty%2Fkernel%2Flinux.git io_uring: switch normal task_work to a mpscq Like the local task_work list, the normal (tctx) task_work list is an llist, and hence needs the O(n) llist_reverse_order() pass before running entries in queue order. On top of that, capped runs - sqpoll processing IORING_TW_CAP_ENTRIES_VALUE entries at a time - need the claimed-but-unprocessed leftovers carried in a separate retry_list, as they can't be pushed back to the shared list. Switch tctx->task_list to a mpscq, like what was done for the DEFER_TASKRUN paths as well. Signed-off-by: Jens Axboe --- diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h index 3e07c7059d7b2..f511e96865b6c 100644 --- a/include/linux/io_uring_types.h +++ b/include/linux/io_uring_types.h @@ -131,6 +131,11 @@ struct io_uring_task { const struct io_ring_ctx *last; struct task_struct *task; struct io_wq *io_wq; + /* + * Consumer cursor for ->task_list. Only popped by the task itself, + * or by ->fallback_work once the task can no longer run task_work. + */ + struct llist_node *task_head; struct file *registered_rings[IO_RINGFD_REG_MAX]; struct xarray xa; @@ -139,8 +144,13 @@ struct io_uring_task { atomic_t inflight_tracked; struct percpu_counter inflight; + /* drains ->task_list once the task can no longer run task_work */ + struct work_struct fallback_work; + struct { /* task_work */ - struct llist_head task_list; + struct mpscq task_list; + /* BIT(0) guards adding tw only once */ + unsigned long tw_pending; struct callback_head task_work; } ____cacheline_aligned_in_smp; }; diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c index 46c12afec73ec..2460bd6052665 100644 --- a/io_uring/sqpoll.c +++ b/io_uring/sqpoll.c @@ -260,39 +260,29 @@ static bool io_sqd_handle_event(struct io_sq_data *sqd) } /* - * Run task_work, processing the retry_list first. The retry_list holds - * entries that we passed on in the previous run, if we had more task_work - * than we were asked to process. Newly queued task_work isn't run until the - * retry list has been fully processed. + * Run task_work, processing no more than max_entries at a time. If more + * than that is pending, it simply stays on the queue for the next run. */ -static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries) +static unsigned int io_sq_tw(int max_entries) { struct io_uring_task *tctx = current->io_uring; unsigned int count = 0; - if (*retry_list) { - *retry_list = io_handle_tw_list(*retry_list, &count, max_entries); - if (count >= max_entries) - goto out; - max_entries -= count; - } - *retry_list = tctx_task_work_run(tctx, max_entries, &count); -out: + tctx_task_work_run(tctx, max_entries, &count); if (task_work_pending(current)) task_work_run(); return count; } -static bool io_sq_tw_pending(struct llist_node *retry_list) +static bool io_sq_tw_pending(void) { struct io_uring_task *tctx = current->io_uring; - return retry_list || !llist_empty(&tctx->task_list); + return !mpscq_empty(&tctx->task_list); } static int io_sq_thread(void *data) { - struct llist_node *retry_list = NULL; struct io_sq_data *sqd = data; struct io_ring_ctx *ctx; unsigned long timeout = 0; @@ -347,7 +337,7 @@ static int io_sq_thread(void *data) if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list))) sqt_spin = true; } - if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE)) + if (io_sq_tw(IORING_TW_CAP_ENTRIES_VALUE)) sqt_spin = true; list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { @@ -372,7 +362,7 @@ static int io_sq_thread(void *data) } prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE); - if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) { + if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending()) { bool needs_sched = true; list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) { @@ -411,8 +401,8 @@ static int io_sq_thread(void *data) timeout = jiffies + sqd->sq_thread_idle; } - if (retry_list) - io_sq_tw(&retry_list, UINT_MAX); + if (io_sq_tw_pending()) + io_sq_tw(UINT_MAX); io_uring_cancel_generic(true, sqd); rcu_assign_pointer(sqd->thread, NULL); diff --git a/io_uring/tctx.c b/io_uring/tctx.c index 6af62ca9baba3..8c6abc332b6ca 100644 --- a/io_uring/tctx.c +++ b/io_uring/tctx.c @@ -103,7 +103,8 @@ __cold struct io_uring_task *io_uring_alloc_task_context(struct task_struct *tas init_waitqueue_head(&tctx->wait); atomic_set(&tctx->in_cancel, 0); atomic_set(&tctx->inflight_tracked, 0); - init_llist_head(&tctx->task_list); + mpscq_init(&tctx->task_list, &tctx->task_head); + INIT_WORK(&tctx->fallback_work, io_tctx_fallback_work); init_task_work(&tctx->task_work, tctx_task_work); return tctx; } diff --git a/io_uring/tw.c b/io_uring/tw.c index 79feeb4f671a1..485d4c094a9ff 100644 --- a/io_uring/tw.c +++ b/io_uring/tw.c @@ -46,46 +46,6 @@ static void ctx_flush_and_put(struct io_ring_ctx *ctx, io_tw_token_t tw) percpu_ref_put(&ctx->refs); } -/* - * Run queued task_work, returning the number of entries processed in *count. - * If more entries than max_entries are available, stop processing once this - * is reached and return the rest of the list. - */ -struct llist_node *io_handle_tw_list(struct llist_node *node, - unsigned int *count, - unsigned int max_entries) -{ - struct io_ring_ctx *ctx = NULL; - struct io_tw_state ts = { }; - - do { - struct llist_node *next = node->next; - struct io_kiocb *req = container_of(node, struct io_kiocb, - io_task_work.node); - - if (req->ctx != ctx) { - ctx_flush_and_put(ctx, ts); - ctx = req->ctx; - mutex_lock(&ctx->uring_lock); - percpu_ref_get(&ctx->refs); - ts.cancel = io_should_terminate_tw(ctx); - } - INDIRECT_CALL_2(req->io_task_work.func, - io_poll_task_func, io_req_rw_complete, - (struct io_tw_req){req}, ts); - node = next; - (*count)++; - if (unlikely(need_resched())) { - ctx_flush_and_put(ctx, ts); - ctx = NULL; - cond_resched(); - } - } while (node && *count < max_entries); - - ctx_flush_and_put(ctx, ts); - return node; -} - static __cold void __io_fallback_tw(struct llist_node *node, bool sync) { struct io_ring_ctx *last_ctx = NULL; @@ -114,43 +74,109 @@ static __cold void __io_fallback_tw(struct llist_node *node, bool sync) } } -static void io_fallback_tw(struct io_uring_task *tctx, bool sync) +void io_tctx_fallback_work(struct work_struct *work) { - struct llist_node *node = llist_del_all(&tctx->task_list); + struct io_uring_task *tctx = container_of(work, struct io_uring_task, + fallback_work); + struct llist_node *node, *first = NULL, **tail = &first; + + /* see tctx_task_work() - a set bit must always have a run coming */ + clear_bit(0, &tctx->tw_pending); + smp_mb__after_atomic(); + + while (!mpscq_empty(&tctx->task_list)) { + node = mpscq_pop(&tctx->task_list, &tctx->task_head); + if (!node) { + /* a producer is mid-push, wait for it to link */ + cond_resched(); + continue; + } + *tail = node; + tail = &node->next; + } + *tail = NULL; + __io_fallback_tw(first, false); + put_task_struct(tctx->task); +} - __io_fallback_tw(node, sync); +static void io_fallback_tw(struct io_uring_task *tctx) +{ + /* + * The task ref both keeps ->task valid and, as __io_uring_free() is + * only called when the task itself is freed, ensures the tctx (and + * the queued work) stay around until the drain has run. + */ + get_task_struct(tctx->task); + if (!queue_work(system_unbound_wq, &tctx->fallback_work)) + put_task_struct(tctx->task); } -struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, - unsigned int max_entries, - unsigned int *count) +/* + * Run queued task_work, processing no more than max_entries, with the number + * of entries processed added to *count. If more entries than max_entries are + * available, the remainder simply stay on the queue for the next run. + */ +void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, + unsigned int *count) { - struct llist_node *node; + struct io_ring_ctx *ctx = NULL; + struct io_tw_state ts = { }; - node = llist_del_all(&tctx->task_list); - if (node) { - node = llist_reverse_order(node); - node = io_handle_tw_list(node, count, max_entries); + while (*count < max_entries) { + struct llist_node *node = mpscq_pop(&tctx->task_list, + &tctx->task_head); + struct io_kiocb *req; + + if (!node) { + if (mpscq_empty(&tctx->task_list)) + break; + /* + * A producer has published a node but hasn't + * linked it into the queue yet (see mpscq_pop()). + * Give it a chance to finish rather than spinning, + * and don't sit on the ctx lock while doing so. + */ + ctx_flush_and_put(ctx, ts); + ctx = NULL; + cond_resched(); + continue; + } + req = container_of(node, struct io_kiocb, io_task_work.node); + if (req->ctx != ctx) { + ctx_flush_and_put(ctx, ts); + ctx = req->ctx; + mutex_lock(&ctx->uring_lock); + percpu_ref_get(&ctx->refs); + ts.cancel = io_should_terminate_tw(ctx); + } + INDIRECT_CALL_2(req->io_task_work.func, + io_poll_task_func, io_req_rw_complete, + (struct io_tw_req){req}, ts); + (*count)++; + if (unlikely(need_resched())) { + ctx_flush_and_put(ctx, ts); + ctx = NULL; + cond_resched(); + } } + ctx_flush_and_put(ctx, ts); /* relaxed read is enough as only the task itself sets ->in_cancel */ if (unlikely(atomic_read(&tctx->in_cancel))) io_uring_drop_tctx_refs(current); trace_io_uring_task_work_run(tctx, *count); - return node; } void tctx_task_work(struct callback_head *cb) { struct io_uring_task *tctx; - struct llist_node *ret; unsigned int count = 0; tctx = container_of(cb, struct io_uring_task, task_work); - ret = tctx_task_work_run(tctx, UINT_MAX, &count); - /* can't happen */ - WARN_ON_ONCE(ret); + clear_bit(0, &tctx->tw_pending); + smp_mb__after_atomic(); + tctx_task_work_run(tctx, UINT_MAX, &count); } /* @@ -228,7 +254,7 @@ void io_req_normal_work_add(struct io_kiocb *req) struct io_ring_ctx *ctx = req->ctx; /* task_work already pending, we're done */ - if (!llist_add(&req->io_task_work.node, &tctx->task_list)) + if (!mpscq_push(&tctx->task_list, &req->io_task_work.node)) return; /* @@ -244,10 +270,14 @@ void io_req_normal_work_add(struct io_kiocb *req) return; } + /* task_work must only be added once */ + if (test_and_set_bit(0, &tctx->tw_pending)) + return; + if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method))) return; - io_fallback_tw(tctx, false); + io_fallback_tw(tctx); } void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags) diff --git a/io_uring/tw.h b/io_uring/tw.h index f42db5fdbdede..387e52004da80 100644 --- a/io_uring/tw.h +++ b/io_uring/tw.h @@ -25,8 +25,8 @@ static inline bool io_should_terminate_tw(struct io_ring_ctx *ctx) } void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags); -struct llist_node *io_handle_tw_list(struct llist_node *node, unsigned int *count, unsigned int max_entries); void tctx_task_work(struct callback_head *cb); +void io_tctx_fallback_work(struct work_struct *work); int io_run_local_work(struct io_ring_ctx *ctx, int min_events, int max_events); int io_run_task_work_sig(struct io_ring_ctx *ctx); @@ -36,7 +36,7 @@ int io_run_local_work_locked(struct io_ring_ctx *ctx, int min_events); void io_req_local_work_add(struct io_kiocb *req, unsigned flags); void io_req_normal_work_add(struct io_kiocb *req); -struct llist_node *tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count); +void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries, unsigned int *count); static inline void __io_req_task_work_add(struct io_kiocb *req, unsigned flags) {