1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-27 22:50:26 +03:00

merge from tridge

(This used to be ctdb commit f261f554ccf5d85a90f504cc20fc6f1f8b3f14d6)
This commit is contained in:
Ronnie Sahlberg 2007-05-10 17:59:51 +10:00
commit 343e44918c
13 changed files with 458 additions and 634 deletions

View File

@ -613,17 +613,42 @@ static int ctdb_call_destructor(struct ctdb_call_state *state)
/*
called when a ctdb_call times out
*/
void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
static void ctdb_call_timeout(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct ctdb_call_state *state = talloc_get_type(private_data, struct ctdb_call_state);
DEBUG(0,(__location__ " call timeout for reqid %d\n", state->c->hdr.reqid));
state->state = CTDB_CALL_ERROR;
ctdb_set_error(state->ctdb_db->ctdb, "ctdb_call %u timed out",
state->c->hdr.reqid);
if (state->async.fn) {
state->async.fn(state);
struct ctdb_context *ctdb = state->ctdb_db->ctdb;
ctdb->status.timeouts.call++;
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_CALL_TIMEOUT, 0),
ctdb_call_timeout, state);
if (++state->resend_count < 10 &&
(ctdb->vnn_map->generation == state->generation ||
ctdb->recovery_mode != CTDB_RECOVERY_NORMAL)) {
/* the call is just being slow, or we are currently
recovering, give it more time */
return;
}
/* the generation count changed or we're timing out too much -
the call must be re-issued */
state->generation = ctdb->vnn_map->generation;
state->resend_count = 0;
/* use a new reqid, in case the old reply does eventually come in */
ctdb_reqid_remove(ctdb, state->reqid);
state->reqid = ctdb_reqid_new(ctdb, state);
state->c->hdr.reqid = state->reqid;
/* update the generation count for this request, so it's valid with the new vnn_map */
state->c->hdr.generation = state->generation;
/* send the packet to ourselves, it will be redirected appropriately */
state->c->hdr.destnode = ctdb->vnn;
ctdb_queue_packet(ctdb, &state->c->hdr);
}
/*
@ -689,6 +714,7 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
state = talloc_zero(ctdb_db, struct ctdb_call_state);
CTDB_NO_MEMORY_NULL(ctdb, state);
state->reqid = ctdb_reqid_new(ctdb, state);
state->ctdb_db = ctdb_db;
talloc_set_destructor(state, ctdb_call_destructor);
len = offsetof(struct ctdb_req_call, data) + call->key.dsize + call->call_data.dsize;
@ -697,15 +723,6 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
CTDB_NO_MEMORY_NULL(ctdb, state->c);
state->c->hdr.destnode = header->dmaster;
#if 0
/*always sending the remote call straight to the lmaster
improved performance slightly in some tests.
worth investigating further in the future
*/
state->c->hdr.destnode = ctdb_lmaster(ctdb_db->ctdb, &(call->key));
#endif
/* this limits us to 16k outstanding messages - not unreasonable */
state->c->hdr.reqid = state->reqid;
state->c->flags = call->flags;
@ -722,14 +739,12 @@ struct ctdb_call_state *ctdb_daemon_call_send_remote(struct ctdb_db_context *ctd
state->call.key.dptr = &state->c->data[0];
state->state = CTDB_CALL_WAIT;
state->ctdb_db = ctdb_db;
state->generation = ctdb->vnn_map->generation;
ctdb_queue_packet(ctdb, &state->c->hdr);
#if CTDB_REQ_TIMEOUT
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_CALL_TIMEOUT, 0),
ctdb_call_timeout, state);
#endif
return state;
}

View File

