ceph: fix directory fsync

fsync() on directory should flush dirty caps and wait for any
uncommitted directory opertions to commit. But ceph_dir_fsync()
only waits for uncommitted directory opertions.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
This commit is contained in:
Yan, Zheng 2015-05-27 11:19:34 +08:00 committed by Ilya Dryomov
parent 89b52fe14d
commit da819c8150
2 changed files with 65 additions and 64 deletions

View File

@ -1838,13 +1838,16 @@ static void sync_write_wait(struct inode *inode)
struct ceph_osd_request *req; struct ceph_osd_request *req;
u64 last_tid; u64 last_tid;
if (!S_ISREG(inode->i_mode))
return;
spin_lock(&ci->i_unsafe_lock); spin_lock(&ci->i_unsafe_lock);
if (list_empty(head)) if (list_empty(head))
goto out; goto out;
/* set upper bound as _last_ entry in chain */ /* set upper bound as _last_ entry in chain */
req = list_entry(head->prev, struct ceph_osd_request, req = list_last_entry(head, struct ceph_osd_request,
r_unsafe_item); r_unsafe_item);
last_tid = req->r_tid; last_tid = req->r_tid;
do { do {
@ -1862,13 +1865,59 @@ static void sync_write_wait(struct inode *inode)
*/ */
if (list_empty(head)) if (list_empty(head))
break; break;
req = list_entry(head->next, struct ceph_osd_request, req = list_first_entry(head, struct ceph_osd_request,
r_unsafe_item); r_unsafe_item);
} while (req->r_tid < last_tid); } while (req->r_tid < last_tid);
out: out:
spin_unlock(&ci->i_unsafe_lock); spin_unlock(&ci->i_unsafe_lock);
} }
/*
* wait for any uncommitted directory operations to commit.
*/
static int unsafe_dirop_wait(struct inode *inode)
{
struct ceph_inode_info *ci = ceph_inode(inode);
struct list_head *head = &ci->i_unsafe_dirops;
struct ceph_mds_request *req;
u64 last_tid;
int ret = 0;
if (!S_ISDIR(inode->i_mode))
return 0;
spin_lock(&ci->i_unsafe_lock);
if (list_empty(head))
goto out;
req = list_last_entry(head, struct ceph_mds_request,
r_unsafe_dir_item);
last_tid = req->r_tid;
do {
ceph_mdsc_get_request(req);
spin_unlock(&ci->i_unsafe_lock);
dout("unsafe_dirop_wait %p wait on tid %llu (until %llu)\n",
inode, req->r_tid, last_tid);
ret = !wait_for_completion_timeout(&req->r_safe_completion,
ceph_timeout_jiffies(req->r_timeout));
if (ret)
ret = -EIO; /* timed out */
ceph_mdsc_put_request(req);
spin_lock(&ci->i_unsafe_lock);
if (ret || list_empty(head))
break;
req = list_first_entry(head, struct ceph_mds_request,
r_unsafe_dir_item);
} while (req->r_tid < last_tid);
out:
spin_unlock(&ci->i_unsafe_lock);
return ret;
}
int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync) int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
{ {
struct inode *inode = file->f_mapping->host; struct inode *inode = file->f_mapping->host;
@ -1882,24 +1931,30 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
ret = filemap_write_and_wait_range(inode->i_mapping, start, end); ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
if (ret < 0) if (ret < 0)
return ret; goto out;
if (datasync)
goto out;
mutex_lock(&inode->i_mutex); mutex_lock(&inode->i_mutex);
dirty = try_flush_caps(inode, flush_tid); dirty = try_flush_caps(inode, flush_tid);
dout("fsync dirty caps are %s\n", ceph_cap_string(dirty)); dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
ret = unsafe_dirop_wait(inode);
/* /*
* only wait on non-file metadata writeback (the mds * only wait on non-file metadata writeback (the mds
* can recover size and mtime, so we don't need to * can recover size and mtime, so we don't need to
* wait for that) * wait for that)
*/ */
if (!datasync && (dirty & ~CEPH_CAP_ANY_FILE_WR)) { if (!ret && (dirty & ~CEPH_CAP_ANY_FILE_WR)) {
ret = wait_event_interruptible(ci->i_cap_wq, ret = wait_event_interruptible(ci->i_cap_wq,
caps_are_flushed(inode, flush_tid)); caps_are_flushed(inode, flush_tid));
} }
dout("fsync %p%s done\n", inode, datasync ? " datasync" : "");
mutex_unlock(&inode->i_mutex); mutex_unlock(&inode->i_mutex);
out:
dout("fsync %p%s result=%d\n", inode, datasync ? " datasync" : "", ret);
return ret; return ret;
} }

View File

@ -1223,60 +1223,6 @@ static ssize_t ceph_read_dir(struct file *file, char __user *buf, size_t size,
return size - left; return size - left;
} }
/*
* an fsync() on a dir will wait for any uncommitted directory
* operations to commit.
*/
static int ceph_dir_fsync(struct file *file, loff_t start, loff_t end,
int datasync)
{
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct list_head *head = &ci->i_unsafe_dirops;
struct ceph_mds_request *req;
u64 last_tid;
int ret = 0;
dout("dir_fsync %p\n", inode);
ret = filemap_write_and_wait_range(inode->i_mapping, start, end);
if (ret)
return ret;
mutex_lock(&inode->i_mutex);
spin_lock(&ci->i_unsafe_lock);
if (list_empty(head))
goto out;
req = list_entry(head->prev,
struct ceph_mds_request, r_unsafe_dir_item);
last_tid = req->r_tid;
do {
ceph_mdsc_get_request(req);
spin_unlock(&ci->i_unsafe_lock);
dout("dir_fsync %p wait on tid %llu (until %llu)\n",
inode, req->r_tid, last_tid);
ret = !wait_for_completion_timeout(&req->r_safe_completion,
ceph_timeout_jiffies(req->r_timeout));
if (ret)
ret = -EIO; /* timed out */
ceph_mdsc_put_request(req);
spin_lock(&ci->i_unsafe_lock);
if (ret || list_empty(head))
break;
req = list_entry(head->next,
struct ceph_mds_request, r_unsafe_dir_item);
} while (req->r_tid < last_tid);
out:
spin_unlock(&ci->i_unsafe_lock);
mutex_unlock(&inode->i_mutex);
return ret;
}
/* /*
* We maintain a private dentry LRU. * We maintain a private dentry LRU.
* *
@ -1347,7 +1293,7 @@ const struct file_operations ceph_dir_fops = {
.open = ceph_open, .open = ceph_open,
.release = ceph_release, .release = ceph_release,
.unlocked_ioctl = ceph_ioctl, .unlocked_ioctl = ceph_ioctl,
.fsync = ceph_dir_fsync, .fsync = ceph_fsync,
}; };
const struct file_operations ceph_snapdir_fops = { const struct file_operations ceph_snapdir_fops = {