tipc: eliminate race condition at multicast reception

In a previous commit in this series we resolved a race problem during
unicast message reception.

Here, we resolve the same problem at multicast reception. We apply the
same technique: an input queue serializing the delivery of arriving
buffers. The main difference is that here we do it in two steps.
First, the broadcast link feeds arriving buffers into the tail of an
arrival queue, which head is consumed at the socket level, and where
destination lookup is performed. Second, if the lookup is successful,
the resulting buffer clones are fed into a second queue, the input
queue. This queue is consumed at reception in the socket just like
in the unicast case. Both queues are protected by the same lock, -the
one of the input queue.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
This commit is contained in:
Jon Paul Maloy 2015-02-05 08:36:44 -05:00 committed by David S. Miller
parent 3c724acdd5
commit cb1b728096
8 changed files with 113 additions and 58 deletions

View File

@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
tipc_link_reset_all(node); tipc_link_reset_all(node);
} }
void tipc_bclink_input(struct net *net)
{
struct tipc_net *tn = net_generic(net, tipc_net_id);
tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
}
uint tipc_bclink_get_mtu(void) uint tipc_bclink_get_mtu(void)
{ {
return MAX_PKT_DEFAULT_MCAST; return MAX_PKT_DEFAULT_MCAST;
@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
tipc_node_unlock(n_ptr); tipc_node_unlock(n_ptr);
} }
/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster /* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
* and to identified node local sockets * and to identified node local sockets
* @net: the applicable net namespace * @net: the applicable net namespace
* @list: chain of buffers containing message * @list: chain of buffers containing message
@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
int rc = 0; int rc = 0;
int bc = 0; int bc = 0;
struct sk_buff *skb; struct sk_buff *skb;
struct sk_buff_head arrvq;
struct sk_buff_head inputq;
/* Prepare clone of message for local node */ /* Prepare clone of message for local node */
skb = tipc_msg_reassemble(list); skb = tipc_msg_reassemble(list);
@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
return -EHOSTUNREACH; return -EHOSTUNREACH;
} }
/* Broadcast to all other nodes */ /* Broadcast to all nodes */
if (likely(bclink)) { if (likely(bclink)) {
tipc_bclink_lock(net); tipc_bclink_lock(net);
if (likely(bclink->bcast_nodes.count)) { if (likely(bclink->bcast_nodes.count)) {
@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
if (unlikely(!bc)) if (unlikely(!bc))
__skb_queue_purge(list); __skb_queue_purge(list);
/* Deliver message clone */ if (unlikely(rc)) {
if (likely(!rc))
tipc_sk_mcast_rcv(net, skb);
else
kfree_skb(skb); kfree_skb(skb);
return rc;
}
/* Deliver message clone */
__skb_queue_head_init(&arrvq);
skb_queue_head_init(&inputq);
__skb_queue_tail(&arrvq, skb);
tipc_sk_mcast_rcv(net, &arrvq, &inputq);
return rc; return rc;
} }
@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
int deferred = 0; int deferred = 0;
int pos = 0; int pos = 0;
struct sk_buff *iskb; struct sk_buff *iskb;
struct sk_buff_head msgs; struct sk_buff_head *arrvq, *inputq;
/* Screen out unwanted broadcast messages */ /* Screen out unwanted broadcast messages */
if (msg_mc_netid(msg) != tn->net_id) if (msg_mc_netid(msg) != tn->net_id)
@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
/* Handle in-sequence broadcast message */ /* Handle in-sequence broadcast message */
seqno = msg_seqno(msg); seqno = msg_seqno(msg);
next_in = mod(node->bclink.last_in + 1); next_in = mod(node->bclink.last_in + 1);
arrvq = &tn->bclink->arrvq;
inputq = &tn->bclink->inputq;
if (likely(seqno == next_in)) { if (likely(seqno == next_in)) {
receive: receive:
@ -493,21 +507,26 @@ receive:
if (likely(msg_isdata(msg))) { if (likely(msg_isdata(msg))) {
tipc_bclink_lock(net); tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno); bclink_accept_pkt(node, seqno);
spin_lock_bh(&inputq->lock);
__skb_queue_tail(arrvq, buf);
spin_unlock_bh(&inputq->lock);
node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net); tipc_bclink_unlock(net);
tipc_node_unlock(node); tipc_node_unlock(node);
if (likely(msg_mcast(msg)))
tipc_sk_mcast_rcv(net, buf);
else
kfree_skb(buf);
} else if (msg_user(msg) == MSG_BUNDLER) { } else if (msg_user(msg) == MSG_BUNDLER) {
tipc_bclink_lock(net); tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno); bclink_accept_pkt(node, seqno);
bcl->stats.recv_bundles++; bcl->stats.recv_bundles++;
bcl->stats.recv_bundled += msg_msgcnt(msg); bcl->stats.recv_bundled += msg_msgcnt(msg);
pos = 0;
while (tipc_msg_extract(buf, &iskb, &pos)) {
spin_lock_bh(&inputq->lock);
__skb_queue_tail(arrvq, iskb);
spin_unlock_bh(&inputq->lock);
}
node->action_flags |= TIPC_BCAST_MSG_EVT;
tipc_bclink_unlock(net); tipc_bclink_unlock(net);
tipc_node_unlock(node); tipc_node_unlock(node);
while (tipc_msg_extract(buf, &iskb, &pos))
tipc_sk_mcast_rcv(net, iskb);
} else if (msg_user(msg) == MSG_FRAGMENTER) { } else if (msg_user(msg) == MSG_FRAGMENTER) {
tipc_buf_append(&node->bclink.reasm_buf, &buf); tipc_buf_append(&node->bclink.reasm_buf, &buf);
if (unlikely(!buf && !node->bclink.reasm_buf)) if (unlikely(!buf && !node->bclink.reasm_buf))
@ -523,14 +542,6 @@ receive:
} }
tipc_bclink_unlock(net); tipc_bclink_unlock(net);
tipc_node_unlock(node); tipc_node_unlock(node);
} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno);
tipc_bclink_unlock(net);
tipc_node_unlock(node);
skb_queue_head_init(&msgs);
skb_queue_tail(&msgs, buf);
tipc_named_rcv(net, &msgs);
} else { } else {
tipc_bclink_lock(net); tipc_bclink_lock(net);
bclink_accept_pkt(node, seqno); bclink_accept_pkt(node, seqno);
@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
skb_queue_head_init(&bcl->wakeupq); skb_queue_head_init(&bcl->wakeupq);
bcl->next_out_no = 1; bcl->next_out_no = 1;
spin_lock_init(&bclink->node.lock); spin_lock_init(&bclink->node.lock);
__skb_queue_head_init(&bclink->arrvq);
skb_queue_head_init(&bclink->inputq);
bcl->owner = &bclink->node; bcl->owner = &bclink->node;
bcl->owner->net = net; bcl->owner->net = net;
bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;

View File

@ -97,6 +97,8 @@ struct tipc_bclink {
struct tipc_link link; struct tipc_link link;
struct tipc_node node; struct tipc_node node;
unsigned int flags; unsigned int flags;
struct sk_buff_head arrvq;
struct sk_buff_head inputq;
struct tipc_node_map bcast_nodes; struct tipc_node_map bcast_nodes;
struct tipc_node *retransmit_to; struct tipc_node *retransmit_to;
}; };
@ -134,5 +136,6 @@ uint tipc_bclink_get_mtu(void);
int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list); int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
void tipc_bclink_wakeup_users(struct net *net); void tipc_bclink_wakeup_users(struct net *net);
int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
void tipc_bclink_input(struct net *net);
#endif #endif

