1
0
mirror of https://github.com/samba-team/samba.git synced 2025-02-26 21:57:41 +03:00

merge from tridge

(This used to be ctdb commit 257a167d7e8b532c03a122626dbc2db767dbfdf0)
This commit is contained in:
Ronnie sahlberg 2007-04-11 15:40:03 +10:00
commit a7a1de7bf2
7 changed files with 245 additions and 54 deletions

View File

@ -274,7 +274,7 @@ static void ctdb_node_connected(struct ctdb_node *node)
/*
wait for all nodes to be connected
*/
void ctdb_connect_wait(struct ctdb_context *ctdb)
void ctdb_daemon_connect_wait(struct ctdb_context *ctdb)
{
int expected = ctdb->num_nodes - 1;
if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {

View File

@ -38,6 +38,16 @@ static int ctdb_client_queue_pkt(struct ctdb_context *ctdb, struct ctdb_req_head
}
/*
handle a connect wait reply packet
*/
static void ctdb_reply_connect_wait(struct ctdb_context *ctdb,
struct ctdb_req_header *hdr)
{
struct ctdb_reply_connect_wait *r = (struct ctdb_reply_connect_wait *)hdr;
ctdb->num_connected = r->num_connected;
}
/*
this is called in the client, when data comes in from the daemon
*/
@ -75,6 +85,10 @@ static void ctdb_client_read_cb(uint8_t *data, size_t cnt, void *args)
case CTDB_REQ_MESSAGE:
ctdb_request_message(ctdb, hdr);
break;
case CTDB_REPLY_CONNECT_WAIT:
ctdb_reply_connect_wait(ctdb, hdr);
break;
}
}
@ -317,6 +331,9 @@ int ctdb_set_message_handler(struct ctdb_context *ctdb,
}
/*
send a message - from client context
*/
int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn,
uint32_t srvid, TDB_DATA data)
{
@ -347,3 +364,42 @@ int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn,
talloc_free(r);
return 0;
}
/*
wait for all nodes to be connected - from client
*/
static void ctdb_client_connect_wait(struct ctdb_context *ctdb)
{
struct ctdb_req_connect_wait r;
int res;
ZERO_STRUCT(r);
r.hdr.length = sizeof(r);
r.hdr.ctdb_magic = CTDB_MAGIC;
r.hdr.ctdb_version = CTDB_VERSION;
r.hdr.operation = CTDB_REQ_CONNECT_WAIT;
res = ctdb_queue_send(ctdb->daemon.queue, (uint8_t *)&r.hdr, r.hdr.length);
if (res != 0) {
printf("Failed to queue a connect wait request\n");
return;
}
/* now we can go into the normal wait routine, as the reply packet
will update the ctdb->num_connected variable */
ctdb_daemon_connect_wait(ctdb);
}
/*
wait for all nodes to be connected
*/
void ctdb_connect_wait(struct ctdb_context *ctdb)
{
if (!(ctdb->flags & CTDB_FLAG_DAEMON_MODE)) {
ctdb_daemon_connect_wait(ctdb);
return;
}
ctdb_client_connect_wait(ctdb);
}

View File

@ -76,6 +76,8 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
/*XXX cant use this since it returns an int CTDB_NO_MEMORY(ctdb, r);*/
talloc_set_name_const(r, "req_message packet");
ZERO_STRUCT(*r);
r->hdr.length = len;
r->hdr.ctdb_magic = CTDB_MAGIC;
r->hdr.ctdb_version = CTDB_VERSION;
@ -108,6 +110,34 @@ static void daemon_request_register_message_handler(struct ctdb_client *client,
}
/*
called when the daemon gets a connect wait request from a client
*/
static void daemon_request_connect_wait(struct ctdb_client *client,
struct ctdb_req_connect_wait *c)
{
struct ctdb_reply_connect_wait r;
int res;
/* first wait - in the daemon */
ctdb_daemon_connect_wait(client->ctdb);
/* now send the reply */
ZERO_STRUCT(r);
r.hdr.length = sizeof(r);
r.hdr.ctdb_magic = CTDB_MAGIC;
r.hdr.ctdb_version = CTDB_VERSION;
r.hdr.operation = CTDB_REPLY_CONNECT_WAIT;
r.num_connected = client->ctdb->num_connected;
res = ctdb_queue_send(client->queue, (uint8_t *)&r.hdr, r.hdr.length);
if (res != 0) {
printf("Failed to queue a connect wait response\n");
return;
}
}
/*
destroy a ctdb_client
*/
@ -124,13 +154,25 @@ static int ctdb_client_destructor(struct ctdb_client *client)
from a local client over the unix domain socket
*/
static void daemon_request_message_from_client(struct ctdb_client *client,
struct ctdb_req_message *c)
struct ctdb_req_message *c)
{
TDB_DATA data;
int res;
/* maybe the message is for another client on this node */
if (ctdb_get_vnn(client->ctdb)==c->hdr.destnode) {
ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
} else {
/* this is for a remote client */
/*XXX*/
return;
}
/* its for a remote node */
data.dptr = &c->data[0];
data.dsize = c->datalen;
res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
c->srvid, data);
if (res != 0) {
printf("Failed to send message to remote node %u\n",
c->hdr.destnode);
}
}
@ -204,12 +246,12 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_
if (hdr->ctdb_magic != CTDB_MAGIC) {
ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
return;
goto done;
}
if (hdr->ctdb_version != CTDB_VERSION) {
ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected\n", hdr->ctdb_version);
return;
goto done;
}
switch (hdr->operation) {
@ -224,8 +266,13 @@ static void client_incoming_packet(struct ctdb_client *client, void *data, size_
case CTDB_REQ_MESSAGE:
daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
break;
case CTDB_REQ_CONNECT_WAIT:
daemon_request_connect_wait(client, (struct ctdb_req_connect_wait *)hdr);
break;
}
done:
talloc_free(data);
}

