dht: reverting changes that takes lock on all subvols to prevent rmdir vs lookup selfheal race

Locking on all subvols before an rmdir is unable to remove all
directory entries. Hence reverting the patch for now.

Change-Id: I31baf2b2fa2f62c57429cd44f3f229c35eff1939
BUG: 1245065
Signed-off-by: Sakshi <sabansal@redhat.com>
Reviewed-on: http://review.gluster.org/12125
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
This commit is contained in:
Sakshi 2015-08-31 16:06:35 +05:30 committed by Raghavendra G
parent fa08597a00
commit 7b91350456
5 changed files with 84 additions and 331 deletions

View File

@ -34,10 +34,6 @@ dht_removexattr2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame);
int
dht_setxattr2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame);
int
dht_rmdir_unlock (call_frame_t *frame, xlator_t *this);
int
dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)
{
@ -4528,6 +4524,7 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
* corresponding hashed subvolume will take care of the
* directory entry.
*/
if (readdir_optimize) {
if (prev->this == local->first_up_subvol)
goto list;
@ -5012,7 +5009,7 @@ out:
if (local && local->lock.locks) {
/* store op_errno for failure case*/
local->op_errno = op_errno;
local->refresh_layout_unlock (frame, this, op_ret, 0);
local->refresh_layout_unlock (frame, this, op_ret);
if (op_ret == 0) {
DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno,
@ -5073,7 +5070,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,
return 0;
err:
if (local && local->lock.locks) {
local->refresh_layout_unlock (frame, this, -1, 0);
local->refresh_layout_unlock (frame, this, -1);
} else {
DHT_STACK_UNWIND (mknod, frame, -1,
op_errno, NULL, NULL, NULL,
@ -5182,7 +5179,7 @@ dht_mknod_do (call_frame_t *frame)
local->umask, local->params);
return 0;
err:
local->refresh_layout_unlock (frame, this, -1, 0);
local->refresh_layout_unlock (frame, this, -1);
return 0;
}
@ -5197,7 +5194,7 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret, int invoke_cbk)
dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@ -5272,7 +5269,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
dht_mknod_finish (frame, this, -1, 0);
dht_mknod_finish (frame, this, -1);
return 0;
}
@ -5303,7 +5300,7 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);
dht_mknod_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@ -5831,7 +5828,7 @@ out:
if (local && local->lock.locks) {
/* store op_errno for failure case*/
local->op_errno = op_errno;
local->refresh_layout_unlock (frame, this, op_ret, 0);
local->refresh_layout_unlock (frame, this, op_ret);
if (op_ret == 0) {
DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd,
@ -5890,7 +5887,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,
return 0;
err:
if (local && local->lock.locks) {
local->refresh_layout_unlock (frame, this, -1, 0);
local->refresh_layout_unlock (frame, this, -1);
} else {
DHT_STACK_UNWIND (create, frame, -1,
op_errno, NULL, NULL, NULL,
@ -6058,7 +6055,7 @@ dht_create_do (call_frame_t *frame)
local->umask, local->fd, local->params);
return 0;
err:
local->refresh_layout_unlock (frame, this, -1, 0);
local->refresh_layout_unlock (frame, this, -1);
return 0;
}
@ -6072,7 +6069,7 @@ dht_create_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret, int invoke_cbk)
dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@ -6147,7 +6144,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
dht_create_finish (frame, this, -1, 0);
dht_create_finish (frame, this, -1);
return 0;
}
@ -6178,7 +6175,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);
dht_create_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@ -6638,7 +6635,6 @@ unlock:
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
if (local->need_selfheal) {
dht_rmdir_unlock (frame, this);
local->layout =
dht_layout_get (this, local->loc.inode);
@ -6662,7 +6658,6 @@ unlock:
1);
}
dht_rmdir_unlock (frame, this);
DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
local->op_errno, &local->preparent,
&local->postparent, NULL);
@ -6731,7 +6726,6 @@ unlock:
if (done) {
if (local->need_selfheal && local->fop_succeeded) {
dht_rmdir_unlock (frame, this);
local->layout =
dht_layout_get (this, local->loc.inode);
@ -6766,7 +6760,6 @@ unlock:
}
dht_rmdir_unlock (frame, this);
DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
local->op_errno, &local->preparent,
&local->postparent, NULL);
@ -6777,111 +6770,12 @@ unlock:
}
int
dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
DHT_STACK_DESTROY (frame);
return 0;
}
int
dht_rmdir_unlock (call_frame_t *frame, xlator_t *this)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
int lock_count = 0;
local = frame->local;
lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
if (lock_count == 0)
goto done;
lock_frame = copy_frame (frame);
if (lock_frame == NULL)
goto done;
lock_local = dht_local_init (lock_frame, &local->loc, NULL,
lock_frame->root->op);
if (lock_local == NULL)
goto done;
lock_local->lock.locks = local->lock.locks;
lock_local->lock.lk_count = local->lock.lk_count;
local->lock.locks = NULL;
local->lock.lk_count = 0;
dht_unlock_inodelk (lock_frame, lock_local->lock.locks,
lock_local->lock.lk_count,
dht_rmdir_unlock_cbk);
lock_frame = NULL;
done:
if (lock_frame != NULL) {
DHT_STACK_DESTROY (lock_frame);
}
return 0;
}
int
dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
int i = 0;
VALIDATE_OR_GOTO (this->private, err);
conf = this->private;
local = frame->local;
if (op_ret < 0) {
gf_msg (this->name, GF_LOG_WARNING, op_errno,
DHT_MSG_INODE_LK_ERROR,
"acquiring inodelk failed rmdir for %s)",
local->loc.path);
local->op_ret = -1;
local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno;
goto err;
}
for (i = 0; i < conf->subvolume_cnt; i++) {
if (local->hashed_subvol &&
(local->hashed_subvol == conf->subvolumes[i]))
continue;
STACK_WIND (frame, dht_rmdir_cbk,
conf->subvolumes[i],
conf->subvolumes[i]->fops->rmdir,
&local->loc, local->flags, NULL);
}
return 0;
err:
/* No harm in calling an extra rmdir unlock */
dht_rmdir_unlock (frame, this);
DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
&local->preparent, &local->postparent, NULL);
return 0;
}
int
dht_rmdir_do (call_frame_t *frame, xlator_t *this)
{
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
dht_lock_t **lk_array = NULL;
int i = 0, ret = -1;
int count = 1;
int i = 0;
xlator_t *hashed_subvol = NULL;
char gfid[GF_UUID_BUF_SIZE] ={0};
@ -6895,6 +6789,7 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
local->call_cnt = conf->subvolume_cnt;
/* first remove from non-hashed_subvol */
hashed_subvol = dht_subvol_get_hashed (this, &local->loc);
@ -6918,49 +6813,20 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
return 0;
}
count = conf->subvolume_cnt;
for (i = 0; i < conf->subvolume_cnt; i++) {
if (hashed_subvol &&
(hashed_subvol == conf->subvolumes[i]))
continue;
lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
if (lk_array == NULL) {
local->op_ret = -1;
local->op_errno = ENOMEM;
goto err;
}
for (i = 0; i < count; i++) {
lk_array[i] = dht_lock_new (frame->this,
conf->subvolumes[i],
&local->loc, F_WRLCK,
DHT_LAYOUT_HEAL_DOMAIN);
if (lk_array[i] == NULL) {
local->op_ret = -1;
local->op_errno = EINVAL;
goto err;
}
}
local->lock.locks = lk_array;
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
IGNORE_ENOENT_ESTALE,
dht_rmdir_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
local->lock.lk_count = 0;
local->op_ret = -1;
local->op_errno = errno ? errno : EINVAL;
goto err;
STACK_WIND (frame, dht_rmdir_cbk,
conf->subvolumes[i],
conf->subvolumes[i]->fops->rmdir,
&local->loc, local->flags, NULL);
}
return 0;
err:
if (lk_array != NULL) {
dht_lock_array_free (lk_array, count);
GF_FREE (lk_array);
}
DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
&local->preparent, &local->postparent, NULL);
return 0;

