}
/*
- * Run task_work, processing the retry_list first. The retry_list holds
- * entries that we passed on in the previous run, if we had more task_work
- * than we were asked to process. Newly queued task_work isn't run until the
- * retry list has been fully processed.
+ * Run task_work, processing no more than max_entries at a time. If more
+ * than that is pending, it simply stays on the queue for the next run.
*/
-static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
+static unsigned int io_sq_tw(int max_entries)
{
struct io_uring_task *tctx = current->io_uring;
unsigned int count = 0;
- if (*retry_list) {
- *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
- if (count >= max_entries)
- goto out;
- max_entries -= count;
- }
- *retry_list = tctx_task_work_run(tctx, max_entries, &count);
-out:
+ tctx_task_work_run(tctx, max_entries, &count);
if (task_work_pending(current))
task_work_run();
return count;
}
-static bool io_sq_tw_pending(struct llist_node *retry_list)
+static bool io_sq_tw_pending(void)
{
struct io_uring_task *tctx = current->io_uring;
- return retry_list || !llist_empty(&tctx->task_list);
+ return !mpscq_empty(&tctx->task_list);
}
static int io_sq_thread(void *data)
{
- struct llist_node *retry_list = NULL;
struct io_sq_data *sqd = data;
struct io_ring_ctx *ctx;
unsigned long timeout = 0;
if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
sqt_spin = true;
}
- if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
+ if (io_sq_tw(IORING_TW_CAP_ENTRIES_VALUE))
sqt_spin = true;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
}
prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
- if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
+ if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending()) {
bool needs_sched = true;
list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
timeout = jiffies + sqd->sq_thread_idle;
}
- if (retry_list)
- io_sq_tw(&retry_list, UINT_MAX);
+ if (io_sq_tw_pending())
+ io_sq_tw(UINT_MAX);
io_uring_cancel_generic(true, sqd);
rcu_assign_pointer(sqd->thread, NULL);
percpu_ref_put(&ctx->refs);
}
-/*
- * Run queued task_work, returning the number of entries processed in *count.
- * If more entries than max_entries are available, stop processing once this
- * is reached and return the rest of the list.
- */
-struct llist_node *io_handle_tw_list(struct llist_node *node,
- unsigned int *count,
- unsigned int max_entries)
-{
- struct io_ring_ctx *ctx = NULL;
- struct io_tw_state ts = { };
-
- do {
- struct llist_node *next = node->next;
- struct io_kiocb *req = container_of(node, struct io_kiocb,
- io_task_work.node);
-
- if (req->ctx != ctx) {
- ctx_flush_and_put(ctx, ts);
- ctx = req->ctx;
- mutex_lock(&ctx->uring_lock);
- percpu_ref_get(&ctx->refs);
- ts.cancel = io_should_terminate_tw(ctx);
- }
- INDIRECT_CALL_2(req->io_task_work.func,
- io_poll_task_func, io_req_rw_complete,
- (struct io_tw_req){req}, ts);
- node = next;
- (*count)++;
- if (unlikely(need_resched())) {
- ctx_flush_and_put(ctx, ts);
- ctx = NULL;
- cond_resched();
- }
- } while (node && *count < max_entries);
-
- ctx_flush_and_put(ctx, ts);
- return node;
-}
-
static __cold void __io_fallback_tw(struct llist_node *node, bool sync)
{
struct io_ring_ctx *last_ctx = NULL;
}
}
-static void io_fallback_tw(struct io_uring_task *tctx, bool sync)
+void io_tctx_fallback_work(struct work_struct *work)
{
- struct llist_node *node = llist_del_all(&tctx->task_list);
+ struct io_uring_task *tctx = container_of(work, struct io_uring_task,
+ fallback_work);
+ struct llist_node *node, *first = NULL, **tail = &first;
+
+ /* see tctx_task_work() - a set bit must always have a run coming */
+ clear_bit(0, &tctx->tw_pending);
+ smp_mb__after_atomic();
+
+ while (!mpscq_empty(&tctx->task_list)) {
+ node = mpscq_pop(&tctx->task_list, &tctx->task_head);
+ if (!node) {
+ /* a producer is mid-push, wait for it to link */
+ cond_resched();
+ continue;
+ }
+ *tail = node;
+ tail = &node->next;
+ }
+ *tail = NULL;
+ __io_fallback_tw(first, false);
+ put_task_struct(tctx->task);
+}
- __io_fallback_tw(node, sync);
+static void io_fallback_tw(struct io_uring_task *tctx)
+{
+ /*
+ * The task ref both keeps ->task valid and, as __io_uring_free() is
+ * only called when the task itself is freed, ensures the tctx (and
+ * the queued work) stay around until the drain has run.
+ */
+ get_task_struct(tctx->task);
+ if (!queue_work(system_unbound_wq, &tctx->fallback_work))
+ put_task_struct(tctx->task);
}
-struct llist_node *tctx_task_work_run(struct io_uring_task *tctx,
- unsigned int max_entries,
- unsigned int *count)
+/*
+ * Run queued task_work, processing no more than max_entries, with the number
+ * of entries processed added to *count. If more entries than max_entries are
+ * available, the remainder simply stay on the queue for the next run.
+ */
+void tctx_task_work_run(struct io_uring_task *tctx, unsigned int max_entries,
+ unsigned int *count)
{
- struct llist_node *node;
+ struct io_ring_ctx *ctx = NULL;
+ struct io_tw_state ts = { };
- node = llist_del_all(&tctx->task_list);
- if (node) {
- node = llist_reverse_order(node);
- node = io_handle_tw_list(node, count, max_entries);
+ while (*count < max_entries) {
+ struct llist_node *node = mpscq_pop(&tctx->task_list,
+ &tctx->task_head);
+ struct io_kiocb *req;
+
+ if (!node) {
+ if (mpscq_empty(&tctx->task_list))
+ break;
+ /*
+ * A producer has published a node but hasn't
+ * linked it into the queue yet (see mpscq_pop()).
+ * Give it a chance to finish rather than spinning,
+ * and don't sit on the ctx lock while doing so.
+ */
+ ctx_flush_and_put(ctx, ts);
+ ctx = NULL;
+ cond_resched();
+ continue;
+ }
+ req = container_of(node, struct io_kiocb, io_task_work.node);
+ if (req->ctx != ctx) {
+ ctx_flush_and_put(ctx, ts);
+ ctx = req->ctx;
+ mutex_lock(&ctx->uring_lock);
+ percpu_ref_get(&ctx->refs);
+ ts.cancel = io_should_terminate_tw(ctx);
+ }
+ INDIRECT_CALL_2(req->io_task_work.func,
+ io_poll_task_func, io_req_rw_complete,
+ (struct io_tw_req){req}, ts);
+ (*count)++;
+ if (unlikely(need_resched())) {
+ ctx_flush_and_put(ctx, ts);
+ ctx = NULL;
+ cond_resched();
+ }
}
+ ctx_flush_and_put(ctx, ts);
/* relaxed read is enough as only the task itself sets ->in_cancel */
if (unlikely(atomic_read(&tctx->in_cancel)))
io_uring_drop_tctx_refs(current);
trace_io_uring_task_work_run(tctx, *count);
- return node;
}
void tctx_task_work(struct callback_head *cb)
{
struct io_uring_task *tctx;
- struct llist_node *ret;
unsigned int count = 0;
tctx = container_of(cb, struct io_uring_task, task_work);
- ret = tctx_task_work_run(tctx, UINT_MAX, &count);
- /* can't happen */
- WARN_ON_ONCE(ret);
+ clear_bit(0, &tctx->tw_pending);
+ smp_mb__after_atomic();
+ tctx_task_work_run(tctx, UINT_MAX, &count);
}
/*
struct io_ring_ctx *ctx = req->ctx;
/* task_work already pending, we're done */
- if (!llist_add(&req->io_task_work.node, &tctx->task_list))
+ if (!mpscq_push(&tctx->task_list, &req->io_task_work.node))
return;
/*
return;
}
+ /* task_work must only be added once */
+ if (test_and_set_bit(0, &tctx->tw_pending))
+ return;
+
if (likely(!task_work_add(tctx->task, &tctx->task_work, ctx->notify_method)))
return;
- io_fallback_tw(tctx, false);
+ io_fallback_tw(tctx);
}
void io_req_task_work_add_remote(struct io_kiocb *req, unsigned flags)