tipc: move all link_reset() calls to link aggregation level
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 30 Jul 2015 22:24:16 +0000 (18:24 -0400)
committerDavid S. Miller <davem@davemloft.net>
Fri, 31 Jul 2015 00:25:13 +0000 (17:25 -0700)
In line with our effort to let the node level have full control over
its links, we want to move all link reset calls from link.c to node.c.
Some of the calls can be moved by simply moving the calling function,
when this is the right thing to do. For the remaining calls we use
the now established technique of returning a TIPC_LINK_DOWN_EVT
flag from tipc_link_rcv(), whereafter we perform the reset call when
the call returns.

This change serves as a preparation for the coming commits.

Tested-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/bearer.c
net/tipc/link.c
net/tipc/link.h
net/tipc/node.c
net/tipc/node.h

index eae58a6b121cf8b3d70260038fc644edcd6c97df..ce9f7bfc0b92444950f51893e87abbc426151eb6 100644 (file)
@@ -343,7 +343,7 @@ restart:
 static int tipc_reset_bearer(struct net *net, struct tipc_bearer *b_ptr)
 {
        pr_info("Resetting bearer <%s>\n", b_ptr->name);
-       tipc_link_delete_list(net, b_ptr->identity);
+       tipc_node_delete_links(net, b_ptr->identity);
        tipc_disc_reset(net, b_ptr);
        return 0;
 }
@@ -361,7 +361,7 @@ static void bearer_disable(struct net *net, struct tipc_bearer *b_ptr)
        pr_info("Disabling bearer <%s>\n", b_ptr->name);
        b_ptr->media->disable_media(b_ptr);
 
-       tipc_link_delete_list(net, b_ptr->identity);
+       tipc_node_delete_links(net, b_ptr->identity);
        if (b_ptr->link_req)
                tipc_disc_delete(b_ptr->link_req);
 
index 05837ba7b68c48eed2c627eac0366d0ed12b8e87..8c81db7b17f9fae6272a96f044e476ef9e26e571 100644 (file)
@@ -137,9 +137,9 @@ static void link_print(struct tipc_link *l_ptr, const char *str);
 static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
                                           struct sk_buff_head *xmitq);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
-static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
+static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
-static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
+static int tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
 
 /*
  *  Simple link routines
@@ -258,34 +258,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
        return l_ptr;
 }
 
-/**
- * tipc_link_delete - Delete a link
- * @l: link to be deleted
- */
-void tipc_link_delete(struct tipc_link *l)
-{
-       tipc_link_reset(l);
-       tipc_link_reset_fragments(l);
-       tipc_node_detach_link(l->owner, l);
-}
-
-void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
-{
-       struct tipc_net *tn = net_generic(net, tipc_net_id);
-       struct tipc_link *link;
-       struct tipc_node *node;
-
-       rcu_read_lock();
-       list_for_each_entry_rcu(node, &tn->node_list, list) {
-               tipc_node_lock(node);
-               link = node->links[bearer_id].link;
-               if (link)
-                       tipc_link_delete(link);
-               tipc_node_unlock(node);
-       }
-       rcu_read_unlock();
-}
-
 /* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
  *
  * Give a newly added peer node the sequence number where it should
@@ -875,26 +847,6 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
        l->snd_nxt = seqno;
 }
 
-void tipc_link_reset_all(struct tipc_node *node)
-{
-       char addr_string[16];
-       u32 i;
-
-       tipc_node_lock(node);
-
-       pr_warn("Resetting all links to %s\n",
-               tipc_addr_string_fill(addr_string, node->addr));
-
-       for (i = 0; i < MAX_BEARERS; i++) {
-               if (node->links[i].link) {
-                       link_print(node->links[i].link, "Resetting link\n");
-                       tipc_link_reset(node->links[i].link);
-               }
-       }
-
-       tipc_node_unlock(node);
-}
-
 static void link_retransmit_failure(struct tipc_link *l_ptr,
                                    struct sk_buff *buf)
 {
@@ -911,7 +863,6 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
                        msg_errcode(msg));
                pr_info("sqno %u, prev: %x, src: %x\n",
                        msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
-               tipc_link_reset(l_ptr);
        } else {
                /* Handle failure on broadcast link */
                struct tipc_node *n_ptr;
