// SPDX-License-Identifier: GPL-2.0
/*
 * Basic worker thread pool for io_uring
 *
 * Copyright (C) 2019 Jens Axboe
 *
 */
# include <linux/kernel.h>
# include <linux/init.h>
# include <linux/errno.h>
# include <linux/sched/signal.h>
# include <linux/mm.h>
# include <linux/mmu_context.h>
# include <linux/sched/mm.h>
# include <linux/percpu.h>
# include <linux/slab.h>
# include <linux/kthread.h>
# include <linux/rculist_nulls.h>
# include "io-wq.h"
# define WORKER_IDLE_TIMEOUT (5 * HZ)
/* Per-worker state bits, kept in io_worker->flags (masks, not bit numbers). */
enum {
	IO_WORKER_F_UP		= 1 << 0,	/* up and active */
	IO_WORKER_F_RUNNING	= 1 << 1,	/* account as running */
	IO_WORKER_F_FREE	= 1 << 2,	/* worker on free list */
	IO_WORKER_F_EXITING	= 1 << 3,	/* worker exiting */
	IO_WORKER_F_FIXED	= 1 << 4,	/* static idle worker */
	IO_WORKER_F_BOUND	= 1 << 5,	/* is doing bounded work */
};

/* Bit numbers in io_wq->state, used with set_bit()/test_bit(). */
enum {
	IO_WQ_BIT_EXIT		= 0,	/* wq exiting */
	IO_WQ_BIT_CANCEL	= 1,	/* cancel work on list */
	IO_WQ_BIT_ERROR		= 2,	/* error on setup */
};

/* Masks for io_wqe->flags. */
enum {
	IO_WQE_FLAG_STALLED	= 1 << 0,	/* stalled on hash */
};
/*
* One for each thread in a wqe pool
*/
struct io_worker {
refcount_t ref ;
unsigned flags ;
struct hlist_nulls_node nulls_node ;
2019-11-13 23:54:49 +03:00
struct list_head all_list ;
2019-10-22 19:25:58 +03:00
struct task_struct * task ;
wait_queue_head_t wait ;
struct io_wqe * wqe ;
2019-11-13 19:43:34 +03:00
2019-10-22 19:25:58 +03:00
struct io_wq_work * cur_work ;
2019-11-13 19:43:34 +03:00
spinlock_t lock ;
2019-10-22 19:25:58 +03:00
struct rcu_head rcu ;
struct mm_struct * mm ;
2019-10-24 21:39:47 +03:00
struct files_struct * restore_files ;
2019-10-22 19:25:58 +03:00
} ;
/*
 * wqe->hash_map is an unsigned long with one bit per hash bucket, so the
 * 1 << IO_WQ_HASH_ORDER buckets must not exceed BITS_PER_LONG.
 */
#if BITS_PER_LONG != 64
#define IO_WQ_HASH_ORDER	5
#else
#define IO_WQ_HASH_ORDER	6
#endif
/*
 * Worker accounting for one class (bound or unbound) of a wqe.
 * max_workers is the io_wq_create() "bounded" argument for the bound class,
 * and the creator's RLIMIT_NPROC for the unbound class (0 without a user).
 */
struct io_wqe_acct {
	unsigned nr_workers;	/* workers of this class, under wqe->lock */
	unsigned max_workers;	/* limit on nr_workers */
	atomic_t nr_running;	/* workers of this class currently running */
};

