2014-02-24 12:23:49 +00:00
/*
* Unix SMB / CIFS implementation .
* Samba internal messaging functions
* Copyright ( C ) 2013 by Volker Lendecke
*
* This program is free software ; you can redistribute it and / or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation ; either version 3 of the License , or
* ( at your option ) any later version .
*
* This program is distributed in the hope that it will be useful ,
* but WITHOUT ANY WARRANTY ; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the
* GNU General Public License for more details .
*
* You should have received a copy of the GNU General Public License
* along with this program . If not , see < http : //www.gnu.org/licenses/>.
*/
2015-09-30 00:30:28 +02:00
# include "replace.h"
2019-01-28 12:54:07 +01:00
# include "util/util.h"
2015-09-30 00:30:28 +02:00
# include "system/network.h"
# include "system/filesys.h"
2016-08-18 22:00:06 +02:00
# include "system/dir.h"
2016-09-09 16:51:00 +02:00
# include "system/select.h"
2014-02-24 12:23:49 +00:00
# include "lib/util/debug.h"
2019-06-28 23:05:43 +10:00
# include "messages_dgm.h"
2015-10-12 21:30:30 +02:00
# include "lib/util/genrand.h"
2016-09-09 16:51:00 +02:00
# include "lib/util/dlinklist.h"
# include "lib/pthreadpool/pthreadpool_tevent.h"
# include "lib/util/msghdr.h"
# include "lib/util/iov_buf.h"
# include "lib/util/blocking.h"
# include "lib/util/tevent_unix.h"
2020-07-03 08:11:20 +02:00
# include "lib/util/smb_strtox.h"
2016-09-09 16:51:00 +02:00
# define MESSAGING_DGM_FRAGMENT_LENGTH 1024
2014-02-24 12:23:49 +00:00
2014-09-10 09:58:00 +02:00
struct sun_path_buf {
/*
* This will carry enough for a socket path
*/
char buf [ sizeof ( struct sockaddr_un ) ] ;
} ;
2016-09-30 21:53:44 -07:00
/*
* We can only have one tevent_fd per dgm_context and per
* tevent_context . Maintain a list of registered tevent_contexts per
* dgm_context .
*/
struct messaging_dgm_fde_ev {
struct messaging_dgm_fde_ev * prev , * next ;
/*
* Backreference to enable DLIST_REMOVE from our
* destructor . Also , set to NULL when the dgm_context dies
* before the messaging_dgm_fde_ev .
*/
struct messaging_dgm_context * ctx ;
struct tevent_context * ev ;
struct tevent_fd * fde ;
} ;
2016-09-09 16:51:00 +02:00
struct messaging_dgm_out {
struct messaging_dgm_out * prev , * next ;
struct messaging_dgm_context * ctx ;
pid_t pid ;
int sock ;
bool is_blocking ;
uint64_t cookie ;
struct tevent_queue * queue ;
struct tevent_timer * idle_timer ;
} ;
struct messaging_dgm_in_msg {
struct messaging_dgm_in_msg * prev , * next ;
struct messaging_dgm_context * ctx ;
size_t msglen ;
size_t received ;
pid_t sender_pid ;
int sender_sock ;
uint64_t cookie ;
uint8_t buf [ ] ;
} ;
2014-02-24 12:23:49 +00:00
struct messaging_dgm_context {
2016-09-09 16:51:00 +02:00
struct tevent_context * ev ;
2014-07-25 11:03:11 +00:00
pid_t pid ;
2014-10-04 10:58:15 +02:00
struct sun_path_buf socket_dir ;
struct sun_path_buf lockfile_dir ;
2014-02-24 12:23:49 +00:00
int lockfile_fd ;
2014-07-17 09:44:41 +00:00
2016-09-09 16:51:00 +02:00
int sock ;
struct messaging_dgm_in_msg * in_msgs ;
2016-09-30 21:53:44 -07:00
struct messaging_dgm_fde_ev * fde_evs ;
2016-09-23 18:36:15 -07:00
void ( * recv_cb ) ( struct tevent_context * ev ,
const uint8_t * msg ,
2014-07-27 12:29:26 +02:00
size_t msg_len ,
2014-09-30 11:29:22 +02:00
int * fds ,
2014-06-24 07:39:05 +02:00
size_t num_fds ,
2014-07-17 09:44:41 +00:00
void * private_data ) ;
void * recv_cb_private_data ;
2014-07-20 14:51:47 +02:00
bool * have_dgm_context ;
2016-09-09 16:51:00 +02:00
struct pthreadpool_tevent * pool ;
struct messaging_dgm_out * outsocks ;
2014-02-24 12:23:49 +00:00
} ;
2016-09-09 16:51:00 +02:00
/* Set socket close on exec. */
static int prepare_socket_cloexec ( int sock )
{
# ifdef FD_CLOEXEC
int flags ;
2014-09-10 16:13:18 +02:00
2016-09-09 16:51:00 +02:00
flags = fcntl ( sock , F_GETFD , 0 ) ;
if ( flags = = - 1 ) {
return errno ;
}
flags | = FD_CLOEXEC ;
if ( fcntl ( sock , F_SETFD , flags ) = = - 1 ) {
return errno ;
}
# endif
return 0 ;
}
static void close_fd_array ( int * fds , size_t num_fds )
{
size_t i ;
for ( i = 0 ; i < num_fds ; i + + ) {
if ( fds [ i ] = = - 1 ) {
continue ;
}
close ( fds [ i ] ) ;
fds [ i ] = - 1 ;
}
}
2016-10-05 10:46:13 -07:00
/*
* The idle handler can free the struct messaging_dgm_out * ,
* if it ' s unused ( qlen of zero ) which closes the socket .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_out_idle_handler ( struct tevent_context * ev ,
struct tevent_timer * te ,
struct timeval current_time ,
void * private_data )
{
struct messaging_dgm_out * out = talloc_get_type_abort (
private_data , struct messaging_dgm_out ) ;
size_t qlen ;
out - > idle_timer = NULL ;
qlen = tevent_queue_length ( out - > queue ) ;
if ( qlen = = 0 ) {
TALLOC_FREE ( out ) ;
}
}
2016-10-05 10:46:13 -07:00
/*
* Setup the idle handler to fire afer 1 second if the
* queue is zero .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_out_rearm_idle_timer ( struct messaging_dgm_out * out )
{
size_t qlen ;
qlen = tevent_queue_length ( out - > queue ) ;
if ( qlen ! = 0 ) {
TALLOC_FREE ( out - > idle_timer ) ;
return ;
}
if ( out - > idle_timer ! = NULL ) {
tevent_update_timer ( out - > idle_timer ,
tevent_timeval_current_ofs ( 1 , 0 ) ) ;
return ;
}
out - > idle_timer = tevent_add_timer (
out - > ctx - > ev , out , tevent_timeval_current_ofs ( 1 , 0 ) ,
messaging_dgm_out_idle_handler , out ) ;
/*
* No NULL check , we ' ll come back here . Worst case we ' re
* leaking a bit .
*/
}
static int messaging_dgm_out_destructor ( struct messaging_dgm_out * dst ) ;
static void messaging_dgm_out_idle_handler ( struct tevent_context * ev ,
struct tevent_timer * te ,
struct timeval current_time ,
void * private_data ) ;
2016-10-05 10:46:13 -07:00
/*
* Connect to an existing rendezvous point for another
* pid - wrapped inside a struct messaging_dgm_out * .
*/
2016-09-09 16:51:00 +02:00
static int messaging_dgm_out_create ( TALLOC_CTX * mem_ctx ,
struct messaging_dgm_context * ctx ,
pid_t pid , struct messaging_dgm_out * * pout )
{
struct messaging_dgm_out * out ;
struct sockaddr_un addr = { . sun_family = AF_UNIX } ;
int ret = ENOMEM ;
int out_pathlen ;
2017-02-09 14:03:33 +13:00
char addr_buf [ sizeof ( addr . sun_path ) + ( 3 * sizeof ( unsigned ) + 2 ) ] ;
2016-09-09 16:51:00 +02:00
out = talloc ( mem_ctx , struct messaging_dgm_out ) ;
if ( out = = NULL ) {
goto fail ;
}
* out = ( struct messaging_dgm_out ) {
. pid = pid ,
. ctx = ctx ,
. cookie = 1
} ;
2017-02-09 14:03:33 +13:00
out_pathlen = snprintf ( addr_buf , sizeof ( addr_buf ) ,
2016-09-09 16:51:00 +02:00
" %s/%u " , ctx - > socket_dir . buf , ( unsigned ) pid ) ;
if ( out_pathlen < 0 ) {
goto errno_fail ;
}
if ( ( size_t ) out_pathlen > = sizeof ( addr . sun_path ) ) {
ret = ENAMETOOLONG ;
goto fail ;
}
2017-02-09 14:03:33 +13:00
memcpy ( addr . sun_path , addr_buf , out_pathlen + 1 ) ;
2016-09-09 16:51:00 +02:00
out - > queue = tevent_queue_create ( out , addr . sun_path ) ;
if ( out - > queue = = NULL ) {
ret = ENOMEM ;
goto fail ;
}
out - > sock = socket ( AF_UNIX , SOCK_DGRAM , 0 ) ;
if ( out - > sock = = - 1 ) {
goto errno_fail ;
}
DLIST_ADD ( ctx - > outsocks , out ) ;
talloc_set_destructor ( out , messaging_dgm_out_destructor ) ;
do {
ret = connect ( out - > sock ,
( const struct sockaddr * ) ( const void * ) & addr ,
sizeof ( addr ) ) ;
} while ( ( ret = = - 1 ) & & ( errno = = EINTR ) ) ;
if ( ret = = - 1 ) {
goto errno_fail ;
}
ret = set_blocking ( out - > sock , false ) ;
if ( ret = = - 1 ) {
goto errno_fail ;
}
out - > is_blocking = false ;
* pout = out ;
return 0 ;
errno_fail :
ret = errno ;
fail :
TALLOC_FREE ( out ) ;
return ret ;
}
static int messaging_dgm_out_destructor ( struct messaging_dgm_out * out )
{
DLIST_REMOVE ( out - > ctx - > outsocks , out ) ;
2017-08-30 17:49:54 +02:00
if ( ( tevent_queue_length ( out - > queue ) ! = 0 ) & &
2022-07-25 14:29:35 +02:00
( tevent_cached_getpid ( ) = = out - > ctx - > pid ) ) {
2016-09-09 16:51:00 +02:00
/*
* We have pending jobs . We can ' t close the socket ,
* this has been handed over to messaging_dgm_out_queue_state .
*/
return 0 ;
}
if ( out - > sock ! = - 1 ) {
close ( out - > sock ) ;
out - > sock = - 1 ;
}
return 0 ;
}
2016-10-05 10:46:13 -07:00
/*
* Find the struct messaging_dgm_out * to talk to pid .
* If we don ' t have one , create it . Set the timer to
* delete after 1 sec .
*/
2016-09-09 16:51:00 +02:00
static int messaging_dgm_out_get ( struct messaging_dgm_context * ctx , pid_t pid ,
struct messaging_dgm_out * * pout )
{
struct messaging_dgm_out * out ;
int ret ;
for ( out = ctx - > outsocks ; out ! = NULL ; out = out - > next ) {
if ( out - > pid = = pid ) {
break ;
}
}
if ( out = = NULL ) {
ret = messaging_dgm_out_create ( ctx , ctx , pid , & out ) ;
if ( ret ! = 0 ) {
return ret ;
}
}
2019-07-03 12:45:56 +00:00
/*
* shouldn ' t be possible , should be set if messaging_dgm_out_create
* succeeded . This check is to satisfy static checker
*/
if ( out = = NULL ) {
return EINVAL ;
}
2019-12-09 10:47:46 +01:00
messaging_dgm_out_rearm_idle_timer ( out ) ;
2016-09-09 16:51:00 +02:00
* pout = out ;
return 0 ;
}
2016-10-05 10:46:13 -07:00
/*
* This function is called directly to send a message fragment
* when the outgoing queue is zero , and from a pthreadpool
* job thread when messages are being queued ( qlen ! = 0 ) .
* Make sure * ONLY * thread - safe functions are called within .
*/
2016-09-09 16:51:00 +02:00
static ssize_t messaging_dgm_sendmsg ( int sock ,
const struct iovec * iov , int iovlen ,
const int * fds , size_t num_fds ,
int * perrno )
{
struct msghdr msg ;
ssize_t fdlen , ret ;
/*
* Do the actual sendmsg syscall . This will be called from a
* pthreadpool helper thread , so be careful what you do here .
*/
msg = ( struct msghdr ) {
. msg_iov = discard_const_p ( struct iovec , iov ) ,
. msg_iovlen = iovlen
} ;
fdlen = msghdr_prep_fds ( & msg , NULL , 0 , fds , num_fds ) ;
if ( fdlen = = - 1 ) {
* perrno = EINVAL ;
return - 1 ;
}
{
uint8_t buf [ fdlen ] ;
msghdr_prep_fds ( & msg , buf , fdlen , fds , num_fds ) ;
do {
2017-05-29 16:15:50 +02:00
ret = sendmsg ( sock , & msg , 0 ) ;
2016-09-09 16:51:00 +02:00
} while ( ( ret = = - 1 ) & & ( errno = = EINTR ) ) ;
}
if ( ret = = - 1 ) {
* perrno = errno ;
}
return ret ;
}
struct messaging_dgm_out_queue_state {
struct tevent_context * ev ;
struct pthreadpool_tevent * pool ;
struct tevent_req * req ;
struct tevent_req * subreq ;
int sock ;
int * fds ;
uint8_t * buf ;
ssize_t sent ;
int err ;
} ;
static int messaging_dgm_out_queue_state_destructor (
struct messaging_dgm_out_queue_state * state ) ;
static void messaging_dgm_out_queue_trigger ( struct tevent_req * req ,
void * private_data ) ;
static void messaging_dgm_out_threaded_job ( void * private_data ) ;
static void messaging_dgm_out_queue_done ( struct tevent_req * subreq ) ;
2016-10-05 10:46:13 -07:00
/*
* Push a message fragment onto a queue to be sent by a
* threadpool job . Makes copies of data / fd ' s to be sent .
* The running tevent_queue internally creates an immediate
* event to schedule the write .
*/
2016-09-09 16:51:00 +02:00
static struct tevent_req * messaging_dgm_out_queue_send (
TALLOC_CTX * mem_ctx , struct tevent_context * ev ,
struct messaging_dgm_out * out ,
const struct iovec * iov , int iovlen , const int * fds , size_t num_fds )
{
struct tevent_req * req ;
struct messaging_dgm_out_queue_state * state ;
struct tevent_queue_entry * e ;
size_t i ;
ssize_t buflen ;
req = tevent_req_create ( out , & state ,
struct messaging_dgm_out_queue_state ) ;
if ( req = = NULL ) {
return NULL ;
}
state - > ev = ev ;
state - > pool = out - > ctx - > pool ;
state - > sock = out - > sock ;
state - > req = req ;
/*
* Go blocking in a thread
*/
if ( ! out - > is_blocking ) {
int ret = set_blocking ( out - > sock , true ) ;
if ( ret = = - 1 ) {
tevent_req_error ( req , errno ) ;
return tevent_req_post ( req , ev ) ;
}
out - > is_blocking = true ;
}
buflen = iov_buflen ( iov , iovlen ) ;
if ( buflen = = - 1 ) {
tevent_req_error ( req , EMSGSIZE ) ;
return tevent_req_post ( req , ev ) ;
}
state - > buf = talloc_array ( state , uint8_t , buflen ) ;
if ( tevent_req_nomem ( state - > buf , req ) ) {
return tevent_req_post ( req , ev ) ;
}
iov_buf ( iov , iovlen , state - > buf , buflen ) ;
state - > fds = talloc_array ( state , int , num_fds ) ;
if ( tevent_req_nomem ( state - > fds , req ) ) {
return tevent_req_post ( req , ev ) ;
}
for ( i = 0 ; i < num_fds ; i + + ) {
state - > fds [ i ] = - 1 ;
}
for ( i = 0 ; i < num_fds ; i + + ) {
state - > fds [ i ] = dup ( fds [ i ] ) ;
if ( state - > fds [ i ] = = - 1 ) {
int ret = errno ;
close_fd_array ( state - > fds , num_fds ) ;
tevent_req_error ( req , ret ) ;
return tevent_req_post ( req , ev ) ;
}
}
talloc_set_destructor ( state , messaging_dgm_out_queue_state_destructor ) ;
e = tevent_queue_add_entry ( out - > queue , ev , req ,
messaging_dgm_out_queue_trigger , req ) ;
if ( tevent_req_nomem ( e , req ) ) {
return tevent_req_post ( req , ev ) ;
}
return req ;
}
static int messaging_dgm_out_queue_state_destructor (
struct messaging_dgm_out_queue_state * state )
{
int * fds ;
size_t num_fds ;
if ( state - > subreq ! = NULL ) {
/*
* We ' re scheduled , but we ' re destroyed . This happens
* if the messaging_dgm_context is destroyed while
* we ' re stuck in a blocking send . There ' s nothing we
* can do but to leak memory .
*/
TALLOC_FREE ( state - > subreq ) ;
( void ) talloc_reparent ( state - > req , NULL , state ) ;
return - 1 ;
}
fds = state - > fds ;
num_fds = talloc_array_length ( fds ) ;
close_fd_array ( fds , num_fds ) ;
return 0 ;
}
2016-10-05 10:46:13 -07:00
/*
* tevent_queue callback that schedules the pthreadpool to actually
* send the queued message fragment .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_out_queue_trigger ( struct tevent_req * req ,
void * private_data )
{
struct messaging_dgm_out_queue_state * state = tevent_req_data (
req , struct messaging_dgm_out_queue_state ) ;
2016-09-12 14:12:10 +02:00
tevent_req_reset_endtime ( req ) ;
2016-09-09 16:51:00 +02:00
state - > subreq = pthreadpool_tevent_job_send (
state , state - > ev , state - > pool ,
messaging_dgm_out_threaded_job , state ) ;
if ( tevent_req_nomem ( state - > subreq , req ) ) {
return ;
}
tevent_req_set_callback ( state - > subreq , messaging_dgm_out_queue_done ,
req ) ;
}
2016-10-05 10:46:13 -07:00
/*
* Wrapper function run by the pthread that calls
* messaging_dgm_sendmsg ( ) to actually do the sendmsg ( ) .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_out_threaded_job ( void * private_data )
{
struct messaging_dgm_out_queue_state * state = talloc_get_type_abort (
private_data , struct messaging_dgm_out_queue_state ) ;
struct iovec iov = { . iov_base = state - > buf ,
. iov_len = talloc_get_size ( state - > buf ) } ;
size_t num_fds = talloc_array_length ( state - > fds ) ;
2016-11-23 16:51:25 +01:00
int msec = 1 ;
2016-09-09 16:51:00 +02:00
2016-11-23 16:51:25 +01:00
while ( true ) {
int ret ;
state - > sent = messaging_dgm_sendmsg ( state - > sock , & iov , 1 ,
2016-09-09 16:51:00 +02:00
state - > fds , num_fds , & state - > err ) ;
2016-11-23 16:51:25 +01:00
if ( state - > sent ! = - 1 ) {
return ;
}
2019-02-07 15:57:06 +01:00
if ( state - > err ! = ENOBUFS ) {
2016-11-23 16:51:25 +01:00
return ;
}
/*
* ENOBUFS is the FreeBSD way of saying " Try
* again " . We have to do polling.
*/
do {
ret = poll ( NULL , 0 , msec ) ;
} while ( ( ret = = - 1 ) & & ( errno = = EINTR ) ) ;
/*
* Exponential backoff up to once a second
*/
msec * = 2 ;
msec = MIN ( msec , 1000 ) ;
}
2016-09-09 16:51:00 +02:00
}
2016-10-05 10:46:13 -07:00
/*
* Pickup the results of the pthread sendmsg ( ) .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_out_queue_done ( struct tevent_req * subreq )
{
struct tevent_req * req = tevent_req_callback_data (
subreq , struct tevent_req ) ;
struct messaging_dgm_out_queue_state * state = tevent_req_data (
req , struct messaging_dgm_out_queue_state ) ;
int ret ;
if ( subreq ! = state - > subreq ) {
abort ( ) ;
}
ret = pthreadpool_tevent_job_recv ( subreq ) ;
TALLOC_FREE ( subreq ) ;
state - > subreq = NULL ;
if ( tevent_req_error ( req , ret ) ) {
return ;
}
if ( state - > sent = = - 1 ) {
tevent_req_error ( req , state - > err ) ;
return ;
}
tevent_req_done ( req ) ;
}
static int messaging_dgm_out_queue_recv ( struct tevent_req * req )
{
return tevent_req_simple_recv_unix ( req ) ;
}
static void messaging_dgm_out_sent_fragment ( struct tevent_req * req ) ;
2016-10-05 10:46:13 -07:00
/*
* Core function to send a message fragment given a
* connected struct messaging_dgm_out * destination .
* If no current queue tries to send nonblocking
* directly . If not , queues the fragment ( which makes
* a copy of it ) and adds a 60 - second timeout on the send .
*/
2016-09-09 16:51:00 +02:00
static int messaging_dgm_out_send_fragment (
struct tevent_context * ev , struct messaging_dgm_out * out ,
const struct iovec * iov , int iovlen , const int * fds , size_t num_fds )
{
struct tevent_req * req ;
size_t qlen ;
2016-09-12 14:12:10 +02:00
bool ok ;
2016-09-09 16:51:00 +02:00
qlen = tevent_queue_length ( out - > queue ) ;
if ( qlen = = 0 ) {
ssize_t nsent ;
int err = 0 ;
if ( out - > is_blocking ) {
int ret = set_blocking ( out - > sock , false ) ;
if ( ret = = - 1 ) {
return errno ;
}
out - > is_blocking = false ;
}
nsent = messaging_dgm_sendmsg ( out - > sock , iov , iovlen , fds ,
num_fds , & err ) ;
if ( nsent > = 0 ) {
return 0 ;
}
2016-11-23 16:51:25 +01:00
if ( err = = ENOBUFS ) {
/*
* FreeBSD ' s way of telling us the dst socket
* is full . EWOULDBLOCK makes us spawn a
* polling helper thread .
*/
err = EWOULDBLOCK ;
}
2016-09-09 16:51:00 +02:00
if ( err ! = EWOULDBLOCK ) {
return err ;
}
}
req = messaging_dgm_out_queue_send ( out , ev , out , iov , iovlen ,
fds , num_fds ) ;
if ( req = = NULL ) {
return ENOMEM ;
}
tevent_req_set_callback ( req , messaging_dgm_out_sent_fragment , out ) ;
2016-09-12 14:12:10 +02:00
ok = tevent_req_set_endtime ( req , ev ,
tevent_timeval_current_ofs ( 60 , 0 ) ) ;
if ( ! ok ) {
TALLOC_FREE ( req ) ;
return ENOMEM ;
}
2016-09-09 16:51:00 +02:00
return 0 ;
}
2016-10-05 10:46:13 -07:00
/*
* Pickup the result of the fragment send . Reset idle timer
* if queue empty .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_out_sent_fragment ( struct tevent_req * req )
{
struct messaging_dgm_out * out = tevent_req_callback_data (
req , struct messaging_dgm_out ) ;
int ret ;
ret = messaging_dgm_out_queue_recv ( req ) ;
TALLOC_FREE ( req ) ;
if ( ret ! = 0 ) {
DBG_WARNING ( " messaging_out_queue_recv returned %s \n " ,
strerror ( ret ) ) ;
}
messaging_dgm_out_rearm_idle_timer ( out ) ;
}
struct messaging_dgm_fragment_hdr {
size_t msglen ;
pid_t pid ;
int sock ;
} ;
2016-10-05 10:46:13 -07:00
/*
* Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64 - bit cookie
* size chunks and send it .
*
* Message fragments are prefixed by a 64 - bit cookie that
* stays the same for all fragments . This allows the receiver
* to recognise fragments of the same message and re - assemble
* them on the other end .
*
* Note that this allows other message fragments from other
* senders to be interleaved in the receive read processing ,
* the combination of the cookie and header info allows unique
* identification of the message from a specific sender in
* re - assembly .
*
* If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
* then send a single message with cookie set to zero .
*
* Otherwise the message is fragmented into chunks and added
* to the sending queue . Any file descriptors are passed only
* in the last fragment .
*
* Finally the cookie is incremented ( wrap over zero ) to
* prepare for the next message sent to this channel .
*
*/
2016-09-09 16:51:00 +02:00
static int messaging_dgm_out_send_fragmented ( struct tevent_context * ev ,
struct messaging_dgm_out * out ,
const struct iovec * iov ,
int iovlen ,
const int * fds , size_t num_fds )
{
ssize_t msglen , sent ;
int ret = 0 ;
struct iovec iov_copy [ iovlen + 2 ] ;
struct messaging_dgm_fragment_hdr hdr ;
struct iovec src_iov ;
if ( iovlen < 0 ) {
return EINVAL ;
}
msglen = iov_buflen ( iov , iovlen ) ;
if ( msglen = = - 1 ) {
return EMSGSIZE ;
}
if ( num_fds > INT8_MAX ) {
return EINVAL ;
}
if ( ( size_t ) msglen < =
( MESSAGING_DGM_FRAGMENT_LENGTH - sizeof ( uint64_t ) ) ) {
uint64_t cookie = 0 ;
iov_copy [ 0 ] . iov_base = & cookie ;
iov_copy [ 0 ] . iov_len = sizeof ( cookie ) ;
if ( iovlen > 0 ) {
memcpy ( & iov_copy [ 1 ] , iov ,
sizeof ( struct iovec ) * iovlen ) ;
}
return messaging_dgm_out_send_fragment (
ev , out , iov_copy , iovlen + 1 , fds , num_fds ) ;
}
hdr = ( struct messaging_dgm_fragment_hdr ) {
. msglen = msglen ,
2022-07-25 14:29:35 +02:00
. pid = tevent_cached_getpid ( ) ,
2016-09-09 16:51:00 +02:00
. sock = out - > sock
} ;
iov_copy [ 0 ] . iov_base = & out - > cookie ;
iov_copy [ 0 ] . iov_len = sizeof ( out - > cookie ) ;
iov_copy [ 1 ] . iov_base = & hdr ;
iov_copy [ 1 ] . iov_len = sizeof ( hdr ) ;
sent = 0 ;
src_iov = iov [ 0 ] ;
/*
* The following write loop sends the user message in pieces . We have
* filled the first two iovecs above with " cookie " and " hdr " . In the
* following loops we pull message chunks from the user iov array and
* fill iov_copy piece by piece , possibly truncating chunks from the
* caller ' s iov array . Ugly , but hopefully efficient .
*/
while ( sent < msglen ) {
size_t fragment_len ;
size_t iov_index = 2 ;
fragment_len = sizeof ( out - > cookie ) + sizeof ( hdr ) ;
while ( fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH ) {
size_t space , chunk ;
space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len ;
chunk = MIN ( space , src_iov . iov_len ) ;
iov_copy [ iov_index ] . iov_base = src_iov . iov_base ;
iov_copy [ iov_index ] . iov_len = chunk ;
iov_index + = 1 ;
src_iov . iov_base = ( char * ) src_iov . iov_base + chunk ;
src_iov . iov_len - = chunk ;
fragment_len + = chunk ;
if ( src_iov . iov_len = = 0 ) {
iov + = 1 ;
iovlen - = 1 ;
if ( iovlen = = 0 ) {
break ;
}
src_iov = iov [ 0 ] ;
}
}
sent + = ( fragment_len - sizeof ( out - > cookie ) - sizeof ( hdr ) ) ;
/*
* only the last fragment should pass the fd array .
* That simplifies the receiver a lot .
*/
if ( sent < msglen ) {
ret = messaging_dgm_out_send_fragment (
ev , out , iov_copy , iov_index , NULL , 0 ) ;
} else {
ret = messaging_dgm_out_send_fragment (
ev , out , iov_copy , iov_index , fds , num_fds ) ;
}
if ( ret ! = 0 ) {
break ;
}
}
out - > cookie + = 1 ;
if ( out - > cookie = = 0 ) {
out - > cookie + = 1 ;
}
return ret ;
}
static struct messaging_dgm_context * global_dgm_context ;
2014-02-24 12:23:49 +00:00
static int messaging_dgm_context_destructor ( struct messaging_dgm_context * c ) ;
2014-10-04 10:58:15 +02:00
static int messaging_dgm_lockfile_create ( struct messaging_dgm_context * ctx ,
pid_t pid , int * plockfile_fd ,
2015-10-12 21:30:30 +02:00
uint64_t * punique )
2014-02-24 12:23:49 +00:00
{
2015-09-30 00:30:28 +02:00
char buf [ 64 ] ;
2014-02-24 12:23:49 +00:00
int lockfile_fd ;
2014-10-04 10:58:15 +02:00
struct sun_path_buf lockfile_name ;
2014-07-25 10:42:19 +00:00
struct flock lck ;
2015-10-12 21:30:30 +02:00
uint64_t unique ;
2014-02-24 12:23:49 +00:00
int unique_len , ret ;
ssize_t written ;
2014-10-04 10:58:15 +02:00
ret = snprintf ( lockfile_name . buf , sizeof ( lockfile_name . buf ) ,
2015-09-30 00:30:49 +02:00
" %s/%u " , ctx - > lockfile_dir . buf , ( unsigned ) pid ) ;
2016-08-21 17:46:16 +02:00
if ( ret < 0 ) {
return errno ;
}
if ( ( unsigned ) ret > = sizeof ( lockfile_name . buf ) ) {
2014-10-04 10:58:15 +02:00
return ENAMETOOLONG ;
2014-02-24 12:23:49 +00:00
}
/* no O_EXCL, existence check is via the fcntl lock */
2015-09-30 00:31:17 +02:00
lockfile_fd = open ( lockfile_name . buf , O_NONBLOCK | O_CREAT | O_RDWR ,
2014-09-10 09:58:00 +02:00
0644 ) ;
2015-10-21 15:15:51 +02:00
if ( ( lockfile_fd = = - 1 ) & &
( ( errno = = ENXIO ) /* Linux */ | |
( errno = = ENODEV ) /* Linux kernel bug */ | |
( errno = = EOPNOTSUPP ) /* FreeBSD */ ) ) {
/*
* Huh - - a socket ? This might be a stale socket from
* an upgrade of Samba . Just unlink and retry , nobody
* else is supposed to be here at this time .
*
* Yes , this is racy , but I don ' t see a way to deal
* with this properly .
*/
unlink ( lockfile_name . buf ) ;
lockfile_fd = open ( lockfile_name . buf ,
O_NONBLOCK | O_CREAT | O_WRONLY ,
0644 ) ;
}
2014-02-24 12:23:49 +00:00
if ( lockfile_fd = = - 1 ) {
ret = errno ;
DEBUG ( 1 , ( " %s: open failed: %s \n " , __func__ , strerror ( errno ) ) ) ;
2014-09-10 09:58:00 +02:00
return ret ;
2014-02-24 12:23:49 +00:00
}
2014-07-25 10:42:19 +00:00
lck = ( struct flock ) {
. l_type = F_WRLCK ,
. l_whence = SEEK_SET
} ;
2014-02-24 12:23:49 +00:00
ret = fcntl ( lockfile_fd , F_SETLK , & lck ) ;
if ( ret = = - 1 ) {
ret = errno ;
DEBUG ( 1 , ( " %s: fcntl failed: %s \n " , __func__ , strerror ( ret ) ) ) ;
goto fail_close ;
}
2015-10-12 21:30:30 +02:00
/*
* Directly using the binary value for
* SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
* violation . But including all of ndr here just for this
* seems to be a bit overkill to me . Also , messages_dgm might
* be replaced sooner or later by something streams - based ,
* where unique_id generation will be handled differently .
*/
do {
generate_random_buffer ( ( uint8_t * ) & unique , sizeof ( unique ) ) ;
} while ( unique = = UINT64_C ( 0xFFFFFFFFFFFFFFFF ) ) ;
2021-02-14 09:15:15 +01:00
unique_len = snprintf ( buf , sizeof ( buf ) , " % " PRIu64 " \n " , unique ) ;
2014-02-24 12:23:49 +00:00
/* shorten a potentially preexisting file */
ret = ftruncate ( lockfile_fd , unique_len ) ;
if ( ret = = - 1 ) {
ret = errno ;
DEBUG ( 1 , ( " %s: ftruncate failed: %s \n " , __func__ ,
strerror ( ret ) ) ) ;
goto fail_unlink ;
}
written = write ( lockfile_fd , buf , unique_len ) ;
if ( written ! = unique_len ) {
ret = errno ;
DEBUG ( 1 , ( " %s: write failed: %s \n " , __func__ , strerror ( ret ) ) ) ;
goto fail_unlink ;
}
* plockfile_fd = lockfile_fd ;
2015-10-12 21:30:30 +02:00
* punique = unique ;
2014-02-24 12:23:49 +00:00
return 0 ;
fail_unlink :
2014-09-10 09:58:00 +02:00
unlink ( lockfile_name . buf ) ;
2014-02-24 12:23:49 +00:00
fail_close :
close ( lockfile_fd ) ;
return ret ;
}
2016-09-09 16:51:00 +02:00
static void messaging_dgm_read_handler ( struct tevent_context * ev ,
struct tevent_fd * fde ,
uint16_t flags ,
void * private_data ) ;
2016-10-05 10:46:13 -07:00
/*
* Create the rendezvous point in the file system
* that other processes can use to send messages to
* this pid .
*/
2014-09-10 16:13:18 +02:00
int messaging_dgm_init ( struct tevent_context * ev ,
2015-10-12 21:30:30 +02:00
uint64_t * punique ,
2014-10-04 10:58:15 +02:00
const char * socket_dir ,
const char * lockfile_dir ,
2016-09-23 18:36:15 -07:00
void ( * recv_cb ) ( struct tevent_context * ev ,
const uint8_t * msg ,
2014-07-17 09:44:41 +00:00
size_t msg_len ,
2014-09-30 11:29:22 +02:00
int * fds ,
2014-06-24 07:39:05 +02:00
size_t num_fds ,
2014-07-17 09:44:41 +00:00
void * private_data ) ,
2014-09-10 16:13:18 +02:00
void * recv_cb_private_data )
2014-02-24 12:23:49 +00:00
{
struct messaging_dgm_context * ctx ;
int ret ;
2014-06-01 20:57:21 +02:00
struct sockaddr_un socket_address ;
2014-10-04 10:58:15 +02:00
size_t len ;
2014-07-20 14:51:47 +02:00
static bool have_dgm_context = false ;
if ( have_dgm_context ) {
return EEXIST ;
}
2014-02-24 12:23:49 +00:00
2021-03-06 15:49:46 +01:00
if ( ( socket_dir = = NULL ) | | ( lockfile_dir = = NULL ) ) {
return EINVAL ;
}
2014-09-10 16:13:18 +02:00
ctx = talloc_zero ( NULL , struct messaging_dgm_context ) ;
2014-02-24 12:23:49 +00:00
if ( ctx = = NULL ) {
goto fail_nomem ;
}
2016-09-09 16:51:00 +02:00
ctx - > ev = ev ;
2022-07-25 14:29:35 +02:00
ctx - > pid = tevent_cached_getpid ( ) ;
2014-07-17 09:44:41 +00:00
ctx - > recv_cb = recv_cb ;
ctx - > recv_cb_private_data = recv_cb_private_data ;
2014-10-04 10:58:15 +02:00
len = strlcpy ( ctx - > lockfile_dir . buf , lockfile_dir ,
sizeof ( ctx - > lockfile_dir . buf ) ) ;
if ( len > = sizeof ( ctx - > lockfile_dir . buf ) ) {
2014-09-10 10:12:23 +02:00
TALLOC_FREE ( ctx ) ;
return ENAMETOOLONG ;
2014-02-24 12:23:49 +00:00
}
2014-06-01 20:57:21 +02:00
2014-10-04 10:58:15 +02:00
len = strlcpy ( ctx - > socket_dir . buf , socket_dir ,
sizeof ( ctx - > socket_dir . buf ) ) ;
if ( len > = sizeof ( ctx - > socket_dir . buf ) ) {
TALLOC_FREE ( ctx ) ;
return ENAMETOOLONG ;
}
2014-09-10 10:12:23 +02:00
2014-06-01 20:57:21 +02:00
socket_address = ( struct sockaddr_un ) { . sun_family = AF_UNIX } ;
2014-10-04 10:58:15 +02:00
len = snprintf ( socket_address . sun_path ,
sizeof ( socket_address . sun_path ) ,
" %s/%u " , socket_dir , ( unsigned ) ctx - > pid ) ;
if ( len > = sizeof ( socket_address . sun_path ) ) {
2014-07-17 11:23:46 +00:00
TALLOC_FREE ( ctx ) ;
2014-06-10 15:21:10 +00:00
return ENAMETOOLONG ;
2014-02-24 12:23:49 +00:00
}
2014-10-04 10:58:15 +02:00
ret = messaging_dgm_lockfile_create ( ctx , ctx - > pid , & ctx - > lockfile_fd ,
2015-10-12 21:30:30 +02:00
punique ) ;
2014-02-24 12:23:49 +00:00
if ( ret ! = 0 ) {
DEBUG ( 1 , ( " %s: messaging_dgm_create_lockfile failed: %s \n " ,
__func__ , strerror ( ret ) ) ) ;
2014-07-17 11:23:46 +00:00
TALLOC_FREE ( ctx ) ;
2014-06-10 15:21:10 +00:00
return ret ;
2014-02-24 12:23:49 +00:00
}
2016-09-09 16:51:00 +02:00
unlink ( socket_address . sun_path ) ;
2014-05-05 08:45:52 +02:00
2016-09-09 16:51:00 +02:00
ctx - > sock = socket ( AF_UNIX , SOCK_DGRAM , 0 ) ;
if ( ctx - > sock = = - 1 ) {
ret = errno ;
DBG_WARNING ( " socket failed: %s \n " , strerror ( ret ) ) ;
TALLOC_FREE ( ctx ) ;
return ret ;
2014-05-05 08:45:52 +02:00
}
2014-02-24 12:23:49 +00:00
2016-09-09 16:51:00 +02:00
ret = prepare_socket_cloexec ( ctx - > sock ) ;
if ( ret = = - 1 ) {
ret = errno ;
DBG_WARNING ( " prepare_socket_cloexec failed: %s \n " ,
strerror ( ret ) ) ;
TALLOC_FREE ( ctx ) ;
return ret ;
}
2014-02-24 12:23:49 +00:00
2016-09-09 16:51:00 +02:00
ret = bind ( ctx - > sock , ( struct sockaddr * ) ( void * ) & socket_address ,
sizeof ( socket_address ) ) ;
if ( ret = = - 1 ) {
ret = errno ;
DBG_WARNING ( " bind failed: %s \n " , strerror ( ret ) ) ;
2014-07-17 11:23:46 +00:00
TALLOC_FREE ( ctx ) ;
2014-06-10 15:21:10 +00:00
return ret ;
2014-02-24 12:23:49 +00:00
}
2016-09-09 16:51:00 +02:00
2014-02-24 12:23:49 +00:00
talloc_set_destructor ( ctx , messaging_dgm_context_destructor ) ;
2014-07-20 14:51:47 +02:00
ctx - > have_dgm_context = & have_dgm_context ;
2018-06-22 00:10:08 +02:00
ret = pthreadpool_tevent_init ( ctx , UINT_MAX , & ctx - > pool ) ;
2016-09-09 16:51:00 +02:00
if ( ret ! = 0 ) {
DBG_WARNING ( " pthreadpool_tevent_init failed: %s \n " ,
strerror ( ret ) ) ;
TALLOC_FREE ( ctx ) ;
return ret ;
}
2014-09-10 16:13:18 +02:00
global_dgm_context = ctx ;
2014-06-10 15:21:10 +00:00
return 0 ;
2014-02-24 12:23:49 +00:00
fail_nomem :
2014-07-17 11:23:46 +00:00
TALLOC_FREE ( ctx ) ;
2014-06-10 15:21:10 +00:00
return ENOMEM ;
2014-02-24 12:23:49 +00:00
}
2016-10-05 10:46:13 -07:00
/*
* Remove the rendezvous point in the filesystem
* if we ' re the owner .
*/
2014-02-24 12:23:49 +00:00
static int messaging_dgm_context_destructor ( struct messaging_dgm_context * c )
{
2016-09-09 16:51:00 +02:00
while ( c - > outsocks ! = NULL ) {
TALLOC_FREE ( c - > outsocks ) ;
}
while ( c - > in_msgs ! = NULL ) {
TALLOC_FREE ( c - > in_msgs ) ;
}
2016-09-30 21:53:44 -07:00
while ( c - > fde_evs ! = NULL ) {
tevent_fd_set_flags ( c - > fde_evs - > fde , 0 ) ;
c - > fde_evs - > ctx = NULL ;
DLIST_REMOVE ( c - > fde_evs , c - > fde_evs ) ;
}
2016-09-09 16:51:00 +02:00
close ( c - > sock ) ;
2014-02-24 12:23:49 +00:00
2022-07-25 14:29:35 +02:00
if ( tevent_cached_getpid ( ) = = c - > pid ) {
2014-10-04 10:58:15 +02:00
struct sun_path_buf name ;
int ret ;
2016-09-09 16:51:00 +02:00
ret = snprintf ( name . buf , sizeof ( name . buf ) , " %s/%u " ,
c - > socket_dir . buf , ( unsigned ) c - > pid ) ;
if ( ( ret < 0 ) | | ( ( size_t ) ret > = sizeof ( name . buf ) ) ) {
/*
* We ' ve checked the length when creating , so this
* should never happen
*/
abort ( ) ;
}
unlink ( name . buf ) ;
2014-10-04 10:58:15 +02:00
ret = snprintf ( name . buf , sizeof ( name . buf ) , " %s/%u " ,
c - > lockfile_dir . buf , ( unsigned ) c - > pid ) ;
2016-08-21 17:46:16 +02:00
if ( ( ret < 0 ) | | ( ( size_t ) ret > = sizeof ( name . buf ) ) ) {
2014-10-04 10:58:15 +02:00
/*
* We ' ve checked the length when creating , so this
* should never happen
*/
abort ( ) ;
}
unlink ( name . buf ) ;
2014-02-24 12:23:49 +00:00
}
close ( c - > lockfile_fd ) ;
2014-07-20 14:51:47 +02:00
if ( c - > have_dgm_context ! = NULL ) {
* c - > have_dgm_context = false ;
}
2014-02-24 12:23:49 +00:00
return 0 ;
}
2017-11-25 16:47:24 +01:00
static void messaging_dgm_validate ( struct messaging_dgm_context * ctx )
{
# ifdef DEVELOPER
2022-07-25 14:29:35 +02:00
pid_t pid = tevent_cached_getpid ( ) ;
2017-11-25 16:47:24 +01:00
struct sockaddr_storage addr ;
socklen_t addrlen = sizeof ( addr ) ;
struct sockaddr_un * un_addr ;
struct sun_path_buf pathbuf ;
struct stat st1 , st2 ;
int ret ;
/*
* Protect against using the wrong messaging context after a
* fork without reinit_after_fork .
*/
ret = getsockname ( ctx - > sock , ( struct sockaddr * ) & addr , & addrlen ) ;
if ( ret = = - 1 ) {
DBG_ERR ( " getsockname failed: %s \n " , strerror ( errno ) ) ;
goto fail ;
}
if ( addr . ss_family ! = AF_UNIX ) {
DBG_ERR ( " getsockname returned family %d \n " ,
( int ) addr . ss_family ) ;
goto fail ;
}
un_addr = ( struct sockaddr_un * ) & addr ;
ret = snprintf ( pathbuf . buf , sizeof ( pathbuf . buf ) ,
" %s/%u " , ctx - > socket_dir . buf , ( unsigned ) pid ) ;
if ( ret < 0 ) {
DBG_ERR ( " snprintf failed: %s \n " , strerror ( errno ) ) ;
goto fail ;
}
if ( ( size_t ) ret > = sizeof ( pathbuf . buf ) ) {
DBG_ERR ( " snprintf returned %d chars \n " , ( int ) ret ) ;
goto fail ;
}
if ( strcmp ( pathbuf . buf , un_addr - > sun_path ) ! = 0 ) {
DBG_ERR ( " sockname wrong: Expected %s, got %s \n " ,
pathbuf . buf , un_addr - > sun_path ) ;
goto fail ;
}
ret = snprintf ( pathbuf . buf , sizeof ( pathbuf . buf ) ,
" %s/%u " , ctx - > lockfile_dir . buf , ( unsigned ) pid ) ;
if ( ret < 0 ) {
DBG_ERR ( " snprintf failed: %s \n " , strerror ( errno ) ) ;
goto fail ;
}
if ( ( size_t ) ret > = sizeof ( pathbuf . buf ) ) {
DBG_ERR ( " snprintf returned %d chars \n " , ( int ) ret ) ;
goto fail ;
}
ret = stat ( pathbuf . buf , & st1 ) ;
if ( ret = = - 1 ) {
DBG_ERR ( " stat failed: %s \n " , strerror ( errno ) ) ;
goto fail ;
}
ret = fstat ( ctx - > lockfile_fd , & st2 ) ;
if ( ret = = - 1 ) {
DBG_ERR ( " fstat failed: %s \n " , strerror ( errno ) ) ;
goto fail ;
}
if ( ( st1 . st_dev ! = st2 . st_dev ) | | ( st1 . st_ino ! = st2 . st_ino ) ) {
DBG_ERR ( " lockfile differs, expected (%d/%d), got (%d/%d) \n " ,
( int ) st2 . st_dev , ( int ) st2 . st_ino ,
( int ) st1 . st_dev , ( int ) st1 . st_ino ) ;
goto fail ;
}
return ;
fail :
abort ( ) ;
# else
return ;
# endif
}
2016-09-09 16:51:00 +02:00
static void messaging_dgm_recv ( struct messaging_dgm_context * ctx ,
2016-09-23 17:07:20 -07:00
struct tevent_context * ev ,
2016-09-09 16:51:00 +02:00
uint8_t * msg , size_t msg_len ,
int * fds , size_t num_fds ) ;
2016-10-05 10:46:13 -07:00
/*
* Raw read callback handler - passes to messaging_dgm_recv ( )
* for fragment reassembly processing .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_read_handler ( struct tevent_context * ev ,
struct tevent_fd * fde ,
uint16_t flags ,
void * private_data )
{
struct messaging_dgm_context * ctx = talloc_get_type_abort (
private_data , struct messaging_dgm_context ) ;
ssize_t received ;
struct msghdr msg ;
struct iovec iov ;
size_t msgbufsize = msghdr_prep_recv_fds ( NULL , NULL , 0 , INT8_MAX ) ;
uint8_t msgbuf [ msgbufsize ] ;
uint8_t buf [ MESSAGING_DGM_FRAGMENT_LENGTH ] ;
2018-11-22 13:57:18 +01:00
size_t num_fds ;
2016-09-09 16:51:00 +02:00
2017-11-25 16:47:24 +01:00
messaging_dgm_validate ( ctx ) ;
2016-09-09 16:51:00 +02:00
if ( ( flags & TEVENT_FD_READ ) = = 0 ) {
return ;
}
iov = ( struct iovec ) { . iov_base = buf , . iov_len = sizeof ( buf ) } ;
msg = ( struct msghdr ) { . msg_iov = & iov , . msg_iovlen = 1 } ;
msghdr_prep_recv_fds ( & msg , msgbuf , msgbufsize , INT8_MAX ) ;
# ifdef MSG_CMSG_CLOEXEC
2016-10-12 12:25:39 +02:00
msg . msg_flags | = MSG_CMSG_CLOEXEC ;
2016-09-09 16:51:00 +02:00
# endif
received = recvmsg ( ctx - > sock , & msg , 0 ) ;
if ( received = = - 1 ) {
if ( ( errno = = EAGAIN ) | |
( errno = = EWOULDBLOCK ) | |
( errno = = EINTR ) | |
( errno = = ENOMEM ) ) {
/* Not really an error - just try again. */
return ;
}
/* Problem with the socket. Set it unreadable. */
2016-09-30 06:42:40 -07:00
tevent_fd_set_flags ( fde , 0 ) ;
2016-09-09 16:51:00 +02:00
return ;
}
if ( ( size_t ) received > sizeof ( buf ) ) {
/* More than we expected, not for us */
return ;
}
2018-11-22 13:57:18 +01:00
num_fds = msghdr_extract_fds ( & msg , NULL , 0 ) ;
if ( num_fds = = 0 ) {
int fds [ 1 ] ;
messaging_dgm_recv ( ctx , ev , buf , received , fds , 0 ) ;
} else {
2016-09-09 16:51:00 +02:00
size_t i ;
int fds [ num_fds ] ;
msghdr_extract_fds ( & msg , fds , num_fds ) ;
for ( i = 0 ; i < num_fds ; i + + ) {
int err ;
err = prepare_socket_cloexec ( fds [ i ] ) ;
if ( err ! = 0 ) {
close_fd_array ( fds , num_fds ) ;
num_fds = 0 ;
}
}
2016-09-23 17:07:20 -07:00
messaging_dgm_recv ( ctx , ev , buf , received , fds , num_fds ) ;
2016-09-09 16:51:00 +02:00
}
}
static int messaging_dgm_in_msg_destructor ( struct messaging_dgm_in_msg * m )
{
DLIST_REMOVE ( m - > ctx - > in_msgs , m ) ;
return 0 ;
}
2021-01-21 18:33:58 +01:00
static void messaging_dgm_close_unconsumed ( int * fds , size_t num_fds )
{
size_t i ;
for ( i = 0 ; i < num_fds ; i + + ) {
if ( fds [ i ] ! = - 1 ) {
close ( fds [ i ] ) ;
fds [ i ] = - 1 ;
}
}
}
2016-10-05 10:46:13 -07:00
/*
* Deal with identification of fragmented messages and
* re - assembly into full messages sent , then calls the
* callback .
*/
2016-09-09 16:51:00 +02:00
static void messaging_dgm_recv ( struct messaging_dgm_context * ctx ,
2016-09-23 17:07:20 -07:00
struct tevent_context * ev ,
2016-09-09 16:51:00 +02:00
uint8_t * buf , size_t buflen ,
int * fds , size_t num_fds )
{
struct messaging_dgm_fragment_hdr hdr ;
struct messaging_dgm_in_msg * msg ;
size_t space ;
uint64_t cookie ;
if ( buflen < sizeof ( cookie ) ) {
goto close_fds ;
}
memcpy ( & cookie , buf , sizeof ( cookie ) ) ;
buf + = sizeof ( cookie ) ;
buflen - = sizeof ( cookie ) ;
if ( cookie = = 0 ) {
2016-09-23 18:36:15 -07:00
ctx - > recv_cb ( ev , buf , buflen , fds , num_fds ,
2016-09-09 16:51:00 +02:00
ctx - > recv_cb_private_data ) ;
2021-01-21 18:33:58 +01:00
messaging_dgm_close_unconsumed ( fds , num_fds ) ;
2016-09-09 16:51:00 +02:00
return ;
}
if ( buflen < sizeof ( hdr ) ) {
goto close_fds ;
}
memcpy ( & hdr , buf , sizeof ( hdr ) ) ;
buf + = sizeof ( hdr ) ;
buflen - = sizeof ( hdr ) ;
for ( msg = ctx - > in_msgs ; msg ! = NULL ; msg = msg - > next ) {
if ( ( msg - > sender_pid = = hdr . pid ) & &
( msg - > sender_sock = = hdr . sock ) ) {
break ;
}
}
if ( ( msg ! = NULL ) & & ( msg - > cookie ! = cookie ) ) {
TALLOC_FREE ( msg ) ;
}
if ( msg = = NULL ) {
size_t msglen ;
msglen = offsetof ( struct messaging_dgm_in_msg , buf ) +
hdr . msglen ;
msg = talloc_size ( ctx , msglen ) ;
if ( msg = = NULL ) {
goto close_fds ;
}
talloc_set_name_const ( msg , " struct messaging_dgm_in_msg " ) ;
* msg = ( struct messaging_dgm_in_msg ) {
. ctx = ctx , . msglen = hdr . msglen ,
. sender_pid = hdr . pid , . sender_sock = hdr . sock ,
. cookie = cookie
} ;
DLIST_ADD ( ctx - > in_msgs , msg ) ;
talloc_set_destructor ( msg , messaging_dgm_in_msg_destructor ) ;
}
space = msg - > msglen - msg - > received ;
if ( buflen > space ) {
goto close_fds ;
}
memcpy ( msg - > buf + msg - > received , buf , buflen ) ;
msg - > received + = buflen ;
if ( msg - > received < msg - > msglen ) {
/*
* Any valid sender will send the fds in the last
* block . Invalid senders might have sent fd ' s that we
* need to close here .
*/
goto close_fds ;
}
DLIST_REMOVE ( ctx - > in_msgs , msg ) ;
talloc_set_destructor ( msg , NULL ) ;
2016-09-23 18:36:15 -07:00
ctx - > recv_cb ( ev , msg - > buf , msg - > msglen , fds , num_fds ,
2016-09-09 16:51:00 +02:00
ctx - > recv_cb_private_data ) ;
2021-01-21 18:33:58 +01:00
messaging_dgm_close_unconsumed ( fds , num_fds ) ;
2016-09-09 16:51:00 +02:00
TALLOC_FREE ( msg ) ;
return ;
close_fds :
close_fd_array ( fds , num_fds ) ;
}
2014-09-10 16:13:18 +02:00
void messaging_dgm_destroy ( void )
{
TALLOC_FREE ( global_dgm_context ) ;
}
2014-05-17 15:16:02 +02:00
int messaging_dgm_send ( pid_t pid ,
const struct iovec * iov , int iovlen ,
const int * fds , size_t num_fds )
2014-02-24 12:23:49 +00:00
{
2014-09-10 16:13:18 +02:00
struct messaging_dgm_context * ctx = global_dgm_context ;
2016-09-09 16:51:00 +02:00
struct messaging_dgm_out * out ;
2014-02-24 12:23:49 +00:00
int ret ;
2019-02-07 16:15:46 +01:00
unsigned retries = 0 ;
2014-02-24 12:23:49 +00:00
2014-09-10 16:13:18 +02:00
if ( ctx = = NULL ) {
return ENOTCONN ;
}
2017-11-25 16:47:24 +01:00
messaging_dgm_validate ( ctx ) ;
2019-02-07 16:15:46 +01:00
again :
2016-09-09 16:51:00 +02:00
ret = messaging_dgm_out_get ( ctx , pid , & out ) ;
if ( ret ! = 0 ) {
return ret ;
2014-02-24 12:23:49 +00:00
}
2014-07-27 12:29:26 +02:00
DEBUG ( 10 , ( " %s: Sending message to %u \n " , __func__ , ( unsigned ) pid ) ) ;
2014-02-24 12:23:49 +00:00
2016-09-09 16:51:00 +02:00
ret = messaging_dgm_out_send_fragmented ( ctx - > ev , out , iov , iovlen ,
fds , num_fds ) ;
2019-02-07 16:15:46 +01:00
if ( ret = = ECONNREFUSED ) {
/*
* We cache outgoing sockets . If the receiver has
* closed and re - opened the socket since our last
* message , we get connection refused . Retry .
*/
TALLOC_FREE ( out ) ;
if ( retries < 5 ) {
retries + = 1 ;
goto again ;
}
}
2014-06-04 14:36:57 +00:00
return ret ;
2014-02-24 12:23:49 +00:00
}
2015-09-30 00:31:17 +02:00
static int messaging_dgm_read_unique ( int fd , uint64_t * punique )
{
char buf [ 25 ] ;
ssize_t rw_ret ;
2019-01-28 12:54:07 +01:00
int error = 0 ;
2015-09-30 00:31:17 +02:00
unsigned long long unique ;
char * endptr ;
rw_ret = pread ( fd , buf , sizeof ( buf ) - 1 , 0 ) ;
if ( rw_ret = = - 1 ) {
return errno ;
}
buf [ rw_ret ] = ' \0 ' ;
2019-06-04 09:04:15 +02:00
unique = smb_strtoull ( buf , & endptr , 10 , & error , SMB_STR_STANDARD ) ;
2019-01-28 12:54:07 +01:00
if ( error ! = 0 ) {
return error ;
2015-09-30 00:31:17 +02:00
}
2019-01-28 12:54:07 +01:00
2015-09-30 00:31:17 +02:00
if ( endptr [ 0 ] ! = ' \n ' ) {
return EINVAL ;
}
* punique = unique ;
return 0 ;
}
int messaging_dgm_get_unique ( pid_t pid , uint64_t * unique )
{
struct messaging_dgm_context * ctx = global_dgm_context ;
struct sun_path_buf lockfile_name ;
int ret , fd ;
if ( ctx = = NULL ) {
return EBADF ;
}
2017-11-25 16:47:24 +01:00
messaging_dgm_validate ( ctx ) ;
2022-07-25 14:29:35 +02:00
if ( pid = = tevent_cached_getpid ( ) ) {
2015-09-30 00:31:17 +02:00
/*
* Protect against losing our own lock
*/
return messaging_dgm_read_unique ( ctx - > lockfile_fd , unique ) ;
}
ret = snprintf ( lockfile_name . buf , sizeof ( lockfile_name . buf ) ,
" %s/%u " , ctx - > lockfile_dir . buf , ( int ) pid ) ;
2016-08-21 17:46:16 +02:00
if ( ret < 0 ) {
return errno ;
}
if ( ( size_t ) ret > = sizeof ( lockfile_name . buf ) ) {
2015-09-30 00:31:17 +02:00
return ENAMETOOLONG ;
}
fd = open ( lockfile_name . buf , O_NONBLOCK | O_RDONLY , 0 ) ;
if ( fd = = - 1 ) {
return errno ;
}
ret = messaging_dgm_read_unique ( fd , unique ) ;
close ( fd ) ;
return ret ;
}
2014-09-10 16:13:18 +02:00
int messaging_dgm_cleanup ( pid_t pid )
2014-02-24 12:23:49 +00:00
{
2014-09-10 16:13:18 +02:00
struct messaging_dgm_context * ctx = global_dgm_context ;
2014-09-10 09:58:00 +02:00
struct sun_path_buf lockfile_name , socket_name ;
2014-10-04 10:58:15 +02:00
int fd , len , ret ;
2019-01-14 12:20:35 +01:00
struct flock lck = {
. l_pid = 0 ,
} ;
2014-02-24 12:23:49 +00:00
2014-09-10 16:13:18 +02:00
if ( ctx = = NULL ) {
return ENOTCONN ;
}
2014-10-04 10:58:15 +02:00
len = snprintf ( socket_name . buf , sizeof ( socket_name . buf ) , " %s/%u " ,
ctx - > socket_dir . buf , ( unsigned ) pid ) ;
2016-08-21 17:46:16 +02:00
if ( len < 0 ) {
return errno ;
}
if ( ( size_t ) len > = sizeof ( socket_name . buf ) ) {
2014-10-04 10:58:15 +02:00
return ENAMETOOLONG ;
2014-02-24 12:23:49 +00:00
}
2014-10-04 10:58:15 +02:00
len = snprintf ( lockfile_name . buf , sizeof ( lockfile_name . buf ) , " %s/%u " ,
ctx - > lockfile_dir . buf , ( unsigned ) pid ) ;
2016-08-21 17:46:16 +02:00
if ( len < 0 ) {
return errno ;
}
if ( ( size_t ) len > = sizeof ( lockfile_name . buf ) ) {
2014-10-04 10:58:15 +02:00
return ENAMETOOLONG ;
}
2014-09-10 09:58:00 +02:00
fd = open ( lockfile_name . buf , O_NONBLOCK | O_WRONLY , 0 ) ;
2014-02-24 12:23:49 +00:00
if ( fd = = - 1 ) {
2014-06-04 14:42:46 +00:00
ret = errno ;
2014-08-18 11:58:05 +00:00
if ( ret ! = ENOENT ) {
DEBUG ( 10 , ( " %s: open(%s) failed: %s \n " , __func__ ,
2014-09-10 09:58:00 +02:00
lockfile_name . buf , strerror ( ret ) ) ) ;
2014-08-18 11:58:05 +00:00
}
2014-06-04 14:42:46 +00:00
return ret ;
2014-02-24 12:23:49 +00:00
}
lck . l_type = F_WRLCK ;
lck . l_whence = SEEK_SET ;
lck . l_start = 0 ;
lck . l_len = 0 ;
ret = fcntl ( fd , F_SETLK , & lck ) ;
if ( ret ! = 0 ) {
2014-06-04 14:42:46 +00:00
ret = errno ;
2014-10-04 10:58:15 +02:00
if ( ( ret ! = EACCES ) & & ( ret ! = EAGAIN ) ) {
DEBUG ( 10 , ( " %s: Could not get lock: %s \n " , __func__ ,
strerror ( ret ) ) ) ;
}
2014-02-24 12:23:49 +00:00
close ( fd ) ;
2014-06-04 14:42:46 +00:00
return ret ;
2014-02-24 12:23:49 +00:00
}
2014-10-04 10:58:15 +02:00
DEBUG ( 10 , ( " %s: Cleaning up : %s \n " , __func__ , strerror ( ret ) ) ) ;
2014-09-10 09:58:00 +02:00
( void ) unlink ( socket_name . buf ) ;
( void ) unlink ( lockfile_name . buf ) ;
2014-02-24 12:23:49 +00:00
( void ) close ( fd ) ;
2014-06-04 14:42:46 +00:00
return 0 ;
2014-02-24 12:23:49 +00:00
}
2014-04-10 22:07:11 +02:00
2017-10-16 21:52:35 +02:00
static int messaging_dgm_wipe_fn ( pid_t pid , void * private_data )
2014-04-10 22:07:11 +02:00
{
2017-10-16 21:52:35 +02:00
pid_t * our_pid = ( pid_t * ) private_data ;
2014-06-04 14:47:05 +00:00
int ret ;
2014-04-10 22:07:11 +02:00
2017-10-16 21:52:35 +02:00
if ( pid = = * our_pid ) {
/*
* fcntl ( F_GETLK ) will succeed for ourselves , we hold
* that lock ourselves .
*/
return 0 ;
2014-09-10 16:13:18 +02:00
}
2017-10-16 21:52:35 +02:00
ret = messaging_dgm_cleanup ( pid ) ;
DEBUG ( 10 , ( " messaging_dgm_cleanup(%lu) returned %s \n " ,
( unsigned long ) pid , ret ? strerror ( ret ) : " ok " ) ) ;
2014-04-10 22:07:11 +02:00
2017-10-16 21:52:35 +02:00
return 0 ;
}
2014-04-10 22:07:11 +02:00
2017-10-16 21:52:35 +02:00
int messaging_dgm_wipe ( void )
{
2022-07-25 14:29:35 +02:00
pid_t pid = tevent_cached_getpid ( ) ;
2017-10-16 21:52:35 +02:00
messaging_dgm_forall ( messaging_dgm_wipe_fn , & pid ) ;
2014-06-04 14:47:05 +00:00
return 0 ;
2014-04-10 22:07:11 +02:00
}
2014-05-06 09:11:17 +02:00
2017-07-21 19:03:26 +02:00
int messaging_dgm_forall ( int ( * fn ) ( pid_t pid , void * private_data ) ,
void * private_data )
{
struct messaging_dgm_context * ctx = global_dgm_context ;
DIR * msgdir ;
struct dirent * dp ;
2019-01-28 12:54:07 +01:00
int error = 0 ;
2017-07-21 19:03:26 +02:00
if ( ctx = = NULL ) {
return ENOTCONN ;
}
messaging_dgm_validate ( ctx ) ;
/*
* We scan the socket directory and not the lock directory . Otherwise
* we would race against messaging_dgm_lockfile_create ' s open ( O_CREAT )
* and fcntl ( SETLK ) .
*/
msgdir = opendir ( ctx - > socket_dir . buf ) ;
if ( msgdir = = NULL ) {
return errno ;
}
while ( ( dp = readdir ( msgdir ) ) ! = NULL ) {
unsigned long pid ;
int ret ;
2019-06-04 09:04:15 +02:00
pid = smb_strtoul ( dp - > d_name , NULL , 10 , & error , SMB_STR_STANDARD ) ;
2019-01-28 12:54:07 +01:00
if ( ( pid = = 0 ) | | ( error ! = 0 ) ) {
2017-07-21 19:03:26 +02:00
/*
* . and . . and other malformed entries
*/
continue ;
}
ret = fn ( pid , private_data ) ;
if ( ret ! = 0 ) {
break ;
}
}
closedir ( msgdir ) ;
return 0 ;
}
2016-09-30 21:53:44 -07:00
struct messaging_dgm_fde {
struct tevent_fd * fde ;
} ;
static int messaging_dgm_fde_ev_destructor ( struct messaging_dgm_fde_ev * fde_ev )
{
if ( fde_ev - > ctx ! = NULL ) {
DLIST_REMOVE ( fde_ev - > ctx - > fde_evs , fde_ev ) ;
fde_ev - > ctx = NULL ;
}
return 0 ;
}
2016-10-05 10:46:13 -07:00
/*
* Reference counter for a struct tevent_fd messaging read event
* ( with callback function ) on a struct tevent_context registered
* on a messaging context .
*
* If we ' ve already registered this struct tevent_context before
* ( so already have a read event ) , just increase the reference count .
*
* Otherwise create a new struct tevent_fd messaging read event on the
* previously unseen struct tevent_context - this is what drives
* the message receive processing .
*
*/
2016-09-30 21:53:44 -07:00
struct messaging_dgm_fde * messaging_dgm_register_tevent_context (
2016-09-30 06:42:40 -07:00
TALLOC_CTX * mem_ctx , struct tevent_context * ev )
2014-05-06 09:11:17 +02:00
{
2014-09-10 16:13:18 +02:00
struct messaging_dgm_context * ctx = global_dgm_context ;
2016-09-30 21:53:44 -07:00
struct messaging_dgm_fde_ev * fde_ev ;
struct messaging_dgm_fde * fde ;
2014-09-10 16:13:18 +02:00
if ( ctx = = NULL ) {
return NULL ;
}
2016-09-30 21:53:44 -07:00
fde = talloc ( mem_ctx , struct messaging_dgm_fde ) ;
if ( fde = = NULL ) {
return NULL ;
}
for ( fde_ev = ctx - > fde_evs ; fde_ev ! = NULL ; fde_ev = fde_ev - > next ) {
2018-03-27 15:27:32 +02:00
if ( tevent_fd_get_flags ( fde_ev - > fde ) = = 0 ) {
/*
* If the event context got deleted ,
* tevent_fd_get_flags ( ) will return 0
* for the stale fde .
*
* In that case we should not
* use fde_ev - > ev anymore .
*/
continue ;
}
2018-12-27 12:45:42 +01:00
if ( fde_ev - > ev = = ev ) {
2016-09-30 21:53:44 -07:00
break ;
}
}
if ( fde_ev = = NULL ) {
fde_ev = talloc ( fde , struct messaging_dgm_fde_ev ) ;
if ( fde_ev = = NULL ) {
return NULL ;
}
fde_ev - > fde = tevent_add_fd (
ev , fde_ev , ctx - > sock , TEVENT_FD_READ ,
messaging_dgm_read_handler , ctx ) ;
if ( fde_ev - > fde = = NULL ) {
TALLOC_FREE ( fde ) ;
return NULL ;
}
fde_ev - > ev = ev ;
fde_ev - > ctx = ctx ;
DLIST_ADD ( ctx - > fde_evs , fde_ev ) ;
talloc_set_destructor (
fde_ev , messaging_dgm_fde_ev_destructor ) ;
} else {
/*
* Same trick as with tdb_wrap : The caller will never
* see the talloc_referenced object , the
* messaging_dgm_fde_ev , so problems with
* talloc_unlink will not happen .
*/
if ( talloc_reference ( fde , fde_ev ) = = NULL ) {
TALLOC_FREE ( fde ) ;
return NULL ;
}
}
fde - > fde = fde_ev - > fde ;
return fde ;
}
bool messaging_dgm_fde_active ( struct messaging_dgm_fde * fde )
{
uint16_t flags ;
if ( fde = = NULL ) {
return false ;
}
flags = tevent_fd_get_flags ( fde - > fde ) ;
return ( flags ! = 0 ) ;
2014-05-06 09:11:17 +02:00
}