1
0
mirror of https://github.com/samba-team/samba.git synced 2025-08-29 13:49:30 +03:00

Merge from tridge

(This used to be ctdb commit dfca91b3ed5ee3c4531dd9c883bf39dbb8eececa)
This commit is contained in:
Alexander Bokovoy
2006-11-29 12:37:24 +03:00
20 changed files with 1494 additions and 485 deletions

View File

@ -5,3 +5,4 @@ common
config.log
push.sh
ctdb_test
config.cache

View File

@ -18,9 +18,14 @@ CFLAGS=-g -I$(srcdir)/include -Iinclude -I$(srcdir) \
LIB_FLAGS=@LDFLAGS@ -Llib @LIBS@ -lpopt
EVENTS_OBJ = lib/events/events.o lib/events/events_standard.o
EVENTS_OBJ = lib/events/events.o lib/events/events_standard.o
CTDB_OBJ = ctdb_tcp_child.o ctdb_tcp.o util.o
CTDB_COMMON_OBJ = common/ctdb.o common/util.o common/ctdb_util.o \
common/ctdb_call.o common/ctdb_ltdb.o lib/util/idtree.o
CTDB_TCP_OBJ = tcp/tcp_connect.o tcp/tcp_io.o tcp/tcp_init.o
CTDB_OBJ = $(CTDB_COMMON_OBJ) $(CTDB_TCP_OBJ)
OBJS = @TDBOBJ@ @TALLOCOBJ@ @LIBREPLACEOBJ@ $(EXTRA_OBJ) $(EVENTS_OBJ) $(CTDB_OBJ)

194
ctdb/common/ctdb.c Normal file
View File

@ -0,0 +1,194 @@
/*
ctdb main protocol code
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
/*
choose the transport we will use
*/
int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
{
int ctdb_tcp_init(struct ctdb_context *ctdb);
if (strcmp(transport, "tcp") == 0) {
return ctdb_tcp_init(ctdb);
}
ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport);
return -1;
}
/*
add a node to the list of active nodes
*/
static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
{
struct ctdb_node *node, **nodep;
nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
CTDB_NO_MEMORY(ctdb, nodep);
ctdb->nodes = nodep;
nodep = &ctdb->nodes[ctdb->num_nodes];
(*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
CTDB_NO_MEMORY(ctdb, *nodep);
node = *nodep;
if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
return -1;
}
node->ctdb = ctdb;
node->name = talloc_asprintf(node, "%s:%u",
node->address.address,
node->address.port);
/* for now we just set the vnn to the line in the file - this
will change! */
node->vnn = ctdb->num_nodes;
if (ctdb->methods->add_node(node) != 0) {
talloc_free(node);
return -1;
}
if (ctdb_same_address(&ctdb->address, &node->address)) {
ctdb->vnn = node->vnn;
}
ctdb->num_nodes++;
return 0;
}
/*
setup the node list from a file
*/
int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
{
char **lines;
int nlines;
int i;
lines = file_lines_load(nlist, &nlines, ctdb);
if (lines == NULL) {
ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
return -1;
}
for (i=0;i<nlines;i++) {
if (ctdb_add_node(ctdb, lines[i]) != 0) {
talloc_free(lines);
return -1;
}
}
talloc_free(lines);
return 0;
}
/*
setup the local node address
*/
int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
{
if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
return -1;
}
ctdb->name = talloc_asprintf(ctdb, "%s:%u",
ctdb->address.address,
ctdb->address.port);
return 0;
}
/*
add a node to the list of active nodes
*/
int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
{
struct ctdb_registered_call *call;
call = talloc(ctdb, struct ctdb_registered_call);
call->fn = fn;
call->id = id;
DLIST_ADD(ctdb->calls, call);
return 0;
}
/*
start the protocol going
*/
int ctdb_start(struct ctdb_context *ctdb)
{
return ctdb->methods->start(ctdb);
}
/*
called by the transport layer when a packet comes in
*/
static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
printf("received pkt of length %d\n", length);
}
/*
called by the transport layer when a node is dead
*/
static void ctdb_node_dead(struct ctdb_node *node)
{
printf("%s: node %s is dead\n", node->ctdb->name, node->name);
}
/*
called by the transport layer when a node is dead
*/
static void ctdb_node_connected(struct ctdb_node *node)
{
printf("%s: connected to %s\n", node->ctdb->name, node->name);
}
static const struct ctdb_upcalls ctdb_upcalls = {
.recv_pkt = ctdb_recv_pkt,
.node_dead = ctdb_node_dead,
.node_connected = ctdb_node_connected
};
/*
initialise the ctdb daemon.
NOTE: In current code the daemon does not fork. This is for testing purposes only
and to simplify the code.
*/
struct ctdb_context *ctdb_init(struct event_context *ev)
{
struct ctdb_context *ctdb;
ctdb = talloc_zero(ev, struct ctdb_context);
ctdb->ev = ev;
ctdb->upcalls = &ctdb_upcalls;
ctdb->idr = idr_init(ctdb);
return ctdb;
}

133
ctdb/common/ctdb_call.c Normal file
View File

