RDS: make sure not to loop forever inside rds_send_xmit
authorSowmini Varadhan <sowmini.varadhan@oracle.com>
Wed, 8 Apr 2015 16:33:47 +0000 (12:33 -0400)
committerDavid S. Miller <davem@davemloft.net>
Wed, 8 Apr 2015 19:17:32 +0000 (15:17 -0400)
If a determined set of concurrent senders keep the send queue full,
we can loop forever inside rds_send_xmit.  This fix has two parts.

First we are dropping out of the while(1) loop after we've processed a
large batch of messages.

Second we add a generation number that gets bumped each time the
xmit bit lock is acquired.  If someone else has jumped in and
made progress in the queue, we skip our goto restart.

Original patch by Chris Mason.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/rds/connection.c
net/rds/rds.h
net/rds/send.c

index 7952a5b1b4c4966985b08cc577c9dd0c0de6dbc0..14f041398ca1744ea7596decaad7145184c7df0c 100644 (file)
@@ -193,6 +193,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
        }
 
        atomic_set(&conn->c_state, RDS_CONN_DOWN);
+       conn->c_send_gen = 0;
        conn->c_reconnect_jiffies = 0;
        INIT_DELAYED_WORK(&conn->c_send_w, rds_send_worker);
        INIT_DELAYED_WORK(&conn->c_recv_w, rds_recv_worker);
index c2a5eef41343c816f70e6dc16fc7a2fa9ea2d684..02d8fd5b40c08336bfd115ccdbc782357ca262c1 100644 (file)
@@ -110,6 +110,7 @@ struct rds_connection {
        void                    *c_transport_data;
 
        atomic_t                c_state;
+       unsigned long           c_send_gen;
        unsigned long           c_flags;
        unsigned long           c_reconnect_jiffies;
        struct delayed_work     c_send_w;
index 42f65d4305c88dc5db279fec71d5fb844e708cc1..49f77efd82b9783260cda91a58b5b62ea9e98d1c 100644 (file)
@@ -140,8 +140,11 @@ int rds_send_xmit(struct rds_connection *conn)
        struct scatterlist *sg;
        int ret = 0;
        LIST_HEAD(to_be_dropped);
+       int batch_count;
+       unsigned long send_gen = 0;
 
 restart:
+       batch_count = 0;
 
        /*
         * sendmsg calls here after having queued its message on the send
@@ -156,6 +159,17 @@ restart:
                goto out;
        }
 
+       /*
+        * we record the send generation after doing the xmit acquire.
+        * if someone else manages to jump in and do some work, we'll use
+        * this to avoid a goto restart farther down.
+        *
+        * The acquire_in_xmit() check above ensures that only one
+        * caller can increment c_send_gen at any time.
+        */
+       conn->c_send_gen++;
+       send_gen = conn->c_send_gen;
+
        /*
         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
         * we do the opposite to avoid races.
@@ -202,6 +216,16 @@ restart:
                if (!rm) {
                        unsigned int len;
 
+                       batch_count++;
+
+                       /* we want to process as big a batch as we can, but
+                        * we also want to avoid softlockups.  If we've been
+                        * through a lot of messages, lets back off and see
+                        * if anyone else jumps in
+                        */
+                       if (batch_count >= 1024)
+                               goto over_batch;
+
                        spin_lock_irqsave(&conn->c_lock, flags);
 
                        if (!list_empty(&conn->c_send_queue)) {
@@ -357,9 +381,9 @@ restart:
                }
        }
 
+over_batch:
        if (conn->c_trans->xmit_complete)
                conn->c_trans->xmit_complete(conn);
-
        release_in_xmit(conn);
 
        /* Nuke any messages we decided not to retransmit. */
@@ -380,10 +404,15 @@ restart:
         * If the transport cannot continue (i.e ret != 0), then it must
         * call us when more room is available, such as from the tx
         * completion handler.
+        *
+        * We have an extra generation check here so that if someone manages
+        * to jump in after our release_in_xmit, we'll see that they have done
+        * some work and we will skip our goto
         */
        if (ret == 0) {
                smp_mb();
-               if (!list_empty(&conn->c_send_queue)) {
+               if (!list_empty(&conn->c_send_queue) &&
+                   send_gen == conn->c_send_gen) {
                        rds_stats_inc(s_send_lock_queue_raced);
                        goto restart;
                }