1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-05 09:18:06 +03:00
samba-mirror/ctdb/server/ctdb_recovery_helper.c

3201 lines
74 KiB
C
Raw Normal View History

/*
ctdb parallel database recovery
Copyright (C) Amitay Isaacs 2015
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include "system/network.h"
#include "system/filesys.h"
#include <talloc.h>
#include <tevent.h>
#include <tdb.h>
#include <libgen.h>
#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"
#include "lib/util/sys_rw.h"
#include "lib/util/time.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/util.h"
#include "lib/util/smb_strtox.h"
#include "protocol/protocol.h"
#include "protocol/protocol_api.h"
#include "client/client.h"
#include "common/logging.h"
/*
 * ctdb-recovery: Update timeout and number of retries during recovery
 *
 * The timeout RecoverTimeout (default 120) is used for control messages
 * sent during the recovery. If any of the nodes does not respond to any
 * of the recovery control messages for RecoverTimeout seconds, then it
 * will cause a failure of recovery of a database. Recovery helper will
 * retry the recovery for a database 5 times. In the worst case, if a
 * database could not be recovered within 5 attempts, a total of 600
 * seconds would have passed. During this time period other timeouts
 * will be triggered causing unnecessary failures as follows:
 *
 * 1. During the recovery, even though recoverd is processing events, it
 *    does not send a ping message to ctdb daemon. If a ping message is
 *    not received for RecdPingTimeout (default 60) seconds, then ctdb
 *    will count it as unresponsive recovery daemon. If the recovery
 *    daemon fails for RecdFailCount (default 10) times, then ctdb daemon
 *    will restart recovery daemon. So after 600 seconds, ctdb daemon
 *    will restart recovery daemon.
 *
 * 2. If ctdb daemon stays in recovery for RecoveryDropAllIPs (default
 *    120), then it will drop all the public addresses. This will cause
 *    all SMB clients to be disconnected unnecessarily. The released
 *    public addresses will not be taken over till the recovery is
 *    complete.
 *
 * To avoid dropping of IPs and restarting recovery daemon during a
 * delayed recovery, adjust RecoverTimeout to 30 seconds and limit number
 * of retries for recovering a database to 3. If we don't hear from a
 * node for more than 25 seconds, then the node is considered
 * disconnected. So 30 seconds is sufficient timeout for controls during
 * recovery.
 *
 * Signed-off-by: Amitay Isaacs <amitay@gmail.com>
 * Reviewed-by: Martin Schwenke <martin@meltin.net>
 * Autobuild-User(master): Martin Schwenke <martins@samba.org>
 * Autobuild-Date(master): Mon Jun 6 08:49:15 CEST 2016 on sn-devel-144
 *
 * 2016-06-02 11:27:29 +03:00
 */
/*
 * Timeout, in seconds, that TIMEOUT() applies to the controls sent
 * during recovery.
 */
static int recover_timeout = 30;
/*
 * ctdb-recovery: Update timeout and number of retries during recovery
 *
 * The timeout RecoverTimeout (default 120) is used for control messages
 * sent during the recovery. If any of the nodes does not respond to any
 * of the recovery control messages for RecoverTimeout seconds, then it
 * will cause a failure of recovery of a database. Recovery helper will
 * retry the recovery for a database 5 times. In the worst case, if a
 * database could not be recovered within 5 attempts, a total of 600
 * seconds would have passed. During this time period other timeouts
 * will be triggered causing unnecessary failures as follows:
 *
 * 1. During the recovery, even though recoverd is processing events, it
 *    does not send a ping message to ctdb daemon. If a ping message is
 *    not received for RecdPingTimeout (default 60) seconds, then ctdb
 *    will count it as unresponsive recovery daemon. If the recovery
 *    daemon fails for RecdFailCount (default 10) times, then ctdb daemon
 *    will restart recovery daemon. So after 600 seconds, ctdb daemon
 *    will restart recovery daemon.
 *
 * 2. If ctdb daemon stays in recovery for RecoveryDropAllIPs (default
 *    120), then it will drop all the public addresses. This will cause
 *    all SMB clients to be disconnected unnecessarily. The released
 *    public addresses will not be taken over till the recovery is
 *    complete.
 *
 * To avoid dropping of IPs and restarting recovery daemon during a
 * delayed recovery, adjust RecoverTimeout to 30 seconds and limit number
 * of retries for recovering a database to 3. If we don't hear from a
 * node for more than 25 seconds, then the node is considered
 * disconnected. So 30 seconds is sufficient timeout for controls during
 * recovery.
 *
 * Signed-off-by: Amitay Isaacs <amitay@gmail.com>
 * Reviewed-by: Martin Schwenke <martin@meltin.net>
 * Autobuild-User(master): Martin Schwenke <martins@samba.org>
 * Autobuild-Date(master): Mon Jun 6 08:49:15 CEST 2016 on sn-devel-144
 *
 * 2016-06-02 11:27:29 +03:00
 */
/*
 * NOTE(review): NUM_RETRIES is not used in this part of the file;
 * presumably it caps the attempts made to recover one database - confirm
 * against the caller.
 */
#define NUM_RETRIES 3
/*
 * Timeout value handed to every control sent during recovery, derived
 * from recover_timeout at the moment the macro is expanded.
 */
#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)
/*
* Utility functions
*/
/*
 * Shared receive helper for the tevent requests in this file.
 *
 * Returns true when the request finished without a unix error.
 * Otherwise returns false and, when perr is not NULL, stores the error
 * code in *perr.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int unix_err;

	if (!tevent_req_is_unix_error(req, &unix_err)) {
		return true;
	}

	if (perr != NULL) {
		*perr = unix_err;
	}

	return false;
}
/* Last server id handed out; starts at the recovery base id */
static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;

/*
 * Hand out a fresh server id by bumping the counter; the first value
 * returned is CTDB_SRVID_RECOVERY + 1.
 */
static uint64_t srvid_next(void)
{
	return ++rec_srvid;
}
/*
* Node related functions
*/
/*
 * Set of nodes, kept as parallel arrays: pnn_list[i], caps[i] and
 * ban_credits[i] all describe the same node.
 */
struct node_list {
/* PNN of each node; slots not yet filled hold CTDB_UNKNOWN_PNN */
uint32_t *pnn_list;
/* Capability bits of each node (tested against CTDB_CAP_LMASTER) */
uint32_t *caps;
/* Ban credits accumulated by each node, see node_list_ban_credits() */
uint32_t *ban_credits;
/* Number of slots allocated in each array */
unsigned int size;
/* Number of slots currently in use */
unsigned int count;
};
/*
 * Allocate an empty node list with room for "size" nodes.
 *
 * The three per-node arrays are talloc children of the returned list.
 * Every pnn_list slot starts out as CTDB_UNKNOWN_PNN; caps and
 * ban_credits start out zeroed.
 *
 * Returns NULL if any allocation fails.
 */
static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
{
	struct node_list *list = talloc_zero(mem_ctx, struct node_list);
	unsigned int slot;

	if (list == NULL) {
		return NULL;
	}

	list->size = size;

	list->pnn_list = talloc_array(list, uint32_t, size);
	list->caps = talloc_zero_array(list, uint32_t, size);
	list->ban_credits = talloc_zero_array(list, uint32_t, size);

	if (list->pnn_list != NULL &&
	    list->caps != NULL &&
	    list->ban_credits != NULL) {
		for (slot = 0; slot < size; slot++) {
			list->pnn_list[slot] = CTDB_UNKNOWN_PNN;
		}
		return list;
	}

	/* Frees whichever arrays were allocated as well */
	talloc_free(list);
	return NULL;
}
/*
 * Append "pnn" to the list.
 *
 * Returns false, leaving the list untouched, when the list is full or
 * the PNN is already present; returns true once the node is added.
 */
static bool node_list_add(struct node_list *nlist, uint32_t pnn)
{
	unsigned int used = nlist->count;
	unsigned int pos;

	if (used == nlist->size) {
		return false;
	}

	for (pos = 0; pos < used; pos++) {
		if (nlist->pnn_list[pos] == pnn) {
			return false;
		}
	}

	nlist->pnn_list[used] = pnn;
	nlist->count = used + 1;

	return true;
}
/*
 * Build the list of PNNs whose capabilities include CTDB_CAP_LMASTER.
 *
 * The returned array is allocated on mem_ctx with room for every node in
 * nlist; only the first *pnn_count entries are meaningful, the rest stay
 * zero. Returns NULL on allocation failure, in which case *pnn_count is
 * not written.
 */
static uint32_t *node_list_lmaster(struct node_list *nlist,
				   TALLOC_CTX *mem_ctx,
				   unsigned int *pnn_count)
{
	uint32_t *lmasters;
	unsigned int found = 0;
	unsigned int idx;

	lmasters = talloc_zero_array(mem_ctx, uint32_t, nlist->count);
	if (lmasters == NULL) {
		return NULL;
	}

	for (idx = 0; idx < nlist->count; idx++) {
		if (nlist->caps[idx] & CTDB_CAP_LMASTER) {
			lmasters[found] = nlist->pnn_list[idx];
			found += 1;
		}
	}

	*pnn_count = found;
	return lmasters;
}
/*
 * Charge one ban credit to node "pnn". Does nothing if the node is not
 * in the list.
 */
static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn)
{
	unsigned int idx;

	for (idx = 0; idx < nlist->count; idx++) {
		if (nlist->pnn_list[idx] != pnn) {
			continue;
		}

		nlist->ban_credits[idx] += 1;
		return;
	}
}
/*
* Database list functions
*
* Simple, naive implementation that could be updated to a db_hash or similar
*/
/* One database and the nodes on which it has been seen */
struct db {
/* Linkage for the DLIST_* macros */
struct db *prev, *next;
uint32_t db_id;
/* Flags with READONLY and STICKY masked out by db_list_check_and_add() */
uint32_t db_flags;
/* Nodes that have this database; allocated with db_list.num_nodes slots */
uint32_t *pnn_list;
/* Number of entries used in pnn_list */
unsigned int num_nodes;
};
/* List of databases collected from the nodes */
struct db_list {
/* Number of entries on the db list */
unsigned int num_dbs;
/* Head of the doubly-linked list of databases */
struct db *db;
/* Upper bound on the nodes recorded per database (size of db.pnn_list) */
unsigned int num_nodes;
};
/*
 * Allocate an empty database list on mem_ctx.
 *
 * num_nodes is the maximum number of nodes that can be recorded for each
 * database added later.
 *
 * Returns NULL if the allocation fails.
 */
static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes)
{
	struct db_list *l;

	l = talloc_zero(mem_ctx, struct db_list);
	if (l == NULL) {
		/* Do not dereference a failed allocation */
		return NULL;
	}

	l->num_nodes = num_nodes;

	return l;
}
/*
 * Look up the entry for db_id.
 *
 * Returns NULL when dblist is NULL or no entry has that id.
 */
static struct db *db_list_find(struct db_list *dblist, uint32_t db_id)
{
	struct db *cur;

	if (dblist == NULL) {
		return NULL;
	}

	for (cur = dblist->db; cur != NULL; cur = cur->next) {
		if (cur->db_id == db_id) {
			return cur;
		}
	}

	return NULL;
}
/*
 * Append a new database entry whose only known node so far is "node".
 *
 * No duplicate check is done here; db_list_check_and_add() does that
 * before calling this.
 *
 * Returns 0 on success, EINVAL for a NULL list, ENOMEM on allocation
 * failure.
 */
static int db_list_add(struct db_list *dblist,
		       uint32_t db_id,
		       uint32_t db_flags,
		       uint32_t node)
{
	struct db *entry;

	if (dblist == NULL) {
		return EINVAL;
	}

	entry = talloc_zero(dblist, struct db);
	if (entry == NULL) {
		return ENOMEM;
	}

	entry->pnn_list = talloc_zero_array(entry, uint32_t,
					    dblist->num_nodes);
	if (entry->pnn_list == NULL) {
		talloc_free(entry);
		return ENOMEM;
	}

	entry->db_id = db_id;
	entry->db_flags = db_flags;
	entry->pnn_list[0] = node;
	entry->num_nodes = 1;

	DLIST_ADD_END(dblist->db, entry);
	dblist->num_dbs += 1;

	return 0;
}
/*
 * Record that "node" has database db_id.
 *
 * A database not seen before gets a new entry. For a known database the
 * flags must match the ones already recorded and the node is appended to
 * its node list.
 *
 * Returns 0 on success; EINVAL for a NULL list, mismatched flags or a
 * full node list; ENOMEM if a new entry cannot be allocated.
 */