@ -0,0 +1,133 @@
/*
ctdb_call protocol code
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
/*
local version of ctdb_call
*/
static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
{
struct ctdb_call *c;
struct ctdb_registered_call *fn;
TDB_DATA data;
c = talloc(ctdb, struct ctdb_call);
CTDB_NO_MEMORY(ctdb, c);
data = tdb_fetch(ctdb->ltdb, key);
c->key = key;
c->call_data = call_data;
c->record_data.dptr = talloc_memdup(c, data.dptr, data.dsize);
CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
if (data.dptr) free(data.dptr);
c->new_data = NULL;
c->reply_data = NULL;
for (fn=ctdb->calls;fn;fn=fn->next) {
if (fn->id == call_id) break;
}
if (fn == NULL) {
ctdb_set_error(ctdb, "Unknown call id %u\n", call_id);
return -1;
}
if (fn->fn(c) != 0) {
free(c->record_data.dptr);
ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id);
return -1;
}
if (c->new_data) {
if (tdb_store(ctdb->ltdb, key, *c->new_data, TDB_REPLACE) != 0) {
ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
return -1;
}
}
if (reply_data) {
if (c->reply_data) {
*reply_data = *c->reply_data;
talloc_steal(ctdb, reply_data->dptr);
} else {
reply_data->dptr = NULL;
reply_data->dsize = 0;
}
}
talloc_free(c);
return -1;
}
/*
make a remote ctdb call
*/
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
{
uint32_t dest;
struct ctdb_req_call *c;
uint32_t len;
struct ctdb_node *node;
dest = ctdb_hash(&key) % ctdb->num_nodes;
if (dest == ctdb->vnn) {
return ctdb_call_local(ctdb, key, call_id, call_data, reply_data);
}
len = sizeof(*c) + key.dsize + (call_data?call_data->dsize:0);
c = talloc_size(ctdb, len);
CTDB_NO_MEMORY(ctdb, c);
c->hdr.operation = CTDB_OP_CALL;
c->hdr.destnode = dest;
c->hdr.srcnode = ctdb->vnn;
/* this limits us to 16k outstanding messages - not unreasonable */
c->hdr.reqid = idr_get_new(ctdb->idr, c, 0xFFFF);
c->callid = call_id;
c->keylen = key.dsize;
c->calldatalen = call_data?call_data->dsize:0;
memcpy(&c->data[0], key.dptr, key.dsize);
if (call_data) {
memcpy(&c->data[key.dsize], call_data->dptr, call_data->dsize);
}
node = ctdb->nodes[dest];
if (ctdb->methods->queue_pkt(node, (uint8_t *)c, len) != 0) {
talloc_free(c);
return -1;
}
/*
event_add_timed(ctdb->ev, c, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
ctdb_call_timeout, c);
*/
return -1;
}

View File

@ -1,5 +1,5 @@
/*
ctdb database library
ctdb ltdb code
Copyright (C) Andrew Tridgell 2006
@ -18,3 +18,25 @@
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
/*
attach to a specific database
*/
int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
int open_flags, mode_t mode)
{
/* when we have a separate daemon this will need to be a real
file, not a TDB_INTERNAL, so the parent can access it to
for ltdb bypass */
ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
if (ctdb->ltdb == NULL) {
ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
return -1;
}
return 0;
}

91
ctdb/common/ctdb_util.c Normal file
View File

@ -0,0 +1,91 @@
/*
ctdb utility code
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
/*
return error string for last error
*/
const char *ctdb_errstr(struct ctdb_context *ctdb)
{
return ctdb->err_msg;
}
/*
remember an error message
*/
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
{
va_list ap;
talloc_free(ctdb->err_msg);
va_start(ap, fmt);
ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
va_end(ap);
}
/*
parse a IP:port pair
*/
int ctdb_parse_address(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx, const char *str,
struct ctdb_address *address)
{
char *p;
p = strchr(str, ':');
if (p == NULL) {
ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
return -1;
}
address->address = talloc_strndup(mem_ctx, str, p-str);
address->port = strtoul(p+1, NULL, 0);
return 0;
}
/*
check if two addresses are the same
*/
bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
{
return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
}
/*
hash function for mapping data to a VNN - taken from tdb
*/
uint32_t ctdb_hash(TDB_DATA *key)
{
uint32_t value; /* Used to compute the hash value. */
uint32_t i; /* Used to cycle through random values. */
/* Set the initial value from the key size. */
for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
value = (value + (key->dptr[i] << (i*5 % 24)));
return (1103515243 * value + 12345);
}

View File

