features/qemu-block: simplify coroutine model to use single synctask, ucontext

The current coroutine model, mapping synctasks 1-1 with qemu internal
Coroutines, has some unresolved raciness issues. This problem usually
manifests as lifecycle mismatches between top-level (gluster created)
synctasks and the subsequently created internal coroutines from that
context. Qemu's internal queueing (and locking) can cause situations
where the top-level synctask is destroyed before the internal scheduler
has released references to memory, leading to use-after-free crashes
and assertion failures.

Simplify the coroutine model to use a single synctask as a coroutine
processor and rely on the existing native ucontext coroutine
implementation. The syncenv thread is donated to qemu and ensures a
single top-level coroutine is processed at a time. Qemu now has
complete control over coroutine scheduling.

BUG: 986775
Change-Id: I38223479a608d80353128e390f243933fc946fd6
Signed-off-by: Brian Foster <bfoster@redhat.com>
Reviewed-on: http://review.gluster.org/6110
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
This commit is contained in:
Brian Foster 2013-10-18 07:36:38 -04:00 committed by Anand Avati
parent 0826f9073a
commit b06ecde299
9 changed files with 303 additions and 255 deletions

View File

@ -64,7 +64,6 @@
#define CONFIG_OPEN_BY_HANDLE 1
#define CONFIG_LINUX_MAGIC_H 1
#define CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE 1
#define CONFIG_VALGRIND_H 1
#define CONFIG_HAS_ENVIRON 1
#define CONFIG_CPUID_H 1
#define CONFIG_INT128 1

View File

@ -0,0 +1,225 @@
/*
* ucontext coroutine initialization code
*
* Copyright (C) 2006 Anthony Liguori <anthony@codemonkey.ws>
* Copyright (C) 2011 Kevin Wolf <kwolf@redhat.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.0 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/
/* XXX Is there a nicer way to disable glibc's stack check for longjmp? */
#ifdef _FORTIFY_SOURCE
#undef _FORTIFY_SOURCE
#endif
#include <stdlib.h>
#include <setjmp.h>
#include <stdint.h>
#include <pthread.h>
#include <ucontext.h>
#include "qemu-common.h"
#include "block/coroutine_int.h"
#ifdef CONFIG_VALGRIND_H
#include <valgrind/valgrind.h>
#endif
/*
 * ucontext-backed coroutine: the generic Coroutine plus the private stack it
 * runs on and the jump buffer used to switch into and out of it.
 */
typedef struct {
/* Generic coroutine state; DO_UPCAST() maps a Coroutine * back to this struct */
Coroutine base;
/* Stack memory allocated in qemu_coroutine_new(), freed in qemu_coroutine_delete() */
void *stack;
/* Context saved by sigsetjmp() and resumed by siglongjmp() on a switch */
sigjmp_buf env;
#ifdef CONFIG_VALGRIND_H
/* Id returned by VALGRIND_STACK_REGISTER(); passed to VALGRIND_STACK_DEREGISTER() */
unsigned int valgrind_stack_id;
#endif
} CoroutineUContext;
/**
 * Per-thread coroutine bookkeeping
 *
 * One instance is allocated lazily per thread by coroutine_get_thread_state()
 * and stored in thread-specific data under thread_state_key.
 */
typedef struct {
/** Currently executing coroutine; initially points at leader.base */
Coroutine *current;
/** The default coroutine */
CoroutineUContext leader;
} CoroutineThreadState;
/*
 * Thread-specific-data key holding each thread's CoroutineThreadState.
 * Created in coroutine_init() with qemu_coroutine_thread_cleanup() as its
 * destructor.
 */
static pthread_key_t thread_state_key;
/*
 * va_args to makecontext() must be of type 'int', so passing
 * the pointer we need may require several int arguments. This
 * union is a quick hack to let us do that: the pointer is written
 * through 'p' and handed over as the two ints in 'i'.
 *
 * NOTE(review): assumes sizeof(void *) <= 2 * sizeof(int) -- confirm for
 * every supported target.
 */
