1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-11 05:18:09 +03:00

merge from tridge

(This used to be ctdb commit bb283ee8ebaea848366e9c3b3d3244da459a7967)
This commit is contained in:
Ronnie Sahlberg 2007-09-21 13:20:29 +10:00
commit e9f45419da
21 changed files with 774 additions and 139 deletions

View File

@ -48,10 +48,10 @@ CTDB_SERVER_OBJ = server/ctdbd.o server/ctdb_daemon.o server/ctdb_lockwait.o \
server/ctdb_tunables.o server/ctdb_monitor.o server/ctdb_server.o \
server/ctdb_control.o server/ctdb_call.o server/ctdb_ltdb_server.o \
server/ctdb_traverse.o server/eventscript.o server/ctdb_takeover.o \
server/ctdb_serverids.o \
server/ctdb_serverids.o server/ctdb_persistent.o \
$(CTDB_CLIENT_OBJ) $(CTDB_TCP_OBJ) @INFINIBAND_WRAPPER_OBJ@
TEST_BINS=bin/ctdb_bench bin/ctdb_fetch bin/ctdb_store bin/rb_test \
TEST_BINS=bin/ctdb_bench bin/ctdb_fetch bin/ctdb_store bin/ctdb_persistent bin/rb_test \
@INFINIBAND_BINS@
BINS = bin/ctdb @CTDB_SCSI_IO@ bin/smnotify
@ -120,6 +120,10 @@ bin/ctdb_store: $(CTDB_CLIENT_OBJ) tests/ctdb_store.o
@echo Linking $@
@$(CC) $(CFLAGS) -o $@ tests/ctdb_store.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
bin/ctdb_persistent: $(CTDB_CLIENT_OBJ) tests/ctdb_persistent.o
@echo Linking $@
@$(CC) $(CFLAGS) -o $@ tests/ctdb_persistent.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)
bin/ibwrapper_test: $(CTDB_CLIENT_OBJ) ib/ibwrapper_test.o
@echo Linking $@
@$(CC) $(CFLAGS) -o $@ ib/ibwrapper_test.o $(CTDB_CLIENT_OBJ) $(LIB_FLAGS)

View File

