/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 *      Redistribution and use in source and binary forms, with or without
 *      modification, are permitted provided that the following conditions
 *      are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * transport.c
 *
 * This file contains the top-level implementation of an RPC RDMA
 * transport.
 *
 * Naming convention: functions beginning with xprt_ are part of the
 * transport switch. All others are RPC RDMA internal.
 */
#include <linux/module.h>
#include <linux/init.h>
#include <linux/seq_file.h>

#include "xprt_rdma.h"

#ifdef RPC_DEBUG
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif

MODULE_LICENSE("Dual BSD/GPL");
MODULE_DESCRIPTION("RPC/RDMA Transport for Linux kernel NFS");
MODULE_AUTHOR("Network Appliance, Inc.");
/*
 * tunables
 */

static unsigned int xprt_rdma_slot_table_entries = RPCRDMA_DEF_SLOT_TABLE;
static unsigned int xprt_rdma_max_inline_read = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_max_inline_write = RPCRDMA_DEF_INLINE;
static unsigned int xprt_rdma_inline_write_padding;
static unsigned int xprt_rdma_memreg_strategy = RPCRDMA_FRMR;
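
/*
 * Note: the registration strategy selected here is clamped by the
 * rdma_memreg_strategy sysctl below to the range [RPCRDMA_BOUNCEBUFFERS,
 * RPCRDMA_LAST - 1]; RPCRDMA_FRMR is simply the compiled-in default.
 */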
#ifdef RPC_DEBUG

static unsigned int min_slot_table_size = RPCRDMA_MIN_SLOT_TABLE;
static unsigned int max_slot_table_size = RPCRDMA_MAX_SLOT_TABLE;
static unsigned int zero;
static unsigned int max_padding = PAGE_SIZE;
static unsigned int min_memreg = RPCRDMA_BOUNCEBUFFERS;
static unsigned int max_memreg = RPCRDMA_LAST - 1;

static struct ctl_table_header *sunrpc_table_header;
static ctl_table xr_tunables_table[] = {
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_slot_table_entries",
		.data		= &xprt_rdma_slot_table_entries,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_slot_table_size,
		.extra2		= &max_slot_table_size
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_read",
		.data		= &xprt_rdma_max_inline_read,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_max_inline_write",
		.data		= &xprt_rdma_max_inline_write,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec,
		.strategy	= &sysctl_intvec,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_inline_write_padding",
		.data		= &xprt_rdma_inline_write_padding,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &zero,
		.extra2		= &max_padding,
	},
	{
		.ctl_name	= CTL_UNNUMBERED,
		.procname	= "rdma_memreg_strategy",
		.data		= &xprt_rdma_memreg_strategy,
		.maxlen		= sizeof(unsigned int),
		.mode		= 0644,
		.proc_handler	= &proc_dointvec_minmax,
		.strategy	= &sysctl_intvec,
		.extra1		= &min_memreg,
		.extra2		= &max_memreg,
	},
	{
		.ctl_name = 0,
	},
};
static ctl_table sunrpc_table[] = {
	{
		.ctl_name	= CTL_SUNRPC,
		.procname	= "sunrpc",
		.mode		= 0555,
		.child		= xr_tunables_table
	},
	{
		.ctl_name = 0,
	},
};

#endif
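
/*
 * Illustrative usage: with RPC_DEBUG enabled, the tunables above are
 * published under the sunrpc sysctl root, so they can be read and set
 * from userspace, e.g.:
 *
 *	cat /proc/sys/sunrpc/rdma_slot_table_entries
 *	echo 64 > /proc/sys/sunrpc/rdma_slot_table_entries
 *
 * Writes handled by proc_dointvec_minmax are rejected unless they fall
 * within the extra1/extra2 bounds given in xr_tunables_table.
 */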
static struct rpc_xprt_ops xprt_rdma_procs;	/* forward reference */
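
/*
 * xprt_rdma_format_addresses() publishes the transport's presentation
 * strings (address, port, protocol, plus combined/hex/universal forms)
 * via xprt->address_strings[]. A failed kzalloc() simply leaves the
 * corresponding entry NULL.
 */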
static void
xprt_rdma_format_addresses(struct rpc_xprt *xprt)
{
	struct sockaddr_in *addr = (struct sockaddr_in *)
					&rpcx_to_rdmad(xprt).addr;
	char *buf;

	buf = kzalloc(20, GFP_KERNEL);
	if (buf)
		snprintf(buf, 20, NIPQUAD_FMT, NIPQUAD(addr->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf)
		snprintf(buf, 8, "%u", ntohs(addr->sin_port));
	xprt->address_strings[RPC_DISPLAY_PORT] = buf;

	xprt->address_strings[RPC_DISPLAY_PROTO] = "rdma";

	buf = kzalloc(48, GFP_KERNEL);
	if (buf)
		snprintf(buf, 48, "addr="NIPQUAD_FMT" port=%u proto=%s",
			NIPQUAD(addr->sin_addr.s_addr),
			ntohs(addr->sin_port), "rdma");
	xprt->address_strings[RPC_DISPLAY_ALL] = buf;

	buf = kzalloc(10, GFP_KERNEL);
	if (buf)
		snprintf(buf, 10, "%02x%02x%02x%02x",
			NIPQUAD(addr->sin_addr.s_addr));
	xprt->address_strings[RPC_DISPLAY_HEX_ADDR] = buf;

	buf = kzalloc(8, GFP_KERNEL);
	if (buf)
		snprintf(buf, 8, "%4hx", ntohs(addr->sin_port));
	xprt->address_strings[RPC_DISPLAY_HEX_PORT] = buf;

	buf = kzalloc(30, GFP_KERNEL);
	if (buf)
		snprintf(buf, 30, NIPQUAD_FMT".%u.%u",
			NIPQUAD(addr->sin_addr.s_addr),
			ntohs(addr->sin_port) >> 8,
			ntohs(addr->sin_port) & 0xff);
	xprt->address_strings[RPC_DISPLAY_UNIVERSAL_ADDR] = buf;

	/* netid */
	xprt->address_strings[RPC_DISPLAY_NETID] = "rdma";
}
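
/*
 * xprt_rdma_free_addresses() frees the strings allocated above. The
 * PROTO and NETID entries point at static strings and must be skipped.
 */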
static void
xprt_rdma_free_addresses(struct rpc_xprt *xprt)
{
	unsigned int i;

	for (i = 0; i < RPC_DISPLAY_MAX; i++)
		switch (i) {
		case RPC_DISPLAY_PROTO:
		case RPC_DISPLAY_NETID:
			continue;
		default:
			kfree(xprt->address_strings[i]);
		}
}
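
/*
 * xprt_rdma_connect_worker() runs from the delayed-work queue to
 * perform the actual (re)connect via rpcrdma_ep_connect(). On failure
 * it wakes all tasks pending on the transport with the error code.
 */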
static void
xprt_rdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_xprt *r_xprt =
		container_of(work, struct rpcrdma_xprt, rdma_connect.work);
	struct rpc_xprt *xprt = &r_xprt->xprt;
	int rc = 0;

	if (!xprt->shutdown) {
		xprt_clear_connected(xprt);

		dprintk("RPC: %s: %sconnect\n", __func__,
			r_xprt->rx_ep.rep_connected != 0 ? "re" : "");
		rc = rpcrdma_ep_connect(&r_xprt->rx_ep, &r_xprt->rx_ia);
		if (rc)
			goto out;
	}
	goto out_clear;

out:
	xprt_wake_pending_tasks(xprt, rc);

out_clear:
	dprintk("RPC: %s: exit\n", __func__);
	xprt_clear_connecting(xprt);
}
/*
 * xprt_rdma_destroy
 *
 * Destroy the xprt.
 * Free all memory associated with the object, including its own.
 * NOTE: none of the *destroy methods free memory for their top-level
 * objects, even though they may have allocated it (they do free
 * private memory). It's up to the caller to handle it. In this
 * case (RDMA transport), all structure memory is inlined with the
 * struct rpcrdma_xprt.
 */
static void
xprt_rdma_destroy(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int rc;

	dprintk("RPC: %s: called\n", __func__);

	cancel_delayed_work(&r_xprt->rdma_connect);
	flush_scheduled_work();

	xprt_clear_connected(xprt);

	rpcrdma_buffer_destroy(&r_xprt->rx_buf);
	rc = rpcrdma_ep_destroy(&r_xprt->rx_ep, &r_xprt->rx_ia);
	if (rc)
		dprintk("RPC: %s: rpcrdma_ep_destroy returned %i\n",
			__func__, rc);
	rpcrdma_ia_close(&r_xprt->rx_ia);

	xprt_rdma_free_addresses(xprt);

	kfree(xprt->slot);
	xprt->slot = NULL;
	kfree(xprt);

	dprintk("RPC: %s: returning\n", __func__);

	module_put(THIS_MODULE);
}
static const struct rpc_timeout xprt_rdma_default_timeout = {
	.to_initval = 60 * HZ,
	.to_maxval = 60 * HZ,
};
/**
 * xprt_setup_rdma - Set up transport to use RDMA
 *
 * @args: rpc transport arguments
 */
static struct rpc_xprt *
xprt_setup_rdma(struct xprt_create *args)
{
	struct rpcrdma_create_data_internal cdata;
	struct rpc_xprt *xprt;
	struct rpcrdma_xprt *new_xprt;
	struct rpcrdma_ep *new_ep;
	struct sockaddr_in *sin;
	int rc;

	if (args->addrlen > sizeof(xprt->addr)) {
		dprintk("RPC: %s: address too large\n", __func__);
		return ERR_PTR(-EBADF);
	}

	xprt = kzalloc(sizeof(struct rpcrdma_xprt), GFP_KERNEL);
	if (xprt == NULL) {
		dprintk("RPC: %s: couldn't allocate rpcrdma_xprt\n",
			__func__);
		return ERR_PTR(-ENOMEM);
	}

	xprt->max_reqs = xprt_rdma_slot_table_entries;
	xprt->slot = kcalloc(xprt->max_reqs,
				sizeof(struct rpc_rqst), GFP_KERNEL);
	if (xprt->slot == NULL) {
		dprintk("RPC: %s: couldn't allocate %d slots\n",
			__func__, xprt->max_reqs);
		kfree(xprt);
		return ERR_PTR(-ENOMEM);
	}
	/* 60 second timeout, no retries */
	xprt->timeout = &xprt_rdma_default_timeout;
	xprt->bind_timeout = (60U * HZ);
	xprt->connect_timeout = (60U * HZ);
	xprt->reestablish_timeout = (5U * HZ);
	xprt->idle_timeout = (5U * 60 * HZ);

	xprt->resvport = 0;		/* privileged port not needed */
	xprt->tsh_size = 0;		/* RPC-RDMA handles framing */
	xprt->max_payload = RPCRDMA_MAX_DATA_SEGS * PAGE_SIZE;
	xprt->ops = &xprt_rdma_procs;
	/*
	 * Set up RDMA-specific connect data.
	 */

	/* Put server RDMA address in local cdata */
	memcpy(&cdata.addr, args->dstaddr, args->addrlen);

	/* Ensure xprt->addr holds valid server TCP (not RDMA)
	 * address, for any side protocols which peek at it */
	xprt->prot = IPPROTO_TCP;
	xprt->addrlen = args->addrlen;
	memcpy(&xprt->addr, &cdata.addr, xprt->addrlen);

	sin = (struct sockaddr_in *)&cdata.addr;
	if (ntohs(sin->sin_port) != 0)
		xprt_set_bound(xprt);

	dprintk("RPC: %s: %u.%u.%u.%u:%u\n", __func__,
		NIPQUAD(sin->sin_addr.s_addr), ntohs(sin->sin_port));

	/* Set max requests */
	cdata.max_requests = xprt->max_reqs;

	/* Set some length limits */
	cdata.rsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA write max */
	cdata.wsize = RPCRDMA_MAX_SEGS * PAGE_SIZE; /* RDMA read max */

	cdata.inline_wsize = xprt_rdma_max_inline_write;
	if (cdata.inline_wsize > cdata.wsize)
		cdata.inline_wsize = cdata.wsize;

	cdata.inline_rsize = xprt_rdma_max_inline_read;
	if (cdata.inline_rsize > cdata.rsize)
		cdata.inline_rsize = cdata.rsize;

	cdata.padding = xprt_rdma_inline_write_padding;
	/*
	 * Create new transport instance, which includes initialized
	 *  o ia
	 *  o endpoint
	 *  o buffers
	 */

	new_xprt = rpcx_to_rdmax(xprt);

	rc = rpcrdma_ia_open(new_xprt, (struct sockaddr *) &cdata.addr,
				xprt_rdma_memreg_strategy);
	if (rc)
		goto out1;

	/*
	 * initialize and create ep
	 */
	new_xprt->rx_data = cdata;
	new_ep = &new_xprt->rx_ep;
	new_ep->rep_remote_addr = cdata.addr;

	rc = rpcrdma_ep_create(&new_xprt->rx_ep,
				&new_xprt->rx_ia, &new_xprt->rx_data);
	if (rc)
		goto out2;

	/*
	 * Allocate pre-registered send and receive buffers for headers and
	 * any inline data. Also specify any padding which will be provided
	 * from a preregistered zero buffer.
	 */
	rc = rpcrdma_buffer_create(&new_xprt->rx_buf, new_ep, &new_xprt->rx_ia,
				&new_xprt->rx_data);
	if (rc)
		goto out3;

	/*
	 * Register a callback for connection events. This is necessary because
	 * connection loss notification is async. We also catch connection loss
	 * when reaping receives.
	 */
	INIT_DELAYED_WORK(&new_xprt->rdma_connect, xprt_rdma_connect_worker);
	new_ep->rep_func = rpcrdma_conn_func;
	new_ep->rep_xprt = xprt;

	xprt_rdma_format_addresses(xprt);

	if (!try_module_get(THIS_MODULE))
		goto out4;

	return xprt;

out4:
	xprt_rdma_free_addresses(xprt);
	rc = -EINVAL;
out3:
	(void) rpcrdma_ep_destroy(new_ep, &new_xprt->rx_ia);
out2:
	rpcrdma_ia_close(&new_xprt->rx_ia);
out1:
	kfree(xprt->slot);
	kfree(xprt);
	return ERR_PTR(rc);
}
/*
 * Close a connection, during shutdown or timeout/reconnect
 */
static void
xprt_rdma_close(struct rpc_xprt *xprt)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	dprintk("RPC: %s: closing\n", __func__);
	xprt_disconnect_done(xprt);
	(void) rpcrdma_ep_disconnect(&r_xprt->rx_ep, &r_xprt->rx_ia);
}
static void
xprt_rdma_set_port(struct rpc_xprt *xprt, u16 port)
{
	struct sockaddr_in *sap;

	sap = (struct sockaddr_in *)&xprt->addr;
	sap->sin_port = htons(port);
	sap = (struct sockaddr_in *)&rpcx_to_rdmad(xprt).addr;
	sap->sin_port = htons(port);

	dprintk("RPC: %s: %u\n", __func__, port);
}
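
/*
 * xprt_rdma_connect() kicks the connect worker: a reconnect is delayed
 * by reestablish_timeout, while an initial connect is scheduled
 * immediately and, for synchronous tasks, waited on.
 */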
static void
xprt_rdma_connect(struct rpc_task *task)
{
	struct rpc_xprt *xprt = (struct rpc_xprt *)task->tk_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	if (!xprt_test_and_set_connecting(xprt)) {
		if (r_xprt->rx_ep.rep_connected != 0) {
			/* Reconnect */
			schedule_delayed_work(&r_xprt->rdma_connect,
				xprt->reestablish_timeout);
		} else {
			schedule_delayed_work(&r_xprt->rdma_connect, 0);
			if (!RPC_IS_ASYNC(task))
				flush_scheduled_work();
		}
	}
}
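
/*
 * xprt_rdma_reserve_xprt() rescales the RPC congestion window by the
 * number of RDMA credits currently granted (cwnd = credits *
 * rb_cwndscale) before handing off to the generic congestion-controlled
 * slot reservation in xprt_reserve_xprt_cong(), so the number of
 * in-flight requests tracks the server's credit grant.
 */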
static int
xprt_rdma_reserve_xprt(struct rpc_task *task)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	int credits = atomic_read(&r_xprt->rx_buf.rb_credits);

	/* == RPC_CWNDSCALE @ init, but *after* setup */
	if (r_xprt->rx_buf.rb_cwndscale == 0UL) {
		r_xprt->rx_buf.rb_cwndscale = xprt->cwnd;
		dprintk("RPC: %s: cwndscale %lu\n", __func__,
			r_xprt->rx_buf.rb_cwndscale);
		BUG_ON(r_xprt->rx_buf.rb_cwndscale <= 0);
	}
	xprt->cwnd = credits * r_xprt->rx_buf.rb_cwndscale;
	return xprt_reserve_xprt_cong(task);
}
/*
 * The RDMA allocate/free functions need the task structure as a place
 * to hide the struct rpcrdma_req, which is necessary for the actual
 * send/recv sequence. For this reason, the recv buffers are attached
 * to send buffers for portions of the RPC. Note that the RPC layer
 * allocates both send and receive buffers in the same call. We may
 * register the receive buffer portion when using reply chunks.
 */
static void *
xprt_rdma_allocate(struct rpc_task *task, size_t size)
{
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_req *req, *nreq;

	req = rpcrdma_buffer_get(&rpcx_to_rdmax(xprt)->rx_buf);
	BUG_ON(NULL == req);

	if (size > req->rl_size) {
		dprintk("RPC: %s: size %zd too large for buffer[%zd]: "
			"prog %d vers %d proc %d\n",
			__func__, size, req->rl_size,
			task->tk_client->cl_prog, task->tk_client->cl_vers,
			task->tk_msg.rpc_proc->p_proc);
		/*
		 * Outgoing length shortage. Our inline write max must have
		 * been configured to perform direct i/o.
		 *
		 * This is therefore a large metadata operation, and the
		 * allocate call was made on the maximum possible message,
		 * e.g. containing long filename(s) or symlink data. In
		 * fact, while these metadata operations *might* carry
		 * large outgoing payloads, they rarely *do*. However, we
		 * have to commit to the request here, so reallocate and
		 * register it now. The data path will never require this
		 * reallocation.
		 *
		 * If the allocation or registration fails, the RPC framework
		 * will (doggedly) retry.
		 */
		if (rpcx_to_rdmax(xprt)->rx_ia.ri_memreg_strategy ==
				RPCRDMA_BOUNCEBUFFERS) {
			/* forced to "pure inline" */
			dprintk("RPC: %s: too much data (%zd) for inline "
				"(r/w max %d/%d)\n", __func__, size,
				rpcx_to_rdmad(xprt).inline_rsize,
				rpcx_to_rdmad(xprt).inline_wsize);
			size = req->rl_size;
			rpc_exit(task, -EIO);	/* fail the operation */
			rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
			goto out;
		}
		if (task->tk_flags & RPC_TASK_SWAPPER)
			nreq = kmalloc(sizeof *req + size, GFP_ATOMIC);
		else
			nreq = kmalloc(sizeof *req + size, GFP_NOFS);
		if (nreq == NULL)
			goto outfail;

		if (rpcrdma_register_internal(&rpcx_to_rdmax(xprt)->rx_ia,
				nreq->rl_base, size + sizeof(struct rpcrdma_req)
				- offsetof(struct rpcrdma_req, rl_base),
				&nreq->rl_handle, &nreq->rl_iov)) {
			kfree(nreq);
			goto outfail;
		}
		rpcx_to_rdmax(xprt)->rx_stats.hardway_register_count += size;
		nreq->rl_size = size;
		nreq->rl_niovs = 0;
		nreq->rl_nchunks = 0;
		nreq->rl_buffer = (struct rpcrdma_buffer *)req;
		nreq->rl_reply = req->rl_reply;
		memcpy(nreq->rl_segments,
			req->rl_segments, sizeof nreq->rl_segments);
		/* flag the swap with an unused field */
		nreq->rl_iov.length = 0;
		req->rl_reply = NULL;
		req = nreq;
	}
	dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
out:
	req->rl_connect_cookie = 0;	/* our reserved value */
	return req->rl_xdr_buf;

outfail:
	rpcrdma_buffer_put(req);
	rpcx_to_rdmax(xprt)->rx_stats.failed_marshal_count++;
	return NULL;
}
/*
 * This function returns all RDMA resources to the pool.
 */
static void
xprt_rdma_free(void *buffer)
{
	struct rpcrdma_req *req;
	struct rpcrdma_xprt *r_xprt;
	struct rpcrdma_rep *rep;
	int i;

	if (buffer == NULL)
		return;

	req = container_of(buffer, struct rpcrdma_req, rl_xdr_buf[0]);
	if (req->rl_iov.length == 0) {	/* see allocate above */
		r_xprt = container_of(((struct rpcrdma_req *) req->rl_buffer)->rl_buffer,
				      struct rpcrdma_xprt, rx_buf);
	} else
		r_xprt = container_of(req->rl_buffer, struct rpcrdma_xprt, rx_buf);
	rep = req->rl_reply;

	dprintk("RPC: %s: called on 0x%p%s\n",
		__func__, rep, (rep && rep->rr_func) ? " (with waiter)" : "");

	/*
	 * Finish the deregistration. When using mw bind, this was
	 * begun in rpcrdma_reply_handler(). In all other modes, we
	 * do it here, in thread context. The process is considered
	 * complete when the rr_func vector becomes NULL - this
	 * was put in place during rpcrdma_reply_handler() - the wait
	 * call below will not block if the dereg is "done". If
	 * interrupted, our framework will clean up.
	 */
	for (i = 0; req->rl_nchunks;) {
		--req->rl_nchunks;
		i += rpcrdma_deregister_external(
			&req->rl_segments[i], r_xprt, NULL);
	}

	if (rep && wait_event_interruptible(rep->rr_unbind, !rep->rr_func)) {
		rep->rr_func = NULL;	/* abandon the callback */
		req->rl_reply = NULL;
	}

	if (req->rl_iov.length == 0) {	/* see allocate above */
		struct rpcrdma_req *oreq = (struct rpcrdma_req *)req->rl_buffer;
		oreq->rl_reply = req->rl_reply;
		(void) rpcrdma_deregister_internal(&r_xprt->rx_ia,
						   req->rl_handle,
						   &req->rl_iov);
		kfree(req);
		req = oreq;
	}

	/* Put back request+reply buffers */
	rpcrdma_buffer_put(req);
}
/*
 * send_request invokes the meat of RPC RDMA. It must do the following:
 *  1.  Marshal the RPC request into an RPC RDMA request, which means
 *	putting a header in front of data, and creating IOVs for RDMA
 *	from those in the request.
 *  2.  In marshaling, detect opportunities for RDMA, and use them.
 *  3.  Post a recv message to set up asynch completion, then send
 *	the request (rpcrdma_ep_post).
 *  4.  No partial sends are possible in the RPC-RDMA protocol (as in UDP).
 */
static int
xprt_rdma_send_request(struct rpc_task *task)
{
	struct rpc_rqst *rqst = task->tk_rqstp;
	struct rpc_xprt *xprt = task->tk_xprt;
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);

	/* marshal the send itself */
	if (req->rl_niovs == 0 && rpcrdma_marshal_req(rqst) != 0) {
		r_xprt->rx_stats.failed_marshal_count++;
		dprintk("RPC: %s: rpcrdma_marshal_req failed\n",
			__func__);
		return -EIO;
	}

	if (req->rl_reply == NULL)	/* e.g. reconnection */
		rpcrdma_recv_buffer_get(req);

	if (req->rl_reply) {
		req->rl_reply->rr_func = rpcrdma_reply_handler;
		/* this need only be done once, but... */
		req->rl_reply->rr_xprt = xprt;
	}

	/* Must suppress retransmit to maintain credits */
	if (req->rl_connect_cookie == xprt->connect_cookie)
		goto drop_connection;
	req->rl_connect_cookie = xprt->connect_cookie;

	if (rpcrdma_ep_post(&r_xprt->rx_ia, &r_xprt->rx_ep, req))
		goto drop_connection;

	task->tk_bytes_sent += rqst->rq_snd_buf.len;
	rqst->rq_bytes_sent = 0;
	return 0;

drop_connection:
	xprt_disconnect_done(xprt);
	return -ENOTCONN;	/* implies disconnect */
}
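
/*
 * xprt_rdma_print_stats() emits the per-transport statistics line
 * (the "xprt:" line seen, e.g., in /proc/self/mountstats): generic
 * xprt counters first, then the RDMA-specific rx_stats counters, in
 * the field order of the format string below.
 */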
static void xprt_rdma_print_stats(struct rpc_xprt *xprt, struct seq_file *seq)
{
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	long idle_time = 0;

	if (xprt_connected(xprt))
		idle_time = (long)(jiffies - xprt->last_used) / HZ;

	seq_printf(seq,
	  "\txprt:\trdma %u %lu %lu %lu %ld %lu %lu %lu %Lu %Lu "
	  "%lu %lu %lu %Lu %Lu %Lu %Lu %lu %lu %lu\n",

	   0,	/* need a local port? */
	   xprt->stat.bind_count,
	   xprt->stat.connect_count,
	   xprt->stat.connect_time,
	   idle_time,
	   xprt->stat.sends,
	   xprt->stat.recvs,
	   xprt->stat.bad_xids,
	   xprt->stat.req_u,
	   xprt->stat.bklog_u,

	   r_xprt->rx_stats.read_chunk_count,
	   r_xprt->rx_stats.write_chunk_count,
	   r_xprt->rx_stats.reply_chunk_count,
	   r_xprt->rx_stats.total_rdma_request,
	   r_xprt->rx_stats.total_rdma_reply,
	   r_xprt->rx_stats.pullup_copy_count,
	   r_xprt->rx_stats.fixup_copy_count,
	   r_xprt->rx_stats.hardway_register_count,
	   r_xprt->rx_stats.failed_marshal_count,
	   r_xprt->rx_stats.bad_reply_count);
}
/*
 * Plumbing for rpc transport switch and kernel module
 */

static struct rpc_xprt_ops xprt_rdma_procs = {
	.reserve_xprt		= xprt_rdma_reserve_xprt,
	.release_xprt		= xprt_release_xprt_cong,	/* sunrpc/xprt.c */
	.release_request	= xprt_release_rqst_cong,	/* ditto */
	.set_retrans_timeout	= xprt_set_retrans_timeout_def,	/* ditto */
	.rpcbind		= rpcb_getport_async,	/* sunrpc/rpcb_clnt.c */
	.set_port		= xprt_rdma_set_port,
	.connect		= xprt_rdma_connect,
	.buf_alloc		= xprt_rdma_allocate,
	.buf_free		= xprt_rdma_free,
	.send_request		= xprt_rdma_send_request,
	.close			= xprt_rdma_close,
	.destroy		= xprt_rdma_destroy,
	.print_stats		= xprt_rdma_print_stats
};

static struct xprt_class xprt_rdma = {
	.list			= LIST_HEAD_INIT(xprt_rdma.list),
	.name			= "rdma",
	.owner			= THIS_MODULE,
	.ident			= XPRT_TRANSPORT_RDMA,
	.setup			= xprt_setup_rdma,
};
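
/*
 * Module load registers the transport with the RPC client (and, with
 * RPC_DEBUG, the sysctl table); module unload reverses both steps.
 */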
static void __exit xprt_rdma_cleanup(void)
{
	int rc;

	dprintk("RPCRDMA Module Removed, deregister RPC RDMA transport\n");
#ifdef RPC_DEBUG
	if (sunrpc_table_header) {
		unregister_sysctl_table(sunrpc_table_header);
		sunrpc_table_header = NULL;
	}
#endif
	rc = xprt_unregister_transport(&xprt_rdma);
	if (rc)
		dprintk("RPC: %s: xprt_unregister returned %i\n",
			__func__, rc);
}
static int __init xprt_rdma_init(void)
{
	int rc;

	rc = xprt_register_transport(&xprt_rdma);
	if (rc)
		return rc;

	dprintk(KERN_INFO "RPCRDMA Module Init, register RPC RDMA transport\n");

	dprintk(KERN_INFO "Defaults:\n");
	dprintk(KERN_INFO "\tSlots %d\n"
		"\tMaxInlineRead %d\n\tMaxInlineWrite %d\n",
		xprt_rdma_slot_table_entries,
		xprt_rdma_max_inline_read, xprt_rdma_max_inline_write);
	dprintk(KERN_INFO "\tPadding %d\n\tMemreg %d\n",
		xprt_rdma_inline_write_padding, xprt_rdma_memreg_strategy);

#ifdef RPC_DEBUG
	if (!sunrpc_table_header)
		sunrpc_table_header = register_sysctl_table(sunrpc_table);
#endif
	return 0;
}

module_init(xprt_rdma_init);
module_exit(xprt_rdma_cleanup);