1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-08 04:58:40 +03:00

- set up a convenience name field for nodes

- added basic IO handling for the tcp backend

- added a ctdb_node_dead upcall

- added packet queueing

- added incoming packet handling

(This used to be ctdb commit 415497c952630e746e8cdcf8e1e2a7b2ac3e51fb)
This commit is contained in:
Andrew Tridgell 2006-11-28 14:15:46 +11:00
parent 5b06e73fb1
commit 5d0ba69e06
6 changed files with 206 additions and 27 deletions

View File

@ -89,6 +89,9 @@ static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
return -1;
}
node->ctdb = ctdb;
node->name = talloc_asprintf(node, "%s:%u",
node->address.address,
node->address.port);
if (ctdb->methods->add_node(node) != 0) {
talloc_free(node);
@ -194,13 +197,22 @@ bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
/*
called by the transport layer when a packet comes in
*/
static void ctdb_recv_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
{
printf("received pkt of length %d\n", length);
}
/*
  upcall used by the transport layer to report that a node is dead.
  The only action taken here is to print the node's name to stdout.
*/
static void ctdb_node_dead(struct ctdb_node *node)
{
	const char *dead_name = node->name;

	printf("node %s is dead\n", dead_name);
}
static const struct ctdb_upcalls ctdb_upcalls = {
.recv_pkt = ctdb_recv_pkt
.recv_pkt = ctdb_recv_pkt,
.node_dead = ctdb_node_dead
};
/*

View File

@ -44,6 +44,7 @@ struct ctdb_node {
struct ctdb_context *ctdb;
struct ctdb_node *next, *prev;
struct ctdb_address address;
const char *name; /* for debug messages */
void *private; /* private to transport */
};
@ -60,7 +61,8 @@ struct ctdb_methods {
transport calls up to the ctdb layer
*/
struct ctdb_upcalls {
void (*recv_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
void (*recv_pkt)(struct ctdb_context *, uint8_t *data, uint32_t length);
void (*node_dead)(struct ctdb_node *);
};
/* main state of the ctdb daemon */

View File

@ -32,21 +32,28 @@ struct ctdb_incoming {
int fd;
};
/*
  one packet waiting in a tcp node's outgoing queue
*/
struct ctdb_tcp_packet {
struct ctdb_tcp_packet *next, *prev; /* links in the per-node queue list */
uint8_t *data; /* bytes that still have to be written to the socket */
uint32_t length; /* number of bytes at data that remain to be written */
};
/*
state associated with one tcp node
*/
struct ctdb_tcp_node {
int fd; /* outgoing socket to the node; set to -1 while there is no connection */
struct fd_event *fde; /* event registered on fd for the established connection */
struct ctdb_tcp_packet *queue; /* packets queued for sending, oldest first */
};
/* prototypes internal to tcp transport */
void ctdb_tcp_node_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private);
void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private);
void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private);
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length);
int ctdb_tcp_listen(struct ctdb_context *ctdb);
void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private);

View File

@ -25,6 +25,14 @@
#include "ctdb_private.h"
#include "ctdb_tcp.h"
/*
  put a file descriptor into non-blocking mode

  The function cannot report errors, so failures are silently ignored.
  If the current flags cannot be read the descriptor is left untouched,
  rather than writing back a bogus flag word built from the -1 error
  return.
*/
static void set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL, 0);

	if (flags == -1) {
		/* F_GETFL failed (e.g. invalid fd): -1 | O_NONBLOCK would
		   turn on every flag bit, so don't attempt F_SETFL */
		return;
	}

	(void)fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
