rebalance: handshake_event_notify to make fsd talk to glusterd

Event_notify can be used by others to communicate with glusterd.
A cbk event is also added for future use.

req has a op, and dict.
rsp has op_ret, op_errno, and dict.

With this, rebalance process can update the status before
exiting.

Signed-off-by: shishir gowda <shishirng@gluster.com>
Change-Id: If5c0ec00514eb3a109a790b2ea273317611e4562
BUG: 807126
Reviewed-on: http://review.gluster.com/3013
Tested-by: Gluster Build System <jenkins@build.gluster.com>
Reviewed-by: Amar Tumballi <amarts@redhat.com>
Reviewed-by: Vijay Bellur <vijay@gluster.com>
This commit is contained in:
shishir gowda 2012-04-24 10:27:18 +05:30 committed by Vijay Bellur
parent 3f35280a36
commit 8f2eba00fd
13 changed files with 311 additions and 48 deletions

View File

@ -67,6 +67,12 @@ mgmt_cbk_spec (void *data)
return 0;
}
int
mgmt_cbk_event (void *data)
{
return 0;
}
struct iobuf *
glusterfs_serialize_reply (rpcsvc_request_t *req, void *arg,
struct iovec *outmsg, xdrproc_t xdrproc)
@ -1246,6 +1252,8 @@ out:
rpcclnt_cb_actor_t gluster_cbk_actors[] = {
[GF_CBK_FETCHSPEC] = {"FETCHSPEC", GF_CBK_FETCHSPEC, mgmt_cbk_spec },
[GF_CBK_EVENT_NOTIFY] = {"EVENTNOTIFY", GF_CBK_EVENT_NOTIFY,
mgmt_cbk_event},
};
@ -1279,6 +1287,7 @@ char *clnt_handshake_procs[GF_HNDSK_MAXVALUE] = {
[GF_HNDSK_SETVOLUME] = "SETVOLUME",
[GF_HNDSK_GETSPEC] = "GETSPEC",
[GF_HNDSK_PING] = "PING",
[GF_HNDSK_EVENT_NOTIFY] = "EVENTNOTIFY",
};
rpc_clnt_prog_t clnt_handshake_prog = {
@ -1639,6 +1648,77 @@ glusterfs_volfile_fetch (glusterfs_ctx_t *ctx)
return ret;
}
int32_t
mgmt_event_notify_cbk (struct rpc_req *req, struct iovec *iov, int count,
void *myframe)
{
gf_event_notify_rsp rsp = {0,};
call_frame_t *frame = NULL;
glusterfs_ctx_t *ctx = NULL;
int ret = 0;
frame = myframe;
ctx = frame->this->ctx;
if (-1 == req->rpc_status) {
ret = -1;
goto out;
}
ret = xdr_to_generic (*iov, &rsp, (xdrproc_t)xdr_gf_event_notify_rsp);
if (ret < 0) {
gf_log (frame->this->name, GF_LOG_ERROR, "XDR decoding error");
ret = -1;
goto out;
}
if (-1 == rsp.op_ret) {
gf_log (frame->this->name, GF_LOG_ERROR,
"failed to get the rsp from server");
ret = -1;
goto out;
}
out:
if (rsp.dict.dict_val)
free (rsp.dict.dict_val); //malloced by xdr
return ret;
}
int32_t
glusterfs_rebalance_event_notify (dict_t *dict)
{
glusterfs_ctx_t *ctx = NULL;
gf_event_notify_req req = {0,};
int32_t ret = -1;
cmd_args_t *cmd_args = NULL;
call_frame_t *frame = NULL;
ctx = glusterfs_ctx_get ();
cmd_args = &ctx->cmd_args;
frame = create_frame (THIS, ctx->pool);
req.op = GF_EN_DEFRAG_STATUS;
if (dict) {
ret = dict_set_str (dict, "volname", cmd_args->volfile_id);
if (ret)
gf_log ("", GF_LOG_ERROR, "failed to set volname");
ret = dict_allocate_and_serialize (dict, &req.dict.dict_val,
(size_t *)&req.dict.dict_len);
}
ret = mgmt_submit_request (&req, frame, ctx, &clnt_handshake_prog,
GF_HNDSK_EVENT_NOTIFY, NULL,
(xdrproc_t)xdr_gf_event_notify_req);
if (req.dict.dict_val)
GF_FREE (req.dict.dict_val);
STACK_DESTROY (frame->root);
return ret;
}
static int
mgmt_rpc_notify (struct rpc_clnt *rpc, void *mydata, rpc_clnt_event_t event,

View File

@ -881,4 +881,5 @@ int is_gf_log_command (xlator_t *trans, const char *name, char *value);
int glusterd_check_log_level (const char *value);
int xlator_volopt_dynload (char *xlator_type, void **dl_handle,
volume_opt_list_t *vol_opt_handle);
int32_t glusterfs_rebalance_event_notify (dict_t *dict);
#endif /* _XLATOR_H */

View File

@ -74,6 +74,7 @@ enum gf_handshake_procnum {
GF_HNDSK_GETSPEC,
GF_HNDSK_PING,
GF_HNDSK_SET_LK_VER,
GF_HNDSK_EVENT_NOTIFY,
GF_HNDSK_MAXVALUE,
};
@ -118,6 +119,7 @@ enum gf_cbk_procnum {
GF_CBK_NULL = 0,
GF_CBK_FETCHSPEC,
GF_CBK_INO_FLUSH,
GF_CBK_EVENT_NOTIFY,
GF_CBK_MAXVALUE,
};
@ -200,8 +202,14 @@ typedef enum {
GF_AFR_OP_HEAL_FAILED_FILES,
GF_AFR_OP_SPLIT_BRAIN_FILES
} gf_xl_afr_op_t ;
enum gf_hdsk_event_notify_op {
GF_EN_DEFRAG_STATUS,
GF_EN_MAX,
};
#define GLUSTER_HNDSK_PROGRAM 14398633 /* Completely random */
#define GLUSTER_HNDSK_VERSION 1 /* 0.0.1 */
#define GLUSTER_HNDSK_VERSION 2 /* 0.0.1 */
#define GLUSTER_PMAP_PROGRAM 34123456
#define GLUSTER_PMAP_VERSION 1

View File

@ -1892,3 +1892,31 @@ xdr_gf_set_lk_ver_req (XDR *xdrs, gf_set_lk_ver_req *objp)
return FALSE;
return TRUE;
}
bool_t
xdr_gf_event_notify_req (XDR *xdrs, gf_event_notify_req *objp)
{
register int32_t *buf;
buf = NULL;
if (!xdr_int (xdrs, &objp->op))
return FALSE;
if (!xdr_bytes (xdrs, (char **)&objp->dict.dict_val, (u_int *) &objp->dict.dict_len, ~0))
return FALSE;
return TRUE;
}
bool_t
xdr_gf_event_notify_rsp (XDR *xdrs, gf_event_notify_rsp *objp)
{
register int32_t *buf;
buf = NULL;
if (!xdr_int (xdrs, &objp->op_ret))
return FALSE;
if (!xdr_int (xdrs, &objp->op_errno))
return FALSE;
if (!xdr_bytes (xdrs, (char **)&objp->dict.dict_val, (u_int *) &objp->dict.dict_len, ~0))
return FALSE;
return TRUE;
}

