// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "sqpoll.h"
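/*
 * When a single SQPOLL thread services several rings, cap the work done
 * per ring per loop iteration so one busy ring cannot starve the others:
 * at most 8 SQE submissions per ring, and at most 8 task_work entries per
 * pass through the loop.
 */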
#define IORING_SQPOLL_CAP_ENTRIES_VALUE	8
#define IORING_TW_CAP_ENTRIES_VALUE	8
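/*
 * State bits for io_sq_data: SHOULD_STOP asks the thread to exit,
 * SHOULD_PARK asks it to drop sqd->lock and block on reacquiring it until
 * the parking task has finished updating the shared state.
 */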
enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}
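/*
 * Park the SQPOLL thread: bump park_pending, set the PARK bit and grab
 * sqd->lock. The thread sees the bit, releases the lock and blocks trying
 * to retake it, so the caller may safely modify shared sqd state (e.g. the
 * ctx list) until io_sq_thread_unpark() is called.
 */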
void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}
void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}
void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}
static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}
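/*
 * Detach a ring from its SQPOLL thread: unlink the ctx from the shared
 * ctx_list under park, recompute the idle timeout from the remaining
 * rings, then drop this ctx's reference to the io_sq_data.
 */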
void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}
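/*
 * IORING_SETUP_ATTACH_WQ: share the SQPOLL thread of an existing ring.
 * Look up the ring behind p->wq_fd and take a reference on its io_sq_data,
 * provided one exists and belongs to the same thread group.
 */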
static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	struct fd f;

	f = fdget(p->wq_fd);
	if (!f.file)
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(f.file)) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}

	ctx_attach = f.file->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}
	if (sqd->task_tgid != current->tgid) {
		fdput(f);
		return ERR_PTR(-EPERM);
	}

	refcount_inc(&sqd->refs);
	fdput(f);
	return sqd;
}
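/*
 * Find the io_sq_data this ring should use: attach to an existing one when
 * IORING_SETUP_ATTACH_WQ is set (falling back to a fresh allocation only
 * for the -EPERM case), otherwise allocate and initialise a new instance.
 */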
static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}
static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}
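/*
 * One submission pass for a single ring: reap pending iopoll completions,
 * then submit up to to_submit SQEs on behalf of the application, running
 * under the credentials the ring was set up with.
 */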
static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}
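/*
 * Handle park/stop requests and signals. Dropping and reacquiring
 * sqd->lock is what actually "parks" the thread: the task that called
 * io_sq_thread_park() already holds the lock, so the reacquire blocks
 * until it unparks us.
 */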
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		cond_resched();
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}
/*
 * Run task_work, processing the retry_list first. The retry_list holds
 * entries that we passed on in the previous run, if we had more task_work
 * than we were asked to process. Newly queued task_work isn't run until the
 * retry list has been fully processed.
 */
static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
{
	struct io_uring_task *tctx = current->io_uring;
	unsigned int count = 0;

	if (*retry_list) {
		*retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
		if (count >= max_entries)
			return count;
		max_entries -= count;
	}

	*retry_list = tctx_task_work_run(tctx, max_entries, &count);
	return count;
}
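/*
 * Main loop of the SQPOLL kernel thread. Each iteration submits pending
 * SQEs for every ring attached to this io_sq_data and runs a bounded
 * amount of task_work. While any ring shows progress the loop keeps
 * spinning; once sq_thread_idle expires with nothing to do, the thread
 * sets IORING_SQ_NEED_WAKEUP in each ring's sq_flags (telling userspace it
 * must call io_uring_enter() with IORING_ENTER_SQ_WAKEUP) and schedules
 * out until woken.
 */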
static int io_sq_thread(void *data)
{
	struct llist_node *retry_list = NULL;
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
			sqt_spin = true;

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin)
				timeout = jiffies + sqd->sq_thread_idle;
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !task_work_pending(current)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	if (retry_list)
		io_sq_tw(&retry_list, UINT_MAX);

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);

	complete(&sqd->exited);
	do_exit(0);
}
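/*
 * Called on the io_uring_enter(IORING_ENTER_SQ_WAIT) path when the SQ ring
 * is full: block the submitting task until the SQPOLL thread has consumed
 * enough entries for new SQEs to fit, or until a signal is pending.
 */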
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}
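/*
 * Set up SQPOLL offload at io_uring_setup() time. From userspace the
 * feature is requested roughly like this (illustrative sketch only):
 *
 *	struct io_uring_params p = { 0 };
 *
 *	p.flags = IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF;
 *	p.sq_thread_idle = 2000;	// idle timeout in milliseconds
 *	p.sq_thread_cpu = 3;		// pin the SQPOLL thread to CPU 3
 *	ring_fd = io_uring_setup(entries, &p);
 *
 * This function validates those parameters, finds or allocates the shared
 * io_sq_data, and creates the "iou-sqp-<pid>" kernel thread if needed.
 */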
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		struct fd f;

		f = fdget(p->wq_fd);
		if (!f.file)
			return -ENXIO;
		if (!io_is_uring_fops(f.file)) {
			fdput(f);
			return -EINVAL;
		}
		fdput(f);
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}
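/*
 * Apply a CPU affinity mask to the io-wq workers tied to the SQPOLL
 * thread, parking it so sqd->thread cannot go away underneath us.
 */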
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}