1
0
mirror of https://github.com/samba-team/samba.git synced 2025-01-12 09:18:10 +03:00

Merge branch 'master' of git://git.samba.org/sahlberg/ctdb

(This used to be ctdb commit a442668923d4d8f8d624e00138fe37d76d593d21)
This commit is contained in:
Martin Schwenke 2010-02-05 14:00:23 +11:00
commit 240625fe9d
15 changed files with 170 additions and 25 deletions

View File

@ -45,7 +45,7 @@ struct ctdb_queue_pkt {
struct ctdb_queue {
struct ctdb_context *ctdb;
struct ctdb_partial partial; /* partial input packet */
struct ctdb_queue_pkt *out_queue;
struct ctdb_queue_pkt *out_queue, *out_queue_tail;
uint32_t out_queue_length;
struct fd_event *fde;
int fd;
@ -194,7 +194,8 @@ static void queue_io_write(struct ctdb_queue *queue)
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
if (pkt->length != pkt->full_length) {
/* partial packet sent - we have to drop it */
DLIST_REMOVE(queue->out_queue, pkt);
TLIST_REMOVE(queue->out_queue, queue->out_queue_tail,
pkt);
queue->out_queue_length--;
talloc_free(pkt);
}
@ -213,7 +214,7 @@ static void queue_io_write(struct ctdb_queue *queue)
return;
}
DLIST_REMOVE(queue->out_queue, pkt);
TLIST_REMOVE(queue->out_queue, queue->out_queue_tail, pkt);
queue->out_queue_length--;
talloc_free(pkt);
}
@ -294,7 +295,8 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
EVENT_FD_WRITEABLE(queue->fde);
}
DLIST_ADD_END(queue->out_queue, pkt, struct ctdb_queue_pkt *);
TLIST_ADD_END(queue->out_queue, queue->out_queue_tail, pkt);
queue->out_queue_length++;
if (queue->ctdb->tunable.verbose_memory_names != 0) {

View File

@ -186,7 +186,7 @@ int ctdb_sys_send_arp(const ctdb_sock_addr *addr, const char *iface)
return -1;
}
DEBUG(DEBUG_NOTICE, (__location__ " Created SOCKET FD:%d for sending arp\n", s));
DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d for sending arp\n", s));
strncpy(ifr.ifr_name, iface, sizeof(ifr.ifr_name));
if (ioctl(s, SIOCGIFINDEX, &ifr) < 0) {
DEBUG(DEBUG_CRIT,(__location__ " interface '%s' not found\n", iface));
@ -427,7 +427,7 @@ int ctdb_sys_open_capture_socket(const char *iface, void **private_data)
return -1;
}
DEBUG(DEBUG_NOTICE, (__location__ " Created RAW SOCKET FD:%d for tcp tickle\n", s));
DEBUG(DEBUG_DEBUG, (__location__ " Created RAW SOCKET FD:%d for tcp tickle\n", s));
set_nonblocking(s);
set_close_on_exec(s);

View File

