rxrpc: Move call state changes from recvmsg to I/O thread

Move the call state changes that are made in rxrpc_recvmsg() to the I/O
thread.  This means that, thenceforth, only the I/O thread does this and
the call state lock can be removed.

This requires the Rx phase to be ended when the last packet is received,
not when it is processed.

Since this now changes the rxrpc call state to SUCCEEDED before we've
consumed all the data from it, rxrpc_kernel_check_life() mustn't say the
call is dead until the recvmsg queue is empty (unless the call has failed).

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
This commit is contained in:
David Howells 2022-10-26 23:43:00 +01:00
parent 2d689424b6
commit 93368b6bd5
5 changed files with 109 additions and 111 deletions

View File

@ -909,6 +909,7 @@ int afs_extract_data(struct afs_call *call, bool want_more)
ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter, ret = rxrpc_kernel_recv_data(net->socket, call->rxcall, iter,
&call->iov_len, want_more, &remote_abort, &call->iov_len, want_more, &remote_abort,
&call->service_id); &call->service_id);
trace_afs_receive_data(call, call->iter, want_more, ret);
if (ret == 0 || ret == -EAGAIN) if (ret == 0 || ret == -EAGAIN)
return ret; return ret;

View File

@ -373,13 +373,17 @@ EXPORT_SYMBOL(rxrpc_kernel_end_call);
* @sock: The socket the call is on * @sock: The socket the call is on
* @call: The call to check * @call: The call to check
* *
* Allow a kernel service to find out whether a call is still alive - * Allow a kernel service to find out whether a call is still alive - whether
* ie. whether it has completed. * it has completed successfully and all received data has been consumed.
*/ */
bool rxrpc_kernel_check_life(const struct socket *sock, bool rxrpc_kernel_check_life(const struct socket *sock,
const struct rxrpc_call *call) const struct rxrpc_call *call)
{ {
return !rxrpc_call_is_complete(call); if (!rxrpc_call_is_complete(call))
return true;
if (call->completion != RXRPC_CALL_SUCCEEDED)
return false;
return !skb_queue_empty(&call->recvmsg_queue);
} }
EXPORT_SYMBOL(rxrpc_kernel_check_life); EXPORT_SYMBOL(rxrpc_kernel_check_life);

View File

