cluster/dht: refactor dht_lookup

The dht lookup code is getting difficult to maintain
due to its size. Refactoring the code will make it
easier to modify it in future.

Change-Id: Ic7cb5bf4f018504dfaa7f0d48cf42ab0aa34abdd
updates: bz#1590385
Signed-off-by: N Balachandran <nbalacha@redhat.com>
This commit is contained in:
N Balachandran 2018-06-12 19:49:52 +05:30 committed by Amar Tumballi
parent 234d611160
commit 13b5ab49ac
2 changed files with 375 additions and 211 deletions

View File

@ -0,0 +1,54 @@
#!/bin/bash
. $(dirname $0)/../../include.rc
. $(dirname $0)/../../volume.rc
. $(dirname $0)/../../nfs.rc
. $(dirname $0)/../../common-utils.rc
# Test overview:
# Check that non-privileged users can also clean up stale linkto files
#
# 1. Use the current parallel-readdir behaviour of changing the DHT child subvols
# in the graph to generate stale linkto files
# 2. Access the file with the stale linkto file as a non-root user
# 3. This should now succeed (returned EIO before commit 3fb1df7870e03c9de)
cleanup
TEST glusterd
TEST pidof glusterd
TEST $CLI volume create $V0 $H0:$B0/$V0-{1..3}
TEST $CLI volume start $V0
# Mount using FUSE and create a file
TEST glusterfs -s $H0 --volfile-id $V0 $M0
TEST glusterfs -s $H0 --volfile-id $V0 $M1
ls $M0/FILE-1
EXPECT "2" echo $?
# Create a file and a directory on $M0
TEST dd if=/dev/urandom of=$M0/FILE-1 count=1 bs=16k
TEST mkdir $M0/dir1
ls $M0/FILE-1
EXPECT "0" echo $?
ls $M0/dir1
EXPECT "0" echo $?
#Use a fresh mount so as to trigger a fresh lookup
TEST glusterfs -s $H0 --volfile-id $V0 $M1
TEST ls $M1/FILE-1
EXPECT "0" echo $?
ls $M1/dir1
EXPECT "0" echo $?
# Cleanup
cleanup

View File

