2007-04-10 00:03:39 +04:00
/*
ctdb database library
Utility functions to read/write blobs of data from a file descriptor
and handle the case where we might need multiple reads/writes to get all the
data.
Copyright ( C ) Andrew Tridgell 2006
2007-05-31 07:50:53 +04:00
This program is free software ; you can redistribute it and / or modify
it under the terms of the GNU General Public License as published by
2007-07-10 09:29:31 +04:00
the Free Software Foundation ; either version 3 of the License , or
2007-05-31 07:50:53 +04:00
( at your option ) any later version .
This program is distributed in the hope that it will be useful ,
2007-04-10 00:03:39 +04:00
but WITHOUT ANY WARRANTY ; without even the implied warranty of
2007-05-31 07:50:53 +04:00
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU General Public License for more details .
You should have received a copy of the GNU General Public License
2007-07-10 09:29:31 +04:00
along with this program ; if not , see < http : //www.gnu.org/licenses/>.
2007-04-10 00:03:39 +04:00
*/
# include "includes.h"
# include "lib/tdb/include/tdb.h"
# include "lib/events/events.h"
# include "lib/util/dlinklist.h"
# include "system/network.h"
# include "system/filesys.h"
# include "../include/ctdb_private.h"
2007-04-16 04:21:44 +04:00
# include "../include/ctdb.h"
2007-04-10 00:03:39 +04:00
2007-04-10 13:33:21 +04:00
/* structures for packet queueing - see common/ctdb_io.c */
/*
  holding area for input that does not yet form a complete packet
*/
struct ctdb_partial {
	/* bytes received so far for the incomplete packet (NULL when none) */
	uint8_t *data;
	/* number of valid bytes held in data */
	uint32_t length;
};
2007-04-10 00:03:39 +04:00
2007-04-10 13:33:21 +04:00
/*
  one outgoing packet, linked into a queue's out_queue list
*/
struct ctdb_queue_pkt {
	/* doubly linked list pointers */
	struct ctdb_queue_pkt *next;
	struct ctdb_queue_pkt *prev;
	/* next byte that still has to be written */
	uint8_t *data;
	/* number of bytes still to be written starting at data */
	uint32_t length;
	/* size of the whole packet when it was submitted; differs from
	   length once part of the packet has been written */
	uint32_t full_length;
};
struct ctdb_queue {
struct ctdb_context * ctdb ;
struct ctdb_partial partial ; /* partial input packet */
struct ctdb_queue_pkt * out_queue ;
struct fd_event * fde ;
int fd ;
size_t alignment ;
2007-04-13 14:38:24 +04:00
void * private_data ;
2007-04-10 13:33:21 +04:00
ctdb_queue_cb_fn_t callback ;
} ;
2009-10-21 08:20:55 +04:00
int ctdb_queue_length ( struct ctdb_queue * queue )
{
int i ;
struct ctdb_queue_pkt * pkt ;
for ( i = 0 , pkt = queue - > out_queue ; pkt ; i + + , pkt = pkt - > next ) ;
return i ;
}
2007-04-10 13:33:21 +04:00
/*
called when an incoming connection is readable
*/
static void queue_io_read ( struct ctdb_queue * queue )
2007-04-10 00:03:39 +04:00
{
int num_ready = 0 ;
ssize_t nread ;
uint8_t * data , * data_base ;
2007-04-29 18:19:40 +04:00
if ( ioctl ( queue - > fd , FIONREAD , & num_ready ) ! = 0 ) {
return ;
}
if ( num_ready = = 0 ) {
2007-04-10 00:03:39 +04:00
/* the descriptor has been closed */
2007-04-10 13:33:21 +04:00
goto failed ;
2007-04-10 00:03:39 +04:00
}
2007-04-10 13:33:21 +04:00
queue - > partial . data = talloc_realloc_size ( queue , queue - > partial . data ,
num_ready + queue - > partial . length ) ;
2007-04-10 00:03:39 +04:00
2007-04-10 13:33:21 +04:00
if ( queue - > partial . data = = NULL ) {
2008-02-04 12:07:15 +03:00
DEBUG ( DEBUG_ERR , ( " read error alloc failed for %u \n " ,
2007-04-29 18:19:40 +04:00
num_ready + queue - > partial . length ) ) ;
2007-04-10 13:33:21 +04:00
goto failed ;
2007-04-10 00:03:39 +04:00
}
2007-04-10 13:33:21 +04:00
nread = read ( queue - > fd , queue - > partial . data + queue - > partial . length , num_ready ) ;
2007-04-10 00:03:39 +04:00
if ( nread < = 0 ) {
2008-02-04 12:07:15 +03:00
DEBUG ( DEBUG_ERR , ( " read error nread=%d \n " , ( int ) nread ) ) ;
2007-04-10 13:33:21 +04:00
goto failed ;
2007-04-10 00:03:39 +04:00
}
2007-04-10 13:33:21 +04:00
data = queue - > partial . data ;
nread + = queue - > partial . length ;
2007-04-10 00:03:39 +04:00
2007-04-10 13:33:21 +04:00
queue - > partial . data = NULL ;
queue - > partial . length = 0 ;
2007-04-10 00:03:39 +04:00
if ( nread > = 4 & & * ( uint32_t * ) data = = nread ) {
2007-04-10 13:33:21 +04:00
/* it is the responsibility of the incoming packet
function to free ' data ' */
2007-04-13 14:38:24 +04:00
queue - > callback ( data , nread , queue - > private_data ) ;
2007-04-10 00:03:39 +04:00
return ;
}
data_base = data ;
while ( nread > = 4 & & * ( uint32_t * ) data < = nread ) {
/* we have at least one packet */
uint8_t * d2 ;
uint32_t len ;
len = * ( uint32_t * ) data ;
2007-05-26 08:46:12 +04:00
if ( len = = 0 ) {
/* bad packet! treat as EOF */
2008-02-04 12:07:15 +03:00
DEBUG ( DEBUG_CRIT , ( " Invalid packet of length 0 \n " ) ) ;
2007-05-26 08:46:12 +04:00
goto failed ;
}
2007-04-10 13:33:21 +04:00
d2 = talloc_memdup ( queue , data , len ) ;
2007-04-10 00:03:39 +04:00
if ( d2 = = NULL ) {
2008-02-04 12:07:15 +03:00
DEBUG ( DEBUG_ERR , ( " read error memdup failed for %u \n " , len ) ) ;
2007-04-10 00:03:39 +04:00
/* sigh */
2007-04-10 13:33:21 +04:00
goto failed ;
2007-04-10 00:03:39 +04:00
}
2007-04-13 14:38:24 +04:00
queue - > callback ( d2 , len , queue - > private_data ) ;
2007-04-10 00:03:39 +04:00
data + = len ;
nread - = len ;
}
if ( nread > 0 ) {
/* we have only part of a packet */
if ( data_base = = data ) {
2007-04-10 13:33:21 +04:00
queue - > partial . data = data ;
queue - > partial . length = nread ;
2007-04-10 00:03:39 +04:00
} else {
2007-04-10 13:33:21 +04:00
queue - > partial . data = talloc_memdup ( queue , data , nread ) ;
if ( queue - > partial . data = = NULL ) {
2008-02-04 12:07:15 +03:00
DEBUG ( DEBUG_ERR , ( " read error memdup partial failed for %u \n " ,
2007-05-29 07:58:41 +04:00
( unsigned ) nread ) ) ;
2007-04-10 13:33:21 +04:00
goto failed ;
2007-04-10 00:03:39 +04:00
}
2007-04-10 13:33:21 +04:00
queue - > partial . length = nread ;
2007-04-10 00:03:39 +04:00
talloc_free ( data_base ) ;
}
return ;
}
talloc_free ( data_base ) ;
2007-04-10 13:33:21 +04:00
return ;
failed :
2007-04-13 14:38:24 +04:00
queue - > callback ( NULL , 0 , queue - > private_data ) ;
2007-04-10 13:33:21 +04:00
}
/* used when an event triggers a dead queue */
static void queue_dead ( struct event_context * ev , struct timed_event * te ,
2007-04-13 14:38:24 +04:00
struct timeval t , void * private_data )
2007-04-10 13:33:21 +04:00
{
2007-04-13 14:38:24 +04:00
struct ctdb_queue * queue = talloc_get_type ( private_data , struct ctdb_queue ) ;
queue - > callback ( NULL , 0 , queue - > private_data ) ;
2007-04-10 00:03:39 +04:00
}
2007-04-10 13:33:21 +04:00
/*
called when an incoming connection is writeable
*/
static void queue_io_write ( struct ctdb_queue * queue )
{
while ( queue - > out_queue ) {
struct ctdb_queue_pkt * pkt = queue - > out_queue ;
ssize_t n ;
2007-04-29 18:19:40 +04:00
if ( queue - > ctdb - > flags & CTDB_FLAG_TORTURE ) {
n = write ( queue - > fd , pkt - > data , 1 ) ;
} else {
n = write ( queue - > fd , pkt - > data , pkt - > length ) ;
}
2007-04-10 13:33:21 +04:00
if ( n = = - 1 & & errno ! = EAGAIN & & errno ! = EWOULDBLOCK ) {
2007-05-26 10:41:32 +04:00
if ( pkt - > length ! = pkt - > full_length ) {
/* partial packet sent - we have to drop it */
DLIST_REMOVE ( queue - > out_queue , pkt ) ;
talloc_free ( pkt ) ;
}
2007-05-15 08:08:58 +04:00
talloc_free ( queue - > fde ) ;
queue - > fde = NULL ;
queue - > fd = - 1 ;
2007-04-10 13:33:21 +04:00
event_add_timed ( queue - > ctdb - > ev , queue , timeval_zero ( ) ,
queue_dead , queue ) ;
return ;
}
if ( n < = 0 ) return ;
if ( n ! = pkt - > length ) {
pkt - > length - = n ;
pkt - > data + = n ;
return ;
}
DLIST_REMOVE ( queue - > out_queue , pkt ) ;
talloc_free ( pkt ) ;
}
EVENT_FD_NOT_WRITEABLE ( queue - > fde ) ;
}
/*
called when an incoming connection is readable or writeable
*/
static void queue_io_handler ( struct event_context * ev , struct fd_event * fde ,
2007-04-13 14:38:24 +04:00
uint16_t flags , void * private_data )
2007-04-10 13:33:21 +04:00
{
2007-04-13 14:38:24 +04:00
struct ctdb_queue * queue = talloc_get_type ( private_data , struct ctdb_queue ) ;
2007-04-10 13:33:21 +04:00
if ( flags & EVENT_FD_READ ) {
queue_io_read ( queue ) ;
} else {
queue_io_write ( queue ) ;
}
}
/*
queue a packet for sending
*/
int ctdb_queue_send ( struct ctdb_queue * queue , uint8_t * data , uint32_t length )
{
struct ctdb_queue_pkt * pkt ;
2007-05-28 02:37:54 +04:00
uint32_t length2 , full_length ;
2007-04-10 13:33:21 +04:00
2007-05-03 06:16:03 +04:00
if ( queue - > alignment ) {
/* enforce the length and alignment rules from the tcp packet allocator */
length2 = ( length + ( queue - > alignment - 1 ) ) & ~ ( queue - > alignment - 1 ) ;
* ( uint32_t * ) data = length2 ;
} else {
length2 = length ;
}
2007-04-10 13:33:21 +04:00
if ( length2 ! = length ) {
memset ( data + length , 0 , length2 - length ) ;
}
2007-05-28 02:37:54 +04:00
full_length = length2 ;
2007-04-10 13:33:21 +04:00
/* if the queue is empty then try an immediate write, avoiding
queue overhead . This relies on non - blocking sockets */
2007-04-29 18:19:40 +04:00
if ( queue - > out_queue = = NULL & & queue - > fd ! = - 1 & &
! ( queue - > ctdb - > flags & CTDB_FLAG_TORTURE ) ) {
2007-04-10 13:33:21 +04:00
ssize_t n = write ( queue - > fd , data , length2 ) ;
if ( n = = - 1 & & errno ! = EAGAIN & & errno ! = EWOULDBLOCK ) {
2007-05-15 08:08:58 +04:00
talloc_free ( queue - > fde ) ;
queue - > fde = NULL ;
queue - > fd = - 1 ;
2007-04-10 13:33:21 +04:00
event_add_timed ( queue - > ctdb - > ev , queue , timeval_zero ( ) ,
queue_dead , queue ) ;
/* yes, we report success, as the dead node is
handled via a separate event */
return 0 ;
}
if ( n > 0 ) {
data + = n ;
length2 - = n ;
}
if ( length2 = = 0 ) return 0 ;
}
pkt = talloc ( queue , struct ctdb_queue_pkt ) ;
CTDB_NO_MEMORY ( queue - > ctdb , pkt ) ;
pkt - > data = talloc_memdup ( pkt , data , length2 ) ;
CTDB_NO_MEMORY ( queue - > ctdb , pkt - > data ) ;
pkt - > length = length2 ;
2007-05-28 02:37:54 +04:00
pkt - > full_length = full_length ;
2007-04-10 13:33:21 +04:00
if ( queue - > out_queue = = NULL & & queue - > fd ! = - 1 ) {
EVENT_FD_WRITEABLE ( queue - > fde ) ;
}
DLIST_ADD_END ( queue - > out_queue , pkt , struct ctdb_queue_pkt * ) ;
2008-04-01 08:34:54 +04:00
if ( queue - > ctdb - > tunable . verbose_memory_names ! = 0 ) {
2008-04-01 04:31:42 +04:00
struct ctdb_req_header * hdr = ( struct ctdb_req_header * ) pkt - > data ;
switch ( hdr - > operation ) {
case CTDB_REQ_CONTROL : {
struct ctdb_req_control * c = ( struct ctdb_req_control * ) hdr ;
talloc_set_name ( pkt , " ctdb_queue_pkt: control opcode=%u srvid=%llu datalen=%u " ,
( unsigned ) c - > opcode , ( unsigned long long ) c - > srvid , ( unsigned ) c - > datalen ) ;
break ;
}
case CTDB_REQ_MESSAGE : {
struct ctdb_req_message * m = ( struct ctdb_req_message * ) hdr ;
talloc_set_name ( pkt , " ctdb_queue_pkt: message srvid=%llu datalen=%u " ,
( unsigned long long ) m - > srvid , ( unsigned ) m - > datalen ) ;
break ;
}
default :
talloc_set_name ( pkt , " ctdb_queue_pkt: operation=%u length=%u src=%u dest=%u " ,
( unsigned ) hdr - > operation , ( unsigned ) hdr - > length ,
( unsigned ) hdr - > srcnode , ( unsigned ) hdr - > destnode ) ;
break ;
}
}
2007-04-10 13:33:21 +04:00
return 0 ;
}
/*
setup the fd used by the queue
*/
int ctdb_queue_set_fd ( struct ctdb_queue * queue , int fd )
{
queue - > fd = fd ;
talloc_free ( queue - > fde ) ;
queue - > fde = NULL ;
if ( fd ! = - 1 ) {
2007-05-05 11:19:59 +04:00
queue - > fde = event_add_fd ( queue - > ctdb - > ev , queue , fd , EVENT_FD_READ | EVENT_FD_AUTOCLOSE ,
2007-04-10 13:33:21 +04:00
queue_io_handler , queue ) ;
if ( queue - > fde = = NULL ) {
return - 1 ;
}
if ( queue - > out_queue ) {
EVENT_FD_WRITEABLE ( queue - > fde ) ;
}
}
return 0 ;
}
/*
  setup a packet queue on a socket

  Allocates a zeroed queue as a talloc child of mem_ctx, records the
  supplied parameters and, when fd is not -1, attaches the descriptor.

  Returns the new queue, or NULL if allocation or attaching the
  descriptor failed (the queue is freed in the latter case).
*/
struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
				    TALLOC_CTX *mem_ctx, int fd, int alignment,
				    ctdb_queue_cb_fn_t callback,
				    void *private_data)
{
	struct ctdb_queue *q = talloc_zero(mem_ctx, struct ctdb_queue);

	CTDB_NO_MEMORY_NULL(ctdb, q);

	q->callback = callback;
	q->private_data = private_data;
	q->alignment = alignment;
	q->fd = fd;
	q->ctdb = ctdb;

	/* no descriptor yet: it can be attached later */
	if (fd == -1) {
		return q;
	}

	if (ctdb_queue_set_fd(q, fd) == 0) {
		return q;
	}

	talloc_free(q);
	return NULL;
}