tipc: transfer broadcast nacks in link state messages
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 1 Sep 2016 17:52:49 +0000 (13:52 -0400)
committerDavid S. Miller <davem@davemloft.net>
Sat, 3 Sep 2016 00:10:24 +0000 (17:10 -0700)
When we send broadcasts in clusters of more 70-80 nodes, we sometimes
see the broadcast link resetting because of an excessive number of
retransmissions. This is caused by a combination of two factors:

1) A 'NACK crunch", where loss of broadcast packets is discovered
   and NACK'ed by several nodes simultaneously, leading to multiple
   redundant broadcast retransmissions.

2) The fact that the NACKS as such also are sent as broadcast, leading
   to excessive load and packet loss on the transmitting switch/bridge.

This commit deals with the latter problem, by moving sending of
broadcast nacks from the dedicated BCAST_PROTOCOL/NACK message type
to regular unicast LINK_PROTOCOL/STATE messages. We allocate 10 unused
bits in word 8 of the said message for this purpose, and introduce a
new capability bit, TIPC_BCAST_STATE_NACK in order to keep the change
backwards compatible.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/bcast.c
net/tipc/bcast.h
net/tipc/link.c
net/tipc/link.h
net/tipc/msg.h
net/tipc/node.c
net/tipc/node.h

index ae469b37d85262e89e99f22784c7fa10b3d4bcb7..753f774cb46f39a7280515ca64c8c41897d9a480 100644 (file)
@@ -269,18 +269,19 @@ void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
  *
  * RCU is locked, no other locks set
  */
-void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
-                        struct tipc_msg *hdr)
+int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+                       struct tipc_msg *hdr)
 {
        struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
        struct sk_buff_head xmitq;
+       int rc = 0;
 
        __skb_queue_head_init(&xmitq);
 
        tipc_bcast_lock(net);
        if (msg_type(hdr) == STATE_MSG) {
                tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
-               tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+               rc = tipc_link_bc_sync_rcv(l, hdr, &xmitq);
        } else {
                tipc_link_bc_init_rcv(l, hdr);
        }
@@ -291,6 +292,7 @@ void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
        /* Any socket wakeup messages ? */
        if (!skb_queue_empty(inputq))
                tipc_sk_rcv(net, inputq);
+       return rc;
 }
 
 /* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
index d5e79b3767fd7747f8d7389bfd7ebe7d702ea6c0..5ffe34472ccd091d19e92137c379191b0d596844 100644 (file)
@@ -56,8 +56,8 @@ int  tipc_bcast_get_mtu(struct net *net);
 int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
-void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
-                        struct tipc_msg *hdr);
+int tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+                       struct tipc_msg *hdr);
 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 int tipc_bclink_reset_stats(struct net *net);
index 2c6e1b9e024b5d03c434e8bd853fcd971735ac78..136316fb37ec9ba29c757a70cdca1ae3b99abc1c 100644 (file)
@@ -367,6 +367,18 @@ int tipc_link_bc_peers(struct tipc_link *l)
        return l->ackers;
 }
 
+u16 link_bc_rcv_gap(struct tipc_link *l)
+{
+       struct sk_buff *skb = skb_peek(&l->deferdq);
+       u16 gap = 0;
+
+       if (more(l->snd_nxt, l->rcv_nxt))
+               gap = l->snd_nxt - l->rcv_nxt;
+       if (skb)
+               gap = buf_seqno(skb) - l->rcv_nxt;
+       return gap;
+}
+
 void tipc_link_set_mtu(struct tipc_link *l, int mtu)
 {
        l->mtu = mtu;
@@ -1135,7 +1147,10 @@ int tipc_link_build_state_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
                if (((l->rcv_nxt ^ tipc_own_addr(l->net)) & 0xf) != 0xf)
                        return 0;
                l->rcv_unacked = 0;
-               return TIPC_LINK_SND_BC_ACK;
+
+               /* Use snd_nxt to store peer's snd_nxt in broadcast rcv link */
+               l->snd_nxt = l->rcv_nxt;
+               return TIPC_LINK_SND_STATE;
        }
 
        /* Unicast ACK */
@@ -1236,7 +1251,7 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                        rc |= tipc_link_input(l, skb, l->inputq);
                if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
                        rc |= tipc_link_build_state_msg(l, xmitq);
-               if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
+               if (unlikely(rc & ~TIPC_LINK_SND_STATE))
                        break;
        } while ((skb = __skb_dequeue(defq)));
 
