diff --git a/source/lib/messaging/messaging.c b/source/lib/messaging/messaging.c index 2130958b366..4d2cd9c9107 100644 --- a/source/lib/messaging/messaging.c +++ b/source/lib/messaging/messaging.c @@ -31,9 +31,6 @@ /* change the message version with any incompatible changes in the protocol */ #define MESSAGING_VERSION 1 -/* the number of microseconds to backoff in retrying to send a message */ -#define MESSAGING_BACKOFF 250000 - /* maximum message size */ #define MESSAGING_MAX_SIZE 512 @@ -42,6 +39,7 @@ struct messaging_context { struct socket_context *sock; const char *path; struct dispatch_fn *dispatch; + struct messaging_rec *pending; struct { struct event_context *ev; @@ -61,8 +59,8 @@ struct dispatch_fn { /* an individual message */ struct messaging_rec { + struct messaging_rec *next, *prev; struct messaging_context *msg; - struct socket_context *sock; const char *path; struct { @@ -76,6 +74,7 @@ struct messaging_rec { DATA_BLOB data; }; + /* A useful function for testing the message system. */ @@ -111,19 +110,67 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data); } } - rec->header.length = 0; } /* - handle a new incoming connection + try to send the message */ -static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) +static NTSTATUS try_send(struct messaging_rec *rec) +{ + struct messaging_context *msg = rec->msg; + DATA_BLOB blob; + size_t nsent; + void *priv; + NTSTATUS status; + + blob = data_blob_talloc(rec, NULL, sizeof(rec->header) + rec->data.length); + NT_STATUS_HAVE_NO_MEMORY(blob.data); + + memcpy(blob.data, &rec->header, sizeof(rec->header)); + memcpy(blob.data + sizeof(rec->header), rec->data.data, rec->data.length); + + /* we send with privileges so messages work from any context */ + priv = root_privileges(); + status = socket_sendto(msg->sock, &blob, &nsent, 0, rec->path, 0); + talloc_free(priv); + + data_blob_free(&blob); + + return status; +} + +/* + handle a socket write event +*/ +static void messaging_send_handler(struct messaging_context *msg) +{ + while (msg->pending) { + struct messaging_rec *rec = msg->pending; + NTSTATUS status; + status = try_send(rec); + if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { + break; + } + if (!NT_STATUS_IS_OK(status)) { + DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", + rec->header.from, rec->header.to, rec->header.msg_type, + nt_errstr(status))); + } + DLIST_REMOVE(msg->pending, rec); + talloc_free(rec); + } + if (msg->pending == NULL) { + EVENT_FD_NOT_WRITEABLE(msg->event.fde); + } +} + +/* + handle a new incoming packet +*/ +static void messaging_recv_handler(struct messaging_context *msg) { - struct messaging_context *msg = talloc_get_type(private, - struct messaging_context); struct messaging_rec *rec; NTSTATUS status; uint8_t data[MESSAGING_MAX_SIZE]; @@ -146,7 +193,6 @@ static void messaging_recv_handler(struct event_context *ev, struct fd_event *fd rec->msg = msg; rec->path = msg->path; - rec->sock = NULL; memcpy(&rec->header, data, sizeof(rec->header)); if (msize != sizeof(rec->header) + rec->header.length) { @@ -166,6 +212,24 @@ static void messaging_recv_handler(struct event_context *ev, struct fd_event *fd talloc_free(rec); } + +/* + handle a socket event +*/ +static void messaging_handler(struct event_context *ev, struct fd_event *fde, + uint16_t flags, void *private) +{ + struct messaging_context *msg = talloc_get_type(private, + struct messaging_context); + if (flags & EVENT_FD_WRITE) { + messaging_send_handler(msg); + } + if (flags & EVENT_FD_READ) { + messaging_recv_handler(msg); + } +} + + /* Register a dispatch function for a particular message type. */ @@ -200,89 +264,11 @@ void messaging_deregister(struct messaging_context *msg, uint32_t msg_type, void } - -/* - handle IO for sending a message -*/ -static void messaging_send_handler(struct event_context *ev, struct fd_event *fde, - uint16_t flags, void *private) -{ - struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec); - uint8_t data[MESSAGING_MAX_SIZE]; - DATA_BLOB blob; - size_t nsent; - NTSTATUS status; - - memcpy(data, &rec->header, sizeof(rec->header)); - memcpy(data + sizeof(rec->header), rec->data.data, rec->data.length); - - blob.data = data; - blob.length = sizeof(rec->header) + rec->header.length; - - status = socket_send(rec->sock, &blob, &nsent, 0); - if (NT_STATUS_IS_ERR(status)) { - DEBUG(3,("Unable to send message of type %d length %d - %s\n", - rec->header.msg_type, - rec->header.length, - nt_errstr(status))); - talloc_free(rec); - return; - } - - if (NT_STATUS_IS_OK(status)) { - talloc_free(rec); - } -} - - -/* - wrapper around socket_connect with raised privileges -*/ -static NTSTATUS try_connect(struct messaging_rec *rec) -{ - NTSTATUS status; - void *priv = root_privileges(); - status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0); - talloc_free(priv); - return status; -} - - -/* - when the servers listen queue is full we use this to backoff the message -*/ -static void messaging_backoff_handler(struct event_context *ev, struct timed_event *te, - struct timeval t, void *private) -{ - struct messaging_rec *rec = talloc_get_type(private, struct messaging_rec); - struct messaging_context *msg = rec->msg; - NTSTATUS status; - - status = try_connect(rec); - if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { - /* backoff again */ - event_add_timed(msg->event.ev, rec, - timeval_add(&t, 0, MESSAGING_BACKOFF), - messaging_backoff_handler, rec); - return; - } - - if (!NT_STATUS_IS_OK(status)) { - DEBUG(1,("messaging: Lost message from %u to %u of type %u after backoff - %s\n", - rec->header.from, rec->header.to, rec->header.msg_type, nt_errstr(status))); - talloc_free(rec); - return; - } - - event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), - EVENT_FD_WRITE, messaging_send_handler, rec); -} - - /* Send a message to a particular server */ -NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t msg_type, DATA_BLOB *data) +NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, + uint32_t msg_type, DATA_BLOB *data) { struct messaging_rec *rec; NTSTATUS status; @@ -292,45 +278,32 @@ NTSTATUS messaging_send(struct messaging_context *msg, uint32_t server, uint32_t return NT_STATUS_NO_MEMORY; } - rec->msg = msg; - rec->header.version = MESSAGING_VERSION; + rec->msg = msg; + rec->header.version = MESSAGING_VERSION; rec->header.msg_type = msg_type; - rec->header.from = msg->server_id; - rec->header.to = server; - rec->header.length = data?data->length:0; + rec->header.from = msg->server_id; + rec->header.to = server; + rec->header.length = data?data->length:0; if (rec->header.length != 0) { rec->data = data_blob_talloc(rec, data->data, data->length); } else { rec->data = data_blob(NULL, 0); } - status = socket_create("unix", SOCKET_TYPE_DGRAM, &rec->sock, 0); - if (!NT_STATUS_IS_OK(status)) { - talloc_free(rec); - return status; - } - talloc_steal(rec, rec->sock); - rec->path = messaging_path(rec, server); - status = try_connect(rec); + status = try_send(rec); if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { - /* backoff on this message - the servers listen queue is full */ - event_add_timed(msg->event.ev, rec, - timeval_current_ofs(0, MESSAGING_BACKOFF), - messaging_backoff_handler, rec); + if (msg->pending == NULL) { + EVENT_FD_WRITEABLE(msg->event.fde); + } + DLIST_ADD(msg->pending, rec); return NT_STATUS_OK; } - if (!NT_STATUS_IS_OK(status)) { - talloc_free(rec); - return status; - } + talloc_free(rec); - event_add_fd(msg->event.ev, rec, socket_get_fd(rec->sock), - EVENT_FD_WRITE, messaging_send_handler, rec); - - return NT_STATUS_OK; + return status; } /* @@ -381,6 +354,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id msg->path = messaging_path(msg, server_id); msg->server_id = server_id; msg->dispatch = NULL; + msg->pending = NULL; status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0); if (!NT_STATUS_IS_OK(status)) { @@ -399,9 +373,12 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id return NULL; } + /* it needs to be non blocking for sends */ + set_blocking(socket_get_fd(msg->sock), False); + msg->event.ev = talloc_reference(msg, ev); msg->event.fde = event_add_fd(ev, msg, socket_get_fd(msg->sock), - EVENT_FD_READ, messaging_recv_handler, msg); + EVENT_FD_READ, messaging_handler, msg); talloc_set_destructor(msg, messaging_destructor); @@ -409,5 +386,3 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, uint32_t server_id return msg; } - -