2007-05-03 06:16:03 +04:00
/*
efficient async ctdb traverse
Copyright ( C ) Andrew Tridgell 2007
This library is free software ; you can redistribute it and / or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation ; either
version 2 of the License , or ( at your option ) any later version .
This library is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the GNU
Lesser General Public License for more details .
You should have received a copy of the GNU Lesser General Public
License along with this library ; if not , write to the Free Software
Foundation , Inc . , 59 Temple Place , Suite 330 , Boston , MA 02111 - 1307 USA
*/
# include "includes.h"
# include "lib/events/events.h"
# include "system/filesys.h"
# include "system/wait.h"
# include "db_wrap.h"
# include "lib/tdb/include/tdb.h"
# include "../include/ctdb_private.h"
typedef void ( * ctdb_traverse_fn_t ) ( void * private_data , TDB_DATA key , TDB_DATA data ) ;
/*
handle returned to caller - freeing this handler will kill the child and
terminate the traverse
*/
2007-05-03 11:12:23 +04:00
struct ctdb_traverse_local_handle {
2007-05-03 06:16:03 +04:00
struct ctdb_db_context * ctdb_db ;
int fd [ 2 ] ;
pid_t child ;
void * private_data ;
ctdb_traverse_fn_t callback ;
struct timeval start_time ;
struct ctdb_queue * queue ;
} ;
/*
called when data is available from the child
*/
2007-05-03 11:12:23 +04:00
static void ctdb_traverse_local_handler ( uint8_t * rawdata , size_t length , void * private_data )
2007-05-03 06:16:03 +04:00
{
2007-05-03 11:12:23 +04:00
struct ctdb_traverse_local_handle * h = talloc_get_type ( private_data ,
struct ctdb_traverse_local_handle ) ;
2007-05-03 06:16:03 +04:00
TDB_DATA key , data ;
ctdb_traverse_fn_t callback = h - > callback ;
void * p = h - > private_data ;
2007-05-10 11:43:45 +04:00
struct ctdb_rec_data * tdata = ( struct ctdb_rec_data * ) rawdata ;
2007-05-03 06:16:03 +04:00
if ( rawdata = = NULL | | length < 4 | | length ! = tdata - > length ) {
/* end of traverse */
talloc_free ( h ) ;
callback ( p , tdb_null , tdb_null ) ;
return ;
}
key . dsize = tdata - > keylen ;
key . dptr = & tdata - > data [ 0 ] ;
data . dsize = tdata - > datalen ;
data . dptr = & tdata - > data [ tdata - > keylen ] ;
callback ( p , key , data ) ;
}
/*
destroy a in - flight traverse operation
*/
2007-05-03 11:12:23 +04:00
static int traverse_local_destructor ( struct ctdb_traverse_local_handle * h )
2007-05-03 06:16:03 +04:00
{
kill ( h - > child , SIGKILL ) ;
waitpid ( h - > child , NULL , 0 ) ;
return 0 ;
}
2007-05-03 11:12:23 +04:00
/*
callback from tdb_traverse_read ( )
*/
static int ctdb_traverse_local_fn ( struct tdb_context * tdb , TDB_DATA key , TDB_DATA data , void * p )
{
struct ctdb_traverse_local_handle * h = talloc_get_type ( p ,
struct ctdb_traverse_local_handle ) ;
2007-05-10 11:43:45 +04:00
struct ctdb_rec_data * d ;
2007-05-03 11:12:23 +04:00
struct ctdb_ltdb_header * hdr ;
/* filter out non-authoritative and zero-length records */
hdr = ( struct ctdb_ltdb_header * ) data . dptr ;
if ( data . dsize < = sizeof ( struct ctdb_ltdb_header ) | |
hdr - > dmaster ! = h - > ctdb_db - > ctdb - > vnn ) {
return 0 ;
}
2007-05-10 11:43:45 +04:00
d = ctdb_marshall_record ( h , 0 , key , data ) ;
2007-05-03 11:12:23 +04:00
if ( d = = NULL ) {
/* error handling is tricky in this child code .... */
return - 1 ;
}
if ( write ( h - > fd [ 1 ] , ( uint8_t * ) d , d - > length ) ! = d - > length ) {
2007-05-03 06:16:03 +04:00
return - 1 ;
}
return 0 ;
}
2007-05-03 11:12:23 +04:00
2007-05-03 06:16:03 +04:00
/*
2007-05-03 11:12:23 +04:00
setup a non - blocking traverse of a local ltdb . The callback function
will be called on every record in the local ltdb . To stop the
travserse , talloc_free ( ) the travserse_handle .
The traverse is finished when the callback is called with tdb_null for key and data
2007-05-03 06:16:03 +04:00
*/
2007-05-03 11:12:23 +04:00
struct ctdb_traverse_local_handle * ctdb_traverse_local ( struct ctdb_db_context * ctdb_db ,
ctdb_traverse_fn_t callback ,
void * private_data )
2007-05-03 06:16:03 +04:00
{
2007-05-03 11:12:23 +04:00
struct ctdb_traverse_local_handle * h ;
2007-05-03 06:16:03 +04:00
int ret ;
2007-05-03 11:12:23 +04:00
h = talloc_zero ( ctdb_db , struct ctdb_traverse_local_handle ) ;
if ( h = = NULL ) {
2007-05-03 06:16:03 +04:00
return NULL ;
}
ret = pipe ( h - > fd ) ;
if ( ret ! = 0 ) {
talloc_free ( h ) ;
return NULL ;
}
h - > child = fork ( ) ;
if ( h - > child = = ( pid_t ) - 1 ) {
close ( h - > fd [ 0 ] ) ;
close ( h - > fd [ 1 ] ) ;
talloc_free ( h ) ;
return NULL ;
}
h - > callback = callback ;
h - > private_data = private_data ;
h - > ctdb_db = ctdb_db ;
if ( h - > child = = 0 ) {
/* start the traverse in the child */
close ( h - > fd [ 0 ] ) ;
2007-05-03 11:12:23 +04:00
tdb_traverse_read ( ctdb_db - > ltdb - > tdb , ctdb_traverse_local_fn , h ) ;
2007-05-03 06:16:03 +04:00
_exit ( 0 ) ;
}
close ( h - > fd [ 1 ] ) ;
2007-05-03 11:12:23 +04:00
talloc_set_destructor ( h , traverse_local_destructor ) ;
2007-05-03 06:16:03 +04:00
/*
setup a packet queue between the child and the parent . This
copes with all the async and packet boundary issues
*/
2007-05-03 11:12:23 +04:00
h - > queue = ctdb_queue_setup ( ctdb_db - > ctdb , h , h - > fd [ 0 ] , 0 , ctdb_traverse_local_handler , h ) ;
2007-05-03 06:16:03 +04:00
if ( h - > queue = = NULL ) {
talloc_free ( h ) ;
return NULL ;
}
h - > start_time = timeval_current ( ) ;
return h ;
}
2007-05-03 11:12:23 +04:00
struct ctdb_traverse_all_handle {
struct ctdb_context * ctdb ;
uint32_t reqid ;
ctdb_traverse_fn_t callback ;
void * private_data ;
uint32_t null_count ;
} ;
/*
destroy a traverse_all op
*/
static int ctdb_traverse_all_destructor ( struct ctdb_traverse_all_handle * state )
{
ctdb_reqid_remove ( state - > ctdb , state - > reqid ) ;
return 0 ;
}
struct ctdb_traverse_all {
uint32_t db_id ;
uint32_t reqid ;
uint32_t vnn ;
} ;
2007-05-10 08:06:48 +04:00
/* called when a traverse times out */
static void ctdb_traverse_all_timeout ( struct event_context * ev , struct timed_event * te ,
struct timeval t , void * private_data )
{
struct ctdb_traverse_all_handle * state = talloc_get_type ( private_data , struct ctdb_traverse_all_handle ) ;
state - > ctdb - > status . timeouts . traverse + + ;
state - > callback ( state - > private_data , tdb_null , tdb_null ) ;
talloc_free ( state ) ;
}
2007-05-03 11:12:23 +04:00
/*
setup a cluster - wide non - blocking traverse of a ctdb . The
callback function will be called on every record in the local
ltdb . To stop the travserse , talloc_free ( ) the traverse_handle .
The traverse is finished when the callback is called with tdb_null
for key and data
*/
struct ctdb_traverse_all_handle * ctdb_daemon_traverse_all ( struct ctdb_db_context * ctdb_db ,
ctdb_traverse_fn_t callback ,
void * private_data )
{
struct ctdb_traverse_all_handle * state ;
struct ctdb_context * ctdb = ctdb_db - > ctdb ;
int ret ;
TDB_DATA data ;
struct ctdb_traverse_all r ;
state = talloc ( ctdb_db , struct ctdb_traverse_all_handle ) ;
if ( state = = NULL ) {
return NULL ;
}
state - > ctdb = ctdb ;
state - > reqid = ctdb_reqid_new ( ctdb_db - > ctdb , state ) ;
state - > callback = callback ;
state - > private_data = private_data ;
state - > null_count = 0 ;
talloc_set_destructor ( state , ctdb_traverse_all_destructor ) ;
r . db_id = ctdb_db - > db_id ;
r . reqid = state - > reqid ;
r . vnn = ctdb - > vnn ;
data . dptr = ( uint8_t * ) & r ;
data . dsize = sizeof ( r ) ;
/* tell all the nodes in the cluster to start sending records to this node */
2007-05-05 07:17:26 +04:00
ret = ctdb_daemon_send_control ( ctdb , CTDB_BROADCAST_VNNMAP , 0 , CTDB_CONTROL_TRAVERSE_ALL ,
2007-05-04 05:41:29 +04:00
0 , CTDB_CTRL_FLAG_NOREPLY , data , NULL , NULL ) ;
2007-05-03 11:12:23 +04:00
if ( ret ! = 0 ) {
talloc_free ( state ) ;
return NULL ;
}
2007-05-10 08:06:48 +04:00
/* timeout the traverse */
event_add_timed ( ctdb - > ev , state , timeval_current_ofs ( CTDB_TRAVERSE_TIMEOUT , 0 ) ,
ctdb_traverse_all_timeout , state ) ;
2007-05-03 11:12:23 +04:00
return state ;
}
struct traverse_all_state {
struct ctdb_context * ctdb ;
struct ctdb_traverse_local_handle * h ;
uint32_t reqid ;
uint32_t srcnode ;
} ;
/*
called for each record during a traverse all
*/
static void traverse_all_callback ( void * p , TDB_DATA key , TDB_DATA data )
{
struct traverse_all_state * state = talloc_get_type ( p , struct traverse_all_state ) ;
int ret ;
2007-05-10 11:43:45 +04:00
struct ctdb_rec_data * d ;
2007-05-03 11:12:23 +04:00
2007-05-10 11:43:45 +04:00
d = ctdb_marshall_record ( state , state - > reqid , key , data ) ;
2007-05-03 11:12:23 +04:00
if ( d = = NULL ) {
/* darn .... */
DEBUG ( 0 , ( " Out of memory in traverse_all_callback \n " ) ) ;
return ;
}
data . dptr = ( uint8_t * ) d ;
data . dsize = d - > length ;
ret = ctdb_daemon_send_control ( state - > ctdb , state - > srcnode , 0 , CTDB_CONTROL_TRAVERSE_DATA ,
2007-05-04 05:41:29 +04:00
0 , CTDB_CTRL_FLAG_NOREPLY , data , NULL , NULL ) ;
2007-05-03 11:12:23 +04:00
if ( ret ! = 0 ) {
DEBUG ( 0 , ( " Failed to send traverse data \n " ) ) ;
}
}
/*
called when a CTDB_CONTROL_TRAVERSE_ALL control comes in . We then
setup a traverse of our local ltdb , sending the records as
CTDB_CONTROL_TRAVERSE_DATA records back to the originator
*/
int32_t ctdb_control_traverse_all ( struct ctdb_context * ctdb , TDB_DATA data , TDB_DATA * outdata )
{
struct ctdb_traverse_all * c = ( struct ctdb_traverse_all * ) data . dptr ;
struct traverse_all_state * state ;
struct ctdb_db_context * ctdb_db ;
if ( data . dsize ! = sizeof ( struct ctdb_traverse_all ) ) {
DEBUG ( 0 , ( " Invalid size in ctdb_control_traverse_all \n " ) ) ;
return - 1 ;
}
ctdb_db = find_ctdb_db ( ctdb , c - > db_id ) ;
if ( ctdb_db = = NULL ) {
return - 1 ;
}
state = talloc ( ctdb_db , struct traverse_all_state ) ;
if ( state = = NULL ) {
return - 1 ;
}
state - > reqid = c - > reqid ;
state - > srcnode = c - > vnn ;
state - > ctdb = ctdb ;
state - > h = ctdb_traverse_local ( ctdb_db , traverse_all_callback , state ) ;
if ( state - > h = = NULL ) {
talloc_free ( state ) ;
return - 1 ;
}
return 0 ;
}
/*
called when a CTDB_CONTROL_TRAVERSE_DATA control comes in . We then
call the traverse_all callback with the record
*/
int32_t ctdb_control_traverse_data ( struct ctdb_context * ctdb , TDB_DATA data , TDB_DATA * outdata )
{
2007-05-10 11:43:45 +04:00
struct ctdb_rec_data * d = ( struct ctdb_rec_data * ) data . dptr ;
2007-05-03 11:12:23 +04:00
struct ctdb_traverse_all_handle * state ;
TDB_DATA key ;
ctdb_traverse_fn_t callback ;
void * private_data ;
if ( data . dsize < sizeof ( uint32_t ) | | data . dsize ! = d - > length ) {
DEBUG ( 0 , ( " Bad record size in ctdb_control_traverse_data \n " ) ) ;
return - 1 ;
}
state = ctdb_reqid_find ( ctdb , d - > reqid , struct ctdb_traverse_all_handle ) ;
if ( state = = NULL | | d - > reqid ! = state - > reqid ) {
/* traverse might have been terminated already */
return - 1 ;
}
key . dsize = d - > keylen ;
key . dptr = & d - > data [ 0 ] ;
data . dsize = d - > datalen ;
data . dptr = & d - > data [ d - > keylen ] ;
if ( key . dsize = = 0 & & data . dsize = = 0 ) {
state - > null_count + + ;
2007-05-17 17:23:41 +04:00
if ( state - > null_count ! = ctdb_get_num_connected_nodes ( ctdb ) ) {
2007-05-03 11:12:23 +04:00
return 0 ;
}
}
callback = state - > callback ;
private_data = state - > private_data ;
callback ( private_data , key , data ) ;
if ( key . dsize = = 0 & & data . dsize = = 0 ) {
/* we've received all of the null replies, so all
nodes are finished */
talloc_free ( state ) ;
}
return 0 ;
}
struct traverse_start_state {
struct ctdb_context * ctdb ;
struct ctdb_traverse_all_handle * h ;
uint32_t srcnode ;
uint32_t reqid ;
uint64_t srvid ;
} ;
/*
callback which sends records as messages to the client
*/
static void traverse_start_callback ( void * p , TDB_DATA key , TDB_DATA data )
{
struct traverse_start_state * state ;
2007-05-10 11:43:45 +04:00
struct ctdb_rec_data * d ;
2007-05-03 11:12:23 +04:00
state = talloc_get_type ( p , struct traverse_start_state ) ;
2007-05-10 11:43:45 +04:00
d = ctdb_marshall_record ( state , state - > reqid , key , data ) ;
2007-05-03 11:12:23 +04:00
if ( d = = NULL ) {
return ;
}
data . dptr = ( uint8_t * ) d ;
data . dsize = d - > length ;
ctdb_dispatch_message ( state - > ctdb , state - > srvid , data ) ;
if ( key . dsize = = 0 & & data . dsize = = 0 ) {
/* end of traverse */
talloc_free ( state ) ;
}
}
/*
start a traverse_all - called as a control from a client
*/
int32_t ctdb_control_traverse_start ( struct ctdb_context * ctdb , TDB_DATA data ,
TDB_DATA * outdata , uint32_t srcnode )
{
struct ctdb_traverse_start * d = ( struct ctdb_traverse_start * ) data . dptr ;
struct traverse_start_state * state ;
struct ctdb_db_context * ctdb_db ;
if ( data . dsize ! = sizeof ( * d ) ) {
DEBUG ( 0 , ( " Bad record size in ctdb_control_traverse_start \n " ) ) ;
return - 1 ;
}
ctdb_db = find_ctdb_db ( ctdb , d - > db_id ) ;
if ( ctdb_db = = NULL ) {
return - 1 ;
}
state = talloc ( ctdb_db , struct traverse_start_state ) ;
if ( state = = NULL ) {
return - 1 ;
}
state - > srcnode = srcnode ;
state - > reqid = d - > reqid ;
state - > srvid = d - > srvid ;
state - > ctdb = ctdb ;
state - > h = ctdb_daemon_traverse_all ( ctdb_db , traverse_start_callback , state ) ;
if ( state - > h = = NULL ) {
talloc_free ( state ) ;
return - 1 ;
}
return 0 ;
}