View File

@ -40,7 +40,7 @@ typedef int (*dht_defrag_cbk_fn_t) (xlator_t *this, xlator_t *dst_node,
call_frame_t *frame);
typedef int (*dht_refresh_layout_unlock) (call_frame_t *frame, xlator_t *this,
int op_ret, int invoke_cbk);
int op_ret);
typedef int (*dht_refresh_layout_done_handle) (call_frame_t *frame);
@ -131,11 +131,6 @@ typedef enum {
qdstatfs_action_COMPARE,
} qdstatfs_action_t;
typedef enum {
FAIL_ON_ANY_ERROR,
IGNORE_ENOENT_ESTALE
} dht_reaction_type_t;
struct dht_skip_linkto_unlink {
gf_boolean_t handle_valid_link;
@ -266,7 +261,6 @@ struct dht_local {
fop_inodelk_cbk_t inodelk_cbk;
dht_lock_t **locks;
int lk_count;
dht_reaction_type_t reaction;
/* whether locking failed on _any_ of the "locks" above */
int op_ret;
@ -1048,8 +1042,7 @@ dht_fill_dict_to_avoid_unlink_of_migrating_file (dict_t *dict);
int
dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
int lk_count, dht_reaction_type_t reaction,
fop_inodelk_cbk_t inodelk_cbk);
int lk_count, fop_inodelk_cbk_t inodelk_cbk);
/* same as dht_nonblocking_inodelk, but issues sequential blocking locks on
* @lk_array directly. locks are issued on some order which remains same
@ -1057,8 +1050,7 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
*/
int
dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
int lk_count, dht_reaction_type_t reaction,
fop_inodelk_cbk_t inodelk_cbk);
int lk_count, fop_inodelk_cbk_t inodelk_cbk);
int32_t
dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count,

