workqueue: reimplement work flushing using linked works
A work is linked to the next one by having WORK_STRUCT_LINKED bit set and these links can be chained. When a linked work is dispatched to a worker, all linked works are dispatched to the worker's newly added ->scheduled queue and processed back-to-back. Currently, as there's only single worker per cwq, having linked works doesn't make any visible behavior difference. This change is to prepare for multiple shared workers per cpu. Signed-off-by: Tejun Heo <tj@kernel.org>
This commit is contained in:
parent
c34056a3fd
commit
affee4b294
@ -24,8 +24,9 @@ typedef void (*work_func_t)(struct work_struct *work);
|
|||||||
|
|
||||||
enum {
|
enum {
|
||||||
WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
|
WORK_STRUCT_PENDING_BIT = 0, /* work item is pending execution */
|
||||||
|
WORK_STRUCT_LINKED_BIT = 1, /* next work is linked to this one */
|
||||||
#ifdef CONFIG_DEBUG_OBJECTS_WORK
|
#ifdef CONFIG_DEBUG_OBJECTS_WORK
|
||||||
WORK_STRUCT_STATIC_BIT = 1, /* static initializer (debugobjects) */
|
WORK_STRUCT_STATIC_BIT = 2, /* static initializer (debugobjects) */
|
||||||
WORK_STRUCT_COLOR_SHIFT = 3, /* color for workqueue flushing */
|
WORK_STRUCT_COLOR_SHIFT = 3, /* color for workqueue flushing */
|
||||||
#else
|
#else
|
||||||
WORK_STRUCT_COLOR_SHIFT = 2, /* color for workqueue flushing */
|
WORK_STRUCT_COLOR_SHIFT = 2, /* color for workqueue flushing */
|
||||||
@ -34,6 +35,7 @@ enum {
|
|||||||
WORK_STRUCT_COLOR_BITS = 4,
|
WORK_STRUCT_COLOR_BITS = 4,
|
||||||
|
|
||||||
WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
|
WORK_STRUCT_PENDING = 1 << WORK_STRUCT_PENDING_BIT,
|
||||||
|
WORK_STRUCT_LINKED = 1 << WORK_STRUCT_LINKED_BIT,
|
||||||
#ifdef CONFIG_DEBUG_OBJECTS_WORK
|
#ifdef CONFIG_DEBUG_OBJECTS_WORK
|
||||||
WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,
|
WORK_STRUCT_STATIC = 1 << WORK_STRUCT_STATIC_BIT,
|
||||||
#else
|
#else
|
||||||
|
@ -51,6 +51,7 @@ struct cpu_workqueue_struct;
|
|||||||
|
|
||||||
struct worker {
|
struct worker {
|
||||||
struct work_struct *current_work; /* L: work being processed */
|
struct work_struct *current_work; /* L: work being processed */
|
||||||
|
struct list_head scheduled; /* L: scheduled works */
|
||||||
struct task_struct *task; /* I: worker task */
|
struct task_struct *task; /* I: worker task */
|
||||||
struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
|
struct cpu_workqueue_struct *cwq; /* I: the associated cwq */
|
||||||
int id; /* I: worker id */
|
int id; /* I: worker id */
|
||||||
@ -445,6 +446,8 @@ static struct worker *alloc_worker(void)
|
|||||||
struct worker *worker;
|
struct worker *worker;
|
||||||
|
|
||||||
worker = kzalloc(sizeof(*worker), GFP_KERNEL);
|
worker = kzalloc(sizeof(*worker), GFP_KERNEL);
|
||||||
|
if (worker)
|
||||||
|
INIT_LIST_HEAD(&worker->scheduled);
|
||||||
return worker;
|
return worker;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -530,6 +533,7 @@ static void destroy_worker(struct worker *worker)
|
|||||||
|
|
||||||
/* sanity check frenzy */
|
/* sanity check frenzy */
|
||||||
BUG_ON(worker->current_work);
|
BUG_ON(worker->current_work);
|
||||||
|
BUG_ON(!list_empty(&worker->scheduled));
|
||||||
|
|
||||||
kthread_stop(worker->task);
|
kthread_stop(worker->task);
|
||||||
kfree(worker);
|
kfree(worker);
|
||||||
@ -539,6 +543,47 @@ static void destroy_worker(struct worker *worker)
|
|||||||
spin_unlock(&workqueue_lock);
|
spin_unlock(&workqueue_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* move_linked_works - move linked works to a list
|
||||||
|
* @work: start of series of works to be scheduled
|
||||||
|
* @head: target list to append @work to
|
||||||
|
* @nextp: out paramter for nested worklist walking
|
||||||
|
*
|
||||||
|
* Schedule linked works starting from @work to @head. Work series to
|
||||||
|
* be scheduled starts at @work and includes any consecutive work with
|
||||||
|
* WORK_STRUCT_LINKED set in its predecessor.
|
||||||
|
*
|
||||||
|
* If @nextp is not NULL, it's updated to point to the next work of
|
||||||
|
* the last scheduled work. This allows move_linked_works() to be
|
||||||
|
* nested inside outer list_for_each_entry_safe().
|
||||||
|
*
|
||||||
|
* CONTEXT:
|
||||||
|
* spin_lock_irq(cwq->lock).
|
||||||
|
*/
|
||||||
|
static void move_linked_works(struct work_struct *work, struct list_head *head,
|
||||||
|
struct work_struct **nextp)
|
||||||
|
{
|
||||||
|
struct work_struct *n;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Linked worklist will always end before the end of the list,
|
||||||
|
* use NULL for list head.
|
||||||
|
*/
|
||||||
|
list_for_each_entry_safe_from(work, n, NULL, entry) {
|
||||||
|
list_move_tail(&work->entry, head);
|
||||||
|
if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If we're already inside safe list traversal and have moved
|
||||||
|
* multiple works to the scheduled queue, the next position
|
||||||
|
* needs to be updated.
|
||||||
|
*/
|
||||||
|
if (nextp)
|
||||||
|
*nextp = n;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
|
* cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
|
||||||
* @cwq: cwq of interest
|
* @cwq: cwq of interest
|
||||||
@ -639,17 +684,25 @@ static void process_one_work(struct worker *worker, struct work_struct *work)
|
|||||||
cwq_dec_nr_in_flight(cwq, work_color);
|
cwq_dec_nr_in_flight(cwq, work_color);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void run_workqueue(struct worker *worker)
|
/**
|
||||||
|
* process_scheduled_works - process scheduled works
|
||||||
|
* @worker: self
|
||||||
|
*
|
||||||
|
* Process all scheduled works. Please note that the scheduled list
|
||||||
|
* may change while processing a work, so this function repeatedly
|
||||||
|
* fetches a work from the top and executes it.
|
||||||
|
*
|
||||||
|
* CONTEXT:
|
||||||
|
* spin_lock_irq(cwq->lock) which may be released and regrabbed
|
||||||
|
* multiple times.
|
||||||
|
*/
|
||||||
|
static void process_scheduled_works(struct worker *worker)
|
||||||
{
|
{
|
||||||
struct cpu_workqueue_struct *cwq = worker->cwq;
|
while (!list_empty(&worker->scheduled)) {
|
||||||
|
struct work_struct *work = list_first_entry(&worker->scheduled,
|
||||||
spin_lock_irq(&cwq->lock);
|
|
||||||
while (!list_empty(&cwq->worklist)) {
|
|
||||||
struct work_struct *work = list_entry(cwq->worklist.next,
|
|
||||||
struct work_struct, entry);
|
struct work_struct, entry);
|
||||||
process_one_work(worker, work);
|
process_one_work(worker, work);
|
||||||
}
|
}
|
||||||
spin_unlock_irq(&cwq->lock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -684,7 +737,28 @@ static int worker_thread(void *__worker)
|
|||||||
get_cpu_mask(cwq->cpu))))
|
get_cpu_mask(cwq->cpu))))
|
||||||
set_cpus_allowed_ptr(worker->task,
|
set_cpus_allowed_ptr(worker->task,
|
||||||
get_cpu_mask(cwq->cpu));
|
get_cpu_mask(cwq->cpu));
|
||||||
run_workqueue(worker);
|
|
||||||
|
spin_lock_irq(&cwq->lock);
|
||||||
|
|
||||||
|
while (!list_empty(&cwq->worklist)) {
|
||||||
|
struct work_struct *work =
|
||||||
|
list_first_entry(&cwq->worklist,
|
||||||
|
struct work_struct, entry);
|
||||||
|
|
||||||
|
if (likely(!(*work_data_bits(work) &
|
||||||
|
WORK_STRUCT_LINKED))) {
|
||||||
|
/* optimization path, not strictly necessary */
|
||||||
|
process_one_work(worker, work);
|
||||||
|
if (unlikely(!list_empty(&worker->scheduled)))
|
||||||
|
process_scheduled_works(worker);
|
||||||
|
} else {
|
||||||
|
move_linked_works(work, &worker->scheduled,
|
||||||
|
NULL);
|
||||||
|
process_scheduled_works(worker);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
spin_unlock_irq(&cwq->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
@ -705,16 +779,33 @@ static void wq_barrier_func(struct work_struct *work)
|
|||||||
* insert_wq_barrier - insert a barrier work
|
* insert_wq_barrier - insert a barrier work
|
||||||
* @cwq: cwq to insert barrier into
|
* @cwq: cwq to insert barrier into
|
||||||
* @barr: wq_barrier to insert
|
* @barr: wq_barrier to insert
|
||||||
* @head: insertion point
|
* @target: target work to attach @barr to
|
||||||
|
* @worker: worker currently executing @target, NULL if @target is not executing
|
||||||
*
|
*
|
||||||
* Insert barrier @barr into @cwq before @head.
|
* @barr is linked to @target such that @barr is completed only after
|
||||||
|
* @target finishes execution. Please note that the ordering
|
||||||
|
* guarantee is observed only with respect to @target and on the local
|
||||||
|
* cpu.
|
||||||
|
*
|
||||||
|
* Currently, a queued barrier can't be canceled. This is because
|
||||||
|
* try_to_grab_pending() can't determine whether the work to be
|
||||||
|
* grabbed is at the head of the queue and thus can't clear LINKED
|
||||||
|
* flag of the previous work while there must be a valid next work
|
||||||
|
* after a work with LINKED flag set.
|
||||||
|
*
|
||||||
|
* Note that when @worker is non-NULL, @target may be modified
|
||||||
|
* underneath us, so we can't reliably determine cwq from @target.
|
||||||
*
|
*
|
||||||
* CONTEXT:
|
* CONTEXT:
|
||||||
* spin_lock_irq(cwq->lock).
|
* spin_lock_irq(cwq->lock).
|
||||||
*/
|
*/
|
||||||
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
|
static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
|
||||||
struct wq_barrier *barr, struct list_head *head)
|
struct wq_barrier *barr,
|
||||||
|
struct work_struct *target, struct worker *worker)
|
||||||
{
|
{
|
||||||
|
struct list_head *head;
|
||||||
|
unsigned int linked = 0;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* debugobject calls are safe here even with cwq->lock locked
|
* debugobject calls are safe here even with cwq->lock locked
|
||||||
* as we know for sure that this will not trigger any of the
|
* as we know for sure that this will not trigger any of the
|
||||||
@ -725,8 +816,24 @@ static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
|
|||||||
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
|
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
|
||||||
init_completion(&barr->done);
|
init_completion(&barr->done);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If @target is currently being executed, schedule the
|
||||||
|
* barrier to the worker; otherwise, put it after @target.
|
||||||
|
*/
|
||||||
|
if (worker)
|
||||||
|
head = worker->scheduled.next;
|
||||||
|
else {
|
||||||
|
unsigned long *bits = work_data_bits(target);
|
||||||
|
|
||||||
|
head = target->entry.next;
|
||||||
|
/* there can already be other linked works, inherit and set */
|
||||||
|
linked = *bits & WORK_STRUCT_LINKED;
|
||||||
|
__set_bit(WORK_STRUCT_LINKED_BIT, bits);
|
||||||
|
}
|
||||||
|
|
||||||
debug_work_activate(&barr->work);
|
debug_work_activate(&barr->work);
|
||||||
insert_work(cwq, &barr->work, head, work_color_to_flags(WORK_NO_COLOR));
|
insert_work(cwq, &barr->work, head,
|
||||||
|
work_color_to_flags(WORK_NO_COLOR) | linked);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -964,8 +1071,8 @@ EXPORT_SYMBOL_GPL(flush_workqueue);
|
|||||||
*/
|
*/
|
||||||
int flush_work(struct work_struct *work)
|
int flush_work(struct work_struct *work)
|
||||||
{
|
{
|
||||||
|
struct worker *worker = NULL;
|
||||||
struct cpu_workqueue_struct *cwq;
|
struct cpu_workqueue_struct *cwq;
|
||||||
struct list_head *prev;
|
|
||||||
struct wq_barrier barr;
|
struct wq_barrier barr;
|
||||||
|
|
||||||
might_sleep();
|
might_sleep();
|
||||||
@ -985,14 +1092,14 @@ int flush_work(struct work_struct *work)
|
|||||||
smp_rmb();
|
smp_rmb();
|
||||||
if (unlikely(cwq != get_wq_data(work)))
|
if (unlikely(cwq != get_wq_data(work)))
|
||||||
goto already_gone;
|
goto already_gone;
|
||||||
prev = &work->entry;
|
|
||||||
} else {
|
} else {
|
||||||
if (!cwq->worker || cwq->worker->current_work != work)
|
if (cwq->worker && cwq->worker->current_work == work)
|
||||||
|
worker = cwq->worker;
|
||||||
|
if (!worker)
|
||||||
goto already_gone;
|
goto already_gone;
|
||||||
prev = &cwq->worklist;
|
|
||||||
}
|
}
|
||||||
insert_wq_barrier(cwq, &barr, prev->next);
|
|
||||||
|
|
||||||
|
insert_wq_barrier(cwq, &barr, work, worker);
|
||||||
spin_unlock_irq(&cwq->lock);
|
spin_unlock_irq(&cwq->lock);
|
||||||
wait_for_completion(&barr.done);
|
wait_for_completion(&barr.done);
|
||||||
destroy_work_on_stack(&barr.work);
|
destroy_work_on_stack(&barr.work);
|
||||||
@ -1048,16 +1155,19 @@ static void wait_on_cpu_work(struct cpu_workqueue_struct *cwq,
|
|||||||
struct work_struct *work)
|
struct work_struct *work)
|
||||||
{
|
{
|
||||||
struct wq_barrier barr;
|
struct wq_barrier barr;
|
||||||
int running = 0;
|
struct worker *worker;
|
||||||
|
|
||||||
spin_lock_irq(&cwq->lock);
|
spin_lock_irq(&cwq->lock);
|
||||||
|
|
||||||
|
worker = NULL;
|
||||||
if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
|
if (unlikely(cwq->worker && cwq->worker->current_work == work)) {
|
||||||
insert_wq_barrier(cwq, &barr, cwq->worklist.next);
|
worker = cwq->worker;
|
||||||
running = 1;
|
insert_wq_barrier(cwq, &barr, work, worker);
|
||||||
}
|
}
|
||||||
|
|
||||||
spin_unlock_irq(&cwq->lock);
|
spin_unlock_irq(&cwq->lock);
|
||||||
|
|
||||||
if (unlikely(running)) {
|
if (unlikely(worker)) {
|
||||||
wait_for_completion(&barr.done);
|
wait_for_completion(&barr.done);
|
||||||
destroy_work_on_stack(&barr.work);
|
destroy_work_on_stack(&barr.work);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user