union cc_arg {
/* The pointer being smuggled through makecontext() */
void *p;
/* The same bytes viewed as int arguments */
int i[2];
};
/*
 * Return the calling thread's coroutine bookkeeping, creating it on first use.
 *
 * A freshly created state is zero-filled, has the embedded leader coroutine
 * marked as the current one, and is registered under thread_state_key so
 * later calls on the same thread find it again.
 */
static CoroutineThreadState *coroutine_get_thread_state(void)
{
    CoroutineThreadState *state = pthread_getspecific(thread_state_key);

    if (state) {
        return state;
    }

    /* First call on this thread: build the state and publish it. */
    state = g_malloc0(sizeof(*state));
    state->current = &state->leader.base;
    pthread_setspecific(thread_state_key, state);

    return state;
}
/*
 * Destructor registered for thread_state_key in coroutine_init().
 *
 * @opaque: the CoroutineThreadState stored under the key for this thread;
 *          it is released with g_free().
 */
static void qemu_coroutine_thread_cleanup(void *opaque)
{
    g_free(opaque);
}
/*
 * Constructor: create the thread-specific-data key that holds each thread's
 * CoroutineThreadState, with qemu_coroutine_thread_cleanup() as destructor.
 *
 * Aborts the process if the key cannot be created.
 */
static void __attribute__((constructor)) coroutine_init(void)
{
    int ret;

    ret = pthread_key_create(&thread_state_key, qemu_coroutine_thread_cleanup);
    if (ret != 0) {
        /*
         * pthread_key_create() reports failure through its return value and
         * does not set errno, so the message must be derived from ret.
         */
        fprintf(stderr, "unable to create leader key: %s\n", strerror(ret));
        abort();
    }
}
/*
 * Entry point of every new coroutine stack, installed via makecontext() in
 * qemu_coroutine_new().
 *
 * @i0, @i1: the two int halves of a union cc_arg whose 'p' member is the
 *           CoroutineUContext being started.
 *
 * This function never returns: after the initial hand-back to the creator it
 * loops, running the coroutine's entry function each time it is switched to.
 */
static void coroutine_trampoline(int i0, int i1)
{
union cc_arg arg;
CoroutineUContext *self;
Coroutine *co;
/* Reassemble the pointer that was split into two ints for makecontext() */
arg.i[0] = i0;
arg.i[1] = i1;
self = arg.p;
co = &self->base;
/*
 * Initialize the longjmp environment and switch back to the caller.
 * At this point co->entry_arg still holds the creator's sigjmp_buf
 * (stashed by qemu_coroutine_new()), so the first pass jumps back there.
 * When this environment is later resumed via siglongjmp(), sigsetjmp()
 * returns non-zero and execution continues into the loop below.
 */
if (!sigsetjmp(self->env, 0)) {
siglongjmp(*(sigjmp_buf *)co->entry_arg, 1);
}
while (true) {
/* Run the user function, then report termination to whoever entered us */
co->entry(co->entry_arg);
qemu_coroutine_switch(co, co->caller, COROUTINE_TERMINATE);
/* Reaching here means we were switched to again; presumably the object was reused -- TODO confirm */
}
}
/*
 * Allocate a new coroutine with its own stack and prime it so that it can
 * later be entered with siglongjmp().
 *
 * Returns the embedded Coroutine of a newly allocated CoroutineUContext.
 * Aborts the process if getcontext() fails.
 */
