/*
   ctdb parallel database recovery

   Copyright (C) Amitay Isaacs  2015

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/network.h"
#include "system/filesys.h"

/*
 * The operands of these four directives were lost; they are restored
 * from the APIs this file uses (talloc_*, tevent_*, tdb_*, dirname()).
 */
#include <talloc.h>
#include <tevent.h>
#include <tdb.h>
#include <libgen.h>

#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"
#include "lib/util/sys_rw.h"
#include "lib/util/time.h"
#include "lib/util/tevent_unix.h"
#include "lib/util/util.h"
#include "lib/util/smb_strtox.h"

#include "protocol/protocol.h"
#include "protocol/protocol_api.h"
#include "client/client.h"

#include "common/logging.h"

/* Timeout, in seconds, applied to every control sent by this file */
static int recover_timeout = 30;

#define NUM_RETRIES	3

#define TIMEOUT() timeval_current_ofs(recover_timeout, 0)

/*
 * Utility functions
 */

/*
 * Common _recv() implementation for requests that carry no result
 * other than success/failure.
 *
 * Returns false and stores the unix error in *perr (when perr is not
 * NULL) if the request failed, true otherwise.
 */
static bool generic_recv(struct tevent_req *req, int *perr)
{
	int err;

	if (tevent_req_is_unix_error(req, &err)) {
		if (perr != NULL) {
			*perr = err;
		}
		return false;
	}

	return true;
}

/* Last srvid handed out; starts at CTDB_SRVID_RECOVERY */
static uint64_t rec_srvid = CTDB_SRVID_RECOVERY;

/* Return a srvid that has not been returned before by this function */
static uint64_t srvid_next(void)
{
	rec_srvid += 1;
	return rec_srvid;
}

/*
 * Node related functions
 */

/*
 * Parallel arrays indexed by position: pnn_list[i], caps[i] and
 * ban_credits[i] describe the same node.  "size" is the allocated
 * capacity, "count" the number of entries in use.
 */
struct node_list {
	uint32_t *pnn_list;
	uint32_t *caps;
	uint32_t *ban_credits;
	unsigned int size;
	unsigned int count;
};

/*
 * Allocate an empty node list with room for "size" nodes.
 * Returns NULL on allocation failure.
 */
static struct node_list *node_list_init(TALLOC_CTX *mem_ctx, unsigned int size)
{
	struct node_list *nlist;
	unsigned int i;

	nlist = talloc_zero(mem_ctx, struct node_list);
	if (nlist == NULL) {
		return NULL;
	}

	nlist->pnn_list = talloc_array(nlist, uint32_t, size);
	nlist->caps = talloc_zero_array(nlist,
uint32_t, size); nlist->ban_credits = talloc_zero_array(nlist, uint32_t, size); if (nlist->pnn_list == NULL || nlist->caps == NULL || nlist->ban_credits == NULL) { talloc_free(nlist); return NULL; } nlist->size = size; for (i=0; isize; i++) { nlist->pnn_list[i] = CTDB_UNKNOWN_PNN; } return nlist; } static bool node_list_add(struct node_list *nlist, uint32_t pnn) { unsigned int i; if (nlist->count == nlist->size) { return false; } for (i=0; icount; i++) { if (nlist->pnn_list[i] == pnn) { return false; } } nlist->pnn_list[nlist->count] = pnn; nlist->count += 1; return true; } static uint32_t *node_list_lmaster(struct node_list *nlist, TALLOC_CTX *mem_ctx, unsigned int *pnn_count) { uint32_t *pnn_list; unsigned int count, i; pnn_list = talloc_zero_array(mem_ctx, uint32_t, nlist->count); if (pnn_list == NULL) { return NULL; } count = 0; for (i=0; icount; i++) { if (!(nlist->caps[i] & CTDB_CAP_LMASTER)) { continue; } pnn_list[count] = nlist->pnn_list[i]; count += 1; } *pnn_count = count; return pnn_list; } static void node_list_ban_credits(struct node_list *nlist, uint32_t pnn) { unsigned int i; for (i=0; icount; i++) { if (nlist->pnn_list[i] == pnn) { nlist->ban_credits[i] += 1; break; } } } /* * Database list functions * * Simple, naive implementation that could be updated to a db_hash or similar */ struct db { struct db *prev, *next; uint32_t db_id; uint32_t db_flags; uint32_t *pnn_list; unsigned int num_nodes; }; struct db_list { unsigned int num_dbs; struct db *db; unsigned int num_nodes; }; static struct db_list *db_list_init(TALLOC_CTX *mem_ctx, unsigned int num_nodes) { struct db_list *l; l = talloc_zero(mem_ctx, struct db_list); l->num_nodes = num_nodes; return l; } static struct db *db_list_find(struct db_list *dblist, uint32_t db_id) { struct db *db; if (dblist == NULL) { return NULL; } db = dblist->db; while (db != NULL && db->db_id != db_id) { db = db->next; } return db; } static int db_list_add(struct db_list *dblist, uint32_t db_id, uint32_t db_flags, 
uint32_t node) { struct db *db = NULL; if (dblist == NULL) { return EINVAL; } db = talloc_zero(dblist, struct db); if (db == NULL) { return ENOMEM; } db->db_id = db_id; db->db_flags = db_flags; db->pnn_list = talloc_zero_array(db, uint32_t, dblist->num_nodes); if (db->pnn_list == NULL) { talloc_free(db); return ENOMEM; } db->pnn_list[0] = node; db->num_nodes = 1; DLIST_ADD_END(dblist->db, db); dblist->num_dbs++; return 0; } static int db_list_check_and_add(struct db_list *dblist, uint32_t db_id, uint32_t db_flags, uint32_t node) { struct db *db = NULL; int ret; /* * These flags are masked out because they are only set on a * node when a client attaches to that node, so they might not * be set yet. They can't be passed as part of the attach, so * they're no use here. */ db_flags &= ~(CTDB_DB_FLAGS_READONLY | CTDB_DB_FLAGS_STICKY); if (dblist == NULL) { return EINVAL; } db = db_list_find(dblist, db_id); if (db == NULL) { ret = db_list_add(dblist, db_id, db_flags, node); return ret; } if (db->db_flags != db_flags) { D_ERR("Incompatible database flags for 0x%"PRIx32" " "(0x%"PRIx32" != 0x%"PRIx32")\n", db_id, db_flags, db->db_flags); return EINVAL; } if (db->num_nodes >= dblist->num_nodes) { return EINVAL; } db->pnn_list[db->num_nodes] = node; db->num_nodes++; return 0; } /* * Create database on nodes where it is missing */ struct db_create_missing_state { struct tevent_context *ev; struct ctdb_client_context *client; struct node_list *nlist; const char *db_name; uint32_t *missing_pnn_list; int missing_num_nodes; }; static void db_create_missing_done(struct tevent_req *subreq); static struct tevent_req *db_create_missing_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct node_list *nlist, const char *db_name, struct db *db) { struct tevent_req *req, *subreq; struct db_create_missing_state *state; struct ctdb_req_control request; unsigned int i, j; req = tevent_req_create(mem_ctx, &state, struct db_create_missing_state); if 
(req == NULL) { return NULL; } state->ev = ev; state->client = client; state->nlist = nlist; state->db_name = db_name; if (nlist->count == db->num_nodes) { tevent_req_done(req); return tevent_req_post(req, ev); } state->missing_pnn_list = talloc_array(mem_ctx, uint32_t, nlist->count); if (tevent_req_nomem(state->missing_pnn_list, req)) { return tevent_req_post(req, ev); } for (i = 0; i < nlist->count; i++) { uint32_t pnn = nlist->pnn_list[i] ; for (j = 0; j < db->num_nodes; j++) { if (pnn == db->pnn_list[j]) { break; } } if (j < db->num_nodes) { continue; } DBG_INFO("Create database %s on node %u\n", state->db_name, pnn); state->missing_pnn_list[state->missing_num_nodes] = pnn; state->missing_num_nodes++; } if (db->db_flags & CTDB_DB_FLAGS_PERSISTENT) { ctdb_req_control_db_attach_persistent(&request, db_name); } else if (db->db_flags & CTDB_DB_FLAGS_REPLICATED) { ctdb_req_control_db_attach_replicated(&request, db_name); } else { ctdb_req_control_db_attach(&request, db_name); } request.flags = CTDB_CTRL_FLAG_ATTACH_RECOVERY; subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->missing_pnn_list, state->missing_num_nodes, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, db_create_missing_done, req); return req; } static void db_create_missing_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct db_create_missing_state *state = tevent_req_data( req, struct db_create_missing_state); int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) {
		int ret2;
		uint32_t pnn;

		/* Find out which node, if any, reported the failure */
		ret2 = ctdb_client_control_multi_error(
				state->missing_pnn_list,
				state->missing_num_nodes,
				err_list, &pnn);
		if (ret2 != 0) {
			D_ERR("control DB_ATTACH failed for db %s"
			      " on node %u, ret=%d\n",
			      state->db_name, pnn, ret2);
			node_list_ban_credits(state->nlist, pnn);
		} else {
			D_ERR("control DB_ATTACH failed for db %s, ret=%d\n",
			      state->db_name, ret);
		}
		tevent_req_error(req, ret);
		return;
	}

	tevent_req_done(req);
}

