BUG/MAJOR: sched: prevent rare concurrent wakeup of multi-threaded tasks
Since the relaxation of the run-queue locks in 2.0 there has been a very small but real race between expired tasks and running tasks: a task may be expiring and being woken up at the same time, on different threads. This is protected against via the TASK_QUEUED and TASK_RUNNING flags, but just after the task finishes executing, it releases its TASK_RUNNING bit and only then may it go through task_queue(). That function does nothing if the task's ->expire field is zero, but if the field turns to zero between this test and the call to __task_queue(), then several things may happen:

- the task may remain in the WQ for the next 24 days if its expiration is in the future;
- once queued, the task may prevent any other task after it from expiring for the next 24 days;
- if DEBUG_STRICT is set on 2.4 and above, an abort may happen;
- since 2.2, if the task got killed in between, we may even requeue a freed task, causing random behaviour the next time it is found there, or possibly corrupting the tree if it gets reinserted later.

The peers code is one call path that easily reproduces the case of the ->expire field being reset, because it sets it to TICK_ETERNITY as the very first thing when entering the task handler. But other code parts also use multi-threaded tasks and rightfully expect to be able to touch their expire field without causing trouble. No trivial code path was found that would destroy such a shared task at runtime, which already limits the risks.

This must be backported to 2.0.
parent 27c8da1fd5
commit 6c8babf6c4
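To illustrate the window described in the commit message, here is a minimal standalone sketch (illustrative C only, not HAProxy code: the structure, the flag value and the helper name are simplified assumptions) of the pre-patch sequence on the thread that just ran the task:

/* Illustrative sketch only -- simplified types and flag value, not the real
 * HAProxy definitions. It shows the check/use window on ->expire described
 * above.
 */
#include <stdatomic.h>

#define TICK_ETERNITY 0
#define TASK_RUNNING  0x0001            /* assumed value for illustration */

struct task {
	atomic_uint state;
	unsigned int expire;            /* 0 == TICK_ETERNITY */
	unsigned int wq_key;            /* key used once inserted in the wait queue */
};

/* Thread A: pre-patch scheduler path once the task's handler has returned. */
static void requeue_after_run(struct task *t)
{
	/* drop TASK_RUNNING: from here on, other threads may run the task */
	atomic_fetch_and(&t->state, ~(unsigned int)TASK_RUNNING);

	/* task_queue()-like check: expire is still set at this point... */
	if (t->expire == TICK_ETERNITY)
		return;

	/* <-- window: thread B may now execute the task's handler and reset
	 *     t->expire to TICK_ETERNITY (as the peers handler does first
	 *     thing), or the task may even be killed and freed meanwhile.
	 */

	/* ...but by the time the key is taken, expire may have turned to 0,
	 * yielding a bogus position in the timers tree (or a use-after-free),
	 * which is how a task can end up parked for the next 24 days.
	 */
	t->wq_key = t->expire;
}

The patch below closes this window by making both the expiry scan and the post-run requeuing take TASK_RUNNING around any access to ->expire, and by introducing task_drop_running() to release the bit without losing a wakeup that arrived in the meantime.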
@@ -220,6 +220,42 @@ static inline void _task_wakeup(struct task *t, unsigned int f, const char *file
 	}
 }
 
+/* Atomically drop the TASK_RUNNING bit while ensuring that any wakeup that
+ * happened since the flag was set will result in the task being queued (if
+ * it wasn't already). This is used to safely drop the flag from within the
+ * scheduler. The flag <f> is combined with existing flags before the test so
+ * that it's possible to unconditionally wakeup the task and drop the RUNNING
+ * flag if needed.
+ */
+#define task_drop_running(t, f) _task_drop_running(t, f, __FILE__, __LINE__)
+static inline void _task_drop_running(struct task *t, unsigned int f, const char *file, int line)
+{
+	unsigned int state, new_state;
+
+	state = _HA_ATOMIC_LOAD(&t->state);
+
+	while (1) {
+		new_state = state | f;
+		if (new_state & TASK_WOKEN_ANY)
+			new_state |= TASK_QUEUED;
+
+		if (_HA_ATOMIC_CAS(&t->state, &state, new_state & ~TASK_RUNNING))
+			break;
+		__ha_cpu_relax();
+	}
+
+	if ((new_state & ~state) & TASK_QUEUED) {
+#ifdef DEBUG_TASK
+		if ((unsigned int)t->debug.caller_idx > 1)
+			ABORT_NOW();
+		t->debug.caller_idx = !t->debug.caller_idx;
+		t->debug.caller_file[t->debug.caller_idx] = file;
+		t->debug.caller_line[t->debug.caller_idx] = line;
+#endif
+		__task_wakeup(t);
+	}
+}
+
 /*
  * Unlink the task from the wait queue, and possibly update the last_timer
  * pointer. A pointer to the task itself is returned. The task *must* already
src/task.c
@@ -395,12 +395,29 @@ void wake_expired_tasks()
 		}
 
 		task = eb32_entry(eb, struct task, wq);
+
+		/* Check for any competing run of the task (quite rare but may
+		 * involve a dangerous concurrent access on task->expire). In
+		 * order to protect against this, we'll take an exclusive access
+		 * on TASK_RUNNING before checking/touching task->expire. If the
+		 * task is already RUNNING on another thread, it will deal by
+		 * itself with the requeuing so we must not do anything and
+		 * simply quit the loop for now, because we cannot wait with the
+		 * WQ lock held as this would prevent the running thread from
+		 * requeuing the task. One annoying effect of holding RUNNING
+		 * here is that a concurrent task_wakeup() will refrain from
+		 * waking it up. This forces us to check for a wakeup after
+		 * releasing the flag.
+		 */
+		if (HA_ATOMIC_FETCH_OR(&task->state, TASK_RUNNING) & TASK_RUNNING)
+			break;
+
 		if (tick_is_expired(task->expire, now_ms)) {
 			/* expired task, wake it up */
 			HA_RWLOCK_SKTOWR(TASK_WQ_LOCK, &wq_lock);
 			__task_unlink_wq(task);
 			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
-			task_wakeup(task, TASK_WOKEN_TIMER);
+			task_drop_running(task, TASK_WOKEN_TIMER);
 		}
 		else if (task->expire != eb->key) {
 			/* task is not expired but its key doesn't match so let's
@@ -411,11 +428,13 @@ void wake_expired_tasks()
 			if (tick_isset(task->expire))
 				__task_queue(task, &timers);
 			HA_RWLOCK_WRTOSK(TASK_WQ_LOCK, &wq_lock);
+			task_drop_running(task, 0);
 			goto lookup_next;
 		}
 		else {
 			/* task not expired and correctly placed. It may not be eternal. */
 			BUG_ON(task->expire == TICK_ETERNITY);
+			task_drop_running(task, 0);
 			break;
 		}
 	}
@@ -582,9 +601,20 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 		LIST_DEL_INIT(&((struct tasklet *)t)->list);
 		__ha_barrier_store();
 
-		state = t->state;
-		while (!_HA_ATOMIC_CAS(&t->state, &state, (state & TASK_PERSISTENT) | TASK_RUNNING))
-			;
+		/* We must be the exclusive owner of the TASK_RUNNING bit, and
+		 * have to be careful that the task is not being manipulated on
+		 * another thread finding it expired in wake_expired_tasks().
+		 * The TASK_RUNNING bit will be set during these operations,
+		 * they are extremely rare and do not last long so the best to
+		 * do here is to wait.
+		 */
+		state = _HA_ATOMIC_LOAD(&t->state);
+		do {
+			while (unlikely(state & TASK_RUNNING)) {
+				__ha_cpu_relax();
+				state = _HA_ATOMIC_LOAD(&t->state);
+			}
+		} while (!_HA_ATOMIC_CAS(&t->state, &state, (state & TASK_PERSISTENT) | TASK_RUNNING));
 
 		__ha_barrier_atomic_store();
 
@@ -637,15 +667,15 @@ unsigned int run_tasks_from_lists(unsigned int budgets[])
 				HA_ATOMIC_ADD(&profile_entry->cpu_time, cpu);
 			}
 
-			state = _HA_ATOMIC_AND_FETCH(&t->state, ~TASK_RUNNING);
+			state = _HA_ATOMIC_LOAD(&t->state);
 			if (unlikely(state & TASK_KILLED)) {
 				task_unlink_wq(t);
 				__task_free(t);
 			}
-			else if (state & TASK_WOKEN_ANY)
-				task_wakeup(t, 0);
-			else
+			else {
 				task_queue(t);
+				task_drop_running(t, 0);
+			}
 		}
 		done++;
 	}
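Putting the two halves of the patch together, the resulting serialization can be summarized by the standalone sketch below (illustrative C11 only: flag values, function names and the yield call are assumptions standing in for the HAProxy primitives, not the project's API):

#include <stdatomic.h>
#include <sched.h>

#define TASK_RUNNING   0x0001           /* assumed values for illustration */
#define TASK_QUEUED    0x0002
#define TASK_WOKEN_ANY 0x0f00

/* Expiry-scan side: claim RUNNING before touching ->expire; if the task is
 * already running elsewhere, back off, as that thread will requeue it itself.
 */
static int try_own_for_expiry(atomic_uint *state)
{
	return !(atomic_fetch_or(state, TASK_RUNNING) & TASK_RUNNING);
}

/* Scheduler side: wait out any short-lived expiry-scan ownership before
 * claiming RUNNING to actually execute the handler.
 */
static void own_for_run(atomic_uint *state)
{
	unsigned int cur = atomic_load(state);

	do {
		while (cur & TASK_RUNNING) {
			sched_yield();          /* stand-in for __ha_cpu_relax() */
			cur = atomic_load(state);
		}
	} while (!atomic_compare_exchange_weak(state, &cur, cur | TASK_RUNNING));
}

/* Both sides release through the same primitive: merge <extra> wakeup flags
 * and, if any wakeup arrived while RUNNING was held, request queueing so the
 * deferred wakeup is not lost (the real code then calls __task_wakeup()).
 */
static int drop_running(atomic_uint *state, unsigned int extra)
{
	unsigned int cur = atomic_load(state), next;

	do {
		next = cur | extra;
		if (next & TASK_WOKEN_ANY)
			next |= TASK_QUEUED;
	} while (!atomic_compare_exchange_weak(state, &cur, next & ~TASK_RUNNING));

	return (next & ~cur) & TASK_QUEUED;     /* non-zero: caller must wake it */
}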