Coroutine *qemu_coroutine_new(void)
{
/* Size in bytes of the stack given to each coroutine (1 << 20) */
const size_t stack_size = 1 << 20;
CoroutineUContext *co;
ucontext_t old_uc, uc;
sigjmp_buf old_env;
union cc_arg arg = {0};
/* The ucontext functions preserve signal masks which incurs a
* system call overhead. sigsetjmp(buf, 0)/siglongjmp() does not
* preserve signal masks but only works on the current stack.
* Since we need a way to create and switch to a new stack, use
* the ucontext functions for that but sigsetjmp()/siglongjmp() for
* everything else.
*/
if (getcontext(&uc) == -1) {
abort();
}
co = g_malloc0(sizeof(*co));
co->stack = g_malloc(stack_size);
co->base.entry_arg = &old_env; /* stash away our jmp_buf */
/* Point the new context at the freshly allocated stack */
uc.uc_link = &old_uc;
uc.uc_stack.ss_sp = co->stack;
uc.uc_stack.ss_size = stack_size;
uc.uc_stack.ss_flags = 0;
#ifdef CONFIG_VALGRIND_H
/* Tell valgrind about the new stack; the id is needed to deregister it */
co->valgrind_stack_id =
VALGRIND_STACK_REGISTER(co->stack, co->stack + stack_size);
#endif
/* Pass the coroutine pointer to the trampoline as two int arguments */
arg.p = co;
makecontext(&uc, (void (*)(void))coroutine_trampoline,
2, arg.i[0], arg.i[1]);
/* swapcontext() in, siglongjmp() back out */
if (!sigsetjmp(old_env, 0)) {
swapcontext(&old_uc, &uc);
}
/* The trampoline has saved its own environment in co->env and jumped back */
return &co->base;
}
#ifdef CONFIG_VALGRIND_H
#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE
/* Work around an unused variable in the valgrind.h macro... */
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#endif
/*
 * Deregister a coroutine's stack from valgrind using the id recorded in
 * co->valgrind_stack_id. Kept as a separate function so the surrounding
 * diagnostic pragmas apply only to the valgrind macro expansion.
 */
static inline void valgrind_stack_deregister(CoroutineUContext *co)
{
VALGRIND_STACK_DEREGISTER(co->valgrind_stack_id);
}
#ifdef CONFIG_PRAGMA_DIAGNOSTIC_AVAILABLE
#pragma GCC diagnostic pop
#endif
#endif
/*
 * Destroy a coroutine created by qemu_coroutine_new().
 *
 * @co_: the generic Coroutine embedded in a CoroutineUContext.
 *
 * Deregisters the stack from valgrind (when built with valgrind support),
 * then releases the stack and the coroutine object itself.
 */
void qemu_coroutine_delete(Coroutine *co_)
{
    CoroutineUContext *uco = DO_UPCAST(CoroutineUContext, base, co_);

#ifdef CONFIG_VALGRIND_H
    valgrind_stack_deregister(uco);
#endif

    g_free(uco->stack);
    g_free(uco);
}
/*
 * Transfer control from one coroutine to another.
 *
 * @from_:  the coroutine giving up control (its context is saved in env)
 * @to_:    the coroutine to resume; recorded as the thread's current coroutine
 * @action: value delivered to @to_ as the result of its pending switch
 *
 * Returns the action passed by whichever coroutine later switches back to
 * @from_.
 */
CoroutineAction qemu_coroutine_switch(Coroutine *from_, Coroutine *to_,
CoroutineAction action)
{
CoroutineUContext *from = DO_UPCAST(CoroutineUContext, base, from_);
CoroutineUContext *to = DO_UPCAST(CoroutineUContext, base, to_);
CoroutineThreadState *s = coroutine_get_thread_state();
int ret;
/* Update bookkeeping before jumping: the target is current once it runs */
s->current = to_;
/*
 * sigsetjmp() returns 0 when called directly; in that case jump to the
 * target. When another coroutine later does siglongjmp(from->env, x),
 * sigsetjmp() returns x here and we fall through to return it.
 * NOTE(review): this relies on the CoroutineAction values passed here
 * being non-zero -- confirm against the enum definition.
 */
ret = sigsetjmp(from->env, 0);
if (ret == 0) {
siglongjmp(to->env, action);
}
return ret;
}
Coroutine *qemu_coroutine_self(void)
{
CoroutineThreadState *s = coroutine_get_thread_state();
return s->current;
}
/*
 * Report whether the calling thread is currently running inside a coroutine
 * that was entered from another one (i.e. its caller field is set).
 *
 * Reads the thread-specific state directly instead of going through
 * coroutine_get_thread_state(), so a thread that has never used coroutines
 * gets 'false' without any state being allocated for it.
 */
bool qemu_in_coroutine(void)
{
    CoroutineThreadState *state = pthread_getspecific(thread_state_key);

    if (!state) {
        return false;
    }

    return state->current->caller != NULL;
}

View File