@ -328,8 +328,11 @@ struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
return NULL;
}
/* this limits us to 16k outstanding messages - not unreasonable */
c->hdr.reqid = ctdb_reqid_new(ctdb, state);
state->reqid = ctdb_reqid_new(ctdb, state);
state->ctdb_db = ctdb_db;
talloc_set_destructor(state, ctdb_client_call_destructor);
c->hdr.reqid = state->reqid;
c->flags = call->flags;
c->db_id = ctdb_db->db_id;
c->callid = call->call_id;
@ -344,10 +347,7 @@ struct ctdb_client_call_state *ctdb_call_send(struct ctdb_db_context *ctdb_db,
state->call.key.dptr = &c->data[0];
state->state = CTDB_CALL_WAIT;
state->ctdb_db = ctdb_db;
state->reqid = c->hdr.reqid;
talloc_set_destructor(state, ctdb_client_call_destructor);
ctdb_client_queue_pkt(ctdb, &c->hdr);
@ -615,6 +615,7 @@ void ctdb_shutdown(struct ctdb_context *ctdb)
struct ctdb_client_control_state {
struct ctdb_context *ctdb;
uint32_t reqid;
int32_t status;
TDB_DATA outdata;
@ -664,6 +665,15 @@ static void timeout_func(struct event_context *ev, struct timed_event *te,
*timed_out = 1;
}
/*
talloc destructor for a client-side control request
(struct ctdb_client_control_state).
Releases the request id held in state->reqid by calling
ctdb_reqid_remove() on the owning ctdb context. Always returns 0.
*/
static int ctdb_control_destructor(struct ctdb_client_control_state *state)
{
/* give back the reqid that was allocated for this control */
ctdb_reqid_remove(state->ctdb, state->reqid);
return 0;
}
/*
send a ctdb control message
timeout specifies how long we should wait for a reply.
@ -688,9 +698,12 @@ int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
state = talloc_zero(ctdb, struct ctdb_client_control_state);
CTDB_NO_MEMORY(ctdb, state);
state->ctdb = ctdb;
state->reqid = ctdb_reqid_new(ctdb, state);
state->state = CTDB_CALL_WAIT;
talloc_set_destructor(state, ctdb_control_destructor);
len = offsetof(struct ctdb_req_control, data) + data.dsize;
c = ctdbd_allocate_pkt(ctdb, state, CTDB_REQ_CONTROL,
len, struct ctdb_req_control);
@ -1027,6 +1040,9 @@ int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid
int32_t res;
unsigned char *ptr;
DEBUG(0,("ronnie to fix!\n"));
return -1;
indata.dsize = 2*sizeof(uint32_t);
indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint32_t, 2);
@ -1170,36 +1186,6 @@ int ctdb_ctrl_cleardb(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *
return 0;
}
int ctdb_ctrl_write_record(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid, TDB_DATA key, TDB_DATA data)
{
struct ctdb_write_record *wr;
TDB_DATA indata, outdata;
int32_t res;
int ret, len;
len = offsetof(struct ctdb_write_record, blob)+key.dsize+data.dsize;
wr = (struct ctdb_write_record *)talloc_zero_size(mem_ctx, len);
wr->dbid = dbid;
wr->keylen = key.dsize;
wr->datalen = data.dsize;
memcpy(&wr->blob[0], &key.dptr[0], key.dsize);
memcpy(&wr->blob[key.dsize], &data.dptr[0], data.dsize);
indata.dsize = len;
indata.dptr = (unsigned char *)wr;
ret = ctdb_control(ctdb, destnode, 0,
CTDB_CONTROL_WRITE_RECORD, 0, indata,
mem_ctx, &outdata, &res, NULL);
if (ret != 0 || res != 0) {
DEBUG(0,(__location__ " ctdb_control for write record failed\n"));
return -1;
}
return 0;
}
/*
ping a node, return number of clients connected
*/
@ -1534,7 +1520,7 @@ struct traverse_state {
static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
{
struct traverse_state *state = (struct traverse_state *)p;
struct ctdb_traverse_data *d = (struct ctdb_traverse_data *)data.dptr;
struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
TDB_DATA key;
if (data.dsize < sizeof(uint32_t) ||

View File

@ -34,100 +34,6 @@ struct ctdb_control_state {
void *private_data;
};
static int traverse_cleardb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
int ret;
ret = tdb_delete(tdb, key);
if (ret) {
DEBUG(0,(__location__ "failed to delete tdb record\n"));
return ret;
}
return 0;
}
static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
uint32_t *dmaster = (uint32_t *)p;
struct ctdb_ltdb_header *header = (struct ctdb_ltdb_header *)data.dptr;
int ret;
header->dmaster = *dmaster;
ret = tdb_store(tdb, key, data, TDB_REPLACE);
if (ret) {
DEBUG(0,(__location__ "failed to write tdb data back ret:%d\n",ret));
return ret;
}
return 0;
}
struct getkeys_params {
struct ctdb_db_context *ctdb_db;
TDB_DATA *outdata;
uint32_t lmaster;
};
static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
struct getkeys_params *params = (struct getkeys_params *)p;
TDB_DATA *outdata = talloc_get_type(params->outdata, TDB_DATA);
struct ctdb_db_context *ctdb_db = talloc_get_type(params->ctdb_db, struct ctdb_db_context);
unsigned char *ptr;
int len;
uint32_t lmaster;
lmaster = ctdb_lmaster(ctdb_db->ctdb, &key);
/* only include this record if the lmaster matches or if
the wildcard lmaster (-1) was specified.
*/
if((params->lmaster!=CTDB_LMASTER_ANY)
&& (params->lmaster!=lmaster) ){
return 0;
}
len=outdata->dsize;
len+=4; /*lmaster*/
len+=4; /*key len*/
len+=4; /*data len */
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
len+=key.dsize;
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
len+=sizeof(struct ctdb_ltdb_header);
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
len+=(data.dsize-sizeof(struct ctdb_ltdb_header));
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr=outdata->dptr=talloc_realloc_size(outdata, outdata->dptr, len);
ptr+=outdata->dsize;
outdata->dsize=len;
/* number of records is stored as the second 4 bytes */
((uint32_t *)(&outdata->dptr[0]))[1]++;
*((uint32_t *)ptr)=lmaster;
ptr+=4;
*((uint32_t *)ptr)=key.dsize;
ptr+=4;
*((uint32_t *)ptr)=data.dsize-sizeof(struct ctdb_ltdb_header);
ptr+=4;
ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, key.dptr, key.dsize);
ptr += key.dsize;
ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, data.dptr, sizeof(struct ctdb_ltdb_header));
ptr += sizeof(struct ctdb_ltdb_header);
ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
return 0;
}
/*
process a control request
*/
@ -190,155 +96,23 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
case CTDB_CONTROL_SETVNNMAP:
return ctdb_control_setvnnmap(ctdb, opcode, indata, outdata);
case CTDB_CONTROL_PULL_DB: {
uint32_t dbid, lmaster;
struct ctdb_db_context *ctdb_db;
struct getkeys_params params;
case CTDB_CONTROL_PULL_DB:
CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_pulldb));
return ctdb_control_pull_db(ctdb, indata, outdata);
dbid = ((uint32_t *)(&indata.dptr[0]))[0];
ctdb_db = find_ctdb_db(ctdb, dbid);
if (!ctdb_db) {
DEBUG(0,(__location__ " Unknown db\n"));
return -1;
}
case CTDB_CONTROL_SET_DMASTER:
CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_dmaster));
return ctdb_control_set_dmaster(ctdb, indata);
lmaster = ((uint32_t *)(&indata.dptr[0]))[1];
case CTDB_CONTROL_CLEAR_DB:
CHECK_CONTROL_DATA_SIZE(sizeof(uint32_t));
return ctdb_control_clear_db(ctdb, indata);
outdata->dsize = 2* sizeof(uint32_t);
outdata->dptr = (unsigned char *)talloc_array(outdata, uint32_t, 2);
((uint32_t *)(&outdata->dptr[0]))[0]=dbid;
((uint32_t *)(&outdata->dptr[0]))[1]=0;
params.ctdb_db = ctdb_db;
params.outdata = outdata;
params.lmaster = lmaster;
tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_getkeys, &params);
return 0;
}
case CTDB_CONTROL_SET_DMASTER: {
uint32_t dbid, dmaster;
struct ctdb_db_context *ctdb_db;
dbid = ((uint32_t *)(&indata.dptr[0]))[0];
ctdb_db = find_ctdb_db(ctdb, dbid);
if (!ctdb_db) {
DEBUG(0,(__location__ " Unknown db 0x%08x\n",dbid));
return -1;
}
dmaster = ((uint32_t *)(&indata.dptr[0]))[1];
outdata->dsize = 0;
outdata->dptr = NULL;
tdb_traverse(ctdb_db->ltdb->tdb, traverse_setdmaster, &dmaster);
return 0;
}
case CTDB_CONTROL_CLEAR_DB: {
uint32_t dbid;
struct ctdb_db_context *ctdb_db;
dbid = ((uint32_t *)(&indata.dptr[0]))[0];
ctdb_db = find_ctdb_db(ctdb, dbid);
if (!ctdb_db) {
DEBUG(0,(__location__ " Unknown db 0x%08x\n",dbid));
return -1;
}
outdata->dsize = 0;
outdata->dptr = NULL;
tdb_traverse(ctdb_db->ltdb->tdb, traverse_cleardb, NULL);
return 0;
}
case CTDB_CONTROL_PUSH_DB: {
uint32_t dbid, num;
struct ctdb_db_context *ctdb_db;
unsigned char *ptr;
int i, ret;
TDB_DATA key, data;
struct ctdb_ltdb_header *hdr, header;
outdata->dsize = 0;
outdata->dptr = NULL;
dbid = ((uint32_t *)(&indata.dptr[0]))[0];
ctdb_db = find_ctdb_db(ctdb, dbid);
if (!ctdb_db) {
DEBUG(0,(__location__ " Unknown db 0x%08x\n",dbid));
return -1;
}
num = ((uint32_t *)(&indata.dptr[0]))[1];
ptr=&indata.dptr[8];
for(i=0;i<num;i++){
/* skip the lmaster*/
ptr += 4;
/* keylength */
key.dsize = *((uint32_t *)ptr);
ptr += 4;
/* data length */
data.dsize = *((uint32_t *)ptr);
ptr += 4;
/* key */
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
key.dptr = ptr;
ptr += key.dsize;
/* header */
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
hdr = (struct ctdb_ltdb_header *)ptr;
ptr += sizeof(struct ctdb_ltdb_header);
/* data */
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
data.dptr=ptr;
ptr += data.dsize;
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
ret = ctdb_ltdb_lock(ctdb_db, key);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to lock db\n"));
return -1;
}
ret = ctdb_ltdb_fetch(ctdb_db, key, &header, outdata, NULL);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to fetch record\n"));
ctdb_ltdb_unlock(ctdb_db, key);
return -1;
}
if (header.rsn < hdr->rsn) {
ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to store record\n"));
ctdb_ltdb_unlock(ctdb_db, key);
return -1;
}
}
ctdb_ltdb_unlock(ctdb_db, key);
}
return 0;
}
case CTDB_CONTROL_WRITE_RECORD:
return ctdb_control_writerecord(ctdb, opcode, indata, outdata);
case CTDB_CONTROL_PUSH_DB:
return ctdb_control_push_db(ctdb, indata);
case CTDB_CONTROL_SET_RECMODE: {
ctdb->recovery_mode = ((uint32_t *)(&indata.dptr[0]))[0];
return 0;
}
@ -531,6 +305,21 @@ static int ctdb_control_destructor(struct ctdb_control_state *state)
return 0;
}
/*
  timed event handler: a control did not complete in time.
  Counts the timeout, reports status -1 with no data to the
  control's callback, then frees the control state.
 */
