diff --git a/source/lib/messaging/messaging.c b/source/lib/messaging/messaging.c index 58b5e5243e4..ab94a30acea 100644 --- a/source/lib/messaging/messaging.c +++ b/source/lib/messaging/messaging.c @@ -48,11 +48,12 @@ struct messaging_context { uint32_t num_types; struct idr_context *dispatch_tree; struct messaging_rec *pending; + struct messaging_rec *retry_queue; struct irpc_list *irpc; struct idr_context *idr; const char **names; struct timeval start_time; - + struct timed_event *retry_te; struct { struct event_context *ev; struct fd_event *fde; @@ -83,6 +84,7 @@ struct messaging_rec { } *header; DATA_BLOB packet; + uint32_t retries; }; @@ -168,6 +170,7 @@ static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB pac rec->path = msg->path; rec->header = (struct messaging_header *)packet.data; rec->packet = packet; + rec->retries = 0; if (packet.length != sizeof(*rec->header) + rec->header->length) { DEBUG(0,("messaging: bad message header size %d should be %d\n", @@ -210,6 +213,26 @@ static NTSTATUS try_send(struct messaging_rec *rec) return status; } +/* + retry backed off messages +*/ +static void msg_retry_timer(struct event_context *ev, struct timed_event *te, + struct timeval t, void *private) +{ + struct messaging_context *msg = talloc_get_type(private, + struct messaging_context); + msg->retry_te = NULL; + + /* put the messages back on the main queue */ + while (msg->retry_queue) { + struct messaging_rec *rec = msg->retry_queue; + DLIST_REMOVE(msg->retry_queue, rec); + DLIST_ADD_END(msg->pending, rec, struct messaging_rec *); + } + + EVENT_FD_WRITEABLE(msg->event.fde); +} + /* handle a socket write event */ @@ -220,8 +243,23 @@ static void messaging_send_handler(struct messaging_context *msg) NTSTATUS status; status = try_send(rec); if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) { + rec->retries++; + if (rec->retries > 3) { + /* we're getting continuous write errors - + backoff this record */ + DLIST_REMOVE(msg->pending, rec); + DLIST_ADD_END(msg->retry_queue, rec, + struct messaging_rec *); + if (msg->retry_te == NULL) { + msg->retry_te = + event_add_timed(msg->event.ev, msg, + timeval_current_ofs(1, 0), + msg_retry_timer, msg); + } + } break; } + rec->retries = 0; if (!NT_STATUS_IS_OK(status)) { DEBUG(1,("messaging: Lost message from %u to %u of type %u - %s\n", rec->header->from.id, rec->header->to.id, rec->header->msg_type, @@ -281,6 +319,7 @@ static void messaging_recv_handler(struct messaging_context *msg) rec->path = msg->path; rec->header = (struct messaging_header *)packet.data; rec->packet = packet; + rec->retries = 0; if (msize != sizeof(*rec->header) + rec->header->length) { DEBUG(0,("messaging: bad message header size %d should be %d\n", @@ -415,6 +454,7 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server, return NT_STATUS_NO_MEMORY; } + rec->retries = 0; rec->msg = msg; rec->header = (struct messaging_header *)rec->packet.data; rec->header->version = MESSAGING_VERSION;