2007-06-10 21:02:09 +04:00
/*
Unix SMB / CIFS implementation .
Samba internal messaging functions
Copyright ( C ) 2007 by Volker Lendecke
Copyright ( C ) 2007 by Andrew Tridgell
2009-08-07 14:09:21 +04:00
2007-06-10 21:02:09 +04:00
This program is free software ; you can redistribute it and / or modify
it under the terms of the GNU General Public License as published by
2007-07-09 23:25:36 +04:00
the Free Software Foundation ; either version 3 of the License , or
2007-06-10 21:02:09 +04:00
( at your option ) any later version .
2009-08-07 14:09:21 +04:00
2007-06-10 21:02:09 +04:00
This program is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU General Public License for more details .
2009-08-07 14:09:21 +04:00
2007-06-10 21:02:09 +04:00
You should have received a copy of the GNU General Public License
2007-07-10 04:52:41 +04:00
along with this program . If not , see < http : //www.gnu.org/licenses/>.
2007-06-10 21:02:09 +04:00
*/
2016-04-08 17:14:33 +03:00
# include "replace.h"
2017-01-09 10:17:02 +03:00
# include <tevent.h>
2011-05-05 13:25:29 +04:00
# include "util_tdb.h"
2012-02-15 14:22:45 +04:00
# include "serverid.h"
2012-04-04 13:56:06 +04:00
# include "ctdbd_conn.h"
2014-05-06 14:21:42 +04:00
# include "system/select.h"
2015-10-12 16:57:34 +03:00
# include "lib/util/sys_rw_data.h"
2015-05-25 09:50:35 +03:00
# include "lib/util/iov_buf.h"
2016-04-19 17:14:30 +03:00
# include "lib/util/select.h"
2016-04-08 17:14:33 +03:00
# include "lib/util/debug.h"
# include "lib/util/talloc_stack.h"
# include "lib/util/genrand.h"
# include "lib/util/fault.h"
2017-01-09 10:17:02 +03:00
# include "lib/util/dlinklist.h"
# include "lib/util/tevent_unix.c"
# include "lib/util/sys_rw.h"
# include "lib/util/blocking.h"
2017-06-09 09:48:21 +03:00
# include "ctdb/include/ctdb_protocol.h"
2007-06-10 21:02:09 +04:00
/* paths to these include files come from --with-ctdb= in configure */
2011-08-30 19:02:54 +04:00
2015-05-19 08:01:55 +03:00
/*
 * One registered message handler: the srvid it was registered for, the
 * function to invoke for matching messages and the opaque pointer that
 * is handed back to that function on every call.
 */
struct ctdbd_srvid_cb {
	/* srvid that incoming messages are matched against */
	uint64_t srvid;

	/* handler; may be NULL, in which case matching messages are skipped */
	int (*cb)(uint32_t src_vnn,
		  uint32_t dst_vnn,
		  uint64_t dst_srvid,
		  const uint8_t *msg,
		  size_t msglen,
		  void *private_data);

	/* passed through unchanged as the last argument of cb */
	void *private_data;
};
2017-01-09 10:17:02 +03:00
/* Defined elsewhere in this file; only pointers to them are needed here. */
struct ctdb_pkt_send_state;
struct ctdb_pkt_recv_state;

/*
 * State of one connection to the ctdbd daemon.
 */
struct ctdbd_connection {
	/* last request id handed out by ctdbd_next_reqid() */
	uint32_t reqid;
	/* our own node number, filled in by get_cluster_vnn() */
	uint32_t our_vnn;
	/* random srvid registered while the connection is initialised */
	uint64_t rand_srvid;
	/* talloc array of registered message handlers */
	struct ctdbd_srvid_cb *callbacks;
	/* socket connected to ctdbd */
	int fd;
	/* poll timeout handed to ctdb_read_packet(); -1 skips the poll */
	int timeout;

	/* For async connections, enabled via ctdbd_setup_fde() */
	struct tevent_fd *fde;

	/* State to track in-progress read */
	struct ctdb_read_state {
		/* Receive buffer for the initial packet length */
		uint32_t msglen;

		/* iovec state for current read */
		struct iovec iov;
		struct iovec *iovs;
		int iovcnt;

		/* allocated receive buffer based on packet length */
		struct ctdb_req_header *hdr;
	} read_state;

	/* Lists of pending async reads and writes */
	struct ctdb_pkt_recv_state *recv_list;
	struct ctdb_pkt_send_state *send_list;
};
2017-01-09 10:17:02 +03:00
/* tevent fd handler for async connections; defined further down. */
static void ctdbd_async_socket_handler(struct tevent_context *ev,
				       struct tevent_fd *fde,
				       uint16_t flags,
				       void *private_data);
static bool ctdbd_conn_has_async_sends ( struct ctdbd_connection * conn )
{
return ( conn - > send_list ! = NULL ) ;
}
static bool ctdbd_conn_has_async_reqs ( struct ctdbd_connection * conn )
{
return ( conn - > fde ! = NULL ) ;
}
2011-10-26 12:56:32 +04:00
static uint32_t ctdbd_next_reqid ( struct ctdbd_connection * conn )
{
conn - > reqid + = 1 ;
if ( conn - > reqid = = 0 ) {
conn - > reqid + = 1 ;
}
return conn - > reqid ;
}
2015-10-03 06:31:52 +03:00
/* Synchronous control request/reply; defined further down. */
static int ctdbd_control(struct ctdbd_connection *conn,
			 uint32_t vnn,
			 uint32_t opcode,
			 uint64_t srvid,
			 uint32_t flags,
			 TDB_DATA data,
			 TALLOC_CTX *mem_ctx,
			 TDB_DATA *outdata,
			 int32_t *cstatus);
2007-06-10 21:02:09 +04:00
/*
 * Log a fatal communication error with the ctdbd daemon and terminate
 * the process right away with exit status 1.
 */
