synctask: implement barriers around yield, not the other way
In the current implementation, barriers are in the core of the syncprocessors. Wake()s are treated as syncbarrier wake. This is however delicate, as spurious wake()s of the synctask can mess up the accounting of the barrier and waking it prematurely. The fix is to keep yield() and wake() as the basic primitives, and implement barriers as an object impelemented on top of these primitives. This way, only an explicit barrier_wake() gets counted towards the barrier accounting, and spurious wakes will be truly safe. Change-Id: I8087f0f446113e5b2d0853431c0354335ccda076 BUG: 948686 Signed-off-by: Anand Avati <avati@redhat.com> Reviewed-on: http://review.gluster.org/4921 Tested-by: Gluster Build System <jenkins@build.gluster.com> Reviewed-by: Krishnan Parthasarathi <kparthas@redhat.com>
This commit is contained in:
parent
54b9cd3df3
commit
83cedcd9be
@ -80,24 +80,15 @@ __wait (struct synctask *task)
|
||||
|
||||
|
||||
void
|
||||
synctask_waitfor (struct synctask *task, int waitfor)
|
||||
synctask_yield (struct synctask *task)
|
||||
{
|
||||
struct syncenv *env = NULL;
|
||||
xlator_t *oldTHIS = THIS;
|
||||
|
||||
env = task->env;
|
||||
|
||||
#if defined(__NetBSD__) && defined(_UC_TLSBASE)
|
||||
/* Preserve pthread private pointer through swapcontex() */
|
||||
task->proc->sched.uc_flags &= ~_UC_TLSBASE;
|
||||
#endif
|
||||
|
||||
pthread_mutex_lock (&env->mutex);
|
||||
{
|
||||
task->waitfor = waitfor;
|
||||
}
|
||||
pthread_mutex_unlock (&env->mutex);
|
||||
|
||||
if (swapcontext (&task->ctx, &task->proc->sched) < 0) {
|
||||
gf_log ("syncop", GF_LOG_ERROR,
|
||||
"swapcontext failed (%s)", strerror (errno));
|
||||
@ -107,13 +98,6 @@ synctask_waitfor (struct synctask *task, int waitfor)
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
synctask_yield (struct synctask *task)
|
||||
{
|
||||
synctask_waitfor (task, 1);
|
||||
}
|
||||
|
||||
|
||||
void
|
||||
synctask_yawn (struct synctask *task)
|
||||
{
|
||||
@ -124,7 +108,6 @@ synctask_yawn (struct synctask *task)
|
||||
pthread_mutex_lock (&env->mutex);
|
||||
{
|
||||
task->woken = 0;
|
||||
task->waitfor = 0;
|
||||
}
|
||||
pthread_mutex_unlock (&env->mutex);
|
||||
}
|
||||
@ -139,9 +122,9 @@ synctask_wake (struct synctask *task)
|
||||
|
||||
pthread_mutex_lock (&env->mutex);
|
||||
{
|
||||
task->woken++;
|
||||
task->woken = 1;
|
||||
|
||||
if (task->slept && task->woken >= task->waitfor)
|
||||
if (task->slept)
|
||||
__run (task);
|
||||
|
||||
pthread_cond_broadcast (&env->cond);
|
||||
@ -352,7 +335,6 @@ syncenv_task (struct syncproc *proc)
|
||||
|
||||
task->woken = 0;
|
||||
task->slept = 0;
|
||||
task->waitfor = 0;
|
||||
|
||||
task->proc = proc;
|
||||
}
|
||||
@ -390,7 +372,7 @@ synctask_switchto (struct synctask *task)
|
||||
|
||||
pthread_mutex_lock (&env->mutex);
|
||||
{
|
||||
if (task->woken >= task->waitfor) {
|
||||
if (task->woken) {
|
||||
__run (task);
|
||||
} else {
|
||||
task->slept = 1;
|
||||
@ -655,6 +637,122 @@ synclock_unlock (synclock_t *lock)
|
||||
return ret;
|
||||
}
|
||||
|
||||
/* Barriers */
|
||||
|
||||
int
|
||||
syncbarrier_init (struct syncbarrier *barrier)
|
||||
{
|
||||
if (!barrier) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pthread_cond_init (&barrier->cond, 0);
|
||||
barrier->count = 0;
|
||||
INIT_LIST_HEAD (&barrier->waitq);
|
||||
|
||||
return pthread_mutex_init (&barrier->guard, 0);
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
syncbarrier_destroy (struct syncbarrier *barrier)
|
||||
{
|
||||
if (!barrier) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pthread_cond_destroy (&barrier->cond);
|
||||
return pthread_mutex_destroy (&barrier->guard);
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
__syncbarrier_wait (struct syncbarrier *barrier, int waitfor)
|
||||
{
|
||||
struct synctask *task = NULL;
|
||||
|
||||
if (!barrier) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
task = synctask_get ();
|
||||
|
||||
while (barrier->count < waitfor) {
|
||||
if (task) {
|
||||
/* called within a synctask */
|
||||
list_add_tail (&task->waitq, &barrier->waitq);
|
||||
{
|
||||
pthread_mutex_unlock (&barrier->guard);
|
||||
synctask_yield (task);
|
||||
pthread_mutex_lock (&barrier->guard);
|
||||
}
|
||||
list_del_init (&task->waitq);
|
||||
} else {
|
||||
/* called by a non-synctask */
|
||||
pthread_cond_wait (&barrier->cond, &barrier->guard);
|
||||
}
|
||||
}
|
||||
|
||||
barrier->count = 0;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
syncbarrier_wait (struct syncbarrier *barrier, int waitfor)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
pthread_mutex_lock (&barrier->guard);
|
||||
{
|
||||
ret = __syncbarrier_wait (barrier, waitfor);
|
||||
}
|
||||
pthread_mutex_unlock (&barrier->guard);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
static int
|
||||
__syncbarrier_wake (struct syncbarrier *barrier)
|
||||
{
|
||||
struct synctask *task = NULL;
|
||||
|
||||
if (!barrier) {
|
||||
errno = EINVAL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
barrier->count++;
|
||||
|
||||
pthread_cond_signal (&barrier->cond);
|
||||
if (!list_empty (&barrier->waitq)) {
|
||||
task = list_entry (barrier->waitq.next, struct synctask, waitq);
|
||||
synctask_wake (task);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int
|
||||
syncbarrier_wake (struct syncbarrier *barrier)
|
||||
{
|
||||
int ret = 0;
|
||||
|
||||
pthread_mutex_lock (&barrier->guard);
|
||||
{
|
||||
ret = __syncbarrier_wake (barrier);
|
||||
}
|
||||
pthread_mutex_unlock (&barrier->guard);
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
|
||||
/* FOPS */
|
||||
|
||||
|
@ -57,7 +57,6 @@ struct synctask {
|
||||
void *stack;
|
||||
int woken;
|
||||
int slept;
|
||||
int waitfor;
|
||||
int ret;
|
||||
|
||||
uid_t uid;
|
||||
@ -107,6 +106,16 @@ struct synclock {
|
||||
};
|
||||
typedef struct synclock synclock_t;
|
||||
|
||||
|
||||
struct syncbarrier {
|
||||
pthread_mutex_t guard; /* guard the remaining members, pair @cond */
|
||||
pthread_cond_t cond; /* waiting non-synctasks */
|
||||
struct list_head waitq; /* waiting synctasks */
|
||||
int count; /* count the number of wakes */
|
||||
};
|
||||
typedef struct syncbarrier syncbarrier_t;
|
||||
|
||||
|
||||
struct syncargs {
|
||||
int op_ret;
|
||||
int op_errno;
|
||||
@ -128,11 +137,13 @@ struct syncargs {
|
||||
dict_t *dict;
|
||||
pthread_mutex_t lock_dict;
|
||||
|
||||
syncbarrier_t barrier;
|
||||
|
||||
/* do not touch */
|
||||
struct synctask *task;
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
int wakecnt;
|
||||
int done;
|
||||
};
|
||||
|
||||
|
||||
@ -143,7 +154,7 @@ struct syncargs {
|
||||
} else { \
|
||||
pthread_mutex_init (&args->mutex, NULL); \
|
||||
pthread_cond_init (&args->cond, NULL); \
|
||||
args->wakecnt = 0; \
|
||||
args->done = 0; \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
@ -154,7 +165,7 @@ struct syncargs {
|
||||
} else { \
|
||||
pthread_mutex_lock (&args->mutex); \
|
||||
{ \
|
||||
args->wakecnt++; \
|
||||
args->done = 1; \
|
||||
pthread_cond_signal (&args->cond); \
|
||||
} \
|
||||
pthread_mutex_unlock (&args->mutex); \
|
||||
@ -162,13 +173,13 @@ struct syncargs {
|
||||
} while (0)
|
||||
|
||||
|
||||
#define __waitfor(args, cnt) do { \
|
||||
#define __yield(args) do { \
|
||||
if (args->task) { \
|
||||
synctask_waitfor (args->task, cnt); \
|
||||
synctask_yield (args->task); \
|
||||
} else { \
|
||||
pthread_mutex_lock (&args->mutex); \
|
||||
{ \
|
||||
while (args->wakecnt < cnt) \
|
||||
while (!args->done) \
|
||||
pthread_cond_wait (&args->cond, \
|
||||
&args->mutex); \
|
||||
} \
|
||||
@ -179,9 +190,6 @@ struct syncargs {
|
||||
} while (0)
|
||||
|
||||
|
||||
#define __yield(args) __waitfor(args, 1)
|
||||
|
||||
|
||||
#define SYNCOP(subvol, stb, cbk, op, params ...) do { \
|
||||
struct synctask *task = NULL; \
|
||||
call_frame_t *frame = NULL; \
|
||||
@ -225,9 +233,9 @@ void synctask_yield (struct synctask *task);
|
||||
void synctask_yawn (struct synctask *task);
|
||||
void synctask_waitfor (struct synctask *task, int count);
|
||||
|
||||
#define synctask_barrier_init(args) __yawn (args)
|
||||
#define synctask_barrier_wait(args, n) __waitfor (args, n)
|
||||
#define synctask_barrier_wake(args) __wake (args)
|
||||
#define synctask_barrier_init(args) syncbarrier_init (&args->barrier)
|
||||
#define synctask_barrier_wait(args, n) syncbarrier_wait (&args->barrier, n)
|
||||
#define synctask_barrier_wake(args) syncbarrier_wake (&args->barrier)
|
||||
|
||||
int synctask_setid (struct synctask *task, uid_t uid, gid_t gid);
|
||||
#define SYNCTASK_SETID(uid, gid) synctask_setid (synctask_get(), uid, gid);
|
||||
@ -256,6 +264,12 @@ int synclock_lock (synclock_t *lock);
|
||||
int synclock_trylock (synclock_t *lock);
|
||||
int synclock_unlock (synclock_t *lock);
|
||||
|
||||
|
||||
int syncbarrier_init (syncbarrier_t *barrier);
|
||||
int syncbarrier_wait (syncbarrier_t *barrier, int waitfor);
|
||||
int syncbarrier_wake (syncbarrier_t *barrier);
|
||||
int syncbarrier_destroy (syncbarrier_t *barrier);
|
||||
|
||||
int syncop_lookup (xlator_t *subvol, loc_t *loc, dict_t *xattr_req,
|
||||
/* out */
|
||||
struct iatt *iatt, dict_t **xattr_rsp, struct iatt *parent);
|
||||
|
@ -26,6 +26,8 @@ gd_synctask_barrier_wait (struct syncargs *args, int count)
|
||||
synclock_unlock (&conf->big_lock);
|
||||
synctask_barrier_wait (args, count);
|
||||
synclock_lock (&conf->big_lock);
|
||||
|
||||
syncbarrier_destroy (&args->barrier);
|
||||
}
|
||||
|
||||
static void
|
||||
|
Loading…
x
Reference in New Issue
Block a user