@@ -987,6 +938,7 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
                l->stale_count = 1;
        } else if (++l->stale_count > 100) {
                link_retransmit_failure(l, skb);
+               l->exec_mode = TIPC_LINK_BLOCKED;
                return TIPC_LINK_DOWN_EVT;
        }
        skb_queue_walk(&l->transmq, skb) {
@@ -1079,12 +1031,13 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
  * Consumes buffer
  * Node lock must be held
  */
-static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
+static int tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 {
        struct tipc_node *node = link->owner;
        struct tipc_msg *msg = buf_msg(skb);
        struct sk_buff *iskb;
        int pos = 0;
+       int rc = 0;
 
        switch (msg_user(msg)) {
        case TUNNEL_PROTOCOL:
@@ -1094,7 +1047,8 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
                        kfree_skb(skb);
                        break;
                }
-               if (!tipc_link_failover_rcv(link, &skb))
+               rc |= tipc_link_failover_rcv(link, &skb);
+               if (!skb)
                        break;
                if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
                        tipc_data_input(link, skb);
@@ -1113,7 +1067,8 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
                        link->stats.recv_fragmented++;
                        tipc_data_input(link, skb);
                } else if (!link->reasm_buf) {
-                       tipc_link_reset(link);
+                       link->exec_mode = TIPC_LINK_BLOCKED;
+                       rc |= TIPC_LINK_DOWN_EVT;
                }
                break;
        case BCAST_PROTOCOL:
@@ -1122,6 +1077,7 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
        default:
                break;
        };
+       return rc;
 }
 
 static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
@@ -1215,7 +1171,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                l->rcv_nxt++;
                l->stats.recv_info++;
                if (unlikely(!tipc_data_input(l, skb)))
-                       tipc_link_input(l, skb);
+                       rc |= tipc_link_input(l, skb);
 
                /* Ack at regular intervals */
                if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
@@ -1504,14 +1460,15 @@ tunnel_queue:
 /*  tipc_link_failover_rcv(): Receive a tunnelled FAILOVER_MSG packet
  *  Owner node is locked.
  */
-static bool tipc_link_failover_rcv(struct tipc_link *link,
-                                  struct sk_buff **skb)
+static int tipc_link_failover_rcv(struct tipc_link *link,
+                                 struct sk_buff **skb)
 {
        struct tipc_msg *msg = buf_msg(*skb);
        struct sk_buff *iskb = NULL;
        struct tipc_link *pl = NULL;
        int bearer_id = msg_bearer_id(msg);
        int pos = 0;
+       int rc = 0;
 
        if (msg_type(msg) != FAILOVER_MSG) {
                pr_warn("%sunknown tunnel pkt received\n", link_co_err);
@@ -1524,8 +1481,6 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
                goto exit;
 
        pl = link->owner->links[bearer_id].link;
-       if (pl && tipc_link_is_up(pl))
-               tipc_link_reset(pl);
 
        if (link->failover_pkts == FIRST_FAILOVER)
                link->failover_pkts = msg_msgcnt(msg);
@@ -1550,14 +1505,18 @@ static bool tipc_link_failover_rcv(struct tipc_link *link,
        }
        if (msg_user(buf_msg(iskb)) == MSG_FRAGMENTER) {
                link->stats.recv_fragments++;
-               tipc_buf_append(&link->failover_skb, &iskb);
+               if (!tipc_buf_append(&link->failover_skb, &iskb) &&
+                   !link->failover_skb) {
+                       link->exec_mode = TIPC_LINK_BLOCKED;
+                       rc |= TIPC_LINK_DOWN_EVT;
+               }
        }
 exit:
        if (!link->failover_pkts && pl)
                pl->exec_mode = TIPC_LINK_OPEN;
        kfree_skb(*skb);
        *skb = iskb;
-       return *skb;
+       return rc;
 }
 
 /* tipc_link_proto_rcv(): receive link level protocol message :
index 279196d6baacc3ffdacad5a6671a2393ca71de9f..bb1378b7cb5974c924da76c24cfba8143f14c124 100644 (file)
@@ -212,8 +212,6 @@ struct tipc_link *tipc_link_create(struct tipc_node *n,
                                   const struct tipc_media_addr *maddr,
                                   struct sk_buff_head *inputq,
                                   struct sk_buff_head *namedq);
-void tipc_link_delete(struct tipc_link *link);
-void tipc_link_delete_list(struct net *net, unsigned int bearer_id);
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr);
 void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr, struct tipc_link *dest);
 void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -221,7 +219,6 @@ int tipc_link_is_up(struct tipc_link *l_ptr);
 int tipc_link_is_active(struct tipc_link *l_ptr);
 void tipc_link_purge_queues(struct tipc_link *l_ptr);
 void tipc_link_purge_backlog(struct tipc_link *l);
-void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
                     struct sk_buff_head *list);
index 558df25a7fc6a3897039cffb7b2da897776bd5f6..6a0680ba98a98e4ae716a44aeefb6298ed3d8990 100644 (file)
@@ -407,6 +407,44 @@ bool tipc_node_update_dest(struct tipc_node *n,  struct tipc_bearer *b,
        return true;
 }
 
+void tipc_node_delete_links(struct net *net, int bearer_id)
+{
+       struct tipc_net *tn = net_generic(net, tipc_net_id);
+       struct tipc_link *l;
+       struct tipc_node *n;
+
+       rcu_read_lock();
+       list_for_each_entry_rcu(n, &tn->node_list, list) {
+               tipc_node_lock(n);
+               l = n->links[bearer_id].link;
+               if (l) {
+                       tipc_link_reset(l);
+                       n->links[bearer_id].link = NULL;
+                       n->link_cnt--;
+               }
+               tipc_node_unlock(n);
+               kfree(l);
+       }
+       rcu_read_unlock();
+}
+
+static void tipc_node_reset_links(struct tipc_node *n)
+{
+       char addr_string[16];
+       u32 i;
+
+       tipc_node_lock(n);
+
+       pr_warn("Resetting all links to %s\n",
+               tipc_addr_string_fill(addr_string, n->addr));
+
+       for (i = 0; i < MAX_BEARERS; i++) {
+               if (n->links[i].link)
+                       tipc_link_reset(n->links[i].link);
+       }
+       tipc_node_unlock(n);
+}
+
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 {
        n_ptr->links[l_ptr->bearer_id].link = l_ptr;
@@ -721,7 +759,7 @@ void tipc_node_unlock(struct tipc_node *node)
                tipc_bclink_input(net);
 
        if (flags & TIPC_BCAST_RESET)
-               tipc_link_reset_all(node);
+               tipc_node_reset_links(node);
 }
 
 /* Caller should hold node lock for the passed node */
@@ -836,6 +874,40 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
        return 0;
 }
 