static void cluster_fatal(const char *why)
{
	DEBUG(0, (" cluster fatal event: %s - exiting immediately \n ", why));

	/*
	 * smb_panic() is deliberately avoided: writing a core file would
	 * delay the exit. The process id has to be released immediately
	 * so that someone else can take over without getting sharing
	 * violations.
	 */
	_exit(1);
}
2007-08-29 15:46:44 +04:00
/*
*
*/
static void ctdb_packet_dump ( struct ctdb_req_header * hdr )
{
2012-02-07 19:41:25 +04:00
if ( DEBUGLEVEL < 11 ) {
2007-08-29 15:46:44 +04:00
return ;
}
2012-02-07 19:41:25 +04:00
DEBUGADD ( 11 , ( " len=%d, magic=%x, vers=%d, gen=%d, op=%d, reqid=%d \n " ,
2007-08-29 15:46:44 +04:00
( int ) hdr - > length , ( int ) hdr - > ctdb_magic ,
( int ) hdr - > ctdb_version , ( int ) hdr - > generation ,
( int ) hdr - > operation , ( int ) hdr - > reqid ) ) ;
}
2007-06-10 21:02:09 +04:00
/*
* Register a srvid with ctdbd
*/
2015-10-03 06:42:05 +03:00
int register_with_ctdbd ( struct ctdbd_connection * conn , uint64_t srvid ,
int ( * cb ) ( uint32_t src_vnn , uint32_t dst_vnn ,
uint64_t dst_srvid ,
const uint8_t * msg , size_t msglen ,
void * private_data ) ,
void * private_data )
2007-06-10 21:02:09 +04:00
{
2016-04-21 14:31:39 +03:00
int ret ;
int32_t cstatus ;
2015-05-19 08:01:55 +03:00
size_t num_callbacks ;
struct ctdbd_srvid_cb * tmp ;
2014-11-21 18:20:10 +03:00
2016-04-13 19:47:46 +03:00
ret = ctdbd_control_local ( conn , CTDB_CONTROL_REGISTER_SRVID , srvid , 0 ,
tdb_null , NULL , NULL , & cstatus ) ;
2015-10-03 06:00:32 +03:00
if ( ret ! = 0 ) {
2015-10-03 06:42:05 +03:00
return ret ;
2014-11-21 18:20:10 +03:00
}
2015-05-19 08:01:55 +03:00
num_callbacks = talloc_array_length ( conn - > callbacks ) ;
2014-11-21 18:20:10 +03:00
2015-05-19 08:01:55 +03:00
tmp = talloc_realloc ( conn , conn - > callbacks , struct ctdbd_srvid_cb ,
num_callbacks + 1 ) ;
2014-11-21 18:20:10 +03:00
if ( tmp = = NULL ) {
2015-10-03 06:42:05 +03:00
return ENOMEM ;
2014-11-21 18:20:10 +03:00
}
2015-05-19 08:01:55 +03:00
conn - > callbacks = tmp ;
conn - > callbacks [ num_callbacks ] = ( struct ctdbd_srvid_cb ) {
2015-05-19 08:05:24 +03:00
. srvid = srvid , . cb = cb , . private_data = private_data
2015-05-19 08:01:55 +03:00
} ;
2014-11-21 18:20:10 +03:00
2015-10-03 06:42:05 +03:00
return 0 ;
2014-11-21 18:20:10 +03:00
}
2015-06-23 17:59:00 +03:00
static int ctdbd_msg_call_back ( struct ctdbd_connection * conn ,
2015-10-29 08:36:30 +03:00
struct ctdb_req_message_old * msg )
2015-05-19 16:07:33 +03:00
{
2016-04-19 17:02:49 +03:00
uint32_t msg_len ;
2015-05-19 16:07:33 +03:00
size_t i , num_callbacks ;
2015-06-02 23:30:06 +03:00
msg_len = msg - > hdr . length ;
2015-10-29 08:36:30 +03:00
if ( msg_len < offsetof ( struct ctdb_req_message_old , data ) ) {
2016-04-19 17:02:49 +03:00
DBG_DEBUG ( " len % " PRIu32 " too small \n " , msg_len ) ;
2015-06-23 17:59:00 +03:00
return 0 ;
2015-06-02 23:30:06 +03:00
}
2015-10-29 08:36:30 +03:00
msg_len - = offsetof ( struct ctdb_req_message_old , data ) ;
2015-06-02 23:30:06 +03:00
if ( msg_len < msg - > datalen ) {
2016-04-19 17:02:49 +03:00
DBG_DEBUG ( " msg_len=% " PRIu32 " < msg->datalen=% " PRIu32 " \n " ,
msg_len , msg - > datalen ) ;
2015-06-23 17:59:00 +03:00
return 0 ;
2015-06-02 23:30:06 +03:00
}
2015-05-19 16:07:33 +03:00
num_callbacks = talloc_array_length ( conn - > callbacks ) ;
for ( i = 0 ; i < num_callbacks ; i + + ) {
struct ctdbd_srvid_cb * cb = & conn - > callbacks [ i ] ;
if ( ( cb - > srvid = = msg - > srvid ) & & ( cb - > cb ! = NULL ) ) {
2015-06-23 17:59:00 +03:00
int ret ;
ret = cb - > cb ( msg - > hdr . srcnode , msg - > hdr . destnode ,
msg - > srvid , msg - > data , msg - > datalen ,
cb - > private_data ) ;
if ( ret ! = 0 ) {
return ret ;
}
2015-05-19 16:07:33 +03:00
}
}
2015-06-23 17:59:00 +03:00
return 0 ;
2015-05-19 16:07:33 +03:00
}
2007-06-10 21:02:09 +04:00
/*
* get our vnn from the cluster
*/
2015-10-03 06:42:05 +03:00
static int get_cluster_vnn ( struct ctdbd_connection * conn , uint32_t * vnn )
2007-06-10 21:02:09 +04:00
{
int32_t cstatus = - 1 ;
2015-10-03 06:00:32 +03:00
int ret ;
2016-04-13 19:47:46 +03:00
ret = ctdbd_control_local ( conn , CTDB_CONTROL_GET_PNN , 0 , 0 ,
tdb_null , NULL , NULL , & cstatus ) ;
2015-10-03 06:00:32 +03:00
if ( ret ! = 0 ) {
DEBUG ( 1 , ( " ctdbd_control failed: %s \n " , strerror ( ret ) ) ) ;
2015-10-03 06:42:05 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
* vnn = ( uint32_t ) cstatus ;
2015-10-03 06:42:05 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
2009-11-16 14:03:24 +03:00
/*
* Are we active ( i . e . not banned or stopped ? )
*/
static bool ctdbd_working ( struct ctdbd_connection * conn , uint32_t vnn )
{
int32_t cstatus = - 1 ;
TDB_DATA outdata ;
2015-10-29 09:22:48 +03:00
struct ctdb_node_map_old * m ;
2015-10-03 06:05:15 +03:00
bool ok = false ;
2016-04-08 16:59:08 +03:00
uint32_t i ;
int ret ;
2009-11-16 14:03:24 +03:00
2016-04-13 19:47:46 +03:00
ret = ctdbd_control_local ( conn , CTDB_CONTROL_GET_NODEMAP , 0 , 0 ,
tdb_null , talloc_tos ( ) , & outdata , & cstatus ) ;
2015-10-03 06:06:59 +03:00
if ( ret ! = 0 ) {
DEBUG ( 1 , ( " ctdbd_control failed: %s \n " , strerror ( ret ) ) ) ;
2013-01-31 14:02:52 +04:00
return false ;
2009-11-16 14:03:24 +03:00
}
if ( ( cstatus ! = 0 ) | | ( outdata . dptr = = NULL ) ) {
DEBUG ( 2 , ( " Received invalid ctdb data \n " ) ) ;
return false ;
}
2015-10-29 09:22:48 +03:00
m = ( struct ctdb_node_map_old * ) outdata . dptr ;
2009-11-16 14:03:24 +03:00
for ( i = 0 ; i < m - > num ; i + + ) {
if ( vnn = = m - > nodes [ i ] . pnn ) {
break ;
}
}
if ( i = = m - > num ) {
DEBUG ( 2 , ( " Did not find ourselves (node %d) in nodemap \n " ,
( int ) vnn ) ) ;
goto fail ;
}
2016-06-23 09:17:51 +03:00
if ( ( m - > nodes [ i ] . flags & NODE_FLAGS_INACTIVE ) ! = 0 ) {
2009-11-16 14:03:24 +03:00
DEBUG ( 2 , ( " Node has status %x, not active \n " ,
( int ) m - > nodes [ i ] . flags ) ) ;
goto fail ;
}
2015-10-03 06:05:15 +03:00
ok = true ;
2009-11-16 14:03:24 +03:00
fail :
TALLOC_FREE ( outdata . dptr ) ;
2015-10-03 06:05:15 +03:00
return ok ;
2009-11-16 14:03:24 +03:00
}
2012-07-30 17:31:59 +04:00
uint32_t ctdbd_vnn ( const struct ctdbd_connection * conn )
2007-06-10 21:02:09 +04:00
{
return conn - > our_vnn ;
}
/*
 * Open a unix stream socket and connect it to the ctdbd socket at
 * sockname.
 *
 * On success 0 is returned and the connected descriptor is stored in
 * *pfd. On failure an errno value is returned (ENAMETOOLONG if sockname
 * does not fit into sun_path), no descriptor is left open and *pfd is
 * not written.
 */
static int ctdbd_connect(const char *sockname, int *pfd)
{
	struct sockaddr_un sun_addr = { .sun_family = AF_UNIX };
	size_t copied;
	int saved_errno;
	int sock;

	sock = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock == -1) {
		saved_errno = errno;
		DEBUG(3, (" Could not create socket: %s \n ",
			  strerror(saved_errno)));
		return saved_errno;
	}

	/* strlcpy returns the length it tried to create; >= size means truncation */
	copied = strlcpy(sun_addr.sun_path, sockname,
			 sizeof(sun_addr.sun_path));
	if (copied >= sizeof(sun_addr.sun_path)) {
		DEBUG(3, (" %s: Socket name too long: %s \n ", __func__,
			  sockname));
		close(sock);
		return ENAMETOOLONG;
	}

	if (connect(sock, (struct sockaddr *)(void *)&sun_addr,
		    sizeof(struct sockaddr_un)) == -1) {
		/* grab errno before DEBUG/close get a chance to change it */
		saved_errno = errno;
		DEBUG(1, (" connect(%s) failed: %s \n ", sockname,
			  strerror(saved_errno)));
		close(sock);
		return saved_errno;
	}

	*pfd = sock;
	return 0;
}
2015-09-26 00:36:35 +03:00
/*
 * Read one complete ctdb packet from fd.
 *
 * If timeout is not -1, the descriptor is first polled for readability
 * for at most that long. The packet starts with its own 32 bit total
 * length; a buffer of that size is allocated on mem_ctx and the rest of
 * the packet is read into it.
 *
 * Returns 0 and stores the packet in *result on success. Otherwise an
 * errno value is returned and *result is not written:
 *   ETIMEDOUT  the poll timed out
 *   EIO        end of file, unexpected poll result, or a length smaller
 *              than the packet header
 *   ENOMEM     the buffer could not be allocated
 *   other      errno from poll/read
 */
static int ctdb_read_packet(int fd, int timeout, TALLOC_CTX *mem_ctx,
			    struct ctdb_req_header **result)
{
	struct ctdb_req_header *req;
	uint32_t msglen;
	ssize_t nread;

	if (timeout != -1) {
		struct pollfd pfd = { .fd = fd, .events = POLLIN };
		int ret;

		ret = sys_poll_intr(&pfd, 1, timeout);
		if (ret == -1) {
			return errno;
		}
		if (ret == 0) {
			return ETIMEDOUT;
		}
		if (ret != 1) {
			return EIO;
		}
	}

	nread = read_data(fd, &msglen, sizeof(msglen));
	if (nread == -1) {
		return errno;
	}
	if (nread == 0) {
		return EIO;
	}

	if (msglen < sizeof(struct ctdb_req_header)) {
		return EIO;
	}

	req = talloc_size(mem_ctx, msglen);
	if (req == NULL) {
		return ENOMEM;
	}
	/*
	 * The name must be exactly the type name (no padding blanks),
	 * otherwise talloc type checks on this buffer can never match.
	 */
	talloc_set_name_const(req, "struct ctdb_req_header");

	/* the length word was consumed above; store it and read the rest */
	req->length = msglen;

	nread = read_data(fd, ((char *)req) + sizeof(msglen),
			  msglen - sizeof(msglen));
	if (nread == -1) {
		/*
		 * Save errno before freeing: the free may change it, and
		 * a clobbered value of 0 would look like success to the
		 * caller while *result is still unset.
		 */
		int err = errno;

		TALLOC_FREE(req);
		return err;
	}
	if (nread == 0) {
		TALLOC_FREE(req);
		return EIO;
	}

	*result = req;
	return 0;
}
2007-06-10 21:02:09 +04:00
/*
* Read a full ctdbd request . If we have a messaging context , defer incoming
* messages that might come in between .
*/
2015-06-19 12:47:01 +03:00
static int ctdb_read_req ( struct ctdbd_connection * conn , uint32_t reqid ,
TALLOC_CTX * mem_ctx , struct ctdb_req_header * * result )
2007-06-10 21:02:09 +04:00
{
struct ctdb_req_header * hdr ;
2015-07-11 13:23:22 +03:00
int ret ;
2007-06-10 21:02:09 +04:00
2007-08-29 15:46:44 +04:00
next_pkt :
2007-06-10 21:02:09 +04:00
2015-09-26 00:36:35 +03:00
ret = ctdb_read_packet ( conn - > fd , conn - > timeout , mem_ctx , & hdr ) ;
2015-07-11 13:23:22 +03:00
if ( ret ! = 0 ) {
DEBUG ( 0 , ( " ctdb_read_packet failed: %s \n " , strerror ( ret ) ) ) ;
2007-06-10 21:02:09 +04:00
cluster_fatal ( " ctdbd died \n " ) ;
}
2012-02-07 19:41:25 +04:00
DEBUG ( 11 , ( " Received ctdb packet \n " ) ) ;
2007-08-29 15:46:44 +04:00
ctdb_packet_dump ( hdr ) ;
2007-06-10 21:02:09 +04:00
if ( hdr - > operation = = CTDB_REQ_MESSAGE ) {
2015-10-29 08:36:30 +03:00
struct ctdb_req_message_old * msg = ( struct ctdb_req_message_old * ) hdr ;
2007-06-10 21:02:09 +04:00
2015-06-23 18:04:59 +03:00
ret = ctdbd_msg_call_back ( conn , msg ) ;
if ( ret ! = 0 ) {
TALLOC_FREE ( hdr ) ;
return ret ;
}
2007-06-10 21:02:09 +04:00
TALLOC_FREE ( hdr ) ;
2007-08-29 15:46:44 +04:00
goto next_pkt ;
2007-06-10 21:02:09 +04:00
}
2011-10-26 12:58:25 +04:00
if ( ( reqid ! = 0 ) & & ( hdr - > reqid ! = reqid ) ) {
2007-06-10 21:02:09 +04:00
/* we got the wrong reply */
DEBUG ( 0 , ( " Discarding mismatched ctdb reqid %u should have "
" been %u \n " , hdr - > reqid , reqid ) ) ;
TALLOC_FREE ( hdr ) ;
2011-10-27 17:21:29 +04:00
goto next_pkt ;
2007-06-10 21:02:09 +04:00
}
2014-05-06 14:21:42 +04:00
* result = talloc_move ( mem_ctx , & hdr ) ;
2007-06-10 21:02:09 +04:00
2015-06-19 12:47:01 +03:00
return 0 ;
2007-06-10 21:02:09 +04:00
}
2017-01-09 10:17:02 +03:00
/**
* This prepares conn for handling async requests
* */
int ctdbd_setup_fde ( struct ctdbd_connection * conn , struct tevent_context * ev )
2014-05-06 14:21:42 +04:00
{
2017-04-25 18:32:43 +03:00
int ret ;
ret = set_blocking ( conn - > fd , false ) ;
if ( ret = = - 1 ) {
return errno ;
}
2017-01-09 10:17:02 +03:00
conn - > fde = tevent_add_fd ( ev ,
conn ,
conn - > fd ,
TEVENT_FD_READ ,
ctdbd_async_socket_handler ,
conn ) ;
if ( conn - > fde = = NULL ) {
return ENOMEM ;
2015-10-05 16:57:42 +03:00
}
2017-01-09 10:17:02 +03:00
2014-05-06 14:21:42 +04:00
return 0 ;
}
2017-01-09 10:17:02 +03:00
/* talloc destructor for a connection; defined elsewhere in this file. */
static int ctdbd_connection_destructor(struct ctdbd_connection *c);
2007-06-10 21:02:09 +04:00
/*
* Get us a ctdbd connection
*/
2016-07-09 09:48:49 +03:00
static int ctdbd_init_connection_internal ( TALLOC_CTX * mem_ctx ,
const char * sockname , int timeout ,
struct ctdbd_connection * conn )
2007-06-10 21:02:09 +04:00
{
2014-05-06 14:21:42 +04:00
int ret ;
2007-06-10 21:02:09 +04:00
2015-09-26 02:09:23 +03:00
conn - > timeout = timeout ;
2015-09-26 00:36:35 +03:00
if ( conn - > timeout = = 0 ) {
conn - > timeout = - 1 ;
}
2016-04-19 22:40:40 +03:00
ret = ctdbd_connect ( sockname , & conn - > fd ) ;
2014-05-06 14:21:42 +04:00
if ( ret ! = 0 ) {
2015-06-26 14:17:01 +03:00
DEBUG ( 1 , ( " ctdbd_connect failed: %s \n " , strerror ( ret ) ) ) ;
2016-07-09 09:48:49 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
2014-05-06 14:21:42 +04:00
talloc_set_destructor ( conn , ctdbd_connection_destructor ) ;
2007-06-10 21:02:09 +04:00
2015-10-03 06:42:05 +03:00
ret = get_cluster_vnn ( conn , & conn - > our_vnn ) ;
if ( ret ! = 0 ) {
DEBUG ( 10 , ( " get_cluster_vnn failed: %s \n " , strerror ( ret ) ) ) ;
2016-07-09 09:48:49 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
2009-11-16 14:03:24 +03:00
if ( ! ctdbd_working ( conn , conn - > our_vnn ) ) {
DEBUG ( 2 , ( " Node is not working, can not connect \n " ) ) ;
2016-07-09 09:48:49 +03:00
return EIO ;
2009-11-16 14:03:24 +03:00
}
2007-06-10 21:02:09 +04:00
generate_random_buffer ( ( unsigned char * ) & conn - > rand_srvid ,
sizeof ( conn - > rand_srvid ) ) ;
2015-10-03 06:42:05 +03:00
ret = register_with_ctdbd ( conn , conn - > rand_srvid , NULL , NULL ) ;
if ( ret ! = 0 ) {
2007-06-10 21:02:09 +04:00
DEBUG ( 5 , ( " Could not register random srvid: %s \n " ,
2015-10-03 06:42:05 +03:00
strerror ( ret ) ) ) ;
2016-07-09 09:48:49 +03:00
return ret ;
}
return 0 ;
}
/*
 * Allocate a zeroed connection on mem_ctx and initialise it.
 *
 * On success 0 is returned and the connection is stored in *pconn. On
 * failure the connection is freed again, *pconn is not written and an
 * errno value is returned (ENOMEM if the allocation itself failed).
 */
int ctdbd_init_connection(TALLOC_CTX *mem_ctx,
			  const char *sockname, int timeout,
			  struct ctdbd_connection **pconn)
{
	struct ctdbd_connection *conn;
	int ret;

	conn = talloc_zero(mem_ctx, struct ctdbd_connection);
	if (conn == NULL) {
		DEBUG(0, (" talloc failed \n "));
		return ENOMEM;
	}

	ret = ctdbd_init_connection_internal(mem_ctx, sockname, timeout, conn);
	if (ret != 0) {
		DBG_ERR(" ctdbd_init_connection_internal failed (%s) \n ",
			strerror(ret));
		TALLOC_FREE(conn);
		return ret;
	}

	*pconn = conn;
	return 0;
}
2016-07-09 09:59:09 +03:00
/*
 * Re-initialise an existing connection in place: run the connection
 * destructor by hand, then perform the same initialisation steps as for
 * a new connection.
 *
 * Returns 0 on success, otherwise the non-zero result of whichever of
 * the two steps failed.
 */
int ctdbd_reinit_connection(TALLOC_CTX *mem_ctx,
			    const char *sockname, int timeout,
			    struct ctdbd_connection *conn)
{
	int ret = ctdbd_connection_destructor(conn);

	if (ret != 0) {
		DBG_ERR(" ctdbd_connection_destructor failed \n ");
		return ret;
	}

	ret = ctdbd_init_connection_internal(mem_ctx, sockname, timeout, conn);
	if (ret == 0) {
		return 0;
	}

	DBG_ERR(" ctdbd_init_connection_internal failed (%s) \n ",
		strerror(ret));
	return ret;
}
2010-01-23 02:05:15 +03:00
int ctdbd_conn_get_fd ( struct ctdbd_connection * conn )
{
2014-05-06 14:21:42 +04:00
return conn - > fd ;
2010-01-23 02:05:15 +03:00
}
2007-06-10 21:02:09 +04:00
/*
* Packet handler to receive and handle a ctdb message
*/
2015-06-19 12:47:01 +03:00
static int ctdb_handle_message ( struct ctdbd_connection * conn ,
struct ctdb_req_header * hdr )
2007-06-10 21:02:09 +04:00
{
2015-10-29 08:36:30 +03:00
struct ctdb_req_message_old * msg ;
2007-06-10 21:02:09 +04:00
2014-05-06 14:21:42 +04:00
if ( hdr - > operation ! = CTDB_REQ_MESSAGE ) {
2007-06-10 21:02:09 +04:00
DEBUG ( 0 , ( " Received async msg of type %u, discarding \n " ,
2014-05-06 14:21:42 +04:00
hdr - > operation ) ) ;
2015-06-19 12:47:01 +03:00
return EINVAL ;
2007-06-10 21:02:09 +04:00
}
2015-10-29 08:36:30 +03:00
msg = ( struct ctdb_req_message_old * ) hdr ;
2014-05-06 14:21:42 +04:00
2015-05-19 16:07:33 +03:00
ctdbd_msg_call_back ( conn , msg ) ;
2015-06-19 12:47:01 +03:00
return 0 ;
2007-06-10 21:02:09 +04:00
}
2016-04-24 18:36:00 +03:00
void ctdbd_socket_readable ( struct ctdbd_connection * conn )
2007-06-10 21:02:09 +04:00
{
2015-03-03 10:48:00 +03:00
struct ctdb_req_header * hdr = NULL ;
2015-07-11 13:23:22 +03:00
int ret ;
2007-06-10 21:02:09 +04:00
2015-09-26 00:36:35 +03:00
ret = ctdb_read_packet ( conn - > fd , conn - > timeout , talloc_tos ( ) , & hdr ) ;
2015-07-11 13:23:22 +03:00
if ( ret ! = 0 ) {
DEBUG ( 0 , ( " ctdb_read_packet failed: %s \n " , strerror ( ret ) ) ) ;
2007-06-10 21:02:09 +04:00
cluster_fatal ( " ctdbd died \n " ) ;
}
2015-06-19 12:47:01 +03:00
ret = ctdb_handle_message ( conn , hdr ) ;
2015-06-15 17:47:50 +03:00
TALLOC_FREE ( hdr ) ;
2015-06-19 12:47:01 +03:00
if ( ret ! = 0 ) {
2014-05-06 14:21:42 +04:00
DEBUG ( 10 , ( " could not handle incoming message: %s \n " ,
2015-06-19 12:47:01 +03:00
strerror ( ret ) ) ) ;
2007-06-10 21:02:09 +04:00
}
}
2017-01-09 10:17:02 +03:00
/* Async packet handlers used by ctdbd_async_socket_handler() below. */
static int ctdb_pkt_recv_handler(struct ctdbd_connection *conn);
static int ctdb_pkt_send_handler(struct ctdbd_connection *conn);
/*
 * tevent fd handler used for async connections and async ctdb requests.
 *
 * A readable event is served first and exclusively: when
 * TEVENT_FD_READ is set only the receive handler runs. Otherwise, when
 * TEVENT_FD_WRITE is set, the send handler runs. Handler failures are
 * only logged.
 */
static void ctdbd_async_socket_handler(struct tevent_context *ev,
				       struct tevent_fd *fde,
				       uint16_t flags,
				       void *private_data)
{
	struct ctdbd_connection *conn = talloc_get_type_abort(
		private_data, struct ctdbd_connection);
	int ret;

	if ((flags & TEVENT_FD_READ) != 0) {
		ret = ctdb_pkt_recv_handler(conn);
		if (ret != 0) {
			DBG_DEBUG(" ctdb_read_iov_handler returned %s \n ",
				  strerror(ret));
		}
		return;
	}

	if ((flags & TEVENT_FD_WRITE) == 0) {
		/* neither readable nor writable: nothing to do */
		return;
	}

	ret = ctdb_pkt_send_handler(conn);
	if (ret != 0) {
		DBG_DEBUG(" ctdb_write_iov_handler returned %s \n ",
			  strerror(ret));
	}
}
2015-10-03 06:42:05 +03:00
int ctdbd_messaging_send_iov ( struct ctdbd_connection * conn ,
uint32_t dst_vnn , uint64_t dst_srvid ,
const struct iovec * iov , int iovlen )
2012-03-30 13:10:47 +04:00
{
2015-10-29 08:36:30 +03:00
struct ctdb_req_message_old r ;
2015-05-25 09:50:35 +03:00
struct iovec iov2 [ iovlen + 1 ] ;
size_t buflen = iov_buflen ( iov , iovlen ) ;
2014-05-06 14:21:42 +04:00
ssize_t nwritten ;
2012-03-30 13:10:47 +04:00
2015-10-29 08:36:30 +03:00
r . hdr . length = offsetof ( struct ctdb_req_message_old , data ) + buflen ;
2007-06-10 21:02:09 +04:00
r . hdr . ctdb_magic = CTDB_MAGIC ;
2014-10-21 04:53:29 +04:00
r . hdr . ctdb_version = CTDB_PROTOCOL ;
2007-06-10 21:02:09 +04:00
r . hdr . generation = 1 ;
r . hdr . operation = CTDB_REQ_MESSAGE ;
r . hdr . destnode = dst_vnn ;
r . hdr . srcnode = conn - > our_vnn ;
r . hdr . reqid = 0 ;
r . srvid = dst_srvid ;
2012-03-30 13:10:47 +04:00
r . datalen = buflen ;
2007-06-10 21:02:09 +04:00
2007-08-29 15:46:44 +04:00
DEBUG ( 10 , ( " ctdbd_messaging_send: Sending ctdb packet \n " ) ) ;
ctdb_packet_dump ( & r . hdr ) ;
2015-05-25 09:50:35 +03:00
iov2 [ 0 ] . iov_base = & r ;
2015-10-29 08:36:30 +03:00
iov2 [ 0 ] . iov_len = offsetof ( struct ctdb_req_message_old , data ) ;
2015-05-25 09:50:35 +03:00
memcpy ( & iov2 [ 1 ] , iov , iovlen * sizeof ( struct iovec ) ) ;
2007-06-10 21:02:09 +04:00
2015-05-25 09:50:35 +03:00
nwritten = write_data_iov ( conn - > fd , iov2 , iovlen + 1 ) ;
2014-05-06 14:21:42 +04:00
if ( nwritten = = - 1 ) {
DEBUG ( 3 , ( " write_data_iov failed: %s \n " , strerror ( errno ) ) ) ;
2007-06-10 21:02:09 +04:00
cluster_fatal ( " cluster dispatch daemon msg write error \n " ) ;
}
2014-05-06 14:21:42 +04:00
2015-10-03 06:42:05 +03:00
return 0 ;
2007-06-10 21:02:09 +04:00
}
/*
* send / recv a generic ctdb control message
*/
2015-10-03 06:31:52 +03:00
static int ctdbd_control ( struct ctdbd_connection * conn ,
uint32_t vnn , uint32_t opcode ,
uint64_t srvid , uint32_t flags ,
TDB_DATA data ,
TALLOC_CTX * mem_ctx , TDB_DATA * outdata ,
2016-04-21 14:31:39 +03:00
int32_t * cstatus )
2007-06-10 21:02:09 +04:00
{
2015-10-29 08:42:05 +03:00
struct ctdb_req_control_old req ;
2014-05-06 14:21:42 +04:00
struct ctdb_req_header * hdr ;
2015-10-29 08:44:08 +03:00
struct ctdb_reply_control_old * reply = NULL ;
2014-05-06 14:21:42 +04:00
struct iovec iov [ 2 ] ;
ssize_t nwritten ;
2015-06-19 12:47:01 +03:00
int ret ;
2007-06-10 21:02:09 +04:00
2017-01-09 10:17:02 +03:00
if ( ctdbd_conn_has_async_reqs ( conn ) ) {
/*
* Can ' t use sync call while an async call is in flight . Adding
* this check as a safety net . We ' ll be using different
* connections for sync and async requests , so this shouldn ' t
* happen , but who knows . . .
*/
DBG_ERR ( " Async ctdb req on sync connection \n " ) ;
return EINVAL ;
}
2007-06-10 21:02:09 +04:00
ZERO_STRUCT ( req ) ;
2015-10-29 08:42:05 +03:00
req . hdr . length = offsetof ( struct ctdb_req_control_old , data ) + data . dsize ;
2007-06-10 21:02:09 +04:00
req . hdr . ctdb_magic = CTDB_MAGIC ;
2014-10-21 04:53:29 +04:00
req . hdr . ctdb_version = CTDB_PROTOCOL ;
2007-06-10 21:02:09 +04:00
req . hdr . operation = CTDB_REQ_CONTROL ;
2011-10-26 12:56:32 +04:00
req . hdr . reqid = ctdbd_next_reqid ( conn ) ;
2007-06-10 21:02:09 +04:00
req . hdr . destnode = vnn ;
req . opcode = opcode ;
req . srvid = srvid ;
req . datalen = data . dsize ;
2011-03-08 18:26:34 +03:00
req . flags = flags ;
2007-06-10 21:02:09 +04:00
2016-04-21 12:21:26 +03:00
DBG_DEBUG ( " Sending ctdb packet reqid=% " PRIu32 " , vnn=% " PRIu32 " , "
" opcode=% " PRIu32 " , srvid=% " PRIu64 " \n " , req . hdr . reqid ,
req . hdr . destnode , req . opcode , req . srvid ) ;
2007-08-29 15:46:44 +04:00
ctdb_packet_dump ( & req . hdr ) ;
2014-05-06 14:21:42 +04:00
iov [ 0 ] . iov_base = & req ;
2015-10-29 08:42:05 +03:00
iov [ 0 ] . iov_len = offsetof ( struct ctdb_req_control_old , data ) ;
2014-05-06 14:21:42 +04:00
iov [ 1 ] . iov_base = data . dptr ;
iov [ 1 ] . iov_len = data . dsize ;
2007-06-10 21:02:09 +04:00
2014-05-06 14:21:42 +04:00
nwritten = write_data_iov ( conn - > fd , iov , ARRAY_SIZE ( iov ) ) ;
if ( nwritten = = - 1 ) {
DEBUG ( 3 , ( " write_data_iov failed: %s \n " , strerror ( errno ) ) ) ;
cluster_fatal ( " cluster dispatch daemon msg write error \n " ) ;
2007-06-10 21:02:09 +04:00
}
2008-08-07 10:20:05 +04:00
if ( flags & CTDB_CTRL_FLAG_NOREPLY ) {
2010-12-23 18:43:55 +03:00
if ( cstatus ) {
* cstatus = 0 ;
}
2015-10-03 05:54:31 +03:00
return 0 ;
2008-08-07 10:20:05 +04:00
}
2015-06-19 12:47:01 +03:00
ret = ctdb_read_req ( conn , req . hdr . reqid , NULL , & hdr ) ;
if ( ret ! = 0 ) {
DEBUG ( 10 , ( " ctdb_read_req failed: %s \n " , strerror ( ret ) ) ) ;
2015-10-03 05:54:31 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
2014-05-06 14:21:42 +04:00
if ( hdr - > operation ! = CTDB_REPLY_CONTROL ) {
2007-06-10 21:02:09 +04:00
DEBUG ( 0 , ( " received invalid reply \n " ) ) ;
2015-10-03 05:54:31 +03:00
TALLOC_FREE ( hdr ) ;
return EIO ;
2007-06-10 21:02:09 +04:00
}
2015-10-29 08:44:08 +03:00
reply = ( struct ctdb_reply_control_old * ) hdr ;
2007-06-10 21:02:09 +04:00
if ( outdata ) {
2015-05-10 02:33:10 +03:00
if ( ! ( outdata - > dptr = ( uint8_t * ) talloc_memdup (
2007-06-10 21:02:09 +04:00
mem_ctx , reply - > data , reply - > datalen ) ) ) {
TALLOC_FREE ( reply ) ;
2015-10-03 05:54:31 +03:00
return ENOMEM ;
2007-06-10 21:02:09 +04:00
}
outdata - > dsize = reply - > datalen ;
}
if ( cstatus ) {
( * cstatus ) = reply - > status ;
}
TALLOC_FREE ( reply ) ;
2015-10-03 05:54:31 +03:00
return ret ;
}
2007-06-10 21:02:09 +04:00
/*
* see if a remote process exists
*/
2012-07-30 17:31:59 +04:00
bool ctdbd_process_exists ( struct ctdbd_connection * conn , uint32_t vnn , pid_t pid )
2007-06-10 21:02:09 +04:00
{
2016-04-11 18:15:29 +03:00
int32_t cstatus = 0 ;
int ret ;
2007-06-10 21:02:09 +04:00
2016-04-11 18:15:29 +03:00
ret = ctdbd_control ( conn , vnn , CTDB_CONTROL_PROCESS_EXISTS , 0 , 0 ,
( TDB_DATA ) { . dptr = ( uint8_t * ) & pid ,
. dsize = sizeof ( pid ) } ,
NULL , NULL , & cstatus ) ;
if ( ret ! = 0 ) {
2011-10-26 13:36:21 +04:00
return false ;
2007-06-10 21:02:09 +04:00
}
2016-04-11 18:15:29 +03:00
return ( cstatus = = 0 ) ;
2011-10-23 23:38:54 +04:00
}
2007-06-10 21:02:09 +04:00
/*
 * Get a db path.
 *
 * Sends CTDB_CONTROL_GETDBPATH for db_id to the local ctdbd and returns
 * the reply payload, allocated on mem_ctx, as a string. Returns NULL if
 * the control call failed or reported a non-zero status; nothing is
 * left allocated on mem_ctx in that case.
 */
char *ctdbd_dbpath(struct ctdbd_connection *conn,
		   TALLOC_CTX *mem_ctx, uint32_t db_id)
{
	int ret;
	TDB_DATA data;
	TDB_DATA rdata = { 0 };
	int32_t cstatus = 0;

	data.dptr = (uint8_t *)&db_id;
	data.dsize = sizeof(db_id);

	ret = ctdbd_control_local(conn, CTDB_CONTROL_GETDBPATH, 0, 0, data,
				  mem_ctx, &rdata, &cstatus);
	if ((ret != 0) || cstatus != 0) {
		DEBUG(0, (__location__ " ctdb_control for getdbpath failed: %s \n ",
			  strerror(ret)));
		/*
		 * With ret == 0 but a bad status a reply buffer may have
		 * been allocated on the caller's context; don't leak it.
		 * rdata starts out zeroed, so this is safe in any case.
		 */
		TALLOC_FREE(rdata.dptr);
		return NULL;
	}

	return (char *)rdata.dptr;
}
/*
* attach to a ctdb database
*/
2015-10-03 06:42:05 +03:00
int ctdbd_db_attach ( struct ctdbd_connection * conn ,
const char * name , uint32_t * db_id , int tdb_flags )
2007-06-10 21:02:09 +04:00
{
2015-10-03 06:08:53 +03:00
int ret ;
2007-06-10 21:02:09 +04:00
TDB_DATA data ;
int32_t cstatus ;
2008-03-17 16:12:10 +03:00
bool persistent = ( tdb_flags & TDB_CLEAR_IF_FIRST ) = = 0 ;
2007-06-10 21:02:09 +04:00
2012-06-20 13:47:53 +04:00
data = string_term_tdb_data ( name ) ;
2007-06-10 21:02:09 +04:00
2016-04-13 19:47:46 +03:00
ret = ctdbd_control_local ( conn ,
persistent
? CTDB_CONTROL_DB_ATTACH_PERSISTENT
: CTDB_CONTROL_DB_ATTACH ,
tdb_flags , 0 , data , NULL , & data , & cstatus ) ;
2015-10-03 06:08:53 +03:00
if ( ret ! = 0 ) {
2007-06-10 21:02:09 +04:00
DEBUG ( 0 , ( __location__ " ctdb_control for db_attach "
2015-10-03 06:08:53 +03:00
" failed: %s \n " , strerror ( ret ) ) ) ;
2015-10-03 06:42:05 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
if ( cstatus ! = 0 | | data . dsize ! = sizeof ( uint32_t ) ) {
DEBUG ( 0 , ( __location__ " ctdb_control for db_attach failed \n " ) ) ;
2015-10-03 06:42:05 +03:00
return EIO ;
2007-06-10 21:02:09 +04:00
}
* db_id = * ( uint32_t * ) data . dptr ;
talloc_free ( data . dptr ) ;
if ( ! ( tdb_flags & TDB_SEQNUM ) ) {
2015-10-03 06:42:05 +03:00
return 0 ;
2007-06-10 21:02:09 +04:00
}
data . dptr = ( uint8_t * ) db_id ;
data . dsize = sizeof ( * db_id ) ;
2016-04-13 19:47:46 +03:00
ret = ctdbd_control_local ( conn , CTDB_CONTROL_ENABLE_SEQNUM , 0 , 0 , data ,
NULL , NULL , & cstatus ) ;
2015-10-03 06:08:53 +03:00
if ( ( ret ! = 0 ) | | cstatus ! = 0 ) {
DEBUG ( 0 , ( __location__ " ctdb_control for enable seqnum "
" failed: %s \n " , strerror ( ret ) ) ) ;
2015-10-03 06:42:05 +03:00
return ( ret = = 0 ) ? EIO : ret ;
2007-06-10 21:02:09 +04:00
}
2015-10-03 06:42:05 +03:00
return 0 ;
2007-06-10 21:02:09 +04:00
}
/*
* force the migration of a record to this node
*/
2015-10-03 06:42:05 +03:00
int ctdbd_migrate ( struct ctdbd_connection * conn , uint32_t db_id , TDB_DATA key )
2007-06-10 21:02:09 +04:00
{
2015-10-29 08:26:29 +03:00
struct ctdb_req_call_old req ;
2016-10-10 17:26:05 +03:00
struct ctdb_req_header * hdr = NULL ;
2014-05-06 14:21:42 +04:00
struct iovec iov [ 2 ] ;
ssize_t nwritten ;
2015-06-19 12:47:01 +03:00
int ret ;
2007-06-10 21:02:09 +04:00
2017-01-09 10:17:02 +03:00
if ( ctdbd_conn_has_async_reqs ( conn ) ) {
/*
* Can ' t use sync call while an async call is in flight . Adding
* this check as a safety net . We ' ll be using different
* connections for sync and async requests , so this shouldn ' t
* happen , but who knows . . .
*/
DBG_ERR ( " Async ctdb req on sync connection \n " ) ;
return EINVAL ;
}
2007-06-10 21:02:09 +04:00
ZERO_STRUCT ( req ) ;
2009-08-07 14:09:21 +04:00
2015-10-29 08:26:29 +03:00
req . hdr . length = offsetof ( struct ctdb_req_call_old , data ) + key . dsize ;
2007-06-10 21:02:09 +04:00
req . hdr . ctdb_magic = CTDB_MAGIC ;
2014-10-21 04:53:29 +04:00
req . hdr . ctdb_version = CTDB_PROTOCOL ;
2007-06-10 21:02:09 +04:00
req . hdr . operation = CTDB_REQ_CALL ;
2011-10-26 12:56:32 +04:00
req . hdr . reqid = ctdbd_next_reqid ( conn ) ;
2007-06-10 21:02:09 +04:00
req . flags = CTDB_IMMEDIATE_MIGRATION ;
req . callid = CTDB_NULL_FUNC ;
req . db_id = db_id ;
req . keylen = key . dsize ;
2007-08-29 15:46:44 +04:00
DEBUG ( 10 , ( " ctdbd_migrate: Sending ctdb packet \n " ) ) ;
ctdb_packet_dump ( & req . hdr ) ;
2014-05-06 14:21:42 +04:00
iov [ 0 ] . iov_base = & req ;
2015-10-29 08:26:29 +03:00
iov [ 0 ] . iov_len = offsetof ( struct ctdb_req_call_old , data ) ;
2014-05-06 14:21:42 +04:00
iov [ 1 ] . iov_base = key . dptr ;
iov [ 1 ] . iov_len = key . dsize ;
2007-06-10 21:02:09 +04:00
2014-05-06 14:21:42 +04:00
nwritten = write_data_iov ( conn - > fd , iov , ARRAY_SIZE ( iov ) ) ;
if ( nwritten = = - 1 ) {
DEBUG ( 3 , ( " write_data_iov failed: %s \n " , strerror ( errno ) ) ) ;
cluster_fatal ( " cluster dispatch daemon msg write error \n " ) ;
2007-06-10 21:02:09 +04:00
}
2015-06-19 12:47:01 +03:00
ret = ctdb_read_req ( conn , req . hdr . reqid , NULL , & hdr ) ;
if ( ret ! = 0 ) {
DEBUG ( 10 , ( " ctdb_read_req failed: %s \n " , strerror ( ret ) ) ) ;
2007-06-10 21:02:09 +04:00
goto fail ;
}
2014-05-06 14:21:42 +04:00
if ( hdr - > operation ! = CTDB_REPLY_CALL ) {
2016-06-24 12:22:02 +03:00
if ( hdr - > operation = = CTDB_REPLY_ERROR ) {
DBG_ERR ( " received error from ctdb \n " ) ;
} else {
DBG_ERR ( " received invalid reply \n " ) ;
}
ret = EIO ;
2007-06-10 21:02:09 +04:00
goto fail ;
}
fail :
2014-05-06 14:21:42 +04:00
TALLOC_FREE ( hdr ) ;
2015-10-03 06:42:05 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
2012-11-23 20:54:57 +04:00
/*
* Fetch a record and parse it
*/
2015-10-03 06:42:05 +03:00
int ctdbd_parse ( struct ctdbd_connection * conn , uint32_t db_id ,
TDB_DATA key , bool local_copy ,
void ( * parser ) ( TDB_DATA key , TDB_DATA data ,
void * private_data ) ,
void * private_data )
2012-11-23 20:54:57 +04:00
{
2015-10-29 08:26:29 +03:00
struct ctdb_req_call_old req ;
2014-05-06 14:21:42 +04:00
struct ctdb_req_header * hdr = NULL ;
2015-10-29 08:29:01 +03:00
struct ctdb_reply_call_old * reply ;
2014-05-06 14:21:42 +04:00
struct iovec iov [ 2 ] ;
ssize_t nwritten ;
2012-11-23 20:54:57 +04:00
uint32_t flags ;
2015-06-19 12:47:01 +03:00
int ret ;
2012-11-23 20:54:57 +04:00
2017-01-09 10:17:02 +03:00
if ( ctdbd_conn_has_async_reqs ( conn ) ) {
/*
* Can ' t use sync call while an async call is in flight . Adding
* this check as a safety net . We ' ll be using different
* connections for sync and async requests , so this shouldn ' t
* happen , but who knows . . .
*/
DBG_ERR ( " Async ctdb req on sync connection \n " ) ;
return EINVAL ;
}
2012-11-23 20:54:57 +04:00
flags = local_copy ? CTDB_WANT_READONLY : 0 ;
ZERO_STRUCT ( req ) ;
2015-10-29 08:26:29 +03:00
req . hdr . length = offsetof ( struct ctdb_req_call_old , data ) + key . dsize ;
2012-11-23 20:54:57 +04:00
req . hdr . ctdb_magic = CTDB_MAGIC ;
2014-10-21 04:53:29 +04:00
req . hdr . ctdb_version = CTDB_PROTOCOL ;
2012-11-23 20:54:57 +04:00
req . hdr . operation = CTDB_REQ_CALL ;
req . hdr . reqid = ctdbd_next_reqid ( conn ) ;
req . flags = flags ;
req . callid = CTDB_FETCH_FUNC ;
req . db_id = db_id ;
req . keylen = key . dsize ;
2014-05-06 14:21:42 +04:00
iov [ 0 ] . iov_base = & req ;
2015-10-29 08:26:29 +03:00
iov [ 0 ] . iov_len = offsetof ( struct ctdb_req_call_old , data ) ;
2014-05-06 14:21:42 +04:00
iov [ 1 ] . iov_base = key . dptr ;
iov [ 1 ] . iov_len = key . dsize ;
2012-11-23 20:54:57 +04:00
2014-05-06 14:21:42 +04:00
nwritten = write_data_iov ( conn - > fd , iov , ARRAY_SIZE ( iov ) ) ;
if ( nwritten = = - 1 ) {
DEBUG ( 3 , ( " write_data_iov failed: %s \n " , strerror ( errno ) ) ) ;
cluster_fatal ( " cluster dispatch daemon msg write error \n " ) ;
2012-11-23 20:54:57 +04:00
}
2015-06-19 12:47:01 +03:00
ret = ctdb_read_req ( conn , req . hdr . reqid , NULL , & hdr ) ;
if ( ret ! = 0 ) {
DEBUG ( 10 , ( " ctdb_read_req failed: %s \n " , strerror ( ret ) ) ) ;
2012-11-23 20:54:57 +04:00
goto fail ;
}
2015-04-23 19:06:17 +03:00
if ( ( hdr = = NULL ) | | ( hdr - > operation ! = CTDB_REPLY_CALL ) ) {
2012-11-23 20:54:57 +04:00
DEBUG ( 0 , ( " received invalid reply \n " ) ) ;
2015-10-03 06:42:05 +03:00
ret = EIO ;
2012-11-23 20:54:57 +04:00
goto fail ;
}
2015-10-29 08:29:01 +03:00
reply = ( struct ctdb_reply_call_old * ) hdr ;
2012-11-23 20:54:57 +04:00
2013-08-28 15:34:08 +04:00
if ( reply - > datalen = = 0 ) {
/*
* Treat an empty record as non - existing
*/
2015-10-03 06:42:05 +03:00
ret = ENOENT ;
2013-08-28 15:34:08 +04:00
goto fail ;
}
2012-11-23 20:54:57 +04:00
parser ( key , make_tdb_data ( & reply - > data [ 0 ] , reply - > datalen ) ,
private_data ) ;
2015-10-03 06:42:05 +03:00
ret = 0 ;
2012-11-23 20:54:57 +04:00
fail :
2014-05-06 14:21:42 +04:00
TALLOC_FREE ( hdr ) ;
2015-10-03 06:42:05 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
/*
2016-04-05 18:30:11 +03:00
Traverse a ctdb database . " conn " must be an otherwise unused
ctdb_connection where no other messages but the traverse ones are
expected .
2007-06-10 21:02:09 +04:00
*/
2016-04-05 18:30:11 +03:00
int ctdbd_traverse ( struct ctdbd_connection * conn , uint32_t db_id ,
2007-06-10 21:02:09 +04:00
void ( * fn ) ( TDB_DATA key , TDB_DATA data ,
void * private_data ) ,
void * private_data )
{
2015-10-03 06:08:53 +03:00
int ret ;
2014-05-06 14:21:42 +04:00
TDB_DATA key , data ;
2007-06-10 21:02:09 +04:00
struct ctdb_traverse_start t ;
2016-04-21 14:31:39 +03:00
int32_t cstatus ;
2007-06-10 21:02:09 +04:00
2017-01-09 10:17:02 +03:00
if ( ctdbd_conn_has_async_reqs ( conn ) ) {
/*
* Can ' t use sync call while an async call is in flight . Adding
* this check as a safety net . We ' ll be using different
* connections for sync and async requests , so this shouldn ' t
* happen , but who knows . . .
*/
DBG_ERR ( " Async ctdb req on sync connection \n " ) ;
return EINVAL ;
}
2007-06-10 21:02:09 +04:00
t . db_id = db_id ;
t . srvid = conn - > rand_srvid ;
2011-10-26 12:56:32 +04:00
t . reqid = ctdbd_next_reqid ( conn ) ;
2007-06-10 21:02:09 +04:00
data . dptr = ( uint8_t * ) & t ;
data . dsize = sizeof ( t ) ;
2016-04-13 19:47:46 +03:00
ret = ctdbd_control_local ( conn , CTDB_CONTROL_TRAVERSE_START ,
conn - > rand_srvid ,
0 , data , NULL , NULL , & cstatus ) ;
2007-06-10 21:02:09 +04:00
2015-10-03 06:08:53 +03:00
if ( ( ret ! = 0 ) | | ( cstatus ! = 0 ) ) {
DEBUG ( 0 , ( " ctdbd_control failed: %s, %d \n " , strerror ( ret ) ,
2007-06-10 21:02:09 +04:00
cstatus ) ) ;
2015-10-03 06:42:05 +03:00
if ( ret = = 0 ) {
2007-06-10 21:02:09 +04:00
/*
* We need a mapping here
*/
2015-10-03 06:42:05 +03:00
ret = EIO ;
2007-06-10 21:02:09 +04:00
}
2015-10-03 06:42:05 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
2016-04-08 17:14:33 +03:00
while ( true ) {
2015-03-03 10:48:00 +03:00
struct ctdb_req_header * hdr = NULL ;
2015-10-29 08:36:30 +03:00
struct ctdb_req_message_old * m ;
2015-10-29 09:30:30 +03:00
struct ctdb_rec_data_old * d ;
2007-06-10 21:02:09 +04:00
2015-09-26 00:36:35 +03:00
ret = ctdb_read_packet ( conn - > fd , conn - > timeout , conn , & hdr ) ;
2015-07-11 13:23:22 +03:00
if ( ret ! = 0 ) {
2014-05-06 14:21:42 +04:00
DEBUG ( 0 , ( " ctdb_read_packet failed: %s \n " ,
2015-07-11 13:23:22 +03:00
strerror ( ret ) ) ) ;
2014-05-06 14:21:42 +04:00
cluster_fatal ( " ctdbd died \n " ) ;
}
2007-06-10 21:02:09 +04:00
2014-05-06 14:21:42 +04:00
if ( hdr - > operation ! = CTDB_REQ_MESSAGE ) {
DEBUG ( 0 , ( " Got operation %u, expected a message \n " ,
( unsigned ) hdr - > operation ) ) ;
2015-10-03 06:42:05 +03:00
return EIO ;
2007-06-10 21:02:09 +04:00
}
2015-10-29 08:36:30 +03:00
m = ( struct ctdb_req_message_old * ) hdr ;
2015-10-29 09:30:30 +03:00
d = ( struct ctdb_rec_data_old * ) & m - > data [ 0 ] ;
2014-05-06 14:21:42 +04:00
if ( m - > datalen < sizeof ( uint32_t ) | | m - > datalen ! = d - > length ) {
DEBUG ( 0 , ( " Got invalid traverse data of length %d \n " ,
( int ) m - > datalen ) ) ;
2015-10-03 06:42:05 +03:00
return EIO ;
2007-06-10 21:02:09 +04:00
}
2014-05-06 14:21:42 +04:00
key . dsize = d - > keylen ;
key . dptr = & d - > data [ 0 ] ;
data . dsize = d - > datalen ;
data . dptr = & d - > data [ d - > keylen ] ;
2007-06-10 21:02:09 +04:00
2014-05-06 14:21:42 +04:00
if ( key . dsize = = 0 & & data . dsize = = 0 ) {
/* end of traverse */
2015-10-03 06:42:05 +03:00
return 0 ;
2007-07-13 14:40:53 +04:00
}
2014-05-06 14:21:42 +04:00
if ( data . dsize < sizeof ( struct ctdb_ltdb_header ) ) {
DEBUG ( 0 , ( " Got invalid ltdb header length %d \n " ,
( int ) data . dsize ) ) ;
2015-10-03 06:42:05 +03:00
return EIO ;
2007-06-10 21:02:09 +04:00
}
2014-05-06 14:21:42 +04:00
data . dsize - = sizeof ( struct ctdb_ltdb_header ) ;
data . dptr + = sizeof ( struct ctdb_ltdb_header ) ;
2007-06-10 21:02:09 +04:00
2014-05-06 14:21:42 +04:00
if ( fn ! = NULL ) {
fn ( key , data , private_data ) ;
2007-06-10 21:02:09 +04:00
}
}
2015-10-03 06:42:05 +03:00
return 0 ;
2007-06-10 21:02:09 +04:00
}
2009-01-28 20:55:13 +03:00
/*
 * This is used to canonicalize a ctdb_sock_addr structure.
 *
 * "out" receives a copy of "in". If IPv6 support is compiled in and
 * "in" is an IPv4-mapped IPv6 address (::ffff:a.b.c.d), "out" is
 * rewritten into the equivalent AF_INET address with the same port.
 * "in" and "out" must not be the same object, as "in" is still read
 * after "out" has been cleared.
 */
static void smbd_ctdb_canonicalize_ip(const struct sockaddr_storage *in,
				      struct sockaddr_storage *out)
{
	memcpy(out, in, sizeof(*out));

#ifdef HAVE_IPV6
	if (in->ss_family == AF_INET6) {
		/* First 12 bytes of an IPv4-mapped IPv6 address */
		const uint8_t prefix[12] = {
			0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0xff, 0xff
		};
		const struct sockaddr_in6 *in6 =
			(const struct sockaddr_in6 *)in;
		struct sockaddr_in *out4 = (struct sockaddr_in *)out;

		if (memcmp(&in6->sin6_addr, prefix, sizeof(prefix)) == 0) {
			memset(out, 0, sizeof(*out));
#ifdef HAVE_SOCK_SIN_LEN
			/*
			 * The result is a sockaddr_in, so announce that
			 * size and not the size of the storage around it.
			 */
			out4->sin_len = sizeof(*out4);
#endif
			out4->sin_family = AF_INET;
			out4->sin_port   = in6->sin6_port;
			/* The IPv4 address is the last 4 bytes */
			memcpy(&out4->sin_addr, &in6->sin6_addr.s6_addr[12], 4);
		}
	}
#endif
}
2007-06-10 21:02:09 +04:00
/*
* Register us as a server for a particular tcp connection
*/
2015-10-03 06:42:05 +03:00
int ctdbd_register_ips ( struct ctdbd_connection * conn ,
const struct sockaddr_storage * _server ,
const struct sockaddr_storage * _client ,
int ( * cb ) ( uint32_t src_vnn , uint32_t dst_vnn ,
uint64_t dst_srvid ,
const uint8_t * msg , size_t msglen ,
void * private_data ) ,
void * private_data )
2007-06-10 21:02:09 +04:00
{
2015-10-29 06:25:34 +03:00
struct ctdb_connection p ;
2015-05-20 09:12:46 +03:00
TDB_DATA data = { . dptr = ( uint8_t * ) & p , . dsize = sizeof ( p ) } ;
2015-10-03 06:08:53 +03:00
int ret ;
2009-01-28 20:55:13 +03:00
struct sockaddr_storage client ;
struct sockaddr_storage server ;
2007-06-10 21:02:09 +04:00
/*
* Only one connection so far
*/
2009-01-28 20:55:13 +03:00
smbd_ctdb_canonicalize_ip ( _client , & client ) ;
smbd_ctdb_canonicalize_ip ( _server , & server ) ;
switch ( client . ss_family ) {
2008-12-18 17:02:42 +03:00
case AF_INET :
2015-10-29 06:25:34 +03:00
memcpy ( & p . dst . ip , & server , sizeof ( p . dst . ip ) ) ;
2015-03-23 09:32:34 +03:00
memcpy ( & p . src . ip , & client , sizeof ( p . src . ip ) ) ;
2008-12-18 17:02:42 +03:00
break ;
case AF_INET6 :
2015-10-29 06:25:34 +03:00
memcpy ( & p . dst . ip6 , & server , sizeof ( p . dst . ip6 ) ) ;
2011-10-21 14:04:59 +04:00
memcpy ( & p . src . ip6 , & client , sizeof ( p . src . ip6 ) ) ;
2008-12-18 17:02:42 +03:00
break ;
default :
2015-10-03 06:42:05 +03:00
return EIO ;
2008-12-18 17:02:42 +03:00
}
2007-06-10 21:02:09 +04:00
/*
* We want to be told about IP releases
*/
2015-10-03 06:42:05 +03:00
ret = register_with_ctdbd ( conn , CTDB_SRVID_RELEASE_IP ,
cb , private_data ) ;
if ( ret ! = 0 ) {
2015-10-03 06:42:05 +03:00
return ret ;
2007-06-10 21:02:09 +04:00
}
/*
* inform ctdb of our tcp connection , so if IP takeover happens ctdb
* can send an extra ack to trigger a reset for our client , so it
* immediately reconnects
*/
2015-10-03 06:31:52 +03:00
ret = ctdbd_control ( conn , CTDB_CURRENT_NODE ,
CTDB_CONTROL_TCP_CLIENT , 0 ,
CTDB_CTRL_FLAG_NOREPLY , data , NULL , NULL ,
NULL ) ;
2015-10-03 06:08:53 +03:00
if ( ret ! = 0 ) {
2015-10-03 06:42:05 +03:00
return ret ;
2015-10-03 06:08:53 +03:00
}
2015-10-03 06:42:05 +03:00
return 0 ;
2007-06-10 21:02:09 +04:00
}
2008-08-07 10:20:05 +04:00
/*
call a control on the local node
*/
2015-10-03 06:42:05 +03:00
int ctdbd_control_local ( struct ctdbd_connection * conn , uint32_t opcode ,
uint64_t srvid , uint32_t flags , TDB_DATA data ,
TALLOC_CTX * mem_ctx , TDB_DATA * outdata ,
2016-04-21 14:31:39 +03:00
int32_t * cstatus )
2008-08-07 10:20:05 +04:00
{
2015-10-03 06:42:05 +03:00
return ctdbd_control ( conn , CTDB_CURRENT_NODE , opcode , srvid , flags , data ,
mem_ctx , outdata , cstatus ) ;
2008-08-07 10:20:05 +04:00
}
2015-10-03 06:42:05 +03:00
int ctdb_watch_us ( struct ctdbd_connection * conn )
2009-10-25 18:12:12 +03:00
{
2015-10-28 09:43:20 +03:00
struct ctdb_notify_data_old reg_data ;
2009-10-25 18:12:12 +03:00
size_t struct_len ;
2015-10-03 06:42:05 +03:00
int ret ;
2016-04-21 14:31:39 +03:00
int32_t cstatus ;
2009-10-25 18:12:12 +03:00
reg_data . srvid = CTDB_SRVID_SAMBA_NOTIFY ;
reg_data . len = 1 ;
reg_data . notify_data [ 0 ] = 0 ;
2015-10-28 09:43:20 +03:00
struct_len = offsetof ( struct ctdb_notify_data_old ,
2009-10-25 18:12:12 +03:00
notify_data ) + reg_data . len ;
2015-10-03 06:42:05 +03:00
ret = ctdbd_control_local (
2009-10-25 18:12:12 +03:00
conn , CTDB_CONTROL_REGISTER_NOTIFY , conn - > rand_srvid , 0 ,
make_tdb_data ( ( uint8_t * ) & reg_data , struct_len ) ,
NULL , NULL , & cstatus ) ;
2015-10-03 06:42:05 +03:00
if ( ret ! = 0 ) {
2009-10-25 18:12:12 +03:00
DEBUG ( 1 , ( " ctdbd_control_local failed: %s \n " ,
2015-10-03 06:42:05 +03:00
strerror ( ret ) ) ) ;
2009-10-25 18:12:12 +03:00
}
2015-10-03 06:42:05 +03:00
return ret ;
2009-10-25 18:12:12 +03:00
}
2015-10-03 06:42:05 +03:00
int ctdb_unwatch ( struct ctdbd_connection * conn )
2009-10-25 18:12:12 +03:00
{
2015-10-28 09:47:03 +03:00
uint64_t srvid = CTDB_SRVID_SAMBA_NOTIFY ;
2015-10-03 06:42:05 +03:00
int ret ;
2016-04-21 14:31:39 +03:00
int32_t cstatus ;
2009-10-25 18:12:12 +03:00
2015-10-03 06:42:05 +03:00
ret = ctdbd_control_local (
2009-10-25 18:12:12 +03:00
conn , CTDB_CONTROL_DEREGISTER_NOTIFY , conn - > rand_srvid , 0 ,
2015-10-28 09:47:03 +03:00
make_tdb_data ( ( uint8_t * ) & srvid , sizeof ( srvid ) ) ,
2009-10-25 18:12:12 +03:00
NULL , NULL , & cstatus ) ;
2015-10-03 06:42:05 +03:00
if ( ret ! = 0 ) {
2009-10-25 18:12:12 +03:00
DEBUG ( 1 , ( " ctdbd_control_local failed: %s \n " ,
2015-10-03 06:42:05 +03:00
strerror ( ret ) ) ) ;
2009-10-25 18:12:12 +03:00
}
2015-10-03 06:42:05 +03:00
return ret ;
2009-10-25 18:12:12 +03:00
}
2015-10-03 06:42:05 +03:00
/*
 * Do a very early check if ctdbd is around to avoid an abort and core
 * later.
 *
 * Tries to set up a connection to "sockname" with the given "timeout"
 * and drops it again right away. Returns the result of
 * ctdbd_init_connection().
 */
int ctdbd_probe(const char *sockname, int timeout)
{
	struct ctdbd_connection *probe_conn = NULL;
	int result;

	result = ctdbd_init_connection(talloc_tos(),
				       sockname,
				       timeout,
				       &probe_conn);

	/*
	 * Only the outcome of the connect matters, the connection itself
	 * is not needed.
	 */
	TALLOC_FREE(probe_conn);

	return result;
}
2017-01-09 10:17:02 +03:00
/*
 * State of one asynchronous packet send. Instances are linked into
 * ctdbd_connection->send_list while data is waiting to be written.
 */
struct ctdb_pkt_send_state {
	/* Linkage for conn->send_list */
	struct ctdb_pkt_send_state *prev, *next;

	struct tevent_context *ev;

	/* Connection to send on, NULL once detached from it */
	struct ctdbd_connection *conn;

	/* ctdb request id */
	uint32_t reqid;

	/* The associated tevent request, NULL once it was cancelled */
	struct tevent_req *req;

	/*
	 * Data still to be sent: "iov" and "iovcnt" describe what is
	 * left, "_iov" is private storage that "iov" points at after
	 * the remaining data was flattened into a single buffer.
	 */
	struct iovec _iov;
	struct iovec *iov;
	int iovcnt;

	/* Initial packet length */
	size_t packet_len;
};
static void ctdb_pkt_send_cleanup(struct tevent_req *req,
				  enum tevent_req_state req_state);

/**
 * Asynchronously send a ctdb packet given as an iovec array.
 *
 * If other sends are already queued on "conn" the packet is appended to
 * the send list. Otherwise a direct writev() is attempted and only the
 * unsent remainder, if any, is queued and fd-writable events enabled.
 * "req_state" is updated to QUEUED, DISPATCHED or ERROR accordingly.
 *
 * Note: the passed iov array is not const here. Similar functions in
 * samba take a const array and create a copy before calling
 * iov_advance() on the array.
 *
 * This function will modify the iov array! But this is a static
 * function and its caller is prepared for this to happen.
 **/
static struct tevent_req *ctdb_pkt_send_send(TALLOC_CTX *mem_ctx,
					     struct tevent_context *ev,
					     struct ctdbd_connection *conn,
					     uint32_t reqid,
					     struct iovec *iov,
					     int iovcnt,
					     enum dbwrap_req_state *req_state)
{
	struct ctdb_pkt_send_state *state = NULL;
	struct tevent_req *req = NULL;
	ssize_t sent;

	DBG_DEBUG("sending async ctdb reqid [%" PRIu32 "]\n", reqid);

	req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_send_state);
	if (req == NULL) {
		return NULL;
	}

	*state = (struct ctdb_pkt_send_state) {
		.ev = ev,
		.conn = conn,
		.req = req,
		.reqid = reqid,
		.iov = iov,
		.iovcnt = iovcnt,
		.packet_len = iov_buflen(iov, iovcnt),
	};

	tevent_req_set_cleanup_fn(req, ctdb_pkt_send_cleanup);

	*req_state = DBWRAP_REQ_QUEUED;

	if (ctdbd_conn_has_async_sends(conn)) {
		/*
		 * Other packets are queued and possibly partly written,
		 * so this one has to wait for its turn.
		 */
		DLIST_ADD_END(conn->send_list, state);
		return req;
	}

	/*
	 * Nothing queued: try to write directly. A complete write
	 * finishes the request right here, anything less is continued
	 * from the fd-writable handler.
	 */
	sent = writev(conn->fd, state->iov, state->iovcnt);
	if (sent == -1) {
		bool retryable = (errno == EINTR) ||
				 (errno == EAGAIN) ||
				 (errno == EWOULDBLOCK);

		if (!retryable) {
			cluster_fatal("cluster write error\n");
		}
		sent = 0;
	} else if ((size_t)sent == state->packet_len) {
		DBG_DEBUG("Finished sending reqid [%" PRIu32 "]\n", reqid);

		*req_state = DBWRAP_REQ_DISPATCHED;
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}

	DBG_DEBUG("Posting async write of reqid [%" PRIu32 "]"
		  "after short write [%zd]\n", reqid, sent);

	if (!iov_advance(&state->iov, &state->iovcnt, sent)) {
		*req_state = DBWRAP_REQ_ERROR;
		tevent_req_error(req, EIO);
		return tevent_req_post(req, ev);
	}

	/*
	 * This is the first queued send on the connection, so
	 * fd-writable events have to be switched on.
	 */
	TEVENT_FD_WRITEABLE(conn->fde);

	DLIST_ADD_END(conn->send_list, state);
	return req;
}
static int ctdb_pkt_send_state_destructor ( struct ctdb_pkt_send_state * state )
{
struct ctdbd_connection * conn = state - > conn ;
if ( conn = = NULL ) {
return 0 ;
}
if ( state - > req = = NULL ) {
DBG_DEBUG ( " Removing cancelled reqid [% " PRIu32 " ] \n " ,
state - > reqid ) ;
state - > conn = NULL ;
DLIST_REMOVE ( conn - > send_list , state ) ;
return 0 ;
}
DBG_DEBUG ( " Reparenting cancelled reqid [% " PRIu32 " ] \n " ,
state - > reqid ) ;
talloc_reparent ( state - > req , conn , state ) ;
state - > req = NULL ;
return - 1 ;
}
static void ctdb_pkt_send_cleanup ( struct tevent_req * req ,
enum tevent_req_state req_state )
{
struct ctdb_pkt_send_state * state = tevent_req_data (
req , struct ctdb_pkt_send_state ) ;
struct ctdbd_connection * conn = state - > conn ;
size_t missing_len = 0 ;
if ( conn = = NULL ) {
return ;
}
missing_len = iov_buflen ( state - > iov , state - > iovcnt ) ;
if ( state - > packet_len = = missing_len ) {
/*
* We haven ' t yet started sending this one , so we can just
* remove it from the pending list
*/
missing_len = 0 ;
}
if ( missing_len ! = 0 ) {
uint8_t * buf = NULL ;
if ( req_state ! = TEVENT_REQ_RECEIVED ) {
/*
* Wait til the req_state is TEVENT_REQ_RECEIVED , as
* that will be the final state when the request state
* is talloc_free ' d from tallloc_req_received ( ) . Which
* ensures we only run the following code * ONCE * !
*/
return ;
}
DBG_DEBUG ( " Cancelling in-flight reqid [% " PRIu32 " ] \n " ,
state - > reqid ) ;
/*
* A request in progress of being sent . Reparent the iov buffer
* so we can continue sending the request . See also the comment
* in ctdbd_parse_send ( ) when copying the key buffer .
*/
buf = iov_concat ( state , state - > iov , state - > iovcnt ) ;
if ( buf = = NULL ) {
cluster_fatal ( " iov_concat error \n " ) ;
return ;
}
state - > iovcnt = 1 ;
state - > _iov . iov_base = buf ;
state - > _iov . iov_len = missing_len ;
state - > iov = & state - > _iov ;
talloc_set_destructor ( state , ctdb_pkt_send_state_destructor ) ;
return ;
}
DBG_DEBUG ( " Removing pending reqid [% " PRIu32 " ] \n " , state - > reqid ) ;
state - > conn = NULL ;
DLIST_REMOVE ( conn - > send_list , state ) ;
if ( ! ctdbd_conn_has_async_sends ( conn ) ) {
DBG_DEBUG ( " No more sends, disabling fd-writable events \n " ) ;
TEVENT_FD_NOT_WRITEABLE ( conn - > fde ) ;
}
}
static int ctdb_pkt_send_handler ( struct ctdbd_connection * conn )
{
struct ctdb_pkt_send_state * state = NULL ;
ssize_t nwritten ;
ssize_t iovlen ;
bool ok ;
DBG_DEBUG ( " send handler \n " ) ;
if ( ! ctdbd_conn_has_async_sends ( conn ) ) {
DBG_WARNING ( " Writable fd-event without pending send \n " ) ;
TEVENT_FD_NOT_WRITEABLE ( conn - > fde ) ;
return 0 ;
}
state = conn - > send_list ;
iovlen = iov_buflen ( state - > iov , state - > iovcnt ) ;
nwritten = writev ( conn - > fd , state - > iov , state - > iovcnt ) ;
if ( nwritten = = - 1 ) {
if ( errno ! = EINTR & & errno ! = EAGAIN & & errno ! = EWOULDBLOCK ) {
DBG_ERR ( " writev failed: %s \n " , strerror ( errno ) ) ;
cluster_fatal ( " cluster write error \n " ) ;
}
DBG_DEBUG ( " recoverable writev error, retry \n " ) ;
return 0 ;
}
if ( nwritten < iovlen ) {
DBG_DEBUG ( " short write \n " ) ;
ok = iov_advance ( & state - > iov , & state - > iovcnt , nwritten ) ;
if ( ! ok ) {
DBG_ERR ( " iov_advance failed \n " ) ;
if ( state - > req = = NULL ) {
TALLOC_FREE ( state ) ;
return 0 ;
}
tevent_req_error ( state - > req , EIO ) ;
return 0 ;
}
return 0 ;
}
if ( state - > req = = NULL ) {
DBG_DEBUG ( " Finished sending cancelled reqid [% " PRIu32 " ] \n " ,
state - > reqid ) ;
TALLOC_FREE ( state ) ;
return 0 ;
}
DBG_DEBUG ( " Finished send request id [% " PRIu32 " ] \n " , state - > reqid ) ;
tevent_req_done ( state - > req ) ;
return 0 ;
}
/*
 * Collect the result of ctdb_pkt_send_send(): 0 on success, otherwise
 * the unix error the request failed with. The request is marked as
 * received in both cases.
 */
static int ctdb_pkt_send_recv(struct tevent_req *req)
{
	int err;
	bool failed;

	failed = tevent_req_is_unix_error(req, &err);
	tevent_req_received(req);

	return failed ? err : 0;
}
/*
 * State of one asynchronous packet receive. Instances are linked into
 * ctdbd_connection->recv_list while waiting for the reply with the
 * matching request id.
 */
struct ctdb_pkt_recv_state {
	/* Linkage for conn->recv_list */
	struct ctdb_pkt_recv_state *prev, *next;

	struct tevent_context *ev;

	/* Connection to receive from, NULL once detached from it */
	struct ctdbd_connection *conn;

	/* ctdb request id */
	uint32_t reqid;

	/* the associated tevent_req */
	struct tevent_req *req;

	/* pointer to allocated ctdb packet buffer */
	struct ctdb_req_header *hdr;
};
static void ctdb_pkt_recv_cleanup(struct tevent_req *req,
				  enum tevent_req_state req_state);

/*
 * Start waiting for the ctdb packet that carries request id "reqid".
 *
 * The request is only queued on the connection's receive list here, it
 * is completed from the receive handler. Returns NULL if the request
 * could not be created.
 */
static struct tevent_req *ctdb_pkt_recv_send(TALLOC_CTX *mem_ctx,
					     struct tevent_context *ev,
					     struct ctdbd_connection *conn,
					     uint32_t reqid)
{
	struct ctdb_pkt_recv_state *state = NULL;
	struct tevent_req *req;

	req = tevent_req_create(mem_ctx, &state, struct ctdb_pkt_recv_state);
	if (req == NULL) {
		return NULL;
	}

	*state = (struct ctdb_pkt_recv_state) {
		.ev = ev,
		.conn = conn,
		.reqid = reqid,
		.req = req,
	};

	tevent_req_set_cleanup_fn(req, ctdb_pkt_recv_cleanup);

	/*
	 * The fde always has the fd-readable event set, so there is
	 * nothing to enable here.
	 */
	DLIST_ADD_END(conn->recv_list, state);

	DBG_DEBUG("Posted receive reqid [%" PRIu32 "]\n", state->reqid);

	return req;
}
static void ctdb_pkt_recv_cleanup ( struct tevent_req * req ,
enum tevent_req_state req_state )
{
struct ctdb_pkt_recv_state * state = tevent_req_data (
req , struct ctdb_pkt_recv_state ) ;
struct ctdbd_connection * conn = state - > conn ;
if ( conn = = NULL ) {
return ;
}
state - > conn = NULL ;
DLIST_REMOVE ( conn - > recv_list , state ) ;
}
static int ctdb_pkt_recv_handler ( struct ctdbd_connection * conn )
{
struct ctdb_pkt_recv_state * state = NULL ;
ssize_t nread ;
ssize_t iovlen ;
bool ok ;
DBG_DEBUG ( " receive handler \n " ) ;
if ( conn - > read_state . iovs = = NULL ) {
conn - > read_state . iov . iov_base = & conn - > read_state . msglen ;
conn - > read_state . iov . iov_len = sizeof ( conn - > read_state . msglen ) ;
conn - > read_state . iovs = & conn - > read_state . iov ;
conn - > read_state . iovcnt = 1 ;
}
iovlen = iov_buflen ( conn - > read_state . iovs , conn - > read_state . iovcnt ) ;
DBG_DEBUG ( " iovlen [%zd] \n " , iovlen ) ;
nread = readv ( conn - > fd , conn - > read_state . iovs , conn - > read_state . iovcnt ) ;
if ( nread = = 0 ) {
cluster_fatal ( " cluster read error, peer closed connection \n " ) ;
}
if ( nread = = - 1 ) {
if ( errno ! = EINTR & & errno ! = EAGAIN & & errno ! = EWOULDBLOCK ) {
cluster_fatal ( " cluster read error \n " ) ;
}
DBG_DEBUG ( " recoverable error from readv, retry \n " ) ;
return 0 ;
}
if ( nread < iovlen ) {
DBG_DEBUG ( " iovlen [%zd] nread [%zd] \n " , iovlen , nread ) ;
ok = iov_advance ( & conn - > read_state . iovs ,
& conn - > read_state . iovcnt ,
nread ) ;
if ( ! ok ) {
return EIO ;
}
return 0 ;
}
conn - > read_state . iovs = NULL ;
conn - > read_state . iovcnt = 0 ;
if ( conn - > read_state . hdr = = NULL ) {
/*
* Going this way after reading the 4 initial byte message
* length
*/
uint32_t msglen = conn - > read_state . msglen ;
uint8_t * readbuf = NULL ;
size_t readlen ;
DBG_DEBUG ( " msglen: % " PRIu32 " \n " , msglen ) ;
if ( msglen < sizeof ( struct ctdb_req_header ) ) {
DBG_ERR ( " short message % " PRIu32 " \n " , msglen ) ;
return EIO ;
}
conn - > read_state . hdr = talloc_size ( conn , msglen ) ;
if ( conn - > read_state . hdr = = NULL ) {
return ENOMEM ;
}
conn - > read_state . hdr - > length = msglen ;
talloc_set_name_const ( conn - > read_state . hdr ,
" struct ctdb_req_header " ) ;
readbuf = ( uint8_t * ) conn - > read_state . hdr + sizeof ( msglen ) ;
readlen = msglen - sizeof ( msglen ) ;
conn - > read_state . iov . iov_base = readbuf ;
conn - > read_state . iov . iov_len = readlen ;
conn - > read_state . iovs = & conn - > read_state . iov ;
conn - > read_state . iovcnt = 1 ;
DBG_DEBUG ( " Scheduled packet read size %zd \n " , readlen ) ;
return 0 ;
}
/*
* Searching a list here is expected to be cheap , as messages are
* exepcted to be coming in more or less ordered and we should find the
* waiting request near the beginning of the list .
*/
for ( state = conn - > recv_list ; state ! = NULL ; state = state - > next ) {
if ( state - > reqid = = conn - > read_state . hdr - > reqid ) {
break ;
}
}
if ( state = = NULL ) {
DBG_ERR ( " Discarding async ctdb reqid %u \n " ,
conn - > read_state . hdr - > reqid ) ;
TALLOC_FREE ( conn - > read_state . hdr ) ;
ZERO_STRUCT ( conn - > read_state ) ;
return EINVAL ;
}
DBG_DEBUG ( " Got reply for reqid [% " PRIu32 " ] \n " , state - > reqid ) ;
state - > hdr = talloc_move ( state , & conn - > read_state . hdr ) ;
ZERO_STRUCT ( conn - > read_state ) ;
tevent_req_done ( state - > req ) ;
return 0 ;
}
/*
 * Collect the result of ctdb_pkt_recv_send().
 *
 * On success the received packet is moved to "mem_ctx" and returned in
 * "_hdr" and 0 is returned. On failure the unix error of the request
 * is returned and "_hdr" is left untouched. The request is marked as
 * received in both cases.
 */
static int ctdb_pkt_recv_recv(struct tevent_req *req,
			      TALLOC_CTX *mem_ctx,
			      struct ctdb_req_header **_hdr)
{
	struct ctdb_pkt_recv_state *state = tevent_req_data(
		req, struct ctdb_pkt_recv_state);
	int err;
	int result = 0;

	if (tevent_req_is_unix_error(req, &err)) {
		DBG_ERR("ctdb_read_req failed %s\n", strerror(err));
		result = err;
	} else {
		*_hdr = talloc_move(mem_ctx, &state->hdr);
	}

	tevent_req_received(req);
	return result;
}
static int ctdbd_connection_destructor ( struct ctdbd_connection * c )
{
TALLOC_FREE ( c - > fde ) ;
if ( c - > fd ! = - 1 ) {
close ( c - > fd ) ;
c - > fd = - 1 ;
}
TALLOC_FREE ( c - > read_state . hdr ) ;
ZERO_STRUCT ( c - > read_state ) ;
2017-06-09 09:41:49 +03:00
while ( c - > send_list ! = NULL ) {
struct ctdb_pkt_send_state * send_state = c - > send_list ;
2017-01-09 10:17:02 +03:00
DLIST_REMOVE ( c - > send_list , send_state ) ;
send_state - > conn = NULL ;
tevent_req_defer_callback ( send_state - > req , send_state - > ev ) ;
tevent_req_error ( send_state - > req , EIO ) ;
}
2017-06-09 09:41:49 +03:00
while ( c - > recv_list ! = NULL ) {
struct ctdb_pkt_recv_state * recv_state = c - > recv_list ;
2017-01-09 10:17:02 +03:00
DLIST_REMOVE ( c - > recv_list , recv_state ) ;
recv_state - > conn = NULL ;
2017-06-09 09:41:16 +03:00
tevent_req_defer_callback ( recv_state - > req , recv_state - > ev ) ;
2017-01-09 10:17:02 +03:00
tevent_req_error ( recv_state - > req , EIO ) ;
}
return 0 ;
}
/*
 * State of an asynchronous fetch-and-parse request, see
 * ctdbd_parse_send().
 */
struct ctdbd_parse_state {
	struct tevent_context *ev;
	struct ctdbd_connection *conn;

	/* ctdb request id used for the request and its reply */
	uint32_t reqid;

	/*
	 * Private copy of the key. Keys that fit are kept in _keybuf,
	 * larger ones are allocated separately.
	 */
	TDB_DATA key;
	uint8_t _keybuf[64];

	/* The request packet and the iovec describing it plus the key */
	struct ctdb_req_call_old ctdb_req;
	struct iovec iov[2];

	/* Callback for the fetched record and its argument */
	void (*parser)(TDB_DATA key,
		       TDB_DATA data,
		       void *private_data);
	void *private_data;

	/* Caller-provided location that reflects the request's progress */
	enum dbwrap_req_state *req_state;
};
static void ctdbd_parse_pkt_send_done(struct tevent_req *subreq);
static void ctdbd_parse_done(struct tevent_req *subreq);

/*
 * Asynchronously fetch a record and parse it.
 *
 * Sends a CTDB_REQ_CALL / CTDB_FETCH_FUNC request for "key" in database
 * "db_id" (with CTDB_WANT_READONLY if "local_copy" is set). Once the
 * reply arrived, "parser" is called with a private copy of the key, the
 * record data and "private_data". "req_state" is updated as the request
 * makes progress and is set to DBWRAP_REQ_ERROR if the request can't be
 * started.
 *
 * Returns the tevent request, or NULL if it could not be created. The
 * result is collected with ctdbd_parse_recv().
 */
struct tevent_req *ctdbd_parse_send(TALLOC_CTX *mem_ctx,
				    struct tevent_context *ev,
				    struct ctdbd_connection *conn,
				    uint32_t db_id,
				    TDB_DATA key,
				    bool local_copy,
				    void (*parser)(TDB_DATA key,
						   TDB_DATA data,
						   void *private_data),
				    void *private_data,
				    enum dbwrap_req_state *req_state)
{
	struct tevent_req *req = NULL;
	struct ctdbd_parse_state *state = NULL;
	uint32_t flags;
	uint32_t packet_length;
	struct tevent_req *subreq = NULL;

	req = tevent_req_create(mem_ctx, &state, struct ctdbd_parse_state);
	if (req == NULL) {
		*req_state = DBWRAP_REQ_ERROR;
		return NULL;
	}

	*state = (struct ctdbd_parse_state) {
		.ev = ev,
		.conn = conn,
		.reqid = ctdbd_next_reqid(conn),
		.parser = parser,
		.private_data = private_data,
		.req_state = req_state,
	};

	flags = local_copy ? CTDB_WANT_READONLY : 0;
	packet_length = offsetof(struct ctdb_req_call_old, data) + key.dsize;

	/*
	 * Copy the key into our state, as ctdb_pkt_send_cleanup() requires
	 * that all passed iov elements have a lifetime longer than the
	 * tevent_req returned by ctdb_pkt_send_send(). This is required to
	 * continue sending the low level request into the ctdb socket if
	 * the higher level ('this') request is cancelled (or talloc free'd)
	 * by the application layer, without sending invalid packets to
	 * ctdb.
	 */
	if (key.dsize > sizeof(state->_keybuf)) {
		state->key.dptr = talloc_memdup(state, key.dptr, key.dsize);
		if (tevent_req_nomem(state->key.dptr, req)) {
			/*
			 * Report the failure through req_state as well,
			 * like the other places where starting the
			 * request fails.
			 */
			*req_state = DBWRAP_REQ_ERROR;
			return tevent_req_post(req, ev);
		}
	} else {
		memcpy(state->_keybuf, key.dptr, key.dsize);
		state->key.dptr = state->_keybuf;
	}
	state->key.dsize = key.dsize;

	state->ctdb_req.hdr.length       = packet_length;
	state->ctdb_req.hdr.ctdb_magic   = CTDB_MAGIC;
	state->ctdb_req.hdr.ctdb_version = CTDB_PROTOCOL;
	state->ctdb_req.hdr.operation    = CTDB_REQ_CALL;
	state->ctdb_req.hdr.reqid        = state->reqid;
	state->ctdb_req.flags            = flags;
	state->ctdb_req.callid           = CTDB_FETCH_FUNC;
	state->ctdb_req.db_id            = db_id;
	state->ctdb_req.keylen           = state->key.dsize;

	state->iov[0].iov_base = &state->ctdb_req;
	state->iov[0].iov_len = offsetof(struct ctdb_req_call_old, data);
	state->iov[1].iov_base = state->key.dptr;
	state->iov[1].iov_len = state->key.dsize;

	/*
	 * Note that ctdb_pkt_send_send()
	 * will modify state->iov using
	 * iov_advance() without making a copy.
	 */
	subreq = ctdb_pkt_send_send(state,
				    ev,
				    conn,
				    state->reqid,
				    state->iov,
				    ARRAY_SIZE(state->iov),
				    req_state);
	if (tevent_req_nomem(subreq, req)) {
		*req_state = DBWRAP_REQ_ERROR;
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, ctdbd_parse_pkt_send_done, req);

	return req;
}
static void ctdbd_parse_pkt_send_done ( struct tevent_req * subreq )
{
struct tevent_req * req = tevent_req_callback_data (
subreq , struct tevent_req ) ;
struct ctdbd_parse_state * state = tevent_req_data (
req , struct ctdbd_parse_state ) ;
int ret ;
ret = ctdb_pkt_send_recv ( subreq ) ;
TALLOC_FREE ( subreq ) ;
if ( tevent_req_error ( req , ret ) ) {
DBG_DEBUG ( " ctdb_pkt_send_recv failed %s \n " , strerror ( ret ) ) ;
return ;
}
subreq = ctdb_pkt_recv_send ( state ,
state - > ev ,
state - > conn ,
state - > reqid ) ;
if ( tevent_req_nomem ( subreq , req ) ) {
return ;
}
* state - > req_state = DBWRAP_REQ_DISPATCHED ;
tevent_req_set_callback ( subreq , ctdbd_parse_done , req ) ;
return ;
}
static void ctdbd_parse_done ( struct tevent_req * subreq )
{
struct tevent_req * req = tevent_req_callback_data (
subreq , struct tevent_req ) ;
struct ctdbd_parse_state * state = tevent_req_data (
req , struct ctdbd_parse_state ) ;
struct ctdb_req_header * hdr = NULL ;
struct ctdb_reply_call_old * reply = NULL ;
int ret ;
DBG_DEBUG ( " async parse request finished \n " ) ;
ret = ctdb_pkt_recv_recv ( subreq , state , & hdr ) ;
TALLOC_FREE ( subreq ) ;
if ( tevent_req_error ( req , ret ) ) {
DBG_ERR ( " ctdb_pkt_recv_recv returned %s \n " , strerror ( ret ) ) ;
return ;
}
if ( hdr - > operation ! = CTDB_REPLY_CALL ) {
DBG_ERR ( " received invalid reply \n " ) ;
ctdb_packet_dump ( hdr ) ;
tevent_req_error ( req , EIO ) ;
return ;
}
reply = ( struct ctdb_reply_call_old * ) hdr ;
if ( reply - > datalen = = 0 ) {
/*
* Treat an empty record as non - existing
*/
tevent_req_error ( req , ENOENT ) ;
return ;
}
state - > parser ( state - > key ,
make_tdb_data ( & reply - > data [ 0 ] , reply - > datalen ) ,
state - > private_data ) ;
tevent_req_done ( req ) ;
return ;
}
/*
 * Collect the result of an async parse request.
 *
 * Returns 0 on success, or the unix error reported by
 * tevent_req_is_unix_error(). In both cases tevent_req_received() is
 * called on the request before returning.
 */
int ctdbd_parse_recv(struct tevent_req *req)
{
	int err = 0;
	int result = 0;

	if (tevent_req_is_unix_error(req, &err)) {
		DBG_DEBUG(" async parse returned %s \n ", strerror(err));
		result = err;
	}

	tevent_req_received(req);
	return result;
}