@ -1,373 +0,0 @@
/*
ctdb over TCP
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
/*
initialise the ctdb daemon.
if the ctdb dispatcher daemon has already been started then this
does nothing. Otherwise it forks the ctdb dispatcher daemon and
starts the daemons connecting to each other
NOTE: In current code the daemon does not fork. This is for testing purposes only
and to simplify the code.
*/
struct ctdb_context *ctdb_init(struct event_context *ev)
{
struct ctdb_context *ctdb;
ctdb = talloc_zero(ev, struct ctdb_context);
ctdb->ev = ev;
return ctdb;
}
const char *ctdb_errstr(struct ctdb_context *ctdb)
{
return ctdb->err_msg;
}
/*
remember an error message
*/
static void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
{
va_list ap;
talloc_free(ctdb->err_msg);
va_start(ap, fmt);
ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
va_end(ap);
}
/*
called when socket becomes readable
*/
static void ctdb_node_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
printf("connection to node %s:%u is readable\n",
node->address.address, node->address.port);
event_set_fd_flags(fde, 0);
}
static void ctdb_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private);
/*
called when socket becomes writeable on connect
*/
static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_context *ctdb = node->ctdb;
int error;
socklen_t len;
if (getsockopt(node->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
error != 0) {
if (error == EINPROGRESS) {
printf("connect in progress\n");
return;
}
printf("getsockopt errno=%s\n", strerror(errno));
talloc_free(fde);
close(node->fd);
node->fd = -1;
event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
ctdb_node_connect, node);
return;
}
printf("Established connection to %s:%u\n",
node->address.address, node->address.port);
talloc_free(fde);
event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_READ,
ctdb_node_read, node);
}
/*
called when we should try and establish a tcp connection to a node
*/
static void ctdb_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_context *ctdb = node->ctdb;
unsigned v;
struct sockaddr_in sock_out;
node->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
v = fcntl(node->fd, F_GETFL, 0);
fcntl(node->fd, F_SETFL, v | O_NONBLOCK);
inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
sock_out.sin_port = htons(node->address.port);
sock_out.sin_family = PF_INET;
if (connect(node->fd, &sock_out, sizeof(sock_out)) != 0 &&
errno != EINPROGRESS) {
/* try again once a second */
close(node->fd);
event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
ctdb_node_connect, node);
return;
}
/* non-blocking connect - wait for write event */
event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_WRITE,
ctdb_node_connect_write, node);
}
/*
parse a IP:port pair
*/
static int ctdb_parse_address(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx, const char *str,
struct ctdb_address *address)
{
char *p;
p = strchr(str, ':');
if (p == NULL) {
ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
return -1;
}
address->address = talloc_strndup(mem_ctx, str, p-str);
address->port = strtoul(p+1, NULL, 0);
return 0;
}
/*
add a node to the list of active nodes
*/
static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
{
struct ctdb_node *node;
node = talloc(ctdb, struct ctdb_node);
if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
return -1;
}
node->fd = -1;
node->ctdb = ctdb;
DLIST_ADD(ctdb->nodes, node);
return 0;
}
/*
setup the node list from a file
*/
int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
{
char **lines;
int nlines;
int i;
lines = file_lines_load(nlist, &nlines, ctdb);
if (lines == NULL) {
ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
return -1;
}
for (i=0;i<nlines;i++) {
if (ctdb_add_node(ctdb, lines[i]) != 0) {
talloc_free(lines);
return -1;
}
}
talloc_free(lines);
return 0;
}
/*
setup the node list from a file
*/
int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
{
return ctdb_parse_address(ctdb, ctdb, address, &ctdb->address);
}
/*
add a node to the list of active nodes
*/
int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
{
struct ctdb_registered_call *call;
call = talloc(ctdb, struct ctdb_registered_call);
call->fn = fn;
call->id = id;
DLIST_ADD(ctdb->calls, call);
return 0;
}
/*
attach to a specific database
*/
int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
int open_flags, mode_t mode)
{
ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
if (ctdb->ltdb == NULL) {
ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
return -1;
}
return 0;
}
/*
called when an incoming connection is readable
*/
static void ctdb_incoming_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
char c;
printf("Incoming data\n");
if (read(in->fd, &c, 1) <= 0) {
/* socket is dead */
close(in->fd);
talloc_free(in);
}
}
/*
called when we get contacted by another node
currently makes no attempt to check if the connection is really from a ctdb
node in our cluster
*/
static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_context *ctdb;
struct sockaddr_in addr;
socklen_t len;
int fd;
struct ctdb_incoming *in;
ctdb = talloc_get_type(private, struct ctdb_context);
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
fd = accept(ctdb->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
in = talloc(ctdb, struct ctdb_incoming);
in->fd = fd;
in->ctdb = ctdb;
event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ,
ctdb_incoming_read, in);
printf("New incoming socket %d\n", in->fd);
}
/*
listen on our own address
*/
static int ctdb_listen(struct ctdb_context *ctdb)
{
struct sockaddr_in sock;
int one = 1;
sock.sin_port = htons(ctdb->address.port);
sock.sin_family = PF_INET;
inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr);
ctdb->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ctdb->listen_fd == -1) {
ctdb_set_error(ctdb, "socket failed\n");
return -1;
}
setsockopt(ctdb->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
if (bind(ctdb->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
ctdb_set_error(ctdb, "bind failed\n");
close(ctdb->listen_fd);
ctdb->listen_fd = -1;
return -1;
}
if (listen(ctdb->listen_fd, 10) == -1) {
ctdb_set_error(ctdb, "listen failed\n");
close(ctdb->listen_fd);
ctdb->listen_fd = -1;
return -1;
}
event_add_fd(ctdb->ev, ctdb, ctdb->listen_fd, EVENT_FD_READ,
ctdb_listen_event, ctdb);
return 0;
}
/*
check if two addresses are the same
*/
static bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
{
return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
}
/*
start the protocol going
*/
int ctdb_start(struct ctdb_context *ctdb)
{
struct ctdb_node *node;
/* listen on our own address */
if (ctdb_listen(ctdb) != 0) return -1;
/* startup connections to the other servers - will happen on
next event loop */
for (node=ctdb->nodes;node;node=node->next) {
if (ctdb_same_address(&ctdb->address, &node->address)) continue;
event_add_timed(ctdb->ev, node, timeval_zero(),
ctdb_node_connect, node);
}
return 0;
}
/*
make a remote ctdb call
*/
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
{
printf("ctdb_call not implemented\n");
return -1;
}

View File

@ -1,70 +0,0 @@
/*
ctdb over TCP
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
struct ctdb_child_state {
int sock;
struct event_context *ev;
};
/*
create a unix domain socket and bind it
return a file descriptor open on the socket
*/
static int ux_socket_bind(const char *name)
{
int fd;
struct sockaddr_un addr;
/* get rid of any old socket */
unlink(name);
fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd == -1) return -1;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, name, sizeof(addr.sun_path));
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
close(fd);
return -1;
}
return fd;
}
/*
start the ctdb tcp child daemon
*/
int ctdb_tcp_child(void)
{
struct ctdb_child_state *state;
state = talloc(NULL, struct ctdb_child_state);
state->sock = ux_socket_bind(CTDB_SOCKET);
return 0;
}

