1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-13 13:18:06 +03:00

merge from ronnie

(This used to be ctdb commit daba707370b28e200d9e55eb81fc819b62f8fbdd)
This commit is contained in:
Andrew Tridgell 2007-05-03 07:19:29 +10:00
commit 622dbb6337
9 changed files with 387 additions and 52 deletions

View File

@ -245,11 +245,11 @@ void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
is running in the same generation instance as this node
*/
if (ctdb->vnn_map->generation != hdr->generation) {
DEBUG(0,(__location__ " ctdb request %d of type"
" %d length %d from node %d to %d had an"
DEBUG(0,(__location__ " ctdb request %d"
" length %d from node %d to %d had an"
" invalid generation id:%d while our"
" generation id is:%d\n",
hdr->reqid, hdr->operation, hdr->length,
hdr->reqid, hdr->length,
hdr->srcnode, hdr->destnode,
ctdb->vnn_map->generation,
hdr->generation));
@ -259,10 +259,10 @@ void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
until the cluster has recovered.
*/
if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
DEBUG(0,(__location__ " ctdb request %d of type"
" %d length %d from node %d to %d"
DEBUG(0,(__location__ " ctdb request %d"
" length %d from node %d to %d"
" while we are in recovery mode\n",
hdr->reqid, hdr->operation, hdr->length,
hdr->reqid, hdr->length,
hdr->srcnode, hdr->destnode));
break;
}
@ -282,11 +282,63 @@ void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
break;
case CTDB_REQ_DMASTER:
/* verify that the remote node that sent us dmaster req
is running in the same generation instance as this node
*/
if (ctdb->vnn_map->generation != hdr->generation) {
DEBUG(0,(__location__ " ctdb dmaster request %d"
" length %d from node %d to %d had an"
" invalid generation id:%d while our"
" generation id is:%d\n",
hdr->reqid, hdr->length,
hdr->srcnode, hdr->destnode,
ctdb->vnn_map->generation,
hdr->generation));
break;
}
/* if we are in recovery mode we discard all traffic
until the cluster has recovered.
*/
if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
DEBUG(0,(__location__ " ctdb dmaster request %d"
" length %d from node %d to %d"
" while we are in recovery mode\n",
hdr->reqid, hdr->length,
hdr->srcnode, hdr->destnode));
break;
}
ctdb->status.count.req_dmaster++;
ctdb_request_dmaster(ctdb, hdr);
break;
case CTDB_REPLY_DMASTER:
/* verify that the remote node that sent us dmaster reply
is running in the same generation instance as this node
*/
if (ctdb->vnn_map->generation != hdr->generation) {
DEBUG(0,(__location__ " ctdb dmaster reply %d"
" length %d from node %d to %d had an"
" invalid generation id:%d while our"
" generation id is:%d\n",
hdr->reqid, hdr->length,
hdr->srcnode, hdr->destnode,
ctdb->vnn_map->generation,
hdr->generation));
break;
}
/* if we are in recovery mode we discard all traffic
until the cluster has recovered.
*/
if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
DEBUG(0,(__location__ " ctdb dmaster reply %d "
" length %d from node %d to %d"
" while we are in recovery mode\n",
hdr->reqid, hdr->length,
hdr->srcnode, hdr->destnode));
break;
}
ctdb->status.count.reply_dmaster++;
ctdb_reply_dmaster(ctdb, hdr);
break;

View File

@ -975,32 +975,34 @@ int ctdb_ctrl_pulldb(struct ctdb_context *ctdb, uint32_t destnode, uint32_t dbid
/* loop over all key/data pairs */
ptr=&outdata.dptr[8];
for(i=0;i<keys->num;i++){
uint32_t len;
TDB_DATA *key, *data;
keys->lmasters[i]= *((uint32_t *)ptr);
ptr+=4;
keys->lmasters[i] = *((uint32_t *)ptr);
ptr += 4;
key=&keys->keys[i];
key->dsize= *((uint32_t *)ptr);
ptr+=4;
key->dptr=talloc_size(mem_ctx, key->dsize);
key = &keys->keys[i];
key->dsize = *((uint32_t *)ptr);
key->dptr = talloc_size(mem_ctx, key->dsize);
ptr += 4;
data = &keys->data[i];
data->dsize = *((uint32_t *)ptr);
data->dptr = talloc_size(mem_ctx, data->dsize);
ptr += 4;
ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(key->dptr, ptr, key->dsize);
len = (key->dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr+=len;
ptr += key->dsize;
ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(&keys->headers[i], ptr, sizeof(struct ctdb_ltdb_header));
len = (sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr+=len;
ptr += sizeof(struct ctdb_ltdb_header);
data=&keys->data[i];
data->dsize= *((uint32_t *)ptr);
ptr+=4;
data->dptr=talloc_size(mem_ctx, data->dsize);
ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(data->dptr, ptr, data->dsize);
len = (data->dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr+=len;
ptr += data->dsize;
ptr = outdata.dptr+(((ptr-outdata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
}
return 0;
@ -1091,6 +1093,46 @@ int ctdb_ctrl_cleardb(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *
return 0;
}
int ctdb_ctrl_write_record(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid, TDB_DATA key, TDB_DATA data)
{
int ret, len;
TDB_DATA indata, outdata;
int32_t res;
unsigned char *ptr;
len = 4; /* dbid */
len += 4; /* keylen */
len += (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
len += 4; /* datalen */
len += (data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
indata.dsize = len;
indata.dptr = (unsigned char *)talloc_array(mem_ctx, uint8_t, len);
ptr = indata.dptr;
*((uint32_t *)ptr) = dbid;
ptr += 4;
*((uint32_t *)ptr) = key.dsize;
ptr += 4;
memcpy(ptr, key.dptr, key.dsize);
ptr += (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
*((uint32_t *)ptr) = data.dsize;
ptr += 4;
memcpy(ptr, data.dptr, data.dsize);
ptr += (data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ret = ctdb_control(ctdb, destnode, 0,
CTDB_CONTROL_WRITE_RECORD, 0, indata,
mem_ctx, &outdata, &res);
if (ret != 0 || res != 0) {
DEBUG(0,(__location__ " ctdb_control for write record failed\n"));
return -1;
}
return 0;
}
/*
ping a node, return number of clients connected
*/

View File

@ -91,20 +91,20 @@ static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data
/* only include this record if the lmaster matches or if
the wildcard lmaster (-1) was specified.
*/
if((lmaster!=CTDB_LMASTER_ANY)
&& (lmaster!=params->lmaster) ){
if((params->lmaster!=CTDB_LMASTER_ANY)
&& (params->lmaster!=lmaster) ){
return 0;
}
len=outdata->dsize;
len+=4; /*lmaster*/
len+=4; /*key len*/
len+=4; /*data len */
len+=key.dsize;
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
len+=sizeof(struct ctdb_ltdb_header);
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
len+=4; /*data len */
len+=data.dsize;
len+=(data.dsize-sizeof(struct ctdb_ltdb_header));
len=(len+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr=outdata->dptr=talloc_realloc_size(outdata, outdata->dptr, len);
@ -115,19 +115,21 @@ static int traverse_getkeys(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data
*((uint32_t *)ptr)=lmaster;
ptr+=4;
*((uint32_t *)ptr)=key.dsize;
ptr+=4;
memcpy(ptr, key.dptr, key.dsize);
ptr+= (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
memcpy(ptr, data.dptr, sizeof(struct ctdb_ltdb_header));
ptr+=(sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
*((uint32_t *)ptr)=data.dsize-sizeof(struct ctdb_ltdb_header);
ptr+=4;
ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, key.dptr, key.dsize);
ptr += key.dsize;
ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, data.dptr, sizeof(struct ctdb_ltdb_header));
ptr += sizeof(struct ctdb_ltdb_header);
ptr = outdata->dptr+(((ptr-outdata->dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
memcpy(ptr, data.dptr+sizeof(struct ctdb_ltdb_header), data.dsize-sizeof(struct ctdb_ltdb_header));
ptr+= (data.dsize-sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
return 0;
}
@ -355,23 +357,32 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
ptr=&indata.dptr[8];
for(i=0;i<num;i++){
/* skip the lmaster*/
ptr+=4;
ptr += 4;
/* keylength */
key.dsize = *((uint32_t *)ptr);
ptr += 4;
/* data length */
data.dsize = *((uint32_t *)ptr);
ptr += 4;
/* key */
key.dsize=*((uint32_t *)ptr);
ptr+=4;
key.dptr=ptr;
ptr+=(key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
key.dptr = ptr;
ptr += key.dsize;
/* header */
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
hdr = (struct ctdb_ltdb_header *)ptr;
ptr+=(sizeof(struct ctdb_ltdb_header)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr += sizeof(struct ctdb_ltdb_header);
/* data */
data.dsize=*((uint32_t *)ptr);
ptr+=4;
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
data.dptr=ptr;
ptr+=(data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ptr += data.dsize;
ptr = indata.dptr+(((ptr-indata.dptr)+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1));
ret = ctdb_ltdb_lock(ctdb_db, key);
if (ret != 0) {
@ -398,6 +409,60 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
return 0;
}
case CTDB_CONTROL_WRITE_RECORD: {
uint32_t dbid;
struct ctdb_db_context *ctdb_db;
unsigned char *ptr;
TDB_DATA key, data;
struct ctdb_ltdb_header header;
int ret;
outdata->dsize = 0;
outdata->dptr = NULL;
dbid = ((uint32_t *)(&indata.dptr[0]))[0];
ctdb_db = find_ctdb_db(ctdb, dbid);
if (!ctdb_db) {
DEBUG(0,(__location__ " Unknown db 0x%08x\n",dbid));
return -1;
}
ptr = &indata.dptr[4];
key.dsize = *((uint32_t *)(ptr));
ptr += 4;
key.dptr = ptr;
ptr += (key.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
data.dsize = *((uint32_t *)(ptr));
ptr += 4;
data.dptr = ptr;
ptr += (data.dsize+CTDB_DS_ALIGNMENT-1)& ~(CTDB_DS_ALIGNMENT-1);
ret = ctdb_ltdb_lock(ctdb_db, key);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to lock db\n"));
return -1;
}
ret = ctdb_ltdb_fetch(ctdb_db, key, &header, outdata, NULL);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to fetch record\n"));
ctdb_ltdb_unlock(ctdb_db, key);
return -1;
}
header.rsn++;
ret = ctdb_ltdb_store(ctdb_db, key, &header, data);
if (ret != 0) {
DEBUG(0, (__location__ "Unable to store record\n"));
ctdb_ltdb_unlock(ctdb_db, key);
return -1;
}
ctdb_ltdb_unlock(ctdb_db, key);
return 0;
}
case CTDB_CONTROL_SET_RECMODE: {
ctdb->recovery_mode = ((uint32_t *)(&indata.dptr[0]))[0];
@ -440,10 +505,10 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
return ctdb_control_db_attach(ctdb, indata, outdata);
case CTDB_CONTROL_SET_CALL: {
struct ctdb_control_set_call *c =
struct ctdb_control_set_call *sc =
(struct ctdb_control_set_call *)indata.dptr;
CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_set_call));
return ctdb_daemon_set_call(ctdb, c->db_id, c->fn, c->id);
return ctdb_daemon_set_call(ctdb, sc->db_id, sc->fn, sc->id);
}
default:

4
ctdb/direct/4nodes.txt Normal file
View File

@ -0,0 +1,4 @@
127.0.0.1:9001
127.0.0.2:9001
127.0.0.3:9001
127.0.0.4:9001

View File

@ -259,6 +259,10 @@ int ctdb_ctrl_setdmaster(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CT
*/
int ctdb_ctrl_cleardb(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid);
/*
write a record on a specific db (this implicitely updates dmaster of the record to locally be the vnn of the node where the control is executed on)
*/
int ctdb_ctrl_write_record(struct ctdb_context *ctdb, uint32_t destnode, TALLOC_CTX *mem_ctx, uint32_t dbid, TDB_DATA key, TDB_DATA data);
#define CTDB_RECOVERY_NORMAL 0
#define CTDB_RECOVERY_ACTIVE 1

View File

@ -266,7 +266,8 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS,
CTDB_CONTROL_SET_RECMODE,
CTDB_CONTROL_STATUS_RESET,
CTDB_CONTROL_DB_ATTACH,
CTDB_CONTROL_SET_CALL};
CTDB_CONTROL_SET_CALL,
CTDB_CONTROL_WRITE_RECORD};
/*
structure passed in set_call control
@ -624,4 +625,8 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
ctdb_fn_t fn, int id);
int ctdb_control(struct ctdb_context *ctdb, uint32_t destnode, uint64_t srvid,
uint32_t opcode, uint32_t flags, TDB_DATA data,
TALLOC_CTX *mem_ctx, TDB_DATA *outdata, int32_t *status);
#endif

View File

@ -243,10 +243,6 @@ int main(int argc, const char *argv[])
printf("DATA:\n%s\n", (char *)call.reply_data.dptr);
#if 1
/* to keep the cluster up so one can play with it using the controls */
sleep(9999999);
#endif
/* go into a wait loop to allow other nodes to complete */
ctdb_shutdown(ctdb);

102
ctdb/tests/recover.sh Executable file
View File

@ -0,0 +1,102 @@
#!/bin/sh
killall -q ctdbd
echo "Starting 4 ctdb daemons"
bin/ctdbd --nlist direct/4nodes.txt
bin/ctdbd --nlist direct/4nodes.txt
bin/ctdbd --nlist direct/4nodes.txt
bin/ctdbd --nlist direct/4nodes.txt
echo "Attaching to some databases"
bin/ctdb_control --socket=/tmp/ctdb.socket attach test1.tdb || exit 1
bin/ctdb_control --socket=/tmp/ctdb.socket attach test2.tdb || exit 1
bin/ctdb_control --socket=/tmp/ctdb.socket attach test3.tdb || exit 1
bin/ctdb_control --socket=/tmp/ctdb.socket attach test4.tdb || exit 1
echo "Clearing all databases to make sure they are all empty"
bin/ctdb_control --socket=/tmp/ctdb.socket getdbmap 0 | egrep "^dbid:" | sed -e "s/^dbid://" -e "s/ .*$//" | while read DB; do
seq 0 3 | while read NODE; do
bin/ctdb_control --socket=/tmp/ctdb.socket cleardb $NODE $DB
done
done
echo
echo
echo "Printing all databases on all nodes. they should all be empty"
echo "============================================================="
bin/ctdb_control --socket=/tmp/ctdb.socket getdbmap 0 | egrep "^dbid:" | sed -e "s/^dbid://" -e "s/ .*$//" | while read DB; do
seq 0 3 | while read NODE; do
echo "Content of DB:$DB NODE:$NODE :"
bin/ctdb_control --socket=/tmp/ctdb.socket catdb $NODE $DB
done
done
echo
echo
echo "Populating the databases"
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 0 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket setdmaster 0 0x220c2a7b 1
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 1 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 1 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket setdmaster 1 0x220c2a7b 2
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 2 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 2 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 2 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket setdmaster 2 0x220c2a7b 3
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 3 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 3 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 3 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket writerecord 3 0x220c2a7b testkey1 testdata1
./bin/ctdb_control --socket=/tmp/ctdb.socket setdmaster 3 0x220c2a7b 3
echo
echo
echo "Printing all databases on all nodes. there should be a record there"
echo "============================================================="
bin/ctdb_control --socket=/tmp/ctdb.socket getdbmap 0 | egrep "^dbid:" | sed -e "s/^dbid://" -e "s/ .*$//" | while read DB; do
seq 0 3 | while read NODE; do
echo "Content of DB:$DB NODE:$NODE :"
bin/ctdb_control --socket=/tmp/ctdb.socket catdb $NODE $DB
done
done
echo
echo
echo "killing off node #0"
echo "==================="
CTDBPID=`ps aux | grep ctdbd | grep -v grep | head -1 | sed -e "s/^[^ ]* *//" -e "s/ .*$//"`
kill $CTDBPID
sleep 1
echo
echo
echo "Recovery the cluster"
echo "===================="
./bin/ctdb_control --socket=/tmp/ctdb.socket recover 2 0x220c2a7b
echo
echo
echo "Printing all databases on all nodes."
echo "The databases should be the same now on all nodes"
echo "and the record will have been migrated to node 0"
echo "================================================="
echo "Node 1:"
bin/ctdb_control --socket=/tmp/ctdb.socket catdb 1 0x220c2a7b
echo "Node 2:"
bin/ctdb_control --socket=/tmp/ctdb.socket catdb 2 0x220c2a7b
echo "Node 3:"
bin/ctdb_control --socket=/tmp/ctdb.socket catdb 3 0x220c2a7b
echo "nodemap:"
bin/ctdb_control --socket=/tmp/ctdb.socket getnodemap 3
#leave the ctdb daemons running so one can look at the box in more detail
#killall -q ctdbd

View File

@ -23,6 +23,7 @@
#include "system/filesys.h"
#include "popt.h"
#include "cmdline.h"
#include "../include/ctdb.h"
#include "../include/ctdb_private.h"
@ -44,12 +45,14 @@ static void usage(void)
" setvnnmap <vnn> <generation> <numslots> <lmaster>*\n"
" getdbmap <vnn> lists databases on a node\n"
" getnodemap <vnn> lists nodes known to a ctdb daemon\n"
" createdb <vnn> <dbname> create a database\n"
" catdb <vnn> <dbid> lists all keys in a remote tdb\n"
" cpdb <fromvnn> <tovnn> <dbid> lists all keys in a remote tdb\n"
" setdmaster <vnn> <dbid> <dmaster> sets new dmaster for all records in the database\n"
" cleardb <vnn> <dbid> deletes all records in a db\n"
" getrecmode <vnn> get recovery mode\n"
" setrecmode <vnn> <mode> set recovery mode\n"
" writerecord <vnn> <dbid> <key> <data>\n"
" recover <vnn> recover the cluster\n"
" attach <dbname> attach a database\n");
exit(1);
@ -665,6 +668,35 @@ static int control_setvnnmap(struct ctdb_context *ctdb, int argc, const char **a
return 0;
}
/*
write a record to a remote tdb
*/
static int control_writerecord(struct ctdb_context *ctdb, int argc, const char **argv)
{
uint32_t vnn, dbid;
TDB_DATA key, data;
int ret;
if (argc < 4) {
usage();
}
vnn = strtoul(argv[0], NULL, 0);
dbid = strtoul(argv[1], NULL, 0);
key.dptr = discard_const(argv[2]);
key.dsize = strlen((const char *)(key.dptr));
data.dptr = discard_const(argv[3]);
data.dsize = strlen((const char *)(data.dptr));
ret = ctdb_ctrl_write_record(ctdb, vnn, ctdb, dbid, key, data);
if (ret != 0) {
printf("Unable to set vnnmap for node %u\n", vnn);
return ret;
}
return 0;
}
/*
set the dmaster for all records in a database
*/
@ -712,6 +744,37 @@ static int control_cleardb(struct ctdb_context *ctdb, int argc, const char **arg
return 0;
}
/*
create a database
*/
static int control_createdb(struct ctdb_context *ctdb, int argc, const char **argv)
{
uint32_t vnn;
const char *dbname;
int ret;
int32_t res;
TDB_DATA data;
if (argc < 2) {
usage();
}
vnn = strtoul(argv[0], NULL, 0);
dbname = argv[1];
/* tell ctdb daemon to attach */
data.dptr = discard_const(dbname);
data.dsize = strlen(dbname)+1;
ret = ctdb_control(ctdb, vnn, 0, CTDB_CONTROL_DB_ATTACH,
0, data, ctdb, &data, &res);
if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
DEBUG(0,("Failed to attach to database '%s'\n", dbname));
return -1;
}
return 0;
}
/*
ping all node
*/
@ -858,6 +921,7 @@ int main(int argc, const char *argv[])
{ "cpdb", control_cpdb },
{ "setvnnmap", control_setvnnmap },
{ "setdmaster", control_setdmaster },
{ "createdb", control_createdb },
{ "cleardb", control_cleardb },
{ "getrecmode", control_getrecmode },
{ "setrecmode", control_setrecmode },
@ -865,6 +929,7 @@ int main(int argc, const char *argv[])
{ "debug", control_debug },
{ "debuglevel", control_debuglevel },
{ "recover", control_recover },
{ "writerecord", control_writerecord },
{ "attach", control_attach },
};