1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-27 14:04:05 +03:00

ctdbd_conn: Move message handling out of ctdbd_conn.c

This also removes the deferred message handling. It's no longer required,
because the messaging_send_iov_from always goes through the kernel which
takes at least one round through tevent.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
This commit is contained in:
Volker Lendecke 2015-05-19 16:55:32 +02:00
parent 24eb3659e3
commit a37398b9de
2 changed files with 85 additions and 141 deletions

View File

@ -130,20 +130,6 @@ NTSTATUS register_with_ctdbd(struct ctdbd_connection *conn, uint64_t srvid,
return NT_STATUS_OK;
}
static bool ctdb_is_our_srvid(struct ctdbd_connection *conn, uint64_t srvid)
{
size_t i, num_callbacks;
num_callbacks = talloc_array_length(conn->callbacks);
for (i=0; i<num_callbacks; i++) {
if (srvid == conn->callbacks[i].srvid) {
return true;
}
}
return false;
}
static void ctdbd_msg_call_back(struct ctdbd_connection *conn,
struct ctdb_req_message *msg)
{
@ -292,78 +278,6 @@ static int ctdbd_connect(int *pfd)
return 0;
}
/*
* State necessary to defer an incoming message while we are waiting for a
* ctdb reply.
*/
struct deferred_msg_state {
struct messaging_context *msg_ctx;
struct messaging_rec *rec;
};
/*
* Timed event handler for the deferred message
*/
static void deferred_message_dispatch(struct tevent_context *event_ctx,
struct tevent_timer *te,
struct timeval now,
void *private_data)
{
struct deferred_msg_state *state = talloc_get_type_abort(
private_data, struct deferred_msg_state);
messaging_dispatch_rec(state->msg_ctx, state->rec);
TALLOC_FREE(state);
TALLOC_FREE(te);
}
/*
* Fetch a messaging_rec from an incoming ctdb style message
*/
static struct messaging_rec *ctdb_pull_messaging_rec(TALLOC_CTX *mem_ctx,
size_t overall_length,
struct ctdb_req_message *msg)
{
struct messaging_rec *result;
DATA_BLOB blob;
enum ndr_err_code ndr_err;
if ((overall_length < offsetof(struct ctdb_req_message, data))
|| (overall_length
< offsetof(struct ctdb_req_message, data) + msg->datalen)) {
cluster_fatal("got invalid msg length");
}
if (!(result = talloc(mem_ctx, struct messaging_rec))) {
DEBUG(0, ("talloc failed\n"));
return NULL;
}
blob = data_blob_const(msg->data, msg->datalen);
ndr_err = ndr_pull_struct_blob(
&blob, result, result,
(ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
ndr_errstr(ndr_err)));
TALLOC_FREE(result);
return NULL;
}
if (DEBUGLEVEL >= 11) {
DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
NDR_PRINT_DEBUG(messaging_rec, result);
}
return result;
}
static NTSTATUS ctdb_read_packet(int fd, TALLOC_CTX *mem_ctx,
struct ctdb_req_header **result)
{
@ -447,8 +361,6 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
ctdb_packet_dump(hdr);
if (hdr->operation == CTDB_REQ_MESSAGE) {
struct tevent_timer *evt;
struct deferred_msg_state *msg_state;
struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr;
if (conn->msg_ctx == NULL) {
@ -497,42 +409,7 @@ static NTSTATUS ctdb_read_req(struct ctdbd_connection *conn, uint32_t reqid,
}
ctdbd_msg_call_back(conn, msg);
msg_state = talloc(NULL, struct deferred_msg_state);
if (msg_state == NULL) {
DEBUG(0, ("talloc failed\n"));
TALLOC_FREE(hdr);
goto next_pkt;
}
if (!(msg_state->rec = ctdb_pull_messaging_rec(
msg_state, msg->hdr.length, msg))) {
DEBUG(0, ("ctdbd_pull_messaging_rec failed\n"));
TALLOC_FREE(msg_state);
TALLOC_FREE(hdr);
goto next_pkt;
}
TALLOC_FREE(hdr);
msg_state->msg_ctx = conn->msg_ctx;
/*
* We're waiting for a call reply, but an async message has
* crossed. Defer dispatching to the toplevel event loop.
*/
evt = tevent_add_timer(messaging_tevent_context(conn->msg_ctx),
messaging_tevent_context(conn->msg_ctx),
timeval_zero(),
deferred_message_dispatch,
msg_state);
if (evt == NULL) {
DEBUG(0, ("event_add_timed failed\n"));
TALLOC_FREE(msg_state);
TALLOC_FREE(hdr);
goto next_pkt;
}
goto next_pkt;
}
@ -626,11 +503,6 @@ NTSTATUS ctdbd_messaging_connection(TALLOC_CTX *mem_ctx,
return status;
}
status = register_with_ctdbd(conn, (uint64_t)getpid(), NULL, NULL);
if (!NT_STATUS_IS_OK(status)) {
goto fail;
}
status = register_with_ctdbd(conn, MSG_SRVID_SAMBA, NULL, NULL);
if (!NT_STATUS_IS_OK(status)) {
goto fail;
@ -668,7 +540,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
struct ctdb_req_header *hdr)
{
struct ctdb_req_message *msg;
struct messaging_rec *msg_rec;
if (hdr->operation != CTDB_REQ_MESSAGE) {
DEBUG(0, ("Received async msg of type %u, discarding\n",
@ -717,18 +588,6 @@ static NTSTATUS ctdb_handle_message(struct messaging_context *msg_ctx,
ctdbd_msg_call_back(conn, msg);
if (!ctdb_is_our_srvid(conn, msg->srvid)) {
DEBUG(0,("Got unexpected message with srvid=%llu\n",
(unsigned long long)msg->srvid));
return NT_STATUS_OK;
}
msg_rec = ctdb_pull_messaging_rec(talloc_tos(), msg->hdr.length, msg);
if (msg_rec == NULL) {
DEBUG(10, ("ctdb_pull_messaging_rec failed\n"));
return NT_STATUS_NO_MEMORY;
}
messaging_dispatch_rec(conn->msg_ctx, msg_rec);
return NT_STATUS_OK;
}

View File

@ -128,6 +128,88 @@ static int messaging_ctdbd_destructor(struct messaging_ctdbd_context *ctx)
return 0;
}
static struct messaging_rec *ctdb_pull_messaging_rec(
TALLOC_CTX *mem_ctx, const struct ctdb_req_message *msg)
{
struct messaging_rec *result;
DATA_BLOB blob;
enum ndr_err_code ndr_err;
size_t len = msg->hdr.length;
if (len < offsetof(struct ctdb_req_message, data)) {
return NULL;
}
len -= offsetof(struct ctdb_req_message, data);
if (len < msg->datalen) {
return NULL;
}
result = talloc(mem_ctx, struct messaging_rec);
if (result == NULL) {
return NULL;
}
blob = data_blob_const(msg->data, msg->datalen);
ndr_err = ndr_pull_struct_blob_all(
&blob, result, result,
(ndr_pull_flags_fn_t)ndr_pull_messaging_rec);
if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
DEBUG(0, ("ndr_pull_struct_blob failed: %s\n",
ndr_errstr(ndr_err)));
TALLOC_FREE(result);
return NULL;
}
if (DEBUGLEVEL >= 11) {
DEBUG(11, ("ctdb_pull_messaging_rec:\n"));
NDR_PRINT_DEBUG(messaging_rec, result);
}
return result;
}
static void messaging_ctdb_recv(struct ctdb_req_message *msg,
void *private_data)
{
struct messaging_context *msg_ctx = talloc_get_type_abort(
private_data, struct messaging_context);
struct server_id me = messaging_server_id(msg_ctx);
struct messaging_rec *rec;
NTSTATUS status;
struct iovec iov;
rec = ctdb_pull_messaging_rec(msg_ctx, msg);
if (rec == NULL) {
DEBUG(10, ("%s: ctdb_pull_messaging_rec failed\n", __func__));
return;
}
if (!server_id_same_process(&me, &rec->dest)) {
struct server_id_buf id1, id2;
DEBUG(10, ("%s: I'm %s, ignoring msg to %s\n", __func__,
server_id_str_buf(me, &id1),
server_id_str_buf(rec->dest, &id2)));
TALLOC_FREE(rec);
return;
}
iov = (struct iovec) { .iov_base = rec->buf.data,
.iov_len = rec->buf.length };
status = messaging_send_iov_from(msg_ctx, rec->src, rec->dest,
rec->msg_type, &iov, 1, NULL, 0);
TALLOC_FREE(rec);
if (!NT_STATUS_IS_OK(status)) {
DEBUG(10, ("%s: messaging_send_iov_from failed: %s\n",
__func__, nt_errstr(status)));
}
}
NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
TALLOC_CTX *mem_ctx,
struct messaging_backend **presult)
@ -165,6 +247,9 @@ NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
return status;
}
status = register_with_ctdbd(ctx->conn, getpid(),
messaging_ctdb_recv, msg_ctx);
global_ctdb_connection_pid = getpid();
global_ctdbd_connection = ctx->conn;
talloc_set_destructor(ctx, messaging_ctdbd_destructor);