View File

@ -1081,6 +1081,25 @@ struct gf_set_lk_ver_req {
};
typedef struct gf_set_lk_ver_req gf_set_lk_ver_req;
struct gf_event_notify_req {
int op;
struct {
u_int dict_len;
char *dict_val;
} dict;
};
typedef struct gf_event_notify_req gf_event_notify_req;
struct gf_event_notify_rsp {
int op_ret;
int op_errno;
struct {
u_int dict_len;
char *dict_val;
} dict;
};
typedef struct gf_event_notify_rsp gf_event_notify_rsp;
/* the xdr functions */
#if defined(__STDC__) || defined(__cplusplus)
@ -1172,6 +1191,8 @@ extern bool_t xdr_gfs3_dirplist (XDR *, gfs3_dirplist*);
extern bool_t xdr_gfs3_readdirp_rsp (XDR *, gfs3_readdirp_rsp*);
extern bool_t xdr_gf_set_lk_ver_rsp (XDR *, gf_set_lk_ver_rsp*);
extern bool_t xdr_gf_set_lk_ver_req (XDR *, gf_set_lk_ver_req*);
extern bool_t xdr_gf_event_notify_req (XDR *, gf_event_notify_req*);
extern bool_t xdr_gf_event_notify_rsp (XDR *, gf_event_notify_rsp*);
#else /* K&R C */
extern bool_t xdr_gf_statfs ();
@ -1262,6 +1283,8 @@ extern bool_t xdr_gfs3_dirplist ();
extern bool_t xdr_gfs3_readdirp_rsp ();
extern bool_t xdr_gf_set_lk_ver_rsp ();
extern bool_t xdr_gf_set_lk_ver_req ();
extern bool_t xdr_gf_event_notify_req ();
extern bool_t xdr_gf_event_notify_rsp ();
#endif /* K&R C */

View File

@ -683,3 +683,14 @@ struct gf_set_lk_ver_req {
string uid<>;
int lk_ver;
};
struct gf_event_notify_req {
int op;
opaque dict<>;
};
struct gf_event_notify_rsp {
int op_ret;
int op_errno;
opaque dict<>;
};