/* Result of db_create_missing_send(); see generic_recv() */
static bool db_create_missing_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}

/*
 * Recovery database functions
 */

/* Temporary local tdb in which the records of one database are merged */
struct recdb_context {
	uint32_t db_id;
	const char *db_name;
	const char *db_path;
	struct tdb_wrap *db;
	bool persistent;
};

/*
 * Create an empty recovery database file named "recdb.<db_name>".
 *
 * The file is placed in the directory named by the CTDB_DBDIR_STATE
 * environment variable if that is set, otherwise in the directory
 * part of db_path.  The environment lookup result is cached in a
 * function-static variable.
 */
static struct recdb_context *recdb_create(TALLOC_CTX *mem_ctx, uint32_t db_id,
					  const char *db_name,
					  const char *db_path,
					  uint32_t hash_size, bool persistent)
{
	static char *db_dir_state = NULL;
	struct recdb_context *recdb;
	unsigned int tdb_flags;

	recdb = talloc(mem_ctx, struct recdb_context);
	if (recdb == NULL) {
		return NULL;
	}

	if (db_dir_state == NULL) {
		db_dir_state = getenv("CTDB_DBDIR_STATE");
	}

	recdb->db_name = db_name;
	recdb->db_id = db_id;
	/* NOTE(review): dirname() may modify db_path in place - confirm callers tolerate that */
	recdb->db_path = talloc_asprintf(recdb, "%s/recdb.%s",
					 db_dir_state != NULL ?
db_dir_state : dirname(discard_const(db_path)), db_name); if (recdb->db_path == NULL) { talloc_free(recdb); return NULL; } unlink(recdb->db_path); tdb_flags = TDB_NOLOCK | TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING; recdb->db = tdb_wrap_open(mem_ctx, recdb->db_path, hash_size, tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600); if (recdb->db == NULL) { talloc_free(recdb); D_ERR("failed to create recovery db %s\n", recdb->db_path); return NULL; } recdb->persistent = persistent; return recdb; } static uint32_t recdb_id(struct recdb_context *recdb) { return recdb->db_id; } static const char *recdb_name(struct recdb_context *recdb) { return recdb->db_name; } static const char *recdb_path(struct recdb_context *recdb) { return recdb->db_path; } static struct tdb_context *recdb_tdb(struct recdb_context *recdb) { return recdb->db->tdb; } static bool recdb_persistent(struct recdb_context *recdb) { return recdb->persistent; } struct recdb_add_traverse_state { struct recdb_context *recdb; uint32_t mypnn; }; static int recdb_add_traverse(uint32_t reqid, struct ctdb_ltdb_header *header, TDB_DATA key, TDB_DATA data, void *private_data) { struct recdb_add_traverse_state *state = (struct recdb_add_traverse_state *)private_data; struct ctdb_ltdb_header *hdr; TDB_DATA prev_data; int ret; /* header is not marshalled separately in the pulldb control */ if (data.dsize < sizeof(struct ctdb_ltdb_header)) { return -1; } hdr = (struct ctdb_ltdb_header *)data.dptr; /* fetch the existing record, if any */ prev_data = tdb_fetch(recdb_tdb(state->recdb), key); if (prev_data.dptr != NULL) { struct ctdb_ltdb_header prev_hdr; prev_hdr = *(struct ctdb_ltdb_header *)prev_data.dptr; free(prev_data.dptr); if (hdr->rsn < prev_hdr.rsn || (hdr->rsn == prev_hdr.rsn && prev_hdr.dmaster != state->mypnn)) { return 0; } } ret = tdb_store(recdb_tdb(state->recdb), key, data, TDB_REPLACE); if (ret != 0) { return -1; } return 0; } static bool recdb_add(struct recdb_context *recdb, int mypnn, struct ctdb_rec_buffer 
*recbuf) { struct recdb_add_traverse_state state; int ret; state.recdb = recdb; state.mypnn = mypnn; ret = ctdb_rec_buffer_traverse(recbuf, recdb_add_traverse, &state); if (ret != 0) { return false; } return true; } /* This function decides which records from recdb are retained */ static int recbuf_filter_add(struct ctdb_rec_buffer *recbuf, bool persistent, uint32_t reqid, uint32_t dmaster, TDB_DATA key, TDB_DATA data) { struct ctdb_ltdb_header *header; int ret; /* Skip empty records */ if (data.dsize <= sizeof(struct ctdb_ltdb_header)) { return 0; } /* update the dmaster field to point to us */ header = (struct ctdb_ltdb_header *)data.dptr; if (!persistent) { header->dmaster = dmaster; header->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA; } ret = ctdb_rec_buffer_add(recbuf, recbuf, reqid, NULL, key, data); if (ret != 0) { return ret; } return 0; } struct recdb_file_traverse_state { struct ctdb_rec_buffer *recbuf; struct recdb_context *recdb; TALLOC_CTX *mem_ctx; uint32_t dmaster; uint32_t reqid; bool persistent; bool failed; int fd; size_t max_size; unsigned int num_buffers; }; static int recdb_file_traverse(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *private_data) { struct recdb_file_traverse_state *state = (struct recdb_file_traverse_state *)private_data; int ret; ret = recbuf_filter_add(state->recbuf, state->persistent, state->reqid, state->dmaster, key, data); if (ret != 0) { state->failed = true; return ret; } if (ctdb_rec_buffer_len(state->recbuf) > state->max_size) { ret = ctdb_rec_buffer_write(state->recbuf, state->fd); if (ret != 0) { D_ERR("Failed to collect recovery records for %s\n", recdb_name(state->recdb)); state->failed = true; return ret; } state->num_buffers += 1; TALLOC_FREE(state->recbuf); state->recbuf = ctdb_rec_buffer_init(state->mem_ctx, recdb_id(state->recdb)); if (state->recbuf == NULL) { state->failed = true; return ENOMEM; } } return 0; } static int recdb_file(struct recdb_context *recdb, TALLOC_CTX *mem_ctx, uint32_t 
dmaster, int fd, int max_size) { struct recdb_file_traverse_state state; int ret; state.recbuf = ctdb_rec_buffer_init(mem_ctx, recdb_id(recdb)); if (state.recbuf == NULL) { return -1; } state.recdb = recdb; state.mem_ctx = mem_ctx; state.dmaster = dmaster; state.reqid = 0; state.persistent = recdb_persistent(recdb); state.failed = false; state.fd = fd; state.max_size = max_size; state.num_buffers = 0; ret = tdb_traverse_read(recdb_tdb(recdb), recdb_file_traverse, &state); if (ret == -1 || state.failed) { TALLOC_FREE(state.recbuf); return -1; } ret = ctdb_rec_buffer_write(state.recbuf, fd); if (ret != 0) { D_ERR("Failed to collect recovery records for %s\n", recdb_name(recdb)); TALLOC_FREE(state.recbuf); return -1; } state.num_buffers += 1; D_DEBUG("Wrote %d buffers of recovery records for %s\n", state.num_buffers, recdb_name(recdb)); return state.num_buffers; } /* * Pull database from a single node */ struct pull_database_state { struct tevent_context *ev; struct ctdb_client_context *client; struct recdb_context *recdb; uint32_t pnn; uint64_t srvid; unsigned int num_records; int result; }; static void pull_database_handler(uint64_t srvid, TDB_DATA data, void *private_data); static void pull_database_register_done(struct tevent_req *subreq); static void pull_database_unregister_done(struct tevent_req *subreq); static void pull_database_done(struct tevent_req *subreq); static struct tevent_req *pull_database_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, uint32_t pnn, struct recdb_context *recdb) { struct tevent_req *req, *subreq; struct pull_database_state *state; req = tevent_req_create(mem_ctx, &state, struct pull_database_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->recdb = recdb; state->pnn = pnn; state->srvid = srvid_next(); subreq = ctdb_client_set_message_handler_send( state, state->ev, state->client, state->srvid, pull_database_handler, req); if (tevent_req_nomem(subreq, 
req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, pull_database_register_done, req); return req; } static void pull_database_handler(uint64_t srvid, TDB_DATA data, void *private_data) { struct tevent_req *req = talloc_get_type_abort( private_data, struct tevent_req); struct pull_database_state *state = tevent_req_data( req, struct pull_database_state); struct ctdb_rec_buffer *recbuf; size_t np; int ret; bool status; if (srvid != state->srvid) { return; } ret = ctdb_rec_buffer_pull(data.dptr, data.dsize, state, &recbuf, &np); if (ret != 0) { D_ERR("Invalid data received for DB_PULL messages\n"); return; } if (recbuf->db_id != recdb_id(state->recdb)) { talloc_free(recbuf); D_ERR("Invalid dbid:%08x for DB_PULL messages for %s\n", recbuf->db_id, recdb_name(state->recdb)); return; } status = recdb_add(state->recdb, ctdb_client_pnn(state->client), recbuf); if (! status) { talloc_free(recbuf); D_ERR("Failed to add records to recdb for %s\n", recdb_name(state->recdb)); return; } state->num_records += recbuf->count; talloc_free(recbuf); } static void pull_database_register_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct pull_database_state *state = tevent_req_data( req, struct pull_database_state); struct ctdb_req_control request; struct ctdb_pulldb_ext pulldb_ext; int ret; bool status; status = ctdb_client_set_message_handler_recv(subreq, &ret); TALLOC_FREE(subreq); if (! 
status) { D_ERR("Failed to set message handler for DB_PULL for %s\n", recdb_name(state->recdb)); tevent_req_error(req, ret); return; } pulldb_ext.db_id = recdb_id(state->recdb); pulldb_ext.lmaster = CTDB_LMASTER_ANY; pulldb_ext.srvid = state->srvid; ctdb_req_control_db_pull(&request, &pulldb_ext); subreq = ctdb_client_control_send(state, state->ev, state->client, state->pnn, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, pull_database_done, req); } static void pull_database_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct pull_database_state *state = tevent_req_data( req, struct pull_database_state); struct ctdb_reply_control *reply; uint32_t num_records; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { D_ERR("control DB_PULL failed for %s on node %u, ret=%d\n", recdb_name(state->recdb), state->pnn, ret); state->result = ret; goto unregister; } ret = ctdb_reply_control_db_pull(reply, &num_records); talloc_free(reply); if (num_records != state->num_records) { D_ERR("mismatch (%u != %u) in DB_PULL records for db %s\n", num_records, state->num_records, recdb_name(state->recdb)); state->result = EIO; goto unregister; } D_INFO("Pulled %d records for db %s from node %d\n", state->num_records, recdb_name(state->recdb), state->pnn); unregister: subreq = ctdb_client_remove_message_handler_send( state, state->ev, state->client, state->srvid, req); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, pull_database_unregister_done, req); } static void pull_database_unregister_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct pull_database_state *state = tevent_req_data( req, struct pull_database_state); int ret; bool status; status = ctdb_client_remove_message_handler_recv(subreq, &ret); 
TALLOC_FREE(subreq); if (! status) { D_ERR("failed to remove message handler for DB_PULL for db %s\n", recdb_name(state->recdb)); tevent_req_error(req, ret); return; } if (state->result != 0) { tevent_req_error(req, state->result); return; } tevent_req_done(req); } static bool pull_database_recv(struct tevent_req *req, int *perr) { return generic_recv(req, perr); } /* * Push database to specified nodes (new style) */ struct push_database_state { struct tevent_context *ev; struct ctdb_client_context *client; struct recdb_context *recdb; uint32_t *pnn_list; unsigned int count; uint64_t srvid; uint32_t dmaster; int fd; int num_buffers; int num_buffers_sent; unsigned int num_records; }; static void push_database_started(struct tevent_req *subreq); static void push_database_send_msg(struct tevent_req *req); static void push_database_send_done(struct tevent_req *subreq); static void push_database_confirmed(struct tevent_req *subreq); static struct tevent_req *push_database_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, uint32_t *pnn_list, unsigned int count, struct recdb_context *recdb, int max_size) { struct tevent_req *req, *subreq; struct push_database_state *state; struct ctdb_req_control request; struct ctdb_pulldb_ext pulldb_ext; char *filename; off_t offset; req = tevent_req_create(mem_ctx, &state, struct push_database_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->recdb = recdb; state->pnn_list = pnn_list; state->count = count; state->srvid = srvid_next(); state->dmaster = ctdb_client_pnn(client); state->num_buffers_sent = 0; state->num_records = 0; filename = talloc_asprintf(state, "%s.dat", recdb_path(recdb)); if (tevent_req_nomem(filename, req)) { return tevent_req_post(req, ev); } state->fd = open(filename, O_RDWR|O_CREAT, 0644); if (state->fd == -1) { tevent_req_error(req, errno); return tevent_req_post(req, ev); } unlink(filename); talloc_free(filename); state->num_buffers 
= recdb_file(recdb, state, state->dmaster,
					state->fd, max_size);
	/* recdb_file() reports every failure as -1; ENOMEM is used for all of them */
	if (state->num_buffers == -1) {
		tevent_req_error(req, ENOMEM);
		return tevent_req_post(req, ev);
	}

	/* Rewind so the buffers can be read back from the start */
	offset = lseek(state->fd, 0, SEEK_SET);
	if (offset != 0) {
		tevent_req_error(req, EIO);
		return tevent_req_post(req, ev);
	}

	/* NOTE(review): pulldb_ext.lmaster is not set here - confirm DB_PUSH_START ignores it */
	pulldb_ext.db_id = recdb_id(recdb);
	pulldb_ext.srvid = state->srvid;

	ctdb_req_control_db_push_start(&request, &pulldb_ext);
	subreq = ctdb_client_control_multi_send(state, ev, client,
						pnn_list, count,
						TIMEOUT(), &request);
	if (tevent_req_nomem(subreq, req)) {
		return tevent_req_post(req, ev);
	}
	tevent_req_set_callback(subreq, push_database_started, req);

	return req;
}

/* DB_PUSH_START completed on all nodes; start sending the buffers */
static void push_database_started(struct tevent_req *subreq)
{
	struct tevent_req *req = tevent_req_callback_data(
		subreq, struct tevent_req);
	struct push_database_state *state = tevent_req_data(
		req, struct push_database_state);
	int *err_list;
	int ret;
	bool status;

	status = ctdb_client_control_multi_recv(subreq, &ret, state,
						&err_list, NULL);
	TALLOC_FREE(subreq);
	if (!
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->pnn_list, state->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control DB_PUSH_START failed for db %s" " on node %u, ret=%d\n", recdb_name(state->recdb), pnn, ret2); } else { D_ERR("control DB_PUSH_START failed for db %s," " ret=%d\n", recdb_name(state->recdb), ret); } talloc_free(err_list); tevent_req_error(req, ret); return; } push_database_send_msg(req); } static void push_database_send_msg(struct tevent_req *req) { struct push_database_state *state = tevent_req_data( req, struct push_database_state); struct tevent_req *subreq; struct ctdb_rec_buffer *recbuf; struct ctdb_req_message message; TDB_DATA data; size_t np; int ret; if (state->num_buffers_sent == state->num_buffers) { struct ctdb_req_control request; ctdb_req_control_db_push_confirm(&request, recdb_id(state->recdb)); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->pnn_list, state->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, push_database_confirmed, req); return; } ret = ctdb_rec_buffer_read(state->fd, state, &recbuf); if (ret != 0) { tevent_req_error(req, ret); return; } data.dsize = ctdb_rec_buffer_len(recbuf); data.dptr = talloc_size(state, data.dsize); if (tevent_req_nomem(data.dptr, req)) { return; } ctdb_rec_buffer_push(recbuf, data.dptr, &np); message.srvid = state->srvid; message.data.data = data; D_DEBUG("Pushing buffer %d with %d records for db %s\n", state->num_buffers_sent, recbuf->count, recdb_name(state->recdb)); subreq = ctdb_client_message_multi_send(state, state->ev, state->client, state->pnn_list, state->count, &message); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, push_database_send_done, req); state->num_records += recbuf->count; talloc_free(data.dptr); talloc_free(recbuf); } static void push_database_send_done(struct tevent_req *subreq) { struct tevent_req *req = 
tevent_req_callback_data( subreq, struct tevent_req); struct push_database_state *state = tevent_req_data( req, struct push_database_state); bool status; int ret; status = ctdb_client_message_multi_recv(subreq, &ret, NULL, NULL); TALLOC_FREE(subreq); if (! status) { D_ERR("Sending recovery records failed for %s\n", recdb_name(state->recdb)); tevent_req_error(req, ret); return; } state->num_buffers_sent += 1; push_database_send_msg(req); } static void push_database_confirmed(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct push_database_state *state = tevent_req_data( req, struct push_database_state); struct ctdb_reply_control **reply; int *err_list; bool status; unsigned int i; int ret; uint32_t num_records; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, &reply); TALLOC_FREE(subreq); if (! status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->pnn_list, state->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control DB_PUSH_CONFIRM failed for db %s" " on node %u, ret=%d\n", recdb_name(state->recdb), pnn, ret2); } else { D_ERR("control DB_PUSH_CONFIRM failed for db %s," " ret=%d\n", recdb_name(state->recdb), ret); } tevent_req_error(req, ret); return; } for (i=0; icount; i++) { ret = ctdb_reply_control_db_push_confirm(reply[i], &num_records); if (ret != 0) { tevent_req_error(req, EPROTO); return; } if (num_records != state->num_records) { D_ERR("Node %u received %d of %d records for %s\n", state->pnn_list[i], num_records, state->num_records, recdb_name(state->recdb)); tevent_req_error(req, EPROTO); return; } } talloc_free(reply); D_INFO("Pushed %d records for db %s\n", state->num_records, recdb_name(state->recdb)); tevent_req_done(req); } static bool push_database_recv(struct tevent_req *req, int *perr) { return generic_recv(req, perr); } /* * Collect databases using highest sequence number */ struct collect_highseqnum_db_state { struct tevent_context 
*ev; struct ctdb_client_context *client; struct node_list *nlist; uint32_t db_id; struct recdb_context *recdb; uint32_t max_pnn; }; static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq); static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq); static struct tevent_req *collect_highseqnum_db_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct node_list *nlist, uint32_t db_id, struct recdb_context *recdb) { struct tevent_req *req, *subreq; struct collect_highseqnum_db_state *state; struct ctdb_req_control request; req = tevent_req_create(mem_ctx, &state, struct collect_highseqnum_db_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->nlist = nlist; state->db_id = db_id; state->recdb = recdb; ctdb_req_control_get_db_seqnum(&request, db_id); subreq = ctdb_client_control_multi_send(mem_ctx, ev, client, nlist->pnn_list, nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, collect_highseqnum_db_seqnum_done, req); return req; } static void collect_highseqnum_db_seqnum_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct collect_highseqnum_db_state *state = tevent_req_data( req, struct collect_highseqnum_db_state); struct ctdb_reply_control **reply; int *err_list; bool status; unsigned int i; int ret; uint64_t seqnum, max_seqnum; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, &reply); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control GET_DB_SEQNUM failed for db %s" " on node %u, ret=%d\n", recdb_name(state->recdb), pnn, ret2); } else { D_ERR("control GET_DB_SEQNUM failed for db %s," " ret=%d\n", recdb_name(state->recdb), ret); } tevent_req_error(req, ret); return; } max_seqnum = 0; state->max_pnn = state->nlist->pnn_list[0]; for (i=0; inlist->count; i++) { ret = ctdb_reply_control_get_db_seqnum(reply[i], &seqnum); if (ret != 0) { tevent_req_error(req, EPROTO); return; } if (max_seqnum < seqnum) { max_seqnum = seqnum; state->max_pnn = state->nlist->pnn_list[i]; } } talloc_free(reply); D_INFO("Pull persistent db %s from node %d with seqnum 0x%"PRIx64"\n", recdb_name(state->recdb), state->max_pnn, max_seqnum); subreq = pull_database_send(state, state->ev, state->client, state->max_pnn, state->recdb); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, collect_highseqnum_db_pulldb_done, req); } static void collect_highseqnum_db_pulldb_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct collect_highseqnum_db_state *state = tevent_req_data( req, struct collect_highseqnum_db_state); int ret; bool status; status = pull_database_recv(subreq, &ret); TALLOC_FREE(subreq); if (! 
status) { node_list_ban_credits(state->nlist, state->max_pnn); tevent_req_error(req, ret); return; } tevent_req_done(req); } static bool collect_highseqnum_db_recv(struct tevent_req *req, int *perr) { return generic_recv(req, perr); } /* * Collect all databases */ struct collect_all_db_state { struct tevent_context *ev; struct ctdb_client_context *client; struct node_list *nlist; uint32_t db_id; struct recdb_context *recdb; struct ctdb_pulldb pulldb; unsigned int index; }; static void collect_all_db_pulldb_done(struct tevent_req *subreq); static struct tevent_req *collect_all_db_send( TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct node_list *nlist, uint32_t db_id, struct recdb_context *recdb) { struct tevent_req *req, *subreq; struct collect_all_db_state *state; req = tevent_req_create(mem_ctx, &state, struct collect_all_db_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->nlist = nlist; state->db_id = db_id; state->recdb = recdb; state->index = 0; subreq = pull_database_send(state, ev, client, nlist->pnn_list[state->index], recdb); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req); return req; } static void collect_all_db_pulldb_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct collect_all_db_state *state = tevent_req_data( req, struct collect_all_db_state); int ret; bool status; status = pull_database_recv(subreq, &ret); TALLOC_FREE(subreq); if (! 
status) {
		/* The node that was being pulled from gets a ban credit */
		node_list_ban_credits(state->nlist,
				      state->nlist->pnn_list[state->index]);
		tevent_req_error(req, ret);
		return;
	}

	/* Done once every node in the list has been pulled */
	state->index += 1;
	if (state->index == state->nlist->count) {
		tevent_req_done(req);
		return;
	}

	subreq = pull_database_send(state,
				    state->ev,
				    state->client,
				    state->nlist->pnn_list[state->index],
				    state->recdb);
	if (tevent_req_nomem(subreq, req)) {
		return;
	}
	tevent_req_set_callback(subreq, collect_all_db_pulldb_done, req);
}

