ec: Fix posix compliance failures

This patch solves some problems that caused dispersed volumes to not
pass posix smoke tests:

* Problems in open/create with O_WRONLY
    Opening files with -w- permissions using O_WRONLY returned an EACCES
    error because internally O_WRONLY was replaced with O_RDWR.

* Problems with entrylk on renames.
    When source and destination were the same, ec tried to acquire
    the same entrylk twice, causing a deadlock.

* Overwrite of a variable when reordering locks.
    On a rename, if the second lock needed to be placed at the beggining
    of the list, the 'lock' variable was overwritten and later its timer
    was cancelled, cancelling the incorrect one.

* Handle O_TRUNC in open.
    When O_TRUNC was received in an open call, it was blindly propagated
    to child subvolumes. This caused a discrepancy between real file
    size and the size stored into trusted.ec.size xattr. This has been
    solved by removing O_TRUNC from open and later calling ftruncate.

Change-Id: I20c3d6e1c11be314be86879be54b728e01013798
BUG: 1161886
Signed-off-by: Xavier Hernandez <xhernandez@datalab.es>
Reviewed-on: http://review.gluster.org/9420
Reviewed-by: Dan Lambright <dlambrig@redhat.com>
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
Tested-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
This commit is contained in:
Xavier Hernandez 2014-11-08 21:46:41 +01:00 committed by Pranith Kumar Karampuri
parent 88136b53f5
commit b17122ffc7
8 changed files with 198 additions and 87 deletions

View File

@ -0,0 +1,53 @@
#include <stdio.h>
#include "glfs.h"
#include "glfs-handles.h"
int
main (int argc, char *argv[])
{
glfs_t *fs = NULL;
glfs_fd_t *fd = NULL;
int ret = 1;
if (argc != 4) {
fprintf (stderr, "Syntax: %s <host> <volname> <file>\n", argv[0]);
return 1;
}
fs = glfs_new (argv[2]);
if (!fs) {
fprintf (stderr, "glfs_new: returned NULL\n");
return 1;
}
ret = glfs_set_volfile_server (fs, "tcp", argv[1], 24007);
if (ret != 0) {
fprintf (stderr, "glfs_set_volfile_server: retuned %d\n", ret);
goto out;
}
ret = glfs_set_logging (fs, "/dev/null", 7);
if (ret != 0) {
fprintf (stderr, "glfs_set_logging: returned %d\n", ret);
goto out;
}
ret = glfs_init (fs);
if (ret != 0) {
fprintf (stderr, "glfs_init: returned %d\n", ret);
goto out;
}
fd = glfs_open (fs, argv[3], O_RDWR | O_TRUNC);
if (fd == NULL) {
fprintf (stderr, "glfs_open: returned NULL\n");
goto out;
}
glfs_close(fd);
ret = 0;
out:
glfs_fini (fs);
return ret;
}

View File

@ -0,0 +1,44 @@
#!/bin/bash
. $(dirname $0)/../../include.rc
. $(dirname $0)/../../volume.rc
function check_ec_size
{
local res
for i in {0..2}; do
res=$(( `getfattr -n trusted.ec.size -e hex $B0/$V0$i/$1 | sed -n "s/^trusted.ec.size=//p"` ))
if [ "x$res" == "x" -o "$res" != "$2" ]; then
echo "N"
return 0
fi
done
echo "Y"
return 0
}
cleanup
TEST glusterd
TEST pidof glusterd
TEST $CLI volume create $V0 redundancy 1 $H0:$B0/${V0}{0..2}
EXPECT "Created" volinfo_field $V0 'Status'
TEST $CLI volume start $V0
EXPECT_WITHIN $PROCESS_UP_TIMEOUT "Started" volinfo_field $V0 'Status'
TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 $M0
EXPECT_WITHIN $CHILD_UP_TIMEOUT "3" ec_child_up_count $V0 0
TEST dd if=/dev/zero of=$M0/file1 bs=4k count=1
TEST mv $M0/file1 $M0/file2
TEST ! stat $M0/file1
TEST stat $M0/file2
TEST gcc -Wall -O2 -I api/src -o $(dirname $0)/bug-1161886 $(dirname $0)/bug-1161886.c -lgfapi
TEST $(dirname $0)/bug-1161886 $H0 $V0 /file2
EXPECT "^0$" stat -c "%s" $M0/file2
EXPECT "^Y$" check_ec_size file2 0
rm -f $(dirname $0)/bug-1161886
cleanup

