2007-04-10 00:03:39 +04:00
/*
   ctdb database library
   Utility functions to read/write blobs of data from a file descriptor
   and handle the case where we might need multiple reads/writes to get all
   the data.

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
2015-10-26 08:50:46 +03:00
# include "replace.h"
2007-04-10 00:03:39 +04:00
# include "system/network.h"
# include "system/filesys.h"
2015-10-26 08:50:46 +03:00
# include <tdb.h>
# include <talloc.h>
# include <tevent.h>
# include "lib/util/dlinklist.h"
# include "lib/util/debug.h"
# include "ctdb_logging.h"
# include "ctdb_private.h"
# include "ctdb_client.h"
2007-04-10 00:03:39 +04:00
2015-10-23 06:11:53 +03:00
# include "common/system.h"
2013-08-21 08:42:06 +04:00
/*
 * Size, in bytes, of a freshly allocated queue input buffer. A packet whose
 * declared length exceeds this makes queue_process() request a larger
 * buffer, and such an enlarged buffer is released again once it is empty.
 */
# define QUEUE_BUFFER_SIZE (16*1024)
2007-04-10 13:33:21 +04:00
/* structures for packet queueing - see common/ctdb_io.c */
2013-01-18 03:42:14 +04:00
/*
 * Input buffer of a queue: bytes read from the fd that have not yet been
 * handed to the queue callback as complete packets.
 */
struct ctdb_buffer {
	/* storage, allocated on demand by queue_io_read(); NULL when unallocated */
	uint8_t *data;

	/* number of valid bytes currently held in 'data' */
	uint32_t length;

	/* allocated size of 'data' in bytes */
	uint32_t size;

	/*
	 * when non-zero, the size 'data' is reallocated to on the next
	 * read (set by queue_process() for packets larger than
	 * QUEUE_BUFFER_SIZE)
	 */
	uint32_t extend;
};
2007-04-10 00:03:39 +04:00
2007-04-10 13:33:21 +04:00
/*
 * One outgoing packet waiting on a queue's out_queue list. The payload is
 * stored inline in the trailing flexible array.
 */
struct ctdb_queue_pkt {
	/* list linkage, maintained by the DLIST_* macros */
	struct ctdb_queue_pkt *next;
	struct ctdb_queue_pkt *prev;

	/* next byte to send; starts at 'buf' and advances after partial writes */
	uint8_t *data;

	/* number of bytes still to be sent, starting at 'data' */
	uint32_t length;

	/*
	 * length of the whole packet as submitted; when it differs from
	 * 'length' part of the packet has already been written
	 */
	uint32_t full_length;

