struct tipc_node *node;
struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
struct net *net = sock_net(skb->sk);
+ struct tipc_link_entry *le;
if (!info->attrs[TIPC_NLA_LINK])
return -EINVAL;
node = tipc_link_find_owner(net, link_name, &bearer_id);
if (!node)
return -EINVAL;
-
+ le = &node->links[bearer_id];
tipc_node_lock(node);
-
- link = node->links[bearer_id].link;
+ spin_lock_bh(&le->lock);
+ link = le->link;
if (!link) {
tipc_node_unlock(node);
return -EINVAL;
}
link_reset_statistics(link);
-
+ spin_unlock_bh(&le->lock);
tipc_node_unlock(node);
return 0;
for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
tipc_node_lock(n);
le = &n->links[bearer_id];
+ spin_lock_bh(&le->lock);
if (le->link) {
/* Link tolerance may change asynchronously: */
tipc_node_calculate_timer(n, le->link);
rc = tipc_link_timeout(le->link, &xmitq);
}
+ spin_unlock_bh(&le->lock);
tipc_node_unlock(n);
tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
if (rc & TIPC_LINK_DOWN_EVT)
if (n->state == NODE_FAILINGOVER)
tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
le->link = l;
+ spin_lock_init(&le->lock);
n->link_cnt++;
tipc_node_calculate_timer(n, l);
if (n->link_cnt == 1)
return -EMSGSIZE;
}
-static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
- int *bearer_id,
- struct tipc_media_addr **maddr)
-{
- int id = n->active_links[sel & 1];
-
- if (unlikely(id < 0))
- return NULL;
-
- *bearer_id = id;
- *maddr = &n->links[id].maddr;
- return n->links[id].link;
-}
-
/**
* tipc_node_xmit() is the general link level function for message sending
* @net: the applicable net namespace
int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
u32 dnode, int selector)
{
- struct tipc_link *l = NULL;
+ struct tipc_link_entry *le;
struct tipc_node *n;
struct sk_buff_head xmitq;
- struct tipc_media_addr *maddr;
- int bearer_id;
+ struct tipc_media_addr *maddr = NULL;
+ int bearer_id = -1;
int rc = -EHOSTUNREACH;
__skb_queue_head_init(&xmitq);
n = tipc_node_find(net, dnode);
if (likely(n)) {
tipc_node_lock(n);
- l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
- if (likely(l))
- rc = tipc_link_xmit(l, list, &xmitq);
+ bearer_id = n->active_links[selector & 1];
+ if (bearer_id >= 0) {
+ le = &n->links[bearer_id];
+ maddr = &le->maddr;
+ spin_lock_bh(&le->lock);
+ if (likely(le->link))
+ rc = tipc_link_xmit(le->link, list, &xmitq);
+ spin_unlock_bh(&le->lock);
+ }
tipc_node_unlock(n);
if (unlikely(rc == -ENOBUFS))
tipc_node_link_down(n, bearer_id, false);
tipc_node_put(n);
}
- if (likely(!rc)) {
+ if (likely(!skb_queue_empty(&xmitq))) {
tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
return 0;
}
/* Check and if necessary update node state */
if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
+ spin_lock_bh(&le->lock);
rc = tipc_link_rcv(le->link, skb, &xmitq);
+ spin_unlock_bh(&le->lock);
skb = NULL;
}
unlock:
struct tipc_link_entry {
struct tipc_link *link;
+ spinlock_t lock; /* per-link */
u32 mtu;
struct sk_buff_head inputq;
struct tipc_media_addr maddr;
* struct tipc_node - TIPC node structure
* @addr: network address of node
* @ref: reference counter to node object
- * @lock: spinlock governing access to structure
+ * @lock: rwlock governing access to structure
* @net: the applicable net namespace
* @hash: links to adjacent nodes in unsorted hash chain
* @inputq: pointer to input queue containing messages for msg event