1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-24 21:34:56 +03:00

Merge from tridge

(This used to be ctdb commit e087b4644fe2236bc2de3c8623900d56288bc9c6)
This commit is contained in:
Alexander Bokovoy 2006-12-01 12:10:18 +03:00
commit 6b97bddd7c
11 changed files with 334 additions and 22866 deletions

View File

@ -38,6 +38,14 @@ int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
return -1;
}
/*
set some ctdb flags
*/
void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
{
ctdb->flags |= flags;
}
/*
add a node to the list of active nodes
@ -149,7 +157,31 @@ int ctdb_start(struct ctdb_context *ctdb)
*/
static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
printf("received pkt of length %d\n", length);
struct ctdb_req_header *hdr;
if (length < sizeof(*hdr)) {
ctdb_set_error(ctdb, "Bad packet length %d\n", length);
return;
}
hdr = (struct ctdb_req_header *)data;
if (length != hdr->length) {
ctdb_set_error(ctdb, "Bad header length %d expected %d\n",
hdr->length, length);
return;
}
switch (hdr->operation) {
case CTDB_REQ_CALL:
ctdb_request_call(ctdb, hdr);
break;
case CTDB_REPLY_CALL:
ctdb_reply_call(ctdb, hdr);
break;
default:
printf("Packet with unknown operation %d\n", hdr->operation);
talloc_free(hdr);
break;
}
}
/*
@ -157,7 +189,9 @@ static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t len
*/
static void ctdb_node_dead(struct ctdb_node *node)
{
printf("%s: node %s is dead\n", node->ctdb->name, node->name);
node->ctdb->num_connected--;
printf("%s: node %s is dead: %d connected\n",
node->ctdb->name, node->name, node->ctdb->num_connected);
}
/*
@ -165,7 +199,37 @@ static void ctdb_node_dead(struct ctdb_node *node)
*/
static void ctdb_node_connected(struct ctdb_node *node)
{
printf("%s: connected to %s\n", node->ctdb->name, node->name);
node->ctdb->num_connected++;
printf("%s: connected to %s - %d connected\n",
node->ctdb->name, node->name, node->ctdb->num_connected);
}
/*
wait for all nodes to be connected
*/
void ctdb_connect_wait(struct ctdb_context *ctdb)
{
int expected = ctdb->num_nodes - 1;
if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
expected++;
}
while (ctdb->num_connected != expected) {
event_loop_once(ctdb->ev);
}
}
/*
wait until we're the only node left
*/
void ctdb_wait_loop(struct ctdb_context *ctdb)
{
int expected = 0;
if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
expected++;
}
while (ctdb->num_connected > expected) {
event_loop_once(ctdb->ev);
}
}
static const struct ctdb_upcalls ctdb_upcalls = {

View File

@ -86,49 +86,199 @@ static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
}
/*
make a remote ctdb call
called when a CTDB_REQ_CALL packet comes in
*/
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
uint32_t dest;
struct ctdb_req_call *c;
uint32_t len;
struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
TDB_DATA key, call_data, reply_data;
struct ctdb_reply_call *r;
struct ctdb_node *node;
dest = ctdb_hash(&key) % ctdb->num_nodes;
if (dest == ctdb->vnn) {
return ctdb_call_local(ctdb, key, call_id, call_data, reply_data);
}
key.dptr = c->data;
key.dsize = c->keylen;
call_data.dptr = c->data + c->keylen;
call_data.dsize = c->calldatalen;
len = sizeof(*c) + key.dsize + (call_data?call_data->dsize:0);
c = talloc_size(ctdb, len);
CTDB_NO_MEMORY(ctdb, c);
ctdb_call_local(ctdb, key, c->callid,
call_data.dsize?&call_data:NULL,
&reply_data);
c->hdr.operation = CTDB_OP_CALL;
c->hdr.destnode = dest;
c->hdr.srcnode = ctdb->vnn;
/* this limits us to 16k outstanding messages - not unreasonable */
c->hdr.reqid = idr_get_new(ctdb->idr, c, 0xFFFF);
c->callid = call_id;
c->keylen = key.dsize;
c->calldatalen = call_data?call_data->dsize:0;
memcpy(&c->data[0], key.dptr, key.dsize);
if (call_data) {
memcpy(&c->data[key.dsize], call_data->dptr, call_data->dsize);
}
r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
r->hdr.length = sizeof(*r) + reply_data.dsize;
r->hdr.operation = CTDB_REPLY_CALL;
r->hdr.destnode = hdr->srcnode;
r->hdr.srcnode = hdr->destnode;
r->hdr.reqid = hdr->reqid;
r->datalen = reply_data.dsize;
memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
node = ctdb->nodes[dest];
node = ctdb->nodes[hdr->srcnode];
if (ctdb->methods->queue_pkt(node, (uint8_t *)c, len) != 0) {
talloc_free(c);
return -1;
}
ctdb->methods->queue_pkt(node, (uint8_t *)r, r->hdr.length);
/*
event_add_timed(ctdb->ev, c, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
ctdb_call_timeout, c);
*/
return -1;
talloc_free(reply_data.dptr);
talloc_free(r);
}
enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
/*
state of a in-progress ctdb call
*/
struct ctdb_call_state {
enum call_state state;
struct ctdb_req_call *c;
struct ctdb_node *node;
TDB_DATA reply_data;
};
/*
called when a CTDB_REPLY_CALL packet comes in
*/
void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
struct ctdb_call_state *state;
TDB_DATA reply_data;
state = idr_find(ctdb->idr, hdr->reqid);
reply_data.dptr = c->data;
reply_data.dsize = c->datalen;
state->reply_data = reply_data;
talloc_steal(state, c);
state->state = CTDB_CALL_DONE;
}
/*
destroy a ctdb_call
*/
static int ctdb_call_destructor(struct ctdb_call_state *state)
{
idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
return 0;
}
/*
called when a call times out
*/
void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
state->state = CTDB_CALL_ERROR;
ctdb_set_error(state->node->ctdb, "ctdb_call timed out");
}
/*
fake an event driven local ctdb_call
*/
struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb,
TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
{
struct ctdb_call_state *state;
int ret;
state = talloc_zero(ctdb, struct ctdb_call_state);
CTDB_NO_MEMORY(ctdb, state);
state->state = CTDB_CALL_DONE;
state->node = ctdb->nodes[ctdb->vnn];
ret = ctdb_call_local(ctdb, key, call_id, call_data, &state->reply_data);
return state;
}
/*
make a remote ctdb call - async send
*/
struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
{
uint32_t dest;
uint32_t len;
struct ctdb_call_state *state;
dest = ctdb_hash(&key) % ctdb->num_nodes;
if (dest == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data);
}
state = talloc_zero(ctdb, struct ctdb_call_state);
CTDB_NO_MEMORY(ctdb, state);
len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
state->c = talloc_size(ctdb, len);
CTDB_NO_MEMORY(ctdb, state->c);
state->c->hdr.length = len;
state->c->hdr.operation = CTDB_REQ_CALL;
state->c->hdr.destnode = dest;
state->c->hdr.srcnode = ctdb->vnn;
/* this limits us to 16k outstanding messages - not unreasonable */
state->c->hdr.reqid = idr_get_new(ctdb->idr, state, 0xFFFF);
state->c->callid = call_id;
state->c->keylen = key.dsize;
state->c->calldatalen = call_data?call_data->dsize:0;
memcpy(&state->c->data[0], key.dptr, key.dsize);
if (call_data) {
memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
}
state->node = ctdb->nodes[dest];
state->state = CTDB_CALL_WAIT;
talloc_set_destructor(state, ctdb_call_destructor);
if (ctdb->methods->queue_pkt(state->node, (uint8_t *)state->c, len) != 0) {
talloc_free(state);
return NULL;
}
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
ctdb_call_timeout, state);
return state;
}
/*
make a remote ctdb call - async recv
*/
int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
{
while (state->state < CTDB_CALL_DONE) {
event_loop_once(state->node->ctdb->ev);
}
if (state->state != CTDB_CALL_DONE) {
talloc_free(state);
return -1;
}
if (reply_data) {
reply_data->dptr = talloc_memdup(state->node->ctdb,
state->reply_data.dptr,
state->reply_data.dsize);
reply_data->dsize = state->reply_data.dsize;
}
talloc_free(state);
return 0;
}
/*
full ctdb_call
*/
int ctdb_call(struct ctdb_context *ctdb,
TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
{
struct ctdb_call_state *state;
state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
return ctdb_call_recv(state, reply_data);
}

22743
ctdb/configure vendored

File diff suppressed because it is too large Load Diff

View File

@ -23,7 +23,7 @@
#include "system/filesys.h"
#include "popt.h"
enum my_functions {FUNC_SORT=1, FUNC_FETCH=2, FUNC_SORT_ARRAY=3};
enum my_functions {FUNC_SORT=1, FUNC_FETCH=2};
static int int_compare(int *i1, int *i2)
{
@ -50,36 +50,14 @@ static int sort_func(struct ctdb_call *call)
return CTDB_ERR_NOMEM;
}
call->new_data->dsize = call->record_data.dsize + call->call_data->dsize;
memcpy(call->new_data->dptr,
call->record_data.dptr, call->record_data.dsize);
memcpy(call->new_data->dptr+call->record_data.dsize,
call->call_data->dptr, call->call_data->dsize);
qsort(call->new_data->dptr, call->new_data->dsize / sizeof(int),
sizeof(int), (comparison_fn_t)int_compare);
return 0;
}
/*
add an integer into a record in sorted order
*/
static int sort_func_array(struct ctdb_call *call)
{
if (call->call_data == NULL ||
call->call_data->dsize % sizeof(int) != 0) {
return CTDB_ERR_INVALID;
}
call->new_data = talloc(call, TDB_DATA);
if (call->new_data == NULL) {
return CTDB_ERR_NOMEM;
}
call->new_data->dptr = talloc_size(call,
call->call_data->dsize);
if (call->new_data->dptr == NULL) {
return CTDB_ERR_NOMEM;
}
call->new_data->dsize = call->call_data->dsize;
memcpy(call->new_data->dptr,
call->call_data->dptr, call->call_data->dsize);
qsort(call->new_data->dptr, call->new_data->dsize / sizeof(int),
sizeof(int), (comparison_fn_t)int_compare);
return 0;
}
@ -101,12 +79,14 @@ int main(int argc, const char *argv[])
const char *nlist = NULL;
const char *transport = "tcp";
const char *myaddress = NULL;
int self_connect=0;
struct poptOption popt_options[] = {
POPT_AUTOHELP
{ "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
{ "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
{ "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
{ "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
POPT_TABLEEND
};
int opt;
@ -149,6 +129,10 @@ int main(int argc, const char *argv[])
exit(1);
}
if (self_connect) {
ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
}
ret = ctdb_set_transport(ctdb, transport);
if (ret == -1) {
printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
@ -172,10 +156,9 @@ int main(int argc, const char *argv[])
/* setup a ctdb call function */
ret = ctdb_set_call(ctdb, sort_func, FUNC_SORT);
ret = ctdb_set_call(ctdb, fetch_func, FUNC_FETCH);
ret = ctdb_set_call(ctdb, sort_func_array, FUNC_SORT_ARRAY);
/* attach to a specific database */
ret = ctdb_attach(ctdb, "./testx.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
ret = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
if (ret == -1) {
printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
exit(1);
@ -183,48 +166,41 @@ int main(int argc, const char *argv[])
/* start the protocol running */
ret = ctdb_start(ctdb);
/* wait until all nodes are connected (should not be needed
outide of test code) */
ctdb_connect_wait(ctdb);
key.dptr = "test";
key.dsize = strlen("test")+1;
#if 1
#define TEST_ARRAY_SIZE 100
/* loop for testing */
while (1) {
/* add some random data */
data.dptr = (void*)talloc_array(ev, int, TEST_ARRAY_SIZE);
for (i=0;i<TEST_ARRAY_SIZE;i++) {
data.dptr[i] = random();
}
data.dsize = sizeof(int)*TEST_ARRAY_SIZE;
ret = ctdb_call(ctdb, key, FUNC_SORT_ARRAY, &data, NULL);
/* add some random data */
for (i=0;i<10;i++) {
int v = random() % 1000;
data.dptr = (uint8_t *)&v;
data.dsize = sizeof(v);
ret = ctdb_call(ctdb, key, FUNC_SORT, &data, NULL);
if (ret == -1) {
printf("ctdb_call FUNC_SORT_ARRAY failed - %s\n", ctdb_errstr(ctdb));
printf("ctdb_call FUNC_SORT failed - %s\n", ctdb_errstr(ctdb));
exit(1);
}
talloc_free(data.dptr);
data.dptr = NULL;
data.dsize = 0;
/* fetch the record */
ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data);
if (ret == -1) {
printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb));
exit(1);
}
printf("ctdb_call result. Data size is %u, sizeof(int)-based is %u\n", data.dsize, data.dsize/sizeof(int));
for (i=0;i<data.dsize/sizeof(int);i++) {
printf("%d, \n", data.dptr[i]);
}
talloc_free(data.dptr);
event_loop_once(ev);
}
#endif
/* fetch the record */
ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data);
if (ret == -1) {
printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb));
exit(1);
}
for (i=0;i<data.dsize/sizeof(int);i++) {
printf("%3d\n", ((int *)data.dptr)[i]);
}
talloc_free(data.dptr);
/* go into a wait loop to allow other nodes to complete */
ctdb_wait_loop(ctdb);
/* shut it down */
talloc_free(ctdb);
return 0;

View File

@ -33,6 +33,12 @@ struct ctdb_call {
#define CTDB_ERR_INVALID 1
#define CTDB_ERR_NOMEM 2
/*
ctdb flags
*/
#define CTDB_FLAG_SELF_CONNECT (1<<0)
struct event_context;
/*
@ -45,6 +51,11 @@ struct ctdb_context *ctdb_init(struct event_context *ev);
*/
int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
/*
set some flags
*/
void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags);
/*
tell ctdb what address to listen on, in transport specific format
*/
@ -88,3 +99,13 @@ int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data);
/*
wait for all nodes to be connected - useful for test code
*/
void ctdb_connect_wait(struct ctdb_context *ctdb);
/*
wait until we're the only node left
*/
void ctdb_wait_loop(struct ctdb_context *ctdb);

