diff --git a/daemons/Makefile.in b/daemons/Makefile.in index d93953896..ee4efe87a 100644 --- a/daemons/Makefile.in +++ b/daemons/Makefile.in @@ -15,14 +15,14 @@ srcdir = @srcdir@ top_srcdir = @top_srcdir@ VPATH = @srcdir@ -.PHONY: dmeventd clvmd +.PHONY: dmeventd clvmd cmirrord ifneq ("@CLVMD@", "none") SUBDIRS = clvmd endif ifeq ("@BUILD_CMIRRORD@", "yes") - SUBDIRS += clogd + SUBDIRS += cmirrord endif ifeq ("@BUILD_DMEVENTD@", "yes") diff --git a/daemons/clogd/Makefile.in b/daemons/clogd/Makefile.in deleted file mode 100644 index ce13d0241..000000000 --- a/daemons/clogd/Makefile.in +++ /dev/null @@ -1,33 +0,0 @@ -# -# Copyright (C) 2009 Red Hat, Inc. All rights reserved. -# -# This file is part of LVM2. -# -# This copyrighted material is made available to anyone wishing to use, -# modify, copy, or redistribute it subject to the terms and conditions -# of the GNU General Public License v.2. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software Foundation, -# Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - -srcdir = @srcdir@ -top_srcdir = @top_srcdir@ -VPATH = @srcdir@ - -SOURCES = clogd.c cluster.c functions.c link_mon.c local.c logging.c - -TARGETS = cmirrord - -include $(top_srcdir)/make.tmpl - -LDFLAGS += -L$(usrlibdir)/openais -LIBS += -lcpg -lSaCkpt -ldevmapper - -cmirrord: $(OBJECTS) $(top_srcdir)/lib/liblvm-internal.a - $(CC) -o cmirrord $(OBJECTS) $(CFLAGS) $(LDFLAGS) \ - $(LVMLIBS) $(LMLIBS) $(LIBS) - -install: $(TARGETS) - $(INSTALL) -D $(OWNER) $(GROUP) -m 555 $(STRIP) cmirrord \ - $(usrsbindir)/cmirrord diff --git a/daemons/clogd/clogd.c b/daemons/clogd/clogd.c deleted file mode 100644 index bfb74fadc..000000000 --- a/daemons/clogd/clogd.c +++ /dev/null @@ -1,276 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU General Public License v.2. - * - * You should have received a copy of the GNU General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "dm-log-userspace.h" -#include "functions.h" -#include "local.h" -#include "cluster.h" -#include "common.h" -#include "logging.h" -#include "link_mon.h" - -static int exit_now = 0; -static sigset_t signal_mask; -static int signal_received; - -static void process_signals(void); -static void daemonize(void); -static void init_all(void); -static void cleanup_all(void); - -int main(int argc, char *argv[]) -{ - daemonize(); - - init_all(); - - /* Parent can now exit, we're ready to handle requests */ - kill(getppid(), SIGTERM); - - LOG_PRINT("Starting cmirrord:"); - LOG_PRINT(" Built: "__DATE__" "__TIME__"\n"); - LOG_DBG(" Compiled with debugging."); - - while (!exit_now) { - links_monitor(); - - links_issue_callbacks(); - - process_signals(); - } - exit(EXIT_SUCCESS); -} - -/* - * parent_exit_handler: exit the parent - * @sig: the signal - * - */ -static void parent_exit_handler(int sig) -{ - exit_now = 1; -} - -/* - * create_lockfile - create and lock a lock file - * @lockfile: location of lock file - * - * Returns: 0 on success, -1 otherwise - */ -static int create_lockfile(char *lockfile) -{ - int fd; - struct flock lock; - char buffer[50]; - - if((fd = open(lockfile, O_CREAT | O_WRONLY, - (S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH))) < 0) - return -errno; - - lock.l_type = F_WRLCK; - lock.l_start = 0; - lock.l_whence = SEEK_SET; - lock.l_len = 0; - - if (fcntl(fd, F_SETLK, &lock) < 0) { - close(fd); - return -errno; - } - - if (ftruncate(fd, 0) < 0) { - close(fd); - return -errno; - } - - sprintf(buffer, "%d\n", getpid()); - - if(write(fd, buffer, strlen(buffer)) < strlen(buffer)){ - close(fd); - unlink(lockfile); - return -errno; - } - - return 0; -} - -static void sig_handler(int sig) -{ - sigaddset(&signal_mask, sig); - ++signal_received; -} - -static void process_signal(int sig){ - int r = 0; - - switch(sig) { - case SIGINT: - case SIGQUIT: - case SIGTERM: - case SIGHUP: - r += log_status(); - break; - case SIGUSR1: - case SIGUSR2: - log_debug(); - /*local_debug();*/ - cluster_debug(); - return; - default: - LOG_PRINT("Unknown signal received... ignoring"); - return; - } - - if (!r) { - LOG_DBG("No current cluster logs... safe to exit."); - cleanup_all(); - exit(EXIT_SUCCESS); - } - - LOG_ERROR("Cluster logs exist. Refusing to exit."); -} - -static void process_signals(void) -{ - int x; - - if (!signal_received) - return; - - signal_received = 0; - - for (x = 1; x < _NSIG; x++) { - if (sigismember(&signal_mask, x)) { - sigdelset(&signal_mask, x); - process_signal(x); - } - } -} - -/* - * daemonize - * - * Performs the steps necessary to become a daemon. - */ -static void daemonize(void) -{ - int pid; - int status; - - signal(SIGTERM, &parent_exit_handler); - - pid = fork(); - - if (pid < 0) { - LOG_ERROR("Unable to fork()"); - exit(EXIT_FAILURE); - } - - if (pid) { - /* Parent waits here for child to get going */ - while (!waitpid(pid, &status, WNOHANG) && !exit_now); - if (exit_now) - exit(EXIT_SUCCESS); - - switch (WEXITSTATUS(status)) { - case EXIT_LOCKFILE: - LOG_ERROR("Failed to create lockfile"); - LOG_ERROR("Process already running?"); - break; - case EXIT_KERNEL_SOCKET: - LOG_ERROR("Unable to create netlink socket"); - break; - case EXIT_KERNEL_BIND: - LOG_ERROR("Unable to bind to netlink socket"); - break; - case EXIT_KERNEL_SETSOCKOPT: - LOG_ERROR("Unable to setsockopt on netlink socket"); - break; - case EXIT_CLUSTER_CKPT_INIT: - LOG_ERROR("Unable to initialize checkpoint service"); - LOG_ERROR("Has the cluster infrastructure been started?"); - break; - case EXIT_FAILURE: - LOG_ERROR("Failed to start: Generic error"); - break; - default: - LOG_ERROR("Failed to start: Unknown error"); - break; - } - exit(EXIT_FAILURE); - } - - setsid(); - chdir("/"); - umask(0); - - close(0); close(1); close(2); - open("/dev/null", O_RDONLY); /* reopen stdin */ - open("/dev/null", O_WRONLY); /* reopen stdout */ - open("/dev/null", O_WRONLY); /* reopen stderr */ - - LOG_OPEN("cmirrord", LOG_PID, LOG_DAEMON); - - if (create_lockfile(CMIRRORD_PIDFILE)) - exit(EXIT_LOCKFILE); - - signal(SIGINT, &sig_handler); - signal(SIGQUIT, &sig_handler); - signal(SIGTERM, &sig_handler); - signal(SIGHUP, &sig_handler); - signal(SIGPIPE, SIG_IGN); - signal(SIGUSR1, &sig_handler); - signal(SIGUSR2, &sig_handler); - sigemptyset(&signal_mask); - signal_received = 0; -} - -/* - * init_all - * - * Initialize modules. Exit on failure. - */ -static void init_all(void) -{ - int r; - - if ((r = init_local()) || - (r = init_cluster())) { - exit(r); - } -} - -/* - * cleanup_all - * - * Clean up before exiting - */ -static void cleanup_all(void) -{ - cleanup_local(); - cleanup_cluster(); -} diff --git a/daemons/clogd/cluster.c b/daemons/clogd/cluster.c deleted file mode 100644 index 0cfe5a9e9..000000000 --- a/daemons/clogd/cluster.c +++ /dev/null @@ -1,1661 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#include -#include -#include -#include -#include -#include -#include -#include /* These are for OpenAIS CPGs */ -#include -#include -#include -#include -#include -#include -#include - -#include "dm-log-userspace.h" -#include "libdevmapper.h" -#include "functions.h" -#include "local.h" -#include "common.h" -#include "logging.h" -#include "link_mon.h" -#include "cluster.h" - -/* Open AIS error codes */ -#define str_ais_error(x) \ - ((x) == SA_AIS_OK) ? "SA_AIS_OK" : \ - ((x) == SA_AIS_ERR_LIBRARY) ? "SA_AIS_ERR_LIBRARY" : \ - ((x) == SA_AIS_ERR_VERSION) ? "SA_AIS_ERR_VERSION" : \ - ((x) == SA_AIS_ERR_INIT) ? "SA_AIS_ERR_INIT" : \ - ((x) == SA_AIS_ERR_TIMEOUT) ? "SA_AIS_ERR_TIMEOUT" : \ - ((x) == SA_AIS_ERR_TRY_AGAIN) ? "SA_AIS_ERR_TRY_AGAIN" : \ - ((x) == SA_AIS_ERR_INVALID_PARAM) ? "SA_AIS_ERR_INVALID_PARAM" : \ - ((x) == SA_AIS_ERR_NO_MEMORY) ? "SA_AIS_ERR_NO_MEMORY" : \ - ((x) == SA_AIS_ERR_BAD_HANDLE) ? "SA_AIS_ERR_BAD_HANDLE" : \ - ((x) == SA_AIS_ERR_BUSY) ? "SA_AIS_ERR_BUSY" : \ - ((x) == SA_AIS_ERR_ACCESS) ? "SA_AIS_ERR_ACCESS" : \ - ((x) == SA_AIS_ERR_NOT_EXIST) ? "SA_AIS_ERR_NOT_EXIST" : \ - ((x) == SA_AIS_ERR_NAME_TOO_LONG) ? "SA_AIS_ERR_NAME_TOO_LONG" : \ - ((x) == SA_AIS_ERR_EXIST) ? "SA_AIS_ERR_EXIST" : \ - ((x) == SA_AIS_ERR_NO_SPACE) ? "SA_AIS_ERR_NO_SPACE" : \ - ((x) == SA_AIS_ERR_INTERRUPT) ? "SA_AIS_ERR_INTERRUPT" : \ - ((x) == SA_AIS_ERR_NAME_NOT_FOUND) ? "SA_AIS_ERR_NAME_NOT_FOUND" : \ - ((x) == SA_AIS_ERR_NO_RESOURCES) ? "SA_AIS_ERR_NO_RESOURCES" : \ - ((x) == SA_AIS_ERR_NOT_SUPPORTED) ? "SA_AIS_ERR_NOT_SUPPORTED" : \ - ((x) == SA_AIS_ERR_BAD_OPERATION) ? "SA_AIS_ERR_BAD_OPERATION" : \ - ((x) == SA_AIS_ERR_FAILED_OPERATION) ? "SA_AIS_ERR_FAILED_OPERATION" : \ - ((x) == SA_AIS_ERR_MESSAGE_ERROR) ? "SA_AIS_ERR_MESSAGE_ERROR" : \ - ((x) == SA_AIS_ERR_QUEUE_FULL) ? "SA_AIS_ERR_QUEUE_FULL" : \ - ((x) == SA_AIS_ERR_QUEUE_NOT_AVAILABLE) ? "SA_AIS_ERR_QUEUE_NOT_AVAILABLE" : \ - ((x) == SA_AIS_ERR_BAD_FLAGS) ? "SA_AIS_ERR_BAD_FLAGS" : \ - ((x) == SA_AIS_ERR_TOO_BIG) ? "SA_AIS_ERR_TOO_BIG" : \ - ((x) == SA_AIS_ERR_NO_SECTIONS) ? "SA_AIS_ERR_NO_SECTIONS" : \ - "ais_error_unknown" - -#define DM_ULOG_RESPONSE 0x1000 /* in last byte of 32-bit value */ -#define DM_ULOG_CHECKPOINT_READY 21 -#define DM_ULOG_MEMBER_JOIN 22 - -#define _RQ_TYPE(x) \ - ((x) == DM_ULOG_CHECKPOINT_READY) ? "DM_ULOG_CHECKPOINT_READY": \ - ((x) == DM_ULOG_MEMBER_JOIN) ? "DM_ULOG_MEMBER_JOIN": \ - RQ_TYPE((x) & ~DM_ULOG_RESPONSE) - -static uint32_t my_cluster_id = 0xDEAD; -static SaCkptHandleT ckpt_handle = 0; -static SaCkptCallbacksT callbacks = { 0, 0 }; -static SaVersionT version = { 'B', 1, 1 }; - -#define DEBUGGING_HISTORY 100 -//static char debugging[DEBUGGING_HISTORY][128]; -//static int idx = 0; -#define LOG_SPRINT(cc, f, arg...) do { \ - cc->idx++; \ - cc->idx = cc->idx % DEBUGGING_HISTORY; \ - sprintf(cc->debugging[cc->idx], f, ## arg); \ - } while (0) - -static int log_resp_rec = 0; - -struct checkpoint_data { - uint32_t requester; - char uuid[CPG_MAX_NAME_LENGTH]; - - int bitmap_size; /* in bytes */ - char *sync_bits; - char *clean_bits; - char *recovering_region; - struct checkpoint_data *next; -}; - -#define INVALID 0 -#define VALID 1 -#define LEAVING 2 - -#define MAX_CHECKPOINT_REQUESTERS 10 -struct clog_cpg { - struct dm_list list; - - uint32_t lowest_id; - cpg_handle_t handle; - struct cpg_name name; - uint64_t luid; - - /* Are we the first, or have we received checkpoint? */ - int state; - int cpg_state; /* FIXME: debugging */ - int free_me; - int delay; - int resend_requests; - struct dm_list startup_list; - struct dm_list working_list; - - int checkpoints_needed; - uint32_t checkpoint_requesters[MAX_CHECKPOINT_REQUESTERS]; - struct checkpoint_data *checkpoint_list; - int idx; - char debugging[DEBUGGING_HISTORY][128]; -}; - -static struct dm_list clog_cpg_list; - -/* - * cluster_send - * @rq - * - * Returns: 0 on success, -Exxx on error - */ -int cluster_send(struct clog_request *rq) -{ - int r; - int count=0; - int found; - struct iovec iov; - struct clog_cpg *entry; - - dm_list_iterate_items(entry, &clog_cpg_list) - if (!strncmp(entry->name.value, rq->u_rq.uuid, - CPG_MAX_NAME_LENGTH)) { - found = 1; - break; - } - - if (!found) { - rq->u_rq.error = -ENOENT; - return -ENOENT; - } - - /* - * Once the request heads for the cluster, the luid looses - * all its meaning. - */ - rq->u_rq.luid = 0; - - iov.iov_base = rq; - iov.iov_len = sizeof(struct clog_request) + rq->u_rq.data_size; - - if (entry->cpg_state != VALID) - return -EINVAL; - - do { - r = cpg_mcast_joined(entry->handle, CPG_TYPE_AGREED, &iov, 1); - if (r != SA_AIS_ERR_TRY_AGAIN) - break; - count++; - if (count < 10) - LOG_PRINT("[%s] Retry #%d of cpg_mcast_joined: %s", - SHORT_UUID(rq->u_rq.uuid), count, - str_ais_error(r)); - else if ((count < 100) && !(count % 10)) - LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s", - SHORT_UUID(rq->u_rq.uuid), count, - str_ais_error(r)); - else if ((count < 1000) && !(count % 100)) - LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s", - SHORT_UUID(rq->u_rq.uuid), count, - str_ais_error(r)); - else if ((count < 10000) && !(count % 1000)) - LOG_ERROR("[%s] Retry #%d of cpg_mcast_joined: %s - " - "OpenAIS not handling the load?", - SHORT_UUID(rq->u_rq.uuid), count, - str_ais_error(r)); - usleep(1000); - } while (1); - - if (r == CPG_OK) - return 0; - - /* error codes found in openais/cpg.h */ - LOG_ERROR("cpg_mcast_joined error: %s", str_ais_error(r)); - - rq->u_rq.error = -EBADE; - return -EBADE; -} - -static struct clog_request *get_matching_rq(struct clog_request *rq, - struct dm_list *l) -{ - struct clog_request *match, *n; - - dm_list_iterate_items_safe(match, n, l) - if (match->u_rq.seq == rq->u_rq.seq) { - dm_list_del(&match->list); - return match; - } - - return NULL; -} - -static char rq_buffer[DM_ULOG_REQUEST_SIZE]; -static int handle_cluster_request(struct clog_cpg *entry, - struct clog_request *rq, int server) -{ - int r = 0; - struct clog_request *tmp = (struct clog_request *)rq_buffer; - - /* - * We need a separate dm_ulog_request struct, one that can carry - * a return payload. Otherwise, the memory address after - * rq will be altered - leading to problems - */ - memset(rq_buffer, 0, sizeof(rq_buffer)); - memcpy(tmp, rq, sizeof(struct clog_request) + rq->u_rq.data_size); - - /* - * With resumes, we only handle our own. - * Resume is a special case that requires - * local action (to set up CPG), followed by - * a cluster action to co-ordinate reading - * the disk and checkpointing - */ - if (tmp->u_rq.request_type == DM_ULOG_RESUME) { - if (tmp->originator == my_cluster_id) { - r = do_request(tmp, server); - - r = kernel_send(&tmp->u_rq); - if (r < 0) - LOG_ERROR("Failed to send resume response to kernel"); - } - return r; - } - - r = do_request(tmp, server); - - if (server && - (tmp->u_rq.request_type != DM_ULOG_CLEAR_REGION) && - (tmp->u_rq.request_type != DM_ULOG_POSTSUSPEND)) { - tmp->u_rq.request_type |= DM_ULOG_RESPONSE; - - /* - * Errors from previous functions are in the rq struct. - */ - r = cluster_send(tmp); - if (r < 0) - LOG_ERROR("cluster_send failed: %s", strerror(-r)); - } - - return r; -} - -static int handle_cluster_response(struct clog_cpg *entry, - struct clog_request *rq) -{ - int r = 0; - struct clog_request *orig_rq; - - /* - * If I didn't send it, then I don't care about the response - */ - if (rq->originator != my_cluster_id) - return 0; - - rq->u_rq.request_type &= ~DM_ULOG_RESPONSE; - orig_rq = get_matching_rq(rq, &entry->working_list); - - if (!orig_rq) { - /* Unable to find match for response */ - - LOG_ERROR("[%s] No match for cluster response: %s:%u", - SHORT_UUID(rq->u_rq.uuid), - _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq); - - LOG_ERROR("Current local list:"); - if (dm_list_empty(&entry->working_list)) - LOG_ERROR(" [none]"); - - dm_list_iterate_items(orig_rq, &entry->working_list) - LOG_ERROR(" [%s] %s:%u", - SHORT_UUID(orig_rq->u_rq.uuid), - _RQ_TYPE(orig_rq->u_rq.request_type), - orig_rq->u_rq.seq); - - return -EINVAL; - } - - if (log_resp_rec > 0) { - LOG_COND(log_resend_requests, - "[%s] Response received to %s/#%u", - SHORT_UUID(rq->u_rq.uuid), - _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq); - log_resp_rec--; - } - - /* FIXME: Ensure memcpy cannot explode */ - memcpy(orig_rq, rq, sizeof(*rq) + rq->u_rq.data_size); - - r = kernel_send(&orig_rq->u_rq); - if (r) - LOG_ERROR("Failed to send response to kernel"); - - free(orig_rq); - return r; -} - -static struct clog_cpg *find_clog_cpg(cpg_handle_t handle) -{ - struct clog_cpg *match; - - dm_list_iterate_items(match, &clog_cpg_list) - if (match->handle == handle) - return match; - - return NULL; -} - -/* - * prepare_checkpoint - * @entry: clog_cpg describing the log - * @cp_requester: nodeid requesting the checkpoint - * - * Creates and fills in a new checkpoint_data struct. - * - * Returns: checkpoint_data on success, NULL on error - */ -static struct checkpoint_data *prepare_checkpoint(struct clog_cpg *entry, - uint32_t cp_requester) -{ - int r; - struct checkpoint_data *new; - - if (entry->state != VALID) { - /* - * We can't store bitmaps yet, because the log is not - * valid yet. - */ - LOG_ERROR("Forced to refuse checkpoint for nodeid %u - log not valid yet", - cp_requester); - return NULL; - } - - new = malloc(sizeof(*new)); - if (!new) { - LOG_ERROR("Unable to create checkpoint data for %u", - cp_requester); - return NULL; - } - memset(new, 0, sizeof(*new)); - new->requester = cp_requester; - strncpy(new->uuid, entry->name.value, entry->name.length); - - new->bitmap_size = push_state(entry->name.value, entry->luid, - "clean_bits", - &new->clean_bits, cp_requester); - if (new->bitmap_size <= 0) { - LOG_ERROR("Failed to store clean_bits to checkpoint for node %u", - new->requester); - free(new); - return NULL; - } - - new->bitmap_size = push_state(entry->name.value, entry->luid, - "sync_bits", - &new->sync_bits, cp_requester); - if (new->bitmap_size <= 0) { - LOG_ERROR("Failed to store sync_bits to checkpoint for node %u", - new->requester); - free(new->clean_bits); - free(new); - return NULL; - } - - r = push_state(entry->name.value, entry->luid, - "recovering_region", - &new->recovering_region, cp_requester); - if (r <= 0) { - LOG_ERROR("Failed to store recovering_region to checkpoint for node %u", - new->requester); - free(new->sync_bits); - free(new->clean_bits); - free(new); - return NULL; - } - LOG_DBG("[%s] Checkpoint prepared for node %u:", - SHORT_UUID(new->uuid), new->requester); - LOG_DBG(" bitmap_size = %d", new->bitmap_size); - - return new; -} - -/* - * free_checkpoint - * @cp: the checkpoint_data struct to free - * - */ -static void free_checkpoint(struct checkpoint_data *cp) -{ - free(cp->recovering_region); - free(cp->sync_bits); - free(cp->clean_bits); - free(cp); -} - -static int export_checkpoint(struct checkpoint_data *cp) -{ - SaCkptCheckpointCreationAttributesT attr; - SaCkptCheckpointHandleT h; - SaCkptSectionIdT section_id; - SaCkptSectionCreationAttributesT section_attr; - SaCkptCheckpointOpenFlagsT flags; - SaNameT name; - SaAisErrorT rv; - struct clog_request *rq; - int len, r = 0; - char buf[32]; - - LOG_DBG("Sending checkpointed data to %u", cp->requester); - - len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, - "bitmaps_%s_%u", SHORT_UUID(cp->uuid), cp->requester); - name.length = len; - - len = strlen(cp->recovering_region) + 1; - - attr.creationFlags = SA_CKPT_WR_ALL_REPLICAS; - attr.checkpointSize = cp->bitmap_size * 2 + len; - - attr.retentionDuration = SA_TIME_MAX; - attr.maxSections = 4; /* don't know why we need +1 */ - - attr.maxSectionSize = (cp->bitmap_size > len) ? cp->bitmap_size : len; - attr.maxSectionIdSize = 22; - - flags = SA_CKPT_CHECKPOINT_READ | - SA_CKPT_CHECKPOINT_WRITE | - SA_CKPT_CHECKPOINT_CREATE; - -open_retry: - rv = saCkptCheckpointOpen(ckpt_handle, &name, &attr, flags, 0, &h); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("export_checkpoint: ckpt open retry"); - usleep(1000); - goto open_retry; - } - - if (rv == SA_AIS_ERR_EXIST) { - LOG_DBG("export_checkpoint: checkpoint already exists"); - return -EEXIST; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("[%s] Failed to open checkpoint for %u: %s", - SHORT_UUID(cp->uuid), cp->requester, - str_ais_error(rv)); - return -EIO; /* FIXME: better error */ - } - - /* - * Add section for sync_bits - */ - section_id.idLen = snprintf(buf, 32, "sync_bits"); - section_id.id = (unsigned char *)buf; - section_attr.sectionId = §ion_id; - section_attr.expirationTime = SA_TIME_END; - -sync_create_retry: - rv = saCkptSectionCreate(h, §ion_attr, - cp->sync_bits, cp->bitmap_size); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("Sync checkpoint section create retry"); - usleep(1000); - goto sync_create_retry; - } - - if (rv == SA_AIS_ERR_EXIST) { - LOG_DBG("Sync checkpoint section already exists"); - saCkptCheckpointClose(h); - return -EEXIST; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("Sync checkpoint section creation failed: %s", - str_ais_error(rv)); - saCkptCheckpointClose(h); - return -EIO; /* FIXME: better error */ - } - - /* - * Add section for clean_bits - */ - section_id.idLen = snprintf(buf, 32, "clean_bits"); - section_id.id = (unsigned char *)buf; - section_attr.sectionId = §ion_id; - section_attr.expirationTime = SA_TIME_END; - -clean_create_retry: - rv = saCkptSectionCreate(h, §ion_attr, cp->clean_bits, cp->bitmap_size); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("Clean checkpoint section create retry"); - usleep(1000); - goto clean_create_retry; - } - - if (rv == SA_AIS_ERR_EXIST) { - LOG_DBG("Clean checkpoint section already exists"); - saCkptCheckpointClose(h); - return -EEXIST; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("Clean checkpoint section creation failed: %s", - str_ais_error(rv)); - saCkptCheckpointClose(h); - return -EIO; /* FIXME: better error */ - } - - /* - * Add section for recovering_region - */ - section_id.idLen = snprintf(buf, 32, "recovering_region"); - section_id.id = (unsigned char *)buf; - section_attr.sectionId = §ion_id; - section_attr.expirationTime = SA_TIME_END; - -rr_create_retry: - rv = saCkptSectionCreate(h, §ion_attr, cp->recovering_region, - strlen(cp->recovering_region) + 1); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("RR checkpoint section create retry"); - usleep(1000); - goto rr_create_retry; - } - - if (rv == SA_AIS_ERR_EXIST) { - LOG_DBG("RR checkpoint section already exists"); - saCkptCheckpointClose(h); - return -EEXIST; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("RR checkpoint section creation failed: %s", - str_ais_error(rv)); - saCkptCheckpointClose(h); - return -EIO; /* FIXME: better error */ - } - - LOG_DBG("export_checkpoint: closing checkpoint"); - saCkptCheckpointClose(h); - - rq = malloc(DM_ULOG_REQUEST_SIZE); - if (!rq) { - LOG_ERROR("export_checkpoint: Unable to allocate transfer structs"); - return -ENOMEM; - } - memset(rq, 0, sizeof(*rq)); - - dm_list_init(&rq->list); - rq->u_rq.request_type = DM_ULOG_CHECKPOINT_READY; - rq->originator = cp->requester; /* FIXME: hack to overload meaning of originator */ - strncpy(rq->u_rq.uuid, cp->uuid, CPG_MAX_NAME_LENGTH); - rq->u_rq.seq = my_cluster_id; - - r = cluster_send(rq); - if (r) - LOG_ERROR("Failed to send checkpoint ready notice: %s", - strerror(-r)); - - free(rq); - return 0; -} - -static int import_checkpoint(struct clog_cpg *entry, int no_read) -{ - int rtn = 0; - SaCkptCheckpointHandleT h; - SaCkptSectionIterationHandleT itr; - SaCkptSectionDescriptorT desc; - SaCkptIOVectorElementT iov; - SaNameT name; - SaAisErrorT rv; - char *bitmap = NULL; - int len; - - bitmap = malloc(1024*1024); - if (!bitmap) - return -ENOMEM; - - len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u", - SHORT_UUID(entry->name.value), my_cluster_id); - name.length = len; - -open_retry: - rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL, - SA_CKPT_CHECKPOINT_READ, 0, &h); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("import_checkpoint: ckpt open retry"); - usleep(1000); - goto open_retry; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("[%s] Failed to open checkpoint: %s", - SHORT_UUID(entry->name.value), str_ais_error(rv)); - return -EIO; /* FIXME: better error */ - } - -unlink_retry: - rv = saCkptCheckpointUnlink(ckpt_handle, &name); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("import_checkpoint: ckpt unlink retry"); - usleep(1000); - goto unlink_retry; - } - - if (no_read) { - LOG_DBG("Checkpoint for this log already received"); - goto no_read; - } - -init_retry: - rv = saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, - SA_TIME_END, &itr); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("import_checkpoint: sync create retry"); - usleep(1000); - goto init_retry; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("[%s] Sync checkpoint section creation failed: %s", - SHORT_UUID(entry->name.value), str_ais_error(rv)); - return -EIO; /* FIXME: better error */ - } - - len = 0; - while (1) { - rv = saCkptSectionIterationNext(itr, &desc); - if (rv == SA_AIS_OK) - len++; - else if ((rv == SA_AIS_ERR_NO_SECTIONS) && len) - break; - else if (rv != SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("saCkptSectionIterationNext failure: %d", rv); - break; - } - } - saCkptSectionIterationFinalize(itr); - if (len != 3) { - LOG_ERROR("import_checkpoint: %d checkpoint sections found", - len); - usleep(1000); - goto init_retry; - } - saCkptSectionIterationInitialize(h, SA_CKPT_SECTIONS_ANY, - SA_TIME_END, &itr); - - while (1) { - rv = saCkptSectionIterationNext(itr, &desc); - if (rv == SA_AIS_ERR_NO_SECTIONS) - break; - - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("import_checkpoint: ckpt iternext retry"); - usleep(1000); - continue; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("import_checkpoint: clean checkpoint section " - "creation failed: %s", str_ais_error(rv)); - rtn = -EIO; /* FIXME: better error */ - goto fail; - } - - if (!desc.sectionSize) { - LOG_ERROR("Checkpoint section empty"); - continue; - } - - memset(bitmap, 0, sizeof(*bitmap)); - iov.sectionId = desc.sectionId; - iov.dataBuffer = bitmap; - iov.dataSize = desc.sectionSize; - iov.dataOffset = 0; - - read_retry: - rv = saCkptCheckpointRead(h, &iov, 1, NULL); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("ckpt read retry"); - usleep(1000); - goto read_retry; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("import_checkpoint: ckpt read error: %s", - str_ais_error(rv)); - rtn = -EIO; /* FIXME: better error */ - goto fail; - } - - if (iov.readSize) { - if (pull_state(entry->name.value, entry->luid, - (char *)desc.sectionId.id, bitmap, - iov.readSize)) { - LOG_ERROR("Error loading state"); - rtn = -EIO; - goto fail; - } - } else { - /* Need to request new checkpoint */ - rtn = -EAGAIN; - goto fail; - } - } - -fail: - saCkptSectionIterationFinalize(itr); -no_read: - saCkptCheckpointClose(h); - - free(bitmap); - return rtn; -} - -static void do_checkpoints(struct clog_cpg *entry, int leaving) -{ - struct checkpoint_data *cp; - - for (cp = entry->checkpoint_list; cp;) { - /* - * FIXME: Check return code. Could send failure - * notice in rq in export_checkpoint function - * by setting rq->error - */ - switch (export_checkpoint(cp)) { - case -EEXIST: - LOG_SPRINT(entry, "[%s] Checkpoint for %u already handled%s", - SHORT_UUID(entry->name.value), cp->requester, - (leaving) ? "(L)": ""); - LOG_COND(log_checkpoint, - "[%s] Checkpoint for %u already handled%s", - SHORT_UUID(entry->name.value), cp->requester, - (leaving) ? "(L)": ""); - entry->checkpoint_list = cp->next; - free_checkpoint(cp); - cp = entry->checkpoint_list; - break; - case 0: - LOG_SPRINT(entry, "[%s] Checkpoint data available for node %u%s", - SHORT_UUID(entry->name.value), cp->requester, - (leaving) ? "(L)": ""); - LOG_COND(log_checkpoint, - "[%s] Checkpoint data available for node %u%s", - SHORT_UUID(entry->name.value), cp->requester, - (leaving) ? "(L)": ""); - entry->checkpoint_list = cp->next; - free_checkpoint(cp); - cp = entry->checkpoint_list; - break; - default: - /* FIXME: Skipping will cause list corruption */ - LOG_ERROR("[%s] Failed to export checkpoint for %u%s", - SHORT_UUID(entry->name.value), cp->requester, - (leaving) ? "(L)": ""); - } - } -} - -static int resend_requests(struct clog_cpg *entry) -{ - int r = 0; - struct clog_request *rq, *n; - - if (!entry->resend_requests || entry->delay) - return 0; - - if (entry->state != VALID) - return 0; - - entry->resend_requests = 0; - - dm_list_iterate_items_safe(rq, n, &entry->working_list) { - dm_list_del(&rq->list); - - if (strcmp(entry->name.value, rq->u_rq.uuid)) { - LOG_ERROR("[%s] Stray request from another log (%s)", - SHORT_UUID(entry->name.value), - SHORT_UUID(rq->u_rq.uuid)); - free(rq); - continue; - } - - switch (rq->u_rq.request_type) { - case DM_ULOG_SET_REGION_SYNC: - /* - * Some requests simply do not need to be resent. - * If it is a request that just changes log state, - * then it doesn't need to be resent (everyone makes - * updates). - */ - LOG_COND(log_resend_requests, - "[%s] Skipping resend of %s/#%u...", - SHORT_UUID(entry->name.value), - _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq); - LOG_SPRINT(entry, "### No resend: [%s] %s/%u ###", - SHORT_UUID(entry->name.value), - _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq); - - rq->u_rq.data_size = 0; - kernel_send(&rq->u_rq); - - break; - - default: - /* - * If an action or a response is required, then - * the request must be resent. - */ - LOG_COND(log_resend_requests, - "[%s] Resending %s(#%u) due to new server(%u)", - SHORT_UUID(entry->name.value), - _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq, entry->lowest_id); - LOG_SPRINT(entry, "*** Resending: [%s] %s/%u ***", - SHORT_UUID(entry->name.value), - _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq); - r = cluster_send(rq); - if (r < 0) - LOG_ERROR("Failed resend"); - } - free(rq); - } - - return r; -} - -static int do_cluster_work(void *data) -{ - int r = SA_AIS_OK; - struct clog_cpg *entry; - - dm_list_iterate_items(entry, &clog_cpg_list) { - r = cpg_dispatch(entry->handle, CPG_DISPATCH_ALL); - if (r != SA_AIS_OK) - LOG_ERROR("cpg_dispatch failed: %s", str_ais_error(r)); - - if (entry->free_me) { - free(entry); - continue; - } - do_checkpoints(entry, 0); - - resend_requests(entry); - } - - return (r == SA_AIS_OK) ? 0 : -1; /* FIXME: good error number? */ -} - -static int flush_startup_list(struct clog_cpg *entry) -{ - int r = 0; - int i_was_server; - struct clog_request *rq, *n; - struct checkpoint_data *new; - - dm_list_iterate_items_safe(rq, n, &entry->startup_list) { - dm_list_del(&rq->list); - - if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) { - new = prepare_checkpoint(entry, rq->originator); - if (!new) { - /* - * FIXME: Need better error handling. Other nodes - * will be trying to send the checkpoint too, and we - * must continue processing the list; so report error - * but continue. - */ - LOG_ERROR("Failed to prepare checkpoint for %u!!!", - rq->originator); - free(rq); - continue; - } - LOG_SPRINT(entry, "[%s] Checkpoint prepared for %u", - SHORT_UUID(entry->name.value), rq->originator); - LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u", - SHORT_UUID(entry->name.value), rq->originator); - new->next = entry->checkpoint_list; - entry->checkpoint_list = new; - } else { - LOG_DBG("[%s] Processing delayed request: %s", - SHORT_UUID(rq->u_rq.uuid), - _RQ_TYPE(rq->u_rq.request_type)); - i_was_server = (rq->pit_server == my_cluster_id) ? 1 : 0; - r = handle_cluster_request(entry, rq, i_was_server); - - if (r) - /* - * FIXME: If we error out here, we will never get - * another opportunity to retry these requests - */ - LOG_ERROR("Error while processing delayed CPG message"); - } - free(rq); - } - - return 0; -} - -static void cpg_message_callback(cpg_handle_t handle, struct cpg_name *gname, - uint32_t nodeid, uint32_t pid, - void *msg, int msg_len) -{ - int i; - int r = 0; - int i_am_server; - int response = 0; - struct clog_request *rq = msg; - struct clog_request *tmp_rq; - struct clog_cpg *match; - - match = find_clog_cpg(handle); - if (!match) { - LOG_ERROR("Unable to find clog_cpg for cluster message"); - return; - } - - if ((nodeid == my_cluster_id) && - !(rq->u_rq.request_type & DM_ULOG_RESPONSE) && - (rq->u_rq.request_type != DM_ULOG_RESUME) && - (rq->u_rq.request_type != DM_ULOG_CLEAR_REGION) && - (rq->u_rq.request_type != DM_ULOG_CHECKPOINT_READY)) { - tmp_rq = malloc(DM_ULOG_REQUEST_SIZE); - if (!tmp_rq) { - /* - * FIXME: It may be possible to continue... but we - * would not be able to resend any messages that might - * be necessary during membership changes - */ - LOG_ERROR("[%s] Unable to record request: -ENOMEM", - SHORT_UUID(rq->u_rq.uuid)); - return; - } - memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size); - dm_list_init(&tmp_rq->list); - dm_list_add( &match->working_list, &tmp_rq->list); - } - - if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) { - /* - * If the server (lowest_id) indicates it is leaving, - * then we must resend any outstanding requests. However, - * we do not want to resend them if the next server in - * line is in the process of leaving. - */ - if (nodeid == my_cluster_id) { - LOG_COND(log_resend_requests, "[%s] I am leaving.1.....", - SHORT_UUID(rq->u_rq.uuid)); - } else { - if (nodeid < my_cluster_id) { - if (nodeid == match->lowest_id) { - match->resend_requests = 1; - LOG_COND(log_resend_requests, "[%s] %u is leaving, resend required%s", - SHORT_UUID(rq->u_rq.uuid), nodeid, - (dm_list_empty(&match->working_list)) ? " -- working_list empty": ""); - - dm_list_iterate_items(tmp_rq, &match->working_list) - LOG_COND(log_resend_requests, - "[%s] %s/%u", - SHORT_UUID(tmp_rq->u_rq.uuid), - _RQ_TYPE(tmp_rq->u_rq.request_type), - tmp_rq->u_rq.seq); - } - - match->delay++; - LOG_COND(log_resend_requests, "[%s] %u is leaving, delay = %d", - SHORT_UUID(rq->u_rq.uuid), nodeid, match->delay); - } - rq->originator = nodeid; /* don't really need this, but nice for debug */ - goto out; - } - } - - /* - * We can receive messages after we do a cpg_leave but before we - * get our config callback. However, since we can't respond after - * leaving, we simply return. - */ - if (match->state == LEAVING) - return; - - i_am_server = (my_cluster_id == match->lowest_id) ? 1 : 0; - - if (rq->u_rq.request_type == DM_ULOG_CHECKPOINT_READY) { - if (my_cluster_id == rq->originator) { - /* Redundant checkpoints ignored if match->valid */ - LOG_SPRINT(match, "[%s] CHECKPOINT_READY notification from %u", - SHORT_UUID(rq->u_rq.uuid), nodeid); - if (import_checkpoint(match, (match->state != INVALID))) { - LOG_SPRINT(match, - "[%s] Failed to import checkpoint from %u", - SHORT_UUID(rq->u_rq.uuid), nodeid); - LOG_ERROR("[%s] Failed to import checkpoint from %u", - SHORT_UUID(rq->u_rq.uuid), nodeid); - kill(getpid(), SIGUSR1); - /* Could we retry? */ - goto out; - } else if (match->state == INVALID) { - LOG_SPRINT(match, - "[%s] Checkpoint data received from %u. Log is now valid", - SHORT_UUID(match->name.value), nodeid); - LOG_COND(log_checkpoint, - "[%s] Checkpoint data received from %u. Log is now valid", - SHORT_UUID(match->name.value), nodeid); - match->state = VALID; - - flush_startup_list(match); - } else { - LOG_SPRINT(match, - "[%s] Redundant checkpoint from %u ignored.", - SHORT_UUID(rq->u_rq.uuid), nodeid); - } - } - goto out; - } - - if (rq->u_rq.request_type & DM_ULOG_RESPONSE) { - response = 1; - r = handle_cluster_response(match, rq); - } else { - rq->originator = nodeid; - - if (match->state == LEAVING) { - LOG_ERROR("[%s] Ignoring %s from %u. Reason: I'm leaving", - SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), - rq->originator); - goto out; - } - - if (match->state == INVALID) { - LOG_DBG("Log not valid yet, storing request"); - tmp_rq = malloc(DM_ULOG_REQUEST_SIZE); - if (!tmp_rq) { - LOG_ERROR("cpg_message_callback: Unable to" - " allocate transfer structs"); - r = -ENOMEM; /* FIXME: Better error #? */ - goto out; - } - - memcpy(tmp_rq, rq, sizeof(*rq) + rq->u_rq.data_size); - tmp_rq->pit_server = match->lowest_id; - dm_list_init(&tmp_rq->list); - dm_list_add(&match->startup_list, &tmp_rq->list); - goto out; - } - - r = handle_cluster_request(match, rq, i_am_server); - } - - /* - * If the log is now valid, we can queue the checkpoints - */ - for (i = match->checkpoints_needed; i; ) { - struct checkpoint_data *new; - - if (log_get_state(&rq->u_rq) != LOG_RESUMED) { - LOG_DBG("[%s] Withholding checkpoints until log is valid (%s from %u)", - SHORT_UUID(rq->u_rq.uuid), _RQ_TYPE(rq->u_rq.request_type), nodeid); - break; - } - - i--; - new = prepare_checkpoint(match, match->checkpoint_requesters[i]); - if (!new) { - /* FIXME: Need better error handling */ - LOG_ERROR("[%s] Failed to prepare checkpoint for %u!!!", - SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]); - break; - } - LOG_SPRINT(match, "[%s] Checkpoint prepared for %u* (%s)", - SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i], - (log_get_state(&rq->u_rq) != LOG_RESUMED)? "LOG_RESUMED": "LOG_SUSPENDED"); - LOG_COND(log_checkpoint, "[%s] Checkpoint prepared for %u*", - SHORT_UUID(rq->u_rq.uuid), match->checkpoint_requesters[i]); - match->checkpoints_needed--; - - new->next = match->checkpoint_list; - match->checkpoint_list = new; - } - -out: - /* nothing happens after this point. It is just for debugging */ - if (r) { - LOG_ERROR("[%s] Error while processing CPG message, %s: %s", - SHORT_UUID(rq->u_rq.uuid), - _RQ_TYPE(rq->u_rq.request_type & ~DM_ULOG_RESPONSE), - strerror(-r)); - LOG_ERROR("[%s] Response : %s", SHORT_UUID(rq->u_rq.uuid), - (response) ? "YES" : "NO"); - LOG_ERROR("[%s] Originator: %u", - SHORT_UUID(rq->u_rq.uuid), rq->originator); - if (response) - LOG_ERROR("[%s] Responder : %u", - SHORT_UUID(rq->u_rq.uuid), nodeid); - - LOG_ERROR("HISTORY::"); - for (i = 0; i < DEBUGGING_HISTORY; i++) { - match->idx++; - match->idx = match->idx % DEBUGGING_HISTORY; - if (match->debugging[match->idx][0] == '\0') - continue; - LOG_ERROR("%d:%d) %s", i, match->idx, - match->debugging[match->idx]); - } - } else if (!(rq->u_rq.request_type & DM_ULOG_RESPONSE) || - (rq->originator == my_cluster_id)) { - if (!response) - LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s", - rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid), - _RQ_TYPE(rq->u_rq.request_type), - rq->originator, (response) ? "YES" : "NO"); - else - LOG_SPRINT(match, "SEQ#=%u, UUID=%s, TYPE=%s, ORIG=%u, RESP=%s, RSPR=%u", - rq->u_rq.seq, SHORT_UUID(rq->u_rq.uuid), - _RQ_TYPE(rq->u_rq.request_type), - rq->originator, (response) ? "YES" : "NO", - nodeid); - } -} - -static void cpg_join_callback(struct clog_cpg *match, - struct cpg_address *joined, - struct cpg_address *member_list, - int member_list_entries) -{ - int i; - int my_pid = getpid(); - uint32_t lowest = match->lowest_id; - struct clog_request *rq; - char dbuf[32]; - - /* Assign my_cluster_id */ - if ((my_cluster_id == 0xDEAD) && (joined->pid == my_pid)) - my_cluster_id = joined->nodeid; - - /* Am I the very first to join? */ - if (member_list_entries == 1) { - match->lowest_id = joined->nodeid; - match->state = VALID; - } - - /* If I am part of the joining list, I do not send checkpoints */ - if (joined->nodeid == my_cluster_id) - goto out; - - memset(dbuf, 0, sizeof(dbuf)); - for (i = 0; i < (member_list_entries-1); i++) - sprintf(dbuf+strlen(dbuf), "%u-", member_list[i].nodeid); - sprintf(dbuf+strlen(dbuf), "(%u)", joined->nodeid); - LOG_COND(log_checkpoint, "[%s] Joining node, %u needs checkpoint [%s]", - SHORT_UUID(match->name.value), joined->nodeid, dbuf); - - /* - * FIXME: remove checkpoint_requesters/checkpoints_needed, and use - * the startup_list interface exclusively - */ - if (dm_list_empty(&match->startup_list) && (match->state == VALID) && - (match->checkpoints_needed < MAX_CHECKPOINT_REQUESTERS)) { - match->checkpoint_requesters[match->checkpoints_needed++] = joined->nodeid; - goto out; - } - - rq = malloc(DM_ULOG_REQUEST_SIZE); - if (!rq) { - LOG_ERROR("cpg_config_callback: " - "Unable to allocate transfer structs"); - LOG_ERROR("cpg_config_callback: " - "Unable to perform checkpoint"); - goto out; - } - rq->u_rq.request_type = DM_ULOG_MEMBER_JOIN; - rq->originator = joined->nodeid; - dm_list_init(&rq->list); - dm_list_add(&match->startup_list, &rq->list); - -out: - /* Find the lowest_id, i.e. the server */ - match->lowest_id = member_list[0].nodeid; - for (i = 0; i < member_list_entries; i++) - if (match->lowest_id > member_list[i].nodeid) - match->lowest_id = member_list[i].nodeid; - - if (lowest == 0xDEAD) - LOG_COND(log_membership_change, "[%s] Server change -> %u (%u %s)", - SHORT_UUID(match->name.value), match->lowest_id, - joined->nodeid, (member_list_entries == 1) ? - "is first to join" : "joined"); - else if (lowest != match->lowest_id) - LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u joined)", - SHORT_UUID(match->name.value), lowest, - match->lowest_id, joined->nodeid); - else - LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u joined)", - SHORT_UUID(match->name.value), - lowest, joined->nodeid); - LOG_SPRINT(match, "+++ UUID=%s %u join +++", - SHORT_UUID(match->name.value), joined->nodeid); -} - -static void cpg_leave_callback(struct clog_cpg *match, - struct cpg_address *left, - struct cpg_address *member_list, - int member_list_entries) -{ - int i, j, fd; - uint32_t lowest = match->lowest_id; - struct clog_request *rq, *n; - struct checkpoint_data *p_cp, *c_cp; - - LOG_SPRINT(match, "--- UUID=%s %u left ---", - SHORT_UUID(match->name.value), left->nodeid); - - /* Am I leaving? */ - if (my_cluster_id == left->nodeid) { - LOG_DBG("Finalizing leave..."); - dm_list_del(&match->list); - - cpg_fd_get(match->handle, &fd); - links_unregister(fd); - - cluster_postsuspend(match->name.value, match->luid); - - dm_list_iterate_items_safe(rq, n, &match->working_list) { - dm_list_del(&rq->list); - - if (rq->u_rq.request_type == DM_ULOG_POSTSUSPEND) - kernel_send(&rq->u_rq); - free(rq); - } - - cpg_finalize(match->handle); - - match->free_me = 1; - match->lowest_id = 0xDEAD; - match->state = INVALID; - } - - /* Remove any pending checkpoints for the leaving node. */ - for (p_cp = NULL, c_cp = match->checkpoint_list; - c_cp && (c_cp->requester != left->nodeid); - p_cp = c_cp, c_cp = c_cp->next); - if (c_cp) { - if (p_cp) - p_cp->next = c_cp->next; - else - match->checkpoint_list = c_cp->next; - - LOG_COND(log_checkpoint, - "[%s] Removing pending checkpoint (%u is leaving)", - SHORT_UUID(match->name.value), left->nodeid); - free_checkpoint(c_cp); - } - dm_list_iterate_items_safe(rq, n, &match->startup_list) { - if ((rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) && - (rq->originator == left->nodeid)) { - LOG_COND(log_checkpoint, - "[%s] Removing pending ckpt from startup list (%u is leaving)", - SHORT_UUID(match->name.value), left->nodeid); - dm_list_del(&rq->list); - free(rq); - } - } - for (i = 0, j = 0; i < match->checkpoints_needed; i++, j++) { - match->checkpoint_requesters[j] = match->checkpoint_requesters[i]; - if (match->checkpoint_requesters[i] == left->nodeid) { - LOG_ERROR("[%s] Removing pending ckpt from needed list (%u is leaving)", - SHORT_UUID(match->name.value), left->nodeid); - j--; - } - } - match->checkpoints_needed = j; - - if (left->nodeid < my_cluster_id) { - match->delay = (match->delay > 0) ? match->delay - 1 : 0; - if (!match->delay && dm_list_empty(&match->working_list)) - match->resend_requests = 0; - LOG_COND(log_resend_requests, "[%s] %u has left, delay = %d%s", - SHORT_UUID(match->name.value), left->nodeid, - match->delay, (dm_list_empty(&match->working_list)) ? - " -- working_list empty": ""); - } - - /* Find the lowest_id, i.e. the server */ - if (!member_list_entries) { - match->lowest_id = 0xDEAD; - LOG_COND(log_membership_change, "[%s] Server change %u -> " - "(%u is last to leave)", - SHORT_UUID(match->name.value), left->nodeid, - left->nodeid); - return; - } - - match->lowest_id = member_list[0].nodeid; - for (i = 0; i < member_list_entries; i++) - if (match->lowest_id > member_list[i].nodeid) - match->lowest_id = member_list[i].nodeid; - - if (lowest != match->lowest_id) { - LOG_COND(log_membership_change, "[%s] Server change %u -> %u (%u left)", - SHORT_UUID(match->name.value), lowest, - match->lowest_id, left->nodeid); - } else - LOG_COND(log_membership_change, "[%s] Server unchanged at %u (%u left)", - SHORT_UUID(match->name.value), lowest, left->nodeid); - - if ((match->state == INVALID) && !match->free_me) { - /* - * If all CPG members are waiting for checkpoints and they - * are all present in my startup_list, then I was the first to - * join and I must assume control. - * - * We do not normally end up here, but if there was a quick - * 'resume -> suspend -> resume' across the cluster, we may - * have initially thought we were not the first to join because - * of the presence of out-going (and unable to respond) members. - */ - - i = 1; /* We do not have a DM_ULOG_MEMBER_JOIN entry of our own */ - dm_list_iterate_items(rq, &match->startup_list) - if (rq->u_rq.request_type == DM_ULOG_MEMBER_JOIN) - i++; - - if (i == member_list_entries) { - /* - * Last node who could have given me a checkpoint just left. - * Setting log state to VALID and acting as 'first join'. - */ - match->state = VALID; - flush_startup_list(match); - } - } -} - -static void cpg_config_callback(cpg_handle_t handle, struct cpg_name *gname, - struct cpg_address *member_list, - int member_list_entries, - struct cpg_address *left_list, - int left_list_entries, - struct cpg_address *joined_list, - int joined_list_entries) -{ - struct clog_cpg *match; - int found = 0; - - dm_list_iterate_items(match, &clog_cpg_list) - if (match->handle == handle) { - found = 1; - break; - } - - if (!found) { - LOG_ERROR("Unable to find match for CPG config callback"); - return; - } - - if ((joined_list_entries + left_list_entries) > 1) - LOG_ERROR("[%s] More than one node joining/leaving", - SHORT_UUID(match->name.value)); - - if (joined_list_entries) - cpg_join_callback(match, joined_list, - member_list, member_list_entries); - else - cpg_leave_callback(match, left_list, - member_list, member_list_entries); -} - -cpg_callbacks_t cpg_callbacks = { - .cpg_deliver_fn = cpg_message_callback, - .cpg_confchg_fn = cpg_config_callback, -}; - -/* - * remove_checkpoint - * @entry - * - * Returns: 1 if checkpoint removed, 0 if no checkpoints, -EXXX on error - */ -int remove_checkpoint(struct clog_cpg *entry) -{ - int len; - SaNameT name; - SaAisErrorT rv; - SaCkptCheckpointHandleT h; - - len = snprintf((char *)(name.value), SA_MAX_NAME_LENGTH, "bitmaps_%s_%u", - SHORT_UUID(entry->name.value), my_cluster_id); - name.length = len; - -open_retry: - rv = saCkptCheckpointOpen(ckpt_handle, &name, NULL, - SA_CKPT_CHECKPOINT_READ, 0, &h); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("abort_startup: ckpt open retry"); - usleep(1000); - goto open_retry; - } - - if (rv != SA_AIS_OK) - return 0; - - LOG_DBG("[%s] Removing checkpoint", SHORT_UUID(entry->name.value)); -unlink_retry: - rv = saCkptCheckpointUnlink(ckpt_handle, &name); - if (rv == SA_AIS_ERR_TRY_AGAIN) { - LOG_ERROR("abort_startup: ckpt unlink retry"); - usleep(1000); - goto unlink_retry; - } - - if (rv != SA_AIS_OK) { - LOG_ERROR("[%s] Failed to unlink checkpoint: %s", - SHORT_UUID(entry->name.value), str_ais_error(rv)); - return -EIO; - } - - saCkptCheckpointClose(h); - - return 1; -} - -int create_cluster_cpg(char *uuid, uint64_t luid) -{ - int r; - int size; - struct clog_cpg *new = NULL; - struct clog_cpg *tmp; - - dm_list_iterate_items(tmp, &clog_cpg_list) - if (!strncmp(tmp->name.value, uuid, CPG_MAX_NAME_LENGTH)) { - LOG_ERROR("Log entry already exists: %s", uuid); - return -EEXIST; - } - - new = malloc(sizeof(*new)); - if (!new) { - LOG_ERROR("Unable to allocate memory for clog_cpg"); - return -ENOMEM; - } - memset(new, 0, sizeof(*new)); - dm_list_init(&new->list); - new->lowest_id = 0xDEAD; - dm_list_init(&new->startup_list); - dm_list_init(&new->working_list); - - size = ((strlen(uuid) + 1) > CPG_MAX_NAME_LENGTH) ? - CPG_MAX_NAME_LENGTH : (strlen(uuid) + 1); - strncpy(new->name.value, uuid, size); - new->name.length = size; - new->luid = luid; - - /* - * Ensure there are no stale checkpoints around before we join - */ - if (remove_checkpoint(new) == 1) - LOG_COND(log_checkpoint, - "[%s] Removing checkpoints left from previous session", - SHORT_UUID(new->name.value)); - - r = cpg_initialize(&new->handle, &cpg_callbacks); - if (r != SA_AIS_OK) { - LOG_ERROR("cpg_initialize failed: Cannot join cluster"); - free(new); - return -EPERM; - } - - r = cpg_join(new->handle, &new->name); - if (r != SA_AIS_OK) { - LOG_ERROR("cpg_join failed: Cannot join cluster"); - free(new); - return -EPERM; - } - - new->cpg_state = VALID; - dm_list_add(&clog_cpg_list, &new->list); - LOG_DBG("New handle: %llu", (unsigned long long)new->handle); - LOG_DBG("New name: %s", new->name.value); - - /* FIXME: better variable */ - cpg_fd_get(new->handle, &r); - links_register(r, "cluster", do_cluster_work, NULL); - - return 0; -} - -static void abort_startup(struct clog_cpg *del) -{ - struct clog_request *rq, *n; - - LOG_DBG("[%s] CPG teardown before checkpoint received", - SHORT_UUID(del->name.value)); - - dm_list_iterate_items_safe(rq, n, &del->startup_list) { - dm_list_del(&rq->list); - - LOG_DBG("[%s] Ignoring request from %u: %s", - SHORT_UUID(del->name.value), rq->originator, - _RQ_TYPE(rq->u_rq.request_type)); - free(rq); - } - - remove_checkpoint(del); -} - -static int _destroy_cluster_cpg(struct clog_cpg *del) -{ - int r; - int state; - - LOG_COND(log_resend_requests, "[%s] I am leaving.2.....", - SHORT_UUID(del->name.value)); - - /* - * We must send any left over checkpoints before - * leaving. If we don't, an incoming node could - * be stuck with no checkpoint and stall. - do_checkpoints(del); --- THIS COULD BE CAUSING OUR PROBLEMS: - - - Incoming node deletes old checkpoints before joining - - A stale checkpoint is issued here by leaving node - - (leaving node leaves) - - Incoming node joins cluster and finds stale checkpoint. - - (leaving node leaves - option 2) - */ - do_checkpoints(del, 1); - - state = del->state; - - del->cpg_state = INVALID; - del->state = LEAVING; - - /* - * If the state is VALID, we might be processing the - * startup list. If so, we certainly don't want to - * clear the startup_list here by calling abort_startup - */ - if (!dm_list_empty(&del->startup_list) && (state != VALID)) - abort_startup(del); - - r = cpg_leave(del->handle, &del->name); - if (r != CPG_OK) - LOG_ERROR("Error leaving CPG!"); - return 0; -} - -int destroy_cluster_cpg(char *uuid) -{ - struct clog_cpg *del, *tmp; - - dm_list_iterate_items_safe(del, tmp, &clog_cpg_list) - if (!strncmp(del->name.value, uuid, CPG_MAX_NAME_LENGTH)) - _destroy_cluster_cpg(del); - - return 0; -} - -int init_cluster(void) -{ - SaAisErrorT rv; - - dm_list_init(&clog_cpg_list); - rv = saCkptInitialize(&ckpt_handle, &callbacks, &version); - - if (rv != SA_AIS_OK) - return EXIT_CLUSTER_CKPT_INIT; - - return 0; -} - -void cleanup_cluster(void) -{ - SaAisErrorT err; - - err = saCkptFinalize(ckpt_handle); - if (err != SA_AIS_OK) - LOG_ERROR("Failed to finalize checkpoint handle"); -} - -void cluster_debug(void) -{ - struct checkpoint_data *cp; - struct clog_cpg *entry; - struct clog_request *rq; - int i; - - LOG_ERROR(""); - LOG_ERROR("CLUSTER COMPONENT DEBUGGING::"); - dm_list_iterate_items(entry, &clog_cpg_list) { - LOG_ERROR("%s::", SHORT_UUID(entry->name.value)); - LOG_ERROR(" lowest_id : %u", entry->lowest_id); - LOG_ERROR(" state : %s", (entry->state == INVALID) ? - "INVALID" : (entry->state == VALID) ? "VALID" : - (entry->state == LEAVING) ? "LEAVING" : "UNKNOWN"); - LOG_ERROR(" cpg_state : %d", entry->cpg_state); - LOG_ERROR(" free_me : %d", entry->free_me); - LOG_ERROR(" delay : %d", entry->delay); - LOG_ERROR(" resend_requests : %d", entry->resend_requests); - LOG_ERROR(" checkpoints_needed: %d", entry->checkpoints_needed); - for (i = 0, cp = entry->checkpoint_list; - i < MAX_CHECKPOINT_REQUESTERS; i++) - if (cp) - cp = cp->next; - else - break; - LOG_ERROR(" CKPTs waiting : %d", i); - LOG_ERROR(" Working list:"); - dm_list_iterate_items(rq, &entry->working_list) - LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq); - - LOG_ERROR(" Startup list:"); - dm_list_iterate_items(rq, &entry->startup_list) - LOG_ERROR(" %s/%u", _RQ_TYPE(rq->u_rq.request_type), - rq->u_rq.seq); - - LOG_ERROR("Command History:"); - for (i = 0; i < DEBUGGING_HISTORY; i++) { - entry->idx++; - entry->idx = entry->idx % DEBUGGING_HISTORY; - if (entry->debugging[entry->idx][0] == '\0') - continue; - LOG_ERROR("%d:%d) %s", i, entry->idx, - entry->debugging[entry->idx]); - } - } -} diff --git a/daemons/clogd/cluster.h b/daemons/clogd/cluster.h deleted file mode 100644 index 36c3bf955..000000000 --- a/daemons/clogd/cluster.h +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#ifndef __CLUSTER_LOG_CLUSTER_DOT_H__ -#define __CLUSTER_LOG_CLUSTER_DOT_H__ - -#include "libdevmapper.h" -#include "dm-log-userspace.h" - -/* - * There is other information in addition to what can - * be found in the dm_ulog_request structure that we - * need for processing. 'clog_request' is the wrapping - * structure we use to make the additional fields - * available. - */ -struct clog_request { - struct dm_list list; - - /* - * 'originator' is the machine from which the requests - * was made. - */ - uint32_t originator; - - /* - * 'pit_server' is the "point-in-time" server for the - * request. (I.e. The machine that was the server at - * the time the request was issued - only important during - * startup. - */ - uint32_t pit_server; - - /* - * The request from the kernel that is being processed - */ - struct dm_ulog_request u_rq; -}; - -int init_cluster(void); -void cleanup_cluster(void); -void cluster_debug(void); - -int create_cluster_cpg(char *uuid, uint64_t luid); -int destroy_cluster_cpg(char *uuid); - -int cluster_send(struct clog_request *rq); - -#endif /* __CLUSTER_LOG_CLUSTER_DOT_H__ */ diff --git a/daemons/clogd/common.h b/daemons/clogd/common.h deleted file mode 100644 index 5498a9ca5..000000000 --- a/daemons/clogd/common.h +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#ifndef __CLUSTER_LOG_COMMON_DOT_H__ -#define __CLUSTER_LOG_COMMON_DOT_H__ - -/* -#define EXIT_SUCCESS 0 -#define EXIT_FAILURE 1 -*/ - -#define EXIT_LOCKFILE 2 - -#define EXIT_KERNEL_SOCKET 3 /* Failed netlink socket create */ -#define EXIT_KERNEL_BIND 4 -#define EXIT_KERNEL_SETSOCKOPT 5 - -#define EXIT_CLUSTER_CKPT_INIT 6 /* Failed to init checkpoint */ - -#define EXIT_QUEUE_NOMEM 7 - - -#define DM_ULOG_REQUEST_SIZE 1024 - -#endif /* __CLUSTER_LOG_COMMON_DOT_H__ */ diff --git a/daemons/clogd/functions.c b/daemons/clogd/functions.c deleted file mode 100644 index c42f2f426..000000000 --- a/daemons/clogd/functions.c +++ /dev/null @@ -1,1863 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#define _GNU_SOURCE -#define _FILE_OFFSET_BITS 64 - -#include -#include -#include -#include -#include -#include -#include -#include -#include -//#define __USE_GNU /* for O_DIRECT */ -#include -#include -#include "libdevmapper.h" -#include "dm-log-userspace.h" -#include "functions.h" -#include "common.h" -#include "cluster.h" -#include "logging.h" - -#define BYTE_SHIFT 3 - -/* - * Magic for persistent mirrors: "MiRr" - * Following on-disk header information is stolen from - * drivers/md/dm-log.c - */ -#define MIRROR_MAGIC 0x4D695272 -#define MIRROR_DISK_VERSION 2 -#define LOG_OFFSET 2 - -#define RESYNC_HISTORY 50 -//static char resync_history[RESYNC_HISTORY][128]; -//static int idx = 0; -#define LOG_SPRINT(_lc, f, arg...) do { \ - lc->idx++; \ - lc->idx = lc->idx % RESYNC_HISTORY; \ - sprintf(lc->resync_history[lc->idx], f, ## arg); \ - } while (0) - -struct log_header { - uint32_t magic; - uint32_t version; - uint64_t nr_regions; -}; - -struct log_c { - struct dm_list list; - - char uuid[DM_UUID_LEN]; - uint64_t luid; - - time_t delay; /* limits how fast a resume can happen after suspend */ - int touched; - uint32_t region_size; - uint32_t region_count; - uint64_t sync_count; - - dm_bitset_t clean_bits; - dm_bitset_t sync_bits; - uint32_t recoverer; - uint64_t recovering_region; /* -1 means not recovering */ - uint64_t skip_bit_warning; /* used to warn if region skipped */ - int sync_search; - - int resume_override; - - uint32_t block_on_error; - enum sync { - DEFAULTSYNC, /* Synchronize if necessary */ - NOSYNC, /* Devices known to be already in sync */ - FORCESYNC, /* Force a sync to happen */ - } sync; - - uint32_t state; /* current operational state of the log */ - - struct dm_list mark_list; - - uint32_t recovery_halted; - struct recovery_request *recovery_request_list; - - int disk_fd; /* -1 means no disk log */ - int log_dev_failed; - uint64_t disk_nr_regions; - size_t disk_size; /* size of disk_buffer in bytes */ - void *disk_buffer; /* aligned memory for O_DIRECT */ - int idx; - char resync_history[RESYNC_HISTORY][128]; -}; - -struct mark_entry { - struct dm_list list; - uint32_t nodeid; - uint64_t region; -}; - -struct recovery_request { - uint64_t region; - struct recovery_request *next; -}; - -static DM_LIST_INIT(log_list); -static DM_LIST_INIT(log_pending_list); - -static int log_test_bit(dm_bitset_t bs, int bit) -{ - return dm_bit(bs, bit); -} - -static void log_set_bit(struct log_c *lc, dm_bitset_t bs, int bit) -{ - dm_bit_set(bs, bit); - lc->touched = 1; -} - -static void log_clear_bit(struct log_c *lc, dm_bitset_t bs, int bit) -{ - dm_bit_clear(bs, bit); - lc->touched = 1; -} - -static int find_next_zero_bit(dm_bitset_t bs, int start) -{ - while (dm_bit(bs, start++)) - if (start >= (int)bs[0]) - return -1; - - return start - 1; -} - -static uint64_t count_bits32(dm_bitset_t bs) -{ - int i, size = ((int)bs[0]/DM_BITS_PER_INT + 1); - unsigned count = 0; - - for (i = 1; i <= size; i++) - count += hweight32(bs[i]); - - return (uint64_t)count; -} - -/* - * get_log - * - * Returns: log if found, NULL otherwise - */ -static struct log_c *get_log(const char *uuid, uint64_t luid) -{ - struct log_c *lc; - - dm_list_iterate_items(lc, &log_list) - if (!strcmp(lc->uuid, uuid) && - (!luid || (luid == lc->luid))) - return lc; - - return NULL; -} - -/* - * get_pending_log - * - * Pending logs are logs that have been 'clog_ctr'ed, but - * have not joined the CPG (via clog_resume). - * - * Returns: log if found, NULL otherwise - */ -static struct log_c *get_pending_log(const char *uuid, uint64_t luid) -{ - struct log_c *lc; - - dm_list_iterate_items(lc, &log_pending_list) - if (!strcmp(lc->uuid, uuid) && - (!luid || (luid == lc->luid))) - return lc; - - return NULL; -} - -static void header_to_disk(struct log_header *mem, struct log_header *disk) -{ - memcpy(disk, mem, sizeof(struct log_header)); -} - -static void header_from_disk(struct log_header *mem, struct log_header *disk) -{ - memcpy(mem, disk, sizeof(struct log_header)); -} - -static int rw_log(struct log_c *lc, int do_write) -{ - int r; - - r = lseek(lc->disk_fd, 0, SEEK_SET); - if (r < 0) { - LOG_ERROR("[%s] rw_log: lseek failure: %s", - SHORT_UUID(lc->uuid), strerror(errno)); - return -errno; - } - - if (do_write) { - r = write(lc->disk_fd, lc->disk_buffer, lc->disk_size); - if (r < 0) { - LOG_ERROR("[%s] rw_log: write failure: %s", - SHORT_UUID(lc->uuid), strerror(errno)); - return -EIO; /* Failed disk write */ - } - return 0; - } - - /* Read */ - r = read(lc->disk_fd, lc->disk_buffer, lc->disk_size); - if (r < 0) - LOG_ERROR("[%s] rw_log: read failure: %s", - SHORT_UUID(lc->uuid), strerror(errno)); - if (r != lc->disk_size) - return -EIO; /* Failed disk read */ - return 0; -} - -/* - * read_log - * @lc - * - * Valid return codes: - * -EINVAL: Invalid header, bits not copied - * -EIO: Unable to read disk log - * 0: Valid header, disk bit -> lc->clean_bits - * - * Returns: 0 on success, -EXXX on failure - */ -static int read_log(struct log_c *lc) -{ - struct log_header lh; - size_t bitset_size; - - memset(&lh, 0, sizeof(struct log_header)); - - if (rw_log(lc, 0)) - return -EIO; /* Failed disk read */ - - header_from_disk(&lh, lc->disk_buffer); - if (lh.magic != MIRROR_MAGIC) - return -EINVAL; - - lc->disk_nr_regions = lh.nr_regions; - - /* Read disk bits into sync_bits */ - bitset_size = lc->region_count / 8; - bitset_size += (lc->region_count % 8) ? 1 : 0; - memcpy(lc->clean_bits, lc->disk_buffer + 1024, bitset_size); - - return 0; -} - -/* - * write_log - * @lc - * - * Returns: 0 on success, -EIO on failure - */ -static int write_log(struct log_c *lc) -{ - struct log_header lh; - size_t bitset_size; - - lh.magic = MIRROR_MAGIC; - lh.version = MIRROR_DISK_VERSION; - lh.nr_regions = lc->region_count; - - header_to_disk(&lh, lc->disk_buffer); - - /* Write disk bits from clean_bits */ - bitset_size = lc->region_count / 8; - bitset_size += (lc->region_count % 8) ? 1 : 0; - memcpy(lc->disk_buffer + 1024, lc->clean_bits, bitset_size); - - if (rw_log(lc, 1)) { - lc->log_dev_failed = 1; - return -EIO; /* Failed disk write */ - } - return 0; -} - -static int find_disk_path(char *major_minor_str, char *path_rtn, int *unlink_path) -{ - int r; - DIR *dp; - struct dirent *dep; - struct stat statbuf; - int major, minor; - - if (!strstr(major_minor_str, ":")) { - r = stat(major_minor_str, &statbuf); - if (r) - return -errno; - if (!S_ISBLK(statbuf.st_mode)) - return -EINVAL; - sprintf(path_rtn, "%s", major_minor_str); - return 0; - } - - r = sscanf(major_minor_str, "%d:%d", &major, &minor); - if (r != 2) - return -EINVAL; - - LOG_DBG("Checking /dev/mapper for device %d:%d", major, minor); - /* Check /dev/mapper dir */ - dp = opendir("/dev/mapper"); - if (!dp) - return -ENOENT; - - while ((dep = readdir(dp)) != NULL) { - /* - * FIXME: This is racy. By the time the path is used, - * it may point to something else. 'fstat' will be - * required upon opening to ensure we got what we - * wanted. - */ - - sprintf(path_rtn, "/dev/mapper/%s", dep->d_name); - stat(path_rtn, &statbuf); - if (S_ISBLK(statbuf.st_mode) && - (major(statbuf.st_rdev) == major) && - (minor(statbuf.st_rdev) == minor)) { - LOG_DBG(" %s: YES", dep->d_name); - closedir(dp); - return 0; - } else { - LOG_DBG(" %s: NO", dep->d_name); - } - } - - closedir(dp); - - LOG_DBG("Path not found for %d/%d", major, minor); - LOG_DBG("Creating /dev/mapper/%d-%d", major, minor); - sprintf(path_rtn, "/dev/mapper/%d-%d", major, minor); - r = mknod(path_rtn, S_IFBLK | S_IRUSR | S_IWUSR, MKDEV(major, minor)); - - /* - * If we have to make the path, we unlink it after we open it - */ - *unlink_path = 1; - - return r ? -errno : 0; -} - -static int _clog_ctr(char *uuid, uint64_t luid, - int argc, char **argv, uint64_t device_size) -{ - int i; - int r = 0; - char *p; - uint64_t region_size; - uint64_t region_count; - struct log_c *lc = NULL; - struct log_c *duplicate; - enum sync sync = DEFAULTSYNC; - uint32_t block_on_error = 0; - - int disk_log = 0; - char disk_path[128]; - int unlink_path = 0; - size_t page_size; - int pages; - - /* If core log request, then argv[0] will be region_size */ - if (!strtoll(argv[0], &p, 0) || *p) { - disk_log = 1; - - if ((argc < 2) || (argc > 4)) { - LOG_ERROR("Too %s arguments to clustered_disk log type", - (argc < 3) ? "few" : "many"); - r = -EINVAL; - goto fail; - } - - r = find_disk_path(argv[0], disk_path, &unlink_path); - if (r) { - LOG_ERROR("Unable to find path to device %s", argv[0]); - goto fail; - } - LOG_DBG("Clustered log disk is %s", disk_path); - } else { - disk_log = 0; - - if ((argc < 1) || (argc > 3)) { - LOG_ERROR("Too %s arguments to clustered_core log type", - (argc < 2) ? "few" : "many"); - r = -EINVAL; - goto fail; - } - } - - if (!(region_size = strtoll(argv[disk_log], &p, 0)) || *p) { - LOG_ERROR("Invalid region_size argument to clustered_%s log type", - (disk_log) ? "disk" : "core"); - r = -EINVAL; - goto fail; - } - - region_count = device_size / region_size; - if (device_size % region_size) { - /* - * I can't remember if device_size must be a multiple - * of region_size, so check it anyway. - */ - region_count++; - } - - for (i = 0; i < argc; i++) { - if (!strcmp(argv[i], "sync")) - sync = FORCESYNC; - else if (!strcmp(argv[i], "nosync")) - sync = NOSYNC; - else if (!strcmp(argv[i], "block_on_error")) - block_on_error = 1; - } - - lc = malloc(sizeof(*lc)); - if (!lc) { - LOG_ERROR("Unable to allocate cluster log context"); - r = -ENOMEM; - goto fail; - } - memset(lc, 0, sizeof(*lc)); - - lc->region_size = region_size; - lc->region_count = region_count; - lc->sync = sync; - lc->block_on_error = block_on_error; - lc->sync_search = 0; - lc->recovering_region = (uint64_t)-1; - lc->skip_bit_warning = region_count; - lc->disk_fd = -1; - lc->log_dev_failed = 0; - strncpy(lc->uuid, uuid, DM_UUID_LEN); - lc->luid = luid; - - if ((duplicate = get_log(lc->uuid, lc->luid)) || - (duplicate = get_pending_log(lc->uuid, lc->luid))) { - LOG_ERROR("[%s/%llu] Log already exists, unable to create.", - SHORT_UUID(lc->uuid), lc->luid); - free(lc); - return -EINVAL; - } - - dm_list_init(&lc->mark_list); - - lc->clean_bits = dm_bitset_create(NULL, region_count); - if (!lc->clean_bits) { - LOG_ERROR("Unable to allocate clean bitset"); - r = -ENOMEM; - goto fail; - } - - lc->sync_bits = dm_bitset_create(NULL, region_count); - if (!lc->sync_bits) { - LOG_ERROR("Unable to allocate sync bitset"); - r = -ENOMEM; - goto fail; - } - if (sync == NOSYNC) - dm_bit_set_all(lc->sync_bits); - - lc->sync_count = (sync == NOSYNC) ? region_count : 0; - if (disk_log) { - page_size = sysconf(_SC_PAGESIZE); - pages = ((int)lc->clean_bits[0])/page_size; - pages += ((int)lc->clean_bits[0])%page_size ? 1 : 0; - pages += 1; /* for header */ - - r = open(disk_path, O_RDWR | O_DIRECT); - if (r < 0) { - LOG_ERROR("Unable to open log device, %s: %s", - disk_path, strerror(errno)); - r = errno; - goto fail; - } - if (unlink_path) - unlink(disk_path); - - lc->disk_fd = r; - lc->disk_size = pages * page_size; - - r = posix_memalign(&(lc->disk_buffer), page_size, - lc->disk_size); - if (r) { - LOG_ERROR("Unable to allocate memory for disk_buffer"); - goto fail; - } - memset(lc->disk_buffer, 0, lc->disk_size); - LOG_DBG("Disk log ready"); - } - - dm_list_add(&log_pending_list, &lc->list); - - return 0; -fail: - if (lc) { - if (lc->clean_bits) - free(lc->clean_bits); - if (lc->sync_bits) - free(lc->sync_bits); - if (lc->disk_buffer) - free(lc->disk_buffer); - if (lc->disk_fd >= 0) - close(lc->disk_fd); - free(lc); - } - return r; -} - -/* - * clog_ctr - * @rq - * - * rq->data should contain constructor string as follows: - * [disk] [[no]sync] - * The kernel is responsible for adding the argument - * to the end; otherwise, we cannot compute the region_count. - * - * FIXME: Currently relies on caller to fill in rq->error - */ -static int clog_dtr(struct dm_ulog_request *rq); -static int clog_ctr(struct dm_ulog_request *rq) -{ - int argc, i, r = 0; - char *p, **argv = NULL; - char *dev_size_str; - uint64_t device_size; - - /* Sanity checks */ - if (!rq->data_size) { - LOG_ERROR("Received constructor request with no data"); - return -EINVAL; - } - - if (strlen(rq->data) > rq->data_size) { - LOG_ERROR("Received constructor request with bad data"); - LOG_ERROR("strlen(rq->data)[%d] != rq->data_size[%llu]", - (int)strlen(rq->data), - (unsigned long long)rq->data_size); - LOG_ERROR("rq->data = '%s' [%d]", - rq->data, (int)strlen(rq->data)); - return -EINVAL; - } - - /* Split up args */ - for (argc = 0, p = rq->data; (p = strstr(p, " ")); p++, argc++) - *p = '\0'; - - argv = malloc(argc * sizeof(char *)); - if (!argv) - return -ENOMEM; - - p = dev_size_str = rq->data; - p += strlen(p) + 1; - for (i = 0; i < argc; i++, p = p + strlen(p) + 1) - argv[i] = p; - - if (strcmp(argv[0], "clustered_disk") && - strcmp(argv[0], "clustered_core")) { - LOG_ERROR("Unsupported userspace log type, \"%s\"", argv[0]); - free(argv); - return -EINVAL; - } - - if (!(device_size = strtoll(dev_size_str, &p, 0)) || *p) { - LOG_ERROR("Invalid device size argument: %s", dev_size_str); - free(argv); - return -EINVAL; - } - - r = _clog_ctr(rq->uuid, rq->luid, argc - 1, argv + 1, device_size); - - /* We join the CPG when we resume */ - - /* No returning data */ - rq->data_size = 0; - - if (r) { - LOG_ERROR("Failed to create cluster log (%s)", rq->uuid); - for (i = 0; i < argc; i++) - LOG_ERROR("argv[%d] = %s", i, argv[i]); - } - else - LOG_DBG("[%s] Cluster log created", - SHORT_UUID(rq->uuid)); - - free(argv); - return r; -} - -/* - * clog_dtr - * @rq - * - */ -static int clog_dtr(struct dm_ulog_request *rq) -{ - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (lc) { - /* - * The log should not be on the official list. There - * should have been a suspend first. - */ - LOG_ERROR("[%s] DTR before SUS: leaving CPG", - SHORT_UUID(rq->uuid)); - destroy_cluster_cpg(rq->uuid); - } else if (!(lc = get_pending_log(rq->uuid, rq->luid))) { - LOG_ERROR("clog_dtr called on log that is not official or pending"); - return -EINVAL; - } - - LOG_DBG("[%s] Cluster log removed", SHORT_UUID(lc->uuid)); - - dm_list_del(&lc->list); - if (lc->disk_fd != -1) - close(lc->disk_fd); - if (lc->disk_buffer) - free(lc->disk_buffer); - free(lc->clean_bits); - free(lc->sync_bits); - free(lc); - - return 0; -} - -/* - * clog_presuspend - * @rq - * - */ -static int clog_presuspend(struct dm_ulog_request *rq) -{ - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (lc->touched) - LOG_DBG("WARNING: log still marked as 'touched' during suspend"); - - lc->recovery_halted = 1; - - return 0; -} - -/* - * clog_postsuspend - * @rq - * - */ -static int clog_postsuspend(struct dm_ulog_request *rq) -{ - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - LOG_DBG("[%s] clog_postsuspend: leaving CPG", SHORT_UUID(lc->uuid)); - destroy_cluster_cpg(rq->uuid); - - lc->state = LOG_SUSPENDED; - lc->recovering_region = (uint64_t)-1; - lc->recoverer = (uint32_t)-1; - lc->delay = time(NULL); - - return 0; -} - -/* - * cluster_postsuspend - * @rq - * - */ -int cluster_postsuspend(char *uuid, uint64_t luid) -{ - struct log_c *lc = get_log(uuid, luid); - - if (!lc) - return -EINVAL; - - LOG_DBG("[%s] clog_postsuspend: finalizing", SHORT_UUID(lc->uuid)); - lc->resume_override = 0; - - /* move log to pending list */ - dm_list_del(&lc->list); - dm_list_add(&log_pending_list, &lc->list); - - return 0; -} - -/* - * clog_resume - * @rq - * - * Does the main work of resuming. - */ -static int clog_resume(struct dm_ulog_request *rq) -{ - uint32_t i; - int commit_log = 0; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - switch (lc->resume_override) { - case 1000: - LOG_ERROR("[%s] Additional resume issued before suspend", - SHORT_UUID(rq->uuid)); -#ifdef DEBUG - kill(getpid(), SIGUSR1); -#endif - return 0; - case 0: - lc->resume_override = 1000; - if (lc->disk_fd == -1) { - LOG_DBG("[%s] Master resume.", - SHORT_UUID(lc->uuid)); - goto no_disk; - } - - LOG_DBG("[%s] Master resume: reading disk log", - SHORT_UUID(lc->uuid)); - commit_log = 1; - break; - case 1: - LOG_ERROR("Error:: partial bit loading (just sync_bits)"); - return -EINVAL; - case 2: - LOG_ERROR("Error:: partial bit loading (just clean_bits)"); - return -EINVAL; - case 3: - LOG_DBG("[%s] Non-master resume: bits pre-loaded", - SHORT_UUID(lc->uuid)); - lc->resume_override = 1000; - goto out; - default: - LOG_ERROR("Error:: multiple loading of bits (%d)", - lc->resume_override); - return -EINVAL; - } - - if (lc->log_dev_failed) { - LOG_ERROR("Log device has failed, unable to read bits"); - rq->error = 0; /* We can handle this so far */ - lc->disk_nr_regions = 0; - } else - rq->error = read_log(lc); - - switch (rq->error) { - case 0: - if (lc->disk_nr_regions < lc->region_count) - LOG_DBG("[%s] Mirror has grown, updating log bits", - SHORT_UUID(lc->uuid)); - else if (lc->disk_nr_regions > lc->region_count) - LOG_DBG("[%s] Mirror has shrunk, updating log bits", - SHORT_UUID(lc->uuid)); - break; - case -EINVAL: - LOG_DBG("[%s] (Re)initializing mirror log - resync issued.", - SHORT_UUID(lc->uuid)); - lc->disk_nr_regions = 0; - break; - default: - LOG_ERROR("Failed to read disk log"); - lc->disk_nr_regions = 0; - break; - } - -no_disk: - /* If mirror has grown, set bits appropriately */ - if (lc->sync == NOSYNC) - for (i = lc->disk_nr_regions; i < lc->region_count; i++) - log_set_bit(lc, lc->clean_bits, i); - else - for (i = lc->disk_nr_regions; i < lc->region_count; i++) - log_clear_bit(lc, lc->clean_bits, i); - - /* Clear any old bits if device has shrunk */ - for (i = lc->region_count; i % 32; i++) - log_clear_bit(lc, lc->clean_bits, i); - - /* copy clean across to sync */ - dm_bit_copy(lc->sync_bits, lc->clean_bits); - - if (commit_log && (lc->disk_fd >= 0)) { - rq->error = write_log(lc); - if (rq->error) - LOG_ERROR("Failed initial disk log write"); - else - LOG_DBG("Disk log initialized"); - lc->touched = 0; - } -out: - /* - * Clear any old bits if device has shrunk - necessary - * for non-master resume - */ - for (i = lc->region_count; i % 32; i++) { - log_clear_bit(lc, lc->clean_bits, i); - log_clear_bit(lc, lc->sync_bits, i); - } - - lc->sync_count = count_bits32(lc->sync_bits); - - LOG_SPRINT(lc, "[%s] Initial sync_count = %llu", - SHORT_UUID(lc->uuid), (unsigned long long)lc->sync_count); - lc->sync_search = 0; - lc->state = LOG_RESUMED; - lc->recovery_halted = 0; - - return rq->error; -} - -/* - * local_resume - * @rq - * - * If the log is pending, we must first join the cpg and - * put the log in the official list. - * - */ -int local_resume(struct dm_ulog_request *rq) -{ - int r; - time_t t; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) { - /* Is the log in the pending list? */ - lc = get_pending_log(rq->uuid, rq->luid); - if (!lc) { - LOG_ERROR("clog_resume called on log that is not official or pending"); - return -EINVAL; - } - - t = time(NULL); - t -= lc->delay; - /* - * This should be considered a temporary fix. It addresses - * a problem that exists when nodes suspend/resume in rapid - * succession. While the problem is very rare, it has been - * seen to happen in real-world-like testing. - * - * The problem: - * - Node A joins cluster - * - Node B joins cluster - * - Node A prepares checkpoint - * - Node A gets ready to write checkpoint - * - Node B leaves - * - Node B joins - * - Node A finishes write of checkpoint - * - Node B receives checkpoint meant for previous session - * -- Node B can now be non-coherent - * - * This timer will solve the problem for now, but could be - * replaced by a generation number sent with the resume - * command from the kernel. The generation number would - * be included in the name of the checkpoint to prevent - * reading stale data. - */ - if ((t < 3) && (t >= 0)) - sleep(3 - t); - - /* Join the CPG */ - r = create_cluster_cpg(rq->uuid, rq->luid); - if (r) { - LOG_ERROR("clog_resume: Failed to create cluster CPG"); - return r; - } - - /* move log to official list */ - dm_list_del(&lc->list); - dm_list_add(&log_list, &lc->list); - } - - return 0; -} - -/* - * clog_get_region_size - * @rq - * - * Since this value doesn't change, the kernel - * should not need to talk to server to get this - * The function is here for completness - * - * Returns: 0 on success, -EXXX on failure - */ -static int clog_get_region_size(struct dm_ulog_request *rq) -{ - uint64_t *rtn = (uint64_t *)rq->data; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc && !(lc = get_pending_log(rq->uuid, rq->luid))) - return -EINVAL; - - *rtn = lc->region_size; - rq->data_size = sizeof(*rtn); - - return 0; -} - -/* - * clog_is_clean - * @rq - * - * Returns: 1 if clean, 0 otherwise - */ -static int clog_is_clean(struct dm_ulog_request *rq) -{ - int64_t *rtn = (int64_t *)rq->data; - uint64_t region = *((uint64_t *)(rq->data)); - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - *rtn = log_test_bit(lc->clean_bits, region); - rq->data_size = sizeof(*rtn); - - return 0; -} - -/* - * clog_in_sync - * @rq - * - * We ignore any request for non-block. That - * should be handled elsewhere. (If the request - * has come this far, it has already blocked.) - * - * Returns: 1 if in-sync, 0 otherwise - */ -static int clog_in_sync(struct dm_ulog_request *rq) -{ - int64_t *rtn = (int64_t *)rq->data; - uint64_t region = *((uint64_t *)(rq->data)); - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (region > lc->region_count) - return -EINVAL; - - *rtn = log_test_bit(lc->sync_bits, region); - if (*rtn) - LOG_DBG("[%s] Region is in-sync: %llu", - SHORT_UUID(lc->uuid), (unsigned long long)region); - else - LOG_DBG("[%s] Region is not in-sync: %llu", - SHORT_UUID(lc->uuid), (unsigned long long)region); - - rq->data_size = sizeof(*rtn); - - return 0; -} - -/* - * clog_flush - * @rq - * - */ -static int clog_flush(struct dm_ulog_request *rq, int server) -{ - int r = 0; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (!lc->touched) - return 0; - - /* - * Do the actual flushing of the log only - * if we are the server. - */ - if (server && (lc->disk_fd >= 0)) { - r = rq->error = write_log(lc); - if (r) - LOG_ERROR("[%s] Error writing to disk log", - SHORT_UUID(lc->uuid)); - else - LOG_DBG("[%s] Disk log written", SHORT_UUID(lc->uuid)); - } - - lc->touched = 0; - - return r; - -} - -/* - * mark_region - * @lc - * @region - * @who - * - * Put a mark region request in the tree for tracking. - * - * Returns: 0 on success, -EXXX on error - */ -static int mark_region(struct log_c *lc, uint64_t region, uint32_t who) -{ - int found = 0; - struct mark_entry *m; - - dm_list_iterate_items(m, &lc->mark_list) - if (m->region == region) { - found = 1; - if (m->nodeid == who) - return 0; - } - - if (!found) - log_clear_bit(lc, lc->clean_bits, region); - - /* - * Save allocation until here - if there is a failure, - * at least we have cleared the bit. - */ - m = malloc(sizeof(*m)); - if (!m) { - LOG_ERROR("Unable to allocate space for mark_entry: %llu/%u", - (unsigned long long)region, who); - return -ENOMEM; - } - - m->nodeid = who; - m->region = region; - dm_list_add(&lc->mark_list, &m->list); - - return 0; -} - -/* - * clog_mark_region - * @rq - * - * rq may contain more than one mark request. We - * can determine the number from the 'data_size' field. - * - * Returns: 0 on success, -EXXX on failure - */ -static int clog_mark_region(struct dm_ulog_request *rq, uint32_t originator) -{ - int r; - int count; - uint64_t *region; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (rq->data_size % sizeof(uint64_t)) { - LOG_ERROR("Bad data size given for mark_region request"); - return -EINVAL; - } - - count = rq->data_size / sizeof(uint64_t); - region = (uint64_t *)&rq->data; - - for (; count > 0; count--, region++) { - r = mark_region(lc, *region, originator); - if (r) - return r; - } - - rq->data_size = 0; - - return 0; -} - -static int clear_region(struct log_c *lc, uint64_t region, uint32_t who) -{ - int other_matches = 0; - struct mark_entry *m, *n; - - dm_list_iterate_items_safe(m, n, &lc->mark_list) - if (m->region == region) { - if (m->nodeid == who) { - dm_list_del(&m->list); - free(m); - } else - other_matches = 1; - } - - /* - * Clear region if: - * 1) It is in-sync - * 2) There are no other machines that have it marked - */ - if (!other_matches && log_test_bit(lc->sync_bits, region)) - log_set_bit(lc, lc->clean_bits, region); - - return 0; -} - -/* - * clog_clear_region - * @rq - * - * rq may contain more than one clear request. We - * can determine the number from the 'data_size' field. - * - * Returns: 0 on success, -EXXX on failure - */ -static int clog_clear_region(struct dm_ulog_request *rq, uint32_t originator) -{ - int r; - int count; - uint64_t *region; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (rq->data_size % sizeof(uint64_t)) { - LOG_ERROR("Bad data size given for clear_region request"); - return -EINVAL; - } - - count = rq->data_size / sizeof(uint64_t); - region = (uint64_t *)&rq->data; - - for (; count > 0; count--, region++) { - r = clear_region(lc, *region, originator); - if (r) - return r; - } - - rq->data_size = 0; - - return 0; -} - -/* - * clog_get_resync_work - * @rq - * - */ -static int clog_get_resync_work(struct dm_ulog_request *rq, uint32_t originator) -{ - struct { - int64_t i; - uint64_t r; - } *pkg = (void *)rq->data; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - rq->data_size = sizeof(*pkg); - pkg->i = 0; - - if (lc->sync_search >= lc->region_count) { - /* - * FIXME: handle intermittent errors during recovery - * by resetting sync_search... but not to many times. - */ - LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Recovery finished", - rq->seq, SHORT_UUID(lc->uuid), originator); - return 0; - } - - if (lc->recovering_region != (uint64_t)-1) { - if (lc->recoverer == originator) { - LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Re-requesting work (%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)lc->recovering_region); - pkg->r = lc->recovering_region; - pkg->i = 1; - LOG_COND(log_resend_requests, "***** RE-REQUEST *****"); - } else { - LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Someone already recovering (%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)lc->recovering_region); - } - - return 0; - } - - while (lc->recovery_request_list) { - struct recovery_request *del; - - del = lc->recovery_request_list; - lc->recovery_request_list = del->next; - - pkg->r = del->region; - free(del); - - if (!log_test_bit(lc->sync_bits, pkg->r)) { - LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Assigning priority resync work (%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)pkg->r); - pkg->i = 1; - lc->recovering_region = pkg->r; - lc->recoverer = originator; - return 0; - } - } - - pkg->r = find_next_zero_bit(lc->sync_bits, - lc->sync_search); - - if (pkg->r >= lc->region_count) { - LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Resync work complete.", - rq->seq, SHORT_UUID(lc->uuid), originator); - return 0; - } - - lc->sync_search = pkg->r + 1; - - LOG_SPRINT(lc, "GET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Assigning resync work (%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)pkg->r); - pkg->i = 1; - lc->recovering_region = pkg->r; - lc->recoverer = originator; - - return 0; -} - -/* - * clog_set_region_sync - * @rq - */ -static int clog_set_region_sync(struct dm_ulog_request *rq, uint32_t originator) -{ - struct { - uint64_t region; - int64_t in_sync; - } *pkg = (void *)rq->data; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - lc->recovering_region = (uint64_t)-1; - - if (pkg->in_sync) { - if (log_test_bit(lc->sync_bits, pkg->region)) { - LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Region already set (%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)pkg->region); - } else { - log_set_bit(lc, lc->sync_bits, pkg->region); - lc->sync_count++; - - /* The rest of this section is all for debugging */ - LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Setting region (%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)pkg->region); - if (pkg->region == lc->skip_bit_warning) - lc->skip_bit_warning = lc->region_count; - - if (pkg->region > (lc->skip_bit_warning + 5)) { - LOG_ERROR("*** Region #%llu skipped during recovery ***", - (unsigned long long)lc->skip_bit_warning); - lc->skip_bit_warning = lc->region_count; -#ifdef DEBUG - kill(getpid(), SIGUSR1); -#endif - } - - if (!log_test_bit(lc->sync_bits, - (pkg->region) ? pkg->region - 1 : 0)) { - LOG_SPRINT(lc, "*** Previous bit not set ***"); - lc->skip_bit_warning = (pkg->region) ? - pkg->region - 1 : 0; - } - } - } else if (log_test_bit(lc->sync_bits, pkg->region)) { - lc->sync_count--; - log_clear_bit(lc, lc->sync_bits, pkg->region); - LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "Unsetting region (%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)pkg->region); - } - - if (lc->sync_count != count_bits32(lc->sync_bits)) { - unsigned long long reset = count_bits32(lc->sync_bits); - - LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "sync_count(%llu) != bitmap count(%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)lc->sync_count, reset); -#ifdef DEBUG - kill(getpid(), SIGUSR1); -#endif - lc->sync_count = reset; - } - - if (lc->sync_count > lc->region_count) - LOG_SPRINT(lc, "SET - SEQ#=%u, UUID=%s, nodeid = %u:: " - "(lc->sync_count > lc->region_count) - this is bad", - rq->seq, SHORT_UUID(lc->uuid), originator); - - rq->data_size = 0; - return 0; -} - -/* - * clog_get_sync_count - * @rq - */ -static int clog_get_sync_count(struct dm_ulog_request *rq, uint32_t originator) -{ - uint64_t *sync_count = (uint64_t *)rq->data; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - /* - * FIXME: Mirror requires us to be able to ask for - * the sync count while pending... but I don't like - * it because other machines may not be suspended and - * the stored value may not be accurate. - */ - if (!lc) - lc = get_pending_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - *sync_count = lc->sync_count; - - rq->data_size = sizeof(*sync_count); - - if (lc->sync_count != count_bits32(lc->sync_bits)) { - unsigned long long reset = count_bits32(lc->sync_bits); - - LOG_SPRINT(lc, "get_sync_count - SEQ#=%u, UUID=%s, nodeid = %u:: " - "sync_count(%llu) != bitmap count(%llu)", - rq->seq, SHORT_UUID(lc->uuid), originator, - (unsigned long long)lc->sync_count, reset); -#ifdef DEBUG - kill(getpid(), SIGUSR1); -#endif - lc->sync_count = reset; - } - - return 0; -} - -static int core_status_info(struct log_c *lc, struct dm_ulog_request *rq) -{ - char *data = (char *)rq->data; - - rq->data_size = sprintf(data, "1 clustered_core"); - - return 0; -} - -static int disk_status_info(struct log_c *lc, struct dm_ulog_request *rq) -{ - char *data = (char *)rq->data; - struct stat statbuf; - - if(fstat(lc->disk_fd, &statbuf)) { - rq->error = -errno; - return -errno; - } - - rq->data_size = sprintf(data, "3 clustered_disk %d:%d %c", - major(statbuf.st_rdev), minor(statbuf.st_rdev), - (lc->log_dev_failed) ? 'D' : 'A'); - - return 0; -} - -/* - * clog_status_info - * @rq - * - */ -static int clog_status_info(struct dm_ulog_request *rq) -{ - int r; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - lc = get_pending_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (lc->disk_fd == -1) - r = core_status_info(lc, rq); - else - r = disk_status_info(lc, rq); - - return r; -} - -static int core_status_table(struct log_c *lc, struct dm_ulog_request *rq) -{ - char *data = (char *)rq->data; - - rq->data_size = sprintf(data, "clustered_core %u %s%s ", - lc->region_size, - (lc->sync == DEFAULTSYNC) ? "" : - (lc->sync == NOSYNC) ? "nosync " : "sync ", - (lc->block_on_error) ? "block_on_error" : ""); - return 0; -} - -static int disk_status_table(struct log_c *lc, struct dm_ulog_request *rq) -{ - char *data = (char *)rq->data; - struct stat statbuf; - - if(fstat(lc->disk_fd, &statbuf)) { - rq->error = -errno; - return -errno; - } - - rq->data_size = sprintf(data, "clustered_disk %d:%d %u %s%s ", - major(statbuf.st_rdev), minor(statbuf.st_rdev), - lc->region_size, - (lc->sync == DEFAULTSYNC) ? "" : - (lc->sync == NOSYNC) ? "nosync " : "sync ", - (lc->block_on_error) ? "block_on_error" : ""); - return 0; -} - -/* - * clog_status_table - * @rq - * - */ -static int clog_status_table(struct dm_ulog_request *rq) -{ - int r; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - lc = get_pending_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (lc->disk_fd == -1) - r = core_status_table(lc, rq); - else - r = disk_status_table(lc, rq); - - return r; -} - -/* - * clog_is_remote_recovering - * @rq - * - */ -static int clog_is_remote_recovering(struct dm_ulog_request *rq) -{ - uint64_t region = *((uint64_t *)(rq->data)); - struct { - int64_t is_recovering; - uint64_t in_sync_hint; - } *pkg = (void *)rq->data; - struct log_c *lc = get_log(rq->uuid, rq->luid); - - if (!lc) - return -EINVAL; - - if (region > lc->region_count) - return -EINVAL; - - if (lc->recovery_halted) { - LOG_DBG("[%s] Recovery halted... [not remote recovering]: %llu", - SHORT_UUID(lc->uuid), (unsigned long long)region); - pkg->is_recovering = 0; - pkg->in_sync_hint = lc->region_count; /* none are recovering */ - } else { - pkg->is_recovering = !log_test_bit(lc->sync_bits, region); - - /* - * Remember, 'lc->sync_search' is 1 plus the region - * currently being recovered. So, we must take off 1 - * to account for that; but only if 'sync_search > 1'. - */ - pkg->in_sync_hint = lc->sync_search ? (lc->sync_search - 1) : 0; - LOG_DBG("[%s] Region is %s: %llu", - SHORT_UUID(lc->uuid), - (region == lc->recovering_region) ? - "currently remote recovering" : - (pkg->is_recovering) ? "pending remote recovery" : - "not remote recovering", (unsigned long long)region); - } - - if (pkg->is_recovering && - (region != lc->recovering_region)) { - struct recovery_request *rr; - - /* Already in the list? */ - for (rr = lc->recovery_request_list; rr; rr = rr->next) - if (rr->region == region) - goto out; - - /* Failure to allocated simply means we can't prioritize it */ - rr = malloc(sizeof(*rr)); - if (!rr) - goto out; - - LOG_DBG("[%s] Adding region to priority list: %llu", - SHORT_UUID(lc->uuid), (unsigned long long)region); - rr->region = region; - rr->next = lc->recovery_request_list; - lc->recovery_request_list = rr; - } - -out: - - rq->data_size = sizeof(*pkg); - - return 0; -} - - -/* - * do_request - * @rq: the request - * @server: is this request performed by the server - * - * An inability to perform this function will return an error - * from this function. However, an inability to successfully - * perform the request will fill in the 'rq->error' field. - * - * Returns: 0 on success, -EXXX on error - */ -int do_request(struct clog_request *rq, int server) -{ - int r; - - if (!rq) - return 0; - - if (rq->u_rq.error) - LOG_DBG("Programmer error: rq struct has error set"); - - switch (rq->u_rq.request_type) { - case DM_ULOG_CTR: - r = clog_ctr(&rq->u_rq); - break; - case DM_ULOG_DTR: - r = clog_dtr(&rq->u_rq); - break; - case DM_ULOG_PRESUSPEND: - r = clog_presuspend(&rq->u_rq); - break; - case DM_ULOG_POSTSUSPEND: - r = clog_postsuspend(&rq->u_rq); - break; - case DM_ULOG_RESUME: - r = clog_resume(&rq->u_rq); - break; - case DM_ULOG_GET_REGION_SIZE: - r = clog_get_region_size(&rq->u_rq); - break; - case DM_ULOG_IS_CLEAN: - r = clog_is_clean(&rq->u_rq); - break; - case DM_ULOG_IN_SYNC: - r = clog_in_sync(&rq->u_rq); - break; - case DM_ULOG_FLUSH: - r = clog_flush(&rq->u_rq, server); - break; - case DM_ULOG_MARK_REGION: - r = clog_mark_region(&rq->u_rq, rq->originator); - break; - case DM_ULOG_CLEAR_REGION: - r = clog_clear_region(&rq->u_rq, rq->originator); - break; - case DM_ULOG_GET_RESYNC_WORK: - r = clog_get_resync_work(&rq->u_rq, rq->originator); - break; - case DM_ULOG_SET_REGION_SYNC: - r = clog_set_region_sync(&rq->u_rq, rq->originator); - break; - case DM_ULOG_GET_SYNC_COUNT: - r = clog_get_sync_count(&rq->u_rq, rq->originator); - break; - case DM_ULOG_STATUS_INFO: - r = clog_status_info(&rq->u_rq); - break; - case DM_ULOG_STATUS_TABLE: - r = clog_status_table(&rq->u_rq); - break; - case DM_ULOG_IS_REMOTE_RECOVERING: - r = clog_is_remote_recovering(&rq->u_rq); - break; - default: - LOG_ERROR("Unknown request"); - r = rq->u_rq.error = -EINVAL; - break; - } - - if (r && !rq->u_rq.error) - rq->u_rq.error = r; - else if (r != rq->u_rq.error) - LOG_DBG("Warning: error from function != rq->u_rq.error"); - - if (rq->u_rq.error && rq->u_rq.data_size) { - /* Make sure I'm handling errors correctly above */ - LOG_DBG("Programmer error: rq->u_rq.error && rq->u_rq.data_size"); - rq->u_rq.data_size = 0; - } - - return 0; -} - -static void print_bits(char *buf, int size, int print) -{ - int i; - char outbuf[128]; - - memset(outbuf, 0, sizeof(outbuf)); - - for (i = 0; i < size; i++) { - if (!(i % 16)) { - if (outbuf[0] != '\0') { - if (print) - LOG_PRINT("%s", outbuf); - else - LOG_DBG("%s", outbuf); - } - memset(outbuf, 0, sizeof(outbuf)); - sprintf(outbuf, "[%3d - %3d]", i, i+15); - } - sprintf(outbuf + strlen(outbuf), " %.2X", (unsigned char)buf[i]); - } - if (outbuf[0] != '\0') { - if (print) - LOG_PRINT("%s", outbuf); - else - LOG_DBG("%s", outbuf); - } -} - -/* int store_bits(const char *uuid, const char *which, char **buf)*/ -int push_state(const char *uuid, uint64_t luid, - const char *which, char **buf, uint32_t debug_who) -{ - int bitset_size; - struct log_c *lc; - - if (*buf) - LOG_ERROR("store_bits: *buf != NULL"); - - lc = get_log(uuid, luid); - if (!lc) { - LOG_ERROR("store_bits: No log found for %s", uuid); - return -EINVAL; - } - - if (!strcmp(which, "recovering_region")) { - *buf = malloc(64); /* easily handles the 2 written numbers */ - if (!*buf) - return -ENOMEM; - sprintf(*buf, "%llu %u", (unsigned long long)lc->recovering_region, - lc->recoverer); - - LOG_SPRINT(lc, "CKPT SEND - SEQ#=X, UUID=%s, nodeid = %u:: " - "recovering_region=%llu, recoverer=%u, sync_count=%llu", - SHORT_UUID(lc->uuid), debug_who, - (unsigned long long)lc->recovering_region, - lc->recoverer, - (unsigned long long)count_bits32(lc->sync_bits)); - return 64; - } - - /* Size in 'int's */ - bitset_size = ((int)lc->clean_bits[0]/DM_BITS_PER_INT) + 1; - - /* Size in bytes */ - bitset_size *= 4; - - *buf = malloc(bitset_size); - - if (!*buf) { - LOG_ERROR("store_bits: Unable to allocate memory"); - return -ENOMEM; - } - - if (!strncmp(which, "sync_bits", 9)) { - memcpy(*buf, lc->sync_bits + 1, bitset_size); - LOG_DBG("[%s] storing sync_bits (sync_count = %llu):", - SHORT_UUID(uuid), (unsigned long long) - count_bits32(lc->sync_bits)); - print_bits(*buf, bitset_size, 0); - } else if (!strncmp(which, "clean_bits", 9)) { - memcpy(*buf, lc->clean_bits + 1, bitset_size); - LOG_DBG("[%s] storing clean_bits:", SHORT_UUID(lc->uuid)); - print_bits(*buf, bitset_size, 0); - } - - return bitset_size; -} - -/*int load_bits(const char *uuid, const char *which, char *buf, int size)*/ -int pull_state(const char *uuid, uint64_t luid, - const char *which, char *buf, int size) -{ - int bitset_size; - struct log_c *lc; - - if (!buf) - LOG_ERROR("pull_state: buf == NULL"); - - lc = get_log(uuid, luid); - if (!lc) { - LOG_ERROR("pull_state: No log found for %s", uuid); - return -EINVAL; - } - - if (!strncmp(which, "recovering_region", 17)) { - sscanf(buf, "%llu %u", (unsigned long long *)&lc->recovering_region, - &lc->recoverer); - LOG_SPRINT(lc, "CKPT INIT - SEQ#=X, UUID=%s, nodeid = X:: " - "recovering_region=%llu, recoverer=%u", - SHORT_UUID(lc->uuid), - (unsigned long long)lc->recovering_region, lc->recoverer); - return 0; - } - - /* Size in 'int's */ - bitset_size = ((int)lc->clean_bits[0]/DM_BITS_PER_INT) + 1; - - /* Size in bytes */ - bitset_size *= 4; - - if (bitset_size != size) { - LOG_ERROR("pull_state(%s): bad bitset_size (%d vs %d)", - which, size, bitset_size); - return -EINVAL; - } - - if (!strncmp(which, "sync_bits", 9)) { - lc->resume_override += 1; - memcpy(lc->sync_bits + 1, buf, bitset_size); - LOG_DBG("[%s] loading sync_bits (sync_count = %llu):", - SHORT_UUID(lc->uuid),(unsigned long long) - count_bits32(lc->sync_bits)); - print_bits((char *)lc->sync_bits, bitset_size, 0); - } else if (!strncmp(which, "clean_bits", 9)) { - lc->resume_override += 2; - memcpy(lc->clean_bits + 1, buf, bitset_size); - LOG_DBG("[%s] loading clean_bits:", SHORT_UUID(lc->uuid)); - print_bits((char *)lc->clean_bits, bitset_size, 0); - } - - return 0; -} - -int log_get_state(struct dm_ulog_request *rq) -{ - struct log_c *lc; - - lc = get_log(rq->uuid, rq->luid); - if (!lc) - return -EINVAL; - - return lc->state; -} - -/* - * log_status - * - * Returns: 1 if logs are still present, 0 otherwise - */ -int log_status(void) -{ - if (!dm_list_empty(&log_list) || !dm_list_empty(&log_pending_list)) - return 1; - - return 0; -} - -void log_debug(void) -{ - struct log_c *lc; - uint64_t r; - int i; - - LOG_ERROR(""); - LOG_ERROR("LOG COMPONENT DEBUGGING::"); - LOG_ERROR("Official log list:"); - LOG_ERROR("Pending log list:"); - dm_list_iterate_items(lc, &log_pending_list) { - LOG_ERROR("%s", lc->uuid); - LOG_ERROR("sync_bits:"); - print_bits((char *)lc->sync_bits, (int)lc->sync_bits[0], 1); - LOG_ERROR("clean_bits:"); - print_bits((char *)lc->clean_bits, (int)lc->sync_bits[0], 1); - } - - dm_list_iterate_items(lc, &log_list) { - LOG_ERROR("%s", lc->uuid); - LOG_ERROR(" recoverer : %u", lc->recoverer); - LOG_ERROR(" recovering_region: %llu", - (unsigned long long)lc->recovering_region); - LOG_ERROR(" recovery_halted : %s", (lc->recovery_halted) ? - "YES" : "NO"); - LOG_ERROR("sync_bits:"); - print_bits((char *)lc->sync_bits, (int)lc->sync_bits[0], 1); - LOG_ERROR("clean_bits:"); - print_bits((char *)lc->clean_bits, (int)lc->sync_bits[0], 1); - - LOG_ERROR("Validating %s::", SHORT_UUID(lc->uuid)); - r = find_next_zero_bit(lc->sync_bits, 0); - LOG_ERROR(" lc->region_count = %llu", - (unsigned long long)lc->region_count); - LOG_ERROR(" lc->sync_count = %llu", - (unsigned long long)lc->sync_count); - LOG_ERROR(" next zero bit = %llu", - (unsigned long long)r); - if ((r > lc->region_count) || - ((r == lc->region_count) && (lc->sync_count > lc->region_count))) { - LOG_ERROR("ADJUSTING SYNC_COUNT"); - lc->sync_count = lc->region_count; - } - - LOG_ERROR("Resync request history:"); - for (i = 0; i < RESYNC_HISTORY; i++) { - lc->idx++; - lc->idx = lc->idx % RESYNC_HISTORY; - if (lc->resync_history[lc->idx][0] == '\0') - continue; - LOG_ERROR("%d:%d) %s", i, lc->idx, - lc->resync_history[lc->idx]); - } - } -} diff --git a/daemons/clogd/functions.h b/daemons/clogd/functions.h deleted file mode 100644 index 6ac79ce37..000000000 --- a/daemons/clogd/functions.h +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#ifndef __CLOG_FUNCTIONS_DOT_H__ -#define __CLOG_FUNCTIONS_DOT_H__ - -#include "dm-log-userspace.h" -#include "cluster.h" - -#define LOG_RESUMED 1 -#define LOG_SUSPENDED 2 - -int local_resume(struct dm_ulog_request *rq); -int cluster_postsuspend(char *, uint64_t); - -int do_request(struct clog_request *rq, int server); -int push_state(const char *uuid, uint64_t luid, - const char *which, char **buf, uint32_t debug_who); -int pull_state(const char *uuid, uint64_t luid, - const char *which, char *buf, int size); - -int log_get_state(struct dm_ulog_request *rq); -int log_status(void); -void log_debug(void); - -#endif /* __CLOG_FUNCTIONS_DOT_H__ */ diff --git a/daemons/clogd/link_mon.c b/daemons/clogd/link_mon.c deleted file mode 100644 index 7b69664e5..000000000 --- a/daemons/clogd/link_mon.c +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#include -#include -#include - -#include "logging.h" - -struct link_callback { - int fd; - char *name; - void *data; - int (*callback)(void *data); - - struct link_callback *next; -}; - -static int used_pfds = 0; -static int free_pfds = 0; -static struct pollfd *pfds = NULL; -static struct link_callback *callbacks = NULL; - -int links_register(int fd, char *name, int (*callback)(void *data), void *data) -{ - int i; - struct link_callback *lc; - - for (i = 0; i < used_pfds; i++) { - if (fd == pfds[i].fd) { - LOG_ERROR("links_register: Duplicate file descriptor"); - return -EINVAL; - } - } - - lc = malloc(sizeof(*lc)); - if (!lc) - return -ENOMEM; - - lc->fd = fd; - lc->name = name; - lc->data = data; - lc->callback = callback; - - if (!free_pfds) { - struct pollfd *tmp; - tmp = realloc(pfds, sizeof(struct pollfd) * ((used_pfds*2) + 1)); - if (!tmp) { - free(lc); - return -ENOMEM; - } - - pfds = tmp; - free_pfds = used_pfds + 1; - } - - free_pfds--; - pfds[used_pfds].fd = fd; - pfds[used_pfds].events = POLLIN; - pfds[used_pfds].revents = 0; - used_pfds++; - - lc->next = callbacks; - callbacks = lc; - LOG_DBG("Adding %s/%d", lc->name, lc->fd); - LOG_DBG(" used_pfds = %d, free_pfds = %d", - used_pfds, free_pfds); - - return 0; -} - -int links_unregister(int fd) -{ - int i; - struct link_callback *p, *c; - - for (i = 0; i < used_pfds; i++) - if (fd == pfds[i].fd) { - /* entire struct is copied (overwritten) */ - pfds[i] = pfds[used_pfds - 1]; - used_pfds--; - free_pfds++; - } - - for (p = NULL, c = callbacks; c; p = c, c = c->next) - if (fd == c->fd) { - LOG_DBG("Freeing up %s/%d", c->name, c->fd); - LOG_DBG(" used_pfds = %d, free_pfds = %d", - used_pfds, free_pfds); - if (p) - p->next = c->next; - else - callbacks = c->next; - free(c); - break; - } - - return 0; -} - -int links_monitor(void) -{ - int i, r; - - for (i = 0; i < used_pfds; i++) { - pfds[i].revents = 0; - } - - r = poll(pfds, used_pfds, -1); - if (r <= 0) - return r; - - r = 0; - /* FIXME: handle POLLHUP */ - for (i = 0; i < used_pfds; i++) - if (pfds[i].revents & POLLIN) { - LOG_DBG("Data ready on %d", pfds[i].fd); - - /* FIXME: Add this back return 1;*/ - r++; - } - - return r; -} - -int links_issue_callbacks(void) -{ - int i; - struct link_callback *lc; - - for (i = 0; i < used_pfds; i++) - if (pfds[i].revents & POLLIN) - for (lc = callbacks; lc; lc = lc->next) - if (pfds[i].fd == lc->fd) { - LOG_DBG("Issuing callback on %s/%d", - lc->name, lc->fd); - lc->callback(lc->data); - break; - } - return 0; -} diff --git a/daemons/clogd/link_mon.h b/daemons/clogd/link_mon.h deleted file mode 100644 index ce9e0557b..000000000 --- a/daemons/clogd/link_mon.h +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#ifndef __LINK_MON_DOT_H__ -#define __LINK_MON_DOT_H__ - -int links_register(int fd, char *name, int (*callback)(void *data), void *data); -int links_unregister(int fd); -int links_monitor(void); -int links_issue_callbacks(void); - -#endif /* __LINK_MON_DOT_H__ */ diff --git a/daemons/clogd/local.c b/daemons/clogd/local.c deleted file mode 100644 index e835fa0f5..000000000 --- a/daemons/clogd/local.c +++ /dev/null @@ -1,420 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "dm-log-userspace.h" -#include "functions.h" -#include "cluster.h" -#include "common.h" -#include "logging.h" -#include "link_mon.h" -#include "local.h" - -#ifndef CN_IDX_DM -#warning Kernel should be at least 2.6.31 -#define CN_IDX_DM 0x7 /* Device Mapper */ -#define CN_VAL_DM_USERSPACE_LOG 0x1 -#endif - -static int cn_fd; /* Connector (netlink) socket fd */ -static char recv_buf[2048]; -static char send_buf[2048]; - - -/* FIXME: merge this function with kernel_send_helper */ -static int kernel_ack(uint32_t seq, int error) -{ - int r; - struct nlmsghdr *nlh = (struct nlmsghdr *)send_buf; - struct cn_msg *msg = NLMSG_DATA(nlh); - - if (error < 0) { - LOG_ERROR("Programmer error: error codes must be positive"); - return -EINVAL; - } - - memset(send_buf, 0, sizeof(send_buf)); - - nlh->nlmsg_seq = 0; - nlh->nlmsg_pid = getpid(); - nlh->nlmsg_type = NLMSG_DONE; - nlh->nlmsg_len = NLMSG_LENGTH(sizeof(struct cn_msg)); - nlh->nlmsg_flags = 0; - - msg->len = 0; - msg->id.idx = CN_IDX_DM; - msg->id.val = CN_VAL_DM_USERSPACE_LOG; - msg->seq = seq; - msg->ack = error; - - r = send(cn_fd, nlh, NLMSG_LENGTH(sizeof(struct cn_msg)), 0); - /* FIXME: do better error processing */ - if (r <= 0) - return -EBADE; - - return 0; -} - - -/* - * kernel_recv - * @rq: the newly allocated request from kernel - * - * Read requests from the kernel and allocate space for the new request. - * If there is no request from the kernel, *rq is NULL. - * - * This function is not thread safe due to returned stack pointer. In fact, - * the returned pointer must not be in-use when this function is called again. - * - * Returns: 0 on success, -EXXX on error - */ -static int kernel_recv(struct clog_request **rq) -{ - int r = 0; - int len; - struct cn_msg *msg; - struct dm_ulog_request *u_rq; - - *rq = NULL; - memset(recv_buf, 0, sizeof(recv_buf)); - - len = recv(cn_fd, recv_buf, sizeof(recv_buf), 0); - if (len < 0) { - LOG_ERROR("Failed to recv message from kernel"); - r = -errno; - goto fail; - } - - switch (((struct nlmsghdr *)recv_buf)->nlmsg_type) { - case NLMSG_ERROR: - LOG_ERROR("Unable to recv message from kernel: NLMSG_ERROR"); - r = -EBADE; - goto fail; - case NLMSG_DONE: - msg = (struct cn_msg *)NLMSG_DATA((struct nlmsghdr *)recv_buf); - len -= sizeof(struct nlmsghdr); - - if (len < sizeof(struct cn_msg)) { - LOG_ERROR("Incomplete request from kernel received"); - r = -EBADE; - goto fail; - } - - if (msg->len > DM_ULOG_REQUEST_SIZE) { - LOG_ERROR("Not enough space to receive kernel request (%d/%d)", - msg->len, DM_ULOG_REQUEST_SIZE); - r = -EBADE; - goto fail; - } - - if (!msg->len) - LOG_ERROR("Zero length message received"); - - len -= sizeof(struct cn_msg); - - if (len < msg->len) - LOG_ERROR("len = %d, msg->len = %d", len, msg->len); - - msg->data[msg->len] = '\0'; /* Cleaner way to ensure this? */ - u_rq = (struct dm_ulog_request *)msg->data; - - if (!u_rq->request_type) { - LOG_DBG("Bad transmission, requesting resend [%u]", - msg->seq); - r = -EAGAIN; - - if (kernel_ack(msg->seq, EAGAIN)) { - LOG_ERROR("Failed to NACK kernel transmission [%u]", - msg->seq); - r = -EBADE; - } - } - - /* - * Now we've got sizeof(struct cn_msg) + sizeof(struct nlmsghdr) - * worth of space that precede the request structure from the - * kernel. Since that space isn't going to be used again, we - * can take it for our purposes; rather than allocating a whole - * new structure and doing a memcpy. - * - * We should really make sure 'clog_request' doesn't grow - * beyond what is available to us, but we need only check it - * once... perhaps at compile time? - */ -// *rq = container_of(u_rq, struct clog_request, u_rq); - *rq = (void *)u_rq - - (sizeof(struct clog_request) - - sizeof(struct dm_ulog_request)); - - /* Clear the wrapper container fields */ - memset(*rq, 0, (void *)u_rq - (void *)(*rq)); - break; - default: - LOG_ERROR("Unknown nlmsg_type"); - r = -EBADE; - } - -fail: - if (r) - *rq = NULL; - - return (r == -EAGAIN) ? 0 : r; -} - -static int kernel_send_helper(void *data, int out_size) -{ - int r; - struct nlmsghdr *nlh; - struct cn_msg *msg; - - memset(send_buf, 0, sizeof(send_buf)); - - nlh = (struct nlmsghdr *)send_buf; - nlh->nlmsg_seq = 0; /* FIXME: Is this used? */ - nlh->nlmsg_pid = getpid(); - nlh->nlmsg_type = NLMSG_DONE; - nlh->nlmsg_len = NLMSG_LENGTH(out_size + sizeof(struct cn_msg)); - nlh->nlmsg_flags = 0; - - msg = NLMSG_DATA(nlh); - memcpy(msg->data, data, out_size); - msg->len = out_size; - msg->id.idx = CN_IDX_DM; - msg->id.val = CN_VAL_DM_USERSPACE_LOG; - msg->seq = 0; - - r = send(cn_fd, nlh, NLMSG_LENGTH(out_size + sizeof(struct cn_msg)), 0); - /* FIXME: do better error processing */ - if (r <= 0) - return -EBADE; - - return 0; -} - -/* - * do_local_work - * - * Any processing errors are placed in the 'rq' - * structure to be reported back to the kernel. - * It may be pointless for this function to - * return an int. - * - * Returns: 0 on success, -EXXX on failure - */ -static int do_local_work(void *data) -{ - int r; - struct clog_request *rq; - struct dm_ulog_request *u_rq = NULL; - - r = kernel_recv(&rq); - if (r) - return r; - - if (!rq) - return 0; - - u_rq = &rq->u_rq; - LOG_DBG("[%s] Request from kernel received: [%s/%u]", - SHORT_UUID(u_rq->uuid), RQ_TYPE(u_rq->request_type), - u_rq->seq); - switch (u_rq->request_type) { - case DM_ULOG_CTR: - case DM_ULOG_DTR: - case DM_ULOG_GET_REGION_SIZE: - case DM_ULOG_IN_SYNC: - case DM_ULOG_GET_SYNC_COUNT: - case DM_ULOG_STATUS_INFO: - case DM_ULOG_STATUS_TABLE: - case DM_ULOG_PRESUSPEND: - /* We do not specify ourselves as server here */ - r = do_request(rq, 0); - if (r) - LOG_DBG("Returning failed request to kernel [%s]", - RQ_TYPE(u_rq->request_type)); - r = kernel_send(u_rq); - if (r) - LOG_ERROR("Failed to respond to kernel [%s]", - RQ_TYPE(u_rq->request_type)); - - break; - case DM_ULOG_RESUME: - /* - * Resume is a special case that requires a local - * component to join the CPG, and a cluster component - * to handle the request. - */ - r = local_resume(u_rq); - if (r) { - LOG_DBG("Returning failed request to kernel [%s]", - RQ_TYPE(u_rq->request_type)); - r = kernel_send(u_rq); - if (r) - LOG_ERROR("Failed to respond to kernel [%s]", - RQ_TYPE(u_rq->request_type)); - break; - } - /* ELSE, fall through */ - case DM_ULOG_IS_CLEAN: - case DM_ULOG_FLUSH: - case DM_ULOG_MARK_REGION: - case DM_ULOG_GET_RESYNC_WORK: - case DM_ULOG_SET_REGION_SYNC: - case DM_ULOG_IS_REMOTE_RECOVERING: - case DM_ULOG_POSTSUSPEND: - r = cluster_send(rq); - if (r) { - u_rq->data_size = 0; - u_rq->error = r; - kernel_send(u_rq); - } - - break; - case DM_ULOG_CLEAR_REGION: - r = kernel_ack(u_rq->seq, 0); - - r = cluster_send(rq); - if (r) { - /* - * FIXME: store error for delivery on flush - * This would allow us to optimize MARK_REGION - * too. - */ - } - - break; - default: - LOG_ERROR("Invalid log request received (%u), ignoring.", - u_rq->request_type); - - return 0; - } - - if (r && !u_rq->error) - u_rq->error = r; - - return r; -} - -/* - * kernel_send - * @u_rq: result to pass back to kernel - * - * This function returns the u_rq structure - * (containing the results) to the kernel. - * It then frees the structure. - * - * WARNING: should the structure be freed if - * there is an error? I vote 'yes'. If the - * kernel doesn't get the response, it should - * resend the request. - * - * Returns: 0 on success, -EXXX on failure - */ -int kernel_send(struct dm_ulog_request *u_rq) -{ - int r; - int size; - - if (!u_rq) - return -EINVAL; - - size = sizeof(struct dm_ulog_request) + u_rq->data_size; - - if (!u_rq->data_size && !u_rq->error) { - /* An ACK is all that is needed */ - - /* FIXME: add ACK code */ - } else if (size > DM_ULOG_REQUEST_SIZE) { - /* - * If we gotten here, we've already overrun - * our allotted space somewhere. - * - * We must do something, because the kernel - * is waiting for a response. - */ - LOG_ERROR("Not enough space to respond to server"); - u_rq->error = -ENOSPC; - size = sizeof(struct dm_ulog_request); - } - - r = kernel_send_helper(u_rq, size); - if (r) - LOG_ERROR("Failed to send msg to kernel."); - - return r; -} - -/* - * init_local - * - * Initialize kernel communication socket (netlink) - * - * Returns: 0 on success, values from common.h on failure - */ -int init_local(void) -{ - int r = 0; - int opt; - struct sockaddr_nl addr; - - cn_fd = socket(PF_NETLINK, SOCK_DGRAM, NETLINK_CONNECTOR); - if (cn_fd < 0) - return EXIT_KERNEL_SOCKET; - - /* memset to fix valgrind complaint */ - memset(&addr, 0, sizeof(struct sockaddr_nl)); - - addr.nl_family = AF_NETLINK; - addr.nl_groups = CN_IDX_DM; - addr.nl_pid = 0; - - r = bind(cn_fd, (struct sockaddr *) &addr, sizeof(addr)); - if (r < 0) { - close(cn_fd); - return EXIT_KERNEL_BIND; - } - - opt = addr.nl_groups; - r = setsockopt(cn_fd, 270, NETLINK_ADD_MEMBERSHIP, &opt, sizeof(opt)); - if (r) { - close(cn_fd); - return EXIT_KERNEL_SETSOCKOPT; - } - - /* - r = fcntl(cn_fd, F_SETFL, FNDELAY); - */ - - links_register(cn_fd, "local", do_local_work, NULL); - - return 0; -} - -/* - * cleanup_local - * - * Clean up before exiting - */ -void cleanup_local(void) -{ - links_unregister(cn_fd); - close(cn_fd); -} diff --git a/daemons/clogd/local.h b/daemons/clogd/local.h deleted file mode 100644 index 9c813c973..000000000 --- a/daemons/clogd/local.h +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#ifndef __CLUSTER_LOG_LOCAL_DOT_H__ -#define __CLUSTER_LOG_LOCAL_DOT_H__ - -int init_local(void); -void cleanup_local(void); - -int kernel_send(struct dm_ulog_request *rq); - -#endif /* __CLUSTER_LOG_LOCAL_DOT_H__ */ diff --git a/daemons/clogd/logging.c b/daemons/clogd/logging.c deleted file mode 100644 index e9a1d4af1..000000000 --- a/daemons/clogd/logging.c +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ -#include -#include - -char *__rq_types_off_by_one[] = { - "DM_ULOG_CTR", - "DM_ULOG_DTR", - "DM_ULOG_PRESUSPEND", - "DM_ULOG_POSTSUSPEND", - "DM_ULOG_RESUME", - "DM_ULOG_GET_REGION_SIZE", - "DM_ULOG_IS_CLEAN", - "DM_ULOG_IN_SYNC", - "DM_ULOG_FLUSH", - "DM_ULOG_MARK_REGION", - "DM_ULOG_CLEAR_REGION", - "DM_ULOG_GET_RESYNC_WORK", - "DM_ULOG_SET_REGION_SYNC", - "DM_ULOG_GET_SYNC_COUNT", - "DM_ULOG_STATUS_INFO", - "DM_ULOG_STATUS_TABLE", - "DM_ULOG_IS_REMOTE_RECOVERING", - NULL -}; - -int log_tabbing = 0; -int log_is_open = 0; - -/* - * Variables for various conditional logging - */ -#ifdef MEMB -int log_membership_change = 1; -#else -int log_membership_change = 0; -#endif - -#ifdef CKPT -int log_checkpoint = 1; -#else -int log_checkpoint = 0; -#endif - -#ifdef RESEND -int log_resend_requests = 1; -#else -int log_resend_requests = 0; -#endif diff --git a/daemons/clogd/logging.h b/daemons/clogd/logging.h deleted file mode 100644 index 8465d6933..000000000 --- a/daemons/clogd/logging.h +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright (C) 2004-2009 Red Hat, Inc. All rights reserved. - * - * This copyrighted material is made available to anyone wishing to use, - * modify, copy, or redistribute it subject to the terms and conditions - * of the GNU Lesser General Public License v.2.1. - * - * You should have received a copy of the GNU Lesser General Public License - * along with this program; if not, write to the Free Software Foundation, - * Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA - */ - -#ifndef __CLUSTER_LOG_LOGGING_DOT_H__ -#define __CLUSTER_LOG_LOGGING_DOT_H__ - -#include -#include - -/* SHORT_UUID - print last 8 chars of a string */ -#define SHORT_UUID(x) (strlen(x) > 8) ? ((x) + (strlen(x) - 8)) : (x) - -extern char *__rq_types_off_by_one[]; -#define RQ_TYPE(x) __rq_types_off_by_one[(x) - 1] - -extern int log_tabbing; -extern int log_is_open; -extern int log_membership_change; -extern int log_checkpoint; -extern int log_resend_requests; - -#define LOG_OPEN(ident, option, facility) do { \ - openlog(ident, option, facility); \ - log_is_open = 1; \ - } while (0) - -#define LOG_CLOSE(void) do { \ - log_is_open = 0; \ - closelog(); \ - } while (0) - -#define LOG_OUTPUT(level, f, arg...) do { \ - int __i; \ - char __buffer[16]; \ - FILE *fp = (level > LOG_NOTICE) ? stderr : stdout; \ - if (log_is_open) { \ - for (__i = 0; (__i < log_tabbing) && (__i < 15); __i++) \ - __buffer[__i] = '\t'; \ - __buffer[__i] = '\0'; \ - syslog(level, "%s" f "\n", __buffer, ## arg); \ - } else { \ - for (__i = 0; __i < log_tabbing; __i++) \ - fprintf(fp, "\t"); \ - fprintf(fp, f "\n", ## arg); \ - } \ - } while (0) - - -#ifdef DEBUG -#define LOG_DBG(f, arg...) LOG_OUTPUT(LOG_DEBUG, f, ## arg) -#else /* DEBUG */ -#define LOG_DBG(f, arg...) -#endif /* DEBUG */ - -#define LOG_COND(__X, f, arg...) do {\ - if (__X) { \ - LOG_OUTPUT(LOG_NOTICE, f, ## arg); \ - } \ - } while (0) -#define LOG_PRINT(f, arg...) LOG_OUTPUT(LOG_NOTICE, f, ## arg) -#define LOG_ERROR(f, arg...) LOG_OUTPUT(LOG_ERR, f, ## arg) - -#endif /* __CLUSTER_LOG_LOGGING_DOT_H__ */