xprtrdma: RPC completion should wait for Send completion
When an RPC Call includes a file data payload, that payload can come
from pages in the page cache, or a user buffer (for direct I/O).

If the payload can fit inline, xprtrdma includes it in the Send using
a scatter-gather technique. xprtrdma mustn't allow the RPC consumer to
re-use the memory where that payload resides before the Send completes.
Otherwise, the new contents of that memory would be exposed by an HCA
retransmit of the Send operation.

So, block RPC completion on Send completion, but only in the case where
a separate file data payload is part of the Send. This prevents the
reuse of that memory while it is still part of a Send operation without
an undue cost to other cases.

Waiting is avoided in the common case because typically the Send will
have completed long before the RPC Reply arrives.

These days, an RPC timeout will trigger a disconnect, which tears down
the QP. The disconnect flushes all waiting Sends. This bounds the amount
of time the reply handler has to wait for a Send completion.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Anna Schumaker <Anna.Schumaker@Netapp.com>
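The core of the change is a flag-and-wake pattern: the Send path sets
RPCRDMA_REQ_F_TX_RESOURCES when payload pages are DMA-mapped into the Send,
the Send completion handler clears the bit and wakes any waiter, and the
reply path refuses to complete the RPC while the bit is still set. Below is
a minimal userspace sketch of that pattern, not the kernel code: a pthread
mutex/condvar pair stands in for the kernel's __set_bit()/wake_up_bit()/
out_of_line_wait_on_bit() machinery, and the names (fake_req,
send_completion, release_rqst) are illustrative only.

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <unistd.h>

struct fake_req {
	pthread_mutex_t lock;
	pthread_cond_t  tx_done;
	bool            tx_resources;         /* models RPCRDMA_REQ_F_TX_RESOURCES */
	unsigned long   reply_waits_for_send;
};

/* "Send completion" path: once the SGEs would have been unmapped,
 * clear the flag and wake anyone waiting in the reply path. */
static void *send_completion(void *arg)
{
	struct fake_req *req = arg;

	usleep(10000);                        /* pretend the HCA is still sending */
	pthread_mutex_lock(&req->lock);
	req->tx_resources = false;            /* kernel: test_and_clear_bit() */
	pthread_cond_broadcast(&req->tx_done);   /* kernel: wake_up_bit() */
	pthread_mutex_unlock(&req->lock);
	return NULL;
}

/* "Reply handler" path: the RPC must not complete while the Send still
 * owns the payload memory. */
static void release_rqst(struct fake_req *req)
{
	pthread_mutex_lock(&req->lock);
	if (req->tx_resources) {
		req->reply_waits_for_send++;
		while (req->tx_resources)     /* kernel: out_of_line_wait_on_bit() */
			pthread_cond_wait(&req->tx_done, &req->lock);
	}
	pthread_mutex_unlock(&req->lock);
}

int main(void)
{
	struct fake_req req = {
		.lock = PTHREAD_MUTEX_INITIALIZER,
		.tx_done = PTHREAD_COND_INITIALIZER,
		.tx_resources = true,         /* kernel: __set_bit() at Send time */
	};
	pthread_t sender;

	pthread_create(&sender, NULL, send_completion, &req);
	release_rqst(&req);                   /* blocks until the "Send" completes */
	pthread_join(sender, NULL);
	printf("reply_waits_for_send = %lu\n", req.reply_waits_for_send);
	return 0;
}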
commit 01bb35c89d (parent 0ba6f37012)
net/sunrpc/xprtrdma/rpc_rdma.c

@@ -534,6 +534,11 @@ rpcrdma_unmap_sendctx(struct rpcrdma_sendctx *sc)
 	for (count = sc->sc_unmap_count; count; ++sge, --count)
 		ib_dma_unmap_page(ia->ri_device,
 				  sge->addr, sge->length, DMA_TO_DEVICE);
+
+	if (test_and_clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &sc->sc_req->rl_flags)) {
+		smp_mb__after_atomic();
+		wake_up_bit(&sc->sc_req->rl_flags, RPCRDMA_REQ_F_TX_RESOURCES);
+	}
 }

 /* Prepare an SGE for the RPC-over-RDMA transport header.
@@ -667,6 +672,8 @@ map_tail:

 out:
 	sc->sc_wr.num_sge += sge_no;
+	if (sc->sc_unmap_count)
+		__set_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);
 	return true;

 out_regbuf:
@@ -704,6 +711,8 @@ rpcrdma_prepare_send_sges(struct rpcrdma_xprt *r_xprt,
 		return -ENOBUFS;
 	req->rl_sendctx->sc_wr.num_sge = 0;
 	req->rl_sendctx->sc_unmap_count = 0;
+	req->rl_sendctx->sc_req = req;
+	__clear_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags);

 	if (!rpcrdma_prepare_hdr_sge(&r_xprt->rx_ia, req, hdrlen))
 		return -EIO;
@@ -1305,6 +1314,20 @@ void rpcrdma_release_rqst(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
 	if (!list_empty(&req->rl_registered))
 		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt,
 						    &req->rl_registered);
+
+	/* Ensure that any DMA mapped pages associated with
+	 * the Send of the RPC Call have been unmapped before
+	 * allowing the RPC to complete. This protects argument
+	 * memory not controlled by the RPC client from being
+	 * re-used before we're done with it.
+	 */
+	if (test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
+		r_xprt->rx_stats.reply_waits_for_send++;
+		out_of_line_wait_on_bit(&req->rl_flags,
+					RPCRDMA_REQ_F_TX_RESOURCES,
+					bit_wait,
+					TASK_UNINTERRUPTIBLE);
+	}
 }

 /* Reply handling runs in the poll worker thread. Anything that
@@ -1384,7 +1407,8 @@ void rpcrdma_reply_handler(struct rpcrdma_rep *rep)
 	dprintk("RPC:       %s: reply %p completes request %p (xid 0x%08x)\n",
 		__func__, rep, req, be32_to_cpu(rep->rr_xid));

-	if (list_empty(&req->rl_registered))
+	if (list_empty(&req->rl_registered) &&
+	    !test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags))
 		rpcrdma_complete_rqst(rep);
 	else
 		queue_work(rpcrdma_receive_wq, &rep->rr_work);
net/sunrpc/xprtrdma/transport.c

@@ -789,12 +789,13 @@ void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
 		   r_xprt->rx_stats.failed_marshal_count,
 		   r_xprt->rx_stats.bad_reply_count,
 		   r_xprt->rx_stats.nomsg_call_count);
-	seq_printf(seq, "%lu %lu %lu %lu %lu\n",
+	seq_printf(seq, "%lu %lu %lu %lu %lu %lu\n",
 		   r_xprt->rx_stats.mrs_recovered,
 		   r_xprt->rx_stats.mrs_orphaned,
 		   r_xprt->rx_stats.mrs_allocated,
 		   r_xprt->rx_stats.local_inv_needed,
-		   r_xprt->rx_stats.empty_sendctx_q);
+		   r_xprt->rx_stats.empty_sendctx_q,
+		   r_xprt->rx_stats.reply_waits_for_send);
 }

 static int
net/sunrpc/xprtrdma/verbs.c

@@ -1526,7 +1526,8 @@ rpcrdma_ep_post(struct rpcrdma_ia *ia,
 	dprintk("RPC:       %s: posting %d s/g entries\n",
 		__func__, send_wr->num_sge);

-	if (!ep->rep_send_count) {
+	if (!ep->rep_send_count ||
+	    test_bit(RPCRDMA_REQ_F_TX_RESOURCES, &req->rl_flags)) {
 		send_wr->send_flags |= IB_SEND_SIGNALED;
 		ep->rep_send_count = ep->rep_send_batch;
 	} else {
net/sunrpc/xprtrdma/xprt_rdma.h

@@ -236,11 +236,13 @@ struct rpcrdma_rep {

 /* struct rpcrdma_sendctx - DMA mapped SGEs to unmap after Send completes
  */
+struct rpcrdma_req;
 struct rpcrdma_xprt;
 struct rpcrdma_sendctx {
 	struct ib_send_wr	sc_wr;
 	struct ib_cqe		sc_cqe;
 	struct rpcrdma_xprt	*sc_xprt;
+	struct rpcrdma_req	*sc_req;
 	unsigned int		sc_unmap_count;
 	struct ib_sge		sc_sges[];
 };
@@ -387,6 +389,7 @@ struct rpcrdma_req {
 enum {
 	RPCRDMA_REQ_F_BACKCHANNEL = 0,
 	RPCRDMA_REQ_F_PENDING,
+	RPCRDMA_REQ_F_TX_RESOURCES,
 };

 static inline void
@@ -492,6 +495,7 @@ struct rpcrdma_stats {
 	/* accessed when receiving a reply */
 	unsigned long long	total_rdma_reply;
 	unsigned long long	fixup_copy_count;
+	unsigned long		reply_waits_for_send;
 	unsigned long		local_inv_needed;
 	unsigned long		nomsg_call_count;
 	unsigned long		bcall_count;