static void ctdb_control_timeout(struct event_context *ev, struct timed_event *te,
				 struct timeval t, void *private_data)
{
	struct ctdb_control_state *ctl;

	ctl = talloc_get_type(private_data, struct ctdb_control_state);

	/* account for the timeout in the daemon statistics */
	ctl->ctdb->status.timeouts.control++;

	/* tell whoever issued the control that it failed */
	ctl->callback(ctl->ctdb, -1, tdb_null, ctl->private_data);

	talloc_free(ctl);
}
/*
send a control message to a node
*/
@ -586,8 +375,8 @@ int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
return 0;
}
#if CTDB_REQ_TIMEOUT
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
#if CTDB_CONTROL_TIMEOUT
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_CONTROL_TIMEOUT, 0),
ctdb_control_timeout, state);
#endif

View File

@ -361,7 +361,7 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
talloc_free(ctdb_db);
return -1;
}
DLIST_ADD(ctdb->db_list, ctdb_db);
/*

View File

@ -127,53 +127,243 @@ ctdb_control_getnodemap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA ind
return 0;
}
int
ctdb_control_writerecord(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
struct getkeys_params {
struct ctdb_context *ctdb;
uint32_t lmaster;
uint32_t rec_count;
struct getkeys_rec {
TDB_DATA key;
TDB_DATA data;
} *recs;
};
static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
struct ctdb_write_record *wr;
struct ctdb_db_context *ctdb_db;
struct ctdb_ltdb_header header;
TDB_DATA key, data;
int ret;
struct getkeys_params *params = (struct getkeys_params *)p;
uint32_t lmaster;
outdata->dsize = 0;
outdata->dptr = NULL;
lmaster = ctdb_lmaster(params->ctdb, &key);
wr = (struct ctdb_write_record *)indata.dptr;
ctdb_db = find_ctdb_db(ctdb, wr->dbid);
if (!ctdb_db) {
DEBUG(0,(__location__ " Unknown db 0x%08x\n", wr->dbid));
return -1;
/* only include this record if the lmaster matches or if
the wildcard lmaster (-1) was specified.
*/
if ((params->lmaster != CTDB_LMASTER_ANY) && (params->lmaster != lmaster)) {
return 0;
}
key.dsize = wr->keylen;
key.dptr = (unsigned char *)talloc_memdup(outdata, &wr->blob[0], wr->keylen);
data.dsize = wr->datalen;
data.dptr = (unsigned char *)talloc_memdup(outdata, &wr->blob[wr->keylen], wr->datalen);
ret = ctdb_ltdb_lock(ctdb_db, key);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to lock db\n"));
return -1;
}
ret = ctdb_ltdb_fetch(ctdb_db, key, &header, outdata, NULL);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to fetch record\n"));
ctdb_ltdb_unlock(ctdb_db, key);
return -1;
}
header.rsn++;
ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to store record\n"));
ctdb_ltdb_unlock(ctdb_db, key);
return -1;
}
ctdb_ltdb_unlock(ctdb_db, key);
params->recs = talloc_realloc(NULL, params->recs, struct getkeys_rec, params->rec_count+1);
key.dptr = talloc_memdup(params->recs, key.dptr, key.dsize);
data.dptr = talloc_memdup(params->recs, data.dptr, data.dsize);
params->recs[params->rec_count].key = key;
params->recs[params->rec_count].data = data;
params->rec_count++;
return 0;
}
/*
  pull a bunch of records from a ltdb, filtering by lmaster

  indata holds a struct ctdb_control_pulldb (db_id and lmaster filter).
  On success outdata points at a struct ctdb_control_pulldb_reply header
  followed by the matching records, each packed with
  ctdb_marshall_record(), and outdata->dsize is the total length.

  Returns 0 on success, -1 if the database is unknown, the whole-db
  lock cannot be taken without blocking, or memory runs out.
 */
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata)
{
	struct ctdb_control_pulldb *pull;
	struct ctdb_db_context *ctdb_db;
	struct getkeys_params params;
	struct ctdb_control_pulldb_reply *reply;
	uint32_t i;
	size_t len = 0;

	pull = (struct ctdb_control_pulldb *)indata.dptr;

	ctdb_db = find_ctdb_db(ctdb, pull->db_id);
	if (!ctdb_db) {
		DEBUG(0,(__location__ " Unknown db\n"));
		return -1;
	}

	params.ctdb = ctdb;
	params.lmaster = pull->lmaster;
	params.rec_count = 0;
	params.recs = talloc_array(outdata, struct getkeys_rec, 0);
	CTDB_NO_MEMORY(ctdb, params.recs);

	if (tdb_lockall_nonblock(ctdb_db->ltdb->tdb) != 0) {
		DEBUG(0,(__location__ " Failed to get nonblock lock on entired db - failing\n"));
		/* don't leave the empty record array hanging off outdata */
		talloc_free(params.recs);
		return -1;
	}

	/* collect copies of all matching records while the db is locked */
	tdb_traverse_read(ctdb_db->ltdb->tdb, traverse_getkeys, &params);

	tdb_unlockall(ctdb_db->ltdb->tdb);

	reply = talloc(outdata, struct ctdb_control_pulldb_reply);
	if (reply == NULL) {
		talloc_free(params.recs);
		CTDB_NO_MEMORY(ctdb, reply);
	}

	reply->db_id = pull->db_id;
	reply->count = params.rec_count;

	/* records are appended directly after the fixed reply header */
	len = offsetof(struct ctdb_control_pulldb_reply, data);

	for (i=0;i<params.rec_count;i++) {
		struct ctdb_rec_data *rec;
		struct ctdb_control_pulldb_reply *grown;

		rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, params.recs[i].data);
		if (rec == NULL) {
			goto nomem;
		}

		/* keep the old pointer until the realloc is known to have worked */
		grown = talloc_realloc_size(outdata, reply, rec->length + len);
		if (grown == NULL) {
			talloc_free(rec);
			goto nomem;
		}
		reply = grown;

		memcpy(len+(uint8_t *)reply, rec, rec->length);
		len += rec->length;
		talloc_free(rec);
	}

	talloc_free(params.recs);

	outdata->dptr = (uint8_t *)reply;
	outdata->dsize = len;

	return 0;

nomem:
	DEBUG(0,(__location__ " Out of memory building pulldb reply\n"));
	talloc_free(reply);
	talloc_free(params.recs);
	return -1;
}
/*
  push a bunch of records into a ltdb, filtering by rsn

  indata is laid out as a struct ctdb_control_pulldb_reply: a
  db_id/count header followed by 'count' packed struct ctdb_rec_data
  records. The data part of each record starts with a
  struct ctdb_ltdb_header. A record is stored only when its rsn is
  greater than the rsn of the copy already in the local database.

  The whole database is locked (non-blocking) while records are
  applied and the lock is dropped on every exit path.

  Returns 0 on success, -1 on malformed input, unknown database,
  lock failure or fetch/store failure.
 */
