synctask: minor enhancements
- Enhance syncenv_new() to accept scaling parameters of syncproc. Previously the scaling parameters were hardcoded and decided at compile time. - New API synctask_create() which returns the created synctask. This is similar to synctask_new which only returned the status of whether a synctask could be created or not. The meaning of NULL cbk in synctask_create() means the task is "joinable". Until synctask_join() is called on such a synctask, the task is not reaped and resources are not destroyed. The task would be in a zombie state after synctask_fn returns and before synctask_join() is called. Change-Id: I368ec9037de9510d2ba951f0aad86aaf18d9a6b6 BUG: 986775 Signed-off-by: Anand Avati <avati@redhat.com> Reviewed-on: http://review.gluster.org/5365 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Brian Foster <bfoster@redhat.com>
This commit is contained in:
parent
faef08b7cf
commit
bbcdbd8c36
@ -85,7 +85,7 @@ glusterfs_ctx_defaults_init (glusterfs_ctx_t *ctx)
|
||||
goto err;
|
||||
}
|
||||
|
||||
ctx->env = syncenv_new (0);
|
||||
ctx->env = syncenv_new (0, 0, 0);
|
||||
if (!ctx->env) {
|
||||
goto err;
|
||||
}
|
||||
|
@ -1957,7 +1957,7 @@ main (int argc, char *argv[])
|
||||
if (ret)
|
||||
goto out;
|
||||
|
||||
ctx->env = syncenv_new (0);
|
||||
ctx->env = syncenv_new (0, 0, 0);
|
||||
if (!ctx->env) {
|
||||
gf_log ("", GF_LOG_ERROR,
|
||||
"Could not create new sync-environment");
|
||||
|
@ -28,7 +28,7 @@ __run (struct synctask *task)
|
||||
case SYNCTASK_SUSPEND:
|
||||
break;
|
||||
case SYNCTASK_RUN:
|
||||
gf_log (task->xl->name, GF_LOG_WARNING,
|
||||
gf_log (task->xl->name, GF_LOG_DEBUG,
|
||||
"re-running already running task");
|
||||
env->runcount--;
|
||||
break;
|
||||
@ -38,7 +38,11 @@ __run (struct synctask *task)
|
||||
case SYNCTASK_DONE:
|
||||
gf_log (task->xl->name, GF_LOG_WARNING,
|
||||
"running completed task");
|
||||
break;
|
||||
return;
|
||||
case SYNCTASK_ZOMBIE:
|
||||
gf_log (task->xl->name, GF_LOG_WARNING,
|
||||
"attempted to wake up zombie!!");
|
||||
return;
|
||||
}
|
||||
|
||||
list_add_tail (&task->all_tasks, &env->runq);
|
||||
@ -70,7 +74,11 @@ __wait (struct synctask *task)
|
||||
case SYNCTASK_DONE:
|
||||
gf_log (task->xl->name, GF_LOG_WARNING,
|
||||
"running completed task");
|
||||
break;
|
||||
return;
|
||||
case SYNCTASK_ZOMBIE:
|
||||
gf_log (task->xl->name, GF_LOG_WARNING,
|
||||
"attempted to sleep a zombie!!");
|
||||
return;
|
||||
}
|
||||
|
||||
list_add_tail (&task->all_tasks, &env->waitq);
|
||||
@ -168,6 +176,7 @@ synctask_done (struct synctask *task)
|
||||
|
||||
pthread_mutex_lock (&task->mutex);
|
||||
{
|
||||
task->state = SYNCTASK_ZOMBIE;
|
||||
task->done = 1;
|
||||
pthread_cond_broadcast (&task->cond);
|
||||
}
|
||||
@ -191,20 +200,19 @@ synctask_setid (struct synctask *task, uid_t uid, gid_t gid)
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
|
||||
call_frame_t *frame, void *opaque)
|
||||
struct synctask *
|
||||
synctask_create (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
|
||||
call_frame_t *frame, void *opaque)
|
||||
{
|
||||
struct synctask *newtask = NULL;
|
||||
xlator_t *this = THIS;
|
||||
int ret = 0;
|
||||
|
||||
VALIDATE_OR_GOTO (env, err);
|
||||
VALIDATE_OR_GOTO (fn, err);
|
||||
|
||||
newtask = CALLOC (1, sizeof (*newtask));
|
||||
if (!newtask)
|
||||
return -ENOMEM;
|
||||
return NULL;
|
||||
|
||||
newtask->frame = frame;
|
||||
if (!frame) {
|
||||
@ -263,21 +271,7 @@ synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
|
||||
*/
|
||||
syncenv_scale(env);
|
||||
|
||||
if (!cbk) {
|
||||
pthread_mutex_lock (&newtask->mutex);
|
||||
{
|
||||
while (!newtask->done) {
|
||||
pthread_cond_wait (&newtask->cond, &newtask->mutex);
|
||||
}
|
||||
}
|
||||
pthread_mutex_unlock (&newtask->mutex);
|
||||
|
||||
ret = newtask->ret;
|
||||
|
||||
synctask_destroy (newtask);
|
||||
}
|
||||
|
||||
return ret;
|
||||
return newtask;
|
||||
err:
|
||||
if (newtask) {
|
||||
FREE (newtask->stack);
|
||||
@ -285,7 +279,46 @@ err:
|
||||
STACK_DESTROY (newtask->opframe->root);
|
||||
FREE (newtask);
|
||||
}
|
||||
return -1;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
synctask_join (struct synctask *task)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
pthread_mutex_lock (&task->mutex);
|
||||
{
|
||||
while (!task->done)
|
||||
pthread_cond_wait (&task->cond, &task->mutex);
|
||||
}
|
||||
pthread_mutex_unlock (&task->mutex);
|
||||
|
||||
ret = task->ret;
|
||||
|
||||
synctask_destroy (task);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
synctask_new (struct syncenv *env, synctask_fn_t fn, synctask_cbk_t cbk,
|
||||
call_frame_t *frame, void *opaque)
|
||||
{
|
||||
struct synctask *newtask = NULL;
|
||||
int ret = 0;
|
||||
|
||||
newtask = synctask_create (env, fn, cbk, frame, opaque);
|
||||
if (!newtask)
|
||||
return -1;
|
||||
|
||||
if (!cbk)
|
||||
ret = synctask_join (newtask);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
@ -308,7 +341,7 @@ syncenv_task (struct syncproc *proc)
|
||||
if (!list_empty (&env->runq))
|
||||
break;
|
||||
if ((ret == ETIMEDOUT) &&
|
||||
(env->procs > SYNCENV_PROC_MIN)) {
|
||||
(env->procs > env->procmin)) {
|
||||
task = NULL;
|
||||
env->procs--;
|
||||
memset (proc, 0, sizeof (*proc));
|
||||
@ -408,13 +441,13 @@ syncenv_scale (struct syncenv *env)
|
||||
goto unlock;
|
||||
|
||||
scale = env->runcount;
|
||||
if (scale > SYNCENV_PROC_MAX)
|
||||
scale = SYNCENV_PROC_MAX;
|
||||
if (scale > env->procmax)
|
||||
scale = env->procmax;
|
||||
if (scale > env->procs)
|
||||
diff = scale - env->procs;
|
||||
while (diff) {
|
||||
diff--;
|
||||
for (; (i < SYNCENV_PROC_MAX); i++) {
|
||||
for (; (i < env->procmax); i++) {
|
||||
if (env->proc[i].processor == 0)
|
||||
break;
|
||||
}
|
||||
@ -441,12 +474,20 @@ syncenv_destroy (struct syncenv *env)
|
||||
|
||||
|
||||
struct syncenv *
|
||||
syncenv_new (size_t stacksize)
|
||||
syncenv_new (size_t stacksize, int procmin, int procmax)
|
||||
{
|
||||
struct syncenv *newenv = NULL;
|
||||
int ret = 0;
|
||||
int i = 0;
|
||||
|
||||
if (!procmin || procmin < 0)
|
||||
procmin = SYNCENV_PROC_MIN;
|
||||
if (!procmax || procmax > SYNCENV_PROC_MAX)
|
||||
procmax = SYNCENV_PROC_MAX;
|
||||
|
||||
if (procmin > procmax)
|
||||
return NULL;
|
||||
|
||||
newenv = CALLOC (1, sizeof (*newenv));
|
||||
|
||||
if (!newenv)
|
||||
@ -461,8 +502,10 @@ syncenv_new (size_t stacksize)
|
||||
newenv->stacksize = SYNCENV_DEFAULT_STACKSIZE;
|
||||
if (stacksize)
|
||||
newenv->stacksize = stacksize;
|
||||
newenv->procmin = procmin;
|
||||
newenv->procmax = procmax;
|
||||
|
||||
for (i = 0; i < SYNCENV_PROC_MIN; i++) {
|
||||
for (i = 0; i < newenv->procmin; i++) {
|
||||
newenv->proc[i].env = newenv;
|
||||
ret = pthread_create (&newenv->proc[i].processor, NULL,
|
||||
syncenv_processor, &newenv->proc[i]);
|
||||
|
@ -41,6 +41,7 @@ typedef enum {
|
||||
SYNCTASK_SUSPEND,
|
||||
SYNCTASK_WAIT,
|
||||
SYNCTASK_DONE,
|
||||
SYNCTASK_ZOMBIE,
|
||||
} synctask_state_t;
|
||||
|
||||
/* for one sequential execution of @syncfn */
|
||||
@ -90,6 +91,9 @@ struct syncenv {
|
||||
struct list_head waitq;
|
||||
int waitcount;
|
||||
|
||||
int procmin;
|
||||
int procmax;
|
||||
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
|
||||
@ -219,11 +223,14 @@ struct syncargs {
|
||||
|
||||
#define SYNCENV_DEFAULT_STACKSIZE (2 * 1024 * 1024)
|
||||
|
||||
struct syncenv * syncenv_new ();
|
||||
struct syncenv * syncenv_new (size_t stacksize, int procmin, int procmax);
|
||||
void syncenv_destroy (struct syncenv *);
|
||||
void syncenv_scale (struct syncenv *env);
|
||||
|
||||
int synctask_new (struct syncenv *, synctask_fn_t, synctask_cbk_t, call_frame_t* frame, void *);
|
||||
struct synctask *synctask_create (struct syncenv *, synctask_fn_t,
|
||||
synctask_cbk_t, call_frame_t *, void *);
|
||||
int synctask_join (struct synctask *task);
|
||||
void synctask_wake (struct synctask *task);
|
||||
void synctask_yield (struct synctask *task);
|
||||
void synctask_waitfor (struct synctask *task, int count);
|
||||
|
@ -2538,7 +2538,7 @@ init (xlator_t *this)
|
||||
goto out;
|
||||
}
|
||||
|
||||
pump_priv->env = syncenv_new (0);
|
||||
pump_priv->env = this->ctx->env;
|
||||
if (!pump_priv->env) {
|
||||
gf_log (this->name, GF_LOG_ERROR,
|
||||
"Could not create new sync-environment");
|
||||
@ -2579,9 +2579,6 @@ fini (xlator_t *this)
|
||||
if (!pump_priv)
|
||||
goto afr_priv;
|
||||
|
||||
if (pump_priv->env)
|
||||
syncenv_destroy (pump_priv->env);
|
||||
|
||||
GF_FREE (pump_priv->resume_path);
|
||||
LOCK_DESTROY (&pump_priv->resume_path_lock);
|
||||
LOCK_DESTROY (&pump_priv->pump_state_lock);
|
||||
|
Loading…
x
Reference in New Issue
Block a user