View File

@ -73,12 +73,14 @@ int main(int argc, const char *argv[])
{
struct ctdb_context *ctdb;
const char *nlist = NULL;
const char *transport = "tcp";
const char *myaddress = NULL;
struct poptOption popt_options[] = {
POPT_AUTOHELP
{ "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
{ "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
{ "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
POPT_TABLEEND
};
int opt;
@ -121,6 +123,12 @@ int main(int argc, const char *argv[])
exit(1);
}
ret = ctdb_set_transport(ctdb, transport);
if (ret == -1) {
printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
exit(1);
}
/* tell ctdb what address to listen on */
ret = ctdb_set_address(ctdb, myaddress);
if (ret == -1) {
@ -182,6 +190,7 @@ int main(int argc, const char *argv[])
for (i=0;i<data.dsize/sizeof(int);i++) {
printf("%3d\n", ((int *)data.dptr)[i]);
}
talloc_free(data.dptr);
/* shut it down */
talloc_free(ctdb);

View File

@ -40,6 +40,11 @@ struct event_context;
*/
struct ctdb_context *ctdb_init(struct event_context *ev);
/*
choose the transport
*/
int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
/*
tell ctdb what address to listen on, in transport specific format
*/
@ -82,3 +87,4 @@ int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
*/
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data);

147
ctdb/include/ctdb_private.h Normal file
View File

@ -0,0 +1,147 @@
/*
ctdb database library
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/*
an installed ctdb remote call
*/
struct ctdb_registered_call {
struct ctdb_registered_call *next, *prev;
uint32_t id;
ctdb_fn_t fn;
};
/*
this address structure might need to be generalised later for some
transports
*/
struct ctdb_address {
const char *address;
int port;
};
/*
state associated with one node
*/
struct ctdb_node {
struct ctdb_context *ctdb;
struct ctdb_address address;
const char *name; /* for debug messages */
void *private; /* private to transport */
uint32_t vnn;
};
/*
transport specific methods
*/
struct ctdb_methods {
int (*start)(struct ctdb_context *); /* start protocol processing */
int (*add_node)(struct ctdb_node *); /* setup a new node */
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
};
/*
transport calls up to the ctdb layer
*/
struct ctdb_upcalls {
/* recv_pkt is called when a packet comes in */
void (*recv_pkt)(struct ctdb_context *, uint8_t *data, uint32_t length);
/* node_dead is called when an attempt to send to a node fails */
void (*node_dead)(struct ctdb_node *);
/* node_connected is called when a connection to a node is established */
void (*node_connected)(struct ctdb_node *);
};
/* main state of the ctdb daemon */
struct ctdb_context {
struct event_context *ev;
struct ctdb_address address;
const char *name;
uint32_t vnn; /* our own vnn */
uint32_t num_nodes;
struct idr_context *idr;
struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */
struct ctdb_registered_call *calls; /* list of registered calls */
char *err_msg;
struct tdb_context *ltdb;
const struct ctdb_methods *methods; /* transport methods */
const struct ctdb_upcalls *upcalls; /* transport upcalls */
void *private; /* private to transport */
};
#define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \
ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
return -1; }} while (0)
/* arbitrary maximum timeout for ctdb operations */
#define CTDB_REQ_TIMEOUT 10
/*
operation IDs
*/
enum ctdb_operation {
CTDB_OP_CALL = 0
};
/*
packet structures
*/
struct ctdb_req_header {
uint32_t _length; /* ignored by datagram transports */
uint32_t operation;
uint32_t destnode;
uint32_t srcnode;
uint32_t reqid;
uint32_t reqtimeout;
};
struct ctdb_reply_header {
uint32_t _length; /* ignored by datagram transports */
uint32_t operation;
uint32_t destnode;
uint32_t srcnode;
uint32_t reqid;
};
struct ctdb_req_call {
struct ctdb_req_header hdr;
uint32_t callid;
uint32_t keylen;
uint32_t calldatalen;
uint8_t data[0]; /* key[] followed by calldata[] */
};
struct ctdb_reply_call {
struct ctdb_reply_header hdr;
uint32_t datalen;
uint8_t data[0];
};
/* internal prototypes */
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...);
bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2);
int ctdb_parse_address(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx, const char *str,
struct ctdb_address *address);
uint32_t ctdb_hash(TDB_DATA *key);

