SUNRPC: Simplify TCP receive code by switching to using iterators

Most of this code should also be reusable with other socket types.

Signed-off-by: Trond Myklebust <trond.myklebust@hammerspace.com>
This commit is contained in:
Trond Myklebust 2018-09-14 09:49:06 -04:00
parent 9d96acbc7f
commit 277e4ab7d5
3 changed files with 338 additions and 393 deletions

View File

@ -31,15 +31,16 @@ struct sock_xprt {
* State of TCP reply receive * State of TCP reply receive
*/ */
struct { struct {
__be32 fraghdr, struct {
__be32 fraghdr,
xid, xid,
calldir; calldir;
} __attribute__((packed));
u32 offset, u32 offset,
len; len;
unsigned long copied, unsigned long copied;
flags;
} recv; } recv;
/* /*
@ -76,21 +77,9 @@ struct sock_xprt {
void (*old_error_report)(struct sock *); void (*old_error_report)(struct sock *);
}; };
/*
* TCP receive state flags
*/
#define TCP_RCV_LAST_FRAG (1UL << 0)
#define TCP_RCV_COPY_FRAGHDR (1UL << 1)
#define TCP_RCV_COPY_XID (1UL << 2)
#define TCP_RCV_COPY_DATA (1UL << 3)
#define TCP_RCV_READ_CALLDIR (1UL << 4)
#define TCP_RCV_COPY_CALLDIR (1UL << 5)
/* /*
* TCP RPC flags * TCP RPC flags
*/ */
#define TCP_RPC_REPLY (1UL << 6)
#define XPRT_SOCK_CONNECTING 1U #define XPRT_SOCK_CONNECTING 1U
#define XPRT_SOCK_DATA_READY (2) #define XPRT_SOCK_DATA_READY (2)
#define XPRT_SOCK_UPD_TIMEOUT (3) #define XPRT_SOCK_UPD_TIMEOUT (3)

View File

@ -497,16 +497,6 @@ TRACE_EVENT(xs_tcp_data_ready,
__get_str(port), __entry->err, __entry->total) __get_str(port), __entry->err, __entry->total)
); );
#define rpc_show_sock_xprt_flags(flags) \
__print_flags(flags, "|", \
{ TCP_RCV_LAST_FRAG, "TCP_RCV_LAST_FRAG" }, \
{ TCP_RCV_COPY_FRAGHDR, "TCP_RCV_COPY_FRAGHDR" }, \
{ TCP_RCV_COPY_XID, "TCP_RCV_COPY_XID" }, \
{ TCP_RCV_COPY_DATA, "TCP_RCV_COPY_DATA" }, \
{ TCP_RCV_READ_CALLDIR, "TCP_RCV_READ_CALLDIR" }, \
{ TCP_RCV_COPY_CALLDIR, "TCP_RCV_COPY_CALLDIR" }, \
{ TCP_RPC_REPLY, "TCP_RPC_REPLY" })
TRACE_EVENT(xs_tcp_data_recv, TRACE_EVENT(xs_tcp_data_recv,
TP_PROTO(struct sock_xprt *xs), TP_PROTO(struct sock_xprt *xs),
@ -516,7 +506,6 @@ TRACE_EVENT(xs_tcp_data_recv,
__string(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]) __string(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR])
__string(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]) __string(port, xs->xprt.address_strings[RPC_DISPLAY_PORT])
__field(u32, xid) __field(u32, xid)
__field(unsigned long, flags)
__field(unsigned long, copied) __field(unsigned long, copied)
__field(unsigned int, reclen) __field(unsigned int, reclen)
__field(unsigned long, offset) __field(unsigned long, offset)
@ -526,15 +515,13 @@ TRACE_EVENT(xs_tcp_data_recv,
__assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]); __assign_str(addr, xs->xprt.address_strings[RPC_DISPLAY_ADDR]);
__assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]); __assign_str(port, xs->xprt.address_strings[RPC_DISPLAY_PORT]);
__entry->xid = be32_to_cpu(xs->recv.xid); __entry->xid = be32_to_cpu(xs->recv.xid);
__entry->flags = xs->recv.flags;
__entry->copied = xs->recv.copied; __entry->copied = xs->recv.copied;
__entry->reclen = xs->recv.len; __entry->reclen = xs->recv.len;
__entry->offset = xs->recv.offset; __entry->offset = xs->recv.offset;
), ),
TP_printk("peer=[%s]:%s xid=0x%08x flags=%s copied=%lu reclen=%u offset=%lu", TP_printk("peer=[%s]:%s xid=0x%08x copied=%lu reclen=%u offset=%lu",
__get_str(addr), __get_str(port), __entry->xid, __get_str(addr), __get_str(port), __entry->xid,
rpc_show_sock_xprt_flags(__entry->flags),
__entry->copied, __entry->reclen, __entry->offset) __entry->copied, __entry->reclen, __entry->offset)
); );