int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata)
{
	struct ctdb_control_pulldb_reply *reply = (struct ctdb_control_pulldb_reply *)indata.dptr;
	struct ctdb_db_context *ctdb_db;
	uint32_t i;
	int ret;
	struct ctdb_rec_data *rec;
	const size_t rec_hdr_len = offsetof(struct ctdb_rec_data, data);
	uint8_t *end;

	if (indata.dsize < offsetof(struct ctdb_control_pulldb_reply, data)) {
		DEBUG(0,(__location__ " invalid data in pulldb reply\n"));
		return -1;
	}

	ctdb_db = find_ctdb_db(ctdb, reply->db_id);
	if (!ctdb_db) {
		DEBUG(0,(__location__ " Unknown db 0x%08x\n", reply->db_id));
		return -1;
	}

	if (tdb_lockall_nonblock(ctdb_db->ltdb->tdb) != 0) {
		DEBUG(0,(__location__ " Failed to get nonblock lock on entired db - failing\n"));
		return -1;
	}

	end = (uint8_t *)indata.dptr + indata.dsize;
	rec = (struct ctdb_rec_data *)&reply->data[0];

	for (i=0;i<reply->count;i++) {
		TDB_DATA key, data;
		struct ctdb_ltdb_header *hdr, header;
		size_t remaining = (size_t)(end - (uint8_t *)rec);

		/* the record, and the key/data it claims to carry, must lie
		   entirely inside the buffer we were handed */
		if (remaining < rec_hdr_len ||
		    rec->length < rec_hdr_len ||
		    rec->length > remaining ||
		    rec->keylen > rec->length - rec_hdr_len ||
		    rec->datalen > rec->length - rec_hdr_len - rec->keylen) {
			DEBUG(0,(__location__ " truncated record in pushdb data\n"));
			tdb_unlockall(ctdb_db->ltdb->tdb);
			return -1;
		}

		key.dptr = &rec->data[0];
		key.dsize = rec->keylen;
		data.dptr = &rec->data[key.dsize];
		data.dsize = rec->datalen;

		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
			DEBUG(0,(__location__ " bad ltdb record\n"));
			/* we hold the whole-db lock, not a per-record lock */
			tdb_unlockall(ctdb_db->ltdb->tdb);
			return -1;
		}

		/* split the ltdb header off the front of the data */
		hdr = (struct ctdb_ltdb_header *)data.dptr;
		data.dptr += sizeof(*hdr);
		data.dsize -= sizeof(*hdr);

		ret = ctdb_ltdb_fetch(ctdb_db, key, &header, NULL, NULL);
		if (ret != 0) {
			DEBUG(0, (__location__ "Unable to fetch record\n"));
			tdb_unlockall(ctdb_db->ltdb->tdb);
			return -1;
		}

		/* only overwrite the local copy with a newer revision */
		if (header.rsn < hdr->rsn) {
			ret = ctdb_ltdb_store(ctdb_db, key, hdr, data);
			if (ret != 0) {
				DEBUG(0, (__location__ "Unable to store record\n"));
				tdb_unlockall(ctdb_db->ltdb->tdb);
				return -1;
			}
		}

		/* step to the next packed record */
		rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec);
	}

	tdb_unlockall(ctdb_db->ltdb->tdb);

	return 0;
}
/*
  tdb_traverse() callback: overwrite the dmaster field in the
  ltdb header at the front of the record and write the record back.
  p points at the uint32_t dmaster value to set.
  Returns the result of tdb_store() (0 on success).
 */
static int traverse_setdmaster(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
	struct ctdb_ltdb_header *hdr = (struct ctdb_ltdb_header *)data.dptr;
	uint32_t *new_dmaster = (uint32_t *)p;
	int rc;

	hdr->dmaster = *new_dmaster;

	rc = tdb_store(tdb, key, data, TDB_REPLACE);
	if (rc != 0) {
		DEBUG(0,(__location__ "failed to write tdb data back ret:%d\n",rc));
	}
	return rc;
}
/*
  set the dmaster of every record in a database.
  indata holds a struct ctdb_control_set_dmaster naming the database
  and the dmaster value. The whole database is locked (non-blocking)
  while the records are rewritten.
  Returns 0 on success, -1 if the database is unknown or the lock
  could not be taken.
 */
int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata)
{
	struct ctdb_control_set_dmaster *req;
	struct ctdb_db_context *db;

	req = (struct ctdb_control_set_dmaster *)indata.dptr;

	db = find_ctdb_db(ctdb, req->db_id);
	if (db == NULL) {
		DEBUG(0,(__location__ " Unknown db 0x%08x\n", req->db_id));
		return -1;
	}

	if (tdb_lockall_nonblock(db->ltdb->tdb) != 0) {
		DEBUG(0,(__location__ " Failed to get nonblock lock on entired db - failing\n"));
		return -1;
	}

	tdb_traverse(db->ltdb->tdb, traverse_setdmaster, &req->dmaster);
	tdb_unlockall(db->ltdb->tdb);

	return 0;
}
/*
  tdb_traverse() callback: delete the record being visited.
  Returns the result of tdb_delete() (0 on success).
 */
static int traverse_cleardb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
{
	int rc = tdb_delete(tdb, key);

	if (rc != 0) {
		DEBUG(0,(__location__ "failed to delete tdb record\n"));
	}
	return rc;
}
/*
  delete every record from a database.
  indata holds the uint32_t id of the database. The whole database is
  locked (non-blocking) while the records are removed.
  Returns 0 on success, -1 if the database is unknown or the lock
  could not be taken.
 */
int32_t ctdb_control_clear_db(struct ctdb_context *ctdb, TDB_DATA indata)
{
	uint32_t dbid = *(uint32_t *)indata.dptr;
	struct ctdb_db_context *db = find_ctdb_db(ctdb, dbid);

	if (db == NULL) {
		DEBUG(0,(__location__ " Unknown db 0x%08x\n",dbid));
		return -1;
	}

	if (tdb_lockall_nonblock(db->ltdb->tdb) != 0) {
		DEBUG(0,(__location__ " Failed to get nonblock lock on entired db - failing\n"));
		return -1;
	}

	tdb_traverse(db->ltdb->tdb, traverse_cleardb, NULL);
	tdb_unlockall(db->ltdb->tdb);

	return 0;
}

View File

