tipc: introduce replicast as transport option for multicast
authorJon Paul Maloy <jon.maloy@ericsson.com>
Wed, 18 Jan 2017 18:50:52 +0000 (13:50 -0500)
committerDavid S. Miller <davem@davemloft.net>
Fri, 20 Jan 2017 17:10:17 +0000 (12:10 -0500)
TIPC multicast messages are currently carried over a reliable
'broadcast link', making use of the underlying media's ability to
transport packets as L2 broadcast or IP multicast to all nodes in
the cluster.

When the used bearer is lacking that ability, we can instead emulate
the broadcast service by replicating and sending the packets over as
many unicast links as needed to reach all identified destinations.
We now introduce a new TIPC link-level 'replicast' service that does
this.

Reviewed-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/bcast.c
net/tipc/bcast.h
net/tipc/link.c
net/tipc/msg.c
net/tipc/msg.h
net/tipc/node.c
net/tipc/socket.c

index 412d3351abb79d98d1969e6e052ead16c33b0890..672e6ef93cabebfa3ac9b70cf8afb4d767977b73 100644 (file)
@@ -70,7 +70,7 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
 
 int tipc_bcast_get_mtu(struct net *net)
 {
-       return tipc_link_mtu(tipc_bc_sndlink(net));
+       return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
 }
 
 /* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
@@ -175,42 +175,101 @@ static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
        __skb_queue_purge(&_xmitq);
 }
 
-/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
- *                    and to identified node local sockets
+/* tipc_bcast_xmit - broadcast the buffer chain to all external nodes
  * @net: the applicable net namespace
- * @list: chain of buffers containing message
+ * @pkts: chain of buffers containing message
+ * @cong_link_cnt: set to 1 if broadcast link is congested, otherwise 0
  * Consumes the buffer chain.
- * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ * Returns 0 if success, otherwise errno: -EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
+static int tipc_bcast_xmit(struct net *net, struct sk_buff_head *pkts,
+                          u16 *cong_link_cnt)
 {
        struct tipc_link *l = tipc_bc_sndlink(net);
-       struct sk_buff_head xmitq, inputq, rcvq;
+       struct sk_buff_head xmitq;
        int rc = 0;
 
-       __skb_queue_head_init(&rcvq);
        __skb_queue_head_init(&xmitq);
-       skb_queue_head_init(&inputq);
-
-       /* Prepare message clone for local node */
-       if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
-               return -EHOSTUNREACH;
-
        tipc_bcast_lock(net);
        if (tipc_link_bc_peers(l))
-               rc = tipc_link_xmit(l, list, &xmitq);
+               rc = tipc_link_xmit(l, pkts, &xmitq);
        tipc_bcast_unlock(net);
+       tipc_bcbase_xmit(net, &xmitq);
+       __skb_queue_purge(pkts);
+       if (rc == -ELINKCONG) {
+               *cong_link_cnt = 1;
+               rc = 0;
+       }
+       return rc;
+}
 
