425 lines
7.1 KiB
C
Raw Normal View History

#include <config.h>
#include <stdio.h>
#include <sys/types.h>
#include <stdint.h>
#include <malloc.h>
#include <signal.h>
#include <unistd.h>
#include <sys/select.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/uio.h>
#include <list.h>
#include <pthread.h>
#ifdef HAVE_OPENAIS_CPG_H
#include <openais/cpg.h>
#else
#ifdef HAVE_COROSYNC_CPG_H
#include <corosync/cpg.h>
#endif
#endif
#include "checkpoint.h"
#define NODE_ID_NONE ((uint32_t)-1)
struct msg_queue_node {
list_head();
uint32_t seqno;
#define STATE_CLEAR 0
#define STATE_MESSAGE 1
uint32_t state;
void *msg;
size_t msglen;
};
struct wire_msg {
#define TYPE_REQUEST 0
#define TYPE_REPLY 1
uint32_t type;
uint32_t seqno;
uint32_t target;
uint32_t pad;
char data[0];
};
static uint32_t seqnum = 0, my_node_id = NODE_ID_NONE;
static struct msg_queue_node *pending= NULL;
static cpg_handle_t cpg_handle;
static struct cpg_name gname;
static pthread_mutex_t cpg_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cpg_cond = PTHREAD_COND_INITIALIZER;
static pthread_t cpg_thread = 0;
static request_callback_fn req_callback_fn;
int
cpg_get_ids(uint32_t *my_id, uint32_t *high_id)
{
#if 0
/* This is segfaulting for some reason */
struct cpg_address cpg_nodes[CPG_MEMBERS_MAX];
uint32_t high = my_node_id;
int count = CPG_MEMBERS_MAX, x;
if (!my_id && !high_id)
return 0;
if (my_id)
*my_id = my_node_id;
if (!high_id)
return 0;
memset(&cpg_nodes, 0, sizeof(cpg_nodes));
if (cpg_membership_get(cpg_handle, &gname,
cpg_nodes, &count) != CPG_OK)
return -1;
for (x = 0; x < count; x++) {
if (cpg_nodes[x].nodeid > high) {
high = cpg_nodes[x].nodeid;
}
}
*high_id = high;
return 0;
#endif
return -ENOSYS;
}
void
#ifdef HAVE_OPENAIS_CPG_H
cpg_deliver_func(cpg_handle_t h,
struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
int msglen)
#else
cpg_deliver_func(cpg_handle_t h,
const struct cpg_name *group_name,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msglen)
#endif
{
struct msg_queue_node *n;
struct wire_msg *m = msg;
int x, found;
pthread_mutex_lock(&cpg_mutex);
if (m->type == TYPE_REPLY) {
/* Reply to a request we sent */
found = 0;
list_for(&pending, n, x) {
if (m->seqno != n->seqno)
continue;
if (m->target != my_node_id)
continue;
found = 1;
break;
}
if (!found)
goto out_unlock;
/* Copy our message in to a buffer */
n->msglen = msglen - sizeof(*m);
if (!n->msglen) {
/* XXX do what? */
}
n->msg = malloc(n->msglen);
if (!n->msg) {
goto out_unlock;
}
n->state = STATE_MESSAGE;
memcpy(n->msg, (char *)msg + sizeof(*m), n->msglen);
list_remove(&pending, n);
list_insert(&pending, n);
#if 0
printf("Seqnum %d replied; removing from list",
n->seqno);
#endif
pthread_cond_broadcast(&cpg_cond);
goto out_unlock;
}
pthread_mutex_unlock(&cpg_mutex);
if (m->type == TYPE_REQUEST) {
req_callback_fn(&m->data, msglen - sizeof(*m),
nodeid, m->seqno);
}
return;
out_unlock:
pthread_mutex_unlock(&cpg_mutex);
}
void
#ifdef HAVE_OPENAIS_CPG_H
cpg_config_change(cpg_handle_t h,
struct cpg_name *group_name,
struct cpg_address *members, int memberlen,
struct cpg_address *left, int leftlen,
struct cpg_address *join, int joinlen)
#else
cpg_config_change(cpg_handle_t h,
const struct cpg_name *group_name,
const struct cpg_address *members, size_t memberlen,
const struct cpg_address *left, size_t leftlen,
const struct cpg_address *join, size_t joinlen)
#endif
{
/* Don't care */
return;
}
static cpg_callbacks_t my_callbacks = {
.cpg_deliver_fn = cpg_deliver_func,
.cpg_confchg_fn = cpg_config_change
};
int
cpg_send_req(void *data, size_t len, uint32_t *seqno)
{
struct iovec iov;
struct msg_queue_node *n;
struct wire_msg *m;
size_t msgsz = sizeof(*m) + len;
int ret;
n = malloc(sizeof(*n));
if (!n)
return -1;
m = malloc(msgsz);
if (!m)
return -1;
/* only incremented on send */
n->state = STATE_CLEAR;
n->msg = NULL;
n->msglen = 0;
pthread_mutex_lock(&cpg_mutex);
list_insert(&pending, n);
n->seqno = ++seqnum;
m->seqno = seqnum;
*seqno = seqnum;
pthread_mutex_unlock(&cpg_mutex);
m->type = TYPE_REQUEST; /* XXX swab? */
m->target = NODE_ID_NONE;
memcpy(&m->data, data, len);
iov.iov_base = m;
iov.iov_len = msgsz;
ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
free(m);
if (ret == CPG_OK)
return 0;
return -1;
}
int
cpg_send_reply(void *data, size_t len, uint32_t nodeid,
uint32_t seqno)
{
struct iovec iov;
struct wire_msg *m;
size_t msgsz = sizeof(*m) + len;
int ret;
m = malloc(msgsz);
if (!m)
return -1;
/* only incremented on send */
m->seqno = seqno;
m->type = TYPE_REPLY; /* XXX swab? */
m->target = nodeid;
memcpy(&m->data, data, len);
iov.iov_base = m;
iov.iov_len = msgsz;
ret = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, &iov, 1);
free(m);
if (ret == CPG_OK)
return 0;
return -1;
}
int
cpg_wait_reply(void **data, size_t *len, uint32_t seqno)
{
struct msg_queue_node *n;
int x, found = 0;
while (!found) {
found = 0;
pthread_mutex_lock(&cpg_mutex);
pthread_cond_wait(&cpg_cond, &cpg_mutex);
list_for(&pending, n, x) {
if (n->seqno != seqno)
continue;
if (n->state != STATE_MESSAGE)
continue;
found = 1;
break;
}
pthread_mutex_unlock(&cpg_mutex);
}
list_remove(&pending, n);
pthread_mutex_unlock(&cpg_mutex);
*data = n->msg;
*len = n->msglen;
free(n);
return 0;
}
static void *
cpg_dispatch_thread(void *arg)
{
cpg_dispatch(cpg_handle, CPG_DISPATCH_BLOCKING);
return NULL;
}
int
cpg_start(const char *name, request_callback_fn func)
{
cpg_handle_t h;
errno = EINVAL;
if (!name)
return -1;
gname.length = snprintf(gname.value,
sizeof(gname.value), name);
if (gname.length >= sizeof(gname.value)) {
errno = ENAMETOOLONG;
return -1;
}
if (gname.length <= 0)
return -1;
memset(&h, 0, sizeof(h));
if (cpg_initialize(&h, &my_callbacks) != CPG_OK) {
perror("cpg_initialize");
return -1;
}
if (cpg_join(h, &gname) != CPG_OK) {
perror("cpg_join");
return -1;
}
pthread_mutex_lock(&cpg_mutex);
cpg_local_get(h, &my_node_id);
pthread_create(&cpg_thread, NULL, cpg_dispatch_thread, NULL);
memcpy(&cpg_handle, &h, sizeof(h));
req_callback_fn = func;
pthread_mutex_unlock(&cpg_mutex);
return 0;
}
int
cpg_stop(void)
{
pthread_cancel(cpg_thread);
pthread_join(cpg_thread, NULL);
cpg_leave(cpg_handle, &gname);
cpg_finalize(cpg_handle);
return 0;
}
#ifdef STANDALONE
int please_quit = 0;
void
go_away(int sig)
{
please_quit = 1;
}
void
request_callback(void *data, size_t len, uint32_t nodeid, uint32_t seqno)
{
char *msg = data;
printf("msg = %s\n", msg);
cpg_send_reply("fail.", 7, nodeid, seqno);
}
int
main(int argc, char **argv)
{
uint32_t seqno = 0;
int fd;
char *data;
size_t len;
signal(SIGINT, go_away);
if (cpg_start("lhh1", request_callback) < 0) {
perror("cpg_start");
return 1;
}
cpg_send_req("hi", 2, &seqno);
cpg_wait_reply(&data, &len, seqno);
printf("%s\n", data);
printf("going bye\n");
cpg_stop();
return 0;
}
#endif