mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
added handling of partial packet reads
added transport level packet allocator, allowing the transport to enforce alignment or special memory rules (This used to be ctdb commit 50304a5c4d8d640732678eeed793857334ca5ec1)
This commit is contained in:
parent
ee547a0f9a
commit
3c097c9a5f
@ -130,7 +130,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
|
||||
va_end(ap);
|
||||
|
||||
len = strlen(msg)+1;
|
||||
r = talloc_size(ctdb, sizeof(*r) + len);
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r) + len;
|
||||
r->hdr.operation = CTDB_REPLY_ERROR;
|
||||
@ -158,7 +158,7 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
|
||||
{
|
||||
struct ctdb_reply_redirect *r;
|
||||
|
||||
r = talloc_size(ctdb, sizeof(*r));
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r);
|
||||
r->hdr.operation = CTDB_REPLY_REDIRECT;
|
||||
@ -188,7 +188,7 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
|
||||
int len;
|
||||
|
||||
len = sizeof(*r) + key->dsize + data->dsize;
|
||||
r = talloc_size(ctdb, len);
|
||||
r = ctdb->methods->allocate_pkt(ctdb, len);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = len;
|
||||
r->hdr.operation = CTDB_REQ_DMASTER;
|
||||
@ -255,7 +255,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
|
||||
}
|
||||
|
||||
/* send the CTDB_REPLY_DMASTER */
|
||||
r = talloc_size(ctdb, sizeof(*r) + data.dsize);
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r) + data.dsize;
|
||||
r->hdr.operation = CTDB_REPLY_DMASTER;
|
||||
@ -317,7 +317,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
call_data.dsize?&call_data:NULL,
|
||||
&reply_data, c->hdr.srcnode);
|
||||
|
||||
r = talloc_size(ctdb, sizeof(*r) + reply_data.dsize);
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r) + reply_data.dsize;
|
||||
r->hdr.operation = CTDB_REPLY_CALL;
|
||||
@ -539,7 +539,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
|
||||
CTDB_NO_MEMORY_NULL(ctdb, state);
|
||||
|
||||
len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
|
||||
state->c = talloc_size(ctdb, len);
|
||||
state->c = ctdb->methods->allocate_pkt(ctdb, len);
|
||||
CTDB_NO_MEMORY_NULL(ctdb, state->c);
|
||||
|
||||
state->c->hdr.length = len;
|
||||
|
@ -55,6 +55,7 @@ struct ctdb_methods {
|
||||
int (*start)(struct ctdb_context *); /* start protocol processing */
|
||||
int (*add_node)(struct ctdb_node *); /* setup a new node */
|
||||
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
|
||||
void *(*allocate_pkt)(struct ctdb_context *, size_t );
|
||||
};
|
||||
|
||||
/*
|
||||
|
@ -24,14 +24,29 @@ struct ctdb_tcp {
|
||||
int listen_fd;
|
||||
};
|
||||
|
||||
/*
|
||||
incoming packet structure - only used when we get a partial packet
|
||||
on read
|
||||
*/
|
||||
struct ctdb_tcp_partial {
|
||||
uint8_t *data;
|
||||
uint32_t length;
|
||||
};
|
||||
|
||||
|
||||
/*
|
||||
state associated with an incoming connection
|
||||
*/
|
||||
struct ctdb_incoming {
|
||||
struct ctdb_context *ctdb;
|
||||
int fd;
|
||||
struct ctdb_tcp_partial partial;
|
||||
};
|
||||
|
||||
/*
|
||||
outgoing packet structure - only allocated when we can't write immediately
|
||||
to the socket
|
||||
*/
|
||||
struct ctdb_tcp_packet {
|
||||
struct ctdb_tcp_packet *next, *prev;
|
||||
uint8_t *data;
|
||||
|
@ -135,7 +135,7 @@ static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde,
|
||||
fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
|
||||
if (fd == -1) return;
|
||||
|
||||
in = talloc(ctdb, struct ctdb_incoming);
|
||||
in = talloc_zero(ctdb, struct ctdb_incoming);
|
||||
in->fd = fd;
|
||||
in->ctdb = ctdb;
|
||||
|
||||
|
@ -64,10 +64,24 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
transport packet allocator - allows transport to control memory for packets
|
||||
*/
|
||||
void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
|
||||
{
|
||||
/* tcp transport needs to round to 8 byte alignment to ensure
|
||||
that we can use a length header and 64 bit elements in
|
||||
structures */
|
||||
size = (size+7) & ~7;
|
||||
return talloc_size(ctdb, size);
|
||||
}
|
||||
|
||||
|
||||
static const struct ctdb_methods ctdb_tcp_methods = {
|
||||
.start = ctdb_tcp_start,
|
||||
.add_node = ctdb_tcp_add_node,
|
||||
.queue_pkt = ctdb_tcp_queue_pkt
|
||||
.queue_pkt = ctdb_tcp_queue_pkt,
|
||||
.allocate_pkt = ctdb_tcp_allocate_pkt
|
||||
};
|
||||
|
||||
/*
|
||||
|
@ -25,6 +25,7 @@
|
||||
#include "ctdb_private.h"
|
||||
#include "ctdb_tcp.h"
|
||||
|
||||
|
||||
/*
|
||||
called when we fail to send a message to a node
|
||||
*/
|
||||
@ -109,11 +110,8 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
|
||||
{
|
||||
struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
|
||||
int num_ready = 0;
|
||||
uint8_t *data;
|
||||
|
||||
/* NOTE: we don't yet handle combined packets or partial
|
||||
packets. Obviously that needed fixing, using a similar
|
||||
scheme to the Samba4 packet layer */
|
||||
ssize_t nread;
|
||||
uint8_t *data, *data_base;
|
||||
|
||||
if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
|
||||
num_ready == 0) {
|
||||
@ -126,20 +124,71 @@ void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde,
|
||||
return;
|
||||
}
|
||||
|
||||
data = talloc_size(in, num_ready);
|
||||
if (data == NULL) {
|
||||
in->partial.data = talloc_realloc_size(in, in->partial.data,
|
||||
num_ready + in->partial.length);
|
||||
if (in->partial.data == NULL) {
|
||||
/* not much we can do except drop the socket */
|
||||
talloc_free(in);
|
||||
return;
|
||||
}
|
||||
|
||||
if (read(in->fd, data, num_ready) != num_ready) {
|
||||
nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
|
||||
if (nread <= 0) {
|
||||
/* the connection must be dead */
|
||||
talloc_free(in);
|
||||
return;
|
||||
}
|
||||
|
||||
/* tell the ctdb layer above that we have a packet */
|
||||
in->ctdb->upcalls->recv_pkt(in->ctdb, data, num_ready);
|
||||
data = in->partial.data;
|
||||
nread += in->partial.length;
|
||||
|
||||
in->partial.data = NULL;
|
||||
in->partial.length = 0;
|
||||
|
||||
if (nread >= 4 && *(uint32_t *)data == nread) {
|
||||
/* most common case - we got a whole packet in one go
|
||||
tell the ctdb layer above that we have a packet */
|
||||
in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
|
||||
return;
|
||||
}
|
||||
|
||||
data_base = data;
|
||||
|
||||
while (nread >= 4 && *(uint32_t *)data <= nread) {
|
||||
/* we have at least one packet */
|
||||
uint8_t *d2;
|
||||
uint32_t len;
|
||||
len = *(uint32_t *)data;
|
||||
d2 = talloc_memdup(in, data, len);
|
||||
if (d2 == NULL) {
|
||||
/* sigh */
|
||||
talloc_free(in);
|
||||
return;
|
||||
}
|
||||
in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
|
||||
data += len;
|
||||
nread -= len;
|
||||
return;
|
||||
}
|
||||
|
||||
if (nread < 4 || *(uint32_t *)data > nread) {
|
||||
/* we have only part of a packet */
|
||||
if (data_base == data) {
|
||||
in->partial.data = data;
|
||||
in->partial.length = nread;
|
||||
} else {
|
||||
in->partial.data = talloc_memdup(in, data, nread);
|
||||
if (in->partial.data == NULL) {
|
||||
talloc_free(in);
|
||||
return;
|
||||
}
|
||||
in->partial.length = nread;
|
||||
talloc_free(data_base);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
talloc_free(data_base);
|
||||
}
|
||||
|
||||
/*
|
||||
|
Loading…
Reference in New Issue
Block a user