static int db_list_check_and_add(struct db_list *dblist,
				 uint32_t db_id,
				 uint32_t db_flags,
				 uint32_t node)
{
	/*
	 * These flags are only set on a node when a client attaches to
	 * that node, so they might not be set yet. They can't be passed
	 * as part of the attach either, so they are ignored here.
	 */
	const uint32_t ignored_flags =
		CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY;
	struct db *known;

	if (dblist == NULL) {
		return EINVAL;
	}

	db_flags &= ~ignored_flags;

	known = db_list_find(dblist, db_id);
	if (known == NULL) {
		return db_list_add(dblist, db_id, db_flags, node);
	}

	if (known->db_flags != db_flags) {
		D_ERR("Incompatible database flags for 0x%"PRIx32" "
		      "(0x%"PRIx32" != 0x%"PRIx32")\n",
		      db_id,
		      db_flags,
		      known->db_flags);
		return EINVAL;
	}

	/* The per-database node list only has dblist->num_nodes slots */
	if (known->num_nodes >= dblist->num_nodes) {
		return EINVAL;
	}

	known->pnn_list[known->num_nodes] = node;
	known->num_nodes += 1;

	return 0;
}
/*
* Create database on nodes where it is missing
*/
/*
 * Request state for db_create_missing_send(), which attaches a database
 * on the nodes of nlist that do not have it yet.
 */
struct db_create_missing_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
/* Nodes taking part; also used to charge ban credits on failure */
struct node_list *nlist;
const char *db_name;
/* PNNs from nlist that are absent from the database's own node list */
uint32_t *missing_pnn_list;
/* Number of entries used in missing_pnn_list */
int missing_num_nodes;
};
/* Completion callback for the multi-node attach control */
static void db_create_missing_done(struct tevent_req *subreq);
/*
 * Start attaching database "db_name" on every node of nlist that is not
 * listed in db->pnn_list.
 *
 * If the database is already present on as many nodes as nlist holds,
 * the request completes immediately. Otherwise one attach control
 * (persistent, replicated or plain, chosen from db->db_flags) is sent to
 * all missing nodes with the CTDB_CTRL_FLAG_ATTACH_RECOVERY flag set.
 *
 * Returns the new request, or NULL if it could not be created. Collect
 * the result with db_create_missing_recv().
 */
static struct tevent_req *db_create_missing_send(
					TALLOC_CTX *mem_ctx,
					struct tevent_context *ev,
					struct ctdb_client_context *client,
					struct node_list *nlist,
					const char *db_name,
					struct db *db)
{
	struct tevent_req *req, *subreq;
	struct db_create_missing_state *state;
	struct ctdb_req_control request;
	unsigned int i, j;

	req = tevent_req_create(mem_ctx,
				&state,
				struct db_create_missing_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->nlist = nlist;
	state->db_name = db_name;

	if (nlist->count == db->num_nodes) {
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}

	/*
	 * Allocate on state rather than on the caller's context so the
	 * list is released together with the request.
	 */
	state->missing_pnn_list = talloc_array(state, uint32_t, nlist->count);
	if (tevent_req_nomem(state->missing_pnn_list, req)) {
		return tevent_req_post(req, ev);
	}

	for (i = 0; i < nlist->count; i++) {
		uint32_t pnn = nlist->pnn_list[i];

		for (j = 0; j < db->num_nodes; j++) {
			if (pnn == db->pnn_list[j]) {
				break;
			}
		}

		/* Inner loop stopped early: this node already has the db */
		if (j < db->num_nodes) {
			continue;
		}

		DBG_INFO("Create database %s on node %u\n",
			 state->db_name,
			 pnn);

		state->missing_pnn_list[state->missing_num_nodes] = pnn;
		state->missing_num_nodes++;
	}

	if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) {
		ctdb_req_control_db_attach_persistent(&request, db_name);
	} else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) {
		ctdb_req_control_db_attach_replicated(&request, db_name);
	} else {
		ctdb_req_control_db_attach(&request, db_name);
	}
	request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY;

	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->missing_pnn_list,
						state->missing_num_nodes,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, db_create_missing_done, req);

	return req;
}
/*
 * Completion callback for the attach control sent by
 * db_create_missing_send().
 *
 * On failure the error is logged, the node reported by
 * ctdb_client_control_multi_error() (if any) is charged a ban credit,
 * and the request fails with the overall error code.
 */
static void db_create_missing_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct db_create_missing_state *state = tevent_req_data(
		req, struct db_create_missing_state);
	int *err_list;
	int ret;
	bool status;

	/*
	 * Parent err_list to state, as the other multi_recv callers in
	 * this file do, so it is released with the request.
	 * NOTE(review): confirm that a NULL context left it unowned.
	 */
	status = ctdb_client_control_multi_recv(subreq,
						&ret,
						state,
						&err_list,
						NULL);
	TALLOC_FREE(subreq);
	if (! status) {
		int ret2;
		uint32_t pnn;

		ret2 = ctdb_client_control_multi_error(
						state->missing_pnn_list,
						state->missing_num_nodes,
						err_list,
						&pnn);
		if (ret2 != 0) {
			D_ERR("control DB_ATTACH failed for db %s"
			      " on node %u, ret=%d\n",
			      state->db_name,
			      pnn,
			      ret2);
			node_list_ban_credits(state->nlist, pnn);
		} else {
			D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
			      state->db_name,
			      ret);
		}
		tevent_req_error(req, ret);
		return;
	}

	tevent_req_done(req);
}
/*
 * Collect the result of db_create_missing_send(). Returns false and
 * stores the unix error in *perr (when not NULL) on failure.
 */
static bool db_create_missing_recv(struct tevent_req *req, int *perr)
{
return generic_recv(req, perr);
}
/*
* Recovery database functions
*/
/* Temporary tdb in which the records pulled during recovery are merged */
struct recdb_context {
/* Id and name of the database being recovered */
uint32_t db_id;
const char *db_name;
/* Path of the recovery tdb file, "<dir>/recdb.<db_name>" */
const char *db_path;
/* Handle of the open recovery tdb */
struct tdb_wrap *db;
/* When false, dmaster/flags are rewritten as records are exported */
bool persistent;
};
/*
 * Create a fresh recovery database for database db_id / db_name.
 *
 * The tdb file is named "recdb.<db_name>". It lives in the directory
 * given by the CTDB_DBDIR_STATE environment variable when that is set,
 * otherwise in the directory part of db_path. Any stale file of that
 * name is removed first, and the new file is created exclusively with
 * mode 0600.
 *
 * Returns the new context, or NULL on allocation failure or if the tdb
 * cannot be created.
 */
static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
					  const char *db_name,
					  const char *db_path,
					  uint32_t hash_size, bool persistent)
{
	/* Looked up once and reused for later calls while non-NULL */
	static char *db_dir_state = NULL;
	struct recdb_context *recdb;
	unsigned int tdb_flags;

	recdb = talloc(mem_ctx, struct recdb_context);
	if (recdb == NULL) {
		return NULL;
	}

	if (db_dir_state == NULL) {
		db_dir_state = getenv("CTDB_DBDIR_STATE");
	}

	recdb->db_name = db_name;
	recdb->db_id = db_id;
	/*
	 * NOTE(review): dirname() is handed the caller's db_path with
	 * const cast away and is allowed to modify it - confirm no caller
	 * relies on db_path afterwards.
	 */
	recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
					 db_dir_state != NULL ?
					    db_dir_state :
					    dirname(discard_const(db_path)),
					 db_name);
	if (recdb->db_path == NULL) {
		talloc_free(recdb);
		return NULL;
	}
	unlink(recdb->db_path);

	tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING;
	recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size,
				  tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
	if (recdb->db == NULL) {
		/*
		 * Log before freeing: db_path is a talloc child of recdb
		 * and would be gone after talloc_free().
		 */
		D_ERR("failed to create recovery db %s\n", recdb->db_path);
		talloc_free(recdb);
		return NULL;
	}

	recdb->persistent = persistent;

	return recdb;
}
/* Id of the database this recovery db was created for */
static uint32_t recdb_id(struct recdb_context *recdb)
{
return recdb->db_id;
}
/* Name of the database this recovery db was created for */
static const char *recdb_name(struct recdb_context *recdb)
{
return recdb->db_name;
}
/* Path of the recovery tdb file */
static const char *recdb_path(struct recdb_context *recdb)
{
return recdb->db_path;
}
/* Raw tdb handle behind the recovery db wrapper */
static struct tdb_context *recdb_tdb(struct recdb_context *recdb)
{
return recdb->db->tdb;
}
/* Persistent flag the recovery db was created with */
static bool recdb_persistent(struct recdb_context *recdb)
{
return recdb->persistent;
}
/* Context handed to recdb_add_traverse() for each record */
struct recdb_add_traverse_state {
/* Recovery db the records are merged into */
struct recdb_context *recdb;
/* PNN of this node; used to break ties between equal RSNs */
uint32_t mypnn;
};
/*
 * Merge one pulled record into the recovery db.
 *
 * The record header is taken from the start of "data"; the separate
 * "header" argument is not used. An existing record for the same key is
 * kept when the incoming RSN is lower, or when the RSNs are equal and
 * the existing record's dmaster is not this node. In every other case
 * the incoming record replaces it.
 *
 * Returns 0 when the record was stored or skipped, -1 when the data is
 * too short to hold a header or the store fails.
 */
static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header,
			      TDB_DATA key, TDB_DATA data,
			      void *private_data)
{
	struct recdb_add_traverse_state *state =
		(struct recdb_add_traverse_state *)private_data;
	struct tdb_context *tdb = recdb_tdb(state->recdb);
	struct ctdb_ltdb_header *incoming;
	TDB_DATA existing;

	/* header is not marshalled separately in the pulldb control */
	if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
		return -1;
	}
	incoming = (struct ctdb_ltdb_header *)data.dptr;

	existing = tdb_fetch(tdb, key);
	if (existing.dptr != NULL) {
		struct ctdb_ltdb_header old;
		bool incoming_older;
		bool tie_keeps_old;

		/* Copy the header out so the fetched buffer can be freed */
		old = *(struct ctdb_ltdb_header *)existing.dptr;
		free(existing.dptr);

		incoming_older = (incoming->rsn < old.rsn);
		tie_keeps_old = (incoming->rsn == old.rsn) &&
				(old.dmaster != state->mypnn);

		if (incoming_older || tie_keeps_old) {
			return 0;
		}
	}

	if (tdb_store(tdb, key, data, TDB_REPLACE) != 0) {
		return -1;
	}

	return 0;
}
static bool recdb_add(struct recdb_context *recdb, int mypnn,
struct ctdb_rec_buffer *recbuf)
{
struct recdb_add_traverse_state state;
int ret;
state.recdb = recdb;
state.mypnn = mypnn;
ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state);
if (ret != 0) {
return false;
}
return true;
}
/*
 * Decide whether a record from the recovery db is kept, and append the
 * kept ones to recbuf.
 *
 * Records that carry no data beyond the ltdb header are dropped. For
 * non-persistent databases the record's dmaster is set to "dmaster" and
 * CTDB_REC_FLAG_MIGRATED_WITH_DATA is set in its flags (the header is
 * modified in place inside "data").
 *
 * Returns 0 when the record was dropped or appended, otherwise the error
 * from ctdb_rec_buffer_add().
 */
