
vfs_glusterfs: Use pthreadpool for scheduling aio operations

BUG: https://bugzilla.samba.org/show_bug.cgi?id=14098

Signed-off-by: Poornima G <pgurusid@redhat.com>
Reviewed-by: Guenther Deschner <gd@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>

Autobuild-User(master): Jeremy Allison <jra@samba.org>
Autobuild-Date(master): Fri Aug 23 18:40:08 UTC 2019 on sn-devel-184
Author: Poornima G, 2019-07-24 15:15:33 +05:30; committed by Jeremy Allison
parent b4816861f2
commit d8863dd8cb
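
The change replaces the module's private pipe-plus-callback completion queue with Samba's pthreadpool_tevent helper: *_send() queues a job on the connection's thread pool, *_do() runs the blocking glfs_*() call on a worker thread, *_done() collects the result back on the main event loop (falling back to a synchronous call if no worker thread could be started), and *_recv() hands the result to the VFS caller. The following is a minimal sketch of that pattern, not the committed code; the my_op_* names and do_blocking_io() are hypothetical stand-ins for the real vfs_gluster_pread/pwrite/fsync functions in the diff below.

/*
 * Minimal sketch of the pthreadpool_tevent pattern this commit adopts.
 * The my_op_* names and do_blocking_io() are hypothetical; struct
 * vfs_aio_state is assumed to come from the Samba VFS headers.
 */
#include <errno.h>
#include <talloc.h>
#include <tevent.h>
#include "lib/pthreadpool/pthreadpool_tevent.h"

static ssize_t do_blocking_io(void);	/* hypothetical blocking call */

struct my_op_state {
	ssize_t ret;
	struct vfs_aio_state vfs_aio_state;
};

static void my_op_do(void *private_data);
static void my_op_done(struct tevent_req *subreq);

static struct tevent_req *my_op_send(TALLOC_CTX *mem_ctx,
				     struct tevent_context *ev,
				     struct pthreadpool_tevent *pool)
{
	struct tevent_req *req, *subreq;
	struct my_op_state *state;

	req = tevent_req_create(mem_ctx, &state, struct my_op_state);
	if (req == NULL) {
		return NULL;
	}

	/* Hand the blocking work to a pool thread. */
	subreq = pthreadpool_tevent_job_send(state, ev, pool,
					     my_op_do, state);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, my_op_done, req);
	return req;
}

static void my_op_do(void *private_data)
{
	struct my_op_state *state = talloc_get_type_abort(
		private_data, struct my_op_state);

	/* Runs on a worker thread; retry the blocking call on EINTR. */
	do {
		state->ret = do_blocking_io();
	} while ((state->ret == -1) && (errno == EINTR));

	if (state->ret == -1) {
		state->vfs_aio_state.error = errno;
	}
}

static void my_op_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct my_op_state *state = tevent_req_data(
		req, struct my_op_state);
	int ret;

	ret = pthreadpool_tevent_job_recv(subreq);
	TALLOC_FREE(subreq);
	if (ret != 0) {
		if (ret != EAGAIN) {
			tevent_req_error(req, ret);
			return;
		}
		/* No pool thread available: fall back to sync processing. */
		my_op_do(state);
	}
	tevent_req_done(req);
}

static ssize_t my_op_recv(struct tevent_req *req,
			  struct vfs_aio_state *vfs_aio_state)
{
	struct my_op_state *state = tevent_req_data(
		req, struct my_op_state);

	if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
		return -1;
	}
	*vfs_aio_state = state->vfs_aio_state;
	return state->ret;
}

The committed functions additionally install a talloc destructor on the state that returns -1 (cleared again in the *_done callback), so the request state cannot be freed while a pool thread may still be writing to it, and they keep the SMBPROFILE_BYTES_ASYNC_* accounting that the sketch omits.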


@@ -45,14 +45,11 @@
#include "lib/util/sys_rw.h"
#include "smbprofile.h"
#include "modules/posixacl_xattr.h"
#include "lib/pthreadpool/pthreadpool_tevent.h"
#define DEFAULT_VOLFILE_SERVER "localhost"
#define GLUSTER_NAME_MAX 255
static int read_fd = -1;
static int write_fd = -1;
static struct tevent_fd *aio_read_event = NULL;
/**
* Helper to convert struct stat to struct stat_ex.
*/
@@ -713,195 +710,20 @@ static ssize_t vfs_gluster_pread(struct vfs_handle_struct *handle,
return ret;
}
struct glusterfs_aio_state;
struct glusterfs_aio_wrapper {
struct glusterfs_aio_state *state;
};
struct glusterfs_aio_state {
struct vfs_gluster_pread_state {
ssize_t ret;
struct tevent_req *req;
bool cancelled;
glfs_fd_t *fd;
void *buf;
size_t count;
off_t offset;
struct vfs_aio_state vfs_aio_state;
struct timespec start;
SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
};
static int aio_wrapper_destructor(struct glusterfs_aio_wrapper *wrap)
{
if (wrap->state != NULL) {
wrap->state->cancelled = true;
}
return 0;
}
/*
* This function is the callback that will be called on glusterfs
* threads once the async IO submitted is complete. To notify
* Samba of the completion we use a pipe based queue.
*/
#ifdef HAVE_GFAPI_VER_7_6
static void aio_glusterfs_done(glfs_fd_t *fd, ssize_t ret,
struct glfs_stat *prestat,
struct glfs_stat *poststat,
void *data)
#else
static void aio_glusterfs_done(glfs_fd_t *fd, ssize_t ret, void *data)
#endif
{
struct glusterfs_aio_state *state = NULL;
int sts = 0;
struct timespec end;
state = (struct glusterfs_aio_state *)data;
PROFILE_TIMESTAMP(&end);
if (ret < 0) {
state->ret = -1;
state->vfs_aio_state.error = errno;
} else {
state->ret = ret;
}
state->vfs_aio_state.duration = nsec_time_diff(&end, &state->start);
SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
/*
* Write the state pointer to glusterfs_aio_state to the
* pipe, so we can call tevent_req_done() from the main thread,
* because tevent_req_done() is not designed to be executed in
* the multithread environment, so tevent_req_done() must be
* executed from the smbd main thread.
*
* write(2) on pipes with sizes under _POSIX_PIPE_BUF
* in size is atomic, without this, the use of pipes in this
* code would not work.
*
* sys_write is a thin enough wrapper around write(2)
* that we can trust it here.
*/
sts = sys_write(write_fd, &state, sizeof(struct glusterfs_aio_state *));
if (sts < 0) {
DEBUG(0,("\nWrite to pipe failed (%s)", strerror(errno)));
}
return;
}
/*
* Read each req off the pipe and process it.
*/
static void aio_tevent_fd_done(struct tevent_context *event_ctx,
struct tevent_fd *fde,
uint16_t flags, void *data)
{
struct tevent_req *req = NULL;
struct glusterfs_aio_state *state = NULL;
int sts = 0;
/*
* read(2) on pipes is atomic if the needed data is available
* in the pipe, per SUS and POSIX. Because we always write
* to the pipe in sizeof(struct tevent_req *) chunks, we can
* always read in those chunks, atomically.
*
* sys_read is a thin enough wrapper around read(2) that we
* can trust it here.
*/
sts = sys_read(read_fd, &state, sizeof(struct glusterfs_aio_state *));
if (sts < 0) {
DEBUG(0,("\nRead from pipe failed (%s)", strerror(errno)));
}
/* if we've cancelled the op, there is no req, so just clean up. */
if (state->cancelled == true) {
TALLOC_FREE(state);
return;
}
req = state->req;
if (req) {
tevent_req_done(req);
}
return;
}
static bool init_gluster_aio(struct vfs_handle_struct *handle)
{
int fds[2];
int ret = -1;
if (read_fd != -1) {
/*
* Already initialized.
*/
return true;
}
ret = pipe(fds);
if (ret == -1) {
goto fail;
}
read_fd = fds[0];
write_fd = fds[1];
aio_read_event = tevent_add_fd(handle->conn->sconn->ev_ctx,
NULL,
read_fd,
TEVENT_FD_READ,
aio_tevent_fd_done,
NULL);
if (aio_read_event == NULL) {
goto fail;
}
return true;
fail:
TALLOC_FREE(aio_read_event);
if (read_fd != -1) {
close(read_fd);
close(write_fd);
read_fd = -1;
write_fd = -1;
}
return false;
}
static struct glusterfs_aio_state *aio_state_create(TALLOC_CTX *mem_ctx)
{
struct tevent_req *req = NULL;
struct glusterfs_aio_state *state = NULL;
struct glusterfs_aio_wrapper *wrapper = NULL;
req = tevent_req_create(mem_ctx, &wrapper, struct glusterfs_aio_wrapper);
if (req == NULL) {
return NULL;
}
state = talloc_zero(NULL, struct glusterfs_aio_state);
if (state == NULL) {
TALLOC_FREE(req);
return NULL;
}
talloc_set_destructor(wrapper, aio_wrapper_destructor);
state->cancelled = false;
state->req = req;
wrapper->state = state;
return state;
}
static void vfs_gluster_pread_do(void *private_data);
static void vfs_gluster_pread_done(struct tevent_req *subreq);
static int vfs_gluster_pread_state_destructor(struct vfs_gluster_pread_state *state);
static struct tevent_req *vfs_gluster_pread_send(struct vfs_handle_struct
*handle, TALLOC_CTX *mem_ctx,
@@ -910,50 +732,138 @@ static struct tevent_req *vfs_gluster_pread_send(struct vfs_handle_struct
void *data, size_t n,
off_t offset)
{
struct glusterfs_aio_state *state = NULL;
struct tevent_req *req = NULL;
int ret = 0;
glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp);
struct vfs_gluster_pread_state *state;
struct tevent_req *req, *subreq;
glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp);
if (glfd == NULL) {
DBG_ERR("Failed to fetch gluster fd\n");
return NULL;
}
state = aio_state_create(mem_ctx);
if (state == NULL) {
req = tevent_req_create(mem_ctx, &state, struct vfs_gluster_pread_state);
if (req == NULL) {
return NULL;
}
req = state->req;
if (!init_gluster_aio(handle)) {
tevent_req_error(req, EIO);
return tevent_req_post(req, ev);
}
/*
* aio_glusterfs_done and aio_tevent_fd_done()
* use the raw tevent context. We need to use
* tevent_req_defer_callback() in order to
* use the event context we're started with.
*/
tevent_req_defer_callback(req, ev);
state->ret = -1;
state->fd = glfd;
state->buf = data;
state->count = n;
state->offset = offset;
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p,
state->profile_bytes, n);
PROFILE_TIMESTAMP(&state->start);
ret = glfs_pread_async(glfd, data, n, offset, 0, aio_glusterfs_done,
state);
if (ret < 0) {
tevent_req_error(req, -ret);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
subreq = pthreadpool_tevent_job_send(
state, ev, handle->conn->sconn->pool,
vfs_gluster_pread_do, state);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, vfs_gluster_pread_done, req);
talloc_set_destructor(state, vfs_gluster_pread_state_destructor);
return req;
}
static void vfs_gluster_pread_do(void *private_data)
{
struct vfs_gluster_pread_state *state = talloc_get_type_abort(
private_data, struct vfs_gluster_pread_state);
struct timespec start_time;
struct timespec end_time;
SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
PROFILE_TIMESTAMP(&start_time);
do {
#ifdef HAVE_GFAPI_VER_7_6
state->ret = glfs_pread(state->fd, state->buf, state->count,
state->offset, 0, NULL);
#else
state->ret = glfs_pread(state->fd, state->buf, state->count,
state->offset, 0);
#endif
} while ((state->ret == -1) && (errno == EINTR));
if (state->ret == -1) {
state->vfs_aio_state.error = errno;
}
PROFILE_TIMESTAMP(&end_time);
state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
}
static int vfs_gluster_pread_state_destructor(struct vfs_gluster_pread_state *state)
{
return -1;
}
static void vfs_gluster_pread_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct vfs_gluster_pread_state *state = tevent_req_data(
req, struct vfs_gluster_pread_state);
int ret;
ret = pthreadpool_tevent_job_recv(subreq);
TALLOC_FREE(subreq);
SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
talloc_set_destructor(state, NULL);
if (ret != 0) {
if (ret != EAGAIN) {
tevent_req_error(req, ret);
return;
}
/*
* If we get EAGAIN from pthreadpool_tevent_job_recv() this
* means the lower level pthreadpool failed to create a new
* thread. Fallback to sync processing in that case to allow
* some progress for the client.
*/
vfs_gluster_pread_do(state);
}
tevent_req_done(req);
}
static ssize_t vfs_gluster_pread_recv(struct tevent_req *req,
struct vfs_aio_state *vfs_aio_state)
{
struct vfs_gluster_pread_state *state = tevent_req_data(
req, struct vfs_gluster_pread_state);
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
return -1;
}
*vfs_aio_state = state->vfs_aio_state;
return state->ret;
}
struct vfs_gluster_pwrite_state {
ssize_t ret;
glfs_fd_t *fd;
const void *buf;
size_t count;
off_t offset;
struct vfs_aio_state vfs_aio_state;
SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
};
static void vfs_gluster_pwrite_do(void *private_data);
static void vfs_gluster_pwrite_done(struct tevent_req *subreq);
static int vfs_gluster_pwrite_state_destructor(struct vfs_gluster_pwrite_state *state);
static struct tevent_req *vfs_gluster_pwrite_send(struct vfs_handle_struct
*handle, TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -961,78 +871,122 @@ static struct tevent_req *vfs_gluster_pwrite_send(struct vfs_handle_struct
const void *data, size_t n,
off_t offset)
{
struct glusterfs_aio_state *state = NULL;
struct tevent_req *req = NULL;
int ret = 0;
glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp);
struct tevent_req *req, *subreq;
struct vfs_gluster_pwrite_state *state;
glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp);
if (glfd == NULL) {
DBG_ERR("Failed to fetch gluster fd\n");
return NULL;
}
state = aio_state_create(mem_ctx);
if (state == NULL) {
req = tevent_req_create(mem_ctx, &state, struct vfs_gluster_pwrite_state);
if (req == NULL) {
return NULL;
}
req = state->req;
if (!init_gluster_aio(handle)) {
tevent_req_error(req, EIO);
return tevent_req_post(req, ev);
}
/*
* aio_glusterfs_done and aio_tevent_fd_done()
* use the raw tevent context. We need to use
* tevent_req_defer_callback() in order to
* use the event context we're started with.
*/
tevent_req_defer_callback(req, ev);
state->ret = -1;
state->fd = glfd;
state->buf = data;
state->count = n;
state->offset = offset;
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p,
state->profile_bytes, n);
PROFILE_TIMESTAMP(&state->start);
ret = glfs_pwrite_async(glfd, data, n, offset, 0, aio_glusterfs_done,
state);
if (ret < 0) {
tevent_req_error(req, -ret);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
subreq = pthreadpool_tevent_job_send(
state, ev, handle->conn->sconn->pool,
vfs_gluster_pwrite_do, state);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, vfs_gluster_pwrite_done, req);
talloc_set_destructor(state, vfs_gluster_pwrite_state_destructor);
return req;
}
static ssize_t vfs_gluster_recv(struct tevent_req *req,
static void vfs_gluster_pwrite_do(void *private_data)
{
struct vfs_gluster_pwrite_state *state = talloc_get_type_abort(
private_data, struct vfs_gluster_pwrite_state);
struct timespec start_time;
struct timespec end_time;
SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
PROFILE_TIMESTAMP(&start_time);
do {
#ifdef HAVE_GFAPI_VER_7_6
state->ret = glfs_pwrite(state->fd, state->buf, state->count,
state->offset, 0, NULL, NULL);
#else
state->ret = glfs_pwrite(state->fd, state->buf, state->count,
state->offset, 0);
#endif
} while ((state->ret == -1) && (errno == EINTR));
if (state->ret == -1) {
state->vfs_aio_state.error = errno;
}
PROFILE_TIMESTAMP(&end_time);
state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
}
static int vfs_gluster_pwrite_state_destructor(struct vfs_gluster_pwrite_state *state)
{
return -1;
}
static void vfs_gluster_pwrite_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct vfs_gluster_pwrite_state *state = tevent_req_data(
req, struct vfs_gluster_pwrite_state);
int ret;
ret = pthreadpool_tevent_job_recv(subreq);
TALLOC_FREE(subreq);
SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
talloc_set_destructor(state, NULL);
if (ret != 0) {
if (ret != EAGAIN) {
tevent_req_error(req, ret);
return;
}
/*
* If we get EAGAIN from pthreadpool_tevent_job_recv() this
* means the lower level pthreadpool failed to create a new
* thread. Fallback to sync processing in that case to allow
* some progress for the client.
*/
vfs_gluster_pwrite_do(state);
}
tevent_req_done(req);
}
static ssize_t vfs_gluster_pwrite_recv(struct tevent_req *req,
struct vfs_aio_state *vfs_aio_state)
{
struct glusterfs_aio_wrapper *wrapper = NULL;
int ret = 0;
wrapper = tevent_req_data(req, struct glusterfs_aio_wrapper);
if (wrapper == NULL) {
return -1;
}
if (wrapper->state == NULL) {
return -1;
}
struct vfs_gluster_pwrite_state *state = tevent_req_data(
req, struct vfs_gluster_pwrite_state);
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
return -1;
}
*vfs_aio_state = wrapper->state->vfs_aio_state;
ret = wrapper->state->ret;
*vfs_aio_state = state->vfs_aio_state;
/* Clean up the state, it is in a NULL context. */
TALLOC_FREE(wrapper->state);
return ret;
return state->ret;
}
static ssize_t vfs_gluster_pwrite(struct vfs_handle_struct *handle,
@@ -1115,60 +1069,132 @@ static int vfs_gluster_renameat(struct vfs_handle_struct *handle,
return ret;
}
struct vfs_gluster_fsync_state {
ssize_t ret;
glfs_fd_t *fd;
struct vfs_aio_state vfs_aio_state;
SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
};
static void vfs_gluster_fsync_do(void *private_data);
static void vfs_gluster_fsync_done(struct tevent_req *subreq);
static int vfs_gluster_fsync_state_destructor(struct vfs_gluster_fsync_state *state);
static struct tevent_req *vfs_gluster_fsync_send(struct vfs_handle_struct
*handle, TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
files_struct *fsp)
{
struct tevent_req *req = NULL;
struct glusterfs_aio_state *state = NULL;
int ret = 0;
glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp);
struct tevent_req *req, *subreq;
struct vfs_gluster_fsync_state *state;
glfs_fd_t *glfd = vfs_gluster_fetch_glfd(handle, fsp);
if (glfd == NULL) {
DBG_ERR("Failed to fetch gluster fd\n");
return NULL;
}
state = aio_state_create(mem_ctx);
if (state == NULL) {
req = tevent_req_create(mem_ctx, &state, struct vfs_gluster_fsync_state);
if (req == NULL) {
return NULL;
}
req = state->req;
if (!init_gluster_aio(handle)) {
tevent_req_error(req, EIO);
return tevent_req_post(req, ev);
}
/*
* aio_glusterfs_done and aio_tevent_fd_done()
* use the raw tevent context. We need to use
* tevent_req_defer_callback() in order to
* use the event context we're started with.
*/
tevent_req_defer_callback(req, ev);
state->ret = -1;
state->fd = glfd;
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_fsync, profile_p,
state->profile_bytes, 0);
PROFILE_TIMESTAMP(&state->start);
ret = glfs_fsync_async(glfd, aio_glusterfs_done, state);
if (ret < 0) {
tevent_req_error(req, -ret);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
subreq = pthreadpool_tevent_job_send(
state, ev, handle->conn->sconn->pool, vfs_gluster_fsync_do, state);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, vfs_gluster_fsync_done, req);
talloc_set_destructor(state, vfs_gluster_fsync_state_destructor);
return req;
}
static void vfs_gluster_fsync_do(void *private_data)
{
struct vfs_gluster_fsync_state *state = talloc_get_type_abort(
private_data, struct vfs_gluster_fsync_state);
struct timespec start_time;
struct timespec end_time;
SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
PROFILE_TIMESTAMP(&start_time);
do {
#ifdef HAVE_GFAPI_VER_7_6
state->ret = glfs_fsync(state->fd, NULL, NULL);
#else
state->ret = glfs_fsync(state->fd);
#endif
} while ((state->ret == -1) && (errno == EINTR));
if (state->ret == -1) {
state->vfs_aio_state.error = errno;
}
PROFILE_TIMESTAMP(&end_time);
state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
}
static int vfs_gluster_fsync_state_destructor(struct vfs_gluster_fsync_state *state)
{
return -1;
}
static void vfs_gluster_fsync_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct vfs_gluster_fsync_state *state = tevent_req_data(
req, struct vfs_gluster_fsync_state);
int ret;
ret = pthreadpool_tevent_job_recv(subreq);
TALLOC_FREE(subreq);
SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
talloc_set_destructor(state, NULL);
if (ret != 0) {
if (ret != EAGAIN) {
tevent_req_error(req, ret);
return;
}
/*
* If we get EAGAIN from pthreadpool_tevent_job_recv() this
* means the lower level pthreadpool failed to create a new
* thread. Fallback to sync processing in that case to allow
* some progress for the client.
*/
vfs_gluster_fsync_do(state);
}
tevent_req_done(req);
}
static int vfs_gluster_fsync_recv(struct tevent_req *req,
struct vfs_aio_state *vfs_aio_state)
{
/*
* Use implicit conversion ssize_t->int
*/
return vfs_gluster_recv(req, vfs_aio_state);
struct vfs_gluster_fsync_state *state = tevent_req_data(
req, struct vfs_gluster_fsync_state);
if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
return -1;
}
*vfs_aio_state = state->vfs_aio_state;
return state->ret;
}
static int vfs_gluster_stat(struct vfs_handle_struct *handle,
@@ -1873,10 +1899,10 @@ static struct vfs_fn_pointers glusterfs_fns = {
.close_fn = vfs_gluster_close,
.pread_fn = vfs_gluster_pread,
.pread_send_fn = vfs_gluster_pread_send,
.pread_recv_fn = vfs_gluster_recv,
.pread_recv_fn = vfs_gluster_pread_recv,
.pwrite_fn = vfs_gluster_pwrite,
.pwrite_send_fn = vfs_gluster_pwrite_send,
.pwrite_recv_fn = vfs_gluster_recv,
.pwrite_recv_fn = vfs_gluster_pwrite_recv,
.lseek_fn = vfs_gluster_lseek,
.sendfile_fn = vfs_gluster_sendfile,
.recvfile_fn = vfs_gluster_recvfile,