View File

@ -24,6 +24,7 @@
#endif
#include "dht-common.h"
#include "xlator.h"
#define GF_DISK_SECTOR_SIZE 512
#define DHT_REBALANCE_PID 4242 /* Change it if required */
@ -1414,6 +1415,7 @@ gf_defrag_start_crawl (void *data)
struct iatt parent = {0,};
dict_t *fix_layout = NULL;
dict_t *migrate_data = NULL;
dict_t *status = NULL;
this = data;
if (!this)
@ -1482,7 +1484,11 @@ gf_defrag_start_crawl (void *data)
out:
LOCK (&defrag->lock);
{
gf_defrag_status_get (defrag, NULL);
status = dict_new ();
gf_defrag_status_get (defrag, status);
glusterfs_rebalance_event_notify (status);
if (status)
dict_unref (status);
defrag->is_exiting = 1;
}
UNLOCK (&defrag->lock);

View File

@ -221,10 +221,71 @@ fail:
return 0;
}
int32_t
server_event_notify (rpcsvc_request_t *req)
{
int32_t ret = -1;
int32_t op_errno = 0;
gf_event_notify_req args = {0,};
gf_event_notify_rsp rsp = {0,};
dict_t *dict = NULL;
gf_boolean_t need_rsp = _gf_true;
if (!xdr_to_generic (req->msg[0], &args,
(xdrproc_t)xdr_gf_event_notify_req)) {
req->rpc_err = GARBAGE_ARGS;
goto fail;
}
if (args.dict.dict_len) {
dict = dict_new ();
if (!dict)
return ret;
ret = dict_unserialize (args.dict.dict_val,
args.dict.dict_len, &dict);
if (ret) {
gf_log ("", GF_LOG_ERROR, "Failed to unserialize req");
goto fail;
}
}
switch (args.op) {
case GF_EN_DEFRAG_STATUS:
gf_log ("", GF_LOG_INFO,
"recieved defrag status updated");
if (dict) {
glusterd_defrag_event_notify_handle (dict);
need_rsp = _gf_false;
}
break;
default:
gf_log ("", GF_LOG_ERROR, "Unkown op recieved in in event "
"notify");
ret = -1;
break;
}
fail:
rsp.op_ret = ret;
if (op_errno)
rsp.op_errno = gf_errno_to_error (op_errno);
if (need_rsp)
glusterd_submit_reply (req, &rsp, NULL, 0, NULL,
(xdrproc_t)xdr_gf_event_notify_rsp);
if (dict)
dict_unref (dict);
if (args.dict.dict_val)
free (args.dict.dict_val);//malloced by xdr
return 0;
}
rpcsvc_actor_t gluster_handshake_actors[] = {
[GF_HNDSK_NULL] = {"NULL", GF_HNDSK_NULL, NULL, NULL, NULL, 0},
[GF_HNDSK_GETSPEC] = {"GETSPEC", GF_HNDSK_GETSPEC, server_getspec, NULL, NULL, 0},
[GF_HNDSK_EVENT_NOTIFY] = {"EVENTNOTIFY", GF_HNDSK_EVENT_NOTIFY, server_event_notify,
NULL, NULL, 0},
};

View File

