mirror of
https://github.com/samba-team/samba.git
synced 2024-12-23 17:34:34 +03:00
LIBCTDB: add support for traverse
(This used to be ctdb commit 9463e04038ba36792583f83bd95c1af322dc283a)
This commit is contained in:
parent
6494574d8f
commit
fcd98a7e59
@ -291,6 +291,47 @@ bool ctdb_writerecord(struct ctdb_db *ctdb_db,
|
||||
*/
|
||||
void ctdb_release_lock(struct ctdb_db *ctdb_db, struct ctdb_lock *lock);
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* ctdb_traverse_callback_t - callback for ctdb_traverse_async.
|
||||
* return 0 - to continue traverse
|
||||
* return 1 - to abort the traverse
|
||||
*
|
||||
* See Also:
|
||||
* ctdb_traverse_async()
|
||||
*/
|
||||
#define TRAVERSE_STATUS_RECORD 0
|
||||
#define TRAVERSE_STATUS_FINISHED 1
|
||||
#define TRAVERSE_STATUS_ERROR 2
|
||||
typedef int (*ctdb_traverse_callback_t)(struct ctdb_connection *ctdb,
|
||||
struct ctdb_db *ctdb_db,
|
||||
int status,
|
||||
TDB_DATA key,
|
||||
TDB_DATA data,
|
||||
void *private_data);
|
||||
|
||||
/**
|
||||
* ctdb_traverse_async - traverse a database.
|
||||
* @ctdb_db: the database handle from ctdb_attachdb/ctdb_attachdb_recv.
|
||||
* @callback: the callback once the record is locked (typesafe).
|
||||
* @cbdata: the argument to callback()
|
||||
*
|
||||
* This returns true on success.
|
||||
* when successfull, the callback will be invoked for each record
|
||||
* until the traversal is finished.
|
||||
*
|
||||
* status ==
|
||||
* TRAVERSE_STATUS_RECORD key/data contains a record.
|
||||
* TRAVERSE_STATUS_FINISHED traverse is finished. key/data is undefined.
|
||||
* TRAVERSE_STATUS_ERROR an error occured during traverse.
|
||||
* key/data is undefined.
|
||||
*
|
||||
* If failure is immediate, false is returned.
|
||||
*/
|
||||
bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
|
||||
ctdb_traverse_callback_t callback, void *cbdata);
|
||||
|
||||
/**
|
||||
* ctdb_message_fn_t - messaging callback for ctdb messages
|
||||
*
|
||||
@ -480,6 +521,7 @@ ctdb_getpublicips_send(struct ctdb_connection *ctdb,
|
||||
bool ctdb_getpublicips_recv(struct ctdb_connection *ctdb,
|
||||
struct ctdb_request *req, struct ctdb_all_public_ips **ips);
|
||||
|
||||
|
||||
/**
|
||||
* ctdb_getrecmaster_send - read the recovery master of a node
|
||||
* @ctdb: the ctdb_connection from ctdb_connect.
|
||||
|
@ -838,24 +838,6 @@ int ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA
|
||||
int ctdb_control_writerecord(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata);
|
||||
|
||||
|
||||
struct ctdb_traverse_start {
|
||||
uint32_t db_id;
|
||||
uint32_t reqid;
|
||||
uint64_t srvid;
|
||||
};
|
||||
|
||||
/*
|
||||
structure used to pass record data between the child and parent
|
||||
*/
|
||||
struct ctdb_rec_data {
|
||||
uint32_t length;
|
||||
uint32_t reqid;
|
||||
uint32_t keylen;
|
||||
uint32_t datalen;
|
||||
uint8_t data[1];
|
||||
};
|
||||
|
||||
|
||||
/* structure used for pulldb control */
|
||||
struct ctdb_control_pulldb {
|
||||
uint32_t db_id;
|
||||
|
@ -161,6 +161,9 @@ struct ctdb_call_info {
|
||||
*/
|
||||
#define CTDB_SRVID_TEST_RANGE 0xFE03000000000000LL
|
||||
|
||||
/* Range of ports reserved for traversals */
|
||||
#define CTDB_SRVID_TRAVERSE_RANGE 0xFE04000000000000LL
|
||||
|
||||
/* used on the domain socket, send a pdu to the local daemon */
|
||||
#define CTDB_CURRENT_NODE 0xF0000001
|
||||
/* send a broadcast to all nodes in the cluster, active or not */
|
||||
@ -543,6 +546,23 @@ struct latency_counter {
|
||||
double total;
|
||||
};
|
||||
|
||||
/*
|
||||
structure used to pass record data between the child and parent
|
||||
*/
|
||||
struct ctdb_rec_data {
|
||||
uint32_t length;
|
||||
uint32_t reqid;
|
||||
uint32_t keylen;
|
||||
uint32_t datalen;
|
||||
uint8_t data[1];
|
||||
};
|
||||
|
||||
struct ctdb_traverse_start {
|
||||
uint32_t db_id;
|
||||
uint32_t reqid;
|
||||
uint64_t srvid;
|
||||
};
|
||||
|
||||
/*
|
||||
ctdb statistics information
|
||||
*/
|
||||
|
@ -936,3 +936,186 @@ bool ctdb_writerecord(struct ctdb_db *ctdb_db,
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
struct ctdb_traverse_state {
|
||||
struct ctdb_request *handle;
|
||||
struct ctdb_db *ctdb_db;
|
||||
uint64_t srvid;
|
||||
|
||||
ctdb_traverse_callback_t callback;
|
||||
void *cbdata;
|
||||
};
|
||||
|
||||
static void traverse_remhnd_cb(struct ctdb_connection *ctdb,
|
||||
struct ctdb_request *req, void *private_data)
|
||||
{
|
||||
struct ctdb_traverse_state *state = private_data;
|
||||
|
||||
if (!ctdb_remove_message_handler_recv(ctdb, state->handle)) {
|
||||
DEBUG(ctdb, LOG_ERR,
|
||||
"Failed to remove message handler for"
|
||||
" traverse.");
|
||||
state->callback(state->ctdb_db->ctdb, state->ctdb_db,
|
||||
TRAVERSE_STATUS_ERROR,
|
||||
tdb_null, tdb_null,
|
||||
state->cbdata);
|
||||
}
|
||||
ctdb_request_free(ctdb, state->handle);
|
||||
state->handle = NULL;
|
||||
free(state);
|
||||
}
|
||||
|
||||
static void msg_h(struct ctdb_connection *ctdb, uint64_t srvid,
|
||||
TDB_DATA data, void *private_data)
|
||||
{
|
||||
struct ctdb_traverse_state *state = private_data;
|
||||
struct ctdb_db *ctdb_db = state->ctdb_db;
|
||||
struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
|
||||
TDB_DATA key;
|
||||
|
||||
if (data.dsize < sizeof(uint32_t) ||
|
||||
d->length != data.dsize) {
|
||||
DEBUG(ctdb, LOG_ERR,
|
||||
"Bad data size %u in traverse_handler",
|
||||
(unsigned)data.dsize);
|
||||
state->callback(state->ctdb_db->ctdb, state->ctdb_db,
|
||||
TRAVERSE_STATUS_ERROR,
|
||||
tdb_null, tdb_null,
|
||||
state->cbdata);
|
||||
state->handle = ctdb_remove_message_handler_send(
|
||||
state->ctdb_db->ctdb, state->srvid,
|
||||
msg_h, state,
|
||||
traverse_remhnd_cb, state);
|
||||
return;
|
||||
}
|
||||
|
||||
key.dsize = d->keylen;
|
||||
key.dptr = &d->data[0];
|
||||
data.dsize = d->datalen;
|
||||
data.dptr = &d->data[d->keylen];
|
||||
|
||||
if (key.dsize == 0 && data.dsize == 0) {
|
||||
state->callback(state->ctdb_db->ctdb, state->ctdb_db,
|
||||
TRAVERSE_STATUS_FINISHED,
|
||||
tdb_null, tdb_null,
|
||||
state->cbdata);
|
||||
state->handle = ctdb_remove_message_handler_send(
|
||||
state->ctdb_db->ctdb, state->srvid,
|
||||
msg_h, state,
|
||||
traverse_remhnd_cb, state);
|
||||
return;
|
||||
}
|
||||
|
||||
if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
|
||||
/* empty records are deleted records in ctdb */
|
||||
return;
|
||||
}
|
||||
|
||||
data.dsize -= sizeof(struct ctdb_ltdb_header);
|
||||
data.dptr += sizeof(struct ctdb_ltdb_header);
|
||||
|
||||
if (state->callback(ctdb, ctdb_db,
|
||||
TRAVERSE_STATUS_RECORD,
|
||||
key, data, state->cbdata) != 0) {
|
||||
state->handle = ctdb_remove_message_handler_send(
|
||||
state->ctdb_db->ctdb, state->srvid,
|
||||
msg_h, state,
|
||||
traverse_remhnd_cb, state);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
static void traverse_start_cb(struct ctdb_connection *ctdb,
|
||||
struct ctdb_request *req, void *private_data)
|
||||
{
|
||||
struct ctdb_traverse_state *state = private_data;
|
||||
|
||||
ctdb_request_free(ctdb, state->handle);
|
||||
state->handle = NULL;
|
||||
}
|
||||
|
||||
static void traverse_msghnd_cb(struct ctdb_connection *ctdb,
|
||||
struct ctdb_request *req, void *private_data)
|
||||
{
|
||||
struct ctdb_traverse_state *state = private_data;
|
||||
struct ctdb_db *ctdb_db = state->ctdb_db;
|
||||
struct ctdb_traverse_start t;
|
||||
|
||||
if (!ctdb_set_message_handler_recv(ctdb, state->handle)) {
|
||||
DEBUG(ctdb, LOG_ERR,
|
||||
"Failed to register message handler for"
|
||||
" traverse.");
|
||||
state->callback(state->ctdb_db->ctdb, state->ctdb_db,
|
||||
TRAVERSE_STATUS_ERROR,
|
||||
tdb_null, tdb_null,
|
||||
state->cbdata);
|
||||
ctdb_request_free(ctdb, state->handle);
|
||||
state->handle = NULL;
|
||||
free(state);
|
||||
return;
|
||||
}
|
||||
ctdb_request_free(ctdb, state->handle);
|
||||
state->handle = NULL;
|
||||
|
||||
t.db_id = ctdb_db->id;
|
||||
t.srvid = state->srvid;
|
||||
t.reqid = 0;
|
||||
|
||||
state->handle = new_ctdb_control_request(ctdb,
|
||||
CTDB_CONTROL_TRAVERSE_START,
|
||||
CTDB_CURRENT_NODE,
|
||||
&t, sizeof(t),
|
||||
traverse_start_cb, state);
|
||||
if (state->handle == NULL) {
|
||||
DEBUG(ctdb, LOG_ERR,
|
||||
"ctdb_traverse_async:"
|
||||
" failed to send traverse_start control");
|
||||
state->callback(state->ctdb_db->ctdb, state->ctdb_db,
|
||||
TRAVERSE_STATUS_ERROR,
|
||||
tdb_null, tdb_null,
|
||||
state->cbdata);
|
||||
state->handle = ctdb_remove_message_handler_send(
|
||||
state->ctdb_db->ctdb, state->srvid,
|
||||
msg_h, state,
|
||||
traverse_remhnd_cb, state);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
|
||||
ctdb_traverse_callback_t callback, void *cbdata)
|
||||
{
|
||||
struct ctdb_connection *ctdb = ctdb_db->ctdb;
|
||||
struct ctdb_traverse_state *state;
|
||||
static uint32_t tid = 0;
|
||||
|
||||
state = malloc(sizeof(struct ctdb_traverse_state));
|
||||
if (state == NULL) {
|
||||
DEBUG(ctdb, LOG_ERR,
|
||||
"ctdb_traverse_async: no memory."
|
||||
" allocate state failed");
|
||||
return false;
|
||||
}
|
||||
|
||||
tid++;
|
||||
state->srvid = CTDB_SRVID_TRAVERSE_RANGE|tid;
|
||||
|
||||
state->callback = callback;
|
||||
state->cbdata = cbdata;
|
||||
state->ctdb_db = ctdb_db;
|
||||
|
||||
state->handle = ctdb_set_message_handler_send(ctdb_db->ctdb,
|
||||
state->srvid,
|
||||
msg_h, state,
|
||||
traverse_msghnd_cb, state);
|
||||
if (state->handle == NULL) {
|
||||
DEBUG(ctdb, LOG_ERR,
|
||||
"ctdb_traverse_async:"
|
||||
" failed ctdb_set_message_handler_send");
|
||||
free(state);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
@ -223,6 +223,25 @@ void message_handler_cb(struct ctdb_connection *ctdb,
|
||||
registered = true;
|
||||
}
|
||||
|
||||
static int traverse_callback(struct ctdb_connection *ctdb_connection, struct ctdb_db *ctdb_db, int status, TDB_DATA key, TDB_DATA data, void *private_data)
|
||||
{
|
||||
if (status == TRAVERSE_STATUS_FINISHED) {
|
||||
printf("Traverse finished\n");
|
||||
return 0;
|
||||
}
|
||||
if (status == TRAVERSE_STATUS_ERROR) {
|
||||
printf("Traverse failed\n");
|
||||
return 1;
|
||||
}
|
||||
|
||||
printf("traverse callback status:%d\n", status);
|
||||
printf("key: %d [%s]\n", key.dsize, key.dptr);
|
||||
printf("data:%d [%s]\n", data.dsize, data.dptr);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char *argv[])
|
||||
{
|
||||
struct ctdb_connection *ctdb_connection;
|
||||
@ -366,6 +385,9 @@ int main(int argc, char *argv[])
|
||||
print_nodemap(nodemap);
|
||||
ctdb_free_nodemap(nodemap);
|
||||
|
||||
printf("Traverse the test_test.tdb database\n");
|
||||
ctdb_traverse_async(ctdb_db_context, traverse_callback, NULL);
|
||||
|
||||
for (;;) {
|
||||
|
||||
pfd.events = ctdb_which_events(ctdb_connection);
|
||||
|
Loading…
Reference in New Issue
Block a user