features/marker: changes in marker to avoid race conditions and corruptions

Change-Id: I38ddfab200d59dd4c8e9f9dd964a98f3d7aa7ab7
BUG: 3389
Reviewed-on: http://review.gluster.com/289
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Vijay Bellur <vijay@gluster.com>
This commit is contained in:
Raghavendra Bhat 2011-08-21 18:53:04 +05:30 committed by Vijay Bellur
parent 08e8c96686
commit b6e3e9c480
4 changed files with 217 additions and 61 deletions

View File

@ -32,8 +32,12 @@ quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)
{
int ret = -1;
if (!loc)
return ret;
GF_VALIDATE_OR_GOTO ("marker", loc, out);
GF_VALIDATE_OR_GOTO ("marker", inode, out);
GF_VALIDATE_OR_GOTO ("marker", path, out);
/* Not checking for parent because while filling
* loc of root, parent will be NULL
*/
if (inode) {
loc->inode = inode_ref (inode);
@ -59,7 +63,7 @@ quota_loc_fill (loc_t *loc, inode_t *inode, inode_t *parent, char *path)
loc_wipe:
if (ret < 0)
loc_wipe (loc);
out:
return ret;
}
@ -180,6 +184,7 @@ __add_new_contribution_node (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc)
uuid_copy (contribution->gfid, loc->parent->gfid);
LOCK_INIT (&contribution->lock);
INIT_LIST_HEAD (&contribution->contri_list);
list_add_tail (&contribution->contri_list, &ctx->contribution_head);
@ -363,7 +368,7 @@ quota_local_unref (xlator_t *this, quota_local_t *local)
QUOTA_SAFE_DECREMENT (&local->lock, local->ref, ref);
if (ref > 0)
if (ref != 0)
goto out;
if (local->fd != NULL)
@ -374,6 +379,8 @@ quota_local_unref (xlator_t *this, quota_local_t *local)
loc_wipe (&local->parent_loc);
LOCK_DESTROY (&local->lock);
GF_FREE (local);
out:
return 0;
}

View File