7
ctdb/include/idtree.h Normal file
View File

@ -0,0 +1,7 @@
struct idr_context *idr_init(TALLOC_CTX *mem_ctx);
int idr_get_new(struct idr_context *idp, void *ptr, int limit);
int idr_get_new_above(struct idr_context *idp, void *ptr, int starting_id, int limit);
int idr_get_new_random(struct idr_context *idp, void *ptr, int limit);
void *idr_find(struct idr_context *idp, int id);
int idr_remove(struct idr_context *idp, int id);

View File

@ -3,6 +3,7 @@
#include "replace.h"
#include "talloc.h"
#include "tdb.h"
#include "idtree.h"
#include "ctdb.h"
#include "lib/util/dlinklist.h"

374
ctdb/lib/util/idtree.c Normal file
View File

@ -0,0 +1,374 @@
/*
Unix SMB/CIFS implementation.
very efficient functions to manage mapping a id (such as a fnum) to
a pointer. This is used for fnum and search id allocation.
Copyright (C) Andrew Tridgell 2004
This code is derived from lib/idr.c in the 2.6 Linux kernel, which was
written by Jim Houston jim.houston@ccur.com, and is
Copyright (C) 2002 by Concurrent Computer Corporation
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
/*
see the section marked "public interface" below for documentation
*/
/**
* @file
*/
#include "includes.h"
#define IDR_BITS 5
#define IDR_FULL 0xfffffffful
#if 0 /* unused */
#define TOP_LEVEL_FULL (IDR_FULL >> 30)
#endif
#define IDR_SIZE (1 << IDR_BITS)
#define IDR_MASK ((1 << IDR_BITS)-1)
#define MAX_ID_SHIFT (sizeof(int)*8 - 1)
#define MAX_ID_BIT (1U << MAX_ID_SHIFT)
#define MAX_ID_MASK (MAX_ID_BIT - 1)
#define MAX_LEVEL (MAX_ID_SHIFT + IDR_BITS - 1) / IDR_BITS
#define IDR_FREE_MAX MAX_LEVEL + MAX_LEVEL
#define set_bit(bit, v) (v) |= (1<<(bit))
#define clear_bit(bit, v) (v) &= ~(1<<(bit))
#define test_bit(bit, v) ((v) & (1<<(bit)))
struct idr_layer {
uint32_t bitmap;
struct idr_layer *ary[IDR_SIZE];
int count;
};
struct idr_context {
struct idr_layer *top;
struct idr_layer *id_free;
int layers;
int id_free_cnt;
};
static struct idr_layer *alloc_layer(struct idr_context *idp)
{
struct idr_layer *p;
if (!(p = idp->id_free))
return NULL;
idp->id_free = p->ary[0];
idp->id_free_cnt--;
p->ary[0] = NULL;
return p;
}
static int find_next_bit(uint32_t bm, int maxid, int n)
{
while (n<maxid && !test_bit(n, bm)) n++;
return n;
}
static void free_layer(struct idr_context *idp, struct idr_layer *p)
{
p->ary[0] = idp->id_free;
idp->id_free = p;
idp->id_free_cnt++;
}
static int idr_pre_get(struct idr_context *idp)
{
while (idp->id_free_cnt < IDR_FREE_MAX) {
struct idr_layer *new = talloc_zero(idp, struct idr_layer);
if(new == NULL)
return (0);
free_layer(idp, new);
}
return 1;
}
static int sub_alloc(struct idr_context *idp, void *ptr, int *starting_id)
{
int n, m, sh;
struct idr_layer *p, *new;
struct idr_layer *pa[MAX_LEVEL];
int l, id;
uint32_t bm;
memset(pa, 0, sizeof(pa));
id = *starting_id;
p = idp->top;
l = idp->layers;
pa[l--] = NULL;
while (1) {
/*
* We run around this while until we reach the leaf node...
*/
n = (id >> (IDR_BITS*l)) & IDR_MASK;
bm = ~p->bitmap;
m = find_next_bit(bm, IDR_SIZE, n);
if (m == IDR_SIZE) {
/* no space available go back to previous layer. */
l++;
id = (id | ((1 << (IDR_BITS*l))-1)) + 1;
if (!(p = pa[l])) {
*starting_id = id;
return -2;
}
continue;
}
if (m != n) {
sh = IDR_BITS*l;
id = ((id >> sh) ^ n ^ m) << sh;
}
if ((id >= MAX_ID_BIT) || (id < 0))
return -1;
if (l == 0)
break;
/*
* Create the layer below if it is missing.
*/
if (!p->ary[m]) {
if (!(new = alloc_layer(idp)))
return -1;
p->ary[m] = new;
p->count++;
}
pa[l--] = p;
p = p->ary[m];
}
/*
* We have reached the leaf node, plant the
* users pointer and return the raw id.
*/
p->ary[m] = (struct idr_layer *)ptr;
set_bit(m, p->bitmap);
p->count++;
/*
* If this layer is full mark the bit in the layer above
* to show that this part of the radix tree is full.
* This may complete the layer above and require walking
* up the radix tree.
*/
n = id;
while (p->bitmap == IDR_FULL) {
if (!(p = pa[++l]))
break;
n = n >> IDR_BITS;
set_bit((n & IDR_MASK), p->bitmap);
}
return(id);
}
static int idr_get_new_above_int(struct idr_context *idp, void *ptr, int starting_id)
{
struct idr_layer *p, *new;
int layers, v, id;
idr_pre_get(idp);
id = starting_id;
build_up:
p = idp->top;
layers = idp->layers;
if (!p) {
if (!(p = alloc_layer(idp)))
return -1;
layers = 1;
}
/*
* Add a new layer to the top of the tree if the requested
* id is larger than the currently allocated space.
*/
while ((layers < MAX_LEVEL) && (id >= (1 << (layers*IDR_BITS)))) {
layers++;
if (!p->count)
continue;
if (!(new = alloc_layer(idp))) {
/*
* The allocation failed. If we built part of
* the structure tear it down.
*/
for (new = p; p && p != idp->top; new = p) {
p = p->ary[0];
new->ary[0] = NULL;
new->bitmap = new->count = 0;
free_layer(idp, new);
}
return -1;
}
new->ary[0] = p;
new->count = 1;
if (p->bitmap == IDR_FULL)
set_bit(0, new->bitmap);
p = new;
}
idp->top = p;
idp->layers = layers;
v = sub_alloc(idp, ptr, &id);
if (v == -2)
goto build_up;
return(v);
}
static int sub_remove(struct idr_context *idp, int shift, int id)
{
struct idr_layer *p = idp->top;
struct idr_layer **pa[MAX_LEVEL];
struct idr_layer ***paa = &pa[0];
int n;
*paa = NULL;
*++paa = &idp->top;
while ((shift > 0) && p) {
n = (id >> shift) & IDR_MASK;
clear_bit(n, p->bitmap);
*++paa = &p->ary[n];
p = p->ary[n];
shift -= IDR_BITS;
}
n = id & IDR_MASK;
if (p != NULL && test_bit(n, p->bitmap)) {
clear_bit(n, p->bitmap);
p->ary[n] = NULL;
while(*paa && ! --((**paa)->count)){
free_layer(idp, **paa);
**paa-- = NULL;
}
if ( ! *paa )
idp->layers = 0;
return 0;
}
return -1;
}
static void *_idr_find(struct idr_context *idp, int id)
{
int n;
struct idr_layer *p;
n = idp->layers * IDR_BITS;
p = idp->top;
/*
* This tests to see if bits outside the current tree are
* present. If so, tain't one of ours!
*/
if ((id & ~(~0 << MAX_ID_SHIFT)) >> (n + IDR_BITS))
return NULL;
/* Mask off upper bits we don't use for the search. */
id &= MAX_ID_MASK;
while (n >= IDR_BITS && p) {
n -= IDR_BITS;
p = p->ary[(id >> n) & IDR_MASK];
}
return((void *)p);
}
static int _idr_remove(struct idr_context *idp, int id)
{
struct idr_layer *p;
/* Mask off upper bits we don't use for the search. */
id &= MAX_ID_MASK;
if (sub_remove(idp, (idp->layers - 1) * IDR_BITS, id) == -1) {
return -1;
}
if ( idp->top && idp->top->count == 1 &&
(idp->layers > 1) &&
idp->top->ary[0]) {
/* We can drop a layer */
p = idp->top->ary[0];
idp->top->bitmap = idp->top->count = 0;
free_layer(idp, idp->top);
idp->top = p;
--idp->layers;
}
while (idp->id_free_cnt >= IDR_FREE_MAX) {
p = alloc_layer(idp);
talloc_free(p);
}
return 0;
}
/************************************************************************
this is the public interface
**************************************************************************/
/**
initialise a idr tree. The context return value must be passed to
all subsequent idr calls. To destroy the idr tree use talloc_free()
on this context
*/
_PUBLIC_ struct idr_context *idr_init(TALLOC_CTX *mem_ctx)
{
return talloc_zero(mem_ctx, struct idr_context);
}
/**
allocate the next available id, and assign 'ptr' into its slot.
you can retrieve later this pointer using idr_find()
*/
_PUBLIC_ int idr_get_new(struct idr_context *idp, void *ptr, int limit)
{
int ret = idr_get_new_above_int(idp, ptr, 0);
if (ret > limit) {
idr_remove(idp, ret);
return -1;
}
return ret;
}
/**
allocate a new id, giving the first available value greater than or
equal to the given starting id
*/
_PUBLIC_ int idr_get_new_above(struct idr_context *idp, void *ptr, int starting_id, int limit)
{
int ret = idr_get_new_above_int(idp, ptr, starting_id);
if (ret > limit) {
idr_remove(idp, ret);
return -1;
}
return ret;
}
/**
find a pointer value previously set with idr_get_new given an id
*/
_PUBLIC_ void *idr_find(struct idr_context *idp, int id)
{
return _idr_find(idp, id);
}
/**
remove an id from the idr tree
*/
_PUBLIC_ int idr_remove(struct idr_context *idp, int id)
{
int ret;
ret = _idr_remove((struct idr_context *)idp, id);
if (ret != 0) {
DEBUG(0,("WARNING: attempt to remove unset id %d in idtree\n", id));
}
return ret;
}