@ -638,7 +638,46 @@ again:
*/
int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data)
{
return ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
int ret;
int32_t status;
struct ctdb_rec_data *rec;
TDB_DATA recdata;
if (h->ctdb_db->persistent) {
h->header.rsn++;
}
ret = ctdb_ltdb_store(h->ctdb_db, h->key, &h->header, data);
if (ret != 0) {
return ret;
}
/* don't need the persistent_store control for non-persistent databases */
if (!h->ctdb_db->persistent) {
return 0;
}
rec = ctdb_marshall_record(h, h->ctdb_db->db_id, h->key, &h->header, data);
if (rec == NULL) {
DEBUG(0,("Unable to marshall record in ctdb_record_store\n"));
return -1;
}
recdata.dptr = (uint8_t *)rec;
recdata.dsize = rec->length;
ret = ctdb_control(h->ctdb_db->ctdb, CTDB_CURRENT_NODE, 0,
CTDB_CONTROL_PERSISTENT_STORE, 0,
recdata, NULL, NULL, &status, NULL, NULL);
talloc_free(rec);
if (ret != 0 || status != 0) {
DEBUG(0,("Failed persistent store in ctdb_record_store\n"));
return -1;
}
return 0;
}
/*
@ -1449,7 +1488,8 @@ int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint3
/*
create a database
*/
int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name)
int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode,
TALLOC_CTX *mem_ctx, const char *name, bool persistent)
{
int ret;
int32_t res;
@ -1459,7 +1499,8 @@ int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32
data.dsize = strlen(name)+1;
ret = ctdb_control(ctdb, destnode, 0,
CTDB_CONTROL_DB_ATTACH, 0, data,
persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
0, data,
mem_ctx, &data, &res, &timeout, NULL);
if (ret != 0 || res != 0) {
@ -1571,7 +1612,7 @@ int ctdb_statistics_reset(struct ctdb_context *ctdb, uint32_t destnode)
/*
attach to a specific database - client call
*/
struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent)
{
struct ctdb_db_context *ctdb_db;
TDB_DATA data;
@ -1594,7 +1635,8 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
data.dsize = strlen(name)+1;
/* tell ctdb daemon to attach */
ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0, CTDB_CONTROL_DB_ATTACH,
ret = ctdb_control(ctdb, CTDB_CURRENT_NODE, 0,
persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:CTDB_CONTROL_DB_ATTACH,
0, data, ctdb_db, &data, &res, NULL, NULL);
if (ret != 0 || res != 0 || data.dsize != sizeof(uint32_t)) {
DEBUG(0,("Failed to attach to database '%s'\n", name));
@ -1619,6 +1661,8 @@ struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name)
return NULL;
}
ctdb_db->persistent = persistent;
DLIST_ADD(ctdb->db_list, ctdb_db);
return ctdb_db;

View File

@ -165,13 +165,20 @@ void ctdb_reqid_remove(struct ctdb_context *ctdb, uint32_t reqid)
/*
form a ctdb_rec_data record from a key/data pair
note that header may be NULL. If not NULL then it is included in the data portion
of the record
*/
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid, TDB_DATA key, TDB_DATA data)
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
TDB_DATA key,
struct ctdb_ltdb_header *header,
TDB_DATA data)
{
size_t length;
struct ctdb_rec_data *d;
length = offsetof(struct ctdb_rec_data, data) + key.dsize + data.dsize;
length = offsetof(struct ctdb_rec_data, data) + key.dsize +
data.dsize + (header?sizeof(*header):0);
d = (struct ctdb_rec_data *)talloc_size(mem_ctx, length);
if (d == NULL) {
return NULL;
@ -179,9 +186,15 @@ struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
d->length = length;
d->reqid = reqid;
d->keylen = key.dsize;
d->datalen = data.dsize;
memcpy(&d->data[0], key.dptr, key.dsize);
memcpy(&d->data[key.dsize], data.dptr, data.dsize);
if (header) {
d->datalen = data.dsize + sizeof(*header);
memcpy(&d->data[key.dsize], header, sizeof(*header));
memcpy(&d->data[key.dsize+sizeof(*header)], data.dptr, data.dsize);
} else {
d->datalen = data.dsize;
memcpy(&d->data[key.dsize], data.dptr, data.dsize);
}
return d;
}

View File

@ -134,6 +134,7 @@ int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
set the directory for the local databases
*/
int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir);
int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir);
/*
set some flags
@ -167,7 +168,7 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork);
/*
attach to a ctdb database
*/
struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name);
struct ctdb_db_context *ctdb_attach(struct ctdb_context *ctdb, const char *name, bool persistent);
/*
find an attached ctdb_db handle given a name
@ -267,7 +268,10 @@ int ctdb_ctrl_setvnnmap(struct ctdb_context *ctdb,
*/
struct ctdb_dbid_map {
uint32_t num;
uint32_t dbids[1];
struct ctdb_dbid {
uint32_t dbid;
bool persistent;
} dbs[1];
};
int ctdb_ctrl_getdbmap(struct ctdb_context *ctdb,
struct timeval timeout, uint32_t destnode,
@ -295,7 +299,7 @@ int ctdb_ctrl_copydb(struct ctdb_context *ctdb,
int ctdb_ctrl_getdbpath(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, const char **path);
int ctdb_ctrl_getdbname(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, uint32_t dbid, TALLOC_CTX *mem_ctx, const char **name);
int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name);
int ctdb_ctrl_createdb(struct ctdb_context *ctdb, struct timeval timeout, uint32_t destnode, TALLOC_CTX *mem_ctx, const char *name, bool persistent);
int ctdb_ctrl_process_exists(struct ctdb_context *ctdb, uint32_t destnode, pid_t pid);

View File

@ -328,6 +328,7 @@ struct ctdb_context {
struct ctdb_address address;
const char *name;
const char *db_directory;
const char *db_directory_persistent;
const char *transport;
const char *logfile;
char *node_list_file;
@ -365,6 +366,7 @@ struct ctdb_db_context {
struct ctdb_db_context *next, *prev;
struct ctdb_context *ctdb;
uint32_t db_id;
bool persistent;
const char *db_name;
const char *db_path;
struct tdb_wrap *ltdb;
@ -465,6 +467,9 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS = 0,
CTDB_CONTROL_UNREGISTER_SERVER_ID = 58,
CTDB_CONTROL_CHECK_SERVER_ID = 59,
CTDB_CONTROL_GET_SERVER_ID_LIST = 60,
CTDB_CONTROL_DB_ATTACH_PERSISTENT = 61,
CTDB_CONTROL_PERSISTENT_STORE = 62,
CTDB_CONTROL_UPDATE_RECORD = 63,
};
/*
@ -516,6 +521,15 @@ struct ctdb_control_tcp_vnn {
struct sockaddr_in dest;
};
/*
persistent store control - update this record on all other nodes
*/
struct ctdb_control_persistent_store {
uint32_t db_id;
uint32_t len;
uint8_t data[1];
};
/*
structure used for CTDB_SRVID_NODE_FLAGS_CHANGED
*/
@ -857,7 +871,7 @@ int ctdb_daemon_send_control(struct ctdb_context *ctdb, uint32_t destnode,
void *private_data);
int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata);
TDB_DATA *outdata, bool persistent);
int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
ctdb_fn_t fn, int id);
@ -991,7 +1005,8 @@ int32_t ctdb_ltdb_enable_seqnum(struct ctdb_context *ctdb, uint32_t db_id);
int32_t ctdb_ltdb_update_seqnum(struct ctdb_context *ctdb, uint32_t db_id, uint32_t srcnode);
int32_t ctdb_ltdb_set_seqnum_frequency(struct ctdb_context *ctdb, uint32_t frequency);
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid, TDB_DATA key, TDB_DATA data);
struct ctdb_rec_data *ctdb_marshall_record(TALLOC_CTX *mem_ctx, uint32_t reqid,
TDB_DATA key, struct ctdb_ltdb_header *, TDB_DATA data);
int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DATA *outdata);
int32_t ctdb_control_push_db(struct ctdb_context *ctdb, TDB_DATA indata);
@ -1146,4 +1161,14 @@ int32_t ctdb_control_unregister_server_id(struct ctdb_context *ctdb,
int32_t ctdb_control_get_server_id_list(struct ctdb_context *ctdb,
TDB_DATA *outdata);
int ctdb_attach_persistent(struct ctdb_context *ctdb);
int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata, bool *async_reply);
int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
struct ctdb_req_control *c, TDB_DATA recdata,
bool *async_reply);
#endif

View File

