net: batch skb dequeueing from softnet input_pkt_queue
authorChangli Gao <xiaosuo@gmail.com>
Tue, 27 Apr 2010 22:07:33 +0000 (15:07 -0700)
committerDavid S. Miller <davem@davemloft.net>
Tue, 27 Apr 2010 22:11:49 +0000 (15:11 -0700)
Batch skb dequeueing from the softnet input_pkt_queue to reduce potential
lock contention when RPS is enabled: instead of taking rps_lock once per
packet, process_backlog() now splices the entire pending input_pkt_queue
onto a CPU-private process_queue in a single locked operation and delivers
the packets with no lock held. (A userspace sketch of the pattern follows
the changed-files list below.)

Note: in the worst case, the number of packets queued in a softnet_data may
be twice netdev_max_backlog: enqueue_to_backlog() bounds only
input_pkt_queue, so up to netdev_max_backlog packets can sit on
process_queue while input_pkt_queue fills up to the same limit again.

Signed-off-by: Changli Gao <xiaosuo@gmail.com>
Signed-off-by: Eric Dumazet <eric.dumazet@gmail.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
include/linux/netdevice.h
net/core/dev.c
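
To make the locking change concrete, here is a minimal userspace sketch of
the pattern this patch adopts, with a pthread mutex standing in for rps_lock
and a hand-rolled list standing in for sk_buff_head. All names are
illustrative; none of this is kernel API.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>

struct pkt {
	struct pkt *next;
	int id;
};

struct queue {
	struct pkt *head, *tail;
	unsigned int len;
};

/* stands in for sd->input_pkt_queue, guarded like rps_lock */
static struct queue input_q;
static pthread_mutex_t input_lock = PTHREAD_MUTEX_INITIALIZER;
/* stands in for the CPU-private sd->process_queue: no lock needed */
static struct queue process_q;

static void q_enqueue(struct queue *q, struct pkt *p)
{
	p->next = NULL;
	if (q->tail)
		q->tail->next = p;
	else
		q->head = p;
	q->tail = p;
	q->len++;
}

static struct pkt *q_dequeue(struct queue *q)
{
	struct pkt *p = q->head;

	if (p) {
		q->head = p->next;
		if (!q->head)
			q->tail = NULL;
		q->len--;
	}
	return p;
}

/* move everything from @from to the tail of @to and empty @from,
 * like skb_queue_splice_tail_init() */
static void q_splice_tail_init(struct queue *from, struct queue *to)
{
	if (!from->head)
		return;
	if (to->tail)
		to->tail->next = from->head;
	else
		to->head = from->head;
	to->tail = from->tail;
	to->len += from->len;
	from->head = from->tail = NULL;
	from->len = 0;
}

/* producer, like enqueue_to_backlog(): one lock round trip per packet */
static void produce(int id)
{
	struct pkt *p = malloc(sizeof(*p));

	p->id = id;
	pthread_mutex_lock(&input_lock);
	q_enqueue(&input_q, p);
	pthread_mutex_unlock(&input_lock);
}

/* consumer, like the new process_backlog(): one round trip per batch */
static unsigned int consume_batch(void)
{
	struct pkt *p;
	unsigned int work = 0;

	pthread_mutex_lock(&input_lock);
	q_splice_tail_init(&input_q, &process_q);
	pthread_mutex_unlock(&input_lock);

	while ((p = q_dequeue(&process_q))) {	/* no lock held here */
		work++;
		free(p);
	}
	return work;
}

int main(void)
{
	for (int i = 0; i < 1000; i++)
		produce(i);
	printf("processed %u packets, one lock round trip\n", consume_batch());
	return 0;
}

The point of the sketch: produce() pays one lock round trip per packet, as
enqueue_to_backlog() must, while consume_batch() pays one per batch, which
is exactly what splicing input_pkt_queue onto process_queue buys
process_backlog().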

index c04ca246395db47d94dd1304c595e9ab031202e6..40d4c20d034bfe826a3123d50a8052bb31e337a5 100644 (file)
@@ -1388,6 +1388,7 @@ struct softnet_data {
        struct Qdisc            **output_queue_tailp;
        struct list_head        poll_list;
        struct sk_buff          *completion_queue;
+       struct sk_buff_head     process_queue;
 
 #ifdef CONFIG_RPS
        struct softnet_data     *rps_ipi_list;
@@ -1402,10 +1403,11 @@ struct softnet_data {
        struct napi_struct      backlog;
 };
 
-static inline void input_queue_head_incr(struct softnet_data *sd)
+static inline void input_queue_head_add(struct softnet_data *sd,
+                                       unsigned int len)
 {
 #ifdef CONFIG_RPS
-       sd->input_queue_head++;
+       sd->input_queue_head += len;
 #endif
 }
 
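For context, the head/qtail pair implements per-CPU sequence numbering for
RFS: input_queue_head counts packets that have left input_pkt_queue since
boot, and the qtail recorded at enqueue time (see the enqueue_to_backlog()
hunk below) is head plus the queue length at that moment. Generalizing
input_queue_head_incr() to input_queue_head_add() lets the counter jump by
a whole batch when process_backlog() splices the queue. A worked trace,
with illustrative numbers:

/* head = sd->input_queue_head, qlen = skb_queue_len(&sd->input_pkt_queue)
 *
 *   start:           head = 100, qlen = 0
 *   enqueue 3 skbs:  recorded qtails are 101, 102, 103
 *                    (head + qlen after each __skb_queue_tail())
 *   process_backlog() splices all 3 at once:
 *                    input_queue_head_add(sd, 3), so head = 103
 *
 * A flow whose last recorded qtail is <= 103 can now be treated as
 * fully drained from this CPU's backlog.
 */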
index 3d314919a2cf61b947fc96a865b46b7af418fb93..100dcbd29739b27c8934e5c1502caebd8771a04e 100644 (file)
@@ -2408,12 +2408,13 @@ static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
        __get_cpu_var(netdev_rx_stat).total++;
 
        rps_lock(sd);
-       if (sd->input_pkt_queue.qlen <= netdev_max_backlog) {
-               if (sd->input_pkt_queue.qlen) {
+       if (skb_queue_len(&sd->input_pkt_queue) <= netdev_max_backlog) {
+               if (skb_queue_len(&sd->input_pkt_queue)) {
 enqueue:
                        __skb_queue_tail(&sd->input_pkt_queue, skb);
 #ifdef CONFIG_RPS
-                       *qtail = sd->input_queue_head + sd->input_pkt_queue.qlen;
+                       *qtail = sd->input_queue_head +
+                                       skb_queue_len(&sd->input_pkt_queue);
 #endif
                        rps_unlock(sd);
                        local_irq_restore(flags);
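
Aside from switching to the skb_queue_len() accessor, the only change in
this hunk is line-wrapping the qtail computation. The value itself is
consumed by RFS: get_rps_cpu() stores it per flow as rflow->last_qtail and
later allows the flow to move to another CPU only once the old CPU's
input_queue_head has passed it. A simplified sketch of that test (the real
code lives in get_rps_cpu() in this same file; details here are from memory
and illustrative):

	/* the flow may move from tcpu to next_cpu only if tcpu is invalid,
	 * offline, or has dequeued everything the flow enqueued there */
	if (tcpu == RPS_NO_CPU || !cpu_online(tcpu) ||
	    (int)(per_cpu(softnet_data, tcpu).input_queue_head -
		  rflow->last_qtail) >= 0)
		rflow->cpu = next_cpu;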
@@ -2934,13 +2935,21 @@ static void flush_backlog(void *arg)
        struct sk_buff *skb, *tmp;
 
        rps_lock(sd);
-       skb_queue_walk_safe(&sd->input_pkt_queue, skb, tmp)
+       skb_queue_walk_safe(&sd->input_pkt_queue, skb, tmp) {
                if (skb->dev == dev) {
                        __skb_unlink(skb, &sd->input_pkt_queue);
                        kfree_skb(skb);
-                       input_queue_head_incr(sd);
+                       input_queue_head_add(sd, 1);
                }
+       }
        rps_unlock(sd);
+
+       skb_queue_walk_safe(&sd->process_queue, skb, tmp) {
+               if (skb->dev == dev) {
+                       __skb_unlink(skb, &sd->process_queue);
+                       kfree_skb(skb);
+               }
+       }
 }
 
 static int napi_gro_complete(struct sk_buff *skb)
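
The asymmetry between the two walks in flush_backlog() is deliberate. With
RPS, remote CPUs enqueue into this CPU's input_pkt_queue, so that walk must
hold rps_lock(); process_queue, by contrast, is touched only by its owning
CPU, so the IRQ-disabled context flush_backlog() runs in is already enough.
The unregister path invokes it on every CPU roughly as below (a simplified
sketch of the existing caller, not new code in this patch):

	/* run flush_backlog(dev) on all CPUs and wait; on_each_cpu()
	 * executes the callback with interrupts disabled on each CPU,
	 * so no local softirq can touch process_queue concurrently */
	on_each_cpu(flush_backlog, dev, 1);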
@@ -3286,24 +3295,33 @@ static int process_backlog(struct napi_struct *napi, int quota)
        }
 #endif
        napi->weight = weight_p;
-       do {
+       local_irq_disable();
+       while (work < quota) {
                struct sk_buff *skb;
+               unsigned int qlen;
+
+               while ((skb = __skb_dequeue(&sd->process_queue))) {
+                       local_irq_enable();
+                       __netif_receive_skb(skb);
+                       if (++work >= quota)
+                               return work;
+                       local_irq_disable();
+               }
 
-               local_irq_disable();
                rps_lock(sd);
-               skb = __skb_dequeue(&sd->input_pkt_queue);
-               if (!skb) {
+               qlen = skb_queue_len(&sd->input_pkt_queue);
+               if (qlen) {
+                       input_queue_head_add(sd, qlen);
+                       skb_queue_splice_tail_init(&sd->input_pkt_queue,
+                                                  &sd->process_queue);
+               }
+               if (qlen < quota - work) {
                        __napi_complete(napi);
-                       rps_unlock(sd);
-                       local_irq_enable();
-                       break;
+                       quota = work + qlen;
                }
-               input_queue_head_incr(sd);
                rps_unlock(sd);
-               local_irq_enable();
-
-               __netif_receive_skb(skb);
-       } while (++work < quota);
+       }
+       local_irq_enable();
 
        return work;
 }
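
The rewritten process_backlog() loop is worth a close read. Packets already
spliced onto process_queue are delivered with IRQs enabled and no lock held;
then, under rps_lock(), the whole input_pkt_queue moves over in one splice
and input_queue_head jumps by qlen. The qlen < quota - work test means the
backlog will run dry within this poll's budget, so __napi_complete() is
called while rps_lock is still held (a concurrent enqueue_to_backlog()
therefore either adds its packet before the splice or sees NAPI unscheduled
and re-arms it), and quota is clamped to work + qlen so the inner loop stops
exactly when the spliced batch is drained. An illustrative walk-through,
with quota = 64:

	/*   poll A: process_queue is empty; the splice finds qlen = 80,
	 *           so head += 80 and 80 skbs move over; 80 >= 64 - 0
	 *           keeps NAPI scheduled, and the inner loop returns at
	 *           work = 64 with 16 skbs left on process_queue.
	 *   poll B: the 16 leftovers are delivered first (work = 16);
	 *           the splice then finds qlen = 10, and 10 < 64 - 16,
	 *           so __napi_complete() runs under rps_lock and quota
	 *           is clamped to 26; the loop exits after those 10.
	 */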
@@ -5630,8 +5648,10 @@ static int dev_cpu_callback(struct notifier_block *nfb,
        /* Process offline CPU's input_pkt_queue */
        while ((skb = __skb_dequeue(&oldsd->input_pkt_queue))) {
                netif_rx(skb);
-               input_queue_head_incr(oldsd);
+               input_queue_head_add(oldsd, 1);
        }
+       while ((skb = __skb_dequeue(&oldsd->process_queue)))
+               netif_rx(skb);
 
        return NOTIFY_OK;
 }
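
One subtlety in this hotplug path: the two drains are accounted differently
on purpose (an illustrative note, not code from the patch):

	/* input_pkt_queue skbs never left the queue, so each one still
	 * advances input_queue_head by 1, as before this patch.
	 * process_queue skbs were already counted when they were spliced
	 * out of input_pkt_queue in process_backlog(); bumping the head
	 * again here would overstate how far the backlog has drained and
	 * could let RFS migrate a flow too early. */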
@@ -5850,6 +5870,7 @@ static int __init net_dev_init(void)
                struct softnet_data *sd = &per_cpu(softnet_data, i);
 
                skb_queue_head_init(&sd->input_pkt_queue);
+               skb_queue_head_init(&sd->process_queue);
                sd->completion_queue = NULL;
                INIT_LIST_HEAD(&sd->poll_list);
                sd->output_queue = NULL;