tipc: limit usage of temporary skb list during packet reception
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 15 Oct 2015 18:52:40 +0000 (14:52 -0400)
committerDavid S. Miller <davem@davemloft.net>
Fri, 16 Oct 2015 06:55:18 +0000 (23:55 -0700)
During packet reception, the function tipc_link_rcv() adds its accepted
packets to a temporary buffer queue, before finally splicing this queue
into the lock protected input queue that will be delivered up to the
socket layer. The purpose is to reduce potential contention on the input
queue lock. However, since the vast majority of packets arrive in
sequence, they will anyway be added one by one to the input queue, and
the use of the temporary queue becomes a sub-optimization.

The only case where this queue makes sense is when unpacking buffers
from a bundle packet; here we want to avoid dozens of small buffers
to be added individually to the lock-protected input queue in a tight
loop.

In this commit, we remove the general usage of the temporary queue,
and keep it only for the packet unbundling case.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/link.c

index 75db07c78a6900157c0b568491a5d4e8e694e274..11f74294e085d223e16c4b5873347c3ec2351a84 100644 (file)
@@ -953,7 +953,7 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
        case TIPC_HIGH_IMPORTANCE:
        case TIPC_CRITICAL_IMPORTANCE:
        case CONN_MANAGER:
-               __skb_queue_tail(inputq, skb);
+               skb_queue_tail(inputq, skb);
                return true;
        case NAME_DISTRIBUTOR:
                node->bclink.recv_permitted = true;
@@ -982,6 +982,7 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
        struct tipc_msg *hdr = buf_msg(skb);
        struct sk_buff **reasm_skb = &l->reasm_buf;
        struct sk_buff *iskb;
+       struct sk_buff_head tmpq;
        int usr = msg_user(hdr);
        int rc = 0;
        int pos = 0;
@@ -1006,10 +1007,12 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
        }
 
        if (usr == MSG_BUNDLER) {
+               skb_queue_head_init(&tmpq);
                l->stats.recv_bundles++;
                l->stats.recv_bundled += msg_msgcnt(hdr);
                while (tipc_msg_extract(skb, &iskb, &pos))
-                       tipc_data_input(l, iskb, inputq);
+                       tipc_data_input(l, iskb, &tmpq);
+               tipc_skb_queue_splice_tail(&tmpq, inputq);
                return 0;
        } else if (usr == MSG_FRAGMENTER) {
                l->stats.recv_fragments++;
@@ -1053,13 +1056,10 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                  struct sk_buff_head *xmitq)
 {
        struct sk_buff_head *arrvq = &l->deferdq;
-       struct sk_buff_head tmpq;
        struct tipc_msg *hdr;
        u16 seqno, rcv_nxt;
        int rc = 0;
 
-       __skb_queue_head_init(&tmpq);
-
        if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
                if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
                        tipc_link_build_proto_msg(l, STATE_MSG, 0,
@@ -1114,8 +1114,8 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                /* Packet can be delivered */
                l->rcv_nxt++;
                l->stats.recv_info++;
-               if (unlikely(!tipc_data_input(l, skb, &tmpq)))
-                       rc = tipc_link_input(l, skb, &tmpq);
+               if (unlikely(!tipc_data_input(l, skb, l->inputq)))
+                       rc = tipc_link_input(l, skb, l->inputq);
 
                /* Ack at regular intervals */
                if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
@@ -1126,7 +1126,6 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
                }
        }
 exit:
-       tipc_skb_queue_splice_tail(&tmpq, l->inputq);
        return rc;
 }