Copyright (C) Andrew Tridgell 2007
Copyright (C) Ronnie sahlberg 2007
diff --git a/ctdb/include/ctdb.h b/ctdb/include/ctdb.h
index 14f75b4c822..f8d0db3d174 100644
--- a/ctdb/include/ctdb.h
+++ b/ctdb/include/ctdb.h
@@ -85,6 +85,11 @@ struct ctdb_call_info {
*/
#define CTDB_SRVID_UNBAN_NODE 0xF600000000000000LL
+/*
+ a message to tell the recovery daemon to fetch a set of records
+ */
+#define CTDB_SRVID_VACUUM_FETCH 0xF700000000000000LL
+
/* used on the domain socket, send a pdu to the local daemon */
#define CTDB_CURRENT_NODE 0xF0000001
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index 07dfcbcd68a..f59ffeeed0d 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -366,6 +366,7 @@ struct ctdb_context {
struct _trbt_tree_t *server_ids;
const char *event_script_dir;
const char *default_public_interface;
+ pid_t ctdbd_pid;
pid_t recoverd_pid;
bool done_startup;
const char *node_ip;
@@ -483,6 +484,7 @@ enum ctdb_controls {CTDB_CONTROL_PROCESS_EXISTS = 0,
CTDB_CONTROL_TRANSACTION_START = 65,
CTDB_CONTROL_TRANSACTION_COMMIT = 66,
CTDB_CONTROL_WIPE_DATABASE = 67,
+ CTDB_CONTROL_DELETE_RECORD = 68,
};
/*
@@ -1001,6 +1003,21 @@ struct ctdb_control_wipe_database {
uint32_t transaction_id;
};
+/*
+ state of an in-progress ctdb call in client
+*/
+struct ctdb_client_call_state {
+ enum call_state state;
+ uint32_t reqid;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_call call;
+ /* optional hook a caller may fill in after sending the call; fn is
+    handed this state structure and private is an opaque pointer for
+    the caller's own use (presumably invoked on completion - confirm
+    against the client call code) */
+ struct {
+ void (*fn)(struct ctdb_client_call_state *);
+ void *private;
+ } async;
+};
+
+
int32_t ctdb_control_traverse_start(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata, uint32_t srcnode);
int32_t ctdb_control_traverse_all(struct ctdb_context *ctdb, TDB_DATA data, TDB_DATA *outdata);
@@ -1186,4 +1203,11 @@ int32_t ctdb_control_transaction_start(struct ctdb_context *ctdb, uint32_t id);
int32_t ctdb_control_transaction_commit(struct ctdb_context *ctdb, uint32_t id);
int32_t ctdb_control_wipe_database(struct ctdb_context *ctdb, TDB_DATA indata);
+
+int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv);
+int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv);
+
+int32_t ctdb_control_delete_record(struct ctdb_context *ctdb, TDB_DATA indata);
+
+
#endif
diff --git a/ctdb/lib/tdb/common/freelist.c b/ctdb/lib/tdb/common/freelist.c
index 48e64c2b4cf..358545ed575 100644
--- a/ctdb/lib/tdb/common/freelist.c
+++ b/ctdb/lib/tdb/common/freelist.c
@@ -342,3 +342,26 @@ tdb_off_t tdb_allocate(struct tdb_context *tdb, tdb_len_t length, struct list_st
return 0;
}
+
+
+/*
+  return the number of entries on the freelist - used to decide if we
+  should repack
+
+  returns -1 if the freelist lock could not be taken. A read error while
+  walking the list ends the walk and the entries counted so far are
+  returned.
+*/
+int tdb_freelist_size(struct tdb_context *tdb)
+{
+	tdb_off_t ptr;
+	int count=0;
+
+	if (tdb_lock(tdb, -1, F_RDLCK) == -1) {
+		return -1;
+	}
+
+	/* the word at FREELIST_TOP is the offset of the first free record
+	   and each read follows one link. Only count a link that leads to
+	   a record, so that an empty list reports 0 rather than 1 */
+	ptr = FREELIST_TOP;
+	while (tdb_ofs_read(tdb, ptr, &ptr) == 0 && ptr != 0) {
+		count++;
+	}
+
+	tdb_unlock(tdb, -1, F_RDLCK);
+	return count;
+}
diff --git a/ctdb/lib/tdb/include/tdb.h b/ctdb/lib/tdb/include/tdb.h
index f6d4b4b1f45..371381049e9 100644
--- a/ctdb/lib/tdb/include/tdb.h
+++ b/ctdb/lib/tdb/include/tdb.h
@@ -156,6 +156,7 @@ void tdb_dump_all(struct tdb_context *tdb);
int tdb_printfreelist(struct tdb_context *tdb);
int tdb_validate_freelist(struct tdb_context *tdb, int *pnum_entries);
int tdb_wipe_all(struct tdb_context *tdb);
+int tdb_freelist_size(struct tdb_context *tdb);
extern TDB_DATA tdb_null;
diff --git a/ctdb/packaging/RPM/ctdb.spec b/ctdb/packaging/RPM/ctdb.spec
index d8fa622d36f..410a1044e82 100644
--- a/ctdb/packaging/RPM/ctdb.spec
+++ b/ctdb/packaging/RPM/ctdb.spec
@@ -5,7 +5,7 @@ Vendor: Samba Team
Packager: Samba Team
Name: ctdb
Version: 1.0
-Release: 20
+Release: 21
Epoch: 0
License: GNU GPL version 3
Group: System Environment/Daemons
@@ -118,6 +118,8 @@ fi
%{_includedir}/ctdb_private.h
%changelog
+* Wed Jan 09 2008 : Version 1.0.21
+ - added ctdb vacuum and ctdb repack code
* Sun Jan 06 2008 : Version 1.0.20
- new transaction based recovery code
* Sat Jan 05 2008 : Version 1.0.19
diff --git a/ctdb/server/ctdb_control.c b/ctdb/server/ctdb_control.c
index f2fd6ee641f..4e013a530ea 100644
--- a/ctdb/server/ctdb_control.c
+++ b/ctdb/server/ctdb_control.c
@@ -321,6 +321,9 @@ static int32_t ctdb_control_dispatch(struct ctdb_context *ctdb,
CHECK_CONTROL_DATA_SIZE(sizeof(struct ctdb_control_wipe_database));
return ctdb_control_wipe_database(ctdb, indata);
+ case CTDB_CONTROL_DELETE_RECORD:
+ return ctdb_control_delete_record(ctdb, indata);
+
default:
DEBUG(0,(__location__ " Unknown CTDB control opcode %u\n", opcode));
return -1;
diff --git a/ctdb/server/ctdb_recover.c b/ctdb/server/ctdb_recover.c
index 637894dfd15..3bc7a3744c1 100644
--- a/ctdb/server/ctdb_recover.c
+++ b/ctdb/server/ctdb_recover.c
@@ -509,6 +509,11 @@ int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
return -1;
}
+ if (recmode != ctdb->recovery_mode) {
+ DEBUG(0,(__location__ " Recovery mode set to %s\n",
+ recmode==CTDB_RECOVERY_NORMAL?"NORMAL":"ACTIVE"));
+ }
+
if (recmode != CTDB_RECOVERY_NORMAL ||
ctdb->recovery_mode != CTDB_RECOVERY_ACTIVE) {
ctdb->recovery_mode = recmode;
@@ -631,3 +636,106 @@ bool ctdb_recovery_lock(struct ctdb_context *ctdb, bool keep)
}
+/*
+ delete a record as part of the vacuum process
+ only delete if we are not lmaster or dmaster, and our rsn is <= the provided rsn
+ use non-blocking locks
+
+ indata is a marshalled struct ctdb_rec_data: reqid carries the database id,
+ the key follows in data[] and the record data must be exactly one
+ struct ctdb_ltdb_header (the caller's view of the record).
+
+ returns 0 if the record is gone afterwards (deleted here, already missing,
+ or found corrupt), -1 if the request was malformed, a lock could not be
+ taken without blocking, or the record must not be deleted on this node
+ */
+int32_t ctdb_control_delete_record(struct ctdb_context *ctdb, TDB_DATA indata)
+{
+ struct ctdb_rec_data *rec = (struct ctdb_rec_data *)indata.dptr;
+ struct ctdb_db_context *ctdb_db;
+ TDB_DATA key, data;
+ struct ctdb_ltdb_header *hdr, *hdr2;
+
+ /* these are really internal tdb functions - but we need them here for
+ non-blocking lock of the freelist */
+ int tdb_lock_nonblock(struct tdb_context *tdb, int list, int ltype);
+ int tdb_unlock(struct tdb_context *tdb, int list, int ltype);
+
+ /* the blob must at least hold the length field, and that field must
+    agree with the size of the blob we were handed */
+ if (indata.dsize < sizeof(uint32_t) || indata.dsize != rec->length) {
+ DEBUG(0,(__location__ " Bad record size in ctdb_control_delete_record\n"));
+ return -1;
+ }
+
+ /* the reqid field is used to carry the database id */
+ ctdb_db = find_ctdb_db(ctdb, rec->reqid);
+ if (!ctdb_db) {
+ DEBUG(0,(__location__ " Unknown db 0x%08x\n", rec->reqid));
+ return -1;
+ }
+
+ /* key and data are packed back to back in rec->data */
+ key.dsize = rec->keylen;
+ key.dptr = &rec->data[0];
+ data.dsize = rec->datalen;
+ data.dptr = &rec->data[rec->keylen];
+
+ if (ctdb_lmaster(ctdb, &key) == ctdb->pnn) {
+ DEBUG(2,(__location__ " Called delete on record where we are lmaster\n"));
+ return -1;
+ }
+
+ /* only header-only (empty) records are candidates for deletion */
+ if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+ DEBUG(0,(__location__ " Bad record size\n"));
+ return -1;
+ }
+
+ /* hdr is the header supplied by the caller; it points into indata */
+ hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+ /* use a non-blocking lock */
+ if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+ return -1;
+ }
+
+ /* from here on data refers to our local copy, which must be free()d */
+ data = tdb_fetch(ctdb_db->ltdb->tdb, key);
+ if (data.dptr == NULL) {
+ /* nothing stored locally - nothing to delete */
+ tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ return 0;
+ }
+
+ /* a record too short to hold a header is treated as corrupt and removed
+    if the freelist lock is available; either way we report success */
+ if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+ if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) == 0) {
+ tdb_delete(ctdb_db->ltdb->tdb, key);
+ tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
+ DEBUG(0,(__location__ " Deleted corrupt record\n"));
+ }
+ tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ free(data.dptr);
+ return 0;
+ }
+
+ /* hdr2 is the header of the record as stored on this node */
+ hdr2 = (struct ctdb_ltdb_header *)data.dptr;
+
+ /* our copy is newer than the one the caller saw - keep it */
+ if (hdr2->rsn > hdr->rsn) {
+ tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ DEBUG(2,(__location__ " Skipping record with rsn=%llu - called with rsn=%llu\n",
+ (unsigned long long)hdr2->rsn, (unsigned long long)hdr->rsn));
+ free(data.dptr);
+ return -1;
+ }
+
+ if (hdr2->dmaster == ctdb->pnn) {
+ tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ DEBUG(2,(__location__ " Attempted delete record where we are the dmaster\n"));
+ free(data.dptr);
+ return -1;
+ }
+
+ /* take the freelist lock (list -1) without blocking before deleting */
+ if (tdb_lock_nonblock(ctdb_db->ltdb->tdb, -1, F_WRLCK) != 0) {
+ tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ free(data.dptr);
+ return -1;
+ }
+
+ if (tdb_delete(ctdb_db->ltdb->tdb, key) != 0) {
+ tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
+ tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ DEBUG(2,(__location__ " Failed to delete record\n"));
+ free(data.dptr);
+ return -1;
+ }
+
+ /* release in the reverse order of acquisition: freelist, then chain */
+ tdb_unlock(ctdb_db->ltdb->tdb, -1, F_WRLCK);
+ tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+ free(data.dptr);
+ return 0;
+}
diff --git a/ctdb/server/ctdb_recoverd.c b/ctdb/server/ctdb_recoverd.c
index fa19a975a82..eeb6b77561b 100644
--- a/ctdb/server/ctdb_recoverd.c
+++ b/ctdb/server/ctdb_recoverd.c
@@ -28,6 +28,7 @@
#include "../include/ctdb.h"
#include "../include/ctdb_private.h"
#include "db_wrap.h"
+#include "dlinklist.h"
struct ban_state {
@@ -50,6 +51,7 @@ struct ctdb_recoverd {
uint32_t node_flags;
struct timed_event *send_election_te;
struct timed_event *election_timeout;
+ struct vacuum_info *vacuum_info;
};
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
@@ -701,6 +703,190 @@ static void unban_handler(struct ctdb_context *ctdb, uint64_t srvid,
}
+/* one in-progress batch of vacuum fetches requested by a single node */
+struct vacuum_info {
+ /* links for the list hanging off ctdb_recoverd.vacuum_info */
+ struct vacuum_info *next, *prev;
+ struct ctdb_recoverd *rec;
+ /* node the batch came from; used to refuse a second batch from the
+    same node while this one is still being worked on */
+ uint32_t srcnode;
+ struct ctdb_db_context *ctdb_db;
+ /* private copy of the received record list; count is decremented as
+    records are consumed */
+ struct ctdb_control_pulldb_reply *recs;
+ /* next record inside recs to process */
+ struct ctdb_rec_data *r;
+};
+
+static void vacuum_fetch_next(struct vacuum_info *v);
+
+/*
+  completion hook for a single vacuum fetch call: release the finished
+  call state, then carry on with the next record of the same batch
+ */
+static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
+{
+	void *cb_data = state->async.private;
+	struct vacuum_info *info = talloc_get_type(cb_data, struct vacuum_info);
+
+	talloc_free(state);
+	vacuum_fetch_next(info);
+}
+
+
+/*
+  process the next element from the vacuum list
+
+  walks the remaining records of the batch and issues an immediate-migration
+  call for the first one that is stored locally with a valid header and a
+  dmaster other than this node. Records whose chain is busy, that are
+  missing/short, or that are already local are skipped. When a call has
+  been sent we return and are re-entered from vacuum_fetch_callback();
+  when the list is exhausted, or a call cannot be set up, v is freed.
+*/
+static void vacuum_fetch_next(struct vacuum_info *v)
+{
+	struct ctdb_call call;
+	struct ctdb_rec_data *r;
+
+	while (v->recs->count) {
+		struct ctdb_client_call_state *state;
+		TDB_DATA data;
+		struct ctdb_ltdb_header *hdr;
+		bool already_local;
+
+		ZERO_STRUCT(call);
+		call.call_id = CTDB_NULL_FUNC;
+		call.flags = CTDB_IMMEDIATE_MIGRATION;
+
+		/* consume the current record and step to the one packed after it */
+		r = v->r;
+		v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
+		v->recs->count--;
+
+		call.key.dptr = &r->data[0];
+		call.key.dsize = r->keylen;
+
+		/* ensure we don't block this daemon - just skip a record if we can't get
+		   the chainlock */
+		if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
+			continue;
+		}
+
+		data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
+		if (data.dptr == NULL) {
+			tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+			continue;
+		}
+		if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
+			/* the fetched buffer is malloc()ed and owned by us */
+			free(data.dptr);
+			tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+			continue;
+		}
+
+		/* the header is only needed for the dmaster check, so release
+		   the fetched copy as soon as that has been read */
+		hdr = (struct ctdb_ltdb_header *)data.dptr;
+		already_local = (hdr->dmaster == v->rec->ctdb->pnn);
+		free(data.dptr);
+
+		if (already_local) {
+			/* its already local */
+			tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+			continue;
+		}
+
+		state = ctdb_call_send(v->ctdb_db, &call);
+		tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
+		if (state == NULL) {
+			DEBUG(0,(__location__ " Failed to setup vacuum fetch call\n"));
+			talloc_free(v);
+			return;
+		}
+		state->async.fn = vacuum_fetch_callback;
+		state->async.private = v;
+		return;
+	}
+
+	talloc_free(v);
+}
+
+
+/*
+  talloc destructor for a vacuum_info: unlink it from the owning recovery
+  daemon's list of in-progress batches
+ */
+static int vacuum_info_destructor(struct vacuum_info *v)
+{
+	struct ctdb_recoverd *owner = v->rec;
+
+	DLIST_REMOVE(owner->vacuum_info, v);
+	return 0;
+}
+
+
+/*
+  handler for vacuum fetch
+
+  data is a struct ctdb_control_pulldb_reply holding a list of records that
+  the sending node would like this node to fetch. The sender is taken from
+  the reqid field of the first record. If a batch from that node is already
+  in progress the message is ignored. Otherwise we attach to the database,
+  take a private copy of the list and start fetching the records one at a
+  time via vacuum_fetch_next().
+
+  tmp_ctx holds the temporary dbmap and database name and is released on
+  every exit path.
+*/
+static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
+				 TDB_DATA data, void *private_data)
+{
+	struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
+	struct ctdb_control_pulldb_reply *recs;
+	int ret, i;
+	TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
+	const char *name;
+	struct ctdb_dbid_map *dbmap=NULL;
+	bool persistent = false;
+	struct ctdb_db_context *ctdb_db;
+	struct ctdb_rec_data *r;
+	uint32_t srcnode;
+	struct vacuum_info *v;
+
+	recs = (struct ctdb_control_pulldb_reply *)data.dptr;
+	r = (struct ctdb_rec_data *)&recs->data[0];
+
+	if (recs->count == 0) {
+		goto done;
+	}
+
+	srcnode = r->reqid;
+
+	for (v=rec->vacuum_info;v;v=v->next) {
+		if (srcnode == v->srcnode) {
+			/* we're already working on records from this node */
+			goto done;
+		}
+	}
+
+	/* work out if the database is persistent */
+	ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
+	if (ret != 0) {
+		DEBUG(0, (__location__ " Unable to get dbids from local node\n"));
+		goto done;
+	}
+
+	for (i=0;i<dbmap->num;i++) {
+		if (dbmap->dbs[i].dbid == recs->db_id) {
+			persistent = dbmap->dbs[i].persistent;
+			break;
+		}
+	}
+	if (i == dbmap->num) {
+		DEBUG(0, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
+		goto done;
+	}
+
+	/* find the name of this database */
+	if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
+		DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
+		goto done;
+	}
+
+	/* attach to it */
+	ctdb_db = ctdb_attach(ctdb, name, persistent);
+	if (ctdb_db == NULL) {
+		DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+		goto done;
+	}
+
+	v = talloc_zero(rec, struct vacuum_info);
+	if (v == NULL) {
+		DEBUG(0,(__location__ " Out of memory\n"));
+		goto done;
+	}
+
+	v->rec = rec;
+	v->srcnode = srcnode;
+	v->ctdb_db = ctdb_db;
+	/* the message buffer is not ours to keep, so work from a copy */
+	v->recs = talloc_memdup(v, recs, data.dsize);
+	if (v->recs == NULL) {
+		DEBUG(0,(__location__ " Out of memory\n"));
+		talloc_free(v);
+		goto done;
+	}
+	v->r = (struct ctdb_rec_data *)&v->recs->data[0];
+
+	DLIST_ADD(rec->vacuum_info, v);
+
+	talloc_set_destructor(v, vacuum_info_destructor);
+
+	/* may free v, so v must not be touched after this call */
+	vacuum_fetch_next(v);
+
+done:
+	talloc_free(tmp_ctx);
+}
+
/*
called when ctdb_wait_timeout should finish
@@ -1806,6 +1992,9 @@ static void monitor_cluster(struct ctdb_context *ctdb)
/* and one for when nodes are unbanned */
ctdb_set_message_handler(ctdb, CTDB_SRVID_UNBAN_NODE, unban_handler, rec);
+
+ /* register a message port for vacuum fetch */
+ ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
again:
if (mem_ctx) {
@@ -1821,6 +2010,12 @@ again:
/* we only check for recovery once every second */
ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
+ /* verify that the main daemon is still running */
+ if (kill(ctdb->ctdbd_pid, 0) != 0) {
+ DEBUG(0,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
+ exit(-1);
+ }
+
if (rec->election_timeout) {
/* an election is in progress */
goto again;
@@ -2275,6 +2470,8 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
return -1;
}
+ ctdb->ctdbd_pid = getpid();
+
ctdb->recoverd_pid = fork();
if (ctdb->recoverd_pid == -1) {
return -1;
diff --git a/ctdb/tools/ctdb.c b/ctdb/tools/ctdb.c
index a1ce35aace2..bbd2f0ec28b 100644
--- a/ctdb/tools/ctdb.c
+++ b/ctdb/tools/ctdb.c
@@ -1025,7 +1025,6 @@ static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **
CTDB_CTRL_FLAG_NOREPLY, tdb_null, NULL, NULL, NULL, NULL, NULL);
}
-
static const struct {
const char *name;
int (*fn)(struct ctdb_context *, int, const char **);
@@ -1068,6 +1067,8 @@ static const struct {
{ "unregsrvid", unregsrvid, false, "unregister a server id", " " },
{ "chksrvid", chksrvid, false, "check if a server id exists", " " },
{ "getsrvids", getsrvids, false, "get a list of all server ids"},
+ { "vacuum", ctdb_vacuum, false, "vacuum the databases of empty records", "[max_records]"},
+ { "repack", ctdb_repack, false, "repack all databases", "[max_freelist]"},
};
/*
@@ -1116,6 +1117,8 @@ int main(int argc, const char *argv[])
struct event_context *ev;
const char *control;
+ setlinebuf(stdout);
+
/* set some defaults */
options.timelimit = 3;
options.pnn = CTDB_CURRENT_NODE;
diff --git a/ctdb/tools/ctdb_vacuum.c b/ctdb/tools/ctdb_vacuum.c
new file mode 100644
index 00000000000..86d9c4c5f19
--- /dev/null
+++ b/ctdb/tools/ctdb_vacuum.c
@@ -0,0 +1,633 @@
+/*
+ ctdb control tool - database vacuum
+
+ Copyright (C) Andrew Tridgell 2008
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "system/network.h"
+#include "../include/ctdb.h"
+#include "../include/ctdb_private.h"
+#include "db_wrap.h"
+
+/* should be tunable */
+#define TIMELIMIT() timeval_current_ofs(10, 0)
+
+/* bookkeeping shared by a group of asynchronous controls */
+struct async_data {
+ /* controls sent that have not yet completed */
+ uint32_t count;
+ /* controls that did not complete, or completed with an error */
+ uint32_t fail_count;
+};
+
+/*
+  completion hook for one asynchronous control: mark it as no longer
+  outstanding and record a failure unless it finished and both the
+  transport and the remote status report success
+ */
+static void async_callback(struct ctdb_client_control_state *state)
+{
+	struct async_data *pending = talloc_get_type(state->async.private_data, struct async_data);
+	int32_t status;
+
+	/* this control is no longer outstanding */
+	pending->count--;
+
+	if (state->state == CTDB_CONTROL_DONE) {
+		/* we collect the reply ourselves, so make sure the hook
+		   cannot fire a second time */
+		state->async.fn = NULL;
+
+		if (ctdb_control_recv(state->ctdb, state, pending, NULL, &status, NULL) != 0 ||
+		    status != 0) {
+			pending->fail_count++;
+		}
+	} else {
+		/* the control never completed - count it as failed and leave
+		   the verdict to whoever is waiting on the group */
+		pending->fail_count++;
+	}
+}
+
+/*
+  track one more in-flight control in data and route its completion to
+  async_callback()
+ */
+static void async_add(struct async_data *data, struct ctdb_client_control_state *state)
+{
+	/* account for the control we now have to wait for */
+	data->count++;
+
+	/* hook up the completion handler and its context */
+	state->async.private_data = data;
+	state->async.fn = async_callback;
+}
+
+
+/* run the event loop until every control tracked in data has completed
+   (successfully or not); returns 0 if all of them succeeded and -1 if at
+   least one failed
+*/
+static int async_wait(struct ctdb_context *ctdb, struct async_data *data)
+{
+	for (;;) {
+		if (data->count == 0) {
+			break;
+		}
+		event_loop_once(ctdb->ev);
+	}
+
+	return (data->fail_count == 0) ? 0 : -1;
+}
+
+/*
+  perform a simple control on nodes in the vnn map except ourselves.
+  The control cannot return data
+
+  the same data blob is sent to every node and all controls share one
+  timeout taken when the function is entered. Returns 0 only if every
+  control was sent and completed successfully, -1 otherwise.
+ */
+static int async_control_on_vnnmap(struct ctdb_context *ctdb, enum ctdb_controls opcode,
+				   TDB_DATA data)
+{
+	struct async_data *async_data;
+	struct ctdb_client_control_state *state;
+	int j;
+	int ret = -1;
+	struct timeval timeout = TIMELIMIT();
+
+	async_data = talloc_zero(ctdb, struct async_data);
+	CTDB_NO_MEMORY_FATAL(ctdb, async_data);
+
+	/* loop over all active nodes and send an async control to each of them */
+	for (j=0; j<ctdb->vnn_map->size; j++) {
+		uint32_t pnn = ctdb->vnn_map->map[j];
+		if (pnn == ctdb->pnn) {
+			continue;
+		}
+		/* the control state is allocated under async_data, so freeing
+		   async_data also disposes of any control still in flight */
+		state = ctdb_control_send(ctdb, pnn, 0, opcode,
+					  0, data, async_data, NULL, &timeout, NULL);
+		if (state == NULL) {
+			DEBUG(0,(__location__ " Failed to call async control %u\n", (unsigned)opcode));
+			goto done;
+		}
+
+		async_add(async_data, state);
+	}
+
+	if (async_wait(ctdb, async_data) != 0) {
+		goto done;
+	}
+
+	ret = 0;
+
+done:
+	talloc_free(async_data);
+	return ret;
+}
+
+
+/*
+  vacuum one record
+
+  only acts on a record that is header-only (empty) and for which this node
+  is both dmaster and lmaster. All other nodes in the vnn map are asked to
+  delete their copy first; only if every one of them succeeds is the record
+  re-checked (same dmaster, lmaster and rsn) and deleted locally.
+
+  *count is incremented only when the local delete actually succeeded.
+  Always returns 0: a record that cannot be vacuumed now is simply left
+  for a later run.
+ */
+static int ctdb_vacuum_one(struct ctdb_context *ctdb, TDB_DATA key,
+			   struct ctdb_db_context *ctdb_db, uint32_t *count)
+{
+	TDB_DATA data;
+	struct ctdb_ltdb_header *hdr;
+	struct ctdb_rec_data *rec;
+	uint64_t rsn;
+
+	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+		/* the chain is busy - come back later */
+		return 0;
+	}
+
+	data = tdb_fetch(ctdb_db->ltdb->tdb, key);
+	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+	if (data.dptr == NULL) {
+		return 0;
+	}
+	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+		/* the record carries data - it is not a deleted record */
+		free(data.dptr);
+		return 0;
+	}
+
+	hdr = (struct ctdb_ltdb_header *)data.dptr;
+	/* remember the rsn so a change while the chain is unlocked is noticed */
+	rsn = hdr->rsn;
+
+	/* if we are not the lmaster and the dmaster then skip the record */
+	if (hdr->dmaster != ctdb->pnn ||
+	    ctdb_lmaster(ctdb, &key) != ctdb->pnn) {
+		free(data.dptr);
+		return 0;
+	}
+
+	rec = ctdb_marshall_record(ctdb, ctdb_db->db_id, key, hdr, tdb_null);
+	free(data.dptr);
+	if (rec == NULL) {
+		/* try it again later */
+		return 0;
+	}
+
+	data.dptr = (void *)rec;
+	data.dsize = rec->length;
+
+	if (async_control_on_vnnmap(ctdb, CTDB_CONTROL_DELETE_RECORD, data) != 0) {
+		/* one or more nodes failed to delete a record - no problem! */
+		talloc_free(rec);
+		return 0;
+	}
+
+	talloc_free(rec);
+
+	/* its deleted on all other nodes - refetch, check and delete */
+	if (tdb_chainlock_nonblock(ctdb_db->ltdb->tdb, key) != 0) {
+		/* the chain is busy - come back later */
+		return 0;
+	}
+
+	data = tdb_fetch(ctdb_db->ltdb->tdb, key);
+	if (data.dptr == NULL) {
+		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+		return 0;
+	}
+	if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+		free(data.dptr);
+		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+		return 0;
+	}
+
+	hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+	/* if we are not the lmaster and the dmaster then skip the record */
+	if (hdr->dmaster != ctdb->pnn ||
+	    ctdb_lmaster(ctdb, &key) != ctdb->pnn ||
+	    rsn != hdr->rsn) {
+		tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+		free(data.dptr);
+		return 0;
+	}
+
+	/* only report the record as deleted if the delete really happened */
+	if (tdb_delete(ctdb_db->ltdb->tdb, key) == 0) {
+		(*count)++;
+	}
+	tdb_chainunlock(ctdb_db->ltdb->tdb, key);
+	free(data.dptr);
+
+	return 0;
+}
+
+
+/*
+  vacuum records for which we are the lmaster
+
+  walks the list->count records packed back to back in list->data and
+  tries to vacuum each key via ctdb_vacuum_one(). *count is advanced by
+  that function for each record deleted locally. Returns 0 on success and
+  -1 as soon as ctdb_vacuum_one() reports an error.
+ */
+static int ctdb_vacuum_local(struct ctdb_context *ctdb, struct ctdb_control_pulldb_reply *list,
+			     struct ctdb_db_context *ctdb_db, uint32_t *count)
+{
+	struct ctdb_rec_data *r;
+	int i;
+
+	r = (struct ctdb_rec_data *)&list->data[0];
+
+	for (i=0;
+	     i<list->count;
+	     r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r), i++) {
+		TDB_DATA key;
+		/* the key is stored at the start of the record payload */
+		key.dptr = &r->data[0];
+		key.dsize = r->keylen;
+		if (ctdb_vacuum_one(ctdb, key, ctdb_db, count) != 0) {
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+/*
+ a list of records to possibly delete
+ */
+struct vacuum_data {
+ /* stop gathering once this many records were collected; 0 = no limit */
+ uint32_t vacuum_limit;
+ struct ctdb_context *ctdb;
+ /* one marshalled record list per entry of the vnn map, indexed by the
+    lmaster value computed for each key */
+ struct ctdb_control_pulldb_reply **list;
+ /* set when the traverse had to abort because of an allocation failure */
+ bool traverse_error;
+ /* records gathered so far across all lists */
+ uint32_t total;
+};
+
+/*
+ traverse function for vacuuming
+
+ collects the keys of header-only records whose dmaster is this node into
+ the per-lmaster list in the struct vacuum_data passed as private.
+ returns 0 to continue the traverse and -1 to stop it, either because of an
+ allocation failure (traverse_error is set) or because vacuum_limit
+ records have been gathered
+ */
+static int vacuum_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
+{
+ struct vacuum_data *vdata = talloc_get_type(private, struct vacuum_data);
+ uint32_t lmaster;
+ struct ctdb_ltdb_header *hdr;
+ struct ctdb_rec_data *rec;
+ size_t old_size;
+
+ /* lmaster is used as an index into vdata->list below, so ignore keys
+    whose lmaster falls outside the array */
+ lmaster = ctdb_lmaster(vdata->ctdb, &key);
+ if (lmaster >= vdata->ctdb->vnn_map->size) {
+ return 0;
+ }
+
+ if (data.dsize != sizeof(struct ctdb_ltdb_header)) {
+ /* its not a deleted record */
+ return 0;
+ }
+
+ hdr = (struct ctdb_ltdb_header *)data.dptr;
+
+ /* only records this node is dmaster for are candidates */
+ if (hdr->dmaster != vdata->ctdb->pnn) {
+ return 0;
+ }
+
+
+ /* add the record to the blob ready to send to the nodes */
+ /* our pnn is passed where the request id normally goes and only the key
+    is marshalled (no header, no data) */
+ rec = ctdb_marshall_record(vdata->list[lmaster], vdata->ctdb->pnn, key, NULL, tdb_null);
+ if (rec == NULL) {
+ DEBUG(0,(__location__ " Out of memory\n"));
+ vdata->traverse_error = true;
+ return -1;
+ }
+ /* grow the list by the size of the new record and append it */
+ old_size = talloc_get_size(vdata->list[lmaster]);
+ vdata->list[lmaster] = talloc_realloc_size(NULL, vdata->list[lmaster],
+ old_size + rec->length);
+ if (vdata->list[lmaster] == NULL) {
+ DEBUG(0,(__location__ " Failed to expand\n"));
+ vdata->traverse_error = true;
+ return -1;
+ }
+ vdata->list[lmaster]->count++;
+ memcpy(old_size+(uint8_t *)vdata->list[lmaster], rec, rec->length);
+ talloc_free(rec);
+
+ vdata->total++;
+
+ /* don't gather too many records */
+ if (vdata->vacuum_limit != 0 &&
+ vdata->total == vdata->vacuum_limit) {
+ return -1;
+ }
+
+ return 0;
+}
+
+
+/* vacuum one database
+
+   attaches to the database, traverses it collecting header-only records
+   this node is dmaster for (grouped per lmaster), asks the other lmasters
+   to fetch their records via CTDB_SRVID_VACUUM_FETCH and then tries to
+   delete the gathered records locally. vacuum_limit caps the number of
+   records gathered (0 = unlimited). The map argument is not used.
+   Returns 0 on success, -1 on failure.
+*/
+static int ctdb_vacuum_db(struct ctdb_context *ctdb, uint32_t db_id, struct ctdb_node_map *map,
+			  bool persistent, uint32_t vacuum_limit)
+{
+	struct ctdb_db_context *ctdb_db;
+	const char *name;
+	struct vacuum_data *vdata;
+	int i;
+
+	/* everything temporary hangs off vdata so one talloc_free cleans up */
+	vdata = talloc_zero(ctdb, struct vacuum_data);
+	if (vdata == NULL) {
+		DEBUG(0,(__location__ " Out of memory\n"));
+		return -1;
+	}
+
+	vdata->ctdb = ctdb;
+	vdata->vacuum_limit = vacuum_limit;
+
+	if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, vdata, &name) != 0) {
+		DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
+		talloc_free(vdata);
+		return -1;
+	}
+
+	ctdb_db = ctdb_attach(ctdb, name, persistent);
+	if (ctdb_db == NULL) {
+		DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+		talloc_free(vdata);
+		return -1;
+	}
+
+	/* the list needs to be of length num_nodes */
+	vdata->list = talloc_array(vdata, struct ctdb_control_pulldb_reply *, ctdb->vnn_map->size);
+	if (vdata->list == NULL) {
+		DEBUG(0,(__location__ " Out of memory\n"));
+		talloc_free(vdata);
+		return -1;
+	}
+	for (i=0;i<ctdb->vnn_map->size;i++) {
+		/* start each list as a bare header with no records */
+		vdata->list[i] = (struct ctdb_control_pulldb_reply *)
+			talloc_zero_size(vdata->list,
+					 offsetof(struct ctdb_control_pulldb_reply, data));
+		if (vdata->list[i] == NULL) {
+			DEBUG(0,(__location__ " Out of memory\n"));
+			talloc_free(vdata);
+			return -1;
+		}
+		vdata->list[i]->db_id = db_id;
+	}
+
+	/* traverse, looking for records that might be able to be vacuumed */
+	if (tdb_traverse_read(ctdb_db->ltdb->tdb, vacuum_traverse, vdata) == -1 ||
+	    vdata->traverse_error) {
+		DEBUG(0,(__location__ " Traverse error in vacuuming '%s'\n", name));
+		talloc_free(vdata);
+		return -1;
+	}
+
+	/* NOTE(review): vacuum_traverse() indexes list[] by the value returned
+	   from ctdb_lmaster(), while the loops below treat the index as a
+	   position in vnn_map->map - confirm that the two agree */
+	for (i=0;i<ctdb->vnn_map->size;i++) {
+		if (vdata->list[i]->count == 0) {
+			continue;
+		}
+
+		/* for records where we are not the lmaster, tell the lmaster to fetch the record */
+		if (ctdb->vnn_map->map[i] != ctdb->pnn) {
+			TDB_DATA data;
+			printf("Found %u records for lmaster %u in '%s'\n", vdata->list[i]->count, i, name);
+
+			data.dsize = talloc_get_size(vdata->list[i]);
+			data.dptr  = (void *)vdata->list[i];
+			if (ctdb_send_message(ctdb, ctdb->vnn_map->map[i], CTDB_SRVID_VACUUM_FETCH, data) != 0) {
+				DEBUG(0,(__location__ " Failed to send vacuum fetch message to %u\n",
+					 ctdb->vnn_map->map[i]));
+				talloc_free(vdata);
+				return -1;
+			}
+			continue;
+		}
+	}
+
+	for (i=0;i<ctdb->vnn_map->size;i++) {
+		uint32_t count = 0;
+
+		if (vdata->list[i]->count == 0) {
+			continue;
+		}
+
+		/* for records where we are the lmaster, we can try to delete them */
+		if (ctdb_vacuum_local(ctdb, vdata->list[i], ctdb_db, &count) != 0) {
+			DEBUG(0,(__location__ " Deletion error in vacuuming '%s'\n", name));
+			talloc_free(vdata);
+			return -1;
+		}
+		if (count != 0) {
+			printf("Deleted %u records on this node from '%s'\n", count, name);
+		}
+	}
+
+	/* this ensures we run our event queue */
+	ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+
+	talloc_free(vdata);
+
+	return 0;
+}
+
+
+/*
+  vacuum all our databases
+
+  command handler for "ctdb vacuum [max_records]". An optional first
+  argument limits how many records are gathered per database (0, the
+  default, means no limit). Fetches the db map, node map, vnn map and pnn
+  from the local node, then vacuums each database in turn, stopping at the
+  first failure. Returns 0 on success, non-zero on failure.
+ */
+int ctdb_vacuum(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+	struct ctdb_dbid_map *dbmap=NULL;
+	struct ctdb_node_map *nodemap=NULL;
+	int ret, i, pnn;
+	uint32_t vacuum_limit = 0;
+
+	if (argc > 0) {
+		vacuum_limit = atoi(argv[0]);
+	}
+
+	ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
+	if (ret != 0) {
+		DEBUG(0, ("Unable to get dbids from local node\n"));
+		return ret;
+	}
+
+	ret = ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap);
+	if (ret != 0) {
+		DEBUG(0, ("Unable to get nodemap from local node\n"));
+		return ret;
+	}
+
+	/* the vnn map and pnn are stored in the context because the lmaster
+	   calculations in the vacuum code read them from there */
+	ret = ctdb_ctrl_getvnnmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &ctdb->vnn_map);
+	if (ret != 0) {
+		DEBUG(0, ("Unable to get vnnmap from local node\n"));
+		return ret;
+	}
+
+	pnn = ctdb_ctrl_getpnn(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE);
+	if (pnn == -1) {
+		DEBUG(0, ("Unable to get pnn from local node\n"));
+		return -1;
+	}
+	ctdb->pnn = pnn;
+
+	for (i=0;i<dbmap->num;i++) {
+		if (ctdb_vacuum_db(ctdb, dbmap->dbs[i].dbid, nodemap,
+				   dbmap->dbs[i].persistent, vacuum_limit) != 0) {
+			DEBUG(0,("Failed to vacuum db 0x%x\n", dbmap->dbs[i].dbid));
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+/* context handed to repack_traverse() */
+struct traverse_state {
+ /* set by the traverse callback when a store into dest_db failed */
+ bool error;
+ /* database the traversed records are copied into */
+ struct tdb_context *dest_db;
+};
+
+/*
+  traverse callback used while repacking: insert the visited record into
+  the destination database, flagging an error and aborting the traverse
+  if the store fails
+ */
+static int repack_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private)
+{
+	struct traverse_state *ts = private;
+	int rc = tdb_store(ts->dest_db, key, data, TDB_INSERT);
+
+	if (rc == 0) {
+		return 0;
+	}
+
+	ts->error = true;
+	return -1;
+}
+
+/*
+  repack a tdb
+
+  inside a single transaction: copy every record into a temporary
+  in-memory tdb, wipe the database, then copy the records back. Any
+  failure before the commit cancels the transaction. Returns 0 on
+  success and -1 on failure.
+ */
+static int ctdb_repack_tdb(struct tdb_context *tdb)
+{
+	struct tdb_context *scratch;
+	struct traverse_state copy;
+
+	if (tdb_transaction_start(tdb) != 0) {
+		DEBUG(0,(__location__ " Failed to start transaction\n"));
+		return -1;
+	}
+
+	scratch = tdb_open("tmpdb", tdb_hash_size(tdb), TDB_INTERNAL, O_RDWR|O_CREAT, 0);
+	if (scratch == NULL) {
+		DEBUG(0,(__location__ " Failed to create tmp_db\n"));
+		tdb_transaction_cancel(tdb);
+		return -1;
+	}
+
+	/* pass 1: database -> scratch copy */
+	copy.error = false;
+	copy.dest_db = scratch;
+
+	if (tdb_traverse_read(tdb, repack_traverse, &copy) == -1) {
+		DEBUG(0,(__location__ " Failed to traverse copying out\n"));
+		goto fail;
+	}
+
+	if (copy.error) {
+		DEBUG(0,(__location__ " Error during traversal\n"));
+		goto fail;
+	}
+
+	if (tdb_wipe_all(tdb) != 0) {
+		DEBUG(0,(__location__ " Failed to wipe database\n"));
+		goto fail;
+	}
+
+	/* pass 2: scratch copy -> freshly wiped database */
+	copy.error = false;
+	copy.dest_db = tdb;
+
+	if (tdb_traverse_read(scratch, repack_traverse, &copy) == -1) {
+		DEBUG(0,(__location__ " Failed to traverse copying back\n"));
+		goto fail;
+	}
+
+	if (copy.error) {
+		DEBUG(0,(__location__ " Error during second traversal\n"));
+		goto fail;
+	}
+
+	tdb_close(scratch);
+
+	if (tdb_transaction_commit(tdb) != 0) {
+		DEBUG(0,(__location__ " Failed to commit\n"));
+		return -1;
+	}
+
+	return 0;
+
+fail:
+	/* abandon the transaction first, then drop the scratch copy */
+	tdb_transaction_cancel(tdb);
+	tdb_close(scratch);
+	return -1;
+}
+
+
+/* repack one database
+
+   looks up and attaches to the database, then repacks its local tdb only
+   when the freelist holds more than repack_limit entries. Returns 0 on
+   success (including when no repack was needed), -1 on failure.
+*/
+static int ctdb_repack_db(struct ctdb_context *ctdb, uint32_t db_id,
+			  bool persistent, uint32_t repack_limit)
+{
+	struct ctdb_db_context *db;
+	const char *name;
+	int free_entries;
+
+	if (ctdb_ctrl_getdbname(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, db_id, ctdb, &name) != 0) {
+		DEBUG(0,(__location__ " Failed to get name of db 0x%x\n", db_id));
+		return -1;
+	}
+
+	db = ctdb_attach(ctdb, name, persistent);
+	if (db == NULL) {
+		DEBUG(0,(__location__ " Failed to attach to database '%s'\n", name));
+		return -1;
+	}
+
+	free_entries = tdb_freelist_size(db->ltdb->tdb);
+	if (free_entries == -1) {
+		DEBUG(0,(__location__ " Failed to get freelist size for '%s'\n", name));
+		return -1;
+	}
+
+	/* a short freelist is not worth the cost of a repack */
+	if (free_entries > repack_limit) {
+		printf("Repacking %s with %u freelist entries\n", name, free_entries);
+
+		if (ctdb_repack_tdb(db->ltdb->tdb) != 0) {
+			DEBUG(0,(__location__ " Failed to repack '%s'\n", name));
+			return -1;
+		}
+	}
+
+	return 0;
+}
+
+
+/*
+  repack all our databases
+
+  command handler for "ctdb repack [max_freelist]". An optional first
+  argument overrides the freelist length above which a database is
+  repacked. Fetches the db map from the local node and repacks each
+  database in turn, stopping at the first failure. Returns 0 on success,
+  non-zero on failure.
+ */
+int ctdb_repack(struct ctdb_context *ctdb, int argc, const char **argv)
+{
+	struct ctdb_dbid_map *dbmap=NULL;
+	int ret, i;
+	/* a reasonable default limit to prevent us using too much memory */
+	uint32_t repack_limit = 10000;
+
+	if (argc > 0) {
+		repack_limit = atoi(argv[0]);
+	}
+
+	ret = ctdb_ctrl_getdbmap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &dbmap);
+	if (ret != 0) {
+		DEBUG(0, ("Unable to get dbids from local node\n"));
+		return ret;
+	}
+
+	for (i=0;i<dbmap->num;i++) {
+		if (ctdb_repack_db(ctdb, dbmap->dbs[i].dbid,
+				   dbmap->dbs[i].persistent, repack_limit) != 0) {
+			DEBUG(0,("Failed to repack db 0x%x\n", dbmap->dbs[i].dbid));
+			return -1;
+		}
+	}
+
+	return 0;
+}