mirror of
https://github.com/samba-team/samba.git
synced 2024-12-22 13:34:15 +03:00
ibw: modified tridge's code - in my point of view
ibw_alloc_send and node-centric params are the basics of these important changes. Also tried to avoid memcpy/memdup where it was possible. (This used to be ctdb commit 9e8cb9b96c685288c04ee8b69a972f582cd3c904)
This commit is contained in:
parent
00df320053
commit
9c114a3fc5
@ -30,14 +30,14 @@
|
||||
int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
|
||||
{
|
||||
int ctdb_tcp_init(struct ctdb_context *ctdb);
|
||||
#ifdef HAVE_INFINIBAND
|
||||
#ifdef USE_INFINIBAND
|
||||
int ctdb_ibw_init(struct ctdb_context *ctdb);
|
||||
#endif /*HAVE_INFINIBAND*/
|
||||
|
||||
if (strcmp(transport, "tcp") == 0) {
|
||||
return ctdb_tcp_init(ctdb);
|
||||
}
|
||||
#ifdef HAVE_INFINIBAND
|
||||
#ifdef USE_INFINIBAND
|
||||
if (strcmp(transport, "ib") == 0) {
|
||||
return ctdb_ibw_init(ctdb);
|
||||
}
|
||||
@ -256,10 +256,15 @@ void ctdb_wait_loop(struct ctdb_context *ctdb)
|
||||
}
|
||||
}
|
||||
|
||||
void ctdb_stopped(struct ctdb_context *ctdb)
|
||||
{
|
||||
}
|
||||
|
||||
static const struct ctdb_upcalls ctdb_upcalls = {
|
||||
.recv_pkt = ctdb_recv_pkt,
|
||||
.node_dead = ctdb_node_dead,
|
||||
.node_connected = ctdb_node_connected
|
||||
.node_connected = ctdb_node_connected,
|
||||
.stopped = ctdb_stopped
|
||||
};
|
||||
|
||||
/*
|
||||
|
@ -31,12 +31,10 @@
|
||||
/*
|
||||
queue a packet or die
|
||||
*/
|
||||
static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
static inline void ctdb_queue_packet(struct ctdb_node *node, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_node *node;
|
||||
node = ctdb->nodes[hdr->destnode];
|
||||
if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
|
||||
ctdb_fatal(ctdb, "Unable to queue packet\n");
|
||||
if (node->ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
|
||||
ctdb_fatal(node->ctdb, "Unable to queue packet\n");
|
||||
}
|
||||
}
|
||||
|
||||
@ -121,6 +119,9 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
|
||||
struct ctdb_reply_error *r;
|
||||
char *msg;
|
||||
int len;
|
||||
struct ctdb_node *node;
|
||||
|
||||
node = ctdb->nodes[hdr->srcnode];
|
||||
|
||||
va_start(ap, fmt);
|
||||
msg = talloc_vasprintf(ctdb, fmt, ap);
|
||||
@ -130,7 +131,7 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
|
||||
va_end(ap);
|
||||
|
||||
len = strlen(msg)+1;
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
|
||||
r = ctdb->methods->allocate_pkt(node, sizeof(*r) + len);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r) + len;
|
||||
r->hdr.operation = CTDB_REPLY_ERROR;
|
||||
@ -143,9 +144,8 @@ static void ctdb_send_error(struct ctdb_context *ctdb,
|
||||
|
||||
talloc_free(msg);
|
||||
|
||||
ctdb_queue_packet(ctdb, &r->hdr);
|
||||
|
||||
talloc_free(r);
|
||||
ctdb_queue_packet(node, &r->hdr);
|
||||
ctdb->methods->dealloc_pkt(node, r);
|
||||
}
|
||||
|
||||
|
||||
@ -157,8 +157,11 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
|
||||
struct ctdb_ltdb_header *header)
|
||||
{
|
||||
struct ctdb_reply_redirect *r;
|
||||
struct ctdb_node *node;
|
||||
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
|
||||
node = ctdb->nodes[c->hdr.srcnode];
|
||||
|
||||
r = ctdb->methods->allocate_pkt(node, sizeof(*r));
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r);
|
||||
r->hdr.operation = CTDB_REPLY_REDIRECT;
|
||||
@ -167,9 +170,8 @@ static void ctdb_call_send_redirect(struct ctdb_context *ctdb,
|
||||
r->hdr.reqid = c->hdr.reqid;
|
||||
r->dmaster = header->dmaster;
|
||||
|
||||
ctdb_queue_packet(ctdb, &r->hdr);
|
||||
|
||||
talloc_free(r);
|
||||
ctdb_queue_packet(node, &r->hdr);
|
||||
ctdb->methods->dealloc_pkt(node, r);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -186,13 +188,18 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
|
||||
{
|
||||
struct ctdb_req_dmaster *r;
|
||||
int len;
|
||||
struct ctdb_node *node;
|
||||
uint32_t destnode;
|
||||
|
||||
destnode = ctdb_lmaster(ctdb, key);
|
||||
node = ctdb->nodes[destnode];
|
||||
|
||||
len = sizeof(*r) + key->dsize + data->dsize;
|
||||
r = ctdb->methods->allocate_pkt(ctdb, len);
|
||||
r = ctdb->methods->allocate_pkt(node, len);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = len;
|
||||
r->hdr.operation = CTDB_REQ_DMASTER;
|
||||
r->hdr.destnode = ctdb_lmaster(ctdb, key);
|
||||
r->hdr.destnode = destnode;
|
||||
r->hdr.srcnode = ctdb->vnn;
|
||||
r->hdr.reqid = c->hdr.reqid;
|
||||
r->dmaster = header->laccessor;
|
||||
@ -205,14 +212,14 @@ static void ctdb_call_send_dmaster(struct ctdb_context *ctdb,
|
||||
/* we are the lmaster - don't send to ourselves */
|
||||
ctdb_request_dmaster(ctdb, &r->hdr);
|
||||
} else {
|
||||
ctdb_queue_packet(ctdb, &r->hdr);
|
||||
ctdb_queue_packet(node, &r->hdr);
|
||||
|
||||
/* update the ltdb to record the new dmaster */
|
||||
header->dmaster = r->hdr.destnode;
|
||||
ctdb_ltdb_store(ctdb, *key, header, *data);
|
||||
}
|
||||
|
||||
talloc_free(r);
|
||||
ctdb->methods->dealloc_pkt(node, r);
|
||||
}
|
||||
|
||||
|
||||
@ -229,7 +236,9 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
|
||||
TDB_DATA key, data;
|
||||
struct ctdb_ltdb_header header;
|
||||
int ret;
|
||||
struct ctdb_node *node;
|
||||
|
||||
node = ctdb->nodes[c->dmaster];
|
||||
key.dptr = c->data;
|
||||
key.dsize = c->keylen;
|
||||
data.dptr = c->data + c->keylen;
|
||||
@ -255,7 +264,7 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
|
||||
}
|
||||
|
||||
/* send the CTDB_REPLY_DMASTER */
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
|
||||
r = ctdb->methods->allocate_pkt(node, sizeof(*r) + data.dsize);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r) + data.dsize;
|
||||
r->hdr.operation = CTDB_REPLY_DMASTER;
|
||||
@ -265,9 +274,8 @@ void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr
|
||||
r->datalen = data.dsize;
|
||||
memcpy(&r->data[0], data.dptr, data.dsize);
|
||||
|
||||
ctdb_queue_packet(ctdb, &r->hdr);
|
||||
|
||||
talloc_free(r);
|
||||
ctdb_queue_packet(node, &r->hdr);
|
||||
ctdb->methods->dealloc_pkt(node, r);
|
||||
}
|
||||
|
||||
|
||||
@ -281,7 +289,9 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
struct ctdb_reply_call *r;
|
||||
int ret;
|
||||
struct ctdb_ltdb_header header;
|
||||
struct ctdb_node *node;
|
||||
|
||||
node = ctdb->nodes[hdr->srcnode];
|
||||
key.dptr = c->data;
|
||||
key.dsize = c->keylen;
|
||||
call_data.dptr = c->data + c->keylen;
|
||||
@ -317,7 +327,7 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
call_data.dsize?&call_data:NULL,
|
||||
&reply_data, c->hdr.srcnode);
|
||||
|
||||
r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
|
||||
r = ctdb->methods->allocate_pkt(node, sizeof(*r) + reply_data.dsize);
|
||||
CTDB_NO_MEMORY_FATAL(ctdb, r);
|
||||
r->hdr.length = sizeof(*r) + reply_data.dsize;
|
||||
r->hdr.operation = CTDB_REPLY_CALL;
|
||||
@ -327,10 +337,10 @@ void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
r->datalen = reply_data.dsize;
|
||||
memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
|
||||
|
||||
ctdb_queue_packet(ctdb, &r->hdr);
|
||||
ctdb_queue_packet(node, &r->hdr);
|
||||
ctdb->methods->dealloc_pkt(node, r);
|
||||
|
||||
talloc_free(reply_data.dptr);
|
||||
talloc_free(r);
|
||||
}
|
||||
|
||||
enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
|
||||
@ -440,7 +450,10 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
{
|
||||
struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
|
||||
struct ctdb_call_state *state;
|
||||
|
||||
struct ctdb_node *node;
|
||||
#ifdef USE_INFINIBAND
|
||||
uint8_t *r;
|
||||
#endif /* USE_INFINIBAND */
|
||||
state = idr_find(ctdb->idr, hdr->reqid);
|
||||
|
||||
talloc_steal(state, c);
|
||||
@ -453,7 +466,18 @@ void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
|
||||
/* send it off again */
|
||||
state->node = ctdb->nodes[c->dmaster];
|
||||
|
||||
ctdb_queue_packet(ctdb, &state->c->hdr);
|
||||
node = ctdb->nodes[state->c->hdr.destnode];
|
||||
|
||||
#ifdef USE_INFINIBAND
|
||||
r = ctdb->methods->allocate_pkt(node, state->c->hdr.length);
|
||||
memcpy(r, &state->c->hdr, state->c->hdr.length);
|
||||
#endif /* USE_INFINIBAND */
|
||||
|
||||
ctdb_queue_packet(node, &state->c->hdr);
|
||||
|
||||
#ifdef USE_INFINIBAND
|
||||
ctdb->methods->dealloc_pkt(node, r);
|
||||
#endif /* USE_INFINIBAND */
|
||||
}
|
||||
|
||||
/*
|
||||
@ -520,6 +544,7 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
|
||||
int ret;
|
||||
struct ctdb_ltdb_header header;
|
||||
TDB_DATA data;
|
||||
struct ctdb_node *node;
|
||||
|
||||
/*
|
||||
if we are the dmaster for this key then we don't need to
|
||||
@ -538,8 +563,9 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
|
||||
state = talloc_zero(ctdb, struct ctdb_call_state);
|
||||
CTDB_NO_MEMORY_NULL(ctdb, state);
|
||||
|
||||
node = ctdb->nodes[header.dmaster];
|
||||
len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
|
||||
state->c = ctdb->methods->allocate_pkt(ctdb, len);
|
||||
state->c = ctdb->methods->allocate_pkt(node, len);
|
||||
CTDB_NO_MEMORY_NULL(ctdb, state->c);
|
||||
|
||||
state->c->hdr.length = len;
|
||||
@ -566,7 +592,12 @@ struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb,
|
||||
|
||||
talloc_set_destructor(state, ctdb_call_destructor);
|
||||
|
||||
ctdb_queue_packet(ctdb, &state->c->hdr);
|
||||
ctdb_queue_packet(node, &state->c->hdr);
|
||||
|
||||
#ifdef USE_INFINIBAND
|
||||
ctdb->methods->dealloc_pkt(node, state->c);
|
||||
state->c = NULL;
|
||||
#endif /* USE_INFINIBAND */
|
||||
|
||||
event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0),
|
||||
ctdb_call_timeout, state);
|
||||
|
@ -29,6 +29,9 @@
|
||||
#include "ibwrapper.h"
|
||||
#include "ibw_ctdb.h"
|
||||
|
||||
/* not nice; temporary workaround for the current implementation... */
|
||||
static void *last_key = NULL;
|
||||
|
||||
static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
|
||||
{
|
||||
struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
|
||||
@ -108,14 +111,12 @@ static int ctdb_ibw_add_node(struct ctdb_node *node)
|
||||
/*
|
||||
* transport packet allocator - allows transport to control memory for packets
|
||||
*/
|
||||
static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
|
||||
static void *ctdb_ibw_allocate_pkt(struct ctdb_node *node, size_t size)
|
||||
{
|
||||
struct ibw_conn *conn = NULL;
|
||||
void *buf = NULL;
|
||||
void *key; /* TODO: expand the param list with this */
|
||||
|
||||
/* TODO2: !!! I need "node" or ibw_conn here */
|
||||
if (ibw_alloc_send_buf(conn, &buf, &key, (int)size))
|
||||
if (ibw_alloc_send_buf(conn, &buf, &last_key, size))
|
||||
return NULL;
|
||||
|
||||
return buf;
|
||||
@ -124,20 +125,40 @@ static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
|
||||
static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
|
||||
{
|
||||
struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
|
||||
void *key = NULL; /* TODO: expand the param list with this */
|
||||
int rc;
|
||||
|
||||
assert(conn!=NULL);
|
||||
return ibw_send(conn, data, key, length);
|
||||
rc = ibw_send(conn, data, last_key, length);
|
||||
last_key = NULL;
|
||||
|
||||
return rc;
|
||||
}
|
||||
|
||||
static void ctdb_ibw_dealloc_pkt(struct ctdb_node *node, void *data)
|
||||
{
|
||||
if (last_key) {
|
||||
struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
|
||||
|
||||
assert(conn!=NULL);
|
||||
ibw_cancel_send_buf(conn, data, last_key);
|
||||
} /* else ibw_send is already using it and will free it after completion */
|
||||
}
|
||||
|
||||
static int ctdb_ibw_stop(struct ctdb_context *cctx)
|
||||
{
|
||||
struct ibw_ctx *ictx = talloc_get_type(cctx->private, struct ibw_ctx);
|
||||
|
||||
assert(ictx!=NULL);
|
||||
return ibw_stop(ictx);
|
||||
}
|
||||
|
||||
static const struct ctdb_methods ctdb_ibw_methods = {
|
||||
.start = ctdb_ibw_start,
|
||||
.add_node = ctdb_ibw_add_node,
|
||||
.queue_pkt = ctdb_ibw_queue_pkt,
|
||||
.allocate_pkt = ctdb_ibw_allocate_pkt
|
||||
|
||||
// .dealloc_pkt = ctdb_ibw_dealloc_pkt
|
||||
// .stop = ctdb_ibw_stop
|
||||
.allocate_pkt = ctdb_ibw_allocate_pkt,
|
||||
|
||||
.dealloc_pkt = ctdb_ibw_dealloc_pkt,
|
||||
.stop = ctdb_ibw_stop
|
||||
};
|
||||
|
||||
/*
|
||||
@ -146,7 +167,7 @@ static const struct ctdb_methods ctdb_ibw_methods = {
|
||||
int ctdb_ibw_init(struct ctdb_context *ctdb)
|
||||
{
|
||||
struct ibw_ctx *ictx;
|
||||
|
||||
|
||||
ictx = ibw_init(
|
||||
NULL, //struct ibw_initattr *attr, /* TODO */
|
||||
0, //int nattr, /* TODO */
|
||||
|
@ -55,7 +55,9 @@ struct ctdb_methods {
|
||||
int (*start)(struct ctdb_context *); /* start protocol processing */
|
||||
int (*add_node)(struct ctdb_node *); /* setup a new node */
|
||||
int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
|
||||
void *(*allocate_pkt)(struct ctdb_context *, size_t );
|
||||
void *(*allocate_pkt)(struct ctdb_node *, size_t);
|
||||
void (*dealloc_pkt)(struct ctdb_node *, void *data);
|
||||
int (*stop)(struct ctdb_context *); /* initiate stopping the protocol */
|
||||
};
|
||||
|
||||
/*
|
||||
@ -70,6 +72,9 @@ struct ctdb_upcalls {
|
||||
|
||||
/* node_connected is called when a connection to a node is established */
|
||||
void (*node_connected)(struct ctdb_node *);
|
||||
|
||||
/* protocol has been stopped */
|
||||
void (*stopped)(struct ctdb_context *);
|
||||
};
|
||||
|
||||
/* main state of the ctdb daemon */
|
||||
|
@ -67,21 +67,31 @@ int ctdb_tcp_add_node(struct ctdb_node *node)
|
||||
/*
|
||||
transport packet allocator - allows transport to control memory for packets
|
||||
*/
|
||||
void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
|
||||
void *ctdb_tcp_allocate_pkt(struct ctdb_node *node, size_t size)
|
||||
{
|
||||
/* tcp transport needs to round to 8 byte alignment to ensure
|
||||
that we can use a length header and 64 bit elements in
|
||||
structures */
|
||||
size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
|
||||
return talloc_size(ctdb, size);
|
||||
return talloc_size(node, size);
|
||||
}
|
||||
|
||||
void ctdb_tcp_dealloc_pkt(struct ctdb_node *node, void *buf)
|
||||
{
|
||||
talloc_free(buf);
|
||||
}
|
||||
|
||||
int ctdb_tcp_stop(struct ctdb_context *ctdb)
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
static const struct ctdb_methods ctdb_tcp_methods = {
|
||||
.start = ctdb_tcp_start,
|
||||
.add_node = ctdb_tcp_add_node,
|
||||
.queue_pkt = ctdb_tcp_queue_pkt,
|
||||
.allocate_pkt = ctdb_tcp_allocate_pkt
|
||||
.allocate_pkt = ctdb_tcp_allocate_pkt,
|
||||
.dealloc_pkt = ctdb_tcp_dealloc_pkt
|
||||
};
|
||||
|
||||
/*
|
||||
|
Loading…
Reference in New Issue
Block a user