@ -3253,7 +3253,6 @@ out:
return ret;
}
int
glusterd_defrag_volume_node_rsp (dict_t *req_dict, dict_t *rsp_dict,
dict_t *op_ctx)
@ -3261,16 +3260,11 @@ glusterd_defrag_volume_node_rsp (dict_t *req_dict, dict_t *rsp_dict,
int ret = 0;
char *volname = NULL;
glusterd_volinfo_t *volinfo = NULL;
uint64_t files = 0;
uint64_t size = 0;
uint64_t lookup = 0;
gf_defrag_status_t status = GF_DEFRAG_STATUS_NOT_STARTED;
char key[256] = {0,};
int32_t i = 0;
char buf[1024] = {0,};
char *node_str = NULL;
glusterd_conf_t *priv = NULL;
uint64_t failures = 0;
priv = THIS->private;
GF_ASSERT (req_dict);
@ -3286,48 +3280,16 @@ glusterd_defrag_volume_node_rsp (dict_t *req_dict, dict_t *rsp_dict,
if (ret)
goto out;
if (!rsp_dict)
goto populate;
ret = dict_get_uint64 (rsp_dict, "files", &files);
if (ret)
gf_log (THIS->name, GF_LOG_TRACE,
"failed to get file count");
ret = dict_get_uint64 (rsp_dict, "size", &size);
if (ret)
gf_log (THIS->name, GF_LOG_TRACE,
"failed to get size of xfer");
ret = dict_get_uint64 (rsp_dict, "lookups", &lookup);
if (ret)
gf_log (THIS->name, GF_LOG_TRACE,
"failed to get lookedup file count");
ret = dict_get_int32 (rsp_dict, "status", (int32_t *)&status);
if (ret)
gf_log (THIS->name, GF_LOG_TRACE,
"failed to get status");
ret = dict_get_uint64 (rsp_dict, "failures", &failures);
if (ret)
gf_log (THIS->name, GF_LOG_TRACE,
"failed to get failure count");
if (files)
volinfo->rebalance_files = files;
if (size)
volinfo->rebalance_data = size;
if (lookup)
volinfo->lookedup_files = lookup;
if (failures)
volinfo->rebalance_failures = failures;
if (rsp_dict) {
ret = glusterd_defrag_volume_status_update (volinfo,
rsp_dict);
}
if (!op_ctx) {
dict_copy (rsp_dict, op_ctx);
goto out;
}
populate:
ret = dict_get_int32 (op_ctx, "count", &i);
i++;
@ -3364,12 +3326,10 @@ populate:
if (ret)
gf_log (THIS->name, GF_LOG_ERROR,
"failed to set lookedup file count");
if (!status)
status = volinfo->defrag_status;
memset (key, 0 , 256);
snprintf (key, 256, "status-%d", i);
ret = dict_set_int32 (op_ctx, key, status);
ret = dict_set_int32 (op_ctx, key, volinfo->defrag_status);
if (ret)
gf_log (THIS->name, GF_LOG_ERROR,
"failed to set status");

View File

@ -677,3 +677,30 @@ out:
return ret;
}
int32_t
glusterd_defrag_event_notify_handle (dict_t *dict)
{
glusterd_volinfo_t *volinfo = NULL;
char *volname = NULL;
int32_t ret = -1;
ret = dict_get_str (dict, "volname", &volname);
if (ret) {
gf_log ("", GF_LOG_ERROR, "Failed to get volname");
return ret;
}
ret = glusterd_volinfo_find (volname, &volinfo);
if (ret) {
gf_log ("", GF_LOG_ERROR, "Failed to get volinfo for %s"
, volname);
return ret;
}
ret = glusterd_defrag_volume_status_update (volinfo, dict);
if (ret)
gf_log ("", GF_LOG_ERROR, "Failed to update status");
return ret;
}

View File

@ -5418,3 +5418,56 @@ glusterd_validate_volume_id (dict_t *op_dict, glusterd_volinfo_t *volinfo)
out:
return ret;
}
int
glusterd_defrag_volume_status_update (glusterd_volinfo_t *volinfo,
dict_t *rsp_dict)
{
int ret = 0;
uint64_t files = 0;
uint64_t size = 0;
uint64_t lookup = 0;
gf_defrag_status_t status = GF_DEFRAG_STATUS_NOT_STARTED;
uint64_t failures = 0;
xlator_t *this = NULL;
this = THIS;
ret = dict_get_uint64 (rsp_dict, "files", &files);
if (ret)
gf_log (this->name, GF_LOG_TRACE,
"failed to get file count");
ret = dict_get_uint64 (rsp_dict, "size", &size);
if (ret)
gf_log (this->name, GF_LOG_TRACE,
"failed to get size of xfer");
ret = dict_get_uint64 (rsp_dict, "lookups", &lookup);
if (ret)
gf_log (this->name, GF_LOG_TRACE,
"failed to get lookedup file count");
ret = dict_get_int32 (rsp_dict, "status", (int32_t *)&status);
if (ret)
gf_log (this->name, GF_LOG_TRACE,
"failed to get status");
ret = dict_get_uint64 (rsp_dict, "failures", &failures);
if (ret)
gf_log (this->name, GF_LOG_TRACE,
"failed to get failure count");
if (files)
volinfo->rebalance_files = files;
if (size)
volinfo->rebalance_data = size;
if (lookup)
volinfo->lookedup_files = lookup;
if (status)
volinfo->defrag_status = status;
if (failures)
volinfo->rebalance_failures = failures;
return ret;
}

View File

@ -434,4 +434,8 @@ glusterd_is_local_brick (xlator_t *this, glusterd_volinfo_t *volinfo,
glusterd_brickinfo_t *brickinfo);
int
glusterd_validate_volume_id (dict_t *op_dict, glusterd_volinfo_t *volinfo);
int
glusterd_defrag_volume_status_update (glusterd_volinfo_t *volinfo,
dict_t *rsp_dict);
#endif

View File

@ -642,5 +642,6 @@ int glusterd_op_gsync_args_get (dict_t *dict, char **op_errstr,
/* Synctask part */
int32_t glusterd_op_begin_synctask (rpcsvc_request_t *req, glusterd_op_t op,
void *dict);
int32_t
glusterd_defrag_event_notify_handle (dict_t *dict);
#endif