@ -9,6 +9,7 @@ qemu_block_la_SOURCES_qemu = \
$(CONTRIBDIR)/qemu/qemu-coroutine.c \
$(CONTRIBDIR)/qemu/qemu-coroutine-lock.c \
$(CONTRIBDIR)/qemu/qemu-coroutine-sleep.c \
$(CONTRIBDIR)/qemu/coroutine-ucontext.c \
$(CONTRIBDIR)/qemu/block.c \
$(CONTRIBDIR)/qemu/nop-symbols.c
@ -140,7 +141,6 @@ noinst_HEADERS = \
$(noinst_HEADERS_qemu) \
qemu-block.h \
qemu-block-memory-types.h \
coroutine-synctask.h \
qb-coroutines.h
AM_CPPFLAGS = $(GF_CPPFLAGS) -I$(top_srcdir)/libglusterfs/src \

View File

@ -22,192 +22,95 @@
#include "qemu-block-memory-types.h"
#include "qemu-block.h"
#include "coroutine-synctask.h"
void
qemu_coroutine_delete (Coroutine *co_)
/*
* This code serves as the bridge from the main glusterfs context to the qemu
* coroutine context via synctask. We create a single threaded syncenv with a
* single synctask responsible for processing a queue of coroutines. The qemu
* code invoked from within the synctask function handlers uses the ucontext
* coroutine implementation and scheduling logic internal to qemu. This
* effectively donates a thread of execution to qemu and its internal coroutine
* management.
*
* NOTE: The existence of concurrent synctasks has proven quite racy with regard
* to qemu coroutine management, particularly related to the lifecycle
* differences with top-level synctasks and internally created coroutines and
* interactions with qemu-internal queues (and locks, in turn). We explicitly
* disallow this scenario, via the queue, until it is better supported.
*/
static struct {
struct list_head queue;
gf_lock_t lock;
struct synctask *task;
} qb_co;
static void
init_qbco()
{
struct synctask *synctask = NULL;
CoroutineSynctask *cs = NULL;
cs = DO_UPCAST(CoroutineSynctask, base, co_);
synctask = cs->synctask;
cs->die = 1;
synctask_wake (synctask);
/* Do not free either @cs or @synctask here.
@synctask is naturally destroyed when
cs_proc() returns (after "break"ing out of
the loop because of setting cs->die=1 above.
We free @cs too just before returning from
cs_proc()
*/
return;
INIT_LIST_HEAD(&qb_co.queue);
LOCK_INIT(&qb_co.lock);
}
CoroutineAction
qemu_coroutine_switch (Coroutine *from_, Coroutine *to_, CoroutineAction action)
{
struct synctask *to = NULL;
struct synctask *from = NULL;
CoroutineSynctask *csto = NULL;
CoroutineSynctask *csfrom = NULL;
csto = DO_UPCAST(CoroutineSynctask, base, to_);
csfrom = DO_UPCAST(CoroutineSynctask, base, from_);
to = csto->synctask;
from = csfrom->synctask;
/* TODO: need mutex/cond guarding when making syncenv
multithreaded
*/
csfrom->run = false;
csto->run = true;
/* the next three lines must be in this specific order only */
csfrom->action = action;
synctask_wake (to);
synctask_yield (from);
/* the yielder set @action value in @csfrom, but for the
resumer it is @csto
*/
return csto->action;
}
int
cs_fin (int ret, call_frame_t *frame, void *opaque)
{
/* nop */
return 0;
}
static int
cs_proc (void *opaque)
{
CoroutineSynctask *cs = opaque;
struct synctask *synctask = NULL;
synctask = synctask_get (); /* == cs->synctask */
for (;;) {
while (!cs->run && !cs->die)
/* entry function (i.e cs->base.entry) will
not be set just yet first time. Wait for
caller to set it and call switch()
*/
synctask_yield (synctask);
if (cs->die)
break;
cs->base.entry (cs->base.entry_arg);
qemu_coroutine_switch (&cs->base, cs->base.caller,
COROUTINE_TERMINATE);
}
GF_FREE (cs);
return 0;
}
Coroutine *
qemu_coroutine_new()
{
qb_conf_t *conf = NULL;
CoroutineSynctask *cs = NULL;
struct synctask *task = NULL;
conf = THIS->private;
cs = GF_CALLOC (1, sizeof (*cs), gf_qb_mt_coroutinesynctask_t);
if (!cs)
return NULL;
task = synctask_get ();
/* Inherit the frame from the parent synctask, as this will
carry forward things like uid, gid, pid, lkowner etc. of the
caller properly.
*/
cs->synctask = synctask_create (conf->env, cs_proc, cs_fin,
task ? task->frame : NULL, cs);
if (!cs->synctask)
return NULL;
return &cs->base;
}
Coroutine *
qemu_coroutine_self()
{
struct synctask *synctask = NULL;
CoroutineSynctask *cs = NULL;
synctask = synctask_get();
cs = synctask->opaque;
return &cs->base;
}
bool
qemu_in_coroutine ()
{
Coroutine *co = NULL;
co = qemu_coroutine_self ();
return co && co->caller;
}
/* These are calls for the "top" xlator to invoke/submit
coroutines
*/
static int
synctask_nop_cbk (int ret, call_frame_t *frame, void *opaque)
{
return 0;
}
int
static int
qb_synctask_wrap (void *opaque)
{
struct synctask *task = NULL;
CoroutineSynctask *cs = NULL;
qb_local_t *qb_local = NULL;
qb_local_t *qb_local, *tmp;
task = synctask_get ();
cs = opaque;
cs->synctask = task;
qb_local = DO_UPCAST (qb_local_t, cs, cs);
LOCK(&qb_co.lock);
return qb_local->synctask_fn (opaque);
while (!list_empty(&qb_co.queue)) {
list_for_each_entry_safe(qb_local, tmp, &qb_co.queue, list) {
list_del_init(&qb_local->list);
break;
}
UNLOCK(&qb_co.lock);
qb_local->synctask_fn(qb_local);
/* qb_local is now unwound and gone! */
LOCK(&qb_co.lock);
}
qb_co.task = NULL;
UNLOCK(&qb_co.lock);
return 0;
}
int
qb_coroutine (call_frame_t *frame, synctask_fn_t fn)
{
qb_local_t *qb_local = NULL;
qb_conf_t *qb_conf = NULL;
static int init = 0;
qb_local = frame->local;
qb_local->synctask_fn = fn;
qb_conf = frame->this->private;
return synctask_new (qb_conf->env, qb_synctask_wrap, synctask_nop_cbk,
frame, &qb_local->cs);
if (!init) {
init = 1;
init_qbco();
}
LOCK(&qb_co.lock);
if (!qb_co.task)
qb_co.task = synctask_create(qb_conf->env, qb_synctask_wrap,
synctask_nop_cbk, frame, NULL);
list_add_tail(&qb_local->list, &qb_co.queue);
UNLOCK(&qb_co.lock);
return 0;
}

