b541dd554b
The svc_rdma_recv_ctxt free list uses a lockless list to avoid the need for a spin lock in the fast path. llist_del_first(), which is used by svc_rdma_recv_ctxt_get(), requires serialization, however, when there are multiple list producers that are unserialized. I mistakenly thought there was only one caller of svc_rdma_recv_ctxt_get() (svc_rdma_refresh_recvs()), thus explicit serialization would not be necessary. But there is another caller: svc_rdma_bc_sendto(), and these two are not serialized against each other. I haven't seen ill effects that I could directly ascribe to a lack of serialization. It's just an observation based on code audit. When DMA-mapping before sending a Reply, the passed-in struct svc_rdma_recv_ctxt is used only for its write and reply PCLs. These are currently always empty in the backchannel case. So, instead of passing a full svc_rdma_recv_ctxt object to svc_rdma_map_reply_msg(), let's pass in just the Write and Reply PCLs. This change makes it unnecessary for the backchannel to acquire a dummy svc_rdma_recv_ctxt object when sending an RPC Call. The need for svc_rdma_recv_ctxt free list serialization is now completely avoided. Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
285 lines
7.2 KiB
C
285 lines
7.2 KiB
C
// SPDX-License-Identifier: GPL-2.0
|
|
/*
|
|
* Copyright (c) 2015-2018 Oracle. All rights reserved.
|
|
*
|
|
* Support for reverse-direction RPCs on RPC/RDMA (server-side).
|
|
*/
|
|
|
|
#include <linux/sunrpc/svc_rdma.h>
|
|
|
|
#include "xprt_rdma.h"
|
|
#include <trace/events/rpcrdma.h>
|
|
|
|
/**
|
|
* svc_rdma_handle_bc_reply - Process incoming backchannel Reply
|
|
* @rqstp: resources for handling the Reply
|
|
* @rctxt: Received message
|
|
*
|
|
*/
|
|
void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
|
|
struct svc_rdma_recv_ctxt *rctxt)
|
|
{
|
|
struct svc_xprt *sxprt = rqstp->rq_xprt;
|
|
struct rpc_xprt *xprt = sxprt->xpt_bc_xprt;
|
|
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
|
|
struct xdr_buf *rcvbuf = &rqstp->rq_arg;
|
|
struct kvec *dst, *src = &rcvbuf->head[0];
|
|
__be32 *rdma_resp = rctxt->rc_recv_buf;
|
|
struct rpc_rqst *req;
|
|
u32 credits;
|
|
|
|
spin_lock(&xprt->queue_lock);
|
|
req = xprt_lookup_rqst(xprt, *rdma_resp);
|
|
if (!req)
|
|
goto out_unlock;
|
|
|
|
dst = &req->rq_private_buf.head[0];
|
|
memcpy(&req->rq_private_buf, &req->rq_rcv_buf, sizeof(struct xdr_buf));
|
|
if (dst->iov_len < src->iov_len)
|
|
goto out_unlock;
|
|
memcpy(dst->iov_base, src->iov_base, src->iov_len);
|
|
xprt_pin_rqst(req);
|
|
spin_unlock(&xprt->queue_lock);
|
|
|
|
credits = be32_to_cpup(rdma_resp + 2);
|
|
if (credits == 0)
|
|
credits = 1; /* don't deadlock */
|
|
else if (credits > r_xprt->rx_buf.rb_bc_max_requests)
|
|
credits = r_xprt->rx_buf.rb_bc_max_requests;
|
|
spin_lock(&xprt->transport_lock);
|
|
xprt->cwnd = credits << RPC_CWNDSHIFT;
|
|
spin_unlock(&xprt->transport_lock);
|
|
|
|
spin_lock(&xprt->queue_lock);
|
|
xprt_complete_rqst(req->rq_task, rcvbuf->len);
|
|
xprt_unpin_rqst(req);
|
|
rcvbuf->len = 0;
|
|
|
|
out_unlock:
|
|
spin_unlock(&xprt->queue_lock);
|
|
}
|
|
|
|
/* Send a reverse-direction RPC Call.
|
|
*
|
|
* Caller holds the connection's mutex and has already marshaled
|
|
* the RPC/RDMA request.
|
|
*
|
|
* This is similar to svc_rdma_send_reply_msg, but takes a struct
|
|
* rpc_rqst instead, does not support chunks, and avoids blocking
|
|
* memory allocation.
|
|
*
|
|
* XXX: There is still an opportunity to block in svc_rdma_send()
|
|
* if there are no SQ entries to post the Send. This may occur if
|
|
* the adapter has a small maximum SQ depth.
|
|
*/
|
|
static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
|
|
struct rpc_rqst *rqst,
|
|
struct svc_rdma_send_ctxt *sctxt)
|
|
{
|
|
struct svc_rdma_pcl empty_pcl;
|
|
int ret;
|
|
|
|
pcl_init(&empty_pcl);
|
|
ret = svc_rdma_map_reply_msg(rdma, sctxt, &empty_pcl, &empty_pcl,
|
|
&rqst->rq_snd_buf);
|
|
if (ret < 0)
|
|
return -EIO;
|
|
|
|
/* Bump page refcnt so Send completion doesn't release
|
|
* the rq_buffer before all retransmits are complete.
|
|
*/
|
|
get_page(virt_to_page(rqst->rq_buffer));
|
|
sctxt->sc_send_wr.opcode = IB_WR_SEND;
|
|
return svc_rdma_send(rdma, sctxt);
|
|
}
|
|
|
|
/* Server-side transport endpoint wants a whole page for its send
|
|
* buffer. The client RPC code constructs the RPC header in this
|
|
* buffer before it invokes ->send_request.
|
|
*/
|
|
static int
|
|
xprt_rdma_bc_allocate(struct rpc_task *task)
|
|
{
|
|
struct rpc_rqst *rqst = task->tk_rqstp;
|
|
size_t size = rqst->rq_callsize;
|
|
struct page *page;
|
|
|
|
if (size > PAGE_SIZE) {
|
|
WARN_ONCE(1, "svcrdma: large bc buffer request (size %zu)\n",
|
|
size);
|
|
return -EINVAL;
|
|
}
|
|
|
|
page = alloc_page(GFP_NOIO | __GFP_NOWARN);
|
|
if (!page)
|
|
return -ENOMEM;
|
|
rqst->rq_buffer = page_address(page);
|
|
|
|
rqst->rq_rbuffer = kmalloc(rqst->rq_rcvsize, GFP_NOIO | __GFP_NOWARN);
|
|
if (!rqst->rq_rbuffer) {
|
|
put_page(page);
|
|
return -ENOMEM;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static void
|
|
xprt_rdma_bc_free(struct rpc_task *task)
|
|
{
|
|
struct rpc_rqst *rqst = task->tk_rqstp;
|
|
|
|
put_page(virt_to_page(rqst->rq_buffer));
|
|
kfree(rqst->rq_rbuffer);
|
|
}
|
|
|
|
static int
|
|
rpcrdma_bc_send_request(struct svcxprt_rdma *rdma, struct rpc_rqst *rqst)
|
|
{
|
|
struct rpc_xprt *xprt = rqst->rq_xprt;
|
|
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
|
|
struct svc_rdma_send_ctxt *ctxt;
|
|
__be32 *p;
|
|
int rc;
|
|
|
|
ctxt = svc_rdma_send_ctxt_get(rdma);
|
|
if (!ctxt)
|
|
goto drop_connection;
|
|
|
|
p = xdr_reserve_space(&ctxt->sc_stream, RPCRDMA_HDRLEN_MIN);
|
|
if (!p)
|
|
goto put_ctxt;
|
|
*p++ = rqst->rq_xid;
|
|
*p++ = rpcrdma_version;
|
|
*p++ = cpu_to_be32(r_xprt->rx_buf.rb_bc_max_requests);
|
|
*p++ = rdma_msg;
|
|
*p++ = xdr_zero;
|
|
*p++ = xdr_zero;
|
|
*p = xdr_zero;
|
|
|
|
rqst->rq_xtime = ktime_get();
|
|
rc = svc_rdma_bc_sendto(rdma, rqst, ctxt);
|
|
if (rc)
|
|
goto put_ctxt;
|
|
return 0;
|
|
|
|
put_ctxt:
|
|
svc_rdma_send_ctxt_put(rdma, ctxt);
|
|
|
|
drop_connection:
|
|
return -ENOTCONN;
|
|
}
|
|
|
|
/**
|
|
* xprt_rdma_bc_send_request - Send a reverse-direction Call
|
|
* @rqst: rpc_rqst containing Call message to be sent
|
|
*
|
|
* Return values:
|
|
* %0 if the message was sent successfully
|
|
* %ENOTCONN if the message was not sent
|
|
*/
|
|
static int xprt_rdma_bc_send_request(struct rpc_rqst *rqst)
|
|
{
|
|
struct svc_xprt *sxprt = rqst->rq_xprt->bc_xprt;
|
|
struct svcxprt_rdma *rdma =
|
|
container_of(sxprt, struct svcxprt_rdma, sc_xprt);
|
|
int ret;
|
|
|
|
if (test_bit(XPT_DEAD, &sxprt->xpt_flags))
|
|
return -ENOTCONN;
|
|
|
|
ret = rpcrdma_bc_send_request(rdma, rqst);
|
|
if (ret == -ENOTCONN)
|
|
svc_xprt_close(sxprt);
|
|
return ret;
|
|
}
|
|
|
|
static void
|
|
xprt_rdma_bc_close(struct rpc_xprt *xprt)
|
|
{
|
|
xprt_disconnect_done(xprt);
|
|
xprt->cwnd = RPC_CWNDSHIFT;
|
|
}
|
|
|
|
/* ->destroy method for the backchannel transport: release the
 * formatted address strings, then free the rpc_xprt itself.
 */
static void xprt_rdma_bc_put(struct rpc_xprt *xprt)
{
	xprt_rdma_free_addresses(xprt);

	xprt_free(xprt);
}
|
|
|
|
static const struct rpc_xprt_ops xprt_rdma_bc_procs = {
|
|
.reserve_xprt = xprt_reserve_xprt_cong,
|
|
.release_xprt = xprt_release_xprt_cong,
|
|
.alloc_slot = xprt_alloc_slot,
|
|
.free_slot = xprt_free_slot,
|
|
.release_request = xprt_release_rqst_cong,
|
|
.buf_alloc = xprt_rdma_bc_allocate,
|
|
.buf_free = xprt_rdma_bc_free,
|
|
.send_request = xprt_rdma_bc_send_request,
|
|
.wait_for_reply_request = xprt_wait_for_reply_request_def,
|
|
.close = xprt_rdma_bc_close,
|
|
.destroy = xprt_rdma_bc_put,
|
|
.print_stats = xprt_rdma_print_stats
|
|
};
|
|
|
|
static const struct rpc_timeout xprt_rdma_bc_timeout = {
|
|
.to_initval = 60 * HZ,
|
|
.to_maxval = 60 * HZ,
|
|
};
|
|
|
|
/* It shouldn't matter if the number of backchannel session slots
|
|
* doesn't match the number of RPC/RDMA credits. That just means
|
|
* one or the other will have extra slots that aren't used.
|
|
*/
|
|
static struct rpc_xprt *
|
|
xprt_setup_rdma_bc(struct xprt_create *args)
|
|
{
|
|
struct rpc_xprt *xprt;
|
|
struct rpcrdma_xprt *new_xprt;
|
|
|
|
if (args->addrlen > sizeof(xprt->addr))
|
|
return ERR_PTR(-EBADF);
|
|
|
|
xprt = xprt_alloc(args->net, sizeof(*new_xprt),
|
|
RPCRDMA_MAX_BC_REQUESTS,
|
|
RPCRDMA_MAX_BC_REQUESTS);
|
|
if (!xprt)
|
|
return ERR_PTR(-ENOMEM);
|
|
|
|
xprt->timeout = &xprt_rdma_bc_timeout;
|
|
xprt_set_bound(xprt);
|
|
xprt_set_connected(xprt);
|
|
xprt->bind_timeout = 0;
|
|
xprt->reestablish_timeout = 0;
|
|
xprt->idle_timeout = 0;
|
|
|
|
xprt->prot = XPRT_TRANSPORT_BC_RDMA;
|
|
xprt->ops = &xprt_rdma_bc_procs;
|
|
|
|
memcpy(&xprt->addr, args->dstaddr, args->addrlen);
|
|
xprt->addrlen = args->addrlen;
|
|
xprt_rdma_format_addresses(xprt, (struct sockaddr *)&xprt->addr);
|
|
xprt->resvport = 0;
|
|
|
|
xprt->max_payload = xprt_rdma_max_inline_read;
|
|
|
|
new_xprt = rpcx_to_rdmax(xprt);
|
|
new_xprt->rx_buf.rb_bc_max_requests = xprt->max_reqs;
|
|
|
|
xprt_get(xprt);
|
|
args->bc_xprt->xpt_bc_xprt = xprt;
|
|
xprt->bc_xprt = args->bc_xprt;
|
|
|
|
/* Final put for backchannel xprt is in __svc_rdma_free */
|
|
xprt_get(xprt);
|
|
return xprt;
|
|
}
|
|
|
|
struct xprt_class xprt_rdma_bc = {
|
|
.list = LIST_HEAD_INIT(xprt_rdma_bc.list),
|
|
.name = "rdma backchannel",
|
|
.owner = THIS_MODULE,
|
|
.ident = XPRT_TRANSPORT_BC_RDMA,
|
|
.setup = xprt_setup_rdma_bc,
|
|
};
|