View File

@ -47,13 +47,13 @@
#include <net/checksum.h> #include <net/checksum.h>
#include <net/udp.h> #include <net/udp.h>
#include <net/tcp.h> #include <net/tcp.h>
#include <linux/bvec.h>
#include <linux/uio.h>
#include <trace/events/sunrpc.h> #include <trace/events/sunrpc.h>
#include "sunrpc.h" #include "sunrpc.h"
#define RPC_TCP_READ_CHUNK_SZ (3*512*1024)
static void xs_close(struct rpc_xprt *xprt); static void xs_close(struct rpc_xprt *xprt);
static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt, static void xs_tcp_set_socket_timeouts(struct rpc_xprt *xprt,
struct socket *sock); struct socket *sock);
@ -325,6 +325,323 @@ static void xs_free_peer_addresses(struct rpc_xprt *xprt)
} }
} }
static size_t
xs_alloc_sparse_pages(struct xdr_buf *buf, size_t want, gfp_t gfp)
{
size_t i,n;
if (!(buf->flags & XDRBUF_SPARSE_PAGES))
return want;
if (want > buf->page_len)
want = buf->page_len;
n = (buf->page_base + want + PAGE_SIZE - 1) >> PAGE_SHIFT;
for (i = 0; i < n; i++) {
if (buf->pages[i])
continue;
buf->bvec[i].bv_page = buf->pages[i] = alloc_page(gfp);
if (!buf->pages[i]) {
buf->page_len = (i * PAGE_SIZE) - buf->page_base;
return buf->page_len;
}
}
return want;
}
static ssize_t
xs_sock_recvmsg(struct socket *sock, struct msghdr *msg, int flags, size_t seek)
{
ssize_t ret;
if (seek != 0)
iov_iter_advance(&msg->msg_iter, seek);
ret = sock_recvmsg(sock, msg, flags);
return ret > 0 ? ret + seek : ret;
}
static ssize_t
xs_read_kvec(struct socket *sock, struct msghdr *msg, int flags,
struct kvec *kvec, size_t count, size_t seek)
{
iov_iter_kvec(&msg->msg_iter, READ | ITER_KVEC, kvec, 1, count);
return xs_sock_recvmsg(sock, msg, flags, seek);
}
static ssize_t
xs_read_bvec(struct socket *sock, struct msghdr *msg, int flags,
struct bio_vec *bvec, unsigned long nr, size_t count,
size_t seek)
{
iov_iter_bvec(&msg->msg_iter, READ | ITER_BVEC, bvec, nr, count);
return xs_sock_recvmsg(sock, msg, flags, seek);
}
static ssize_t
xs_read_discard(struct socket *sock, struct msghdr *msg, int flags,
size_t count)
{
struct kvec kvec = { 0 };
return xs_read_kvec(sock, msg, flags | MSG_TRUNC, &kvec, count, 0);
}
static ssize_t
xs_read_xdr_buf(struct socket *sock, struct msghdr *msg, int flags,
struct xdr_buf *buf, size_t count, size_t seek, size_t *read)
{
size_t want, seek_init = seek, offset = 0;
ssize_t ret;
if (seek < buf->head[0].iov_len) {
want = min_t(size_t, count, buf->head[0].iov_len);
ret = xs_read_kvec(sock, msg, flags, &buf->head[0], want, seek);
if (ret <= 0)
goto sock_err;
offset += ret;
if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
goto out;
if (ret != want)
goto eagain;
seek = 0;
} else {
seek -= buf->head[0].iov_len;
offset += buf->head[0].iov_len;
}
if (seek < buf->page_len) {
want = xs_alloc_sparse_pages(buf,
min_t(size_t, count - offset, buf->page_len),
GFP_NOWAIT);
ret = xs_read_bvec(sock, msg, flags, buf->bvec,
xdr_buf_pagecount(buf),
want + buf->page_base,
seek + buf->page_base);
if (ret <= 0)
goto sock_err;
offset += ret - buf->page_base;
if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
goto out;
if (ret != want)
goto eagain;
seek = 0;
} else {
seek -= buf->page_len;
offset += buf->page_len;
}
if (seek < buf->tail[0].iov_len) {
want = min_t(size_t, count - offset, buf->tail[0].iov_len);
ret = xs_read_kvec(sock, msg, flags, &buf->tail[0], want, seek);
if (ret <= 0)
goto sock_err;
offset += ret;
if (offset == count || msg->msg_flags & (MSG_EOR|MSG_TRUNC))
goto out;
if (ret != want)
goto eagain;
} else
offset += buf->tail[0].iov_len;
ret = -EMSGSIZE;
msg->msg_flags |= MSG_TRUNC;
out:
*read = offset - seek_init;
return ret;
eagain:
ret = -EAGAIN;
goto out;
sock_err:
offset += seek;
goto out;
}
static void
xs_read_header(struct sock_xprt *transport, struct xdr_buf *buf)
{
if (!transport->recv.copied) {
if (buf->head[0].iov_len >= transport->recv.offset)
memcpy(buf->head[0].iov_base,
&transport->recv.xid,
transport->recv.offset);
transport->recv.copied = transport->recv.offset;
}
}
static bool
xs_read_stream_request_done(struct sock_xprt *transport)
{
return transport->recv.fraghdr & cpu_to_be32(RPC_LAST_STREAM_FRAGMENT);
}
static ssize_t
xs_read_stream_request(struct sock_xprt *transport, struct msghdr *msg,
int flags, struct rpc_rqst *req)
{
struct xdr_buf *buf = &req->rq_private_buf;
size_t want, read;
ssize_t ret;
xs_read_header(transport, buf);
want = transport->recv.len - transport->recv.offset;
ret = xs_read_xdr_buf(transport->sock, msg, flags, buf,
transport->recv.copied + want, transport->recv.copied,
&read);
transport->recv.offset += read;
transport->recv.copied += read;
if (transport->recv.offset == transport->recv.len) {
if (xs_read_stream_request_done(transport))
msg->msg_flags |= MSG_EOR;
return transport->recv.copied;
}
switch (ret) {
case -EMSGSIZE:
return transport->recv.copied;
case 0:
return -ESHUTDOWN;
default:
if (ret < 0)
return ret;
}
return -EAGAIN;
}
static size_t
xs_read_stream_headersize(bool isfrag)
{
if (isfrag)
return sizeof(__be32);
return 3 * sizeof(__be32);
}
static ssize_t
xs_read_stream_header(struct sock_xprt *transport, struct msghdr *msg,
int flags, size_t want, size_t seek)
{
struct kvec kvec = {
.iov_base = &transport->recv.fraghdr,
.iov_len = want,
};
return xs_read_kvec(transport->sock, msg, flags, &kvec, want, seek);
}
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
static ssize_t
xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
{
struct rpc_xprt *xprt = &transport->xprt;
struct rpc_rqst *req;
ssize_t ret;
/* Look up and lock the request corresponding to the given XID */
req = xprt_lookup_bc_request(xprt, transport->recv.xid);
if (!req) {
printk(KERN_WARNING "Callback slot table overflowed\n");
return -ESHUTDOWN;
}
ret = xs_read_stream_request(transport, msg, flags, req);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_bc_request(req, ret);
return ret;
}
#else /* CONFIG_SUNRPC_BACKCHANNEL */
static ssize_t
xs_read_stream_call(struct sock_xprt *transport, struct msghdr *msg, int flags)
{
return -ESHUTDOWN;
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */
static ssize_t
xs_read_stream_reply(struct sock_xprt *transport, struct msghdr *msg, int flags)
{
struct rpc_xprt *xprt = &transport->xprt;
struct rpc_rqst *req;
ssize_t ret = 0;
/* Look up and lock the request corresponding to the given XID */
spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, transport->recv.xid);
if (!req) {
msg->msg_flags |= MSG_TRUNC;
goto out;
}
xprt_pin_rqst(req);
spin_unlock(&xprt->queue_lock);
ret = xs_read_stream_request(transport, msg, flags, req);
spin_lock(&xprt->queue_lock);
if (msg->msg_flags & (MSG_EOR|MSG_TRUNC))
xprt_complete_rqst(req->rq_task, ret);
xprt_unpin_rqst(req);
out:
spin_unlock(&xprt->queue_lock);
return ret;
}
static ssize_t
xs_read_stream(struct sock_xprt *transport, int flags)
{
struct msghdr msg = { 0 };
size_t want, read = 0;
ssize_t ret = 0;
if (transport->recv.len == 0) {
want = xs_read_stream_headersize(transport->recv.copied != 0);
ret = xs_read_stream_header(transport, &msg, flags, want,
transport->recv.offset);
if (ret <= 0)
goto out_err;
transport->recv.offset = ret;
if (ret != want) {
ret = -EAGAIN;
goto out_err;
}
transport->recv.len = be32_to_cpu(transport->recv.fraghdr) &
RPC_FRAGMENT_SIZE_MASK;
transport->recv.offset -= sizeof(transport->recv.fraghdr);
read = ret;
}
switch (be32_to_cpu(transport->recv.calldir)) {
case RPC_CALL:
ret = xs_read_stream_call(transport, &msg, flags);
break;
case RPC_REPLY:
ret = xs_read_stream_reply(transport, &msg, flags);
}
if (msg.msg_flags & MSG_TRUNC) {
transport->recv.calldir = cpu_to_be32(-1);
transport->recv.copied = -1;
}
if (ret < 0)
goto out_err;
read += ret;
if (transport->recv.offset < transport->recv.len) {
ret = xs_read_discard(transport->sock, &msg, flags,
transport->recv.len - transport->recv.offset);
if (ret <= 0)
goto out_err;
transport->recv.offset += ret;
read += ret;
if (transport->recv.offset != transport->recv.len)
return -EAGAIN;
}
if (xs_read_stream_request_done(transport)) {
trace_xs_tcp_data_recv(transport);
transport->recv.copied = 0;
}
transport->recv.offset = 0;
transport->recv.len = 0;
return read;
out_err:
switch (ret) {
case 0:
case -ESHUTDOWN:
xprt_force_disconnect(&transport->xprt);
return -ESHUTDOWN;
}
return ret;
}
#define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL) #define XS_SENDMSG_FLAGS (MSG_DONTWAIT | MSG_NOSIGNAL)
static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more) static int xs_send_kvec(struct socket *sock, struct sockaddr *addr, int addrlen, struct kvec *vec, unsigned int base, int more)
@ -484,6 +801,12 @@ static int xs_nospace(struct rpc_rqst *req)
return ret; return ret;
} }
static void
xs_stream_prepare_request(struct rpc_rqst *req)
{
req->rq_task->tk_status = xdr_alloc_bvec(&req->rq_rcv_buf, GFP_NOIO);
}
/* /*
* Determine if the previous message in the stream was aborted before it * Determine if the previous message in the stream was aborted before it
* could complete transmission. * could complete transmission.
@ -1157,263 +1480,7 @@ static void xs_tcp_force_close(struct rpc_xprt *xprt)
xprt_force_disconnect(xprt); xprt_force_disconnect(xprt);
} }
static inline void xs_tcp_read_fraghdr(struct rpc_xprt *xprt, struct xdr_skb_reader *desc)
{
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
size_t len, used;
char *p;
p = ((char *) &transport->recv.fraghdr) + transport->recv.offset;
len = sizeof(transport->recv.fraghdr) - transport->recv.offset;
used = xdr_skb_read_bits(desc, p, len);
transport->recv.offset += used;
if (used != len)
return;
transport->recv.len = ntohl(transport->recv.fraghdr);
if (transport->recv.len & RPC_LAST_STREAM_FRAGMENT)
transport->recv.flags |= TCP_RCV_LAST_FRAG;
else
transport->recv.flags &= ~TCP_RCV_LAST_FRAG;
transport->recv.len &= RPC_FRAGMENT_SIZE_MASK;
transport->recv.flags &= ~TCP_RCV_COPY_FRAGHDR;
transport->recv.offset = 0;
/* Sanity check of the record length */
if (unlikely(transport->recv.len < 8)) {
dprintk("RPC: invalid TCP record fragment length\n");
xs_tcp_force_close(xprt);
return;
}
dprintk("RPC: reading TCP record fragment of length %d\n",
transport->recv.len);
}
static void xs_tcp_check_fraghdr(struct sock_xprt *transport)
{
if (transport->recv.offset == transport->recv.len) {
transport->recv.flags |= TCP_RCV_COPY_FRAGHDR;
transport->recv.offset = 0;
if (transport->recv.flags & TCP_RCV_LAST_FRAG) {
transport->recv.flags &= ~TCP_RCV_COPY_DATA;
transport->recv.flags |= TCP_RCV_COPY_XID;
transport->recv.copied = 0;
}
}
}
static inline void xs_tcp_read_xid(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
size_t len, used;
char *p;
len = sizeof(transport->recv.xid) - transport->recv.offset;
dprintk("RPC: reading XID (%zu bytes)\n", len);
p = ((char *) &transport->recv.xid) + transport->recv.offset;
used = xdr_skb_read_bits(desc, p, len);
transport->recv.offset += used;
if (used != len)
return;
transport->recv.flags &= ~TCP_RCV_COPY_XID;
transport->recv.flags |= TCP_RCV_READ_CALLDIR;
transport->recv.copied = 4;
dprintk("RPC: reading %s XID %08x\n",
(transport->recv.flags & TCP_RPC_REPLY) ? "reply for"
: "request with",
ntohl(transport->recv.xid));
xs_tcp_check_fraghdr(transport);
}
static inline void xs_tcp_read_calldir(struct sock_xprt *transport,
struct xdr_skb_reader *desc)
{
size_t len, used;
u32 offset;
char *p;
/*
* We want transport->recv.offset to be 8 at the end of this routine
* (4 bytes for the xid and 4 bytes for the call/reply flag).
* When this function is called for the first time,
* transport->recv.offset is 4 (after having already read the xid).
*/
offset = transport->recv.offset - sizeof(transport->recv.xid);
len = sizeof(transport->recv.calldir) - offset;
dprintk("RPC: reading CALL/REPLY flag (%zu bytes)\n", len);
p = ((char *) &transport->recv.calldir) + offset;
used = xdr_skb_read_bits(desc, p, len);
transport->recv.offset += used;
if (used != len)
return;
transport->recv.flags &= ~TCP_RCV_READ_CALLDIR;
/*
* We don't yet have the XDR buffer, so we will write the calldir
* out after we get the buffer from the 'struct rpc_rqst'
*/
switch (ntohl(transport->recv.calldir)) {
case RPC_REPLY:
transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
transport->recv.flags |= TCP_RCV_COPY_DATA;
transport->recv.flags |= TCP_RPC_REPLY;
break;
case RPC_CALL:
transport->recv.flags |= TCP_RCV_COPY_CALLDIR;
transport->recv.flags |= TCP_RCV_COPY_DATA;
transport->recv.flags &= ~TCP_RPC_REPLY;
break;
default:
dprintk("RPC: invalid request message type\n");
xs_tcp_force_close(&transport->xprt);
}
xs_tcp_check_fraghdr(transport);
}
static inline void xs_tcp_read_common(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc,
struct rpc_rqst *req)
{
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
struct xdr_buf *rcvbuf;
size_t len;
ssize_t r;
rcvbuf = &req->rq_private_buf;
if (transport->recv.flags & TCP_RCV_COPY_CALLDIR) {
/*
* Save the RPC direction in the XDR buffer
*/
memcpy(rcvbuf->head[0].iov_base + transport->recv.copied,
&transport->recv.calldir,
sizeof(transport->recv.calldir));
transport->recv.copied += sizeof(transport->recv.calldir);
transport->recv.flags &= ~TCP_RCV_COPY_CALLDIR;
}
len = desc->count;
if (len > transport->recv.len - transport->recv.offset)
desc->count = transport->recv.len - transport->recv.offset;
r = xdr_partial_copy_from_skb(rcvbuf, transport->recv.copied,
desc, xdr_skb_read_bits);
if (desc->count) {
/* Error when copying to the receive buffer,
* usually because we weren't able to allocate
* additional buffer pages. All we can do now
* is turn off TCP_RCV_COPY_DATA, so the request
* will not receive any additional updates,
* and time out.
* Any remaining data from this record will
* be discarded.
*/
transport->recv.flags &= ~TCP_RCV_COPY_DATA;
dprintk("RPC: XID %08x truncated request\n",
ntohl(transport->recv.xid));
dprintk("RPC: xprt = %p, recv.copied = %lu, "
"recv.offset = %u, recv.len = %u\n",
xprt, transport->recv.copied,
transport->recv.offset, transport->recv.len);
return;
}
transport->recv.copied += r;
transport->recv.offset += r;
desc->count = len - r;
dprintk("RPC: XID %08x read %zd bytes\n",
ntohl(transport->recv.xid), r);
dprintk("RPC: xprt = %p, recv.copied = %lu, recv.offset = %u, "
"recv.len = %u\n", xprt, transport->recv.copied,
transport->recv.offset, transport->recv.len);
if (transport->recv.copied == req->rq_private_buf.buflen)
transport->recv.flags &= ~TCP_RCV_COPY_DATA;
else if (transport->recv.offset == transport->recv.len) {
if (transport->recv.flags & TCP_RCV_LAST_FRAG)
transport->recv.flags &= ~TCP_RCV_COPY_DATA;
}
}
/*
* Finds the request corresponding to the RPC xid and invokes the common
* tcp read code to read the data.
*/
static inline int xs_tcp_read_reply(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc)
{
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
struct rpc_rqst *req;
dprintk("RPC: read reply XID %08x\n", ntohl(transport->recv.xid));
/* Find and lock the request corresponding to this xid */
spin_lock(&xprt->queue_lock);
req = xprt_lookup_rqst(xprt, transport->recv.xid);
if (!req) {
dprintk("RPC: XID %08x request not found!\n",
ntohl(transport->recv.xid));
spin_unlock(&xprt->queue_lock);
return -1;
}
xprt_pin_rqst(req);
spin_unlock(&xprt->queue_lock);
xs_tcp_read_common(xprt, desc, req);
spin_lock(&xprt->queue_lock);
if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
xprt_complete_rqst(req->rq_task, transport->recv.copied);
xprt_unpin_rqst(req);
spin_unlock(&xprt->queue_lock);
return 0;
}
#if defined(CONFIG_SUNRPC_BACKCHANNEL) #if defined(CONFIG_SUNRPC_BACKCHANNEL)
/*
* Obtains an rpc_rqst previously allocated and invokes the common
* tcp read code to read the data. The result is placed in the callback
* queue.
* If we're unable to obtain the rpc_rqst we schedule the closing of the
* connection and return -1.
*/
static int xs_tcp_read_callback(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc)
{
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
struct rpc_rqst *req;
/* Look up the request corresponding to the given XID */
req = xprt_lookup_bc_request(xprt, transport->recv.xid);
if (req == NULL) {
printk(KERN_WARNING "Callback slot table overflowed\n");
xprt_force_disconnect(xprt);
return -1;
}
dprintk("RPC: read callback XID %08x\n", ntohl(req->rq_xid));
xs_tcp_read_common(xprt, desc, req);
if (!(transport->recv.flags & TCP_RCV_COPY_DATA))
xprt_complete_bc_request(req, transport->recv.copied);
return 0;
}
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc)
{
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
return (transport->recv.flags & TCP_RPC_REPLY) ?
xs_tcp_read_reply(xprt, desc) :
xs_tcp_read_callback(xprt, desc);
}
static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net) static int xs_tcp_bc_up(struct svc_serv *serv, struct net *net)
{ {
int ret; int ret;
@ -1429,106 +1496,14 @@ static size_t xs_tcp_bc_maxpayload(struct rpc_xprt *xprt)
{ {
return PAGE_SIZE; return PAGE_SIZE;
} }
#else
static inline int _xs_tcp_read_data(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc)
{
return xs_tcp_read_reply(xprt, desc);
}
#endif /* CONFIG_SUNRPC_BACKCHANNEL */ #endif /* CONFIG_SUNRPC_BACKCHANNEL */
/*
* Read data off the transport. This can be either an RPC_CALL or an
* RPC_REPLY. Relay the processing to helper functions.
*/
static void xs_tcp_read_data(struct rpc_xprt *xprt,
struct xdr_skb_reader *desc)
{
struct sock_xprt *transport =
container_of(xprt, struct sock_xprt, xprt);
if (_xs_tcp_read_data(xprt, desc) == 0)
xs_tcp_check_fraghdr(transport);
else {
/*
* The transport_lock protects the request handling.
* There's no need to hold it to update the recv.flags.
*/
transport->recv.flags &= ~TCP_RCV_COPY_DATA;
}
}
static inline void xs_tcp_read_discard(struct sock_xprt *transport, struct xdr_skb_reader *desc)
{
size_t len;
len = transport->recv.len - transport->recv.offset;
if (len > desc->count)
len = desc->count;
desc->count -= len;
desc->offset += len;
transport->recv.offset += len;
dprintk("RPC: discarded %zu bytes\n", len);
xs_tcp_check_fraghdr(transport);
}
static int xs_tcp_data_recv(read_descriptor_t *rd_desc, struct sk_buff *skb, unsigned int offset, size_t len)
{
struct rpc_xprt *xprt = rd_desc->arg.data;
struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
struct xdr_skb_reader desc = {
.skb = skb,
.offset = offset,
.count = len,
};
size_t ret;
dprintk("RPC: xs_tcp_data_recv started\n");
do {
trace_xs_tcp_data_recv(transport);
/* Read in a new fragment marker if necessary */
/* Can we ever really expect to get completely empty fragments? */
if (transport->recv.flags & TCP_RCV_COPY_FRAGHDR) {
xs_tcp_read_fraghdr(xprt, &desc);
continue;
}
/* Read in the xid if necessary */
if (transport->recv.flags & TCP_RCV_COPY_XID) {
xs_tcp_read_xid(transport, &desc);
continue;
}
/* Read in the call/reply flag */
if (transport->recv.flags & TCP_RCV_READ_CALLDIR) {
xs_tcp_read_calldir(transport, &desc);
continue;
}
/* Read in the request data */
if (transport->recv.flags & TCP_RCV_COPY_DATA) {
xs_tcp_read_data(xprt, &desc);
continue;
}
/* Skip over any trailing bytes on short reads */
xs_tcp_read_discard(transport, &desc);
} while (desc.count);
ret = len - desc.count;
if (ret < rd_desc->count)
rd_desc->count -= ret;
else
rd_desc->count = 0;
trace_xs_tcp_data_recv(transport);
dprintk("RPC: xs_tcp_data_recv done\n");
return ret;
}
static void xs_tcp_data_receive(struct sock_xprt *transport) static void xs_tcp_data_receive(struct sock_xprt *transport)
{ {
struct rpc_xprt *xprt = &transport->xprt; struct rpc_xprt *xprt = &transport->xprt;
struct sock *sk; struct sock *sk;
read_descriptor_t rd_desc = { size_t read = 0;
.arg.data = xprt, ssize_t ret = 0;
};
unsigned long total = 0;
int read = 0;
restart: restart:
mutex_lock(&transport->recv_mutex); mutex_lock(&transport->recv_mutex);
@ -1536,18 +1511,12 @@ restart:
if (sk == NULL) if (sk == NULL)
goto out; goto out;
/* We use rd_desc to pass struct xprt to xs_tcp_data_recv */
for (;;) { for (;;) {
rd_desc.count = RPC_TCP_READ_CHUNK_SZ; clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
lock_sock(sk); ret = xs_read_stream(transport, MSG_DONTWAIT | MSG_NOSIGNAL);
read = tcp_read_sock(sk, &rd_desc, xs_tcp_data_recv); if (ret < 0)
if (rd_desc.count != 0 || read < 0) {
clear_bit(XPRT_SOCK_DATA_READY, &transport->sock_state);
release_sock(sk);
break; break;
} read += ret;
release_sock(sk);
total += read;
if (need_resched()) { if (need_resched()) {
mutex_unlock(&transport->recv_mutex); mutex_unlock(&transport->recv_mutex);
cond_resched(); cond_resched();
@ -1558,7 +1527,7 @@ restart:
queue_work(xprtiod_workqueue, &transport->recv_worker); queue_work(xprtiod_workqueue, &transport->recv_worker);
out: out:
mutex_unlock(&transport->recv_mutex); mutex_unlock(&transport->recv_mutex);
trace_xs_tcp_data_ready(xprt, read, total); trace_xs_tcp_data_ready(xprt, ret, read);
} }
static void xs_tcp_data_receive_workfn(struct work_struct *work) static void xs_tcp_data_receive_workfn(struct work_struct *work)
@ -2380,7 +2349,6 @@ static int xs_tcp_finish_connecting(struct rpc_xprt *xprt, struct socket *sock)
transport->recv.offset = 0; transport->recv.offset = 0;
transport->recv.len = 0; transport->recv.len = 0;
transport->recv.copied = 0; transport->recv.copied = 0;
transport->recv.flags = TCP_RCV_COPY_FRAGHDR | TCP_RCV_COPY_XID;
transport->xmit.offset = 0; transport->xmit.offset = 0;
/* Tell the socket layer to start connecting... */ /* Tell the socket layer to start connecting... */
@ -2802,6 +2770,7 @@ static const struct rpc_xprt_ops xs_tcp_ops = {
.connect = xs_connect, .connect = xs_connect,
.buf_alloc = rpc_malloc, .buf_alloc = rpc_malloc,
.buf_free = rpc_free, .buf_free = rpc_free,
.prepare_request = xs_stream_prepare_request,
.send_request = xs_tcp_send_request, .send_request = xs_tcp_send_request,
.set_retrans_timeout = xprt_set_retrans_timeout_def, .set_retrans_timeout = xprt_set_retrans_timeout_def,
.close = xs_tcp_shutdown, .close = xs_tcp_shutdown,