View File

@ -232,6 +232,16 @@ void ec_fop_set_error(ec_fop_data_t * fop, int32_t error)
UNLOCK(&fop->lock);
}
void ec_sleep(ec_fop_data_t *fop)
{
LOCK(&fop->lock);
fop->refs++;
fop->jobs++;
UNLOCK(&fop->lock);
}
int32_t ec_check_complete(ec_fop_data_t * fop, ec_resume_f resume)
{
int32_t error = -1;
@ -435,12 +445,7 @@ int32_t ec_child_select(ec_fop_data_t * fop)
return 0;
}
LOCK(&fop->lock);
fop->jobs++;
fop->refs++;
UNLOCK(&fop->lock);
ec_sleep(fop);
return 1;
}
@ -637,22 +642,23 @@ int32_t ec_lock_compare(ec_lock_t * lock1, ec_lock_t * lock2)
ec_lock_link_t *ec_lock_insert(ec_fop_data_t *fop, ec_lock_t *lock,
int32_t update)
{
ec_lock_t * tmp;
ec_lock_t *new_lock, *tmp;
ec_lock_link_t *link = NULL;
int32_t tmp_update;
new_lock = lock;
if ((fop->lock_count > 0) &&
(ec_lock_compare(fop->locks[0].lock, lock) > 0))
(ec_lock_compare(fop->locks[0].lock, new_lock) > 0))
{
tmp = fop->locks[0].lock;
fop->locks[0].lock = lock;
lock = tmp;
fop->locks[0].lock = new_lock;
new_lock = tmp;
tmp_update = fop->locks_update;
fop->locks_update = update;
update = tmp_update;
}
fop->locks[fop->lock_count].lock = lock;
fop->locks[fop->lock_count].lock = new_lock;
fop->locks[fop->lock_count].fop = fop;
fop->locks_update |= update << fop->lock_count;
@ -693,6 +699,16 @@ void ec_lock_prepare_entry(ec_fop_data_t *fop, loc_t *loc, int32_t update)
return;
}
/* If there's another lock, make sure that it's not the same. Otherwise
* do not insert it.
*
* This can only happen on renames where source and target names are
* in the same directory. */
if ((fop->lock_count > 0) &&
(fop->locks[0].lock->loc.inode == tmp.inode)) {
goto wipe;
}
} else {
if (ec_loc_from_loc(fop->xl, &tmp, loc) != 0) {
ec_fop_set_error(fop, EIO);
@ -742,6 +758,7 @@ insert:
unlock:
UNLOCK(&tmp.inode->lock);
wipe:
loc_wipe(&tmp);
if (link != NULL) {
@ -870,12 +887,7 @@ void ec_lock(ec_fop_data_t * fop)
list_add_tail(&fop->locks[fop->locked].wait_list, &lock->waiting);
LOCK(&fop->lock);
fop->jobs++;
fop->refs++;
UNLOCK(&fop->lock);
ec_sleep(fop);
UNLOCK(&lock->loc.inode->lock);
@ -1332,12 +1344,7 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
delay.tv_sec = 1;
delay.tv_nsec = 0;
LOCK(&fop->lock);
fop->jobs++;
fop->refs++;
UNLOCK(&fop->lock);
ec_sleep(fop);
/* If healing is needed, do not delay lock release to let self-heal
* start working as soon as possible. */
@ -1356,6 +1363,7 @@ void ec_unlock_timer_add(ec_lock_link_t *link)
refs = 0;
}
} else {
ec_trace("UNLOCK_FORCE", fop, "lock=%p", lock);
*lock->plock = NULL;
refs = 0;
}

View File

@ -93,6 +93,7 @@ void ec_dispatch_one(ec_fop_data_t * fop);
void ec_wait_winds(ec_fop_data_t * fop);
void ec_sleep(ec_fop_data_t *fop);
void ec_resume(ec_fop_data_t * fop, int32_t error);
void ec_resume_parent(ec_fop_data_t * fop, int32_t error);

View File

@ -158,10 +158,7 @@ int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
return EC_STATE_REPORT;
}
if (ctx->flags == 0)
{
ctx->flags = fop->int32;
}
ctx->flags = fop->int32;
UNLOCK(&fop->fd->lock);
@ -207,8 +204,7 @@ int32_t ec_manager_create(ec_fop_data_t * fop, int32_t state)
/* We need to write to specific offsets on the bricks, so we
* need to remove O_APPEND from flags (if present) */
fop->int32 &= ~(O_ACCMODE | O_APPEND);
fop->int32 |= O_RDWR;
fop->int32 &= ~O_APPEND;
/* Fall through */