-       /* Don't send to local node if adding to link failed */
-       if (unlikely(rc && (rc != -ELINKCONG))) {
-               __skb_queue_purge(&rcvq);
-               return rc;
+/* tipc_rcast_xmit - replicate and send a message to given destination nodes
+ * @net: the applicable net namespace
+ * @pkts: chain of buffers containing message
+ * @dests: list of destination nodes
+ * @cong_link_cnt: returns number of congested links
+ * @cong_links: returns identities of congested links
+ * Returns 0 if success, otherwise errno
+ */
+static int tipc_rcast_xmit(struct net *net, struct sk_buff_head *pkts,
+                          struct tipc_nlist *dests, u16 *cong_link_cnt)
+{
+       struct sk_buff_head _pkts;
+       struct u32_item *n, *tmp;
+       u32 dst, selector;
+
+       selector = msg_link_selector(buf_msg(skb_peek(pkts)));
+       __skb_queue_head_init(&_pkts);
+
+       list_for_each_entry_safe(n, tmp, &dests->list, list) {
+               dst = n->value;
+               if (!tipc_msg_pskb_copy(dst, pkts, &_pkts))
+                       return -ENOMEM;
+
+               /* Any other return value than -ELINKCONG is ignored */
+               if (tipc_node_xmit(net, &_pkts, dst, selector) == -ELINKCONG)
+                       (*cong_link_cnt)++;
        }
+       return 0;
+}
 
-       /* Broadcast to all nodes, inluding local node */
-       tipc_bcbase_xmit(net, &xmitq);
-       tipc_sk_mcast_rcv(net, &rcvq, &inputq);
-       __skb_queue_purge(list);
+/* tipc_mcast_xmit - deliver message to indicated destination nodes
+ *                   and to identified node local sockets
+ * @net: the applicable net namespace
+ * @pkts: chain of buffers containing message
+ * @dests: destination nodes for message. Not consumed.
+ * @cong_link_cnt: returns number of encountered congested destination links
+ * @cong_links: returns identities of congested links
+ * Consumes buffer chain.
+ * Returns 0 if success, otherwise errno
+ */
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
+                   struct tipc_nlist *dests, u16 *cong_link_cnt)
+{
+       struct tipc_bc_base *bb = tipc_bc_base(net);
+       struct sk_buff_head inputq, localq;
+       int rc = 0;
+
+       skb_queue_head_init(&inputq);
+       skb_queue_head_init(&localq);
+
+       /* Clone packets before they are consumed by next call */
+       if (dests->local && !tipc_msg_reassemble(pkts, &localq)) {
+               rc = -ENOMEM;
+               goto exit;
+       }
+
+       if (dests->remote) {
+               if (!bb->bcast_support)
+                       rc = tipc_rcast_xmit(net, pkts, dests, cong_link_cnt);
+               else
+                       rc = tipc_bcast_xmit(net, pkts, cong_link_cnt);
+       }
+
+       if (dests->local)
+               tipc_sk_mcast_rcv(net, &localq, &inputq);
+exit:
+       __skb_queue_purge(pkts);
        return rc;
 }
 
index 18f379198f8fb5fbdd3afbbce7e8c4d00cf65eed..dd772e6f6fa4a211e6d0c72791e0af5b35c6178a 100644 (file)
@@ -66,7 +66,8 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
 void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
 void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
 int  tipc_bcast_get_mtu(struct net *net);
-int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
+int tipc_mcast_xmit(struct net *net, struct sk_buff_head *pkts,
+                   struct tipc_nlist *dests, u16 *cong_link_cnt);
 int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
 void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l,
                        struct tipc_msg *hdr);
index b0f8646e06315c7acded07721b029d69f9c3b1eb..b17b9e155469c0ea880e28366b89fa1463ae8bf0 100644 (file)
@@ -1032,11 +1032,17 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
                            struct sk_buff_head *inputq)
 {
-       switch (msg_user(buf_msg(skb))) {
+       struct tipc_msg *hdr = buf_msg(skb);
+
+       switch (msg_user(hdr)) {
        case TIPC_LOW_IMPORTANCE:
        case TIPC_MEDIUM_IMPORTANCE:
        case TIPC_HIGH_IMPORTANCE:
        case TIPC_CRITICAL_IMPORTANCE:
+               if (unlikely(msg_type(hdr) == TIPC_MCAST_MSG)) {
+                       skb_queue_tail(l->bc_rcvlink->inputq, skb);
+                       return true;
+               }
        case CONN_MANAGER:
                skb_queue_tail(inputq, skb);
                return true;
index ab02d07424764ad4b269b6ad560167f528454b80..312ef7de57d7ba27c58533df9edb2f4dd61cc864 100644 (file)
@@ -607,6 +607,23 @@ error:
        return false;
 }
 
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+                       struct sk_buff_head *cpy)
+{
+       struct sk_buff *skb, *_skb;
+
+       skb_queue_walk(msg, skb) {
+               _skb = pskb_copy(skb, GFP_ATOMIC);
+               if (!_skb) {
+                       __skb_queue_purge(cpy);
+                       return false;
+               }
+               msg_set_destnode(buf_msg(_skb), dst);
+               __skb_queue_tail(cpy, _skb);
+       }
+       return true;
+}
+
 /* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
  * @list: list to be appended to
  * @seqno: sequence number of buffer to add
index f07b51e3f6f1efab8e1563e88aa6dd3589281a18..c843fd2bc48d2d4a9fb4edebbffc2a114f66f89d 100644 (file)
@@ -631,14 +631,11 @@ static inline void msg_set_bc_netid(struct tipc_msg *m, u32 id)
 
 static inline u32 msg_link_selector(struct tipc_msg *m)
 {
+       if (msg_user(m) == MSG_FRAGMENTER)
+               m = (void *)msg_data(m);
        return msg_bits(m, 4, 0, 1);
 }
 
-static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
-{
-       msg_set_bits(m, 4, 0, 1, n);
-}
-
 /*
  * Word 5
  */
@@ -835,6 +832,8 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
                   int offset, int dsz, int mtu, struct sk_buff_head *list);
 bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
 bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
+bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
+                       struct sk_buff_head *cpy);
 void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
                             struct sk_buff *skb);
 
index 2883f6a0ed9895765d953273a8c8394e1102ade9..f96dacf173abe6479e2e78a8454b25d12b11413f 100644 (file)
@@ -1257,6 +1257,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
        kfree_skb(skb);
 }
 
+static void tipc_node_mcast_rcv(struct tipc_node *n)
+{
+       struct tipc_bclink_entry *be = &n->bc_entry;
+
+       /* 'arrvq' is under inputq2's lock protection */
+       spin_lock_bh(&be->inputq2.lock);
+       spin_lock_bh(&be->inputq1.lock);
+       skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
+       spin_unlock_bh(&be->inputq1.lock);
+       spin_unlock_bh(&be->inputq2.lock);
+       tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
+}
+
 static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
                                  int bearer_id, struct sk_buff_head *xmitq)
 {
@@ -1330,15 +1343,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
        if (!skb_queue_empty(&xmitq))
                tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
 
-       /* Deliver. 'arrvq' is under inputq2's lock protection */
-       if (!skb_queue_empty(&be->inputq1)) {
-               spin_lock_bh(&be->inputq2.lock);
-               spin_lock_bh(&be->inputq1.lock);
-               skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
-               spin_unlock_bh(&be->inputq1.lock);
-               spin_unlock_bh(&be->inputq2.lock);
-               tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
-       }
+       if (!skb_queue_empty(&be->inputq1))
+               tipc_node_mcast_rcv(n);
 
        if (rc & TIPC_LINK_DOWN_EVT) {
                /* Reception reassembly failure => reset all links to peer */
@@ -1565,6 +1571,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
        if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
                tipc_named_rcv(net, &n->bc_entry.namedq);
 
+       if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
+               tipc_node_mcast_rcv(n);
+
        if (!skb_queue_empty(&le->inputq))
                tipc_sk_rcv(net, &le->inputq);
 
index d2f353934f828aaa865c0619e12f4b4336c524ea..93b6ae3154c9a71a3aad6cf310d546dd1622e729 100644 (file)
@@ -740,32 +740,43 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
        struct tipc_msg *hdr = &tsk->phdr;
        struct net *net = sock_net(sk);
        int mtu = tipc_bcast_get_mtu(net);
+       u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
        struct sk_buff_head pkts;
+       struct tipc_nlist dsts;
        int rc;
 
+       /* Block or return if any destination link is congested */
        rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
        if (unlikely(rc))
                return rc;
 
+       /* Lookup destination nodes */
+       tipc_nlist_init(&dsts, tipc_own_addr(net));
+       tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
+                                     seq->upper, domain, &dsts);
+       if (!dsts.local && !dsts.remote)
+               return -EHOSTUNREACH;
+
+       /* Build message header */
        msg_set_type(hdr, TIPC_MCAST_MSG);
+       msg_set_hdr_sz(hdr, MCAST_H_SIZE);
        msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
        msg_set_destport(hdr, 0);
        msg_set_destnode(hdr, 0);
        msg_set_nametype(hdr, seq->type);
        msg_set_namelower(hdr, seq->lower);
        msg_set_nameupper(hdr, seq->upper);
-       msg_set_hdr_sz(hdr, MCAST_H_SIZE);
 
+       /* Build message as chain of buffers */
        skb_queue_head_init(&pkts);
        rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
-       if (unlikely(rc != dlen))
-               return rc;
 
-       rc = tipc_bcast_xmit(net, &pkts);
-       if (unlikely(rc == -ELINKCONG)) {
-               tsk->cong_link_cnt = 1;
-               rc = 0;
-       }
+       /* Send message if build was successful */
+       if (unlikely(rc == dlen))
+               rc = tipc_mcast_xmit(net, &pkts, &dsts,
+                                    &tsk->cong_link_cnt);
+
+       tipc_nlist_purge(&dsts);
 
        return rc ? rc : dlen;
 }