@ -175,7 +175,10 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
}
case CTDB_CONTROL_DB_ATTACH:
return ctdb_control_db_attach(ctdb, indata, outdata);
return ctdb_control_db_attach(ctdb, indata, outdata, false);
case CTDB_CONTROL_DB_ATTACH_PERSISTENT:
return ctdb_control_db_attach(ctdb, indata, outdata, true);
case CTDB_CONTROL_SET_CALL: {
struct ctdb_control_set_call *sc =
@ -312,6 +315,12 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
CHECK_CONTROL_DATA_SIZE(0);
return ctdb_control_get_server_id_list(ctdb, outdata);
case CTDB_CONTROL_PERSISTENT_STORE:
return ctdb_control_persistent_store(ctdb, c, indata, async_reply);
case CTDB_CONTROL_UPDATE_RECORD:
return ctdb_control_update_record(ctdb, c, indata, async_reply);
default:
DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
return -1;

View File

@ -92,56 +92,6 @@ static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
ctdb_start_tcp_tickle_update(ctdb);
}
/* go into main ctdb loop */
static void ctdb_main_loop(struct ctdb_context *ctdb)
{
int ret = -1;
if (strcmp(ctdb->transport, "tcp") == 0) {
int ctdb_tcp_init(struct ctdb_context *);
ret = ctdb_tcp_init(ctdb);
}
#ifdef USE_INFINIBAND
if (strcmp(ctdb->transport, "ib") == 0) {
int ctdb_ibw_init(struct ctdb_context *);
ret = ctdb_ibw_init(ctdb);
}
#endif
if (ret != 0) {
DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
return;
}
/* initialise the transport */
if (ctdb->methods->initialise(ctdb) != 0) {
DEBUG(0,("transport failed to initialise!\n"));
ctdb_fatal(ctdb, "transport failed to initialise");
}
/* tell all other nodes we've just started up */
ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
0, CTDB_CONTROL_STARTUP, 0,
CTDB_CTRL_FLAG_NOREPLY,
tdb_null, NULL, NULL);
/* release any IPs we hold from previous runs of the daemon */
ctdb_release_all_ips(ctdb);
ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb,
ctdb_start_transport, NULL, "startup");
if (ret != 0) {
DEBUG(0,("Failed startup event script\n"));
return;
}
/* go into a wait loop to allow other nodes to complete */
event_loop_wait(ctdb->ev);
DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
exit(1);
}
static void block_signal(int signum)
{
struct sigaction act;
@ -626,7 +576,7 @@ static void print_exit_message(void)
*/
int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
{
int res;
int res, ret = -1;
struct fd_event *fde;
const char *domain_socket_name;
@ -665,23 +615,66 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
ctdb->ev = event_context_init(NULL);
/* start frozen, then let the first election sort things out */
if (!ctdb_blocking_freeze(ctdb)) {
DEBUG(0,("Failed to get initial freeze\n"));
exit(12);
}
/* force initial recovery for election */
ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
if (strcmp(ctdb->transport, "tcp") == 0) {
int ctdb_tcp_init(struct ctdb_context *);
ret = ctdb_tcp_init(ctdb);
}
#ifdef USE_INFINIBAND
if (strcmp(ctdb->transport, "ib") == 0) {
int ctdb_ibw_init(struct ctdb_context *);
ret = ctdb_ibw_init(ctdb);
}
#endif
if (ret != 0) {
DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
return -1;
}
/* initialise the transport */
if (ctdb->methods->initialise(ctdb) != 0) {
DEBUG(0,("transport failed to initialise!\n"));
ctdb_fatal(ctdb, "transport failed to initialise");
}
/* attach to any existing persistent databases */
if (ctdb_attach_persistent(ctdb) != 0) {
ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");
}
/* start frozen, then let the first election sort things out */
if (!ctdb_blocking_freeze(ctdb)) {
ctdb_fatal(ctdb, "Failed to get initial freeze\n");
}
/* now start accepting clients, only can do this once frozen */
fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd,
EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
ctdb_accept_client, ctdb);
ctdb_main_loop(ctdb);
/* tell all other nodes we've just started up */
ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
0, CTDB_CONTROL_STARTUP, 0,
CTDB_CTRL_FLAG_NOREPLY,
tdb_null, NULL, NULL);
return 0;
/* release any IPs we hold from previous runs of the daemon */
ctdb_release_all_ips(ctdb);
ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb,
ctdb_start_transport, NULL, "startup");
if (ret != 0) {
DEBUG(0,("Failed startup event script\n"));
return -1;
}
/* go into a wait loop to allow other nodes to complete */
event_loop_wait(ctdb->ev);
DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
exit(1);
}
/*

View File

@ -22,6 +22,7 @@
#include "lib/tdb/include/tdb.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/dir.h"
#include "../include/ctdb_private.h"
#include "db_wrap.h"
#include "lib/util/dlinklist.h"
@ -186,36 +187,16 @@ static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
}
}
/*
a client has asked to attach a new database
attach to a database, handling both persistent and non-persistent databases
return 0 on success, -1 on failure
*/
int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata)
static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name, bool persistent)
{
const char *db_name = (const char *)indata.dptr;
struct ctdb_db_context *ctdb_db, *tmp_db;
struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
int ret;
/* If the node is inactive it is not part of the cluster
and we should not allow clients to attach to any
databases
*/
if (node->flags & NODE_FLAGS_INACTIVE) {
DEBUG(0,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
return -1;
}
/* see if we already have this name */
for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
if (strcmp(db_name, tmp_db->db_name) == 0) {
/* this is not an error */
outdata->dptr = (uint8_t *)&tmp_db->db_id;
outdata->dsize = sizeof(tmp_db->db_id);
return 0;
}
}
struct TDB_DATA key;
ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
CTDB_NO_MEMORY(ctdb, ctdb_db);
@ -224,10 +205,10 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
ctdb_db->db_id = ctdb_hash(&indata);
outdata->dptr = (uint8_t *)&ctdb_db->db_id;
outdata->dsize = sizeof(ctdb_db->db_id);
key.dsize = strlen(db_name)+1;
key.dptr = discard_const(db_name);
ctdb_db->db_id = ctdb_hash(&key);
ctdb_db->persistent = persistent;
/* check for hash collisions */
for (tmp_db=ctdb->db_list;tmp_db;tmp_db=tmp_db->next) {
@ -251,21 +232,31 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
return -1;
}
if (persistent && mkdir(ctdb->db_directory_persistent, 0700) == -1 && errno != EEXIST) {
DEBUG(0,(__location__ " Unable to create ctdb persistent directory '%s'\n",
ctdb->db_directory_persistent));
talloc_free(ctdb_db);
return -1;
}
/* open the database */
ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
ctdb->db_directory,
persistent?ctdb->db_directory_persistent:ctdb->db_directory,
db_name, ctdb->pnn);
ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
ctdb->tunable.database_hash_size,
TDB_CLEAR_IF_FIRST, O_CREAT|O_RDWR, 0666);
persistent?TDB_DEFAULT:TDB_CLEAR_IF_FIRST,
O_CREAT|O_RDWR, 0666);
if (ctdb_db->ltdb == NULL) {
DEBUG(0,("Failed to open tdb '%s'\n", ctdb_db->db_path));
talloc_free(ctdb_db);
return -1;
}
ctdb_check_db_empty(ctdb_db);
if (!persistent) {
ctdb_check_db_empty(ctdb_db);
}
DLIST_ADD(ctdb->db_list, ctdb_db);
@ -290,18 +281,113 @@ int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
talloc_free(ctdb_db);
return -1;
}
DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
/* success */
return 0;
}
/*
a client has asked to attach a new database
*/
int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata, bool persistent)
{
const char *db_name = (const char *)indata.dptr;
struct ctdb_db_context *db;
struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
/* If the node is inactive it is not part of the cluster
and we should not allow clients to attach to any
databases
*/
if (node->flags & NODE_FLAGS_INACTIVE) {
DEBUG(0,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
return -1;
}
/* see if we already have this name */
db = ctdb_db_handle(ctdb, db_name);
if (db) {
outdata->dptr = (uint8_t *)&db->db_id;
outdata->dsize = sizeof(db->db_id);
return 0;
}
if (ctdb_local_attach(ctdb, db_name, persistent) != 0) {
return -1;
}
db = ctdb_db_handle(ctdb, db_name);
if (!db) {
DEBUG(0,("Failed to find db handle for name '%s'\n", db_name));
return -1;
}
outdata->dptr = (uint8_t *)&db->db_id;
outdata->dsize = sizeof(db->db_id);
/* tell all the other nodes about this database */
ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
CTDB_CONTROL_DB_ATTACH, 0, CTDB_CTRL_FLAG_NOREPLY,
indata, NULL, NULL);
DEBUG(1,("Attached to database '%s'\n", ctdb_db->db_path));
/* success */
return 0;
}
/*
attach to all existing persistent databases
*/
int ctdb_attach_persistent(struct ctdb_context *ctdb)
{
DIR *d;
struct dirent *de;
/* open the persistent db directory and scan it for files */
d = opendir(ctdb->db_directory_persistent);
if (d == NULL) {
return 0;
}
while ((de=readdir(d))) {
char *p, *s;
size_t len = strlen(de->d_name);
uint32_t node;
s = talloc_strdup(ctdb, de->d_name);
CTDB_NO_MEMORY(ctdb, s);
/* only accept names ending in .tdb */
p = strstr(s, ".tdb.");
if (len < 7 || p == NULL) {
talloc_free(s);
continue;
}
if (sscanf(p+5, "%u", &node) != 1 || node != ctdb->pnn) {
talloc_free(s);
continue;
}
p[4] = 0;
if (ctdb_local_attach(ctdb, s, true) != 0) {
DEBUG(0,("Failed to attach to persistent database '%s'\n", de->d_name));
closedir(d);
talloc_free(s);
return -1;
}
DEBUG(0,("Attached to persistent database %s\n", s));
talloc_free(s);
}
closedir(d);
return 0;
}
/*
called when a broadcast seqnum update comes in
*/