static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent,
			     uint32_t reqid, uint32_t dmaster,
			     TDB_DATA key, TDB_DATA data)
{
	/* Skip empty records */
	if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
		return 0;
	}

	if (!persistent) {
		struct ctdb_ltdb_header *hdr =
			(struct ctdb_ltdb_header *)data.dptr;

		/* update the dmaster field to point to us */
		hdr->dmaster = dmaster;
		hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
	}

	return ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data);
}
/* Context for recdb_file_traverse() while spooling the recdb to a file */
struct recdb_file_traverse_state {
/* Buffer currently being filled; replaced after each write */
struct ctdb_rec_buffer *recbuf;
struct recdb_context *recdb;
/* Talloc parent for replacement buffers */
TALLOC_CTX *mem_ctx;
/* Passed through to recbuf_filter_add() */
uint32_t dmaster;
uint32_t reqid;
bool persistent;
/* Set when the callback hit an error */
bool failed;
/* File the buffers are written to */
int fd;
/* A buffer is written out once its length exceeds this */
size_t max_size;
/* Number of buffers written so far */
unsigned int num_buffers;
};
/*
 * tdb traverse callback: filter one record into the current buffer and
 * write the buffer to the file once it has grown past max_size.
 *
 * After a write the buffer is freed and replaced by a new empty one.
 * Returns 0 to continue; on any error the "failed" flag is set and a
 * non-zero value is returned.
 */
static int recdb_file_traverse(struct tdb_context *tdb,
			       TDB_DATA key, TDB_DATA data,
			       void *private_data)
{
	struct recdb_file_traverse_state *st = private_data;
	int rc;

	rc = recbuf_filter_add(st->recbuf, st->persistent,
			       st->reqid, st->dmaster, key, data);
	if (rc != 0) {
		st->failed = true;
		return rc;
	}

	/* Keep filling until the buffer is larger than the limit */
	if (ctdb_rec_buffer_len(st->recbuf) <= st->max_size) {
		return 0;
	}

	rc = ctdb_rec_buffer_write(st->recbuf, st->fd);
	if (rc != 0) {
		D_ERR("Failed to collect recovery records for %s\n",
		      recdb_name(st->recdb));
		st->failed = true;
		return rc;
	}
	st->num_buffers += 1;

	TALLOC_FREE(st->recbuf);
	st->recbuf = ctdb_rec_buffer_init(st->mem_ctx, recdb_id(st->recdb));
	if (st->recbuf == NULL) {
		st->failed = true;
		return ENOMEM;
	}

	return 0;
}
/*
 * Spool the records of the recovery db to file descriptor fd as a
 * sequence of record buffers.
 *
 * Records are filtered through recbuf_filter_add(). A buffer is written
 * as soon as it grows past max_size, and the last (possibly empty)
 * buffer is always written at the end.
 *
 * Returns the number of buffers written, or -1 on failure.
 */
static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx,
		      uint32_t dmaster, int fd, int max_size)
{
	struct recdb_file_traverse_state state;
	int ret;

	state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb));
	if (state.recbuf == NULL) {
		return -1;
	}
	state.recdb = recdb;
	state.mem_ctx = mem_ctx;
	state.dmaster = dmaster;
	state.reqid = 0;
	state.persistent = recdb_persistent(recdb);
	state.failed = false;
	state.fd = fd;
	state.max_size = max_size;
	state.num_buffers = 0;

	ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state);
	if (ret == -1 || state.failed) {
		TALLOC_FREE(state.recbuf);
		return -1;
	}

	ret = ctdb_rec_buffer_write(state.recbuf, fd);
	if (ret != 0) {
		D_ERR("Failed to collect recovery records for %s\n",
		      recdb_name(recdb));
		TALLOC_FREE(state.recbuf);
		return -1;
	}
	state.num_buffers += 1;

	/*
	 * The last buffer has been written out; release it here instead
	 * of leaving it attached to mem_ctx, as is done for the earlier
	 * buffers in the traverse callback.
	 */
	TALLOC_FREE(state.recbuf);

	D_DEBUG("Wrote %d buffers of recovery records for %s\n",
		state.num_buffers, recdb_name(recdb));

	return state.num_buffers;
}
/*
* Pull database from a single node
*/
/* Request state for pulling one database from one node */
struct pull_database_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
/* Recovery db the pulled records are merged into */
struct recdb_context *recdb;
/* Node the database is pulled from */
uint32_t pnn;
/* Server id on which the record messages are received */
uint64_t srvid;
/* Records received so far through the message handler */
unsigned int num_records;
/* Error from the pull itself, reported after the handler is removed */
int result;
};
/* Message handler receiving the record buffers sent by the remote node */
static void pull_database_handler(uint64_t srvid, TDB_DATA data,
void *private_data);
/* Callbacks for the register -> pull -> unregister sequence */
static void pull_database_register_done(struct tevent_req *subreq);
static void pull_database_unregister_done(struct tevent_req *subreq);
static void pull_database_done(struct tevent_req *subreq);
/*
 * Start pulling the database behind "recdb" from node "pnn".
 *
 * A fresh server id is allocated and a message handler registered on it.
 * The pull control itself is issued once the registration completes.
 *
 * Returns the new request, or NULL if it could not be created. Collect
 * the result with pull_database_recv().
 */
static struct tevent_req *pull_database_send(
				TALLOC_CTX *mem_ctx,
				struct tevent_context *ev,
				struct ctdb_client_context *client,
				uint32_t pnn,
				struct recdb_context *recdb)
{
	struct pull_database_state *state;
	struct tevent_req *req;
	struct tevent_req *subreq;

	req = tevent_req_create(mem_ctx, &state, struct pull_database_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->recdb = recdb;
	state->pnn = pnn;
	state->srvid = srvid_next();

	/* The handler gets the request itself as private data */
	subreq = ctdb_client_set_message_handler_send(state,
						      ev,
						      client,
						      state->srvid,
						      pull_database_handler,
						      req);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, pull_database_register_done, req);

	return req;
}
/*
 * Message handler for the record buffers sent in response to the pull
 * control.
 *
 * Messages for another server id are ignored. A buffer that cannot be
 * parsed, belongs to a different database, or cannot be merged into the
 * recovery db is logged and dropped without being counted. Otherwise
 * its record count is added to state->num_records.
 */
static void pull_database_handler(uint64_t srvid, TDB_DATA data,
				  void *private_data)
{
	struct tevent_req *req = talloc_get_type_abort(
		private_data, struct tevent_req);
	struct pull_database_state *state = tevent_req_data(
		req, struct pull_database_state);
	struct ctdb_rec_buffer *recbuf;
	size_t np;
	int ret;
	bool status;

	if (srvid != state->srvid) {
		return;
	}

	ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np);
	if (ret != 0) {
		D_ERR("Invalid data received for DB_PULL messages\n");
		return;
	}

	if (recbuf->db_id != recdb_id(state->recdb)) {
		/* Log first: the message reads recbuf->db_id */
		D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n",
		      recbuf->db_id, recdb_name(state->recdb));
		talloc_free(recbuf);
		return;
	}

	status = recdb_add(state->recdb, ctdb_client_pnn(state->client),
			   recbuf);
	if (! status) {
		talloc_free(recbuf);
		D_ERR("Failed to add records to recdb for %s\n",
		      recdb_name(state->recdb));
		return;
	}

	state->num_records += recbuf->count;
	talloc_free(recbuf);
}
static void pull_database_register_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct pull_database_state *state = tevent_req_data(
req, struct pull_database_state);
struct ctdb_req_control request;
struct ctdb_pulldb_ext pulldb_ext;
int ret;
bool status;
status = ctdb_client_set_message_handler_recv(subreq, &ret);
TALLOC_FREE(subreq);
if (! status) {
D_ERR("Failed to set message handler for DB_PULL for %s\n",
recdb_name(state->recdb));
tevent_req_error(req, ret);
return;
}
pulldb_ext.db_id = recdb_id(state->recdb);
pulldb_ext.lmaster = CTDB_LMASTER_ANY;
pulldb_ext.srvid = state->srvid;
ctdb_req_control_db_pull(&request, &pulldb_ext);
subreq = ctdb_client_control_send(state, state->ev, state->client,
state->pnn, TIMEOUT(), &request);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, pull_database_done, req);
}
/*
 * Completion callback for the pull control.
 *
 * Compares the record count reported by the remote node with the number
 * of records received through the message handler. Any failure is
 * stored in state->result rather than failing the request right away,
 * so the message handler is removed in all cases; the stored result is
 * reported by pull_database_unregister_done().
 */
static void pull_database_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct pull_database_state *state = tevent_req_data(
		req, struct pull_database_state);
	struct ctdb_reply_control *reply;
	uint32_t num_records = 0;
	int ret;
	bool status;

	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
	TALLOC_FREE(subreq);
	if (! status) {
		D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n",
		      recdb_name(state->recdb), state->pnn, ret);
		state->result = ret;
		goto unregister;
	}

	ret = ctdb_reply_control_db_pull(reply, &num_records);
	talloc_free(reply);
	if (ret != 0) {
		/*
		 * The reply could not be parsed, so num_records carries
		 * no information; do not compare it.
		 */
		D_ERR("Invalid reply for control DB_PULL for %s on node %u,"
		      " ret=%d\n",
		      recdb_name(state->recdb), state->pnn, ret);
		state->result = EPROTO;
		goto unregister;
	}

	if (num_records != state->num_records) {
		D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n",
		      num_records, state->num_records,
		      recdb_name(state->recdb));
		state->result = EIO;
		goto unregister;
	}

	D_INFO("Pulled %d records for db %s from node %d\n",
	       state->num_records, recdb_name(state->recdb), state->pnn);

unregister:

	subreq = ctdb_client_remove_message_handler_send(
					state, state->ev, state->client,
					state->srvid, req);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, pull_database_unregister_done, req);
}
/*
 * Final step of a pull: the message handler has been removed.
 *
 * A failure to remove the handler takes precedence; otherwise the
 * request finishes with the result recorded by pull_database_done().
 */
static void pull_database_unregister_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct pull_database_state *state = tevent_req_data(
		req, struct pull_database_state);
	bool removed;
	int err;

	removed = ctdb_client_remove_message_handler_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (!removed) {
		D_ERR("failed to remove message handler for DB_PULL for db %s\n",
		      recdb_name(state->recdb));
		tevent_req_error(req, err);
		return;
	}

	if (state->result == 0) {
		tevent_req_done(req);
		return;
	}

	tevent_req_error(req, state->result);
}
/*
 * Collect the result of pull_database_send(). Returns false and stores
 * the unix error in *perr (when not NULL) on failure.
 */
static bool pull_database_recv(struct tevent_req *req, int *perr)
{
return generic_recv(req, perr);
}
/*
* Push database to specified nodes (new style)
*/
/* Request state for pushing the recovery db to a set of nodes */
struct push_database_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
/* Recovery db whose records are pushed */
struct recdb_context *recdb;
/* Target nodes and their number */
uint32_t *pnn_list;
unsigned int count;
/* Server id the record messages are addressed to */
uint64_t srvid;
/* PNN of this node, written into the pushed records as dmaster */
uint32_t dmaster;
/* Spool file holding the record buffers to send */
int fd;
/* Buffers written to the spool file, and how many were sent so far */
int num_buffers;
int num_buffers_sent;
/* Total records sent; compared with what each node confirms */
unsigned int num_records;
};
/* Callbacks for the start -> send buffers -> confirm sequence */
static void push_database_started(struct tevent_req *subreq);
static void push_database_send_msg(struct tevent_req *req);
static void push_database_send_done(struct tevent_req *subreq);
static void push_database_confirmed(struct tevent_req *subreq);
static struct tevent_req *push_database_send(
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct ctdb_client_context *client,
uint32_t *pnn_list,
unsigned int count,
struct recdb_context *recdb,
int max_size)
{
struct tevent_req *req, *subreq;
struct push_database_state *state;
struct ctdb_req_control request;
struct ctdb_pulldb_ext pulldb_ext;
char *filename;
off_t offset;
req = tevent_req_create(mem_ctx, &state,
struct push_database_state);
if (req == NULL) {
return NULL;
}
state->ev = ev;
state->client = client;
state->recdb = recdb;
state->pnn_list = pnn_list;
state->count = count;
state->srvid = srvid_next();
state->dmaster = ctdb_client_pnn(client);
state->num_buffers_sent = 0;
state->num_records = 0;
filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb));
if (tevent_req_nomem(filename, req)) {
return tevent_req_post(req, ev);
}
state->fd = open(filename, O_RDWR|O_CREAT, 0644);
if (state->fd == -1) {
tevent_req_error(req, errno);
return tevent_req_post(req, ev);
}
unlink(filename);
talloc_free(filename);
state->num_buffers = recdb_file(recdb, state, state->dmaster,
state->fd, max_size);
if (state->num_buffers == -1) {
tevent_req_error(req, ENOMEM);
return tevent_req_post(req, ev);
}
offset = lseek(state->fd, 0, SEEK_SET);
if (offset != 0) {
tevent_req_error(req, EIO);
return tevent_req_post(req, ev);
}
pulldb_ext.db_id = recdb_id(recdb);
pulldb_ext.srvid = state->srvid;
ctdb_req_control_db_push_start(&request, &pulldb_ext);
subreq = ctdb_client_control_multi_send(state, ev, client,
pnn_list, count,
TIMEOUT(), &request);
if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
tevent_req_set_callback(subreq, push_database_started, req);
return req;
}
/*
 * Completion callback for the push-start control. On success the first
 * record buffer is sent; on failure the error is logged and the request
 * fails with the overall error code.
 */
