syncop: Multi-processor support in syncenv

This patch introduces:

- multithreading of syncop processors permitting synctasks to be executed
  concurrently if the runqueue has many tasks.

- Auto scaling of syncop processors based on runqueue length.

- Execute a synctask (synctask_new) in a blocking way if callback function
  is set NULL. The return value of the syncfn will be the return value
  of synctask_new()

Change-Id: Iff369709af9adfd07be3386842876a24e1a5a9b5
BUG: 763820
Reviewed-on: http://review.gluster.com/443
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Anand Avati <avati@redhat.com>
This commit is contained in:
Anand Avati 2012-02-21 09:25:14 +05:30
parent dfc88bf372
commit 1206437fcf
2 changed files with 221 additions and 72 deletions

View File

@ -39,45 +39,79 @@ syncop_create_frame ()
return (call_frame_t *)frame;
}
void
synctask_yield (struct synctask *task)
static void
__run (struct synctask *task)
{
struct syncenv *env = NULL;
struct syncenv *env = NULL;
env = task->env;
if (swapcontext (&task->ctx, &env->sched) < 0) {
list_del_init (&task->all_tasks);
switch (task->state) {
case SYNCTASK_INIT:
break;
case SYNCTASK_RUN:
gf_log (task->xl->name, GF_LOG_WARNING,
"re-running already running task");
env->runcount--;
break;
case SYNCTASK_WAIT:
env->waitcount--;
break;
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
break;
}
list_add_tail (&task->all_tasks, &env->runq);
env->runcount++;
task->state = SYNCTASK_RUN;
}
static void
__wait (struct synctask *task)
{
struct syncenv *env = NULL;
env = task->env;
list_del_init (&task->all_tasks);
switch (task->state) {
case SYNCTASK_INIT:
break;
case SYNCTASK_RUN:
env->runcount--;
break;
case SYNCTASK_WAIT:
gf_log (task->xl->name, GF_LOG_WARNING,
"re-waiting already waiting task");
env->waitcount--;
break;
case SYNCTASK_DONE:
gf_log (task->xl->name, GF_LOG_WARNING,
"running completed task");
break;
}
list_add_tail (&task->all_tasks, &env->waitq);
env->waitcount++;
task->state = SYNCTASK_WAIT;
}
void
synctask_yield (struct synctask *task)
{
if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
}
}
void
synctask_yawn (struct synctask *task)
{
struct syncenv *env = NULL;
env = task->env;
pthread_mutex_lock (&env->mutex);
{
list_del_init (&task->all_tasks);
list_add (&task->all_tasks, &env->waitq);
}
pthread_mutex_unlock (&env->mutex);
}
void
synctask_zzzz (struct synctask *task)
{
synctask_yawn (task);
synctask_yield (task);
}
void
synctask_wake (struct synctask *task)
{
@ -87,8 +121,10 @@ synctask_wake (struct synctask *task)
pthread_mutex_lock (&env->mutex);
{
list_del_init (&task->all_tasks);
list_add_tail (&task->all_tasks, &env->runq);
task->woken = 1;
if (task->slept)
__run (task);
}
pthread_mutex_unlock (&env->mutex);
@ -99,21 +135,17 @@ synctask_wake (struct synctask *task)
void
synctask_wrap (struct synctask *old_task)
{
int ret;
struct synctask *task = NULL;
/* Do not trust the pointer received. It may be
wrong and can lead to crashes. */
task = synctask_get ();
ret = task->syncfn (task->opaque);
task->synccbk (ret, task->frame, task->opaque);
task->ret = task->syncfn (task->opaque);
if (task->synccbk)
task->synccbk (task->ret, task->frame, task->opaque);
/* cannot destroy @task right here as we are
in the execution stack of @task itself
*/
task->complete = 1;
synctask_wake (task);
task->state = SYNCTASK_DONE;
synctask_yield (task);
}
@ -127,20 +159,42 @@ synctask_destroy (struct synctask *task)
if (task->stack)
FREE (task->stack);
pthread_mutex_destroy (&task->mutex);
pthread_cond_destroy (&task->cond);
FREE (task);
}
void
synctask_done (struct synctask *task)
{
if (task->synccbk) {
synctask_destroy (task);
return;
}
pthread_mutex_lock (&task->mutex);
{
task->done = 1;
pthread_cond_broadcast (&task->cond);
}
pthread_mutex_unlock (&task->mutex);
}
int
synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
call_frame_t *frame, void *opaque)
{
struct synctask *newtask = NULL;
xlator_t *this = THIS;
int ret = 0;
VALIDATE_OR_GOTO (env, err);
VALIDATE_OR_GOTO (fn, err);
VALIDATE_OR_GOTO (cbk, err);
VALIDATE_OR_GOTO (frame, err);
newtask = CALLOC (1, sizeof (*newtask));
@ -150,7 +204,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
newtask->env = env;
newtask->xl = this;
newtask->syncfn = fn;
newtask->synccbk = cbk;
newtask->synccbk = cbk;
newtask->opaque = opaque;
newtask->frame = frame;
@ -175,9 +229,33 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
makecontext (&newtask->ctx, (void *) synctask_wrap, 2, newtask);
newtask->state = SYNCTASK_INIT;
newtask->slept = 1;
if (!cbk) {
pthread_mutex_init (&newtask->mutex, NULL);
pthread_cond_init (&newtask->cond, NULL);
newtask->done = 0;
}
synctask_wake (newtask);
return 0;
if (!cbk) {
pthread_mutex_lock (&newtask->mutex);
{
while (!newtask->done) {
pthread_cond_wait (&newtask->cond, &newtask->mutex);
}
}
pthread_mutex_unlock (&newtask->mutex);
ret = newtask->ret;
synctask_destroy (newtask);
}
return ret;
err:
if (newtask) {
if (newtask->stack)
@ -189,10 +267,13 @@ err:
struct synctask *
syncenv_task (struct syncenv *env)
syncenv_task (struct syncproc *proc)
{
struct syncenv *env = NULL;
struct synctask *task = NULL;
env = proc->env;
pthread_mutex_lock (&env->mutex);
{
while (list_empty (&env->runq))
@ -201,6 +282,9 @@ syncenv_task (struct syncenv *env)
task = list_entry (env->runq.next, struct synctask, all_tasks);
list_del_init (&task->all_tasks);
env->runcount--;
task->proc = proc;
}
pthread_mutex_unlock (&env->mutex);
@ -218,10 +302,29 @@ synctask_switchto (struct synctask *task)
synctask_set (task);
THIS = task->xl;
if (swapcontext (&env->sched, &task->ctx) < 0) {
task->woken = 0;
task->slept = 0;
if (swapcontext (&task->proc->sched, &task->ctx) < 0) {
gf_log ("syncop", GF_LOG_ERROR,
"swapcontext failed (%s)", strerror (errno));
}
if (task->state == SYNCTASK_DONE) {
synctask_done (task);
return;
}
pthread_mutex_lock (&env->mutex);
{
if (task->woken) {
__run (task);
} else {
task->slept = 1;
__wait (task);
}
}
pthread_mutex_unlock (&env->mutex);
}
@ -229,25 +332,51 @@ void *
syncenv_processor (void *thdata)
{
struct syncenv *env = NULL;
struct syncproc *proc = NULL;
struct synctask *task = NULL;
env = thdata;
proc = thdata;
env = proc->env;
for (;;) {
task = syncenv_task (env);
if (task->complete) {
synctask_destroy (task);
continue;
}
task = syncenv_task (proc);
synctask_switchto (task);
syncenv_scale (env);
}
return NULL;
}
void
syncenv_scale (struct syncenv *env)
{
int thmax = 0;
int i = 0;
int ret = 0;
pthread_mutex_lock (&env->mutex);
{
if (env->procs > env->runcount)
goto unlock;
thmax = max (env->runcount, SYNCENV_PROC_MAX);
for (i = env->procs; i < thmax; i++) {
env->proc[i].env = env;
ret = pthread_create (&env->proc[i].processor, NULL,
syncenv_processor, &env->proc[i]);
if (ret)
break;
env->procs++;
}
}
unlock:
pthread_mutex_unlock (&env->mutex);
}
void
syncenv_destroy (struct syncenv *env)
{
@ -260,6 +389,7 @@ syncenv_new (size_t stacksize)
{
struct syncenv *newenv = NULL;
int ret = 0;
int i = 0;
newenv = CALLOC (1, sizeof (*newenv));
@ -276,8 +406,14 @@ syncenv_new (size_t stacksize)
if (stacksize)
newenv->stacksize = stacksize;
ret = pthread_create (&newenv->processor, NULL,
syncenv_processor, newenv);
for (i = 0; i < SYNCENV_PROC_MIN; i++) {
newenv->proc[i].env = newenv;
ret = pthread_create (&newenv->proc[i].processor, NULL,
syncenv_processor, &newenv->proc[i]);
if (ret)
break;
newenv->procs++;
}
if (ret != 0)
syncenv_destroy (newenv);

View File

@ -30,8 +30,11 @@
#include <pthread.h>
#include <ucontext.h>
#define SYNCENV_PROC_MAX 16
#define SYNCENV_PROC_MIN 2
struct synctask;
struct syncproc;
struct syncenv;
@ -40,6 +43,13 @@ typedef int (*synctask_cbk_t) (int ret, call_frame_t *frame, void *opaque);
typedef int (*synctask_fn_t) (void *opaque);
typedef enum {
SYNCTASK_INIT = 0,
SYNCTASK_RUN,
SYNCTASK_WAIT,
SYNCTASK_DONE,
} synctask_state_t;
/* for one sequential execution of @syncfn */
struct synctask {
struct list_head all_tasks;
@ -48,25 +58,43 @@ struct synctask {
call_frame_t *frame;
synctask_cbk_t synccbk;
synctask_fn_t syncfn;
synctask_state_t state;
void *opaque;
void *stack;
int woken;
int slept;
int complete;
int ret;
ucontext_t ctx;
struct syncproc *proc;
pthread_mutex_t mutex; /* for synchronous spawning of synctask */
pthread_cond_t cond;
int done;
};
struct syncproc {
pthread_t processor;
ucontext_t sched;
struct syncenv *env;
struct synctask *current;
};
/* hosts the scheduler thread and framework for executing synctasks */
struct syncenv {
pthread_t processor;
struct synctask *current;
struct syncproc proc[SYNCENV_PROC_MAX];
int procs;
struct list_head runq;
int runcount;
struct list_head waitq;
int waitcount;
pthread_mutex_t mutex;
pthread_cond_t cond;
ucontext_t sched;
size_t stacksize;
};
@ -92,20 +120,6 @@ struct syncargs {
};
#define __yawn(args) do { \
struct synctask *task = NULL; \
\
task = synctask_get (); \
if (task) { \
args->task = task; \
synctask_yawn (task); \
} else { \
pthread_mutex_init (&args->mutex, NULL); \
pthread_cond_init (&args->cond, NULL); \
} \
} while (0)
#define __yield(args) do { \
if (args->task) { \
synctask_yield (args->task); \
@ -143,7 +157,6 @@ struct syncargs {
\
frame = syncop_create_frame (); \
\
__yawn (stb); \
STACK_WIND_COOKIE (frame, cbk, (void *)stb, subvol, op, params); \
__yield (stb); \
} while (0)
@ -153,10 +166,10 @@ struct syncargs {
struct syncenv * syncenv_new ();
void syncenv_destroy (struct syncenv *);
void syncenv_scale (struct syncenv *env);
int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);
void synctask_zzzz (struct synctask *task);
void synctask_yawn (struct synctask *task);
void synctask_wake (struct synctask *task);
void synctask_yield (struct synctask *task);