1
0
mirror of https://github.com/samba-team/samba.git synced 2024-12-22 13:34:15 +03:00

ctdb-daemon: Replace ctdb_message with srvid abstraction

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
Reviewed-by: Martin Schwenke <martin@meltin.net>
This commit is contained in:
Amitay Isaacs 2015-04-08 14:38:26 +10:00 committed by Amitay Isaacs
parent 6272ef0d09
commit 62f1e2579a
16 changed files with 223 additions and 451 deletions

View File

@ -178,6 +178,22 @@ static void ctdb_client_reply_call(struct ctdb_context *ctdb, struct ctdb_req_he
}
}
void ctdb_request_message(struct ctdb_context *ctdb,
struct ctdb_req_header *hdr)
{
struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
TDB_DATA data;
data.dsize = c->datalen;
data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
if (data.dptr == NULL) {
DEBUG(DEBUG_ERR, (__location__ " Memory allocation failure\n"));
return;
}
srvid_dispatch(ctdb->srv, c->srvid, CTDB_SRVID_ALL, data);
}
static void ctdb_client_reply_control(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
/*
@ -472,7 +488,7 @@ int ctdb_call(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
handler function in the client
*/
int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
ctdb_msg_fn_t handler,
srvid_handler_fn handler,
void *private_data)
{
int res;
@ -486,7 +502,7 @@ int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
}
/* also need to register the handler with our own ctdb structure */
return ctdb_register_message_handler(ctdb, ctdb, srvid, handler, private_data);
return srvid_register(ctdb->srv, ctdb, srvid, handler, private_data);
}
/*
@ -505,7 +521,7 @@ int ctdb_client_remove_message_handler(struct ctdb_context *ctdb, uint64_t srvid
}
/* also need to register the handler with our own ctdb structure */
ctdb_deregister_message_handler(ctdb, srvid, private_data);
srvid_deregister(ctdb->srv, srvid, private_data);
return 0;
}
@ -2182,7 +2198,7 @@ struct traverse_state {
/*
called on each key during a ctdb_traverse
*/
static void traverse_handler(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data, void *p)
static void traverse_handler(uint64_t srvid, TDB_DATA data, void *p)
{
struct traverse_state *state = (struct traverse_state *)p;
struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
@ -3295,6 +3311,13 @@ struct ctdb_context *ctdb_init(struct event_context *ev)
ctdb->lastid = INT_MAX-200;
CTDB_NO_MEMORY_NULL(ctdb, ctdb->idr);
ret = srvid_init(ctdb, &ctdb->srv);
if (ret != 0) {
DEBUG(DEBUG_ERR, ("srvid_init failed (%s)\n", strerror(ret)));
talloc_free(ctdb);
return NULL;
}
ret = ctdb_set_socketname(ctdb, CTDB_SOCKET);
if (ret != 0) {
DEBUG(DEBUG_ERR,(__location__ " ctdb_set_socketname failed.\n"));

View File

@ -1,286 +0,0 @@
/*
ctdb_message protocol code
Copyright (C) Andrew Tridgell 2007
Copyright (C) Amitay Isaacs 2013
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
/*
see http://wiki.samba.org/index.php/Samba_%26_Clustering for
protocol design and packet details
*/
#include "includes.h"
#include "tdb.h"
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
#include "lib/util/dlinklist.h"
static int message_list_db_init(struct ctdb_context *ctdb)
{
ctdb->message_list_indexdb = tdb_open("messagedb", 8192,
TDB_INTERNAL|
TDB_INCOMPATIBLE_HASH|
TDB_DISALLOW_NESTING,
O_RDWR|O_CREAT, 0);
if (ctdb->message_list_indexdb == NULL) {
DEBUG(DEBUG_ERR, ("Failed to create message list indexdb\n"));
return -1;
}
return 0;
}
static int message_list_db_add(struct ctdb_context *ctdb, uint64_t srvid,
struct ctdb_message_list_header *h)
{
int ret;
TDB_DATA key, data;
if (ctdb->message_list_indexdb == NULL) {
ret = message_list_db_init(ctdb);
if (ret < 0) {
return -1;
}
}
key.dptr = (uint8_t *)&srvid;
key.dsize = sizeof(uint64_t);
data.dptr = (uint8_t *)&h;
data.dsize = sizeof(struct ctdb_message_list_header *);
ret = tdb_store(ctdb->message_list_indexdb, key, data, TDB_INSERT);
if (ret < 0) {
DEBUG(DEBUG_ERR, ("Failed to add message list handler (%s)\n",
tdb_errorstr(ctdb->message_list_indexdb)));
return -1;
}
return 0;
}
static int message_list_db_delete(struct ctdb_context *ctdb, uint64_t srvid)
{
int ret;
TDB_DATA key;
if (ctdb->message_list_indexdb == NULL) {
return -1;
}
key.dptr = (uint8_t *)&srvid;
key.dsize = sizeof(uint64_t);
ret = tdb_delete(ctdb->message_list_indexdb, key);
if (ret < 0) {
DEBUG(DEBUG_ERR, ("Failed to delete message list handler (%s)\n",
tdb_errorstr(ctdb->message_list_indexdb)));
return -1;
}
return 0;
}
static int message_list_db_fetch_parser(TDB_DATA key, TDB_DATA data,
void *private_data)
{
struct ctdb_message_list_header **h =
(struct ctdb_message_list_header **)private_data;
if (data.dsize != sizeof(struct ctdb_message_list_header *)) {
return -1;
}
*h = *(struct ctdb_message_list_header **)data.dptr;
return 0;
}
static int message_list_db_fetch(struct ctdb_context *ctdb, uint64_t srvid,
struct ctdb_message_list_header **h)
{
TDB_DATA key;
if (ctdb->message_list_indexdb == NULL) {
return -1;
}
key.dptr = (uint8_t *)&srvid;
key.dsize = sizeof(uint64_t);
return tdb_parse_record(ctdb->message_list_indexdb, key,
message_list_db_fetch_parser, h);
}
/*
this dispatches the messages to the registered ctdb message handler
*/
int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
{
struct ctdb_message_list_header *h;
struct ctdb_message_list *m;
uint64_t srvid_all = CTDB_SRVID_ALL;
int ret;
ret = message_list_db_fetch(ctdb, srvid, &h);
if (ret == 0) {
for (m=h->m; m; m=m->next) {
m->message_handler(ctdb, srvid, data, m->message_private);
}
}
ret = message_list_db_fetch(ctdb, srvid_all, &h);
if (ret == 0) {
for(m=h->m; m; m=m->next) {
m->message_handler(ctdb, srvid, data, m->message_private);
}
}
return 0;
}
/*
called when a CTDB_REQ_MESSAGE packet comes in
*/
void ctdb_request_message(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
{
struct ctdb_req_message *c = (struct ctdb_req_message *)hdr;
TDB_DATA data;
data.dsize = c->datalen;
data.dptr = talloc_memdup(c, &c->data[0], c->datalen);
ctdb_dispatch_message(ctdb, c->srvid, data);
}
/*
* When header is freed, remove all the srvid handlers
*/
static int message_header_destructor(struct ctdb_message_list_header *h)
{
struct ctdb_message_list *m;
while (h->m != NULL) {
m = h->m;
DLIST_REMOVE(h->m, m);
TALLOC_FREE(m);
}
message_list_db_delete(h->ctdb, h->srvid);
DLIST_REMOVE(h->ctdb->message_list_header, h);
return 0;
}
/*
when a client goes away, we need to remove its srvid handler from the list
*/
static int message_handler_destructor(struct ctdb_message_list *m)
{
struct ctdb_message_list_header *h = m->h;
DLIST_REMOVE(h->m, m);
if (h->m == NULL) {
talloc_free(h);
}
return 0;
}
/*
setup handler for receipt of ctdb messages from ctdb_send_message()
*/
int ctdb_register_message_handler(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx,
uint64_t srvid,
ctdb_msg_fn_t handler,
void *private_data)
{
struct ctdb_message_list_header *h;
struct ctdb_message_list *m;
int ret;
m = talloc_zero(mem_ctx, struct ctdb_message_list);
CTDB_NO_MEMORY(ctdb, m);
m->message_handler = handler;
m->message_private = private_data;
ret = message_list_db_fetch(ctdb, srvid, &h);
if (ret != 0) {
/* srvid not registered yet */
h = talloc_zero(ctdb, struct ctdb_message_list_header);
CTDB_NO_MEMORY(ctdb, h);
h->ctdb = ctdb;
h->srvid = srvid;
ret = message_list_db_add(ctdb, srvid, h);
if (ret < 0) {
talloc_free(m);
talloc_free(h);
return -1;
}
DLIST_ADD(ctdb->message_list_header, h);
talloc_set_destructor(h, message_header_destructor);
}
m->h = h;
DLIST_ADD(h->m, m);
talloc_set_destructor(m, message_handler_destructor);
return 0;
}
/*
setup handler for receipt of ctdb messages from ctdb_send_message()
*/
int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data)
{
struct ctdb_message_list_header *h;
struct ctdb_message_list *m;
int ret;
ret = message_list_db_fetch(ctdb, srvid, &h);
if (ret != 0) {
return -1;
}
for (m=h->m; m; m=m->next) {
if (m->message_private == private_data) {
talloc_free(m);
return 0;
}
}
return -1;
}
/*
* check if the given srvid exists
*/
bool ctdb_check_message_handler(struct ctdb_context *ctdb, uint64_t srvid)
{
struct ctdb_message_list_header *h;
int ret;
ret = message_list_db_fetch(ctdb, srvid, &h);
if (ret != 0 || h->m == NULL) {
return false;
}
return true;
}

1
ctdb/include/common/srvid.h Symbolic link
View File

@ -0,0 +1 @@
../../common/srvid.h

View File

@ -19,6 +19,8 @@
#ifndef _CTDB_CLIENT_H
#define _CTDB_CLIENT_H
#include "common/srvid.h"
#include "ctdb_protocol.h"
enum control_state {CTDB_CONTROL_WAIT, CTDB_CONTROL_DONE, CTDB_CONTROL_ERROR, CTDB_CONTROL_TIMEOUT};
@ -127,8 +129,8 @@ uint32_t ctdb_get_pnn(struct ctdb_context *ctdb);
typedef void (*ctdb_msg_fn_t)(struct ctdb_context *, uint64_t srvid,
TDB_DATA data, void *);
int ctdb_client_set_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
ctdb_msg_fn_t handler,
void *private_data);
srvid_handler_fn handler,
void *private_data);
int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
uint64_t srvid, void *private_data);
int ctdb_client_check_message_handlers(struct ctdb_context *ctdb,
@ -159,12 +161,6 @@ int ctdb_record_store(struct ctdb_record_handle *h, TDB_DATA data);
int ctdb_fetch(struct ctdb_db_context *ctdb_db, TALLOC_CTX *mem_ctx,
TDB_DATA key, TDB_DATA *data);
int ctdb_register_message_handler(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx,
uint64_t srvid,
ctdb_msg_fn_t handler,
void *private_data);
struct ctdb_db_context *find_ctdb_db(struct ctdb_context *ctdb, uint32_t id);

View File

@ -264,21 +264,6 @@ struct ctdb_upcalls {
void (*node_connected)(struct ctdb_node *);
};
/* list of message handlers - needs to be changed to a more efficient data
structure so we can find a message handler given a srvid quickly */
struct ctdb_message_list_header {
struct ctdb_message_list_header *next, *prev;
struct ctdb_context *ctdb;
uint64_t srvid;
struct ctdb_message_list *m;
};
struct ctdb_message_list {
struct ctdb_message_list *next, *prev;
struct ctdb_message_list_header *h;
ctdb_msg_fn_t message_handler;
void *message_private;
};
/* additional data required for the daemon mode */
struct ctdb_daemon_data {
int sd;
@ -479,8 +464,7 @@ struct ctdb_context {
const struct ctdb_upcalls *upcalls; /* transport upcalls */
void *private_data; /* private to transport */
struct ctdb_db_context *db_list;
struct ctdb_message_list_header *message_list_header;
struct tdb_context *message_list_indexdb;
struct srvid_context *srv;
struct ctdb_daemon_data daemon;
struct ctdb_statistics statistics;
struct ctdb_statistics statistics_current;
@ -970,11 +954,7 @@ int32_t ctdb_control_traverse_data(struct ctdb_context *ctdb, TDB_DATA data, TDB
int32_t ctdb_control_traverse_kill(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata, uint32_t srcnode);
int ctdb_dispatch_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data);
bool ctdb_check_message_handler(struct ctdb_context *ctdb, uint64_t srvid);
int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
int ctdb_deregister_message_handler(struct ctdb_context *ctdb, uint64_t srvid, void *private_data);
int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid);
int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata);

View File

@ -18,6 +18,7 @@
#include "lib/util/debug.h"
#include "lib/util/samba_util.h"
#include "common/srvid.h"
#include "ctdb_client.h"
#include "ctdb_logging.h"

View File

@ -127,8 +127,8 @@ static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header
message handler for when we are in daemon mode. This redirects the message
to the right client
*/
static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
struct ctdb_req_message *r;
@ -136,9 +136,9 @@ static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
/* construct a message to send to the client containing the data */
len = offsetof(struct ctdb_req_message, data) + data.dsize;
r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE,
r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
len, struct ctdb_req_message);
CTDB_NO_MEMORY_VOID(ctdb, r);
CTDB_NO_MEMORY_VOID(client->ctdb, r);
talloc_set_name_const(r, "req_message packet");
@ -163,7 +163,8 @@ int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_i
DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
return -1;
}
res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
client);
if (res != 0) {
DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n",
(unsigned long long)srvid));
@ -186,7 +187,7 @@ int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client
DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
return -1;
}
return ctdb_deregister_message_handler(ctdb, srvid, client);
return srvid_deregister(ctdb->srv, srvid, client);
}
int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
@ -211,7 +212,7 @@ int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
return -1;
}
for (i=0; i<num_ids; i++) {
if (ctdb_check_message_handler(ctdb, ids[i])) {
if (srvid_exists(ctdb->srv, ids[i]) == 0) {
results[i/8] |= (1 << (i%8));
}
}
@ -1257,6 +1258,11 @@ int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
ctdb_set_child_logging(ctdb);
if (srvid_init(ctdb, &ctdb->srv) != 0) {
DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
exit(1);
}
/* initialize statistics collection */
ctdb_statistics_init(ctdb);
@ -1558,15 +1564,10 @@ struct ctdb_local_message {
static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct ctdb_local_message *m = talloc_get_type(private_data,
struct ctdb_local_message);
int res;
struct ctdb_local_message *m = talloc_get_type(
private_data, struct ctdb_local_message);
res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
if (res != 0) {
DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n",
(unsigned long long)m->srvid));
}
srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
talloc_free(m);
}