View File

@ -19,35 +19,9 @@
*/
/*
a pending ctdb request
*/
struct ctdb_request {
};
/*
an installed ctdb remote call
*/
struct ctdb_registered_call {
struct ctdb_registered_call *next, *prev;
uint32_t id;
ctdb_fn_t fn;
};
struct ctdb_address {
const char *address;
int port;
};
/*
state associated with one node
*/
struct ctdb_node {
struct ctdb_context *ctdb;
struct ctdb_node *next, *prev;
struct ctdb_address address;
int fd;
/* ctdb_tcp main state */
struct ctdb_tcp {
int listen_fd;
};
/*
@ -58,16 +32,28 @@ struct ctdb_incoming {
int fd;
};
/* main state of the ctdb daemon */
struct ctdb_context {
struct event_context *ev;
struct ctdb_address address;
int listen_fd;
struct ctdb_node *nodes; /* list of nodes in the cluster */
struct ctdb_registered_call *calls; /* list of registered calls */
char *err_msg;
struct tdb_context *ltdb;
struct ctdb_tcp_packet {
struct ctdb_tcp_packet *next, *prev;
uint8_t *data;
uint32_t length;
};
/*
state associated with one tcp node
*/
struct ctdb_tcp_node {
int fd;
struct fd_event *fde;
struct ctdb_tcp_packet *queue;
};
#define CTDB_SOCKET "/tmp/ctdb.sock"
/* prototypes internal to tcp transport */
void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private);
void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private);
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length);
int ctdb_tcp_listen(struct ctdb_context *ctdb);
void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private);