@@ -1250,10 +1265,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
                                      u16 rcvgap, int tolerance, int priority,
                                      struct sk_buff_head *xmitq)
 {
+       struct tipc_link *bcl = l->bc_rcvlink;
        struct sk_buff *skb;
        struct tipc_msg *hdr;
        struct sk_buff_head *dfq = &l->deferdq;
-       bool node_up = link_is_up(l->bc_rcvlink);
+       bool node_up = link_is_up(bcl);
        struct tipc_mon_state *mstate = &l->mon_state;
        int dlen = 0;
        void *data;
@@ -1281,7 +1297,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
        msg_set_net_plane(hdr, l->net_plane);
        msg_set_next_sent(hdr, l->snd_nxt);
        msg_set_ack(hdr, l->rcv_nxt - 1);
-       msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
+       msg_set_bcast_ack(hdr, bcl->rcv_nxt - 1);
        msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
        msg_set_link_tolerance(hdr, tolerance);
        msg_set_linkprio(hdr, priority);
@@ -1291,6 +1307,7 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 
        if (mtyp == STATE_MSG) {
                msg_set_seq_gap(hdr, rcvgap);
+               msg_set_bc_gap(hdr, link_bc_rcv_gap(bcl));
                msg_set_probe(hdr, probe);
                tipc_mon_prep(l->net, data, &dlen, mstate, l->bearer_id);
                msg_set_size(hdr, INT_H_SIZE + dlen);
@@ -1575,49 +1592,68 @@ void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
 
 /* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
  */
-void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
-                          struct sk_buff_head *xmitq)
+int tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
+                         struct sk_buff_head *xmitq)
 {
        u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
+       u16 from = msg_bcast_ack(hdr) + 1;
+       u16 to = from + msg_bc_gap(hdr) - 1;
+       int rc = 0;
 
        if (!link_is_up(l))
-               return;
+               return rc;
 
        if (!msg_peer_node_is_up(hdr))
-               return;
+               return rc;
 
        /* Open when peer ackowledges our bcast init msg (pkt #1) */
        if (msg_ack(hdr))
                l->bc_peer_is_up = true;
 
        if (!l->bc_peer_is_up)
-               return;
+               return rc;
 
        /* Ignore if peers_snd_nxt goes beyond receive window */
        if (more(peers_snd_nxt, l->rcv_nxt + l->window))
-               return;
+               return rc;
+
+       if (!less(to, from)) {
+               rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
+               l->stats.recv_nacks++;
+       }
+
+       l->snd_nxt = peers_snd_nxt;
+       if (link_bc_rcv_gap(l))
+               rc |= TIPC_LINK_SND_STATE;
+
+       /* Return now if sender supports nack via STATE messages */
+       if (l->peer_caps & TIPC_BCAST_STATE_NACK)
+               return rc;
+
+       /* Otherwise, be backwards compatible */
 
        if (!more(peers_snd_nxt, l->rcv_nxt)) {
                l->nack_state = BC_NACK_SND_CONDITIONAL;
-               return;
+               return 0;
        }
 
        /* Don't NACK if one was recently sent or peeked */
        if (l->nack_state == BC_NACK_SND_SUPPRESS) {
                l->nack_state = BC_NACK_SND_UNCONDITIONAL;
-               return;
+               return 0;
        }
 
        /* Conditionally delay NACK sending until next synch rcv */
        if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
                l->nack_state = BC_NACK_SND_UNCONDITIONAL;
                if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
-                       return;
+                       return 0;
        }
 
        /* Send NACK now but suppress next one */
        tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
        l->nack_state = BC_NACK_SND_SUPPRESS;
+       return 0;
 }
 
 void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
@@ -1654,6 +1690,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
 }
 
 /* tipc_link_bc_nack_rcv(): receive broadcast nack message
+ * This function is here for backwards compatibility, since
+ * no BCAST_PROTOCOL/STATE messages occur from TIPC v2.5.
  */
 int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
                          struct sk_buff_head *xmitq)