static void push_database_started(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct push_database_state *state = tevent_req_data(
		req, struct push_database_state);
	int *err_list;
	uint32_t failed_pnn;
	int node_err;
	int err;
	bool ok;

	ok = ctdb_client_control_multi_recv(subreq, &err, state,
					    &err_list, NULL);
	TALLOC_FREE(subreq);
	if (ok) {
		push_database_send_msg(req);
		return;
	}

	node_err = ctdb_client_control_multi_error(state->pnn_list,
						   state->count,
						   err_list, &failed_pnn);
	if (node_err != 0) {
		D_ERR("control DB_PUSH_START failed for db %s"
		      " on node %u, ret=%d\n",
		      recdb_name(state->recdb), failed_pnn, node_err);
	} else {
		D_ERR("control DB_PUSH_START failed for db %s,"
		      " ret=%d\n",
		      recdb_name(state->recdb), err);
	}

	talloc_free(err_list);
	tevent_req_error(req, err);
}
static void push_database_send_msg(struct tevent_req *req)
{
struct push_database_state *state = tevent_req_data(
req, struct push_database_state);
struct tevent_req *subreq;
struct ctdb_rec_buffer *recbuf;
struct ctdb_req_message message;
TDB_DATA data;
size_t np;
int ret;
if (state->num_buffers_sent == state->num_buffers) {
struct ctdb_req_control request;
ctdb_req_control_db_push_confirm(&request,
recdb_id(state->recdb));
subreq = ctdb_client_control_multi_send(state, state->ev,
state->client,
state->pnn_list,
state->count,
TIMEOUT(), &request);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, push_database_confirmed, req);
return;
}
ret = ctdb_rec_buffer_read(state->fd, state, &recbuf);
if (ret != 0) {
tevent_req_error(req, ret);
return;
}
data.dsize = ctdb_rec_buffer_len(recbuf);
data.dptr = talloc_size(state, data.dsize);
if (tevent_req_nomem(data.dptr, req)) {
return;
}
ctdb_rec_buffer_push(recbuf, data.dptr, &np);
message.srvid = state->srvid;
message.data.data = data;
D_DEBUG("Pushing buffer %d with %d records for db %s\n",
state->num_buffers_sent, recbuf->count,
recdb_name(state->recdb));
subreq = ctdb_client_message_multi_send(state, state->ev,
state->client,
state->pnn_list, state->count,
&message);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, push_database_send_done, req);
state->num_records += recbuf->count;
talloc_free(data.dptr);
talloc_free(recbuf);
}
/*
 * One record buffer has been sent to all nodes: count it and move on to
 * the next one. Fails the request if the send failed.
 */
static void push_database_send_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct push_database_state *state = tevent_req_data(
		req, struct push_database_state);
	int err;
	bool sent;

	sent = ctdb_client_message_multi_recv(subreq, &err, NULL, NULL);
	TALLOC_FREE(subreq);
	if (!sent) {
		D_ERR("Sending recovery records failed for %s\n",
		      recdb_name(state->recdb));
		tevent_req_error(req, err);
		return;
	}

	state->num_buffers_sent += 1;

	push_database_send_msg(req);
}
/*
 * Completion callback for the push-confirm control.
 *
 * Every node must report exactly the number of records that were sent.
 * A reply that cannot be parsed or a differing count fails the request
 * with EPROTO.
 */
static void push_database_confirmed(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct push_database_state *state = tevent_req_data(
		req, struct push_database_state);
	struct ctdb_reply_control **replies;
	int *err_list;
	uint32_t confirmed;
	unsigned int node;
	int err;
	bool ok;

	ok = ctdb_client_control_multi_recv(subreq, &err, state,
					    &err_list, &replies);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t failed_pnn;
		int node_err;

		node_err = ctdb_client_control_multi_error(state->pnn_list,
							   state->count,
							   err_list,
							   &failed_pnn);
		if (node_err != 0) {
			D_ERR("control DB_PUSH_CONFIRM failed for db %s"
			      " on node %u, ret=%d\n",
			      recdb_name(state->recdb), failed_pnn, node_err);
		} else {
			D_ERR("control DB_PUSH_CONFIRM failed for db %s,"
			      " ret=%d\n",
			      recdb_name(state->recdb), err);
		}
		tevent_req_error(req, err);
		return;
	}

	for (node = 0; node < state->count; node++) {
		err = ctdb_reply_control_db_push_confirm(replies[node],
							 &confirmed);
		if (err != 0) {
			tevent_req_error(req, EPROTO);
			return;
		}

		if (confirmed == state->num_records) {
			continue;
		}

		D_ERR("Node %u received %d of %d records for %s\n",
		      state->pnn_list[node], confirmed,
		      state->num_records, recdb_name(state->recdb));
		tevent_req_error(req, EPROTO);
		return;
	}

	talloc_free(replies);

	D_INFO("Pushed %d records for db %s\n",
	       state->num_records, recdb_name(state->recdb));

	tevent_req_done(req);
}
/*
 * Collect the result of push_database_send(). Returns false and stores
 * the unix error in *perr (when not NULL) on failure.
 */
static bool push_database_recv(struct tevent_req *req, int *perr)
{
return generic_recv(req, perr);
}
/*
* Collect databases using highest sequence number
*/
/*
 * Request state for collecting a database from the single node that
 * reports the highest sequence number.
 */
struct collect_highseqnum_db_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
/* Nodes that are asked for their sequence number */
struct node_list *nlist;
uint32_t db_id;
/* Recovery db the chosen node's records are pulled into */
struct recdb_context *recdb;
/* Node with the highest sequence number; the one pulled from */
uint32_t max_pnn;
};
/* Callbacks for the get-seqnum -> pull sequence */
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq);
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting database db_id from the node with the highest
 * sequence number.
 *
 * The sequence number is requested from every node in nlist; the pull
 * from the winning node is started when the replies arrive.
 *
 * Returns the new request, or NULL if it could not be created. Collect
 * the result with collect_highseqnum_db_recv().
 */
static struct tevent_req *collect_highseqnum_db_send(
			TALLOC_CTX *mem_ctx,
			struct tevent_context *ev,
			struct ctdb_client_context *client,
			struct node_list *nlist,
			uint32_t db_id,
			struct recdb_context *recdb)
{
	struct tevent_req *req, *subreq;
	struct collect_highseqnum_db_state *state;
	struct ctdb_req_control request;

	req = tevent_req_create(mem_ctx, &state,
				struct collect_highseqnum_db_state);
	if (req == NULL) {
		return NULL;
	}

	state->ev = ev;
	state->client = client;
	state->nlist = nlist;
	state->db_id = db_id;
	state->recdb = recdb;

	ctdb_req_control_get_db_seqnum(&request, db_id);
	/*
	 * Parent the subrequest to state, like every other subrequest in
	 * this file, so it cannot outlive the request its callback uses.
	 */
	subreq = ctdb_client_control_multi_send(state,
						ev,
						client,
						nlist->pnn_list,
						nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done,
				req);

	return req;
}
static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct collect_highseqnum_db_state *state = tevent_req_data(
req, struct collect_highseqnum_db_state);
struct ctdb_reply_control **reply;
int *err_list;
bool status;
unsigned int i;
int ret;
uint64_t seqnum, max_seqnum;
status = ctdb_client_control_multi_recv(subreq, &ret, state,
&err_list, &reply);
TALLOC_FREE(subreq);
if (! status) {
int ret2;
uint32_t pnn;
ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
state->nlist->count,
err_list,
&pnn);
if (ret2 != 0) {
D_ERR("control GET_DB_SEQNUM failed for db %s"
" on node %u, ret=%d\n",
recdb_name(state->recdb), pnn, ret2);
} else {
D_ERR("control GET_DB_SEQNUM failed for db %s,"
" ret=%d\n",
recdb_name(state->recdb), ret);
}
tevent_req_error(req, ret);
return;
}
max_seqnum = 0;
state->max_pnn = state->nlist->pnn_list[0];
for (i=0; i<state->nlist->count; i++) {
ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum);
if (ret != 0) {
tevent_req_error(req, EPROTO);
return;
}
if (max_seqnum < seqnum) {
max_seqnum = seqnum;
state->max_pnn = state->nlist->pnn_list[i];
}
}
talloc_free(reply);
D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n",
recdb_name(state->recdb), state->max_pnn, max_seqnum);
subreq = pull_database_send(state,
state->ev,
state->client,
state->max_pnn,
state->recdb);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done,
req);
}
/*
 * Callback for the database pull from the node chosen in
 * collect_highseqnum_db_seqnum_done().
 *
 * A failed pull gives ban credits to that node before the error is
 * passed on to the request.
 */
static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct collect_highseqnum_db_state *state = tevent_req_data(
		req, struct collect_highseqnum_db_state);
	bool pulled;
	int err;

	pulled = pull_database_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (pulled) {
		tevent_req_done(req);
		return;
	}

	node_list_ban_credits(state->nlist, state->max_pnn);
	tevent_req_error(req, err);
}
/* Collect the result of a collect_highseqnum_db request via generic_recv() */
static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/*
* Collect all databases
*/
/*
 * Request state for collect_all_db_send(): the database is pulled from
 * each node in nlist, one node after the other.
 */
struct collect_all_db_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
/* nodes to pull from; a node whose pull fails is given ban credits */
struct node_list *nlist;
uint32_t db_id;
/* passed to every pull_database_send() call */
struct recdb_context *recdb;
/* NOTE(review): not referenced by the collect_all_db_* functions visible here */
struct ctdb_pulldb pulldb;
/* position in nlist->pnn_list of the node currently being pulled */
unsigned int index;
};
/* Completion callback for each per-node pull_database request */
static void collect_all_db_pulldb_done(struct tevent_req *subreq);
/*
 * Start collecting a database from all nodes in nlist.
 *
 * The pull from the first node in the list is issued here; the pulls
 * from the remaining nodes are chained from collect_all_db_pulldb_done().
 *
 * Returns NULL if the request cannot be allocated.
 */
static struct tevent_req *collect_all_db_send(
			TALLOC_CTX *mem_ctx,
			struct tevent_context *ev,
			struct ctdb_client_context *client,
			struct node_list *nlist,
			uint32_t db_id,
			struct recdb_context *recdb)
{
	struct collect_all_db_state *state;
	struct tevent_req *req;
	struct tevent_req *pull_req;

	req = tevent_req_create(mem_ctx, &state,
				struct collect_all_db_state);
	if (req == NULL) {
		return NULL;
	}

	state->index = 0;
	state->recdb = recdb;
	state->db_id = db_id;
	state->nlist = nlist;
	state->client = client;
	state->ev = ev;