187
ctdb/tcp/tcp_connect.c Normal file
View File

@ -0,0 +1,187 @@
/*
ctdb over TCP
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
#include "ctdb_tcp.h"
static void set_nonblocking(int fd)
{
unsigned v;
v = fcntl(fd, F_GETFL, 0);
fcntl(fd, F_SETFL, v | O_NONBLOCK);
}
/*
called when socket becomes writeable on connect
*/
static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
int error = 0;
socklen_t len;
if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
error != 0) {
printf("getsockopt error=%s\n", strerror(error));
talloc_free(fde);
close(tnode->fd);
tnode->fd = -1;
event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
ctdb_tcp_node_connect, node);
return;
}
talloc_free(fde);
tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
ctdb_tcp_node_write, node);
/* tell the ctdb layer we are connected */
node->ctdb->upcalls->node_connected(node);
}
/*
called when we should try and establish a tcp connection to a node
*/
void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
struct sockaddr_in sock_out;
tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
set_nonblocking(tnode->fd);
inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
sock_out.sin_port = htons(node->address.port);
sock_out.sin_family = PF_INET;
if (connect(tnode->fd, &sock_out, sizeof(sock_out)) != 0 &&
errno != EINPROGRESS) {
/* try again once a second */
close(tnode->fd);
event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
ctdb_tcp_node_connect, node);
return;
}
/* non-blocking connect - wait for write event */
event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ,
ctdb_node_connect_write, node);
}
/*
destroy a ctdb_incoming structure
*/
static int ctdb_incoming_destructor(struct ctdb_incoming *in)
{
close(in->fd);
in->fd = -1;
return 0;
}
/*
called when we get contacted by another node
currently makes no attempt to check if the connection is really from a ctdb
node in our cluster
*/
static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_context *ctdb;
struct ctdb_tcp *ctcp;
struct sockaddr_in addr;
socklen_t len;
int fd;
struct ctdb_incoming *in;
ctdb = talloc_get_type(private, struct ctdb_context);
ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
in = talloc(ctdb, struct ctdb_incoming);
in->fd = fd;
in->ctdb = ctdb;
set_nonblocking(in->fd);
event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ,
ctdb_tcp_incoming_read, in);
talloc_set_destructor(in, ctdb_incoming_destructor);
}
/*
listen on our own address
*/
int ctdb_tcp_listen(struct ctdb_context *ctdb)
{
struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
struct sockaddr_in sock;
int one = 1;
sock.sin_port = htons(ctdb->address.port);
sock.sin_family = PF_INET;
inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr);
ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ctcp->listen_fd == -1) {
ctdb_set_error(ctdb, "socket failed\n");
return -1;
}
setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
ctdb_set_error(ctdb, "bind failed\n");
close(ctcp->listen_fd);
ctcp->listen_fd = -1;
return -1;
}
if (listen(ctcp->listen_fd, 10) == -1) {
ctdb_set_error(ctdb, "listen failed\n");
close(ctcp->listen_fd);
ctcp->listen_fd = -1;
return -1;
}
event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ,
ctdb_listen_event, ctdb);
return 0;
}

86
ctdb/tcp/tcp_init.c Normal file
View File