@ -30,6 +30,46 @@
#include "marker-quota.h"
#include "marker-quota-helper.h"
int
mq_loc_copy (loc_t *dst, loc_t *src)
{
int ret = -1;
GF_VALIDATE_OR_GOTO ("marker", dst, out);
GF_VALIDATE_OR_GOTO ("marker", src, out);
if (src->inode == NULL ||
src->path == NULL) {
gf_log ("marker", GF_LOG_WARNING,
"src loc is not valid");
goto out;
}
ret = loc_copy (dst, src);
out:
return ret;
}
int32_t
mq_get_local_err (quota_local_t *local,
int32_t *val)
{
int32_t ret = -1;
GF_VALIDATE_OR_GOTO ("marker", local, out);
GF_VALIDATE_OR_GOTO ("marker", val, out);
LOCK (&local->lock);
{
*val = local->err;
}
UNLOCK (&local->lock);
ret = 0;
out:
return ret;
}
int32_t
mq_get_ctx_updation_status (quota_inode_ctx_t *ctx,
gf_boolean_t *status)
@ -182,6 +222,8 @@ release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this,
{
struct gf_flock lock = {0, };
quota_local_t *local = NULL;
loc_t loc = {0, };
int ret = -1;
local = frame->local;
@ -202,11 +244,31 @@ release_lock_on_dirty_inode (call_frame_t *frame, void *cookie, xlator_t *this,
lock.l_len = 0;
lock.l_pid = 0;
ret = loc_copy (&loc, &local->loc);
if (ret == -1) {
local->err = -1;
frame->local = NULL;
dirty_inode_updation_done (frame, NULL, this, 0, 0);
return 0;
}
if (local->loc.inode == NULL) {
gf_log (this->name, GF_LOG_WARNING,
"Inode is NULL, so can't stackwind.");
goto out;
}
STACK_WIND (frame,
dirty_inode_updation_done,
FIRST_CHILD(this),
FIRST_CHILD(this)->fops->inodelk,
this->name, &local->loc, F_SETLKW, &lock);
this->name, &loc, F_SETLKW, &lock);
loc_wipe (&loc);
return 0;
out:
dirty_inode_updation_done (frame, NULL, this, -1, 0);
return 0;
}
@ -338,6 +400,29 @@ err:
return 0;
}
int32_t
mq_test_and_set_local_err(quota_local_t *local,
int32_t *val)
{
int tmp = 0;
int32_t ret = -1;
GF_VALIDATE_OR_GOTO ("marker", local, out);
GF_VALIDATE_OR_GOTO ("marker", val, out);
LOCK (&local->lock);
{
tmp = local->err;
local->err = *val;
*val = tmp;
}
UNLOCK (&local->lock);
ret = 0;
out:
return ret;
}
int32_t
get_dirty_inode_size (call_frame_t *frame, xlator_t *this)
{
@ -401,17 +486,19 @@ get_child_contribution (call_frame_t *frame,
QUOTA_STACK_DESTROY (frame, this);
if (op_ret == -1) {
gf_log (this->name, GF_LOG_ERROR, "%s", strerror (op_errno));
gf_log (this->name, GF_LOG_ERROR, "%s",
strerror (op_errno));
val = -2;
if (!mq_test_and_set_local_err (local, &val) &&
val != -2)
release_lock_on_dirty_inode (local->frame, NULL, this, 0, 0);
local->err = -2;
release_lock_on_dirty_inode (local->frame, NULL, this, 0, 0);
goto out;
goto exit;
}
if (local->err)
goto out;
ret = mq_get_local_err (local, &val);
if (!ret && val == -2)
goto exit;
GET_CONTRI_KEY (contri_key, local->loc.inode->gfid, ret);
if (ret < 0)
@ -430,16 +517,15 @@ out:
}
UNLOCK (&local->lock);
if (val== 0) {
if (local->err) {
QUOTA_SAFE_DECREMENT (&local->lock, local->ref, val);
quota_local_unref (this, local);
} else
quota_dirty_inode_readdir (local->frame, NULL, this,
if (val == 0) {
quota_dirty_inode_readdir (local->frame, NULL, this,
0, 0, NULL);
}
quota_local_unref (this, local);
return 0;
exit:
quota_local_unref (this, local);
return 0;
}
@ -453,6 +539,7 @@ quota_readdir_cbk (call_frame_t *frame,
{
char contri_key [512] = {0, };
int32_t ret = 0;
int32_t val = 0;
off_t offset = 0;
int32_t count = 0;
dict_t *dict = NULL;
@ -493,16 +580,19 @@ quota_readdir_cbk (call_frame_t *frame,
count++;
}
if (count == 0) {
get_dirty_inode_size (frame, this);
return 0;
}
local->frame = frame;
if (count > 0) {
LOCK (&local->lock);
{
local->dentry_child_count = count;
local->d_off = offset;
}
UNLOCK (&local->lock);
LOCK (&local->lock);
{
local->dentry_child_count = count;
local->d_off = offset;
}
UNLOCK (&local->lock);
list_for_each_entry (entry, (&entries->list), list) {
@ -526,7 +616,7 @@ quota_readdir_cbk (call_frame_t *frame,
goto out;
}
newframe->local = local;
newframe->local = quota_local_ref (local);
dict = dict_new ();
if (!dict) {
@ -559,18 +649,12 @@ quota_readdir_cbk (call_frame_t *frame,
}
if (ret) {
LOCK (&local->lock);
{
if (local->dentry_child_count == 0)
local->err = -1;
else
local->err = -2;
}
UNLOCK (&local->lock);
val = -2;
mq_test_and_set_local_err (local, &val);
if (newframe) {
newframe->local = NULL;
quota_local_unref(this, local);
QUOTA_STACK_DESTROY (newframe, this);
}
@ -578,10 +662,8 @@ quota_readdir_cbk (call_frame_t *frame,
}
}
if (ret) {
if (ret && val != -2) {
release_lock_on_dirty_inode (frame, NULL, this, 0, 0);
} else if (count == 0 ) {
get_dirty_inode_size (frame, this);
}
return 0;
@ -767,8 +849,7 @@ update_dirty_inode (xlator_t *this,
goto fr_destroy;
frame->local = local;
ret = loc_copy (&local->loc, loc);
ret = mq_loc_copy (&local->loc, loc);
if (ret < 0)
goto fr_destroy;
@ -781,6 +862,13 @@ update_dirty_inode (xlator_t *this,
lock.l_start = 0;
lock.l_len = 0;
if (local->loc.inode == NULL) {
ret = -1;
gf_log (this->name, GF_LOG_WARNING,
"Inode is NULL, so can't stackwind.");
goto fr_destroy;
}
STACK_WIND (frame,
get_dirty_xattr,
FIRST_CHILD(this),
@ -1113,26 +1201,53 @@ err:
int32_t
get_parent_inode_local (xlator_t *this, quota_local_t *local)
{
int32_t ret;
int32_t ret = -1;
quota_inode_ctx_t *ctx = NULL;
GF_VALIDATE_OR_GOTO ("marker", this, out);
GF_VALIDATE_OR_GOTO ("marker", local, out);
loc_wipe (&local->loc);
loc_copy (&local->loc, &local->parent_loc);
ret = mq_loc_copy (&local->loc, &local->parent_loc);
if (ret < 0) {
gf_log_callingfn (this->name, GF_LOG_WARNING,
"loc copy failed");
goto out;
}
loc_wipe (&local->parent_loc);
quota_inode_loc_fill (NULL, local->loc.parent, &local->parent_loc);
ret = quota_inode_loc_fill (NULL, local->loc.parent,
&local->parent_loc);
if (ret < 0) {
gf_log_callingfn (this->name, GF_LOG_WARNING,
"failed to build parent loc of %s",
local->loc.path);
goto out;
}
ret = quota_inode_ctx_get (local->loc.inode, this, &ctx);
if (ret < 0)
return -1;
if (ret < 0) {
gf_log_callingfn (this->name, GF_LOG_WARNING,
"inode ctx get failed");
goto out;
}
local->ctx = ctx;
if (list_empty (&ctx->contribution_head)) {
gf_log_callingfn (this->name, GF_LOG_WARNING,
"contribution node list is empty which "
"is an error");
goto out;
}
local->contri = (inode_contribution_t *) ctx->contribution_head.next;
return 0;
ret = 0;
out:
return ret;
}
@ -1228,6 +1343,12 @@ quota_release_parent_lock (call_frame_t *frame, void *cookie,
}
UNLOCK (&ctx->lock);
if (local->parent_loc.inode == NULL) {
gf_log (this->name, GF_LOG_WARNING,
"Invalid parent inode.");
goto err;
}
wind:
lock.l_type = F_UNLCK;
lock.l_whence = SEEK_SET;
@ -1243,6 +1364,10 @@ wind:
F_SETLKW, &lock);
return 0;
err:
xattr_updation_done (frame, NULL, this,
0, 0 , NULL);
return 0;
}
@ -1591,6 +1716,11 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
if (local->loc.inode->ia_type == IA_IFDIR) {
ret = dict_set_int64 (newdict, QUOTA_SIZE_KEY, 0);
if (ret < 0) {
gf_log (this->name, GF_LOG_WARNING,
"dict_set failed.");
goto err;
}
}
GET_CONTRI_KEY (contri_key, local->contri->gfid, ret);
@ -1600,6 +1730,11 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
}
ret = dict_set_int64 (newdict, contri_key, 0);
if (ret < 0) {
gf_log (this->name, GF_LOG_WARNING,
"dict_set failed.");
goto err;
}
mq_set_ctx_updation_status (local->ctx, _gf_false);
@ -1609,7 +1744,7 @@ quota_fetch_child_size_and_contri (call_frame_t *frame, void *cookie,
ret = 0;
err:
if ((op_ret == -1) || (ret == -1)) {
if ((op_ret == -1) || (ret < 0)) {
local->err = op_errno;
mq_set_ctx_updation_status (local->ctx, _gf_false);
@ -1697,6 +1832,13 @@ get_lock_on_parent (call_frame_t *frame, xlator_t *this)
gf_log (this->name, GF_LOG_DEBUG, "taking lock on %s",
local->parent_loc.path);
if (local->parent_loc.inode == NULL) {
gf_log (this->name, GF_LOG_DEBUG,
"parent inode is not valid, aborting "
"transaction.");
goto fr_destroy;
}
lock.l_len = 0;
lock.l_start = 0;
lock.l_type = F_WRLCK;
@ -1738,7 +1880,7 @@ start_quota_txn (xlator_t *this, loc_t *loc,
frame->local = local;
ret = loc_copy (&local->loc, loc);
ret = mq_loc_copy (&local->loc, loc);
if (ret < 0)
goto fr_destroy;
@ -1773,7 +1915,9 @@ initiate_quota_txn (xlator_t *this, loc_t *loc)
quota_inode_ctx_t *ctx = NULL;
inode_contribution_t *contribution = NULL;
VALIDATE_OR_GOTO (loc, out);
GF_VALIDATE_OR_GOTO ("marker", this, out);
GF_VALIDATE_OR_GOTO ("marker", loc, out);
GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
ret = quota_inode_ctx_get (loc->inode, this, &ctx);
if (ret == -1) {
@ -2073,14 +2217,14 @@ quota_inode_remove_done (call_frame_t *frame, void *cookie, xlator_t *this,
}
if (strcmp (local->parent_loc.path, "/") != 0) {
get_parent_inode_local (this, local);
ret = get_parent_inode_local (this, local);
if (ret < 0)
goto out;
start_quota_txn (this, &local->loc, local->ctx, local->contri);
}
/* TODO: free local in quota_local_unref only*/
out:
quota_local_unref (this, local);
GF_FREE (local);
return 0;
}
@ -2237,7 +2381,7 @@ reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri)
goto out;
}
ret = loc_copy (&local->loc, loc);
ret = mq_loc_copy (&local->loc, loc);
if (ret < 0)
goto out;
@ -2263,6 +2407,13 @@ reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri)
lock.l_type = F_WRLCK;
lock.l_whence = SEEK_SET;
if (local->parent_loc.inode == NULL) {
ret = -1;
gf_log (this->name, GF_LOG_WARNING,
"Inode is NULL, so can't stackwind.");
goto out;
}
STACK_WIND (frame,
mq_reduce_parent_size_xattr,
FIRST_CHILD(this),
@ -2272,10 +2423,8 @@ reduce_parent_size (xlator_t *this, loc_t *loc, int64_t contri)
ret = 0;
out:
if (local != NULL) {
if (local != NULL)
quota_local_unref (this, local);
GF_FREE (local);
}
return ret;
}

View File

@ -43,7 +43,6 @@
_frame->local = NULL; \
STACK_DESTROY (_frame->root); \
quota_local_unref (_this, _local); \
GF_FREE (_local); \
} while (0)

View File

@ -772,6 +772,7 @@ marker_unlink_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local = frame->local;
if (local == NULL) {
op_errno = EINVAL;
goto err;
}
@ -781,7 +782,7 @@ marker_unlink_stat_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
FIRST_CHILD(this)->fops->unlink, &local->loc);
return 0;
err:
STACK_UNWIND_STRICT (unlink, frame, -1, ENOMEM, NULL, NULL);
STACK_UNWIND_STRICT (unlink, frame, -1, op_errno, NULL, NULL);
return 0;
}