	pull_req = pull_database_send(state,
				      ev,
				      client,
				      nlist->pnn_list[state->index],
				      recdb);
	if (tevent_req_nomem(pull_req, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(pull_req, collect_all_db_pulldb_done, req);

	return req;
}
/*
 * Callback for one per-node database pull.
 *
 * On failure the node that was being pulled gets ban credits and the
 * request fails.  Otherwise the pull from the next node in the list is
 * started, or the request completes once every node has been pulled.
 */
static void collect_all_db_pulldb_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct collect_all_db_state *state = tevent_req_data(
		req, struct collect_all_db_state);
	bool pulled;
	int err;

	pulled = pull_database_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (!pulled) {
		uint32_t failed_pnn = state->nlist->pnn_list[state->index];

		node_list_ban_credits(state->nlist, failed_pnn);
		tevent_req_error(req, err);
		return;
	}

	state->index++;
	if (state->index == state->nlist->count) {
		/* Every node has been pulled */
		tevent_req_done(req);
		return;
	}

	subreq = pull_database_send(state,
				    state->ev,
				    state->client,
				    state->nlist->pnn_list[state->index],
				    state->recdb);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
}
/* Collect the result of a collect_all_db request via generic_recv() */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	bool ok = generic_recv(req, perr);

	return ok;
}
/**
* For each database do the following:
* - Get DB name from all nodes
* - Attach database on missing nodes
* - Get DB path
* - Freeze database on all nodes
* - Start transaction on all nodes
* - Collect database from all nodes
* - Wipe database on all nodes
* - Push database to all nodes
* - Commit transaction on all nodes
* - Thaw database on all nodes
*/
/* Request state shared by the recover_db_* callback chain for one database */
struct recover_db_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
/* tunables; database_hash_size and rec_buffer_size_limit are read from it */
struct ctdb_tunable_list *tun_list;
/* nodes the freeze/transaction/wipe/push/commit/thaw steps are sent to */
struct node_list *nlist;
/* database being recovered; db_id, db_flags, pnn_list and num_nodes are used */
struct db *db;
/* set from ctdb_client_pnn(client); target of the GETDBPATH control */
uint32_t destnode;
/* db_id plus tid (set to the generation); used for transaction start, wipe and commit */
struct ctdb_transdb transdb;
/* filled in from the GET_DBNAME and GETDBPATH replies respectively */
const char *db_name, *db_path;
/* created once the transaction has started, freed after the push succeeds */
struct recdb_context *recdb;
};
/* Callbacks of the recover_db chain, declared in the order the steps run */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_create_missing_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);
/*
 * Start the recovery of a single database.
 *
 * The first step, issued here, sends GET_DBNAME to the nodes listed in
 * db->pnn_list; the remaining steps are chained from the callbacks.
 * generation is used as the transaction id for this database.
 *
 * Returns NULL if the request cannot be allocated.
 */
static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
					  struct tevent_context *ev,
					  struct ctdb_client_context *client,
					  struct ctdb_tunable_list *tun_list,
					  struct node_list *nlist,
					  uint32_t generation,
					  struct db *db)
{
	struct recover_db_state *state;
	struct ctdb_req_control request;
	struct tevent_req *req;
	struct tevent_req *name_req;

	req = tevent_req_create(mem_ctx, &state, struct recover_db_state);
	if (req == NULL) {
		return NULL;
	}

	state->db = db;
	state->nlist = nlist;
	state->tun_list = tun_list;
	state->client = client;
	state->ev = ev;
	state->destnode = ctdb_client_pnn(client);
	state->transdb.tid = generation;
	state->transdb.db_id = db->db_id;

	ctdb_req_control_get_dbname(&request, db->db_id);
	name_req = ctdb_client_control_multi_send(state,
						  ev,
						  client,
						  state->db->pnn_list,
						  state->db->num_nodes,
						  TIMEOUT(),
						  &request);
	if (tevent_req_nomem(name_req, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(name_req, recover_db_name_done, req);

	return req;
}
/*
 * Callback for the GET_DBNAME controls sent to the nodes in db->pnn_list.
 *
 * Checks that every node reports the same name for the database, records
 * that name in state->db_name and then moves on to creating the database
 * on nodes where it is missing.  A node that reports a different name is
 * given ban credits and the request fails with EINVAL.
 */
static void recover_db_name_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	struct ctdb_reply_control **reply;
	int *err_list;
	unsigned int i;
	int ret;
	bool status;

	status = ctdb_client_control_multi_recv(subreq,
						&ret,
						state,
						&err_list,
						&reply);
	TALLOC_FREE(subreq);
	if (! status) {
		int ret2;
		uint32_t pnn;

		ret2 = ctdb_client_control_multi_error(state->db->pnn_list,
						       state->db->num_nodes,
						       err_list,
						       &pnn);
		if (ret2 != 0) {
			D_ERR("control GET_DBNAME failed on node %u,"
			      " ret=%d\n",
			      pnn,
			      ret2);
		} else {
			D_ERR("control GET_DBNAME failed, ret=%d\n",
			      ret);
		}
		tevent_req_error(req, ret);
		return;
	}

	for (i = 0; i < state->db->num_nodes; i++) {
		const char *db_name;
		uint32_t pnn;

		/*
		 * The controls were sent to state->db->pnn_list, so reply[i]
		 * comes from the i-th node of that list, not of nlist.
		 */
		pnn = state->db->pnn_list[i];

		ret = ctdb_reply_control_get_dbname(reply[i],
						    state,
						    &db_name);
		if (ret != 0) {
			D_ERR("control GET_DBNAME failed on node %u "
			      "for db=0x%x, ret=%d\n",
			      pnn,
			      state->db->db_id,
			      ret);
			tevent_req_error(req, EPROTO);
			return;
		}

		/* The first reply defines the name the others must match */
		if (state->db_name == NULL) {
			state->db_name = db_name;
			continue;
		}

		if (strcmp(state->db_name, db_name) != 0) {
			D_ERR("Incompatible database name for 0x%"PRIx32" "
			      "(%s != %s) on node %"PRIu32"\n",
			      state->db->db_id,
			      db_name,
			      state->db_name,
			      pnn);
			node_list_ban_credits(state->nlist, pnn);
			/*
			 * ret is 0 at this point and tevent_req_error()
			 * ignores a zero error code, which would leave the
			 * request pending forever.  Fail it explicitly.
			 */
			tevent_req_error(req, EINVAL);
			return;
		}
	}

	talloc_free(reply);

	subreq = db_create_missing_send(state,
					state->ev,
					state->client,
					state->nlist,
					state->db_name,
					state->db);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_create_missing_done, req);
}
/*
 * Callback for db_create_missing.
 *
 * Once that step has succeeded, ask state->destnode for the path of
 * the database.
 */
static void recover_db_create_missing_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	struct ctdb_req_control request;
	bool created;
	int err;

	/* Could sanity check the db_id here */
	created = db_create_missing_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (!created) {
		tevent_req_error(req, err);
		return;
	}

	ctdb_req_control_getdbpath(&request, state->db->db_id);
	subreq = ctdb_client_control_send(state,
					  state->ev,
					  state->client,
					  state->destnode,
					  TIMEOUT(),
					  &request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_path_done, req);
}
/*
 * Callback for the GETDBPATH control.
 *
 * Stores the returned path in state->db_path and then sends the freeze
 * control for the database to every node in nlist.
 */
static void recover_db_path_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	struct ctdb_req_control request;
	struct ctdb_reply_control *reply;
	bool ok;
	int err;

	ok = ctdb_client_control_recv(subreq, &err, state, &reply);
	TALLOC_FREE(subreq);
	if (!ok) {
		D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
		      state->db_name, err);
		tevent_req_error(req, err);
		return;
	}

	err = ctdb_reply_control_getdbpath(reply, state, &state->db_path);
	if (err != 0) {
		D_ERR("control GETDBPATH failed for db %s, ret=%d\n",
		      state->db_name, err);
		tevent_req_error(req, EPROTO);
		return;
	}

	talloc_free(reply);

	ctdb_req_control_db_freeze(&request, state->db->db_id);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_freeze_done, req);
}
/*
 * Callback for the freeze controls.
 *
 * If a specific failing node can be identified it is given ban credits.
 * On success the transaction-start control is sent to every node in
 * nlist.
 */
static void recover_db_freeze_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	struct ctdb_req_control request;
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq, &err, NULL, &err_list,
					    NULL);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control FREEZE_DB failed for db %s, ret=%d\n",
			      state->db_name, err);
		} else {
			D_ERR("control FREEZE_DB failed for db %s"
			      " on node %u, ret=%d\n",
			      state->db_name, bad_pnn, node_err);
			node_list_ban_credits(state->nlist, bad_pnn);
		}
		tevent_req_error(req, err);
		return;
	}

	ctdb_req_control_db_transaction_start(&request, &state->transdb);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_transaction_started, req);
}
/*
 * Callback for the transaction-start controls.
 *
 * Creates the recovery database and starts collecting records into it:
 * databases flagged persistent or replicated are collected from the
 * node with the highest sequence number, all others from every node.
 */
static void recover_db_transaction_started(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	uint32_t db_flags;
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq, &err, NULL, &err_list,
					    NULL);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control TRANSACTION_DB failed for db=%s,"
			      " ret=%d\n", state->db_name, err);
		} else {
			D_ERR("control TRANSACTION_DB failed for db=%s"
			      " on node %u, ret=%d\n",
			      state->db_name, bad_pnn, node_err);
		}
		tevent_req_error(req, err);
		return;
	}

	db_flags = state->db->db_flags;
	state->recdb = recdb_create(state,
				    state->db->db_id,
				    state->db_name,
				    state->db_path,
				    state->tun_list->database_hash_size,
				    db_flags & CTDB_DB_FLAGS_PERSISTENT);
	if (tevent_req_nomem(state->recdb, req)) {
		return;
	}

	if ((db_flags & (CTDB_DB_FLAGS_PERSISTENT |
			 CTDB_DB_FLAGS_REPLICATED)) != 0) {
		subreq = collect_highseqnum_db_send(state,
						    state->ev,
						    state->client,
						    state->nlist,
						    state->db->db_id,
						    state->recdb);
	} else {
		subreq = collect_all_db_send(state,
					     state->ev,
					     state->client,
					     state->nlist,
					     state->db->db_id,
					     state->recdb);
	}
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_collect_done, req);
}
/*
 * Callback for the collection step.
 *
 * The result is received with the recv function matching the collector
 * that was started for this database type; on success the wipe control
 * is sent to every node in nlist.
 */
static void recover_db_collect_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	struct ctdb_req_control request;
	bool collected;
	int err;

	if ((state->db->db_flags & (CTDB_DB_FLAGS_PERSISTENT |
				    CTDB_DB_FLAGS_REPLICATED)) != 0) {
		collected = collect_highseqnum_db_recv(subreq, &err);
	} else {
		collected = collect_all_db_recv(subreq, &err);
	}
	TALLOC_FREE(subreq);
	if (!collected) {
		tevent_req_error(req, err);
		return;
	}

	ctdb_req_control_wipe_database(&request, &state->transdb);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_wipedb_done, req);
}
/*
 * Callback for the wipe controls.
 *
 * On success the collected recovery database is pushed to every node
 * in nlist.
 */
static void recover_db_wipedb_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq, &err, NULL, &err_list,
					    NULL);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control WIPEDB failed for db %s, ret=%d\n",
			      state->db_name, err);
		} else {
			D_ERR("control WIPEDB failed for db %s on node %u,"
			      " ret=%d\n", state->db_name, bad_pnn, node_err);
		}
		tevent_req_error(req, err);
		return;
	}

	subreq = push_database_send(state,
				    state->ev,
				    state->client,
				    state->nlist->pnn_list,
				    state->nlist->count,
				    state->recdb,
				    state->tun_list->rec_buffer_size_limit);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_pushdb_done, req);
}
/*
 * Callback for the database push.
 *
 * On success the recovery database is released and the
 * transaction-commit control is sent to every node in nlist.
 */
static void recover_db_pushdb_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	struct ctdb_req_control request;
	bool pushed;
	int err;

	pushed = push_database_recv(subreq, &err);
	TALLOC_FREE(subreq);
	if (!pushed) {
		tevent_req_error(req, err);
		return;
	}

	/* The recovery database is no longer needed after the push */
	TALLOC_FREE(state->recdb);

	ctdb_req_control_db_transaction_commit(&request, &state->transdb);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_transaction_committed, req);
}
/*
 * Callback for the transaction-commit controls.
 *
 * On success the thaw control for the database is sent to every node
 * in nlist.
 */
