Merge pull request #7 from lanconnected/master

Add keepalive ability to virt-serial and resolve several problems in its event listener, fix several imperfections in checkpoint plugin.
This commit is contained in:
Ryan McCabe 2017-05-22 14:07:05 -04:00 committed by GitHub
commit fe112ac52d
5 changed files with 163 additions and 259 deletions

View File

@ -78,76 +78,14 @@ static int
virt_list_update(virConnectPtr vp, virt_list_t **vl, int my_id)
{
virt_list_t *list = NULL;
list = vl_get(vp, my_id);
if (!list)
return -1;
if (*vl)
vl_free(*vl);
list = vl_get(vp, my_id);
*vl = list;
return 0;
}
static int
get_cman_ids(cman_handle_t ch, uint32_t *my_id, uint32_t *high_id)
{
int max_nodes;
int actual;
cman_node_t *nodes = NULL;
cman_node_t me;
uint32_t high = 0;
int ret = -1, x, _local = 0;
if (!my_id && !high_id)
return 0;
if (!ch) {
_local = 1;
ch = cman_init(NULL);
}
if (!ch)
if (!list)
return -1;
max_nodes = cman_get_node_count(ch);
if (max_nodes <= 0)
goto out;
if (my_id) {
memset(&me, 0, sizeof(me));
if (cman_get_node(ch, CMAN_NODEID_US, &me) < 0)
goto out;
*my_id = me.cn_nodeid;
}
if (!high_id) {
ret = 0;
goto out;
}
nodes = malloc(sizeof(cman_node_t) * max_nodes);
if (!nodes)
goto out;
memset(nodes, 0, sizeof(cman_node_t) * max_nodes);
if (cman_get_nodes(ch, max_nodes, &actual, nodes) < 0)
goto out;
for (x = 0; x < actual; x++)
if (nodes[x].cn_nodeid > high && nodes[x].cn_member)
high = nodes[x].cn_nodeid;
*high_id = high;
ret = 0;
out:
if (nodes)
free(nodes);
if (ch && _local)
cman_finish(ch);
return ret;
return 0;
}
@ -256,7 +194,7 @@ wait_domain(const char *vm_name, virConnectPtr vp, int timeout)
static int
cluster_virt_status(const char *vm_name, uint32_t *owner)
{
vm_state_t chk_state;
vm_state_t chk_state, temp_state;
virt_state_t *vs;
uint32_t me, high_id;
int ret = 0;
@ -264,9 +202,7 @@ cluster_virt_status(const char *vm_name, uint32_t *owner)
dbg_printf(80, "%s %s\n", __FUNCTION__, vm_name);
/* if we can't find the high ID, we can't do anything useful */
/* This should be cpg_get_ids() but it's segfaulting for some
reason :( */
if (get_cman_ids(NULL, &me, &high_id) != 0)
if (cpg_get_ids(&me, &high_id) != 0)
return 2;
if (use_uuid) {
@ -277,14 +213,40 @@ cluster_virt_status(const char *vm_name, uint32_t *owner)
if (!vs) {
ret = 2; /* not found locally */
temp_state.s_owner = 0;
temp_state.s_state = 0;
if (get_domain_state_ckpt(checkpoint_handle,
vm_name, &chk_state) < 0) {
if (me == high_id) {
dbg_printf(2, "High ID: Unknown VM\n");
ret = 3;
goto out;
}
} else if (me == chk_state.s_owner) {
/* <UVT> If domain has disappeared completely from libvirt (i.e., destroyed)
we'd end up with the checkpoing section containing its last state and last owner.
fence_virtd will freeze at the next status call, as no one will be willing to
return anything but 2. So we should delete corresponding section, but only if
we are high_id, because otherwise we don't know if the domain hasn't been started
on some other node. If checkpoint states us as an owner of the domain, but we
don't have it, we set s_state to a special value to let high_id know about
this situation. </UVT> */
dbg_printf(2, "I am an owner of unexisting domain, mangling field\n");
temp_state.s_owner = me;
temp_state.s_state = -1;
if (ckpt_write(checkpoint_handle, vm_name,
&temp_state, sizeof(vm_state_t)) < 0)
dbg_printf(2, "error storing in %s\n", __FUNCTION__);
}
if (me != high_id)
goto out;
if (get_domain_state_ckpt(checkpoint_handle,
vm_name, &chk_state)) {
dbg_printf(2, "High ID: Unknown VM\n");
ret = 3;
if ((chk_state.s_state == -1) || (temp_state.s_state == -1)) {
dbg_printf(2, "I am high id and state field is mangled, removing section\n");
ckpt_erase (checkpoint_handle, vm_name);
ret = 1;
goto out;
}
@ -301,7 +263,7 @@ cluster_virt_status(const char *vm_name, uint32_t *owner)
}
out:
dbg_printf(80, "%s %s\n", __FUNCTION__, vm_name);
dbg_printf(80, "%s %s %d\n", __FUNCTION__, vm_name, ret);
return ret;
}
@ -318,9 +280,10 @@ store_domains_by_name(void *hp, virt_list_t *vl)
if (!strcmp(DOMAIN0NAME, vl->vm_states[x].v_name))
continue;
dbg_printf(2, "Storing %s\n", vl->vm_states[x].v_name);
ckpt_write(hp, vl->vm_states[x].v_name,
if (ckpt_write(hp, vl->vm_states[x].v_name,
&vl->vm_states[x].v_state,
sizeof(vm_state_t));
sizeof(vm_state_t)) < 0)
dbg_printf(2, "error storing in %s\n", __FUNCTION__);
}
}
@ -337,9 +300,10 @@ store_domains_by_uuid(void *hp, virt_list_t *vl)
if (!strcmp(DOMAIN0UUID, vl->vm_states[x].v_uuid))
continue;
dbg_printf(2, "Storing %s\n", vl->vm_states[x].v_uuid);
ckpt_write(hp, vl->vm_states[x].v_uuid,
if (ckpt_write(hp, vl->vm_states[x].v_uuid,
&vl->vm_states[x].v_state,
sizeof(vm_state_t));
sizeof(vm_state_t)) < 0)
dbg_printf(2, "error storing in %s\n", __FUNCTION__);
}
}
@ -362,10 +326,16 @@ update_local_vms(void)
store_domains_by_uuid(checkpoint_handle, local_vms);
else
store_domains_by_name(checkpoint_handle, local_vms);
virConnectClose(vp);
if (vp) virConnectClose(vp);
}
/* <UVT>
Functions do_off and do_reboot should return error only if fencing
was actualy unsuccessful, i.e., domain was running and is still
running after fencing attempt. If domain is not running after fencing
(did not exist before or couldn't be started after), 0 should be returned
</UVT> */
static int
do_off(const char *vm_name)
{
@ -388,7 +358,7 @@ do_off(const char *vm_name)
if (!vdp) {
dbg_printf(2, "Nothing to do - domain does not exist\n");
return 1;
return 0;
}
if (((virDomainGetInfo(vdp, &vdi) == 0) &&
@ -451,7 +421,7 @@ do_reboot(const char *vm_name)
if (!vdp) {
dbg_printf(2, "[libvirt:REBOOT] Nothing to "
"do - domain does not exist\n");
return 1;
return 0;
}
if (((virDomainGetInfo(vdp, &vdi) == 0) &&
@ -531,6 +501,8 @@ out:
/*<UVT> This function must send reply from at least one node, otherwise
requesting fence_virtd would block forever in wait_cpt_reply </UVT> */
static void
do_real_work(void *data, size_t len, uint32_t nodeid, uint32_t seqno)
{
@ -564,17 +536,29 @@ do_real_work(void *data, size_t len, uint32_t nodeid, uint32_t seqno)
ret = 0;
break;
}
if (ret != 0) {
if (ret == 2) {
return;
}
if (ret == 1) {
ret = 0;
break;
}
/* Must be running locally to perform 'off' */
ret = do_off(req->vm_name);
break;
case FENCE_REBOOT:
ret = cluster_virt_status(req->vm_name, &owner);
if (ret != 0) {
if (ret == 3) {
ret = 0;
break;
}
if (ret == 2) {
return;
}
if (ret == 1) {
ret = 0;
break;
}
/* Must be running locally to perform 'reboot' */
ret = do_reboot(req->vm_name);
break;

View File

@ -46,6 +46,7 @@ struct wire_msg {
};
static uint32_t seqnum = 0, my_node_id = NODE_ID_NONE;
static uint32_t high_id_from_callback = NODE_ID_NONE;
static struct msg_queue_node *pending= NULL;
static cpg_handle_t cpg_handle;
static struct cpg_name gname;
@ -55,11 +56,12 @@ static pthread_cond_t cpg_cond = PTHREAD_COND_INITIALIZER;
static pthread_t cpg_thread = 0;
static request_callback_fn req_callback_fn;
/* <UVT> function cpg_membership_get is (probably) buggy and returns correct
count only before cpg_mcast_joined, subsequent calls set count to 0 </UVT> */
#if 0
int
cpg_get_ids(uint32_t *my_id, uint32_t *high_id)
{
#if 0
/* This is segfaulting for some reason */
struct cpg_address cpg_nodes[CPG_MEMBERS_MAX];
uint32_t high = my_node_id;
@ -89,12 +91,25 @@ cpg_get_ids(uint32_t *my_id, uint32_t *high_id)
*high_id = high;
return 0;
#endif
return -ENOSYS;
}
#endif
int
cpg_get_ids(uint32_t *my_id, uint32_t *high_id)
{
if (!my_id && !high_id)
return 0;
if (my_id)
*my_id = my_node_id;
if (!high_id)
return 0;
*high_id = high_id_from_callback;
return 0;
}
void
#ifdef HAVE_OPENAIS_CPG_H
@ -171,6 +186,7 @@ out_unlock:
pthread_mutex_unlock(&cpg_mutex);
}
void
#ifdef HAVE_OPENAIS_CPG_H
cpg_config_change(cpg_handle_t h,
@ -186,7 +202,17 @@ cpg_config_change(cpg_handle_t h,
const struct cpg_address *join, size_t joinlen)
#endif
{
/* Don't care */
int x;
int high = my_node_id;
for (x = 0; x < memberlen; x++) {
if (members[x].nodeid > high) {
high = members[x].nodeid;
}
}
high_id_from_callback = high;
return;
}

View File

@ -27,20 +27,6 @@
#define STREQ(a,b) (strcmp((a),(b)) == 0)
/* handle globals */
static int h_fd = -1;
static virEventHandleType h_event = 0;
static virEventHandleCallback h_cb = NULL;
static virFreeCallback h_ff = NULL;
static void *h_opaque = NULL;
/* timeout globals */
#define TIMEOUT_MS 1000
static int t_active = 0;
static int t_timeout = -1;
static virEventTimeoutCallback t_cb = NULL;
static virFreeCallback t_ff = NULL;
static void *t_opaque = NULL;
static pthread_t event_tid = 0;
static int run = 0;
@ -48,20 +34,6 @@ static int run = 0;
const char *eventToString(int event);
int myDomainEventCallback1(virConnectPtr conn, virDomainPtr dom,
int event, int detail, void *opaque);
int myEventAddHandleFunc(int fd, int event,
virEventHandleCallback cb,
void *opaque, virFreeCallback ff);
void myEventUpdateHandleFunc(int watch, int event);
int myEventRemoveHandleFunc(int watch);
int myEventAddTimeoutFunc(int timeout,
virEventTimeoutCallback cb,
void *opaque, virFreeCallback ff);
void myEventUpdateTimeoutFunc(int timer, int timout);
int myEventRemoveTimeoutFunc(int timer);
int myEventHandleTypeToPollEvent(virEventHandleType events);
virEventHandleType myPollEventToEventHandleType(int events);
void usage(const char *pname);
@ -70,103 +42,6 @@ struct domain_info {
virDomainEventType event;
};
/* EventImpl Functions */
int
myEventHandleTypeToPollEvent(virEventHandleType events)
{
int ret = 0;
if (events & VIR_EVENT_HANDLE_READABLE)
ret |= POLLIN;
if (events & VIR_EVENT_HANDLE_WRITABLE)
ret |= POLLOUT;
if (events & VIR_EVENT_HANDLE_ERROR)
ret |= POLLERR;
if (events & VIR_EVENT_HANDLE_HANGUP)
ret |= POLLHUP;
return ret;
}
virEventHandleType
myPollEventToEventHandleType(int events)
{
virEventHandleType ret = 0;
if (events & POLLIN)
ret |= VIR_EVENT_HANDLE_READABLE;
if (events & POLLOUT)
ret |= VIR_EVENT_HANDLE_WRITABLE;
if (events & POLLERR)
ret |= VIR_EVENT_HANDLE_ERROR;
if (events & POLLHUP)
ret |= VIR_EVENT_HANDLE_HANGUP;
return ret;
}
int
myEventAddHandleFunc(int fd, int event,
virEventHandleCallback cb,
void *opaque, virFreeCallback ff)
{
DEBUG1("Add handle %d %d %p %p %p", fd, event, cb, opaque, ff);
h_fd = fd;
h_event = myEventHandleTypeToPollEvent(event);
h_cb = cb;
h_opaque = opaque;
h_ff = ff;
return 0;
}
void
myEventUpdateHandleFunc(int fd, int event)
{
DEBUG1("Updated Handle %d %d", fd, event);
h_event = myEventHandleTypeToPollEvent(event);
return;
}
int
myEventRemoveHandleFunc(int fd)
{
DEBUG1("Removed Handle %d", fd);
h_fd = 0;
if (h_ff)
(h_ff) (h_opaque);
return 0;
}
int
myEventAddTimeoutFunc(int timeout,
virEventTimeoutCallback cb,
void *opaque, virFreeCallback ff)
{
DEBUG1("Adding Timeout %d %p %p", timeout, cb, opaque);
t_active = 1;
t_timeout = timeout;
t_cb = cb;
t_ff = ff;
t_opaque = opaque;
return 0;
}
void
myEventUpdateTimeoutFunc(int timer, int timeout)
{
/*DEBUG1("Timeout updated %d %d", timer, timeout); */
t_timeout = timeout;
}
int
myEventRemoveTimeoutFunc(int timer)
{
DEBUG1("Timeout removed %d", timer);
t_active = 0;
if (t_ff)
(t_ff) (t_opaque);
return 0;
}
static int
is_in_directory(const char *dir, const char *pathspec)
{
@ -395,6 +270,30 @@ struct event_args {
int wake_fd;
};
void
connectClose(virConnectPtr conn ATTRIBUTE_UNUSED,
int reason,
void *opaque ATTRIBUTE_UNUSED)
{
switch (reason) {
case VIR_CONNECT_CLOSE_REASON_ERROR:
dbg_printf(2, "Connection closed due to I/O error\n");
break;
case VIR_CONNECT_CLOSE_REASON_EOF:
dbg_printf(2, "Connection closed due to end of file\n");
break;
case VIR_CONNECT_CLOSE_REASON_KEEPALIVE:
dbg_printf(2, "Connection closed due to keepalive timeout\n");
break;
case VIR_CONNECT_CLOSE_REASON_CLIENT:
dbg_printf(2, "Connection closed due to client request\n");
break;
default:
dbg_printf(2, "Connection closed due to unknown reason\n");
break;
};
run = 0;
}
int
myDomainEventCallback1(virConnectPtr conn,
@ -425,7 +324,6 @@ event_thread(void *arg)
struct event_args *args = (struct event_args *)arg;
virConnectPtr dconn = NULL;
int callback1ret = -1;
int sts;
dbg_printf(3, "Libvirt event listener starting\n");
if (args->uri)
@ -434,13 +332,10 @@ event_thread(void *arg)
dbg_printf(3," * Socket path: %s\n", args->path);
dbg_printf(3," * Mode: %s\n", args->mode ? "VMChannel" : "Serial");
top:
virEventRegisterImpl(myEventAddHandleFunc,
myEventUpdateHandleFunc,
myEventRemoveHandleFunc,
myEventAddTimeoutFunc,
myEventUpdateTimeoutFunc,
myEventRemoveTimeoutFunc);
if (virEventRegisterDefaultImpl() < 0) {
dbg_printf(1, "Failed to register default event impl\n");
goto out;
}
dconn = virConnectOpen(args->uri);
if (!dconn) {
@ -448,6 +343,8 @@ top:
goto out;
}
virConnectRegisterCloseCallback(dconn, connectClose, NULL, NULL);
DEBUG0("Registering domain event cbs");
registerExisting(dconn, args->path, args->mode);
@ -455,43 +352,14 @@ top:
callback1ret =
virConnectDomainEventRegister(dconn, myDomainEventCallback1, arg, NULL);
if (callback1ret == 0) {
if (callback1ret != -1) {
if (virConnectSetKeepAlive(dconn, 5, 5) < 0) {
dbg_printf(1, "Failed to start keepalive protocol\n");
run = 0;
}
while (run) {
struct pollfd pfd = {
.fd = h_fd,
.events = h_event,
.revents = 0
};
sts = poll(&pfd, 1, TIMEOUT_MS);
/* We are assuming timeout of 0 here - so execute every time */
if (t_cb && t_active) {
t_cb(t_timeout, t_opaque);
}
if (sts == 0) {
/* DEBUG0("Poll timeout"); */
continue;
}
if (sts < 0) {
DEBUG0("Poll failed");
continue;
}
if (pfd.revents & POLLHUP) {
DEBUG0("Reset by peer");
virConnectDomainEventDeregister(dconn, myDomainEventCallback1);
if (dconn && virConnectClose(dconn) < 0)
dbg_printf(1, "error closing libvirt connection\n");
DEBUG0("Attempting to reinitialize libvirt connection");
goto top;
}
if (h_cb) {
h_cb(0, h_fd,
myPollEventToEventHandleType(pfd.revents & h_event),
h_opaque);
if (virEventRunDefaultImpl() < 0) {
dbg_printf(1, "RunDefaultImpl Failed\n");
}
}

View File

@ -73,6 +73,7 @@ typedef void ckpt_handle;
int ckpt_read(void *hp, const char *secid, void *buf, size_t maxlen);
int ckpt_finish(void *hp);
int ckpt_write(void *hp, const char *secid, void *buf, size_t maxlen);
int ckpt_erase(void *hp, const char *secid);
void *ckpt_init(const char *ckpt_name, int maxlen, int maxsec, int maxseclen,
int timeout);

View File

@ -32,6 +32,7 @@
#include <unistd.h>
#include <stdio.h>
#include <assert.h>
#include "xvm.h"
typedef struct {
uint32_t ck_ready;
@ -189,10 +190,10 @@ ckpt_open(ckpt_handle *h, const char *ckpt_name, int maxsize,
attrs.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
attrs.checkpointSize = (SaSizeT)maxsize;
attrs.retentionDuration = SA_TIME_ONE_HOUR;
attrs.retentionDuration = SA_TIME_ONE_MINUTE;
attrs.maxSections = maxsec;
attrs.maxSectionSize = (SaSizeT)maxsecsize;
attrs.maxSectionIdSize = (SaSizeT)32;
attrs.maxSectionIdSize = (SaSizeT)MAX_DOMAINNAME_LENGTH;
flags = SA_CKPT_CHECKPOINT_READ |
SA_CKPT_CHECKPOINT_WRITE |
@ -329,6 +330,30 @@ ckpt_read(void *hp, const char *secid, void *buf, size_t maxlen)
}
int
ckpt_erase(void *hp, const char *secid)
{
ckpt_handle *h = (ckpt_handle *)hp;
SaAisErrorT err;
SaCkptSectionIdT sectionId;
VALIDATE(h);
sectionId.id = (uint8_t *)secid;
sectionId.idLen = strlen(secid);
err = saCkptSectionDelete(h->ck_checkpoint, &sectionId);
if (err == SA_AIS_OK)
saCkptCheckpointSynchronize(h->ck_checkpoint,
h->ck_timeout);
errno = ais_to_posix(err);
if (errno)
return -1;
return 0;
}
int
ckpt_finish(void *hp)
{