View File

@ -342,7 +342,6 @@ dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,
lock->xl = xl;
lock->type = type;
lock->domain = gf_strdup (domain);
if (lock->domain == NULL) {
dht_lock_free (lock);
@ -1680,8 +1679,7 @@ out:
int
dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
int lk_count, dht_reaction_type_t reaction,
fop_inodelk_cbk_t inodelk_cbk)
int lk_count, fop_inodelk_cbk_t inodelk_cbk)
{
struct gf_flock flock = {0,};
int i = 0, ret = 0;
@ -1704,7 +1702,6 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);
local = lock_frame->local;
local->lock.reaction = reaction;
local->main_frame = frame;
local->call_cnt = lk_count;
@ -1735,42 +1732,21 @@ dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
int lk_index = 0;
int i = 0;
dht_local_t *local = NULL;
lk_index = (long) cookie;
local = frame->local;
if (op_ret == 0) {
local->lock.locks[lk_index]->locked = _gf_true;
} else {
switch (op_errno) {
case ESTALE:
case ENOENT:
if (local->lock.reaction != IGNORE_ENOENT_ESTALE) {
local->lock.op_ret = -1;
local->lock.op_errno = op_errno;
goto cleanup;
}
break;
default:
local->lock.op_ret = -1;
local->lock.op_errno = op_errno;
goto cleanup;
}
local->lock.op_ret = -1;
local->lock.op_errno = op_errno;
goto cleanup;
}
if (lk_index == (local->lock.lk_count - 1)) {
for (i = 0; (i < local->lock.lk_count) &&
(!local->lock.locks[i]->locked); i++) {
;
}
if (i == local->lock.lk_count) {
local->lock.op_ret = -1;
local->lock.op_errno = op_errno;
}
dht_inodelk_done (frame);
} else {
dht_blocking_inodelk_rec (frame, ++lk_index);
@ -1844,8 +1820,7 @@ out:
int
dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
int lk_count, dht_reaction_type_t reaction,
fop_inodelk_cbk_t inodelk_cbk)
int lk_count, fop_inodelk_cbk_t inodelk_cbk)
{
int ret = -1;
call_frame_t *lock_frame = NULL;
@ -1867,7 +1842,6 @@ dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);
local = lock_frame->local;
local->lock.reaction = reaction;
local->main_frame = frame;
dht_blocking_inodelk_rec (lock_frame, 0);

View File

@ -1304,7 +1304,7 @@ dht_rename_lock (call_frame_t *frame)
local->lock.lk_count = count;
ret = dht_nonblocking_inodelk (frame, lk_array, count,
FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);
dht_rename_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
local->lock.lk_count = 0;

View File