@ -0,0 +1,86 @@
/*
ctdb over TCP
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
#include "ctdb_tcp.h"
/*
start the protocol going
*/
int ctdb_tcp_start(struct ctdb_context *ctdb)
{
int i;
/* listen on our own address */
if (ctdb_tcp_listen(ctdb) != 0) return -1;
/* startup connections to the other servers - will happen on
next event loop */
for (i=0;i<ctdb->num_nodes;i++) {
struct ctdb_node *node = *(ctdb->nodes + i);
if (ctdb_same_address(&ctdb->address, &node->address)) continue;
event_add_timed(ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_connect, node);
}
return 0;
}
/*
initialise tcp portion of a ctdb node
*/
int ctdb_tcp_add_node(struct ctdb_node *node)
{
struct ctdb_tcp_node *tnode;
tnode = talloc_zero(node, struct ctdb_tcp_node);
CTDB_NO_MEMORY(node->ctdb, tnode);
tnode->fd = -1;
node->private = tnode;
return 0;
}
static const struct ctdb_methods ctdb_tcp_methods = {
.start = ctdb_tcp_start,
.add_node = ctdb_tcp_add_node,
.queue_pkt = ctdb_tcp_queue_pkt
};
/*
initialise tcp portion of ctdb
*/
int ctdb_tcp_init(struct ctdb_context *ctdb)
{
struct ctdb_tcp *ctcp;
ctcp = talloc_zero(ctdb, struct ctdb_tcp);
CTDB_NO_MEMORY(ctdb, ctcp);
ctcp->listen_fd = -1;
ctdb->private = ctcp;
ctdb->methods = &ctdb_tcp_methods;
return 0;
}

194
ctdb/tcp/tcp_io.c Normal file
View File

@ -0,0 +1,194 @@
/*
ctdb over TCP
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
#include "ctdb_tcp.h"
/*
called when we fail to send a message to a node
*/
static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
/* flush the queue */
while (tnode->queue) {
struct ctdb_tcp_packet *pkt = tnode->queue;
DLIST_REMOVE(tnode->queue, pkt);
talloc_free(pkt);
}
/* start a new connect cycle to try to re-establish the
link */
talloc_free(tnode->fde);
close(tnode->fd);
tnode->fd = -1;
event_add_timed(node->ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_connect, node);
}
/*
called when socket becomes readable
*/
void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
if (flags & EVENT_FD_READ) {
/* getting a read event on this fd in the current tcp model is
always an error, as we have separate read and write
sockets. In future we may combine them, but for now it must
mean that the socket is dead, so we try to reconnect */
talloc_free(tnode->fde);
close(tnode->fd);
tnode->fd = -1;
event_add_timed(node->ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_connect, node);
return;
}
while (tnode->queue) {
struct ctdb_tcp_packet *pkt = tnode->queue;
ssize_t n;
n = write(tnode->fd, pkt->data, pkt->length);
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
event_add_timed(node->ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_dead, node);
EVENT_FD_NOT_WRITEABLE(tnode->fde);
return;
}
if (n <= 0) return;
if (n != pkt->length) {
pkt->length -= n;
pkt->data += n;
return;
}
DLIST_REMOVE(tnode->queue, pkt);
talloc_free(pkt);
}
EVENT_FD_NOT_WRITEABLE(tnode->fde);
}
/*
called when an incoming connection is readable
*/
void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
int num_ready = 0;
uint8_t *data;
/* NOTE: we don't yet handle combined packets or partial
packets. Obviously that needed fixing, using a similar
scheme to the Samba4 packet layer */
if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
num_ready == 0) {
/* we've lost the link from another node. We don't
notify the upper layers, as we only want to trigger
a full node reorganisation when a send fails - that
allows nodes to restart without penalty as long as
the network is idle */
talloc_free(in);
return;
}
data = talloc_size(in, num_ready);
if (data == NULL) {
/* not much we can do except drop the socket */
talloc_free(in);
return;
}
if (read(in->fd, data, num_ready) != num_ready) {
talloc_free(in);
return;
}
/* tell the ctdb layer above that we have a packet */
in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
talloc_free(data);
}
/*
queue a packet for sending
*/
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
{
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_tcp_packet *pkt;
if (tnode->fd == -1) {
ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name);
return -1;
}
/* if the queue is empty then try an immediate write, avoiding
queue overhead. This relies on non-blocking sockets */
if (tnode->queue == NULL) {
ssize_t n = write(tnode->fd, data, length);
if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
event_add_timed(node->ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_dead, node);
/* yes, we report success, as the dead node is
handled via a separate event */
return 0;
}
if (n > 0) {
data += n;
length -= n;
}
if (length == 0) return 0;
}
pkt = talloc(tnode, struct ctdb_tcp_packet);
CTDB_NO_MEMORY(node->ctdb, pkt);
pkt->data = talloc_memdup(pkt, data, length);
CTDB_NO_MEMORY(node->ctdb, pkt->data);
pkt->length = length;
if (tnode->queue == NULL) {
EVENT_FD_WRITEABLE(tnode->fde);
}
DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
return 0;
}

9
ctdb/tests/test.sh Executable file
View File

@ -0,0 +1,9 @@
#!/bin/sh
killall -q ctdb_test
bin/ctdb_test --nlist nodes.txt --listen 127.0.0.1:9001 &
bin/ctdb_test --nlist nodes.txt --listen 127.0.0.2:9001 &
sleep 3
killall ctdb_test