/* Result of collect_all_db_send(); see generic_recv() */
static bool collect_all_db_recv(struct tevent_req *req, int *perr)
{
	return generic_recv(req, perr);
}


/**
 * For each database do the following:
 *  - Get DB name from all nodes
 *  - Attach database on missing nodes
 *  - Get DB path
 *  - Freeze database on all nodes
 *  - Start transaction on all nodes
 *  - Collect database from all nodes
 *  - Wipe database on all nodes
 *  - Push database to all nodes
 *  - Commit transaction on all nodes
 *  - Thaw database on all nodes
 */

/* State carried through the recovery steps of one database */
struct recover_db_state {
	struct tevent_context *ev;
	struct ctdb_client_context *client;
	struct ctdb_tunable_list *tun_list;
	struct node_list *nlist;
	struct db *db;

	uint32_t destnode;
	struct ctdb_transdb transdb;

	const char *db_name, *db_path;
	struct recdb_context *recdb;
};

/* Callbacks, declared in the order of the steps listed above */
static void recover_db_name_done(struct tevent_req *subreq);
static void recover_db_create_missing_done(struct tevent_req *subreq);
static void recover_db_path_done(struct tevent_req *subreq);
static void recover_db_freeze_done(struct tevent_req *subreq);
static void recover_db_transaction_started(struct tevent_req *subreq);
static void recover_db_collect_done(struct tevent_req *subreq);
static void recover_db_wipedb_done(struct tevent_req *subreq);
static void recover_db_pushdb_done(struct tevent_req *subreq);
static void recover_db_transaction_committed(struct tevent_req *subreq);
static void recover_db_thaw_done(struct tevent_req *subreq);