@ -52,7 +52,7 @@ static void ctdb_traverse_local_handler(uint8_t *rawdata, size_t length, void *p
TDB_DATA key, data;
ctdb_traverse_fn_t callback = h->callback;
void *p = h->private_data;
struct ctdb_traverse_data *tdata = (struct ctdb_traverse_data *)rawdata;
struct ctdb_rec_data *tdata = (struct ctdb_rec_data *)rawdata;
if (rawdata == NULL || length < 4 || length != tdata->length) {
/* end of traverse */
@ -79,30 +79,6 @@ static int traverse_local_destructor(struct ctdb_traverse_local_handle *h)
return 0;
}
/*
form a ctdb_traverse_data record from a key/data pair
*/
static struct ctdb_traverse_data *ctdb_traverse_marshall_record(TALLOC_CTX *mem_ctx,
uint32_t reqid,
TDB_DATA key, TDB_DATA data)
{
size_t length;
struct ctdb_traverse_data *d;
length = offsetof(struct ctdb_traverse_data, data) + key.dsize + data.dsize;
d = (struct ctdb_traverse_data *)talloc_size(mem_ctx, length);
if (d == NULL) {
return NULL;
}
d->length = length;
d->reqid = reqid;
d->keylen = key.dsize;
d->datalen = data.dsize;
memcpy(&d->data[0], key.dptr, key.dsize);
memcpy(&d->data[key.dsize], data.dptr, data.dsize);
return d;
}
/*
callback from tdb_traverse_read()
*/
@ -110,7 +86,7 @@ static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DAT
{
struct ctdb_traverse_local_handle *h = talloc_get_type(p,
struct ctdb_traverse_local_handle);
struct ctdb_traverse_data *d;
struct ctdb_rec_data *d;
struct ctdb_ltdb_header *hdr;
/* filter out non-authoritative and zero-length records */
@ -120,7 +96,7 @@ static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DAT
return 0;
}
d = ctdb_traverse_marshall_record(h, 0, key, data);
d = ctdb_marshall_record(h, 0, key, data);
if (d == NULL) {
/* error handling is tricky in this child code .... */
return -1;
@ -147,8 +123,6 @@ struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_context *c
struct ctdb_traverse_local_handle *h;
int ret;
ctdb_db->ctdb->status.traverse_calls++;
h = talloc_zero(ctdb_db, struct ctdb_traverse_local_handle);
if (h == NULL) {
return NULL;
@ -223,6 +197,18 @@ struct ctdb_traverse_all {
uint32_t vnn;
};
/*
  timed event handler: a cluster-wide traverse did not finish in time.
  Counts the timeout, calls the traverse callback with null key and
  data, then frees the traverse handle.
 */
static void ctdb_traverse_all_timeout(struct event_context *ev, struct timed_event *te,
				      struct timeval t, void *private_data)
{
	struct ctdb_traverse_all_handle *handle;

	handle = talloc_get_type(private_data, struct ctdb_traverse_all_handle);

	/* account for the timeout in the daemon statistics */
	handle->ctdb->status.timeouts.traverse++;

	handle->callback(handle->private_data, tdb_null, tdb_null);

	talloc_free(handle);
}
/*
setup a cluster-wide non-blocking traverse of a ctdb. The
callback function will be called on every record in the local
@ -269,6 +255,10 @@ struct ctdb_traverse_all_handle *ctdb_daemon_traverse_all(struct ctdb_db_context
return NULL;
}
/* timeout the traverse */
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_TRAVERSE_TIMEOUT, 0),
ctdb_traverse_all_timeout, state);
return state;
}
@ -286,9 +276,9 @@ static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
{
struct traverse_all_state *state = talloc_get_type(p, struct traverse_all_state);
int ret;
struct ctdb_traverse_data *d;
struct ctdb_rec_data *d;
d = ctdb_traverse_marshall_record(state, state->reqid, key, data);
d = ctdb_marshall_record(state, state->reqid, key, data);
if (d == NULL) {
/* darn .... */
DEBUG(0,("Out of memory in traverse_all_callback\n"));
@ -351,7 +341,7 @@ int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_
*/
int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata)
{
struct ctdb_traverse_data *d = (struct ctdb_traverse_data *)data.dptr;
struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
struct ctdb_traverse_all_handle *state;
TDB_DATA key;
ctdb_traverse_fn_t callback;
@ -406,11 +396,11 @@ struct traverse_start_state {
static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
{
struct traverse_start_state *state;
struct ctdb_traverse_data *d;
struct ctdb_rec_data *d;
state = talloc_get_type(p, struct traverse_start_state);
d = ctdb_traverse_marshall_record(state, state->reqid, key, data);
d = ctdb_marshall_record(state, state->reqid, key, data);
if (d == NULL) {
return;
}

View File

@ -170,3 +170,27 @@ void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
DEBUG(0, ("Removing idr that does not exist\n"));
}
}
/*
  pack a key/data pair into a freshly allocated ctdb_rec_data blob

  The blob is allocated from mem_ctx and holds the fixed fields
  (length, reqid, keylen, datalen) followed by the key bytes and then
  the data bytes. Returns NULL if the allocation fails.
 */
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid, TDB_DATA key, TDB_DATA data)
{
	size_t total = offsetof(struct ctdb_rec_data, data) + key.dsize + data.dsize;
	struct ctdb_rec_data *rec = (struct ctdb_rec_data *)talloc_size(mem_ctx, total);

	if (rec != NULL) {
		rec->length = total;
		rec->reqid = reqid;
		rec->keylen = key.dsize;
		rec->datalen = data.dsize;

		/* key first, data immediately after it */
		memcpy(&rec->data[0], key.dptr, key.dsize);
		memcpy(&rec->data[key.dsize], data.dptr, data.dsize);
	}

	return rec;
}

View File

@ -21,6 +21,7 @@
#include "includes.h"
#include "lib/events/events.h"
#include "system/filesys.h"
#include "system/time.h"
#include "popt.h"
#include "cmdline.h"
#include "../include/ctdb.h"
@ -409,13 +410,10 @@ static int do_recovery(struct ctdb_context *ctdb, struct event_context *ev,
/* build a new vnn map with all the currently active nodes */
vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
if (vnnmap == NULL) {
DEBUG(0,(__location__ " Unable to allocate vnn_map structure\n"));
return -1;
}
CTDB_NO_MEMORY(ctdb, vnnmap);
vnnmap->generation = generation;
vnnmap->size = num_active;
vnnmap->map = talloc_array(vnnmap, uint32_t, sizeof(uint32_t)*num_active);
vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
for (i=j=0;i<nodemap->num;i++) {
if (nodemap->nodes[i].flags&NODE_FLAGS_CONNECTED) {
vnnmap->map[j++]=nodemap->nodes[i].vnn;
@ -511,12 +509,11 @@ static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
struct election_message *em = (struct election_message *)data.dptr;
TALLOC_CTX *mem_ctx;
mem_ctx = talloc_new(ctdb);
if (em->vnn==ctdb_get_vnn(ctdb)) {
talloc_free(mem_ctx);
return;
}
mem_ctx = talloc_new(ctdb);
/* someone called an election. check their election data
and if we disagree and we would rather be the elected node,
@ -638,13 +635,12 @@ again:
goto again;
}
/* verify that the recmaster node is still active */
for (j=0; j<nodemap->num; j++) {
if (nodemap->nodes[j].vnn==recmaster) {
break;
}
}
}
if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
DEBUG(0, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].vnn));
force_election(ctdb, mem_ctx, vnn, nodemap);
@ -746,7 +742,7 @@ again:
/* there better be the same number of lmasters in the vnn map
as there are active nodes or well have to do a recovery
as there are active nodes or we will have to do a recovery
*/
if (vnnmap->size != num_active) {
DEBUG(0, (__location__ "The vnnmap count is different from the number of active nodes. %d vs %d\n", vnnmap->size, num_active));
@ -866,6 +862,8 @@ int main(int argc, const char *argv[])
}
#endif
srandom(getpid() ^ time(NULL));
ev = event_context_init(NULL);
/* initialise ctdb */

View File

@ -175,10 +175,14 @@ struct ctdb_status {
uint32_t register_srvid;
uint32_t deregister_srvid;
} controls;
struct {
uint32_t call;
uint32_t control;
uint32_t traverse;
} timeouts;
uint32_t total_calls;
uint32_t pending_calls;
uint32_t lockwait_calls;
uint32_t traverse_calls;
uint32_t pending_lockwait_calls;
uint32_t memory_used;
uint32_t __last_counter; /* hack for control_status_all */
@ -260,23 +264,37 @@ struct ctdb_db_context {
#define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \
DEBUG(0,("Out of memory for %s at %s\n", #p, __location__)); \
ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
return -1; }} while (0)
/* allocation check for functions returning void: log the failure,
   record it on the ctdb context and return to the caller, so that
   execution does not carry on with a NULL pointer */
#define CTDB_NO_MEMORY_VOID(ctdb, p) do { if (!(p)) { \
          DEBUG(0,("Out of memory for %s at %s\n", #p, __location__)); \
          ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
          return; }} while (0)
#define CTDB_NO_MEMORY_NULL(ctdb, p) do { if (!(p)) { \
DEBUG(0,("Out of memory for %s at %s\n", #p, __location__)); \
ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
return NULL; }} while (0)
#define CTDB_NO_MEMORY_FATAL(ctdb, p) do { if (!(p)) { \
DEBUG(0,("Out of memory for %s at %s\n", #p, __location__)); \
ctdb_fatal(ctdb, "Out of memory in " __location__ ); \
}} while (0)
/* arbitrary maximum timeout for ctdb operations */
#define CTDB_REQ_TIMEOUT 0
/* timeout for ctdb call operations. When this timeout expires we
check if the generation count has changed, and if it has then
re-issue the call */
#define CTDB_CALL_TIMEOUT 2
/* timeout for ctdb control calls */
#define CTDB_CONTROL_TIMEOUT 10
/* timeout for ctdb traverse calls. When this is reached we cut short
the traverse */
#define CTDB_TRAVERSE_TIMEOUT 20
/* number of consecutive calls from the same node before we give them
the record */
@ -312,7 +330,6 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS,
CTDB_CONTROL_STATUS_RESET,
CTDB_CONTROL_DB_ATTACH,
CTDB_CONTROL_SET_CALL,
CTDB_CONTROL_WRITE_RECORD,
CTDB_CONTROL_TRAVERSE_START,
CTDB_CONTROL_TRAVERSE_ALL,
CTDB_CONTROL_TRAVERSE_DATA,
@ -352,6 +369,8 @@ struct ctdb_call_state {
struct ctdb_db_context *ctdb_db;
const char *errmsg;
struct ctdb_call call;
uint32_t generation;
uint32_t resend_count;
struct {
void (*fn)(struct ctdb_call_state *);
void *private_data;
@ -710,9 +729,9 @@ struct ctdb_traverse_start {
};
/*
structure used to pass the data between the child and parent
structure used to pass record data between the child and parent
*/
struct ctdb_traverse_data {
struct ctdb_rec_data {
uint32_t length;
uint32_t reqid;
uint32_t keylen;
@ -721,6 +740,25 @@ struct ctdb_traverse_data {
};
/* request payload of the pulldb control (CTDB_CONTROL_PULL_DB) */
struct ctdb_control_pulldb {
uint32_t db_id; /* id of the database to pull records from */
uint32_t lmaster; /* only records with this lmaster are returned; CTDB_LMASTER_ANY matches every record */
};
/* reply payload of the pulldb control; the pushdb control reads its
   input in this same layout */
struct ctdb_control_pulldb_reply {
uint32_t db_id; /* id of the database the records belong to */
uint32_t count; /* number of records packed in data */
uint8_t data[1]; /* first byte of 'count' back-to-back struct ctdb_rec_data records */
};
/* request payload of the set dmaster control (CTDB_CONTROL_SET_DMASTER) */
struct ctdb_control_set_dmaster {
uint32_t db_id; /* id of the database whose records are updated */
uint32_t dmaster; /* value written to the dmaster field of every record header */
};
int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata, uint32_t srcnode);
int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata);
@ -736,4 +774,11 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id);
int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode);
int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency);
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid, TDB_DATA key, TDB_DATA data);
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_set_dmaster(struct ctdb_context *ctdb, TDB_DATA indata);
int32_t ctdb_control_clear_db(struct ctdb_context *ctdb, TDB_DATA indata);
#endif