/* Indexes into io_wqe->acct[]. */
enum {
	IO_WQ_ACCT_BOUND,
	IO_WQ_ACCT_UNBOUND,
};
/*
* Per - node worker thread pool
*/
struct io_wqe {
struct {
spinlock_t lock ;
struct list_head work_list ;
unsigned long hash_map ;
unsigned flags ;
} ____cacheline_aligned_in_smp ;
int node ;
2019-11-07 21:41:16 +03:00
struct io_wqe_acct acct [ 2 ] ;
2019-10-22 19:25:58 +03:00
2019-11-14 18:00:41 +03:00
struct hlist_nulls_head free_list ;
struct hlist_nulls_head busy_list ;
2019-11-13 23:54:49 +03:00
struct list_head all_list ;
2019-10-22 19:25:58 +03:00
struct io_wq * wq ;
} ;
/*
* Per io_wq state
*/
struct io_wq {
struct io_wqe * * wqes ;
unsigned long state ;
unsigned nr_wqes ;
2019-11-13 08:31:31 +03:00
get_work_fn * get_work ;
put_work_fn * put_work ;
2019-10-22 19:25:58 +03:00
struct task_struct * manager ;
2019-11-07 21:41:16 +03:00
struct user_struct * user ;
2019-10-22 19:25:58 +03:00
struct mm_struct * mm ;
refcount_t refs ;
struct completion done ;
} ;
static bool io_worker_get ( struct io_worker * worker )
{
return refcount_inc_not_zero ( & worker - > ref ) ;
}
static void io_worker_release ( struct io_worker * worker )
{
if ( refcount_dec_and_test ( & worker - > ref ) )
wake_up_process ( worker - > task ) ;
}
/*
* Note : drops the wqe - > lock if returning true ! The caller must re - acquire
* the lock in that case . Some callers need to restart handling if this
* happens , so we can ' t just re - acquire the lock on behalf of the caller .
*/
static bool __io_worker_unuse ( struct io_wqe * wqe , struct io_worker * worker )
{
2019-10-24 21:39:47 +03:00
bool dropped_lock = false ;
if ( current - > files ! = worker - > restore_files ) {
__acquire ( & wqe - > lock ) ;
spin_unlock_irq ( & wqe - > lock ) ;
dropped_lock = true ;
task_lock ( current ) ;
current - > files = worker - > restore_files ;
task_unlock ( current ) ;
}
2019-10-22 19:25:58 +03:00
/*
* If we have an active mm , we need to drop the wq lock before unusing
* it . If we do , return true and let the caller retry the idle loop .
*/
if ( worker - > mm ) {
2019-10-24 21:39:47 +03:00
if ( ! dropped_lock ) {
__acquire ( & wqe - > lock ) ;
spin_unlock_irq ( & wqe - > lock ) ;
dropped_lock = true ;
}
2019-10-22 19:25:58 +03:00
__set_current_state ( TASK_RUNNING ) ;
set_fs ( KERNEL_DS ) ;
unuse_mm ( worker - > mm ) ;
mmput ( worker - > mm ) ;
worker - > mm = NULL ;
}
2019-10-24 21:39:47 +03:00
return dropped_lock ;
2019-10-22 19:25:58 +03:00
}
2019-11-07 21:41:16 +03:00
static inline struct io_wqe_acct * io_work_get_acct ( struct io_wqe * wqe ,
struct io_wq_work * work )
{
if ( work - > flags & IO_WQ_WORK_UNBOUND )
return & wqe - > acct [ IO_WQ_ACCT_UNBOUND ] ;
return & wqe - > acct [ IO_WQ_ACCT_BOUND ] ;
}
static inline struct io_wqe_acct * io_wqe_get_acct ( struct io_wqe * wqe ,
struct io_worker * worker )
{
if ( worker - > flags & IO_WORKER_F_BOUND )
return & wqe - > acct [ IO_WQ_ACCT_BOUND ] ;
return & wqe - > acct [ IO_WQ_ACCT_UNBOUND ] ;
}
2019-10-22 19:25:58 +03:00
static void io_worker_exit ( struct io_worker * worker )
{
struct io_wqe * wqe = worker - > wqe ;
2019-11-07 21:41:16 +03:00
struct io_wqe_acct * acct = io_wqe_get_acct ( wqe , worker ) ;
unsigned nr_workers ;
2019-10-22 19:25:58 +03:00
/*
* If we ' re not at zero , someone else is holding a brief reference
* to the worker . Wait for that to go away .
*/
set_current_state ( TASK_INTERRUPTIBLE ) ;
if ( ! refcount_dec_and_test ( & worker - > ref ) )
schedule ( ) ;
__set_current_state ( TASK_RUNNING ) ;
preempt_disable ( ) ;
current - > flags & = ~ PF_IO_WORKER ;
if ( worker - > flags & IO_WORKER_F_RUNNING )
2019-11-07 21:41:16 +03:00
atomic_dec ( & acct - > nr_running ) ;
if ( ! ( worker - > flags & IO_WORKER_F_BOUND ) )
atomic_dec ( & wqe - > wq - > user - > processes ) ;
2019-10-22 19:25:58 +03:00
worker - > flags = 0 ;
preempt_enable ( ) ;
spin_lock_irq ( & wqe - > lock ) ;
hlist_nulls_del_rcu ( & worker - > nulls_node ) ;
2019-11-13 23:54:49 +03:00
list_del_rcu ( & worker - > all_list ) ;
2019-10-22 19:25:58 +03:00
if ( __io_worker_unuse ( wqe , worker ) ) {
__release ( & wqe - > lock ) ;
spin_lock_irq ( & wqe - > lock ) ;
}
2019-11-07 21:41:16 +03:00
acct - > nr_workers - - ;
nr_workers = wqe - > acct [ IO_WQ_ACCT_BOUND ] . nr_workers +
wqe - > acct [ IO_WQ_ACCT_UNBOUND ] . nr_workers ;
2019-10-22 19:25:58 +03:00
spin_unlock_irq ( & wqe - > lock ) ;
/* all workers gone, wq exit can proceed */
2019-11-07 21:41:16 +03:00
if ( ! nr_workers & & refcount_dec_and_test ( & wqe - > wq - > refs ) )
2019-10-22 19:25:58 +03:00
complete ( & wqe - > wq - > done ) ;
2019-11-02 10:55:01 +03:00
kfree_rcu ( worker , rcu ) ;
2019-10-22 19:25:58 +03:00
}
2019-11-07 21:41:16 +03:00
/* True if work is queued and the wqe isn't stalled waiting on hashed work. */
static inline bool io_wqe_run_queue(struct io_wqe *wqe)
	__must_hold(wqe->lock)
{
	if (wqe->flags & IO_WQE_FLAG_STALLED)
		return false;

	return !list_empty(&wqe->work_list);
}
/*
 * Wake the worker at the head of the free list, if any. Returns false when
 * the list is empty or the head worker is already exiting; the caller must
 * then wake the wq manager to create one.
 */
static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	__must_hold(RCU)
{
	struct hlist_nulls_node *first;
	struct io_worker *worker;

	first = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
	if (is_a_nulls(first))
		return false;

	worker = hlist_nulls_entry(first, struct io_worker, nulls_node);
	/* only wake it if we can pin it; a zero refcount means it's exiting */
	if (!io_worker_get(worker))
		return false;

	wake_up(&worker->wait);
	io_worker_release(worker);
	return true;
}
/*
* We need a worker . If we find a free one , we ' re good . If not , and we ' re
* below the max number of workers , wake up the manager to create one .
*/
static void io_wqe_wake_worker ( struct io_wqe * wqe , struct io_wqe_acct * acct )
{
bool ret ;
/*
* Most likely an attempt to queue unbounded work on an io_wq that
* wasn ' t setup with any unbounded workers .
*/
WARN_ON_ONCE ( ! acct - > max_workers ) ;
rcu_read_lock ( ) ;
ret = io_wqe_activate_free_worker ( wqe ) ;
rcu_read_unlock ( ) ;
if ( ! ret & & acct - > nr_workers < acct - > max_workers )
wake_up_process ( wqe - > wq - > manager ) ;
}
static void io_wqe_inc_running ( struct io_wqe * wqe , struct io_worker * worker )
{
struct io_wqe_acct * acct = io_wqe_get_acct ( wqe , worker ) ;
atomic_inc ( & acct - > nr_running ) ;
}
static void io_wqe_dec_running ( struct io_wqe * wqe , struct io_worker * worker )
__must_hold ( wqe - > lock )
{
struct io_wqe_acct * acct = io_wqe_get_acct ( wqe , worker ) ;
if ( atomic_dec_and_test ( & acct - > nr_running ) & & io_wqe_run_queue ( wqe ) )
io_wqe_wake_worker ( wqe , acct ) ;
}
2019-10-22 19:25:58 +03:00
static void io_worker_start ( struct io_wqe * wqe , struct io_worker * worker )
{
allow_kernel_signal ( SIGINT ) ;
current - > flags | = PF_IO_WORKER ;
worker - > flags | = ( IO_WORKER_F_UP | IO_WORKER_F_RUNNING ) ;
2019-10-24 21:39:47 +03:00
worker - > restore_files = current - > files ;
2019-11-07 21:41:16 +03:00
io_wqe_inc_running ( wqe , worker ) ;
2019-10-22 19:25:58 +03:00
}
/*
 * Worker is about to run @work. Move it from the free list to the busy list
 * if needed. If the bound/unbound class of @work differs from the worker's
 * current class, switch the worker over. This moves it between the two
 * acct[] worker counts and adjusts the user's process count; unbound
 * workers are the ones charged there.
 */
