2009-09-15 23:40:41 +04:00
# include <config.h>
# include <stdio.h>
# include <sys/types.h>
# include <stdint.h>
# include <malloc.h>
# include <signal.h>
# include <unistd.h>
# include <sys/select.h>
# include <string.h>
# include <errno.h>
# include <time.h>
# include <sys/uio.h>
# include <list.h>
2009-09-17 04:07:46 +04:00
# include <pthread.h>
2009-09-15 23:40:41 +04:00
2017-05-28 04:19:41 +03:00
# include <corosync/cpg.h>
2009-09-17 04:07:46 +04:00
2017-05-28 04:19:41 +03:00
# include "debug.h"
# include "virt.h"
# include "cpg.h"
2009-09-17 04:07:46 +04:00
2017-05-28 04:19:41 +03:00
# define NODE_ID_NONE ((uint32_t) -1)
2009-09-17 04:07:46 +04:00
struct msg_queue_node {
list_head ( ) ;
uint32_t seqno ;
# define STATE_CLEAR 0
# define STATE_MESSAGE 1
uint32_t state ;
void * msg ;
size_t msglen ;
} ;
struct wire_msg {
# define TYPE_REQUEST 0
# define TYPE_REPLY 1
2017-05-28 04:19:41 +03:00
# define TYPE_STORE_VM 2
2009-09-17 04:07:46 +04:00
uint32_t type ;
uint32_t seqno ;
uint32_t target ;
uint32_t pad ;
char data [ 0 ] ;
} ;
2017-05-28 04:19:41 +03:00
static uint32_t seqnum = 0 ;
static struct msg_queue_node * pending = NULL ;
2009-09-17 04:07:46 +04:00
static cpg_handle_t cpg_handle ;
static struct cpg_name gname ;
static pthread_mutex_t cpg_mutex = PTHREAD_MUTEX_INITIALIZER ;
static pthread_cond_t cpg_cond = PTHREAD_COND_INITIALIZER ;
static pthread_t cpg_thread = 0 ;
2017-05-28 04:19:41 +03:00
static pthread_mutex_t cpg_ids_mutex = PTHREAD_MUTEX_INITIALIZER ;
static uint32_t my_node_id = NODE_ID_NONE ;
static uint32_t high_id_from_callback = NODE_ID_NONE ;
2009-09-17 04:07:46 +04:00
2017-05-28 04:19:41 +03:00
static request_callback_fn req_callback_fn ;
static request_callback_fn store_callback_fn ;
static confchange_callback_fn conf_leave_fn ;
static confchange_callback_fn conf_join_fn ;
2009-09-17 04:07:46 +04:00
2016-04-25 21:12:53 +03:00
int
cpg_get_ids ( uint32_t * my_id , uint32_t * high_id )
{
if ( ! my_id & & ! high_id )
2017-05-28 04:19:41 +03:00
return - 1 ;
2009-09-17 04:07:46 +04:00
2017-05-28 04:19:41 +03:00
pthread_mutex_lock ( & cpg_ids_mutex ) ;
2016-04-25 21:12:53 +03:00
if ( my_id )
* my_id = my_node_id ;
2009-09-17 04:07:46 +04:00
2017-05-28 04:19:41 +03:00
if ( high_id )
* high_id = high_id_from_callback ;
pthread_mutex_unlock ( & cpg_ids_mutex ) ;
2016-04-25 21:12:53 +03:00
return 0 ;
}
2009-09-15 23:40:41 +04:00
void
cpg_deliver_func ( cpg_handle_t h ,
const struct cpg_name * group_name ,
uint32_t nodeid ,
uint32_t pid ,
void * msg ,
size_t msglen )
{
2009-09-17 04:07:46 +04:00
struct msg_queue_node * n ;
struct wire_msg * m = msg ;
int x , found ;
pthread_mutex_lock ( & cpg_mutex ) ;
if ( m - > type = = TYPE_REPLY ) {
/* Reply to a request we sent */
found = 0 ;
2017-05-28 04:19:41 +03:00
2009-09-17 04:07:46 +04:00
list_for ( & pending , n , x ) {
if ( m - > seqno ! = n - > seqno )
continue ;
if ( m - > target ! = my_node_id )
continue ;
found = 1 ;
break ;
}
if ( ! found )
goto out_unlock ;
/* Copy our message in to a buffer */
n - > msglen = msglen - sizeof ( * m ) ;
if ( ! n - > msglen ) {
/* XXX do what? */
}
n - > msg = malloc ( n - > msglen ) ;
if ( ! n - > msg ) {
goto out_unlock ;
}
n - > state = STATE_MESSAGE ;
memcpy ( n - > msg , ( char * ) msg + sizeof ( * m ) , n - > msglen ) ;
list_remove ( & pending , n ) ;
list_insert ( & pending , n ) ;
2017-05-28 04:19:41 +03:00
dbg_printf ( 2 , " Seqnum %d replied; removing from list \n " , n - > seqno ) ;
2009-09-17 04:07:46 +04:00
pthread_cond_broadcast ( & cpg_cond ) ;
goto out_unlock ;
}
pthread_mutex_unlock ( & cpg_mutex ) ;
if ( m - > type = = TYPE_REQUEST ) {
req_callback_fn ( & m - > data , msglen - sizeof ( * m ) ,
nodeid , m - > seqno ) ;
}
2017-05-28 04:19:41 +03:00
if ( m - > type = = TYPE_STORE_VM ) {
store_callback_fn ( & m - > data , msglen - sizeof ( * m ) ,
nodeid , m - > seqno ) ;
}
2009-09-17 04:07:46 +04:00
return ;
out_unlock :
pthread_mutex_unlock ( & cpg_mutex ) ;
2009-09-15 23:40:41 +04:00
}
2016-04-25 21:12:53 +03:00
2009-09-15 23:40:41 +04:00
void
cpg_config_change ( cpg_handle_t h ,
2017-05-28 04:19:41 +03:00
const struct cpg_name * group_name ,
2009-09-15 23:40:41 +04:00
const struct cpg_address * members , size_t memberlen ,
const struct cpg_address * left , size_t leftlen ,
const struct cpg_address * join , size_t joinlen )
{
2016-04-25 21:12:53 +03:00
int x ;
2017-05-28 04:19:41 +03:00
int high ;
pthread_mutex_lock ( & cpg_ids_mutex ) ;
high = my_node_id ;
2016-04-25 21:12:53 +03:00
for ( x = 0 ; x < memberlen ; x + + ) {
2017-05-28 04:19:41 +03:00
if ( members [ x ] . nodeid > high )
2016-04-25 21:12:53 +03:00
high = members [ x ] . nodeid ;
}
high_id_from_callback = high ;
2017-05-28 04:19:41 +03:00
pthread_mutex_unlock ( & cpg_ids_mutex ) ;
2016-04-25 21:12:53 +03:00
2017-05-28 04:19:41 +03:00
if ( joinlen > 0 )
conf_join_fn ( join , joinlen ) ;
if ( leftlen > 0 )
conf_leave_fn ( left , leftlen ) ;
2009-09-15 23:40:41 +04:00
}
static cpg_callbacks_t my_callbacks = {
. cpg_deliver_fn = cpg_deliver_func ,
. cpg_confchg_fn = cpg_config_change
} ;
int
2009-09-17 04:07:46 +04:00
cpg_send_req ( void * data , size_t len , uint32_t * seqno )
{
struct iovec iov ;
struct msg_queue_node * n ;
struct wire_msg * m ;
size_t msgsz = sizeof ( * m ) + len ;
int ret ;
n = malloc ( sizeof ( * n ) ) ;
if ( ! n )
return - 1 ;
2017-05-28 04:19:41 +03:00
2009-09-17 04:07:46 +04:00
m = malloc ( msgsz ) ;
2017-05-28 04:19:41 +03:00
if ( ! m ) {
free ( n ) ;
2009-09-17 04:07:46 +04:00
return - 1 ;
2017-05-28 04:19:41 +03:00
}
2009-09-17 04:07:46 +04:00
/* only incremented on send */
n - > state = STATE_CLEAR ;
n - > msg = NULL ;
n - > msglen = 0 ;
pthread_mutex_lock ( & cpg_mutex ) ;
list_insert ( & pending , n ) ;
n - > seqno = + + seqnum ;
m - > seqno = seqnum ;
* seqno = seqnum ;
pthread_mutex_unlock ( & cpg_mutex ) ;
m - > type = TYPE_REQUEST ; /* XXX swab? */
m - > target = NODE_ID_NONE ;
memcpy ( & m - > data , data , len ) ;
iov . iov_base = m ;
iov . iov_len = msgsz ;
ret = cpg_mcast_joined ( cpg_handle , CPG_TYPE_AGREED , & iov , 1 ) ;
free ( m ) ;
2017-05-28 04:19:41 +03:00
if ( ret = = CS_OK )
2009-09-17 04:07:46 +04:00
return 0 ;
return - 1 ;
}
int
2017-05-28 04:19:41 +03:00
cpg_send_vm_state ( virt_state_t * vs )
{
struct iovec iov ;
struct msg_queue_node * n ;
struct wire_msg * m ;
size_t msgsz = sizeof ( * m ) + sizeof ( * vs ) ;
int ret ;
n = calloc ( 1 , ( sizeof ( * n ) ) ) ;
if ( ! n )
return - 1 ;
m = calloc ( 1 , msgsz ) ;
if ( ! m ) {
free ( n ) ;
return - 1 ;
}
n - > state = STATE_MESSAGE ;
n - > msg = NULL ;
n - > msglen = 0 ;
pthread_mutex_lock ( & cpg_mutex ) ;
list_insert ( & pending , n ) ;
pthread_mutex_unlock ( & cpg_mutex ) ;
m - > type = TYPE_STORE_VM ;
m - > target = NODE_ID_NONE ;
memcpy ( & m - > data , vs , sizeof ( * vs ) ) ;
iov . iov_base = m ;
iov . iov_len = msgsz ;
ret = cpg_mcast_joined ( cpg_handle , CPG_TYPE_AGREED , & iov , 1 ) ;
free ( m ) ;
if ( ret = = CS_OK )
return 0 ;
return - 1 ;
}
int
cpg_send_reply ( void * data , size_t len , uint32_t nodeid , uint32_t seqno )
2009-09-15 23:40:41 +04:00
{
struct iovec iov ;
2009-09-17 04:07:46 +04:00
struct wire_msg * m ;
size_t msgsz = sizeof ( * m ) + len ;
int ret ;
m = malloc ( msgsz ) ;
if ( ! m )
return - 1 ;
/* only incremented on send */
m - > seqno = seqno ;
m - > type = TYPE_REPLY ; /* XXX swab? */
m - > target = nodeid ;
memcpy ( & m - > data , data , len ) ;
iov . iov_base = m ;
iov . iov_len = msgsz ;
ret = cpg_mcast_joined ( cpg_handle , CPG_TYPE_AGREED , & iov , 1 ) ;
free ( m ) ;
2017-05-28 04:19:41 +03:00
if ( ret = = CS_OK )
2009-09-17 04:07:46 +04:00
return 0 ;
2017-05-28 04:19:41 +03:00
2009-09-17 04:07:46 +04:00
return - 1 ;
}
int
cpg_wait_reply ( void * * data , size_t * len , uint32_t seqno )
{
struct msg_queue_node * n ;
int x , found = 0 ;
while ( ! found ) {
found = 0 ;
pthread_mutex_lock ( & cpg_mutex ) ;
pthread_cond_wait ( & cpg_cond , & cpg_mutex ) ;
list_for ( & pending , n , x ) {
if ( n - > seqno ! = seqno )
continue ;
if ( n - > state ! = STATE_MESSAGE )
continue ;
found = 1 ;
2017-05-28 04:19:41 +03:00
goto out ;
2009-09-17 04:07:46 +04:00
}
pthread_mutex_unlock ( & cpg_mutex ) ;
}
2017-05-28 04:19:41 +03:00
out :
2009-09-17 04:07:46 +04:00
list_remove ( & pending , n ) ;
pthread_mutex_unlock ( & cpg_mutex ) ;
* data = n - > msg ;
* len = n - > msglen ;
free ( n ) ;
return 0 ;
}
static void *
cpg_dispatch_thread ( void * arg )
{
2017-05-28 04:19:41 +03:00
cpg_dispatch ( cpg_handle , CS_DISPATCH_BLOCKING ) ;
2009-09-15 23:40:41 +04:00
2009-09-17 04:07:46 +04:00
return NULL ;
2009-09-15 23:40:41 +04:00
}
int
2017-05-28 04:19:41 +03:00
cpg_start ( const char * name ,
request_callback_fn req_cb_fn ,
request_callback_fn store_cb_fn ,
confchange_callback_fn join_fn ,
confchange_callback_fn leave_fn )
2009-09-15 23:40:41 +04:00
{
cpg_handle_t h ;
2017-05-28 04:19:41 +03:00
int ret ;
2009-09-15 23:40:41 +04:00
errno = EINVAL ;
2009-09-17 04:07:46 +04:00
if ( ! name )
2009-09-15 23:40:41 +04:00
return - 1 ;
2017-05-29 20:54:51 +03:00
ret = snprintf ( gname . value , sizeof ( gname . value ) , " %s " , name ) ;
2017-05-28 04:19:41 +03:00
if ( ret < = 0 )
2009-09-15 23:40:41 +04:00
return - 1 ;
2017-05-28 04:19:41 +03:00
if ( ret > = sizeof ( gname . value ) ) {
errno = ENAMETOOLONG ;
2009-09-15 23:40:41 +04:00
return - 1 ;
2017-05-28 04:19:41 +03:00
}
gname . length = ret ;
2009-09-17 04:07:46 +04:00
2009-09-15 23:40:41 +04:00
memset ( & h , 0 , sizeof ( h ) ) ;
2017-05-28 04:19:41 +03:00
if ( cpg_initialize ( & h , & my_callbacks ) ! = CS_OK ) {
2009-09-15 23:40:41 +04:00
perror ( " cpg_initialize " ) ;
return - 1 ;
}
2017-05-28 04:19:41 +03:00
if ( cpg_join ( h , & gname ) ! = CS_OK ) {
2009-09-15 23:40:41 +04:00
perror ( " cpg_join " ) ;
return - 1 ;
}
2009-09-17 04:07:46 +04:00
cpg_local_get ( h , & my_node_id ) ;
2017-05-28 04:19:41 +03:00
dbg_printf ( 2 , " My CPG nodeid is %d \n " , my_node_id ) ;
2009-09-17 04:07:46 +04:00
2017-05-28 04:19:41 +03:00
pthread_mutex_lock ( & cpg_mutex ) ;
2009-09-17 04:07:46 +04:00
pthread_create ( & cpg_thread , NULL , cpg_dispatch_thread , NULL ) ;
memcpy ( & cpg_handle , & h , sizeof ( h ) ) ;
2017-05-28 04:19:41 +03:00
req_callback_fn = req_cb_fn ;
store_callback_fn = store_cb_fn ;
conf_join_fn = join_fn ;
conf_leave_fn = leave_fn ;
2009-09-17 04:07:46 +04:00
pthread_mutex_unlock ( & cpg_mutex ) ;
2009-09-15 23:40:41 +04:00
return 0 ;
}
int
2009-09-17 04:07:46 +04:00
cpg_stop ( void )
2009-09-15 23:40:41 +04:00
{
2009-09-17 04:07:46 +04:00
pthread_cancel ( cpg_thread ) ;
pthread_join ( cpg_thread , NULL ) ;
cpg_leave ( cpg_handle , & gname ) ;
cpg_finalize ( cpg_handle ) ;
2009-09-15 23:40:41 +04:00
return 0 ;
}