static void recover_db_transaction_committed(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	struct ctdb_req_control request;
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq, &err, NULL, &err_list,
					    NULL);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control DB_TRANSACTION_COMMIT failed for db %s,"
			      " ret=%d\n", state->db_name, err);
		} else {
			D_ERR("control DB_TRANSACTION_COMMIT failed for db %s"
			      " on node %u, ret=%d\n",
			      state->db_name, bad_pnn, node_err);
		}
		tevent_req_error(req, err);
		return;
	}

	ctdb_req_control_db_thaw(&request, state->db->db_id);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recover_db_thaw_done, req);
}
/*
 * Callback for the thaw controls; the last step of recovering one
 * database.  Completes the request on success.
 */
static void recover_db_thaw_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recover_db_state *state = tevent_req_data(
		req, struct recover_db_state);
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq, &err, NULL, &err_list,
					    NULL);
	TALLOC_FREE(subreq);
	if (ok) {
		tevent_req_done(req);
		return;
	}

	{
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control DB_THAW failed for db %s, ret=%d\n",
			      state->db_name, err);
		} else {
			D_ERR("control DB_THAW failed for db %s on node %u,"
			      " ret=%d\n", state->db_name, bad_pnn, node_err);
		}
	}
	tevent_req_error(req, err);
}
/*
 * Collect the result of a recover_db request; the error code is not
 * reported to the caller.
 */
static bool recover_db_recv(struct tevent_req *req)
{
	bool ok = generic_recv(req, NULL);

	return ok;
}
/*
 * Start database recovery for each database
 *
 * Try to recover each database up to NUM_RETRIES times before failing
 * recovery.
 */
struct db_recovery_state {
struct tevent_context *ev;
struct db_list *dblist;
/* number of databases whose recovery has finished, successfully or not */
unsigned int num_replies;
/* number of databases that could not be recovered */
unsigned int num_failed;
};
/*
 * Per-database context handed to db_recovery_one_done(); it carries the
 * arguments needed to call recover_db_send() again on a retry.
 */
struct db_recovery_one_state {
/* the overall db_recovery request this database belongs to */
struct tevent_req *req;
struct ctdb_client_context *client;
struct db_list *dblist;
struct ctdb_tunable_list *tun_list;
struct node_list *nlist;
uint32_t generation;
struct db *db;
/* number of failed recovery attempts so far for this database */
int num_fails;
};
/* Completion callback for each recover_db request */
static void db_recovery_one_done(struct tevent_req *subreq);
/*
 * Start recovery of every database in dblist.
 *
 * One recover_db request is started per database without waiting for
 * the earlier ones to finish.  With no databases the request completes
 * immediately.
 *
 * Returns NULL if the request cannot be allocated.
 */
static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx,
					   struct tevent_context *ev,
					   struct ctdb_client_context *client,
					   struct db_list *dblist,
					   struct ctdb_tunable_list *tun_list,
					   struct node_list *nlist,
					   uint32_t generation)
{
	struct db_recovery_state *state;
	struct tevent_req *req;
	struct db *cur;

	req = tevent_req_create(mem_ctx, &state, struct db_recovery_state);
	if (req == NULL) {
		return NULL;
	}

	state->num_failed = 0;
	state->num_replies = 0;
	state->dblist = dblist;
	state->ev = ev;

	if (dblist->num_dbs == 0) {
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}

	cur = dblist->db;
	while (cur != NULL) {
		struct db_recovery_one_state *substate;
		struct tevent_req *subreq;

		substate = talloc_zero(state, struct db_recovery_one_state);
		if (tevent_req_nomem(substate, req)) {
			return tevent_req_post(req, ev);
		}

		substate->db = cur;
		substate->generation = generation;
		substate->nlist = nlist;
		substate->tun_list = tun_list;
		substate->dblist = dblist;
		substate->client = client;
		substate->req = req;

		subreq = recover_db_send(state,
					 ev,
					 client,
					 tun_list,
					 nlist,
					 generation,
					 substate->db);
		if (tevent_req_nomem(subreq, req)) {
			return tevent_req_post(req, ev);
		}
		tevent_req_set_callback(subreq, db_recovery_one_done,
					substate);
		D_NOTICE("recover database 0x%08x\n", substate->db->db_id);

		cur = cur->next;
	}

	return req;
}
/*
 * Callback for the recovery of one database.
 *
 * A failed recovery is retried until NUM_RETRIES attempts have failed;
 * after that (or if the retry cannot be started) the database is
 * counted as failed.  The overall request completes once a result has
 * been recorded for every database.
 */
static void db_recovery_one_done(struct tevent_req *subreq)
{
	struct db_recovery_one_state *substate = tevent_req_callback_data(
		subreq, struct db_recovery_one_state);
	struct tevent_req *req = substate->req;
	struct db_recovery_state *state = tevent_req_data(
		req, struct db_recovery_state);
	bool recovered;

	recovered = recover_db_recv(subreq);
	TALLOC_FREE(subreq);

	if (recovered) {
		talloc_free(substate);
	} else {
		substate->num_fails += 1;
		if (substate->num_fails < NUM_RETRIES) {
			subreq = recover_db_send(state,
						 state->ev,
						 substate->client,
						 substate->tun_list,
						 substate->nlist,
						 substate->generation,
						 substate->db);
			if (!tevent_req_nomem(subreq, req)) {
				/* Retry is in flight; wait for its result */
				tevent_req_set_callback(subreq,
							db_recovery_one_done,
							substate);
				D_NOTICE("recover database 0x%08x, attempt %d\n",
					 substate->db->db_id,
					 substate->num_fails+1);
				return;
			}
		}
		state->num_failed += 1;
	}

	state->num_replies += 1;
	if (state->num_replies == state->dblist->num_dbs) {
		tevent_req_done(req);
	}
}
/*
 * Collect the result of a db_recovery request.
 *
 * *count receives the number of databases recovered successfully (0 if
 * the request itself failed).  Returns true only if no database failed.
 */
static bool db_recovery_recv(struct tevent_req *req, unsigned int *count)
{
	struct db_recovery_state *state = tevent_req_data(
		req, struct db_recovery_state);
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		*count = 0;
		return false;
	}

	*count = state->num_replies - state->num_failed;

	return (state->num_failed == 0);
}
/* Request state for ban_node_send() and its callbacks */
struct ban_node_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
/* tunables; enable_bans and recovery_ban_period are read from it */
struct ctdb_tunable_list *tun_list;
/* node list whose ban_credits decide which node, if any, gets banned */
struct node_list *nlist;
/* set from ctdb_client_pnn(client) */
uint32_t destnode;
/* node with the most ban credits; CTDB_UNKNOWN_PNN once found inactive */
uint32_t max_pnn;
};
/* Picks the ban candidate; returns true if a subrequest was started */
static bool ban_node_check(struct tevent_req *req);
static void ban_node_check_done(struct tevent_req *subreq);
static void ban_node_done(struct tevent_req *subreq);
/*
 * Start banning the node with the most ban credits, if any qualifies.
 *
 * Completes immediately when bans are disabled by the enable_bans
 * tunable or when ban_node_check() finishes the request without
 * starting a subrequest.
 *
 * Returns NULL if the request cannot be allocated.
 */
static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx,
					struct tevent_context *ev,
					struct ctdb_client_context *client,
					struct ctdb_tunable_list *tun_list,
					struct node_list *nlist)
{
	struct ban_node_state *state;
	struct tevent_req *req;

	req = tevent_req_create(mem_ctx, &state, struct ban_node_state);
	if (req == NULL) {
		return NULL;
	}

	state->nlist = nlist;
	state->tun_list = tun_list;
	state->client = client;
	state->ev = ev;
	state->destnode = ctdb_client_pnn(client);

	/* Bans are not enabled */
	if (state->tun_list->enable_bans == 0) {
		D_ERR("Bans are not enabled\n");
		tevent_req_done(req);
		return tevent_req_post(req, ev);
	}

	if (!ban_node_check(req)) {
		/* The request has already been finished by the check */
		return tevent_req_post(req, ev);
	}

	return req;
}
/*
 * Find the node with the most ban credits and, if it has at least
 * NUM_RETRIES credits, fetch the nodemap from that node.
 *
 * Returns true if the GET_NODEMAP control was sent.  Returns false if
 * the request has been finished instead: successfully when no node has
 * enough credits, or with an error when the control cannot be allocated.
 */
static bool ban_node_check(struct tevent_req *req)
{
	struct ban_node_state *state = tevent_req_data(
		req, struct ban_node_state);
	struct ctdb_req_control request;
	struct tevent_req *subreq;
	unsigned max_credits = 0, i;

	i = 0;
	while (i < state->nlist->count) {
		if (state->nlist->ban_credits[i] > max_credits) {
			max_credits = state->nlist->ban_credits[i];
			state->max_pnn = state->nlist->pnn_list[i];
		}
		i++;
	}

	if (max_credits < NUM_RETRIES) {
		/* Nobody has misbehaved often enough to be banned */
		tevent_req_done(req);
		return false;
	}

	ctdb_req_control_get_nodemap(&request);
	subreq = ctdb_client_control_send(state,
					  state->ev,
					  state->client,
					  state->max_pnn,
					  TIMEOUT(),
					  &request);
	if (tevent_req_nomem(subreq, req)) {
		return false;
	}
	tevent_req_set_callback(subreq, ban_node_check_done, req);

	return true;
}
/*
 * Callback for the GET_NODEMAP control sent to the ban candidate.
 *
 * If the candidate reports itself as inactive, its ban credits are
 * reset and the check is repeated to pick the next candidate.
 * Otherwise the SET_BAN_STATE control is sent to the candidate with
 * the recovery_ban_period tunable as ban time.
 */
static void ban_node_check_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct ban_node_state *state = tevent_req_data(
		req, struct ban_node_state);
	struct ctdb_req_control request;
	struct ctdb_reply_control *reply;
	struct ctdb_node_map *nodemap;
	struct ctdb_ban_state ban;
	unsigned int n;
	bool received;
	int err;

	received = ctdb_client_control_recv(subreq, &err, state, &reply);
	TALLOC_FREE(subreq);
	if (!received) {
		D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
		      state->max_pnn, err);
		tevent_req_error(req, err);
		return;
	}

	err = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
	if (err != 0) {
		D_ERR("control GET_NODEMAP failed, ret=%d\n", err);
		tevent_req_error(req, err);
		return;
	}

	for (n = 0; n < nodemap->num; n++) {
		unsigned int k;

		if (nodemap->node[n].pnn != state->max_pnn) {
			continue;
		}

		/* If the node became inactive, reset ban_credits */
		if ((nodemap->node[n].flags & NODE_FLAGS_INACTIVE) == 0) {
			continue;
		}

		k = 0;
		while (k < state->nlist->count) {
			if (state->nlist->pnn_list[k] == state->max_pnn) {
				state->nlist->ban_credits[k] = 0;
				break;
			}
			k++;
		}
		state->max_pnn = CTDB_UNKNOWN_PNN;
	}

	talloc_free(nodemap);
	talloc_free(reply);

	/* If the node became inactive during recovery, pick the next one */
	if (state->max_pnn == CTDB_UNKNOWN_PNN) {
		(void) ban_node_check(req);
		return;
	}

	ban = (struct ctdb_ban_state) {
		.pnn = state->max_pnn,
		.time = state->tun_list->recovery_ban_period,
	};

	D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time);

	ctdb_req_control_set_ban_state(&request, &ban);
	subreq = ctdb_client_control_send(state,
					  state->ev,
					  state->client,
					  ban.pnn,
					  TIMEOUT(),
					  &request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, ban_node_done, req);
}
/*
 * Callback for the SET_BAN_STATE control; completes the ban_node
 * request with the outcome of the control.
 */
