1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-24 21:34:56 +03:00

messaging3: Push down the self-send callback

In the messaging_read receivers we already defer the callback: We need to
reply on potentially different tevent contexts, thus the defer_callback.

The callback case in messaging_dispatch_rec was direct before this
patch. This changes messaging_dispatch_rec to also defer the callback
in the self-send case.

Now we need only two roundtrips in local-messaging-read1 :-)

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
This commit is contained in:
Volker Lendecke 2014-05-07 09:44:57 +02:00
parent 80365e030d
commit c0f6ab92f7
2 changed files with 95 additions and 53 deletions

View File

@ -341,15 +341,6 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
}
}
struct messaging_selfsend_state {
struct messaging_context *msg;
struct messaging_rec rec;
};
static void messaging_trigger_self(struct tevent_context *ev,
struct tevent_immediate *im,
void *private_data);
/*
Send a message to a particular server
*/
@ -368,33 +359,13 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
}
if (server_id_equal(&msg_ctx->id, &server)) {
struct messaging_selfsend_state *state;
struct tevent_immediate *im;
state = talloc_pooled_object(
msg_ctx, struct messaging_selfsend_state,
1, data->length);
if (state == NULL) {
return NT_STATUS_NO_MEMORY;
}
state->msg = msg_ctx;
state->rec.msg_version = MESSAGE_VERSION;
state->rec.msg_type = msg_type & MSG_TYPE_MASK;
state->rec.dest = server;
state->rec.src = msg_ctx->id;
/* Can't fail, it's a pooled_object */
state->rec.buf = data_blob_talloc(
state, data->data, data->length);
im = tevent_create_immediate(state);
if (im == NULL) {
TALLOC_FREE(state);
return NT_STATUS_NO_MEMORY;
}
tevent_schedule_immediate(im, msg_ctx->event_ctx,
messaging_trigger_self, state);
struct messaging_rec rec;
rec.msg_version = MESSAGE_VERSION;
rec.msg_type = msg_type & MSG_TYPE_MASK;
rec.dest = server;
rec.src = msg_ctx->id;
rec.buf = *data;
messaging_dispatch_rec(msg_ctx, &rec);
return NT_STATUS_OK;
}
@ -402,16 +373,6 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
msg_ctx->local);
}
static void messaging_trigger_self(struct tevent_context *ev,
struct tevent_immediate *im,
void *private_data)
{
struct messaging_selfsend_state *state = talloc_get_type_abort(
private_data, struct messaging_selfsend_state);
messaging_dispatch_rec(state->msg, &state->rec);
TALLOC_FREE(state);
}
NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
struct server_id server, uint32_t msg_type,
const uint8_t *buf, size_t len)
@ -697,6 +658,68 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
return true;
}
struct messaging_defer_callback_state {
struct messaging_context *msg_ctx;
struct messaging_rec *rec;
void (*fn)(struct messaging_context *msg, void *private_data,
uint32_t msg_type, struct server_id server_id,
DATA_BLOB *data);
void *private_data;
};
static void messaging_defer_callback_trigger(struct tevent_context *ev,
struct tevent_immediate *im,
void *private_data);
static void messaging_defer_callback(
struct messaging_context *msg_ctx, struct messaging_rec *rec,
void (*fn)(struct messaging_context *msg, void *private_data,
uint32_t msg_type, struct server_id server_id,
DATA_BLOB *data),
void *private_data)
{
struct messaging_defer_callback_state *state;
struct tevent_immediate *im;
state = talloc(msg_ctx, struct messaging_defer_callback_state);
if (state == NULL) {
DEBUG(1, ("talloc failed\n"));
return;
}
state->msg_ctx = msg_ctx;
state->fn = fn;
state->private_data = private_data;
state->rec = messaging_rec_dup(state, rec);
if (state->rec == NULL) {
DEBUG(1, ("talloc failed\n"));
TALLOC_FREE(state);
return;
}
im = tevent_create_immediate(state);
if (im == NULL) {
DEBUG(1, ("tevent_create_immediate failed\n"));
TALLOC_FREE(state);
return;
}
tevent_schedule_immediate(im, msg_ctx->event_ctx,
messaging_defer_callback_trigger, state);
}
static void messaging_defer_callback_trigger(struct tevent_context *ev,
struct tevent_immediate *im,
void *private_data)
{
struct messaging_defer_callback_state *state = talloc_get_type_abort(
private_data, struct messaging_defer_callback_state);
struct messaging_rec *rec = state->rec;
state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
&rec->buf);
TALLOC_FREE(state);
}
/*
Dispatch one messaging_rec
*/
@ -708,15 +731,34 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
next = cb->next;
if (cb->msg_type == rec->msg_type) {
if (cb->msg_type != rec->msg_type) {
continue;
}
if (server_id_equal(&msg_ctx->id, &rec->dest)) {
/*
* This is a self-send. We are called here from
* messaging_send(), and we don't want to directly
* recurse into the callback but go via a
* tevent_loop_once
*/
messaging_defer_callback(msg_ctx, rec, cb->fn,
cb->private_data);
} else {
/*
* This comes from a different process. we are called
* from the event loop, so we should call back
* directly.
*/
cb->fn(msg_ctx, cb->private_data, rec->msg_type,
rec->src, &rec->buf);
/* we continue looking for matching messages
after finding one. This matters for
subsystems like the internal notify code
which register more than one handler for
the same message type */
}
/*
* we continue looking for matching messages after finding
* one. This matters for subsystems like the internal notify
* code which register more than one handler for the same
* message type
*/
}
if (!messaging_append_new_waiters(msg_ctx)) {

View File

@ -122,7 +122,7 @@ bool run_messaging_read1(int dummy)
goto fail;
}
for (i=0; i<3; i++) {
for (i=0; i<2; i++) {
if (tevent_loop_once(ev) != 0) {
fprintf(stderr, "tevent_loop_once failed\n");
goto fail;