linux/net/smc/smc_cdc.c
Dust Li dcd2cf5f2f net/smc: add autocorking support
This patch adds autocorking support for SMC which could improve
throughput for small message by x3+.

The main idea is borrowed from TCP autocorking with some RDMA
specific modification:
1. The first message should never cork to make sure we won't
   bring extra latency
2. If we have posted any Tx WRs to the NIC that have not
   completed, cork the new messages until:
   a) Receive CQE for the last Tx WR
   b) We have corked enough message on the connection
3. Try to push the corked data out when we receive CQE of
   the last Tx WR to prevent the corked messages hang in
   the send queue.

Both SMC autocorking and TCP autocorking check the TX completion
to decide whether we should cork or not. The difference is
when we got a SMC Tx WR completion, the data have been confirmed
by the RNIC while TCP TX completion just tells us the data
have been sent out by the local NIC.

Add an atomic variable tx_pushing in smc_connection to make
sure only one can send to let it cork more and save CDC slot.

SMC autocorking should not bring extra latency since the first
message will always been sent out immediately.

The qperf tcp_bw test shows more than x4 increase under small
message size with Mellanox connectX4-Lx, same result with other
throughput benchmarks like sockperf/netperf.
The qperf tcp_lat test shows SMC autocorking has not increase any
ping-pong latency.

Test command:
 client: smc_run taskset -c 1 qperf smc-server -oo msg_size:1:64K:*2 \
			-t 30 -vu tcp_{bw|lat}
 server: smc_run taskset -c 1 qperf

=== Bandwidth ====
MsgSize(Bytes)  SMC-NoCork           TCP                      SMC-AutoCorking
      1         0.578 MB/s       2.392 MB/s(313.57%)        2.647 MB/s(357.72%)
      2         1.159 MB/s       4.780 MB/s(312.53%)        5.153 MB/s(344.71%)
      4         2.283 MB/s      10.266 MB/s(349.77%)       10.363 MB/s(354.02%)
      8         4.668 MB/s      19.040 MB/s(307.86%)       21.215 MB/s(354.45%)
     16         9.147 MB/s      38.904 MB/s(325.31%)       41.740 MB/s(356.32%)
     32        18.369 MB/s      79.587 MB/s(333.25%)       82.392 MB/s(348.52%)
     64        36.562 MB/s     148.668 MB/s(306.61%)      161.564 MB/s(341.89%)
    128        72.961 MB/s     274.913 MB/s(276.80%)      325.363 MB/s(345.94%)
    256       144.705 MB/s     512.059 MB/s(253.86%)      633.743 MB/s(337.96%)
    512       288.873 MB/s     884.977 MB/s(206.35%)     1250.681 MB/s(332.95%)
   1024       574.180 MB/s    1337.736 MB/s(132.98%)     2246.121 MB/s(291.19%)
   2048      1095.192 MB/s    1865.952 MB/s( 70.38%)     2057.767 MB/s( 87.89%)
   4096      2066.157 MB/s    2380.337 MB/s( 15.21%)     2173.983 MB/s(  5.22%)
   8192      3717.198 MB/s    2733.073 MB/s(-26.47%)     3491.223 MB/s( -6.08%)
  16384      4742.221 MB/s    2958.693 MB/s(-37.61%)     4637.692 MB/s( -2.20%)
  32768      5349.550 MB/s    3061.285 MB/s(-42.77%)     5385.796 MB/s(  0.68%)
  65536      5162.919 MB/s    3731.408 MB/s(-27.73%)     5223.890 MB/s(  1.18%)