View File

@ -1,51 +0,0 @@
/*
Copyright (c) 2013 Red Hat, Inc. <http://www.redhat.com>
This file is part of GlusterFS.
This file is licensed to you under your choice of the GNU Lesser
General Public License, version 3 or any later version (LGPLv3 or
later), or the GNU General Public License, version 2 (GPLv2), in all
cases as published by the Free Software Foundation.
*/
#ifndef __COROUTINE_SYNCTASK_H
#define __COROUTINE_SYNCTASK_H
#include "syncop.h"
#include "block/coroutine_int.h"
#include "qemu-common.h"
#include "block/coroutine_int.h"
/*
Three entities:
synctask - glusterfs implementation of xlator friendly lightweight threads
Coroutine - qemu coroutine API for its block drivers
CoroutineSynctask - implementation of Coroutine using synctasks
Coroutine is an "embedded" structure inside CoroutineSynctask, called "base".
E.g:
Coroutine *co;
CoroutineSynctask *cs;
struct synctask *synctask;
cs == synctask->opaque;
co == &(cs->base);
cs = DO_UPCAST(CoroutineSynctask, base, co);
synctask == cs->synctask;
*/
typedef struct {
Coroutine base;
struct synctask *synctask;
CoroutineAction action;
bool run;
bool die;
} CoroutineSynctask;
#endif /* !__COROUTINE_SYNCTASK_H */

View File