@ -50,6 +50,15 @@ dht_common_xattrop_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *dict,
dict_t *xdata);
int
dht_set_file_xattr_req (xlator_t *this, loc_t *loc, dict_t *xattr_req);
int
dht_set_dir_xattr_req (xlator_t *this, loc_t *loc, dict_t *xattr_req);
int
dht_do_fresh_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc);
/* Sets the blocks and size values to fixed values. This is to be called
* only for dirs. The caller is responsible for checking the type
@ -1152,7 +1161,7 @@ out:
int
dht_discover (call_frame_t *frame, xlator_t *this, loc_t *loc)
dht_do_discover (call_frame_t *frame, xlator_t *this, loc_t *loc)
{
int ret;
dht_local_t *local = NULL;
@ -1165,21 +1174,17 @@ dht_discover (call_frame_t *frame, xlator_t *this, loc_t *loc)
conf = this->private;
local = frame->local;
ret = dict_set_uint32 (local->xattr_req, conf->xattr_name, 4 * 4);
if (ret)
gf_msg (this->name, GF_LOG_WARNING, 0,
DHT_MSG_DICT_SET_FAILED,
"%s: Failed to set dictionary value:key = %s",
loc->path, conf->xattr_name);
ret = dht_set_file_xattr_req (this, loc, local->xattr_req);
if (ret) {
goto err;
}
ret = dict_set_uint32 (local->xattr_req, conf->link_xattr_name, 256);
if (ret)
gf_msg (this->name, GF_LOG_WARNING, 0,
DHT_MSG_DICT_SET_FAILED,
"%s: Failed to set dictionary value:key = %s",
loc->path, conf->link_xattr_name);
ret = dht_set_dir_xattr_req (this, loc, local->xattr_req);
if (ret) {
goto err;
}
if (__is_root_gfid(local->loc.gfid)) {
if (loc_is_root (loc)) {
ret = dict_set_uint32 (local->xattr_req,
conf->commithash_xattr_name,
sizeof(uint32_t));
@ -3323,20 +3328,16 @@ err:
* perform proper self-healing of dirs
*/
void
dht_check_and_set_acl_xattr_req (inode_t *inode, dict_t *xattr_req)
dht_check_and_set_acl_xattr_req (xlator_t *this, dict_t *xattr_req)
{
int ret = 0;
GF_ASSERT (inode);
GF_ASSERT (xattr_req);
if (inode->ia_type != IA_IFDIR)
return;
if (!dict_get (xattr_req, POSIX_ACL_ACCESS_XATTR)) {
ret = dict_set_int8 (xattr_req, POSIX_ACL_ACCESS_XATTR, 0);
if (ret)
gf_msg (THIS->name, GF_LOG_WARNING, -ret,
gf_msg (this->name, GF_LOG_WARNING, -ret,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s",
POSIX_ACL_ACCESS_XATTR);
@ -3345,7 +3346,7 @@ dht_check_and_set_acl_xattr_req (inode_t *inode, dict_t *xattr_req)
if (!dict_get (xattr_req, POSIX_ACL_DEFAULT_XATTR)) {
ret = dict_set_int8 (xattr_req, POSIX_ACL_DEFAULT_XATTR, 0);
if (ret)
gf_msg (THIS->name, GF_LOG_WARNING, -ret,
gf_msg (this->name, GF_LOG_WARNING, -ret,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s",
POSIX_ACL_DEFAULT_XATTR);
@ -3354,12 +3355,104 @@ dht_check_and_set_acl_xattr_req (inode_t *inode, dict_t *xattr_req)
return;
}
/* for directories, we need the following info:
* the layout : trusted.glusterfs.dht
* the mds information : trusted.glusterfs.dht.mds
* the acl info: See above
*/
int
dht_lookup (call_frame_t *frame, xlator_t *this,
loc_t *loc, dict_t *xattr_req)
dht_set_dir_xattr_req (xlator_t *this, loc_t *loc, dict_t *xattr_req)
{
int ret = -EINVAL;
dht_conf_t *conf = NULL;
conf = this->private;
if (!conf) {
goto err;
}
if (!xattr_req) {
goto err;
}
/* Xattr to get the layout for a directory
*/
ret = dict_set_uint32 (xattr_req, conf->xattr_name, 4 * 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->xattr_name, loc->path);
goto err;
}
/*Non-fatal failure */
ret = dict_set_uint32 (xattr_req, conf->mds_xattr_key, 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->mds_xattr_key, loc->path);
}
dht_check_and_set_acl_xattr_req (this, xattr_req);
ret = 0;
err:
return ret;
}
int dht_set_file_xattr_req (xlator_t *this, loc_t *loc, dict_t *xattr_req)
{
int ret = -EINVAL;
dht_conf_t *conf = NULL;
conf = this->private;
if (!conf) {
goto err;
}
if (!xattr_req) {
goto err;
}
/* Used to check whether this is a linkto file.
*/
ret = dict_set_uint32 (xattr_req,
conf->link_xattr_name, 256);
if (ret < 0) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->link_xattr_name, loc->path);
goto err;
}
/* This is used to make sure we don't unlink linkto files
* which are the target of an ongoing file migration.
*/
ret = dict_set_uint32 (xattr_req,
GLUSTERFS_OPEN_FD_COUNT, 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", GLUSTERFS_OPEN_FD_COUNT, loc->path);
goto err;
}
ret = 0;
err:
return ret;
}
int
dht_do_revalidate (call_frame_t *frame, xlator_t *this, loc_t *loc)
{
xlator_t *subvol = NULL;
xlator_t *hashed_subvol = NULL;
xlator_t *mds_subvol = NULL;
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
@ -3369,6 +3462,202 @@ dht_lookup (call_frame_t *frame, xlator_t *this,
int i = 0;
int call_cnt = 0;
int gen = 0;
conf = this->private;
if (!conf) {
op_errno = EINVAL;
goto err;
}
local = frame->local;
if (!local) {
op_errno = EINVAL;
goto err;
}
layout = local->layout;
if (!layout) {
gf_msg_debug (this->name, 0,
"path = %s. No layout found in the inode ctx.",
loc->path);
op_errno = EINVAL;
goto err;
}
/* Generation number has changed. This layout may be stale. */
if (layout->gen && (layout->gen < conf->gen)) {
gen = layout->gen;
dht_layout_unref (this, local->layout);
local->layout = NULL;
local->cached_subvol = NULL;
gf_msg_debug(this->name, 0,
"path = %s. In memory layout may be stale."
"(layout->gen (%d) is less than "
"conf->gen (%d)). Calling fresh lookup.",
loc->path, gen, conf->gen);
dht_do_fresh_lookup (frame, this, loc);
return 0;
}
local->inode = inode_ref (loc->inode);
/* Since we don't know whether this has changed,
* request all xattrs*/
ret = dht_set_file_xattr_req (this, loc, local->xattr_req);
if (ret) {
op_errno = -ret;
goto err;
}
ret = dht_set_dir_xattr_req (this, loc, local->xattr_req);
if (ret) {
op_errno = -ret;
goto err;
}
if (IA_ISDIR (local->inode->ia_type)) {
ret = dht_inode_ctx_mdsvol_get (local->inode, this,
&mds_subvol);
if (ret || !mds_subvol) {
gf_msg_debug (this->name, 0,
"path = %s. No mds subvol in inode ctx",
local->loc.path);
}
local->mds_subvol = mds_subvol;
local->call_cnt = conf->subvolume_cnt;
call_cnt = local->call_cnt;
for (i = 0; i < call_cnt; i++) {
STACK_WIND_COOKIE (frame, dht_revalidate_cbk,
conf->subvolumes[i],
conf->subvolumes[i],
conf->subvolumes[i]->fops->lookup,
loc, local->xattr_req);
}
return 0;
}
/* If not a dir, this should be 1 */
local->call_cnt = layout->cnt;
call_cnt = local->call_cnt;
for (i = 0; i < call_cnt; i++) {
subvol = layout->list[i].xlator;
gf_msg_debug (this->name, 0, "path = %s. Calling "
"revalidate lookup on %s",
loc->path, subvol->name);
STACK_WIND_COOKIE (frame, dht_revalidate_cbk, subvol,
subvol, subvol->fops->lookup,
&local->loc, local->xattr_req);
}
return 0;
err:
op_errno = (op_errno == -1) ? errno : op_errno;
DHT_STACK_UNWIND (lookup, frame, -1, op_errno, NULL, NULL, NULL,
NULL);
return 0;
}
int
dht_do_fresh_lookup (call_frame_t *frame, xlator_t *this, loc_t *loc)
{
int ret = -1;
dht_conf_t *conf = NULL;
xlator_t *hashed_subvol = NULL;
dht_local_t *local = NULL;
int op_errno = -1;
int call_cnt = 0;
int i = 0;
conf = this->private;
if (!conf) {
op_errno = EINVAL;
goto err;
}
local = frame->local;
if (!local) {
op_errno = EINVAL;
goto err;
}
/* Since we don't know whether this is a file or a directory,
* request all xattrs*/
ret = dht_set_file_xattr_req (this, loc, local->xattr_req);
if (ret) {
op_errno = -ret;
goto err;
}
ret = dht_set_dir_xattr_req (this, loc, local->xattr_req);
if (ret) {
op_errno = -ret;
goto err;
}
/* This should have been set in dht_lookup */
hashed_subvol = local->hashed_subvol;
if (!hashed_subvol) {
gf_msg_debug (this->name, 0,
"%s: no subvolume in layout for path, "
"checking on all the subvols to see if "
"it is a directory", loc->path);
call_cnt = conf->subvolume_cnt;
local->call_cnt = call_cnt;
local->layout = dht_layout_new (this,
conf->subvolume_cnt);
if (!local->layout) {
op_errno = ENOMEM;
goto err;
}
gf_msg_debug (this->name, 0,
"%s: Found null hashed subvol. Calling lookup"
" on all nodes.", loc->path);
for (i = 0; i < call_cnt; i++) {
STACK_WIND_COOKIE (frame, dht_lookup_dir_cbk,
conf->subvolumes[i],
conf->subvolumes[i],
conf->subvolumes[i]->fops->lookup,
&local->loc, local->xattr_req);
}
return 0;
}
/* if we have the hashed_subvol, send the lookup there first so
* as to see whether we have a file or a directory */
gf_msg_debug (this->name, 0, "Calling fresh lookup for %s on"
" %s", loc->path, hashed_subvol->name);
STACK_WIND_COOKIE (frame, dht_lookup_cbk, hashed_subvol,
hashed_subvol, hashed_subvol->fops->lookup,
loc, local->xattr_req);
return 0;
err:
op_errno = (op_errno == -1) ? errno : op_errno;
DHT_STACK_UNWIND (lookup, frame, -1, op_errno, NULL, NULL, NULL,
NULL);
return 0;
}
int
dht_lookup (call_frame_t *frame, xlator_t *this,
loc_t *loc, dict_t *xattr_req)
{
xlator_t *hashed_subvol = NULL;
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
int ret = -1;
int op_errno = -1;
loc_t new_loc = {0,};
VALIDATE_OR_GOTO (frame, err);
@ -3392,7 +3681,7 @@ dht_lookup (call_frame_t *frame, xlator_t *this,
loc_wipe (&local->loc);
ret = loc_dup (&new_loc, &local->loc);
/* we no more need 'new_loc' entries */
/* we no longer need 'new_loc' entries */
loc_wipe (&new_loc);
/* check if loc_dup() is successful */
@ -3411,26 +3700,16 @@ dht_lookup (call_frame_t *frame, xlator_t *this,
local->xattr_req = dict_new ();
}
ret = dict_set_uint32 (local->xattr_req, conf->mds_xattr_key, 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->mds_xattr_key, loc->path);
}
/* Nameless lookup */
if (gf_uuid_is_null (loc->pargfid) && !gf_uuid_is_null (loc->gfid) &&
!__is_root_gfid (loc->inode->gfid)) {
local->cached_subvol = NULL;
dht_discover (frame, this, loc);
dht_do_discover (frame, this, loc);
return 0;
}
if (__is_root_gfid(loc->gfid)) {
if (loc_is_root (loc)) {
ret = dict_set_uint32 (local->xattr_req,
conf->commithash_xattr_name,
sizeof(uint32_t));
@ -3444,183 +3723,14 @@ dht_lookup (call_frame_t *frame, xlator_t *this,
/* The entry has been looked up before and has an inode_ctx set
*/
if (is_revalidate (loc)) {
layout = local->layout;
if (!layout) {
gf_msg_debug (this->name, 0,
"Revalidate lookup without cache."
" path=%s", loc->path);
op_errno = EINVAL;
goto err;
}
if (layout->gen && (layout->gen < conf->gen)) {
gf_msg_trace (this->name, 0,
"incomplete layout failure for path=%s",
loc->path);
gen = layout->gen;
dht_layout_unref (this, local->layout);
local->layout = NULL;
local->cached_subvol = NULL;
gf_msg_debug(this->name, 0,
"Called revalidate lookup for %s, "
"but layout->gen (%d) is less than "
"conf->gen (%d), calling fresh_lookup",
loc->path, gen, conf->gen);
goto do_fresh_lookup;
}
local->inode = inode_ref (loc->inode);
ret = dict_set_uint32 (local->xattr_req,
conf->xattr_name, 4 * 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->xattr_name, loc->path);
goto err;
}
/* need it in case file is not found on cached file
* on revalidate path and we may encounter linkto files on
* with dht_lookup_everywhere*/
ret = dict_set_uint32 (local->xattr_req,
conf->link_xattr_name, 256);
if (ret < 0) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->link_xattr_name, loc->path);
goto err;
}
if (IA_ISDIR (local->inode->ia_type)) {
ret = dht_inode_ctx_mdsvol_get (local->inode, this,
&mds_subvol);
if (ret || !mds_subvol) {
gf_msg_debug (this->name, 0,
"Failed to get mds subvol for path %s",
local->loc.path);
}
local->mds_subvol = mds_subvol;
local->call_cnt = call_cnt = conf->subvolume_cnt;
for (i = 0; i < call_cnt; i++) {
STACK_WIND_COOKIE (frame, dht_revalidate_cbk,
conf->subvolumes[i],
conf->subvolumes[i],
conf->subvolumes[i]->fops->lookup,
loc, local->xattr_req);
}
return 0;
}
call_cnt = local->call_cnt = layout->cnt;
/* need it for self-healing linkfiles which is
'in-migration' state */
ret = dict_set_uint32 (local->xattr_req,
GLUSTERFS_OPEN_FD_COUNT, 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", GLUSTERFS_OPEN_FD_COUNT, loc->path);
goto err;
}
/* need it for dir self-heal */
dht_check_and_set_acl_xattr_req (loc->inode, local->xattr_req);
for (i = 0; i < call_cnt; i++) {
subvol = layout->list[i].xlator;
gf_msg_debug (this->name, 0, "calling "
"revalidate lookup for %s at %s",
loc->path, subvol->name);
STACK_WIND_COOKIE (frame, dht_revalidate_cbk, subvol,
subvol, subvol->fops->lookup,
&local->loc, local->xattr_req);
}
} else {
do_fresh_lookup:
/* TODO: remove the hard-coding */
ret = dict_set_uint32 (local->xattr_req,
conf->xattr_name, 4 * 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->xattr_name, loc->path);
goto err;
}
ret = dict_set_uint32 (local->xattr_req,
conf->link_xattr_name, 256);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", conf->link_xattr_name, loc->path);
goto err;
}
/* need it for self-healing linkfiles which is
'in-migration' state */
ret = dict_set_uint32 (local->xattr_req,
GLUSTERFS_OPEN_FD_COUNT, 4);
if (ret) {
gf_msg (this->name, GF_LOG_WARNING, ENOMEM,
DHT_MSG_DICT_SET_FAILED,
"Failed to set dictionary value:key = %s for "
"path %s", GLUSTERFS_OPEN_FD_COUNT, loc->path);
goto err;
}
/* need it for dir self-heal */
dht_check_and_set_acl_xattr_req (loc->inode, local->xattr_req);
if (!hashed_subvol) {
gf_msg_debug (this->name, 0,
"no subvolume in layout for path=%s, "
"checking on all the subvols to see if "
"it is a directory", loc->path);
call_cnt = conf->subvolume_cnt;
local->call_cnt = call_cnt;
local->layout = dht_layout_new (this,
conf->subvolume_cnt);
if (!local->layout) {
op_errno = ENOMEM;
goto err;
}
gf_msg_debug (this->name, 0,
"Found null hashed subvol. Calling lookup"
" on all nodes.");
for (i = 0; i < call_cnt; i++) {
STACK_WIND_COOKIE (frame, dht_lookup_dir_cbk,
conf->subvolumes[i],
conf->subvolumes[i],
conf->subvolumes[i]->fops->lookup,
&local->loc, local->xattr_req);
}
return 0;
}
gf_msg_debug (this->name, 0, "Calling fresh lookup for %s on"
" %s", loc->path, hashed_subvol->name);
STACK_WIND_COOKIE (frame, dht_lookup_cbk, hashed_subvol,
hashed_subvol, hashed_subvol->fops->lookup,
loc, local->xattr_req);
dht_do_revalidate (frame, this, loc);
return 0;
} else {
dht_do_fresh_lookup (frame, this, loc);
return 0;
}
return 0;
err:
op_errno = (op_errno == -1) ? errno : op_errno;
DHT_STACK_UNWIND (lookup, frame, -1, op_errno, NULL, NULL, NULL,