// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "napi.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE	8
#define IORING_TW_CAP_ENTRIES_VALUE	8

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}
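
/*
 * Ask the SQPOLL thread to park: bump park_pending, set the PARK bit and
 * take sqd->lock, waking the thread so it notices the request. The caller
 * holds sqd->lock until io_sq_thread_unpark().
 */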
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}
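
/*
 * Signal the SQPOLL thread to exit and wait for it to complete. Must not be
 * called from the SQPOLL thread itself, and only once per io_sq_data.
 */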
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}
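
/* Drop a reference; the last put stops the thread and frees the io_sq_data. */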
void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}
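
/* Recompute the idle timeout as the maximum over all attached rings. */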
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}
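
/*
 * Detach a ring from its SQPOLL thread: unlink it from the ctx_list under
 * park, refresh the idle timeout, and drop the ring's reference.
 */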
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}
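
/*
 * Look up the io_sq_data behind p->wq_fd so a new ring can share an existing
 * SQPOLL thread. Only rings owned by the same thread group may attach;
 * otherwise -EPERM is returned.
 */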
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	struct fd f;

	f = fdget(p->wq_fd);
	if (!f.file)
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(f.file)) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}

	ctx_attach = f.file->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}
	if (sqd->task_tgid != current->tgid) {
		fdput(f);
		return ERR_PTR(-EPERM);
	}

	refcount_inc(&sqd->refs);
	fdput(f);
	return sqd;
}
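
/*
 * Return the io_sq_data to use for this ring: attach to an existing one if
 * IORING_SETUP_ATTACH_WQ was requested and allowed, otherwise allocate and
 * initialise a fresh instance.
 */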
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}
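
/*
 * Do one round of work for a single ring: reap IOPOLL completions, submit
 * pending SQEs under the ring's credentials, optionally busy-poll NAPI, and
 * wake any task waiting for SQ ring space. The return value feeds the spin
 * decision in io_sq_thread().
 */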
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (io_napi(ctx))
			ret += io_napi_sqpoll_busy_poll(ctx);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}
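
/*
 * Handle park/stop requests and pending signals while the thread owns
 * sqd->lock. Returns true if the thread should terminate.
 */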
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		cond_resched();
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			return count;
		max_entries -= count;
	}

	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
	return count;
}

static bool io_sq_tw_pending(struct llist_node *retry_list)
{
	struct io_uring_task *tctx = current->io_uring;

	return retry_list || !llist_empty(&tctx->task_list);
}
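
/* Accumulate the system time consumed since @start into sqd->work_time, in usecs. */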
static void io_sq_update_worktime(struct io_sq_data *sqd, struct rusage *start)
{
	struct rusage end;

	getrusage(current, RUSAGE_SELF, &end);
	end.ru_stime.tv_sec -= start->ru_stime.tv_sec;
	end.ru_stime.tv_usec -= start->ru_stime.tv_usec;

	sqd->work_time += end.ru_stime.tv_usec + end.ru_stime.tv_sec * 1000000;
}
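
/*
 * Main loop of the SQPOLL kernel thread: submit for every attached ring and
 * run task_work while there is work or until the idle timeout expires, then
 * set IORING_SQ_NEED_WAKEUP and sleep until userspace wakes it again.
 */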
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	struct rusage start;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	/* offload context creation failed, just exit */
	if (!current->io_uring)
		goto err_out;

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		getrusage(current, RUSAGE_SELF, &start);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin) {
				io_sq_update_worktime(sqd, &start);
				timeout = jiffies + sqd->sq_thread_idle;
			}
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
					  &ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
					      &ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);
err_out:
	complete(&sqd->exited);
	do_exit(0);
}
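
/* Wait until the SQ ring has space again, or until a signal is pending. */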
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}
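
/*
 * Set up SQPOLL offload for a new ring: validate the attach fd, find or
 * allocate the io_sq_data, apply CPU affinity and idle settings, and create
 * and wake the SQPOLL kernel thread.
 */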
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		struct fd f;

		f = fdget(p->wq_fd);
		if (!f.file)
			return -ENXIO;
		if (!io_is_uring_fops(f.file)) {
			fdput(f);
			return -EINVAL;
		}
		fdput(f);
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}
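
/*
 * Apply @mask to the io-wq of the SQPOLL thread's task context, parking the
 * thread around the update. Fails with -EINVAL if the thread is already gone.
 */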
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}