	/* inline storage for the bytes that still had to be queued */
	uint8_t buf[];
};
struct ctdb_queue {
struct ctdb_context * ctdb ;
2013-02-22 05:59:39 +04:00
struct tevent_immediate * im ;
2013-01-18 03:42:14 +04:00
struct ctdb_buffer buffer ; /* input buffer */
2010-02-04 06:14:18 +03:00
struct ctdb_queue_pkt * out_queue , * out_queue_tail ;
2009-10-26 04:20:52 +03:00
uint32_t out_queue_length ;
2015-10-26 08:50:09 +03:00
struct tevent_fd * fde ;
2007-04-10 13:33:21 +04:00
int fd ;
size_t alignment ;
2007-04-13 14:38:24 +04:00
void * private_data ;
2007-04-10 13:33:21 +04:00
ctdb_queue_cb_fn_t callback ;
2009-12-02 01:27:42 +03:00
bool * destroyed ;
2010-07-01 17:08:49 +04:00
const char * name ;
2007-04-10 13:33:21 +04:00
} ;
2009-10-21 08:20:55 +04:00
int ctdb_queue_length ( struct ctdb_queue * queue )
{
2009-10-26 04:20:52 +03:00
return queue - > out_queue_length ;
2009-10-21 08:20:55 +04:00
}
2013-01-18 03:42:14 +04:00
static void queue_process ( struct ctdb_queue * queue ) ;
2013-02-22 05:59:39 +04:00
static void queue_process_event ( struct tevent_context * ev , struct tevent_immediate * im ,
void * private_data )
2013-01-18 03:42:14 +04:00
{
struct ctdb_queue * queue = talloc_get_type ( private_data , struct ctdb_queue ) ;
queue_process ( queue ) ;
}
/*
* This function is used to process data in queue buffer .
*
* Queue callback function can end up freeing the queue , there should not be a
* loop processing packets from queue buffer . Instead set up a timed event for
* immediate run to process remaining packets from buffer .
*/
static void queue_process ( struct ctdb_queue * queue )
{
uint32_t pkt_size ;
uint8_t * data ;
if ( queue - > buffer . length < sizeof ( pkt_size ) ) {
return ;
}
pkt_size = * ( uint32_t * ) queue - > buffer . data ;
if ( pkt_size = = 0 ) {
DEBUG ( DEBUG_CRIT , ( " Invalid packet of length 0 \n " ) ) ;
goto failed ;
}
if ( queue - > buffer . length < pkt_size ) {
2013-08-21 08:42:06 +04:00
if ( pkt_size > QUEUE_BUFFER_SIZE ) {
queue - > buffer . extend = pkt_size ;
}
2013-01-18 03:42:14 +04:00
return ;
}
/* Extract complete packet */
data = talloc_size ( queue , pkt_size ) ;
if ( data = = NULL ) {
DEBUG ( DEBUG_ERR , ( " read error alloc failed for %u \n " , pkt_size ) ) ;
return ;
}
memcpy ( data , queue - > buffer . data , pkt_size ) ;
/* Shift packet out from buffer */
if ( queue - > buffer . length > pkt_size ) {
memmove ( queue - > buffer . data ,
queue - > buffer . data + pkt_size ,
queue - > buffer . length - pkt_size ) ;
}
queue - > buffer . length - = pkt_size ;
if ( queue - > buffer . length > 0 ) {
2013-02-22 05:59:39 +04:00
/* There is more data to be processed, schedule an event */
tevent_schedule_immediate ( queue - > im , queue - > ctdb - > ev ,
queue_process_event , queue ) ;
2013-08-21 08:42:06 +04:00
} else {
if ( queue - > buffer . size > QUEUE_BUFFER_SIZE ) {
TALLOC_FREE ( queue - > buffer . data ) ;
queue - > buffer . size = 0 ;
}
2013-01-18 03:42:14 +04:00
}
/* It is the responsibility of the callback to free 'data' */
queue - > callback ( data , pkt_size , queue - > private_data ) ;
return ;
failed :
queue - > callback ( NULL , 0 , queue - > private_data ) ;
}
2007-04-10 13:33:21 +04:00
/*
called when an incoming connection is readable
2011-07-31 05:14:54 +04:00
This function MUST be safe for reentry via the queue callback !
2007-04-10 13:33:21 +04:00
*/
static void queue_io_read ( struct ctdb_queue * queue )
2007-04-10 00:03:39 +04:00
{
int num_ready = 0 ;
2011-07-31 05:14:54 +04:00
ssize_t nread ;
uint8_t * data ;
2013-08-21 08:42:06 +04:00
int navail ;
2007-04-10 00:03:39 +04:00
2012-01-04 14:41:12 +04:00
/* check how much data is available on the socket for immediately
guaranteed nonblocking access .
as long as we are careful never to try to read more than this
we know all reads will be successful and will neither block
nor fail with a " data not available right now " error
*/
2007-04-29 18:19:40 +04:00
if ( ioctl ( queue - > fd , FIONREAD , & num_ready ) ! = 0 ) {
return ;
}
if ( num_ready = = 0 ) {
2007-04-10 00:03:39 +04:00
/* the descriptor has been closed */
2007-04-10 13:33:21 +04:00
goto failed ;
2007-04-10 00:03:39 +04:00
}
2013-01-18 03:42:14 +04:00
if ( queue - > buffer . data = = NULL ) {
/* starting fresh, allocate buf to read data */
2013-08-21 08:42:06 +04:00
queue - > buffer . data = talloc_size ( queue , QUEUE_BUFFER_SIZE ) ;
2013-01-18 03:42:14 +04:00
if ( queue - > buffer . data = = NULL ) {
2013-08-20 08:20:09 +04:00
DEBUG ( DEBUG_ERR , ( " read error alloc failed for %u \n " , num_ready ) ) ;
2011-07-31 05:14:54 +04:00
goto failed ;
}
2013-08-21 08:42:06 +04:00
queue - > buffer . size = QUEUE_BUFFER_SIZE ;
} else if ( queue - > buffer . extend > 0 ) {
2013-01-18 03:42:14 +04:00
/* extending buffer */
2013-08-21 08:42:06 +04:00
data = talloc_realloc_size ( queue , queue - > buffer . data , queue - > buffer . extend ) ;
2013-01-18 03:42:14 +04:00
if ( data = = NULL ) {
2013-08-21 08:42:06 +04:00
DEBUG ( DEBUG_ERR , ( " read error realloc failed for %u \n " , queue - > buffer . extend ) ) ;
2007-04-10 13:33:21 +04:00
goto failed ;
2007-04-10 00:03:39 +04:00
}
2013-01-18 03:42:14 +04:00
queue - > buffer . data = data ;
2013-08-21 08:42:06 +04:00
queue - > buffer . size = queue - > buffer . extend ;
queue - > buffer . extend = 0 ;
2011-07-31 05:14:54 +04:00
}
2009-12-02 01:27:42 +03:00
2013-08-21 08:42:06 +04:00
navail = queue - > buffer . size - queue - > buffer . length ;
if ( num_ready > navail ) {
num_ready = navail ;
}
if ( num_ready > 0 ) {
2014-07-30 15:03:53 +04:00
nread = sys_read ( queue - > fd ,
queue - > buffer . data + queue - > buffer . length ,
num_ready ) ;
2013-08-21 08:42:06 +04:00
if ( nread < = 0 ) {
DEBUG ( DEBUG_ERR , ( " read error nread=%d \n " , ( int ) nread ) ) ;
goto failed ;
}
queue - > buffer . length + = nread ;
2011-07-31 05:14:54 +04:00
}
2007-04-10 00:03:39 +04:00
2013-01-18 03:42:14 +04:00
queue_process ( queue ) ;
2007-04-10 13:33:21 +04:00
return ;
failed :
2007-04-13 14:38:24 +04:00
queue - > callback ( NULL , 0 , queue - > private_data ) ;
2007-04-10 13:33:21 +04:00
}
/* used when an event triggers a dead queue */
2015-10-26 08:50:09 +03:00
static void queue_dead ( struct tevent_context * ev , struct tevent_immediate * im ,
2013-02-22 05:59:39 +04:00
void * private_data )
2007-04-10 13:33:21 +04:00
{
2007-04-13 14:38:24 +04:00
struct ctdb_queue * queue = talloc_get_type ( private_data , struct ctdb_queue ) ;
queue - > callback ( NULL , 0 , queue - > private_data ) ;
2007-04-10 00:03:39 +04:00
}
2007-04-10 13:33:21 +04:00
/*
called when an incoming connection is writeable
*/
static void queue_io_write ( struct ctdb_queue * queue )
{
while ( queue - > out_queue ) {
struct ctdb_queue_pkt * pkt = queue - > out_queue ;
ssize_t n ;
2007-04-29 18:19:40 +04:00
if ( queue - > ctdb - > flags & CTDB_FLAG_TORTURE ) {
2015-02-23 04:38:11 +03:00
n = write ( queue - > fd , pkt - > data , 1 ) ;
2007-04-29 18:19:40 +04:00
} else {
2015-02-23 04:38:11 +03:00
n = write ( queue - > fd , pkt - > data , pkt - > length ) ;
2007-04-29 18:19:40 +04:00
}
2007-04-10 13:33:21 +04:00
if ( n = = - 1 & & errno ! = EAGAIN & & errno ! = EWOULDBLOCK ) {
2007-05-26 10:41:32 +04:00
if ( pkt - > length ! = pkt - > full_length ) {
/* partial packet sent - we have to drop it */
2010-02-07 11:02:06 +03:00
DLIST_REMOVE ( queue - > out_queue , pkt ) ;
2009-10-26 04:20:52 +03:00
queue - > out_queue_length - - ;
2007-05-26 10:41:32 +04:00
talloc_free ( pkt ) ;
}
2007-05-15 08:08:58 +04:00
talloc_free ( queue - > fde ) ;
queue - > fde = NULL ;
queue - > fd = - 1 ;
2013-02-22 05:59:39 +04:00
tevent_schedule_immediate ( queue - > im , queue - > ctdb - > ev ,
queue_dead , queue ) ;
2007-04-10 13:33:21 +04:00
return ;
}
if ( n < = 0 ) return ;
if ( n ! = pkt - > length ) {
pkt - > length - = n ;
pkt - > data + = n ;
return ;
}
2010-02-07 11:02:06 +03:00
DLIST_REMOVE ( queue - > out_queue , pkt ) ;
2009-10-26 04:20:52 +03:00
queue - > out_queue_length - - ;
2007-04-10 13:33:21 +04:00
talloc_free ( pkt ) ;
}
2015-10-26 08:50:09 +03:00
TEVENT_FD_NOT_WRITEABLE ( queue - > fde ) ;
2007-04-10 13:33:21 +04:00
}
/*
called when an incoming connection is readable or writeable
*/
2015-10-26 08:50:09 +03:00
static void queue_io_handler ( struct tevent_context * ev , struct tevent_fd * fde ,
2007-04-13 14:38:24 +04:00
uint16_t flags , void * private_data )
2007-04-10 13:33:21 +04:00
{
2007-04-13 14:38:24 +04:00
struct ctdb_queue * queue = talloc_get_type ( private_data , struct ctdb_queue ) ;
2007-04-10 13:33:21 +04:00
2015-10-26 08:50:09 +03:00
if ( flags & TEVENT_FD_READ ) {
2007-04-10 13:33:21 +04:00
queue_io_read ( queue ) ;
} else {
queue_io_write ( queue ) ;
}
}
/*
queue a packet for sending
*/
int ctdb_queue_send ( struct ctdb_queue * queue , uint8_t * data , uint32_t length )
{
2014-07-21 13:48:45 +04:00
struct ctdb_req_header * hdr = ( struct ctdb_req_header * ) data ;
2007-04-10 13:33:21 +04:00
struct ctdb_queue_pkt * pkt ;
2007-05-28 02:37:54 +04:00
uint32_t length2 , full_length ;
2007-04-10 13:33:21 +04:00
2007-05-03 06:16:03 +04:00
if ( queue - > alignment ) {
/* enforce the length and alignment rules from the tcp packet allocator */
length2 = ( length + ( queue - > alignment - 1 ) ) & ~ ( queue - > alignment - 1 ) ;
* ( uint32_t * ) data = length2 ;
} else {
length2 = length ;
}
2007-04-10 13:33:21 +04:00
if ( length2 ! = length ) {
memset ( data + length , 0 , length2 - length ) ;
}
2007-05-28 02:37:54 +04:00
full_length = length2 ;
2007-04-10 13:33:21 +04:00
/* if the queue is empty then try an immediate write, avoiding
queue overhead . This relies on non - blocking sockets */
2007-04-29 18:19:40 +04:00
if ( queue - > out_queue = = NULL & & queue - > fd ! = - 1 & &
! ( queue - > ctdb - > flags & CTDB_FLAG_TORTURE ) ) {
2015-02-23 04:38:11 +03:00
ssize_t n = write ( queue - > fd , data , length2 ) ;
2007-04-10 13:33:21 +04:00
if ( n = = - 1 & & errno ! = EAGAIN & & errno ! = EWOULDBLOCK ) {
2007-05-15 08:08:58 +04:00
talloc_free ( queue - > fde ) ;
queue - > fde = NULL ;
queue - > fd = - 1 ;
2013-02-22 05:59:39 +04:00
tevent_schedule_immediate ( queue - > im , queue - > ctdb - > ev ,
queue_dead , queue ) ;
2007-04-10 13:33:21 +04:00
/* yes, we report success, as the dead node is
handled via a separate event */
return 0 ;
}
if ( n > 0 ) {
data + = n ;
length2 - = n ;
}
if ( length2 = = 0 ) return 0 ;
}
2014-07-21 13:42:54 +04:00
pkt = talloc_size (
queue , offsetof ( struct ctdb_queue_pkt , buf ) + length2 ) ;
2007-04-10 13:33:21 +04:00
CTDB_NO_MEMORY ( queue - > ctdb , pkt ) ;
2014-07-21 13:42:54 +04:00
talloc_set_name_const ( pkt , " struct ctdb_queue_pkt " ) ;
2007-04-10 13:33:21 +04:00
2014-07-21 13:42:54 +04:00
pkt - > data = pkt - > buf ;
memcpy ( pkt - > data , data , length2 ) ;
2007-04-10 13:33:21 +04:00
pkt - > length = length2 ;
2007-05-28 02:37:54 +04:00
pkt - > full_length = full_length ;
2007-04-10 13:33:21 +04:00
if ( queue - > out_queue = = NULL & & queue - > fd ! = - 1 ) {
2015-10-26 08:50:09 +03:00
TEVENT_FD_WRITEABLE ( queue - > fde ) ;
2007-04-10 13:33:21 +04:00
}
2010-02-07 11:02:06 +03:00
DLIST_ADD_END ( queue - > out_queue , pkt , NULL ) ;
2010-02-04 06:14:18 +03:00
2009-10-26 04:20:52 +03:00
queue - > out_queue_length + + ;
2007-04-10 13:33:21 +04:00
2008-04-01 08:34:54 +04:00
if ( queue - > ctdb - > tunable . verbose_memory_names ! = 0 ) {
2008-04-01 04:31:42 +04:00
switch ( hdr - > operation ) {
case CTDB_REQ_CONTROL : {
struct ctdb_req_control * c = ( struct ctdb_req_control * ) hdr ;
2010-07-01 17:08:49 +04:00
talloc_set_name ( pkt , " ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u " ,
queue - > name , ( unsigned ) c - > opcode , ( unsigned long long ) c - > srvid , ( unsigned ) c - > datalen ) ;
2008-04-01 04:31:42 +04:00
break ;
}
case CTDB_REQ_MESSAGE : {
struct ctdb_req_message * m = ( struct ctdb_req_message * ) hdr ;
2010-07-01 17:08:49 +04:00
talloc_set_name ( pkt , " ctdb_queue_pkt: %s message srvid=%llu datalen=%u " ,
queue - > name , ( unsigned long long ) m - > srvid , ( unsigned ) m - > datalen ) ;
2008-04-01 04:31:42 +04:00
break ;
}
default :
2010-07-01 17:08:49 +04:00
talloc_set_name ( pkt , " ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u " ,
queue - > name , ( unsigned ) hdr - > operation , ( unsigned ) hdr - > length ,
2008-04-01 04:31:42 +04:00
( unsigned ) hdr - > srcnode , ( unsigned ) hdr - > destnode ) ;
break ;
}
}
2007-04-10 13:33:21 +04:00
return 0 ;
}
/*
setup the fd used by the queue
*/
int ctdb_queue_set_fd ( struct ctdb_queue * queue , int fd )
{
queue - > fd = fd ;
talloc_free ( queue - > fde ) ;
queue - > fde = NULL ;
if ( fd ! = - 1 ) {
2015-10-26 08:50:09 +03:00
queue - > fde = tevent_add_fd ( queue - > ctdb - > ev , queue , fd ,
TEVENT_FD_READ ,
queue_io_handler , queue ) ;
2007-04-10 13:33:21 +04:00
if ( queue - > fde = = NULL ) {
return - 1 ;
}
2010-08-18 03:46:31 +04:00
tevent_fd_set_auto_close ( queue - > fde ) ;
2007-04-10 13:33:21 +04:00
if ( queue - > out_queue ) {
2015-10-26 08:50:09 +03:00
TEVENT_FD_WRITEABLE ( queue - > fde ) ;
2007-04-10 13:33:21 +04:00
}
}
return 0 ;
}
2009-12-02 01:27:42 +03:00
/* If someone sets up this pointer, they want to know if the queue is freed */
static int queue_destructor ( struct ctdb_queue * queue )
{
2013-01-18 03:42:14 +04:00
TALLOC_FREE ( queue - > buffer . data ) ;
queue - > buffer . length = 0 ;
queue - > buffer . size = 0 ;
2009-12-02 01:27:42 +03:00
if ( queue - > destroyed ! = NULL )
* queue - > destroyed = true ;
return 0 ;
}
2007-04-10 13:33:21 +04:00
/*
  setup a packet queue on a socket

  ctdb         - context whose event loop drives the queue
  mem_ctx      - talloc parent of the new queue
  fd           - descriptor to service, or -1 to attach one later with
                 ctdb_queue_set_fd()
  alignment    - if non-zero, outgoing packet lengths are rounded up to a
                 multiple of this value
  callback     - called with each complete incoming packet, or with
                 (NULL, 0) on failure
  private_data - passed through to callback
  fmt, ...     - printf-style name of the queue

  Returns the new queue, or NULL on failure. Everything allocated here,
  including the name, is a talloc child of the queue, so it is released
  together with the queue and nothing is left behind on mem_ctx when setup
  fails.
*/
struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
				    TALLOC_CTX *mem_ctx, int fd, int alignment,
				    ctdb_queue_cb_fn_t callback,
				    void *private_data, const char *fmt, ...)
{
	struct ctdb_queue *queue;
	struct tevent_immediate *im;
	const char *name;
	va_list ap;

	queue = talloc_zero(mem_ctx, struct ctdb_queue);
	CTDB_NO_MEMORY_NULL(ctdb, queue);

	va_start(ap, fmt);
	name = talloc_vasprintf(queue, fmt, ap);
	va_end(ap);
	if (name == NULL) {
		/* do not leave a half-built queue on mem_ctx; the macro
		   below only tests 'name' and reports the failure */
		talloc_free(queue);
	}
	CTDB_NO_MEMORY_NULL(ctdb, name);
	queue->name = name;

	im = tevent_create_immediate(queue);
	if (im == NULL) {
		talloc_free(queue);
	}
	CTDB_NO_MEMORY_NULL(ctdb, im);
	queue->im = im;

	queue->ctdb = ctdb;
	queue->fd = fd;
	queue->alignment = alignment;
	queue->private_data = private_data;
	queue->callback = callback;

	if (fd != -1) {
		if (ctdb_queue_set_fd(queue, fd) != 0) {
			talloc_free(queue);
			return NULL;
		}
	}

	talloc_set_destructor(queue, queue_destructor);

	return queue;
}