1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-01-10 05:18:36 +03:00
lvm2/daemons/cmirrord/cluster.c
Jonathan Brassow bdd7baeab3 cmirrord: Clean-up stray warning message (attempt #2)
There are two types of CPG communications in a corosync cluster:
messages and state transitions.  Cmirrord processes the state
transitions first.

When a cluster mirror issues a POSTSUSPEND, it signals the end of
cluster communication with the rest of the nodes in the cluster.
The POSTSUSPEND marks the last communication of the 'message'
type that will go around the cluster.  The node then calls
cpg_leave which causes a final 'state transition' communication to
all of the nodes.  Once the out-going node receives its own state
transition notice from the cluster, it finalizes the leave.  At this
point, the state of the log is 'INVALID'; but it is possible that
there remains some cluster trafic that was queued up behind the
state transition that still wants to be processed.  It is harmless
to attempt to dispatch any remaining messages - they won't be
delivered because the node is no longer in the cluster.  However,
there was a warning message that was being printed in this case
that is now removed by this patch.  The failure of the dispatch
created a false positive condition that triggered the message.
2014-03-19 14:43:00 -05:00

1804 lines
50 KiB
C

/*
* Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License v.2.1.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "logging.h"
#include "cluster.h"
#include "common.h"
#include "compat.h"
#include "functions.h"
#include "link_mon.h"
#include "local.h"
#include "xlate.h"
#include <corosync/cpg.h>
#include <errno.h>
#include <signal.h>
#include <unistd.h>
#if CMIRROR_HAS_CHECKPOINT
#include <openais/saAis.h>
#include <openais/saCkpt.h>
/* Open AIS error codes */
#define str_ais_error(x) \
((x) == SA_AIS_OK) ? "SA_AIS_OK" : \
((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \
((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \
((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \
((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \
((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \
((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \
((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \
((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \
((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \
((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \
((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \
((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \
((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \
((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \
((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \
((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \
((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \
((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \
"ais_error_unknown"
#else
#define str_ais_error(x) \
((x) == CS_OK) ? "CS_OK" : \
((x) == CS_ERR_LIBRARY) ? "CS_ERR_LIBRARY" : \
((x) == CS_ERR_VERSION) ? "CS_ERR_VERSION" : \
((x) == CS_ERR_INIT) ? "CS_ERR_INIT" : \
((x) == CS_ERR_TIMEOUT) ? "CS_ERR_TIMEOUT" : \
((x) == CS_ERR_TRY_AGAIN) ? "CS_ERR_TRY_AGAIN" : \
((x) == CS_ERR_INVALID_PARAM) ? "CS_ERR_INVALID_PARAM" : \
((x) == CS_ERR_NO_MEMORY) ? "CS_ERR_NO_MEMORY" : \
((x) == CS_ERR_BAD_HANDLE) ? "CS_ERR_BAD_HANDLE" : \
((x) == CS_ERR_BUSY) ? "CS_ERR_BUSY" : \
((x) == CS_ERR_ACCESS) ? "CS_ERR_ACCESS" : \
((x) == CS_ERR_NOT_EXIST) ? "CS_ERR_NOT_EXIST" : \
((x) == CS_ERR_NAME_TOO_LONG) ? "CS_ERR_NAME_TOO_LONG" : \
((x) == CS_ERR_EXIST) ? "CS_ERR_EXIST" : \
((x) == CS_ERR_NO_SPACE) ? "CS_ERR_NO_SPACE" : \
((x) == CS_ERR_INTERRUPT) ? "CS_ERR_INTERRUPT" : \
((x) == CS_ERR_NAME_NOT_FOUND) ? "CS_ERR_NAME_NOT_FOUND" : \
((x) == CS_ERR_NO_RESOURCES) ? "CS_ERR_NO_RESOURCES" : \
((x) == CS_ERR_NOT_SUPPORTED) ? "CS_ERR_NOT_SUPPORTED" : \
((x) == CS_ERR_BAD_OPERATION) ? "CS_ERR_BAD_OPERATION" : \
((x) == CS_ERR_FAILED_OPERATION) ? "CS_ERR_FAILED_OPERATION" : \
((x) == CS_ERR_MESSAGE_ERROR) ? "CS_ERR_MESSAGE_ERROR" : \
((x) == CS_ERR_QUEUE_FULL) ? "CS_ERR_QUEUE_FULL" : \
((x) == CS_ERR_QUEUE_NOT_AVAILABLE) ? "CS_ERR_QUEUE_NOT_AVAILABLE" : \
((x) == CS_ERR_BAD_FLAGS) ? "CS_ERR_BAD_FLAGS" : \
((x) == CS_ERR_TOO_BIG) ? "CS_ERR_TOO_BIG" : \
((x) == CS_ERR_NO_SECTIONS) ? "CS_ERR_NO_SECTIONS" : \
((x) == CS_ERR_CONTEXT_NOT_FOUND) ? "CS_ERR_CONTEXT_NOT_FOUND" : \
((x) == CS_ERR_TOO_MANY_GROUPS) ? "CS_ERR_TOO_MANY_GROUPS" : \
((x) == CS_ERR_SECURITY) ? "CS_ERR_SECURITY" : \
"cs_error_unknown"
#endif
#define _RQ_TYPE(x) \
((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \
((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \
RQ_TYPE((x) & ~DM_ULOG_RESPONSE)
static uint32_t my_cluster_id = 0xDEAD;
#if CMIRROR_HAS_CHECKPOINT
static SaCkptHandleT ckpt_handle = 0;
static SaCkptCallbacksT callbacks = { 0, 0 };
static SaVersionT version = { 'B', 1, 1 };
#endif
#define DEBUGGING_HISTORY 100
#define LOG_SPRINT(cc, f, arg...) do { \
cc->idx++; \
cc->idx = cc->idx % DEBUGGING_HISTORY; \
sprintf(cc->debugging[cc->idx], f, ## arg); \
} while (0)
static int log_resp_rec = 0;
#define RECOVERING_REGION_SECTION_SIZE 64
struct checkpoint_data {
uint32_t requester;
char uuid[CPG_MAX_NAME_LENGTH];
int bitmap_size; /* in bytes */
char *sync_bits;
char *clean_bits;
char *recovering_region;
struct checkpoint_data *next;
};
#define INVALID 0
#define VALID 1
#define LEAVING 2
#define MAX_CHECKPOINT_REQUESTERS 10
struct clog_cpg {
struct dm_list list;
uint32_t lowest_id;
cpg_handle_t handle;
struct cpg_name name;
uint64_t luid;
/* Are we the first, or have we received checkpoint? */
int state;
int cpg_state; /* FIXME: debugging */
int free_me;
int delay;
int resend_requests;
struct dm_list startup_list;
struct dm_list working_list;
int checkpoints_needed;
uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS];
struct checkpoint_data *checkpoint_list;
int idx;
char debugging[DEBUGGING_HISTORY][128];
};
static struct dm_list clog_cpg_list;
/*
* cluster_send
* @rq
*
* Returns: 0 on success, -Exxx on error
*/
int cluster_send(struct clog_request *rq)
{
int r;
int found = 0;
struct iovec iov;
struct clog_cpg *entry;
dm_list_iterate_items(entry, &clog_cpg_list)
if (!strncmp(entry->name.value, rq->u_rq.uuid,
CPG_MAX_NAME_LENGTH)) {
found = 1;
break;
}
if (!found) {
rq->u_rq.error = -ENOENT;
return -ENOENT;
}
/*
* Once the request heads for the cluster, the luid looses
* all its meaning.
*/
rq->u_rq.luid = 0;
iov.iov_base = rq;
iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size;
rq->u.version[0] = xlate64(CLOG_TFR_VERSION);
rq->u.version[1] = CLOG_TFR_VERSION;
r = clog_request_to_network(rq);
if (r < 0)
/* FIXME: Better error code for byteswap failure? */
return -EINVAL;
if (entry->cpg_state != VALID)
return -EINVAL;
#if CMIRROR_HAS_CHECKPOINT
do {
int count = 0;
r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
if (r != SA_AIS_ERR_TRY_AGAIN)
break;
count++;
if (count < 10)
LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s",
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
else if ((count < 100) && !(count % 10))
LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
else if ((count < 1000) && !(count % 100))
LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
else if ((count < 10000) && !(count % 1000))
LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - "
"OpenAIS not handling the load?",
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
usleep(1000);
} while (1);
#else
r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1);
#endif
if (r == CS_OK)
return 0;
/* error codes found in openais/cpg.h */
LOG_ERROR("cpg_mcast_joined error: %d", r);
rq->u_rq.error = -EBADE;
return -EBADE;
}
static struct clog_request *get_matching_rq(struct clog_request *rq,
struct dm_list *l)
{
struct clog_request *match, *n;
dm_list_iterate_items_gen_safe(match, n, l, u.list)
if (match->u_rq.seq == rq->u_rq.seq) {
dm_list_del(&match->u.list);
return match;
}
return NULL;
}
static char rq_buffer[DM_ULOG_REQUEST_SIZE];
static int handle_cluster_request(struct clog_cpg *entry __attribute__((unused)),
struct clog_request *rq, int server)
{
int r = 0;
struct clog_request *tmp = (struct clog_request *)rq_buffer;
/*
* We need a separate dm_ulog_request struct, one that can carry
* a return payload. Otherwise, the memory address after
* rq will be altered - leading to problems
*/
memset(rq_buffer, 0, sizeof(rq_buffer));
memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size);
/*
* With resumes, we only handle our own.
* Resume is a special case that requires
* local action (to set up CPG), followed by
* a cluster action to co-ordinate reading
* the disk and checkpointing
*/
if (tmp->u_rq.request_type == DM_ULOG_RESUME) {
if (tmp->originator == my_cluster_id) {
r = do_request(tmp, server);
r = kernel_send(&tmp->u_rq);
if (r < 0)
LOG_ERROR("Failed to send resume response to kernel");
}
return r;
}
r = do_request(tmp, server);
if (server &&
(tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
(tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) {
tmp->u_rq.request_type |= DM_ULOG_RESPONSE;
/*
* Errors from previous functions are in the rq struct.
*/
r = cluster_send(tmp);
if (r < 0)
LOG_ERROR("cluster_send failed: %s", strerror(-r));
}
return r;
}
static int handle_cluster_response(struct clog_cpg *entry,
struct clog_request *rq)
{
int r = 0;
struct clog_request *orig_rq;
/*
* If I didn't send it, then I don't care about the response
*/
if (rq->originator != my_cluster_id)
return 0;
rq->u_rq.request_type &= ~DM_ULOG_RESPONSE;
orig_rq = get_matching_rq(rq, &entry->working_list);
if (!orig_rq) {
/* Unable to find match for response */
LOG_ERROR("[%s] No match for cluster response: %s:%u",
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
LOG_ERROR("Current local list:");
if (dm_list_empty(&entry->working_list))
LOG_ERROR(" [none]");
dm_list_iterate_items_gen(orig_rq, &entry->working_list, u.list)
LOG_ERROR(" [%s] %s:%u",
SHORT_UUID(orig_rq->u_rq.uuid),
_RQ_TYPE(orig_rq->u_rq.request_type),
orig_rq->u_rq.seq);
return -EINVAL;
}
if (log_resp_rec > 0) {
LOG_COND(log_resend_requests,
"[%s] Response received to %s/#%u",
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
log_resp_rec--;
}
/* FIXME: Ensure memcpy cannot explode */
memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
r = kernel_send(&orig_rq->u_rq);
if (r)
LOG_ERROR("Failed to send response to kernel");
free(orig_rq);
return r;
}
static struct clog_cpg *find_clog_cpg(cpg_handle_t handle)
{
struct clog_cpg *match;
dm_list_iterate_items(match, &clog_cpg_list)
if (match->handle == handle)
return match;
return NULL;
}
/*
* prepare_checkpoint
* @entry: clog_cpg describing the log
* @cp_requester: nodeid requesting the checkpoint
*
* Creates and fills in a new checkpoint_data struct.
*
* Returns: checkpoint_data on success, NULL on error
*/
static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry,
uint32_t cp_requester)
{
int r;
struct checkpoint_data *new;
if (entry->state != VALID) {
/*
* We can't store bitmaps yet, because the log is not
* valid yet.
*/
LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet",
cp_requester);
return NULL;
}
new = malloc(sizeof(*new));
if (!new) {
LOG_ERROR("Unable to create checkpoint data for %u",
cp_requester);
return NULL;
}
memset(new, 0, sizeof(*new));
new->requester = cp_requester;
strncpy(new->uuid, entry->name.value, entry->name.length);
new->bitmap_size = push_state(entry->name.value, entry->luid,
"clean_bits",
&new->clean_bits, cp_requester);
if (new->bitmap_size <= 0) {
LOG_ERROR("Failed to store clean_bits to checkpoint for node %u",
new->requester);
free(new);
return NULL;
}
new->bitmap_size = push_state(entry->name.value, entry->luid,
"sync_bits",
&new->sync_bits, cp_requester);
if (new->bitmap_size <= 0) {
LOG_ERROR("Failed to store sync_bits to checkpoint for node %u",
new->requester);
free(new->clean_bits);
free(new);
return NULL;
}
r = push_state(entry->name.value, entry->luid,
"recovering_region",
&new->recovering_region, cp_requester);
if (r <= 0) {
LOG_ERROR("Failed to store recovering_region to checkpoint for node %u",
new->requester);
free(new->sync_bits);
free(new->clean_bits);
free(new);
return NULL;
}
LOG_DBG("[%s] Checkpoint prepared for node %u:",
SHORT_UUID(new->uuid), new->requester);
LOG_DBG(" bitmap_size = %d", new->bitmap_size);
return new;
}
/*
* free_checkpoint
* @cp: the checkpoint_data struct to free
*
*/
static void free_checkpoint(struct checkpoint_data *cp)
{
free(cp->recovering_region);
free(cp->sync_bits);
free(cp->clean_bits);
free(cp);
}
#if CMIRROR_HAS_CHECKPOINT
static int export_checkpoint(struct checkpoint_data *cp)
{
SaCkptCheckpointCreationAttributesT attr;
SaCkptCheckpointHandleT h;
SaCkptSectionIdT section_id;
SaCkptSectionCreationAttributesT section_attr;
SaCkptCheckpointOpenFlagsT flags;
SaNameT name;
SaAisErrorT rv;
struct clog_request *rq;
int len, r = 0;
char buf[32];
LOG_DBG("Sending checkpointed data to %u", cp->requester);
len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH,
"bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester);
name.length = (SaUint16T)len;
len = (int)strlen(cp->recovering_region) + 1;
attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS;
attr.checkpointSize = cp->bitmap_size * 2 + len;
attr.retentionDuration = SA_TIME_MAX;
attr.maxSections = 4; /* don't know why we need +1 */
attr.maxSectionSize = (cp->bitmap_size > len) ? cp->bitmap_size : len;
attr.maxSectionIdSize = 22;
flags = SA_CKPT_CHECKPOINT_READ |
SA_CKPT_CHECKPOINT_WRITE |
SA_CKPT_CHECKPOINT_CREATE;
open_retry:
rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("export_checkpoint: ckpt open retry");
usleep(1000);
goto open_retry;
}
if (rv == SA_AIS_ERR_EXIST) {
LOG_DBG("export_checkpoint: checkpoint already exists");
return -EEXIST;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("[%s] Failed to open checkpoint for %u: %s",
SHORT_UUID(cp->uuid), cp->requester,
str_ais_error(rv));
return -EIO; /* FIXME: better error */
}
/*
* Add section for sync_bits
*/
section_id.idLen = (SaUint16T)snprintf(buf, 32, "sync_bits");
section_id.id = (unsigned char *)buf;
section_attr.sectionId = &section_id;
section_attr.expirationTime = SA_TIME_END;
sync_create_retry:
rv = saCkptSectionCreate(h, &section_attr,
cp->sync_bits, cp->bitmap_size);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("Sync checkpoint section create retry");
usleep(1000);
goto sync_create_retry;
}
if (rv == SA_AIS_ERR_EXIST) {
LOG_DBG("Sync checkpoint section already exists");
saCkptCheckpointClose(h);
return -EEXIST;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("Sync checkpoint section creation failed: %s",
str_ais_error(rv));
saCkptCheckpointClose(h);
return -EIO; /* FIXME: better error */
}
/*
* Add section for clean_bits
*/
section_id.idLen = snprintf(buf, 32, "clean_bits");
section_id.id = (unsigned char *)buf;
section_attr.sectionId = &section_id;
section_attr.expirationTime = SA_TIME_END;
clean_create_retry:
rv = saCkptSectionCreate(h, &section_attr, cp->clean_bits, cp->bitmap_size);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("Clean checkpoint section create retry");
usleep(1000);
goto clean_create_retry;
}
if (rv == SA_AIS_ERR_EXIST) {
LOG_DBG("Clean checkpoint section already exists");
saCkptCheckpointClose(h);
return -EEXIST;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("Clean checkpoint section creation failed: %s",
str_ais_error(rv));
saCkptCheckpointClose(h);
return -EIO; /* FIXME: better error */
}
/*
* Add section for recovering_region
*/
section_id.idLen = snprintf(buf, 32, "recovering_region");
section_id.id = (unsigned char *)buf;
section_attr.sectionId = &section_id;
section_attr.expirationTime = SA_TIME_END;
rr_create_retry:
rv = saCkptSectionCreate(h, &section_attr, cp->recovering_region,
strlen(cp->recovering_region) + 1);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("RR checkpoint section create retry");
usleep(1000);
goto rr_create_retry;
}
if (rv == SA_AIS_ERR_EXIST) {
LOG_DBG("RR checkpoint section already exists");
saCkptCheckpointClose(h);
return -EEXIST;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("RR checkpoint section creation failed: %s",
str_ais_error(rv));
saCkptCheckpointClose(h);
return -EIO; /* FIXME: better error */
}
LOG_DBG("export_checkpoint: closing checkpoint");
saCkptCheckpointClose(h);
rq = malloc(DM_ULOG_REQUEST_SIZE);
if (!rq) {
LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
return -ENOMEM;
}
memset(rq, 0, sizeof(*rq));
dm_list_init(&rq->u.list);
rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
rq->originator = cp->requester; /* FIXME: hack to overload meaning of originator */
strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
rq->u_rq.seq = my_cluster_id;
r = cluster_send(rq);
if (r)
LOG_ERROR("Failed to send checkpoint ready notice: %s",
strerror(-r));
free(rq);
return 0;
}
#else
static int export_checkpoint(struct checkpoint_data *cp)
{
int r, rq_size;
struct clog_request *rq;
rq_size = sizeof(*rq);
rq_size += RECOVERING_REGION_SECTION_SIZE;
rq_size += cp->bitmap_size * 2; /* clean|sync_bits */
rq = malloc(rq_size);
if (!rq) {
LOG_ERROR("export_checkpoint: "
"Unable to allocate transfer structs");
return -ENOMEM;
}
memset(rq, 0, rq_size);
dm_list_init(&rq->u.list);
rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
rq->originator = cp->requester;
strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
rq->u_rq.seq = my_cluster_id;
rq->u_rq.data_size = rq_size - sizeof(*rq);
/* Sync bits */
memcpy(rq->u_rq.data, cp->sync_bits, cp->bitmap_size);
/* Clean bits */
memcpy(rq->u_rq.data + cp->bitmap_size, cp->clean_bits, cp->bitmap_size);
/* Recovering region */
memcpy(rq->u_rq.data + (cp->bitmap_size * 2), cp->recovering_region,
strlen(cp->recovering_region));
r = cluster_send(rq);
if (r)
LOG_ERROR("Failed to send checkpoint ready notice: %s",
strerror(-r));
free(rq);
return 0;
}
#endif /* CMIRROR_HAS_CHECKPOINT */
#if CMIRROR_HAS_CHECKPOINT
static int import_checkpoint(struct clog_cpg *entry, int no_read,
struct clog_request *rq __attribute__((unused)))
{
int rtn = 0;
SaCkptCheckpointHandleT h;
SaCkptSectionIterationHandleT itr;
SaCkptSectionDescriptorT desc;
SaCkptIOVectorElementT iov;
SaNameT name;
SaAisErrorT rv;
char *bitmap = NULL;
int len;
bitmap = malloc(1024*1024);
if (!bitmap)
return -ENOMEM;
len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
SHORT_UUID(entry->name.value), my_cluster_id);
name.length = (SaUint16T)len;
open_retry:
rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
SA_CKPT_CHECKPOINT_READ, 0, &h);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("import_checkpoint: ckpt open retry");
usleep(1000);
goto open_retry;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("[%s] Failed to open checkpoint: %s",
SHORT_UUID(entry->name.value), str_ais_error(rv));
free(bitmap);
return -EIO; /* FIXME: better error */
}
unlink_retry:
rv = saCkptCheckpointUnlink(ckpt_handle, &name);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("import_checkpoint: ckpt unlink retry");
usleep(1000);
goto unlink_retry;
}
if (no_read) {
LOG_DBG("Checkpoint for this log already received");
goto no_read;
}
init_retry:
rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
SA_TIME_END, &itr);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("import_checkpoint: sync create retry");
usleep(1000);
goto init_retry;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("[%s] Sync checkpoint section creation failed: %s",
SHORT_UUID(entry->name.value), str_ais_error(rv));
free(bitmap);
return -EIO; /* FIXME: better error */
}
len = 0;
while (1) {
rv = saCkptSectionIterationNext(itr, &desc);
if (rv == SA_AIS_OK)
len++;
else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len)
break;
else if (rv != SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("saCkptSectionIterationNext failure: %d", rv);
break;
}
}
saCkptSectionIterationFinalize(itr);
if (len != 3) {
LOG_ERROR("import_checkpoint: %d checkpoint sections found",
len);
usleep(1000);
goto init_retry;
}
saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY,
SA_TIME_END, &itr);
while (1) {
rv = saCkptSectionIterationNext(itr, &desc);
if (rv == SA_AIS_ERR_NO_SECTIONS)
break;
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("import_checkpoint: ckpt iternext retry");
usleep(1000);
continue;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("import_checkpoint: clean checkpoint section "
"creation failed: %s", str_ais_error(rv));
rtn = -EIO; /* FIXME: better error */
goto fail;
}
if (!desc.sectionSize) {
LOG_ERROR("Checkpoint section empty");
continue;
}
memset(bitmap, 0, sizeof(*bitmap));
iov.sectionId = desc.sectionId;
iov.dataBuffer = bitmap;
iov.dataSize = desc.sectionSize;
iov.dataOffset = 0;
read_retry:
rv = saCkptCheckpointRead(h, &iov, 1, NULL);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("ckpt read retry");
usleep(1000);
goto read_retry;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("import_checkpoint: ckpt read error: %s",
str_ais_error(rv));
rtn = -EIO; /* FIXME: better error */
goto fail;
}
if (iov.readSize) {
if (pull_state(entry->name.value, entry->luid,
(char *)desc.sectionId.id, bitmap,
iov.readSize)) {
LOG_ERROR("Error loading state");
rtn = -EIO;
goto fail;
}
} else {
/* Need to request new checkpoint */
rtn = -EAGAIN;
goto fail;
}
}
fail:
saCkptSectionIterationFinalize(itr);
no_read:
saCkptCheckpointClose(h);
free(bitmap);
return rtn;
}
#else
static int import_checkpoint(struct clog_cpg *entry, int no_read,
struct clog_request *rq)
{
int bitmap_size;
if (no_read) {
LOG_DBG("Checkpoint for this log already received");
return 0;
}
bitmap_size = (rq->u_rq.data_size - RECOVERING_REGION_SECTION_SIZE) / 2;
if (bitmap_size < 0) {
LOG_ERROR("Checkpoint has invalid payload size.");
return -EINVAL;
}
if (pull_state(entry->name.value, entry->luid, "sync_bits",
rq->u_rq.data, bitmap_size) ||
pull_state(entry->name.value, entry->luid, "clean_bits",
rq->u_rq.data + bitmap_size, bitmap_size) ||
pull_state(entry->name.value, entry->luid, "recovering_region",
rq->u_rq.data + (bitmap_size * 2),
RECOVERING_REGION_SECTION_SIZE)) {
LOG_ERROR("Error loading bitmap state from checkpoint.");
return -EIO;
}
return 0;
}
#endif /* CMIRROR_HAS_CHECKPOINT */
static void do_checkpoints(struct clog_cpg *entry, int leaving)
{
struct checkpoint_data *cp;
for (cp = entry->checkpoint_list; cp;) {
/*
* FIXME: Check return code. Could send failure
* notice in rq in export_checkpoint function
* by setting rq->error
*/
switch (export_checkpoint(cp)) {
case -EEXIST:
LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s",
SHORT_UUID(entry->name.value), cp->requester,
(leaving) ? "(L)": "");
LOG_COND(log_checkpoint,
"[%s] Checkpoint for %u already handled%s",
SHORT_UUID(entry->name.value), cp->requester,
(leaving) ? "(L)": "");
entry->checkpoint_list = cp->next;
free_checkpoint(cp);
cp = entry->checkpoint_list;
break;
case 0:
LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s",
SHORT_UUID(entry->name.value), cp->requester,
(leaving) ? "(L)": "");
LOG_COND(log_checkpoint,
"[%s] Checkpoint data available for node %u%s",
SHORT_UUID(entry->name.value), cp->requester,
(leaving) ? "(L)": "");
entry->checkpoint_list = cp->next;
free_checkpoint(cp);
cp = entry->checkpoint_list;
break;
default:
/* FIXME: Skipping will cause list corruption */
LOG_ERROR("[%s] Failed to export checkpoint for %u%s",
SHORT_UUID(entry->name.value), cp->requester,
(leaving) ? "(L)": "");
}
}
}
static int resend_requests(struct clog_cpg *entry)
{
int r = 0;
struct clog_request *rq, *n;
if (!entry->resend_requests || entry->delay)
return 0;
if (entry->state != VALID)
return 0;
entry->resend_requests = 0;
dm_list_iterate_items_gen_safe(rq, n, &entry->working_list, u.list) {
dm_list_del(&rq->u.list);
if (strcmp(entry->name.value, rq->u_rq.uuid)) {
LOG_ERROR("[%s] Stray request from another log (%s)",
SHORT_UUID(entry->name.value),
SHORT_UUID(rq->u_rq.uuid));
free(rq);
continue;
}
switch (rq->u_rq.request_type) {
case DM_ULOG_SET_REGION_SYNC:
/*
* Some requests simply do not need to be resent.
* If it is a request that just changes log state,
* then it doesn't need to be resent (everyone makes
* updates).
*/
LOG_COND(log_resend_requests,
"[%s] Skipping resend of %s/#%u...",
SHORT_UUID(entry->name.value),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
LOG_SPRINT(entry, "### No resend: [%s] %s/%u ###",
SHORT_UUID(entry->name.value),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
rq->u_rq.data_size = 0;
if (kernel_send(&rq->u_rq))
LOG_ERROR("Failed to respond to kernel [%s]",
RQ_TYPE(rq->u_rq.request_type));
break;
default:
/*
* If an action or a response is required, then
* the request must be resent.
*/
LOG_COND(log_resend_requests,
"[%s] Resending %s(#%u) due to new server(%u)",
SHORT_UUID(entry->name.value),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq, entry->lowest_id);
LOG_SPRINT(entry, "*** Resending: [%s] %s/%u ***",
SHORT_UUID(entry->name.value),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
r = cluster_send(rq);
if (r < 0)
LOG_ERROR("Failed resend");
}
free(rq);
}
return r;
}
static int do_cluster_work(void *data __attribute__((unused)))
{
int r = CS_OK;
struct clog_cpg *entry, *tmp;
dm_list_iterate_items_safe(entry, tmp, &clog_cpg_list) {
r = cpg_dispatch(entry->handle, CS_DISPATCH_ALL);
if (r != CS_OK) {
if ((r == CS_ERR_BAD_HANDLE) &&
((entry->state == INVALID) ||
(entry->state == LEAVING)))
/* It's ok if we've left the cluster */
r = CS_OK;
else
LOG_ERROR("cpg_dispatch failed: %s",
str_ais_error(r));
}
if (entry->free_me) {
free(entry);
continue;
}
do_checkpoints(entry, 0);
resend_requests(entry);
}
return (r == CS_OK) ? 0 : -1; /* FIXME: good error number? */
}
static int flush_startup_list(struct clog_cpg *entry)
{
int r = 0;
int i_was_server;
struct clog_request *rq, *n;
struct checkpoint_data *new;
dm_list_iterate_items_gen_safe(rq, n, &entry->startup_list, u.list) {
dm_list_del(&rq->u.list);
if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) {
new = prepare_checkpoint(entry, rq->originator);
if (!new) {
/*
* FIXME: Need better error handling. Other nodes
* will be trying to send the checkpoint too, and we
* must continue processing the list; so report error
* but continue.
*/
LOG_ERROR("Failed to prepare checkpoint for %u!!!",
rq->originator);
free(rq);
continue;
}
LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u",
SHORT_UUID(entry->name.value), rq->originator);
LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
SHORT_UUID(entry->name.value), rq->originator);
new->next = entry->checkpoint_list;
entry->checkpoint_list = new;
} else {
LOG_DBG("[%s] Processing delayed request: %s",
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type));
i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0;
r = handle_cluster_request(entry, rq, i_was_server);
if (r)
/*
* FIXME: If we error out here, we will never get
* another opportunity to retry these requests
*/
LOG_ERROR("Error while processing delayed CPG message");
}
free(rq);
}
return 0;
}
static void cpg_message_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)),
uint32_t nodeid, uint32_t pid __attribute__((unused)),
void *msg, size_t msg_len)
{
int i;
int r = 0;
int i_am_server;
int response = 0;
struct clog_request *rq = msg;
struct clog_request *tmp_rq, *tmp_rq2;
struct clog_cpg *match;
match = find_clog_cpg(handle);
if (!match) {
LOG_ERROR("Unable to find clog_cpg for cluster message");
return;
}
/*
* Perform necessary endian and version compatibility conversions
*/
if (clog_request_from_network(rq, msg_len) < 0)
/* Any error messages come from 'clog_request_from_network' */
return;
if ((nodeid == my_cluster_id) &&
!(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
(rq->u_rq.request_type != DM_ULOG_RESUME) &&
(rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
(rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) {
tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
if (!tmp_rq) {
/*
* FIXME: It may be possible to continue... but we
* would not be able to resend any messages that might
* be necessary during membership changes
*/
LOG_ERROR("[%s] Unable to record request: -ENOMEM",
SHORT_UUID(rq->u_rq.uuid));
return;
}
memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
dm_list_init(&tmp_rq->u.list);
dm_list_add(&match->working_list, &tmp_rq->u.list);
}
if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
/*
* If the server (lowest_id) indicates it is leaving,
* then we must resend any outstanding requests. However,
* we do not want to resend them if the next server in
* line is in the process of leaving.
*/
if (nodeid == my_cluster_id) {
LOG_COND(log_resend_requests, "[%s] I am leaving.1.....",
SHORT_UUID(rq->u_rq.uuid));
} else {
if (nodeid < my_cluster_id) {
if (nodeid == match->lowest_id) {
match->resend_requests = 1;
LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
SHORT_UUID(rq->u_rq.uuid), nodeid,
(dm_list_empty(&match->working_list)) ? " -- working_list empty": "");
dm_list_iterate_items_gen(tmp_rq, &match->working_list, u.list)
LOG_COND(log_resend_requests,
"[%s] %s/%u",
SHORT_UUID(tmp_rq->u_rq.uuid),
_RQ_TYPE(tmp_rq->u_rq.request_type),
tmp_rq->u_rq.seq);
}
match->delay++;
LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay);
}
rq->originator = nodeid; /* don't really need this, but nice for debug */
goto out;
}
}
/*
* We can receive messages after we do a cpg_leave but before we
* get our config callback. However, since we can't respond after
* leaving, we simply return.
*/
if (match->state == LEAVING)
return;
i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) {
if (my_cluster_id == rq->originator) {
/* Redundant checkpoints ignored if match->valid */
LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
if (import_checkpoint(match,
(match->state != INVALID), rq)) {
LOG_SPRINT(match,
"[%s] Failed to import checkpoint from %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
LOG_ERROR("[%s] Failed to import checkpoint from %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
kill(getpid(), SIGUSR1);
/* Could we retry? */
goto out;
} else if (match->state == INVALID) {
LOG_SPRINT(match,
"[%s] Checkpoint data received from %u. Log is now valid",
SHORT_UUID(match->name.value), nodeid);
LOG_COND(log_checkpoint,
"[%s] Checkpoint data received from %u. Log is now valid",
SHORT_UUID(match->name.value), nodeid);
match->state = VALID;
flush_startup_list(match);
} else {
LOG_SPRINT(match,
"[%s] Redundant checkpoint from %u ignored.",
SHORT_UUID(rq->u_rq.uuid), nodeid);
}
}
goto out;
}
if (rq->u_rq.request_type & DM_ULOG_RESPONSE) {
response = 1;
r = handle_cluster_response(match, rq);
} else {
rq->originator = nodeid;
if (match->state == LEAVING) {
LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving",
SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type),
rq->originator);
goto out;
}
if (match->state == INVALID) {
LOG_DBG("Log not valid yet, storing request");
if (!(tmp_rq2 = malloc(DM_ULOG_REQUEST_SIZE))) {
LOG_ERROR("cpg_message_callback: Unable to"
" allocate transfer structs");
r = -ENOMEM; /* FIXME: Better error #? */
goto out;
}
memcpy(tmp_rq2, rq, sizeof(*rq) + rq->u_rq.data_size);
tmp_rq2->pit_server = match->lowest_id;
dm_list_init(&tmp_rq2->u.list);
dm_list_add(&match->startup_list, &tmp_rq2->u.list);
goto out;
}
r = handle_cluster_request(match, rq, i_am_server);
}
/*
* If the log is now valid, we can queue the checkpoints
*/
for (i = match->checkpoints_needed; i; ) {
struct checkpoint_data *new;
if (log_get_state(&rq->u_rq) != LOG_RESUMED) {
LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid);
break;
}
i--;
new = prepare_checkpoint(match, match->checkpoint_requesters[i]);
if (!new) {
/* FIXME: Need better error handling */
LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
break;
}
LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)",
SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i],
(log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
match->checkpoints_needed--;
new->next = match->checkpoint_list;
match->checkpoint_list = new;
}
out:
/* nothing happens after this point. It is just for debugging */
if (r) {
LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE),
strerror(-r));
LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq->u_rq.uuid),
(response) ? "YES" : "NO");
LOG_ERROR("[%s] Originator: %u",
SHORT_UUID(rq->u_rq.uuid), rq->originator);
if (response)
LOG_ERROR("[%s] Responder : %u",
SHORT_UUID(rq->u_rq.uuid), nodeid);
LOG_ERROR("HISTORY::");
for (i = 0; i < DEBUGGING_HISTORY; i++) {
match->idx++;
match->idx = match->idx % DEBUGGING_HISTORY;
if (match->debugging[match->idx][0] == '\0')
continue;
LOG_ERROR("%d:%d) %s", i, match->idx,
match->debugging[match->idx]);
}
} else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) ||
(rq->originator == my_cluster_id)) {
if (!response)
LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->originator, (response) ? "YES" : "NO");
else
LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u, error=%d",
rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->originator, (response) ? "YES" : "NO",
nodeid, rq->u_rq.error);
}
}
static void cpg_join_callback(struct clog_cpg *match,
const struct cpg_address *joined,
const struct cpg_address *member_list,
size_t member_list_entries)
{
unsigned i;
uint32_t my_pid = (uint32_t)getpid();
uint32_t lowest = match->lowest_id;
struct clog_request *rq;
char dbuf[32] = { 0 };
/* Assign my_cluster_id */
if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid))
my_cluster_id = joined->nodeid;
/* Am I the very first to join? */
if (member_list_entries == 1) {
match->lowest_id = joined->nodeid;
match->state = VALID;
}
/* If I am part of the joining list, I do not send checkpoints */
if (joined->nodeid == my_cluster_id)
goto out;
for (i = 0; i < member_list_entries - 1; i++)
sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid);
sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid);
LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]",
SHORT_UUID(match->name.value), joined->nodeid, dbuf);
/*
* FIXME: remove checkpoint_requesters/checkpoints_needed, and use
* the startup_list interface exclusively
*/
if (dm_list_empty(&match->startup_list) && (match->state == VALID) &&
(match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) {
match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid;
goto out;
}
rq = malloc(DM_ULOG_REQUEST_SIZE);
if (!rq) {
LOG_ERROR("cpg_config_callback: "
"Unable to allocate transfer structs");
LOG_ERROR("cpg_config_callback: "
"Unable to perform checkpoint");
goto out;
}
rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN;
rq->originator = joined->nodeid;
dm_list_init(&rq->u.list);
dm_list_add(&match->startup_list, &rq->u.list);
out:
/* Find the lowest_id, i.e. the server */
match->lowest_id = member_list[0].nodeid;
for (i = 0; i < member_list_entries; i++)
if (match->lowest_id > member_list[i].nodeid)
match->lowest_id = member_list[i].nodeid;
if (lowest == 0xDEAD)
LOG_COND(log_membership_change, "[%s] Server change <none> -> %u (%u %s)",
SHORT_UUID(match->name.value), match->lowest_id,
joined->nodeid, (member_list_entries == 1) ?
"is first to join" : "joined");
else if (lowest != match->lowest_id)
LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u joined)",
SHORT_UUID(match->name.value), lowest,
match->lowest_id, joined->nodeid);
else
LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u joined)",
SHORT_UUID(match->name.value),
lowest, joined->nodeid);
LOG_SPRINT(match, "+++ UUID=%s %u join +++",
SHORT_UUID(match->name.value), joined->nodeid);
}
static void cpg_leave_callback(struct clog_cpg *match,
const struct cpg_address *left,
const struct cpg_address *member_list,
size_t member_list_entries)
{
unsigned i;
int j, fd;
uint32_t lowest = match->lowest_id;
struct clog_request *rq, *n;
struct checkpoint_data *p_cp, *c_cp;
LOG_SPRINT(match, "--- UUID=%s %u left ---",
SHORT_UUID(match->name.value), left->nodeid);
/* Am I leaving? */
if (my_cluster_id == left->nodeid) {
LOG_DBG("Finalizing leave...");
dm_list_del(&match->list);
cpg_fd_get(match->handle, &fd);
links_unregister(fd);
cluster_postsuspend(match->name.value, match->luid);
dm_list_iterate_items_gen_safe(rq, n, &match->working_list, u.list) {
dm_list_del(&rq->u.list);
if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
if (kernel_send(&rq->u_rq))
LOG_ERROR("Failed to respond to kernel [%s]",
RQ_TYPE(rq->u_rq.request_type));
free(rq);
}
cpg_finalize(match->handle);
match->free_me = 1;
match->lowest_id = 0xDEAD;
match->state = INVALID;
}
/* Remove any pending checkpoints for the leaving node. */
for (p_cp = NULL, c_cp = match->checkpoint_list;
c_cp && (c_cp->requester != left->nodeid);
p_cp = c_cp, c_cp = c_cp->next);
if (c_cp) {
if (p_cp)
p_cp->next = c_cp->next;
else
match->checkpoint_list = c_cp->next;
LOG_COND(log_checkpoint,
"[%s] Removing pending checkpoint (%u is leaving)",
SHORT_UUID(match->name.value), left->nodeid);
free_checkpoint(c_cp);
}
dm_list_iterate_items_gen_safe(rq, n, &match->startup_list, u.list) {
if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) &&
(rq->originator == left->nodeid)) {
LOG_COND(log_checkpoint,
"[%s] Removing pending ckpt from startup list (%u is leaving)",
SHORT_UUID(match->name.value), left->nodeid);
dm_list_del(&rq->u.list);
free(rq);
}
}
for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) {
match->checkpoint_requesters[j] = match->checkpoint_requesters[i];
if (match->checkpoint_requesters[i] == left->nodeid) {
LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)",
SHORT_UUID(match->name.value), left->nodeid);
j--;
}
}
match->checkpoints_needed = j;
if (left->nodeid < my_cluster_id) {
match->delay = (match->delay > 0) ? match->delay - 1 : 0;
if (!match->delay && dm_list_empty(&match->working_list))
match->resend_requests = 0;
LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s",
SHORT_UUID(match->name.value), left->nodeid,
match->delay, (dm_list_empty(&match->working_list)) ?
" -- working_list empty": "");
}
/* Find the lowest_id, i.e. the server */
if (!member_list_entries) {
match->lowest_id = 0xDEAD;
LOG_COND(log_membership_change, "[%s] Server change %u -> <none> "
"(%u is last to leave)",
SHORT_UUID(match->name.value), left->nodeid,
left->nodeid);
return;
}
match->lowest_id = member_list[0].nodeid;
for (i = 0; i < member_list_entries; i++)
if (match->lowest_id > member_list[i].nodeid)
match->lowest_id = member_list[i].nodeid;
if (lowest != match->lowest_id) {
LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)",
SHORT_UUID(match->name.value), lowest,
match->lowest_id, left->nodeid);
} else
LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)",
SHORT_UUID(match->name.value), lowest, left->nodeid);
if ((match->state == INVALID) && !match->free_me) {
/*
* If all CPG members are waiting for checkpoints and they
* are all present in my startup_list, then I was the first to
* join and I must assume control.
*
* We do not normally end up here, but if there was a quick
* 'resume -> suspend -> resume' across the cluster, we may
* have initially thought we were not the first to join because
* of the presence of out-going (and unable to respond) members.
*/
i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
dm_list_iterate_items_gen(rq, &match->startup_list, u.list)
if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN)
i++;
if (i == member_list_entries) {
/*
* Last node who could have given me a checkpoint just left.
* Setting log state to VALID and acting as 'first join'.
*/
match->state = VALID;
flush_startup_list(match);
}
}
}
static void cpg_config_callback(cpg_handle_t handle, const struct cpg_name *gname __attribute__((unused)),
const struct cpg_address *member_list,
size_t member_list_entries,
const struct cpg_address *left_list,
size_t left_list_entries,
const struct cpg_address *joined_list,
size_t joined_list_entries)
{
struct clog_cpg *match;
int found = 0;
dm_list_iterate_items(match, &clog_cpg_list)
if (match->handle == handle) {
found = 1;
break;
}
if (!found) {
LOG_ERROR("Unable to find match for CPG config callback");
return;
}
if ((joined_list_entries + left_list_entries) > 1)
LOG_ERROR("[%s] More than one node joining/leaving",
SHORT_UUID(match->name.value));
if (joined_list_entries)
cpg_join_callback(match, joined_list,
member_list, member_list_entries);
else
cpg_leave_callback(match, left_list,
member_list, member_list_entries);
}
cpg_callbacks_t cpg_callbacks = {
.cpg_deliver_fn = cpg_message_callback,
.cpg_confchg_fn = cpg_config_callback,
};
/*
* remove_checkpoint
* @entry
*
* Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error
*/
static int remove_checkpoint(struct clog_cpg *entry)
{
#if CMIRROR_HAS_CHECKPOINT
int len;
SaNameT name;
SaAisErrorT rv;
SaCkptCheckpointHandleT h;
len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u",
SHORT_UUID(entry->name.value), my_cluster_id);
name.length = len;
open_retry:
rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL,
SA_CKPT_CHECKPOINT_READ, 0, &h);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("abort_startup: ckpt open retry");
usleep(1000);
goto open_retry;
}
if (rv != SA_AIS_OK)
return 0;
LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(entry->name.value));
unlink_retry:
rv = saCkptCheckpointUnlink(ckpt_handle, &name);
if (rv == SA_AIS_ERR_TRY_AGAIN) {
LOG_ERROR("abort_startup: ckpt unlink retry");
usleep(1000);
goto unlink_retry;
}
if (rv != SA_AIS_OK) {
LOG_ERROR("[%s] Failed to unlink checkpoint: %s",
SHORT_UUID(entry->name.value), str_ais_error(rv));
return -EIO;
}
saCkptCheckpointClose(h);
return 1;
#else
/* No checkpoint to remove, so 'success' */
return 1;
#endif
}
int create_cluster_cpg(char *uuid, uint64_t luid)
{
int r;
size_t size;
struct clog_cpg *new = NULL;
struct clog_cpg *tmp;
dm_list_iterate_items(tmp, &clog_cpg_list)
if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) {
LOG_ERROR("Log entry already exists: %s", uuid);
return -EEXIST;
}
new = malloc(sizeof(*new));
if (!new) {
LOG_ERROR("Unable to allocate memory for clog_cpg");
return -ENOMEM;
}
memset(new, 0, sizeof(*new));
dm_list_init(&new->list);
new->lowest_id = 0xDEAD;
dm_list_init(&new->startup_list);
dm_list_init(&new->working_list);
size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ?
CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1);
strncpy(new->name.value, uuid, size);
new->name.length = (uint32_t)size;
new->luid = luid;
/*
* Ensure there are no stale checkpoints around before we join
*/
if (remove_checkpoint(new) == 1)
LOG_COND(log_checkpoint,
"[%s] Removing checkpoints left from previous session",
SHORT_UUID(new->name.value));
r = cpg_initialize(&new->handle, &cpg_callbacks);
if (r != CS_OK) {
LOG_ERROR("cpg_initialize failed: Cannot join cluster");
free(new);
return -EPERM;
}
r = cpg_join(new->handle, &new->name);
if (r != CS_OK) {
LOG_ERROR("cpg_join failed: Cannot join cluster");
free(new);
return -EPERM;
}
new->cpg_state = VALID;
dm_list_add(&clog_cpg_list, &new->list);
LOG_DBG("New handle: %llu", (unsigned long long)new->handle);
LOG_DBG("New name: %s", new->name.value);
/* FIXME: better variable */
cpg_fd_get(new->handle, &r);
links_register(r, "cluster", do_cluster_work, NULL);
return 0;
}
static void abort_startup(struct clog_cpg *del)
{
struct clog_request *rq, *n;
LOG_DBG("[%s] CPG teardown before checkpoint received",
SHORT_UUID(del->name.value));
dm_list_iterate_items_gen_safe(rq, n, &del->startup_list, u.list) {
dm_list_del(&rq->u.list);
LOG_DBG("[%s] Ignoring request from %u: %s",
SHORT_UUID(del->name.value), rq->originator,
_RQ_TYPE(rq->u_rq.request_type));
free(rq);
}
remove_checkpoint(del);
}
static int _destroy_cluster_cpg(struct clog_cpg *del)
{
int r;
int state;
LOG_COND(log_resend_requests, "[%s] I am leaving.2.....",
SHORT_UUID(del->name.value));
/*
* We must send any left over checkpoints before
* leaving. If we don't, an incoming node could
* be stuck with no checkpoint and stall.
do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
- Incoming node deletes old checkpoints before joining
- A stale checkpoint is issued here by leaving node
- (leaving node leaves)
- Incoming node joins cluster and finds stale checkpoint.
- (leaving node leaves - option 2)
*/
do_checkpoints(del, 1);
state = del->state;
del->cpg_state = INVALID;
del->state = LEAVING;
/*
* If the state is VALID, we might be processing the
* startup list. If so, we certainly don't want to
* clear the startup_list here by calling abort_startup
*/
if (!dm_list_empty(&del->startup_list) && (state != VALID))
abort_startup(del);
r = cpg_leave(del->handle, &del->name);
if (r != CS_OK)
LOG_ERROR("Error leaving CPG!");
return 0;
}
int destroy_cluster_cpg(char *uuid)
{
struct clog_cpg *del, *tmp;
dm_list_iterate_items_safe(del, tmp, &clog_cpg_list)
if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH))
_destroy_cluster_cpg(del);
return 0;
}
int init_cluster(void)
{
#if CMIRROR_HAS_CHECKPOINT
SaAisErrorT rv;
rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
if (rv != SA_AIS_OK)
return EXIT_CLUSTER_CKPT_INIT;
#endif
dm_list_init(&clog_cpg_list);
return 0;
}
void cleanup_cluster(void)
{
#if CMIRROR_HAS_CHECKPOINT
SaAisErrorT err;
err = saCkptFinalize(ckpt_handle);
if (err != SA_AIS_OK)
LOG_ERROR("Failed to finalize checkpoint handle");
#endif
}
void cluster_debug(void)
{
struct checkpoint_data *cp;
struct clog_cpg *entry;
struct clog_request *rq;
int i;
LOG_ERROR("");
LOG_ERROR("CLUSTER COMPONENT DEBUGGING::");
dm_list_iterate_items(entry, &clog_cpg_list) {
LOG_ERROR("%s::", SHORT_UUID(entry->name.value));
LOG_ERROR(" lowest_id : %u", entry->lowest_id);
LOG_ERROR(" state : %s", (entry->state == INVALID) ?
"INVALID" : (entry->state == VALID) ? "VALID" :
(entry->state == LEAVING) ? "LEAVING" : "UNKNOWN");
LOG_ERROR(" cpg_state : %d", entry->cpg_state);
LOG_ERROR(" free_me : %d", entry->free_me);
LOG_ERROR(" delay : %d", entry->delay);
LOG_ERROR(" resend_requests : %d", entry->resend_requests);
LOG_ERROR(" checkpoints_needed: %d", entry->checkpoints_needed);
for (i = 0, cp = entry->checkpoint_list;
i < MAX_CHECKPOINT_REQUESTERS; i++)
if (cp)
cp = cp->next;
else
break;
LOG_ERROR(" CKPTs waiting : %d", i);
LOG_ERROR(" Working list:");
dm_list_iterate_items_gen(rq, &entry->working_list, u.list)
LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
LOG_ERROR(" Startup list:");
dm_list_iterate_items_gen(rq, &entry->startup_list, u.list)
LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
LOG_ERROR("Command History:");
for (i = 0; i < DEBUGGING_HISTORY; i++) {
entry->idx++;
entry->idx = entry->idx % DEBUGGING_HISTORY;
if (entry->debugging[entry->idx][0] == '\0')
continue;
LOG_ERROR("%d:%d) %s", i, entry->idx,
entry->debugging[entry->idx]);
}
}
}