View File

@ -0,0 +1,262 @@
/*
persistent store logic
Copyright (C) Andrew Tridgell 2007
Copyright (C) Ronnie Sahlberg 2007
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/filesys.h"
#include "system/wait.h"
#include "db_wrap.h"
#include "lib/tdb/include/tdb.h"
#include "../include/ctdb_private.h"
struct ctdb_persistent_state {
struct ctdb_context *ctdb;
struct ctdb_req_control *c;
const char *errormsg;
uint32_t num_pending;
int32_t status;
};
/*
called when a node has acknowledged a ctdb_control_update_record call
*/
static void ctdb_persistent_callback(struct ctdb_context *ctdb,
int32_t status, TDB_DATA data,
const char *errormsg,
void *private_data)
{
struct ctdb_persistent_state *state = talloc_get_type(private_data,
struct ctdb_persistent_state);
if (status != 0) {
DEBUG(0,("ctdb_persistent_callback failed with status %d (%s)\n",
status, errormsg));
state->status = status;
state->errormsg = errormsg;
}
state->num_pending--;
if (state->num_pending == 0) {
ctdb_request_control_reply(state->ctdb, state->c, NULL, state->status, state->errormsg);
talloc_free(state);
}
}
/*
store a persistent record - called from a ctdb client when it has updated
a record in a persistent database. The client will have the record
locked for the duration of this call. The client is the dmaster when
this call is made
*/
int32_t ctdb_control_persistent_store(struct ctdb_context *ctdb,
struct ctdb_req_control *c,
TDB_DATA recdata, bool *async_reply)
{
struct ctdb_persistent_state *state;
int i;
state = talloc_zero(c, struct ctdb_persistent_state);
CTDB_NO_MEMORY(ctdb, state);
state->ctdb = ctdb;
state->c = c;
for (i=0;i<ctdb->num_nodes;i++) {
struct ctdb_node *node = ctdb->nodes[i];
int ret;
/* only send to active nodes */
if (node->flags & NODE_FLAGS_INACTIVE) {
continue;
}
/* don't send to ourselves */
if (node->pnn == ctdb->pnn) {
continue;
}
ret = ctdb_daemon_send_control(ctdb, node->pnn, 0, CTDB_CONTROL_UPDATE_RECORD,
c->client_id, 0, recdata,
ctdb_persistent_callback, state);
if (ret == -1) {
DEBUG(0,("Unable to send CTDB_CONTROL_UPDATE_RECORD to pnn %u\n", node->pnn));
talloc_free(state);
return -1;
}
state->num_pending++;
}
if (state->num_pending == 0) {
talloc_free(state);
return 0;
}
/* we need to wait for the replies */
*async_reply = true;
return 0;
}
struct ctdb_persistent_lock_state {
struct ctdb_db_context *ctdb_db;
TDB_DATA key;
TDB_DATA data;
struct ctdb_ltdb_header *header;
struct tdb_context *tdb;
struct ctdb_req_control *c;
};
/*
called with a lock held in the current process
*/
static int ctdb_persistent_store(struct ctdb_persistent_lock_state *state)
{
struct ctdb_ltdb_header oldheader;
int ret;
/* fetch the old header and ensure the rsn is less than the new rsn */
ret = ctdb_ltdb_fetch(state->ctdb_db, state->key, &oldheader, NULL, NULL);
if (ret != 0) {
DEBUG(0,("Failed to fetch old record for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
return -1;
}
if (oldheader.rsn >= state->header->rsn) {
DEBUG(0,("existing header for db_id 0x%08x has larger RSN %llu than new RSN %llu in ctdb_persistent_store\n",
state->ctdb_db->db_id, oldheader.rsn, state->header->rsn));
return -1;
}
ret = ctdb_ltdb_store(state->ctdb_db, state->key, state->header, state->data);
if (ret != 0) {
DEBUG(0,("Failed to store record for db_id 0x%08x in ctdb_persistent_store\n",
state->ctdb_db->db_id));
return -1;
}
return 0;
}
/*
called when we get the lock on the given record
at this point the lockwait child holds a lock on our behalf
*/
static void ctdb_persistent_lock_callback(void *private_data)
{
struct ctdb_persistent_lock_state *state = talloc_get_type(private_data,
struct ctdb_persistent_lock_state);
int ret;
ret = tdb_chainlock_mark(state->tdb, state->key);
if (ret != 0) {
DEBUG(0,("Failed to mark lock in ctdb_persistent_lock_callback\n"));
ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, ret, NULL);
return;
}
ret = ctdb_persistent_store(state);
ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, ret, NULL);
tdb_chainlock_unmark(state->tdb, state->key);
}
/*
called if our lockwait child times out
*/
static void ctdb_persistent_lock_timeout(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct ctdb_persistent_lock_state *state = talloc_get_type(private_data,
struct ctdb_persistent_lock_state);
ctdb_request_control_reply(state->ctdb_db->ctdb, state->c, NULL, -1, "timeout in ctdb_persistent_lock");
talloc_free(state);
}
/*
update a record on this node if the new record has a higher rsn than the
current record
*/
int32_t ctdb_control_update_record(struct ctdb_context *ctdb,
struct ctdb_req_control *c, TDB_DATA recdata,
bool *async_reply)
{
struct ctdb_rec_data *rec = (struct ctdb_rec_data *)&recdata.dptr[0];
int ret;
struct ctdb_db_context *ctdb_db;
uint32_t db_id = rec->reqid;
struct lockwait_handle *handle;
struct ctdb_persistent_lock_state *state;
if (ctdb->recovery_mode != CTDB_RECOVERY_NORMAL) {
DEBUG(0,("rejecting ctdb_control_update_record when recovery active\n"));
return -1;
}
ctdb_db = find_ctdb_db(ctdb, db_id);
if (ctdb_db == NULL) {
DEBUG(0,("Unknown database 0x%08x in ctdb_control_update_record\n", db_id));
return -1;
}
state = talloc(c, struct ctdb_persistent_lock_state);
CTDB_NO_MEMORY(ctdb, state);
state->ctdb_db = ctdb_db;
state->c = c;
state->tdb = ctdb_db->ltdb->tdb;
state->key.dptr = &rec->data[0];
state->key.dsize = rec->keylen;
state->data.dptr = &rec->data[rec->keylen];
state->data.dsize = rec->datalen;
if (state->data.dsize < sizeof(struct ctdb_ltdb_header)) {
DEBUG(0,("Invalid data size %u in ctdb_control_update_record\n", state->data.dsize));
return -1;
}
state->header = (struct ctdb_ltdb_header *)&state->data.dptr[0];
state->data.dptr += sizeof(struct ctdb_ltdb_header);
state->data.dsize -= sizeof(struct ctdb_ltdb_header);
/* try and do it without a lockwait */
ret = tdb_chainlock_nonblock(state->tdb, state->key);
if (ret == 0) {
ret = ctdb_persistent_store(state);
tdb_chainunlock(state->tdb, state->key);
return ret;
}
/* wait until we have a lock on this record */
handle = ctdb_lockwait(ctdb_db, state->key, ctdb_persistent_lock_callback, state);
if (handle == NULL) {
DEBUG(0,("Failed to setup lockwait handler in ctdb_control_update_record\n"));
return -1;
}
*async_reply = true;
event_add_timed(ctdb->ev, state, timeval_current_ofs(ctdb->tunable.control_timeout, 0),
ctdb_persistent_lock_timeout, state);
return 0;
}

