Files
linux/io_uring/wait.c
Jens Axboe 0105b0562a io_uring: split out CQ waiting code into wait.c
Move the completion queue waiting and scheduling code out of io_uring.c
into a dedicated wait.c file. This further removes code out of the
main io_uring C and header file, and into a topical new file.

Signed-off-by: Jens Axboe <axboe@kernel.dk>
2026-01-22 09:21:16 -07:00

309 lines
8.5 KiB
C

// SPDX-License-Identifier: GPL-2.0
/*
* Waiting for completion events
*/
#include <linux/kernel.h>
#include <linux/sched/signal.h>
#include <linux/io_uring.h>
#include <trace/events/io_uring.h>
#include <uapi/linux/io_uring.h>
#include "io_uring.h"
#include "napi.h"
#include "wait.h"
static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
int wake_flags, void *key)
{
struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
/*
* Cannot safely flush overflowed CQEs from here, ensure we wake up
* the task, and the next invocation will do it.
*/
if (io_should_wake(iowq) || io_has_work(iowq->ctx))
return autoremove_wake_function(curr, mode, wake_flags, key);
return -1;
}
int io_run_task_work_sig(struct io_ring_ctx *ctx)
{
if (io_local_work_pending(ctx)) {
__set_current_state(TASK_RUNNING);
if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
return 0;
}
if (io_run_task_work() > 0)
return 0;
if (task_sigpending(current))
return -EINTR;
return 0;
}
static bool current_pending_io(void)
{
struct io_uring_task *tctx = current->io_uring;
if (!tctx)
return false;
return percpu_counter_read_positive(&tctx->inflight);
}
static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
{
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
WRITE_ONCE(iowq->hit_timeout, 1);
iowq->min_timeout = 0;
wake_up_process(iowq->wq.private);
return HRTIMER_NORESTART;
}
/*
* Doing min_timeout portion. If we saw any timeouts, events, or have work,
* wake up. If not, and we have a normal timeout, switch to that and keep
* sleeping.
*/
static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
{
struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
struct io_ring_ctx *ctx = iowq->ctx;
/* no general timeout, or shorter (or equal), we are done */
if (iowq->timeout == KTIME_MAX ||
ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
goto out_wake;
/* work we may need to run, wake function will see if we need to wake */
if (io_has_work(ctx))
goto out_wake;
/* got events since we started waiting, min timeout is done */
if (iowq->cq_min_tail != READ_ONCE(ctx->rings->cq.tail))
goto out_wake;
/* if we have any events and min timeout expired, we're done */
if (io_cqring_events(ctx))
goto out_wake;
/*
* If using deferred task_work running and application is waiting on
* more than one request, ensure we reset it now where we are switching
* to normal sleeps. Any request completion post min_wait should wake
* the task and return.
*/
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, 1);
smp_mb();
if (!llist_empty(&ctx->work_llist))
goto out_wake;
}
/* any generated CQE posted past this time should wake us up */
iowq->cq_tail = iowq->cq_min_tail;
hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
hrtimer_set_expires(timer, iowq->timeout);
return HRTIMER_RESTART;
out_wake:
return io_cqring_timer_wakeup(timer);
}
static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
clockid_t clock_id, ktime_t start_time)
{
ktime_t timeout;
if (iowq->min_timeout) {
timeout = ktime_add_ns(iowq->min_timeout, start_time);
hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
HRTIMER_MODE_ABS);
} else {
timeout = iowq->timeout;
hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
HRTIMER_MODE_ABS);
}
hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
if (!READ_ONCE(iowq->hit_timeout))
schedule();
hrtimer_cancel(&iowq->t);
destroy_hrtimer_on_stack(&iowq->t);
__set_current_state(TASK_RUNNING);
return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
}
static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
struct io_wait_queue *iowq,
struct ext_arg *ext_arg,
ktime_t start_time)
{
int ret = 0;
/*
* Mark us as being in io_wait if we have pending requests, so cpufreq
* can take into account that the task is waiting for IO - turns out
* to be important for low QD IO.
*/
if (ext_arg->iowait && current_pending_io())
current->in_iowait = 1;
if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
else
schedule();
current->in_iowait = 0;
return ret;
}
/* If this returns > 0, the caller should retry */
static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
struct io_wait_queue *iowq,
struct ext_arg *ext_arg,
ktime_t start_time)
{
if (unlikely(READ_ONCE(ctx->check_cq)))
return 1;
if (unlikely(io_local_work_pending(ctx)))
return 1;
if (unlikely(task_work_pending(current)))
return 1;
if (unlikely(task_sigpending(current)))
return -EINTR;
if (unlikely(io_should_wake(iowq)))
return 0;
return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
}
/*
* Wait until events become available, if we don't already have some. The
* application must reap them itself, as they reside on the shared cq ring.
*/
int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
struct ext_arg *ext_arg)
{
struct io_wait_queue iowq;
struct io_rings *rings = ctx->rings;
ktime_t start_time;
int ret;
min_events = min_t(int, min_events, ctx->cq_entries);
if (!io_allowed_run_tw(ctx))
return -EEXIST;
if (io_local_work_pending(ctx))
io_run_local_work(ctx, min_events,
max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
io_run_task_work();
if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
io_cqring_do_overflow_flush(ctx);
if (__io_cqring_events_user(ctx) >= min_events)
return 0;
init_waitqueue_func_entry(&iowq.wq, io_wake_function);
iowq.wq.private = current;
INIT_LIST_HEAD(&iowq.wq.entry);
iowq.ctx = ctx;
iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
iowq.cq_min_tail = READ_ONCE(ctx->rings->cq.tail);
iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
iowq.hit_timeout = 0;
iowq.min_timeout = ext_arg->min_time;
iowq.timeout = KTIME_MAX;
start_time = io_get_time(ctx);
if (ext_arg->ts_set) {
iowq.timeout = timespec64_to_ktime(ext_arg->ts);
if (!(flags & IORING_ENTER_ABS_TIMER))
iowq.timeout = ktime_add(iowq.timeout, start_time);
}
if (ext_arg->sig) {
#ifdef CONFIG_COMPAT
if (in_compat_syscall())
ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
ext_arg->argsz);
else
#endif
ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
if (ret)
return ret;
}
io_napi_busy_loop(ctx, &iowq);
trace_io_uring_cqring_wait(ctx, min_events);
do {
unsigned long check_cq;
int nr_wait;
/* if min timeout has been hit, don't reset wait count */
if (!iowq.hit_timeout)
nr_wait = (int) iowq.cq_tail -
READ_ONCE(ctx->rings->cq.tail);
else
nr_wait = 1;
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
atomic_set(&ctx->cq_wait_nr, nr_wait);
set_current_state(TASK_INTERRUPTIBLE);
} else {
prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
TASK_INTERRUPTIBLE);
}
ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
__set_current_state(TASK_RUNNING);
atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
/*
* Run task_work after scheduling and before io_should_wake().
* If we got woken because of task_work being processed, run it
* now rather than let the caller do another wait loop.
*/
if (io_local_work_pending(ctx))
io_run_local_work(ctx, nr_wait, nr_wait);
io_run_task_work();
/*
* Non-local task_work will be run on exit to userspace, but
* if we're using DEFER_TASKRUN, then we could have waited
* with a timeout for a number of requests. If the timeout
* hits, we could have some requests ready to process. Ensure
* this break is _after_ we have run task_work, to avoid
* deferring running potentially pending requests until the
* next time we wait for events.
*/
if (ret < 0)
break;
check_cq = READ_ONCE(ctx->check_cq);
if (unlikely(check_cq)) {
/* let the caller flush overflows, retry */
if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
io_cqring_do_overflow_flush(ctx);
if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
ret = -EBADR;
break;
}
}
if (io_should_wake(&iowq)) {
ret = 0;
break;
}
cond_resched();
} while (1);
if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
finish_wait(&ctx->cq_wait, &iowq.wq);
restore_saved_sigmask_unless(ret == -EINTR);
return READ_ONCE(rings->cq.head) == READ_ONCE(rings->cq.tail) ? ret : 0;
}