@ -545,7 +545,8 @@ enum rxrpc_call_flag {
RXRPC_CALL_KERNEL, /* The call was made by the kernel */ RXRPC_CALL_KERNEL, /* The call was made by the kernel */
RXRPC_CALL_UPGRADE, /* Service upgrade was requested for the call */ RXRPC_CALL_UPGRADE, /* Service upgrade was requested for the call */
RXRPC_CALL_EXCLUSIVE, /* The call uses a once-only connection */ RXRPC_CALL_EXCLUSIVE, /* The call uses a once-only connection */
RXRPC_CALL_RX_IS_IDLE, /* Reception is idle - send an ACK */ RXRPC_CALL_RX_IS_IDLE, /* recvmsg() is idle - send an ACK */
RXRPC_CALL_RECVMSG_READ_ALL, /* recvmsg() read all of the received data */
}; };
/* /*

View File

@ -319,6 +319,41 @@ static bool rxrpc_receiving_reply(struct rxrpc_call *call)
return true; return true;
} }
/*
* End the packet reception phase.
*/
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
write_lock(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
__rxrpc_call_completed(call);
write_unlock(&call->state_lock);
break;
case RXRPC_CALL_SERVER_RECV_REQUEST:
call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
write_unlock(&call->state_lock);
rxrpc_propose_delay_ACK(call, serial,
rxrpc_propose_ack_processing_op);
break;
default:
write_unlock(&call->state_lock);
break;
}
}
static void rxrpc_input_update_ack_window(struct rxrpc_call *call, static void rxrpc_input_update_ack_window(struct rxrpc_call *call,
rxrpc_seq_t window, rxrpc_seq_t wtop) rxrpc_seq_t window, rxrpc_seq_t wtop)
{ {
@ -337,8 +372,9 @@ static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
__skb_queue_tail(&call->recvmsg_queue, skb); __skb_queue_tail(&call->recvmsg_queue, skb);
rxrpc_input_update_ack_window(call, window, wtop); rxrpc_input_update_ack_window(call, window, wtop);
trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq); trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
if (last)
rxrpc_end_rx_phase(call, sp->hdr.serial);
} }
/* /*

View File

@ -100,42 +100,6 @@ static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
return ret; return ret;
} }
/*
* End the packet reception phase.
*/
static void rxrpc_end_rx_phase(struct rxrpc_call *call, rxrpc_serial_t serial)
{
rxrpc_seq_t whigh = READ_ONCE(call->rx_highest_seq);
_enter("%d,%s", call->debug_id, rxrpc_call_states[call->state]);
trace_rxrpc_receive(call, rxrpc_receive_end, 0, whigh);
if (rxrpc_call_state(call) == RXRPC_CALL_CLIENT_RECV_REPLY)
rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_terminal_ack);
write_lock(&call->state_lock);
switch (call->state) {
case RXRPC_CALL_CLIENT_RECV_REPLY:
__rxrpc_call_completed(call);
write_unlock(&call->state_lock);
rxrpc_poke_call(call, rxrpc_call_poke_complete);
break;
case RXRPC_CALL_SERVER_RECV_REQUEST:
call->state = RXRPC_CALL_SERVER_ACK_REQUEST;
call->expect_req_by = jiffies + MAX_JIFFY_OFFSET;
write_unlock(&call->state_lock);
rxrpc_propose_delay_ACK(call, serial,
rxrpc_propose_ack_processing_op);
break;
default:
write_unlock(&call->state_lock);
break;
}
}
/* /*
* Discard a packet we've used up and advance the Rx window by one. * Discard a packet we've used up and advance the Rx window by one.
*/ */
@ -166,10 +130,9 @@ static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate, trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
serial, call->rx_consumed); serial, call->rx_consumed);
if (last) {
rxrpc_end_rx_phase(call, serial); if (last)
return; set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
}
/* Check to see if there's an ACK that needs sending. */ /* Check to see if there's an ACK that needs sending. */
acked = atomic_add_return(call->rx_consumed - old_consumed, acked = atomic_add_return(call->rx_consumed - old_consumed,
@ -194,7 +157,8 @@ static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
/* /*
* Deliver messages to a call. This keeps processing packets until the buffer * Deliver messages to a call. This keeps processing packets until the buffer
* is filled and we find either more DATA (returns 0) or the end of the DATA * is filled and we find either more DATA (returns 0) or the end of the DATA
* (returns 1). If more packets are required, it returns -EAGAIN. * (returns 1). If more packets are required, it returns -EAGAIN and if the
* call has failed it returns -EIO.
*/ */
static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call, static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
struct msghdr *msg, struct iov_iter *iter, struct msghdr *msg, struct iov_iter *iter,
@ -210,7 +174,13 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
rx_pkt_offset = call->rx_pkt_offset; rx_pkt_offset = call->rx_pkt_offset;
rx_pkt_len = call->rx_pkt_len; rx_pkt_len = call->rx_pkt_len;
if (rxrpc_call_state(call) >= RXRPC_CALL_SERVER_ACK_REQUEST) { if (rxrpc_call_has_failed(call)) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = -EIO;
goto done;
}
if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1; seq = lower_32_bits(atomic64_read(&call->ackr_window)) - 1;
ret = 1; ret = 1;
goto done; goto done;
@ -234,14 +204,15 @@ static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
if (rx_pkt_offset == 0) { if (rx_pkt_offset == 0) {
ret2 = rxrpc_verify_data(call, skb); ret2 = rxrpc_verify_data(call, skb);
rx_pkt_offset = sp->offset;
rx_pkt_len = sp->len;
trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq, trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
rx_pkt_offset, rx_pkt_len, ret2); sp->offset, sp->len, ret2);
if (ret2 < 0) { if (ret2 < 0) {
kdebug("verify = %d", ret2);
ret = ret2; ret = ret2;
goto out; goto out;
} }
rx_pkt_offset = sp->offset;
rx_pkt_len = sp->len;
} else { } else {
trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq, trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
rx_pkt_offset, rx_pkt_len, 0); rx_pkt_offset, rx_pkt_len, 0);
@ -416,36 +387,36 @@ try_again:
msg->msg_namelen = len; msg->msg_namelen = len;
} }
switch (rxrpc_call_state(call)) { ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
case RXRPC_CALL_CLIENT_RECV_REPLY: flags, &copied);
case RXRPC_CALL_SERVER_RECV_REQUEST: if (ret == -EAGAIN)
case RXRPC_CALL_SERVER_ACK_REQUEST:
ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
flags, &copied);
if (ret == -EAGAIN)
ret = 0;
if (!skb_queue_empty(&call->recvmsg_queue))
rxrpc_notify_socket(call);
break;
default:
ret = 0; ret = 0;
break; if (ret == -EIO)
} goto call_failed;
if (ret < 0) if (ret < 0)
goto error_unlock_call; goto error_unlock_call;
if (rxrpc_call_is_complete(call)) { if (rxrpc_call_is_complete(call) &&
ret = rxrpc_recvmsg_term(call, msg); skb_queue_empty(&call->recvmsg_queue))
if (ret < 0) goto call_complete;
goto error_unlock_call; if (rxrpc_call_has_failed(call))
if (!(flags & MSG_PEEK)) goto call_failed;
rxrpc_release_call(rx, call);
msg->msg_flags |= MSG_EOR;
ret = 1;
}
rxrpc_notify_socket(call);
goto not_yet_complete;
call_failed:
rxrpc_purge_queue(&call->recvmsg_queue);
call_complete:
ret = rxrpc_recvmsg_term(call, msg);
if (ret < 0)
goto error_unlock_call;
if (!(flags & MSG_PEEK))
rxrpc_release_call(rx, call);
msg->msg_flags |= MSG_EOR;
ret = 1;
not_yet_complete:
if (ret == 0) if (ret == 0)
msg->msg_flags |= MSG_MORE; msg->msg_flags |= MSG_MORE;
else else
@ -508,49 +479,34 @@ int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
size_t offset = 0; size_t offset = 0;
int ret; int ret;
_enter("{%d,%s},%zu,%d", _enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
call->debug_id, rxrpc_call_states[call->state],
*_len, want_more);
ASSERTCMP(call->state, !=, RXRPC_CALL_SERVER_SECURING);
mutex_lock(&call->user_mutex); mutex_lock(&call->user_mutex);
switch (rxrpc_call_state(call)) { ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
case RXRPC_CALL_CLIENT_RECV_REPLY: *_len -= offset;
case RXRPC_CALL_SERVER_RECV_REQUEST: if (ret == -EIO)
case RXRPC_CALL_SERVER_ACK_REQUEST: goto call_failed;
ret = rxrpc_recvmsg_data(sock, call, NULL, iter, if (ret < 0)
*_len, 0, &offset);
*_len -= offset;
if (ret < 0)
goto out;
/* We can only reach here with a partially full buffer if we
* have reached the end of the data. We must otherwise have a
* full buffer or have been given -EAGAIN.
*/
if (ret == 1) {
if (iov_iter_count(iter) > 0)
goto short_data;
if (!want_more)
goto read_phase_complete;
ret = 0;
goto out;
}
if (!want_more)
goto excess_data;
goto out; goto out;
case RXRPC_CALL_COMPLETE: /* We can only reach here with a partially full buffer if we have
goto call_complete; * reached the end of the data. We must otherwise have a full buffer
* or have been given -EAGAIN.
default: */
ret = -EINPROGRESS; if (ret == 1) {
if (iov_iter_count(iter) > 0)
goto short_data;
if (!want_more)
goto read_phase_complete;
ret = 0;
goto out; goto out;
} }
if (!want_more)
goto excess_data;
goto out;
read_phase_complete: read_phase_complete:
ret = 1; ret = 1;
out: out:
@ -572,7 +528,7 @@ excess_data:
0, -EMSGSIZE); 0, -EMSGSIZE);
ret = -EMSGSIZE; ret = -EMSGSIZE;
goto out; goto out;
call_complete: call_failed:
*_abort = call->abort_code; *_abort = call->abort_code;
ret = call->error; ret = call->error;
if (call->completion == RXRPC_CALL_SUCCEEDED) { if (call->completion == RXRPC_CALL_SUCCEEDED) {