View File

@ -125,7 +125,7 @@ ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indat
}
outdata->dsize = offsetof(struct ctdb_dbid_map, dbids) + 4*len;
outdata->dsize = offsetof(struct ctdb_dbid_map, dbs) + sizeof(dbid_map->dbs[0])*len;
outdata->dptr = (unsigned char *)talloc_zero_size(outdata, outdata->dsize);
if (!outdata->dptr) {
DEBUG(0, (__location__ " Failed to allocate dbmap array\n"));
@ -134,8 +134,9 @@ ctdb_control_getdbmap(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indat
dbid_map = (struct ctdb_dbid_map *)outdata->dptr;
dbid_map->num = len;
for(i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
dbid_map->dbids[i] = ctdb_db->db_id;
for (i=0,ctdb_db=ctdb->db_list;ctdb_db;i++,ctdb_db=ctdb_db->next){
dbid_map->dbs[i].dbid = ctdb_db->db_id;
dbid_map->dbs[i].persistent = ctdb_db->persistent;
}
return 0;
@ -254,7 +255,7 @@ int32_t ctdb_control_pull_db(struct ctdb_context *ctdb, TDB_DATA indata, TDB_DAT
for (i=0;i<reply->count;i++) {
struct ctdb_rec_data *rec;
rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, params.recs[i].data);
rec = ctdb_marshall_record(outdata, 0, params.recs[i].key, NULL, params.recs[i].data);
reply = talloc_realloc_size(outdata, reply, rec->length + len);
memcpy(len+(uint8_t *)reply, rec, rec->length);
len += rec->length;

View File

@ -301,7 +301,7 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
for (i=0;i<remote_dbmap->num;i++) {
if (dbmap->dbids[db] == remote_dbmap->dbids[i]) {
if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
break;
}
}
@ -310,12 +310,14 @@ static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctd
continue;
}
/* ok so we need to create this database */
ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbids[db], mem_ctx, &name);
ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid,
mem_ctx, &name);
if (ret != 0) {
DEBUG(0, (__location__ " Unable to get dbname from node %u\n", pnn));
return -1;
}
ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, name);
ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
mem_ctx, name, dbmap->dbs[db].persistent);
if (ret != 0) {
DEBUG(0, (__location__ " Unable to create remote db:%s\n", name));
return -1;
@ -359,7 +361,7 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
const char *name;
for (i=0;i<(*dbmap)->num;i++) {
if (remote_dbmap->dbids[db] == (*dbmap)->dbids[i]) {
if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
break;
}
}
@ -371,13 +373,14 @@ static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb
rebuild dbmap
*/
ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
remote_dbmap->dbids[db], mem_ctx, &name);
remote_dbmap->dbs[db].dbid, mem_ctx, &name);
if (ret != 0) {
DEBUG(0, (__location__ " Unable to get dbname from node %u\n",
nodemap->nodes[j].pnn));
return -1;
}
ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name);
ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
remote_dbmap->dbs[db].persistent);
if (ret != 0) {
DEBUG(0, (__location__ " Unable to create local db:%s\n", name));
return -1;
@ -416,7 +419,7 @@ static int pull_all_remote_databases(struct ctdb_context *ctdb, struct ctdb_node
continue;
}
ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
pnn, dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
pnn, dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
if (ret != 0) {
DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n",
nodemap->nodes[j].pnn, pnn));
@ -444,9 +447,11 @@ static int update_dmaster_on_all_databases(struct ctdb_context *ctdb, struct ctd
if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
continue;
}
ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, ctdb, dbmap->dbids[i], pnn);
ret = ctdb_ctrl_setdmaster(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
ctdb, dbmap->dbs[i].dbid, pnn);
if (ret != 0) {
DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n", nodemap->nodes[j].pnn, dbmap->dbids[i]));
DEBUG(0, (__location__ " Unable to set dmaster for node %u db:0x%08x\n",
nodemap->nodes[j].pnn, dbmap->dbs[i].dbid));
return -1;
}
}
@ -537,7 +542,7 @@ static int vacuum_all_databases(struct ctdb_context *ctdb, struct ctdb_node_map
/* update dmaster to point to this node for all databases/nodes */
for (i=0;i<dbmap->num;i++) {
if (vacuum_db(ctdb, dbmap->dbids[i], nodemap) != 0) {
if (vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap) != 0) {
return -1;
}
}
@ -565,7 +570,7 @@ static int push_all_local_databases(struct ctdb_context *ctdb, struct ctdb_node_
continue;
}
ret = ctdb_ctrl_copydb(ctdb, CONTROL_TIMEOUT(), pnn, nodemap->nodes[j].pnn,
dbmap->dbids[i], CTDB_LMASTER_ANY, mem_ctx);
dbmap->dbs[i].dbid, CTDB_LMASTER_ANY, mem_ctx);
if (ret != 0) {
DEBUG(0, (__location__ " Unable to copy db from node %u to node %u\n",
pnn, nodemap->nodes[j].pnn));
@ -919,6 +924,9 @@ static int do_recovery(struct ctdb_recoverd *rec,
DEBUG(1, (__location__ " Recovery - done takeover\n"));
}
for (i=0;i<dbmap->num;i++) {
DEBUG(0,("Recovered database with db_id 0x%08x\n", dbmap->dbs[i].dbid));
}
/* disable recovery mode */
ret = set_recovery_mode(ctdb, nodemap, CTDB_RECOVERY_NORMAL);
@ -1805,8 +1813,6 @@ static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde,
_exit(1);
}
/*
startup the recovery daemon as a child of the main ctdb daemon
*/