@ -29,7 +29,6 @@
int
qb_format_and_resume (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -44,9 +43,7 @@ qb_format_and_resume (void *opaque)
qb_conf_t *qb_conf = NULL;
int ret = -1;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -224,16 +221,13 @@ err:
int
qb_co_open (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
inode_t *inode = NULL;
qb_inode_t *qb_inode = NULL;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -262,7 +256,6 @@ qb_co_open (void *opaque)
int
qb_co_writev (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -271,9 +264,7 @@ qb_co_writev (void *opaque)
QEMUIOVector qiov = {0, };
int ret = 0;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -309,7 +300,6 @@ qb_co_writev (void *opaque)
int
qb_co_readv (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -320,9 +310,7 @@ qb_co_readv (void *opaque)
struct iovec iov = {0, };
int ret = 0;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -391,7 +379,6 @@ qb_co_readv (void *opaque)
int
qb_co_fsync (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -399,9 +386,7 @@ qb_co_fsync (void *opaque)
qb_inode_t *qb_inode = NULL;
int ret = 0;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -461,7 +446,6 @@ qb_update_size_xattr (xlator_t *this, fd_t *fd, const char *fmt, off_t offset)
int
qb_co_truncate (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -472,9 +456,8 @@ qb_co_truncate (void *opaque)
xlator_t *this = NULL;
this = THIS;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -523,14 +506,13 @@ out:
int
qb_co_close (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
inode_t *inode = NULL;
qb_inode_t *qb_inode = NULL;
BlockDriverState *bs = NULL;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
inode = local->inode;
qb_inode = qb_inode_ctx_get (THIS, inode);
@ -553,7 +535,6 @@ qb_co_close (void *opaque)
int
qb_snapshot_create (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -563,9 +544,7 @@ qb_snapshot_create (void *opaque)
struct timeval tv = {0, };
int ret = 0;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -604,7 +583,6 @@ qb_snapshot_create (void *opaque)
int
qb_snapshot_delete (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -612,9 +590,7 @@ qb_snapshot_delete (void *opaque)
qb_inode_t *qb_inode = NULL;
int ret = 0;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;
@ -648,7 +624,6 @@ qb_snapshot_delete (void *opaque)
int
qb_snapshot_goto (void *opaque)
{
CoroutineSynctask *cs = NULL;
qb_local_t *local = NULL;
call_frame_t *frame = NULL;
call_stub_t *stub = NULL;
@ -656,9 +631,7 @@ qb_snapshot_goto (void *opaque)
qb_inode_t *qb_inode = NULL;
int ret = 0;
cs = opaque;
local = DO_UPCAST(qb_local_t, cs, cs);
local = opaque;
frame = local->frame;
stub = local->stub;
inode = local->inode;

View File

@ -15,7 +15,6 @@
#include "call-stub.h"
#include "block/block_int.h"
#include "monitor/monitor.h"
#include "coroutine-synctask.h"
int qb_format_and_resume (void *opaque);
int qb_snapshot_create (void *opaque);

View File

@ -204,6 +204,7 @@ qb_local_init (call_frame_t *frame)
qb_local = GF_CALLOC (1, sizeof (*qb_local), gf_qb_mt_qb_local_t);
if (!qb_local)
return -1;
INIT_LIST_HEAD(&qb_local->list);
qb_local->frame = frame;
frame->local = qb_local;

View File

@ -15,7 +15,6 @@
#include "call-stub.h"
#include "block/block_int.h"
#include "monitor/monitor.h"
#include "coroutine-synctask.h"
/* QB_XATTR_KEY_FMT is the on-disk xattr stored in the inode which
indicates that the file must be "interpreted" by the block format
@ -59,7 +58,6 @@ typedef struct qb_conf {
typedef struct qb_local {
CoroutineSynctask cs;
call_frame_t *frame; /* backpointer */
call_stub_t *stub;
inode_t *inode;
@ -67,6 +65,7 @@ typedef struct qb_local {
char fmt[QB_XATTR_VAL_MAX+1];
char name[256];
synctask_fn_t synctask_fn;
struct list_head list;
} qb_local_t;
void qb_local_free (xlator_t *this, qb_local_t *local);