1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-03-10 16:58:47 +03:00

After rebasing the cluster logging code, adjustments need to be

made to compensate for the changes in the kernel-side component
that recently went upstream.  (Things like: renamed structures,
removal of structure fields, and changes to arguments passed
between userspace and kernel.)
This commit is contained in:
Jonathan Earl Brassow 2009-07-21 15:34:53 +00:00
parent a9f628b78d
commit 31add81f69
10 changed files with 676 additions and 609 deletions

View File

@ -13,7 +13,7 @@
#include <linux/types.h>
#include <sys/socket.h>
#include <linux/netlink.h>
#include <linux/dm-clog-tfr.h>
#include <linux/dm-log-userspace.h>
#include <linux/dm-ioctl.h>
#include "functions.h"
@ -190,13 +190,13 @@ static void daemonize(void)
LOG_ERROR("Failed to create lockfile");
LOG_ERROR("Process already running?");
break;
case EXIT_KERNEL_TFR_SOCKET:
case EXIT_KERNEL_SOCKET:
LOG_ERROR("Unable to create netlink socket");
break;
case EXIT_KERNEL_TFR_BIND:
case EXIT_KERNEL_BIND:
LOG_ERROR("Unable to bind to netlink socket");
break;
case EXIT_KERNEL_TFR_SETSOCKOPT:
case EXIT_KERNEL_SETSOCKOPT:
LOG_ERROR("Unable to setsockopt on netlink socket");
break;
case EXIT_CLUSTER_CKPT_INIT:

View File

