features/sdfs: implement readdirp

Since readdirp acts as a batched lookup for all dentries it reads, it
has to synchronize with any entry operation within the directory being
read.

Change-Id: I923a6ebd21856dbaa5fa5db4a26a29b7b29b3159
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
fixes: #421
This commit is contained in:
Raghavendra G 2018-05-22 19:15:19 +05:30
parent 2d5b179d1a
commit ead98973da

View File

@ -118,7 +118,7 @@ out:
}
static int
sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
sdfs_get_new_frame_common (call_frame_t *frame, call_frame_t **new_frame)
{
int ret = -1;
sdfs_local_t *local = NULL;
@ -132,6 +132,7 @@ sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
client = frame->root->client;
gf_client_ref (client);
(*new_frame)->root->client = client;
local = sdfs_local_init (*new_frame, THIS);
if (!local) {
goto err;
@ -139,6 +140,29 @@ sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
local->main_frame = frame;
ret = 0;
err:
if ((ret == -1) && (*new_frame)) {
SDFS_STACK_DESTROY ((*new_frame));
*new_frame = NULL;
}
return ret;
}
static int
sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
{
int ret = -1;
sdfs_local_t *local = NULL;
ret = sdfs_get_new_frame_common (frame, new_frame);
if (ret < 0) {
goto err;
}
local = (*new_frame)->local;
ret = sdfs_build_parent_loc (&local->parent_loc, loc);
if (ret) {
goto err;
@ -151,9 +175,32 @@ sdfs_get_new_frame (call_frame_t *frame, loc_t *loc, call_frame_t **new_frame)
ret = 0;
err:
if (ret == -1) {
SDFS_STACK_DESTROY (frame);
if ((ret < 0) && (*new_frame != NULL)) {
SDFS_STACK_DESTROY ((*new_frame));
*new_frame = NULL;
}
return ret;
}
static int
sdfs_get_new_frame_readdirp (call_frame_t *frame, fd_t *fd,
call_frame_t **new_frame)
{
int ret = -1;
sdfs_local_t *local = NULL;
ret = sdfs_get_new_frame_common (frame, new_frame);
if (ret < 0) {
goto err;
}
local = (*new_frame)->local;
local->parent_loc.inode = inode_ref (fd->inode);
gf_uuid_copy (local->parent_loc.gfid, fd->inode->gfid);
ret = 0;
err:
return ret;
}
@ -1305,6 +1352,99 @@ err:
return 0;
}
int32_t
sdfs_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, gf_dirent_t *entries,
dict_t *xdata)
{
sdfs_local_t *local = NULL;
local = frame->local;
STACK_UNWIND_STRICT (readdirp, local->main_frame, op_ret, op_errno,
entries, xdata);
local->main_frame = NULL;
STACK_WIND (frame, sdfs_entrylk_cbk, FIRST_CHILD (this),
FIRST_CHILD(this)->fops->entrylk,
this->name, &local->parent_loc, NULL,
ENTRYLK_UNLOCK, ENTRYLK_RDLCK, xdata);
return 0;
}
int32_t
sdfs_readdirp_helper (call_frame_t *frame, xlator_t *this, fd_t *fd,
size_t size, off_t off, dict_t *xdata)
{
sdfs_local_t *local = NULL;
char gfid[GF_UUID_BUF_SIZE] = {0};
local = frame->local;
gf_uuid_unparse(fd->inode->gfid, gfid);
if (local->op_ret < 0) {
gf_msg (this->name, GF_LOG_ERROR, 0,
SDFS_MSG_ENTRYLK_ERROR,
"Acquiring entry lock failed for directory %s "
"with parent gfid %s", local->loc.name, gfid);
goto err;
}
STACK_WIND (frame, sdfs_readdirp_cbk, FIRST_CHILD(this),
FIRST_CHILD(this)->fops->readdirp, fd, size, off, xdata);
return 0;
err:
STACK_UNWIND_STRICT (readdirp, local->main_frame, -1, local->op_errno,
NULL, NULL);
local->main_frame = NULL;
SDFS_STACK_DESTROY (frame);
return 0;
}
int32_t
sdfs_readdirp (call_frame_t *frame, xlator_t *this, fd_t *fd, size_t size,
off_t off, dict_t *xdata)
{
sdfs_local_t *local = NULL;
call_frame_t *new_frame = NULL;
call_stub_t *stub = NULL;
int op_errno = 0;
if (-1 == sdfs_get_new_frame_readdirp (frame, fd, &new_frame)) {
op_errno = ENOMEM;
goto err;
}
stub = fop_readdirp_stub (new_frame, sdfs_readdirp_helper, fd, size,
off, xdata);
if (!stub) {
op_errno = ENOMEM;
goto err;
}
local = new_frame->local;
local->stub = stub;
STACK_WIND (new_frame, sdfs_entrylk_cbk,
FIRST_CHILD (this),
FIRST_CHILD(this)->fops->entrylk,
this->name, &local->parent_loc, NULL,
ENTRYLK_LOCK, ENTRYLK_RDLCK, xdata);
return 0;
err:
STACK_UNWIND_STRICT (readdirp, frame, -1, op_errno, NULL, NULL);
if (new_frame)
SDFS_STACK_DESTROY (new_frame);
return 0;
}
int
init (xlator_t *this)
{
@ -1351,6 +1491,7 @@ struct xlator_fops fops = {
.mknod = sdfs_mknod,
.rename = sdfs_rename,
.lookup = sdfs_lookup,
.readdirp = sdfs_readdirp,
};
struct xlator_cbks cbks;