static void ban_node_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	/*
	 * The request was created with struct ban_node_state, so that is
	 * the type that must be named here for the typed lookup to match.
	 */
	struct ban_node_state *state = tevent_req_data(
		req, struct ban_node_state);
	struct ctdb_reply_control *reply;
	int ret;
	bool status;

	status = ctdb_client_control_recv(subreq, &ret, state, &reply);
	TALLOC_FREE(subreq);
	if (! status) {
		tevent_req_error(req, ret);
		return;
	}

	ret = ctdb_reply_control_set_ban_state(reply);
	if (ret != 0) {
		D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret);
		tevent_req_error(req, ret);
		return;
	}

	talloc_free(reply);
	tevent_req_done(req);
}
/*
 * Collect the result of a ban_node request.  Returns false, with the
 * error stored through perr, if the request failed.
 */
static bool ban_node_recv(struct tevent_req *req, int *perr)
{
	bool failed = tevent_req_is_unix_error(req, perr);

	return !failed;
}
/*
* Run the parallel database recovery
*
* - Get tunables
* - Get nodemap from all nodes
* - Get capabilities from all nodes
* - Get dbmap
* - Set RECOVERY_ACTIVE
* - Send START_RECOVERY
* - Update vnnmap on all nodes
* - Run database recovery
* - Set RECOVERY_NORMAL
* - Send END_RECOVERY
*/
/* Request state shared by the recovery_* callback chain */
struct recovery_state {
struct tevent_context *ev;
struct ctdb_client_context *client;
uint32_t generation;
/* set from ctdb_client_pnn(client); target of the single-node controls */
uint32_t destnode;
/* built from the nodemap reply, then replaced after per-node verification */
struct node_list *nlist;
/* filled in from the GET_ALL_TUNABLES reply */
struct ctdb_tunable_list *tun_list;
/* NOTE(review): assigned outside the part of the file visible here */
struct ctdb_vnn_map *vnnmap;
/* built from the GET_DBMAP replies of all nodes in nlist */
struct db_list *dblist;
};
/* Callbacks of the recovery chain */
static void recovery_tunables_done(struct tevent_req *subreq);
static void recovery_nodemap_done(struct tevent_req *subreq);
static void recovery_nodemap_verify(struct tevent_req *subreq);
static void recovery_capabilities_done(struct tevent_req *subreq);
static void recovery_dbmap_done(struct tevent_req *subreq);
static void recovery_active_done(struct tevent_req *subreq);
static void recovery_start_recovery_done(struct tevent_req *subreq);
static void recovery_vnnmap_update_done(struct tevent_req *subreq);
static void recovery_db_recovery_done(struct tevent_req *subreq);
static void recovery_failed_done(struct tevent_req *subreq);
static void recovery_normal_done(struct tevent_req *subreq);
static void recovery_end_recovery_done(struct tevent_req *subreq);
/*
 * Start the recovery.
 *
 * The first step, issued here, fetches all tunables from the node
 * returned by ctdb_client_pnn(); the remaining steps are chained from
 * the callbacks.
 *
 * Returns NULL if the request cannot be allocated.
 */
static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx,
					struct tevent_context *ev,
					struct ctdb_client_context *client,
					uint32_t generation)
{
	struct recovery_state *state;
	struct ctdb_req_control request;
	struct tevent_req *req;
	struct tevent_req *tun_req;

	req = tevent_req_create(mem_ctx, &state, struct recovery_state);
	if (req == NULL) {
		return NULL;
	}

	state->generation = generation;
	state->client = client;
	state->ev = ev;
	state->destnode = ctdb_client_pnn(client);

	ctdb_req_control_get_all_tunables(&request);
	tun_req = ctdb_client_control_send(state,
					   state->ev,
					   state->client,
					   state->destnode,
					   TIMEOUT(),
					   &request);
	if (tevent_req_nomem(tun_req, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(tun_req, recovery_tunables_done, req);

	return req;
}
/*
 * Callback for the GET_ALL_TUNABLES control.
 *
 * Stores the tunables in the state, updates recover_timeout from them
 * and then requests the nodemap from state->destnode.
 */
static void recovery_tunables_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	struct ctdb_req_control request;
	struct ctdb_reply_control *reply;
	bool ok;
	int err;

	ok = ctdb_client_control_recv(subreq, &err, state, &reply);
	TALLOC_FREE(subreq);
	if (!ok) {
		D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", err);
		tevent_req_error(req, err);
		return;
	}

	err = ctdb_reply_control_get_all_tunables(reply, state,
						  &state->tun_list);
	if (err != 0) {
		D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", err);
		tevent_req_error(req, EPROTO);
		return;
	}

	talloc_free(reply);

	/* Controls sent from here on use the configured timeout */
	recover_timeout = state->tun_list->recover_timeout;

	ctdb_req_control_get_nodemap(&request);
	subreq = ctdb_client_control_send(state,
					  state->ev,
					  state->client,
					  state->destnode,
					  TIMEOUT(),
					  &request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_nodemap_done, req);
}
/*
 * Callback for the GET_NODEMAP control sent to state->destnode.
 *
 * Builds the node list from all nodes not flagged as disconnected and
 * then asks each of those nodes for its own nodemap so that the flags
 * can be verified.
 */
static void recovery_nodemap_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	struct ctdb_req_control request;
	struct ctdb_reply_control *reply;
	struct ctdb_node_map *nodemap;
	unsigned int idx;
	bool ok;
	int err;

	ok = ctdb_client_control_recv(subreq, &err, state, &reply);
	TALLOC_FREE(subreq);
	if (!ok) {
		D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n",
		      state->destnode, err);
		tevent_req_error(req, err);
		return;
	}

	err = ctdb_reply_control_get_nodemap(reply, state, &nodemap);
	if (err != 0) {
		D_ERR("control GET_NODEMAP failed, ret=%d\n", err);
		tevent_req_error(req, err);
		return;
	}

	state->nlist = node_list_init(state, nodemap->num);
	if (tevent_req_nomem(state->nlist, req)) {
		return;
	}

	for (idx = 0; idx < nodemap->num; idx++) {
		bool is_disconnected =
			(nodemap->node[idx].flags & NODE_FLAGS_DISCONNECTED);

		if (!is_disconnected) {
			bool added = node_list_add(state->nlist,
						   nodemap->node[idx].pnn);
			if (!added) {
				tevent_req_error(req, EINVAL);
				return;
			}
		}
	}

	talloc_free(nodemap);
	talloc_free(reply);

	/* Verify flags by getting local node information from each node */
	ctdb_req_control_get_nodemap(&request);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_nodemap_verify, req);
}
/*
 * Callback for the GET_NODEMAP controls sent to every node in nlist.
 *
 * Each node's own entry in the nodemap it returned decides whether the
 * node stays in the list: nodes that flag themselves inactive, or that
 * do not list themselves at all, are dropped.  The filtered list
 * replaces state->nlist and capabilities are then requested from the
 * remaining nodes.
 */
static void recovery_nodemap_verify(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	struct ctdb_reply_control **replies;
	struct ctdb_req_control request;
	struct node_list *verified;
	unsigned int idx;
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq,
					    &err,
					    state,
					    &err_list,
					    &replies);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control GET_NODEMAP failed, ret=%d\n", err);
		} else {
			D_ERR("control GET_NODEMAP failed on node %u,"
			      " ret=%d\n", bad_pnn, node_err);
		}
		tevent_req_error(req, err);
		return;
	}

	verified = node_list_init(state, state->nlist->size);
	if (tevent_req_nomem(verified, req)) {
		return;
	}

	for (idx = 0; idx < state->nlist->count; idx++) {
		struct ctdb_node_map *nodemap = NULL;
		uint32_t pnn = state->nlist->pnn_list[idx];
		/* Treated as disconnected unless the node lists itself */
		uint32_t own_flags = NODE_FLAGS_DISCONNECTED;
		unsigned int k;

		err = ctdb_reply_control_get_nodemap(replies[idx],
						     state,
						     &nodemap);
		if (err != 0) {
			D_ERR("control GET_NODEMAP failed on node %u\n", pnn);
			tevent_req_error(req, EPROTO);
			return;
		}

		k = 0;
		while (k < nodemap->num) {
			if (nodemap->node[k].pnn == pnn) {
				own_flags = nodemap->node[k].flags;
				break;
			}
			k++;
		}

		TALLOC_FREE(nodemap);

		if ((own_flags & NODE_FLAGS_INACTIVE) == 0) {
			bool added = node_list_add(verified, pnn);
			if (!added) {
				tevent_req_error(req, EINVAL);
				return;
			}
		}
	}

	talloc_free(replies);

	talloc_free(state->nlist);
	state->nlist = verified;

	ctdb_req_control_get_capabilities(&request);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_capabilities_done, req);
}
/*
 * Callback for the GET_CAPABILITIES controls.
 *
 * Records each node's capabilities in nlist->caps and then requests
 * the database map from every node in nlist.
 */
static void recovery_capabilities_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	struct ctdb_reply_control **replies;
	struct ctdb_req_control request;
	unsigned int idx;
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq, &err, state, &err_list,
					    &replies);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control GET_CAPABILITIES failed, ret=%d\n",
			      err);
		} else {
			D_ERR("control GET_CAPABILITIES failed on node %u,"
			      " ret=%d\n", bad_pnn, node_err);
		}
		tevent_req_error(req, err);
		return;
	}

	idx = 0;
	while (idx < state->nlist->count) {
		uint32_t caps;

		err = ctdb_reply_control_get_capabilities(replies[idx], &caps);
		if (err != 0) {
			D_ERR("control GET_CAPABILITIES failed on node %u\n",
			      state->nlist->pnn_list[idx]);
			tevent_req_error(req, EPROTO);
			return;
		}

		state->nlist->caps[idx] = caps;
		idx++;
	}

	talloc_free(replies);

	ctdb_req_control_get_dbmap(&request);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_dbmap_done, req);
}
/*
 * Callback for the GET_DBMAP controls.
 *
 * Builds state->dblist from the databases reported by every node and
 * then sets recovery mode to CTDB_RECOVERY_ACTIVE on every node in
 * nlist.
 */
static void recovery_dbmap_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	struct ctdb_reply_control **replies;
	struct ctdb_req_control request;
	unsigned int node_idx;
	int *err_list;
	bool ok;
	int err;

	ok = ctdb_client_control_multi_recv(subreq,
					    &err,
					    state,
					    &err_list,
					    &replies);
	TALLOC_FREE(subreq);
	if (!ok) {
		uint32_t bad_pnn;
		int node_err = ctdb_client_control_multi_error(
						state->nlist->pnn_list,
						state->nlist->count,
						err_list,
						&bad_pnn);

		if (node_err == 0) {
			D_ERR("control GET_DBMAP failed, ret=%d\n",
			      err);
		} else {
			D_ERR("control GET_DBMAP failed on node %u,"
			      " ret=%d\n", bad_pnn, node_err);
		}
		tevent_req_error(req, err);
		return;
	}

	state->dblist = db_list_init(state, state->nlist->count);
	if (tevent_req_nomem(state->dblist, req)) {
		D_ERR("memory allocation error\n");
		return;
	}

	for (node_idx = 0; node_idx < state->nlist->count; node_idx++) {
		struct ctdb_dbid_map *dbmap = NULL;
		uint32_t pnn = state->nlist->pnn_list[node_idx];
		unsigned int db_idx;

		err = ctdb_reply_control_get_dbmap(replies[node_idx],
						   state,
						   &dbmap);
		if (err != 0) {
			D_ERR("control GET_DBMAP failed on node %u\n",
			      pnn);
			tevent_req_error(req, EPROTO);
			return;
		}

		db_idx = 0;
		while (db_idx < dbmap->num) {
			err = db_list_check_and_add(state->dblist,
						    dbmap->dbs[db_idx].db_id,
						    dbmap->dbs[db_idx].flags,
						    pnn);
			if (err != 0) {
				D_ERR("failed to add database list entry, "
				      "ret=%d\n",
				      err);
				tevent_req_error(req, err);
				return;
			}
			db_idx++;
		}

		TALLOC_FREE(dbmap);
	}

	ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_active_done, req);
}
/*
 * Completion callback for SET_RECMODE(CTDB_RECOVERY_ACTIVE).
 *
 * Computes the new VNNMAP from the lmaster-capable nodes, stores it in
 * state->vnnmap and sends the START_RECOVERY control to all nodes in
 * state->nlist.  Any failure completes the recovery request with an error.
 */