@ -14,7 +14,7 @@
#include <openais/cpg.h>
#include <openais/saCkpt.h>
#include "linux/dm-clog-tfr.h"
#include "linux/dm-log-userspace.h"
#include "list.h"
#include "functions.h"
#include "local.h"
@ -24,44 +24,44 @@
#include "cluster.h"
/* Open AIS error codes */
#define str_ais_error(x) \
((x) == SA_AIS_OK) ? "SA_AIS_OK" : \
((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \
((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \
((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \
((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \
((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \
#define str_ais_error(x) \
((x) == SA_AIS_OK) ? "SA_AIS_OK" : \
((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \
((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \
((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \
((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \
((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \
((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \
((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \
((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \
((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \
((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \
((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \
((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \
((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \
((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \
((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \
((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \
((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \
((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \
((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \
((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \
((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \
((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \
((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \
((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \
((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \
((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \
((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \
((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \
((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \
((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \
((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \
((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \
((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \
((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \
((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \
((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \
((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \
((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \
((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \
"ais_error_unknown"
#define DM_CLOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
#define DM_CLOG_CHECKPOINT_READY 21
#define DM_CLOG_MEMBER_JOIN 22
#define DM_ULOG_RESPONSE 0x1000 /* in last byte of 32-bit value */
#define DM_ULOG_CHECKPOINT_READY 21
#define DM_ULOG_MEMBER_JOIN 22
#define _RQ_TYPE(x) \
((x) == DM_CLOG_CHECKPOINT_READY) ? "DM_CLOG_CHECKPOINT_READY": \
((x) == DM_CLOG_MEMBER_JOIN) ? "DM_CLOG_MEMBER_JOIN": \
RQ_TYPE((x) & ~DM_CLOG_RESPONSE)
#define _RQ_TYPE(x) \
((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \
((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \
RQ_TYPE((x) & ~DM_ULOG_RESPONSE)
static uint32_t my_cluster_id = 0xDEAD;
static SaCkptHandleT ckpt_handle = 0;
@ -71,10 +71,10 @@ static SaVersionT version = { 'B', 1, 1 };
#define DEBUGGING_HISTORY 100
//static char debugging[DEBUGGING_HISTORY][128];
//static int idx = 0;
#define LOG_SPRINT(cc, f, arg...) do { \
cc->idx++; \
cc->idx = cc->idx % DEBUGGING_HISTORY; \
sprintf(cc->debugging[cc->idx], f, ## arg); \
#define LOG_SPRINT(cc, f, arg...) do { \
cc->idx++; \
cc->idx = cc->idx % DEBUGGING_HISTORY; \
sprintf(cc->debugging[cc->idx], f, ## arg); \
} while (0)
static int log_resp_rec = 0;
@ -123,11 +123,11 @@ static struct list_head clog_cpg_list;
/*
* cluster_send
* @tfr
* @rq
*
* Returns: 0 on success, -Exxx on error
*/
int cluster_send(struct clog_tfr *tfr)
int cluster_send(struct clog_request *rq)
{
int r;
int count=0;
@ -136,18 +136,19 @@ int cluster_send(struct clog_tfr *tfr)
struct clog_cpg *entry, *tmp;
list_for_each_entry_safe(entry, tmp, &clog_cpg_list, list)
if (!strncmp(entry->name.value, tfr->uuid, CPG_MAX_NAME_LENGTH)) {
if (!strncmp(entry->name.value, rq->u_rq.uuid,
CPG_MAX_NAME_LENGTH)) {
found = 1;
break;
}
if (!found) {
tfr->error = -ENOENT;
rq->u_rq.error = -ENOENT;
return -ENOENT;
}
iov.iov_base = tfr;
iov.iov_len = sizeof(struct clog_tfr) + tfr->data_size;
iov.iov_base = rq;
iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size;
if (entry->cpg_state != VALID)
return -EINVAL;
@ -159,17 +160,21 @@ int cluster_send(struct clog_tfr *tfr)
count++;
if (count < 10)
LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s",
SHORT_UUID(tfr->uuid), count, str_ais_error(r));
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
else if ((count < 100) && !(count % 10))
LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
SHORT_UUID(tfr->uuid), count, str_ais_error(r));
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
else if ((count < 1000) && !(count % 100))
LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s",
SHORT_UUID(tfr->uuid), count, str_ais_error(r));
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
else if ((count < 10000) && !(count % 1000))
LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - "
"OpenAIS not handling the load?",
SHORT_UUID(tfr->uuid), count, str_ais_error(r));
SHORT_UUID(rq->u_rq.uuid), count,
str_ais_error(r));
usleep(1000);
} while (1);
@ -179,39 +184,38 @@ int cluster_send(struct clog_tfr *tfr)
/* error codes found in openais/cpg.h */
LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r));
tfr->error = -EBADE;
rq->u_rq.error = -EBADE;
return -EBADE;
}
static struct clog_tfr *get_matching_tfr(struct clog_tfr *t, struct list_head *l)
static struct clog_request *get_matching_rq(struct clog_request *rq,
struct list_head *l)
{
struct clog_tfr *match;
struct list_head *p, *n;
struct clog_request *match, *n;
list_for_each_safe(p, n, l) {
match = (struct clog_tfr *)p;
if (match->seq == t->seq) {
list_del_init(p);
list_for_each_entry_safe(match, n, l, list) {
if (match->u_rq.seq == rq->u_rq.seq) {
list_del_init(&match->list);
return match;
}
}
return NULL;
}
static char tfr_buffer[DM_CLOG_TFR_SIZE];
static char rq_buffer[DM_ULOG_REQUEST_SIZE];
static int handle_cluster_request(struct clog_cpg *entry,
struct clog_tfr *tfr, int server)
struct clog_request *rq, int server)
{
int r = 0;
struct clog_tfr *t = (struct clog_tfr *)tfr_buffer;
struct clog_request *tmp = (struct clog_request *)rq_buffer;
/*
* We need a separate clog_tfr struct, one that can carry
* We need a separate dm_ulog_request struct, one that can carry
* a return payload. Otherwise, the memory address after
* tfr will be altered - leading to problems
* rq will be altered - leading to problems
*/
memset(t, 0, DM_CLOG_TFR_SIZE);
memcpy(t, tfr, sizeof(struct clog_tfr) + tfr->data_size);
memset(rq_buffer, 0, sizeof(rq_buffer));
memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size);
/*
* With resumes, we only handle our own.
@ -220,28 +224,28 @@ static int handle_cluster_request(struct clog_cpg *entry,
* a cluster action to co-ordinate reading
* the disk and checkpointing
*/
if (t->request_type == DM_CLOG_RESUME) {
if (t->originator == my_cluster_id) {
r = do_request(t, server);
if (tmp->u_rq.request_type == DM_ULOG_RESUME) {
if (tmp->originator == my_cluster_id) {
r = do_request(tmp, server);
r = kernel_send(t);
r = kernel_send(&tmp->u_rq);
if (r < 0)
LOG_ERROR("Failed to send resume response to kernel");
}
return r;
}
r = do_request(t, server);
r = do_request(tmp, server);
if (server &&
(t->request_type != DM_CLOG_CLEAR_REGION) &&
(t->request_type != DM_CLOG_POSTSUSPEND)) {
t->request_type |= DM_CLOG_RESPONSE;
(tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
(tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) {
tmp->u_rq.request_type |= DM_ULOG_RESPONSE;
/*
* Errors from previous functions are in the tfr struct.
* Errors from previous functions are in the rq struct.
*/
r = cluster_send(t);
r = cluster_send(tmp);
if (r < 0)
LOG_ERROR("cluster_send failed: %s", strerror(-r));
}
@ -249,38 +253,38 @@ static int handle_cluster_request(struct clog_cpg *entry,
return r;
}
static int handle_cluster_response(struct clog_cpg *entry, struct clog_tfr *tfr)
static int handle_cluster_response(struct clog_cpg *entry,
struct clog_request *rq)
{
int r = 0;
struct clog_tfr *orig_tfr;
struct clog_request *orig_rq, *n;
/*
* If I didn't send it, then I don't care about the response
*/
if (tfr->originator != my_cluster_id)
if (rq->originator != my_cluster_id)
return 0;
tfr->request_type &= ~DM_CLOG_RESPONSE;
orig_tfr = get_matching_tfr(tfr, &entry->working_list);
if (!orig_tfr) {
struct list_head *p, *n;
struct clog_tfr *t;
rq->u_rq.request_type &= ~DM_ULOG_RESPONSE;
orig_rq = get_matching_rq(rq, &entry->working_list);
if (!orig_rq) {
/* Unable to find match for response */
LOG_ERROR("[%s] No match for cluster response: %s:%u",
SHORT_UUID(tfr->uuid),
_RQ_TYPE(tfr->request_type), tfr->seq);
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
LOG_ERROR("Current local list:");
if (list_empty(&entry->working_list))
LOG_ERROR(" [none]");
list_for_each_safe(p, n, &entry->working_list) {
t = (struct clog_tfr *)p;
LOG_ERROR(" [%s] %s:%u", SHORT_UUID(t->uuid),
_RQ_TYPE(t->request_type), t->seq);
list_for_each_entry_safe(orig_rq, n, &entry->working_list, list) {
LOG_ERROR(" [%s] %s:%u",
SHORT_UUID(orig_rq->u_rq.uuid),
_RQ_TYPE(orig_rq->u_rq.request_type),
orig_rq->u_rq.seq);
}
return -EINVAL;
@ -289,19 +293,20 @@ static int handle_cluster_response(struct clog_cpg *entry, struct clog_tfr *tfr)
if (log_resp_rec > 0) {
LOG_COND(log_resend_requests,
"[%s] Response received to %s/#%u",
SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type),
tfr->seq);
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
log_resp_rec--;
}
/* FIXME: Ensure memcpy cannot explode */
memcpy(orig_tfr, tfr, sizeof(*tfr) + tfr->data_size);
memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
r = kernel_send(orig_tfr);
r = kernel_send(&orig_rq->u_rq);
if (r)
LOG_ERROR("Failed to send response to kernel");
free(orig_tfr);
free(orig_rq);
return r;
}
@ -409,7 +414,7 @@ static int export_checkpoint(struct checkpoint_data *cp)
SaCkptCheckpointOpenFlagsT flags;
SaNameT name;
SaAisErrorT rv;
struct clog_tfr *tfr;
struct clog_request *rq;
int len, r = 0;
char buf[32];
@ -546,25 +551,25 @@ rr_create_retry:
LOG_DBG("export_checkpoint: closing checkpoint");
saCkptCheckpointClose(h);
tfr = malloc(DM_CLOG_TFR_SIZE);
if (!tfr) {
rq = malloc(DM_ULOG_REQUEST_SIZE);
if (!rq) {
LOG_ERROR("export_checkpoint: Unable to allocate transfer structs");
return -ENOMEM;
}
memset(tfr, 0, sizeof(*tfr));
memset(rq, 0, sizeof(*rq));
INIT_LIST_HEAD((struct list_head *)&tfr->private);
tfr->request_type = DM_CLOG_CHECKPOINT_READY;
tfr->originator = cp->requester; /* FIXME: hack to overload meaning of originator */
strncpy(tfr->uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
tfr->seq = my_cluster_id;
INIT_LIST_HEAD(&rq->list);
rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY;
rq->originator = cp->requester; /* FIXME: hack to overload meaning of originator */
strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH);
rq->u_rq.seq = my_cluster_id;
r = cluster_send(tfr);
r = cluster_send(rq);
if (r)
LOG_ERROR("Failed to send checkpoint ready notice: %s",
strerror(-r));
free(tfr);
free(rq);
return 0;
}
@ -728,8 +733,8 @@ static void do_checkpoints(struct clog_cpg *entry, int leaving)
for (cp = entry->checkpoint_list; cp;) {
/*
* FIXME: Check return code. Could send failure
* notice in tfr in export_checkpoint function
* by setting tfr->error
* notice in rq in export_checkpoint function
* by setting rq->error
*/
switch (export_checkpoint(cp)) {
case -EEXIST:
@ -768,8 +773,7 @@ static void do_checkpoints(struct clog_cpg *entry, int leaving)
static int resend_requests(struct clog_cpg *entry)
{
int r = 0;
struct list_head *p, *n;
struct clog_tfr *tfr;
struct clog_request *rq, *n;
if (!entry->resend_requests || entry->delay)
return 0;
@ -779,20 +783,19 @@ static int resend_requests(struct clog_cpg *entry)
entry->resend_requests = 0;
list_for_each_safe(p, n, &entry->working_list) {
list_del_init(p);
tfr = (struct clog_tfr *)p;
list_for_each_entry_safe(rq, n, &entry->working_list, list) {
list_del_init(&rq->list);
if (strcmp(entry->name.value, tfr->uuid)) {
if (strcmp(entry->name.value, rq->u_rq.uuid)) {
LOG_ERROR("[%s] Stray request from another log (%s)",
SHORT_UUID(entry->name.value),
SHORT_UUID(tfr->uuid));
free(tfr);
SHORT_UUID(rq->u_rq.uuid));
free(rq);
continue;
}
switch (tfr->request_type) {
case DM_CLOG_SET_REGION_SYNC:
switch (rq->u_rq.request_type) {
case DM_ULOG_SET_REGION_SYNC:
/*
* Some requests simply do not need to be resent.
* If it is a request that just changes log state,
@ -802,13 +805,15 @@ static int resend_requests(struct clog_cpg *entry)
LOG_COND(log_resend_requests,
"[%s] Skipping resend of %s/#%u...",
SHORT_UUID(entry->name.value),
_RQ_TYPE(tfr->request_type), tfr->seq);
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
LOG_SPRINT(entry, "### No resend: [%s] %s/%u ###",
SHORT_UUID(entry->name.value),
_RQ_TYPE(tfr->request_type), tfr->seq);
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
tfr->data_size = 0;
kernel_send(tfr);
rq->u_rq.data_size = 0;
kernel_send(&rq->u_rq);
break;
@ -820,16 +825,17 @@ static int resend_requests(struct clog_cpg *entry)
LOG_COND(log_resend_requests,
"[%s] Resending %s(#%u) due to new server(%u)",
SHORT_UUID(entry->name.value),
_RQ_TYPE(tfr->request_type),
tfr->seq, entry->lowest_id);
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq, entry->lowest_id);
LOG_SPRINT(entry, "*** Resending: [%s] %s/%u ***",
SHORT_UUID(entry->name.value),
_RQ_TYPE(tfr->request_type), tfr->seq);
r = cluster_send(tfr);
_RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
r = cluster_send(rq);
if (r < 0)
LOG_ERROR("Failed resend");
}
free(tfr);
free(rq);
}
return r;
@ -861,15 +867,14 @@ static int flush_startup_list(struct clog_cpg *entry)
{
int r = 0;
int i_was_server;
struct list_head *p, *n;
struct clog_tfr *tfr = NULL;
struct clog_request *rq, *n;
struct checkpoint_data *new;
list_for_each_safe(p, n, &entry->startup_list) {
list_del_init(p);
tfr = (struct clog_tfr *)p;
if (tfr->request_type == DM_CLOG_MEMBER_JOIN) {
new = prepare_checkpoint(entry, tfr->originator);
list_for_each_entry_safe(rq, n, &entry->startup_list, list) {
list_del_init(&rq->list);
if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) {
new = prepare_checkpoint(entry, rq->originator);
if (!new) {
/*
* FIXME: Need better error handling. Other nodes
@ -878,22 +883,22 @@ static int flush_startup_list(struct clog_cpg *entry)
* but continue.
*/
LOG_ERROR("Failed to prepare checkpoint for %u!!!",
tfr->originator);
free(tfr);
rq->originator);
free(rq);
continue;
}
LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u",
SHORT_UUID(entry->name.value), tfr->originator);
SHORT_UUID(entry->name.value), rq->originator);
LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u",
SHORT_UUID(entry->name.value), tfr->originator);
SHORT_UUID(entry->name.value), rq->originator);
new->next = entry->checkpoint_list;
entry->checkpoint_list = new;
} else {
LOG_DBG("[%s] Processing delayed request: %s",
SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type));
i_was_server = (tfr->error == my_cluster_id) ? 1 : 0;
tfr->error = 0;
r = handle_cluster_request(entry, tfr, i_was_server);
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type));
i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0;
r = handle_cluster_request(entry, rq, i_was_server);
if (r)
/*
@ -902,7 +907,7 @@ static int flush_startup_list(struct clog_cpg *entry)
*/
LOG_ERROR("Error while processing delayed CPG message");
}
free(tfr);
free(rq);
}
return 0;
@ -916,8 +921,8 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
int r = 0;
int i_am_server;
int response = 0;
struct clog_tfr *tfr = msg;
struct clog_tfr *tmp_tfr = NULL;
struct clog_request *rq = msg;
struct clog_request *tmp_rq, *n;
struct clog_cpg *match;
match = find_clog_cpg(handle);
@ -927,27 +932,27 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
}
if ((nodeid == my_cluster_id) &&
!(tfr->request_type & DM_CLOG_RESPONSE) &&
(tfr->request_type != DM_CLOG_RESUME) &&
(tfr->request_type != DM_CLOG_CLEAR_REGION) &&
(tfr->request_type != DM_CLOG_CHECKPOINT_READY)) {
tmp_tfr = malloc(DM_CLOG_TFR_SIZE);
if (!tmp_tfr) {
!(rq->u_rq.request_type & DM_ULOG_RESPONSE) &&
(rq->u_rq.request_type != DM_ULOG_RESUME) &&
(rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) &&
(rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) {
tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
if (!tmp_rq) {
/*
* FIXME: It may be possible to continue... but we
* would not be able to resend any messages that might
* be necessary during membership changes
*/
LOG_ERROR("[%s] Unable to record request: -ENOMEM",
SHORT_UUID(tfr->uuid));
SHORT_UUID(rq->u_rq.uuid));
return;
}
memcpy(tmp_tfr, tfr, sizeof(*tfr) + tfr->data_size);
INIT_LIST_HEAD((struct list_head *)&tmp_tfr->private);
list_add_tail((struct list_head *)&tmp_tfr->private, &match->working_list);
memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
INIT_LIST_HEAD(&tmp_rq->list);
list_add_tail(&tmp_rq->list, &match->working_list);
}
if (tfr->request_type == DM_CLOG_POSTSUSPEND) {
if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) {
/*
* If the server (lowest_id) indicates it is leaving,
* then we must resend any outstanding requests. However,
@ -956,32 +961,29 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
*/
if (nodeid == my_cluster_id) {
LOG_COND(log_resend_requests, "[%s] I am leaving.1.....",
SHORT_UUID(tfr->uuid));
SHORT_UUID(rq->u_rq.uuid));
} else {
if (nodeid < my_cluster_id) {
if (nodeid == match->lowest_id) {
struct list_head *p, *n;
match->resend_requests = 1;
LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s",
SHORT_UUID(tfr->uuid), nodeid,
SHORT_UUID(rq->u_rq.uuid), nodeid,
(list_empty(&match->working_list)) ? " -- working_list empty": "");
list_for_each_safe(p, n, &match->working_list) {
tmp_tfr = (struct clog_tfr *)p;
list_for_each_entry_safe(tmp_rq, n, &match->working_list, list) {
LOG_COND(log_resend_requests,
"[%s] %s/%u",
SHORT_UUID(tmp_tfr->uuid),
_RQ_TYPE(tmp_tfr->request_type), tmp_tfr->seq);
SHORT_UUID(tmp_rq->u_rq.uuid),
_RQ_TYPE(tmp_rq->u_rq.request_type),
tmp_rq->u_rq.seq);
}
}
match->delay++;
LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d",
SHORT_UUID(tfr->uuid), nodeid, match->delay);
SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay);
}
tfr->originator = nodeid; /* don't really need this, but nice for debug */
rq->originator = nodeid; /* don't really need this, but nice for debug */
goto out;
}
}
@ -996,17 +998,17 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0;
if (tfr->request_type == DM_CLOG_CHECKPOINT_READY) {
if (my_cluster_id == tfr->originator) {
if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) {
if (my_cluster_id == rq->originator) {
/* Redundant checkpoints ignored if match->valid */
LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u",
SHORT_UUID(tfr->uuid), nodeid);
SHORT_UUID(rq->u_rq.uuid), nodeid);
if (import_checkpoint(match, (match->state != INVALID))) {
LOG_SPRINT(match,
"[%s] Failed to import checkpoint from %u",
SHORT_UUID(tfr->uuid), nodeid);
SHORT_UUID(rq->u_rq.uuid), nodeid);
LOG_ERROR("[%s] Failed to import checkpoint from %u",
SHORT_UUID(tfr->uuid), nodeid);
SHORT_UUID(rq->u_rq.uuid), nodeid);
kill(getpid(), SIGUSR1);
/* Could we retry? */
goto out;
@ -1023,44 +1025,43 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
} else {
LOG_SPRINT(match,
"[%s] Redundant checkpoint from %u ignored.",
SHORT_UUID(tfr->uuid), nodeid);
SHORT_UUID(rq->u_rq.uuid), nodeid);
}
}
goto out;
}
if (tfr->request_type & DM_CLOG_RESPONSE) {
if (rq->u_rq.request_type & DM_ULOG_RESPONSE) {
response = 1;
r = handle_cluster_response(match, tfr);
r = handle_cluster_response(match, rq);
} else {
tfr->originator = nodeid;
rq->originator = nodeid;
if (match->state == LEAVING) {
LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving",
SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type),
tfr->originator);
SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type),
rq->originator);
goto out;
}
if (match->state == INVALID) {
LOG_DBG("Log not valid yet, storing request");
tmp_tfr = malloc(DM_CLOG_TFR_SIZE);
if (!tmp_tfr) {
tmp_rq = malloc(DM_ULOG_REQUEST_SIZE);
if (!tmp_rq) {
LOG_ERROR("cpg_message_callback: Unable to"
" allocate transfer structs");
r = -ENOMEM; /* FIXME: Better error #? */
goto out;
}
memcpy(tmp_tfr, tfr, sizeof(*tfr) + tfr->data_size);
tmp_tfr->error = match->lowest_id;
INIT_LIST_HEAD((struct list_head *)&tmp_tfr->private);
list_add_tail((struct list_head *)&tmp_tfr->private,
&match->startup_list);
memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size);
tmp_rq->pit_server = match->lowest_id;
INIT_LIST_HEAD(&tmp_rq->list);
list_add_tail(&tmp_rq->list, &match->startup_list);
goto out;
}
r = handle_cluster_request(match, tfr, i_am_server);
r = handle_cluster_request(match, rq, i_am_server);
}
/*
@ -1069,9 +1070,9 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
for (i = match->checkpoints_needed; i; ) {
struct checkpoint_data *new;
if (log_get_state(tfr) != LOG_RESUMED) {
if (log_get_state(&rq->u_rq) != LOG_RESUMED) {
LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)",
SHORT_UUID(tfr->uuid), _RQ_TYPE(tfr->request_type), nodeid);
SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid);
break;
}
@ -1080,14 +1081,14 @@ static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname,
if (!new) {
/* FIXME: Need better error handling */
LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!",
SHORT_UUID(tfr->uuid), match->checkpoint_requesters[i]);
SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
break;
}
LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)",
SHORT_UUID(tfr->uuid), match->checkpoint_requesters[i],
(log_get_state(tfr) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i],
(log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED");
LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*",
SHORT_UUID(tfr->uuid), match->checkpoint_requesters[i]);
SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]);
match->checkpoints_needed--;
new->next = match->checkpoint_list;
@ -1098,38 +1099,38 @@ out:
/* nothing happens after this point. It is just for debugging */
if (r) {
LOG_ERROR("[%s] Error while processing CPG message, %s: %s",
SHORT_UUID(tfr->uuid),
_RQ_TYPE(tfr->request_type & ~DM_CLOG_RESPONSE),
SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE),
strerror(-r));
LOG_ERROR("[%s] Response : %s", SHORT_UUID(tfr->uuid),
LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq->u_rq.uuid),
(response) ? "YES" : "NO");
LOG_ERROR("[%s] Originator: %u",
SHORT_UUID(tfr->uuid), tfr->originator);
SHORT_UUID(rq->u_rq.uuid), rq->originator);
if (response)
LOG_ERROR("[%s] Responder : %u",
SHORT_UUID(tfr->uuid), nodeid);
SHORT_UUID(rq->u_rq.uuid), nodeid);
LOG_ERROR("HISTORY::");
for (i = 0; i < DEBUGGING_HISTORY; i++) {
match->idx++;
match->idx = match->idx % DEBUGGING_HISTORY;
if (match->debugging[match->idx][0] == '\0')
continue;
LOG_ERROR("%d:%d) %s", i, match->idx,
match->debugging[match->idx]);
}
} else if (!(tfr->request_type & DM_CLOG_RESPONSE) ||
(tfr->originator == my_cluster_id)) {
LOG_ERROR("HISTORY::");
for (i = 0; i < DEBUGGING_HISTORY; i++) {
match->idx++;
match->idx = match->idx % DEBUGGING_HISTORY;
if (match->debugging[match->idx][0] == '\0')
continue;
LOG_ERROR("%d:%d) %s", i, match->idx,
match->debugging[match->idx]);
}
} else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) ||
(rq->originator == my_cluster_id)) {
if (!response)
LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s",
tfr->seq, SHORT_UUID(tfr->uuid),
_RQ_TYPE(tfr->request_type),
tfr->originator, (response) ? "YES" : "NO");
rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->originator, (response) ? "YES" : "NO");
else
LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u",
tfr->seq, SHORT_UUID(tfr->uuid),
_RQ_TYPE(tfr->request_type),
tfr->originator, (response) ? "YES" : "NO",
rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid),
_RQ_TYPE(rq->u_rq.request_type),
rq->originator, (response) ? "YES" : "NO",
nodeid);
}
}
@ -1142,7 +1143,7 @@ static void cpg_join_callback(struct clog_cpg *match,
int i;
int my_pid = getpid();
uint32_t lowest = match->lowest_id;
struct clog_tfr *tfr;
struct clog_request *rq;
char dbuf[32];
/* Assign my_cluster_id */
@ -1176,18 +1177,18 @@ static void cpg_join_callback(struct clog_cpg *match,
goto out;
}
tfr = malloc(DM_CLOG_TFR_SIZE);
if (!tfr) {
rq = malloc(DM_ULOG_REQUEST_SIZE);
if (!rq) {
LOG_ERROR("cpg_config_callback: "
"Unable to allocate transfer structs");
LOG_ERROR("cpg_config_callback: "
"Unable to perform checkpoint");
goto out;
}
tfr->request_type = DM_CLOG_MEMBER_JOIN;
tfr->originator = joined->nodeid;
INIT_LIST_HEAD((struct list_head *)&tfr->private);
list_add_tail((struct list_head *)&tfr->private, &match->startup_list);
rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN;
rq->originator = joined->nodeid;
INIT_LIST_HEAD(&rq->list);
list_add_tail(&rq->list, &match->startup_list);
out:
/* Find the lowest_id, i.e. the server */
@ -1219,9 +1220,8 @@ static void cpg_leave_callback(struct clog_cpg *match,
int member_list_entries)
{
int i, j, fd;
struct list_head *p, *n;
uint32_t lowest = match->lowest_id;
struct clog_tfr *tfr;
struct clog_request *rq, *n;
struct checkpoint_data *p_cp, *c_cp;
LOG_SPRINT(match, "--- UUID=%s %u left ---",
@ -1237,13 +1237,12 @@ static void cpg_leave_callback(struct clog_cpg *match,
cluster_postsuspend(match->name.value);
list_for_each_safe(p, n, &match->working_list) {
list_del_init(p);
tfr = (struct clog_tfr *)p;
list_for_each_entry_safe(rq, n, &match->working_list, list) {
list_del_init(&rq->list);
if (tfr->request_type == DM_CLOG_POSTSUSPEND)
kernel_send(tfr);
free(tfr);
if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND)
kernel_send(&rq->u_rq);
free(rq);
}
cpg_finalize(match->handle);
@ -1268,15 +1267,14 @@ static void cpg_leave_callback(struct clog_cpg *match,
SHORT_UUID(match->name.value), left->nodeid);
free_checkpoint(c_cp);
}
list_for_each_safe(p, n, &match->startup_list) {
tfr = (struct clog_tfr *)p;
if ((tfr->request_type == DM_CLOG_MEMBER_JOIN) &&
(tfr->originator == left->nodeid)) {
list_for_each_entry_safe(rq, n, &match->startup_list, list) {
if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) &&
(rq->originator == left->nodeid)) {
LOG_COND(log_checkpoint,
"[%s] Removing pending ckpt from startup list (%u is leaving)",
SHORT_UUID(match->name.value), left->nodeid);
list_del_init(p);
free(tfr);
list_del_init(&rq->list);
free(rq);
}
}
for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) {
@ -1334,10 +1332,9 @@ static void cpg_leave_callback(struct clog_cpg *match,
* of the presence of out-going (and unable to respond) members.
*/
i = 1; /* We do not have a DM_CLOG_MEMBER_JOIN entry */
list_for_each_safe(p, n, &match->startup_list) {
tfr = (struct clog_tfr *)p;
if (tfr->request_type == DM_CLOG_MEMBER_JOIN)
i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */
list_for_each_entry_safe(rq, n, &match->startup_list, list) {
if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN)
i++;
}
@ -1383,7 +1380,7 @@ static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname,
member_list, member_list_entries);
else
cpg_leave_callback(match, left_list,
member_list, member_list_entries);
member_list, member_list_entries);
}
cpg_callbacks_t cpg_callbacks = {
@ -1505,19 +1502,18 @@ int create_cluster_cpg(char *str)
static void abort_startup(struct clog_cpg *del)
{
struct list_head *p, *n;
struct clog_tfr *tfr = NULL;
struct clog_request *rq, *n;
LOG_DBG("[%s] CPG teardown before checkpoint received",
SHORT_UUID(del->name.value));
list_for_each_safe(p, n, &del->startup_list) {
list_del_init(p);
tfr = (struct clog_tfr *)p;
list_for_each_entry_safe(rq, n, &del->startup_list, list) {
list_del_init(&rq->list);
LOG_DBG("[%s] Ignoring request from %u: %s",
SHORT_UUID(del->name.value), tfr->originator,
_RQ_TYPE(tfr->request_type));
free(tfr);
SHORT_UUID(del->name.value), rq->originator,
_RQ_TYPE(rq->u_rq.request_type));
free(rq);
}
remove_checkpoint(del);
@ -1535,14 +1531,14 @@ static int _destroy_cluster_cpg(struct clog_cpg *del)
* We must send any left over checkpoints before
* leaving. If we don't, an incoming node could
* be stuck with no checkpoint and stall.
do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS:
- Incoming node deletes old checkpoints before joining
- A stale checkpoint is issued here by leaving node
- (leaving node leaves)
- Incoming node joins cluster and finds stale checkpoint.
- (leaving node leaves - option 2)
*/
- Incoming node deletes old checkpoints before joining
- A stale checkpoint is issued here by leaving node
- (leaving node leaves)
- Incoming node joins cluster and finds stale checkpoint.
- (leaving node leaves - option 2)
*/
do_checkpoints(del, 1);
state = del->state;
@ -1577,12 +1573,8 @@ int destroy_cluster_cpg(char *str)
int init_cluster(void)
{
int i;
SaAisErrorT rv;
// for (i = 0; i < DEBUGGING_HISTORY; i++)
// debugging[i][0] = '\0';
INIT_LIST_HEAD(&clog_cpg_list);
rv = saCkptInitialize(&ckpt_handle, &callbacks, &version);
@ -1605,8 +1597,7 @@ void cluster_debug(void)
{
struct checkpoint_data *cp;
struct clog_cpg *entry, *tmp;
struct list_head *p, *n;
struct clog_tfr *t;
struct clog_request *rq, *n;
int i;
LOG_ERROR("");
@ -1630,15 +1621,15 @@ void cluster_debug(void)
break;
LOG_ERROR(" CKPTs waiting : %d", i);
LOG_ERROR(" Working list:");
list_for_each_safe(p, n, &entry->working_list) {
t = (struct clog_tfr *)p;
LOG_ERROR(" %s/%u", _RQ_TYPE(t->request_type), t->seq);
list_for_each_entry_safe(rq, n, &entry->working_list, list) {
LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
}
LOG_ERROR(" Startup list:");
list_for_each_safe(p, n, &entry->startup_list) {
t = (struct clog_tfr *)p;
LOG_ERROR(" %s/%u", _RQ_TYPE(t->request_type), t->seq);
list_for_each_entry_safe(rq, n, &entry->startup_list, list) {
LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type),
rq->u_rq.seq);
}
LOG_ERROR("Command History:");

View File

@ -1,6 +1,39 @@
#ifndef __CLUSTER_LOG_CLUSTER_DOT_H__
#define __CLUSTER_LOG_CLUSTER_DOT_H__
#include "list.h"
#include <linux/dm-log-userspace.h>
/*
* There is other information in addition to what can
* be found in the dm_ulog_request structure that we
* need for processing. 'clog_request' is the wrapping
* structure we use to make the additional fields
* available.
*/
struct clog_request {
struct list_head list;
/*
* 'originator' is the machine from which the requests
* was made.
*/
uint32_t originator;
/*
* 'pit_server' is the "point-in-time" server for the
* request. (I.e. The machine that was the server at
* the time the request was issued - only important during
* startup.
*/
uint32_t pit_server;
/*
* The request from the kernel that is being processed
*/
struct dm_ulog_request u_rq;
};
int init_cluster(void);
void cleanup_cluster(void);
void cluster_debug(void);
@ -8,6 +41,6 @@ void cluster_debug(void);
int create_cluster_cpg(char *str);
int destroy_cluster_cpg(char *str);
int cluster_send(struct clog_tfr *tfr);
int cluster_send(struct clog_request *rq);
#endif /* __CLUSTER_LOG_CLUSTER_DOT_H__ */

View File

@ -8,33 +8,15 @@
#define EXIT_LOCKFILE 2
#define EXIT_KERNEL_TFR_SOCKET 3 /* Failed netlink socket create */
#define EXIT_KERNEL_TFR_BIND 4
#define EXIT_KERNEL_TFR_SETSOCKOPT 5
#define EXIT_KERNEL_SOCKET 3 /* Failed netlink socket create */
#define EXIT_KERNEL_BIND 4
#define EXIT_KERNEL_SETSOCKOPT 5
#define EXIT_CLUSTER_CKPT_INIT 6 /* Failed to init checkpoint */
#define EXIT_QUEUE_NOMEM 7
/* Located in dm-clog-tfr.h
#define RQ_TYPE(x) \
((x) == DM_CLOG_CTR) ? "DM_CLOG_CTR" : \
((x) == DM_CLOG_DTR) ? "DM_CLOG_DTR" : \
((x) == DM_CLOG_PRESUSPEND) ? "DM_CLOG_PRESUSPEND" : \
((x) == DM_CLOG_POSTSUSPEND) ? "DM_CLOG_POSTSUSPEND" : \
((x) == DM_CLOG_RESUME) ? "DM_CLOG_RESUME" : \
((x) == DM_CLOG_GET_REGION_SIZE) ? "DM_CLOG_GET_REGION_SIZE" : \
((x) == DM_CLOG_IS_CLEAN) ? "DM_CLOG_IS_CLEAN" : \
((x) == DM_CLOG_IN_SYNC) ? "DM_CLOG_IN_SYNC" : \
((x) == DM_CLOG_FLUSH) ? "DM_CLOG_FLUSH" : \
((x) == DM_CLOG_MARK_REGION) ? "DM_CLOG_MARK_REGION" : \
((x) == DM_CLOG_CLEAR_REGION) ? "DM_CLOG_CLEAR_REGION" : \
((x) == DM_CLOG_GET_RESYNC_WORK) ? "DM_CLOG_GET_RESYNC_WORK" : \
((x) == DM_CLOG_SET_REGION_SYNC) ? "DM_CLOG_SET_REGION_SYNC" : \
((x) == DM_CLOG_GET_SYNC_COUNT) ? "DM_CLOG_GET_SYNC_COUNT" : \
((x) == DM_CLOG_STATUS_INFO) ? "DM_CLOG_STATUS_INFO" : \
((x) == DM_CLOG_STATUS_TABLE) ? "DM_CLOG_STATUS_TABLE" : \
NULL
*/
#define DM_ULOG_REQUEST_SIZE 1024
#endif /* __CLUSTER_LOG_COMMON_DOT_H__ */

File diff suppressed because it is too large Load Diff

View File

@ -1,19 +1,21 @@
#ifndef __CLOG_FUNCTIONS_DOT_H__
#define __CLOG_FUNCTIONS_DOT_H__
#include <linux/dm-clog-tfr.h>
#include <linux/dm-log-userspace.h>
#include "cluster.h"
#define LOG_RESUMED 1
#define LOG_SUSPENDED 2
int local_resume(struct clog_tfr *tfr);
int local_resume(struct dm_ulog_request *rq);
int cluster_postsuspend(char *);
int do_request(struct clog_tfr *tfr, int server);
int push_state(const char *uuid, const char *which, char **buf, uint32_t debug_who);
int do_request(struct clog_request *rq, int server);
int push_state(const char *uuid, const char *which,
char **buf, uint32_t debug_who);
int pull_state(const char *uuid, const char *which, char *buf, int size);
int log_get_state(struct clog_tfr *tfr);
int log_get_state(struct dm_ulog_request *rq);
int log_status(void);
void log_debug(void);

View File

@ -8,7 +8,7 @@
#include <linux/connector.h>
#include <linux/netlink.h>
#include "linux/dm-clog-tfr.h"
#include "linux/dm-log-userspace.h"
#include "functions.h"
#include "cluster.h"
#include "common.h"
@ -18,14 +18,14 @@
static int cn_fd; /* Connector (netlink) socket fd */
static char recv_buf[2048];
static char send_buf[2048];
/* FIXME: merge this function with kernel_send_helper */
static int kernel_ack(uint32_t seq, int error)
{
int r;
unsigned char buf[sizeof(struct nlmsghdr) + sizeof(struct cn_msg)];
struct nlmsghdr *nlh = (struct nlmsghdr *)buf;
struct nlmsghdr *nlh = (struct nlmsghdr *)send_buf;
struct cn_msg *msg = NLMSG_DATA(nlh);
if (error < 0) {
@ -33,7 +33,7 @@ static int kernel_ack(uint32_t seq, int error)
return -EINVAL;
}
memset(buf, 0, sizeof(buf));
memset(send_buf, 0, sizeof(send_buf));
nlh->nlmsg_seq = 0;
nlh->nlmsg_pid = getpid();
@ -42,8 +42,8 @@ static int kernel_ack(uint32_t seq, int error)
nlh->nlmsg_flags = 0;
msg->len = 0;
msg->id.idx = 0x4;
msg->id.val = 0x1;
msg->id.idx = CN_IDX_DM;
msg->id.val = CN_VAL_DM_USERSPACE_LOG;
msg->seq = seq;
msg->ack = error;
@ -58,23 +58,24 @@ static int kernel_ack(uint32_t seq, int error)
/*
* kernel_recv
* @tfr: the newly allocated request from kernel
* @rq: the newly allocated request from kernel
*
* Read requests from the kernel and allocate space for the new request.
* If there is no request from the kernel, *tfr is NULL.
* If there is no request from the kernel, *rq is NULL.
*
* This function is not thread safe due to returned stack pointer. In fact,
* the returned pointer must not be in-use when this function is called again.
*
* Returns: 0 on success, -EXXX on error
*/
static int kernel_recv(struct clog_tfr **tfr)
static int kernel_recv(struct clog_request **rq)
{
int r = 0;
int len;
struct cn_msg *msg;
struct dm_ulog_request *u_rq;
*tfr = NULL;
*rq = NULL;
memset(recv_buf, 0, sizeof(recv_buf));
len = recv(cn_fd, recv_buf, sizeof(recv_buf), 0);
@ -99,9 +100,9 @@ static int kernel_recv(struct clog_tfr **tfr)
goto fail;
}
if (msg->len > DM_CLOG_TFR_SIZE) {
if (msg->len > DM_ULOG_REQUEST_SIZE) {
LOG_ERROR("Not enough space to receive kernel request (%d/%d)",
msg->len, DM_CLOG_TFR_SIZE);
msg->len, DM_ULOG_REQUEST_SIZE);
r = -EBADE;
goto fail;
}
@ -115,10 +116,11 @@ static int kernel_recv(struct clog_tfr **tfr)
LOG_ERROR("len = %d, msg->len = %d", len, msg->len);
msg->data[msg->len] = '\0'; /* Cleaner way to ensure this? */
*tfr = (struct clog_tfr *)msg->data;
u_rq = (struct dm_ulog_request *)msg->data;
if (!(*tfr)->request_type) {
LOG_DBG("Bad transmission, requesting resend [%u]", msg->seq);
if (!u_rq->request_type) {
LOG_DBG("Bad transmission, requesting resend [%u]",
msg->seq);
r = -EAGAIN;
if (kernel_ack(msg->seq, EAGAIN)) {
@ -127,6 +129,25 @@ static int kernel_recv(struct clog_tfr **tfr)
r = -EBADE;
}
}
/*
* Now we've got sizeof(struct cn_msg) + sizeof(struct nlmsghdr)
* worth of space that precede the request structure from the
* kernel. Since that space isn't going to be used again, we
* can take it for our purposes; rather than allocating a whole
* new structure and doing a memcpy.
*
* We should really make sure 'clog_request' doesn't grow
* beyond what is available to us, but we need only check it
* once... perhaps at compile time?
*/
// *rq = container_of(u_rq, struct clog_request, u_rq);
*rq = (void *)u_rq -
(sizeof(struct clog_request) -
sizeof(struct dm_ulog_request));
/* Clear the wrapper container fields */
memset(*rq, 0, (void *)u_rq - (void *)(*rq));
break;
default:
LOG_ERROR("Unknown nlmsg_type");
@ -135,7 +156,7 @@ static int kernel_recv(struct clog_tfr **tfr)
fail:
if (r)
*tfr = NULL;
*rq = NULL;
return (r == -EAGAIN) ? 0 : r;
}
@ -145,11 +166,10 @@ static int kernel_send_helper(void *data, int out_size)
int r;
struct nlmsghdr *nlh;
struct cn_msg *msg;
unsigned char buf[2048];
memset(buf, 0, sizeof(buf));
memset(send_buf, 0, sizeof(send_buf));
nlh = (struct nlmsghdr *)buf;
nlh = (struct nlmsghdr *)send_buf;
nlh->nlmsg_seq = 0; /* FIXME: Is this used? */
nlh->nlmsg_pid = getpid();
nlh->nlmsg_type = NLMSG_DONE;
@ -159,8 +179,8 @@ static int kernel_send_helper(void *data, int out_size)
msg = NLMSG_DATA(nlh);
memcpy(msg->data, data, out_size);
msg->len = out_size;
msg->id.idx = 0x4;
msg->id.val = 0x1;
msg->id.idx = CN_IDX_DM;
msg->id.val = CN_VAL_DM_USERSPACE_LOG;
msg->seq = 0;
r = send(cn_fd, nlh, NLMSG_LENGTH(out_size + sizeof(struct cn_msg)), 0);
@ -174,7 +194,7 @@ static int kernel_send_helper(void *data, int out_size)
/*
* do_local_work
*
* Any processing errors are placed in the 'tfr'
* Any processing errors are placed in the 'rq'
* structure to be reported back to the kernel.
* It may be pointless for this function to
* return an int.
@ -184,73 +204,76 @@ static int kernel_send_helper(void *data, int out_size)
static int do_local_work(void *data)
{
int r;
struct clog_tfr *tfr = NULL;
struct clog_request *rq;
struct dm_ulog_request *u_rq = NULL;
r = kernel_recv(&tfr);
r = kernel_recv(&rq);
if (r)
return r;
if (!tfr)
if (!rq)
return 0;
u_rq = &rq->u_rq;
LOG_DBG("[%s] Request from kernel received: [%s/%u]",
SHORT_UUID(tfr->uuid), RQ_TYPE(tfr->request_type),
tfr->seq);
switch (tfr->request_type) {
case DM_CLOG_CTR:
case DM_CLOG_DTR:
case DM_CLOG_IN_SYNC:
case DM_CLOG_GET_SYNC_COUNT:
case DM_CLOG_STATUS_INFO:
case DM_CLOG_STATUS_TABLE:
case DM_CLOG_PRESUSPEND:
SHORT_UUID(u_rq->uuid), RQ_TYPE(u_rq->request_type),
u_rq->seq);
switch (u_rq->request_type) {
case DM_ULOG_CTR:
case DM_ULOG_DTR:
case DM_ULOG_GET_REGION_SIZE:
case DM_ULOG_IN_SYNC:
case DM_ULOG_GET_SYNC_COUNT:
case DM_ULOG_STATUS_INFO:
case DM_ULOG_STATUS_TABLE:
case DM_ULOG_PRESUSPEND:
/* We do not specify ourselves as server here */
r = do_request(tfr, 0);
r = do_request(rq, 0);
if (r)
LOG_DBG("Returning failed request to kernel [%s]",
RQ_TYPE(tfr->request_type));
r = kernel_send(tfr);
RQ_TYPE(u_rq->request_type));
r = kernel_send(u_rq);
if (r)
LOG_ERROR("Failed to respond to kernel [%s]",
RQ_TYPE(tfr->request_type));
RQ_TYPE(u_rq->request_type));
break;
case DM_CLOG_RESUME:
case DM_ULOG_RESUME:
/*
* Resume is a special case that requires a local
* component to join the CPG, and a cluster component
* to handle the request.
*/
r = local_resume(tfr);
r = local_resume(u_rq);
if (r) {
LOG_DBG("Returning failed request to kernel [%s]",
RQ_TYPE(tfr->request_type));
r = kernel_send(tfr);
RQ_TYPE(u_rq->request_type));
r = kernel_send(u_rq);
if (r)
LOG_ERROR("Failed to respond to kernel [%s]",
RQ_TYPE(tfr->request_type));
RQ_TYPE(u_rq->request_type));
break;
}
/* ELSE, fall through */
case DM_CLOG_IS_CLEAN:
case DM_CLOG_FLUSH:
case DM_CLOG_MARK_REGION:
case DM_CLOG_GET_RESYNC_WORK:
case DM_CLOG_SET_REGION_SYNC:
case DM_CLOG_IS_REMOTE_RECOVERING:
case DM_CLOG_POSTSUSPEND:
r = cluster_send(tfr);
case DM_ULOG_IS_CLEAN:
case DM_ULOG_FLUSH:
case DM_ULOG_MARK_REGION:
case DM_ULOG_GET_RESYNC_WORK:
case DM_ULOG_SET_REGION_SYNC:
case DM_ULOG_IS_REMOTE_RECOVERING:
case DM_ULOG_POSTSUSPEND:
r = cluster_send(rq);
if (r) {
tfr->data_size = 0;
tfr->error = r;
kernel_send(tfr);
u_rq->data_size = 0;
u_rq->error = r;
kernel_send(u_rq);
}
break;
case DM_CLOG_CLEAR_REGION:
r = kernel_ack(tfr->seq, 0);
case DM_ULOG_CLEAR_REGION:
r = kernel_ack(u_rq->seq, 0);
r = cluster_send(tfr);
r = cluster_send(rq);
if (r) {
/*
* FIXME: store error for delivery on flush
@ -260,24 +283,24 @@ static int do_local_work(void *data)
}
break;
case DM_CLOG_GET_REGION_SIZE:
default:
LOG_ERROR("Invalid log request received, ignoring.");
LOG_ERROR("Invalid log request received (%u), ignoring.",
u_rq->request_type);
return 0;
}
if (r && !tfr->error)
tfr->error = r;
if (r && !u_rq->error)
u_rq->error = r;
return r;
}
/*
* kernel_send
* @tfr: result to pass back to kernel
* @u_rq: result to pass back to kernel
*
* This function returns the tfr structure
* This function returns the u_rq structure
* (containing the results) to the kernel.
* It then frees the structure.
*
@ -288,21 +311,21 @@ static int do_local_work(void *data)
*
* Returns: 0 on success, -EXXX on failure
*/
int kernel_send(struct clog_tfr *tfr)
int kernel_send(struct dm_ulog_request *u_rq)
{
int r;
int size;
if (!tfr)
if (!u_rq)
return -EINVAL;
size = sizeof(struct clog_tfr) + tfr->data_size;
size = sizeof(struct dm_ulog_request) + u_rq->data_size;
if (!tfr->data_size && !tfr->error) {
if (!u_rq->data_size && !u_rq->error) {
/* An ACK is all that is needed */
/* FIXME: add ACK code */
} else if (size > DM_CLOG_TFR_SIZE) {
} else if (size > DM_ULOG_REQUEST_SIZE) {
/*
* If we gotten here, we've already overrun
* our allotted space somewhere.
@ -311,11 +334,11 @@ int kernel_send(struct clog_tfr *tfr)
* is waiting for a response.
*/
LOG_ERROR("Not enough space to respond to server");
tfr->error = -ENOSPC;
size = sizeof(struct clog_tfr);
u_rq->error = -ENOSPC;
size = sizeof(struct dm_ulog_request);
}
r = kernel_send_helper(tfr, size);
r = kernel_send_helper(u_rq, size);
if (r)
LOG_ERROR("Failed to send msg to kernel.");
@ -337,26 +360,26 @@ int init_local(void)
cn_fd = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR);
if (cn_fd < 0)
return EXIT_KERNEL_TFR_SOCKET;
return EXIT_KERNEL_SOCKET;
/* memset to fix valgrind complaint */
memset(&addr, 0, sizeof(struct sockaddr_nl));
addr.nl_family = AF_NETLINK;
addr.nl_groups = 0x4;
addr.nl_groups = CN_IDX_DM;
addr.nl_pid = 0;
r = bind(cn_fd, (struct sockaddr *) &addr, sizeof(addr));
if (r < 0) {
close(cn_fd);
return EXIT_KERNEL_TFR_BIND;
return EXIT_KERNEL_BIND;
}
opt = addr.nl_groups;
r = setsockopt(cn_fd, 270, NETLINK_ADD_MEMBERSHIP, &opt, sizeof(opt));
if (r) {
close(cn_fd);
return EXIT_KERNEL_TFR_SETSOCKOPT;
return EXIT_KERNEL_SETSOCKOPT;
}
/*

View File

@ -4,6 +4,6 @@
int init_local(void);
void cleanup_local(void);
int kernel_send(struct clog_tfr *tfr);
int kernel_send(struct dm_ulog_request *rq);
#endif /* __CLUSTER_LOG_LOCAL_DOT_H__ */

View File

@ -1,5 +1,27 @@
#include <stdio.h>
#include <syslog.h>
char *__rq_types_off_by_one[] = {
"DM_ULOG_CTR",
"DM_ULOG_DTR",
"DM_ULOG_PRESUSPEND",
"DM_ULOG_POSTSUSPEND",
"DM_ULOG_RESUME",
"DM_ULOG_GET_REGION_SIZE",
"DM_ULOG_IS_CLEAN",
"DM_ULOG_IN_SYNC",
"DM_ULOG_FLUSH",
"DM_ULOG_MARK_REGION",
"DM_ULOG_CLEAR_REGION",
"DM_ULOG_GET_RESYNC_WORK",
"DM_ULOG_SET_REGION_SYNC",
"DM_ULOG_GET_SYNC_COUNT",
"DM_ULOG_STATUS_INFO",
"DM_ULOG_STATUS_TABLE",
"DM_ULOG_IS_REMOTE_RECOVERING",
NULL
};
int log_tabbing = 0;
int log_is_open = 0;

View File

@ -31,6 +31,9 @@
/* SHORT_UUID - print last 8 chars of a string */
#define SHORT_UUID(x) (strlen(x) > 8) ? ((x) + (strlen(x) - 8)) : (x)
extern char *__rq_types_off_by_one[];
#define RQ_TYPE(x) __rq_types_off_by_one[(x) - 1]
extern int log_tabbing;
extern int log_is_open;
extern int log_membership_change;