1
0
mirror of git://sourceware.org/git/lvm2.git synced 2025-01-06 17:18:29 +03:00
lvm2/daemons/clvmd/clvmd-corosync.c
Zdenek Kabelac dae4f53acb clvmd: add mutex protection for cpg_ call
The library for corosync multicasting is not supporting multithread
usage - add local mutex to avoid parallel call of cpg_mcast_joined().
2017-02-05 17:55:37 +01:00

663 lines
16 KiB
C

/*
* Copyright (C) 2009-2012 Red Hat, Inc. All rights reserved.
*
* This file is part of LVM2.
*
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License v.2.1.
*
* You should have received a copy of the GNU Lesser General Public License
* along with this program; if not, write to the Free Software Foundation,
* Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
/*
* This provides the interface between clvmd and corosync/DLM as the cluster
* and lock manager.
*/
#include "clvmd-common.h"
#include <pthread.h>
#include "clvm.h"
#include "clvmd-comms.h"
#include "clvmd.h"
#include "lvm-functions.h"
#include "locking.h"
#include <corosync/cpg.h>
#include <corosync/quorum.h>
#ifdef HAVE_COROSYNC_CONFDB_H
# include <corosync/confdb.h>
#elif defined HAVE_COROSYNC_CMAP_H
# include <corosync/cmap.h>
#else
# error "Either HAVE_COROSYNC_CONFDB_H or HAVE_COROSYNC_CMAP_H must be defined."
#endif
#include <libdlm.h>
#include <syslog.h>
/* Timeout value for several corosync calls */
#define LOCKSPACE_NAME "clvmd"
static void corosync_cpg_deliver_callback (cpg_handle_t handle,
const struct cpg_name *groupName,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len);
static void corosync_cpg_confchg_callback(cpg_handle_t handle,
const struct cpg_name *groupName,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries);
static void _cluster_closedown(void);
/* Hash list of nodes in the cluster */
static struct dm_hash_table *node_hash;
/* Number of active nodes */
static int num_nodes;
static unsigned int our_nodeid;
static struct local_client *cluster_client;
/* Corosync handles */
static cpg_handle_t cpg_handle;
static quorum_handle_t quorum_handle;
/* DLM Handle */
static dlm_lshandle_t *lockspace;
static struct cpg_name cpg_group_name;
/* Corosync callback structs */
cpg_callbacks_t corosync_cpg_callbacks = {
.cpg_deliver_fn = corosync_cpg_deliver_callback,
.cpg_confchg_fn = corosync_cpg_confchg_callback,
};
quorum_callbacks_t quorum_callbacks = {
.quorum_notify_fn = NULL,
};
struct node_info
{
enum {NODE_DOWN, NODE_CLVMD} state;
int nodeid;
};
/* Set errno to something approximating the right value and return 0 or -1 */
static int cs_to_errno(cs_error_t err)
{
switch(err)
{
case CS_OK:
return 0;
case CS_ERR_LIBRARY:
errno = EINVAL;
break;
case CS_ERR_VERSION:
errno = EINVAL;
break;
case CS_ERR_INIT:
errno = EINVAL;
break;
case CS_ERR_TIMEOUT:
errno = ETIME;
break;
case CS_ERR_TRY_AGAIN:
errno = EAGAIN;
break;
case CS_ERR_INVALID_PARAM:
errno = EINVAL;
break;
case CS_ERR_NO_MEMORY:
errno = ENOMEM;
break;
case CS_ERR_BAD_HANDLE:
errno = EINVAL;
break;
case CS_ERR_BUSY:
errno = EBUSY;
break;
case CS_ERR_ACCESS:
errno = EPERM;
break;
case CS_ERR_NOT_EXIST:
errno = ENOENT;
break;
case CS_ERR_NAME_TOO_LONG:
errno = ENAMETOOLONG;
break;
case CS_ERR_EXIST:
errno = EEXIST;
break;
case CS_ERR_NO_SPACE:
errno = ENOSPC;
break;
case CS_ERR_INTERRUPT:
errno = EINTR;
break;
case CS_ERR_NAME_NOT_FOUND:
errno = ENOENT;
break;
case CS_ERR_NO_RESOURCES:
errno = ENOMEM;
break;
case CS_ERR_NOT_SUPPORTED:
errno = EOPNOTSUPP;
break;
case CS_ERR_BAD_OPERATION:
errno = EINVAL;
break;
case CS_ERR_FAILED_OPERATION:
errno = EIO;
break;
case CS_ERR_MESSAGE_ERROR:
errno = EIO;
break;
case CS_ERR_QUEUE_FULL:
errno = EXFULL;
break;
case CS_ERR_QUEUE_NOT_AVAILABLE:
errno = EINVAL;
break;
case CS_ERR_BAD_FLAGS:
errno = EINVAL;
break;
case CS_ERR_TOO_BIG:
errno = E2BIG;
break;
case CS_ERR_NO_SECTIONS:
errno = ENOMEM;
break;
default:
errno = EINVAL;
break;
}
return -1;
}
static char *print_corosync_csid(const char *csid)
{
static char buf[128];
int id;
memcpy(&id, csid, sizeof(int));
sprintf(buf, "%d", id);
return buf;
}
static void corosync_cpg_deliver_callback (cpg_handle_t handle,
const struct cpg_name *groupName,
uint32_t nodeid,
uint32_t pid,
void *msg,
size_t msg_len)
{
int target_nodeid;
memcpy(&target_nodeid, msg, COROSYNC_CSID_LEN);
DEBUGLOG("%u got message from nodeid %d for %d. len %zd\n",
our_nodeid, nodeid, target_nodeid, msg_len-4);
if (nodeid != our_nodeid)
if (target_nodeid == our_nodeid || target_nodeid == 0)
process_message(cluster_client, (char *)msg+COROSYNC_CSID_LEN,
msg_len-COROSYNC_CSID_LEN, (char*)&nodeid);
}
static void corosync_cpg_confchg_callback(cpg_handle_t handle,
const struct cpg_name *groupName,
const struct cpg_address *member_list, size_t member_list_entries,
const struct cpg_address *left_list, size_t left_list_entries,
const struct cpg_address *joined_list, size_t joined_list_entries)
{
int i;
struct node_info *ninfo;
DEBUGLOG("confchg callback. %zd joined, %zd left, %zd members\n",
joined_list_entries, left_list_entries, member_list_entries);
for (i=0; i<joined_list_entries; i++) {
ninfo = dm_hash_lookup_binary(node_hash,
(char *)&joined_list[i].nodeid,
COROSYNC_CSID_LEN);
if (!ninfo) {
ninfo = malloc(sizeof(struct node_info));
if (!ninfo) {
break;
}
else {
ninfo->nodeid = joined_list[i].nodeid;
dm_hash_insert_binary(node_hash,
(char *)&ninfo->nodeid,
COROSYNC_CSID_LEN, ninfo);
}
}
ninfo->state = NODE_CLVMD;
}
for (i=0; i<left_list_entries; i++) {
ninfo = dm_hash_lookup_binary(node_hash,
(char *)&left_list[i].nodeid,
COROSYNC_CSID_LEN);
if (ninfo)
ninfo->state = NODE_DOWN;
}
num_nodes = member_list_entries;
}
static int _init_cluster(void)
{
cs_error_t err;
#ifdef QUORUM_SET /* corosync/quorum.h */
uint32_t quorum_type;
#endif
node_hash = dm_hash_create(100);
err = cpg_initialize(&cpg_handle,
&corosync_cpg_callbacks);
if (err != CS_OK) {
syslog(LOG_ERR, "Cannot initialise Corosync CPG service: %d",
err);
DEBUGLOG("Cannot initialise Corosync CPG service: %d", err);
return cs_to_errno(err);
}
#ifdef QUORUM_SET
err = quorum_initialize(&quorum_handle,
&quorum_callbacks,
&quorum_type);
if (quorum_type != QUORUM_SET) {
syslog(LOG_ERR, "Corosync quorum service is not configured");
DEBUGLOG("Corosync quorum service is not configured");
return EINVAL;
}
#else
err = quorum_initialize(&quorum_handle,
&quorum_callbacks);
#endif
if (err != CS_OK) {
syslog(LOG_ERR, "Cannot initialise Corosync quorum service: %d",
err);
DEBUGLOG("Cannot initialise Corosync quorum service: %d", err);
return cs_to_errno(err);
}
/* Create a lockspace for LV & VG locks to live in */
lockspace = dlm_open_lockspace(LOCKSPACE_NAME);
if (!lockspace) {
lockspace = dlm_create_lockspace(LOCKSPACE_NAME, 0600);
if (!lockspace) {
syslog(LOG_ERR, "Unable to create DLM lockspace for CLVM: %m");
return -1;
}
DEBUGLOG("Created DLM lockspace for CLVMD.\n");
} else
DEBUGLOG("Opened existing DLM lockspace for CLVMD.\n");
dlm_ls_pthread_init(lockspace);
DEBUGLOG("DLM initialisation complete\n");
/* Connect to the clvmd group */
strcpy((char *)cpg_group_name.value, "clvmd");
cpg_group_name.length = strlen((char *)cpg_group_name.value);
err = cpg_join(cpg_handle, &cpg_group_name);
if (err != CS_OK) {
cpg_finalize(cpg_handle);
quorum_finalize(quorum_handle);
dlm_release_lockspace(LOCKSPACE_NAME, lockspace, 1);
syslog(LOG_ERR, "Cannot join clvmd process group");
DEBUGLOG("Cannot join clvmd process group: %d\n", err);
return cs_to_errno(err);
}
err = cpg_local_get(cpg_handle,
&our_nodeid);
if (err != CS_OK) {
cpg_finalize(cpg_handle);
quorum_finalize(quorum_handle);
dlm_release_lockspace(LOCKSPACE_NAME, lockspace, 1);
syslog(LOG_ERR, "Cannot get local node id\n");
return cs_to_errno(err);
}
DEBUGLOG("Our local node id is %d\n", our_nodeid);
DEBUGLOG("Connected to Corosync\n");
return 0;
}
static void _cluster_closedown(void)
{
dlm_release_lockspace(LOCKSPACE_NAME, lockspace, 1);
cpg_finalize(cpg_handle);
quorum_finalize(quorum_handle);
}
static void _get_our_csid(char *csid)
{
memcpy(csid, &our_nodeid, sizeof(int));
}
/* Corosync doesn't really have nmode names so we
just use the node ID in hex instead */
static int _csid_from_name(char *csid, const char *name)
{
int nodeid;
struct node_info *ninfo;
if (sscanf(name, "%x", &nodeid) == 1) {
ninfo = dm_hash_lookup_binary(node_hash, csid, COROSYNC_CSID_LEN);
if (ninfo)
return nodeid;
}
return -1;
}
static int _name_from_csid(const char *csid, char *name)
{
struct node_info *ninfo;
ninfo = dm_hash_lookup_binary(node_hash, csid, COROSYNC_CSID_LEN);
if (!ninfo)
{
sprintf(name, "UNKNOWN %s", print_corosync_csid(csid));
return -1;
}
sprintf(name, "%x", ninfo->nodeid);
return 0;
}
static int _get_num_nodes(void)
{
DEBUGLOG("num_nodes = %d\n", num_nodes);
return num_nodes;
}
/* Node is now known to be running a clvmd */
static void _add_up_node(const char *csid)
{
struct node_info *ninfo;
ninfo = dm_hash_lookup_binary(node_hash, csid, COROSYNC_CSID_LEN);
if (!ninfo) {
DEBUGLOG("corosync_add_up_node no node_hash entry for csid %s\n",
print_corosync_csid(csid));
return;
}
DEBUGLOG("corosync_add_up_node %d\n", ninfo->nodeid);
ninfo->state = NODE_CLVMD;
return;
}
/* Call a callback for each node, so the caller knows whether it's up or down */
static int _cluster_do_node_callback(struct local_client *master_client,
void (*callback)(struct local_client *,
const char *csid, int node_up))
{
struct dm_hash_node *hn;
struct node_info *ninfo;
dm_hash_iterate(hn, node_hash)
{
char csid[COROSYNC_CSID_LEN];
ninfo = dm_hash_get_data(node_hash, hn);
memcpy(csid, dm_hash_get_key(node_hash, hn), COROSYNC_CSID_LEN);
DEBUGLOG("down_callback. node %d, state = %d\n", ninfo->nodeid,
ninfo->state);
if (ninfo->state == NODE_CLVMD)
callback(master_client, csid, 1);
}
return 0;
}
/* Real locking */
static int _lock_resource(const char *resource, int mode, int flags, int *lockid)
{
struct dlm_lksb lksb;
int err;
DEBUGLOG("lock_resource '%s', flags=%d, mode=%d\n", resource, flags, mode);
if (flags & LKF_CONVERT)
lksb.sb_lkid = *lockid;
err = dlm_ls_lock_wait(lockspace,
mode,
&lksb,
flags,
resource,
strlen(resource),
0,
NULL, NULL, NULL);
if (err != 0)
{
DEBUGLOG("dlm_ls_lock returned %d\n", errno);
return err;
}
if (lksb.sb_status != 0)
{
DEBUGLOG("dlm_ls_lock returns lksb.sb_status %d\n", lksb.sb_status);
errno = lksb.sb_status;
return -1;
}
DEBUGLOG("lock_resource returning %d, lock_id=%x\n", err, lksb.sb_lkid);
*lockid = lksb.sb_lkid;
return 0;
}
static int _unlock_resource(const char *resource, int lockid)
{
struct dlm_lksb lksb;
int err;
DEBUGLOG("unlock_resource: %s lockid: %x\n", resource, lockid);
lksb.sb_lkid = lockid;
err = dlm_ls_unlock_wait(lockspace,
lockid,
0,
&lksb);
if (err != 0)
{
DEBUGLOG("Unlock returned %d\n", err);
return err;
}
if (lksb.sb_status != EUNLOCK)
{
DEBUGLOG("dlm_ls_unlock_wait returns lksb.sb_status: %d\n", lksb.sb_status);
errno = lksb.sb_status;
return -1;
}
return 0;
}
static int _is_quorate(void)
{
int quorate;
if (quorum_getquorate(quorum_handle, &quorate) == CS_OK)
return quorate;
else
return 0;
}
static int _get_main_cluster_fd(void)
{
int select_fd;
cpg_fd_get(cpg_handle, &select_fd);
return select_fd;
}
static int _cluster_fd_callback(struct local_client *fd, char *buf, int len,
const char *csid,
struct local_client **new_client)
{
cluster_client = fd;
*new_client = NULL;
cpg_dispatch(cpg_handle, CS_DISPATCH_ONE);
return 1;
}
static int _cluster_send_message(const void *buf, int msglen, const char *csid,
const char *errtext)
{
static pthread_mutex_t _mutex = PTHREAD_MUTEX_INITIALIZER;
struct iovec iov[2];
cs_error_t err;
int target_node;
if (csid)
memcpy(&target_node, csid, COROSYNC_CSID_LEN);
else
target_node = 0;
iov[0].iov_base = &target_node;
iov[0].iov_len = sizeof(int);
iov[1].iov_base = (char *)buf;
iov[1].iov_len = msglen;
pthread_mutex_lock(&_mutex);
err = cpg_mcast_joined(cpg_handle, CPG_TYPE_AGREED, iov, 2);
pthread_mutex_unlock(&_mutex);
return cs_to_errno(err);
}
#ifdef HAVE_COROSYNC_CONFDB_H
/*
* We are not necessarily connected to a Red Hat Cluster system,
* but if we are, this returns the cluster name from cluster.conf.
* I've used confdb rather than ccs to reduce the inter-package
* dependancies as well as to allow people to set a cluster name
* for themselves even if they are not running on RH cluster.
*/
static int _get_cluster_name(char *buf, int buflen)
{
confdb_handle_t handle;
int result;
size_t namelen = buflen;
hdb_handle_t cluster_handle;
confdb_callbacks_t callbacks = {
.confdb_key_change_notify_fn = NULL,
.confdb_object_create_change_notify_fn = NULL,
.confdb_object_delete_change_notify_fn = NULL
};
/* This is a default in case everything else fails */
strncpy(buf, "Corosync", buflen);
/* Look for a cluster name in confdb */
result = confdb_initialize (&handle, &callbacks);
if (result != CS_OK)
return 0;
result = confdb_object_find_start(handle, OBJECT_PARENT_HANDLE);
if (result != CS_OK)
goto out;
result = confdb_object_find(handle, OBJECT_PARENT_HANDLE, (void *)"cluster", strlen("cluster"), &cluster_handle);
if (result != CS_OK)
goto out;
result = confdb_key_get(handle, cluster_handle, (void *)"name", strlen("name"), buf, &namelen);
if (result != CS_OK)
goto out;
buf[namelen] = '\0';
out:
confdb_finalize(handle);
return 0;
}
#elif defined HAVE_COROSYNC_CMAP_H
static int _get_cluster_name(char *buf, int buflen)
{
cmap_handle_t cmap_handle = 0;
int result;
char *name = NULL;
/* This is a default in case everything else fails */
strncpy(buf, "Corosync", buflen);
/* Look for a cluster name in cmap */
result = cmap_initialize(&cmap_handle);
if (result != CS_OK)
return 0;
result = cmap_get_string(cmap_handle, "totem.cluster_name", &name);
if (result != CS_OK)
goto out;
memset(buf, 0, buflen);
strncpy(buf, name, buflen - 1);
out:
if (name)
free(name);
cmap_finalize(cmap_handle);
return 0;
}
#endif
static struct cluster_ops _cluster_corosync_ops = {
.name = "corosync",
.cluster_init_completed = NULL,
.cluster_send_message = _cluster_send_message,
.name_from_csid = _name_from_csid,
.csid_from_name = _csid_from_name,
.get_num_nodes = _get_num_nodes,
.cluster_fd_callback = _cluster_fd_callback,
.get_main_cluster_fd = _get_main_cluster_fd,
.cluster_do_node_callback = _cluster_do_node_callback,
.is_quorate = _is_quorate,
.get_our_csid = _get_our_csid,
.add_up_node = _add_up_node,
.reread_config = NULL,
.cluster_closedown = _cluster_closedown,
.get_cluster_name = _get_cluster_name,
.sync_lock = _lock_resource,
.sync_unlock = _unlock_resource,
};
struct cluster_ops *init_corosync_cluster(void)
{
if (!_init_cluster())
return &_cluster_corosync_ops;
else
return NULL;
}