/*
 * Start recovery of one database.  The first step asks the nodes
 * recorded for this database for its name.
 */
static struct tevent_req *recover_db_send(TALLOC_CTX *mem_ctx,
					  struct tevent_context *ev,
					  struct ctdb_client_context *client,
					  struct ctdb_tunable_list *tun_list,
struct node_list *nlist, uint32_t generation, struct db *db) { struct tevent_req *req, *subreq; struct recover_db_state *state; struct ctdb_req_control request; req = tevent_req_create(mem_ctx, &state, struct recover_db_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->tun_list = tun_list; state->nlist = nlist; state->db = db; state->destnode = ctdb_client_pnn(client); state->transdb.db_id = db->db_id; state->transdb.tid = generation; ctdb_req_control_get_dbname(&request, db->db_id); subreq = ctdb_client_control_multi_send(state, ev, client, state->db->pnn_list, state->db->num_nodes, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, recover_db_name_done, req); return req; } static void recover_db_name_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_reply_control **reply; int *err_list; unsigned int i; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, &reply); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->db->pnn_list, state->db->num_nodes, err_list, &pnn); if (ret2 != 0) { D_ERR("control GET_DBNAME failed on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("control GET_DBNAME failed, ret=%d\n", ret); } tevent_req_error(req, ret); return; } for (i = 0; i < state->db->num_nodes; i++) { const char *db_name; uint32_t pnn; pnn = state->nlist->pnn_list[i]; ret = ctdb_reply_control_get_dbname(reply[i], state, &db_name); if (ret != 0) { D_ERR("control GET_DBNAME failed on node %u " "for db=0x%x, ret=%d\n", pnn, state->db->db_id, ret); tevent_req_error(req, EPROTO); return; } if (state->db_name == NULL) { state->db_name = db_name; continue; } if (strcmp(state->db_name, db_name) != 0) { D_ERR("Incompatible database name for 0x%"PRIx32" " "(%s != %s) on node %"PRIu32"\n", state->db->db_id, db_name, state->db_name, pnn); node_list_ban_credits(state->nlist, pnn); tevent_req_error(req, ret); return; } } talloc_free(reply); subreq = db_create_missing_send(state, state->ev, state->client, state->nlist, state->db_name, state->db); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_create_missing_done, req); } static void recover_db_create_missing_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_req_control request; int ret; bool status; /* Could sanity check the db_id here */ status = db_create_missing_recv(subreq, &ret); TALLOC_FREE(subreq); if (! 
status) { tevent_req_error(req, ret); return; } ctdb_req_control_getdbpath(&request, state->db->db_id); subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_path_done, req); } static void recover_db_path_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_reply_control *reply; struct ctdb_req_control request; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { D_ERR("control GETDBPATH failed for db %s, ret=%d\n", state->db_name, ret); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_getdbpath(reply, state, &state->db_path); if (ret != 0) { D_ERR("control GETDBPATH failed for db %s, ret=%d\n", state->db_name, ret); tevent_req_error(req, EPROTO); return; } talloc_free(reply); ctdb_req_control_db_freeze(&request, state->db->db_id); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_freeze_done, req); } static void recover_db_freeze_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_req_control request; int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control FREEZE_DB failed for db %s" " on node %u, ret=%d\n", state->db_name, pnn, ret2); node_list_ban_credits(state->nlist, pnn); } else { D_ERR("control FREEZE_DB failed for db %s, ret=%d\n", state->db_name, ret); } tevent_req_error(req, ret); return; } ctdb_req_control_db_transaction_start(&request, &state->transdb); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_transaction_started, req); } static void recover_db_transaction_started(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); int *err_list; uint32_t flags; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control TRANSACTION_DB failed for db=%s" " on node %u, ret=%d\n", state->db_name, pnn, ret2); } else { D_ERR("control TRANSACTION_DB failed for db=%s," " ret=%d\n", state->db_name, ret); } tevent_req_error(req, ret); return; } flags = state->db->db_flags; state->recdb = recdb_create(state, state->db->db_id, state->db_name, state->db_path, state->tun_list->database_hash_size, flags & CTDB_DB_FLAGS_PERSISTENT); if (tevent_req_nomem(state->recdb, req)) { return; } if ((flags & CTDB_DB_FLAGS_PERSISTENT) || (flags & CTDB_DB_FLAGS_REPLICATED)) { subreq = collect_highseqnum_db_send(state, state->ev, state->client, state->nlist, state->db->db_id, state->recdb); } else { subreq = collect_all_db_send(state, state->ev, state->client, state->nlist, state->db->db_id, state->recdb); } if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_collect_done, req); } static void recover_db_collect_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_req_control request; int ret; bool status; if ((state->db->db_flags & CTDB_DB_FLAGS_PERSISTENT) || (state->db->db_flags & CTDB_DB_FLAGS_REPLICATED)) { status = collect_highseqnum_db_recv(subreq, &ret); } else { status = collect_all_db_recv(subreq, &ret); } TALLOC_FREE(subreq); if (! 
status) { tevent_req_error(req, ret); return; } ctdb_req_control_wipe_database(&request, &state->transdb); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_wipedb_done, req); } static void recover_db_wipedb_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control WIPEDB failed for db %s on node %u," " ret=%d\n", state->db_name, pnn, ret2); } else { D_ERR("control WIPEDB failed for db %s, ret=%d\n", state->db_name, ret); } tevent_req_error(req, ret); return; } subreq = push_database_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, state->recdb, state->tun_list->rec_buffer_size_limit); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_pushdb_done, req); } static void recover_db_pushdb_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_req_control request; int ret; bool status; status = push_database_recv(subreq, &ret); TALLOC_FREE(subreq); if (! 
status) { tevent_req_error(req, ret); return; } TALLOC_FREE(state->recdb); ctdb_req_control_db_transaction_commit(&request, &state->transdb); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_transaction_committed, req); } static void recover_db_transaction_committed(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); struct ctdb_req_control request; int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control DB_TRANSACTION_COMMIT failed for db %s" " on node %u, ret=%d\n", state->db_name, pnn, ret2); } else { D_ERR("control DB_TRANSACTION_COMMIT failed for db %s," " ret=%d\n", state->db_name, ret); } tevent_req_error(req, ret); return; } ctdb_req_control_db_thaw(&request, state->db->db_id); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recover_db_thaw_done, req); } static void recover_db_thaw_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recover_db_state *state = tevent_req_data( req, struct recover_db_state); int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control DB_THAW failed for db %s on node %u," " ret=%d\n", state->db_name, pnn, ret2); } else { D_ERR("control DB_THAW failed for db %s, ret=%d\n", state->db_name, ret); } tevent_req_error(req, ret); return; } tevent_req_done(req); } static bool recover_db_recv(struct tevent_req *req) { return generic_recv(req, NULL); } /* * Start database recovery for each database * * Try to recover each database 5 times before failing recovery. */ struct db_recovery_state { struct tevent_context *ev; struct db_list *dblist; unsigned int num_replies; unsigned int num_failed; }; struct db_recovery_one_state { struct tevent_req *req; struct ctdb_client_context *client; struct db_list *dblist; struct ctdb_tunable_list *tun_list; struct node_list *nlist; uint32_t generation; struct db *db; int num_fails; }; static void db_recovery_one_done(struct tevent_req *subreq); static struct tevent_req *db_recovery_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct db_list *dblist, struct ctdb_tunable_list *tun_list, struct node_list *nlist, uint32_t generation) { struct tevent_req *req, *subreq; struct db_recovery_state *state; struct db *db; req = tevent_req_create(mem_ctx, &state, struct db_recovery_state); if (req == NULL) { return NULL; } state->ev = ev; state->dblist = dblist; state->num_replies = 0; state->num_failed = 0; if (dblist->num_dbs == 0) { tevent_req_done(req); return tevent_req_post(req, ev); } for (db = dblist->db; db != NULL; db = db->next) { struct db_recovery_one_state *substate; substate = talloc_zero(state, struct db_recovery_one_state); if (tevent_req_nomem(substate, req)) { return tevent_req_post(req, ev); } substate->req = req; substate->client = client; substate->dblist = dblist; substate->tun_list = tun_list; substate->nlist = nlist; substate->generation = 
generation; substate->db = db; subreq = recover_db_send(state, ev, client, tun_list, nlist, generation, substate->db); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, db_recovery_one_done, substate); D_NOTICE("recover database 0x%08x\n", substate->db->db_id); } return req; } static void db_recovery_one_done(struct tevent_req *subreq) { struct db_recovery_one_state *substate = tevent_req_callback_data( subreq, struct db_recovery_one_state); struct tevent_req *req = substate->req; struct db_recovery_state *state = tevent_req_data( req, struct db_recovery_state); bool status; status = recover_db_recv(subreq); TALLOC_FREE(subreq); if (status) { talloc_free(substate); goto done; } substate->num_fails += 1; if (substate->num_fails < NUM_RETRIES) { subreq = recover_db_send(state, state->ev, substate->client, substate->tun_list, substate->nlist, substate->generation, substate->db); if (tevent_req_nomem(subreq, req)) { goto failed; } tevent_req_set_callback(subreq, db_recovery_one_done, substate); D_NOTICE("recover database 0x%08x, attempt %d\n", substate->db->db_id, substate->num_fails+1); return; } failed: state->num_failed += 1; done: state->num_replies += 1; if (state->num_replies == state->dblist->num_dbs) { tevent_req_done(req); } } static bool db_recovery_recv(struct tevent_req *req, unsigned int *count) { struct db_recovery_state *state = tevent_req_data( req, struct db_recovery_state); int err; if (tevent_req_is_unix_error(req, &err)) { *count = 0; return false; } *count = state->num_replies - state->num_failed; if (state->num_failed > 0) { return false; } return true; } struct ban_node_state { struct tevent_context *ev; struct ctdb_client_context *client; struct ctdb_tunable_list *tun_list; struct node_list *nlist; uint32_t destnode; uint32_t max_pnn; }; static bool ban_node_check(struct tevent_req *req); static void ban_node_check_done(struct tevent_req *subreq); static void ban_node_done(struct tevent_req 
*subreq); static struct tevent_req *ban_node_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, struct ctdb_tunable_list *tun_list, struct node_list *nlist) { struct tevent_req *req; struct ban_node_state *state; bool ok; req = tevent_req_create(mem_ctx, &state, struct ban_node_state); if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->tun_list = tun_list; state->nlist = nlist; state->destnode = ctdb_client_pnn(client); /* Bans are not enabled */ if (state->tun_list->enable_bans == 0) { D_ERR("Bans are not enabled\n"); tevent_req_done(req); return tevent_req_post(req, ev); } ok = ban_node_check(req); if (!ok) { return tevent_req_post(req, ev); } return req; } static bool ban_node_check(struct tevent_req *req) { struct tevent_req *subreq; struct ban_node_state *state = tevent_req_data( req, struct ban_node_state); struct ctdb_req_control request; unsigned max_credits = 0, i; for (i=0; inlist->count; i++) { if (state->nlist->ban_credits[i] > max_credits) { state->max_pnn = state->nlist->pnn_list[i]; max_credits = state->nlist->ban_credits[i]; } } if (max_credits < NUM_RETRIES) { tevent_req_done(req); return false; } ctdb_req_control_get_nodemap(&request); subreq = ctdb_client_control_send(state, state->ev, state->client, state->max_pnn, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return false; } tevent_req_set_callback(subreq, ban_node_check_done, req); return true; } static void ban_node_check_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct ban_node_state *state = tevent_req_data( req, struct ban_node_state); struct ctdb_reply_control *reply; struct ctdb_node_map *nodemap; struct ctdb_req_control request; struct ctdb_ban_state ban; unsigned int i; int ret; bool ok; ok = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (!ok) { D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n", 
state->max_pnn, ret); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap); if (ret != 0) { D_ERR("control GET_NODEMAP failed, ret=%d\n", ret); tevent_req_error(req, ret); return; } for (i=0; inum; i++) { if (nodemap->node[i].pnn != state->max_pnn) { continue; } /* If the node became inactive, reset ban_credits */ if (nodemap->node[i].flags & NODE_FLAGS_INACTIVE) { unsigned int j; for (j=0; jnlist->count; j++) { if (state->nlist->pnn_list[j] == state->max_pnn) { state->nlist->ban_credits[j] = 0; break; } } state->max_pnn = CTDB_UNKNOWN_PNN; } } talloc_free(nodemap); talloc_free(reply); /* If node becomes inactive during recovery, pick next */ if (state->max_pnn == CTDB_UNKNOWN_PNN) { (void) ban_node_check(req); return; } ban = (struct ctdb_ban_state) { .pnn = state->max_pnn, .time = state->tun_list->recovery_ban_period, }; D_ERR("Banning node %u for %u seconds\n", ban.pnn, ban.time); ctdb_req_control_set_ban_state(&request, &ban); subreq = ctdb_client_control_send(state, state->ev, state->client, ban.pnn, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, ban_node_done, req); } static void ban_node_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct node_ban_state *state = tevent_req_data( req, struct node_ban_state); struct ctdb_reply_control *reply; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! 
status) { tevent_req_error(req, ret); return; } ret = ctdb_reply_control_set_ban_state(reply); if (ret != 0) { D_ERR("control SET_BAN_STATE failed, ret=%d\n", ret); tevent_req_error(req, ret); return; } talloc_free(reply); tevent_req_done(req); } static bool ban_node_recv(struct tevent_req *req, int *perr) { if (tevent_req_is_unix_error(req, perr)) { return false; } return true; } /* * Run the parallel database recovery * * - Get tunables * - Get nodemap from all nodes * - Get capabilities from all nodes * - Get dbmap * - Set RECOVERY_ACTIVE * - Send START_RECOVERY * - Update vnnmap on all nodes * - Run database recovery * - Set RECOVERY_NORMAL * - Send END_RECOVERY */ struct recovery_state { struct tevent_context *ev; struct ctdb_client_context *client; uint32_t generation; uint32_t destnode; struct node_list *nlist; struct ctdb_tunable_list *tun_list; struct ctdb_vnn_map *vnnmap; struct db_list *dblist; }; static void recovery_tunables_done(struct tevent_req *subreq); static void recovery_nodemap_done(struct tevent_req *subreq); static void recovery_nodemap_verify(struct tevent_req *subreq); static void recovery_capabilities_done(struct tevent_req *subreq); static void recovery_dbmap_done(struct tevent_req *subreq); static void recovery_active_done(struct tevent_req *subreq); static void recovery_start_recovery_done(struct tevent_req *subreq); static void recovery_vnnmap_update_done(struct tevent_req *subreq); static void recovery_db_recovery_done(struct tevent_req *subreq); static void recovery_failed_done(struct tevent_req *subreq); static void recovery_normal_done(struct tevent_req *subreq); static void recovery_end_recovery_done(struct tevent_req *subreq); static struct tevent_req *recovery_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct ctdb_client_context *client, uint32_t generation) { struct tevent_req *req, *subreq; struct recovery_state *state; struct ctdb_req_control request; req = tevent_req_create(mem_ctx, &state, struct recovery_state); 
if (req == NULL) { return NULL; } state->ev = ev; state->client = client; state->generation = generation; state->destnode = ctdb_client_pnn(client); ctdb_req_control_get_all_tunables(&request); subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return tevent_req_post(req, ev); } tevent_req_set_callback(subreq, recovery_tunables_done, req); return req; } static void recovery_tunables_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_reply_control *reply; struct ctdb_req_control request; int ret; bool status; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! status) { D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_get_all_tunables(reply, state, &state->tun_list); if (ret != 0) { D_ERR("control GET_ALL_TUNABLES failed, ret=%d\n", ret); tevent_req_error(req, EPROTO); return; } talloc_free(reply); recover_timeout = state->tun_list->recover_timeout; ctdb_req_control_get_nodemap(&request); subreq = ctdb_client_control_send(state, state->ev, state->client, state->destnode, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_nodemap_done, req); } static void recovery_nodemap_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_reply_control *reply; struct ctdb_req_control request; struct ctdb_node_map *nodemap; unsigned int i; bool status; int ret; status = ctdb_client_control_recv(subreq, &ret, state, &reply); TALLOC_FREE(subreq); if (! 
status) { D_ERR("control GET_NODEMAP failed to node %u, ret=%d\n", state->destnode, ret); tevent_req_error(req, ret); return; } ret = ctdb_reply_control_get_nodemap(reply, state, &nodemap); if (ret != 0) { D_ERR("control GET_NODEMAP failed, ret=%d\n", ret); tevent_req_error(req, ret); return; } state->nlist = node_list_init(state, nodemap->num); if (tevent_req_nomem(state->nlist, req)) { return; } for (i=0; inum; i++) { bool ok; if (nodemap->node[i].flags & NODE_FLAGS_DISCONNECTED) { continue; } ok = node_list_add(state->nlist, nodemap->node[i].pnn); if (!ok) { tevent_req_error(req, EINVAL); return; } } talloc_free(nodemap); talloc_free(reply); /* Verify flags by getting local node information from each node */ ctdb_req_control_get_nodemap(&request); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_nodemap_verify, req); } static void recovery_nodemap_verify(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_req_control request; struct ctdb_reply_control **reply; struct node_list *nlist; unsigned int i; int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, &reply); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control GET_NODEMAP failed on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("control GET_NODEMAP failed, ret=%d\n", ret); } tevent_req_error(req, ret); return; } nlist = node_list_init(state, state->nlist->size); if (tevent_req_nomem(nlist, req)) { return; } for (i=0; inlist->count; i++) { struct ctdb_node_map *nodemap = NULL; uint32_t pnn, flags; unsigned int j; bool ok; pnn = state->nlist->pnn_list[i]; ret = ctdb_reply_control_get_nodemap(reply[i], state, &nodemap); if (ret != 0) { D_ERR("control GET_NODEMAP failed on node %u\n", pnn); tevent_req_error(req, EPROTO); return; } flags = NODE_FLAGS_DISCONNECTED; for (j=0; jnum; j++) { if (nodemap->node[j].pnn == pnn) { flags = nodemap->node[j].flags; break; } } TALLOC_FREE(nodemap); if (flags & NODE_FLAGS_INACTIVE) { continue; } ok = node_list_add(nlist, pnn); if (!ok) { tevent_req_error(req, EINVAL); return; } } talloc_free(reply); talloc_free(state->nlist); state->nlist = nlist; ctdb_req_control_get_capabilities(&request); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_capabilities_done, req); } static void recovery_capabilities_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_reply_control **reply; struct ctdb_req_control request; int *err_list; unsigned int i; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, &reply); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control GET_CAPABILITIES failed on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("control GET_CAPABILITIES failed, ret=%d\n", ret); } tevent_req_error(req, ret); return; } for (i=0; inlist->count; i++) { uint32_t caps; ret = ctdb_reply_control_get_capabilities(reply[i], &caps); if (ret != 0) { D_ERR("control GET_CAPABILITIES failed on node %u\n", state->nlist->pnn_list[i]); tevent_req_error(req, EPROTO); return; } state->nlist->caps[i] = caps; } talloc_free(reply); ctdb_req_control_get_dbmap(&request); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_dbmap_done, req); } static void recovery_dbmap_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_reply_control **reply; struct ctdb_req_control request; int *err_list; unsigned int i, j; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, &reply); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("control GET_DBMAP failed on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("control GET_DBMAP failed, ret=%d\n", ret); } tevent_req_error(req, ret); return; } state->dblist = db_list_init(state, state->nlist->count); if (tevent_req_nomem(state->dblist, req)) { D_ERR("memory allocation error\n"); return; } for (i = 0; i < state->nlist->count; i++) { struct ctdb_dbid_map *dbmap = NULL; uint32_t pnn; pnn = state->nlist->pnn_list[i]; ret = ctdb_reply_control_get_dbmap(reply[i], state, &dbmap); if (ret != 0) { D_ERR("control GET_DBMAP failed on node %u\n", pnn); tevent_req_error(req, EPROTO); return; } for (j = 0; j < dbmap->num; j++) { ret = db_list_check_and_add(state->dblist, dbmap->dbs[j].db_id, dbmap->dbs[j].flags, pnn); if (ret != 0) { D_ERR("failed to add database list entry, " "ret=%d\n", ret); tevent_req_error(req, ret); return; } } TALLOC_FREE(dbmap); } ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_ACTIVE); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_active_done, req); } static void recovery_active_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_req_control request; struct ctdb_vnn_map *vnnmap; int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("failed to set recovery mode ACTIVE on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("failed to set recovery mode ACTIVE, ret=%d\n", ret); } tevent_req_error(req, ret); return; } D_ERR("Set recovery mode to ACTIVE\n"); /* Calculate new VNNMAP */ vnnmap = talloc_zero(state, struct ctdb_vnn_map); if (tevent_req_nomem(vnnmap, req)) { return; } vnnmap->map = node_list_lmaster(state->nlist, vnnmap, &vnnmap->size); if (tevent_req_nomem(vnnmap->map, req)) { return; } if (vnnmap->size == 0) { D_WARNING("No active lmasters found. Adding recmaster anyway\n"); vnnmap->map[0] = state->destnode; vnnmap->size = 1; } vnnmap->generation = state->generation; state->vnnmap = vnnmap; ctdb_req_control_start_recovery(&request); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_start_recovery_done, req); } static void recovery_start_recovery_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_req_control request; int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("failed to run start_recovery event on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("failed to run start_recovery event, ret=%d\n", ret); } tevent_req_error(req, ret); return; } D_ERR("start_recovery event finished\n"); ctdb_req_control_setvnnmap(&request, state->vnnmap); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_vnnmap_update_done, req); } static void recovery_vnnmap_update_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, NULL, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("failed to update VNNMAP on node %u, ret=%d\n", pnn, ret2); } else { D_ERR("failed to update VNNMAP, ret=%d\n", ret); } tevent_req_error(req, ret); return; } D_NOTICE("updated VNNMAP\n"); subreq = db_recovery_send(state, state->ev, state->client, state->dblist, state->tun_list, state->nlist, state->vnnmap->generation); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_db_recovery_done, req); } static void recovery_db_recovery_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_req_control request; bool status; unsigned int count; status = db_recovery_recv(subreq, &count); TALLOC_FREE(subreq); D_ERR("%d of %d databases recovered\n", count, state->dblist->num_dbs); if (! status) { subreq = ban_node_send(state, state->ev, state->client, state->tun_list, state->nlist); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_failed_done, req); return; } ctdb_req_control_set_recmode(&request, CTDB_RECOVERY_NORMAL); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_normal_done, req); } static void recovery_failed_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); int ret; bool status; status = ban_node_recv(subreq, &ret); TALLOC_FREE(subreq); if (! 
status) { D_ERR("failed to ban node, ret=%d\n", ret); } tevent_req_error(req, EIO); } static void recovery_normal_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); struct ctdb_req_control request; int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, NULL); TALLOC_FREE(subreq); if (! status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("failed to set recovery mode NORMAL on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("failed to set recovery mode NORMAL, ret=%d\n", ret); } tevent_req_error(req, ret); return; } D_ERR("Set recovery mode to NORMAL\n"); ctdb_req_control_end_recovery(&request); subreq = ctdb_client_control_multi_send(state, state->ev, state->client, state->nlist->pnn_list, state->nlist->count, TIMEOUT(), &request); if (tevent_req_nomem(subreq, req)) { return; } tevent_req_set_callback(subreq, recovery_end_recovery_done, req); } static void recovery_end_recovery_done(struct tevent_req *subreq) { struct tevent_req *req = tevent_req_callback_data( subreq, struct tevent_req); struct recovery_state *state = tevent_req_data( req, struct recovery_state); int *err_list; int ret; bool status; status = ctdb_client_control_multi_recv(subreq, &ret, state, &err_list, NULL); TALLOC_FREE(subreq); if (! 
status) { int ret2; uint32_t pnn; ret2 = ctdb_client_control_multi_error(state->nlist->pnn_list, state->nlist->count, err_list, &pnn); if (ret2 != 0) { D_ERR("failed to run recovered event on node %u," " ret=%d\n", pnn, ret2); } else { D_ERR("failed to run recovered event, ret=%d\n", ret); } tevent_req_error(req, ret); return; } D_ERR("recovered event finished\n"); tevent_req_done(req); } static void recovery_recv(struct tevent_req *req, int *perr) { generic_recv(req, perr); } static void usage(const char *progname) { fprintf(stderr, "\nUsage: %s \n", progname); } /* * Arguments - log fd, write fd, socket path, generation */ int main(int argc, char *argv[]) { int write_fd; const char *sockpath; TALLOC_CTX *mem_ctx = NULL; struct tevent_context *ev; struct ctdb_client_context *client; bool status; int ret = 0; struct tevent_req *req; uint32_t generation; if (argc != 4) { usage(argv[0]); exit(1); } write_fd = atoi(argv[1]); sockpath = argv[2]; generation = (uint32_t)smb_strtoul(argv[3], NULL, 0, &ret, SMB_STR_STANDARD); if (ret != 0) { fprintf(stderr, "recovery: unable to initialize generation\n"); goto failed; } mem_ctx = talloc_new(NULL); if (mem_ctx == NULL) { fprintf(stderr, "recovery: talloc_new() failed\n"); goto failed; } ret = logging_init(mem_ctx, NULL, NULL, "ctdb-recovery"); if (ret != 0) { fprintf(stderr, "recovery: Unable to initialize logging\n"); goto failed; } ev = tevent_context_init(mem_ctx); if (ev == NULL) { D_ERR("tevent_context_init() failed\n"); goto failed; } status = logging_setup_sighup_handler(ev, mem_ctx, NULL, NULL); if (!status) { D_ERR("logging_setup_sighup_handler() failed\n"); goto failed; } ret = ctdb_client_init(mem_ctx, ev, sockpath, &client); if (ret != 0) { D_ERR("ctdb_client_init() failed, ret=%d\n", ret); goto failed; } req = recovery_send(mem_ctx, ev, client, generation); if (req == NULL) { D_ERR("database_recover_send() failed\n"); goto failed; } if (! 
tevent_req_poll(req, ev)) { D_ERR("tevent_req_poll() failed\n"); goto failed; } recovery_recv(req, &ret); TALLOC_FREE(req); if (ret != 0) { D_ERR("database recovery failed, ret=%d\n", ret); goto failed; } sys_write(write_fd, &ret, sizeof(ret)); return 0; failed: TALLOC_FREE(mem_ctx); return 1; }