1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-27 22:50:26 +03:00

started splitting out transport code

(This used to be ctdb commit 3b75ef65bd0bff9c6366aba5a26b90be509fa77b)
This commit is contained in:
Andrew Tridgell 2006-11-27 21:38:13 +11:00
parent 561cb43f35
commit 749a6b4c3a
10 changed files with 356 additions and 259 deletions

View File

@ -20,7 +20,11 @@ LIB_FLAGS=@LDFLAGS@ -Llib @LIBS@ -lpopt
EVENTS_OBJ = lib/events/events.o lib/events/events_standard.o
CTDB_OBJ = ctdb_tcp_child.o ctdb_tcp.o util.o
CTDB_COMMON_OBJ = common/ctdb.o common/util.o
CTDB_TCP_OBJ = tcp/ctdb_tcp.o
CTDB_OBJ = $(CTDB_COMMON_OBJ) $(CTDB_TCP_OBJ)
OBJS = @TDBOBJ@ @TALLOCOBJ@ @LIBREPLACEOBJ@ $(EXTRA_OBJ) $(EVENTS_OBJ) $(CTDB_OBJ)

214
ctdb/common/ctdb.c Normal file
View File

@ -0,0 +1,214 @@
/*
ctdb over TCP
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "lib/events/events.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
/*
initialise the ctdb daemon.
if the ctdb dispatcher daemon has already been started then this
does nothing. Otherwise it forks the ctdb dispatcher daemon and
starts the daemons connecting to each other
NOTE: In current code the daemon does not fork. This is for testing purposes only
and to simplify the code.
*/
struct ctdb_context *ctdb_init(struct event_context *ev)
{
struct ctdb_context *ctdb;
ctdb = talloc_zero(ev, struct ctdb_context);
ctdb->ev = ev;
return ctdb;
}
const char *ctdb_errstr(struct ctdb_context *ctdb)
{
return ctdb->err_msg;
}
/*
remember an error message
*/
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
{
va_list ap;
talloc_free(ctdb->err_msg);
va_start(ap, fmt);
ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
va_end(ap);
}
/*
choose the transport we will use
*/
int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
{
int ctdb_tcp_init(struct ctdb_context *ctdb);
if (strcmp(transport, "tcp") == 0) {
return ctdb_tcp_init(ctdb);
}
ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport);
return -1;
}
/*
parse a IP:port pair
*/
static int ctdb_parse_address(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx, const char *str,
struct ctdb_address *address)
{
char *p;
p = strchr(str, ':');
if (p == NULL) {
ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
return -1;
}
address->address = talloc_strndup(mem_ctx, str, p-str);
address->port = strtoul(p+1, NULL, 0);
return 0;
}
/*
add a node to the list of active nodes
*/
static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
{
struct ctdb_node *node;
node = talloc(ctdb, struct ctdb_node);
if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
return -1;
}
node->ctdb = ctdb;
if (ctdb->methods->add_node(node) != 0) {
talloc_free(node);
return -1;
}
DLIST_ADD(ctdb->nodes, node);
return 0;
}
/*
setup the node list from a file
*/
int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
{
char **lines;
int nlines;
int i;
lines = file_lines_load(nlist, &nlines, ctdb);
if (lines == NULL) {
ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
return -1;
}
for (i=0;i<nlines;i++) {
if (ctdb_add_node(ctdb, lines[i]) != 0) {
talloc_free(lines);
return -1;
}
}
talloc_free(lines);
return 0;
}
/*
setup the local node address
*/
int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
{
return ctdb_parse_address(ctdb, ctdb, address, &ctdb->address);
}
/*
add a node to the list of active nodes
*/
int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
{
struct ctdb_registered_call *call;
call = talloc(ctdb, struct ctdb_registered_call);
call->fn = fn;
call->id = id;
DLIST_ADD(ctdb->calls, call);
return 0;
}
/*
attach to a specific database
*/
int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
int open_flags, mode_t mode)
{
/* when we have a separate daemon this will need to be a real
file, not a TDB_INTERNAL, so the parent can access it to
for ltdb bypass */
ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
if (ctdb->ltdb == NULL) {
ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
return -1;
}
return 0;
}
/*
start the protocol going
*/
int ctdb_start(struct ctdb_context *ctdb)
{
return ctdb->methods->start(ctdb);
}
/*
make a remote ctdb call
*/
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
{
printf("ctdb_call not implemented\n");
return -1;
}
/*
check if two addresses are the same
*/
bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
{
return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
}

View File

@ -1,70 +0,0 @@
/*
ctdb over TCP
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
#include "includes.h"
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
struct ctdb_child_state {
int sock;
struct event_context *ev;
};
/*
create a unix domain socket and bind it
return a file descriptor open on the socket
*/
static int ux_socket_bind(const char *name)
{
int fd;
struct sockaddr_un addr;
/* get rid of any old socket */
unlink(name);
fd = socket(AF_UNIX, SOCK_DGRAM, 0);
if (fd == -1) return -1;
memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
strncpy(addr.sun_path, name, sizeof(addr.sun_path));
if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
close(fd);
return -1;
}
return fd;
}
/*
start the ctdb tcp child daemon
*/
int ctdb_tcp_child(void)
{
struct ctdb_child_state *state;
state = talloc(NULL, struct ctdb_child_state);
state->sock = ux_socket_bind(CTDB_SOCKET);
return 0;
}

View File

@ -73,12 +73,14 @@ int main(int argc, const char *argv[])
{
struct ctdb_context *ctdb;
const char *nlist = NULL;
const char *transport = "tcp";
const char *myaddress = NULL;
struct poptOption popt_options[] = {
POPT_AUTOHELP
{ "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
{ "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
{ "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
POPT_TABLEEND
};
int opt;
@ -121,6 +123,12 @@ int main(int argc, const char *argv[])
exit(1);
}
ret = ctdb_set_transport(ctdb, transport);
if (ret == -1) {
printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
exit(1);
}
/* tell ctdb what address to listen on */
ret = ctdb_set_address(ctdb, myaddress);
if (ret == -1) {

View File

@ -40,6 +40,11 @@ struct event_context;
*/
struct ctdb_context *ctdb_init(struct event_context *ev);
/*
choose the transport
*/
int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
/*
tell ctdb what address to listen on, in transport specific format
*/
@ -82,3 +87,4 @@ int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
*/
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data);

View File

@ -19,13 +19,6 @@
*/
/*
a pending ctdb request
*/
struct ctdb_request {
};
/*
an installed ctdb remote call
*/
@ -35,6 +28,10 @@ struct ctdb_registered_call {
ctdb_fn_t fn;
};
/*
this address structure might need to be generalised later for some
transports
*/
struct ctdb_address {
const char *address;
int port;
@ -47,27 +44,36 @@ struct ctdb_node {
struct ctdb_context *ctdb;
struct ctdb_node *next, *prev;
struct ctdb_address address;
int fd;
void *private; /* private to transport */
};
/*
state associated with an incoming connection
transport specific methods
*/
struct ctdb_incoming {
struct ctdb_context *ctdb;
int fd;
struct ctdb_methods {
int (*start)(struct ctdb_context *); /* start protocol processing */
int (*add_node)(struct ctdb_node *); /* setup a new node */
};
/* main state of the ctdb daemon */
struct ctdb_context {
struct event_context *ev;
struct ctdb_address address;
int listen_fd;
struct ctdb_node *nodes; /* list of nodes in the cluster */
struct ctdb_registered_call *calls; /* list of registered calls */
char *err_msg;
struct tdb_context *ltdb;
const struct ctdb_methods *methods; /* transport methods */
void *private; /* private to transport */
};
#define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \
ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
return -1; }} while (0)
/* internal prototypes */
void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...);
bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2);
#define CTDB_SOCKET "/tmp/ctdb.sock"

View File

@ -23,44 +23,7 @@
#include "system/network.h"
#include "system/filesys.h"
#include "ctdb_private.h"
/*
initialise the ctdb daemon.
if the ctdb dispatcher daemon has already been started then this
does nothing. Otherwise it forks the ctdb dispatcher daemon and
starts the daemons connecting to each other
NOTE: In current code the daemon does not fork. This is for testing purposes only
and to simplify the code.
*/
struct ctdb_context *ctdb_init(struct event_context *ev)
{
struct ctdb_context *ctdb;
ctdb = talloc_zero(ev, struct ctdb_context);
ctdb->ev = ev;
return ctdb;
}
const char *ctdb_errstr(struct ctdb_context *ctdb)
{
return ctdb->err_msg;
}
/*
remember an error message
*/
static void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
{
va_list ap;
talloc_free(ctdb->err_msg);
va_start(ap, fmt);
ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
va_end(ap);
}
#include "ctdb_tcp.h"
/*
called when socket becomes readable
@ -84,11 +47,13 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
uint16_t flags, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
int error;
socklen_t len;
if (getsockopt(node->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
error != 0) {
if (error == EINPROGRESS) {
printf("connect in progress\n");
@ -96,8 +61,8 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
}
printf("getsockopt errno=%s\n", strerror(errno));
talloc_free(fde);
close(node->fd);
node->fd = -1;
close(tnode->fd);
tnode->fd = -1;
event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
ctdb_node_connect, node);
return;
@ -106,7 +71,7 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
printf("Established connection to %s:%u\n",
node->address.address, node->address.port);
talloc_free(fde);
event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_READ,
event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
ctdb_node_read, node);
}
@ -117,134 +82,35 @@ static void ctdb_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
unsigned v;
struct sockaddr_in sock_out;
node->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
v = fcntl(node->fd, F_GETFL, 0);
fcntl(node->fd, F_SETFL, v | O_NONBLOCK);
v = fcntl(tnode->fd, F_GETFL, 0);
fcntl(tnode->fd, F_SETFL, v | O_NONBLOCK);
inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
sock_out.sin_port = htons(node->address.port);
sock_out.sin_family = PF_INET;
if (connect(node->fd, &sock_out, sizeof(sock_out)) != 0 &&
if (connect(tnode->fd, &sock_out, sizeof(sock_out)) != 0 &&
errno != EINPROGRESS) {
/* try again once a second */
close(node->fd);
close(tnode->fd);
event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0),
ctdb_node_connect, node);
return;
}
/* non-blocking connect - wait for write event */
event_add_fd(node->ctdb->ev, node, node->fd, EVENT_FD_WRITE,
event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE,
ctdb_node_connect_write, node);
}
/*
parse a IP:port pair
*/
static int ctdb_parse_address(struct ctdb_context *ctdb,
TALLOC_CTX *mem_ctx, const char *str,
struct ctdb_address *address)
{
char *p;
p = strchr(str, ':');
if (p == NULL) {
ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
return -1;
}
address->address = talloc_strndup(mem_ctx, str, p-str);
address->port = strtoul(p+1, NULL, 0);
return 0;
}
/*
add a node to the list of active nodes
*/
static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
{
struct ctdb_node *node;
node = talloc(ctdb, struct ctdb_node);
if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
return -1;
}
node->fd = -1;
node->ctdb = ctdb;
DLIST_ADD(ctdb->nodes, node);
return 0;
}
/*
setup the node list from a file
*/
int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
{
char **lines;
int nlines;
int i;
lines = file_lines_load(nlist, &nlines, ctdb);
if (lines == NULL) {
ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
return -1;
}
for (i=0;i<nlines;i++) {
if (ctdb_add_node(ctdb, lines[i]) != 0) {
talloc_free(lines);
return -1;
}
}
talloc_free(lines);
return 0;
}
/*
setup the node list from a file
*/
int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
{
return ctdb_parse_address(ctdb, ctdb, address, &ctdb->address);
}
/*
add a node to the list of active nodes
*/
int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
{
struct ctdb_registered_call *call;
call = talloc(ctdb, struct ctdb_registered_call);
call->fn = fn;
call->id = id;
DLIST_ADD(ctdb->calls, call);
return 0;
}
/*
attach to a specific database
*/
int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags,
int open_flags, mode_t mode)
{
ctdb->ltdb = tdb_open(name, 0, TDB_INTERNAL, 0, 0);
if (ctdb->ltdb == NULL) {
ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
return -1;
}
return 0;
}
/*
called when an incoming connection is readable
*/
@ -271,15 +137,17 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_context *ctdb;
struct ctdb_tcp *ctcp;
struct sockaddr_in addr;
socklen_t len;
int fd;
struct ctdb_incoming *in;
ctdb = talloc_get_type(private, struct ctdb_context);
ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
memset(&addr, 0, sizeof(addr));
len = sizeof(addr);
fd = accept(ctdb->listen_fd, (struct sockaddr *)&addr, &len);
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
if (fd == -1) return;
in = talloc(ctdb, struct ctdb_incoming);
@ -298,6 +166,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
*/
static int ctdb_listen(struct ctdb_context *ctdb)
{
struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
struct sockaddr_in sock;
int one = 1;
@ -305,46 +174,38 @@ static int ctdb_listen(struct ctdb_context *ctdb)
sock.sin_family = PF_INET;
inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr);
ctdb->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ctdb->listen_fd == -1) {
ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (ctcp->listen_fd == -1) {
ctdb_set_error(ctdb, "socket failed\n");
return -1;
}
setsockopt(ctdb->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
if (bind(ctdb->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
ctdb_set_error(ctdb, "bind failed\n");
close(ctdb->listen_fd);
ctdb->listen_fd = -1;
close(ctcp->listen_fd);
ctcp->listen_fd = -1;
return -1;
}
if (listen(ctdb->listen_fd, 10) == -1) {
if (listen(ctcp->listen_fd, 10) == -1) {
ctdb_set_error(ctdb, "listen failed\n");
close(ctdb->listen_fd);
ctdb->listen_fd = -1;
close(ctcp->listen_fd);
ctcp->listen_fd = -1;
return -1;
}
event_add_fd(ctdb->ev, ctdb, ctdb->listen_fd, EVENT_FD_READ,
event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ,
ctdb_listen_event, ctdb);
return 0;
}
/*
check if two addresses are the same
*/
static bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
{
return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
}
/*
start the protocol going
*/
int ctdb_start(struct ctdb_context *ctdb)
int ctdb_tcp_start(struct ctdb_context *ctdb)
{
struct ctdb_node *node;
@ -362,12 +223,39 @@ int ctdb_start(struct ctdb_context *ctdb)
return 0;
}
/*
make a remote ctdb call
initialise tcp portion of a ctdb node
*/
int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id,
TDB_DATA *call_data, TDB_DATA *reply_data)
int ctdb_tcp_add_node(struct ctdb_node *node)
{
printf("ctdb_call not implemented\n");
return -1;
struct ctdb_tcp_node *tnode;
tnode = talloc_zero(node, struct ctdb_tcp_node);
CTDB_NO_MEMORY(node->ctdb, tnode);
tnode->fd = -1;
node->private = tnode;
return 0;
}
static const struct ctdb_methods ctdb_tcp_methods = {
.start = ctdb_tcp_start,
.add_node = ctdb_tcp_add_node
};
/*
initialise tcp portion of ctdb
*/
int ctdb_tcp_init(struct ctdb_context *ctdb)
{
struct ctdb_tcp *ctcp;
ctcp = talloc_zero(ctdb, struct ctdb_tcp);
CTDB_NO_MEMORY(ctdb, ctcp);
ctcp->listen_fd = -1;
ctdb->private = ctcp;
ctdb->methods = &ctdb_tcp_methods;
return 0;
}

41
ctdb/tcp/ctdb_tcp.h Normal file
View File

@ -0,0 +1,41 @@
/*
ctdb database library
Copyright (C) Andrew Tridgell 2006
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
/* ctdb_tcp main state */
struct ctdb_tcp {
int listen_fd;
};
/*
state associated with an incoming connection
*/
struct ctdb_incoming {
struct ctdb_context *ctdb;
int fd;
};
/*
state associated with one tcp node
*/
struct ctdb_tcp_node {
int fd;
};