1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-10 01:18:15 +03:00

s3: lib: messaging. Add function comments I needed to understand this code.

Signed-off-by: Jeremy Allison <jra@samba.org>
Reviewed-by: Volker Lendecke <vl@samba.org>

Autobuild-User(master): Volker Lendecke <vl@samba.org>
Autobuild-Date(master): Thu Oct  6 02:29:41 CEST 2016 on sn-devel-144
This commit is contained in:
Jeremy Allison 2016-10-05 10:46:13 -07:00 committed by Volker Lendecke
parent eb7555397f
commit d02909f3e0

View File

@ -141,6 +141,11 @@ static void close_fd_array(int *fds, size_t num_fds)
}
}
/*
* The idle handler can free the struct messaging_dgm_out *,
* if it's unused (qlen of zero) which closes the socket.
*/
static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval current_time,
@ -158,6 +163,11 @@ static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
}
}
/*
* Setup the idle handler to fire afer 1 second if the
* queue is zero.
*/
static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
{
size_t qlen;
@ -189,6 +199,11 @@ static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
struct timeval current_time,
void *private_data);
/*
* Connect to an existing rendezvous point for another
* pid - wrapped inside a struct messaging_dgm_out *.
*/
static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
struct messaging_dgm_context *ctx,
pid_t pid, struct messaging_dgm_out **pout)
@ -277,6 +292,12 @@ static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
return 0;
}
/*
* Find the struct messaging_dgm_out * to talk to pid.
* If we don't have one, create it. Set the timer to
* delete after 1 sec.
*/
static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
struct messaging_dgm_out **pout)
{
@ -302,6 +323,13 @@ static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
return 0;
}
/*
* This function is called directly to send a message fragment
* when the outgoing queue is zero, and from a pthreadpool
* job thread when messages are being queued (qlen != 0).
* Make sure *ONLY* thread-safe functions are called within.
*/
static ssize_t messaging_dgm_sendmsg(int sock,
const struct iovec *iov, int iovlen,
const int *fds, size_t num_fds,
@ -365,6 +393,13 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
static void messaging_dgm_out_threaded_job(void *private_data);
static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
/*
* Push a message fragment onto a queue to be sent by a
* threadpool job. Makes copies of data/fd's to be sent.
* The running tevent_queue internally creates an immediate
* event to schedule the write.
*/
static struct tevent_req *messaging_dgm_out_queue_send(
TALLOC_CTX *mem_ctx, struct tevent_context *ev,
struct messaging_dgm_out *out,
@ -467,6 +502,11 @@ static int messaging_dgm_out_queue_state_destructor(
return 0;
}
/*
* tevent_queue callback that schedules the pthreadpool to actually
* send the queued message fragment.
*/
static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
void *private_data)
{
@ -485,6 +525,11 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
req);
}
/*
* Wrapper function run by the pthread that calls
* messaging_dgm_sendmsg() to actually do the sendmsg().
*/
static void messaging_dgm_out_threaded_job(void *private_data)
{
struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
@ -498,6 +543,10 @@ static void messaging_dgm_out_threaded_job(void *private_data)
state->fds, num_fds, &state->err);
}
/*
* Pickup the results of the pthread sendmsg().
*/
static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
@ -532,6 +581,14 @@ static int messaging_dgm_out_queue_recv(struct tevent_req *req)
static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
/*
* Core function to send a message fragment given a
* connected struct messaging_dgm_out * destination.
* If no current queue tries to send nonblocking
* directly. If not, queues the fragment (which makes
* a copy of it) and adds a 60-second timeout on the send.
*/
static int messaging_dgm_out_send_fragment(
struct tevent_context *ev, struct messaging_dgm_out *out,
const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
@ -581,6 +638,11 @@ static int messaging_dgm_out_send_fragment(
return 0;
}
/*
* Pickup the result of the fragment send. Reset idle timer
* if queue empty.
*/
static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
{
struct messaging_dgm_out *out = tevent_req_callback_data(
@ -605,6 +667,33 @@ struct messaging_dgm_fragment_hdr {
int sock;
};
/*
* Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
* size chunks and send it.
*
* Message fragments are prefixed by a 64-bit cookie that
* stays the same for all fragments. This allows the receiver
* to recognise fragments of the same message and re-assemble
* them on the other end.
*
* Note that this allows other message fragments from other
* senders to be interleaved in the receive read processing,
* the combination of the cookie and header info allows unique
* identification of the message from a specific sender in
* re-assembly.
*
* If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
* then send a single message with cookie set to zero.
*
* Otherwise the message is fragmented into chunks and added
* to the sending queue. Any file descriptors are passed only
* in the last fragment.
*
* Finally the cookie is incremented (wrap over zero) to
* prepare for the next message sent to this channel.
*
*/
static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
struct messaging_dgm_out *out,
const struct iovec *iov,
@ -837,6 +926,12 @@ static void messaging_dgm_read_handler(struct tevent_context *ev,
uint16_t flags,
void *private_data);
/*
* Create the rendezvous point in the file system
* that other processes can use to send messages to
* this pid.
*/
int messaging_dgm_init(struct tevent_context *ev,
uint64_t *punique,
const char *socket_dir,
@ -948,6 +1043,11 @@ fail_nomem:
return ENOMEM;
}
/*
* Remove the rendezvous point in the filesystem
* if we're the owner.
*/
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
{
while (c->outsocks != NULL) {
@ -1004,6 +1104,11 @@ static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds);
/*
* Raw read callback handler - passes to messaging_dgm_recv()
* for fragment reassembly processing.
*/
static void messaging_dgm_read_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags,
@ -1078,6 +1183,12 @@ static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
return 0;
}
/*
* Deal with identification of fragmented messages and
* re-assembly into full messages sent, then calls the
* callback.
*/
static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
struct tevent_context *ev,
uint8_t *buf, size_t buflen,
@ -1387,6 +1498,20 @@ static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
return 0;
}
/*
* Reference counter for a struct tevent_fd messaging read event
* (with callback function) on a struct tevent_context registered
* on a messaging context.
*
* If we've already registered this struct tevent_context before
* (so already have a read event), just increase the reference count.
*
* Otherwise create a new struct tevent_fd messaging read event on the
* previously unseen struct tevent_context - this is what drives
* the message receive processing.
*
*/
struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
TALLOC_CTX *mem_ctx, struct tevent_context *ev)
{