1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00
samba-mirror/ctdb/server/ctdb_recoverd.c
Martin Schwenke 52e5e92693 ctdb-conf: Move all conf files to new conf/ subdirectory
Leave common/conf.[ch] where they are to make this commit
comprehensible.

Signed-off-by: Martin Schwenke <martin@meltin.net>
Reviewed-by: Guenther Deschner <gd@samba.org>
Reviewed-by: Anoop C S <anoopcs@samba.org>
2024-06-28 18:43:52 +05:30

3297 lines
80 KiB
C

/*
ctdb recovery daemon
Copyright (C) Ronnie Sahlberg 2007
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
#include "replace.h"
#include "system/filesys.h"
#include "system/time.h"
#include "system/network.h"
#include "system/wait.h"
#include <popt.h>
#include <talloc.h>
#include <tevent.h>
#include <tdb.h>
#include "lib/tdb_wrap/tdb_wrap.h"
#include "lib/util/dlinklist.h"
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"
#include "lib/util/sys_rw.h"
#include "lib/util/util_process.h"
#include "ctdb_private.h"
#include "ctdb_client.h"
#include "protocol/protocol_basic.h"
#include "common/system_socket.h"
#include "common/common.h"
#include "common/logging.h"
#include "conf/ctdb_config.h"
#include "ctdb_cluster_mutex.h"
/* List of SRVID requests that need to be processed */
struct srvid_list {
struct srvid_list *next, *prev;
struct ctdb_srvid_message *request;
};
struct srvid_requests {
struct srvid_list *requests;
};
static void srvid_request_reply(struct ctdb_context *ctdb,
struct ctdb_srvid_message *request,
TDB_DATA result)
{
/* Someone that sent srvid==0 does not want a reply */
if (request->srvid == 0) {
talloc_free(request);
return;
}
if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
result) == 0) {
DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
(unsigned)request->pnn,
(unsigned long long)request->srvid));
} else {
DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
(unsigned)request->pnn,
(unsigned long long)request->srvid));
}
talloc_free(request);
}
static void srvid_requests_reply(struct ctdb_context *ctdb,
struct srvid_requests **requests,
TDB_DATA result)
{
struct srvid_list *r;
if (*requests == NULL) {
return;
}
for (r = (*requests)->requests; r != NULL; r = r->next) {
srvid_request_reply(ctdb, r->request, result);
}
/* Free the list structure... */
TALLOC_FREE(*requests);
}
static void srvid_request_add(struct ctdb_context *ctdb,
struct srvid_requests **requests,
struct ctdb_srvid_message *request)
{
struct srvid_list *t;
int32_t ret;
TDB_DATA result;
if (*requests == NULL) {
*requests = talloc_zero(ctdb, struct srvid_requests);
if (*requests == NULL) {
goto nomem;
}
}
t = talloc_zero(*requests, struct srvid_list);
if (t == NULL) {
/* If *requests was just allocated above then free it */
if ((*requests)->requests == NULL) {
TALLOC_FREE(*requests);
}
goto nomem;
}
t->request = (struct ctdb_srvid_message *)talloc_steal(t, request);
DLIST_ADD((*requests)->requests, t);
return;
nomem:
/* Failed to add the request to the list. Send a fail. */
DEBUG(DEBUG_ERR, (__location__
" Out of memory, failed to queue SRVID request\n"));
ret = -ENOMEM;
result.dsize = sizeof(ret);
result.dptr = (uint8_t *)&ret;
srvid_request_reply(ctdb, request, result);
}
/* An abstraction to allow an operation (takeover runs, recoveries,
* ...) to be disabled for a given timeout */
struct ctdb_op_state {
struct tevent_timer *timer;
bool in_progress;
const char *name;
};
static struct ctdb_op_state *ctdb_op_init(TALLOC_CTX *mem_ctx, const char *name)
{
struct ctdb_op_state *state = talloc_zero(mem_ctx, struct ctdb_op_state);
if (state != NULL) {
state->in_progress = false;
state->name = name;
}
return state;
}
static bool ctdb_op_is_disabled(struct ctdb_op_state *state)
{
return state->timer != NULL;
}
static bool ctdb_op_begin(struct ctdb_op_state *state)
{
if (ctdb_op_is_disabled(state)) {
DEBUG(DEBUG_NOTICE,
("Unable to begin - %s are disabled\n", state->name));
return false;
}
state->in_progress = true;
return true;
}
static bool ctdb_op_end(struct ctdb_op_state *state)
{
return state->in_progress = false;
}
static bool ctdb_op_is_in_progress(struct ctdb_op_state *state)
{
return state->in_progress;
}
static void ctdb_op_enable(struct ctdb_op_state *state)
{
TALLOC_FREE(state->timer);
}
static void ctdb_op_timeout_handler(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval yt, void *p)
{
struct ctdb_op_state *state =
talloc_get_type(p, struct ctdb_op_state);
DEBUG(DEBUG_NOTICE,("Reenabling %s after timeout\n", state->name));
ctdb_op_enable(state);
}
static int ctdb_op_disable(struct ctdb_op_state *state,
struct tevent_context *ev,
uint32_t timeout)
{
if (timeout == 0) {
DEBUG(DEBUG_NOTICE,("Reenabling %s\n", state->name));
ctdb_op_enable(state);
return 0;
}
if (state->in_progress) {
DEBUG(DEBUG_ERR,
("Unable to disable %s - in progress\n", state->name));
return -EAGAIN;
}
DEBUG(DEBUG_NOTICE,("Disabling %s for %u seconds\n",
state->name, timeout));
/* Clear any old timers */
talloc_free(state->timer);
/* Arrange for the timeout to occur */
state->timer = tevent_add_timer(ev, state,
timeval_current_ofs(timeout, 0),
ctdb_op_timeout_handler, state);
if (state->timer == NULL) {
DEBUG(DEBUG_ERR,(__location__ " Unable to setup timer\n"));
return -ENOMEM;
}
return 0;
}
struct ctdb_banning_state {
uint32_t pnn;
uint32_t count;
struct timeval last_reported_time;
};
struct ctdb_cluster_lock_handle;
/*
private state of recovery daemon
*/
struct ctdb_recoverd {
struct ctdb_context *ctdb;
uint32_t leader;
struct tevent_timer *leader_broadcast_te;
struct tevent_timer *leader_broadcast_timeout_te;
uint32_t pnn;
uint32_t last_culprit_node;
struct ctdb_banning_state *banning_state;
struct ctdb_node_map_old *nodemap;
struct timeval priority_time;
bool need_takeover_run;
bool need_recovery;
uint32_t node_flags;
struct tevent_timer *send_election_te;
bool election_in_progress;
struct tevent_timer *election_timeout;
struct srvid_requests *reallocate_requests;
struct ctdb_op_state *takeover_run;
struct ctdb_op_state *recovery;
struct ctdb_iface_list_old *ifaces;
uint32_t *force_rebalance_nodes;
struct ctdb_node_capabilities *caps;
bool frozen_on_inactive;
struct ctdb_cluster_lock_handle *cluster_lock_handle;
pid_t helper_pid;
};
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
static void ctdb_restart_recd(struct tevent_context *ev,
struct tevent_timer *te, struct timeval t,
void *private_data);
static bool this_node_is_leader(struct ctdb_recoverd *rec)
{
return rec->leader == rec->pnn;
}
static bool this_node_can_be_leader(struct ctdb_recoverd *rec)
{
return (rec->node_flags & NODE_FLAGS_INACTIVE) == 0 &&
(rec->ctdb->capabilities & CTDB_CAP_RECMASTER) != 0;
}
static bool node_flags(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t *flags)
{
size_t i;
for (i = 0; i < rec->nodemap->num; i++) {
struct ctdb_node_and_flags *node = &rec->nodemap->nodes[i];
if (node->pnn == pnn) {
if (flags != NULL) {
*flags = node->flags;
}
return true;
}
}
return false;
}
/*
ban a node for a period of time
*/
static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn)
{
int ret;
struct ctdb_context *ctdb = rec->ctdb;
uint32_t ban_time = ctdb->tunable.recovery_ban_period;
struct ctdb_ban_state bantime;
if (!ctdb_validate_pnn(ctdb, pnn)) {
DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
return;
}
DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
bantime.pnn = pnn;
bantime.time = ban_time;
ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
return;
}
}
enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
/*
remember the trouble maker
*/
static void ctdb_set_culprit_count(struct ctdb_recoverd *rec,
uint32_t culprit,
uint32_t count)
{
struct ctdb_context *ctdb = talloc_get_type_abort(
rec->ctdb, struct ctdb_context);
struct ctdb_banning_state *ban_state = NULL;
size_t len;
bool ok;
ok = node_flags(rec, culprit, NULL);
if (!ok) {
DBG_WARNING("Unknown culprit node %"PRIu32"\n", culprit);
return;
}
/* If we are banned or stopped, do not set other nodes as culprits */
if (rec->node_flags & NODE_FLAGS_INACTIVE) {
D_WARNING("This node is INACTIVE, cannot set culprit node %d\n",
culprit);
return;
}
if (rec->banning_state == NULL) {
len = 0;
} else {
size_t i;
len = talloc_array_length(rec->banning_state);
for (i = 0 ; i < len; i++) {
if (rec->banning_state[i].pnn == culprit) {
ban_state= &rec->banning_state[i];
break;
}
}
}
/* Not found, so extend (or allocate new) array */
if (ban_state == NULL) {
struct ctdb_banning_state *t;
len += 1;
/*
* talloc_realloc() handles the corner case where
* rec->banning_state is NULL
*/
t = talloc_realloc(rec,
rec->banning_state,
struct ctdb_banning_state,
len);
if (t == NULL) {
DBG_WARNING("Memory allocation error\n");
return;
}
rec->banning_state = t;
/* New element is always at the end - initialise it... */
ban_state = &rec->banning_state[len - 1];
*ban_state = (struct ctdb_banning_state) {
.pnn = culprit,
.count = 0,
};
} else if (ban_state->count > 0 &&
timeval_elapsed(&ban_state->last_reported_time) >
ctdb->tunable.recovery_grace_period) {
/*
* Forgive old transgressions beyond the tunable time-limit
*/
ban_state->count = 0;
}
ban_state->count += count;
ban_state->last_reported_time = timeval_current();
rec->last_culprit_node = culprit;
}
static void ban_counts_reset(struct ctdb_recoverd *rec)
{
D_NOTICE("Resetting ban count to 0 for all nodes\n");
TALLOC_FREE(rec->banning_state);
}
/*
remember the trouble maker
*/
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
ctdb_set_culprit_count(rec, culprit, 1);
}
/*
Retrieve capabilities from all connected nodes
*/
static int update_capabilities(struct ctdb_recoverd *rec,
struct ctdb_node_map_old *nodemap)
{
uint32_t *capp;
TALLOC_CTX *tmp_ctx;
struct ctdb_node_capabilities *caps;
struct ctdb_context *ctdb = rec->ctdb;
tmp_ctx = talloc_new(rec);
CTDB_NO_MEMORY(ctdb, tmp_ctx);
caps = ctdb_get_capabilities(ctdb, tmp_ctx,
CONTROL_TIMEOUT(), nodemap);
if (caps == NULL) {
DEBUG(DEBUG_ERR,
(__location__ " Failed to get node capabilities\n"));
talloc_free(tmp_ctx);
return -1;
}
capp = ctdb_get_node_capabilities(caps, rec->pnn);
if (capp == NULL) {
DEBUG(DEBUG_ERR,
(__location__
" Capabilities don't include current node.\n"));
talloc_free(tmp_ctx);
return -1;
}
ctdb->capabilities = *capp;
TALLOC_FREE(rec->caps);
rec->caps = talloc_steal(rec, caps);
talloc_free(tmp_ctx);
return 0;
}
/*
change recovery mode on all nodes
*/
static int set_recovery_mode(struct ctdb_context *ctdb,
struct ctdb_recoverd *rec,
struct ctdb_node_map_old *nodemap,
uint32_t rec_mode)
{
TDB_DATA data;
uint32_t *nodes;
TALLOC_CTX *tmp_ctx;
tmp_ctx = talloc_new(ctdb);
CTDB_NO_MEMORY(ctdb, tmp_ctx);
nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
data.dsize = sizeof(uint32_t);
data.dptr = (unsigned char *)&rec_mode;
if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
nodes, 0,
CONTROL_TIMEOUT(),
false, data,
NULL, NULL,
NULL) != 0) {
DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
talloc_free(tmp_ctx);
return -1;
}
talloc_free(tmp_ctx);
return 0;
}
/*
* Update flags on all connected nodes
*/
static int update_flags_on_all_nodes(struct ctdb_recoverd *rec,
uint32_t pnn,
uint32_t flags)
{
struct ctdb_context *ctdb = rec->ctdb;
struct timeval timeout = CONTROL_TIMEOUT();
TDB_DATA data;
struct ctdb_node_map_old *nodemap=NULL;
struct ctdb_node_flag_change c;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
uint32_t *nodes;
uint32_t i;
int ret;
nodemap = rec->nodemap;
for (i = 0; i < nodemap->num; i++) {
if (pnn == nodemap->nodes[i].pnn) {
break;
}
}
if (i >= nodemap->num) {
DBG_ERR("Nodemap does not contain node %d\n", pnn);
talloc_free(tmp_ctx);
return -1;
}
c.pnn = pnn;
c.old_flags = nodemap->nodes[i].flags;
c.new_flags = flags;
data.dsize = sizeof(c);
data.dptr = (unsigned char *)&c;
/* send the flags update to all connected nodes */
nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
ret = ctdb_client_async_control(ctdb,
CTDB_CONTROL_MODIFY_FLAGS,
nodes,
0,
timeout,
false,
data,
NULL,
NULL,
NULL);
if (ret != 0) {
DBG_ERR("Unable to update flags on remote nodes\n");
talloc_free(tmp_ctx);
return -1;
}
talloc_free(tmp_ctx);
return 0;
}
static bool _cluster_lock_lock(struct ctdb_recoverd *rec);
static bool cluster_lock_held(struct ctdb_recoverd *rec);
static bool cluster_lock_enabled(struct ctdb_recoverd *rec)
{
return rec->ctdb->recovery_lock != NULL;
}
static bool cluster_lock_take(struct ctdb_recoverd *rec)
{
struct ctdb_context *ctdb = rec->ctdb;
bool have_lock;
if (!cluster_lock_enabled(rec)) {
return true;
}
if (cluster_lock_held(rec)) {
D_NOTICE("Already holding cluster lock\n");
return true;
}
D_NOTICE("Attempting to take cluster lock (%s)\n", ctdb->recovery_lock);
have_lock = _cluster_lock_lock(rec);
if (!have_lock) {
return false;
}
D_NOTICE("Cluster lock taken successfully\n");
return true;
}
/*
called when ctdb_wait_timeout should finish
*/
static void ctdb_wait_handler(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval yt, void *p)
{
uint32_t *timed_out = (uint32_t *)p;
(*timed_out) = 1;
}
/*
wait for a given number of seconds
*/
static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
{
uint32_t timed_out = 0;
uint32_t usecs = (secs - (uint32_t)secs) * 1000000;
tevent_add_timer(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs),
ctdb_wait_handler, &timed_out);
while (!timed_out) {
tevent_loop_once(ctdb->ev);
}
}
/*
* Broadcast cluster leader
*/
static int leader_broadcast_send(struct ctdb_recoverd *rec, uint32_t pnn)
{
struct ctdb_context *ctdb = rec->ctdb;
TDB_DATA data;
int ret;
data.dptr = (uint8_t *)&pnn;
data.dsize = sizeof(pnn);
ret = ctdb_client_send_message(ctdb,
CTDB_BROADCAST_CONNECTED,
CTDB_SRVID_LEADER,
data);
return ret;
}
static int leader_broadcast_loop(struct ctdb_recoverd *rec);
static void cluster_lock_release(struct ctdb_recoverd *rec);
/* This runs continuously but only sends the broadcast when leader */
static void leader_broadcast_loop_handler(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval current_time,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type_abort(
private_data, struct ctdb_recoverd);
int ret;
if (!this_node_can_be_leader(rec)) {
if (this_node_is_leader(rec)) {
rec->leader = CTDB_UNKNOWN_PNN;
}
if (cluster_lock_enabled(rec) && cluster_lock_held(rec)) {
cluster_lock_release(rec);
}
goto done;
}
if (!this_node_is_leader(rec)) {
goto done;
}
if (rec->election_in_progress) {
goto done;
}
ret = leader_broadcast_send(rec, rec->leader);
if (ret != 0) {
DBG_WARNING("Failed to send leader broadcast\n");
}
done:
ret = leader_broadcast_loop(rec);
if (ret != 0) {
D_WARNING("Failed to set up leader broadcast\n");
}
}
static int leader_broadcast_loop(struct ctdb_recoverd *rec)
{
struct ctdb_context *ctdb = rec->ctdb;
TALLOC_FREE(rec->leader_broadcast_te);
rec->leader_broadcast_te =
tevent_add_timer(ctdb->ev,
rec,
timeval_current_ofs(1, 0),
leader_broadcast_loop_handler,
rec);
if (rec->leader_broadcast_te == NULL) {
return ENOMEM;
}
return 0;
}
static bool leader_broadcast_loop_active(struct ctdb_recoverd *rec)
{
return rec->leader_broadcast_te != NULL;
}
/*
called when an election times out (ends)
*/
static void ctdb_election_timeout(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval t, void *p)
{
struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
bool ok;
rec->election_in_progress = false;
rec->election_timeout = NULL;
fast_start = false;
D_WARNING("Election period ended, leader=%u\n", rec->leader);
if (!this_node_is_leader(rec)) {
return;
}
ok = cluster_lock_take(rec);
if (!ok) {
D_ERR("Unable to get cluster lock, banning node\n");
ctdb_ban_node(rec, rec->pnn);
}
}
/*
wait for an election to finish. It finished election_timeout seconds after
the last election packet is received
*/
static void ctdb_wait_election(struct ctdb_recoverd *rec)
{
struct ctdb_context *ctdb = rec->ctdb;
while (rec->election_in_progress) {
tevent_loop_once(ctdb->ev);
}
}
/*
* Update local flags from all remote connected nodes and push out
* flags changes to all nodes. This is only run by the leader.
*/
static int update_flags(struct ctdb_recoverd *rec,
struct ctdb_node_map_old *nodemap,
struct ctdb_node_map_old **remote_nodemaps)
{
unsigned int j;
struct ctdb_context *ctdb = rec->ctdb;
TALLOC_CTX *mem_ctx = talloc_new(ctdb);
/* Check flags from remote nodes */
for (j=0; j<nodemap->num; j++) {
struct ctdb_node_map_old *remote_nodemap=NULL;
uint32_t local_flags = nodemap->nodes[j].flags;
uint32_t remote_pnn = nodemap->nodes[j].pnn;
uint32_t remote_flags;
unsigned int i;
int ret;
if (local_flags & NODE_FLAGS_DISCONNECTED) {
continue;
}
if (remote_pnn == rec->pnn) {
/*
* No remote nodemap for this node since this
* is the local nodemap. However, still need
* to check this against the remote nodes and
* push it if they are out-of-date.
*/
goto compare_remotes;
}
remote_nodemap = remote_nodemaps[j];
remote_flags = remote_nodemap->nodes[j].flags;
if (local_flags != remote_flags) {
/*
* Update the local copy of the flags in the
* recovery daemon.
*/
D_NOTICE("Remote node %u had flags 0x%x, "
"local had 0x%x - updating local\n",
remote_pnn,
remote_flags,
local_flags);
nodemap->nodes[j].flags = remote_flags;
local_flags = remote_flags;
goto push;
}
compare_remotes:
for (i = 0; i < nodemap->num; i++) {
if (i == j) {
continue;
}
if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
continue;
}
if (nodemap->nodes[i].pnn == rec->pnn) {
continue;
}
remote_nodemap = remote_nodemaps[i];
remote_flags = remote_nodemap->nodes[j].flags;
if (local_flags != remote_flags) {
goto push;
}
}
continue;
push:
D_NOTICE("Pushing updated flags for node %u (0x%x)\n",
remote_pnn,
local_flags);
ret = update_flags_on_all_nodes(rec, remote_pnn, local_flags);
if (ret != 0) {
DBG_ERR("Unable to update flags on remote nodes\n");
talloc_free(mem_ctx);
return -1;
}
}
talloc_free(mem_ctx);
return 0;
}
/* Create a new random generation id.
The generation id can not be the INVALID_GENERATION id
*/
static uint32_t new_generation(void)
{
uint32_t generation;
while (1) {
generation = random();
if (generation != INVALID_GENERATION) {
break;
}
}
return generation;
}
static bool cluster_lock_held(struct ctdb_recoverd *rec)
{
return (rec->cluster_lock_handle != NULL);
}
struct ctdb_cluster_lock_handle {
bool done;
bool locked;
double latency;
struct ctdb_cluster_mutex_handle *h;
struct ctdb_recoverd *rec;
};
static void take_cluster_lock_handler(char status,
double latency,
void *private_data)
{
struct ctdb_cluster_lock_handle *s =
(struct ctdb_cluster_lock_handle *) private_data;
s->locked = (status == '0') ;
/*
* If unsuccessful then ensure the process has exited and that
* the file descriptor event handler has been cancelled
*/
if (! s->locked) {
TALLOC_FREE(s->h);
}
switch (status) {
case '0':
s->latency = latency;
break;
case '1':
D_ERR("Unable to take cluster lock - contention\n");
break;
case '2':
D_ERR("Unable to take cluster lock - timeout\n");
break;
default:
D_ERR("Unable to take cluster lock - unknown error\n");
}
s->done = true;
}
static void force_election(struct ctdb_recoverd *rec);
static void lost_cluster_lock_handler(void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type_abort(
private_data, struct ctdb_recoverd);
D_ERR("Cluster lock helper terminated\n");
TALLOC_FREE(rec->cluster_lock_handle);
if (this_node_can_be_leader(rec)) {
force_election(rec);
}
}
static bool _cluster_lock_lock(struct ctdb_recoverd *rec)
{
struct ctdb_context *ctdb = rec->ctdb;
struct ctdb_cluster_mutex_handle *h;
struct ctdb_cluster_lock_handle *s;
s = talloc_zero(rec, struct ctdb_cluster_lock_handle);
if (s == NULL) {
DBG_ERR("Memory allocation error\n");
return false;
};
s->rec = rec;
h = ctdb_cluster_mutex(s,
ctdb,
ctdb->recovery_lock,
120,
take_cluster_lock_handler,
s,
lost_cluster_lock_handler,
rec);
if (h == NULL) {
talloc_free(s);
return false;
}
rec->cluster_lock_handle = s;
s->h = h;
while (! s->done) {
tevent_loop_once(ctdb->ev);
}
if (! s->locked) {
TALLOC_FREE(rec->cluster_lock_handle);
return false;
}
ctdb_ctrl_report_recd_lock_latency(ctdb,
CONTROL_TIMEOUT(),
s->latency);
return true;
}
static void cluster_lock_release(struct ctdb_recoverd *rec)
{
if (rec->cluster_lock_handle == NULL) {
return;
}
if (! rec->cluster_lock_handle->done) {
/*
* Taking of cluster lock still in progress. Free
* the cluster mutex handle to release it but leave
* the cluster lock handle in place to allow taking
* of the lock to fail.
*/
D_NOTICE("Cancelling cluster lock\n");
TALLOC_FREE(rec->cluster_lock_handle->h);
rec->cluster_lock_handle->done = true;
rec->cluster_lock_handle->locked = false;
return;
}
D_NOTICE("Releasing cluster lock\n");
TALLOC_FREE(rec->cluster_lock_handle);
}
static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
{
size_t len = talloc_array_length(rec->banning_state);
size_t i;
*self_ban = false;
for (i = 0; i < len; i++) {
struct ctdb_banning_state *ban_state = &rec->banning_state[i];
if (ban_state->count < 2 * rec->nodemap->num) {
continue;
}
D_NOTICE("Node %u reached %u banning credits\n",
ban_state->pnn,
ban_state->count);
ctdb_ban_node(rec, ban_state->pnn);
ban_state->count = 0;
/* Banning ourself? */
if (ban_state->pnn == rec->pnn) {
*self_ban = true;
}
}
}
struct helper_state {
int fd[2];
pid_t pid;
int result;
bool done;
};
static void helper_handler(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data)
{
struct helper_state *state = talloc_get_type_abort(
private_data, struct helper_state);
int ret;
ret = sys_read(state->fd[0], &state->result, sizeof(state->result));
if (ret != sizeof(state->result)) {
state->result = EPIPE;
}
state->done = true;
}
static int helper_run(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx,
const char *prog, const char *arg, const char *type)
{
struct helper_state *state;
struct tevent_fd *fde;
const char **args;
int nargs, ret;
state = talloc_zero(mem_ctx, struct helper_state);
if (state == NULL) {
DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
return -1;
}
state->pid = -1;
ret = pipe(state->fd);
if (ret != 0) {
DEBUG(DEBUG_ERR,
("Failed to create pipe for %s helper\n", type));
goto fail;
}
set_close_on_exec(state->fd[0]);
nargs = 4;
args = talloc_array(state, const char *, nargs);
if (args == NULL) {
DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
goto fail;
}
args[0] = talloc_asprintf(args, "%d", state->fd[1]);
if (args[0] == NULL) {
DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
goto fail;
}
args[1] = rec->ctdb->daemon.name;
args[2] = arg;
args[3] = NULL;
if (args[2] == NULL) {
nargs = 3;
}
state->pid = ctdb_vfork_exec(state, rec->ctdb, prog, nargs, args);
if (state->pid == -1) {
DEBUG(DEBUG_ERR,
("Failed to create child for %s helper\n", type));
goto fail;
}
close(state->fd[1]);
state->fd[1] = -1;
rec->helper_pid = state->pid;
state->done = false;
fde = tevent_add_fd(rec->ctdb->ev, state, state->fd[0],
TEVENT_FD_READ, helper_handler, state);
if (fde == NULL) {
goto fail;
}
tevent_fd_set_auto_close(fde);
while (!state->done) {
tevent_loop_once(rec->ctdb->ev);
if (!this_node_is_leader(rec)) {
D_ERR("Leader changed to %u, aborting %s\n",
rec->leader,
type);
state->result = 1;
break;
}
}
close(state->fd[0]);
state->fd[0] = -1;
if (state->result != 0) {
goto fail;
}
rec->helper_pid = -1;
ctdb_kill(rec->ctdb, state->pid, SIGKILL);
talloc_free(state);
return 0;
fail:
if (state->fd[0] != -1) {
close(state->fd[0]);
}
if (state->fd[1] != -1) {
close(state->fd[1]);
}
rec->helper_pid = -1;
if (state->pid != -1) {
ctdb_kill(rec->ctdb, state->pid, SIGKILL);
}
talloc_free(state);
return -1;
}
static int ctdb_takeover(struct ctdb_recoverd *rec,
uint32_t *force_rebalance_nodes)
{
static char prog[PATH_MAX+1] = "";
char *arg;
unsigned int i;
int ret;
if (!ctdb_set_helper("takeover_helper", prog, sizeof(prog),
"CTDB_TAKEOVER_HELPER", CTDB_HELPER_BINDIR,
"ctdb_takeover_helper")) {
ctdb_die(rec->ctdb, "Unable to set takeover helper\n");
}
arg = NULL;
for (i = 0; i < talloc_array_length(force_rebalance_nodes); i++) {
uint32_t pnn = force_rebalance_nodes[i];
if (arg == NULL) {
arg = talloc_asprintf(rec, "%u", pnn);
} else {
arg = talloc_asprintf_append(arg, ",%u", pnn);
}
if (arg == NULL) {
DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
return -1;
}
}
if (ctdb_config.failover_disabled) {
ret = setenv("CTDB_DISABLE_IP_FAILOVER", "1", 1);
if (ret != 0) {
D_ERR("Failed to set CTDB_DISABLE_IP_FAILOVER variable\n");
return -1;
}
}
return helper_run(rec, rec, prog, arg, "takeover");
}
static bool do_takeover_run(struct ctdb_recoverd *rec,
struct ctdb_node_map_old *nodemap)
{
uint32_t *nodes = NULL;
struct ctdb_disable_message dtr;
TDB_DATA data;
size_t i;
uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
int ret;
bool ok;
DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
if (ctdb_op_is_in_progress(rec->takeover_run)) {
DEBUG(DEBUG_ERR, (__location__
" takeover run already in progress \n"));
ok = false;
goto done;
}
if (!ctdb_op_begin(rec->takeover_run)) {
ok = false;
goto done;
}
/* Disable IP checks (takeover runs, really) on other nodes
* while doing this takeover run. This will stop those other
* nodes from triggering takeover runs when think they should
* be hosting an IP but it isn't yet on an interface. Don't
* wait for replies since a failure here might cause some
* noise in the logs but will not actually cause a problem.
*/
ZERO_STRUCT(dtr);
dtr.srvid = 0; /* No reply */
dtr.pnn = -1;
data.dptr = (uint8_t*)&dtr;
data.dsize = sizeof(dtr);
nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
/* Disable for 60 seconds. This can be a tunable later if
* necessary.
*/
dtr.timeout = 60;
for (i = 0; i < talloc_array_length(nodes); i++) {
if (ctdb_client_send_message(rec->ctdb, nodes[i],
CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
data) != 0) {
DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
}
}
ret = ctdb_takeover(rec, rec->force_rebalance_nodes);
/* Re-enable takeover runs and IP checks on other nodes */
dtr.timeout = 0;
for (i = 0; i < talloc_array_length(nodes); i++) {
if (ctdb_client_send_message(rec->ctdb, nodes[i],
CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
data) != 0) {
DEBUG(DEBUG_INFO,("Failed to re-enable takeover runs\n"));
}
}
if (ret != 0) {
DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
ok = false;
goto done;
}
ok = true;
/* Takeover run was successful so clear force rebalance targets */
if (rebalance_nodes == rec->force_rebalance_nodes) {
TALLOC_FREE(rec->force_rebalance_nodes);
} else {
DEBUG(DEBUG_WARNING,
("Rebalance target nodes changed during takeover run - not clearing\n"));
}
done:
rec->need_takeover_run = !ok;
talloc_free(nodes);
ctdb_op_end(rec->takeover_run);
DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
return ok;
}
static int db_recovery_parallel(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
{
static char prog[PATH_MAX+1] = "";
const char *arg;
if (!ctdb_set_helper("recovery_helper", prog, sizeof(prog),
"CTDB_RECOVERY_HELPER", CTDB_HELPER_BINDIR,
"ctdb_recovery_helper")) {
ctdb_die(rec->ctdb, "Unable to set recovery helper\n");
}
arg = talloc_asprintf(mem_ctx, "%u", new_generation());
if (arg == NULL) {
DEBUG(DEBUG_ERR, (__location__ " memory error\n"));
return -1;
}
setenv("CTDB_DBDIR_STATE", rec->ctdb->db_directory_state, 1);
return helper_run(rec, mem_ctx, prog, arg, "recovery");
}
/*
* Main recovery function, only run by leader
*/
static int do_recovery(struct ctdb_recoverd *rec, TALLOC_CTX *mem_ctx)
{
struct ctdb_context *ctdb = rec->ctdb;
struct ctdb_node_map_old *nodemap = rec->nodemap;
unsigned int i;
int ret;
bool self_ban;
DBG_NOTICE("Starting do_recovery\n");
/* Check if the current node is still the leader. It's possible that
* re-election has changed the leader.
*/
if (!this_node_is_leader(rec)) {
D_NOTICE("Leader changed to %" PRIu32 ", aborting recovery\n",
rec->leader);
return -1;
}
/* if recovery fails, force it again */
rec->need_recovery = true;
if (!ctdb_op_begin(rec->recovery)) {
return -1;
}
if (rec->election_in_progress) {
/* an election is in progress */
DBG_ERR("do_recovery called while election in progress - try "
"again later\n");
goto fail;
}
ban_misbehaving_nodes(rec, &self_ban);
if (self_ban) {
DBG_NOTICE("This node was banned, aborting recovery\n");
goto fail;
}
if (cluster_lock_enabled(rec) && !cluster_lock_held(rec)) {
/* Leader can change in ban_misbehaving_nodes() */
if (!this_node_is_leader(rec)) {
D_NOTICE("Leader changed to %" PRIu32
", aborting recovery\n",
rec->leader);
rec->need_recovery = false;
goto fail;
}
D_ERR("Cluster lock not held - abort recovery, ban node\n");
ctdb_ban_node(rec, rec->pnn);
goto fail;
}
DBG_NOTICE("Recovery initiated due to problem with node %" PRIu32 "\n",
rec->last_culprit_node);
/* Retrieve capabilities from all connected nodes */
ret = update_capabilities(rec, nodemap);
if (ret!=0) {
DBG_ERR("Unable to update node capabilities.\n");
return -1;
}
/*
update all nodes to have the same flags that we have
*/
for (i=0;i<nodemap->num;i++) {
if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
continue;
}
ret = update_flags_on_all_nodes(rec,
nodemap->nodes[i].pnn,
nodemap->nodes[i].flags);
if (ret != 0) {
if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
DBG_WARNING("Unable to update flags on "
"inactive node %d\n",
i);
} else {
DBG_ERR("Unable to update flags on all nodes "
"for node %d\n",
i);
return -1;
}
}
}
DBG_NOTICE("Recovery - updated flags\n");
ret = db_recovery_parallel(rec, mem_ctx);
if (ret != 0) {
goto fail;
}
do_takeover_run(rec, nodemap);
/* send a message to all clients telling them that the cluster
has been reconfigured */
ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
CTDB_SRVID_RECONFIGURE, tdb_null);
if (ret != 0) {
DBG_ERR("Failed to send reconfigure message\n");
goto fail;
}
DBG_NOTICE("Recovery complete\n");
rec->need_recovery = false;
ctdb_op_end(rec->recovery);
/*
* Completed a full recovery so forgive any past transgressions
*/
ban_counts_reset(rec);
/* We just finished a recovery successfully.
We now wait for rerecovery_timeout before we allow
another recovery to take place.
*/
D_NOTICE("Just finished a recovery. New recoveries will now be "
"suppressed for the rerecovery timeout (%" PRIu32
" seconds)\n",
ctdb->tunable.rerecovery_timeout);
ctdb_op_disable(rec->recovery, ctdb->ev,
ctdb->tunable.rerecovery_timeout);
return 0;
fail:
ctdb_op_end(rec->recovery);
return -1;
}
/*
elections are won by first checking the number of connected nodes, then
the priority time, then the pnn
*/
struct election_message {
uint32_t num_connected;
struct timeval priority_time;
uint32_t pnn;
uint32_t node_flags;
};
/*
form this nodes election data
*/
static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
{
unsigned int i;
int ret;
struct ctdb_node_map_old *nodemap;
struct ctdb_context *ctdb = rec->ctdb;
bool ok;
ZERO_STRUCTP(em);
em->pnn = rec->pnn;
em->priority_time = rec->priority_time;
ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
return;
}
ok = node_flags(rec, rec->pnn, &rec->node_flags);
if (!ok) {
DBG_ERR("Unable to get node flags for this node\n");
return;
}
em->node_flags = rec->node_flags;
for (i=0;i<nodemap->num;i++) {
if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
em->num_connected++;
}
}
if (!this_node_can_be_leader(rec)) {
/* Try to lose... */
em->num_connected = 0;
em->priority_time = timeval_current();
}
talloc_free(nodemap);
}
/*
see if the given election data wins
*/
static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
{
struct election_message myem;
int cmp = 0;
ctdb_election_data(rec, &myem);
if (!this_node_can_be_leader(rec)) {
return false;
}
/* Automatically win if other node is banned or stopped */
if (em->node_flags & NODE_FLAGS_INACTIVE) {
return true;
}
/* then the longest running node */
if (cmp == 0) {
cmp = timeval_compare(&em->priority_time, &myem.priority_time);
}
if (cmp == 0) {
cmp = (int)myem.pnn - (int)em->pnn;
}
return cmp > 0;
}
/*
send out an election request
*/
static int send_election_request(struct ctdb_recoverd *rec)
{
TDB_DATA election_data;
struct election_message emsg;
uint64_t srvid;
struct ctdb_context *ctdb = rec->ctdb;
srvid = CTDB_SRVID_ELECTION;
ctdb_election_data(rec, &emsg);
election_data.dsize = sizeof(struct election_message);
election_data.dptr = (unsigned char *)&emsg;
/* Assume this node will win the election, set leader accordingly */
rec->leader = rec->pnn;
/* send an election message to all active nodes */
DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
}
/*
we think we are winning the election - send a broadcast election request
*/
static void election_send_request(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval t, void *p)
{
struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
int ret;
ret = send_election_request(rec);
if (ret != 0) {
DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
}
TALLOC_FREE(rec->send_election_te);
}
/*
handler for memory dumps
*/
static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
TDB_DATA *dump;
int ret;
struct ctdb_srvid_message *rd;
if (data.dsize != sizeof(struct ctdb_srvid_message)) {
DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
talloc_free(tmp_ctx);
return;
}
rd = (struct ctdb_srvid_message *)data.dptr;
dump = talloc_zero(tmp_ctx, TDB_DATA);
if (dump == NULL) {
DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
talloc_free(tmp_ctx);
return;
}
ret = ctdb_dump_memory(ctdb, dump);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
talloc_free(tmp_ctx);
return;
}
DBG_ERR("recovery daemon memory dump\n");
ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
if (ret != 0) {
DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
talloc_free(tmp_ctx);
return;
}
talloc_free(tmp_ctx);
}
/*
handler for reload_nodes
*/
static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
ctdb_load_nodes_file(rec->ctdb);
}
static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
uint32_t pnn;
uint32_t *t;
int len;
if (!this_node_is_leader(rec)) {
return;
}
if (data.dsize != sizeof(uint32_t)) {
DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
return;
}
pnn = *(uint32_t *)&data.dptr[0];
DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
/* Copy any existing list of nodes. There's probably some
* sort of realloc variant that will do this but we need to
* make sure that freeing the old array also cancels the timer
* event for the timeout... not sure if realloc will do that.
*/
len = (rec->force_rebalance_nodes != NULL) ?
talloc_array_length(rec->force_rebalance_nodes) :
0;
/* This allows duplicates to be added but they don't cause
* harm. A call to add a duplicate PNN arguably means that
* the timeout should be reset, so this is the simplest
* solution.
*/
t = talloc_zero_array(rec, uint32_t, len+1);
CTDB_NO_MEMORY_VOID(ctdb, t);
if (len > 0) {
memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
}
t[len] = pnn;
talloc_free(rec->force_rebalance_nodes);
rec->force_rebalance_nodes = t;
}
static void srvid_disable_and_reply(struct ctdb_recoverd *rec,
TDB_DATA data,
struct ctdb_op_state *op_state)
{
struct ctdb_context *ctdb = rec->ctdb;
struct ctdb_disable_message *r;
uint32_t timeout;
TDB_DATA result;
int32_t ret = 0;
/* Validate input data */
if (data.dsize != sizeof(struct ctdb_disable_message)) {
DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
"expecting %lu\n", (long unsigned)data.dsize,
(long unsigned)sizeof(struct ctdb_srvid_message)));
return;
}
if (data.dptr == NULL) {
DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
return;
}
r = (struct ctdb_disable_message *)data.dptr;
timeout = r->timeout;
ret = ctdb_op_disable(op_state, ctdb->ev, timeout);
if (ret != 0) {
goto done;
}
/* Returning our PNN tells the caller that we succeeded */
ret = rec->pnn;
done:
result.dsize = sizeof(int32_t);
result.dptr = (uint8_t *)&ret;
srvid_request_reply(ctdb, (struct ctdb_srvid_message *)r, result);
}
static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
srvid_disable_and_reply(rec, data, rec->takeover_run);
}
/* Backward compatibility for this SRVID */
static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
uint32_t timeout;
if (data.dsize != sizeof(uint32_t)) {
DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
"expecting %lu\n", (long unsigned)data.dsize,
(long unsigned)sizeof(uint32_t)));
return;
}
if (data.dptr == NULL) {
DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
return;
}
timeout = *((uint32_t *)data.dptr);
ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
}
static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
srvid_disable_and_reply(rec, data, rec->recovery);
}
/*
handler for ip reallocate, just add it to the list of requests and
handle this later in the monitor_cluster loop so we do not recurse
with other requests to takeover_run()
*/
static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_srvid_message *request;
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
if (data.dsize != sizeof(struct ctdb_srvid_message)) {
DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
return;
}
request = (struct ctdb_srvid_message *)data.dptr;
srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
}
static void process_ipreallocate_requests(struct ctdb_context *ctdb,
struct ctdb_recoverd *rec)
{
TDB_DATA result;
int32_t ret;
struct srvid_requests *current;
/* Only process requests that are currently pending. More
* might come in while the takeover run is in progress and
* they will need to be processed later since they might
* be in response flag changes.
*/
current = rec->reallocate_requests;
rec->reallocate_requests = NULL;
if (do_takeover_run(rec, rec->nodemap)) {
ret = rec->pnn;
} else {
ret = -1;
}
result.dsize = sizeof(int32_t);
result.dptr = (uint8_t *)&ret;
srvid_requests_reply(ctdb, &current, result);
}
/*
* handler for assigning banning credits
*/
static void banning_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
uint32_t ban_pnn;
/* Ignore if we are not leader */
if (!this_node_is_leader(rec)) {
return;
}
if (data.dsize != sizeof(uint32_t)) {
DEBUG(DEBUG_ERR, (__location__ "invalid data size %zu\n",
data.dsize));
return;
}
ban_pnn = *(uint32_t *)data.dptr;
ctdb_set_culprit_count(rec, ban_pnn, rec->nodemap->num);
}
/*
* Handler for leader elections
*/
static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
struct election_message *em = (struct election_message *)data.dptr;
/* Ignore election packets from ourself */
if (rec->pnn == em->pnn) {
return;
}
/* we got an election packet - update the timeout for the election */
talloc_free(rec->election_timeout);
rec->election_in_progress = true;
rec->election_timeout = tevent_add_timer(
ctdb->ev, ctdb,
fast_start ?
timeval_current_ofs(0, 500000) :
timeval_current_ofs(ctdb->tunable.election_timeout, 0),
ctdb_election_timeout, rec);
/* someone called an election. check their election data
and if we disagree and we would rather be the elected node,
send a new election message to all other nodes
*/
if (ctdb_election_win(rec, em)) {
if (!rec->send_election_te) {
rec->send_election_te = tevent_add_timer(
ctdb->ev, rec,
timeval_current_ofs(0, 500000),
election_send_request, rec);
}
return;
}
/* we didn't win */
TALLOC_FREE(rec->send_election_te);
/* Release the cluster lock file */
if (cluster_lock_held(rec)) {
cluster_lock_release(rec);
}
/* Set leader to the winner of this round */
rec->leader = em->pnn;
return;
}
static void cluster_lock_election(struct ctdb_recoverd *rec)
{
bool ok;
if (!this_node_can_be_leader(rec)) {
if (cluster_lock_held(rec)) {
cluster_lock_release(rec);
}
goto done;
}
/*
* Don't need to unconditionally release the lock and then
* attempt to retake it. This provides stability.
*/
if (cluster_lock_held(rec)) {
goto done;
}
rec->leader = CTDB_UNKNOWN_PNN;
ok = cluster_lock_take(rec);
if (ok) {
rec->leader = rec->pnn;
D_WARNING("Took cluster lock, leader=%"PRIu32"\n", rec->leader);
}
done:
rec->election_in_progress = false;
}
/*
force the start of the election process
*/
static void force_election(struct ctdb_recoverd *rec)
{
int ret;
struct ctdb_context *ctdb = rec->ctdb;
D_ERR("Start election\n");
/* set all nodes to recovery mode to stop all internode traffic */
ret = set_recovery_mode(ctdb, rec, rec->nodemap, CTDB_RECOVERY_ACTIVE);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
return;
}
rec->election_in_progress = true;
/* Let other nodes know that an election is underway */
leader_broadcast_send(rec, CTDB_UNKNOWN_PNN);
if (cluster_lock_enabled(rec)) {
cluster_lock_election(rec);
return;
}
talloc_free(rec->election_timeout);
rec->election_timeout = tevent_add_timer(
ctdb->ev, ctdb,
fast_start ?
timeval_current_ofs(0, 500000) :
timeval_current_ofs(ctdb->tunable.election_timeout, 0),
ctdb_election_timeout, rec);
ret = send_election_request(rec);
if (ret!=0) {
DBG_ERR("Failed to initiate leader election\n");
return;
}
/* wait for a few seconds to collect all responses */
ctdb_wait_election(rec);
}
static void srvid_not_implemented(uint64_t srvid,
TDB_DATA data,
void *private_data)
{
const char *s;
switch (srvid) {
case CTDB_SRVID_SET_NODE_FLAGS:
s = "CTDB_SRVID_SET_NODE_FLAGS";
break;
default:
s = "UNKNOWN";
}
D_WARNING("SRVID %s (0x%" PRIx64 ") is obsolete\n", s, srvid);
}
/*
handler for when we need to push out flag changes to all other nodes
*/
static void push_flags_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
int ret;
struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
struct ctdb_node_map_old *nodemap=NULL;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
uint32_t *nodes;
/* read the node flags from the leader */
ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), rec->leader,
tmp_ctx, &nodemap);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
talloc_free(tmp_ctx);
return;
}
if (c->pnn >= nodemap->num) {
DBG_ERR("Nodemap from leader does not contain node %d\n",
c->pnn);
talloc_free(tmp_ctx);
return;
}
/* send the flags update to all connected nodes */
nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
nodes, 0, CONTROL_TIMEOUT(),
false, data,
NULL, NULL,
NULL) != 0) {
DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
talloc_free(tmp_ctx);
return;
}
talloc_free(tmp_ctx);
}
static void leader_broadcast_timeout_handler(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval current_time,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type_abort(
private_data, struct ctdb_recoverd);
rec->leader_broadcast_timeout_te = NULL;
D_NOTICE("Leader broadcast timeout\n");
force_election(rec);
}
static void leader_broadcast_timeout_cancel(struct ctdb_recoverd *rec)
{
TALLOC_FREE(rec->leader_broadcast_timeout_te);
}
static int leader_broadcast_timeout_start(struct ctdb_recoverd *rec)
{
struct ctdb_context *ctdb = rec->ctdb;
/*
* This should not be necessary. However, there will be
* interactions with election code here. It will want to
* cancel and restart the timer around potentially long
* elections.
*/
leader_broadcast_timeout_cancel(rec);
rec->leader_broadcast_timeout_te =
tevent_add_timer(
ctdb->ev,
rec,
timeval_current_ofs(ctdb_config.leader_timeout, 0),
leader_broadcast_timeout_handler,
rec);
if (rec->leader_broadcast_timeout_te == NULL) {
D_ERR("Unable to start leader broadcast timeout\n");
return ENOMEM;
}
return 0;
}
static bool leader_broadcast_timeout_active(struct ctdb_recoverd *rec)
{
return rec->leader_broadcast_timeout_te != NULL;
}
static void leader_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type_abort(
private_data, struct ctdb_recoverd);
uint32_t pnn;
size_t npull;
int ret;
ret = ctdb_uint32_pull(data.dptr, data.dsize, &pnn, &npull);
if (ret != 0) {
DBG_WARNING("Unable to parse leader broadcast, ret=%d\n", ret);
return;
}
leader_broadcast_timeout_cancel(rec);
if (pnn == rec->leader) {
goto done;
}
if (pnn == CTDB_UNKNOWN_PNN) {
bool was_election_in_progress = rec->election_in_progress;
/*
* Leader broadcast timeout was cancelled above - stop
* main loop from restarting it until election is
* complete
*/
rec->election_in_progress = true;
/*
* This is the only notification for a cluster lock
* election, so handle it here...
*/
if (cluster_lock_enabled(rec) && !was_election_in_progress) {
cluster_lock_election(rec);
}
return;
}
D_NOTICE("Received leader broadcast, leader=%"PRIu32"\n", pnn);
rec->leader = pnn;
done:
leader_broadcast_timeout_start(rec);
}
struct verify_recmode_normal_data {
uint32_t count;
enum monitor_result status;
};
static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
{
struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
/* one more node has responded with recmode data*/
rmdata->count--;
/* if we failed to get the recmode, then return an error and let
the main loop try again.
*/
if (state->state != CTDB_CONTROL_DONE) {
if (rmdata->status == MONITOR_OK) {
rmdata->status = MONITOR_FAILED;
}
return;
}
/* if we got a response, then the recmode will be stored in the
status field
*/
if (state->status != CTDB_RECOVERY_NORMAL) {
DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
rmdata->status = MONITOR_RECOVERY_NEEDED;
}
return;
}
/* verify that all nodes are in normal recovery mode */
static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map_old *nodemap)
{
struct verify_recmode_normal_data *rmdata;
TALLOC_CTX *mem_ctx = talloc_new(ctdb);
struct ctdb_client_control_state *state;
enum monitor_result status;
unsigned int j;
rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
rmdata->count = 0;
rmdata->status = MONITOR_OK;
/* loop over all active nodes and send an async getrecmode call to
them*/
for (j=0; j<nodemap->num; j++) {
if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
continue;
}
state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx,
CONTROL_TIMEOUT(),
nodemap->nodes[j].pnn);
if (state == NULL) {
/* we failed to send the control, treat this as
an error and try again next iteration
*/
DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
talloc_free(mem_ctx);
return MONITOR_FAILED;
}
/* set up the callback functions */
state->async.fn = verify_recmode_normal_callback;
state->async.private_data = rmdata;
/* one more control to wait for to complete */
rmdata->count++;
}
/* now wait for up to the maximum number of seconds allowed
or until all nodes we expect a response from has replied
*/
while (rmdata->count > 0) {
tevent_loop_once(ctdb->ev);
}
status = rmdata->status;
talloc_free(mem_ctx);
return status;
}
static bool interfaces_have_changed(struct ctdb_context *ctdb,
struct ctdb_recoverd *rec)
{
struct ctdb_iface_list_old *ifaces = NULL;
TALLOC_CTX *mem_ctx;
bool ret = false;
mem_ctx = talloc_new(NULL);
/* Read the interfaces from the local node */
if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
D_ERR("Unable to get interfaces from local node %u\n", rec->pnn);
/* We could return an error. However, this will be
* rare so we'll decide that the interfaces have
* actually changed, just in case.
*/
talloc_free(mem_ctx);
return true;
}
if (!rec->ifaces) {
/* We haven't been here before so things have changed */
DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
ret = true;
} else if (rec->ifaces->num != ifaces->num) {
/* Number of interfaces has changed */
DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
rec->ifaces->num, ifaces->num));
ret = true;
} else {
/* See if interface names or link states have changed */
unsigned int i;
for (i = 0; i < rec->ifaces->num; i++) {
struct ctdb_iface * iface = &rec->ifaces->ifaces[i];
if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
DEBUG(DEBUG_NOTICE,
("Interface in slot %d changed: %s => %s\n",
i, iface->name, ifaces->ifaces[i].name));
ret = true;
break;
}
if (iface->link_state != ifaces->ifaces[i].link_state) {
DEBUG(DEBUG_NOTICE,
("Interface %s changed state: %d => %d\n",
iface->name, iface->link_state,
ifaces->ifaces[i].link_state));
ret = true;
break;
}
}
}
talloc_free(rec->ifaces);
rec->ifaces = talloc_steal(rec, ifaces);
talloc_free(mem_ctx);
return ret;
}
/* Check that the local allocation of public IP addresses is correct
* and do some house-keeping */
static int verify_local_ip_allocation(struct ctdb_recoverd *rec)
{
TALLOC_CTX *mem_ctx = talloc_new(NULL);
struct ctdb_context *ctdb = rec->ctdb;
unsigned int j;
int ret;
bool need_takeover_run = false;
struct ctdb_public_ip_list_old *ips = NULL;
/* If we are not the leader then do some housekeeping */
if (!this_node_is_leader(rec)) {
/* Ignore any IP reallocate requests - only leader
* processes them
*/
TALLOC_FREE(rec->reallocate_requests);
/* Clear any nodes that should be force rebalanced in
* the next takeover run. If the leader has changed
* then we don't want to process these some time in
* the future.
*/
TALLOC_FREE(rec->force_rebalance_nodes);
}
/* Return early if disabled... */
if (ctdb_config.failover_disabled ||
ctdb_op_is_disabled(rec->takeover_run)) {
talloc_free(mem_ctx);
return 0;
}
if (interfaces_have_changed(ctdb, rec)) {
need_takeover_run = true;
}
/* If there are unhosted IPs but this node can host them then
* trigger an IP reallocation */
/* Read *available* IPs from local node */
ret = ctdb_ctrl_get_public_ips_flags(
ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx,
CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("Unable to retrieve available public IPs\n"));
talloc_free(mem_ctx);
return -1;
}
for (j=0; j<ips->num; j++) {
if (ips->ips[j].pnn == CTDB_UNKNOWN_PNN &&
rec->nodemap->nodes[rec->pnn].flags == 0) {
DEBUG(DEBUG_WARNING,
("Unassigned IP %s can be served by this node\n",
ctdb_addr_to_str(&ips->ips[j].addr)));
need_takeover_run = true;
}
}
talloc_free(ips);
if (!ctdb->do_checkpublicip) {
goto done;
}
/* Validate the IP addresses that this node has on network
* interfaces. If there is an inconsistency between reality
* and the state expected by CTDB then try to fix it by
* triggering an IP reallocation or releasing extraneous IP
* addresses. */
/* Read *known* IPs from local node */
ret = ctdb_ctrl_get_public_ips_flags(
ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("Unable to retrieve known public IPs\n"));
talloc_free(mem_ctx);
return -1;
}
for (j=0; j<ips->num; j++) {
if (ips->ips[j].pnn == rec->pnn) {
if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
DEBUG(DEBUG_ERR,
("Assigned IP %s not on an interface\n",
ctdb_addr_to_str(&ips->ips[j].addr)));
need_takeover_run = true;
}
} else {
if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
DEBUG(DEBUG_ERR,
("IP %s incorrectly on an interface\n",
ctdb_addr_to_str(&ips->ips[j].addr)));
need_takeover_run = true;
}
}
}
done:
if (need_takeover_run) {
struct ctdb_srvid_message rd;
TDB_DATA data;
DEBUG(DEBUG_NOTICE,("Trigger takeoverrun\n"));
ZERO_STRUCT(rd);
rd.pnn = rec->pnn;
rd.srvid = 0;
data.dptr = (uint8_t *)&rd;
data.dsize = sizeof(rd);
ret = ctdb_client_send_message(ctdb,
CTDB_BROADCAST_CONNECTED,
CTDB_SRVID_TAKEOVER_RUN,
data);
if (ret != 0) {
D_ERR("Failed to send takeover run request\n");
}
}
talloc_free(mem_ctx);
return 0;
}
struct remote_nodemaps_state {
struct ctdb_node_map_old **remote_nodemaps;
struct ctdb_recoverd *rec;
};
static void async_getnodemap_callback(struct ctdb_context *ctdb,
uint32_t node_pnn,
int32_t res,
TDB_DATA outdata,
void *callback_data)
{
struct remote_nodemaps_state *state =
(struct remote_nodemaps_state *)callback_data;
struct ctdb_node_map_old **remote_nodemaps = state->remote_nodemaps;
struct ctdb_node_map_old *nodemap = state->rec->nodemap;
size_t i;
for (i = 0; i < nodemap->num; i++) {
if (nodemap->nodes[i].pnn == node_pnn) {
break;
}
}
if (i >= nodemap->num) {
DBG_ERR("Invalid PNN %"PRIu32"\n", node_pnn);
return;
}
remote_nodemaps[i] = (struct ctdb_node_map_old *)talloc_steal(
remote_nodemaps, outdata.dptr);
}
static void async_getnodemap_error(struct ctdb_context *ctdb,
uint32_t node_pnn,
int32_t res,
TDB_DATA outdata,
void *callback_data)
{
struct remote_nodemaps_state *state =
(struct remote_nodemaps_state *)callback_data;
struct ctdb_recoverd *rec = state->rec;
DBG_ERR("Failed to retrieve nodemap from node %u\n", node_pnn);
ctdb_set_culprit(rec, node_pnn);
}
static int get_remote_nodemaps(struct ctdb_recoverd *rec,
TALLOC_CTX *mem_ctx,
struct ctdb_node_map_old ***remote_nodemaps)
{
struct ctdb_context *ctdb = rec->ctdb;
struct ctdb_node_map_old **t;
uint32_t *nodes;
struct remote_nodemaps_state state;
int ret;
t = talloc_zero_array(mem_ctx,
struct ctdb_node_map_old *,
rec->nodemap->num);
if (t == NULL) {
DBG_ERR("Memory allocation error\n");
return -1;
}
nodes = list_of_connected_nodes(ctdb, rec->nodemap, mem_ctx, false);
state.remote_nodemaps = t;
state.rec = rec;
ret = ctdb_client_async_control(ctdb,
CTDB_CONTROL_GET_NODEMAP,
nodes,
0,
CONTROL_TIMEOUT(),
false,
tdb_null,
async_getnodemap_callback,
async_getnodemap_error,
&state);
talloc_free(nodes);
if (ret != 0) {
talloc_free(t);
return ret;
}
*remote_nodemaps = t;
return 0;
}
static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
TALLOC_CTX *mem_ctx)
{
struct ctdb_node_map_old *nodemap=NULL;
struct ctdb_node_map_old **remote_nodemaps=NULL;
struct ctdb_vnn_map *vnnmap=NULL;
struct ctdb_vnn_map *remote_vnnmap=NULL;
uint32_t num_lmasters;
int32_t debug_level;
unsigned int i, j;
int ret;
bool self_ban;
/* verify that the main daemon is still running */
if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
exit(-1);
}
/* ping the local daemon to tell it we are alive */
ctdb_ctrl_recd_ping(ctdb);
if (rec->election_in_progress) {
/* an election is in progress */
return;
}
/*
* Start leader broadcasts if they are not active (1st time
* through main loop? Memory allocation error?)
*/
if (!leader_broadcast_loop_active(rec)) {
ret = leader_broadcast_loop(rec);
if (ret != 0) {
D_ERR("Failed to set up leader broadcast\n");
ctdb_set_culprit(rec, rec->pnn);
}
}
/*
* Similar for leader broadcast timeouts. These can also have
* been stopped by another node receiving a leader broadcast
* timeout and transmitting an "unknown leader broadcast".
* Note that this should never be done during an election - at
* the moment there is nothing between here and the above
* election-in-progress check that can process an election
* result (i.e. no event loop).
*/
if (!leader_broadcast_timeout_active(rec)) {
ret = leader_broadcast_timeout_start(rec);
if (ret != 0) {
ctdb_set_culprit(rec, rec->pnn);
}
}
/* read the debug level from the parent and update locally */
ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
if (ret !=0) {
DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
return;
}
debuglevel_set(debug_level);
/* get relevant tunables */
ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
if (ret != 0) {
DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
return;
}
/* get runstate */
ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
CTDB_CURRENT_NODE, &ctdb->runstate);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
return;
}
/* get nodemap */
ret = ctdb_ctrl_getnodemap(ctdb,
CONTROL_TIMEOUT(),
rec->pnn,
rec,
&nodemap);
if (ret != 0) {
DBG_ERR("Unable to get nodemap from node %"PRIu32"\n", rec->pnn);
return;
}
talloc_free(rec->nodemap);
rec->nodemap = nodemap;
/* remember our own node flags */
rec->node_flags = nodemap->nodes[rec->pnn].flags;
ban_misbehaving_nodes(rec, &self_ban);
if (self_ban) {
DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
return;
}
ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(),
CTDB_CURRENT_NODE, &ctdb->recovery_mode);
if (ret != 0) {
D_ERR("Failed to read recmode from local node\n");
return;
}
/* if the local daemon is STOPPED or BANNED, we verify that the databases are
also frozen and that the recmode is set to active.
*/
if (rec->node_flags & NODE_FLAGS_INACTIVE) {
/* If this node has become inactive then we want to
* reduce the chances of it taking over the leader
* role when it becomes active again. This
* helps to stabilise the leader role so that
* it stays on the most stable node.
*/
rec->priority_time = timeval_current();
if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
return;
}
}
if (! rec->frozen_on_inactive) {
ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(),
CTDB_CURRENT_NODE);
if (ret != 0) {
DEBUG(DEBUG_ERR,
(__location__ " Failed to freeze node "
"in STOPPED or BANNED state\n"));
return;
}
rec->frozen_on_inactive = true;
}
/* If this node is stopped or banned then it is not the recovery
* master, so don't do anything. This prevents stopped or banned
* node from starting election and sending unnecessary controls.
*/
return;
}
rec->frozen_on_inactive = false;
/* Retrieve capabilities from all connected nodes */
ret = update_capabilities(rec, nodemap);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
return;
}
if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
/* Check if an IP takeover run is needed and trigger one if
* necessary */
verify_local_ip_allocation(rec);
}
/* If this node is not the leader then skip recovery checks */
if (!this_node_is_leader(rec)) {
return;
}
/* Get the nodemaps for all connected remote nodes */
ret = get_remote_nodemaps(rec, mem_ctx, &remote_nodemaps);
if (ret != 0) {
DBG_ERR("Failed to read remote nodemaps\n");
return;
}
/* Ensure our local and remote flags are correct */
ret = update_flags(rec, nodemap, remote_nodemaps);
if (ret != 0) {
D_ERR("Unable to update flags\n");
return;
}
if (ctdb->num_nodes != nodemap->num) {
DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
ctdb_load_nodes_file(ctdb);
return;
}
/* get the vnnmap */
ret = ctdb_ctrl_getvnnmap(ctdb,
CONTROL_TIMEOUT(),
rec->pnn,
mem_ctx,
&vnnmap);
if (ret != 0) {
DBG_ERR("Unable to get vnnmap from node %u\n", rec->pnn);
return;
}
if (rec->need_recovery) {
/* a previous recovery didn't finish */
do_recovery(rec, mem_ctx);
return;
}
/* verify that all active nodes are in normal mode
and not in recovery mode
*/
switch (verify_recmode(ctdb, nodemap)) {
case MONITOR_RECOVERY_NEEDED:
do_recovery(rec, mem_ctx);
return;
case MONITOR_FAILED:
return;
case MONITOR_ELECTION_NEEDED:
/* can not happen */
case MONITOR_OK:
break;
}
if (cluster_lock_enabled(rec)) {
/* We must already hold the cluster lock */
if (!cluster_lock_held(rec)) {
D_ERR("Failed cluster lock sanity check\n");
ctdb_set_culprit(rec, rec->pnn);
do_recovery(rec, mem_ctx);
return;
}
}
/* If recoveries are disabled then there is no use doing any
* nodemap or flags checks. Recoveries might be disabled due
* to "reloadnodes", so doing these checks might cause an
* unnecessary recovery. */
if (ctdb_op_is_disabled(rec->recovery)) {
goto takeover_run_checks;
}
/* verify that all other nodes have the same nodemap as we have
*/
for (j=0; j<nodemap->num; j++) {
if (nodemap->nodes[j].pnn == rec->pnn) {
continue;
}
if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
continue;
}
/* if the nodes disagree on how many nodes there are
then this is a good reason to try recovery
*/
if (remote_nodemaps[j]->num != nodemap->num) {
DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
do_recovery(rec, mem_ctx);
return;
}
/* if the nodes disagree on which nodes exist and are
active, then that is also a good reason to do recovery
*/
for (i=0;i<nodemap->num;i++) {
if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n",
nodemap->nodes[j].pnn, i,
remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
do_recovery(rec, mem_ctx);
return;
}
}
}
/* count how many active nodes there are */
num_lmasters = 0;
for (i=0; i<nodemap->num; i++) {
if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
if (ctdb_node_has_capabilities(rec->caps,
ctdb->nodes[i]->pnn,
CTDB_CAP_LMASTER)) {
num_lmasters++;
}
}
}
/* There must be the same number of lmasters in the vnn map as
* there are active nodes with the lmaster capability... or
* do a recovery.
*/
if (vnnmap->size != num_lmasters) {
DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
vnnmap->size, num_lmasters));
ctdb_set_culprit(rec, rec->pnn);
do_recovery(rec, mem_ctx);
return;
}
/*
* Verify that all active lmaster nodes in the nodemap also
* exist in the vnnmap
*/
for (j=0; j<nodemap->num; j++) {
if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
continue;
}
if (! ctdb_node_has_capabilities(rec->caps,
nodemap->nodes[j].pnn,
CTDB_CAP_LMASTER)) {
continue;
}
if (nodemap->nodes[j].pnn == rec->pnn) {
continue;
}
for (i=0; i<vnnmap->size; i++) {
if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
break;
}
}
if (i == vnnmap->size) {
D_ERR("Active LMASTER node %u is not in the vnnmap\n",
nodemap->nodes[j].pnn);
ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
do_recovery(rec, mem_ctx);
return;
}
}
/* verify that all other nodes have the same vnnmap
and are from the same generation
*/
for (j=0; j<nodemap->num; j++) {
if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
continue;
}
if (nodemap->nodes[j].pnn == rec->pnn) {
continue;
}
ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
mem_ctx, &remote_vnnmap);
if (ret != 0) {
DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n",
nodemap->nodes[j].pnn));
return;
}
/* verify the vnnmap generation is the same */
if (vnnmap->generation != remote_vnnmap->generation) {
DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n",
nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
do_recovery(rec, mem_ctx);
return;
}
/* verify the vnnmap size is the same */
if (vnnmap->size != remote_vnnmap->size) {
DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n",
nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
do_recovery(rec, mem_ctx);
return;
}
/* verify the vnnmap is the same */
for (i=0;i<vnnmap->size;i++) {
if (remote_vnnmap->map[i] != vnnmap->map[i]) {
DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n",
nodemap->nodes[j].pnn));
ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
do_recovery(rec, mem_ctx);
return;
}
}
}
/* FIXME: Add remote public IP checking to ensure that nodes
* have the IP addresses that are allocated to them. */
takeover_run_checks:
/* If there are IP takeover runs requested or the previous one
* failed then perform one and notify the waiters */
if (!ctdb_op_is_disabled(rec->takeover_run) &&
(rec->reallocate_requests || rec->need_takeover_run)) {
process_ipreallocate_requests(ctdb, rec);
}
}
static void recd_sig_term_handler(struct tevent_context *ev,
struct tevent_signal *se, int signum,
int count, void *dont_care,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type_abort(
private_data, struct ctdb_recoverd);
DEBUG(DEBUG_ERR, ("Received SIGTERM, exiting\n"));
cluster_lock_release(rec);
exit(0);
}
/*
* Periodically log elements of the cluster state
*
* This can be used to confirm a split brain has occurred
*/
static void maybe_log_cluster_state(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval current_time,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type_abort(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
struct tevent_timer *tt;
static struct timeval start_incomplete = {
.tv_sec = 0,
};
bool is_complete;
bool was_complete;
unsigned int i;
double seconds;
unsigned int minutes;
unsigned int num_connected;
if (!this_node_is_leader(rec)) {
goto done;
}
if (rec->nodemap == NULL) {
goto done;
}
is_complete = true;
num_connected = 0;
for (i = 0; i < rec->nodemap->num; i++) {
struct ctdb_node_and_flags *n = &rec->nodemap->nodes[i];
if (n->pnn == rec->pnn) {
continue;
}
if ((n->flags & NODE_FLAGS_DELETED) != 0) {
continue;
}
if ((n->flags & NODE_FLAGS_DISCONNECTED) != 0) {
is_complete = false;
continue;
}
num_connected++;
}
was_complete = timeval_is_zero(&start_incomplete);
if (is_complete) {
if (! was_complete) {
D_WARNING("Cluster complete with leader=%u\n",
rec->leader);
start_incomplete = timeval_zero();
}
goto done;
}
/* Cluster is newly incomplete... */
if (was_complete) {
start_incomplete = current_time;
minutes = 0;
goto log;
}
/*
* Cluster has been incomplete since previous check, so figure
* out how long (in minutes) and decide whether to log anything
*/
seconds = timeval_elapsed2(&start_incomplete, &current_time);
minutes = (unsigned int)seconds / 60;
if (minutes >= 60) {
/* Over an hour, log every hour */
if (minutes % 60 != 0) {
goto done;
}
} else if (minutes >= 10) {
/* Over 10 minutes, log every 10 minutes */
if (minutes % 10 != 0) {
goto done;
}
}
log:
D_WARNING("Cluster incomplete with leader=%u, elapsed=%u minutes, "
"connected=%u\n",
rec->leader,
minutes,
num_connected);
done:
tt = tevent_add_timer(ctdb->ev,
rec,
timeval_current_ofs(60, 0),
maybe_log_cluster_state,
rec);
if (tt == NULL) {
DBG_WARNING("Failed to set up cluster state timer\n");
}
}
static void recd_sighup_hook(void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type_abort(
private_data, struct ctdb_recoverd);
if (rec->helper_pid > 0) {
kill(rec->helper_pid, SIGHUP);
}
}
/*
the main monitoring loop
*/
static void monitor_cluster(struct ctdb_context *ctdb)
{
struct tevent_signal *se;
struct ctdb_recoverd *rec;
bool status;
DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
rec = talloc_zero(ctdb, struct ctdb_recoverd);
CTDB_NO_MEMORY_FATAL(ctdb, rec);
rec->ctdb = ctdb;
rec->leader = CTDB_UNKNOWN_PNN;
rec->pnn = ctdb_get_pnn(ctdb);
rec->cluster_lock_handle = NULL;
rec->helper_pid = -1;
rec->takeover_run = ctdb_op_init(rec, "takeover runs");
CTDB_NO_MEMORY_FATAL(ctdb, rec->takeover_run);
rec->recovery = ctdb_op_init(rec, "recoveries");
CTDB_NO_MEMORY_FATAL(ctdb, rec->recovery);
rec->priority_time = timeval_current();
rec->frozen_on_inactive = false;
status = logging_setup_sighup_handler(rec->ctdb->ev,
rec,
recd_sighup_hook,
rec);
if (!status) {
D_ERR("Failed to install SIGHUP handler\n");
exit(1);
}
se = tevent_add_signal(ctdb->ev, ctdb, SIGTERM, 0,
recd_sig_term_handler, rec);
if (se == NULL) {
DEBUG(DEBUG_ERR, ("Failed to install SIGTERM handler\n"));
exit(1);
}
if (!cluster_lock_enabled(rec)) {
struct tevent_timer *tt;
tt = tevent_add_timer(ctdb->ev,
rec,
timeval_current_ofs(60, 0),
maybe_log_cluster_state,
rec);
if (tt == NULL) {
DBG_WARNING("Failed to set up cluster state timer\n");
}
}
/* register a message port for sending memory dumps */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
/* when a node is assigned banning credits */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_BANNING,
banning_handler, rec);
/* register a message port for recovery elections */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_ELECTION, election_handler, rec);
ctdb_client_set_message_handler(ctdb,
CTDB_SRVID_SET_NODE_FLAGS,
srvid_not_implemented,
rec);
/* when we are asked to puch out a flag change */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
/* register a message port for reloadnodes */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
/* register a message port for performing a takeover run */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
/* register a message port for disabling the ip check for a short while */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
/* register a message port for forcing a rebalance of a node next
reallocation */
ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
/* Register a message port for disabling takeover runs */
ctdb_client_set_message_handler(ctdb,
CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
disable_takeover_runs_handler, rec);
/* Register a message port for disabling recoveries */
ctdb_client_set_message_handler(ctdb,
CTDB_SRVID_DISABLE_RECOVERIES,
disable_recoveries_handler, rec);
ctdb_client_set_message_handler(ctdb,
CTDB_SRVID_LEADER,
leader_handler,
rec);
for (;;) {
TALLOC_CTX *mem_ctx = talloc_new(ctdb);
struct timeval start;
double elapsed;
if (!mem_ctx) {
DEBUG(DEBUG_CRIT,(__location__
" Failed to create temp context\n"));
exit(-1);
}
start = timeval_current();
main_loop(ctdb, rec, mem_ctx);
talloc_free(mem_ctx);
/* we only check for recovery once every second */
elapsed = timeval_elapsed(&start);
if (elapsed < ctdb->tunable.recover_interval) {
ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
- elapsed);
}
}
}
/*
event handler for when the main ctdbd dies
*/
static void ctdb_recoverd_parent(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *private_data)
{
DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
_exit(1);
}
/*
called regularly to verify that the recovery daemon is still running
*/
static void ctdb_check_recd(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval yt, void *p)
{
struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
tevent_add_timer(ctdb->ev, ctdb, timeval_zero(),
ctdb_restart_recd, ctdb);
return;
}
tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
timeval_current_ofs(30, 0),
ctdb_check_recd, ctdb);
}
static void recd_sig_child_handler(struct tevent_context *ev,
struct tevent_signal *se, int signum,
int count, void *dont_care,
void *private_data)
{
// struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
int status;
pid_t pid = -1;
while (pid != 0) {
pid = waitpid(-1, &status, WNOHANG);
if (pid == -1) {
if (errno != ECHILD) {
DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
}
return;
}
if (pid > 0) {
DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
}
}
}
/*
startup the recovery daemon as a child of the main ctdb daemon
*/
int ctdb_start_recoverd(struct ctdb_context *ctdb)
{
int fd[2];
struct tevent_signal *se;
struct tevent_fd *fde;
int ret;
if (pipe(fd) != 0) {
return -1;
}
ctdb->recoverd_pid = ctdb_fork(ctdb);
if (ctdb->recoverd_pid == -1) {
return -1;
}
if (ctdb->recoverd_pid != 0) {
talloc_free(ctdb->recd_ctx);
ctdb->recd_ctx = talloc_new(ctdb);
CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
close(fd[0]);
tevent_add_timer(ctdb->ev, ctdb->recd_ctx,
timeval_current_ofs(30, 0),
ctdb_check_recd, ctdb);
return 0;
}
close(fd[1]);
srandom(getpid() ^ time(NULL));
ret = logging_init(ctdb, NULL, NULL, "ctdb-recoverd");
if (ret != 0) {
return -1;
}
prctl_set_comment("ctdb_recoverd");
if (switch_from_server_to_client(ctdb) != 0) {
DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
exit(1);
}
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
fde = tevent_add_fd(ctdb->ev, ctdb, fd[0], TEVENT_FD_READ,
ctdb_recoverd_parent, &fd[0]);
tevent_fd_set_auto_close(fde);
/* set up a handler to pick up sigchld */
se = tevent_add_signal(ctdb->ev, ctdb, SIGCHLD, 0,
recd_sig_child_handler, ctdb);
if (se == NULL) {
DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
exit(1);
}
monitor_cluster(ctdb);
DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
return -1;
}
/*
shutdown the recovery daemon
*/
void ctdb_stop_recoverd(struct ctdb_context *ctdb)
{
if (ctdb->recoverd_pid == 0) {
return;
}
DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
TALLOC_FREE(ctdb->recd_ctx);
TALLOC_FREE(ctdb->recd_ping_count);
}
static void ctdb_restart_recd(struct tevent_context *ev,
struct tevent_timer *te,
struct timeval t, void *private_data)
{
struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
ctdb_stop_recoverd(ctdb);
ctdb_start_recoverd(ctdb);
}