View File

@ -1063,10 +1063,12 @@ static bool vacuum_fetch_process_one(struct ctdb_db_context *ctdb_db,
/*
handler for vacuum fetch
*/
static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void vacuum_fetch_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
struct ctdb_marshall_buffer *recs;
int ret, i;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
@ -1134,9 +1136,12 @@ done:
/*
* handler for database detach
*/
static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void detach_database_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
uint32_t db_id;
struct ctdb_db_context *ctdb_db;
@ -2339,9 +2344,11 @@ static void election_send_request(struct event_context *ev, struct timed_event *
/*
handler for memory dumps
*/
static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
TDB_DATA *dump;
int ret;
@ -2382,10 +2389,11 @@ DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
/*
handler for reload_nodes
*/
static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void reload_nodes_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
@ -2410,16 +2418,17 @@ static void ctdb_rebalance_timeout(struct event_context *ev,
do_takeover_run(rec, rec->nodemap, false);
}
static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
uint64_t srvid,
TDB_DATA data, void *private_data)
static void recd_node_rebalance_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
uint32_t pnn;
uint32_t *t;
int len;
uint32_t deferred_rebalance;
struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
if (rec->recmaster != ctdb_get_pnn(ctdb)) {
return;
@ -2475,10 +2484,11 @@ static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void recd_update_ip_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_public_ip *ip;
if (rec->recmaster != rec->ctdb->pnn) {
@ -2533,22 +2543,21 @@ done:
srvid_request_reply(ctdb, (struct srvid_request *)r, result);
}
static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
uint64_t srvid, TDB_DATA data,
static void disable_takeover_runs_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(private_data,
struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
srvid_disable_and_reply(ctdb, data, rec->takeover_run);
srvid_disable_and_reply(rec->ctdb, data, rec->takeover_run);
}
/* Backward compatibility for this SRVID */
static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void disable_ip_check_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(private_data,
struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
uint32_t timeout;
if (data.dsize != sizeof(uint32_t)) {
@ -2564,17 +2573,16 @@ static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
timeout = *((uint32_t *)data.dptr);
ctdb_op_disable(rec->takeover_run, ctdb->ev, timeout);
ctdb_op_disable(rec->takeover_run, rec->ctdb->ev, timeout);
}
static void disable_recoveries_handler(struct ctdb_context *ctdb,
uint64_t srvid, TDB_DATA data,
static void disable_recoveries_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(private_data,
struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
srvid_disable_and_reply(ctdb, data, rec->recovery);
srvid_disable_and_reply(rec->ctdb, data, rec->recovery);
}
/*
@ -2582,12 +2590,12 @@ static void disable_recoveries_handler(struct ctdb_context *ctdb,
handle this later in the monitor_cluster loop so we do not recurse
with other requests to takeover_run()
*/
static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void ip_reallocate_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct srvid_request *request;
struct ctdb_recoverd *rec = talloc_get_type(private_data,
struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
if (data.dsize != sizeof(struct srvid_request)) {
DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
@ -2596,7 +2604,7 @@ static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
request = (struct srvid_request *)data.dptr;
srvid_request_add(ctdb, &rec->reallocate_requests, request);
srvid_request_add(rec->ctdb, &rec->reallocate_requests, request);
}
static void process_ipreallocate_requests(struct ctdb_context *ctdb,
@ -2644,10 +2652,11 @@ static void process_ipreallocate_requests(struct ctdb_context *ctdb,
/*
handler for recovery master elections
*/
static void election_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void election_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
int ret;
struct election_message *em = (struct election_message *)data.dptr;
@ -2740,15 +2749,16 @@ static void force_election(struct ctdb_recoverd *rec, uint32_t pnn,
/*
handler for when a node changes its flags
*/
static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void monitor_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
int ret;
struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
struct ctdb_node_map *nodemap=NULL;
TALLOC_CTX *tmp_ctx;
int i;
struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
int disabled_flag_changed;
if (data.dsize != sizeof(*c)) {
@ -2814,9 +2824,12 @@ static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid,
/*
handler for when we need to push out flag changes ot all other nodes
*/
static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void push_flags_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct ctdb_recoverd *rec = talloc_get_type(
private_data, struct ctdb_recoverd);
struct ctdb_context *ctdb = rec->ctdb;
int ret;
struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
struct ctdb_node_map *nodemap=NULL;

View File

@ -674,7 +674,7 @@ static void traverse_start_callback(void *p, TDB_DATA key, TDB_DATA data)
cdata.dptr = (uint8_t *)d;
cdata.dsize = d->length;
ctdb_dispatch_message(state->ctdb, state->srvid, cdata);
srvid_dispatch(state->ctdb->srv, state->srvid, 0, cdata);
if (key.dsize == 0 && data.dsize == 0) {
DEBUG(DEBUG_NOTICE, ("Ending traverse on DB %s (id %d), records %d\n",
state->h->ctdb_db->db_name, state->h->reqid,

View File

@ -78,26 +78,31 @@ static int fetch_func(struct ctdb_call_info *call)
}
static int msg_count;
static int msg_plus, msg_minus;
struct bench_data {
struct ctdb_context *ctdb;
struct tevent_context *ev;
int msg_count;
int msg_plus, msg_minus;
};
/*
handler for messages in bench_ring()
*/
static void ring_message_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void ring_message_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct bench_data *bdata = talloc_get_type_abort(
private_data, struct bench_data);
int incr = *(int *)data.dptr;
int *count = (int *)private_data;
int dest;
(*count)++;
dest = (ctdb_get_pnn(ctdb) + num_nodes + incr) % num_nodes;
ctdb_client_send_message(ctdb, dest, srvid, data);
bdata->msg_count++;
dest = (ctdb_get_pnn(bdata->ctdb) + num_nodes + incr) % num_nodes;
ctdb_client_send_message(bdata->ctdb, dest, srvid, data);
if (incr == 1) {
msg_plus++;
bdata->msg_plus++;
} else {
msg_minus++;
bdata->msg_minus++;
}
}
@ -116,10 +121,11 @@ static void send_start_messages(struct ctdb_context *ctdb, int incr)
ctdb_client_send_message(ctdb, dest, 0, data);
}
static void each_second(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
static void each_second(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
struct bench_data *bdata = talloc_get_type_abort(
private_data, struct bench_data);
/* we kickstart the ring into action by inserting messages from node
with pnn 0.
@ -127,49 +133,57 @@ static void each_second(struct event_context *ev, struct timed_event *te,
running in which case the ring is broken and the messages are lost.
if so, once every second try again to restart the ring
*/
if (msg_plus == 0) {
if (bdata->msg_plus == 0) {
// printf("no messages recevied, try again to kickstart the ring in forward direction...\n");
send_start_messages(ctdb, 1);
send_start_messages(bdata->ctdb, 1);
}
if (msg_minus == 0) {
if (bdata->msg_minus == 0) {
// printf("no messages recevied, try again to kickstart the ring in reverse direction...\n");
send_start_messages(ctdb, -1);
send_start_messages(bdata->ctdb, -1);
}
event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), each_second, ctdb);
event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
each_second, bdata);
}
static void dummy_event(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private_data)
{
struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), dummy_event, ctdb);
struct bench_data *bdata = talloc_get_type_abort(
private_data, struct bench_data);
event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
dummy_event, bdata);
}
/*
benchmark sending messages in a ring around the nodes
*/
static void bench_ring(struct ctdb_context *ctdb, struct event_context *ev)
static void bench_ring(struct bench_data *bdata)
{
int pnn=ctdb_get_pnn(ctdb);
int pnn = ctdb_get_pnn(bdata->ctdb);
if (pnn == 0) {
event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), each_second, ctdb);
event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
each_second, bdata);
} else {
event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(1, 0), dummy_event, ctdb);
event_add_timed(bdata->ev, bdata, timeval_current_ofs(1, 0),
dummy_event, bdata);
}
start_timer();
while (end_timer() < timelimit) {
if (pnn == 0 && msg_count % 10000 == 0 && end_timer() > 0) {
printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\r",
msg_count/end_timer(), msg_plus, msg_minus);
if (pnn == 0 && bdata->msg_count % 10000 == 0 && end_timer() > 0) {
printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\r",
bdata->msg_count/end_timer(),
bdata->msg_plus, bdata->msg_minus);
fflush(stdout);
}
event_loop_once(ev);
event_loop_once(bdata->ev);
}
printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\n",
msg_count/end_timer(), msg_plus, msg_minus);
printf("Ring: %.2f msgs/sec (+ve=%d -ve=%d)\n",
bdata->msg_count/end_timer(),
bdata->msg_plus, bdata->msg_minus);
}
/*
@ -194,6 +208,7 @@ int main(int argc, const char *argv[])
int ret;
poptContext pc;
struct event_context *ev;
struct bench_data *bdata;
pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
@ -244,7 +259,14 @@ int main(int argc, const char *argv[])
DEBUG(DEBUG_DEBUG,("ctdb_set_call() failed, ignoring return code %d\n", ret));
}
if (ctdb_client_set_message_handler(ctdb, 0, ring_message_handler,&msg_count))
bdata = talloc_zero(ctdb, struct bench_data);
if (bdata == NULL) {
goto error;
}
bdata->ctdb = ctdb;
bdata->ev = ev;
if (ctdb_client_set_message_handler(ctdb, 0, ring_message_handler, bdata))
goto error;
printf("Waiting for cluster\n");
@ -255,8 +277,8 @@ int main(int argc, const char *argv[])
event_loop_once(ev);
}
bench_ring(ctdb, ev);
bench_ring(bdata);
error:
return 0;
}

View File

@ -43,7 +43,12 @@ static double end_timer(void)
static int timelimit = 10;
static int num_records = 10;
static int num_nodes;
static int msg_count;
struct bench_data {
struct ctdb_context *ctdb;
struct tevent_context *ev;
int msg_count;
};
#define TESTKEY "testkey"
@ -52,8 +57,9 @@ static int msg_count;
store a expanded record
send a message to next node to tell it to do the same
*/
static void bench_fetch_1node(struct ctdb_context *ctdb)
static void bench_fetch_1node(struct bench_data *bdata)
{
struct ctdb_context *ctdb = bdata->ctdb;
TDB_DATA key, data, nulldata;
struct ctdb_db_context *ctdb_db;
TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
@ -82,7 +88,8 @@ static void bench_fetch_1node(struct ctdb_context *ctdb)
}
data.dptr = (uint8_t *)talloc_asprintf_append((char *)data.dptr,
"msg_count=%d on node %d\n",
msg_count, ctdb_get_pnn(ctdb));
bdata->msg_count,
ctdb_get_pnn(ctdb));
if (data.dptr == NULL) {
printf("Failed to create record\n");
talloc_free(tmp_ctx);
@ -109,11 +116,13 @@ static void bench_fetch_1node(struct ctdb_context *ctdb)
/*
handler for messages in bench_ring()
*/
static void message_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void message_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
msg_count++;
bench_fetch_1node(ctdb);
struct bench_data *bdata = talloc_get_type_abort(
private_data, struct bench_data);
bdata->msg_count++;
bench_fetch_1node(bdata);
}
@ -134,36 +143,38 @@ static void timeout_handler(struct event_context *ev, struct timed_event *timer,
send a message to next node to tell it to do the same
*/
static void bench_fetch(struct ctdb_context *ctdb, struct event_context *ev)
static void bench_fetch(struct bench_data *bdata)
{
struct ctdb_context *ctdb = bdata->ctdb;
int pnn=ctdb_get_pnn(ctdb);
if (pnn == num_nodes - 1) {
bench_fetch_1node(ctdb);
bench_fetch_1node(bdata);
}
start_timer();
event_add_timed(ev, ctdb, timeval_current_ofs(timelimit,0), timeout_handler, NULL);
event_add_timed(bdata->ev, bdata, timeval_current_ofs(timelimit,0),
timeout_handler, NULL);
while (end_timer() < timelimit) {
if (pnn == 0 && msg_count % 100 == 0 && end_timer() > 0) {
printf("Fetch: %.2f msgs/sec\r", msg_count/end_timer());
if (pnn == 0 && bdata->msg_count % 100 == 0 && end_timer() > 0) {
printf("Fetch: %.2f msgs/sec\r", bdata->msg_count/end_timer());
fflush(stdout);
}
if (event_loop_once(ev) != 0) {
if (event_loop_once(bdata->ev) != 0) {
printf("Event loop failed!\n");
break;
}
}
printf("Fetch: %.2f msgs/sec\n", msg_count/end_timer());
printf("Fetch: %.2f msgs/sec\n", bdata->msg_count/end_timer());
}
/*
handler for reconfigure message
*/
static void reconfigure_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void reconfigure_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
int *ready = (int *)private_data;
*ready = 1;
@ -193,6 +204,7 @@ int main(int argc, const char *argv[])
TDB_DATA key, data;
struct ctdb_record_handle *h;
int cluster_ready=0;
struct bench_data *bdata;
pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
@ -240,7 +252,16 @@ int main(int argc, const char *argv[])
exit(1);
}
ctdb_client_set_message_handler(ctdb, 0, message_handler, &msg_count);
bdata = talloc_zero(ctdb, struct bench_data);
if (bdata == NULL) {
printf("memory allocation error\n");
exit(1);
}
bdata->ctdb = ctdb;
bdata->ev = ev;
ctdb_client_set_message_handler(ctdb, 0, message_handler, bdata);
printf("Waiting for cluster\n");
while (1) {
@ -257,7 +278,7 @@ int main(int argc, const char *argv[])
*/
printf("Sleeping for %d seconds\n", num_nodes);
sleep(num_nodes);
bench_fetch(ctdb, ev);
bench_fetch(bdata);
key.dptr = discard_const(TESTKEY);
key.dsize = strlen(TESTKEY);

View File

@ -132,7 +132,7 @@ int ctdb_ctrl_getdbseqnum(struct ctdb_context *ctdb, struct timeval timeout,
uint32_t destnode, uint32_t dbid, uint64_t *seqnum);
int ctdb_client_set_message_handler(struct ctdb_context *ctdb,
uint64_t srvid,
ctdb_msg_fn_t handler,
srvid_handler_fn handler,
void *private_data);
int ctdb_client_remove_message_handler(struct ctdb_context *ctdb,
uint64_t srvid,
@ -173,7 +173,8 @@ ctdb_get_capabilities(struct ctdb_context *ctdb,
#include "common/ctdb_io.c"
#include "common/ctdb_util.c"
#include "common/ctdb_ltdb.c"
#include "common/ctdb_message.c"
#include "common/db_hash.c"
#include "common/srvid.c"
#include "common/rb_tree.c"
#include "common/ctdb_logging.c"
#include "common/ctdb_fork.c"

View File

@ -610,7 +610,7 @@ ctdb_ctrl_get_ifaces_stub(struct ctdb_context *ctdb,
* the ctdb tool only registers one at a time so keep this simple. */
static struct {
uint64_t srvid;
ctdb_msg_fn_t message_handler;
srvid_handler_fn message_handler;
void *message_private;
} ctdb_message_list_fake = {
.srvid = 0,
@ -620,7 +620,7 @@ static struct {
int ctdb_client_set_message_handler_stub(struct ctdb_context *ctdb,
uint64_t srvid,
ctdb_msg_fn_t handler,
srvid_handler_fn handler,
void *private_data)
{
ctdb_message_list_fake.srvid = srvid;
@ -649,7 +649,6 @@ static void ctdb_fake_handler_pnn_reply(struct ctdb_context *ctdb,
reply_data.dsize = sizeof(pnn);
reply_data.dptr = (uint8_t *)&pnn;
ctdb_message_list_fake.message_handler(
ctdb,
ctdb_message_list_fake.srvid,
reply_data,
ctdb_message_list_fake.message_private);

View File

@ -35,7 +35,8 @@ bool fast_start;
#include "common/ctdb_io.c"
#include "common/ctdb_util.c"
#include "common/ctdb_ltdb.c"
#include "common/ctdb_message.c"
#include "common/db_hash.c"
#include "common/srvid.c"
#include "common/cmdline.c"
#include "common/rb_tree.c"
#include "common/ctdb_logging.c"

View File

@ -2189,9 +2189,7 @@ struct srvid_reply_handler_data {
const char *srvid_str;
};
static void srvid_broadcast_reply_handler(struct ctdb_context *ctdb,
uint64_t srvid,
TDB_DATA data,
static void srvid_broadcast_reply_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
struct srvid_reply_handler_data *d =
@ -6036,8 +6034,7 @@ static int control_dumpmemory(struct ctdb_context *ctdb, int argc, const char **
/*
handler for memory dumps
*/
static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void mem_dump_handler(uint64_t srvid, TDB_DATA data, void *private_data)
{
sys_write(1, data.dptr, data.dsize);
exit(0);
@ -6108,8 +6105,8 @@ static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **arg
/*
handler for msglisten
*/
static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid,
TDB_DATA data, void *private_data)
static void msglisten_handler(uint64_t srvid, TDB_DATA data,
void *private_data)
{
int i;

View File

@ -320,7 +320,7 @@ def build(bld):
bld.SAMBA_SUBSYSTEM('ctdb-common',
source=bld.SUBDIR('common',
'''ctdb_io.c ctdb_util.c ctdb_ltdb.c
ctdb_message.c cmdline.c rb_tree.c
cmdline.c rb_tree.c
ctdb_fork.c'''),
includes='include include/internal common .',
deps='replace popt talloc tevent tdb popt ctdb-system')
@ -340,7 +340,7 @@ def build(bld):
source=bld.SUBDIR('client', 'ctdb_client.c'),
includes='include include/internal',
deps='''replace popt talloc tevent tdb
samba-util tdb-wrap''')
samba-util tdb-wrap ctdb-util''')
bld.SAMBA_SUBSYSTEM('ctdb-server',
source='server/ctdbd.c ' +
@ -367,14 +367,15 @@ def build(bld):
bld.SAMBA_BINARY('ctdbd',
source='',
deps='''ctdb-server ctdb-client ctdb-common
ctdb-common-util ctdb-tcp''' +
ctdb-common-util ctdb-tcp ctdb-util''' +
ib_deps,
install_path='${SBINDIR}',
manpages='ctdbd.1')
bld.SAMBA_BINARY('ctdb',
source='tools/ctdb.c tools/ctdb_vacuum.c',
deps='ctdb-client ctdb-common ctdb-common-util',
deps='''ctdb-client ctdb-common ctdb-common-util
ctdb-util''',
includes='include include/internal',
install_path='${BINDIR}',
manpages='ctdb.1')
@ -610,7 +611,8 @@ def build(bld):
bld.SAMBA_BINARY(target,
source=src,
includes='include include/internal',
deps='ctdb-client ctdb-common ctdb-common-util',
deps='''ctdb-client ctdb-common ctdb-common-util
ctdb-util''',
install_path='${CTDB_TEST_LIBDIR}')
bld.SAMBA_BINARY('ctdb_takeover_tests',