mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
r21233: first version of samba4 messaging using ctdb is working. This means we
should now work on a real cluster, and not just a localhost simulator
This commit is contained in:
parent
b288ba05e5
commit
f05072ad74
@ -80,8 +80,7 @@ struct tdb_wrap *cluster_tdb_tmp_open(TALLOC_CTX *mem_ctx, const char *dbname, i
|
||||
register a callback function for a messaging endpoint
|
||||
*/
|
||||
NTSTATUS cluster_message_init(struct messaging_context *msg, struct server_id server,
|
||||
void (*handler)(struct messaging_context *,
|
||||
struct server_id, uint32_t, DATA_BLOB))
|
||||
cluster_message_fn_t handler)
|
||||
{
|
||||
cluster_init();
|
||||
return ops->message_init(ops, msg, server, handler);
|
||||
|
@ -34,8 +34,7 @@
|
||||
#define cluster_node_equal(id1, id2) ((id1)->node == (id2)->node)
|
||||
|
||||
struct messaging_context;
|
||||
typedef void (*cluster_message_fn_t)(struct messaging_context *,
|
||||
struct server_id, uint32_t, DATA_BLOB);
|
||||
typedef void (*cluster_message_fn_t)(struct messaging_context *, DATA_BLOB);
|
||||
|
||||
/* prototypes */
|
||||
struct server_id cluster_id(uint32_t id);
|
||||
|
@ -28,12 +28,25 @@
|
||||
#include "lib/tdb/include/tdb.h"
|
||||
#include "include/ctdb.h"
|
||||
#include "db_wrap.h"
|
||||
#include "lib/util/dlinklist.h"
|
||||
|
||||
/* a linked list of messaging handlers, allowing incoming messages
|
||||
to be directed to the right messaging context */
|
||||
struct cluster_messaging_list {
|
||||
struct cluster_messaging_list *next, *prev;
|
||||
struct cluster_state *state;
|
||||
struct messaging_context *msg;
|
||||
struct server_id server;
|
||||
cluster_message_fn_t handler;
|
||||
};
|
||||
|
||||
struct cluster_state {
|
||||
struct ctdb_context *ctdb;
|
||||
struct cluster_messaging_list *list;
|
||||
};
|
||||
|
||||
|
||||
|
||||
/*
|
||||
return a server_id for a ctdb node
|
||||
*/
|
||||
@ -91,6 +104,33 @@ static void *ctdb_backend_handle(struct cluster_ops *ops)
|
||||
return (void *)state->ctdb;
|
||||
}
|
||||
|
||||
/*
|
||||
dispatch incoming ctdb messages
|
||||
*/
|
||||
static void ctdb_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
|
||||
TDB_DATA data, void *private)
|
||||
{
|
||||
struct cluster_state *state = talloc_get_type(private, struct cluster_state);
|
||||
struct cluster_messaging_list *m;
|
||||
for (m=state->list;m;m=m->next) {
|
||||
if (srvid == m->server.id) {
|
||||
DATA_BLOB bdata;
|
||||
bdata.data = data.dptr;
|
||||
bdata.length = data.dsize;
|
||||
m->handler(m->msg, bdata);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
destroy a element of messaging list (when messaging context goes away)
|
||||
*/
|
||||
static int cluster_messaging_destructor(struct cluster_messaging_list *m)
|
||||
{
|
||||
DLIST_REMOVE(m->state->list, m);
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*
|
||||
setup a handler for ctdb messages
|
||||
*/
|
||||
@ -99,6 +139,19 @@ static NTSTATUS ctdb_message_init(struct cluster_ops *ops,
|
||||
struct server_id server,
|
||||
cluster_message_fn_t handler)
|
||||
{
|
||||
struct cluster_state *state = ops->private;
|
||||
struct cluster_messaging_list *m;
|
||||
m = talloc(msg, struct cluster_messaging_list);
|
||||
NT_STATUS_HAVE_NO_MEMORY(m);
|
||||
|
||||
m->state = state;
|
||||
m->msg = msg;
|
||||
m->server = server;
|
||||
m->handler = handler;
|
||||
DLIST_ADD(state->list, m);
|
||||
|
||||
talloc_set_destructor(m, cluster_messaging_destructor);
|
||||
|
||||
return NT_STATUS_OK;
|
||||
}
|
||||
|
||||
@ -109,7 +162,19 @@ static NTSTATUS ctdb_message_send(struct cluster_ops *ops,
|
||||
struct server_id server, uint32_t msg_type,
|
||||
DATA_BLOB *data)
|
||||
{
|
||||
return NT_STATUS_INVALID_DEVICE_REQUEST;
|
||||
struct cluster_state *state = ops->private;
|
||||
struct ctdb_context *ctdb = state->ctdb;
|
||||
TDB_DATA tdata;
|
||||
int ret;
|
||||
|
||||
tdata.dptr = data->data;
|
||||
tdata.dsize = data->length;
|
||||
|
||||
ret = ctdb_send_message(ctdb, server.node, server.id, msg_type, tdata);
|
||||
if (ret != 0) {
|
||||
return NT_STATUS_INTERNAL_DB_CORRUPTION;
|
||||
}
|
||||
return NT_STATUS_OK;
|
||||
}
|
||||
|
||||
static struct cluster_ops cluster_ctdb_ops = {
|
||||
@ -148,6 +213,8 @@ void cluster_ctdb_init(struct event_context *ev)
|
||||
state->ctdb = ctdb_init(ev);
|
||||
if (state->ctdb == NULL) goto failed;
|
||||
|
||||
state->list = NULL;
|
||||
|
||||
cluster_ctdb_ops.private = state;
|
||||
|
||||
ret = ctdb_set_transport(state->ctdb, transport);
|
||||
@ -181,6 +248,14 @@ void cluster_ctdb_init(struct event_context *ev)
|
||||
goto failed;
|
||||
}
|
||||
|
||||
/* setup messaging handler */
|
||||
ret = ctdb_set_message_handler(state->ctdb, ctdb_message_handler, state);
|
||||
if (ret == -1) {
|
||||
DEBUG(0,("ctdb_set_message_handler failed - %s\n",
|
||||
ctdb_errstr(state->ctdb)));
|
||||
goto failed;
|
||||
}
|
||||
|
||||
ret = ctdb_attach(state->ctdb, "cluster.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
|
||||
if (ret == -1) {
|
||||
DEBUG(0,("ctdb_attach failed - %s\n", ctdb_errstr(state->ctdb)));
|
||||
@ -199,6 +274,7 @@ void cluster_ctdb_init(struct event_context *ev)
|
||||
ctdb_connect_wait(state->ctdb);
|
||||
|
||||
cluster_set_ops(&cluster_ctdb_ops);
|
||||
|
||||
return;
|
||||
|
||||
failed:
|
||||
|
@ -155,8 +155,7 @@ static void messaging_dispatch(struct messaging_context *msg, struct messaging_r
|
||||
/*
|
||||
handler for messages that arrive from other nodes in the cluster
|
||||
*/
|
||||
static void cluster_message_handler(struct messaging_context *msg, struct server_id from,
|
||||
uint32_t msg_type, DATA_BLOB packet)
|
||||
static void cluster_message_handler(struct messaging_context *msg, DATA_BLOB packet)
|
||||
{
|
||||
struct messaging_rec *rec;
|
||||
|
||||
@ -165,7 +164,6 @@ static void cluster_message_handler(struct messaging_context *msg, struct server
|
||||
smb_panic("Unable to allocate messaging_rec");
|
||||
}
|
||||
|
||||
talloc_steal(rec, packet.data);
|
||||
rec->msg = msg;
|
||||
rec->path = msg->path;
|
||||
rec->header = (struct messaging_header *)packet.data;
|
||||
@ -406,12 +404,6 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
|
||||
NTSTATUS status;
|
||||
size_t dlength = data?data->length:0;
|
||||
|
||||
if (!cluster_node_equal(&msg->server_id, &server)) {
|
||||
/* the destination is on another node - dispatch via
|
||||
the cluster layer */
|
||||
return cluster_message_send(server, msg_type, data);
|
||||
}
|
||||
|
||||
rec = talloc(msg, struct messaging_rec);
|
||||
if (rec == NULL) {
|
||||
return NT_STATUS_NO_MEMORY;
|
||||
@ -435,6 +427,14 @@ NTSTATUS messaging_send(struct messaging_context *msg, struct server_id server,
|
||||
data->data, dlength);
|
||||
}
|
||||
|
||||
if (!cluster_node_equal(&msg->server_id, &server)) {
|
||||
/* the destination is on another node - dispatch via
|
||||
the cluster layer */
|
||||
status = cluster_message_send(server, msg_type, &rec->packet);
|
||||
talloc_free(rec);
|
||||
return status;
|
||||
}
|
||||
|
||||
rec->path = messaging_path(msg, server);
|
||||
talloc_steal(rec, rec->path);
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user