2009-09-15 23:40:41 +04:00
# include <config.h>
# include <stdio.h>
# include <sys/types.h>
# include <stdint.h>
# include <malloc.h>
# include <signal.h>
# include <unistd.h>
# include <sys/select.h>
# include <string.h>
# include <errno.h>
# include <time.h>
# include <sys/uio.h>
# include <list.h>
2009-09-17 04:07:46 +04:00
# include <pthread.h>
2009-09-15 23:40:41 +04:00
# ifdef HAVE_OPENAIS_CPG_H
# include <openais/cpg.h>
# else
# ifdef HAVE_COROSYNC_CPG_H
# include <corosync/cpg.h>
# endif
# endif
2009-09-17 04:07:46 +04:00
# include "checkpoint.h"
# define NODE_ID_NONE ((uint32_t)-1)
struct msg_queue_node {
list_head ( ) ;
uint32_t seqno ;
# define STATE_CLEAR 0
# define STATE_MESSAGE 1
uint32_t state ;
void * msg ;
size_t msglen ;
} ;
struct wire_msg {
# define TYPE_REQUEST 0
# define TYPE_REPLY 1
uint32_t type ;
uint32_t seqno ;
uint32_t target ;
uint32_t pad ;
char data [ 0 ] ;
} ;
static uint32_t seqnum = 0 , my_node_id = NODE_ID_NONE ;
2016-04-25 21:12:53 +03:00
static uint32_t high_id_from_callback = NODE_ID_NONE ;
2009-09-17 04:07:46 +04:00
static struct msg_queue_node * pending = NULL ;
static cpg_handle_t cpg_handle ;
static struct cpg_name gname ;
static pthread_mutex_t cpg_mutex = PTHREAD_MUTEX_INITIALIZER ;
static pthread_cond_t cpg_cond = PTHREAD_COND_INITIALIZER ;
static pthread_t cpg_thread = 0 ;
static request_callback_fn req_callback_fn ;
2016-04-25 21:12:53 +03:00
/* <UVT> function cpg_membership_get is (probably) buggy and returns correct
count only before cpg_mcast_joined , subsequent calls set count to 0 < / UVT > */
#if 0
2009-09-17 04:07:46 +04:00
int
cpg_get_ids ( uint32_t * my_id , uint32_t * high_id )
{
/* This is segfaulting for some reason */
struct cpg_address cpg_nodes [ CPG_MEMBERS_MAX ] ;
uint32_t high = my_node_id ;
int count = CPG_MEMBERS_MAX , x ;
if ( ! my_id & & ! high_id )
return 0 ;
if ( my_id )
* my_id = my_node_id ;
if ( ! high_id )
return 0 ;
memset ( & cpg_nodes , 0 , sizeof ( cpg_nodes ) ) ;
if ( cpg_membership_get ( cpg_handle , & gname ,
cpg_nodes , & count ) ! = CPG_OK )
return - 1 ;
for ( x = 0 ; x < count ; x + + ) {
if ( cpg_nodes [ x ] . nodeid > high ) {
high = cpg_nodes [ x ] . nodeid ;
}
}
* high_id = high ;
return 0 ;
}
2016-04-25 21:12:53 +03:00
# endif
2009-09-17 04:07:46 +04:00
2016-04-25 21:12:53 +03:00
int
cpg_get_ids ( uint32_t * my_id , uint32_t * high_id )
{
if ( ! my_id & & ! high_id )
return 0 ;
2009-09-17 04:07:46 +04:00
2016-04-25 21:12:53 +03:00
if ( my_id )
* my_id = my_node_id ;
2009-09-17 04:07:46 +04:00
2016-04-25 21:12:53 +03:00
if ( ! high_id )
return 0 ;
* high_id = high_id_from_callback ;
return 0 ;
}
2009-09-15 23:40:41 +04:00
void
# ifdef HAVE_OPENAIS_CPG_H
cpg_deliver_func ( cpg_handle_t h ,
struct cpg_name * group_name ,
uint32_t nodeid ,
uint32_t pid ,
void * msg ,
int msglen )
# else
cpg_deliver_func ( cpg_handle_t h ,
const struct cpg_name * group_name ,
uint32_t nodeid ,
uint32_t pid ,
void * msg ,
size_t msglen )
# endif
{
2009-09-17 04:07:46 +04:00
struct msg_queue_node * n ;
struct wire_msg * m = msg ;
int x , found ;
pthread_mutex_lock ( & cpg_mutex ) ;
if ( m - > type = = TYPE_REPLY ) {
/* Reply to a request we sent */
found = 0 ;
list_for ( & pending , n , x ) {
if ( m - > seqno ! = n - > seqno )
continue ;
if ( m - > target ! = my_node_id )
continue ;
found = 1 ;
break ;
}
if ( ! found )
goto out_unlock ;
/* Copy our message in to a buffer */
n - > msglen = msglen - sizeof ( * m ) ;
if ( ! n - > msglen ) {
/* XXX do what? */
}
n - > msg = malloc ( n - > msglen ) ;
if ( ! n - > msg ) {
goto out_unlock ;
}
n - > state = STATE_MESSAGE ;
memcpy ( n - > msg , ( char * ) msg + sizeof ( * m ) , n - > msglen ) ;
list_remove ( & pending , n ) ;
list_insert ( & pending , n ) ;
#if 0
printf ( " Seqnum %d replied; removing from list " ,
n - > seqno ) ;
# endif
pthread_cond_broadcast ( & cpg_cond ) ;
goto out_unlock ;
}
pthread_mutex_unlock ( & cpg_mutex ) ;
if ( m - > type = = TYPE_REQUEST ) {
req_callback_fn ( & m - > data , msglen - sizeof ( * m ) ,
nodeid , m - > seqno ) ;
}
return ;
out_unlock :
pthread_mutex_unlock ( & cpg_mutex ) ;
2009-09-15 23:40:41 +04:00
}
2016-04-25 21:12:53 +03:00
2009-09-15 23:40:41 +04:00
void
# ifdef HAVE_OPENAIS_CPG_H
cpg_config_change ( cpg_handle_t h ,
struct cpg_name * group_name ,
struct cpg_address * members , int memberlen ,
struct cpg_address * left , int leftlen ,
struct cpg_address * join , int joinlen )
# else
cpg_config_change ( cpg_handle_t h ,
const struct cpg_name * group_name ,
const struct cpg_address * members , size_t memberlen ,
const struct cpg_address * left , size_t leftlen ,
const struct cpg_address * join , size_t joinlen )
# endif
{
2016-04-25 21:12:53 +03:00
int x ;
int high = my_node_id ;
for ( x = 0 ; x < memberlen ; x + + ) {
if ( members [ x ] . nodeid > high ) {
high = members [ x ] . nodeid ;
}
}
high_id_from_callback = high ;
2009-09-15 23:40:41 +04:00
return ;
}
static cpg_callbacks_t my_callbacks = {
. cpg_deliver_fn = cpg_deliver_func ,
. cpg_confchg_fn = cpg_config_change
} ;
int
2009-09-17 04:07:46 +04:00
cpg_send_req ( void * data , size_t len , uint32_t * seqno )
{
struct iovec iov ;
struct msg_queue_node * n ;
struct wire_msg * m ;
size_t msgsz = sizeof ( * m ) + len ;
int ret ;
n = malloc ( sizeof ( * n ) ) ;
if ( ! n )
return - 1 ;
m = malloc ( msgsz ) ;
if ( ! m )
return - 1 ;
/* only incremented on send */
n - > state = STATE_CLEAR ;
n - > msg = NULL ;
n - > msglen = 0 ;
pthread_mutex_lock ( & cpg_mutex ) ;
list_insert ( & pending , n ) ;
n - > seqno = + + seqnum ;
m - > seqno = seqnum ;
* seqno = seqnum ;
pthread_mutex_unlock ( & cpg_mutex ) ;
m - > type = TYPE_REQUEST ; /* XXX swab? */
m - > target = NODE_ID_NONE ;
memcpy ( & m - > data , data , len ) ;
iov . iov_base = m ;
iov . iov_len = msgsz ;
ret = cpg_mcast_joined ( cpg_handle , CPG_TYPE_AGREED , & iov , 1 ) ;
free ( m ) ;
if ( ret = = CPG_OK )
return 0 ;
return - 1 ;
}
int
cpg_send_reply ( void * data , size_t len , uint32_t nodeid ,
uint32_t seqno )
2009-09-15 23:40:41 +04:00
{
struct iovec iov ;
2009-09-17 04:07:46 +04:00
struct wire_msg * m ;
size_t msgsz = sizeof ( * m ) + len ;
int ret ;
m = malloc ( msgsz ) ;
if ( ! m )
return - 1 ;
/* only incremented on send */
m - > seqno = seqno ;
m - > type = TYPE_REPLY ; /* XXX swab? */
m - > target = nodeid ;
memcpy ( & m - > data , data , len ) ;
iov . iov_base = m ;
iov . iov_len = msgsz ;
ret = cpg_mcast_joined ( cpg_handle , CPG_TYPE_AGREED , & iov , 1 ) ;
free ( m ) ;
if ( ret = = CPG_OK )
return 0 ;
return - 1 ;
}
int
cpg_wait_reply ( void * * data , size_t * len , uint32_t seqno )
{
struct msg_queue_node * n ;
int x , found = 0 ;
while ( ! found ) {
found = 0 ;
pthread_mutex_lock ( & cpg_mutex ) ;
pthread_cond_wait ( & cpg_cond , & cpg_mutex ) ;
list_for ( & pending , n , x ) {
if ( n - > seqno ! = seqno )
continue ;
if ( n - > state ! = STATE_MESSAGE )
continue ;
found = 1 ;
break ;
}
pthread_mutex_unlock ( & cpg_mutex ) ;
}
list_remove ( & pending , n ) ;
pthread_mutex_unlock ( & cpg_mutex ) ;
* data = n - > msg ;
* len = n - > msglen ;
free ( n ) ;
return 0 ;
}
static void *
cpg_dispatch_thread ( void * arg )
{
cpg_dispatch ( cpg_handle , CPG_DISPATCH_BLOCKING ) ;
2009-09-15 23:40:41 +04:00
2009-09-17 04:07:46 +04:00
return NULL ;
2009-09-15 23:40:41 +04:00
}
int
2009-09-17 04:07:46 +04:00
cpg_start ( const char * name , request_callback_fn func )
2009-09-15 23:40:41 +04:00
{
cpg_handle_t h ;
errno = EINVAL ;
2009-09-17 04:07:46 +04:00
if ( ! name )
2009-09-15 23:40:41 +04:00
return - 1 ;
gname . length = snprintf ( gname . value ,
sizeof ( gname . value ) , name ) ;
if ( gname . length > = sizeof ( gname . value ) ) {
errno = ENAMETOOLONG ;
return - 1 ;
}
if ( gname . length < = 0 )
return - 1 ;
2009-09-17 04:07:46 +04:00
2009-09-15 23:40:41 +04:00
memset ( & h , 0 , sizeof ( h ) ) ;
if ( cpg_initialize ( & h , & my_callbacks ) ! = CPG_OK ) {
perror ( " cpg_initialize " ) ;
return - 1 ;
}
if ( cpg_join ( h , & gname ) ! = CPG_OK ) {
perror ( " cpg_join " ) ;
return - 1 ;
}
2009-09-17 04:07:46 +04:00
pthread_mutex_lock ( & cpg_mutex ) ;
cpg_local_get ( h , & my_node_id ) ;
pthread_create ( & cpg_thread , NULL , cpg_dispatch_thread , NULL ) ;
memcpy ( & cpg_handle , & h , sizeof ( h ) ) ;
req_callback_fn = func ;
pthread_mutex_unlock ( & cpg_mutex ) ;
2009-09-15 23:40:41 +04:00
return 0 ;
}
int
2009-09-17 04:07:46 +04:00
cpg_stop ( void )
2009-09-15 23:40:41 +04:00
{
2009-09-17 04:07:46 +04:00
pthread_cancel ( cpg_thread ) ;
pthread_join ( cpg_thread , NULL ) ;
cpg_leave ( cpg_handle , & gname ) ;
cpg_finalize ( cpg_handle ) ;
2009-09-15 23:40:41 +04:00
return 0 ;
}
# ifdef STANDALONE
int please_quit = 0 ;
void
go_away ( int sig )
{
please_quit = 1 ;
}
2009-09-17 04:07:46 +04:00
void
request_callback ( void * data , size_t len , uint32_t nodeid , uint32_t seqno )
{
char * msg = data ;
printf ( " msg = %s \n " , msg ) ;
cpg_send_reply ( " fail. " , 7 , nodeid , seqno ) ;
}
2009-09-15 23:40:41 +04:00
int
main ( int argc , char * * argv )
{
2009-09-17 04:07:46 +04:00
uint32_t seqno = 0 ;
2009-09-15 23:40:41 +04:00
int fd ;
2009-09-17 04:07:46 +04:00
char * data ;
size_t len ;
2009-09-15 23:40:41 +04:00
signal ( SIGINT , go_away ) ;
2009-09-17 04:07:46 +04:00
if ( cpg_start ( " lhh1 " , request_callback ) < 0 ) {
2009-09-15 23:40:41 +04:00
perror ( " cpg_start " ) ;
return 1 ;
}
2009-09-17 04:07:46 +04:00
cpg_send_req ( " hi " , 2 , & seqno ) ;
cpg_wait_reply ( & data , & len , seqno ) ;
2009-09-15 23:40:41 +04:00
2009-09-17 04:07:46 +04:00
printf ( " %s \n " , data ) ;
2009-09-15 23:40:41 +04:00
printf ( " going bye \n " ) ;
2009-09-17 04:07:46 +04:00
cpg_stop ( ) ;
2009-09-15 23:40:41 +04:00
return 0 ;
}
# endif