features/shard: Get hard-link-count in {unlink,rename}_cbk before deleting shards

Change-Id: I0606b74f11f5412c4d9af44a6505635ed9022c15
BUG: 1335858
Signed-off-by: Krutika Dhananjay <kdhananj@redhat.com>
Reviewed-on: http://review.gluster.org/14334
Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
Smoke: Gluster Build System <jenkins@build.gluster.com>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
Krutika Dhananjay 2016-05-12 15:06:59 +05:30 committed by Pranith Kumar Karampuri
parent d5e67cd2b2
commit 7a230e269f
5 changed files with 583 additions and 165 deletions
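In short, the change: posix now reports the hard-link count of the victim file (sampled just before the unlink/rename) in the response xdata under the new "get-link-count" key, and the shard xlator's unlink/rename callbacks consult that count before purging the shards under /.shard, so the shards survive as long as other hard links to the base file remain. Below is a minimal, standalone sketch of that decision as made in shard_unlink_base_file_cbk and shard_rename_unlink_dst_shards_do; struct xdata_rsp and must_delete_shards are hypothetical stand-ins for illustration only, not GlusterFS APIs.

/* Minimal model of the new decision: purge the shards under /.shard
 * only when the link count reported by posix was 1 and the file had
 * actually grown past its first block. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define GET_LINK_COUNT "get-link-count"  /* xdata key added by this patch */

/* hypothetical stand-in for the response xdata dictionary */
struct xdata_rsp {
        bool     has_link_count;
        uint32_t link_count;    /* ia_nlink just before the unlink/rename */
};

static bool
must_delete_shards (const struct xdata_rsp *rsp, uint32_t num_blocks)
{
        /* Hard links share the base inode but not the shards, so the
         * shards may only be removed along with the last link. */
        if (rsp && rsp->has_link_count && rsp->link_count > 1)
                return false;   /* other links remain: keep the shards */
        /* num_blocks == 1 means the file never crossed its shard block
         * size, so there is nothing under /.shard to clean up. */
        if (num_blocks <= 1)
                return false;
        return true;
}

int
main (void)
{
        struct xdata_rsp rsp = { .has_link_count = true, .link_count = 2 };
        printf ("purge shards? %d\n", must_delete_shards (&rsp, 3));  /* 0 */
        rsp.link_count = 1;
        printf ("purge shards? %d\n", must_delete_shards (&rsp, 3));  /* 1 */
        return 0;
}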


@ -252,6 +252,7 @@
#define TIER_LINKFILE_GFID "tier-linkfile-gfid"
#define DHT_SKIP_OPEN_FD_UNLINK "dont-unlink-for-open-fd"
#define DHT_IATT_IN_XDATA_KEY "dht-get-iatt-in-xattr"
#define GET_LINK_COUNT "get-link-count"
/*CTR and Marker requires inode dentry link count from posix*/
#define GF_RESPONSE_LINK_COUNT_XDATA "gf_response_link_count"


@ -0,0 +1,282 @@
#!/bin/bash
. $(dirname $0)/../../include.rc
. $(dirname $0)/../../volume.rc
cleanup
# The aim of this test script is to exercise the various codepaths of unlink
# and rename fops in sharding and make sure they work fine.
#
#################################################
################### UNLINK ######################
#################################################
TEST glusterd
TEST pidof glusterd
TEST $CLI volume create $V0 replica 2 $H0:$B0/${V0}{0,1}
TEST $CLI volume set $V0 features.shard on
TEST $CLI volume start $V0
TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0
TEST mkdir $M0/dir
TEST touch $M0/dir/foo
TEST touch $M0/dir/new
######################################
##### Unlink with /.shard absent #####
######################################
TEST truncate -s 5M $M0/dir/foo
TEST ! stat $B0/${V0}0/.shard
TEST ! stat $B0/${V0}1/.shard
# Test to ensure that unlink doesn't fail due to absence of /.shard
TEST unlink $M0/dir/foo
##################################################
##### Unlink of a sharded file without holes #####
##################################################
# Create a 9M sharded file
TEST dd if=/dev/zero of=$M0/dir/new bs=1024 count=9216
gfid_new=$(get_gfid_string $M0/dir/new)
# Ensure its shards are created.
TEST stat $B0/${V0}0/.shard/$gfid_new.1
TEST stat $B0/${V0}1/.shard/$gfid_new.1
TEST stat $B0/${V0}0/.shard/$gfid_new.2
TEST stat $B0/${V0}1/.shard/$gfid_new.2
TEST unlink $M0/dir/new
TEST ! stat $B0/${V0}0/.shard/$gfid_new.1
TEST ! stat $B0/${V0}1/.shard/$gfid_new.1
TEST ! stat $B0/${V0}0/.shard/$gfid_new.2
TEST ! stat $B0/${V0}1/.shard/$gfid_new.2
TEST ! stat $M0/dir/new
TEST ! stat $B0/${V0}0/dir/new
TEST ! stat $B0/${V0}1/dir/new
#######################################
##### Unlink with /.shard present #####
#######################################
TEST truncate -s 5M $M0/dir/foo
gfid_foo=$(get_gfid_string $M0/dir/foo)
# Ensure its shards are absent.
TEST ! stat $B0/${V0}0/.shard/$gfid_foo.1
TEST ! stat $B0/${V0}1/.shard/$gfid_foo.1
# Test to ensure that unlink of a sparse file works fine.
TEST unlink $M0/dir/foo
TEST ! stat $B0/${V0}0/dir/foo
TEST ! stat $B0/${V0}1/dir/foo
TEST ! stat $M0/dir/foo
#############################################################
##### Unlink of a file with only one block (the zeroth) #####
#############################################################
TEST touch $M0/dir/foo
TEST dd if=/dev/zero of=$M0/dir/foo bs=1024 count=1024
# Test to ensure that unlink of a file with only one block (no shards) works fine.
TEST unlink $M0/dir/foo
TEST ! stat $B0/${V0}0/dir/foo
TEST ! stat $B0/${V0}1/dir/foo
TEST ! stat $M0/dir/foo
####################################################
##### Unlink of a sharded file with hard-links #####
####################################################
# Create a 9M sharded file
TEST dd if=/dev/zero of=$M0/dir/original bs=1024 count=9216
gfid_original=$(get_gfid_string $M0/dir/original)
# Ensure its shards are created.
TEST stat $B0/${V0}0/.shard/$gfid_original.1
TEST stat $B0/${V0}1/.shard/$gfid_original.1
TEST stat $B0/${V0}0/.shard/$gfid_original.2
TEST stat $B0/${V0}1/.shard/$gfid_original.2
# Create a hard link.
TEST ln $M0/dir/original $M0/link
# Now delete the original file.
TEST unlink $M0/dir/original
# Ensure the shards are still intact.
TEST stat $B0/${V0}0/.shard/$gfid_original.1
TEST stat $B0/${V0}1/.shard/$gfid_original.1
TEST stat $B0/${V0}0/.shard/$gfid_original.2
TEST stat $B0/${V0}1/.shard/$gfid_original.2
TEST ! stat $M0/dir/original
TEST stat $M0/link
TEST stat $B0/${V0}0/link
TEST stat $B0/${V0}1/link
# Now delete the last link.
TEST unlink $M0/link
# Ensure that the shards are all cleaned up.
TEST ! stat $B0/${V0}0/.shard/$gfid_original.1
TEST ! stat $B0/${V0}1/.shard/$gfid_original.1
TEST ! stat $B0/${V0}0/.shard/$gfid_original.2
TEST ! stat $B0/${V0}1/.shard/$gfid_original.2
TEST ! stat $M0/link
TEST ! stat $B0/${V0}0/link
TEST ! stat $B0/${V0}1/link
EXPECT_WITHIN $UMOUNT_TIMEOUT "Y" force_umount $M0
TEST $CLI volume stop $V0
TEST $CLI volume delete $V0
cleanup
#################################################
################### RENAME ######################
#################################################
TEST glusterd
TEST pidof glusterd
TEST $CLI volume create $V0 replica 2 $H0:$B0/${V0}{0,1}
TEST $CLI volume set $V0 features.shard on
TEST $CLI volume start $V0
TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0
TEST mkdir $M0/dir
TEST touch $M0/dir/src
TEST touch $M0/dir/dst
######################################
##### Rename with /.shard absent #####
######################################
TEST truncate -s 5M $M0/dir/dst
TEST ! stat $B0/${V0}0/.shard
TEST ! stat $B0/${V0}1/.shard
# Test to ensure that rename doesn't fail due to absence of /.shard
TEST mv -f $M0/dir/src $M0/dir/dst
TEST ! stat $M0/dir/src
TEST stat $M0/dir/dst
TEST ! stat $B0/${V0}0/dir/src
TEST ! stat $B0/${V0}1/dir/src
TEST stat $B0/${V0}0/dir/dst
TEST stat $B0/${V0}1/dir/dst
##################################################
##### Rename to a sharded file without holes #####
##################################################
TEST unlink $M0/dir/dst
TEST touch $M0/dir/src
# Create a 9M sharded file
TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=9216
gfid_dst=$(get_gfid_string $M0/dir/dst)
# Ensure its shards are created.
TEST stat $B0/${V0}0/.shard/$gfid_dst.1
TEST stat $B0/${V0}1/.shard/$gfid_dst.1
TEST stat $B0/${V0}0/.shard/$gfid_dst.2
TEST stat $B0/${V0}1/.shard/$gfid_dst.2
TEST mv -f $M0/dir/src $M0/dir/dst
TEST ! stat $B0/${V0}0/.shard/$gfid_dst.1
TEST ! stat $B0/${V0}1/.shard/$gfid_dst.1
TEST ! stat $B0/${V0}0/.shard/$gfid_dst.2
TEST ! stat $B0/${V0}1/.shard/$gfid_dst.2
TEST ! stat $M0/dir/src
TEST stat $M0/dir/dst
TEST ! stat $B0/${V0}0/dir/src
TEST ! stat $B0/${V0}1/dir/src
TEST stat $B0/${V0}0/dir/dst
TEST stat $B0/${V0}1/dir/dst
###################################################
##### Rename of dst file with /.shard present #####
###################################################
TEST unlink $M0/dir/dst
TEST touch $M0/dir/src
TEST truncate -s 5M $M0/dir/dst
# Test to ensure that rename over a sparse dst file works fine.
TEST mv -f $M0/dir/src $M0/dir/dst
TEST ! stat $M0/dir/src
TEST stat $M0/dir/dst
TEST ! stat $B0/${V0}0/dir/src
TEST ! stat $B0/${V0}1/dir/src
TEST stat $B0/${V0}0/dir/dst
TEST stat $B0/${V0}1/dir/dst
###############################################################
##### Rename of dst file with only one block (the zeroth) #####
###############################################################
TEST unlink $M0/dir/dst
TEST touch $M0/dir/src
TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=1024
# Test to ensure that rename over a dst file with only one block works fine.
TEST mv -f $M0/dir/src $M0/dir/dst
TEST ! stat $M0/dir/src
TEST stat $M0/dir/dst
TEST ! stat $B0/${V0}0/dir/src
TEST ! stat $B0/${V0}1/dir/src
TEST stat $B0/${V0}0/dir/dst
TEST stat $B0/${V0}1/dir/dst
########################################################
##### Rename to a dst sharded file with hard-links #####
########################################################
TEST unlink $M0/dir/dst
TEST touch $M0/dir/src
# Create a 9M sharded file
TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=9216
gfid_dst=$(get_gfid_string $M0/dir/dst)
# Ensure its shards are created.
TEST stat $B0/${V0}0/.shard/$gfid_dst.1
TEST stat $B0/${V0}1/.shard/$gfid_dst.1
TEST stat $B0/${V0}0/.shard/$gfid_dst.2
TEST stat $B0/${V0}1/.shard/$gfid_dst.2
# Create a hard link.
TEST ln $M0/dir/dst $M0/link
# Now rename src to the dst.
TEST mv -f $M0/dir/src $M0/dir/dst
# Ensure the shards are still intact.
TEST stat $B0/${V0}0/.shard/$gfid_dst.1
TEST stat $B0/${V0}1/.shard/$gfid_dst.1
TEST stat $B0/${V0}0/.shard/$gfid_dst.2
TEST stat $B0/${V0}1/.shard/$gfid_dst.2
TEST ! stat $M0/dir/src
TEST ! stat $B0/${V0}0/dir/src
TEST ! stat $B0/${V0}1/dir/src
# Now rename another file to the last link.
TEST touch $M0/dir/src2
TEST mv -f $M0/dir/src2 $M0/link
# Ensure that the shards are all cleaned up.
TEST ! stat $B0/${V0}0/.shard/$gfid_dst.1
TEST ! stat $B0/${V0}1/.shard/$gfid_dst.1
TEST ! stat $B0/${V0}0/.shard/$gfid_dst.2
TEST ! stat $B0/${V0}1/.shard/$gfid_dst.2
TEST ! stat $M0/dir/src2
TEST ! stat $B0/${V0}0/dir/src2
TEST ! stat $B0/${V0}1/dir/src2
# Rename with non-existent dst and a sharded src
TEST touch $M0/dir/src
TEST dd if=/dev/zero of=$M0/dir/src bs=1024 count=9216
gfid_src=$(get_gfid_string $M0/dir/src)
# Ensure its shards are created.
TEST stat $B0/${V0}0/.shard/$gfid_src.1
TEST stat $B0/${V0}1/.shard/$gfid_src.1
TEST stat $B0/${V0}0/.shard/$gfid_src.2
TEST stat $B0/${V0}1/.shard/$gfid_src.2
# Now rename src to the dst.
TEST mv $M0/dir/src $M0/dir/dst
TEST stat $B0/${V0}0/.shard/$gfid_src.1
TEST stat $B0/${V0}1/.shard/$gfid_src.1
TEST stat $B0/${V0}0/.shard/$gfid_src.2
TEST stat $B0/${V0}1/.shard/$gfid_src.2
TEST ! stat $M0/dir/src
TEST ! stat $B0/${V0}0/dir/src
TEST ! stat $B0/${V0}1/dir/src
TEST stat $M0/dir/dst
TEST stat $B0/${V0}0/dir/dst
TEST stat $B0/${V0}1/dir/dst
# Rename with non-existent dst and a sharded src with no shards
TEST touch $M0/dir/src
TEST dd if=/dev/zero of=$M0/dir/src bs=1024 count=1024
gfid_src=$(get_gfid_string $M0/dir/src)
TEST ! stat $B0/${V0}0/.shard/$gfid_src.1
TEST ! stat $B0/${V0}1/.shard/$gfid_src.1
# Now rename src to the dst.
TEST mv $M0/dir/src $M0/dir/dst
TEST ! stat $M0/dir/src
TEST ! stat $B0/${V0}0/dir/src
TEST ! stat $B0/${V0}1/dir/src
TEST stat $M0/dir/dst
TEST stat $B0/${V0}0/dir/dst
TEST stat $B0/${V0}1/dir/dst
cleanup


@ -561,7 +561,7 @@ dht_rename_unlock_cbk (call_frame_t *frame, void *cookie,
DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno,
&local->stbuf, &local->preoldparent,
&local->postoldparent, &local->preparent,
&local->postparent, NULL);
&local->postparent, local->xattr);
return 0;
}
@ -872,6 +872,12 @@ dht_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
uuid_utoa(local->loc.inode->gfid):"");
}
}
if (xdata) {
if (!local->xattr)
local->xattr = dict_ref (xdata);
else
local->xattr = dict_copy_with_ref (xdata, local->xattr);
}
if ((src_cached == dst_cached) && (dst_hashed != dst_cached)) {
link_frame = copy_frame (frame);
@ -1022,7 +1028,6 @@ dht_do_rename (call_frame_t *frame)
xlator_t *dst_cached = NULL;
xlator_t *this = NULL;
xlator_t *rename_subvol = NULL;
dict_t *dict = NULL;
local = frame->local;
this = frame->this;
@ -1037,11 +1042,12 @@ dht_do_rename (call_frame_t *frame)
rename_subvol = dst_hashed;
if ((src_cached != dst_hashed) && (rename_subvol == dst_hashed)) {
DHT_MARKER_DONT_ACCOUNT(dict);
DHT_MARKER_DONT_ACCOUNT(local->xattr_req);
}
if (rename_subvol == src_cached) {
DHT_CHANGELOG_TRACK_AS_RENAME(dict, &local->loc, &local->loc2);
DHT_CHANGELOG_TRACK_AS_RENAME(local->xattr_req, &local->loc,
&local->loc2);
}
gf_msg_trace (this->name, 0,
@ -1052,10 +1058,7 @@ dht_do_rename (call_frame_t *frame)
FRAME_SU_DO (frame, dht_local_t);
STACK_WIND (frame, dht_rename_cbk,
rename_subvol, rename_subvol->fops->rename,
&local->loc, &local->loc2, dict);
if (dict)
dict_unref (dict);
&local->loc, &local->loc2, local->xattr_req);
return 0;
}
@ -1548,6 +1551,8 @@ dht_rename (call_frame_t *frame, xlator_t *this,
local->src_cached = src_cached;
local->dst_hashed = dst_hashed;
local->dst_cached = dst_cached;
if (xdata)
local->xattr_req = dict_ref (xdata);
gf_msg (this->name, GF_LOG_INFO, 0,
DHT_MSG_RENAME_INFO,


@ -2135,13 +2135,161 @@ err:
}
int
shard_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, struct iatt *preparent,
struct iatt *postparent, dict_t *xdata)
shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode);
int
shard_post_lookup_shards_unlink_handler (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = NULL;
local = frame->local;
if ((local->op_ret < 0) && (local->op_errno != ENOENT)) {
if (local->fop == GF_FOP_UNLINK)
SHARD_STACK_UNWIND (unlink, frame, local->op_ret,
local->op_errno, NULL, NULL, NULL);
else
SHARD_STACK_UNWIND (rename, frame, local->op_ret,
local->op_errno, NULL, NULL, NULL,
NULL, NULL, NULL);
return 0;
}
local->op_ret = 0;
local->op_errno = 0;
shard_unlink_shards_do (frame, this,
(local->fop == GF_FOP_RENAME)
? local->loc2.inode
: local->loc.inode);
return 0;
}
int
shard_rename_cbk (call_frame_t *frame, xlator_t *this);
int32_t
shard_unlink_cbk (call_frame_t *frame, xlator_t *this);
int
shard_post_resolve_unlink_handler (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = NULL;
local = frame->local;
if (local->op_ret < 0) {
if (local->op_errno == ENOENT) {
/* If lookup on /.shard fails with ENOENT, it probably
* means that the file is being unlinked before it
* could grow beyond its first block. In this case,
* unlink boils down to unlinking the base file and
* unwinding the call.
*/
local->op_ret = 0;
local->first_block = local->last_block = 0;
local->num_blocks = 1;
if (local->fop == GF_FOP_UNLINK)
shard_unlink_cbk (frame, this);
else
shard_rename_cbk (frame, this);
return 0;
} else {
if (local->fop == GF_FOP_UNLINK)
SHARD_STACK_UNWIND (unlink, frame,
local->op_ret,
local->op_errno, NULL, NULL,
NULL);
else
shard_rename_cbk (frame, this);
return 0;
}
}
if (!local->call_count)
shard_unlink_shards_do (frame, this,
(local->fop == GF_FOP_RENAME)
? local->loc2.inode
: local->loc.inode);
else
shard_common_lookup_shards (frame, this,
(local->fop == GF_FOP_RENAME)
? local->loc2.inode
: local->loc.inode,
shard_post_lookup_shards_unlink_handler);
return 0;
}
int
shard_unlink_base_file_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *preparent, struct iatt *postparent,
dict_t *xdata)
{
int ret = 0;
uint32_t link_count = 0;
shard_local_t *local = NULL;
shard_priv_t *priv = NULL;
local = frame->local;
priv = this->private;
if (op_ret < 0) {
SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno, NULL, NULL,
NULL);
return 0;
}
/* link() creates a new link only to the base file and not to the
* shards under /.shard, so unlink() may delete those shards only
* when the link count is 1. If other links remain (link count > 1),
* we can safely return now without touching the shards.
*/
if ((xdata) && (!dict_get_uint32 (xdata, GET_LINK_COUNT, &link_count))
&& (link_count > 1))
goto unwind;
local->first_block = get_lowest_block (0, local->block_size);
local->last_block = get_highest_block (0, local->prebuf.ia_size,
local->block_size);
local->num_blocks = local->last_block - local->first_block + 1;
/* num_blocks = 1 implies that the file has not crossed its
* shard block size, so there are no shards under /.shard to
* delete. The base file has already been unlinked above, so we
* can safely return now.
*/
if (local->num_blocks == 1)
goto unwind;
local->inode_list = GF_CALLOC (local->num_blocks, sizeof (inode_t *),
gf_shard_mt_inode_list);
if (!local->inode_list)
goto unwind;
/* Save the xdata and preparent and postparent iatts now. This will be
* used at the time of unwinding the call to the parent xl.
*/
local->preoldparent = *preparent;
local->postoldparent = *postparent;
if (xdata)
local->xattr_rsp = dict_ref (xdata);
local->dot_shard_loc.inode = inode_find (this->itable,
priv->dot_shard_gfid);
if (!local->dot_shard_loc.inode) {
ret = shard_init_dot_shard_loc (this, local);
if (ret)
goto unwind;
shard_lookup_dot_shard (frame, this,
shard_post_resolve_unlink_handler);
} else {
shard_common_resolve_shards (frame, this, local->loc.inode,
shard_post_resolve_unlink_handler);
}
return 0;
unwind:
SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent,
postparent, xdata);
return 0;
}
@ -2152,13 +2300,13 @@ shard_unlink_base_file (call_frame_t *frame, xlator_t *this)
local = frame->local;
if (local->op_ret < 0) {
shard_unlink_cbk (frame, 0, this, local->op_ret,
local->op_errno, NULL, NULL, NULL);
return 0;
}
if (dict_set_uint32 (local->xattr_req, GET_LINK_COUNT, 0))
gf_msg (this->name, GF_LOG_WARNING, 0,
SHARD_MSG_DICT_SET_FAILED, "Failed to set "
GET_LINK_COUNT" in dict");
STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD(this),
/* To-Do: Request open-fd count on base file */
STACK_WIND (frame, shard_unlink_base_file_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->unlink, &local->loc, local->xflag,
local->xattr_req);
return 0;
@ -2199,6 +2347,17 @@ shard_unlink_block_inode (shard_local_t *local, int shard_block_num)
int
shard_rename_cbk (call_frame_t *frame, xlator_t *this);
int32_t
shard_unlink_cbk (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = frame->local;
SHARD_STACK_UNWIND (unlink, frame, local->op_ret, local->op_errno,
&local->preoldparent, &local->postoldparent,
local->xattr_rsp);
return 0;
}
int
shard_unlink_shards_do_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
@ -2225,7 +2384,7 @@ done:
SHARD_UNSET_ROOT_FS_ID (frame, local);
if (local->fop == GF_FOP_UNLINK)
shard_unlink_base_file (frame, this);
shard_unlink_cbk (frame, this);
else if (local->fop == GF_FOP_RENAME)
shard_rename_cbk (frame, this);
else
@ -2254,9 +2413,16 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
priv = this->private;
local = frame->local;
/* local->num_blocks includes the base file block. This function only
* deletes the shards under /.shard, so subtract 1 from num_blocks.
*/
local->call_count = call_count = local->num_blocks - 1;
last_block = local->last_block;
/* Ignore the inode associated with the base file and start counting
* from 1.
*/
for (i = 1; i < local->num_blocks; i++) {
if (!local->inode_list[i])
continue;
@ -2266,20 +2432,15 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
if (!count) {
/* callcount = 0 implies that all of the shards that need to be
* unlinked are non-existent (in other words the file is full of
* holes). So shard xlator would now proceed to do the final
* unlink on the base file.
* holes). So shard xlator can simply return the fop to its
* parent now.
*/
gf_msg_debug (this->name, 0, "All shards that need to be "
"unlinked are non-existent: %s",
uuid_utoa (inode->gfid));
local->num_blocks = 1;
if (local->fop == GF_FOP_UNLINK) {
gf_msg_debug (this->name, 0, "Proceeding to unlink the"
" base file");
STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->unlink,
&local->loc, local->flags,
local->xattr_req);
shard_unlink_cbk (frame, this);
} else if (local->fop == GF_FOP_RENAME) {
gf_msg_debug (this->name, 0, "Resuming rename()");
shard_rename_cbk (frame, this);
@ -2291,6 +2452,8 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
cur_block = 1;
SHARD_SET_ROOT_FS_ID (frame, local);
/* Ignore the base file and start iterating from the first block shard.
*/
while (cur_block <= last_block) {
if (!local->inode_list[cur_block]) {
cur_block++;
@ -2346,86 +2509,6 @@ next:
return 0;
}
int
shard_post_lookup_shards_unlink_handler (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = NULL;
local = frame->local;
if ((local->op_ret < 0) && (local->op_errno != ENOENT)) {
if (local->fop == GF_FOP_UNLINK)
SHARD_STACK_UNWIND (unlink, frame, local->op_ret,
local->op_errno, NULL, NULL, NULL);
else
SHARD_STACK_UNWIND (rename, frame, local->op_ret,
local->op_errno, NULL, NULL, NULL,
NULL, NULL, NULL);
return 0;
}
local->op_ret = 0;
local->op_errno = 0;
shard_unlink_shards_do (frame, this,
(local->fop == GF_FOP_RENAME)
? local->loc2.inode
: local->loc.inode);
return 0;
}
int
shard_post_resolve_unlink_handler (call_frame_t *frame, xlator_t *this)
{
shard_local_t *local = NULL;
local = frame->local;
if (local->op_ret < 0) {
if (local->op_errno == ENOENT) {
/* If lookup on /.shard fails with ENOENT, it probably
* means that the file is being unlinked before it
* could grow beyond its first block. In this case,
* unlink boils down to unlinking the base file and
* unwinding the call.
*/
local->op_ret = 0;
local->first_block = local->last_block = 0;
local->num_blocks = 1;
if (local->fop == GF_FOP_UNLINK)
STACK_WIND (frame, shard_unlink_cbk,
FIRST_CHILD(this),
FIRST_CHILD (this)->fops->unlink,
&local->loc, local->xflag,
local->xattr_req);
else
shard_rename_cbk (frame, this);
return 0;
} else {
if (local->fop == GF_FOP_UNLINK)
SHARD_STACK_UNWIND (unlink, frame,
local->op_ret,
local->op_errno, NULL, NULL,
NULL);
else
shard_rename_cbk (frame, this);
return 0;
}
}
if (!local->call_count)
shard_unlink_shards_do (frame, this,
(local->fop == GF_FOP_RENAME)
? local->loc2.inode
: local->loc.inode);
else
shard_common_lookup_shards (frame, this,
(local->fop == GF_FOP_RENAME)
? local->loc2.inode
: local->loc.inode,
shard_post_lookup_shards_unlink_handler);
return 0;
}
int
shard_post_lookup_unlink_handler (call_frame_t *frame, xlator_t *this)
{
@ -2442,46 +2525,7 @@ shard_post_lookup_unlink_handler (call_frame_t *frame, xlator_t *this)
return 0;
}
local->first_block = get_lowest_block (0, local->block_size);
local->last_block = get_highest_block (0, local->prebuf.ia_size,
local->block_size);
local->num_blocks = local->last_block - local->first_block + 1;
if ((local->num_blocks == 1) || (local->prebuf.ia_nlink > 1)) {
/* num_blocks = 1 implies that the file has not crossed its
* shard block size. So unlink boils down to unlinking just the
* base file.
* Because link() does not create links for all but the
* base shard, unlink() must delete these shards only when the
* link count is 1.
*/
STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD (this),
FIRST_CHILD (this)->fops->unlink, &local->loc,
local->xflag, local->xattr_req);
return 0;
}
local->inode_list = GF_CALLOC (local->num_blocks, sizeof (inode_t *),
gf_shard_mt_inode_list);
if (!local->inode_list)
goto out;
local->dot_shard_loc.inode = inode_find (this->itable,
priv->dot_shard_gfid);
if (!local->dot_shard_loc.inode) {
ret = shard_init_dot_shard_loc (this, local);
if (ret)
goto out;
shard_lookup_dot_shard (frame, this,
shard_post_resolve_unlink_handler);
} else {
shard_common_resolve_shards (frame, this, local->loc.inode,
shard_post_resolve_unlink_handler);
}
return 0;
out:
SHARD_STACK_UNWIND (unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
shard_unlink_base_file (frame, this);
return 0;
}
@ -2524,7 +2568,6 @@ shard_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
shard_lookup_base_file (frame, this, &local->loc,
shard_post_lookup_unlink_handler);
return 0;
err:
SHARD_STACK_UNWIND (unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
@ -2549,9 +2592,10 @@ shard_rename_cbk (call_frame_t *frame, xlator_t *this)
int
shard_rename_unlink_dst_shards_do (call_frame_t *frame, xlator_t *this)
{
int ret = -1;
shard_local_t *local = NULL;
shard_priv_t *priv = NULL;
int ret = -1;
uint32_t link_count = 0;
shard_local_t *local = NULL;
shard_priv_t *priv = NULL;
local = frame->local;
priv = this->private;
@ -2561,7 +2605,14 @@ shard_rename_unlink_dst_shards_do (call_frame_t *frame, xlator_t *this)
local->dst_block_size);
local->num_blocks = local->last_block - local->first_block + 1;
if ((local->num_blocks == 1) || (local->postbuf.ia_nlink > 1)) {
if ((local->xattr_rsp) &&
(!dict_get_uint32 (local->xattr_rsp, GET_LINK_COUNT, &link_count))
&& (link_count > 1)) {
shard_rename_cbk (frame, this);
return 0;
}
if (local->num_blocks == 1) {
shard_rename_cbk (frame, this);
return 0;
}
@ -2664,6 +2715,12 @@ shard_rename_src_base_file (call_frame_t *frame, xlator_t *this)
local = frame->local;
if (dict_set_uint32 (local->xattr_req, GET_LINK_COUNT, 0))
gf_msg (this->name, GF_LOG_WARNING, 0,
SHARD_MSG_DICT_SET_FAILED, "Failed to set "
GET_LINK_COUNT" in dict");
/* To-Do: Request open-fd count on dst base file */
STACK_WIND (frame, shard_rename_src_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->rename, &local->loc, &local->loc2,
local->xattr_req);


@ -1717,13 +1717,13 @@ out:
int32_t
posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
struct iatt *stbuf, int32_t *op_errno,
loc_t *loc)
loc_t *loc, gf_boolean_t get_link_count,
dict_t *rsp_dict)
{
int32_t ret = 0;
struct posix_private *priv = NULL;
int fd_count = 0;
priv = this->private;
int fd_count = 0;
int32_t ret = 0;
struct iatt prebuf = {0,};
gf_boolean_t locked = _gf_false;
/* Unlink the gfid_handle_first */
if (stbuf && stbuf->ia_nlink == 1) {
@ -1746,6 +1746,18 @@ posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
}
}
if (get_link_count) {
LOCK (&loc->inode->lock);
locked = _gf_true;
ret = posix_pstat (this, loc->gfid, real_path, &prebuf);
if (ret) {
gf_msg (this->name, GF_LOG_ERROR, errno,
P_MSG_LSTAT_FAILED, "lstat on %s failed",
real_path);
goto err;
}
}
/* Unlink the actual file */
ret = sys_unlink (real_path);
if (ret == -1) {
@ -1756,9 +1768,23 @@ posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
goto err;
}
if (locked) {
UNLOCK (&loc->inode->lock);
locked = _gf_false;
}
ret = dict_set_uint32 (rsp_dict, GET_LINK_COUNT, prebuf.ia_nlink);
if (ret)
gf_msg (this->name, GF_LOG_WARNING, 0, P_MSG_SET_XDATA_FAIL,
"failed to set "GET_LINK_COUNT" for %s", real_path);
return 0;
err:
if (locked) {
UNLOCK (&loc->inode->lock);
locked = _gf_false;
}
return -1;
}
@ -1849,6 +1875,7 @@ posix_unlink (call_frame_t *frame, xlator_t *this,
void *uuid = NULL;
char uuid_str[GF_UUID_BUF_SIZE] = {0};
char gfid_str[GF_UUID_BUF_SIZE] = {0};
gf_boolean_t get_link_count = _gf_false;
DECLARE_OLD_FS_ID_VAR;
@ -1972,18 +1999,23 @@ posix_unlink (call_frame_t *frame, xlator_t *this,
}
}
op_ret = posix_unlink_gfid_handle_and_entry (this, real_path, &stbuf,
&op_errno, loc);
if (op_ret == -1) {
goto out;
}
unwind_dict = dict_new ();
if (!unwind_dict) {
op_errno = ENOMEM;
op_ret = -1;
goto out;
}
if (xdata && dict_get (xdata, GET_LINK_COUNT))
get_link_count = _gf_true;
op_ret = posix_unlink_gfid_handle_and_entry (this, real_path, &stbuf,
&op_errno, loc,
get_link_count,
unwind_dict);
if (op_ret == -1) {
goto out;
}
if (fdstat_requested) {
op_ret = posix_fdstat (this, fd, &postbuf);
if (op_ret == -1) {
@ -2307,6 +2339,8 @@ posix_rename (call_frame_t *frame, xlator_t *this,
char *pgfid_xattr_key = NULL;
int32_t nlink_samepgfid = 0;
dict_t *unwind_dict = NULL;
gf_boolean_t locked = _gf_false;
gf_boolean_t get_link_count = _gf_false;
DECLARE_OLD_FS_ID_VAR;
@ -2333,6 +2367,13 @@ posix_rename (call_frame_t *frame, xlator_t *this,
goto out;
}
unwind_dict = dict_new ();
if (!unwind_dict) {
op_ret = -1;
op_errno = ENOMEM;
goto out;
}
op_ret = posix_pstat (this, oldloc->pargfid, par_oldpath, &preoldparent);
if (op_ret == -1) {
op_errno = errno;
@ -2399,6 +2440,22 @@ posix_rename (call_frame_t *frame, xlator_t *this,
this, unlock);
}
if ((xdata) && (dict_get (xdata, GET_LINK_COUNT))
&& (real_newpath) && (was_present)) {
LOCK (&newloc->inode->lock);
locked = _gf_true;
get_link_count = _gf_true;
op_ret = posix_pstat (this, newloc->gfid, real_newpath,
&stbuf);
if ((op_ret == -1) && (errno != ENOENT)) {
op_errno = errno;
gf_msg (this->name, GF_LOG_ERROR, errno,
P_MSG_LSTAT_FAILED,
"lstat on %s failed", real_newpath);
goto unlock;
}
}
op_ret = sys_rename (real_oldpath, real_newpath);
if (op_ret == -1) {
op_errno = errno;
@ -2426,6 +2483,18 @@ posix_rename (call_frame_t *frame, xlator_t *this,
goto unlock;
}
if (locked) {
UNLOCK (&newloc->inode->lock);
locked = _gf_false;
}
if ((get_link_count) &&
(dict_set_uint32 (unwind_dict, GET_LINK_COUNT,
stbuf.ia_nlink)))
gf_msg (this->name, GF_LOG_WARNING, 0,
P_MSG_SET_XDATA_FAIL, "failed to set "
GET_LINK_COUNT" for %s", real_newpath);
if (!IA_ISDIR (oldloc->inode->ia_type)
&& priv->update_pgfid_nlinks) {
MAKE_PGFID_XATTR_KEY (pgfid_xattr_key,
@ -2439,6 +2508,10 @@ posix_rename (call_frame_t *frame, xlator_t *this,
}
}
unlock:
if (locked) {
UNLOCK (&newloc->inode->lock);
locked = _gf_false;
}
UNLOCK (&oldloc->inode->lock);
if (op_ret < 0) {
@ -2487,7 +2560,7 @@ unlock:
}
if (was_present)
unwind_dict = posix_dict_set_nlink (xdata, NULL, nlink);
unwind_dict = posix_dict_set_nlink (xdata, unwind_dict, nlink);
op_ret = 0;
out: