// SPDX-License-Identifier: GPL-2.0
/*
 * Contains the core associated with submission side polling of the SQ
 * ring, offloading submissions from the application to a kernel thread.
 */
#include <linux/kernel.h>
#include <linux/errno.h>
#include <linux/file.h>
#include <linux/mm.h>
#include <linux/slab.h>
#include <linux/audit.h>
#include <linux/security.h>
#include <linux/io_uring.h>

#include <uapi/linux/io_uring.h>

#include "io_uring.h"
#include "sqpoll.h"

#define IORING_SQPOLL_CAP_ENTRIES_VALUE	8

enum {
	IO_SQ_THREAD_SHOULD_STOP = 0,
	IO_SQ_THREAD_SHOULD_PARK,
};

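/*
 * Note on the parking protocol used below (a summary of the code, not
 * normative documentation): a task that needs to modify sqd->ctx_list
 * "parks" the SQPOLL thread by bumping park_pending, setting
 * IO_SQ_THREAD_SHOULD_PARK and grabbing sqd->lock; the thread then sits
 * tight until the parker is done. Since several tasks may park
 * concurrently, unpark decrements park_pending and re-sets the bit if
 * other parkers remain instead of clearing it unconditionally.
 */
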
void io_sq_thread_unpark(struct io_sq_data *sqd)
	__releases(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	/*
	 * Do the dance but not conditional clear_bit() because it'd race with
	 * other threads incrementing park_pending and setting the bit.
	 */
	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	if (atomic_dec_return(&sqd->park_pending))
		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_unlock(&sqd->lock);
}

void io_sq_thread_park(struct io_sq_data *sqd)
	__acquires(&sqd->lock)
{
	WARN_ON_ONCE(sqd->thread == current);

	atomic_inc(&sqd->park_pending);
	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
}

void io_sq_thread_stop(struct io_sq_data *sqd)
{
	WARN_ON_ONCE(sqd->thread == current);
	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));

	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
	mutex_lock(&sqd->lock);
	if (sqd->thread)
		wake_up_process(sqd->thread);
	mutex_unlock(&sqd->lock);
	wait_for_completion(&sqd->exited);
}

void io_put_sq_data(struct io_sq_data *sqd)
{
	if (refcount_dec_and_test(&sqd->refs)) {
		WARN_ON_ONCE(atomic_read(&sqd->park_pending));

		io_sq_thread_stop(sqd);
		kfree(sqd);
	}
}

static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
{
	struct io_ring_ctx *ctx;
	unsigned sq_thread_idle = 0;

	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
	sqd->sq_thread_idle = sq_thread_idle;
}

void io_sq_thread_finish(struct io_ring_ctx *ctx)
{
	struct io_sq_data *sqd = ctx->sq_data;

	if (sqd) {
		io_sq_thread_park(sqd);
		list_del_init(&ctx->sqd_list);
		io_sqd_update_thread_idle(sqd);
		io_sq_thread_unpark(sqd);

		io_put_sq_data(sqd);
		ctx->sq_data = NULL;
	}
}

static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
{
	struct io_ring_ctx *ctx_attach;
	struct io_sq_data *sqd;
	struct fd f;

	f = fdget(p->wq_fd);
	if (!f.file)
		return ERR_PTR(-ENXIO);
	if (!io_is_uring_fops(f.file)) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}

	ctx_attach = f.file->private_data;
	sqd = ctx_attach->sq_data;
	if (!sqd) {
		fdput(f);
		return ERR_PTR(-EINVAL);
	}
	if (sqd->task_tgid != current->tgid) {
		fdput(f);
		return ERR_PTR(-EPERM);
	}

	refcount_inc(&sqd->refs);
	fdput(f);
	return sqd;
}

static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
					 bool *attached)
{
	struct io_sq_data *sqd;

	*attached = false;
	if (p->flags & IORING_SETUP_ATTACH_WQ) {
		sqd = io_attach_sq_data(p);
		if (!IS_ERR(sqd)) {
			*attached = true;
			return sqd;
		}
		/* fall through for EPERM case, setup new sqd/task */
		if (PTR_ERR(sqd) != -EPERM)
			return sqd;
	}

	sqd = kzalloc(sizeof(*sqd), GFP_KERNEL);
	if (!sqd)
		return ERR_PTR(-ENOMEM);

	atomic_set(&sqd->park_pending, 0);
	refcount_set(&sqd->refs, 1);
	INIT_LIST_HEAD(&sqd->ctx_list);
	mutex_init(&sqd->lock);
	init_waitqueue_head(&sqd->wait);
	init_completion(&sqd->exited);
	return sqd;
}

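/*
 * Descriptive note: with IORING_SETUP_ATTACH_WQ a new ring tries to share
 * the existing ring's io_sq_data above, so one SQPOLL thread can service
 * several rings. Sharing across processes is rejected with -EPERM, in which
 * case a fresh sqd (and later a fresh thread) is created instead.
 */
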
static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
{
	return READ_ONCE(sqd->state);
}

static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
{
	unsigned int to_submit;
	int ret = 0;

	to_submit = io_sqring_entries(ctx);
	/* if we're handling multiple rings, cap submit size for fairness */
	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

	if (!wq_list_empty(&ctx->iopoll_list) || to_submit) {
		const struct cred *creds = NULL;

		if (ctx->sq_creds != current_cred())
			creds = override_creds(ctx->sq_creds);

		mutex_lock(&ctx->uring_lock);
		if (!wq_list_empty(&ctx->iopoll_list))
			io_do_iopoll(ctx, true);

		/*
		 * Don't submit if refs are dying, good for io_uring_register(),
		 * but also it is relied upon by io_ring_exit_work()
		 */
		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
		    !(ctx->flags & IORING_SETUP_R_DISABLED))
			ret = io_submit_sqes(ctx, to_submit);
		mutex_unlock(&ctx->uring_lock);

		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
			wake_up(&ctx->sqo_sq_wait);
		if (creds)
			revert_creds(creds);
	}

	return ret;
}

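/*
 * Descriptive note: __io_sq_thread() returns the number of SQEs submitted
 * (or 0). io_sq_thread() below treats a positive return, or a non-empty
 * iopoll list, as a sign of activity and keeps spinning rather than arming
 * the idle timeout.
 */
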
static bool io_sqd_handle_event(struct io_sq_data *sqd)
{
	bool did_sig = false;
	struct ksignal ksig;

	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
	    signal_pending(current)) {
		mutex_unlock(&sqd->lock);
		if (signal_pending(current))
			did_sig = get_signal(&ksig);
		cond_resched();
		mutex_lock(&sqd->lock);
		sqd->sq_cpu = raw_smp_processor_id();
	}
	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
}

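/*
 * Main loop of the SQPOLL kernel thread (summary derived from the code
 * below): submit pending SQEs for every attached ring, spin while there is
 * work or until sq_thread_idle expires, then advertise IORING_SQ_NEED_WAKEUP
 * in each ring's sq_flags and sleep until an application or a parking task
 * wakes it up again.
 */
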
static int io_sq_thread(void *data)
{
	struct io_sq_data *sqd = data;
	struct io_ring_ctx *ctx;
	unsigned long timeout = 0;
	char buf[TASK_COMM_LEN];
	DEFINE_WAIT(wait);

	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
	set_task_comm(current, buf);

	/* reset to our pid after we've set task_comm, for fdinfo */
	sqd->task_pid = current->pid;

	if (sqd->sq_cpu != -1) {
		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
	} else {
		set_cpus_allowed_ptr(current, cpu_online_mask);
		sqd->sq_cpu = raw_smp_processor_id();
	}

	mutex_lock(&sqd->lock);
	while (1) {
		bool cap_entries, sqt_spin = false;

		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
			if (io_sqd_handle_event(sqd))
				break;
			timeout = jiffies + sqd->sq_thread_idle;
		}

		cap_entries = !list_is_singular(&sqd->ctx_list);
		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
			int ret = __io_sq_thread(ctx, cap_entries);

			if (!sqt_spin && (ret > 0 || !wq_list_empty(&ctx->iopoll_list)))
				sqt_spin = true;
		}
		if (io_run_task_work())
			sqt_spin = true;

		if (sqt_spin || !time_after(jiffies, timeout)) {
			if (sqt_spin)
				timeout = jiffies + sqd->sq_thread_idle;
			if (unlikely(need_resched())) {
				mutex_unlock(&sqd->lock);
				cond_resched();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			continue;
		}

		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
		if (!io_sqd_events_pending(sqd) && !task_work_pending(current)) {
			bool needs_sched = true;

			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
				atomic_or(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
				    !wq_list_empty(&ctx->iopoll_list)) {
					needs_sched = false;
					break;
				}

				/*
				 * Ensure the store of the wakeup flag is not
				 * reordered with the load of the SQ tail
				 */
				smp_mb__after_atomic();

				if (io_sqring_entries(ctx)) {
					needs_sched = false;
					break;
				}
			}

			if (needs_sched) {
				mutex_unlock(&sqd->lock);
				schedule();
				mutex_lock(&sqd->lock);
				sqd->sq_cpu = raw_smp_processor_id();
			}
			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
				atomic_andnot(IORING_SQ_NEED_WAKEUP,
						&ctx->rings->sq_flags);
		}

		finish_wait(&sqd->wait, &wait);
		timeout = jiffies + sqd->sq_thread_idle;
	}

	io_uring_cancel_generic(true, sqd);
	sqd->thread = NULL;
	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
	io_run_task_work();
	mutex_unlock(&sqd->lock);

	complete(&sqd->exited);
	do_exit(0);
}

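/*
 * Descriptive note on the exit path above: once told to stop, the thread
 * cancels its pending work, clears sqd->thread, re-advertises
 * IORING_SQ_NEED_WAKEUP so applications stop relying on kernel-side
 * polling, and completes sqd->exited, which io_sq_thread_stop() waits on.
 */
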
void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
{
	DEFINE_WAIT(wait);

	do {
		if (!io_sqring_full(ctx))
			break;
		prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);

		if (!io_sqring_full(ctx))
			break;
		schedule();
	} while (!signal_pending(current));

	finish_wait(&ctx->sqo_sq_wait, &wait);
}

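/*
 * Descriptive note: io_sqpoll_wait_sq() is the submitter-side counterpart
 * of the wake_up(&ctx->sqo_sq_wait) in __io_sq_thread(); a task that finds
 * the SQ ring full blocks here until the SQPOLL thread has consumed entries
 * or a signal arrives.
 */
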
__cold int io_sq_offload_create(struct io_ring_ctx *ctx,
				struct io_uring_params *p)
{
	int ret;

	/* Retain compatibility with failing for an invalid attach attempt */
	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
				IORING_SETUP_ATTACH_WQ) {
		struct fd f;

		f = fdget(p->wq_fd);
		if (!f.file)
			return -ENXIO;
		if (!io_is_uring_fops(f.file)) {
			fdput(f);
			return -EINVAL;
		}
		fdput(f);
	}
	if (ctx->flags & IORING_SETUP_SQPOLL) {
		struct task_struct *tsk;
		struct io_sq_data *sqd;
		bool attached;

		ret = security_uring_sqpoll();
		if (ret)
			return ret;

		sqd = io_get_sq_data(p, &attached);
		if (IS_ERR(sqd)) {
			ret = PTR_ERR(sqd);
			goto err;
		}

		ctx->sq_creds = get_current_cred();
		ctx->sq_data = sqd;
		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
		if (!ctx->sq_thread_idle)
			ctx->sq_thread_idle = HZ;

		io_sq_thread_park(sqd);
		list_add(&ctx->sqd_list, &sqd->ctx_list);
		io_sqd_update_thread_idle(sqd);
		/* don't attach to a dying SQPOLL thread, would be racy */
		ret = (attached && !sqd->thread) ? -ENXIO : 0;
		io_sq_thread_unpark(sqd);

		if (ret < 0)
			goto err;
		if (attached)
			return 0;

		if (p->flags & IORING_SETUP_SQ_AFF) {
			int cpu = p->sq_thread_cpu;

			ret = -EINVAL;
			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
				goto err_sqpoll;
			sqd->sq_cpu = cpu;
		} else {
			sqd->sq_cpu = -1;
		}

		sqd->task_pid = current->pid;
		sqd->task_tgid = current->tgid;
		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
		if (IS_ERR(tsk)) {
			ret = PTR_ERR(tsk);
			goto err_sqpoll;
		}

		sqd->thread = tsk;
		ret = io_uring_alloc_task_context(tsk, ctx);
		wake_up_new_task(tsk);
		if (ret)
			goto err;
	} else if (p->flags & IORING_SETUP_SQ_AFF) {
		/* Can't have SQ_AFF without SQPOLL */
		ret = -EINVAL;
		goto err;
	}

	return 0;
err_sqpoll:
	complete(&ctx->sq_data->exited);
err:
	io_sq_thread_finish(ctx);
	return ret;
}

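/*
 * Illustrative userspace sketch (not part of this file): how an application
 * might request the SQPOLL thread that io_sq_offload_create() sets up. The
 * flags and fields are from the io_uring UAPI; values are examples only.
 *
 *	struct io_uring_params p = {
 *		.flags		= IORING_SETUP_SQPOLL | IORING_SETUP_SQ_AFF,
 *		.sq_thread_idle	= 2000,	// idle time in msecs, 0 means 1 second
 *		.sq_thread_cpu	= 3,	// only honoured with IORING_SETUP_SQ_AFF
 *	};
 *	int ring_fd = syscall(__NR_io_uring_setup, 64, &p);
 *
 * Once the thread has gone idle it sets IORING_SQ_NEED_WAKEUP in sq_flags,
 * and the application must call io_uring_enter() with
 * IORING_ENTER_SQ_WAKEUP to kick it again.
 */
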
__cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
				     cpumask_var_t mask)
{
	struct io_sq_data *sqd = ctx->sq_data;
	int ret = -EINVAL;

	if (sqd) {
		io_sq_thread_park(sqd);
		/* Don't set affinity for a dying thread */
		if (sqd->thread)
			ret = io_wq_cpu_affinity(sqd->thread->io_uring, mask);
		io_sq_thread_unpark(sqd);
	}

	return ret;
}