transport shortcut b/w client and server

This patch makes the server pass back the transport pointer of the client. If the UUID matches, the client makes the local transport 'shortcut' with the remote transport (pointer received from server)

The shortcut simulates a socket queue. Instead of serialized messages going over the network and getting queued in the tcp socket queue, the messages get queued in a transport specific queue picked by a polling thread.
This commit is contained in:
Anand V. Avati 2009-05-07 01:24:41 +05:30
parent 12eb832e25
commit f9f5519b66
4 changed files with 131 additions and 2 deletions

View File

@ -214,7 +214,43 @@ transport_submit (transport_t *this, char *buf, int32_t len,
struct iovec *vector, int count,
struct iobref *iobref)
{
int32_t ret = -1;
int32_t ret = -1;
transport_t *peer_trans = NULL;
struct iobuf *iobuf = NULL;
struct transport_msg *msg = NULL;
if (this->peer_trans) {
peer_trans = this->peer_trans;
msg = CALLOC (1, sizeof (*msg));
if (!msg) {
return -ENOMEM;
}
msg->hdr = buf;
msg->hdrlen = len;
if (vector) {
iobuf = iobuf_get (this->xl->ctx->iobuf_pool);
if (!iobuf) {
FREE (msg->hdr);
FREE (msg);
return -ENOMEM;
}
iov_unload (iobuf->ptr, vector, count);
msg->iobuf = iobuf;
}
pthread_mutex_lock (&peer_trans->handover.mutex);
{
list_add_tail (&msg->list, &peer_trans->handover.msgs);
pthread_cond_broadcast (&peer_trans->handover.cond);
}
pthread_mutex_unlock (&peer_trans->handover.mutex);
return 0;
}
GF_VALIDATE_OR_GOTO("transport", this, fail);
GF_VALIDATE_OR_GOTO("transport", this->ops, fail);
@ -307,7 +343,15 @@ transport_receive (transport_t *this, char **hdr_p, size_t *hdrlen_p,
int32_t ret = -1;
GF_VALIDATE_OR_GOTO("transport", this, fail);
if (this->peer_trans) {
*hdr_p = this->handover.msg->hdr;
*hdrlen_p = this->handover.msg->hdrlen;
*iobuf_p = this->handover.msg->iobuf;
return 0;
}
ret = this->ops->receive (this, hdr_p, hdrlen_p, iobuf_p);
fail:
return ret;
@ -338,3 +382,56 @@ fail:
return ret;
}
void *
transport_peerproc (void *trans_data)
{
transport_t *trans = NULL;
struct transport_msg *msg = NULL;
trans = trans_data;
while (1) {
pthread_mutex_lock (&trans->handover.mutex);
{
while (list_empty (&trans->handover.msgs))
pthread_cond_wait (&trans->handover.cond,
&trans->handover.mutex);
msg = list_entry (trans->handover.msgs.next,
struct transport_msg, list);
list_del_init (&msg->list);
}
pthread_mutex_unlock (&trans->handover.mutex);
trans->handover.msg = msg;
trans->xl->notify (trans->xl, GF_EVENT_POLLIN, trans);
FREE (msg);
}
}
int
transport_setpeer (transport_t *trans, transport_t *peer_trans)
{
trans->peer_trans = peer_trans;
INIT_LIST_HEAD (&trans->handover.msgs);
pthread_cond_init (&trans->handover.cond, NULL);
pthread_mutex_init (&trans->handover.mutex, NULL);
pthread_create (&trans->handover.thread, NULL,
transport_peerproc, trans);
peer_trans->peer_trans = trans;
INIT_LIST_HEAD (&peer_trans->handover.msgs);
pthread_cond_init (&peer_trans->handover.cond, NULL);
pthread_mutex_init (&peer_trans->handover.mutex, NULL);
pthread_create (&peer_trans->handover.thread, NULL,
transport_peerproc, peer_trans);
return 0;
}

View File

@ -40,6 +40,13 @@ typedef struct peer_info {
char identifier[UNIX_PATH_MAX];
}peer_info_t;
struct transport_msg {
struct list_head list;
char *hdr;
int hdrlen;
struct iobuf *iobuf;
};
struct transport {
struct transport_ops *ops;
void *private;
@ -55,6 +62,16 @@ struct transport {
/* int (*notify) (transport_t *this, int event, void *data); */
peer_info_t peerinfo;
peer_info_t myinfo;
transport_t *peer_trans;
struct {
pthread_mutex_t mutex;
pthread_cond_t cond;
pthread_t thread;
struct list_head msgs;
struct transport_msg *msg;
} handover;
};
struct transport_ops {
@ -84,4 +101,6 @@ transport_t *transport_load (dict_t *options, xlator_t *xl);
transport_t *transport_ref (transport_t *trans);
int32_t transport_unref (transport_t *trans);
int transport_setpeer (transport_t *trans, transport_t *trans_peer);
#endif /* __TRANSPORT_H__ */

View File

@ -6254,6 +6254,8 @@ client_setvolume_cbk (call_frame_t *frame,
int32_t op_ret = -1;
int32_t op_errno = EINVAL;
int32_t dict_len = 0;
transport_t *peer_trans = NULL;
uint64_t peer_trans_int = 0;
trans = frame->local; frame->local = NULL;
@ -6321,14 +6323,22 @@ client_setvolume_cbk (call_frame_t *frame,
ctx = get_global_ctx_ptr ();
if (process_uuid && !strcmp (ctx->process_uuid,process_uuid)) {
ret = dict_get_uint64 (reply, "transport-ptr",
&peer_trans_int);
peer_trans = (void *) (long) (peer_trans_int);
gf_log (this->name, GF_LOG_WARNING,
"attaching to the local volume '%s'",
remote_subvol);
transport_setpeer (trans, peer_trans);
/* TODO: */
/*
conf->child = xlator_search_by_name (this,
remote_subvol);
*/
}
gf_log (trans->xl->name, GF_LOG_NORMAL,

View File

@ -7085,6 +7085,9 @@ mop_setvolume (call_frame_t *frame, xlator_t *bound_xl,
ret = dict_set_str (reply, "process-uuid",
xl->ctx->process_uuid);
ret = dict_set_uint64 (reply, "transport-ptr",
((uint64_t) (long) trans));
fail:
dict_len = dict_serialized_length (reply);
if (dict_len < 0) {