static void __io_worker_busy(struct io_wqe *wqe, struct io_worker *worker,
			     struct io_wq_work *work)
	__must_hold(wqe->lock)
{
	struct io_wqe_acct *bound = &wqe->acct[IO_WQ_ACCT_BOUND];
	struct io_wqe_acct *unbound = &wqe->acct[IO_WQ_ACCT_UNBOUND];
	bool was_bound, want_bound;

	if (worker->flags & IO_WORKER_F_FREE) {
		worker->flags &= ~IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->busy_list);
	}

	was_bound = (worker->flags & IO_WORKER_F_BOUND) != 0;
	want_bound = !(work->flags & IO_WQ_WORK_UNBOUND);
	if (was_bound == want_bound)
		return;

	/* leave the old class's running count before switching classes */
	io_wqe_dec_running(wqe, worker);
	if (want_bound) {
		worker->flags |= IO_WORKER_F_BOUND;
		unbound->nr_workers--;
		bound->nr_workers++;
		atomic_dec(&wqe->wq->user->processes);
	} else {
		worker->flags &= ~IO_WORKER_F_BOUND;
		unbound->nr_workers++;
		bound->nr_workers--;
		atomic_inc(&wqe->wq->user->processes);
	}
	io_wqe_inc_running(wqe, worker);
}
/*
 * No work, worker going to sleep. Put it on the free list, then let
 * __io_worker_unuse() drop any files/mm it picked up. Dropping the mm may
 * sleep, so in that case the lock is released and true is returned. The
 * caller must then retry its loop and must not assume the lock is held.
 */
static bool __io_worker_idle(struct io_wqe *wqe, struct io_worker *worker)
	__must_hold(wqe->lock)
{
	bool on_free_list = worker->flags & IO_WORKER_F_FREE;

	if (!on_free_list) {
		worker->flags |= IO_WORKER_F_FREE;
		hlist_nulls_del_init_rcu(&worker->nulls_node);
		hlist_nulls_add_head_rcu(&worker->nulls_node, &wqe->free_list);
	}

	return __io_worker_unuse(wqe, worker);
}
/*
 * Unlink and return the first runnable item on wqe->work_list, or NULL.
 * Unhashed work can always run. Hashed work runs only if its bucket is not
 * busy; in that case the bucket's bit is claimed in wqe->hash_map and the
 * bucket is returned in *hash. *hash may also be written for skipped
 * hashed items.
 */
static struct io_wq_work *io_get_next_work(struct io_wqe *wqe, unsigned *hash)
	__must_hold(wqe->lock)
{
	struct io_wq_work *work;

	list_for_each_entry(work, &wqe->work_list, list) {
		if (work->flags & IO_WQ_WORK_HASHED) {
			*hash = work->flags >> IO_WQ_HASH_SHIFT;
			/* same bucket already running: leave it queued */
			if (wqe->hash_map & BIT_ULL(*hash))
				continue;
			wqe->hash_map |= BIT_ULL(*hash);
		}
		list_del(&work->list);
		return work;
	}

	return NULL;
}
/*
 * Run work from wqe->work_list until nothing runnable is left.
 *
 * Called with wqe->lock held; returns with it released. For each item the
 * worker does the following:
 *  - publishes it in ->cur_work for the cancel paths;
 *  - installs the item's files and, if needed, borrows wq->mm;
 *  - calls ->func().
 * A handler may hand back a dependent item through its work pointer. That
 * item is run next without going through the queue. Hashed work releases
 * its hash bucket when done.
 *
 * Reference handling: ->get_work() is called on each non-internal item
 * before it runs. The matching ->put_work() is called on that same item
 * (tracked in put_work) once the worker is done with it. Items flagged
 * IO_WQ_WORK_INTERNAL, such as the io_wq_flush() marker, never reach
 * either callback.
 */