+/* tipc_node_tnl_init(): handle a received TUNNEL_PROTOCOL packet,
+ * in order to control parallel link failover or synchronization
+ */
+static void tipc_node_tnl_init(struct tipc_node *n, int bearer_id,
+                              struct sk_buff *skb)
+{
+       struct tipc_link *tnl, *pl;
+       struct tipc_msg *hdr = buf_msg(skb);
+       u16 oseqno = msg_seqno(hdr);
+       int pb_id = msg_bearer_id(hdr);
+
+       if (pb_id >= MAX_BEARERS)
+               return;
+
+       tnl = n->links[bearer_id].link;
+       if (!tnl)
+               return;
+
+       /* Ignore if duplicate */
+       if (less(oseqno, tnl->rcv_nxt))
+               return;
+
+       pl = n->links[pb_id].link;
+       if (!pl)
+               return;
+
+       if (msg_type(hdr) == FAILOVER_MSG) {
+               if (tipc_link_is_up(pl)) {
+                       tipc_link_reset(pl);
+                       pl->exec_mode = TIPC_LINK_BLOCKED;
+               }
+       }
+}
+
 /**
  * tipc_rcv - process TIPC packets/messages arriving from off-node
  * @net: the applicable net namespace
@@ -854,6 +926,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
        struct tipc_media_addr *maddr;
        int bearer_id = b->identity;
        int rc = 0;
+       int usr;
 
        __skb_queue_head_init(&xmitq);
 
@@ -863,8 +936,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 
        /* Handle arrival of a non-unicast link packet */
        hdr = buf_msg(skb);
+       usr = msg_user(hdr);
        if (unlikely(msg_non_seq(hdr))) {
-               if (msg_user(hdr) ==  LINK_CONFIG)
+               if (usr ==  LINK_CONFIG)
                        tipc_disc_rcv(net, skb, b);
                else
                        tipc_bclink_rcv(net, skb);
@@ -877,6 +951,10 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
                goto discard;
        tipc_node_lock(n);
 
+       /* Prepare links for tunneled reception if applicable */
+       if (unlikely(usr == TUNNEL_PROTOCOL))
+               tipc_node_tnl_init(n, bearer_id, skb);
+
        /* Locate link endpoint that should handle packet */
        l = n->links[bearer_id].link;
        if (unlikely(!l))
@@ -887,7 +965,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
                if (!tipc_node_filter_skb(n, l, hdr))
                        goto unlock;
 
-       if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
+       if (unlikely(usr == LINK_PROTOCOL))
                tipc_bclink_sync_state(n, hdr);
 
        /* Release acked broadcast messages */
index 5e7016802077c65db72599f9713a94bee24f1a8e..49df0e934a65b7ebb706f93ff2ec04e01d47b9a1 100644 (file)
@@ -171,6 +171,7 @@ void tipc_node_check_dest(struct tipc_node *n, struct tipc_bearer *bearer,
                          struct tipc_media_addr *maddr);
 bool tipc_node_update_dest(struct tipc_node *n, struct tipc_bearer *bearer,
                           struct tipc_media_addr *maddr);
+void tipc_node_delete_links(struct net *net, int bearer_id);
 void tipc_node_attach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
 void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
 void tipc_node_link_down(struct tipc_node *n_ptr, int bearer_id);