==== Latency ====
MsgSize(Bytes)   SMC-NoCork         TCP                    SMC-AutoCorking
      1          10.540 us      11.938 us( 13.26%)       10.573 us(  0.31%)
      2          10.996 us      11.992 us(  9.06%)       10.269 us( -6.61%)
      4          10.229 us      11.687 us( 14.25%)       10.240 us(  0.11%)
      8          10.203 us      11.653 us( 14.21%)       10.402 us(  1.95%)
     16          10.530 us      11.313 us(  7.44%)       10.599 us(  0.66%)
     32          10.241 us      11.586 us( 13.13%)       10.223 us( -0.18%)
     64          10.693 us      11.652 us(  8.97%)       10.251 us( -4.13%)
    128          10.597 us      11.579 us(  9.27%)       10.494 us( -0.97%)
    256          10.409 us      11.957 us( 14.87%)       10.710 us(  2.89%)
    512          11.088 us      12.505 us( 12.78%)       10.547 us( -4.88%)
   1024          11.240 us      12.255 us(  9.03%)       10.787 us( -4.03%)
   2048          11.485 us      16.970 us( 47.76%)       11.256 us( -1.99%)
   4096          12.077 us      13.948 us( 15.49%)       12.230 us(  1.27%)
   8192          13.683 us      16.693 us( 22.00%)       13.786 us(  0.75%)
  16384          16.470 us      23.615 us( 43.38%)       16.459 us( -0.07%)
  32768          22.540 us      40.966 us( 81.75%)       23.284 us(  3.30%)
  65536          34.192 us      73.003 us(113.51%)       34.233 us(  0.12%)

With SMC autocorking support, we can archive better throughput
than TCP in most message sizes without any latency trade-off.

Signed-off-by: Dust Li <dust.li@linux.alibaba.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
2022-03-01 14:25:12 +00:00

483 lines
14 KiB
C