static void io_worker_handle_work(struct io_worker *worker)
	__releases(wqe->lock)
{
	struct io_wq_work *work, *old_work = NULL, *put_work = NULL;
	struct io_wqe *wqe = worker->wqe;
	struct io_wq *wq = wqe->wq;

	do {
		unsigned hash = -1U;

		/*
		 * If we got some work, mark us as busy. If we didn't, but
		 * the list isn't empty, it means we stalled on hashed work.
		 * Mark us stalled so we don't keep looking for work when we
		 * can't make progress, any work completion or insertion will
		 * clear the stalled flag.
		 */
		work = io_get_next_work(wqe, &hash);
		if (work)
			__io_worker_busy(wqe, worker, work);
		else if (!list_empty(&wqe->work_list))
			wqe->flags |= IO_WQE_FLAG_STALLED;

		spin_unlock_irq(&wqe->lock);

		/*
		 * Release the item that finished last. Only put_work is known to
		 * have gone through ->get_work(). old_work may be an internal
		 * item, and for an io_wq_flush() marker it may already be gone
		 * from the flusher's stack. So it must never be handed to
		 * ->put_work(). Clear put_work so a later internal item can't
		 * inherit it.
		 */
		if (put_work) {
			if (wq->put_work)
				wq->put_work(put_work);
			put_work = NULL;
		}
		if (!work)
			break;
next:
		/* flush any pending signals before assigning new work */
		if (signal_pending(current))
			flush_signals(current);

		spin_lock_irq(&worker->lock);
		worker->cur_work = work;
		spin_unlock_irq(&worker->lock);

		if ((work->flags & IO_WQ_WORK_NEEDS_FILES) &&
		    current->files != work->files) {
			task_lock(current);
			current->files = work->files;
			task_unlock(current);
		}
		if ((work->flags & IO_WQ_WORK_NEEDS_USER) && !worker->mm &&
		    wq->mm && mmget_not_zero(wq->mm)) {
			use_mm(wq->mm);
			set_fs(USER_DS);
			worker->mm = wq->mm;
		}
		if (test_bit(IO_WQ_BIT_CANCEL, &wq->state))
			work->flags |= IO_WQ_WORK_CANCEL;
		if (worker->mm)
			work->flags |= IO_WQ_WORK_HAS_MM;

		if (wq->get_work && !(work->flags & IO_WQ_WORK_INTERNAL)) {
			put_work = work;
			wq->get_work(work);
		}

		old_work = work;
		work->func(&work);

		spin_lock_irq(&worker->lock);
		worker->cur_work = NULL;
		spin_unlock_irq(&worker->lock);

		spin_lock_irq(&wqe->lock);

		if (hash != -1U) {
			wqe->hash_map &= ~BIT_ULL(hash);
			wqe->flags &= ~IO_WQE_FLAG_STALLED;
		}
		if (work && work != old_work) {
			spin_unlock_irq(&wqe->lock);

			/* done with the item that produced the dependent work */
			if (put_work) {
				if (wq->put_work)
					wq->put_work(put_work);
				put_work = NULL;
			}

			/* dependent work not hashed */
			hash = -1U;
			goto next;
		}
	} while (1);
}
static int io_wqe_worker ( void * data )
{
struct io_worker * worker = data ;
struct io_wqe * wqe = worker - > wqe ;
struct io_wq * wq = wqe - > wq ;
DEFINE_WAIT ( wait ) ;
io_worker_start ( wqe , worker ) ;
while ( ! test_bit ( IO_WQ_BIT_EXIT , & wq - > state ) ) {
prepare_to_wait ( & worker - > wait , & wait , TASK_INTERRUPTIBLE ) ;
spin_lock_irq ( & wqe - > lock ) ;
if ( io_wqe_run_queue ( wqe ) ) {
__set_current_state ( TASK_RUNNING ) ;
io_worker_handle_work ( worker ) ;
continue ;
}
/* drops the lock on success, retry */
if ( __io_worker_idle ( wqe , worker ) ) {
__release ( & wqe - > lock ) ;
continue ;
}
spin_unlock_irq ( & wqe - > lock ) ;
if ( signal_pending ( current ) )
flush_signals ( current ) ;
if ( schedule_timeout ( WORKER_IDLE_TIMEOUT ) )
continue ;
/* timed out, exit unless we're the fixed worker */
if ( test_bit ( IO_WQ_BIT_EXIT , & wq - > state ) | |
! ( worker - > flags & IO_WORKER_F_FIXED ) )
break ;
}
finish_wait ( & worker - > wait , & wait ) ;
if ( test_bit ( IO_WQ_BIT_EXIT , & wq - > state ) ) {
spin_lock_irq ( & wqe - > lock ) ;
if ( ! list_empty ( & wqe - > work_list ) )
io_worker_handle_work ( worker ) ;
else
spin_unlock_irq ( & wqe - > lock ) ;
}
io_worker_exit ( worker ) ;
return 0 ;
}
/*
* Called when a worker is scheduled in . Mark us as currently running .
*/
void io_wq_worker_running ( struct task_struct * tsk )
{
struct io_worker * worker = kthread_data ( tsk ) ;
struct io_wqe * wqe = worker - > wqe ;
if ( ! ( worker - > flags & IO_WORKER_F_UP ) )
return ;
if ( worker - > flags & IO_WORKER_F_RUNNING )
return ;
worker - > flags | = IO_WORKER_F_RUNNING ;
2019-11-07 21:41:16 +03:00
io_wqe_inc_running ( wqe , worker ) ;
2019-10-22 19:25:58 +03:00
}
/*
* Called when worker is going to sleep . If there are no workers currently
* running and we have work pending , wake up a free one or have the manager
* set one up .
*/
void io_wq_worker_sleeping ( struct task_struct * tsk )
{
struct io_worker * worker = kthread_data ( tsk ) ;
struct io_wqe * wqe = worker - > wqe ;
if ( ! ( worker - > flags & IO_WORKER_F_UP ) )
return ;
if ( ! ( worker - > flags & IO_WORKER_F_RUNNING ) )
return ;
worker - > flags & = ~ IO_WORKER_F_RUNNING ;
spin_lock_irq ( & wqe - > lock ) ;
2019-11-07 21:41:16 +03:00
io_wqe_dec_running ( wqe , worker ) ;
2019-10-22 19:25:58 +03:00
spin_unlock_irq ( & wqe - > lock ) ;
}
2019-11-19 18:37:07 +03:00
static bool create_io_worker ( struct io_wq * wq , struct io_wqe * wqe , int index )
2019-10-22 19:25:58 +03:00
{
2019-11-07 21:41:16 +03:00
struct io_wqe_acct * acct = & wqe - > acct [ index ] ;
2019-10-22 19:25:58 +03:00
struct io_worker * worker ;
worker = kcalloc_node ( 1 , sizeof ( * worker ) , GFP_KERNEL , wqe - > node ) ;
if ( ! worker )
2019-11-19 18:37:07 +03:00
return false ;
2019-10-22 19:25:58 +03:00
refcount_set ( & worker - > ref , 1 ) ;
worker - > nulls_node . pprev = NULL ;
init_waitqueue_head ( & worker - > wait ) ;
worker - > wqe = wqe ;
2019-11-13 19:43:34 +03:00
spin_lock_init ( & worker - > lock ) ;
2019-10-22 19:25:58 +03:00
worker - > task = kthread_create_on_node ( io_wqe_worker , worker , wqe - > node ,
2019-11-07 21:41:16 +03:00
" io_wqe_worker-%d/%d " , index , wqe - > node ) ;
2019-10-22 19:25:58 +03:00
if ( IS_ERR ( worker - > task ) ) {
kfree ( worker ) ;
2019-11-19 18:37:07 +03:00
return false ;
2019-10-22 19:25:58 +03:00
}
spin_lock_irq ( & wqe - > lock ) ;
2019-11-14 18:00:41 +03:00
hlist_nulls_add_head_rcu ( & worker - > nulls_node , & wqe - > free_list ) ;
2019-11-13 23:54:49 +03:00
list_add_tail_rcu ( & worker - > all_list , & wqe - > all_list ) ;
2019-10-22 19:25:58 +03:00
worker - > flags | = IO_WORKER_F_FREE ;
2019-11-07 21:41:16 +03:00
if ( index = = IO_WQ_ACCT_BOUND )
worker - > flags | = IO_WORKER_F_BOUND ;
if ( ! acct - > nr_workers & & ( worker - > flags & IO_WORKER_F_BOUND ) )
2019-10-22 19:25:58 +03:00
worker - > flags | = IO_WORKER_F_FIXED ;
2019-11-07 21:41:16 +03:00
acct - > nr_workers + + ;
2019-10-22 19:25:58 +03:00
spin_unlock_irq ( & wqe - > lock ) ;
2019-11-07 21:41:16 +03:00
if ( index = = IO_WQ_ACCT_UNBOUND )
atomic_inc ( & wq - > user - > processes ) ;
2019-10-22 19:25:58 +03:00
wake_up_process ( worker - > task ) ;
2019-11-19 18:37:07 +03:00
return true ;
2019-10-22 19:25:58 +03:00
}
2019-11-07 21:41:16 +03:00
static inline bool io_wqe_need_worker ( struct io_wqe * wqe , int index )
2019-10-22 19:25:58 +03:00
__must_hold ( wqe - > lock )
{
2019-11-07 21:41:16 +03:00
struct io_wqe_acct * acct = & wqe - > acct [ index ] ;
2019-10-22 19:25:58 +03:00
2019-11-07 21:41:16 +03:00
/* if we have available workers or no work, no need */
2019-11-14 18:00:41 +03:00
if ( ! hlist_nulls_empty ( & wqe - > free_list ) | | ! io_wqe_run_queue ( wqe ) )
2019-11-07 21:41:16 +03:00
return false ;
return acct - > nr_workers < acct - > max_workers ;
2019-10-22 19:25:58 +03:00
}
/*
* Manager thread . Tasked with creating new workers , if we need them .
*/
static int io_wq_manager ( void * data )
{
struct io_wq * wq = data ;
2019-11-19 18:37:07 +03:00
int i ;
2019-10-22 19:25:58 +03:00
2019-11-19 18:37:07 +03:00
/* create fixed workers */
refcount_set ( & wq - > refs , wq - > nr_wqes ) ;
for ( i = 0 ; i < wq - > nr_wqes ; i + + ) {
if ( create_io_worker ( wq , wq - > wqes [ i ] , IO_WQ_ACCT_BOUND ) )
continue ;
goto err ;
}
2019-10-22 19:25:58 +03:00
2019-11-19 18:37:07 +03:00
complete ( & wq - > done ) ;
while ( ! kthread_should_stop ( ) ) {
2019-10-22 19:25:58 +03:00
for ( i = 0 ; i < wq - > nr_wqes ; i + + ) {
struct io_wqe * wqe = wq - > wqes [ i ] ;
2019-11-07 21:41:16 +03:00
bool fork_worker [ 2 ] = { false , false } ;
2019-10-22 19:25:58 +03:00
spin_lock_irq ( & wqe - > lock ) ;
2019-11-07 21:41:16 +03:00
if ( io_wqe_need_worker ( wqe , IO_WQ_ACCT_BOUND ) )
fork_worker [ IO_WQ_ACCT_BOUND ] = true ;
if ( io_wqe_need_worker ( wqe , IO_WQ_ACCT_UNBOUND ) )
fork_worker [ IO_WQ_ACCT_UNBOUND ] = true ;
2019-10-22 19:25:58 +03:00
spin_unlock_irq ( & wqe - > lock ) ;
2019-11-07 21:41:16 +03:00
if ( fork_worker [ IO_WQ_ACCT_BOUND ] )
create_io_worker ( wq , wqe , IO_WQ_ACCT_BOUND ) ;
if ( fork_worker [ IO_WQ_ACCT_UNBOUND ] )
create_io_worker ( wq , wqe , IO_WQ_ACCT_UNBOUND ) ;
2019-10-22 19:25:58 +03:00
}
set_current_state ( TASK_INTERRUPTIBLE ) ;
schedule_timeout ( HZ ) ;
}
return 0 ;
2019-11-19 18:37:07 +03:00
err :
set_bit ( IO_WQ_BIT_ERROR , & wq - > state ) ;
set_bit ( IO_WQ_BIT_EXIT , & wq - > state ) ;
if ( refcount_sub_and_test ( wq - > nr_wqes - i , & wq - > refs ) )
complete ( & wq - > done ) ;
return 0 ;
2019-10-22 19:25:58 +03:00
}
2019-11-07 21:41:16 +03:00
static bool io_wq_can_queue ( struct io_wqe * wqe , struct io_wqe_acct * acct ,
struct io_wq_work * work )
{
bool free_worker ;
if ( ! ( work - > flags & IO_WQ_WORK_UNBOUND ) )
return true ;
if ( atomic_read ( & acct - > nr_running ) )
return true ;
rcu_read_lock ( ) ;
2019-11-14 18:00:41 +03:00
free_worker = ! hlist_nulls_empty ( & wqe - > free_list ) ;
2019-11-07 21:41:16 +03:00
rcu_read_unlock ( ) ;
if ( free_worker )
return true ;
if ( atomic_read ( & wqe - > wq - > user - > processes ) > = acct - > max_workers & &
! ( capable ( CAP_SYS_RESOURCE ) | | capable ( CAP_SYS_ADMIN ) ) )
return false ;
return true ;
}
2019-10-22 19:25:58 +03:00
static void io_wqe_enqueue ( struct io_wqe * wqe , struct io_wq_work * work )
{
2019-11-07 21:41:16 +03:00
struct io_wqe_acct * acct = io_work_get_acct ( wqe , work ) ;
2019-10-22 19:25:58 +03:00
unsigned long flags ;
2019-11-07 21:41:16 +03:00
/*
* Do early check to see if we need a new unbound worker , and if we do ,
* if we ' re allowed to do so . This isn ' t 100 % accurate as there ' s a
* gap between this check and incrementing the value , but that ' s OK .
* It ' s close enough to not be an issue , fork ( ) has the same delay .
*/
if ( unlikely ( ! io_wq_can_queue ( wqe , acct , work ) ) ) {
work - > flags | = IO_WQ_WORK_CANCEL ;
work - > func ( & work ) ;
return ;
}
2019-10-22 19:25:58 +03:00
spin_lock_irqsave ( & wqe - > lock , flags ) ;
list_add_tail ( & work - > list , & wqe - > work_list ) ;
wqe - > flags & = ~ IO_WQE_FLAG_STALLED ;
spin_unlock_irqrestore ( & wqe - > lock , flags ) ;
2019-11-07 21:41:16 +03:00
if ( ! atomic_read ( & acct - > nr_running ) )
io_wqe_wake_worker ( wqe , acct ) ;
2019-10-22 19:25:58 +03:00
}
void io_wq_enqueue ( struct io_wq * wq , struct io_wq_work * work )
{
struct io_wqe * wqe = wq - > wqes [ numa_node_id ( ) ] ;
io_wqe_enqueue ( wqe , work ) ;
}
/*
* Enqueue work , hashed by some key . Work items that hash to the same value
* will not be done in parallel . Used to limit concurrent writes , generally
* hashed by inode .
*/
void io_wq_enqueue_hashed ( struct io_wq * wq , struct io_wq_work * work , void * val )
{
struct io_wqe * wqe = wq - > wqes [ numa_node_id ( ) ] ;
unsigned bit ;
bit = hash_ptr ( val , IO_WQ_HASH_ORDER ) ;
work - > flags | = ( IO_WQ_WORK_HASHED | ( bit < < IO_WQ_HASH_SHIFT ) ) ;
io_wqe_enqueue ( wqe , work ) ;
}
static bool io_wqe_worker_send_sig ( struct io_worker * worker , void * data )
{
send_sig ( SIGINT , worker - > task , 1 ) ;
return false ;
}
/*
 * Call @func on every worker of @wqe that can be pinned with
 * io_worker_get(), i.e. that isn't exiting. Stops as soon as @func returns
 * true and returns that result; returns false otherwise. Walks
 * wqe->all_list with list_for_each_entry_rcu(); the callers in this file
 * hold rcu_read_lock() around it.
 */
