2016-09-16 09:13:18 +03:00
/*
Generic Unix - domain Socket I / O
Copyright ( C ) Amitay Isaacs 2016
This program is free software ; you can redistribute it and / or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation ; either version 3 of the License , or
( at your option ) any later version .
This program is distributed in the hope that it will be useful ,
but WITHOUT ANY WARRANTY ; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
GNU General Public License for more details .
You should have received a copy of the GNU General Public License
along with this program ; if not , see < http : //www.gnu.org/licenses/>.
*/
# include "replace.h"
# include "system/filesys.h"
# include "system/network.h"
# include <talloc.h>
# include <tevent.h>
# include "lib/util/sys_rw.h"
# include "lib/util/debug.h"
# include "lib/util/blocking.h"
# include "common/logging.h"
# include "common/sock_io.h"
int sock_connect ( const char * sockpath )
{
struct sockaddr_un addr ;
size_t len ;
int fd , ret ;
if ( sockpath = = NULL ) {
D_ERR ( " Invalid socket path \n " ) ;
return - 1 ;
}
memset ( & addr , 0 , sizeof ( addr ) ) ;
addr . sun_family = AF_UNIX ;
len = strlcpy ( addr . sun_path , sockpath , sizeof ( addr . sun_path ) ) ;
if ( len > = sizeof ( addr . sun_path ) ) {
D_ERR ( " Socket path too long, len=%zu \n " , strlen ( sockpath ) ) ;
return - 1 ;
}
fd = socket ( AF_UNIX , SOCK_STREAM , 0 ) ;
if ( fd = = - 1 ) {
D_ERR ( " socket() failed, errno=%d \n " , errno ) ;
return - 1 ;
}
ret = connect ( fd , ( struct sockaddr * ) & addr , sizeof ( addr ) ) ;
if ( ret = = - 1 ) {
D_ERR ( " connect() failed, errno=%d \n " , errno ) ;
close ( fd ) ;
return - 1 ;
}
return fd ;
}
struct sock_queue {
struct tevent_context * ev ;
sock_queue_callback_fn_t callback ;
void * private_data ;
int fd ;
struct tevent_immediate * im ;
struct tevent_queue * queue ;
struct tevent_fd * fde ;
uint8_t * buf ;
2017-01-04 09:46:54 +03:00
size_t buflen , begin , end ;
2016-09-16 09:13:18 +03:00
} ;
static bool sock_queue_set_fd ( struct sock_queue * queue , int fd ) ;
static int sock_queue_destructor ( struct sock_queue * queue ) ;
static void sock_queue_handler ( struct tevent_context * ev ,
struct tevent_fd * fde , uint16_t flags ,
void * private_data ) ;
static void sock_queue_process ( struct sock_queue * queue ) ;
static void sock_queue_process_event ( struct tevent_context * ev ,
struct tevent_immediate * im ,
void * private_data ) ;
struct sock_queue * sock_queue_setup ( TALLOC_CTX * mem_ctx ,
struct tevent_context * ev ,
int fd ,
sock_queue_callback_fn_t callback ,
void * private_data )
{
struct sock_queue * queue ;
queue = talloc_zero ( mem_ctx , struct sock_queue ) ;
if ( queue = = NULL ) {
return NULL ;
}
queue - > ev = ev ;
queue - > callback = callback ;
queue - > private_data = private_data ;
queue - > im = tevent_create_immediate ( queue ) ;
if ( queue - > im = = NULL ) {
talloc_free ( queue ) ;
return NULL ;
}
queue - > queue = tevent_queue_create ( queue , " out-queue " ) ;
if ( queue - > queue = = NULL ) {
talloc_free ( queue ) ;
return NULL ;
}
if ( ! sock_queue_set_fd ( queue , fd ) ) {
talloc_free ( queue ) ;
return NULL ;
}
talloc_set_destructor ( queue , sock_queue_destructor ) ;
return queue ;
}
static bool sock_queue_set_fd ( struct sock_queue * queue , int fd )
{
TALLOC_FREE ( queue - > fde ) ;
queue - > fd = fd ;
if ( fd ! = - 1 ) {
int ret ;
ret = set_blocking ( fd , false ) ;
if ( ret ! = 0 ) {
return false ;
}
queue - > fde = tevent_add_fd ( queue - > ev , queue , fd ,
TEVENT_FD_READ ,
sock_queue_handler , queue ) ;
if ( queue - > fde = = NULL ) {
return false ;
}
tevent_fd_set_auto_close ( queue - > fde ) ;
}
return true ;
}
static int sock_queue_destructor ( struct sock_queue * queue )
{
TALLOC_FREE ( queue - > fde ) ;
queue - > fd = - 1 ;
return 0 ;
}
static void sock_queue_handler ( struct tevent_context * ev ,
struct tevent_fd * fde , uint16_t flags ,
void * private_data )
{
struct sock_queue * queue = talloc_get_type_abort (
private_data , struct sock_queue ) ;
int ret , num_ready ;
ssize_t nread ;
ret = ioctl ( queue - > fd , FIONREAD , & num_ready ) ;
if ( ret ! = 0 ) {
/* Ignore */
return ;
}
if ( num_ready = = 0 ) {
/* descriptor has been closed */
goto fail ;
}
2017-01-04 09:46:54 +03:00
if ( num_ready > queue - > buflen - queue - > end ) {
2016-09-16 09:13:18 +03:00
queue - > buf = talloc_realloc_size ( queue , queue - > buf ,
2017-01-04 09:46:54 +03:00
queue - > end + num_ready ) ;
2016-09-16 09:13:18 +03:00
if ( queue - > buf = = NULL ) {
goto fail ;
}
2017-01-04 09:46:54 +03:00
queue - > buflen = queue - > end + num_ready ;
2016-09-16 09:13:18 +03:00
}
2017-01-04 09:46:54 +03:00
nread = sys_read ( queue - > fd , queue - > buf + queue - > end , num_ready ) ;
2016-09-16 09:13:18 +03:00
if ( nread < 0 ) {
goto fail ;
}
2017-01-04 09:46:54 +03:00
queue - > end + = nread ;
2016-09-16 09:13:18 +03:00
sock_queue_process ( queue ) ;
return ;
fail :
queue - > callback ( NULL , 0 , queue - > private_data ) ;
}
static void sock_queue_process ( struct sock_queue * queue )
{
uint32_t pkt_size ;
2017-01-04 09:46:54 +03:00
if ( ( queue - > end - queue - > begin ) < sizeof ( uint32_t ) ) {
2016-09-16 09:13:18 +03:00
/* not enough data */
return ;
}
2017-01-04 09:46:54 +03:00
pkt_size = * ( uint32_t * ) ( queue - > buf + queue - > begin ) ;
2016-09-16 09:13:18 +03:00
if ( pkt_size = = 0 ) {
D_ERR ( " Invalid packet of length 0 \n " ) ;
queue - > callback ( NULL , 0 , queue - > private_data ) ;
}
2017-01-04 09:46:54 +03:00
if ( ( queue - > end - queue - > begin ) < pkt_size ) {
2016-09-16 09:13:18 +03:00
/* not enough data */
return ;
}
2017-01-04 09:46:54 +03:00
queue - > callback ( queue - > buf + queue - > begin , pkt_size ,
queue - > private_data ) ;
queue - > begin + = pkt_size ;
2016-09-16 09:13:18 +03:00
2017-01-04 09:46:54 +03:00
if ( queue - > begin < queue - > end ) {
2016-09-16 09:13:18 +03:00
/* more data to be processed */
tevent_schedule_immediate ( queue - > im , queue - > ev ,
sock_queue_process_event , queue ) ;
} else {
TALLOC_FREE ( queue - > buf ) ;
queue - > buflen = 0 ;
2017-01-04 09:46:54 +03:00
queue - > begin = 0 ;
queue - > end = 0 ;
2016-09-16 09:13:18 +03:00
}
}
static void sock_queue_process_event ( struct tevent_context * ev ,
struct tevent_immediate * im ,
void * private_data )
{
struct sock_queue * queue = talloc_get_type_abort (
private_data , struct sock_queue ) ;
sock_queue_process ( queue ) ;
}
struct sock_queue_write_state {
uint8_t * pkt ;
uint32_t pkt_size ;
} ;
static void sock_queue_trigger ( struct tevent_req * req , void * private_data ) ;
int sock_queue_write ( struct sock_queue * queue , uint8_t * buf , size_t buflen )
{
struct tevent_req * req ;
struct sock_queue_write_state * state ;
bool status ;
if ( buflen > = INT32_MAX ) {
return - 1 ;
}
req = tevent_req_create ( queue , & state , struct sock_queue_write_state ) ;
if ( req = = NULL ) {
return - 1 ;
}
state - > pkt = buf ;
state - > pkt_size = ( uint32_t ) buflen ;
status = tevent_queue_add_entry ( queue - > queue , queue - > ev , req ,
sock_queue_trigger , queue ) ;
if ( ! status ) {
talloc_free ( req ) ;
return - 1 ;
}
return 0 ;
}
static void sock_queue_trigger ( struct tevent_req * req , void * private_data )
{
struct sock_queue * queue = talloc_get_type_abort (
private_data , struct sock_queue ) ;
struct sock_queue_write_state * state = tevent_req_data (
req , struct sock_queue_write_state ) ;
size_t offset = 0 ;
do {
ssize_t nwritten ;
nwritten = sys_write ( queue - > fd , state - > pkt + offset ,
state - > pkt_size - offset ) ;
if ( nwritten < 0 ) {
queue - > callback ( NULL , 0 , queue - > private_data ) ;
return ;
}
offset + = nwritten ;
} while ( offset < state - > pkt_size ) ;
tevent_req_done ( req ) ;
talloc_free ( req ) ;
}