net: add additional lock to qdisc to increase throughput
author	Eric Dumazet <eric.dumazet@gmail.com>
Wed, 2 Jun 2010 12:09:29 +0000 (05:09 -0700)
committer	David S. Miller <davem@davemloft.net>
Wed, 2 Jun 2010 12:09:29 +0000 (05:09 -0700)
When many cpus compete to send frames on a given qdisc, the qdisc
spinlock suffers from very high contention.

The cpu owning the __QDISC_STATE_RUNNING bit has the same priority as
the other cpus when acquiring the lock, and cannot dequeue packets fast
enough, since it must re-acquire this lock for each dequeued packet.

One solution to this problem is to force the other cpus to spin on a
second lock before trying to take the main lock, when they see
__QDISC_STATE_RUNNING already set.

The owning cpu then competes with at most one other cpu for the main
lock, allowing a higher dequeue rate.

Based on a previous patch from Alexander Duyck. I added the heuristic
that avoids the atomic operation in the fast path, and placed the new
lock far away from the cache line used by the dequeue worker. The
busylock is also released as late as possible.

Tests with the following script gave a boost from ~50,000 pps to
~600,000 pps on a dual quad-core machine (E5450 @ 3.00GHz) with the
tg3 driver.
(A single netperf flow can reach ~800,000 pps on this platform.)

for j in `seq 0 3`; do
  for i in `seq 0 7`; do
    netperf -H 192.168.0.1 -t UDP_STREAM -l 60 -N -T $i -- -m 6 &
  done
done
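(The inner loop pins one 6-byte-message UDP_STREAM sender to each of
the 8 cpus via -T, and the outer loop repeats this four times, so 32
concurrent flows contend on the qdisc at once.)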

Signed-off-by: Eric Dumazet <eric.dumazet@gmail.com>
Acked-by: Alexander Duyck <alexander.h.duyck@intel.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
include/net/sch_generic.h
net/core/dev.c
net/sched/sch_generic.c

index b3591e4a514ccc8cf3da805695dc20915e9f3629..b35301b0c7b6e5b6002d351d8edabf439d23bbed 100644
@@ -80,7 +80,8 @@ struct Qdisc {
        struct gnet_stats_basic_packed bstats;
        unsigned long           __state;
        struct gnet_stats_queue qstats;
-       struct rcu_head     rcu_head;
+       struct rcu_head         rcu_head;
+       spinlock_t              busylock;
 };
 
 static inline bool qdisc_is_running(struct Qdisc *qdisc)
index 2733226d90b2584c6ba9361028a35294e9e4f15c..ffca5c1066fa50947c4e5b88a86a47cace0ec57b 100644
@@ -2040,8 +2040,18 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                                 struct netdev_queue *txq)
 {
        spinlock_t *root_lock = qdisc_lock(q);
+       bool contended = qdisc_is_running(q);
        int rc;
 
+       /*
+        * Heuristic to force contended enqueues to serialize on a
+        * separate lock before trying to get qdisc main lock.
+        * This permits __QDISC_STATE_RUNNING owner to get the lock more often
+        * and dequeue packets faster.
+        */
+       if (unlikely(contended))
+               spin_lock(&q->busylock);
+
        spin_lock(root_lock);
        if (unlikely(test_bit(__QDISC_STATE_DEACTIVATED, &q->state))) {
                kfree_skb(skb);
@@ -2056,19 +2066,30 @@ static inline int __dev_xmit_skb(struct sk_buff *skb, struct Qdisc *q,
                if (!(dev->priv_flags & IFF_XMIT_DST_RELEASE))
                        skb_dst_force(skb);
                __qdisc_update_bstats(q, skb->len);
-               if (sch_direct_xmit(skb, q, dev, txq, root_lock))
+               if (sch_direct_xmit(skb, q, dev, txq, root_lock)) {
+                       if (unlikely(contended)) {
+                               spin_unlock(&q->busylock);
+                               contended = false;
+                       }
                        __qdisc_run(q);
-               else
+               } else
                        qdisc_run_end(q);
 
                rc = NET_XMIT_SUCCESS;
        } else {
                skb_dst_force(skb);
                rc = qdisc_enqueue_root(skb, q);
-               qdisc_run(q);
+               if (qdisc_run_begin(q)) {
+                       if (unlikely(contended)) {
+                               spin_unlock(&q->busylock);
+                               contended = false;
+                       }
+                       __qdisc_run(q);
+               }
        }
        spin_unlock(root_lock);
-
+       if (unlikely(contended))
+               spin_unlock(&q->busylock);
        return rc;
 }
 
index 37b86eab6779b410cd7abd270b4d0961dc95d1ac..d20fcd2a55191a11ef7058748086d0f77c47e7ce 100644
@@ -561,6 +561,7 @@ struct Qdisc *qdisc_alloc(struct netdev_queue *dev_queue,
 
        INIT_LIST_HEAD(&sch->list);
        skb_queue_head_init(&sch->q);
+       spin_lock_init(&sch->busylock);
        sch->ops = ops;
        sch->enqueue = ops->enqueue;
        sch->dequeue = ops->dequeue;