View File

@ -783,7 +783,6 @@ ec_cbk_data_t * ec_heal_lookup_check(ec_heal_t * heal, uintptr_t * pgood,
void ec_heal_prepare(ec_heal_t * heal)
{
ec_cbk_data_t * cbk;
ec_fd_t * ctx;
int32_t error = ENOMEM;
heal->available = heal->good;
@ -814,13 +813,6 @@ void ec_heal_prepare(ec_heal_t * heal)
goto out;
}
ctx = ec_fd_get(heal->fd, heal->xl);
if ((ctx == NULL) || (loc_copy(&ctx->loc, &heal->loc) != 0))
{
goto out;
}
ctx->flags = O_RDWR;
}
if (heal->iatt.ia_type == IA_IFLNK)
@ -1057,11 +1049,6 @@ void ec_heal_reopen_fd(ec_heal_t * heal)
else
{
flags = ctx_fd->flags & ~(O_TRUNC | O_APPEND);
if ((flags & O_ACCMODE) == O_WRONLY)
{
flags &= ~O_ACCMODE;
flags |= O_RDWR;
}
ec_open(heal->fop->frame, heal->xl, mask, EC_MINIMUM_ONE,
ec_heal_reopen_cbk, NULL, &heal->loc, flags, fd,

View File

@ -697,6 +697,26 @@ void ec_wind_open(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
&fop->loc[0], fop->int32, fop->fd, fop->xdata);
}
int32_t ec_open_truncate_cbk(call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno,
struct iatt *prebuf, struct iatt *postbuf,
dict_t *xdata)
{
ec_fop_data_t *fop = cookie;
int32_t error = 0;
fop = fop->data;
if (op_ret >= 0) {
fop->answer->iatt[0] = *postbuf;
} else {
error = op_errno;
}
ec_resume(fop, error);
return 0;
}
int32_t ec_manager_open(ec_fop_data_t * fop, int32_t state)
{
ec_cbk_data_t * cbk;
@ -717,21 +737,18 @@ int32_t ec_manager_open(ec_fop_data_t * fop, int32_t state)
return EC_STATE_REPORT;
}
if (ctx->flags == 0)
{
ctx->flags = fop->int32;
}
ctx->flags = fop->int32;
UNLOCK(&fop->fd->lock);
if ((fop->int32 & O_ACCMODE) == O_WRONLY)
{
fop->int32 &= ~O_ACCMODE;
fop->int32 |= O_RDWR;
}
/* We need to write to specific offsets on the bricks, so we
* need to remove O_APPEND from flags (if present) */
fop->int32 &= ~O_APPEND;
need to remove O_APPEND from flags (if present).
If O_TRUNC is specified, we remove it from open and an
ftruncate will be executed later, which will correctly update
the file size taking appropriate locks. O_TRUNC flag is saved
into fop->uint32 to use it later.*/
fop->uint32 = fop->int32 & O_TRUNC;
fop->int32 &= ~(O_APPEND | O_TRUNC);
/* Fall through */
@ -766,6 +783,17 @@ int32_t ec_manager_open(ec_fop_data_t * fop, int32_t state)
}
UNLOCK(&fop->fd->lock);
/* If O_TRUNC was specified, call ftruncate to
effectively trunc the file with appropriate locks
acquired. We don't use ctx->flags because self-heal
can use the same fd with different flags. */
if (fop->uint32 != 0) {
ec_sleep(fop);
ec_ftruncate(fop->req_frame, fop->xl, cbk->mask,
fop->minimum, ec_open_truncate_cbk,
fop, cbk->fd, 0, NULL);
}
}
}
if (cbk->op_ret < 0) {

View File

@ -1248,8 +1248,6 @@ int32_t ec_truncate_open_cbk(call_frame_t * frame, void * cookie,
int32_t ec_truncate_clean(ec_fop_data_t * fop)
{
ec_fd_t * ctx;
if (fop->fd == NULL)
{
fop->fd = fd_create(fop->loc[0].inode, fop->frame->root->pid);
@ -1257,13 +1255,6 @@ int32_t ec_truncate_clean(ec_fop_data_t * fop)
{
return 0;
}
ctx = ec_fd_get(fop->fd, fop->xl);
if ((ctx == NULL) || (loc_copy(&ctx->loc, &fop->loc[0]) != 0))
{
return 0;
}
ctx->flags = O_RDWR;
ec_open(fop->frame, fop->xl, fop->answer->mask, fop->minimum,
ec_truncate_open_cbk, fop, &fop->loc[0], O_RDWR, fop->fd,
@ -1701,20 +1692,6 @@ out:
/* FOP: writev */
int32_t ec_writev_init(ec_fop_data_t * fop)
{
ec_fd_t * ctx;
ctx = ec_fd_get(fop->fd, fop->xl);
if (ctx != NULL) {
if ((ctx->flags & O_ACCMODE) == O_RDONLY) {
return EBADF;
}
}
return 0;
}
int32_t ec_writev_merge_tail(call_frame_t * frame, void * cookie,
xlator_t * this, int32_t op_ret, int32_t op_errno,
struct iovec * vector, int32_t count,
@ -1787,14 +1764,29 @@ int32_t ec_writev_merge_head(call_frame_t * frame, void * cookie,
return 0;
}
void ec_writev_start(ec_fop_data_t * fop)
void ec_writev_start(ec_fop_data_t *fop)
{
ec_t *ec = fop->xl->private;
struct iobref *iobref = NULL;
struct iobuf *iobuf = NULL;
void *ptr = NULL;
ec_fd_t *ctx;
fd_t *fd;
size_t tail;
uid_t uid;
gid_t gid;
fd = fd_anonymous(fop->fd->inode);
if (fd == NULL) {
ec_fop_set_error(fop, EIO);
return;
}
uid = fop->frame->root->uid;
fop->frame->root->uid = 0;
gid = fop->frame->root->gid;
fop->frame->root->gid = 0;
ctx = ec_fd_get(fop->fd, fop->xl);
if (ctx != NULL) {
@ -1833,7 +1825,7 @@ void ec_writev_start(ec_fop_data_t * fop)
if (fop->head > 0)
{
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN, ec_writev_merge_head,
NULL, fop->fd, ec->stripe_size, fop->offset, 0, NULL);
NULL, fd, ec->stripe_size, fop->offset, 0, NULL);
}
tail = fop->size - fop->user_size - fop->head;
if ((tail > 0) && ((fop->head == 0) || (fop->size > ec->stripe_size)))
@ -1841,7 +1833,7 @@ void ec_writev_start(ec_fop_data_t * fop)
if (fop->pre_size > fop->offset + fop->head + fop->user_size)
{
ec_readv(fop->frame, fop->xl, -1, EC_MINIMUM_MIN,
ec_writev_merge_tail, NULL, fop->fd, ec->stripe_size,
ec_writev_merge_tail, NULL, fd, ec->stripe_size,
fop->offset + fop->size - ec->stripe_size, 0, NULL);
}
else
@ -1850,6 +1842,11 @@ void ec_writev_start(ec_fop_data_t * fop)
}
}
fop->frame->root->uid = uid;
fop->frame->root->gid = gid;
fd_unref(fd);
return;
out:
@ -1860,6 +1857,11 @@ out:
iobref_unref(iobref);
}
fop->frame->root->uid = uid;
fop->frame->root->gid = gid;
fd_unref(fd);
ec_fop_set_error(fop, EIO);
}
@ -2007,14 +2009,6 @@ int32_t ec_manager_writev(ec_fop_data_t * fop, int32_t state)
switch (state)
{
case EC_STATE_INIT:
fop->error = ec_writev_init(fop);
if (fop->error != 0)
{
return EC_STATE_REPORT;
}
/* Fall through */
case EC_STATE_LOCK:
ec_lock_prepare_fd(fop, fop->fd, 1);
ec_lock(fop);