mirror of
https://github.com/samba-team/samba.git
synced 2025-02-02 09:47:23 +03:00
first version of traverse is working
(This used to be ctdb commit ecac90cee389a6fa0e9b1efba521e098a24d323f)
This commit is contained in:
parent
486c6b4fce
commit
60b42276eb
@ -1376,3 +1376,115 @@ int ctdb_set_call(struct ctdb_db_context *ctdb_db, ctdb_fn_t fn, uint32_t id)
|
||||
DLIST_ADD(ctdb_db->calls, call);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
start a cluster wide traverse. each record is sent as a message to
|
||||
the given srvid
|
||||
*/
|
||||
int ctdb_traverse_all(struct ctdb_db_context *ctdb_db, uint64_t srvid)
|
||||
{
|
||||
TDB_DATA data;
|
||||
struct ctdb_traverse_start t;
|
||||
int32_t status;
|
||||
int ret;
|
||||
|
||||
t.db_id = ctdb_db->db_id;
|
||||
t.srvid = srvid;
|
||||
t.reqid = 0;
|
||||
|
||||
data.dptr = (uint8_t *)&t;
|
||||
data.dsize = sizeof(t);
|
||||
|
||||
ret = ctdb_control(ctdb_db->ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_TRAVERSE_START, 0,
|
||||
data, NULL, NULL, &status);
|
||||
if (ret != 0 || status != 0) {
|
||||
DEBUG(0,("ctdb_traverse_all failed\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct list_keys_state {
|
||||
FILE *f;
|
||||
bool done;
|
||||
uint32_t count;
|
||||
};
|
||||
|
||||
/*
|
||||
called on each key during a list_keys
|
||||
*/
|
||||
static void list_keys_handler(struct ctdb_context *ctdb, uint64_t srvid,
|
||||
TDB_DATA data, void *p)
|
||||
{
|
||||
struct list_keys_state *state = (struct list_keys_state *)p;
|
||||
struct ctdb_traverse_data *d = (struct ctdb_traverse_data *)data.dptr;
|
||||
TDB_DATA key;
|
||||
char *keystr, *datastr;
|
||||
struct ctdb_ltdb_header *h;
|
||||
|
||||
if (data.dsize < sizeof(uint32_t) ||
|
||||
d->length != data.dsize) {
|
||||
DEBUG(0,("Bad data size %u in list_keys_handler\n", data.dsize));
|
||||
return;
|
||||
}
|
||||
|
||||
key.dsize = d->keylen;
|
||||
key.dptr = &d->data[0];
|
||||
data.dsize = d->datalen;
|
||||
data.dptr = &d->data[d->keylen];
|
||||
|
||||
if (key.dsize == 0 && data.dsize == 0) {
|
||||
/* end of traverse */
|
||||
state->done = True;
|
||||
return;
|
||||
}
|
||||
|
||||
h = (struct ctdb_ltdb_header *)data.dptr;
|
||||
if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
|
||||
DEBUG(0,("Bad ctdb ltdb header in list_keys_handler\n"));
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
keystr = hex_encode(ctdb, key.dptr, key.dsize);
|
||||
datastr = hex_encode(ctdb, data.dptr+sizeof(*h), data.dsize-sizeof(*h));
|
||||
|
||||
fprintf(state->f, "dmaster: %u\n", h->dmaster);
|
||||
fprintf(state->f, "rsn: %llu\n", (unsigned long long)h->rsn);
|
||||
fprintf(state->f, "key: %s\ndata: %s\n", keystr, datastr);
|
||||
|
||||
talloc_free(keystr);
|
||||
talloc_free(datastr);
|
||||
|
||||
state->count++;
|
||||
}
|
||||
|
||||
/*
|
||||
convenience function to list all keys to stdout
|
||||
*/
|
||||
int ctdb_list_keys(struct ctdb_db_context *ctdb_db, FILE *f)
|
||||
{
|
||||
int ret;
|
||||
uint64_t srvid = (getpid() | 0xFLL<<60);
|
||||
struct list_keys_state state;
|
||||
|
||||
state.f = f;
|
||||
state.done = False;
|
||||
state.count = 0;
|
||||
|
||||
ret = ctdb_set_message_handler(ctdb_db->ctdb, srvid, list_keys_handler, &state);
|
||||
if (ret != 0) {
|
||||
DEBUG(0,("Failed to setup list keys handler\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = ctdb_traverse_all(ctdb_db, srvid);
|
||||
|
||||
while (!state.done) {
|
||||
event_loop_once(ctdb_db->ctdb->ev);
|
||||
}
|
||||
|
||||
return state.count;
|
||||
}
|
||||
|
@ -131,7 +131,7 @@ static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data
|
||||
*/
|
||||
static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
|
||||
uint32_t opcode, TDB_DATA indata,
|
||||
TDB_DATA *outdata)
|
||||
TDB_DATA *outdata, uint32_t srcnode)
|
||||
{
|
||||
switch (opcode) {
|
||||
case CTDB_CONTROL_PROCESS_EXISTS: {
|
||||
@ -428,6 +428,16 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
|
||||
return ctdb_daemon_set_call(ctdb, sc->db_id, sc->fn, sc->id);
|
||||
}
|
||||
|
||||
case CTDB_CONTROL_TRAVERSE_START:
|
||||
CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_traverse_start));
|
||||
return ctdb_control_traverse_start(ctdb, indata, outdata, srcnode);
|
||||
|
||||
case CTDB_CONTROL_TRAVERSE_ALL:
|
||||
return ctdb_control_traverse_all(ctdb, indata, outdata);
|
||||
|
||||
case CTDB_CONTROL_TRAVERSE_DATA:
|
||||
return ctdb_control_traverse_data(ctdb, indata, outdata);
|
||||
|
||||
default:
|
||||
DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
|
||||
return -1;
|
||||
@ -449,7 +459,7 @@ void ctdb_request_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
|
||||
data.dsize = c->datalen;
|
||||
|
||||
outdata = talloc_zero(c, TDB_DATA);
|
||||
status = ctdb_control_dispatch(ctdb, c->opcode, data, outdata);
|
||||
status = ctdb_control_dispatch(ctdb, c->opcode, data, outdata, hdr->srcnode);
|
||||
|
||||
/* some controls send no reply */
|
||||
if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
|
||||
|
@ -32,7 +32,7 @@
|
||||
/*
|
||||
this dispatches the messages to the registered ctdb message handler
|
||||
*/
|
||||
static int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
|
||||
int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
|
||||
{
|
||||
struct ctdb_message_list *ml;
|
||||
|
||||
|
@ -28,21 +28,11 @@
|
||||
|
||||
typedef void (*ctdb_traverse_fn_t)(void *private_data, TDB_DATA key, TDB_DATA data);
|
||||
|
||||
/*
|
||||
structure used to pass the data between the child and parent
|
||||
*/
|
||||
struct ctdb_traverse_data {
|
||||
uint32_t length;
|
||||
uint32_t keylen;
|
||||
uint32_t datalen;
|
||||
uint8_t data[1];
|
||||
};
|
||||
|
||||
/*
|
||||
handle returned to caller - freeing this handler will kill the child and
|
||||
terminate the traverse
|
||||
*/
|
||||
struct ctdb_traverse_handle {
|
||||
struct ctdb_traverse_local_handle {
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
int fd[2];
|
||||
pid_t child;
|
||||
@ -55,10 +45,10 @@ struct ctdb_traverse_handle {
|
||||
/*
|
||||
called when data is available from the child
|
||||
*/
|
||||
static void ctdb_traverse_handler(uint8_t *rawdata, size_t length, void *private_data)
|
||||
static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *private_data)
|
||||
{
|
||||
struct ctdb_traverse_handle *h = talloc_get_type(private_data,
|
||||
struct ctdb_traverse_handle);
|
||||
struct ctdb_traverse_local_handle *h = talloc_get_type(private_data,
|
||||
struct ctdb_traverse_local_handle);
|
||||
TDB_DATA key, data;
|
||||
ctdb_traverse_fn_t callback = h->callback;
|
||||
void *p = h->private_data;
|
||||
@ -82,7 +72,7 @@ static void ctdb_traverse_handler(uint8_t *rawdata, size_t length, void *private
|
||||
/*
|
||||
destroy a in-flight traverse operation
|
||||
*/
|
||||
static int traverse_destructor(struct ctdb_traverse_handle *h)
|
||||
static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
|
||||
{
|
||||
close(h->fd[0]);
|
||||
kill(h->child, SIGKILL);
|
||||
@ -91,44 +81,77 @@ static int traverse_destructor(struct ctdb_traverse_handle *h)
|
||||
}
|
||||
|
||||
/*
|
||||
callback from tdb_traverse_read()x
|
||||
form a ctdb_traverse_data record from a key/data pair
|
||||
*/
|
||||
static int ctdb_traverse_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
|
||||
static struct ctdb_traverse_data *ctdb_traverse_marshall_record(TALLOC_CTX *mem_ctx,
|
||||
uint32_t reqid,
|
||||
TDB_DATA key, TDB_DATA data)
|
||||
{
|
||||
struct ctdb_traverse_handle *h = talloc_get_type(p, struct ctdb_traverse_handle);
|
||||
size_t length;
|
||||
struct ctdb_traverse_data *d;
|
||||
size_t length = offsetof(struct ctdb_traverse_data, data) + key.dsize + data.dsize;
|
||||
d = (struct ctdb_traverse_data *)talloc_size(h, length);
|
||||
|
||||
length = offsetof(struct ctdb_traverse_data, data) + key.dsize + data.dsize;
|
||||
d = (struct ctdb_traverse_data *)talloc_size(mem_ctx, length);
|
||||
if (d == NULL) {
|
||||
/* error handling is tricky in this child code .... */
|
||||
return -1;
|
||||
return NULL;
|
||||
}
|
||||
d->length = length;
|
||||
d->reqid = reqid;
|
||||
d->keylen = key.dsize;
|
||||
d->datalen = data.dsize;
|
||||
memcpy(&d->data[0], key.dptr, key.dsize);
|
||||
memcpy(&d->data[key.dsize], data.dptr, data.dsize);
|
||||
if (ctdb_queue_send(h->queue, (uint8_t *)d, d->length) != 0) {
|
||||
return d;
|
||||
}
|
||||
|
||||
/*
|
||||
callback from tdb_traverse_read()
|
||||
*/
|
||||
static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
|
||||
{
|
||||
struct ctdb_traverse_local_handle *h = talloc_get_type(p,
|
||||
struct ctdb_traverse_local_handle);
|
||||
struct ctdb_traverse_data *d;
|
||||
struct ctdb_ltdb_header *hdr;
|
||||
|
||||
/* filter out non-authoritative and zero-length records */
|
||||
hdr = (struct ctdb_ltdb_header *)data.dptr;
|
||||
if (data.dsize <= sizeof(struct ctdb_ltdb_header) ||
|
||||
hdr->dmaster != h->ctdb_db->ctdb->vnn) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
d = ctdb_traverse_marshall_record(h, 0, key, data);
|
||||
if (d == NULL) {
|
||||
/* error handling is tricky in this child code .... */
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (write(h->fd[1], (uint8_t *)d, d->length) != d->length) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
setup a non-blocking traverse of a tdb. The callback function will
|
||||
be called on every record in the local ltdb. To stop the travserse,
|
||||
talloc_free() the travserse_handle.
|
||||
setup a non-blocking traverse of a local ltdb. The callback function
|
||||
will be called on every record in the local ltdb. To stop the
|
||||
travserse, talloc_free() the travserse_handle.
|
||||
|
||||
The traverse is finished when the callback is called with tdb_null for key and data
|
||||
*/
|
||||
struct ctdb_traverse_handle *ctdb_traverse(struct ctdb_db_context *ctdb_db,
|
||||
ctdb_traverse_fn_t callback,
|
||||
void *private_data)
|
||||
struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *ctdb_db,
|
||||
ctdb_traverse_fn_t callback,
|
||||
void *private_data)
|
||||
{
|
||||
struct ctdb_traverse_handle *h;
|
||||
struct ctdb_traverse_local_handle *h;
|
||||
int ret;
|
||||
|
||||
ctdb_db->ctdb->status.traverse_calls++;
|
||||
|
||||
if (!(h = talloc_zero(ctdb_db, struct ctdb_traverse_handle))) {
|
||||
h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
|
||||
if (h == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
@ -155,18 +178,18 @@ struct ctdb_traverse_handle *ctdb_traverse(struct ctdb_db_context *ctdb_db,
|
||||
if (h->child == 0) {
|
||||
/* start the traverse in the child */
|
||||
close(h->fd[0]);
|
||||
tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_fn, h);
|
||||
tdb_traverse_read(ctdb_db->ltdb->tdb, ctdb_traverse_local_fn, h);
|
||||
_exit(0);
|
||||
}
|
||||
|
||||
close(h->fd[1]);
|
||||
talloc_set_destructor(h, traverse_destructor);
|
||||
talloc_set_destructor(h, traverse_local_destructor);
|
||||
|
||||
/*
|
||||
setup a packet queue between the child and the parent. This
|
||||
copes with all the async and packet boundary issues
|
||||
*/
|
||||
h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_handler, h);
|
||||
h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
|
||||
if (h->queue == NULL) {
|
||||
talloc_free(h);
|
||||
return NULL;
|
||||
@ -176,3 +199,268 @@ struct ctdb_traverse_handle *ctdb_traverse(struct ctdb_db_context *ctdb_db,
|
||||
|
||||
return h;
|
||||
}
|
||||
|
||||
|
||||
struct ctdb_traverse_all_handle {
|
||||
struct ctdb_context *ctdb;
|
||||
uint32_t reqid;
|
||||
ctdb_traverse_fn_t callback;
|
||||
void *private_data;
|
||||
uint32_t null_count;
|
||||
};
|
||||
|
||||
/*
|
||||
destroy a traverse_all op
|
||||
*/
|
||||
static int ctdb_traverse_all_destructor(struct ctdb_traverse_all_handle *state)
|
||||
{
|
||||
ctdb_reqid_remove(state->ctdb, state->reqid);
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct ctdb_traverse_all {
|
||||
uint32_t db_id;
|
||||
uint32_t reqid;
|
||||
uint32_t vnn;
|
||||
};
|
||||
|
||||
/*
|
||||
setup a cluster-wide non-blocking traverse of a ctdb. The
|
||||
callback function will be called on every record in the local
|
||||
ltdb. To stop the travserse, talloc_free() the traverse_handle.
|
||||
|
||||
The traverse is finished when the callback is called with tdb_null
|
||||
for key and data
|
||||
*/
|
||||
struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context *ctdb_db,
|
||||
ctdb_traverse_fn_t callback,
|
||||
void *private_data)
|
||||
{
|
||||
struct ctdb_traverse_all_handle *state;
|
||||
struct ctdb_context *ctdb = ctdb_db->ctdb;
|
||||
int ret;
|
||||
TDB_DATA data;
|
||||
struct ctdb_traverse_all r;
|
||||
|
||||
state = talloc(ctdb_db, struct ctdb_traverse_all_handle);
|
||||
if (state == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
state->ctdb = ctdb;
|
||||
state->reqid = ctdb_reqid_new(ctdb_db->ctdb, state);
|
||||
state->callback = callback;
|
||||
state->private_data = private_data;
|
||||
state->null_count = 0;
|
||||
|
||||
talloc_set_destructor(state, ctdb_traverse_all_destructor);
|
||||
|
||||
r.db_id = ctdb_db->db_id;
|
||||
r.reqid = state->reqid;
|
||||
r.vnn = ctdb->vnn;
|
||||
|
||||
data.dptr = (uint8_t *)&r;
|
||||
data.dsize = sizeof(r);
|
||||
|
||||
/* tell all the nodes in the cluster to start sending records to this node */
|
||||
ret = ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNN, 0, CTDB_CONTROL_TRAVERSE_ALL,
|
||||
CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
|
||||
if (ret != 0) {
|
||||
talloc_free(state);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return state;
|
||||
}
|
||||
|
||||
struct traverse_all_state {
|
||||
struct ctdb_context *ctdb;
|
||||
struct ctdb_traverse_local_handle *h;
|
||||
uint32_t reqid;
|
||||
uint32_t srcnode;
|
||||
};
|
||||
|
||||
/*
|
||||
called for each record during a traverse all
|
||||
*/
|
||||
static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
|
||||
{
|
||||
struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
|
||||
int ret;
|
||||
struct ctdb_traverse_data *d;
|
||||
|
||||
d = ctdb_traverse_marshall_record(state, state->reqid, key, data);
|
||||
if (d == NULL) {
|
||||
/* darn .... */
|
||||
DEBUG(0,("Out of memory in traverse_all_callback\n"));
|
||||
return;
|
||||
}
|
||||
|
||||
data.dptr = (uint8_t *)d;
|
||||
data.dsize = d->length;
|
||||
|
||||
ret = ctdb_daemon_send_control(state->ctdb, state->srcnode, 0, CTDB_CONTROL_TRAVERSE_DATA,
|
||||
CTDB_CTRL_FLAG_NOREPLY, data, NULL, NULL);
|
||||
if (ret != 0) {
|
||||
DEBUG(0,("Failed to send traverse data\n"));
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
called when a CTDB_CONTROL_TRAVERSE_ALL control comes in. We then
|
||||
setup a traverse of our local ltdb, sending the records as
|
||||
CTDB_CONTROL_TRAVERSE_DATA records back to the originator
|
||||
*/
|
||||
int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
|
||||
{
|
||||
struct ctdb_traverse_all *c = (struct ctdb_traverse_all *)data.dptr;
|
||||
struct traverse_all_state *state;
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
|
||||
if (data.dsize != sizeof(struct ctdb_traverse_all)) {
|
||||
DEBUG(0,("Invalid size in ctdb_control_traverse_all\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
ctdb_db = find_ctdb_db(ctdb, c->db_id);
|
||||
if (ctdb_db == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
state = talloc(ctdb_db, struct traverse_all_state);
|
||||
if (state == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
state->reqid = c->reqid;
|
||||
state->srcnode = c->vnn;
|
||||
state->ctdb = ctdb;
|
||||
|
||||
state->h = ctdb_traverse_local(ctdb_db, traverse_all_callback, state);
|
||||
if (state->h == NULL) {
|
||||
talloc_free(state);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
called when a CTDB_CONTROL_TRAVERSE_DATA control comes in. We then
|
||||
call the traverse_all callback with the record
|
||||
*/
|
||||
int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
|
||||
{
|
||||
struct ctdb_traverse_data *d = (struct ctdb_traverse_data *)data.dptr;
|
||||
struct ctdb_traverse_all_handle *state;
|
||||
TDB_DATA key;
|
||||
ctdb_traverse_fn_t callback;
|
||||
void *private_data;
|
||||
|
||||
if (data.dsize < sizeof(uint32_t) || data.dsize != d->length) {
|
||||
DEBUG(0,("Bad record size in ctdb_control_traverse_data\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
state = ctdb_reqid_find(ctdb, d->reqid, struct ctdb_traverse_all_handle);
|
||||
if (state == NULL || d->reqid != state->reqid) {
|
||||
/* traverse might have been terminated already */
|
||||
return -1;
|
||||
}
|
||||
|
||||
key.dsize = d->keylen;
|
||||
key.dptr = &d->data[0];
|
||||
data.dsize = d->datalen;
|
||||
data.dptr = &d->data[d->keylen];
|
||||
|
||||
if (key.dsize == 0 && data.dsize == 0) {
|
||||
state->null_count++;
|
||||
if (state->null_count != ctdb_get_num_nodes(ctdb)) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
callback = state->callback;
|
||||
private_data = state->private_data;
|
||||
|
||||
callback(private_data, key, data);
|
||||
if (key.dsize == 0 && data.dsize == 0) {
|
||||
/* we've received all of the null replies, so all
|
||||
nodes are finished */
|
||||
talloc_free(state);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
struct traverse_start_state {
|
||||
struct ctdb_context *ctdb;
|
||||
struct ctdb_traverse_all_handle *h;
|
||||
uint32_t srcnode;
|
||||
uint32_t reqid;
|
||||
uint64_t srvid;
|
||||
};
|
||||
|
||||
/*
|
||||
callback which sends records as messages to the client
|
||||
*/
|
||||
static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
|
||||
{
|
||||
struct traverse_start_state *state;
|
||||
struct ctdb_traverse_data *d;
|
||||
|
||||
state = talloc_get_type(p, struct traverse_start_state);
|
||||
|
||||
d = ctdb_traverse_marshall_record(state, state->reqid, key, data);
|
||||
if (d == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
data.dptr = (uint8_t *)d;
|
||||
data.dsize = d->length;
|
||||
|
||||
ctdb_dispatch_message(state->ctdb, state->srvid, data);
|
||||
if (key.dsize == 0 && data.dsize == 0) {
|
||||
/* end of traverse */
|
||||
talloc_free(state);
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
start a traverse_all - called as a control from a client
|
||||
*/
|
||||
int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA data,
|
||||
TDB_DATA *outdata, uint32_t srcnode)
|
||||
{
|
||||
struct ctdb_traverse_start *d = (struct ctdb_traverse_start *)data.dptr;
|
||||
struct traverse_start_state *state;
|
||||
struct ctdb_db_context *ctdb_db;
|
||||
|
||||
if (data.dsize != sizeof(*d)) {
|
||||
DEBUG(0,("Bad record size in ctdb_control_traverse_start\n"));
|
||||
return -1;
|
||||
}
|
||||
|
||||
ctdb_db = find_ctdb_db(ctdb, d->db_id);
|
||||
if (ctdb_db == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
state = talloc(ctdb_db, struct traverse_start_state);
|
||||
if (state == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
state->srcnode = srcnode;
|
||||
state->reqid = d->reqid;
|
||||
state->srvid = d->srvid;
|
||||
state->ctdb = ctdb;
|
||||
|
||||
state->h = ctdb_daemon_traverse_all(ctdb_db, traverse_start_callback, state);
|
||||
if (state->h == NULL) {
|
||||
talloc_free(state);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -139,51 +139,6 @@ void ctdb_latency(double *latency, struct timeval t)
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
struct idr_fake {
|
||||
uint32_t size;
|
||||
void **ptrs;
|
||||
};
|
||||
|
||||
static void idr_fake_init(struct ctdb_context *ctdb)
|
||||
{
|
||||
if (ctdb->fidr) return;
|
||||
ctdb->fidr = talloc(ctdb, struct idr_fake);
|
||||
ctdb->fidr->size = 0x10000;
|
||||
ctdb->fidr->ptrs = talloc_zero_array(ctdb->fidr, void *,
|
||||
ctdb->fidr->size);
|
||||
}
|
||||
|
||||
uint32_t ctdb_reqid_new(struct ctdb_context *ctdb, void *state)
|
||||
{
|
||||
uint32_t i;
|
||||
idr_fake_init(ctdb);
|
||||
for (i=0;i<ctdb->fidr->size;i++) {
|
||||
if (ctdb->fidr->ptrs[i] == NULL) {
|
||||
ctdb->fidr->ptrs[i] = state;
|
||||
return i;
|
||||
}
|
||||
}
|
||||
return (uint32_t)-1;
|
||||
}
|
||||
|
||||
void *_ctdb_reqid_find(struct ctdb_context *ctdb, uint32_t reqid, const char *type, const char *location)
|
||||
{
|
||||
idr_fake_init(ctdb);
|
||||
if (ctdb->fidr->ptrs[reqid] == NULL) {
|
||||
DEBUG(0,("bad fidr id %u\n", reqid));
|
||||
}
|
||||
return ctdb->fidr->ptrs[reqid];
|
||||
}
|
||||
|
||||
|
||||
void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
|
||||
{
|
||||
idr_fake_init(ctdb);
|
||||
ctdb->fidr->ptrs[reqid] = NULL;
|
||||
}
|
||||
|
||||
#else
|
||||
uint32_t ctdb_reqid_new(struct ctdb_context *ctdb, void *state)
|
||||
{
|
||||
uint32_t id;
|
||||
@ -215,5 +170,3 @@ void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
|
||||
DEBUG(0, ("Removing idr that does not exist\n"));
|
||||
}
|
||||
}
|
||||
|
||||
#endif
|
||||
|
@ -282,4 +282,6 @@ int ctdb_status_reset(struct ctdb_context *ctdb, uint32_t destnode);
|
||||
|
||||
int ctdb_set_logfile(struct ctdb_context *ctdb, const char *logfile);
|
||||
|
||||
int ctdb_list_keys(struct ctdb_db_context *ctdb_db, FILE *f);
|
||||
|
||||
#endif
|
||||
|
@ -201,7 +201,6 @@ struct ctdb_context {
|
||||
struct ctdb_status status;
|
||||
struct ctdb_vnn_map *vnn_map;
|
||||
uint32_t num_clients;
|
||||
struct idr_fake *fidr;
|
||||
};
|
||||
|
||||
struct ctdb_db_context {
|
||||
@ -268,7 +267,12 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS,
|
||||
CTDB_CONTROL_STATUS_RESET,
|
||||
CTDB_CONTROL_DB_ATTACH,
|
||||
CTDB_CONTROL_SET_CALL,
|
||||
CTDB_CONTROL_WRITE_RECORD};
|
||||
CTDB_CONTROL_WRITE_RECORD,
|
||||
CTDB_CONTROL_TRAVERSE_START,
|
||||
CTDB_CONTROL_TRAVERSE_ALL,
|
||||
CTDB_CONTROL_TRAVERSE_DATA,
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
structure passed in set_call control
|
||||
@ -646,4 +650,31 @@ int ctdb_control_setvnnmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA
|
||||
int ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata);
|
||||
int ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata);
|
||||
|
||||
|
||||
struct ctdb_traverse_start {
|
||||
uint32_t db_id;
|
||||
uint32_t reqid;
|
||||
uint64_t srvid;
|
||||
};
|
||||
|
||||
/*
|
||||
structure used to pass the data between the child and parent
|
||||
*/
|
||||
struct ctdb_traverse_data {
|
||||
uint32_t length;
|
||||
uint32_t reqid;
|
||||
uint32_t keylen;
|
||||
uint32_t datalen;
|
||||
uint8_t data[1];
|
||||
};
|
||||
|
||||
|
||||
int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA indata,
|
||||
TDB_DATA *outdata, uint32_t srcnode);
|
||||
int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata);
|
||||
int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata);
|
||||
|
||||
int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data);
|
||||
|
||||
|
||||
#endif
|
||||
|
@ -176,6 +176,8 @@ int main(int argc, const char *argv[])
|
||||
}
|
||||
talloc_free(call.reply_data.dptr);
|
||||
|
||||
ctdb_list_keys(ctdb_db, stdout);
|
||||
|
||||
/* go into a wait loop to allow other nodes to complete */
|
||||
ctdb_shutdown(ctdb);
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user