1
0
mirror of https://github.com/samba-team/samba.git synced 2025-03-08 04:58:40 +03:00

merged from peter

(This used to be ctdb commit e6fc75581e2038e98c730a6691a1cc61c5b83afe)
This commit is contained in:
Andrew Tridgell 2006-12-21 09:43:49 +11:00
commit 0d9ec11a50
2 changed files with 348 additions and 105 deletions

View File

@ -49,6 +49,8 @@ static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data);
static int ibw_fill_cq(struct ibw_conn *conn);
static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
int n, struct ibv_mr **ppmr)
@ -86,31 +88,31 @@ static int ibw_init_memory(struct ibw_conn *conn)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_opts *opts = &pctx->opts;
int i;
struct ibw_wr *p;
pconn->buf_send = ibw_alloc_mr(pctx, pconn,
pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
opts->max_send_wr * opts->avg_send_size, &pconn->mr_send);
if (!pconn->buf_send) {
sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
return -1;
}
pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
opts->max_recv_wr * opts->recv_bufsize, &pconn->mr_recv);
if (!pconn->buf_recv) {
sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
return -1;
}
pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
pconn->wr_index = talloc_size(pconn, opts->max_send_wr * sizeof(struct ibw_wr *));
assert(pconn->wr_index!=NULL);
for(i=0; i<pctx->opts.max_send_wr; i++) {
for(i=0; i<opts->max_send_wr; i++) {
p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
p->wr_id = i;
p->msg = pconn->buf_send + (i * opts->avg_send_size);
p->wr_id = i + opts->max_recv_wr;
DLIST_ADD(pconn->wr_list_avail, p);
}
@ -284,7 +286,7 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
struct ibv_recv_wr *bad_wr;
list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
wr.wr_id = pconn->recv_index;
pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
@ -316,7 +318,7 @@ static int ibw_fill_cq(struct ibw_conn *conn)
for(i = pctx->opts.max_recv_wr; i!=0; i--) {
list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
wr.wr_id = pconn->recv_index;
pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
@ -503,67 +505,61 @@ static void ibw_event_handler_verbs(struct event_context *ev,
struct ibv_wc wc;
int rc;
struct ibv_cq *ev_cq;
void *ev_ctx;
rc = ibv_poll_cq(pconn->cq, 1, &wc);
if (rc!=1) {
sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
/* TODO: check whether if it's good to have more channels here... */
rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
if (rc) {
sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
goto error;
}
if (wc.status) {
sprintf(ibw_lasterr, "cq completion failed status %d\n",
wc.status);
if (ev_cq != pconn->cq) {
sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
(unsigned int)ev_cq, (unsigned int)pconn->cq);
goto error;
}
rc = ibv_req_notify_cq(pconn->cq, 0);
if (rc) {
sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
goto error;
}
switch(wc.opcode) {
case IBV_WC_SEND:
{
struct ibw_wr *p;
while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
if (wc.status) {
sprintf(ibw_lasterr, "cq completion failed status %d\n",
wc.status);
goto error;
}
switch(wc.opcode) {
case IBV_WC_SEND:
DEBUG(10, ("send completion\n"));
assert(pconn->cm_id->qp->qp_num==wc.qp_num);
assert(wc.wr_id < pctx->opts.max_send_wr);
p = pconn->wr_index[wc.wr_id];
if (p->msg_large) {
ibw_free_mr(&p->msg_large, &p->mr_large);
}
DLIST_REMOVE(pconn->wr_list_used, p);
DLIST_ADD(pconn->wr_list_avail, p);
}
break;
case IBV_WC_RDMA_WRITE:
DEBUG(10, ("rdma write completion\n"));
break;
case IBV_WC_RDMA_READ:
DEBUG(10, ("rdma read completion\n"));
break;
case IBV_WC_RECV:
{
int recv_index;
DEBUG(10, ("recv completion\n"));
assert(pconn->cm_id->qp->qp_num==wc.qp_num);
assert((int)wc.wr_id > pctx->opts.max_send_wr);
recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
assert(recv_index < pctx->opts.max_recv_wr);
assert(wc.byte_len <= pctx->opts.recv_bufsize);
/* TODO: take care of fragmented messages !!! */
pctx->receive_func(conn,
pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
wc.byte_len);
if (ibw_refill_cq_recv(conn))
if (ibw_wc_send(conn, &wc))
goto error;
}
break;
break;
default:
sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
case IBV_WC_RDMA_WRITE:
DEBUG(10, ("rdma write completion\n"));
break;
case IBV_WC_RDMA_READ:
DEBUG(10, ("rdma read completion\n"));
break;
case IBV_WC_RECV:
DEBUG(10, ("recv completion\n"));
if (ibw_wc_recv(conn, &wc))
goto error;
break;
default:
sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
goto error;
}
}
if (rc!=0) {
sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
goto error;
}
@ -574,6 +570,199 @@ error:
pctx->connstate_func(NULL, conn);
}
static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p;
int send_index;
assert(pconn->cm_id->qp->qp_num==wc->qp_num);
assert(wc->wr_id > pctx->opts.max_recv_wr);
send_index = wc->wr_id - pctx->opts.max_recv_wr;
if (send_index < pctx->opts.max_send_wr) {
DEBUG(10, ("ibw_wc_send#1 %u", (int)wc->wr_id));
p = pconn->wr_index[send_index];
if (p->msg_large)
ibw_free_mr(&p->msg_large, &p->mr_large);
DLIST_REMOVE(pconn->wr_list_used, p);
DLIST_ADD(pconn->wr_list_avail, p);
} else {
DEBUG(10, ("ibw_wc_send#2 %u", (int)wc->wr_id));
for(p=pconn->queue_sent; p!=NULL; p=p->next)
if (p->wr_id==(int)wc->wr_id)
break;
if (p==NULL) {
sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id);
return -1;
}
ibw_free_mr(&p->msg_large, &p->mr_large);
DLIST_REMOVE(pconn->queue_sent, p);
DLIST_ADD(pconn->queue_avail, p);
}
if (pconn->queue) {
struct ibv_sge list = {
.addr = (uintptr_t) NULL,
.length = *(uint32_t *)(p->msg_large),
.lkey = 0
};
struct ibv_send_wr wr = {
.wr_id = p->wr_id + pctx->opts.max_recv_wr,
.sg_list = &list,
.num_sge = 1,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
struct ibv_send_wr *bad_wr;
int rc;
p = pconn->queue;
DLIST_REMOVE(pconn->queue, p);
DLIST_ADD(pconn->queue_sent, p);
rc = ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
if (rc) {
sprintf(ibw_lasterr, "ibv_post_send failed with %d\n", rc);
return -1;
}
}
return 0;
}
static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
char **pp, uint32_t add_len, int info)
{
/* allocate more if necessary - it's an "evergrowing" buffer... */
if (part->len + add_len > part->bufsize) {
if (part->buf==NULL) {
assert(part->len==0);
part->buf = talloc_size(memctx, add_len);
if (part->buf==NULL) {
sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
add_len, info);
return -1;
}
part->bufsize = add_len;
} else {
part->buf = talloc_realloc_size(memctx,
part->buf, part->len + add_len);
if (part->buf==NULL) {
sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
part->len, add_len, info);
return -1;
}
}
part->bufsize = part->len + add_len;
}
/* consume pp */
memcpy(part->buf + part->len, *pp, add_len);
*pp += add_len;
part->len += add_len;
part->to_read -= add_len;
return 0;
}
static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
{
if (part->bufsize > threshold) {
talloc_free(part->buf);
part->buf = talloc_size(memctx, threshold);
if (part->buf==NULL) {
sprintf(ibw_lasterr, "talloc_size failed\n");
return -1;
}
part->bufsize = threshold;
}
return 0;
}
static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_part *part = &pconn->part;
char *p;
uint32_t remain;
assert(pconn->cm_id->qp->qp_num==wc->qp_num);
assert((int)wc->wr_id < pctx->opts.max_recv_wr);
assert(wc->byte_len <= pctx->opts.recv_bufsize);
p = pconn->buf_recv + ((int)wc->wr_id * pctx->opts.recv_bufsize);
remain = wc->byte_len;
while(remain) {
/* here always true: (part->len!=0 && part->to_read!=0) ||
(part->len==0 && part->to_read==0) */
if (part->len) { /* is there a partial msg to be continued? */
int read_len = (part->to_read<=remain) ? part->to_read : remain;
if (ibw_append_to_part(pconn, part, &p, read_len, 421))
goto error;
remain -= read_len;
if (part->len<=sizeof(uint32_t) && part->to_read==0) {
assert(part->len==sizeof(uint32_t));
/* set it again now... */
part->to_read = *((uint32_t *)(part->buf));
if (part->to_read<sizeof(uint32_t)) {
sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
goto error;
}
part->to_read -= sizeof(uint32_t); /* it's already read */
}
if (part->to_read==0) {
pctx->receive_func(conn, part->buf, part->len);
part->len = 0; /* tells not having partial data (any more) */
if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
goto error;
}
} else {
if (remain>=sizeof(uint32_t)) {
uint32_t msglen = *(uint32_t *)p;
if (msglen<sizeof(uint32_t)) {
sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
goto error;
}
/* mostly awaited case: */
if (msglen<=remain) {
pctx->receive_func(conn, p, msglen);
p += msglen;
remain -= msglen;
} else {
part->to_read = msglen;
/* part->len is already 0 */
if (ibw_append_to_part(pconn, part, &p, remain, 422))
goto error;
remain = 0; /* to be continued ... */
/* part->to_read > 0 here */
}
} else { /* edge case: */
part->to_read = sizeof(uint32_t);
/* part->len is already 0 */
if (ibw_append_to_part(pconn, part, &p, remain, 423))
goto error;
remain = 0;
/* part->to_read > 0 here */
}
}
} /* <remain> is always decreased at least by 1 */
if (ibw_refill_cq_recv(conn))
goto error;
return 0;
error:
DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
return -1;
}
static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
{
int i;
@ -583,6 +772,7 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->max_recv_wr = 1024;
opts->avg_send_size = 1024;
opts->recv_bufsize = 256;
opts->recv_threshold = 1 * 1024 * 1024;
for(i=0; i<nattr; i++) {
name = attr[i].name;
@ -597,6 +787,8 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
opts->avg_send_size = atoi(value);
else if (strcmp(name, "recv_bufsize")==0)
opts->recv_bufsize = atoi(value);
else if (strcmp(name, "recv_threshold")==0)
opts->recv_threshold = atoi(value);
else {
sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
return -1;
@ -800,65 +992,102 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = pconn->wr_list_avail;
if (p==NULL) {
sprintf(ibw_lasterr, "insufficient wr chunks\n");
return -1;
}
if (p) {
DLIST_REMOVE(pconn->wr_list_avail, p);
DLIST_ADD(pconn->wr_list_used, p);
DLIST_REMOVE(pconn->wr_list_avail, p);
DLIST_ADD(pconn->wr_list_used, p);
if (n + sizeof(long) <= pctx->opts.avg_send_size) {
*buf = (void *)(p->msg + sizeof(long));
*key = (void *)p;
if (n + sizeof(long) <= pctx->opts.avg_send_size) {
*buf = (void *)(p->msg + sizeof(long));
} else {
p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
if (!p->msg_large) {
sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n");
goto error;
}
*buf = (void *)(p->msg_large + sizeof(long));
}
} else {
/* not optimized */
p = pconn->queue_avail;
if (!p) {
p = pconn->queue_avail = talloc_zero(pconn, struct ibw_wr);
if (p==NULL) {
sprintf(ibw_lasterr, "talloc_zero failed (qmax: %u)", pconn->queue_max);
goto error;
}
p->wr_id = pconn->queue_max + pctx->opts.max_send_wr;
pconn->queue_max++;
switch(pconn->queue_max) {
case 1: DEBUG(2, ("warning: queue performed\n")); break;
case 10: DEBUG(0, ("warning: queue reached 10\n")); break;
case 100: DEBUG(0, ("warning: queue reached 100\n")); break;
case 1000: DEBUG(0, ("warning: queue reached 1000\n")); break;
default: break;
}
}
DLIST_REMOVE(pconn->queue_avail, p);
p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
if (!p->msg_large) {
sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
DEBUG(0, (ibw_lasterr));
return -1;
sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed");
goto error;
}
*buf = (void *)(p->msg_large + sizeof(long));
}
*key = (void *)p;
return 0;
error:
DEBUG(0, ("ibw_alloc_send_buf error: %s\n", ibw_lasterr));
return -1;
}
int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
{
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
struct ibv_sge list = {
.addr = (uintptr_t) NULL,
.length = n,
.lkey = 0
};
struct ibv_send_wr wr = {
.wr_id = p->wr_id,
.sg_list = &list,
.num_sge = 1,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
struct ibv_send_wr *bad_wr;
if (n + sizeof(long)<=pctx->opts.avg_send_size) {
assert((p->msg + sizeof(long))==(char *)buf);
list.lkey = pconn->mr_send->lkey;
list.addr = (uintptr_t) p->msg;
if (p->msg!=NULL) {
struct ibv_sge list = {
.addr = (uintptr_t) NULL,
.length = n,
.lkey = 0
};
struct ibv_send_wr wr = {
.wr_id = p->wr_id + pctx->opts.max_recv_wr,
.sg_list = &list,
.num_sge = 1,
.opcode = IBV_WR_SEND,
.send_flags = IBV_SEND_SIGNALED,
};
struct ibv_send_wr *bad_wr;
*((uint32_t *)p->msg) = htonl(n);
} else {
assert((p->msg_large + sizeof(long))==(char *)buf);
assert(p->mr_large!=NULL);
list.lkey = p->mr_large->lkey;
list.addr = (uintptr_t) p->msg_large;
if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
assert((p->msg + sizeof(long))==(char *)buf);
list.lkey = pconn->mr_send->lkey;
list.addr = (uintptr_t) p->msg;
*((uint32_t *)p->msg) = htonl(n);
} else {
assert((p->msg_large + sizeof(long))==(char *)buf);
assert(p->mr_large!=NULL);
list.lkey = p->mr_large->lkey;
list.addr = (uintptr_t) p->msg_large;
*((uint32_t *)p->msg_large) = htonl(n);
}
return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
} /* else: */
*((uint32_t *)p->msg_large) = htonl(n);
}
*((uint32_t *)p->msg_large) = htonl(n);
return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
/* to be sent by ibw_wc_send */
DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */
return 0;
}
const char *ibw_getLastError(void)

View File

@ -22,10 +22,11 @@
*/
struct ibw_opts {
int max_send_wr;
int max_recv_wr;
int avg_send_size;
int recv_bufsize;
uint32_t max_send_wr;
uint32_t max_recv_wr;
uint32_t avg_send_size;
uint32_t recv_bufsize;
uint32_t recv_threshold;
};
struct ibw_wr {
@ -56,6 +57,13 @@ struct ibw_ctx_priv {
long pagesize; /* sysconf result for memalign */
};
struct ibw_part {
char *buf; /* talloced memory buffer */
uint32_t bufsize; /* allocated size of buf - always grows */
uint32_t len; /* message part length */
uint32_t to_read; /* 4 or *((uint32_t)buf) if len>=sizeof(uint32_t) */
};
struct ibw_conn_priv {
struct ibv_comp_channel *verbs_channel;
struct fd_event *verbs_channel_event;
@ -71,9 +79,15 @@ struct ibw_conn_priv {
struct ibw_wr *wr_list_used;
struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */
struct ibw_wr *queue;
struct ibw_wr *queue_sent;
struct ibw_wr *queue_avail;
int queue_max; /* max wr_id in the queue */
/* buf_recv is a ring buffer */
char *buf_recv; /* max_recv_wr * avg_recv_size */
struct ibv_mr *mr_recv;
int recv_index; /* index of the next recv buffer */
int recv_index; /* index of the next recv buffer when refilling */
struct ibw_part part;
};