static bool io_wq_for_each_worker(struct io_wqe *wqe,
				  bool (*func)(struct io_worker *, void *),
				  void *data)
{
	struct io_worker *worker;

	list_for_each_entry_rcu(worker, &wqe->all_list, all_list) {
		bool stop;

		if (!io_worker_get(worker))
			continue;
		stop = func(worker, data);
		io_worker_release(worker);
		if (stop)
			return true;
	}

	return false;
}
void io_wq_cancel_all ( struct io_wq * wq )
{
int i ;
set_bit ( IO_WQ_BIT_CANCEL , & wq - > state ) ;
/*
* Browse both lists , as there ' s a gap between handing work off
* to a worker and the worker putting itself on the busy_list
*/
rcu_read_lock ( ) ;
for ( i = 0 ; i < wq - > nr_wqes ; i + + ) {
struct io_wqe * wqe = wq - > wqes [ i ] ;
2019-11-13 23:54:49 +03:00
io_wq_for_each_worker ( wqe , io_wqe_worker_send_sig , NULL ) ;
2019-10-22 19:25:58 +03:00
}
rcu_read_unlock ( ) ;
}
2019-10-29 06:49:21 +03:00
/* Context passed through io_wq_for_each_worker() to io_work_cancel(). */
struct io_cb_cancel_data {
	struct io_wqe *wqe;
	work_cancel_fn *cancel;		/* match predicate for a work item */
	void *caller_data;		/* opaque argument for ->cancel */
};
static bool io_work_cancel ( struct io_worker * worker , void * cancel_data )
{
struct io_cb_cancel_data * data = cancel_data ;
2019-11-05 23:51:51 +03:00
unsigned long flags ;
2019-10-29 06:49:21 +03:00
bool ret = false ;
/*
* Hold the lock to avoid - > cur_work going out of scope , caller
2019-11-13 19:43:34 +03:00
* may dereference the passed in work .
2019-10-29 06:49:21 +03:00
*/
2019-11-13 19:43:34 +03:00
spin_lock_irqsave ( & worker - > lock , flags ) ;
2019-10-29 06:49:21 +03:00
if ( worker - > cur_work & &
data - > cancel ( worker - > cur_work , data - > caller_data ) ) {
send_sig ( SIGINT , worker - > task , 1 ) ;
ret = true ;
}
2019-11-13 19:43:34 +03:00
spin_unlock_irqrestore ( & worker - > lock , flags ) ;
2019-10-29 06:49:21 +03:00
return ret ;
}
/*
 * Cancel the first item on @wqe that @cancel matches.
 *
 * A still-queued item is unlinked and its handler is run with
 * IO_WQ_WORK_CANCEL set; this returns IO_WQ_CANCEL_OK. Otherwise a worker
 * currently running a matching item is signalled, giving
 * IO_WQ_CANCEL_RUNNING. If neither applies, returns IO_WQ_CANCEL_NOTFOUND.
 */