@ -135,6 +135,9 @@ struct tdb_transaction {
bool prepared;
tdb_off_t magic_offset;
/* set when the GLOBAL_LOCK has been taken */
bool global_lock_taken;
/* old file size before transaction */
tdb_len_t old_map_size;
@ -603,6 +606,11 @@ int _tdb_transaction_cancel(struct tdb_context *tdb)
}
}
if (tdb->transaction->global_lock_taken) {
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
tdb->transaction->global_lock_taken = false;
}
/* remove any global lock created during the transaction */
if (tdb->global_lock.count != 0) {
tdb_brlock(tdb, FREELIST_TOP, F_UNLCK, F_SETLKW, 0, 4*tdb->header.hash_size);
@ -947,11 +955,12 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
return -1;
}
tdb->transaction->global_lock_taken = true;
if (!(tdb->flags & TDB_NOSYNC)) {
/* write the recovery data to the end of the file */
if (transaction_setup_recovery(tdb, &tdb->transaction->magic_offset) == -1) {
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: failed to setup recovery data\n"));
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
_tdb_transaction_cancel(tdb);
return -1;
}
@ -966,7 +975,6 @@ static int _tdb_transaction_prepare_commit(struct tdb_context *tdb)
tdb->transaction->old_map_size) == -1) {
tdb->ecode = TDB_ERR_IO;
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_prepare_commit: expansion failed\n"));
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
_tdb_transaction_cancel(tdb);
return -1;
}
@ -1056,7 +1064,6 @@ int tdb_transaction_commit(struct tdb_context *tdb)
tdb_transaction_recover(tdb);
_tdb_transaction_cancel(tdb);
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
TDB_LOG((tdb, TDB_DEBUG_FATAL, "tdb_transaction_commit: write failed\n"));
return -1;
@ -1072,8 +1079,6 @@ int tdb_transaction_commit(struct tdb_context *tdb)
return -1;
}
tdb_brlock(tdb, GLOBAL_LOCK, F_UNLCK, F_SETLKW, 0, 1);
/*
TODO: maybe write to some dummy hdr field, or write to magic
offset without mmap, before the last sync, instead of the

View File

@ -110,4 +110,50 @@ do { \
} \
} while (0)
/*
The TLIST_*() macros are meant for when you have two list pointers,
one pointing at the head of the list and one pointing at the tail
of the list. This makes the common case of adding to the end of the
list and removing from the front of the list efficient.
TLIST stands for "tailed list".
Note: When initialising the structure containing your lists, make
sure that you set both head and tail to NULL.
Also, do not mix the TLIST_*() macros with the DLIST_* macros!
*/
/*
  TLIST_ADD_FRONT adds elements to the front of the list.
  p is handed to DLIST_ADD for the head-side insertion; the tail pointer
  is only touched when it is still NULL, in which case it is set to p.
*/
#define TLIST_ADD_FRONT(listhead, listtail, p) \
do { \
	DLIST_ADD(listhead, p); \
	if (!(listtail)) { \
		(listtail) = (p); \
	} \
} while (0)
/*
  TLIST_ADD_END adds elements to the end of the list.
  When a tail exists, p is linked directly after it and becomes the new
  tail. When the tail is NULL, p is inserted through DLIST_ADD and the
  tail is set to whatever the head is afterwards.
*/
#define TLIST_ADD_END(listhead, listtail, p) \
do { \
	if ((listtail) != NULL) { \
		(listtail)->next = (p); \
		(p)->prev = (listtail); \
		(p)->next = NULL; \
		(listtail) = (p); \
	} else { \
		DLIST_ADD(listhead, p); \
		(listtail) = (listhead); \
	} \
} while (0)
/*
  TLIST_REMOVE removes an element from the list.
  If p is the current tail, the tail is first moved back to p->prev;
  the actual unlinking is then left to DLIST_REMOVE.
*/
#define TLIST_REMOVE(listhead, listtail, p) \
do { \
	if ((listtail) == (p)) { \
		(listtail) = (p)->prev; \
	} \
	DLIST_REMOVE(listhead, p); \
} while (0)
#endif /* _DLINKLIST_H */

View File