static void recovery_active_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	struct ctdb_req_control request;
	struct ctdb_vnn_map *vnnmap;
	int *err_list;
	int ret;
	bool status;

	/*
	 * Use state as the talloc context for err_list, as the other
	 * recovery callbacks do, so that it is released together with the
	 * request state instead of being handed a NULL context.
	 */
	status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
						NULL);
	TALLOC_FREE(subreq);
	if (! status) {
		int ret2;
		uint32_t pnn;

		/* Try to attribute the failure to a specific node */
		ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
						       state->nlist->count,
						       err_list,
						       &pnn);
		if (ret2 != 0) {
			D_ERR("failed to set recovery mode ACTIVE on node %u,"
			      " ret=%d\n", pnn, ret2);
		} else {
			D_ERR("failed to set recovery mode ACTIVE, ret=%d\n",
			      ret);
		}
		tevent_req_error(req, ret);
		return;
	}

	D_ERR("Set recovery mode to ACTIVE\n");

	/* Calculate new VNNMAP */
	vnnmap = talloc_zero(state, struct ctdb_vnn_map);
	if (tevent_req_nomem(vnnmap, req)) {
		return;
	}

	vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size);
	if (tevent_req_nomem(vnnmap->map, req)) {
		return;
	}

	if (vnnmap->size == 0) {
		/*
		 * Fall back to a single-entry map holding state->destnode.
		 * NOTE(review): this assumes node_list_lmaster() returns an
		 * array with at least one slot even when it reports size 0
		 * - confirm against its implementation.
		 */
		D_WARNING("No active lmasters found. Adding recmaster anyway\n");
		vnnmap->map[0] = state->destnode;
		vnnmap->size = 1;
	}

	vnnmap->generation = state->generation;

	state->vnnmap = vnnmap;

	ctdb_req_control_start_recovery(&request);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_start_recovery_done, req);
}
/*
 * Completion callback for the START_RECOVERY control.
 *
 * Once every node has finished START_RECOVERY, pushes the VNNMAP stored
 * in state->vnnmap to all nodes in state->nlist with SETVNNMAP.  Any
 * failure completes the recovery request with an error.
 */
static void recovery_start_recovery_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	struct ctdb_req_control request;
	int *err_list;
	int ret;
	bool status;

	/*
	 * Use state as the talloc context for err_list, as the other
	 * recovery callbacks do, so that it is released together with the
	 * request state instead of being handed a NULL context.
	 */
	status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
						NULL);
	TALLOC_FREE(subreq);
	if (! status) {
		int ret2;
		uint32_t pnn;

		/* Try to attribute the failure to a specific node */
		ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
						       state->nlist->count,
						       err_list,
						       &pnn);
		if (ret2 != 0) {
			D_ERR("failed to run start_recovery event on node %u,"
			      " ret=%d\n", pnn, ret2);
		} else {
			D_ERR("failed to run start_recovery event, ret=%d\n",
			      ret);
		}
		tevent_req_error(req, ret);
		return;
	}

	D_ERR("start_recovery event finished\n");

	ctdb_req_control_setvnnmap(&request, state->vnnmap);
	subreq = ctdb_client_control_multi_send(state,
						state->ev,
						state->client,
						state->nlist->pnn_list,
						state->nlist->count,
						TIMEOUT(),
						&request);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req);
}
/*
 * Completion callback for the SETVNNMAP control.
 *
 * After all nodes have accepted the new VNNMAP, starts recovery of the
 * databases in state->dblist using the generation of that VNNMAP.  Any
 * failure completes the recovery request with an error.
 */
static void recovery_vnnmap_update_done(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct recovery_state *state = tevent_req_data(
		req, struct recovery_state);
	int *err_list;
	int ret;
	bool status;

	/*
	 * Use state as the talloc context for err_list, as the other
	 * recovery callbacks do, so that it is released together with the
	 * request state instead of being handed a NULL context.
	 */
	status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
						NULL);
	TALLOC_FREE(subreq);
	if (! status) {
		int ret2;
		uint32_t pnn;

		/* Try to attribute the failure to a specific node */
		ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
						       state->nlist->count,
						       err_list,
						       &pnn);
		if (ret2 != 0) {
			D_ERR("failed to update VNNMAP on node %u, ret=%d\n",
			      pnn, ret2);
		} else {
			D_ERR("failed to update VNNMAP, ret=%d\n", ret);
		}
		tevent_req_error(req, ret);
		return;
	}

	D_NOTICE("updated VNNMAP\n");

	subreq = db_recovery_send(state,
				  state->ev,
				  state->client,
				  state->dblist,
				  state->tun_list,
				  state->nlist,
				  state->vnnmap->generation);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, recovery_db_recovery_done, req);
}
static void recovery_db_recovery_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct recovery_state *state = tevent_req_data(
req, struct recovery_state);
struct ctdb_req_control request;
bool status;
unsigned int count;
status = db_recovery_recv(subreq, &count);
TALLOC_FREE(subreq);
D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs);
if (! status) {
subreq = ban_node_send(state,
state->ev,
state->client,
state->tun_list,
state->nlist);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, recovery_failed_done, req);
return;
}
ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL);
subreq = ctdb_client_control_multi_send(state,
state->ev,
state->client,
state->nlist->pnn_list,
state->nlist->count,
TIMEOUT(),
&request);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, recovery_normal_done, req);
}
/*
 * Completion callback for the ban_node request started after a failed
 * database recovery.
 *
 * A ban_node failure is only logged; the recovery request itself is
 * always completed with EIO.
 */
static void recovery_failed_done(struct tevent_req *subreq)
{
	struct tevent_req *req =
		tevent_req_callback_data(subreq, struct tevent_req);
	int ban_err;
	bool ok;

	ok = ban_node_recv(subreq, &ban_err);
	TALLOC_FREE(subreq);

	if (!ok) {
		D_ERR("failed to ban node, ret=%d\n", ban_err);
	}

	tevent_req_error(req, EIO);
}
static void recovery_normal_done(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
subreq, struct tevent_req);
struct recovery_state *state = tevent_req_data(
req, struct recovery_state);
struct ctdb_req_control request;
int *err_list;
int ret;
bool status;
status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list,
NULL);
TALLOC_FREE(subreq);
if (! status) {
int ret2;
uint32_t pnn;
ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list,
state->nlist->count,
err_list,
&pnn);
if (ret2 != 0) {
D_ERR("failed to set recovery mode NORMAL on node %u,"
" ret=%d\n", pnn, ret2);
} else {
D_ERR("failed to set recovery mode NORMAL, ret=%d\n",
ret);
}
tevent_req_error(req, ret);
return;
}
D_ERR("Set recovery mode to NORMAL\n");
ctdb_req_control_end_recovery(&request);
subreq = ctdb_client_control_multi_send(state,
state->ev,
state->client,
state->nlist->pnn_list,
state->nlist->count,
TIMEOUT(),
&request);
if (tevent_req_nomem(subreq, req)) {
return;
}
tevent_req_set_callback(subreq, recovery_end_recovery_done, req);
}
/*
 * Completion callback for the END_RECOVERY control; last step of the
 * recovery chain.
 *
 * Marks the recovery request as done when all nodes succeeded.  Otherwise
 * the error is logged, naming the failing node when one can be
 * identified, and the request fails with the error reported by the
 * multi-node control.
 */
static void recovery_end_recovery_done(struct tevent_req *subreq)
{
	struct tevent_req *req =
		tevent_req_callback_data(subreq, struct tevent_req);
	struct recovery_state *state =
		tevent_req_data(req, struct recovery_state);
	int *node_errors;
	int err;
	bool ok;

	ok = ctdb_client_control_multi_recv(subreq,
					    &err,
					    state,
					    &node_errors,
					    NULL);
	TALLOC_FREE(subreq);

	if (ok) {
		D_ERR("recovered event finished\n");
		tevent_req_done(req);
		return;
	}

	{
		uint32_t failed_pnn;
		int node_err;

		node_err = ctdb_client_control_multi_error(
			state->nlist->pnn_list,
			state->nlist->count,
			node_errors,
			&failed_pnn);
		if (node_err == 0) {
			D_ERR("failed to run recovered event, ret=%d\n", err);
		} else {
			D_ERR("failed to run recovered event on node %u,"
			      " ret=%d\n", failed_pnn, node_err);
		}
	}

	tevent_req_error(req, err);
}
/*
 * Collect the outcome of a recovery request.
 *
 * Thin wrapper that passes req and perr straight to generic_recv();
 * nothing is returned to the caller from here.
 */
static void recovery_recv(struct tevent_req *req, int *perr)
{
	(void) generic_recv(req, perr);
}
/*
 * Print the command line synopsis for progname to stderr.
 */
static void usage(const char *progname)
{
	fprintf(stderr,
		"\nUsage: %s <output-fd> <ctdb-socket-path> "
		"<generation>\n",
		progname);
}
/*
 * Entry point of the recovery helper.
 *
 * Arguments - output fd, ctdb socket path, generation
 *
 * Connects to the ctdb daemon on the given socket path and runs a
 * recovery with the given generation.  On success the result (0) is
 * written to the output fd and the process exits with status 0.  Any
 * failure, including failing to deliver the result, exits with status 1.
 */
int main(int argc, char *argv[])
{
	int write_fd;
	const char *sockpath;
	TALLOC_CTX *mem_ctx = NULL;
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	bool status;
	int ret = 0;
	struct tevent_req *req;
	uint32_t generation;
	ssize_t nwritten;

	if (argc != 4) {
		usage(argv[0]);
		exit(1);
	}

	/*
	 * Parse the fd with error reporting (same helper as used for the
	 * generation below) rather than atoi(), which would silently turn
	 * a malformed argument into fd 0.
	 */
	write_fd = (int)smb_strtoul(argv[1],
				    NULL,
				    10,
				    &ret,
				    SMB_STR_STANDARD);
	if (ret != 0) {
		fprintf(stderr, "recovery: invalid output fd\n");
		goto failed;
	}

	sockpath = argv[2];

	generation = (uint32_t)smb_strtoul(argv[3],
					   NULL,
					   0,
					   &ret,
					   SMB_STR_STANDARD);
	if (ret != 0) {
		fprintf(stderr, "recovery: unable to initialize generation\n");
		goto failed;
	}

	mem_ctx = talloc_new(NULL);
	if (mem_ctx == NULL) {
		fprintf(stderr, "recovery: talloc_new() failed\n");
		goto failed;
	}

	ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery");
	if (ret != 0) {
		fprintf(stderr, "recovery: Unable to initialize logging\n");
		goto failed;
	}

	ev = tevent_context_init(mem_ctx);
	if (ev == NULL) {
		D_ERR("tevent_context_init() failed\n");
		goto failed;
	}

	status = logging_setup_sighup_handler(ev, mem_ctx, NULL, NULL);
	if (!status) {
		D_ERR("logging_setup_sighup_handler() failed\n");
		goto failed;
	}

	ret = ctdb_client_init(mem_ctx, ev, sockpath, &client);
	if (ret != 0) {
		D_ERR("ctdb_client_init() failed, ret=%d\n", ret);
		goto failed;
	}

	req = recovery_send(mem_ctx, ev, client, generation);
	if (req == NULL) {
		D_ERR("recovery_send() failed\n");
		goto failed;
	}

	/* Run the event loop until the recovery request completes */
	if (! tevent_req_poll(req, ev)) {
		D_ERR("tevent_req_poll() failed\n");
		goto failed;
	}

	recovery_recv(req, &ret);
	TALLOC_FREE(req);
	if (ret != 0) {
		D_ERR("database recovery failed, ret=%d\n", ret);
		goto failed;
	}

	/*
	 * Report success on the output fd.  A failed or short write means
	 * the result was not delivered, so do not exit with success.
	 */
	nwritten = sys_write(write_fd, &ret, sizeof(ret));
	if (nwritten != (ssize_t)sizeof(ret)) {
		D_ERR("failed to write result to fd %d\n", write_fd);
		goto failed;
	}

	TALLOC_FREE(mem_ctx);
	return 0;

failed:
	TALLOC_FREE(mem_ctx);
	return 1;
}