diff --git a/include/xvm.h b/include/xvm.h index 54d499e..5836384 100644 --- a/include/xvm.h +++ b/include/xvm.h @@ -55,7 +55,8 @@ typedef enum { FENCE_REBOOT = 0x2, /* Hit the reset button */ FENCE_ON = 0x3, /* Turn the VM on */ FENCE_STATUS = 0x4, /* virtual machine status (off/on) */ - FENCE_DEVSTATUS = 0x5 /* List virtual machines */ + FENCE_DEVSTATUS = 0x5, /* Status of the fencing device */ + FENCE_HOSTLIST = 0x6 /* List VMs controllable */ } fence_cmd_t; #define DEFAULT_TTL 4 diff --git a/server/checkpoint.c b/server/checkpoint.c index 48a76ad..1626b1d 100644 --- a/server/checkpoint.c +++ b/server/checkpoint.c @@ -28,8 +28,11 @@ #include #include #include +#include #include +#include #include +#include #ifdef HAVE_OPENAIS_CPG_H #include #else @@ -38,6 +41,13 @@ #endif #endif +#include + +#include +#include "virt.h" +#include "xvm.h" +#include "checkpoint.h" + #define NAME "checkpoint" #define VERSION "0.8" @@ -47,8 +57,6 @@ struct check_info { int magic; int pad; - cpg_handle_t h; - virConnectPtr vp; }; #define VALIDATE(arg) \ @@ -60,8 +68,531 @@ do {\ } while(0) +static void *checkpoint_handle = NULL; +static virt_list_t *local_vms = NULL; +static char *uri = NULL; +static int use_uuid = 0; + static int -ckpt_null(const char *vm_name, void *priv) +virt_list_update(virConnectPtr vp, virt_list_t **vl, int my_id) +{ + virt_list_t *list = NULL; + + list = vl_get(vp, my_id); + if (!list) + return -1; + + if (*vl) + vl_free(*vl); + *vl = list; + + return 0; +} + + +static int +get_cman_ids(cman_handle_t ch, uint32_t *my_id, uint32_t *high_id) +{ + int max_nodes; + int actual; + cman_node_t *nodes = NULL; + cman_node_t me; + uint32_t high = 0; + int ret = -1, x, _local = 0; + + if (!my_id && !high_id) + return 0; + + if (!ch) { + _local = 1; + ch = cman_init(NULL); + } + if (!ch) + return -1; + + max_nodes = cman_get_node_count(ch); + if (max_nodes <= 0) + goto out; + + if (my_id) { + memset(&me, 0, sizeof(me)); + if (cman_get_node(ch, CMAN_NODEID_US, &me) < 0) + goto out; + *my_id = me.cn_nodeid; + } + + if (!high_id) { + ret = 0; + goto out; + } + + nodes = malloc(sizeof(cman_node_t) * max_nodes); + if (!nodes) + goto out; + memset(nodes, 0, sizeof(cman_node_t) * max_nodes); + + if (cman_get_nodes(ch, max_nodes, &actual, nodes) < 0) + goto out; + + for (x = 0; x < actual; x++) + if (nodes[x].cn_nodeid > high && nodes[x].cn_member) + high = nodes[x].cn_nodeid; + + *high_id = high; + + ret = 0; +out: + if (nodes) + free(nodes); + if (ch && _local) + cman_finish(ch); + return ret; +} + + +static int +node_operational(uint32_t nodeid) +{ + cman_handle_t ch; + cman_node_t node; + + ch = cman_init(NULL); + if (!ch) + return -1; + + memset(&node, 0, sizeof(node)); + if (cman_get_node(ch, nodeid, &node) == 0) { + cman_finish(ch); + return !!node.cn_member; + } + + cman_finish(ch); + return 0; +} + + +static int +get_domain_state_ckpt(void *hp, const char *domain, vm_state_t *state) +{ + errno = EINVAL; + + if (!hp || !domain || !state || !strlen((char *)domain)) + return -1; + if (!strcmp(DOMAIN0NAME, (char *)domain)) + return -1; + + return ckpt_read(hp, domain, state, sizeof(*state)); +} + + +static inline int +wait_domain(const char *vm_name, virConnectPtr vp, int timeout) +{ + int tries = 0; + int response = 1; + virDomainPtr vdp; + virDomainInfo vdi; + + if (use_uuid) { + vdp = virDomainLookupByUUID(vp, (const unsigned char *)vm_name); + } else { + vdp = virDomainLookupByName(vp, vm_name); + } + if (!vdp) + return 0; + + /* Check domain liveliness. If the domain is still here, + we return failure, and the client must then retry */ + /* XXX On the xen 3.0.4 API, we will be able to guarantee + synchronous virDomainDestroy, so this check will not + be necessary */ + do { + sleep(1); + if (use_uuid) { + vdp = virDomainLookupByUUID(vp, + (const unsigned char *)vm_name); + } else { + vdp = virDomainLookupByName(vp, vm_name); + } + if (!vdp) { + dbg_printf(2, "Domain no longer exists\n"); + response = 0; + break; + } + + memset(&vdi, 0, sizeof(vdi)); + virDomainGetInfo(vdp, &vdi); + virDomainFree(vdp); + + if (vdi.state == VIR_DOMAIN_SHUTOFF) { + dbg_printf(2, "Domain has been shut off\n"); + response = 0; + break; + } + + dbg_printf(4, "Domain still exists (state %d) " + "after %d seconds\n", + vdi.state, tries); + + if (++tries >= timeout) + break; + } while (1); + + return response; +} + + + +/* + Returns: 0 - operational + 1 - dead or presumed so + 2 - VM not local and I am not the right node to deal with it + 3 - VM status unknown; cannot operate on it + */ +static int +cluster_virt_status(const char *vm_name, uint32_t *owner) +{ + vm_state_t chk_state; + virt_state_t *vs; + uint32_t me, high_id; + int ret = 0; + + dbg_printf(80, "%s %s\n", __FUNCTION__, vm_name); + + /* if we can't find the high ID, we can't do anything useful */ + /* This should be cpg_get_ids() but it's segfaulting for some + reason :( */ + if (get_cman_ids(NULL, &me, &high_id) != 0) + return 2; + + if (use_uuid) { + vs = vl_find_uuid(local_vms, vm_name); + } else { + vs = vl_find_name(local_vms, vm_name); + } + + if (!vs) { + ret = 2; /* not found locally */ + + if (me != high_id) + goto out; + + if (get_domain_state_ckpt(checkpoint_handle, + vm_name, &chk_state)) { + dbg_printf(2, "High ID: Unknown VM\n"); + ret = 3; + goto out; + } + + if (node_operational(chk_state.s_owner)) { + *owner = chk_state.s_owner; + dbg_printf(2, "High ID: Owner is operational\n"); + ret = 2; + } else { + dbg_printf(2, "High ID: Owner is dead; returning 'off'\n"); + ret = 1; + } + } else if (vs->v_state.s_state == VIR_DOMAIN_SHUTOFF) { + ret = 1; /* local and off */ + } + +out: + dbg_printf(80, "%s %s\n", __FUNCTION__, vm_name); + return ret; +} + + +static void +store_domains_by_name(void *hp, virt_list_t *vl) +{ + int x; + + if (!vl) + return; + + for (x = 0; x < vl->vm_count; x++) { + if (!strcmp(DOMAIN0NAME, vl->vm_states[x].v_name)) + continue; + dbg_printf(2, "Storing %s\n", vl->vm_states[x].v_name); + ckpt_write(hp, vl->vm_states[x].v_name, + &vl->vm_states[x].v_state, + sizeof(vm_state_t)); + } +} + + +static void +store_domains_by_uuid(void *hp, virt_list_t *vl) +{ + int x; + + if (!vl) + return; + + for (x = 0; x < vl->vm_count; x++) { + if (!strcmp(DOMAIN0UUID, vl->vm_states[x].v_uuid)) + continue; + dbg_printf(2, "Storing %s\n", vl->vm_states[x].v_uuid); + ckpt_write(hp, vl->vm_states[x].v_uuid, + &vl->vm_states[x].v_state, + sizeof(vm_state_t)); + } +} + + +static void +update_local_vms(void) +{ + virConnectPtr vp = NULL; + uint32_t my_id = 0; + + cpg_get_ids(&my_id, NULL); + + vp = virConnectOpen(uri); + if (!vp) { + syslog(LOG_ERR, "Failed to connect to hypervisor\n"); + } + virt_list_update(vp, &local_vms, my_id); + vl_print(local_vms); + if (use_uuid) + store_domains_by_uuid(checkpoint_handle, local_vms); + else + store_domains_by_name(checkpoint_handle, local_vms); + virConnectClose(vp); +} + + +static int +do_off(const char *vm_name) +{ + virConnectPtr vp; + virDomainPtr vdp; + virDomainInfo vdi; + int ret = -1; + + dbg_printf(5, "%s %s\n", __FUNCTION__, vm_name); + vp = virConnectOpen(uri); + if (!vp) + return 1; + + if (use_uuid) { + vdp = virDomainLookupByUUID(vp, + (const unsigned char *)vm_name); + } else { + vdp = virDomainLookupByName(vp, vm_name); + } + + if (!vdp || + ((virDomainGetInfo(vdp, &vdi) == 0) && + (vdi.state == VIR_DOMAIN_SHUTOFF))) { + dbg_printf(2, "Nothing to do - domain does not exist\n"); + + if (vdp) + virDomainFree(vdp); + return 0; + } + + syslog(LOG_NOTICE, "Destroying domain %s\n", vm_name); + dbg_printf(2, "[OFF] Calling virDomainDestroy\n"); + ret = virDomainDestroy(vdp); + if (ret < 0) { + syslog(LOG_NOTICE, "Failed to destroy domain: %d\n", ret); + printf("virDomainDestroy() failed: %d\n", ret); + + ret = 1; + goto out; + } + + if (ret) { + syslog(LOG_NOTICE, + "Domain %s still exists; fencing failed\n", + vm_name); + printf("Domain %s still exists; fencing failed\n", vm_name); + + ret = 1; + goto out; + } + + ret = 0; +out: + virConnectClose(vp); + return ret; +} + + +static int +do_reboot(const char *vm_name) +{ + virConnectPtr vp; + virDomainPtr vdp, nvdp; + virDomainInfo vdi; + char *domain_desc; + int ret; + + //uuid_unparse(vm_uuid, uu_string); + dbg_printf(5, "%s %s\n", __FUNCTION__, vm_name); + vp = virConnectOpen(uri); + if (!vp) + return 1; + + if (use_uuid) { + vdp = virDomainLookupByUUID(vp, + (const unsigned char *)vm_name); + } else { + vdp = virDomainLookupByName(vp, vm_name); + } + + if (!vdp || ((virDomainGetInfo(vdp, &vdi) == 0) && + (vdi.state == VIR_DOMAIN_SHUTOFF))) { + dbg_printf(2, "[libvirt:REBOOT] Nothing to " + "do - domain does not exist\n"); + if (vdp) + virDomainFree(vdp); + return 0; + } + + syslog(LOG_NOTICE, "Rebooting domain %s\n", vm_name); + printf("Rebooting domain %s...\n", vm_name); + domain_desc = virDomainGetXMLDesc(vdp, 0); + + if (!domain_desc) { + printf("Failed getting domain description from " + "libvirt\n"); + } + + dbg_printf(2, "[REBOOT] Calling virDomainDestroy(%p)\n", vdp); + ret = virDomainDestroy(vdp); + if (ret < 0) { + printf("virDomainDestroy() failed: %d/%d\n", ret, errno); + free(domain_desc); + virDomainFree(vdp); + ret = 1; + goto out; + } + + ret = wait_domain(vm_name, vp, 15); + + if (ret) { + syslog(LOG_NOTICE, "Domain %s still exists; fencing failed\n", + vm_name); + printf("Domain %s still exists; fencing failed\n", vm_name); + if (domain_desc) + free(domain_desc); + ret = 1; + goto out; + } + + if (!domain_desc) { + ret = 0; + goto out; + } + + /* 'on' is not a failure */ + ret = 0; + + dbg_printf(3, "[[ XML Domain Info ]]\n"); + dbg_printf(3, "%s\n[[ XML END ]]\n", domain_desc); + dbg_printf(2, "Calling virDomainCreateLinux()...\n"); + + nvdp = virDomainCreateLinux(vp, domain_desc, 0); + if (nvdp == NULL) { + /* More recent versions of libvirt or perhaps the + * KVM back-end do not let you create a domain from + * XML if there is already a defined domain description + * with the same name that it knows about. You must + * then call virDomainCreate() */ + dbg_printf(2, "Failed; Trying virDomainCreate()...\n"); + if (virDomainCreate(vdp) < 0) { + syslog(LOG_NOTICE, + "Could not restart %s\n", + vm_name); + dbg_printf(1, "Failed to recreate guest" + " %s!\n", vm_name); + } + } + + free(domain_desc); + +out: + virConnectClose(vp); + return ret; +} + + + +static void +do_real_work(void *data, size_t len, uint32_t nodeid, uint32_t seqno) +{ + struct ckpt_fence_req *req = data; + struct ckpt_fence_req reply; + uint32_t owner; + int ret; + + memcpy(&reply, req, sizeof(reply)); + + update_local_vms(); + + switch(req->request) { + case FENCE_STATUS: + ret = cluster_virt_status(req->vm_name, &owner); + if (ret == 2) { + return; + } + break; + case FENCE_OFF: + ret = cluster_virt_status(req->vm_name, &owner); + if (ret != 0) { + return; + } + /* Must be running locally to perform 'off' */ + ret = do_off(req->vm_name); + break; + case FENCE_REBOOT: + ret = cluster_virt_status(req->vm_name, &owner); + if (ret != 0) { + return; + } + /* Must be running locally to perform 'reboot' */ + ret = do_reboot(req->vm_name); + break; + } + + cpg_send_reply(&reply, sizeof(reply), nodeid, seqno); +} + + +static int +do_request(const char *vm_name, int request, uint32_t seqno) +{ + struct ckpt_fence_req freq, *frp; + size_t retlen; + uint32_t seq; + int ret; + + memset(&freq, 0, sizeof(freq)); + snprintf(freq.vm_name, sizeof(freq.vm_name), vm_name); + freq.request = request; + freq.seqno = seqno; + + if (cpg_send_req(&freq, sizeof(freq), &seq) != 0) { + printf("Failed to send\n"); + return 1; + } + + if (cpg_wait_reply((void *)&frp, &retlen, seq) != 0) { + printf("Failed to receive\n"); + return 1; + } + + ret = frp->response; + free(frp); + + return ret; +} + + +static int +checkpoint_null(const char *vm_name, void *priv) { VALIDATE(priv); printf("[CKPT] Null operation on %s\n", vm_name); @@ -71,17 +602,17 @@ ckpt_null(const char *vm_name, void *priv) static int -ckpt_off(const char *vm_name, uint32_t seqno, void *priv) +checkpoint_off(const char *vm_name, uint32_t seqno, void *priv) { VALIDATE(priv); printf("[CKPT] OFF operation on %s seq %d\n", vm_name, seqno); - return 1; + return do_request(vm_name, FENCE_OFF, seqno); } static int -ckpt_on(const char *vm_name, uint32_t seqno, void *priv) +checkpoint_on(const char *vm_name, uint32_t seqno, void *priv) { VALIDATE(priv); printf("[CKPT] ON operation on %s seq %d\n", vm_name, seqno); @@ -91,7 +622,7 @@ ckpt_on(const char *vm_name, uint32_t seqno, void *priv) static int -ckpt_devstatus(void *priv) +checkpoint_devstatus(void *priv) { printf("[CKPT] Device status\n"); VALIDATE(priv); @@ -101,31 +632,76 @@ ckpt_devstatus(void *priv) static int -ckpt_status(const char *vm_name, void *priv) +checkpoint_status(const char *vm_name, void *priv) { VALIDATE(priv); printf("[CKPT] STATUS operation on %s\n", vm_name); - return 1; + return do_request(vm_name, FENCE_STATUS, 0); } static int -ckpt_reboot(const char *vm_name, uint32_t seqno, void *priv) +checkpoint_reboot(const char *vm_name, uint32_t seqno, void *priv) { VALIDATE(priv); printf("[CKPT] REBOOT operation on %s seq %d\n", vm_name, seqno); - return 1; + return do_request(vm_name, FENCE_REBOOT, 0); } static int -ckpt_init(backend_context_t *c, config_object_t *config) +checkpoint_init(backend_context_t *c, config_object_t *config) { - //char value[256]; + char value[1024]; struct check_info *info = NULL; + int x; +#ifdef _MODULE + if (sc_get(config, "fence_virtd/@debug", value, sizeof(value))==0) + dset(atoi(value)); +#endif + + if (sc_get(config, "backends/libvirt/@uri", + value, sizeof(value)) == 0) { + uri = strdup(value); + if (!uri) { + free(info); + return -1; + } + dbg_printf(1, "Using %s\n", uri); + } + + if (sc_get(config, "backends/checkpoint/@uri", + value, sizeof(value)) == 0) { + if (uri) + free(uri); + uri = strdup(value); + if (!uri) { + free(info); + return -1; + } + dbg_printf(1, "Using %s\n", uri); + } + + /* Naming scheme is a top-level configuration option */ + if (sc_get(config, "fence_virtd/@name_mode", + value, sizeof(value)-1) == 0) { + + dbg_printf(1, "Got %s for name_mode\n", value); + if (!strcasecmp(value, "uuid")) { + use_uuid = 1; + } else if (!strcasecmp(value, "name")) { + use_uuid = 0; + } else { + dbg_printf(1, "Unsupported name_mode: %s\n", value); + } + } + + if (cpg_start(PACKAGE_NAME, do_real_work) < 0) { + return -1; + } info = malloc(sizeof(*info)); if (!info) return -1; @@ -134,13 +710,29 @@ ckpt_init(backend_context_t *c, config_object_t *config) info->magic = MAGIC; + x = 0; + while ((checkpoint_handle = ckpt_init( + "vm_states", 262144, 4096, 64, 10 + )) == NULL) { + if (!x) { + dbg_printf(1, "Could not initialize " + "saCkPt; retrying...\n"); + x = 1; + } + sleep(3); + } + if (x) + dbg_printf(1, "Checkpoint initialized\n"); + + update_local_vms(); + *c = (void *)info; return 0; } static int -ckpt_shutdown(backend_context_t c) +checkpoint_shutdown(backend_context_t c) { struct check_info *info = (struct check_info *)c; @@ -148,25 +740,27 @@ ckpt_shutdown(backend_context_t c) info->magic = 0; free(info); + cpg_stop(); + return 0; } -static fence_callbacks_t ckpt_callbacks = { - .null = ckpt_null, - .off = ckpt_off, - .on = ckpt_on, - .reboot = ckpt_reboot, - .status = ckpt_status, - .devstatus = ckpt_devstatus +static fence_callbacks_t checkpoint_callbacks = { + .null = checkpoint_null, + .off = checkpoint_off, + .on = checkpoint_on, + .reboot = checkpoint_reboot, + .status = checkpoint_status, + .devstatus = checkpoint_devstatus }; -static backend_plugin_t ckpt_plugin = { +static backend_plugin_t checkpoint_plugin = { .name = NAME, .version = VERSION, - .callbacks = &ckpt_callbacks, - .init = ckpt_init, - .cleanup = ckpt_shutdown, + .callbacks = &checkpoint_callbacks, + .init = checkpoint_init, + .cleanup = checkpoint_shutdown, }; @@ -180,12 +774,12 @@ BACKEND_VER_SYM(void) const backend_plugin_t * BACKEND_INFO_SYM(void) { - return &ckpt_plugin; + return &checkpoint_plugin; } #else static void __attribute__((constructor)) -ckpt_register_plugin(void) +checkpoint_register_plugin(void) { - plugin_reg_backend(&ckpt_plugin); + plugin_reg_backend(&checkpoint_plugin); } #endif diff --git a/server/cpg.c b/server/cpg.c index 4ea64f9..3fc687c 100644 --- a/server/cpg.c +++ b/server/cpg.c @@ -11,6 +11,7 @@ #include #include #include +#include #ifdef HAVE_OPENAIS_CPG_H #include #else @@ -19,6 +20,81 @@ #endif #endif +#include "checkpoint.h" + +#define NODE_ID_NONE ((uint32_t)-1) + + +struct msg_queue_node { + list_head(); + uint32_t seqno; +#define STATE_CLEAR 0 +#define STATE_MESSAGE 1 + uint32_t state; + void *msg; + size_t msglen; +}; + +struct wire_msg { +#define TYPE_REQUEST 0 +#define TYPE_REPLY 1 + uint32_t type; + uint32_t seqno; + uint32_t target; + uint32_t pad; + char data[0]; +}; + +static uint32_t seqnum = 0, my_node_id = NODE_ID_NONE; +static struct msg_queue_node *pending= NULL; +static cpg_handle_t cpg_handle; +static struct cpg_name gname; + +static pthread_mutex_t cpg_mutex = PTHREAD_MUTEX_INITIALIZER; +static pthread_cond_t cpg_cond = PTHREAD_COND_INITIALIZER; +static pthread_t cpg_thread = 0; +static request_callback_fn req_callback_fn; + + +int +cpg_get_ids(uint32_t *my_id, uint32_t *high_id) +{ +#if 0 + /* This is segfaulting for some reason */ + struct cpg_address cpg_nodes[CPG_MEMBERS_MAX]; + uint32_t high = my_node_id; + int count = CPG_MEMBERS_MAX, x; + + if (!my_id && !high_id) + return 0; + + if (my_id) + *my_id = my_node_id; + + if (!high_id) + return 0; + + memset(&cpg_nodes, 0, sizeof(cpg_nodes)); + + if (cpg_membership_get(cpg_handle, &gname, + cpg_nodes, &count) != CPG_OK) + return -1; + + for (x = 0; x < count; x++) { + if (cpg_nodes[x].nodeid > high) { + high = cpg_nodes[x].nodeid; + } + } + + *high_id = high; + + return 0; +#endif + return -ENOSYS; +} + + + void #ifdef HAVE_OPENAIS_CPG_H @@ -37,8 +113,62 @@ cpg_deliver_func(cpg_handle_t h, size_t msglen) #endif { - printf("%s (len = %d) from node %d pid %d\n)\n", - (char *)msg, (int)msglen, nodeid, pid); + struct msg_queue_node *n; + struct wire_msg *m = msg; + int x, found; + + pthread_mutex_lock(&cpg_mutex); + + if (m->type == TYPE_REPLY) { + /* Reply to a request we sent */ + found = 0; + + list_for(&pending, n, x) { + if (m->seqno != n->seqno) + continue; + if (m->target != my_node_id) + continue; + found = 1; + break; + } + + if (!found) + goto out_unlock; + + /* Copy our message in to a buffer */ + n->msglen = msglen - sizeof(*m); + if (!n->msglen) { + /* XXX do what? */ + } + n->msg = malloc(n->msglen); + if (!n->msg) { + goto out_unlock; + } + n->state = STATE_MESSAGE; + memcpy(n->msg, (char *)msg + sizeof(*m), n->msglen); + + list_remove(&pending, n); + list_insert(&pending, n); + +#if 0 + printf("Seqnum %d replied; removing from list", + n->seqno); +#endif + + pthread_cond_broadcast(&cpg_cond); + goto out_unlock; + } + pthread_mutex_unlock(&cpg_mutex); + + if (m->type == TYPE_REQUEST) { + req_callback_fn(&m->data, msglen - sizeof(*m), + nodeid, m->seqno); + } + + return; + +out_unlock: + pthread_mutex_unlock(&cpg_mutex); } void @@ -68,25 +198,128 @@ static cpg_callbacks_t my_callbacks = { int -cpg_send(cpg_handle_t h, void *data, size_t len) +cpg_send_req(void *data, size_t len, uint32_t *seqno) { struct iovec iov; + struct msg_queue_node *n; + struct wire_msg *m; + size_t msgsz = sizeof(*m) + len; + int ret; - iov.iov_base = data; - iov.iov_len = len; - return cpg_mcast_joined(h, CPG_TYPE_AGREED, &iov, 1); + n = malloc(sizeof(*n)); + if (!n) + return -1; + m = malloc(msgsz); + if (!m) + return -1; + + /* only incremented on send */ + n->state = STATE_CLEAR; + n->msg = NULL; + n->msglen = 0; + + pthread_mutex_lock(&cpg_mutex); + list_insert(&pending, n); + n->seqno = ++seqnum; + m->seqno = seqnum; + *seqno = seqnum; + pthread_mutex_unlock(&cpg_mutex); + + m->type = TYPE_REQUEST; /* XXX swab? */ + m->target = NODE_ID_NONE; + memcpy(&m->data, data, len); + + iov.iov_base = m; + iov.iov_len = msgsz; + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1); + + free(m); + if (ret == CPG_OK) + return 0; + return -1; } int -cpg_start(const char *name, cpg_handle_t *handle, struct cpg_name *grpname) +cpg_send_reply(void *data, size_t len, uint32_t nodeid, + uint32_t seqno) +{ + struct iovec iov; + struct wire_msg *m; + size_t msgsz = sizeof(*m) + len; + int ret; + + m = malloc(msgsz); + if (!m) + return -1; + + /* only incremented on send */ + m->seqno = seqno; + m->type = TYPE_REPLY; /* XXX swab? */ + m->target = nodeid; + memcpy(&m->data, data, len); + + iov.iov_base = m; + iov.iov_len = msgsz; + ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1); + + free(m); + if (ret == CPG_OK) + return 0; + return -1; +} + + +int +cpg_wait_reply(void **data, size_t *len, uint32_t seqno) +{ + struct msg_queue_node *n; + int x, found = 0; + + while (!found) { + found = 0; + pthread_mutex_lock(&cpg_mutex); + pthread_cond_wait(&cpg_cond, &cpg_mutex); + + list_for(&pending, n, x) { + if (n->seqno != seqno) + continue; + if (n->state != STATE_MESSAGE) + continue; + found = 1; + break; + } + pthread_mutex_unlock(&cpg_mutex); + } + + list_remove(&pending, n); + pthread_mutex_unlock(&cpg_mutex); + + *data = n->msg; + *len = n->msglen; + free(n); + + return 0; +} + + +static void * +cpg_dispatch_thread(void *arg) +{ + cpg_dispatch(cpg_handle, CPG_DISPATCH_BLOCKING); + + return NULL; +} + + +int +cpg_start(const char *name, request_callback_fn func) { cpg_handle_t h; - struct cpg_name gname; errno = EINVAL; - if (!name || !handle || !grpname) + if (!name) return -1; gname.length = snprintf(gname.value, @@ -99,6 +332,7 @@ cpg_start(const char *name, cpg_handle_t *handle, struct cpg_name *grpname) if (gname.length <= 0) return -1; + memset(&h, 0, sizeof(h)); if (cpg_initialize(&h, &my_callbacks) != CPG_OK) { perror("cpg_initialize"); @@ -110,18 +344,32 @@ cpg_start(const char *name, cpg_handle_t *handle, struct cpg_name *grpname) return -1; } - memcpy(handle, &h, sizeof(h)); - memcpy(grpname, &gname, sizeof(gname)); + + pthread_mutex_lock(&cpg_mutex); + + cpg_local_get(h, &my_node_id); + + pthread_create(&cpg_thread, NULL, cpg_dispatch_thread, NULL); + + memcpy(&cpg_handle, &h, sizeof(h)); + + req_callback_fn = func; + + pthread_mutex_unlock(&cpg_mutex); return 0; } int -cpg_end(cpg_handle_t h, struct cpg_name *gname) +cpg_stop(void) { - cpg_leave(h, gname); - cpg_finalize(h); + pthread_cancel(cpg_thread); + pthread_join(cpg_thread, NULL); + + cpg_leave(cpg_handle, &gname); + cpg_finalize(cpg_handle); + return 0; } @@ -136,42 +384,40 @@ go_away(int sig) } +void +request_callback(void *data, size_t len, uint32_t nodeid, uint32_t seqno) +{ + char *msg = data; + + printf("msg = %s\n", msg); + + cpg_send_reply("fail.", 7, nodeid, seqno); +} + + int main(int argc, char **argv) { - cpg_handle_t h; - struct cpg_name gname; - fd_set rfds; + uint32_t seqno = 0; int fd; + char *data; + size_t len; signal(SIGINT, go_away); - if (cpg_start("lhh1", &h, &gname) < 0) { + if (cpg_start("lhh1", request_callback) < 0) { perror("cpg_start"); return 1; } - if (cpg_fd_get(h, &fd) != CPG_OK) { - perror("cpg_fd_get"); - return -1; - } + cpg_send_req("hi", 2, &seqno); + cpg_wait_reply(&data, &len, seqno); - cpg_send(h, "hi", 2); - - while (please_quit != 1) { - FD_ZERO(&rfds); - FD_SET(fd, &rfds); - - if (select(fd+1, &rfds, NULL, NULL, NULL) < 0) - continue; - - cpg_dispatch(h, CPG_DISPATCH_ALL); - } + printf("%s\n", data); printf("going bye\n"); - cpg_leave(h, &gname); - cpg_finalize(h); + cpg_stop(); return 0; } diff --git a/server/virt.c b/server/virt.c index 35882b5..a67010c 100644 --- a/server/virt.c +++ b/server/virt.c @@ -177,7 +177,7 @@ vl_print(virt_list_t *vl) virt_state_t * -vl_find_name(virt_list_t *vl, char *name) +vl_find_name(virt_list_t *vl, const char *name) { int x; @@ -194,7 +194,7 @@ vl_find_name(virt_list_t *vl, char *name) virt_state_t * -vl_find_uuid(virt_list_t *vl, char *uuid) +vl_find_uuid(virt_list_t *vl, const char *uuid) { int x; diff --git a/server/virt.h b/server/virt.h index 9505542..7ebea7d 100644 --- a/server/virt.h +++ b/server/virt.h @@ -65,15 +65,15 @@ int vl_cmp(virt_list_t *left, virt_list_t *right); void vl_print(virt_list_t *vl); void vl_free(virt_list_t *old); -virt_state_t * vl_find_uuid(virt_list_t *vl, char *name); -virt_state_t * vl_find_name(virt_list_t *vl, char *name); +virt_state_t * vl_find_uuid(virt_list_t *vl, const char *name); +virt_state_t * vl_find_name(virt_list_t *vl, const char *name); typedef void ckpt_handle; -int ckpt_read(void *hp, char *secid, void *buf, size_t maxlen); +int ckpt_read(void *hp, const char *secid, void *buf, size_t maxlen); int ckpt_finish(void *hp); -int ckpt_write(void *hp, char *secid, void *buf, size_t maxlen); -void *ckpt_init(char *ckpt_name, int maxlen, int maxsec, int maxseclen, +int ckpt_write(void *hp, const char *secid, void *buf, size_t maxlen); +void *ckpt_init(const char *ckpt_name, int maxlen, int maxsec, int maxseclen, int timeout); diff --git a/server/vm_states.c b/server/vm_states.c index 351d24e..6b95bfa 100644 --- a/server/vm_states.c +++ b/server/vm_states.c @@ -245,8 +245,9 @@ ckpt_init(char *ckpt_name, int maxlen, int maxsec, if (err != SA_AIS_OK) { free(h); return NULL; - } else + } else { h->ck_ready = READY_MAGIC; + } if (ckpt_open(h, ckpt_name, maxlen, maxsec, maxseclen, timeout) < 0) { @@ -262,7 +263,7 @@ ckpt_init(char *ckpt_name, int maxlen, int maxsec, int -ckpt_write(void *hp, char *secid, void *buf, size_t maxlen) +ckpt_write(void *hp, const char *secid, void *buf, size_t maxlen) { ckpt_handle *h = (ckpt_handle *)hp; SaCkptIOVectorElementT iov = {SA_CKPT_DEFAULT_SECTION_ID, @@ -302,7 +303,7 @@ ckpt_write(void *hp, char *secid, void *buf, size_t maxlen) int -ckpt_read(void *hp, char *secid, void *buf, size_t maxlen) +ckpt_read(void *hp, const char *secid, void *buf, size_t maxlen) { ckpt_handle *h = (ckpt_handle *)hp; SaCkptIOVectorElementT iov = {SA_CKPT_DEFAULT_SECTION_ID,