View File

@ -30,50 +30,101 @@
/*
called when a CTDB_REQ_MESSAGE packet comes in
this dispatches the messages to the registered ctdb message handler
*/
static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data)
{
struct ctdb_message_list *ml;
/* XXX we need a must faster way of finding the matching srvid
- maybe a tree? */
for (ml=ctdb->message_list;ml;ml=ml->next) {
if (ml->srvid == srvid) break;
}
if (ml == NULL) {
printf("no msg handler for srvid=%u\n", srvid);
/* no registered message handler */
return -1;
}
ml->message_handler(ctdb, srvid, data, ml->message_private);
return 0;
}
/*
called when a CTDB_REQ_MESSAGE packet comes in
*/
void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
struct ctdb_message_list *fml, *ml;
TDB_DATA data;
/* XXX need a much faster method to find the handler */
ml = ctdb->message_list;
fml = ml;
while (ml) {
if (ml->srvid==c->srvid) {
break;
}
ml = ml->next;
if (ml==fml) {
ml = NULL;
break;
}
}
if (ml == NULL) {
printf("no msg handler\n");
/* no registered message handler */
return;
}
data.dptr = &c->data[0];
data.dsize = c->datalen;
ml->message_handler(ctdb, c->srvid, data, ml->message_private);
ctdb_dispatch_message(ctdb, c->srvid, data);
}
/*
this local messaging handler is ugly, but is needed to prevent
recursion in ctdb_send_message() when the destination node is the
same as the source node
*/
struct ctdb_local_message {
struct ctdb_context *ctdb;
uint32_t srvid;
TDB_DATA data;
};
static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct ctdb_local_message *m = talloc_get_type(private,
struct ctdb_local_message);
int res;
res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
if (res != 0) {
printf("Failed to dispatch message for srvid=%u\n", m->srvid);
}
talloc_free(m);
}
static int ctdb_local_message(struct ctdb_context *ctdb, uint32_t srvid, TDB_DATA data)
{
struct ctdb_local_message *m;
m = talloc(ctdb, struct ctdb_local_message);
CTDB_NO_MEMORY(ctdb, m);
m->ctdb = ctdb;
m->srvid = srvid;
m->data = data;
m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
if (m->data.dptr == NULL) {
talloc_free(m);
return -1;
}
/* this needs to be done as an event to prevent recursion */
event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
return 0;
}
/*
send a ctdb message
*/
int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
uint32_t srvid, TDB_DATA data)
uint32_t srvid, TDB_DATA data)
{
struct ctdb_req_message *r;
int len;
/* see if this is a message to ourselves */
if (vnn == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
return ctdb_local_message(ctdb, srvid, data);
}
len = offsetof(struct ctdb_req_message, data) + data.dsize;
r = ctdb->methods->allocate_pkt(ctdb, len);
CTDB_NO_MEMORY(ctdb, r);

View File

@ -199,14 +199,16 @@ struct ctdb_call_state {
operation IDs
*/
enum ctdb_operation {
CTDB_REQ_CALL = 0,
CTDB_REPLY_CALL = 1,
CTDB_REPLY_REDIRECT = 2,
CTDB_REQ_DMASTER = 3,
CTDB_REPLY_DMASTER = 4,
CTDB_REPLY_ERROR = 5,
CTDB_REQ_REGISTER = 6,
CTDB_REQ_MESSAGE = 7
CTDB_REQ_CALL = 0,
CTDB_REPLY_CALL = 1,
CTDB_REPLY_REDIRECT = 2,
CTDB_REQ_DMASTER = 3,
CTDB_REPLY_DMASTER = 4,
CTDB_REPLY_ERROR = 5,
CTDB_REQ_REGISTER = 6,
CTDB_REQ_MESSAGE = 7,
CTDB_REQ_CONNECT_WAIT = 8,
CTDB_REPLY_CONNECT_WAIT = 9
};
#define CTDB_MAGIC 0x43544442 /* CTDB */
@ -281,6 +283,15 @@ struct ctdb_req_message {
uint8_t data[1];
};
struct ctdb_req_connect_wait {
struct ctdb_req_header hdr;
};
struct ctdb_reply_connect_wait {
struct ctdb_req_header hdr;
uint32_t num_connected;
};
/* internal prototypes */
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...) PRINTF_ATTRIBUTE(2,3);
void ctdb_fatal(struct ctdb_context *ctdb, const char *msg);
@ -375,4 +386,16 @@ int ctdb_daemon_set_message_handler(struct ctdb_context *ctdb, uint32_t srvid,
int ctdb_client_send_message(struct ctdb_context *ctdb, uint32_t vnn,
uint32_t srvid, TDB_DATA data);
/*
send a ctdb message
*/
int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t vnn,
uint32_t srvid, TDB_DATA data);
/*
wait for all nodes to be connected
*/
void ctdb_daemon_connect_wait(struct ctdb_context *ctdb);
#endif

View File

@ -260,7 +260,7 @@ int main(int argc, const char *argv[])
/* start the protocol running */
ret = ctdb_start(ctdb);
ctdb_set_message_handler(ctdb, message_handler, 0, &msg_count);
ctdb_set_message_handler(ctdb, 0, message_handler, &msg_count);
/* wait until all nodes are connected (should not be needed
outside of test code) */

View File

@ -27,6 +27,7 @@ static int timelimit = 10;
static int num_records = 10;
static int num_msgs = 1;
static int num_repeats = 100;
static int num_clients = 2;
/*
@ -35,7 +36,8 @@ static int num_repeats = 100;
static void message_handler(struct ctdb_context *ctdb, uint32_t srvid,
TDB_DATA data, void *private)
{
printf("client vnn:%d received a message to srvid:%d\n",ctdb_get_vnn(ctdb),srvid);
printf("client vnn:%d received a message to srvid:%d\n",ctdb_get_vnn(ctdb),srvid);
fflush(stdout);
}
/*
@ -61,16 +63,17 @@ int main(int argc, const char *argv[])
{ "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" },
{ "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" },
{ "num-msgs", 'n', POPT_ARG_INT, &num_msgs, 0, "num_msgs", "integer" },
{ "num-clients", 0, POPT_ARG_INT, &num_clients, 0, "num_clients", "integer" },
POPT_TABLEEND
};
int opt;
const char **extra_argv;
int extra_argc = 0;
int ret;
int ret, i, j;
poptContext pc;
struct event_context *ev;
pid_t pid;
uint32_t srvid;
int srvid;
TDB_DATA data;
pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
@ -142,13 +145,16 @@ int main(int argc, const char *argv[])
/* start the protocol running */
ret = ctdb_start(ctdb);
/*XXX why does this block forever? ctdb_connect_wait(ctdb);*/
pid=fork();
if (pid) {
srvid=0;
} else {
srvid=1;
srvid = -1;
for (i=0;i<num_clients-1;i++) {
pid=fork();
if (pid) {
srvid = i;
break;
}
}
if (srvid == -1) {
srvid = num_clients-1;
}
/* wait until all nodes are connected (should not be needed
@ -157,11 +163,19 @@ int main(int argc, const char *argv[])
data.dsize=0;
ctdb_set_message_handler(ctdb, srvid, message_handler, NULL);
sleep(3);
printf("sending message from vnn:%d to vnn:%d/srvid:%d\n",ctdb_get_vnn(ctdb),ctdb_get_vnn(ctdb), 1-srvid);
ctdb_send_message(ctdb, ctdb_get_vnn(ctdb), 1-srvid, data);
ctdb_connect_wait(ctdb);
while(1){
sleep(1);
printf("sending message from vnn:%d to vnn:%d/srvid:%d\n",ctdb_get_vnn(ctdb),ctdb_get_vnn(ctdb), 1-srvid);
for (i=0;i<ctdb_get_num_nodes(ctdb);i++) {
for (j=0;j<num_clients;j++) {
printf("sending message to %d:%d\n", i, j);
ctdb_send_message(ctdb, i, j, data);
}
}
while (1) {
event_loop_once(ev);
}