mirror of
https://github.com/samba-team/samba.git
synced 2025-01-11 05:18:09 +03:00
ib: fragment sent buf + many bugfixes
It came to light that the send buffer has to be fragmented when a message is too large to fit into the receiver's buffers. Additionally, fixed many bugs. Still testing. TODO: clean up the code. (This used to be ctdb commit 2f8876f09bc92169487cb077326579044560a121)
This commit is contained in:
parent
3222a41b9b
commit
cae71b84d6
@ -39,6 +39,7 @@
|
||||
#include "ibwrapper.h"
|
||||
|
||||
#include <rdma/rdma_cma.h>
|
||||
#include "infiniband/sa-kern-abi.h"
|
||||
|
||||
#include "ibwrapper_internal.h"
|
||||
#include "lib/util/dlinklist.h"
|
||||
@ -46,11 +47,17 @@
|
||||
#define IBW_LASTERR_BUFSIZE 512
|
||||
static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
|
||||
|
||||
#define IBW_MAX_SEND_WR 256
|
||||
#define IBW_MAX_RECV_WR 1024
|
||||
#define IBW_RECV_BUFSIZE 256
|
||||
#define IBW_RECV_THRESHOLD (1 * 1024 * 1024)
|
||||
|
||||
static void ibw_event_handler_verbs(struct event_context *ev,
|
||||
struct fd_event *fde, uint16_t flags, void *private_data);
|
||||
static int ibw_fill_cq(struct ibw_conn *conn);
|
||||
static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
|
||||
static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
|
||||
static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
|
||||
static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
|
||||
static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len);
|
||||
|
||||
static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
|
||||
uint32_t n, struct ibv_mr **ppmr)
|
||||
@ -97,7 +104,7 @@ static int ibw_init_memory(struct ibw_conn *conn)
|
||||
|
||||
DEBUG(10, ("ibw_init_memory(cmid: %p)\n", pconn->cm_id));
|
||||
pconn->buf_send = ibw_alloc_mr(pctx, pconn,
|
||||
opts->max_send_wr * opts->avg_send_size, &pconn->mr_send);
|
||||
opts->max_send_wr * opts->recv_bufsize, &pconn->mr_send);
|
||||
if (!pconn->buf_send) {
|
||||
sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
|
||||
return -1;
|
||||
@ -115,7 +122,7 @@ static int ibw_init_memory(struct ibw_conn *conn)
|
||||
|
||||
for(i=0; i<opts->max_send_wr; i++) {
|
||||
p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
|
||||
p->msg = pconn->buf_send + (i * opts->avg_send_size);
|
||||
p->buf = pconn->buf_send + (i * opts->recv_bufsize);
|
||||
p->wr_id = i;
|
||||
|
||||
DLIST_ADD(pconn->wr_list_avail, p);
|
||||
@ -228,6 +235,7 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
|
||||
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
|
||||
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
|
||||
struct ibv_qp_init_attr init_attr;
|
||||
struct ibv_qp_attr attr;
|
||||
int rc;
|
||||
|
||||
DEBUG(10, ("ibw_setup_cq_qp(cmid: %p)\n", pconn->cm_id));
|
||||
@ -286,6 +294,12 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
|
||||
}
|
||||
/* else result is in pconn->cm_id->qp */
|
||||
|
||||
rc = ibv_query_qp(pconn->cm_id->qp, &attr, IBV_QP_PATH_MTU, &init_attr);
|
||||
if (rc) {
|
||||
sprintf(ibw_lasterr, "ibv_query_qp failed with %d\n", rc);
|
||||
return rc;
|
||||
}
|
||||
|
||||
return ibw_fill_cq(conn);
|
||||
}
|
||||
|
||||
@ -295,12 +309,12 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
|
||||
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
|
||||
int rc;
|
||||
struct ibv_sge list = {
|
||||
.addr = (uintptr_t) NULL,
|
||||
.addr = (uintptr_t) NULL, /* filled below */
|
||||
.length = pctx->opts.recv_bufsize,
|
||||
.lkey = pconn->mr_recv->lkey
|
||||
.lkey = pconn->mr_recv->lkey /* always the same */
|
||||
};
|
||||
struct ibv_recv_wr wr = {
|
||||
.wr_id = 0,
|
||||
.wr_id = 0, /* filled below */
|
||||
.sg_list = &list,
|
||||
.num_sge = 1,
|
||||
};
|
||||
@ -314,7 +328,7 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
|
||||
|
||||
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
|
||||
if (rc) {
|
||||
sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
|
||||
sprintf(ibw_lasterr, "refill/ibv_post_recv failed with %d\n", rc);
|
||||
DEBUG(0, (ibw_lasterr));
|
||||
return -2;
|
||||
}
|
||||
@ -328,12 +342,12 @@ static int ibw_fill_cq(struct ibw_conn *conn)
|
||||
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
|
||||
int i, rc;
|
||||
struct ibv_sge list = {
|
||||
.addr = (uintptr_t) NULL,
|
||||
.addr = (uintptr_t) NULL, /* filled below */
|
||||
.length = pctx->opts.recv_bufsize,
|
||||
.lkey = pconn->mr_recv->lkey
|
||||
.lkey = pconn->mr_recv->lkey /* always the same */
|
||||
};
|
||||
struct ibv_recv_wr wr = {
|
||||
.wr_id = 0,
|
||||
.wr_id = 0, /* filled below */
|
||||
.sg_list = &list,
|
||||
.num_sge = 1,
|
||||
};
|
||||
@ -348,7 +362,7 @@ static int ibw_fill_cq(struct ibw_conn *conn)
|
||||
|
||||
rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
|
||||
if (rc) {
|
||||
sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
|
||||
sprintf(ibw_lasterr, "fill/ibv_post_recv failed with %d\n", rc);
|
||||
DEBUG(0, (ibw_lasterr));
|
||||
return -2;
|
||||
}
|
||||
@ -532,8 +546,7 @@ static void ibw_event_handler_verbs(struct event_context *ev,
|
||||
goto error;
|
||||
}
|
||||
if (ev_cq != pconn->cq) {
|
||||
sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
|
||||
(unsigned int)ev_cq, (unsigned int)pconn->cq);
|
||||
sprintf(ibw_lasterr, "ev_cq(%p) != pconn->cq(%p)\n", ev_cq, pconn->cq);
|
||||
goto error;
|
||||
}
|
||||
rc = ibv_req_notify_cq(pconn->cq, 0);
|
||||
@ -544,8 +557,8 @@ static void ibw_event_handler_verbs(struct event_context *ev,
|
||||
|
||||
while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
|
||||
if (wc.status) {
|
||||
sprintf(ibw_lasterr, "cq completion failed status %d\n",
|
||||
wc.status);
|
||||
sprintf(ibw_lasterr, "cq completion failed status %d rc %d\n",
|
||||
wc.status, rc);
|
||||
goto error;
|
||||
}
|
||||
|
||||
@ -587,12 +600,13 @@ error:
|
||||
pctx->connstate_func(NULL, conn);
|
||||
}
|
||||
|
||||
static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
|
||||
static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
|
||||
{
|
||||
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
|
||||
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
|
||||
struct ibw_wr *p;
|
||||
int send_index;
|
||||
int rc = 0;
|
||||
|
||||
DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n",
|
||||
pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
|
||||
@ -605,10 +619,19 @@ static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
|
||||
if (send_index < pctx->opts.max_send_wr) {
|
||||
DEBUG(10, ("ibw_wc_send#1 %u\n", (int)wc->wr_id));
|
||||
p = pconn->wr_index[send_index];
|
||||
if (p->msg_large)
|
||||
ibw_free_mr(&p->msg_large, &p->mr_large);
|
||||
if (p->buf_large!=NULL) {
|
||||
if (p->ref_cnt) {
|
||||
/* awaiting more of it... */
|
||||
p->ref_cnt--;
|
||||
} else {
|
||||
ibw_free_mr(&p->buf_large, &p->mr_large);
|
||||
DLIST_REMOVE(pconn->wr_list_used, p);
|
||||
DLIST_ADD(pconn->wr_list_avail, p);
|
||||
}
|
||||
} else { /* nasty - but necessary */
|
||||
DLIST_REMOVE(pconn->wr_list_used, p);
|
||||
DLIST_ADD(pconn->wr_list_avail, p);
|
||||
}
|
||||
} else { /* "extra" request - not optimized */
|
||||
DEBUG(10, ("ibw_wc_send#2 %u\n", (int)wc->wr_id));
|
||||
for(p=pconn->extra_sent; p!=NULL; p=p->next)
|
||||
@ -618,25 +641,42 @@ static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
|
||||
sprintf(ibw_lasterr, "failed to find wr_id %d\n", (int)wc->wr_id);
|
||||
return -1;
|
||||
}
|
||||
ibw_free_mr(&p->msg_large, &p->mr_large);
|
||||
if (p->ref_cnt) {
|
||||
p->ref_cnt--;
|
||||
} else {
|
||||
ibw_free_mr(&p->buf_large, &p->mr_large);
|
||||
DLIST_REMOVE(pconn->extra_sent, p);
|
||||
DLIST_ADD(pconn->extra_avail, p);
|
||||
}
|
||||
}
|
||||
|
||||
if (pconn->queue) {
|
||||
uint32_t msg_size;
|
||||
|
||||
DEBUG(10, ("ibw_wc_send#queue %u\n", (int)wc->wr_id));
|
||||
|
||||
p = pconn->queue;
|
||||
DLIST_REMOVE(pconn->queue, p);
|
||||
|
||||
assert(p->queued_ref_cnt>0);
|
||||
p->queued_ref_cnt--;
|
||||
|
||||
msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
|
||||
|
||||
assert(p->queued_msg!=NULL);
|
||||
ibw_send(conn, p->queued_msg, p, ntohl(*(uint32_t *)p->queued_msg));
|
||||
assert(msg_size!=0);
|
||||
rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
|
||||
if (p->queued_ref_cnt) {
|
||||
p->queued_msg += pctx->opts.recv_bufsize;
|
||||
} else {
|
||||
DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
|
||||
p->queued_msg = NULL;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ibw_append_to_part(struct ibw_conn_priv *pconn,
|
||||
return rc;
|
||||
}
|
||||
|
||||
static int ibw_append_to_part(struct ibw_conn_priv *pconn,
|
||||
struct ibw_part *part, char **pp, uint32_t add_len, int info)
|
||||
{
|
||||
DEBUG(10, ("ibw_append_to_part: cmid=%p, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
|
||||
@ -674,7 +714,7 @@ static inline int ibw_append_to_part(struct ibw_conn_priv *pconn,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
|
||||
static int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
|
||||
struct ibw_part *part, uint32_t threshold)
|
||||
{
|
||||
DEBUG(10, ("ibw_wc_mem_threshold: cmid=%p, (bs=%u, len=%u, tr=%u), thr=%u\n",
|
||||
@ -694,7 +734,7 @@ static inline int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
|
||||
return 0;
|
||||
}
|
||||
|
||||
static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
|
||||
static int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
|
||||
{
|
||||
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
|
||||
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
|
||||
@ -786,11 +826,10 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
|
||||
|
||||
DEBUG(10, ("ibw_process_init_attrs: nattr: %d\n", nattr));
|
||||
|
||||
opts->max_send_wr = 256;
|
||||
opts->max_recv_wr = 1024;
|
||||
opts->avg_send_size = 1024;
|
||||
opts->recv_bufsize = 256;
|
||||
opts->recv_threshold = 1 * 1024 * 1024;
|
||||
opts->max_send_wr = IBW_MAX_SEND_WR;
|
||||
opts->max_recv_wr = IBW_MAX_RECV_WR;
|
||||
opts->recv_bufsize = IBW_RECV_BUFSIZE;
|
||||
opts->recv_threshold = IBW_RECV_THRESHOLD;
|
||||
|
||||
for(i=0; i<nattr; i++) {
|
||||
name = attr[i].name;
|
||||
@ -801,8 +840,6 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
|
||||
opts->max_send_wr = atoi(value);
|
||||
else if (strcmp(name, "max_recv_wr")==0)
|
||||
opts->max_recv_wr = atoi(value);
|
||||
else if (strcmp(name, "avg_send_size")==0)
|
||||
opts->avg_send_size = atoi(value);
|
||||
else if (strcmp(name, "recv_bufsize")==0)
|
||||
opts->recv_bufsize = atoi(value);
|
||||
else if (strcmp(name, "recv_threshold")==0)
|
||||
@ -1003,7 +1040,7 @@ int ibw_disconnect(struct ibw_conn *conn)
|
||||
|
||||
rc = rdma_disconnect(pconn->cm_id);
|
||||
if (rc) {
|
||||
sprintf(ibw_lasterr, "ibw_disconnect failed with %d", rc);
|
||||
sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
|
||||
DEBUG(0, (ibw_lasterr));
|
||||
return rc;
|
||||
}
|
||||
@ -1023,15 +1060,15 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
|
||||
DLIST_REMOVE(pconn->wr_list_avail, p);
|
||||
DLIST_ADD(pconn->wr_list_used, p);
|
||||
|
||||
if (len <= pctx->opts.avg_send_size) {
|
||||
*buf = (void *)p->msg;
|
||||
if (len <= pctx->opts.recv_bufsize) {
|
||||
*buf = (void *)p->buf;
|
||||
} else {
|
||||
p->msg_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
|
||||
if (!p->msg_large) {
|
||||
p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
|
||||
if (p->buf_large==NULL) {
|
||||
sprintf(ibw_lasterr, "ibw_alloc_mr#1 failed\n");
|
||||
goto error;
|
||||
}
|
||||
*buf = (void *)p->msg_large;
|
||||
*buf = (void *)p->buf_large;
|
||||
}
|
||||
/* p->wr_id is already filled in ibw_init_memory */
|
||||
} else {
|
||||
@ -1041,7 +1078,7 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
|
||||
if (!p) {
|
||||
p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr);
|
||||
if (p==NULL) {
|
||||
sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)", pconn->extra_max);
|
||||
sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max);
|
||||
goto error;
|
||||
}
|
||||
p->wr_id = pctx->opts.max_send_wr + pconn->extra_max;
|
||||
@ -1054,40 +1091,42 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, uint32_t l
|
||||
default: break;
|
||||
}
|
||||
}
|
||||
DLIST_REMOVE(pconn->extra_avail, p);
|
||||
|
||||
p->msg_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
|
||||
if (!p->msg_large) {
|
||||
sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed");
|
||||
p->buf_large = ibw_alloc_mr(pctx, pconn, len, &p->mr_large);
|
||||
if (p->buf_large==NULL) {
|
||||
sprintf(ibw_lasterr, "ibw_alloc_mr#2 failed\n");
|
||||
goto error;
|
||||
}
|
||||
*buf = (void *)p->msg_large;
|
||||
*buf = (void *)p->buf_large;
|
||||
|
||||
DLIST_REMOVE(pconn->extra_avail, p);
|
||||
/* we don't have a prepared index for this, so
|
||||
* we will have to find this by wr_id later on */
|
||||
DLIST_ADD(pconn->extra_sent, p);
|
||||
}
|
||||
|
||||
*key = (void *)p;
|
||||
|
||||
return 0;
|
||||
error:
|
||||
DEBUG(0, ("ibw_alloc_send_buf error: %s\n", ibw_lasterr));
|
||||
DEBUG(0, ("ibw_alloc_send_buf error: %s", ibw_lasterr));
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
|
||||
static int ibw_send_packet(struct ibw_conn *conn, void *buf, struct ibw_wr *p, uint32_t len)
|
||||
{
|
||||
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
|
||||
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
|
||||
struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
|
||||
int rc;
|
||||
|
||||
*((uint32_t *)buf) = htonl(len);
|
||||
|
||||
/* can we send it right now? */
|
||||
if (pconn->wr_sent<pctx->opts.max_send_wr) {
|
||||
struct ibv_send_wr *bad_wr;
|
||||
struct ibv_sge list = {
|
||||
.addr = (uintptr_t) NULL,
|
||||
.addr = (uintptr_t)buf,
|
||||
.length = len,
|
||||
.lkey = 0
|
||||
.lkey = pconn->mr_send->lkey
|
||||
};
|
||||
struct ibv_send_wr wr = {
|
||||
.wr_id = p->wr_id + pctx->opts.max_recv_wr,
|
||||
@ -1096,16 +1135,13 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
|
||||
.opcode = IBV_WR_SEND,
|
||||
.send_flags = IBV_SEND_SIGNALED,
|
||||
};
|
||||
struct ibv_send_wr *bad_wr;
|
||||
|
||||
DEBUG(10, ("ibw_send#1(cmid: %p, wrid: %u, n: %d)\n",
|
||||
if (p->buf_large==NULL) {
|
||||
DEBUG(10, ("ibw_send#normal(cmid: %p, wrid: %u, n: %d)\n",
|
||||
pconn->cm_id, (uint32_t)wr.wr_id, len));
|
||||
|
||||
list.addr = (uintptr_t)buf;
|
||||
if (p->msg_large==NULL) {
|
||||
list.lkey = pconn->mr_send->lkey;
|
||||
} else {
|
||||
assert(p->mr_large!=NULL);
|
||||
DEBUG(10, ("ibw_send#large(cmid: %p, wrid: %u, n: %d)\n",
|
||||
pconn->cm_id, (uint32_t)wr.wr_id, len));
|
||||
list.lkey = p->mr_large->lkey;
|
||||
}
|
||||
|
||||
@ -1113,26 +1149,72 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
|
||||
if (rc) {
|
||||
sprintf(ibw_lasterr, "ibv_post_send error %d (%d)\n",
|
||||
rc, pconn->wr_sent);
|
||||
DEBUG(0, (ibw_lasterr));
|
||||
} else {
|
||||
/* good case */
|
||||
if (p->wr_id>=pctx->opts.max_send_wr) {
|
||||
/* we don't have prepared index for this, so that
|
||||
* we will have to find this later on */
|
||||
DLIST_ADD(pconn->extra_sent, p);
|
||||
goto error;
|
||||
}
|
||||
|
||||
pconn->wr_sent++;
|
||||
}
|
||||
|
||||
return rc;
|
||||
} /* else put the request into our own queue: */
|
||||
|
||||
DEBUG(10, ("ibw_send#2(cmid: %p, len: %u)\n", pconn->cm_id, len));
|
||||
DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len));
|
||||
|
||||
/* to be sent by ibw_wc_send */
|
||||
DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */
|
||||
/* regardless "normal" or [a part of] "large" packet */
|
||||
if (!p->queued_ref_cnt) {
|
||||
DLIST_ADD_END2(pconn->queue, p, struct ibw_wr *,
|
||||
qprev, qnext); /* TODO: optimize */
|
||||
p->queued_msg = buf;
|
||||
}
|
||||
p->queued_ref_cnt++;
|
||||
p->queued_rlen = len; /* last wins; see ibw_wc_send */
|
||||
|
||||
return 0;
|
||||
error:
|
||||
DEBUG(0, (ibw_lasterr));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int ibw_send(struct ibw_conn *conn, void *buf, void *key, uint32_t len)
|
||||
{
|
||||
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
|
||||
struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
|
||||
int rc;
|
||||
|
||||
assert(len>=sizeof(uint32_t));
|
||||
*((uint32_t *)buf) = htonl(len);
|
||||
|
||||
if (len > pctx->opts.recv_bufsize) {
|
||||
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
|
||||
int rlen = len;
|
||||
char *packet = (char *)buf;
|
||||
uint32_t recv_bufsize = pctx->opts.recv_bufsize;
|
||||
|
||||
DEBUG(10, ("ibw_send#frag(cmid: %p, buf: %p, len: %u)\n",
|
||||
pconn->cm_id, buf, len));
|
||||
|
||||
/* single threaded => no race here: */
|
||||
assert(p->ref_cnt==0);
|
||||
while(rlen > recv_bufsize) {
|
||||
rc = ibw_send_packet(conn, packet, p, recv_bufsize);
|
||||
if (rc)
|
||||
return rc;
|
||||
packet += recv_bufsize;
|
||||
rlen -= recv_bufsize;
|
||||
p->ref_cnt++; /* not good to have it in ibw_send_packet */
|
||||
}
|
||||
if (rlen) {
|
||||
rc = ibw_send_packet(conn, packet, p, rlen);
|
||||
p->ref_cnt++; /* not good to have it in ibw_send_packet */
|
||||
}
|
||||
p->ref_cnt--; /* for the same handling */
|
||||
} else {
|
||||
assert(p->ref_cnt==0);
|
||||
assert(p->queued_ref_cnt==0);
|
||||
|
||||
rc = ibw_send_packet(conn, buf, p, len);
|
||||
}
|
||||
return rc;
|
||||
}
|
||||
|
||||
int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key)
|
||||
@ -1145,8 +1227,8 @@ int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key)
|
||||
assert(buf!=NULL);
|
||||
assert(conn!=NULL);
|
||||
|
||||
if (p->msg_large)
|
||||
ibw_free_mr(&p->msg_large, &p->mr_large);
|
||||
if (p->buf_large!=NULL)
|
||||
ibw_free_mr(&p->buf_large, &p->mr_large);
|
||||
|
||||
/* parallel case */
|
||||
if (p->wr_id < pctx->opts.max_send_wr) {
|
||||
@ -1155,6 +1237,7 @@ int ibw_cancel_send_buf(struct ibw_conn *conn, void *buf, void *key)
|
||||
DLIST_ADD(pconn->wr_list_avail, p);
|
||||
} else { /* "extra" packet */
|
||||
DEBUG(10, ("ibw_cancel_send_buf#2 %u", (int)p->wr_id));
|
||||
DLIST_REMOVE(pconn->extra_sent, p);
|
||||
DLIST_ADD(pconn->extra_avail, p);
|
||||
}
|
||||
|
||||
|
@ -24,21 +24,25 @@
|
||||
struct ibw_opts {
|
||||
uint32_t max_send_wr;
|
||||
uint32_t max_recv_wr;
|
||||
uint32_t avg_send_size;
|
||||
uint32_t recv_bufsize;
|
||||
uint32_t recv_threshold;
|
||||
};
|
||||
|
||||
struct ibw_wr {
|
||||
char *msg; /* initialized in ibw_init_memory once per connection */
|
||||
char *buf; /* initialized in ibw_init_memory once per connection */
|
||||
int wr_id; /* position in wr_index list; also used as wr id */
|
||||
|
||||
char *msg_large; /* allocated specially for "large" message */
|
||||
char *buf_large; /* allocated specially for "large" message */
|
||||
struct ibv_mr *mr_large;
|
||||
int ref_cnt; /* reference count for ibw_wc_send to know when to release */
|
||||
|
||||
char *queued_msg; /* set at ibw_send - can be different than above */
|
||||
int queued_ref_cnt; /* instead of adding the same to the queue again */
|
||||
uint32_t queued_rlen; /* last wins when queued_ref_cnt>0; or simple msg size */
|
||||
|
||||
struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */
|
||||
/* or extra_sent or extra_avail */
|
||||
struct ibw_wr *qnext, *qprev; /* in queue */
|
||||
};
|
||||
|
||||
struct ibw_ctx_priv {
|
||||
@ -81,11 +85,12 @@ struct ibw_conn_priv {
|
||||
struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */
|
||||
int wr_sent; /* # of send wrs in the CQ */
|
||||
|
||||
struct ibw_wr *queue;
|
||||
struct ibw_wr *extra_sent;
|
||||
struct ibw_wr *extra_avail;
|
||||
int extra_max; /* max wr_id in the queue */
|
||||
|
||||
struct ibw_wr *queue;
|
||||
|
||||
/* buf_recv is a ring buffer */
|
||||
char *buf_recv; /* max_recv_wr * avg_recv_size */
|
||||
struct ibv_mr *mr_recv;
|
||||
@ -93,3 +98,30 @@ struct ibw_conn_priv {
|
||||
struct ibw_part part;
|
||||
};
|
||||
|
||||
/* remove an element from a list - element doesn't have to be in list. */
|
||||
#define DLIST_REMOVE2(list, p, prev, next) \
|
||||
do { \
|
||||
if ((p) == (list)) { \
|
||||
(list) = (p)->next; \
|
||||
if (list) (list)->prev = NULL; \
|
||||
} else { \
|
||||
if ((p)->prev) (p)->prev->next = (p)->next; \
|
||||
if ((p)->next) (p)->next->prev = (p)->prev; \
|
||||
} \
|
||||
if ((p) != (list)) (p)->next = (p)->prev = NULL; \
|
||||
} while (0)
|
||||
|
||||
/* hook into the end of the list - needs a tmp pointer */
|
||||
#define DLIST_ADD_END2(list, p, type, prev, next) \
|
||||
do { \
|
||||
if (!(list)) { \
|
||||
(list) = (p); \
|
||||
(p)->next = (p)->prev = NULL; \
|
||||
} else { \
|
||||
type tmp; \
|
||||
for (tmp = (list); tmp->next; tmp = tmp->next) ; \
|
||||
tmp->next = (p); \
|
||||
(p)->next = NULL; \
|
||||
(p)->prev = tmp; \
|
||||
} \
|
||||
} while (0)
|
||||
|
@ -223,7 +223,7 @@ int ibwtest_do_varsize_scenario_conn(struct ibwtest_ctx *tcx, struct ibw_conn *c
|
||||
for(i=0; i<tcx->nmsg; i++)
|
||||
{
|
||||
//size = (uint32_t)((float)(tcx->maxsize) * (rand() / (RAND_MAX + 1.0)));
|
||||
size = (uint32_t)((float)(tcx->maxsize) * ((float)i/(float)tcx->nmsg));
|
||||
size = (uint32_t)((float)(tcx->maxsize) * ((float)(i+1)/(float)tcx->nmsg));
|
||||
if (ibwtest_do_varsize_scenario_conn_size(tcx, conn, size))
|
||||
return -1;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user