static enum io_wq_cancel io_wqe_cancel_cb_work(struct io_wqe *wqe,
					       work_cancel_fn *cancel,
					       void *cancel_data)
{
	struct io_cb_cancel_data data = {
		.wqe		= wqe,
		.cancel		= cancel,
		.caller_data	= cancel_data,
	};
	struct io_wq_work *work, *victim = NULL;
	unsigned long irqflags;
	bool running;

	spin_lock_irqsave(&wqe->lock, irqflags);
	list_for_each_entry(work, &wqe->work_list, list) {
		if (cancel(work, cancel_data)) {
			list_del(&work->list);
			victim = work;
			break;
		}
	}
	spin_unlock_irqrestore(&wqe->lock, irqflags);

	if (victim) {
		victim->flags |= IO_WQ_WORK_CANCEL;
		victim->func(&victim);
		return IO_WQ_CANCEL_OK;
	}

	rcu_read_lock();
	running = io_wq_for_each_worker(wqe, io_work_cancel, &data);
	rcu_read_unlock();

	return running ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND;
}
/*
 * Cancel the first work item @cancel matches, trying each wqe in turn and
 * stopping at the first one that reports anything but NOTFOUND.
 */
enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
				  void *data)
{
	int i;

	for (i = 0; i < wq->nr_wqes; i++) {
		enum io_wq_cancel res;

		res = io_wqe_cancel_cb_work(wq->wqes[i], cancel, data);
		if (res != IO_WQ_CANCEL_NOTFOUND)
			return res;
	}

	return IO_WQ_CANCEL_NOTFOUND;
}
static bool io_wq_worker_cancel ( struct io_worker * worker , void * data )
{
struct io_wq_work * work = data ;
2019-11-13 19:43:34 +03:00
unsigned long flags ;
bool ret = false ;
2019-10-22 19:25:58 +03:00
2019-11-13 19:43:34 +03:00
if ( worker - > cur_work ! = work )
return false ;
spin_lock_irqsave ( & worker - > lock , flags ) ;
2019-10-22 19:25:58 +03:00
if ( worker - > cur_work = = work ) {
send_sig ( SIGINT , worker - > task , 1 ) ;
2019-11-13 19:43:34 +03:00
ret = true ;
2019-10-22 19:25:58 +03:00
}
2019-11-13 19:43:34 +03:00
spin_unlock_irqrestore ( & worker - > lock , flags ) ;
2019-10-22 19:25:58 +03:00
2019-11-13 19:43:34 +03:00
return ret ;
2019-10-22 19:25:58 +03:00
}
static enum io_wq_cancel io_wqe_cancel_work ( struct io_wqe * wqe ,
struct io_wq_work * cwork )
{
struct io_wq_work * work ;
2019-11-05 23:51:51 +03:00
unsigned long flags ;
2019-10-22 19:25:58 +03:00
bool found = false ;
cwork - > flags | = IO_WQ_WORK_CANCEL ;
/*
* First check pending list , if we ' re lucky we can just remove it
* from there . CANCEL_OK means that the work is returned as - new ,
* no completion will be posted for it .
*/
2019-11-05 23:51:51 +03:00
spin_lock_irqsave ( & wqe - > lock , flags ) ;
2019-10-22 19:25:58 +03:00
list_for_each_entry ( work , & wqe - > work_list , list ) {
if ( work = = cwork ) {
list_del ( & work - > list ) ;
found = true ;
break ;
}
}
2019-11-05 23:51:51 +03:00
spin_unlock_irqrestore ( & wqe - > lock , flags ) ;
2019-10-22 19:25:58 +03:00
if ( found ) {
work - > flags | = IO_WQ_WORK_CANCEL ;
work - > func ( & work ) ;
return IO_WQ_CANCEL_OK ;
}
/*
* Now check if a free ( going busy ) or busy worker has the work
* currently running . If we find it there , we ' ll return CANCEL_RUNNING
* as an indication that we attempte to signal cancellation . The
* completion will run normally in this case .
*/
rcu_read_lock ( ) ;
2019-11-13 23:54:49 +03:00
found = io_wq_for_each_worker ( wqe , io_wq_worker_cancel , cwork ) ;
2019-10-22 19:25:58 +03:00
rcu_read_unlock ( ) ;
return found ? IO_WQ_CANCEL_RUNNING : IO_WQ_CANCEL_NOTFOUND ;
}
enum io_wq_cancel io_wq_cancel_work ( struct io_wq * wq , struct io_wq_work * cwork )
{
enum io_wq_cancel ret = IO_WQ_CANCEL_NOTFOUND ;
int i ;
for ( i = 0 ; i < wq - > nr_wqes ; i + + ) {
struct io_wqe * wqe = wq - > wqes [ i ] ;
ret = io_wqe_cancel_work ( wqe , cwork ) ;
if ( ret ! = IO_WQ_CANCEL_NOTFOUND )
break ;
}
return ret ;
}
/* Marker work item that io_wq_flush() queues on its stack and waits for. */
struct io_wq_flush_data {
	struct io_wq_work work;
	struct completion done;		/* completed when the marker runs */
};
static void io_wq_flush_func ( struct io_wq_work * * workptr )
{
struct io_wq_work * work = * workptr ;
struct io_wq_flush_data * data ;
data = container_of ( work , struct io_wq_flush_data , work ) ;
complete ( & data - > done ) ;
}
/*
* Doesn ' t wait for previously queued work to finish . When this completes ,
* it just means that previously queued work was started .
*/
void io_wq_flush ( struct io_wq * wq )
{
struct io_wq_flush_data data ;
int i ;
for ( i = 0 ; i < wq - > nr_wqes ; i + + ) {
struct io_wqe * wqe = wq - > wqes [ i ] ;
init_completion ( & data . done ) ;
INIT_IO_WORK ( & data . work , io_wq_flush_func ) ;
2019-11-13 08:31:31 +03:00
data . work . flags | = IO_WQ_WORK_INTERNAL ;
2019-10-22 19:25:58 +03:00
io_wqe_enqueue ( wqe , & data . work ) ;
wait_for_completion ( & data . done ) ;
}
}
2019-11-07 21:41:16 +03:00
/*
 * Create an io_wq with one worker pool (io_wqe) per online NUMA node.
 *
 * @bounded:  maximum number of bound workers per node
 * @mm:       mm borrowed by workers for IO_WQ_WORK_NEEDS_USER work; the
 *            caller must already have done mmgrab() on it
 * @user:     user charged for unbound workers; the caller must already hold
 *            a reference. When NULL, the unbound max_workers stays 0.
 *            Otherwise the limit is the caller's RLIMIT_NPROC.
 * @get_work: optional, called before each non-internal item runs
 * @put_work: optional, releases what @get_work took
 *
 * Returns the new io_wq, or an ERR_PTR() on failure. On success the manager
 * thread is running and every pool has its fixed bound worker.
 */