View File

@ -767,6 +767,23 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
int *err); int *err);
struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
/* tipc_skb_peek(): peek and reserve first buffer in list
* @list: list to be peeked in
* Returns pointer to first buffer in list, if any
*/
static inline struct sk_buff *tipc_skb_peek(struct sk_buff_head *list,
spinlock_t *lock)
{
struct sk_buff *skb;
spin_lock_bh(lock);
skb = skb_peek(list);
if (skb)
skb_get(skb);
spin_unlock_bh(lock);
return skb;
}
/* tipc_skb_peek_port(): find a destination port, ignoring all destinations /* tipc_skb_peek_port(): find a destination port, ignoring all destinations
* up to and including 'filter'. * up to and including 'filter'.
* Note: ignoring previously tried destinations minimizes the risk of * Note: ignoring previously tried destinations minimizes the risk of

View File

@ -1,7 +1,7 @@
/* /*
* net/tipc/name_table.h: Include file for TIPC name table code * net/tipc/name_table.h: Include file for TIPC name table code
* *
* Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2004-2005, 2010-2011, Wind River Systems * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
* All rights reserved. * All rights reserved.
* *

View File

@ -582,10 +582,10 @@ void tipc_node_unlock(struct tipc_node *node)
namedq = node->namedq; namedq = node->namedq;
publ_list = &node->publ_list; publ_list = &node->publ_list;
node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN | node->action_flags &= ~(TIPC_MSG_EVT |
TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP | TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
TIPC_WAKEUP_BCAST_USERS | TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
TIPC_NAMED_MSG_EVT); TIPC_NAMED_MSG_EVT);
spin_unlock_bh(&node->lock); spin_unlock_bh(&node->lock);
@ -612,6 +612,9 @@ void tipc_node_unlock(struct tipc_node *node)
if (flags & TIPC_NAMED_MSG_EVT) if (flags & TIPC_NAMED_MSG_EVT)
tipc_named_rcv(net, namedq); tipc_named_rcv(net, namedq);
if (flags & TIPC_BCAST_MSG_EVT)
tipc_bclink_input(net);
} }
/* Caller should hold node lock for the passed node */ /* Caller should hold node lock for the passed node */