/*
called when socket becomes writeable on connect
*/
@ -53,11 +61,10 @@ static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *f
return;
}
printf("Established connection to %s:%u\n",
node->address.address, node->address.port);
printf("Established connection to %s\n", node->name);
talloc_free(fde);
event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
ctdb_tcp_node_read, node);
tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ,
ctdb_tcp_node_write, node);
}
/*
@ -70,13 +77,11 @@ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
struct ctdb_context *ctdb = node->ctdb;
unsigned v;
struct sockaddr_in sock_out;
tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
v = fcntl(tnode->fd, F_GETFL, 0);
fcntl(tnode->fd, F_SETFL, v | O_NONBLOCK);
set_nonblocking(tnode->fd);
inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
sock_out.sin_port = htons(node->address.port);
@ -96,6 +101,16 @@ void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te,
ctdb_node_connect_write, node);
}
/*
  destructor for a ctdb_incoming structure: closes the incoming socket
  and marks the stored descriptor as invalid. Always returns 0.
*/
static int ctdb_incoming_destructor(struct ctdb_incoming *in)
{
	int sock = in->fd;

	in->fd = -1;
	close(sock);

	return 0;
}
/*
called when we get contacted by another node
currently makes no attempt to check if the connection is really from a ctdb
@ -122,9 +137,13 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
in->fd = fd;
in->ctdb = ctdb;
set_nonblocking(in->fd);
event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ,
ctdb_tcp_incoming_read, in);
talloc_set_destructor(in, ctdb_incoming_destructor);
printf("New incoming socket %d\n", in->fd);
}

View File

@ -63,8 +63,9 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
static const struct ctdb_methods ctdb_tcp_methods = {
.start = ctdb_tcp_start,
.add_node = ctdb_tcp_add_node
.start = ctdb_tcp_start,
.add_node = ctdb_tcp_add_node,
.queue_pkt = ctdb_tcp_queue_pkt
};
/*

View File

@ -26,15 +26,78 @@
#include "ctdb_tcp.h"
/*
called when socket becomes readable
called when we fail to send a message to a node
*/
void ctdb_tcp_node_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te,
struct timeval t, void *private)
{
struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
printf("connection to node %s:%u is readable\n",
node->address.address, node->address.port);
event_set_fd_flags(fde, 0);
struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
struct ctdb_tcp_node);
/* flush the queue */
while (tnode->queue) {
struct ctdb_tcp_packet *pkt = tnode->queue;
DLIST_REMOVE(tnode->queue, pkt);
talloc_free(pkt);
}
/* start a new connect cycle to try to re-establish the
link */
talloc_free(tnode->fde);
close(tnode->fd);
tnode->fd = -1;
event_add_timed(node->ctdb->ev, node, timeval_zero(),
ctdb_tcp_node_connect, node);
}
/*
  event handler for the outgoing socket to a node

  When the socket is writeable, as much of the node's pending packet
  queue as the socket will accept is written out; once the queue is
  empty the writeable event is switched off again. A read event on this
  socket is treated as a dead link and triggers a reconnect.

  ev      - event context (unused here)
  fde     - the fd event that fired (unused; tnode->fde is used instead)
  flags   - EVENT_FD_* flags describing what happened on the socket
  private - the struct ctdb_node this socket belongs to
*/
void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde,
			 uint16_t flags, void *private)
{
	struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
	struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
						      struct ctdb_tcp_node);

	if (flags & EVENT_FD_READ) {
		/* getting a read event on this fd in the current tcp model is
		   always an error, as we have separate read and write
		   sockets. In future we may combine them, but for now it must
		   mean that the socket is dead, so we try to reconnect */
		talloc_free(tnode->fde);
		/* don't keep a pointer to the freed event around */
		tnode->fde = NULL;
		close(tnode->fd);
		tnode->fd = -1;
		event_add_timed(node->ctdb->ev, node, timeval_zero(),
				ctdb_tcp_node_connect, node);
		return;
	}

	while (tnode->queue) {
		struct ctdb_tcp_packet *pkt = tnode->queue;
		ssize_t n;

		n = write(tnode->fd, pkt->data, pkt->length);
		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK &&
		    errno != EINTR) {
			/* a real write failure: hand over to the dead-node
			   handler via a separate event and stop asking for
			   writeable notifications */
			event_add_timed(node->ctdb->ev, node, timeval_zero(),
					ctdb_tcp_node_dead, node);
			EVENT_FD_NOT_WRITEABLE(tnode->fde);
			return;
		}
		/* nothing written (socket full or interrupted by a signal):
		   wait for the next writeable event and retry then */
		if (n <= 0) return;

		if ((size_t)n != pkt->length) {
			/* partial write: remember how far we got and wait
			   for the socket to become writeable again */
			pkt->length -= n;
			pkt->data += n;
			return;
		}

		/* whole packet sent */
		DLIST_REMOVE(tnode->queue, pkt);
		talloc_free(pkt);
	}

	/* queue drained, no need for further writeable events */
	EVENT_FD_NOT_WRITEABLE(tnode->fde);
}
@ -45,12 +108,87 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
uint16_t flags, void *private)
{
struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
char c;
printf("Incoming data\n");
if (read(in->fd, &c, 1) <= 0) {
/* socket is dead */
close(in->fd);
int num_ready = 0;
uint8_t *data;
/* NOTE: we don't yet handle combined packets or partial
packets. Obviously that needs fixing, using a similar
scheme to the Samba4 packet layer */
if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
num_ready == 0) {
/* we've lost the link from another node. We don't
notify the upper layers, as we only want to trigger
a full node reorganisation when a send fails - that
allows nodes to restart without penalty as long as
the network is idle */
talloc_free(in);
return;
}
data = talloc_size(in, num_ready);
if (data == NULL) {
/* not much we can do except drop the socket */
talloc_free(in);
return;
}
if (read(in->fd, data, num_ready) != num_ready) {
talloc_free(in);
return;
}
/* tell the ctdb layer above that we have a packet */
in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
talloc_free(data);
}
/*
  queue a packet for sending

  node   - destination node
  data   - packet bytes; they are copied if they have to be queued, so
           the caller keeps ownership of the buffer
  length - number of bytes at data

  Returns -1 (with the ctdb error string set) if the node currently has
  no connection. Returns 0 if the packet was written or queued, and also
  if the write failed outright - in that case the failure is handled by
  a separately scheduled dead-node event.
  NOTE(review): on allocation failure the result is whatever
  CTDB_NO_MEMORY does; it is defined elsewhere and presumably sets the
  error string and returns an error - confirm.
*/
int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
{
	struct ctdb_tcp_node *tnode = talloc_get_type(node->private,
						      struct ctdb_tcp_node);
	struct ctdb_tcp_packet *pkt;
	uint8_t *copy;

	if (tnode->fd == -1) {
		ctdb_set_error(node->ctdb, "Sending to dead node %s\n", node->name);
		return -1;
	}

	/* if the queue is empty then try an immediate write, avoiding
	   queue overhead. This relies on non-blocking sockets */
	if (tnode->queue == NULL) {
		ssize_t n = write(tnode->fd, data, length);
		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK &&
		    errno != EINTR) {
			event_add_timed(node->ctdb->ev, node, timeval_zero(),
					ctdb_tcp_node_dead, node);
			/* yes, we report success, as the dead node is
			   handled via a separate event */
			return 0;
		}
		if (n > 0) {
			data += n;
			length -= n;
		}
	}

	/* nothing (left) to send. This also keeps zero-length packets out
	   of the queue, where they could never be drained because the
	   writer stops as soon as write() returns 0 */
	if (length == 0) return 0;

	pkt = talloc(tnode, struct ctdb_tcp_packet);
	CTDB_NO_MEMORY(node->ctdb, pkt);

	copy = talloc_memdup(pkt, data, length);
	if (copy == NULL) {
		/* don't leave a half-built packet hanging off tnode */
		talloc_free(pkt);
	}
	CTDB_NO_MEMORY(node->ctdb, copy);

	pkt->data = copy;
	pkt->length = length;

	/* the first queued packet switches on writeable notifications;
	   ctdb_tcp_node_write switches them off once the queue is empty */
	if (tnode->queue == NULL) {
		EVENT_FD_WRITEABLE(tnode->fde);
	}

	DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);

	return 0;
}