event/epoll: Add back socket for polling of events immediately after reading the entire rpc message from the wire

Currently, the socket is added back for future events only after the
higher layers (rpc, xlators, etc.) have processed the message. If
message processing involves a significant delay (as with writev replies
processed by Erasure Coding), performance takes a hit. Hence this patch
modifies transport/socket to add the socket back for polling of events
immediately after the entire rpc message has been read, but before
notifying the higher layers.
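
The gist of the change, as a minimal sketch (condensed from the new
socket_event_poll_in() in this patch; the helper name poll_in_sketch is
hypothetical, and the notify.in_progress accounting and most error
handling are omitted here):

    /* Re-arm the fd for epoll as soon as one complete rpc message has
     * been read, and only then hand the message to the rpc layer. */
    static int
    poll_in_sketch (rpc_transport_t *this, gf_boolean_t notify_handled)
    {
            rpc_transport_pollin_t *pollin = NULL;
            socket_private_t       *priv   = this->private;
            int                     ret    = -1;

            /* 1. Drain the wire until a full rpc message is assembled. */
            ret = socket_proto_state_machine (this, &pollin);

            /* 2. Add the socket back for polling *before* the (possibly
             *    slow) notification, so further requests keep arriving. */
            if (notify_handled && (ret != -1))
                    event_handled (this->ctx->event_pool, priv->sock,
                                   priv->idx, priv->gen);

            /* 3. Notify the higher layers (rpc, xlators, ...). */
            if (pollin) {
                    ret = rpc_transport_notify (this,
                                                RPC_TRANSPORT_MSG_RECEIVED,
                                                pollin);
                    rpc_transport_pollin_destroy (pollin);
            }

            return ret;
    }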

credits: Thanks to "Kotresh Hiremath Ravishankar"
         <khiremat@redhat.com> for assistance in fixing a regression in
         bitrot caused by this patch.

Change-Id: I04b6b9d0b51a1cfb86ecac3c3d87a5f388cf5800
BUG: 1448364
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-on: https://review.gluster.org/15036
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
Smoke: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Amar Tumballi <amarts@redhat.com>
Raghavendra G 2017-05-05 15:21:30 +05:30
parent 333474e0d6
commit cea8b70250
9 changed files with 268 additions and 121 deletions


@ -108,11 +108,17 @@ cli_rl_process_line (char *line)
int
cli_rl_stdin (int fd, int idx, void *data,
cli_rl_stdin (int fd, int idx, int gen, void *data,
int poll_out, int poll_in, int poll_err)
{
struct cli_state *state = NULL;
state = data;
rl_callback_read_char ();
event_handled (state->ctx->event_pool, fd, idx, gen);
return 0;
}


@ -1732,8 +1732,7 @@ out:
/* XXX: move these into @ctx */
static char *oldvolfile = NULL;
static int oldvollen = 0;
static int oldvollen;
int
@ -1743,7 +1742,7 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,
gf_getspec_rsp rsp = {0,};
call_frame_t *frame = NULL;
glusterfs_ctx_t *ctx = NULL;
int ret = 0;
int ret = 0, locked = 0;
ssize_t size = 0;
FILE *tmpfp = NULL;
char *volfilebuf = NULL;
@ -1773,74 +1772,85 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,
ret = 0;
size = rsp.op_ret;
if (size == oldvollen && (memcmp (oldvolfile, rsp.spec, size) == 0)) {
gf_log (frame->this->name, GF_LOG_INFO,
"No change in volfile, continuing");
goto out;
}
LOCK (&ctx->volfile_lock);
{
locked = 1;
tmpfp = tmpfile ();
if (!tmpfp) {
ret = -1;
goto out;
}
if (size == oldvollen && (memcmp (oldvolfile, rsp.spec, size) == 0)) {
gf_log (frame->this->name, GF_LOG_INFO,
"No change in volfile, continuing");
goto out;
}
fwrite (rsp.spec, size, 1, tmpfp);
fflush (tmpfp);
if (ferror (tmpfp)) {
ret = -1;
goto out;
}
tmpfp = tmpfile ();
if (!tmpfp) {
ret = -1;
goto out;
}
/* Check if only options have changed. No need to reload the
* volfile if topology hasn't changed.
* glusterfs_volfile_reconfigure returns 3 possible return states
* return 0 =======> reconfiguration of options has succeeded
* return 1 =======> the graph has to be reconstructed and all the xlators should be inited
* return -1(or -ve) =======> Some Internal Error occurred during the operation
*/
fwrite (rsp.spec, size, 1, tmpfp);
fflush (tmpfp);
if (ferror (tmpfp)) {
ret = -1;
goto out;
}
/* Check if only options have changed. No need to reload the
* volfile if topology hasn't changed.
* glusterfs_volfile_reconfigure returns 3 possible return states
* return 0 =======> reconfiguration of options has succeeded
* return 1 =======> the graph has to be reconstructed and all the xlators should be inited
* return -1(or -ve) =======> Some Internal Error occurred during the operation
*/
ret = glusterfs_volfile_reconfigure (oldvollen, tmpfp, ctx, oldvolfile);
if (ret == 0) {
gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG,
"No need to re-load volfile, reconfigure done");
if (oldvolfile)
volfilebuf = GF_REALLOC (oldvolfile, size);
else
volfilebuf = GF_CALLOC (1, size, gf_common_mt_char);
if (!volfilebuf) {
ret = -1;
goto out;
}
oldvolfile = volfilebuf;
oldvollen = size;
memcpy (oldvolfile, rsp.spec, size);
goto out;
}
if (ret < 0) {
gf_log ("glusterfsd-mgmt",
GF_LOG_DEBUG, "Reconfigure failed !!");
goto out;
}
ret = glusterfs_process_volfp (ctx, tmpfp);
/* tmpfp closed */
tmpfp = NULL;
if (ret)
goto out;
ret = glusterfs_volfile_reconfigure (oldvollen, tmpfp, ctx, oldvolfile);
if (ret == 0) {
gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG,
"No need to re-load volfile, reconfigure done");
if (oldvolfile)
volfilebuf = GF_REALLOC (oldvolfile, size);
else
volfilebuf = GF_CALLOC (1, size, gf_common_mt_char);
if (!volfilebuf) {
ret = -1;
goto out;
}
oldvolfile = volfilebuf;
oldvollen = size;
memcpy (oldvolfile, rsp.spec, size);
goto out;
}
UNLOCK (&ctx->volfile_lock);
if (ret < 0) {
gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG, "Reconfigure failed !!");
goto out;
}
locked = 0;
ret = glusterfs_process_volfp (ctx, tmpfp);
/* tmpfp closed */
tmpfp = NULL;
if (ret)
goto out;
if (oldvolfile)
volfilebuf = GF_REALLOC (oldvolfile, size);
else
volfilebuf = GF_CALLOC (1, size, gf_common_mt_char);
if (!volfilebuf) {
ret = -1;
goto out;
}
oldvolfile = volfilebuf;
oldvollen = size;
memcpy (oldvolfile, rsp.spec, size);
if (!is_mgmt_rpc_reconnect) {
need_emancipate = 1;
glusterfs_mgmt_pmap_signin (ctx);
@ -1848,6 +1858,10 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,
}
out:
if (locked)
UNLOCK (&ctx->volfile_lock);
STACK_DESTROY (frame->root);
free (rsp.spec);
@ -2345,6 +2359,8 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
if (ctx->mgmt)
return 0;
LOCK_INIT (&ctx->volfile_lock);
if (cmd_args->volfile_server_port)
port = cmd_args->volfile_server_port;


@ -569,38 +569,11 @@ pre_unlock:
if (!handler)
goto out;
ret = handler (fd, idx, data,
ret = handler (fd, idx, gen, data,
(event->events & (EPOLLIN|EPOLLPRI)),
(event->events & (EPOLLOUT)),
(event->events & (EPOLLERR|EPOLLHUP)));
LOCK (&slot->lock);
{
slot->in_handler--;
if (gen != slot->gen) {
/* event_unregister() happened while we were
in handler()
*/
gf_msg_debug ("epoll", 0, "generation bumped on idx=%d"
" from gen=%d to slot->gen=%d, fd=%d, "
"slot->fd=%d", idx, gen, slot->gen, fd,
slot->fd);
goto post_unlock;
}
/* This call also picks up the changes made by another
thread calling event_select_on_epoll() while this
thread was busy in handler()
*/
if (slot->in_handler == 0) {
event->events = slot->events;
ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD,
fd, event);
}
}
post_unlock:
UNLOCK (&slot->lock);
out:
event_slot_unref (event_pool, slot, idx);
@ -891,6 +864,55 @@ event_pool_destroy_epoll (struct event_pool *event_pool)
return ret;
}
static int
event_handled_epoll (struct event_pool *event_pool, int fd, int idx, int gen)
{
struct event_slot_epoll *slot = NULL;
struct epoll_event epoll_event = {0, };
struct event_data *ev_data = (void *)&epoll_event.data;
int ret = 0;
slot = event_slot_get (event_pool, idx);
assert (slot->fd == fd);
LOCK (&slot->lock);
{
slot->in_handler--;
if (gen != slot->gen) {
/* event_unregister() happened while we were
in handler()
*/
gf_msg_debug ("epoll", 0, "generation bumped on idx=%d"
" from gen=%d to slot->gen=%d, fd=%d, "
"slot->fd=%d", idx, gen, slot->gen, fd,
slot->fd);
goto post_unlock;
}
/* This call also picks up the changes made by another
thread calling event_select_on_epoll() while this
thread was busy in handler()
*/
if (slot->in_handler == 0) {
epoll_event.events = slot->events;
ev_data->idx = idx;
ev_data->gen = gen;
ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD,
fd, &epoll_event);
}
}
post_unlock:
UNLOCK (&slot->lock);
event_slot_unref (event_pool, slot, idx);
return ret;
}
struct event_ops event_ops_epoll = {
.new = event_pool_new_epoll,
.event_register = event_register_epoll,
@ -899,7 +921,8 @@ struct event_ops event_ops_epoll = {
.event_unregister_close = event_unregister_close_epoll,
.event_dispatch = event_dispatch_epoll,
.event_reconfigure_threads = event_reconfigure_threads_epoll,
.event_pool_destroy = event_pool_destroy_epoll
.event_pool_destroy = event_pool_destroy_epoll,
.event_handled = event_handled_epoll,
};
#endif


@ -40,7 +40,7 @@ event_register_poll (struct event_pool *event_pool, int fd,
static int
__flush_fd (int fd, int idx, void *data,
__flush_fd (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err)
{
char buf[64];
@ -386,7 +386,7 @@ unlock:
pthread_mutex_unlock (&event_pool->mutex);
if (handler)
ret = handler (ufds[i].fd, idx, data,
ret = handler (ufds[i].fd, idx, 0, data,
(ufds[i].revents & (POLLIN|POLLPRI)),
(ufds[i].revents & (POLLOUT)),
(ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)));


@ -159,8 +159,9 @@ event_pool_destroy (struct event_pool *event_pool)
}
pthread_mutex_unlock (&event_pool->mutex);
if (!destroy || (activethreadcount > 0))
if (!destroy || (activethreadcount > 0)) {
goto out;
}
ret = event_pool->ops->event_pool_destroy (event_pool);
out:
@ -168,19 +169,27 @@ out:
}
int
poller_destroy_handler (int fd, int idx, void *data,
poller_destroy_handler (int fd, int idx, int gen, void *data,
int poll_out, int poll_in, int poll_err)
{
int readfd = -1;
char buf = '\0';
struct event_destroy_data *destroy = NULL;
int readfd = -1, ret = -1;
char buf = '\0';
readfd = *(int *)data;
if (readfd < 0)
return -1;
destroy = data;
readfd = destroy->readfd;
if (readfd < 0) {
goto out;
}
while (sys_read (readfd, &buf, 1) > 0) {
}
return 0;
ret = 0;
out:
event_handled (destroy->pool, fd, idx, gen);
return ret;
}
/* This function destroys all the poller threads.
@ -197,11 +206,12 @@ poller_destroy_handler (int fd, int idx, void *data,
int
event_dispatch_destroy (struct event_pool *event_pool)
{
int ret = -1;
int fd[2] = {-1};
int idx = -1;
int flags = 0;
struct timespec sleep_till = {0, };
int ret = -1, threadcount = 0;
int fd[2] = {-1};
int idx = -1;
int flags = 0;
struct timespec sleep_till = {0, };
struct event_destroy_data data = {0, };
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
@ -223,10 +233,13 @@ event_dispatch_destroy (struct event_pool *event_pool)
if (ret < 0)
goto out;
data.pool = event_pool;
data.readfd = fd[1];
/* From the main thread register an event on the pipe fd[0],
*/
idx = event_register (event_pool, fd[0], poller_destroy_handler,
&fd[1], 1, 0);
&data, 1, 0);
if (idx < 0)
goto out;
@ -235,6 +248,7 @@ event_dispatch_destroy (struct event_pool *event_pool)
*/
pthread_mutex_lock (&event_pool->mutex);
{
threadcount = event_pool->eventthreadcount;
event_pool->destroy = 1;
}
pthread_mutex_unlock (&event_pool->mutex);
@ -254,9 +268,11 @@ event_dispatch_destroy (struct event_pool *event_pool)
*/
int retry = 0;
while (event_pool->activethreadcount > 0 && retry++ < 10) {
if (sys_write (fd[1], "dummy", 6) == -1)
while (event_pool->activethreadcount > 0
&& (retry++ < (threadcount + 10))) {
if (sys_write (fd[1], "dummy", 6) == -1) {
break;
}
sleep_till.tv_sec = time (NULL) + 1;
ret = pthread_cond_timedwait (&event_pool->cond,
&event_pool->mutex,
@ -275,3 +291,14 @@ event_dispatch_destroy (struct event_pool *event_pool)
return ret;
}
int
event_handled (struct event_pool *event_pool, int fd, int idx, int gen)
{
int ret = 0;
if (event_pool->ops->event_handled)
ret = event_pool->ops->event_handled (event_pool, fd, idx, gen);
return ret;
}


@ -23,7 +23,7 @@ struct event_data {
} __attribute__ ((__packed__, __may_alias__));
typedef int (*event_handler_t) (int fd, int idx, void *data,
typedef int (*event_handler_t) (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err);
#define EVENT_EPOLL_TABLES 1024
@ -73,6 +73,11 @@ struct event_pool {
};
struct event_destroy_data {
int readfd;
struct event_pool *pool;
};
struct event_ops {
struct event_pool * (*new) (int count, int eventthreadcount);
@ -93,6 +98,8 @@ struct event_ops {
int (*event_reconfigure_threads) (struct event_pool *event_pool,
int newcount);
int (*event_pool_destroy) (struct event_pool *event_pool);
int (*event_handled) (struct event_pool *event_pool, int fd, int idx,
int gen);
};
struct event_pool *event_pool_new (int count, int eventthreadcount);
@ -107,4 +114,6 @@ int event_dispatch (struct event_pool *event_pool);
int event_reconfigure_threads (struct event_pool *event_pool, int value);
int event_pool_destroy (struct event_pool *event_pool);
int event_dispatch_destroy (struct event_pool *event_pool);
int event_handled (struct event_pool *event_pool, int fd, int idx, int gen);
#endif /* _EVENT_H_ */


@ -520,6 +520,8 @@ struct _glusterfs_ctx {
int notifying;
struct gf_ctx_tw *tw; /* refcounted timer_wheel */
gf_lock_t volfile_lock;
};
typedef struct _glusterfs_ctx glusterfs_ctx_t;


@ -1172,11 +1172,11 @@ out:
}
static int
socket_event_poll_err (rpc_transport_t *this)
static gf_boolean_t
socket_event_poll_err (rpc_transport_t *this, int gen, int idx)
{
socket_private_t *priv = NULL;
int ret = -1;
socket_private_t *priv = NULL;
gf_boolean_t socket_closed = _gf_false;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@ -1185,15 +1185,29 @@ socket_event_poll_err (rpc_transport_t *this)
pthread_mutex_lock (&priv->lock);
{
__socket_ioq_flush (this);
__socket_reset (this);
if ((priv->gen == gen) && (priv->idx == idx)
&& (priv->sock != -1)) {
__socket_ioq_flush (this);
__socket_reset (this);
socket_closed = _gf_true;
}
}
pthread_mutex_unlock (&priv->lock);
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
if (socket_closed) {
pthread_mutex_lock (&priv->notify.lock);
{
while (priv->notify.in_progress)
pthread_cond_wait (&priv->notify.cond,
&priv->notify.lock);
}
pthread_mutex_unlock (&priv->notify.lock);
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
}
out:
return ret;
return socket_closed;
}
@ -2271,22 +2285,50 @@ out:
static int
socket_event_poll_in (rpc_transport_t *this)
socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled)
{
int ret = -1;
rpc_transport_pollin_t *pollin = NULL;
socket_private_t *priv = this->private;
glusterfs_ctx_t *ctx = NULL;
ctx = this->ctx;
ret = socket_proto_state_machine (this, &pollin);
if (pollin) {
pthread_mutex_lock (&priv->notify.lock);
{
priv->notify.in_progress++;
}
pthread_mutex_unlock (&priv->notify.lock);
}
if (notify_handled && (ret != -1))
event_handled (ctx->event_pool, priv->sock, priv->idx,
priv->gen);
if (pollin) {
priv->ot_state = OT_CALLBACK;
ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
pollin);
if (priv->ot_state == OT_CALLBACK) {
priv->ot_state = OT_RUNNING;
}
rpc_transport_pollin_destroy (pollin);
pthread_mutex_lock (&priv->notify.lock);
{
--priv->notify.in_progress;
if (!priv->notify.in_progress)
pthread_cond_signal (&priv->notify.cond);
}
pthread_mutex_unlock (&priv->notify.lock);
}
return ret;
@ -2369,24 +2411,29 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
/* reads rpc_requests during pollin */
static int
socket_event_handler (int fd, int idx, void *data,
socket_event_handler (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err)
{
rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
int ret = -1;
rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
int ret = -1;
glusterfs_ctx_t *ctx = NULL;
gf_boolean_t socket_closed = _gf_false, notify_handled = _gf_false;
this = data;
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
THIS = this->xl;
priv = this->private;
ctx = this->ctx;
pthread_mutex_lock (&priv->lock);
{
priv->idx = idx;
priv->gen = gen;
}
pthread_mutex_unlock (&priv->lock);
@ -2417,16 +2464,23 @@ socket_event_handler (int fd, int idx, void *data,
}
if (!ret && poll_in) {
ret = socket_event_poll_in (this);
ret = socket_event_poll_in (this, !poll_err);
notify_handled = _gf_true;
}
if ((ret < 0) || poll_err) {
/* Logging has happened already in earlier cases */
gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
"EPOLLERR - disconnecting now");
socket_event_poll_err (this);
rpc_transport_unref (this);
}
socket_closed = socket_event_poll_err (this, gen, idx);
if (socket_closed)
rpc_transport_unref (this);
} else if (!notify_handled) {
event_handled (ctx->event_pool, fd, idx, gen);
}
out:
return ret;
@ -2533,7 +2587,7 @@ socket_poller (void *ctx)
}
if (pfd[1].revents & POLL_MASK_INPUT) {
ret = socket_event_poll_in(this);
ret = socket_event_poll_in(this, 0);
if (ret >= 0) {
/* Suppress errors while making progress. */
pfd[1].revents &= ~POLL_MASK_ERROR;
@ -2657,7 +2711,7 @@ socket_spawn (rpc_transport_t *this)
}
static int
socket_server_event_handler (int fd, int idx, void *data,
socket_server_event_handler (int fd, int idx, int gen, void *data,
int poll_in, int poll_out, int poll_err)
{
rpc_transport_t *this = NULL;
@ -2913,6 +2967,8 @@ socket_server_event_handler (int fd, int idx, void *data,
}
}
out:
event_handled (ctx->event_pool, fd, idx, gen);
if (cname && (cname != this->ssl_name)) {
GF_FREE(cname);
}
@ -4024,6 +4080,8 @@ socket_init (rpc_transport_t *this)
priv->bio = 0;
priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
INIT_LIST_HEAD (&priv->ioq);
pthread_mutex_init (&priv->notify.lock, NULL);
pthread_cond_init (&priv->notify.cond, NULL);
/* All the below section needs 'this->options' to be present */
if (!this->options)


@ -203,6 +203,7 @@ typedef enum {
typedef struct {
int32_t sock;
int32_t idx;
int32_t gen;
/* -1 = not connected. 0 = in progress. 1 = connected */
char connected;
/* 1 = connect failed for reasons other than EINPROGRESS/ENOENT
@ -254,6 +255,11 @@ typedef struct {
int log_ctr;
GF_REF_DECL; /* refcount to keep track of socket_poller
threads */
struct {
pthread_mutex_t lock;
pthread_cond_t cond;
uint64_t in_progress;
} notify;
} socket_private_t;