tipc: introduce per-link spinlock
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 19 Nov 2015 19:30:43 +0000 (14:30 -0500)
committerDavid S. Miller <davem@davemloft.net>
Fri, 20 Nov 2015 19:06:10 +0000 (14:06 -0500)
As a preparation to allow parallel links to work more independently
from each other we introduce a per-link spinlock, to be stored in the
struct nodes's link entry area. Since the node lock still is a regular
spinlock there is no increase in parallellism at this stage.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/link.c
net/tipc/node.c
net/tipc/node.h

index fa452fb5f34e461fb4111f26f2f5ae9ba0e7d644..b5e895c6f1aa3e17706ebe96b95097ac6e1102d8 100644 (file)
@@ -1995,6 +1995,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
        struct tipc_node *node;
        struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
        struct net *net = sock_net(skb->sk);
+       struct tipc_link_entry *le;
 
        if (!info->attrs[TIPC_NLA_LINK])
                return -EINVAL;
@@ -2020,17 +2021,17 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
        node = tipc_link_find_owner(net, link_name, &bearer_id);
        if (!node)
                return -EINVAL;
-
+       le = &node->links[bearer_id];
        tipc_node_lock(node);
-
-       link = node->links[bearer_id].link;
+       spin_lock_bh(&le->lock);
+       link = le->link;
        if (!link) {
                tipc_node_unlock(node);
                return -EINVAL;
        }
 
        link_reset_statistics(link);
-
+       spin_unlock_bh(&le->lock);
        tipc_node_unlock(node);
 
        return 0;
index 9321952585514c1400c92fa9efc04d340a468365..572063a0190e817b3780cbe55150711bdfc02c69 100644 (file)
@@ -339,11 +339,13 @@ static void tipc_node_timeout(unsigned long data)
        for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
                tipc_node_lock(n);
                le = &n->links[bearer_id];
+               spin_lock_bh(&le->lock);
                if (le->link) {
                        /* Link tolerance may change asynchronously: */
                        tipc_node_calculate_timer(n, le->link);
                        rc = tipc_link_timeout(le->link, &xmitq);
                }
+               spin_unlock_bh(&le->lock);
                tipc_node_unlock(n);
                tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
                if (rc & TIPC_LINK_DOWN_EVT)
@@ -654,6 +656,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
                if (n->state == NODE_FAILINGOVER)
                        tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
                le->link = l;
+               spin_lock_init(&le->lock);
                n->link_cnt++;
                tipc_node_calculate_timer(n, l);
                if (n->link_cnt == 1)
@@ -1033,20 +1036,6 @@ msg_full:
        return -EMSGSIZE;
 }
 
-static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
-                                              int *bearer_id,
-                                              struct tipc_media_addr **maddr)
-{
-       int id = n->active_links[sel & 1];
-
-       if (unlikely(id < 0))
-               return NULL;
-
-       *bearer_id = id;
-       *maddr = &n->links[id].maddr;
-       return n->links[id].link;
-}
-
 /**
  * tipc_node_xmit() is the general link level function for message sending
  * @net: the applicable net namespace
@@ -1059,26 +1048,32 @@ static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
                   u32 dnode, int selector)
 {
-       struct tipc_link *l = NULL;
+       struct tipc_link_entry *le;
        struct tipc_node *n;
        struct sk_buff_head xmitq;
-       struct tipc_media_addr *maddr;
-       int bearer_id;
+       struct tipc_media_addr *maddr = NULL;
+       int bearer_id = -1;
        int rc = -EHOSTUNREACH;
 
        __skb_queue_head_init(&xmitq);
        n = tipc_node_find(net, dnode);
        if (likely(n)) {
                tipc_node_lock(n);
-               l = tipc_node_select_link(n, selector, &bearer_id, &maddr);
-               if (likely(l))
-                       rc = tipc_link_xmit(l, list, &xmitq);
+               bearer_id = n->active_links[selector & 1];
+               if (bearer_id >= 0) {
+                       le = &n->links[bearer_id];
+                       maddr = &le->maddr;
+                       spin_lock_bh(&le->lock);
+                       if (likely(le->link))
+                               rc = tipc_link_xmit(le->link, list, &xmitq);
+                       spin_unlock_bh(&le->lock);
+               }
                tipc_node_unlock(n);
                if (unlikely(rc == -ENOBUFS))
                        tipc_node_link_down(n, bearer_id, false);
                tipc_node_put(n);
        }
-       if (likely(!rc)) {
+       if (likely(!skb_queue_empty(&xmitq))) {
                tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
                return 0;
        }
@@ -1374,7 +1369,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 
        /* Check and if necessary update node state */
        if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
+               spin_lock_bh(&le->lock);
                rc = tipc_link_rcv(le->link, skb, &xmitq);
+               spin_unlock_bh(&le->lock);
                skb = NULL;
        }
 unlock:
index dd79e9742bd68d8acf247fbe0762c12514c962ac..8784907486c04effe39a762711c7ed9300f83cc9 100644 (file)
@@ -69,6 +69,7 @@ enum {
 
 struct tipc_link_entry {
        struct tipc_link *link;
+       spinlock_t lock;   /* per-link */
        u32 mtu;
        struct sk_buff_head inputq;
        struct tipc_media_addr maddr;
@@ -86,7 +87,7 @@ struct tipc_bclink_entry {
  * struct tipc_node - TIPC node structure
  * @addr: network address of node
  * @ref: reference counter to node object
- * @lock: spinlock governing access to structure
+ * @lock: rwlock governing access to structure
  * @net: the applicable net namespace
  * @hash: links to adjacent nodes in unsorted hash chain
  * @inputq: pointer to input queue containing messages for msg event