View File

@ -1,7 +1,7 @@
/* /*
* net/tipc/node.h: Include file for TIPC node management routines * net/tipc/node.h: Include file for TIPC node management routines
* *
* Copyright (c) 2000-2006, 2014, Ericsson AB * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
* Copyright (c) 2005, 2010-2014, Wind River Systems * Copyright (c) 2005, 2010-2014, Wind River Systems
* All rights reserved. * All rights reserved.
* *
@ -63,7 +63,8 @@ enum {
TIPC_WAKEUP_BCAST_USERS = (1 << 5), TIPC_WAKEUP_BCAST_USERS = (1 << 5),
TIPC_NOTIFY_LINK_UP = (1 << 6), TIPC_NOTIFY_LINK_UP = (1 << 6),
TIPC_NOTIFY_LINK_DOWN = (1 << 7), TIPC_NOTIFY_LINK_DOWN = (1 << 7),
TIPC_NAMED_MSG_EVT = (1 << 8) TIPC_NAMED_MSG_EVT = (1 << 8),
TIPC_BCAST_MSG_EVT = (1 << 9)
}; };
/** /**
@ -74,6 +75,7 @@ enum {
* @oos_state: state tracker for handling OOS b'cast messages * @oos_state: state tracker for handling OOS b'cast messages
* @deferred_queue: deferred queue saved OOS b'cast message received from node * @deferred_queue: deferred queue saved OOS b'cast message received from node
* @reasm_buf: broadcast reassembly queue head from node * @reasm_buf: broadcast reassembly queue head from node
* @inputq_map: bitmap indicating which inqueues should be kicked
* @recv_permitted: true if node is allowed to receive b'cast messages * @recv_permitted: true if node is allowed to receive b'cast messages
*/ */
struct tipc_node_bclink { struct tipc_node_bclink {
@ -84,6 +86,7 @@ struct tipc_node_bclink {
u32 deferred_size; u32 deferred_size;
struct sk_buff_head deferred_queue; struct sk_buff_head deferred_queue;
struct sk_buff *reasm_buf; struct sk_buff *reasm_buf;
int inputq_map;
bool recv_permitted; bool recv_permitted;
}; };

View File

@ -776,44 +776,60 @@ new_mtu:
return rc; return rc;
} }
/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets /**
* tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
* @arrvq: queue with arriving messages, to be cloned after destination lookup
* @inputq: queue with cloned messages, delivered to socket after dest lookup
*
* Multi-threaded: parallel calls with reference to same queues may occur
*/ */
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb) void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq)
{ {
struct tipc_msg *msg = buf_msg(skb); struct tipc_msg *msg;
struct tipc_plist dports; struct tipc_plist dports;
struct sk_buff *cskb;
u32 portid; u32 portid;
u32 scope = TIPC_CLUSTER_SCOPE; u32 scope = TIPC_CLUSTER_SCOPE;
struct sk_buff_head msgq; struct sk_buff_head tmpq;
uint hsz = skb_headroom(skb) + msg_hdr_sz(msg); uint hsz;
struct sk_buff *skb, *_skb;
skb_queue_head_init(&msgq); __skb_queue_head_init(&tmpq);
tipc_plist_init(&dports); tipc_plist_init(&dports);
if (in_own_node(net, msg_orignode(msg))) skb = tipc_skb_peek(arrvq, &inputq->lock);
scope = TIPC_NODE_SCOPE; for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
msg = buf_msg(skb);
hsz = skb_headroom(skb) + msg_hdr_sz(msg);
if (unlikely(!msg_mcast(msg))) { if (in_own_node(net, msg_orignode(msg)))
pr_warn("Received non-multicast msg in multicast\n"); scope = TIPC_NODE_SCOPE;
goto exit;
} /* Create destination port list and message clones: */
/* Create destination port list: */ tipc_nametbl_mc_translate(net,
tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), msg_nametype(msg), msg_namelower(msg),
msg_nameupper(msg), scope, &dports); msg_nameupper(msg), scope, &dports);
portid = tipc_plist_pop(&dports); portid = tipc_plist_pop(&dports);
for (; portid; portid = tipc_plist_pop(&dports)) { for (; portid; portid = tipc_plist_pop(&dports)) {
cskb = __pskb_copy(skb, hsz, GFP_ATOMIC); _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
if (!cskb) { if (_skb) {
pr_warn("Failed do clone mcast rcv buffer\n"); msg_set_destport(buf_msg(_skb), portid);
continue; __skb_queue_tail(&tmpq, _skb);
continue;
}
pr_warn("Failed to clone mcast rcv buffer\n");
} }
msg_set_destport(buf_msg(cskb), portid); /* Append to inputq if not already done by other thread */
skb_queue_tail(&msgq, cskb); spin_lock_bh(&inputq->lock);
if (skb_peek(arrvq) == skb) {
skb_queue_splice_tail_init(&tmpq, inputq);
kfree_skb(__skb_dequeue(arrvq));
}
spin_unlock_bh(&inputq->lock);
__skb_queue_purge(&tmpq);
kfree_skb(skb);
} }
tipc_sk_rcv(net, &msgq); tipc_sk_rcv(net, inputq);
exit:
kfree_skb(skb);
} }
/** /**

View File

@ -42,7 +42,6 @@
#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) #define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ #define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
int tipc_socket_init(void); int tipc_socket_init(void);
void tipc_socket_stop(void); void tipc_socket_stop(void);
int tipc_sock_create_local(struct net *net, int type, struct socket **res); int tipc_sock_create_local(struct net *net, int type, struct socket **res);
@ -51,7 +50,8 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
int flags); int flags);
int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
struct sk_buff *tipc_sk_socks_show(struct net *net); struct sk_buff *tipc_sk_socks_show(struct net *net);
void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf); void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
struct sk_buff_head *inputq);
void tipc_sk_reinit(struct net *net); void tipc_sk_reinit(struct net *net);
int tipc_sk_rht_init(struct net *net); int tipc_sk_rht_init(struct net *net);
void tipc_sk_rht_destroy(struct net *net); void tipc_sk_rht_destroy(struct net *net);