@ -100,8 +100,9 @@ static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header
client->ctdb->statistics.client_packets_sent++;
if (hdr->operation == CTDB_REQ_MESSAGE) {
if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
return 0;
DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
talloc_free(client);
return -1;
}
}
return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
@ -280,6 +281,10 @@ static void daemon_call_from_client_callback(struct ctdb_call_state *state)
memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
res = daemon_queue_send(client, &r->hdr);
if (res == -1) {
/* client is dead - return immediately */
return;
}
if (res != 0) {
DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
}
@ -890,6 +895,7 @@ static void daemon_control_callback(struct ctdb_context *ctdb,
struct ctdb_client *client = state->client;
struct ctdb_reply_control *r;
size_t len;
int ret;
/* construct a message to send to the client containing the data */
len = offsetof(struct ctdb_reply_control, data) + data.dsize;
@ -910,9 +916,10 @@ static void daemon_control_callback(struct ctdb_context *ctdb,
memcpy(&r->data[r->datalen], errormsg, r->errorlen);
}
daemon_queue_send(client, &r->hdr);
talloc_free(state);
ret = daemon_queue_send(client, &r->hdr);
if (ret != -1) {
talloc_free(state);
}
}
/*

View File

@ -148,7 +148,7 @@ struct lockwait_handle *ctdb_lockwait(struct ctdb_db_context *ctdb_db,
close(result->fd[1]);
set_close_on_exec(result->fd[0]);
DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0]));
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child lockwait process\n", result->fd[0]));
talloc_set_destructor(result, lockwait_destructor);

View File

@ -530,7 +530,7 @@ int ctdb_set_child_logging(struct ctdb_context *ctdb)
ctdb_log_handler, ctdb->log);
ctdb->log->pfd = p[0];
DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for logging\n", p[0]));
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for logging\n", p[0]));
return 0;
}

View File

@ -563,7 +563,7 @@ struct childwrite_handle *ctdb_childwrite(struct ctdb_db_context *ctdb_db,
talloc_set_destructor(result, childwrite_destructor);
DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for ctdb_childwrite\n", result->fd[0]));
result->fde = event_add_fd(ctdb_db->ctdb->ev, result, result->fd[0],
EVENT_FD_READ|EVENT_FD_AUTOCLOSE, childwrite_handler,

View File

@ -748,7 +748,7 @@ int32_t ctdb_control_set_recmode(struct ctdb_context *ctdb,
talloc_set_destructor(state, set_recmode_destructor);
DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for setrecmode\n", state->fd[0]));
state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(5, 0),
ctdb_set_recmode_timeout, state);

View File

@ -3496,7 +3496,7 @@ int ctdb_start_recoverd(struct ctdb_context *ctdb)
exit(1);
}
DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
ctdb_recoverd_parent, &fd[0]);

View File

@ -186,7 +186,7 @@ static struct ctdb_traverse_local_handle *ctdb_traverse_local(struct ctdb_db_con
setup a packet queue between the child and the parent. This
copes with all the async and packet boundary issues
*/
DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child traverse\n", h->fd[0]));
h->queue = ctdb_queue_setup(ctdb_db->ctdb, h, h->fd[0], 0, ctdb_traverse_local_handler, h);
if (h->queue == NULL) {

View File

@ -62,7 +62,7 @@ static const struct {
{ "VacuumLimit", 5000, offsetof(struct ctdb_tunable, vacuum_limit) },
{ "VacuumMinInterval", 60, offsetof(struct ctdb_tunable, vacuum_min_interval) },
{ "VacuumMaxInterval", 600, offsetof(struct ctdb_tunable, vacuum_max_interval) },
{ "MaxQueueDropMsg", 1000, offsetof(struct ctdb_tunable, max_queue_depth_drop_msg) },
{ "MaxQueueDropMsg", 1000000, offsetof(struct ctdb_tunable, max_queue_depth_drop_msg) },
{ "UseStatusEvents", 0, offsetof(struct ctdb_tunable, use_status_events_for_monitoring) },
{ "AllowUnhealthyDBRead", 0, offsetof(struct ctdb_tunable, allow_unhealthy_db_read) }
};

View File

@ -867,7 +867,7 @@ ctdb_vacuum_event(struct event_context *ev, struct timed_event *te,
timeval_current_ofs(ctdb->tunable.vacuum_max_run_time, 0),
vacuum_child_timeout, child_ctx);
DEBUG(DEBUG_INFO, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to child vacuum process\n", child_ctx->fd[0]));
event_add_fd(ctdb->ev, child_ctx, child_ctx->fd[0],
EVENT_FD_READ|EVENT_FD_AUTOCLOSE,

View File

@ -258,7 +258,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
set_nonblocking(in->fd);
set_close_on_exec(in->fd);
DEBUG(DEBUG_NOTICE, (__location__ " Created SOCKET FD:%d to incoming ctdb connection\n", fd));
DEBUG(DEBUG_DEBUG, (__location__ " Created SOCKET FD:%d to incoming ctdb connection\n", fd));
setsockopt(in->fd,SOL_SOCKET,SO_KEEPALIVE,(char *)&one,sizeof(one));

View File

@ -2085,6 +2085,16 @@ static int control_disable(struct ctdb_context *ctdb, int argc, const char **arg
int ret;
struct ctdb_node_map *nodemap=NULL;
/* check if the node is already disabled */
if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
exit(10);
}
if (nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED) {
DEBUG(DEBUG_ERR,("Node %d is already disabled.\n", options.pnn));
return 0;
}
do {
ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, NODE_FLAGS_PERMANENTLY_DISABLED, 0);
if (ret != 0) {
@ -2119,6 +2129,16 @@ static int control_enable(struct ctdb_context *ctdb, int argc, const char **argv
struct ctdb_node_map *nodemap=NULL;
/* check if the node is already enabled */
if (ctdb_ctrl_getnodemap(ctdb, TIMELIMIT(), CTDB_CURRENT_NODE, ctdb, &nodemap) != 0) {
DEBUG(DEBUG_ERR, ("Unable to get nodemap from local node\n"));
exit(10);
}
if (!(nodemap->nodes[options.pnn].flags & NODE_FLAGS_PERMANENTLY_DISABLED)) {
DEBUG(DEBUG_ERR,("Node %d is already enabled.\n", options.pnn));
return 0;
}
do {
ret = ctdb_ctrl_modflags(ctdb, TIMELIMIT(), options.pnn, 0, NODE_FLAGS_PERMANENTLY_DISABLED);
if (ret != 0) {
@ -4118,6 +4138,69 @@ static int control_rddumpmemory(struct ctdb_context *ctdb, int argc, const char
return 0;
}
/*
  send a message to a srvid

  argv[0] is the destination srvid; it is parsed with base 0, so decimal,
  0x-prefixed hex and 0-prefixed octal are all accepted.
  argv[1] is the message text; it is sent without its terminating NUL.
  The message is addressed to CTDB_BROADCAST_CONNECTED.

  Returns 0 on success, -1 if ctdb_send_message() reported a failure.
 */
static int control_msgsend(struct ctdb_context *ctdb, int argc, const char **argv)
{
	uint64_t srvid;
	int ret;
	TDB_DATA data;

	if (argc < 2) {
		/* NOTE(review): relies on usage() not returning - confirm */
		usage();
	}

	/* the message handler API uses 64-bit srvids, so parse the full
	   width instead of truncating to unsigned long on 32-bit hosts */
	srvid = strtoull(argv[0], NULL, 0);

	data.dptr = (uint8_t *)discard_const(argv[1]);
	data.dsize= strlen(argv[1]);

	ret = ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, srvid, data);
	if (ret != 0) {
		DEBUG(DEBUG_ERR,("Failed to send message to srvid %llu\n", (unsigned long long)srvid));
		return -1;
	}

	return 0;
}
/*
  handler for msglisten

  Writes "Message received: " followed by the raw bytes of the message
  payload and a newline to stdout. The ctdb, srvid and private_data
  arguments are not used.
 */
static void msglisten_handler(struct ctdb_context *ctdb, uint64_t srvid,
			      TDB_DATA data, void *private_data)
{
	size_t i;

	printf("Message received: ");
	for (i = 0; i < data.dsize; i++) {
		putchar(data.dptr[i]);
	}
	printf("\n");

	/* the listener loops forever, so flush each message now instead of
	   leaving it in the stdio buffer when stdout is not line buffered */
	fflush(stdout);
}
/*
  listen for messages on a messageport

  Registers msglisten_handler for a srvid equal to this process's pid,
  prints that srvid to stdout and then runs the event loop forever.
  argc and argv are not consulted.
 */
static int control_msglisten(struct ctdb_context *ctdb, int argc, const char **argv)
{
	uint64_t srvid = getpid();

	/* register a message port and listen for messages */
	ctdb_set_message_handler(ctdb, srvid, msglisten_handler, NULL);
	printf("Listening for messages on srvid:%d\n", (int)srvid);

	for (;;) {
		event_loop_once(ctdb->ev);
	}

	return 0;
}
/*
list all nodes in the cluster
if the daemon is running, we read the data from the daemon.
@ -4316,6 +4399,8 @@ static const struct {
{ "setrecmasterrole", control_setrecmasterrole, false, false, "Set RECMASTER role to on/off", "{on|off}"},
{ "setdbprio", control_setdbprio, false, false, "Set DB priority", "<dbid> <prio:1-3>"},
{ "getdbprio", control_getdbprio, false, false, "Get DB priority", "<dbid>"},
{ "msglisten", control_msglisten, false, false, "Listen on a srvid port for messages", "<msg srvid>"},
{ "msgsend", control_msgsend, false, false, "Send a message to srvid", "<srvid> <message>"},
};
/*