// SPDX-License-Identifier: GPL-2.0
/*
* Shared Memory Communications over RDMA (SMC-R) and RoCE
*
* Connection Data Control (CDC)
* handles flow control
*
* Copyright IBM Corp. 2016
*
* Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
*/
#include <linux/spinlock.h>
#include "smc.h"
#include "smc_wr.h"
#include "smc_cdc.h"
#include "smc_tx.h"
#include "smc_rx.h"
#include "smc_close.h"
/********************************** send *************************************/
/* handler for send/transmission completion of a CDC msg */
static void smc_cdc_tx_handler(struct smc_wr_tx_pend_priv *pnd_snd,
struct smc_link *link,
enum ib_wc_status wc_status)
{
struct smc_cdc_tx_pend *cdcpend = (struct smc_cdc_tx_pend *)pnd_snd;
struct smc_connection *conn = cdcpend->conn;
struct smc_sock *smc;
int diff;
smc = container_of(conn, struct smc_sock, conn);
bh_lock_sock(&smc->sk);
if (!wc_status) {
diff = smc_curs_diff(cdcpend->conn->sndbuf_desc->len,
&cdcpend->conn->tx_curs_fin,
&cdcpend->cursor);
/* sndbuf_space is decreased in smc_sendmsg */
smp_mb__before_atomic();
atomic_add(diff, &cdcpend->conn->sndbuf_space);
/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
smp_mb__after_atomic();
smc_curs_copy(&conn->tx_curs_fin, &cdcpend->cursor, conn);
smc_curs_copy(&conn->local_tx_ctrl_fin, &cdcpend->p_cursor,
conn);
conn->tx_cdc_seq_fin = cdcpend->ctrl_seq;
}
if (atomic_dec_and_test(&conn->cdc_pend_tx_wr)) {
/* If this is the last pending WR complete, we must push to
* prevent hang when autocork enabled.
*/
smc_tx_sndbuf_nonempty(conn);
if (unlikely(wq_has_sleeper(&conn->cdc_pend_tx_wq)))
wake_up(&conn->cdc_pend_tx_wq);
}
WARN_ON(atomic_read(&conn->cdc_pend_tx_wr) < 0);
smc_tx_sndbuf_nonfull(smc);
bh_unlock_sock(&smc->sk);
}
int smc_cdc_get_free_slot(struct smc_connection *conn,
struct smc_link *link,
struct smc_wr_buf **wr_buf,
struct smc_rdma_wr **wr_rdma_buf,
struct smc_cdc_tx_pend **pend)
{
int rc;
rc = smc_wr_tx_get_free_slot(link, smc_cdc_tx_handler, wr_buf,
wr_rdma_buf,
(struct smc_wr_tx_pend_priv **)pend);
if (conn->killed) {
/* abnormal termination */
if (!rc)
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
rc = -EPIPE;
}
return rc;
}
static inline void smc_cdc_add_pending_send(struct smc_connection *conn,
struct smc_cdc_tx_pend *pend)
{
BUILD_BUG_ON_MSG(
sizeof(struct smc_cdc_msg) > SMC_WR_BUF_SIZE,
"must increase SMC_WR_BUF_SIZE to at least sizeof(struct smc_cdc_msg)");
BUILD_BUG_ON_MSG(
offsetofend(struct smc_cdc_msg, reserved) > SMC_WR_TX_SIZE,
"must adapt SMC_WR_TX_SIZE to sizeof(struct smc_cdc_msg); if not all smc_wr upper layer protocols use the same message size any more, must start to set link->wr_tx_sges[i].length on each individual smc_wr_tx_send()");
BUILD_BUG_ON_MSG(
sizeof(struct smc_cdc_tx_pend) > SMC_WR_TX_PEND_PRIV_SIZE,
"must increase SMC_WR_TX_PEND_PRIV_SIZE to at least sizeof(struct smc_cdc_tx_pend)");
pend->conn = conn;
pend->cursor = conn->tx_curs_sent;
pend->p_cursor = conn->local_tx_ctrl.prod;
pend->ctrl_seq = conn->tx_cdc_seq;
}
int smc_cdc_msg_send(struct smc_connection *conn,
struct smc_wr_buf *wr_buf,
struct smc_cdc_tx_pend *pend)
{
struct smc_link *link = conn->lnk;
union smc_host_cursor cfed;
int rc;
smc_cdc_add_pending_send(conn, pend);
conn->tx_cdc_seq++;
conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
smc_host_msg_to_cdc((struct smc_cdc_msg *)wr_buf, conn, &cfed);
atomic_inc(&conn->cdc_pend_tx_wr);
smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
if (!rc) {
smc_curs_copy(&conn->rx_curs_confirmed, &cfed, conn);
conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
} else {
conn->tx_cdc_seq--;
conn->local_tx_ctrl.seqno = conn->tx_cdc_seq;
atomic_dec(&conn->cdc_pend_tx_wr);
}
return rc;
}
/* send a validation msg indicating the move of a conn to an other QP link */
int smcr_cdc_msg_send_validation(struct smc_connection *conn,
struct smc_cdc_tx_pend *pend,
struct smc_wr_buf *wr_buf)
{
struct smc_host_cdc_msg *local = &conn->local_tx_ctrl;
struct smc_link *link = conn->lnk;
struct smc_cdc_msg *peer;
int rc;
peer = (struct smc_cdc_msg *)wr_buf;
peer->common.type = local->common.type;
peer->len = local->len;
peer->seqno = htons(conn->tx_cdc_seq_fin); /* seqno last compl. tx */
peer->token = htonl(local->token);
peer->prod_flags.failover_validation = 1;
/* We need to set pend->conn here to make sure smc_cdc_tx_handler()
* can handle properly
*/
smc_cdc_add_pending_send(conn, pend);
atomic_inc(&conn->cdc_pend_tx_wr);
smp_mb__after_atomic(); /* Make sure cdc_pend_tx_wr added before post */
rc = smc_wr_tx_send(link, (struct smc_wr_tx_pend_priv *)pend);
if (unlikely(rc))
atomic_dec(&conn->cdc_pend_tx_wr);
return rc;
}
static int smcr_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
struct smc_cdc_tx_pend *pend;
struct smc_wr_buf *wr_buf;
struct smc_link *link;
bool again = false;
int rc;
again:
link = conn->lnk;
if (!smc_wr_tx_link_hold(link))
return -ENOLINK;
rc = smc_cdc_get_free_slot(conn, link, &wr_buf, NULL, &pend);
if (rc)
goto put_out;
spin_lock_bh(&conn->send_lock);
if (link != conn->lnk) {
/* link of connection changed, try again one time*/
spin_unlock_bh(&conn->send_lock);
smc_wr_tx_put_slot(link,
(struct smc_wr_tx_pend_priv *)pend);
smc_wr_tx_link_put(link);
if (again)
return -ENOLINK;
again = true;
goto again;
}
rc = smc_cdc_msg_send(conn, wr_buf, pend);
spin_unlock_bh(&conn->send_lock);
put_out:
smc_wr_tx_link_put(link);
return rc;
}
int smc_cdc_get_slot_and_msg_send(struct smc_connection *conn)
{
int rc;
if (!smc_conn_lgr_valid(conn) ||
(conn->lgr->is_smcd && conn->lgr->peer_shutdown))
return -EPIPE;
if (conn->lgr->is_smcd) {
spin_lock_bh(&conn->send_lock);
rc = smcd_cdc_msg_send(conn);
spin_unlock_bh(&conn->send_lock);
} else {
rc = smcr_cdc_get_slot_and_msg_send(conn);
}
return rc;
}
void smc_cdc_wait_pend_tx_wr(struct smc_connection *conn)
{
wait_event(conn->cdc_pend_tx_wq, !atomic_read(&conn->cdc_pend_tx_wr));
}
/* Send a SMC-D CDC header.
* This increments the free space available in our send buffer.
* Also update the confirmed receive buffer with what was sent to the peer.
*/
int smcd_cdc_msg_send(struct smc_connection *conn)
{
struct smc_sock *smc = container_of(conn, struct smc_sock, conn);
union smc_host_cursor curs;
struct smcd_cdc_msg cdc;
int rc, diff;
memset(&cdc, 0, sizeof(cdc));
cdc.common.type = SMC_CDC_MSG_TYPE;
curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.prod.acurs);
cdc.prod.wrap = curs.wrap;
cdc.prod.count = curs.count;
curs.acurs.counter = atomic64_read(&conn->local_tx_ctrl.cons.acurs);
cdc.cons.wrap = curs.wrap;
cdc.cons.count = curs.count;
cdc.cons.prod_flags = conn->local_tx_ctrl.prod_flags;
cdc.cons.conn_state_flags = conn->local_tx_ctrl.conn_state_flags;
rc = smcd_tx_ism_write(conn, &cdc, sizeof(cdc), 0, 1);
if (rc)
return rc;
smc_curs_copy(&conn->rx_curs_confirmed, &curs, conn);
conn->local_rx_ctrl.prod_flags.cons_curs_upd_req = 0;
/* Calculate transmitted data and increment free send buffer space */
diff = smc_curs_diff(conn->sndbuf_desc->len, &conn->tx_curs_fin,
&conn->tx_curs_sent);
/* increased by confirmed number of bytes */
smp_mb__before_atomic();
atomic_add(diff, &conn->sndbuf_space);
/* guarantee 0 <= sndbuf_space <= sndbuf_desc->len */
smp_mb__after_atomic();
smc_curs_copy(&conn->tx_curs_fin, &conn->tx_curs_sent, conn);
smc_tx_sndbuf_nonfull(smc);
return rc;
}
/********************************* receive ***********************************/
static inline bool smc_cdc_before(u16 seq1, u16 seq2)
{
return (s16)(seq1 - seq2) < 0;
}
static void smc_cdc_handle_urg_data_arrival(struct smc_sock *smc,
int *diff_prod)
{
struct smc_connection *conn = &smc->conn;
char *base;
/* new data included urgent business */
smc_curs_copy(&conn->urg_curs, &conn->local_rx_ctrl.prod, conn);
conn->urg_state = SMC_URG_VALID;
if (!sock_flag(&smc->sk, SOCK_URGINLINE))
/* we'll skip the urgent byte, so don't account for it */
(*diff_prod)--;
base = (char *)conn->rmb_desc->cpu_addr + conn->rx_off;
if (conn->urg_curs.count)
conn->urg_rx_byte = *(base + conn->urg_curs.count - 1);
else
conn->urg_rx_byte = *(base + conn->rmb_desc->len - 1);
sk_send_sigurg(&smc->sk);
}
static void smc_cdc_msg_validate(struct smc_sock *smc, struct smc_cdc_msg *cdc,
struct smc_link *link)
{
struct smc_connection *conn = &smc->conn;
u16 recv_seq = ntohs(cdc->seqno);
s16 diff;
/* check that seqnum was seen before */
diff = conn->local_rx_ctrl.seqno - recv_seq;
if (diff < 0) { /* diff larger than 0x7fff */
/* drop connection */
conn->out_of_sync = 1; /* prevent any further receives */
spin_lock_bh(&conn->send_lock);
conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
conn->lnk = link;
spin_unlock_bh(&conn->send_lock);
sock_hold(&smc->sk); /* sock_put in abort_work */
if (!queue_work(smc_close_wq, &conn->abort_work))
sock_put(&smc->sk);
}
}
static void smc_cdc_msg_recv_action(struct smc_sock *smc,
struct smc_cdc_msg *cdc)
{
union smc_host_cursor cons_old, prod_old;
struct smc_connection *conn = &smc->conn;
int diff_cons, diff_prod;
smc_curs_copy(&prod_old, &conn->local_rx_ctrl.prod, conn);
smc_curs_copy(&cons_old, &conn->local_rx_ctrl.cons, conn);
smc_cdc_msg_to_host(&conn->local_rx_ctrl, cdc, conn);
diff_cons = smc_curs_diff(conn->peer_rmbe_size, &cons_old,
&conn->local_rx_ctrl.cons);
if (diff_cons) {
/* peer_rmbe_space is decreased during data transfer with RDMA
* write
*/
smp_mb__before_atomic();
atomic_add(diff_cons, &conn->peer_rmbe_space);
/* guarantee 0 <= peer_rmbe_space <= peer_rmbe_size */
smp_mb__after_atomic();
}
diff_prod = smc_curs_diff(conn->rmb_desc->len, &prod_old,
&conn->local_rx_ctrl.prod);
if (diff_prod) {
if (conn->local_rx_ctrl.prod_flags.urg_data_present)
smc_cdc_handle_urg_data_arrival(smc, &diff_prod);
/* bytes_to_rcv is decreased in smc_recvmsg */
smp_mb__before_atomic();
atomic_add(diff_prod, &conn->bytes_to_rcv);
/* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
smp_mb__after_atomic();
smc->sk.sk_data_ready(&smc->sk);
} else {
if (conn->local_rx_ctrl.prod_flags.write_blocked)
smc->sk.sk_data_ready(&smc->sk);
if (conn->local_rx_ctrl.prod_flags.urg_data_pending)
conn->urg_state = SMC_URG_NOTYET;
}
/* trigger sndbuf consumer: RDMA write into peer RMBE and CDC */
if ((diff_cons && smc_tx_prepared_sends(conn)) ||
conn->local_rx_ctrl.prod_flags.cons_curs_upd_req ||
conn->local_rx_ctrl.prod_flags.urg_data_pending)
smc_tx_sndbuf_nonempty(conn);
if (diff_cons && conn->urg_tx_pend &&
atomic_read(&conn->peer_rmbe_space) == conn->peer_rmbe_size) {
/* urg data confirmed by peer, indicate we're ready for more */
conn->urg_tx_pend = false;
smc->sk.sk_write_space(&smc->sk);
}
if (conn->local_rx_ctrl.conn_state_flags.peer_conn_abort) {
smc->sk.sk_err = ECONNRESET;
conn->local_tx_ctrl.conn_state_flags.peer_conn_abort = 1;
}
if (smc_cdc_rxed_any_close_or_senddone(conn)) {
smc->sk.sk_shutdown |= RCV_SHUTDOWN;
if (smc->clcsock && smc->clcsock->sk)
smc->clcsock->sk->sk_shutdown |= RCV_SHUTDOWN;
sock_set_flag(&smc->sk, SOCK_DONE);
sock_hold(&smc->sk); /* sock_put in close_work */
if (!queue_work(smc_close_wq, &conn->close_work))
sock_put(&smc->sk);
}
}
/* called under tasklet context */
static void smc_cdc_msg_recv(struct smc_sock *smc, struct smc_cdc_msg *cdc)
{
sock_hold(&smc->sk);
bh_lock_sock(&smc->sk);
smc_cdc_msg_recv_action(smc, cdc);
bh_unlock_sock(&smc->sk);
sock_put(&smc->sk); /* no free sk in softirq-context */
}
/* Schedule a tasklet for this connection. Triggered from the ISM device IRQ
* handler to indicate update in the DMBE.
*
* Context:
* - tasklet context
*/
static void smcd_cdc_rx_tsklet(struct tasklet_struct *t)
{
struct smc_connection *conn = from_tasklet(conn, t, rx_tsklet);
struct smcd_cdc_msg *data_cdc;
struct smcd_cdc_msg cdc;
struct smc_sock *smc;
if (!conn || conn->killed)
return;
data_cdc = (struct smcd_cdc_msg *)conn->rmb_desc->cpu_addr;
smcd_curs_copy(&cdc.prod, &data_cdc->prod, conn);
smcd_curs_copy(&cdc.cons, &data_cdc->cons, conn);
smc = container_of(conn, struct smc_sock, conn);
smc_cdc_msg_recv(smc, (struct smc_cdc_msg *)&cdc);
}
/* Initialize receive tasklet. Called from ISM device IRQ handler to start
* receiver side.
*/
void smcd_cdc_rx_init(struct smc_connection *conn)
{
tasklet_setup(&conn->rx_tsklet, smcd_cdc_rx_tsklet);
}
/***************************** init, exit, misc ******************************/
static void smc_cdc_rx_handler(struct ib_wc *wc, void *buf)
{
struct smc_link *link = (struct smc_link *)wc->qp->qp_context;
struct smc_cdc_msg *cdc = buf;
struct smc_connection *conn;
struct smc_link_group *lgr;
struct smc_sock *smc;
if (wc->byte_len < offsetof(struct smc_cdc_msg, reserved))
return; /* short message */
if (cdc->len != SMC_WR_TX_SIZE)
return; /* invalid message */
/* lookup connection */
lgr = smc_get_lgr(link);
read_lock_bh(&lgr->conns_lock);
conn = smc_lgr_find_conn(ntohl(cdc->token), lgr);
read_unlock_bh(&lgr->conns_lock);
if (!conn || conn->out_of_sync)
return;
smc = container_of(conn, struct smc_sock, conn);
if (cdc->prod_flags.failover_validation) {
smc_cdc_msg_validate(smc, cdc, link);
return;
}
if (smc_cdc_before(ntohs(cdc->seqno),
conn->local_rx_ctrl.seqno))
/* received seqno is old */
return;
smc_cdc_msg_recv(smc, cdc);
}
static struct smc_wr_rx_handler smc_cdc_rx_handlers[] = {
{
.handler = smc_cdc_rx_handler,
.type = SMC_CDC_MSG_TYPE
},
{
.handler = NULL,
}
};
int __init smc_cdc_init(void)
{
struct smc_wr_rx_handler *handler;
int rc = 0;
for (handler = smc_cdc_rx_handlers; handler->handler; handler++) {
INIT_HLIST_NODE(&handler->list);
rc = smc_wr_rx_register_handler(handler);
if (rc)
break;
}
return rc;
}