io_uring: add IORING_SETUP_DEFER_TASKRUN
Allow deferring async tasks until the user calls io_uring_enter(2) with the IORING_ENTER_GETEVENTS flag. Enable this mode with a flag at io_uring_setup time. This functionality requires that the later io_uring_enter will be called from the same submission task, and therefore restrict this flag to work only when IORING_SETUP_SINGLE_ISSUER is also set. Being able to hand pick when tasks are run prevents the problem where there is current work to be done, however task work runs anyway. For example, a common workload would obtain a batch of CQEs, and process each one. Interrupting this to additional taskwork would add latency but not gain anything. If instead task work is deferred to just before more CQEs are obtained then no additional latency is added. The way this is implemented is by trying to keep task work local to a io_ring_ctx, rather than to the submission task. This is required, as the application will want to wake up only a single io_ring_ctx at a time to process work, and so the lists of work have to be kept separate. This has some other benefits like not having to check the task continually in handle_tw_list (and potentially unlocking/locking those), and reducing locks in the submit & process completions path. There are networking cases where using this option can reduce request latency by 50%. For example a contrived example using [1] where the client sends 2k data and receives the same data back while doing some system calls (to trigger task work) shows this reduction. The reason ends up being that if sending responses is delayed by processing task work, then the client side sits idle. Whereas reordering the sends first means that the client runs it's workload in parallel with the local task work. [1]: Using https://github.com/DylanZA/netbench/tree/defer_run Client: ./netbench --client_only 1 --control_port 10000 --host <host> --tx "epoll --threads 16 --per_thread 1 --size 2048 --resp 2048 --workload 1000" Server: ./netbench --server_only 1 --control_port 10000 --rx "io_uring --defer_taskrun 0 --workload 100" --rx "io_uring --defer_taskrun 1 --workload 100" Signed-off-by: Dylan Yudaken <dylany@fb.com> Link: https://lore.kernel.org/r/20220830125013.570060-5-dylany@fb.com Signed-off-by: Jens Axboe <axboe@kernel.dk>
This commit is contained in:
parent
2327337b88
commit
c0e0d6ba25
@ -301,6 +301,8 @@ struct io_ring_ctx {
|
||||
struct io_hash_table cancel_table;
|
||||
bool poll_multi_queue;
|
||||
|
||||
struct llist_head work_llist;
|
||||
|
||||
struct list_head io_buffers_comp;
|
||||
} ____cacheline_aligned_in_smp;
|
||||
|
||||
|
@ -157,6 +157,13 @@ enum {
|
||||
*/
|
||||
#define IORING_SETUP_SINGLE_ISSUER (1U << 12)
|
||||
|
||||
/*
|
||||
* Defer running task work to get events.
|
||||
* Rather than running bits of task work whenever the task transitions
|
||||
* try to do it just before it is needed.
|
||||
*/
|
||||
#define IORING_SETUP_DEFER_TASKRUN (1U << 13)
|
||||
|
||||
enum io_uring_op {
|
||||
IORING_OP_NOP,
|
||||
IORING_OP_READV,
|
||||
|
@ -292,7 +292,7 @@ int io_sync_cancel(struct io_ring_ctx *ctx, void __user *arg)
|
||||
break;
|
||||
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
ret = io_run_task_work_sig();
|
||||
ret = io_run_task_work_sig(ctx);
|
||||
if (ret < 0) {
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
break;
|
||||
|
@ -142,7 +142,7 @@ static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
|
||||
static void io_dismantle_req(struct io_kiocb *req);
|
||||
static void io_clean_op(struct io_kiocb *req);
|
||||
static void io_queue_sqe(struct io_kiocb *req);
|
||||
|
||||
static void io_move_task_work_from_local(struct io_ring_ctx *ctx);
|
||||
static void __io_submit_flush_completions(struct io_ring_ctx *ctx);
|
||||
|
||||
static struct kmem_cache *req_cachep;
|
||||
@ -316,6 +316,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
|
||||
INIT_LIST_HEAD(&ctx->rsrc_ref_list);
|
||||
INIT_DELAYED_WORK(&ctx->rsrc_put_work, io_rsrc_put_work);
|
||||
init_llist_head(&ctx->rsrc_put_llist);
|
||||
init_llist_head(&ctx->work_llist);
|
||||
INIT_LIST_HEAD(&ctx->tctx_list);
|
||||
ctx->submit_state.free_list.next = NULL;
|
||||
INIT_WQ_LIST(&ctx->locked_free_list);
|
||||
@ -1047,12 +1048,36 @@ void tctx_task_work(struct callback_head *cb)
|
||||
trace_io_uring_task_work_run(tctx, count, loops);
|
||||
}
|
||||
|
||||
void io_req_task_work_add(struct io_kiocb *req)
|
||||
static void io_req_local_work_add(struct io_kiocb *req)
|
||||
{
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
|
||||
if (!llist_add(&req->io_task_work.node, &ctx->work_llist))
|
||||
return;
|
||||
|
||||
if (unlikely(atomic_read(&req->task->io_uring->in_idle))) {
|
||||
io_move_task_work_from_local(ctx);
|
||||
return;
|
||||
}
|
||||
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
|
||||
io_cqring_wake(ctx);
|
||||
|
||||
}
|
||||
|
||||
static inline void __io_req_task_work_add(struct io_kiocb *req, bool allow_local)
|
||||
{
|
||||
struct io_uring_task *tctx = req->task->io_uring;
|
||||
struct io_ring_ctx *ctx = req->ctx;
|
||||
struct llist_node *node;
|
||||
|
||||
if (allow_local && ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
|
||||
io_req_local_work_add(req);
|
||||
return;
|
||||
}
|
||||
|
||||
/* task_work already pending, we're done */
|
||||
if (!llist_add(&req->io_task_work.node, &tctx->task_list))
|
||||
return;
|
||||
@ -1074,6 +1099,73 @@ void io_req_task_work_add(struct io_kiocb *req)
|
||||
}
|
||||
}
|
||||
|
||||
void io_req_task_work_add(struct io_kiocb *req)
|
||||
{
|
||||
__io_req_task_work_add(req, true);
|
||||
}
|
||||
|
||||
static void __cold io_move_task_work_from_local(struct io_ring_ctx *ctx)
|
||||
{
|
||||
struct llist_node *node;
|
||||
|
||||
node = llist_del_all(&ctx->work_llist);
|
||||
while (node) {
|
||||
struct io_kiocb *req = container_of(node, struct io_kiocb,
|
||||
io_task_work.node);
|
||||
|
||||
node = node->next;
|
||||
__io_req_task_work_add(req, false);
|
||||
}
|
||||
}
|
||||
|
||||
int io_run_local_work(struct io_ring_ctx *ctx)
|
||||
{
|
||||
bool locked;
|
||||
struct llist_node *node;
|
||||
struct llist_node fake;
|
||||
struct llist_node *current_final = NULL;
|
||||
int ret;
|
||||
|
||||
if (unlikely(ctx->submitter_task != current)) {
|
||||
/* maybe this is before any submissions */
|
||||
if (!ctx->submitter_task)
|
||||
return 0;
|
||||
|
||||
return -EEXIST;
|
||||
}
|
||||
|
||||
locked = mutex_trylock(&ctx->uring_lock);
|
||||
|
||||
node = io_llist_xchg(&ctx->work_llist, &fake);
|
||||
ret = 0;
|
||||
again:
|
||||
while (node != current_final) {
|
||||
struct llist_node *next = node->next;
|
||||
struct io_kiocb *req = container_of(node, struct io_kiocb,
|
||||
io_task_work.node);
|
||||
prefetch(container_of(next, struct io_kiocb, io_task_work.node));
|
||||
req->io_task_work.func(req, &locked);
|
||||
ret++;
|
||||
node = next;
|
||||
}
|
||||
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
atomic_andnot(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
|
||||
|
||||
node = io_llist_cmpxchg(&ctx->work_llist, &fake, NULL);
|
||||
if (node != &fake) {
|
||||
current_final = &fake;
|
||||
node = io_llist_xchg(&ctx->work_llist, &fake);
|
||||
goto again;
|
||||
}
|
||||
|
||||
if (locked) {
|
||||
io_submit_flush_completions(ctx);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
static void io_req_tw_post(struct io_kiocb *req, bool *locked)
|
||||
{
|
||||
io_req_complete_post(req);
|
||||
@ -1285,8 +1377,10 @@ static int io_iopoll_check(struct io_ring_ctx *ctx, long min)
|
||||
u32 tail = ctx->cached_cq_tail;
|
||||
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
io_run_task_work();
|
||||
ret = io_run_task_work_ctx(ctx);
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
if (ret < 0)
|
||||
break;
|
||||
|
||||
/* some requests don't go through iopoll_list */
|
||||
if (tail != ctx->cached_cq_tail ||
|
||||
@ -2148,7 +2242,9 @@ struct io_wait_queue {
|
||||
|
||||
static inline bool io_has_work(struct io_ring_ctx *ctx)
|
||||
{
|
||||
return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq);
|
||||
return test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq) ||
|
||||
((ctx->flags & IORING_SETUP_DEFER_TASKRUN) &&
|
||||
!llist_empty(&ctx->work_llist));
|
||||
}
|
||||
|
||||
static inline bool io_should_wake(struct io_wait_queue *iowq)
|
||||
@ -2180,9 +2276,9 @@ static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
|
||||
return -1;
|
||||
}
|
||||
|
||||
int io_run_task_work_sig(void)
|
||||
int io_run_task_work_sig(struct io_ring_ctx *ctx)
|
||||
{
|
||||
if (io_run_task_work())
|
||||
if (io_run_task_work_ctx(ctx) > 0)
|
||||
return 1;
|
||||
if (task_sigpending(current))
|
||||
return -EINTR;
|
||||
@ -2198,7 +2294,7 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
|
||||
unsigned long check_cq;
|
||||
|
||||
/* make sure we run task_work before checking for signals */
|
||||
ret = io_run_task_work_sig();
|
||||
ret = io_run_task_work_sig(ctx);
|
||||
if (ret || io_should_wake(iowq))
|
||||
return ret;
|
||||
|
||||
@ -2229,12 +2325,14 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
|
||||
int ret;
|
||||
|
||||
do {
|
||||
/* always run at least 1 task work to process local work */
|
||||
ret = io_run_task_work_ctx(ctx);
|
||||
if (ret < 0)
|
||||
return ret;
|
||||
io_cqring_overflow_flush(ctx);
|
||||
if (io_cqring_events(ctx) >= min_events)
|
||||
return 0;
|
||||
if (!io_run_task_work())
|
||||
break;
|
||||
} while (1);
|
||||
} while (ret > 0);
|
||||
|
||||
if (sig) {
|
||||
#ifdef CONFIG_COMPAT
|
||||
@ -2575,6 +2673,9 @@ static __cold void io_ring_exit_work(struct work_struct *work)
|
||||
* as nobody else will be looking for them.
|
||||
*/
|
||||
do {
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
|
||||
io_move_task_work_from_local(ctx);
|
||||
|
||||
while (io_uring_try_cancel_requests(ctx, NULL, true))
|
||||
cond_resched();
|
||||
|
||||
@ -2769,13 +2870,15 @@ static __cold bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
|
||||
}
|
||||
}
|
||||
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
|
||||
ret |= io_run_local_work(ctx) > 0;
|
||||
ret |= io_cancel_defer_files(ctx, task, cancel_all);
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
ret |= io_poll_remove_all(ctx, task, cancel_all);
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
ret |= io_kill_timeouts(ctx, task, cancel_all);
|
||||
if (task)
|
||||
ret |= io_run_task_work();
|
||||
ret |= io_run_task_work() > 0;
|
||||
return ret;
|
||||
}
|
||||
|
||||
@ -3060,8 +3163,10 @@ SYSCALL_DEFINE6(io_uring_enter, unsigned int, fd, u32, to_submit,
|
||||
goto iopoll_locked;
|
||||
mutex_unlock(&ctx->uring_lock);
|
||||
}
|
||||
|
||||
if (flags & IORING_ENTER_GETEVENTS) {
|
||||
int ret2;
|
||||
|
||||
if (ctx->syscall_iopoll) {
|
||||
/*
|
||||
* We disallow the app entering submit/complete with
|
||||
@ -3290,17 +3395,29 @@ static __cold int io_uring_create(unsigned entries, struct io_uring_params *p,
|
||||
if (ctx->flags & IORING_SETUP_SQPOLL) {
|
||||
/* IPI related flags don't make sense with SQPOLL */
|
||||
if (ctx->flags & (IORING_SETUP_COOP_TASKRUN |
|
||||
IORING_SETUP_TASKRUN_FLAG))
|
||||
IORING_SETUP_TASKRUN_FLAG |
|
||||
IORING_SETUP_DEFER_TASKRUN))
|
||||
goto err;
|
||||
ctx->notify_method = TWA_SIGNAL_NO_IPI;
|
||||
} else if (ctx->flags & IORING_SETUP_COOP_TASKRUN) {
|
||||
ctx->notify_method = TWA_SIGNAL_NO_IPI;
|
||||
} else {
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
|
||||
if (ctx->flags & IORING_SETUP_TASKRUN_FLAG &&
|
||||
!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
|
||||
goto err;
|
||||
ctx->notify_method = TWA_SIGNAL;
|
||||
}
|
||||
|
||||
/*
|
||||
* For DEFER_TASKRUN we require the completion task to be the same as the
|
||||
* submission task. This implies that there is only one submitter, so enforce
|
||||
* that.
|
||||
*/
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN &&
|
||||
!(ctx->flags & IORING_SETUP_SINGLE_ISSUER)) {
|
||||
goto err;
|
||||
}
|
||||
|
||||
/*
|
||||
* This is just grabbed for accounting purposes. When a process exits,
|
||||
* the mm is exited and dropped before the files, hence we need to hang
|
||||
@ -3401,7 +3518,7 @@ static long io_uring_setup(u32 entries, struct io_uring_params __user *params)
|
||||
IORING_SETUP_R_DISABLED | IORING_SETUP_SUBMIT_ALL |
|
||||
IORING_SETUP_COOP_TASKRUN | IORING_SETUP_TASKRUN_FLAG |
|
||||
IORING_SETUP_SQE128 | IORING_SETUP_CQE32 |
|
||||
IORING_SETUP_SINGLE_ISSUER))
|
||||
IORING_SETUP_SINGLE_ISSUER | IORING_SETUP_DEFER_TASKRUN))
|
||||
return -EINVAL;
|
||||
|
||||
return io_uring_create(entries, &p, params);
|
||||
@ -3864,7 +3981,7 @@ SYSCALL_DEFINE4(io_uring_register, unsigned int, fd, unsigned int, opcode,
|
||||
|
||||
ctx = f.file->private_data;
|
||||
|
||||
io_run_task_work();
|
||||
io_run_task_work_ctx(ctx);
|
||||
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
ret = __io_uring_register(ctx, opcode, arg, nr_args);
|
||||
|
@ -26,7 +26,8 @@ enum {
|
||||
|
||||
struct io_uring_cqe *__io_get_cqe(struct io_ring_ctx *ctx);
|
||||
bool io_req_cqe_overflow(struct io_kiocb *req);
|
||||
int io_run_task_work_sig(void);
|
||||
int io_run_task_work_sig(struct io_ring_ctx *ctx);
|
||||
int io_run_local_work(struct io_ring_ctx *ctx);
|
||||
void io_req_complete_failed(struct io_kiocb *req, s32 res);
|
||||
void __io_req_complete(struct io_kiocb *req, unsigned issue_flags);
|
||||
void io_req_complete_post(struct io_kiocb *req);
|
||||
@ -221,17 +222,37 @@ static inline unsigned int io_sqring_entries(struct io_ring_ctx *ctx)
|
||||
return smp_load_acquire(&rings->sq.tail) - ctx->cached_sq_head;
|
||||
}
|
||||
|
||||
static inline bool io_run_task_work(void)
|
||||
static inline int io_run_task_work(void)
|
||||
{
|
||||
if (test_thread_flag(TIF_NOTIFY_SIGNAL)) {
|
||||
__set_current_state(TASK_RUNNING);
|
||||
clear_notify_signal();
|
||||
if (task_work_pending(current))
|
||||
task_work_run();
|
||||
return true;
|
||||
return 1;
|
||||
}
|
||||
|
||||
return false;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int io_run_task_work_ctx(struct io_ring_ctx *ctx)
|
||||
{
|
||||
int ret = 0;
|
||||
int ret2;
|
||||
|
||||
if (ctx->flags & IORING_SETUP_DEFER_TASKRUN)
|
||||
ret = io_run_local_work(ctx);
|
||||
|
||||
/* want to run this after in case more is added */
|
||||
ret2 = io_run_task_work();
|
||||
|
||||
/* Try propagate error in favour of if tasks were run,
|
||||
* but still make sure to run them if requested
|
||||
*/
|
||||
if (ret >= 0)
|
||||
ret += ret2;
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
static inline void io_tw_lock(struct io_ring_ctx *ctx, bool *locked)
|
||||
|
@ -341,7 +341,7 @@ __cold static int io_rsrc_ref_quiesce(struct io_rsrc_data *data,
|
||||
flush_delayed_work(&ctx->rsrc_put_work);
|
||||
reinit_completion(&data->done);
|
||||
|
||||
ret = io_run_task_work_sig();
|
||||
ret = io_run_task_work_sig(ctx);
|
||||
mutex_lock(&ctx->uring_lock);
|
||||
} while (ret >= 0);
|
||||
data->quiesce = false;
|
||||
|
Loading…
Reference in New Issue
Block a user