index d7e9d42fcb2d11855cd013382dfeef0ba7efd28a..d1bd1787a768306e84976c362aada4af99068e3a 100644 (file)
@@ -63,7 +63,7 @@ enum {
 enum {
        TIPC_LINK_UP_EVT       = 1,
        TIPC_LINK_DOWN_EVT     = (1 << 1),
-       TIPC_LINK_SND_BC_ACK   = (1 << 2)
+       TIPC_LINK_SND_STATE    = (1 << 2)
 };
 
 /* Starting value for maximum packet size negotiation on unicast links
@@ -138,8 +138,8 @@ void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
 void tipc_link_build_bc_sync_msg(struct tipc_link *l,
                                 struct sk_buff_head *xmitq);
 void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
-void tipc_link_bc_sync_rcv(struct tipc_link *l,   struct tipc_msg *hdr,
-                          struct sk_buff_head *xmitq);
+int tipc_link_bc_sync_rcv(struct tipc_link *l,   struct tipc_msg *hdr,
+                         struct sk_buff_head *xmitq);
 int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
                          struct sk_buff_head *xmitq);
 #endif
index 7cf52fb39bee6aa38379e81af31fe3d5c6fea557..c3832cdf2278a3a49dd6624d350ba34096764897 100644 (file)
@@ -719,6 +719,16 @@ static inline char *msg_media_addr(struct tipc_msg *m)
        return (char *)&m->hdr[TIPC_MEDIA_INFO_OFFSET];
 }
 
+static inline u32 msg_bc_gap(struct tipc_msg *m)
+{
+       return msg_bits(m, 8, 0, 0x3ff);
+}
+
+static inline void msg_set_bc_gap(struct tipc_msg *m, u32 n)
+{
+       msg_set_bits(m, 8, 0, 0x3ff, n);
+}
+
 /*
  * Word 9
  */
index 7e8b75fd1a02da768f9d203f47223eaf9f477b9b..7ef14e2d2356590d72284e3ef056d63dee3d1b12 100644 (file)
@@ -1262,6 +1262,34 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
        kfree_skb(skb);
 }
 
+static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
+                                 int bearer_id, struct sk_buff_head *xmitq)
+{
+       struct tipc_link *ucl;
+       int rc;
+
+       rc = tipc_bcast_sync_rcv(n->net, n->bc_entry.link, hdr);
+
+       if (rc & TIPC_LINK_DOWN_EVT) {
+               tipc_bearer_reset_all(n->net);
+               return;
+       }
+
+       if (!(rc & TIPC_LINK_SND_STATE))
+               return;
+
+       /* If probe message, a STATE response will be sent anyway */
+       if (msg_probe(hdr))
+               return;
+
+       /* Produce a STATE message carrying broadcast NACK */
+       tipc_node_read_lock(n);
+       ucl = n->links[bearer_id].link;
+       if (ucl)
+               tipc_link_build_state_msg(ucl, xmitq);
+       tipc_node_read_unlock(n);
+}
+
 /**
  * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
  * @net: the applicable net namespace
@@ -1298,7 +1326,7 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
        rc = tipc_bcast_rcv(net, be->link, skb);
 
        /* Broadcast ACKs are sent on a unicast link */
-       if (rc & TIPC_LINK_SND_BC_ACK) {
+       if (rc & TIPC_LINK_SND_STATE) {
                tipc_node_read_lock(n);
                tipc_link_build_state_msg(le->link, &xmitq);
                tipc_node_read_unlock(n);
@@ -1505,7 +1533,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 
        /* Ensure broadcast reception is in synch with peer's send state */
        if (unlikely(usr == LINK_PROTOCOL))
-               tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
+               tipc_node_bc_sync_rcv(n, hdr, bearer_id, &xmitq);
        else if (unlikely(tipc_link_acked(n->bc_entry.link) != bc_ack))
                tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
index 4578b34c7dcad6c74eae0a5b777548460ae38ce5..39ef54c1f2ad41ad0d32b38d2b2bef1b5f0c4e82 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/node.h: Include file for TIPC node management routines
  *
- * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
+ * Copyright (c) 2000-2006, 2014-2016, Ericsson AB
  * Copyright (c) 2005, 2010-2014, Wind River Systems
  * All rights reserved.
  *
 /* Optional capabilities supported by this code version
  */
 enum {
-       TIPC_BCAST_SYNCH   = (1 << 1),
-       TIPC_BLOCK_FLOWCTL = (2 << 1)
+       TIPC_BCAST_SYNCH      = (1 << 1),
+       TIPC_BCAST_STATE_NACK = (1 << 2),
+       TIPC_BLOCK_FLOWCTL    = (1 << 3)
 };
 
-#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
+#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | \
+                               TIPC_BCAST_STATE_NACK | \
+                               TIPC_BLOCK_FLOWCTL)
 #define INVALID_BEARER_ID -1
 
 void tipc_node_stop(struct net *net);