View File

@ -288,7 +288,7 @@ int tdb_unlock(struct tdb_context *tdb, int list, int ltype)
/* lock/unlock entire database */
static int _tdb_lockall(struct tdb_context *tdb, int ltype)
static int _tdb_lockall(struct tdb_context *tdb, int ltype, int op)
{
/* There are no locks on read-only dbs */
if (tdb->read_only || tdb->traverse_read)
@ -309,9 +309,11 @@ static int _tdb_lockall(struct tdb_context *tdb, int ltype)
return TDB_ERRCODE(TDB_ERR_LOCK, -1);
}
if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, F_SETLKW,
if (tdb->methods->tdb_brlock(tdb, FREELIST_TOP, ltype, op,
0, 4*tdb->header.hash_size)) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
if (op == F_SETLKW) {
TDB_LOG((tdb, TDB_DEBUG_ERROR, "tdb_lockall failed (%s)\n", strerror(errno)));
}
return -1;
}
@ -321,6 +323,8 @@ static int _tdb_lockall(struct tdb_context *tdb, int ltype)
return 0;
}
/* unlock entire db */
static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
{
@ -353,7 +357,13 @@ static int _tdb_unlockall(struct tdb_context *tdb, int ltype)
/* lock entire database with write lock */
int tdb_lockall(struct tdb_context *tdb)
{
return _tdb_lockall(tdb, F_WRLCK);
return _tdb_lockall(tdb, F_WRLCK, F_SETLKW);
}
/*
  lock entire database with write lock - nonblocking variant

  Same as tdb_lockall() except that the lock is requested with F_SETLK
  instead of F_SETLKW. Returns whatever _tdb_lockall() returns.
*/
int tdb_lockall_nonblock(struct tdb_context *tdb)
{
	const int lock_type = F_WRLCK;
	const int lock_op = F_SETLK;

	return _tdb_lockall(tdb, lock_type, lock_op);
}
/* unlock entire database with write lock */
@ -365,7 +375,13 @@ int tdb_unlockall(struct tdb_context *tdb)
/* lock entire database with read lock */
int tdb_lockall_read(struct tdb_context *tdb)
{
return _tdb_lockall(tdb, F_RDLCK);
return _tdb_lockall(tdb, F_RDLCK, F_SETLKW);
}
/*
  lock entire database with read lock - nonblocking variant

  Same as tdb_lockall_read() except that the lock is requested with F_SETLK
  instead of F_SETLKW. Returns whatever _tdb_lockall() returns.
*/
int tdb_lockall_read_nonblock(struct tdb_context *tdb)
{
	const int lock_type = F_RDLCK;
	const int lock_op = F_SETLK;

	return _tdb_lockall(tdb, lock_type, lock_op);
}
/* unlock entire database with read lock */

View File

@ -116,8 +116,10 @@ int tdb_traverse(struct tdb_context *tdb, tdb_traverse_func fn, void *);
int tdb_traverse_read(struct tdb_context *tdb, tdb_traverse_func fn, void *);
int tdb_exists(struct tdb_context *tdb, TDB_DATA key);
int tdb_lockall(struct tdb_context *tdb);
int tdb_lockall_nonblock(struct tdb_context *tdb);
int tdb_unlockall(struct tdb_context *tdb);
int tdb_lockall_read(struct tdb_context *tdb);
int tdb_lockall_read_nonblock(struct tdb_context *tdb);
int tdb_unlockall_read(struct tdb_context *tdb);
const char *tdb_name(struct tdb_context *tdb);
int tdb_fd(struct tdb_context *tdb);

View File

@ -54,8 +54,6 @@ static void usage(void)
" setrecmode <vnn> <mode> set recovery mode\n"
" getrecmaster <vnn> get recovery master\n"
" setrecmaster <vnn> <master_vnn> set recovery master\n"
" writerecord <vnn> <dbid> <key> <data>\n"
" recover <vnn> recover the cluster\n"
" attach <dbname> attach a database\n"
" getpid <vnn> get the pid of a ctdb daemon\n"
);
@ -135,10 +133,12 @@ static void show_status(struct ctdb_status *s)
STATUS_FIELD(controls.set_seqnum_frequency),
STATUS_FIELD(controls.register_srvid),
STATUS_FIELD(controls.deregister_srvid),
STATUS_FIELD(timeouts.call),
STATUS_FIELD(timeouts.control),
STATUS_FIELD(timeouts.traverse),
STATUS_FIELD(total_calls),
STATUS_FIELD(pending_calls),
STATUS_FIELD(lockwait_calls),
STATUS_FIELD(traverse_calls),
STATUS_FIELD(pending_lockwait_calls),
STATUS_FIELD(memory_used),
STATUS_FIELD(max_hop_count),
@ -286,201 +286,6 @@ static int control_status_reset(struct ctdb_context *ctdb, int argc, const char
}
/*
  perform a samba3 style recovery

  argv[0] is the vnn of the node that drives the recovery. The steps are:
    1-2  fetch the node map from that node and count the connected nodes
    3    switch every connected node to CTDB_RECOVERY_ACTIVE
    4    fetch the list of databases from the recovery node
    5    copy every database from every other connected node to the
         recovery node
    6    set the dmaster of every database on every connected node to the
         recovery node
    7    copy every database from the recovery node back to the others
    8-9  build a vnn map of the connected nodes with a new generation
         number and set it on every connected node
    10   switch every connected node back to CTDB_RECOVERY_NORMAL

  Returns 0 on success, or the non-zero result of the first control that
  failed. NOTE(review): a failure after step 3 returns immediately and
  does not switch the nodes back to CTDB_RECOVERY_NORMAL.
*/
static int control_recover(struct ctdb_context *ctdb, int argc, const char **argv)
{
	uint32_t vnn, num_nodes, generation, dmaster;
	struct ctdb_vnn_map *vnnmap;
	struct ctdb_node_map *nodemap=NULL;
	int i, j, ret;
	struct ctdb_dbid_map *dbmap=NULL;

	if (argc < 1) {
		usage();
		/* usage() is defined outside this block; bail out explicitly so
		   argv[0] is never read if it happens to return */
		return -1;
	}

	vnn = strtoul(argv[0], NULL, 0);

	printf("recover ctdb from node %d\n", vnn);

	/* 1: find a list of all nodes */
	printf("\n1: fetching list of nodes\n");
	ret = ctdb_ctrl_getnodemap(ctdb, timeval_current_ofs(1, 0), vnn, ctdb, &nodemap);
	if (ret != 0) {
		printf("Unable to get nodemap from node %u\n", vnn);
		return ret;
	}

	/* 2: count the active nodes */
	printf("\n2: count number of active nodes\n");
	num_nodes = 0;
	for (i=0; i<nodemap->num; i++) {
		if (nodemap->nodes[i].flags&NODE_FLAGS_CONNECTED) {
			num_nodes++;
		}
	}
	printf("number of active nodes:%d\n",num_nodes);

	/* 3: go to all active nodes and activate recovery mode */
	printf("\n3: set recovery mode for all active nodes\n");
	for (j=0; j<nodemap->num; j++) {
		/* don't change it for nodes that are unavailable */
		if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
			continue;
		}

		printf("setting node %d to recovery mode\n",nodemap->nodes[j].vnn);
		ret = ctdb_ctrl_setrecmode(ctdb, timeval_current_ofs(1, 0), nodemap->nodes[j].vnn, CTDB_RECOVERY_ACTIVE);
		if (ret != 0) {
			printf("Unable to set recmode on node %u\n", nodemap->nodes[j].vnn);
			return ret;
		}
	}

	/* 4: get a list of all databases */
	printf("\n4: getting list of databases to recover\n");
	ret = ctdb_ctrl_getdbmap(ctdb, timeval_current_ofs(1, 0), vnn, ctdb, &dbmap);
	if (ret != 0) {
		printf("Unable to get dbids from node %u\n", vnn);
		return ret;
	}
	for (i=0;i<dbmap->num;i++) {
		const char *path = NULL;
		const char *name = NULL;

		/* The listing is informational only, so a failed lookup is not
		   fatal; but never hand an unset pointer to printf("%s").
		   NOTE(review): assumes these controls return 0 on success like
		   the other ctdb_ctrl_* calls used here - confirm */
		if (ctdb_ctrl_getdbpath(ctdb, timeval_current_ofs(1, 0), CTDB_CURRENT_NODE, dbmap->dbids[i], ctdb, &path) != 0) {
			path = NULL;
		}
		if (ctdb_ctrl_getdbname(ctdb, timeval_current_ofs(1, 0), CTDB_CURRENT_NODE, dbmap->dbids[i], ctdb, &name) != 0) {
			name = NULL;
		}
		printf("dbid:0x%08x name:%s path:%s\n", dbmap->dbids[i],
		       name != NULL ? name : "<unknown>",
		       path != NULL ? path : "<unknown>");
	}

	/* 5: pull all records from all other nodes across to this node
	      (this merges based on rsn internally)
	*/
	printf("\n5: merge all records from remote nodes\n");
	for (i=0;i<dbmap->num;i++) {
		printf("recovering database 0x%08x\n",dbmap->dbids[i]);
		for (j=0; j<nodemap->num; j++) {
			/* we don't need to merge with ourselves */
			if (nodemap->nodes[j].vnn == vnn) {
				continue;
			}
			/* don't merge from nodes that are unavailable */
			if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
				continue;
			}

			printf("merging all records from node %d for database 0x%08x\n", nodemap->nodes[j].vnn, dbmap->dbids[i]);
			ret = ctdb_ctrl_copydb(ctdb, timeval_current_ofs(1, 0), nodemap->nodes[j].vnn, vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, ctdb);
			if (ret != 0) {
				printf("Unable to copy db from node %u to node %u\n", nodemap->nodes[j].vnn, vnn);
				return ret;
			}
		}
	}

	/* 6: update dmaster to point to this node for all databases/nodes */
	printf("\n6: repoint dmaster to the recovery node\n");
	dmaster = vnn;
	printf("new dmaster is %d\n", dmaster);
	for (i=0;i<dbmap->num;i++) {
		for (j=0; j<nodemap->num; j++) {
			/* don't repoint nodes that are unavailable */
			if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
				continue;
			}

			printf("setting dmaster to %d for node %d db 0x%08x\n",dmaster,nodemap->nodes[j].vnn,dbmap->dbids[i]);
			ret = ctdb_ctrl_setdmaster(ctdb, timeval_current_ofs(1, 0), nodemap->nodes[j].vnn, ctdb, dbmap->dbids[i], dmaster);
			if (ret != 0) {
				printf("Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].vnn, dbmap->dbids[i]);
				return ret;
			}
		}
	}

	/* 7: push all records out to the nodes again */
	printf("\n7: push all records to remote nodes\n");
	for (i=0;i<dbmap->num;i++) {
		printf("distributing new database 0x%08x\n",dbmap->dbids[i]);
		for (j=0; j<nodemap->num; j++) {
			/* we don't need to push to ourselves */
			if (nodemap->nodes[j].vnn == vnn) {
				continue;
			}
			/* don't push to nodes that are unavailable */
			if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
				continue;
			}

			printf("pushing all records to node %d for database 0x%08x\n", nodemap->nodes[j].vnn, dbmap->dbids[i]);
			ret = ctdb_ctrl_copydb(ctdb, timeval_current_ofs(1, 0), vnn, nodemap->nodes[j].vnn, dbmap->dbids[i], CTDB_LMASTER_ANY, ctdb);
			if (ret != 0) {
				printf("Unable to copy db from node %u to node %u\n", vnn, nodemap->nodes[j].vnn);
				return ret;
			}
		}
	}

	/* 8: build a new vnn map */
	printf("\n8: build a new vnn map with a new generation id\n");

	/* header plus one map slot per connected node; size the slots from the
	   map element itself rather than a hard-coded 4 */
	vnnmap = talloc_zero_size(ctdb, offsetof(struct ctdb_vnn_map, map) + sizeof(vnnmap->map[0])*num_nodes);
	if (vnnmap == NULL) {
		DEBUG(0,(__location__ " Unable to allocate vnn_map structure\n"));
		exit(1);
	}
	generation = random();
	vnnmap->generation = generation;
	vnnmap->size = num_nodes;
	for (i=j=0;i<nodemap->num;i++) {
		if (nodemap->nodes[i].flags&NODE_FLAGS_CONNECTED) {
			vnnmap->map[j++]=nodemap->nodes[i].vnn;
		}
	}
	printf("Generation:%d\n",vnnmap->generation);
	printf("Size:%d\n",vnnmap->size);
	for(i=0;i<vnnmap->size;i++){
		printf("hash:%d lmaster:%d\n",i,vnnmap->map[i]);
	}

	/* 9: push the new vnn map out to all the nodes */
	printf("\n9: distribute the new vnn map\n");
	for (j=0; j<nodemap->num; j++) {
		/* don't push to nodes that are unavailable */
		if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
			continue;
		}

		printf("setting new vnn map on node %d\n",nodemap->nodes[j].vnn);
		ret = ctdb_ctrl_setvnnmap(ctdb, timeval_current_ofs(1, 0), nodemap->nodes[j].vnn, ctdb, vnnmap);
		if (ret != 0) {
			/* report the node that actually failed, not the recovery node */
			printf("Unable to set vnnmap for node %u\n", nodemap->nodes[j].vnn);
			return ret;
		}
	}

	/* 10: disable recovery mode */
	printf("\n10: restore recovery mode back to normal\n");
	for (j=0; j<nodemap->num; j++) {
		/* don't touch nodes that are unavailable */
		if (!(nodemap->nodes[j].flags&NODE_FLAGS_CONNECTED)) {
			continue;
		}

		printf("changing recovery mode back to normal for node %d\n",nodemap->nodes[j].vnn);
		ret = ctdb_ctrl_setrecmode(ctdb, timeval_current_ofs(1, 0), nodemap->nodes[j].vnn, CTDB_RECOVERY_NORMAL);
		if (ret != 0) {
			printf("Unable to set recmode on node %u\n", nodemap->nodes[j].vnn);
			return ret;
		}
	}

	return 0;
}
/*
display remote ctdb vnn map
*/
@ -800,17 +605,12 @@ static int control_setvnnmap(struct ctdb_context *ctdb, int argc, const char **a
num_nodes = strtoul(argv[2], NULL, 0);
vnnmap = talloc(ctdb, struct ctdb_vnn_map);
if (vnnmap == NULL) {
DEBUG(0,(__location__ " Unable to allocate vnn_map structure\n"));
exit(1);
}
CTDB_NO_MEMORY(ctdb, vnnmap);
vnnmap->generation = generation;
vnnmap->size = num_nodes;
vnnmap->map = talloc_array(vnnmap, uint32_t, vnnmap->size);
if (vnnmap->map == NULL) {
DEBUG(0,(__location__ " Unable to allocate vnn_map->map array\n"));
exit(1);
}
CTDB_NO_MEMORY(ctdb, vnnmap->map);
for (i=0;i<vnnmap->size;i++) {
vnnmap->map[i] = strtoul(argv[3+i], NULL, 0);
@ -824,34 +624,6 @@ static int control_setvnnmap(struct ctdb_context *ctdb, int argc, const char **a
return 0;
}
/*
  write a record to a remote tdb

  Command line: writerecord <vnn> <dbid> <key> <data>
    argv[0]  vnn of the node to write to
    argv[1]  id of the database
    argv[2]  key, used as a string without its terminating NUL
    argv[3]  data, used as a string without its terminating NUL

  Returns 0 on success, or the non-zero result of
  ctdb_ctrl_write_record() on failure.
*/
static int control_writerecord(struct ctdb_context *ctdb, int argc, const char **argv)
{
	uint32_t vnn, dbid;
	TDB_DATA key, data;
	int ret;

	if (argc < 4) {
		usage();
		/* usage() is defined outside this block; bail out explicitly so
		   argv[0..3] are never read if it happens to return */
		return -1;
	}

	vnn  = strtoul(argv[0], NULL, 0);
	dbid = strtoul(argv[1], NULL, 0);

	/* key and data point straight at the argv strings; nothing is copied */
	key.dptr  = discard_const(argv[2]);
	key.dsize = strlen((const char *)(key.dptr));
	data.dptr  = discard_const(argv[3]);
	data.dsize = strlen((const char *)(data.dptr));

	ret = ctdb_ctrl_write_record(ctdb, vnn, ctdb, dbid, key, data);
	if (ret != 0) {
		/* the old text said "Unable to set vnnmap", copied from another command */
		printf("Unable to write record to node %u\n", vnn);
		return ret;
	}

	return 0;
}
/*
set the dmaster for all records in a database
@ -1111,8 +883,6 @@ int main(int argc, const char *argv[])
{ "ping", control_ping },
{ "debug", control_debug },
{ "debuglevel", control_debuglevel },
{ "recover", control_recover },
{ "writerecord", control_writerecord },
{ "attach", control_attach },
{ "dumpmemory", control_dumpmemory },
{ "getpid", control_getpid },

View File

@ -1,11 +1,10 @@
#!/bin/sh
CTDB_CONTROL=./bin/ctdb_control
XPOS=0
export CTDB_CONTROL
$CTDB_CONTROL getnodemap 0 | egrep "^vnn:" | sed -e "s/^vnn://" -e "s/ .*$//" | while read NODE; do
xterm -geometry 30x25+$XPOS -e "while true; do sleep 1; clear; $CTDB_CONTROL getnodemap $NODE; $CTDB_CONTROL getvnnmap $NODE; $CTDB_CONTROL getrecmode $NODE; $CTDB_CONTROL getrecmaster $NODE;done" &
export XPOS=`expr $XPOS "+" "200"`
xterm -geometry 30x25 -e "watch -n1 \"$CTDB_CONTROL getnodemap $NODE; $CTDB_CONTROL getvnnmap $NODE; $CTDB_CONTROL getrecmode $NODE; $CTDB_CONTROL getrecmaster $NODE\"" &
done