struct io_wq *io_wq_create(unsigned bounded, struct mm_struct *mm,
			   struct user_struct *user, get_work_fn *get_work,
			   put_work_fn *put_work)
{
	int ret = -ENOMEM, i, node;
	struct io_wq *wq;

	wq = kcalloc(1, sizeof(*wq), GFP_KERNEL);
	if (!wq)
		return ERR_PTR(-ENOMEM);

	wq->nr_wqes = num_online_nodes();
	wq->wqes = kcalloc(wq->nr_wqes, sizeof(struct io_wqe *), GFP_KERNEL);
	if (!wq->wqes) {
		kfree(wq);
		return ERR_PTR(-ENOMEM);
	}

	wq->get_work = get_work;
	wq->put_work = put_work;

	/* caller must already hold a reference to this */
	wq->user = user;

	i = 0;
	for_each_online_node(node) {
		struct io_wqe *wqe;

		/*
		 * wqes[] was sized from num_online_nodes() above. If a node
		 * came online since, stop instead of writing past the end.
		 */
		if (i >= wq->nr_wqes)
			break;

		wqe = kcalloc_node(1, sizeof(struct io_wqe), GFP_KERNEL, node);
		if (!wqe)
			break;
		wq->wqes[i] = wqe;
		wqe->node = node;
		wqe->acct[IO_WQ_ACCT_BOUND].max_workers = bounded;
		atomic_set(&wqe->acct[IO_WQ_ACCT_BOUND].nr_running, 0);
		if (user) {
			wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
					task_rlimit(current, RLIMIT_NPROC);
		}
		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
		wqe->wq = wq;
		spin_lock_init(&wqe->lock);
		INIT_LIST_HEAD(&wqe->work_list);
		INIT_HLIST_NULLS_HEAD(&wqe->free_list, 0);
		INIT_HLIST_NULLS_HEAD(&wqe->busy_list, 1);
		INIT_LIST_HEAD(&wqe->all_list);

		i++;
	}

	init_completion(&wq->done);

	/* a pool allocation failed, or fewer nodes are online than counted */
	if (i != wq->nr_wqes)
		goto err;

	/* caller must have already done mmgrab() on this mm */
	wq->mm = mm;

	wq->manager = kthread_create(io_wq_manager, wq, "io_wq_manager");
	if (!IS_ERR(wq->manager)) {
		wake_up_process(wq->manager);
		/*
		 * ->done is completed once the fixed workers are up. On
		 * failure, it is completed once no workers remain.
		 */
		wait_for_completion(&wq->done);
		if (test_bit(IO_WQ_BIT_ERROR, &wq->state)) {
			ret = -ENOMEM;
			goto err;
		}
		/* reused by io_wq_destroy() to wait for the workers to exit */
		reinit_completion(&wq->done);
		return wq;
	}

	ret = PTR_ERR(wq->manager);
	complete(&wq->done);
err:
	for (i = 0; i < wq->nr_wqes; i++)
		kfree(wq->wqes[i]);
	kfree(wq->wqes);
	kfree(wq);
	return ERR_PTR(ret);
}
static bool io_wq_worker_wake ( struct io_worker * worker , void * data )
{
wake_up_process ( worker - > task ) ;
return false ;
}
void io_wq_destroy ( struct io_wq * wq )
{
int i ;
2019-11-19 18:37:07 +03:00
set_bit ( IO_WQ_BIT_EXIT , & wq - > state ) ;
if ( wq - > manager )
2019-10-22 19:25:58 +03:00
kthread_stop ( wq - > manager ) ;
rcu_read_lock ( ) ;
for ( i = 0 ; i < wq - > nr_wqes ; i + + ) {
struct io_wqe * wqe = wq - > wqes [ i ] ;
if ( ! wqe )
continue ;
2019-11-13 23:54:49 +03:00
io_wq_for_each_worker ( wqe , io_wq_worker_wake , NULL ) ;
2019-10-22 19:25:58 +03:00
}
rcu_read_unlock ( ) ;
wait_for_completion ( & wq - > done ) ;
for ( i = 0 ; i < wq - > nr_wqes ; i + + )
kfree ( wq - > wqes [ i ] ) ;
kfree ( wq - > wqes ) ;
kfree ( wq ) ;
}