View File

@ -81,6 +81,18 @@ int ctdb_set_tdb_dir(struct ctdb_context *ctdb, const char *dir)
return 0;
}
/*
set the directory for the persistent databases
*/
int ctdb_set_tdb_dir_persistent(struct ctdb_context *ctdb, const char *dir)
{
ctdb->db_directory_persistent = talloc_strdup(ctdb, dir);
if (ctdb->db_directory_persistent == NULL) {
return -1;
}
return 0;
}
/*
add a node to the list of active nodes
*/

View File

@ -95,7 +95,7 @@ static int ctdb_traverse_local_fn(struct tdb_context *tdb, TDB_DATA key, TDB_DAT
return 0;
}
d = ctdb_marshall_record(h, 0, key, data);
d = ctdb_marshall_record(h, 0, key, NULL, data);
if (d == NULL) {
/* error handling is tricky in this child code .... */
return -1;
@ -280,7 +280,7 @@ static void traverse_all_callback(void *p, TDB_DATA key, TDB_DATA data)
struct ctdb_rec_data *d;
TDB_DATA cdata;
d = ctdb_marshall_record(state, state->reqid, key, data);
d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
if (d == NULL) {
/* darn .... */
DEBUG(0,("Out of memory in traverse_all_callback\n"));
@ -408,7 +408,7 @@ static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
state = talloc_get_type(p, struct traverse_start_state);
d = ctdb_marshall_record(state, state->reqid, key, data);
d = ctdb_marshall_record(state, state->reqid, key, NULL, data);
if (d == NULL) {
return;
}

View File

@ -46,6 +46,7 @@ static struct {
const char *logfile;
const char *recovery_lock_file;
const char *db_dir;
const char *db_dir_persistent;
const char *public_interface;
int no_setsched;
} options = {
@ -54,6 +55,7 @@ static struct {
.event_script_dir = ETCDIR "/ctdb/events.d",
.logfile = VARDIR "/log/log.ctdb",
.db_dir = VARDIR "/ctdb",
.db_dir_persistent = VARDIR "/ctdb/persistent",
};
@ -108,6 +110,7 @@ int main(int argc, const char *argv[])
{ "listen", 0, POPT_ARG_STRING, &options.myaddress, 0, "address to listen on", "address" },
{ "transport", 0, POPT_ARG_STRING, &options.transport, 0, "protocol transport", NULL },
{ "dbdir", 0, POPT_ARG_STRING, &options.db_dir, 0, "directory for the tdb files", NULL },
{ "dbdir-persistent", 0, POPT_ARG_STRING, &options.db_dir_persistent, 0, "directory for persistent tdb files", NULL },
{ "reclock", 0, POPT_ARG_STRING, &options.recovery_lock_file, 0, "location of recovery lock file", "filename" },
{ "nosetsched", 0, POPT_ARG_NONE, &options.no_setsched, 0, "disable setscheduler SCHED_FIFO call", NULL },
POPT_TABLEEND
@ -199,6 +202,13 @@ int main(int argc, const char *argv[])
exit(1);
}
}
if (options.db_dir_persistent) {
ret = ctdb_set_tdb_dir_persistent(ctdb, options.db_dir_persistent);
if (ret == -1) {
DEBUG(0,("ctdb_set_tdb_dir_persistent failed - %s\n", ctdb_errstr(ctdb)));
exit(1);
}
}
if (options.public_interface) {
ctdb->default_public_interface = talloc_strdup(ctdb, options.public_interface);

View File

@ -201,7 +201,7 @@ int main(int argc, const char *argv[])
&cluster_ready);
/* attach to a specific database */
ctdb_db = ctdb_attach(ctdb, "test.tdb");
ctdb_db = ctdb_attach(ctdb, "test.tdb", false);
if (!ctdb_db) {
printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
exit(1);

View File

@ -219,7 +219,7 @@ int main(int argc, const char *argv[])
&cluster_ready);
/* attach to a specific database */
ctdb_db = ctdb_attach(ctdb, "test.tdb");
ctdb_db = ctdb_attach(ctdb, "test.tdb", false);
if (!ctdb_db) {
printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
exit(1);

View File

@ -0,0 +1,139 @@
/*
simple tool to test persistent databases
Copyright (C) Andrew Tridgell 2006-2007
Copyright (c) Ronnie sahlberg 2007
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/filesys.h"
#include "popt.h"
#include "cmdline.h"
#include <sys/time.h>
#include <time.h>
static void test_store_records(struct ctdb_context *ctdb, struct event_context *ev)
{
TDB_DATA key, data;
struct ctdb_db_context *ctdb_db;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
int ret, i;
struct ctdb_record_handle *h;
unsigned node=0, count=0;
ctdb_db = ctdb_db_handle(ctdb, "persistent.tdb");
key.dptr = discard_const("testkey");
key.dsize = strlen((const char *)key.dptr)+1;
for (i=0;i<10;i++) {
h = ctdb_fetch_lock(ctdb_db, tmp_ctx, key, &data);
if (h == NULL) {
printf("Failed to fetch record '%s' on node %d\n",
(const char *)key.dptr, ctdb_get_pnn(ctdb));
talloc_free(tmp_ctx);
return;
}
printf("Current value: %*.*s\n", data.dsize, data.dsize, data.dptr);
if (data.dsize != 0) {
if (sscanf((char *)data.dptr, "Node %u Count %u", &node, &count) != 2) {
printf("Badly formatted node data!\n");
exit(1);
}
}
node = ctdb_get_pnn(ctdb);
count++;
data.dptr = (uint8_t *)talloc_asprintf(h, "Node %u Count %u", node, count);
data.dsize = strlen((char *)data.dptr)+1;
ret = ctdb_record_store(h, data);
if (ret != 0) {
DEBUG(0,("Failed to store record\n"));
exit(1);
}
talloc_free(h);
}
talloc_free(tmp_ctx);
}
/*
main program
*/
int main(int argc, const char *argv[])
{
struct ctdb_context *ctdb;
struct ctdb_db_context *ctdb_db;
struct poptOption popt_options[] = {
POPT_AUTOHELP
POPT_CTDB_CMDLINE
POPT_TABLEEND
};
int opt;
const char **extra_argv;
int extra_argc = 0;
poptContext pc;
struct event_context *ev;
pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
while ((opt = poptGetNextOpt(pc)) != -1) {
switch (opt) {
default:
fprintf(stderr, "Invalid option %s: %s\n",
poptBadOption(pc, 0), poptStrerror(opt));
exit(1);
}
}
/* setup the remaining options for the main program to use */
extra_argv = poptGetArgs(pc);
if (extra_argv) {
extra_argv++;
while (extra_argv[extra_argc]) extra_argc++;
}
ev = event_context_init(NULL);
ctdb = ctdb_cmdline_client(ev);
/* attach to a specific database */
ctdb_db = ctdb_attach(ctdb, "persistent.tdb", true);
if (!ctdb_db) {
printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
exit(1);
}
printf("Waiting for cluster\n");
while (1) {
uint32_t recmode=1;
ctdb_ctrl_getrecmode(ctdb, ctdb, timeval_zero(), CTDB_CURRENT_NODE, &recmode);
if (recmode == 0) break;
event_loop_once(ev);
}
printf("Starting test\n");
test_store_records(ctdb, ev);
return 0;
}

View File

@ -136,7 +136,7 @@ int main(int argc, const char *argv[])
ctdb = ctdb_cmdline_client(ev);
/* attach to a specific database */
ctdb_db = ctdb_attach(ctdb, "test.tdb");
ctdb_db = ctdb_attach(ctdb, "test.tdb", false);
if (!ctdb_db) {
printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
exit(1);

24
ctdb/tests/persistent.sh Executable file
View File

@ -0,0 +1,24 @@
#!/bin/sh
NUMNODES=2
if [ $# -gt 0 ]; then
NUMNODES=$1
fi
rm -f nodes.txt
for i in `seq 1 $NUMNODES`; do
echo 127.0.0.$i >> nodes.txt
done
tests/start_daemons.sh $NUMNODES nodes.txt || exit 1
killall -9 -q ctdb_persistent
for i in `seq 1 $NUMNODES`; do
$VALGRIND bin/ctdb_persistent --socket sock.$i $* &
done
wait
echo "Shutting down"
bin/ctdb shutdown -n all --socket=sock.1
exit 0

View File

@ -7,7 +7,7 @@ shift
killall -q ctdbd
CTDB_OPTIONS="--reclock=rec.lock --nlist $NODES --event-script-dir=tests/events.d --logfile=- --dbdir=test.db $*"
CTDB_OPTIONS="--reclock=rec.lock --nlist $NODES --event-script-dir=tests/events.d --logfile=- --dbdir=test.db --dbdir-persistent=test.db/persistent $*"
if [ `id -u` -eq 0 ]; then
CTDB_OPTIONS="$CTDB_OPTIONS --public-addresses=tests/public_addresses --public-interface=lo"
fi

View File

@ -762,7 +762,7 @@ static int control_catdb(struct ctdb_context *ctdb, int argc, const char **argv)
}
db_name = argv[0];
ctdb_db = ctdb_attach(ctdb, db_name);
ctdb_db = ctdb_attach(ctdb, db_name, false);
if (ctdb_db == NULL) {
DEBUG(0,("Unable to attach to database '%s'\n", db_name));
@ -800,10 +800,13 @@ static int control_getdbmap(struct ctdb_context *ctdb, int argc, const char **ar
for(i=0;i<dbmap->num;i++){
const char *path;
const char *name;
bool persistent;
ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbids[i], ctdb, &path);
ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbids[i], ctdb, &name);
printf("dbid:0x%08x name:%s path:%s\n", dbmap->dbids[i], name, path);
ctdb_ctrl_getdbpath(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &path);
ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), options.pnn, dbmap->dbs[i].dbid, ctdb, &name);
persistent = dbmap->dbs[i].persistent;
printf("dbid:0x%08x name:%s path:%s %s\n", dbmap->dbs[i].dbid, name,
path, persistent?"PERSISTENT":"");
}
return 0;
@ -982,7 +985,7 @@ static int control_attach(struct ctdb_context *ctdb, int argc, const char **argv
}
db_name = argv[0];
ctdb_db = ctdb_attach(ctdb, db_name);
ctdb_db = ctdb_attach(ctdb, db_name, false);
if (ctdb_db == NULL) {
DEBUG(0,("Unable to attach to database '%s'\n", db_name));
return -1;