View File

@ -78,6 +78,8 @@ struct ctdb_context {
const char *name;
uint32_t vnn; /* our own vnn */
uint32_t num_nodes;
uint32_t num_connected;
unsigned flags;
struct idr_context *idr;
struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */
struct ctdb_registered_call *calls; /* list of registered calls */
@ -100,23 +102,15 @@ struct ctdb_context {
operation IDs
*/
enum ctdb_operation {
CTDB_OP_CALL = 0
CTDB_REQ_CALL = 0,
CTDB_REPLY_CALL = 1
};
/*
packet structures
*/
struct ctdb_req_header {
uint32_t _length; /* ignored by datagram transports */
uint32_t operation;
uint32_t destnode;
uint32_t srcnode;
uint32_t reqid;
uint32_t reqtimeout;
};
struct ctdb_reply_header {
uint32_t _length; /* ignored by datagram transports */
uint32_t length;
uint32_t operation;
uint32_t destnode;
uint32_t srcnode;
@ -132,7 +126,7 @@ struct ctdb_req_call {
};
struct ctdb_reply_call {
struct ctdb_reply_header hdr;
struct ctdb_req_header hdr;
uint32_t datalen;
uint8_t data[0];
};
@ -144,4 +138,6 @@ int ctdb_parse_address(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx, const char *str,
struct ctdb_address *address);
uint32_t ctdb_hash(TDB_DATA *key);
void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);

View File

@ -44,7 +44,7 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
int error = 0;
socklen_t len;
socklen_t len = sizeof(error);
if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
error != 0) {
@ -63,6 +63,10 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
/* tell the ctdb layer we are connected */
node->ctdb->upcalls->node_connected(node);
if (tnode->queue) {
EVENT_FD_WRITEABLE(tnode->fde);
}
}
/*

View File

@ -39,7 +39,8 @@ int ctdb_tcp_start(struct ctdb_context *ctdb)
next event loop */
for (i=0;i<ctdb->num_nodes;i++) {
struct ctdb_node *node = *(ctdb->nodes + i);
if (ctdb_same_address(&ctdb->address, &node->address)) continue;
if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
ctdb_same_address(&ctdb->address, &node->address)) continue;
event_add_timed(ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_connect, node);
}

View File

@ -140,8 +140,6 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
/* tell the ctdb layer above that we have a packet */
in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
talloc_free(data);
}
/*
@ -153,14 +151,9 @@ int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
struct ctdb_tcp_node);
struct ctdb_tcp_packet *pkt;
if (tnode->fd == -1) {
ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name);
return -1;
}
/* if the queue is empty then try an immediate write, avoiding
queue overhead. This relies on non-blocking sockets */
if (tnode->queue == NULL) {
if (tnode->queue == NULL && tnode->fd != -1) {
ssize_t n = write(tnode->fd, data, length);
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
event_add_timed(node->ctdb->ev, node, timeval_zero(),
@ -184,7 +177,7 @@ int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
pkt->length = length;
if (tnode->queue == NULL) {
if (tnode->queue == NULL && tnode->fd != -1) {
EVENT_FD_WRITEABLE(tnode->fde);
}

1
ctdb/tests/1node.txt Normal file
View File

@ -0,0 +1 @@
127.0.0.1:9001

5
ctdb/tests/test1.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/sh
killall -q ctdb_test
bin/ctdb_test --nlist tests/1node.txt --listen 127.0.0.1:9001
killall ctdb_test