@ -77,7 +77,7 @@ dht_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int
dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret, int invoke_cbk)
dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@ -85,6 +85,7 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret, int invok
local = frame->local;
lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
if (lock_count == 0)
goto done;
@ -111,9 +112,8 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret, int invok
lock_frame = NULL;
done:
if (!invoke_cbk)
local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
local->op_errno, NULL);
local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
local->op_errno, NULL);
if (lock_frame != NULL) {
DHT_STACK_DESTROY (lock_frame);
}
@ -155,13 +155,13 @@ dht_refresh_layout_done (call_frame_t *frame)
dht_layout_unref (frame->this, heal);
dht_selfheal_dir_finish (frame, frame->this, 0, 0);
dht_selfheal_dir_finish (frame, frame->this, 0);
}
return 0;
err:
dht_selfheal_dir_finish (frame, frame->this, -1, 0);
dht_selfheal_dir_finish (frame, frame->this, -1);
return 0;
}
@ -219,9 +219,8 @@ unlock:
return 0;
err:
local->refresh_layout_unlock (frame, this, -1, 0);
local->refresh_layout_unlock (frame, this, -1);
dht_selfheal_dir_finish (frame, this, -1, 0);
return 0;
}
@ -287,8 +286,7 @@ dht_refresh_layout (call_frame_t *frame)
return 0;
out:
local->refresh_layout_unlock (frame, this, -1, 0);
dht_selfheal_dir_finish (frame, this, -1, 0);
local->refresh_layout_unlock (frame, this, -1);
return 0;
}
@ -317,7 +315,7 @@ dht_selfheal_layout_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
dht_selfheal_dir_finish (frame, this, -1, 0);
dht_selfheal_dir_finish (frame, this, -1);
return 0;
}
@ -578,7 +576,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
local->lock.locks = lk_array;
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
ret = dht_blocking_inodelk (frame, lk_array, count,
dht_selfheal_layout_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@ -589,7 +587,13 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
return 0;
err:
if (lk_array != NULL) {
dht_lock_array_free (lk_array, count);
int tmp_count = 0, i = 0;
for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
;
}
dht_lock_array_free (lk_array, tmp_count);
GF_FREE (lk_array);
}
@ -628,7 +632,7 @@ dht_selfheal_dir_xattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
dht_selfheal_dir_finish (frame, this, 0, 0);
dht_selfheal_dir_finish (frame, this, 0);
}
return 0;
@ -824,7 +828,7 @@ dht_selfheal_dir_xattr (call_frame_t *frame, loc_t *loc, dht_layout_t *layout)
missing_xattr, loc->path);
if (missing_xattr == 0) {
dht_selfheal_dir_finish (frame, this, 0, 0);
dht_selfheal_dir_finish (frame, this, 0);
return 0;
}
@ -951,7 +955,7 @@ dht_selfheal_dir_xattr_for_nameless_lookup (call_frame_t *frame, loc_t *loc,
missing_xattr, loc->path);
if (missing_xattr == 0) {
dht_selfheal_dir_finish (frame, this, 0, 0);
dht_selfheal_dir_finish (frame, this, 0);
return 0;
}
@ -1019,7 +1023,7 @@ dht_selfheal_dir_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dht_should_heal_layout);
if (ret < 0) {
dht_selfheal_dir_finish (frame, this, -1, 0);
dht_selfheal_dir_finish (frame, this, -1);
}
}
@ -1050,7 +1054,7 @@ dht_selfheal_dir_setattr (call_frame_t *frame, loc_t *loc, struct iatt *stbuf,
dht_should_heal_layout);
if (ret < 0) {
dht_selfheal_dir_finish (frame, this, -1, 0);
dht_selfheal_dir_finish (frame, this, -1);
}
return 0;
@ -1088,7 +1092,7 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dht_layout_t *layout = NULL;
call_frame_t *prev = NULL;
xlator_t *subvol = NULL;
int i = 0, ret = -1;
int i = 0;
int this_call_cnt = 0;
char gfid[GF_UUID_BUF_SIZE] = {0};
@ -1117,13 +1121,11 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
dht_iatt_merge (this, &local->preparent, preparent, prev->this);
dht_iatt_merge (this, &local->postparent, postparent, prev->this);
ret = 0;
out:
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
dht_selfheal_dir_finish (frame, this, ret, -1);
dht_selfheal_dir_setattr (frame, &local->loc, &local->stbuf, 0xffffff, layout);
}
@ -1176,33 +1178,33 @@ out:
}
int
dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
dht_layout_t *layout, int force)
{
dht_local_t *local = NULL;
int missing_dirs = 0;
int i = 0;
int ret = -1;
dht_local_t *local = NULL;
xlator_t *this = NULL;
dict_t *dict = NULL;
dht_layout_t *layout = NULL;
loc_t *loc = NULL;
VALIDATE_OR_GOTO (this->private, err);
local = frame->local;
layout = local->layout;
loc = &local->loc;
this = frame->this;
if (op_ret < 0) {
gf_msg (this->name, GF_LOG_WARNING, op_errno,
DHT_MSG_INODE_LK_ERROR,
"acquiring inodelk failed for %s",
loc->path);
local->selfheal.force_mkdir = force ? _gf_true : _gf_false;
local->op_ret = -1;
local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno;
goto err;
for (i = 0; i < layout->cnt; i++) {
if (layout->list[i].err == ENOENT || force)
missing_dirs++;
}
if (missing_dirs == 0) {
dht_selfheal_dir_setattr (frame, loc, &local->stbuf,
0xffffffff, layout);
return 0;
}
local->call_cnt = missing_dirs;
if (!gf_uuid_is_null (local->gfid)) {
dict = dict_new ();
if (!dict)
@ -1216,7 +1218,6 @@ dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *th
" key = gfid-req", loc->path);
} else if (local->params) {
/* Send the dictionary from higher layers directly */
dict = dict_ref (local->params);
}
/* Set acls */
@ -1228,18 +1229,8 @@ dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *th
DHT_MSG_DICT_SET_FAILED,
"dict is NULL, need to make sure gfids are same");
/* We don't have to do a lookup here again:
1) Parallel rmdir would had removed the directory and locking would
have anyway failed with an ESTALE on all subvols. Hence selfheal
will never create the directory.
2) Parallel lookup creating directory does not have to be mutually
exclusive for the mkdir phase of lookup selfheal.
*/
for (i = 0; i < layout->cnt; i++) {
if (layout->list[i].err == ENOENT ||
local->selfheal.force_mkdir) {
if (layout->list[i].err == ENOENT || force) {
gf_msg_debug (this->name, 0,
"Creating directory %s on subvol %s",
loc->path, layout->list[i].xlator->name);
@ -1258,82 +1249,6 @@ dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *th
dict_unref (dict);
return 0;
err:
dht_selfheal_dir_finish (frame, this, -1, 0);
return 0;
}
int
dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
dht_layout_t *layout, int force)
{
int missing_dirs = 0;
int i = 0;
int ret = -1;
int count = 1;
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
xlator_t *this = NULL;
dht_lock_t **lk_array = NULL;
local = frame->local;
this = frame->this;
conf = this->private;
local->selfheal.force_mkdir = force ? _gf_true : _gf_false;
for (i = 0; i < layout->cnt; i++) {
if (layout->list[i].err == ENOENT || force)
missing_dirs++;
}
if (missing_dirs == 0) {
dht_selfheal_dir_setattr (frame, loc, &local->stbuf,
0xffffffff, layout);
return 0;
}
local->call_cnt = missing_dirs;
count = conf->subvolume_cnt;
/* Locking on all subvols in the mkdir phase of lookup selfheal is
is done to synchronize with rmdir/rename.
*/
lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
if (lk_array == NULL)
goto err;
for (i = 0; i < count; i++) {
lk_array[i] = dht_lock_new (frame->this,
conf->subvolumes[i],
&local->loc, F_WRLCK,
DHT_LAYOUT_HEAL_DOMAIN);
if (lk_array[i] == NULL)
goto err;
}
local->lock.locks = lk_array;
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
IGNORE_ENOENT_ESTALE,
dht_selfheal_dir_mkdir_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
local->lock.lk_count = 0;
goto err;
}
return 0;
err:
if (lk_array != NULL) {
dht_lock_array_free (lk_array, count);
GF_FREE (lk_array);
}
return -1;
}
int
@ -1905,7 +1820,7 @@ dht_selfheal_directory (call_frame_t *frame, dht_selfheal_dir_cbk_t dir_cbk,
sorry_no_fix:
/* TODO: need to put appropriate local->op_errno */
dht_selfheal_dir_finish (frame, this, ret, 0);
dht_selfheal_dir_finish (frame, this, ret);
return 0;
}
@ -1973,7 +1888,7 @@ dht_selfheal_directory_for_nameless_lookup (call_frame_t *frame,
sorry_no_fix:
/* TODO: need to put appropriate local->op_errno */
dht_selfheal_dir_finish (frame, this, ret, 0);
dht_selfheal_dir_finish (frame, this, ret);
return 0;
@ -2326,7 +2241,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
local->lock.locks = lk_array;
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
ret = dht_blocking_inodelk (frame, lk_array, count,
dht_update_commit_hash_for_layout_resume);
if (ret < 0) {
local->lock.locks = NULL;
@ -2337,7 +2252,13 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
return 0;
err:
if (lk_array != NULL) {
dht_lock_array_free (lk_array, count);
int tmp_count = 0, i = 0;
for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
;
}
dht_lock_array_free (lk_array, tmp_count);
GF_FREE (lk_array);
}