rds: use RCU to protect the connection hash
authorChris Mason <chris.mason@oracle.com>
Tue, 11 May 2010 22:15:15 +0000 (15:15 -0700)
committerAndy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:15:12 +0000 (18:15 -0700)
The connection hash was almost entirely RCU ready, this
just makes the final couple of changes to use RCU instead
of spinlocks for everything.

Signed-off-by: Chris Mason <chris.mason@oracle.com>
net/rds/connection.c

index 87df15b9f8e4b1072120f252520371f5e8c66146..180b83ab260754191296c6eeb29748c3ee3e4f8c 100644 (file)
@@ -62,6 +62,7 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
                var |= RDS_INFO_CONNECTION_FLAG_##suffix;       \
 } while (0)
 
+/* rcu read lock must be held or the connection spinlock */
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
                                              __be32 laddr, __be32 faddr,
                                              struct rds_transport *trans)
@@ -69,7 +70,7 @@ static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
        struct rds_connection *conn, *ret = NULL;
        struct hlist_node *pos;
 
-       hlist_for_each_entry(conn, pos, head, c_hash_node) {
+       hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
                if (conn->c_faddr == faddr && conn->c_laddr == laddr &&
                                conn->c_trans == trans) {
                        ret = conn;
@@ -119,7 +120,8 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
        unsigned long flags;
        int ret;
 
-       spin_lock_irqsave(&rds_conn_lock, flags);
+
+       rcu_read_lock();
        conn = rds_conn_lookup(head, laddr, faddr, trans);
        if (conn && conn->c_loopback && conn->c_trans != &rds_loop_transport &&
            !is_outgoing) {
@@ -130,7 +132,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
                parent = conn;
                conn = parent->c_passive;
        }
-       spin_unlock_irqrestore(&rds_conn_lock, flags);
+       rcu_read_unlock();
        if (conn)
                goto out;
 
@@ -227,7 +229,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
                        kmem_cache_free(rds_conn_slab, conn);
                        conn = found;
                } else {
-                       hlist_add_head(&conn->c_hash_node, head);
+                       hlist_add_head_rcu(&conn->c_hash_node, head);
                        rds_cong_add_conn(conn);
                        rds_conn_count++;
                }
@@ -306,8 +308,13 @@ void rds_conn_shutdown(struct rds_connection *conn)
         * to the conn hash, so we never trigger a reconnect on this
         * conn - the reconnect is always triggered by the active peer. */
        cancel_delayed_work_sync(&conn->c_conn_w);
-       if (!hlist_unhashed(&conn->c_hash_node))
+       rcu_read_lock();
+       if (!hlist_unhashed(&conn->c_hash_node)) {
+               rcu_read_unlock();
                rds_queue_reconnect(conn);
+       } else {
+               rcu_read_unlock();
+       }
 }
 
 /*
@@ -323,14 +330,12 @@ void rds_conn_destroy(struct rds_connection *conn)
 
        /* Ensure conn will not be scheduled for reconnect */
        spin_lock_irq(&rds_conn_lock);
-       hlist_del_init(&conn->c_hash_node);
+       hlist_del_init_rcu(&conn->c_hash_node);
        spin_unlock_irq(&rds_conn_lock);
 
-       /* wait for the rds thread to shut it down */
-       atomic_set(&conn->c_state, RDS_CONN_ERROR);
-       cancel_delayed_work(&conn->c_conn_w);
-       queue_work(rds_wq, &conn->c_down_w);
-       flush_workqueue(rds_wq);
+       synchronize_rcu();
+
+       rds_conn_shutdown(conn);
 
        /* tear down queued messages */
        list_for_each_entry_safe(rm, rtmp,
@@ -369,17 +374,16 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
        struct list_head *list;
        struct rds_connection *conn;
        struct rds_message *rm;
-       unsigned long flags;
        unsigned int total = 0;
        size_t i;
 
        len /= sizeof(struct rds_info_message);
 
-       spin_lock_irqsave(&rds_conn_lock, flags);
+       rcu_read_lock();
 
        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
             i++, head++) {
-               hlist_for_each_entry(conn, pos, head, c_hash_node) {
+               hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
                        if (want_send)
                                list = &conn->c_send_queue;
                        else
@@ -399,8 +403,7 @@ static void rds_conn_message_info(struct socket *sock, unsigned int len,
                        spin_unlock(&conn->c_lock);
                }
        }
-
-       spin_unlock_irqrestore(&rds_conn_lock, flags);
+       rcu_read_unlock();
 
        lens->nr = total;
        lens->each = sizeof(struct rds_info_message);
@@ -430,19 +433,17 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
        uint64_t buffer[(item_len + 7) / 8];
        struct hlist_head *head;
        struct hlist_node *pos;
-       struct hlist_node *tmp;
        struct rds_connection *conn;
-       unsigned long flags;
        size_t i;
 
-       spin_lock_irqsave(&rds_conn_lock, flags);
+       rcu_read_lock();
 
        lens->nr = 0;
        lens->each = item_len;
 
        for (i = 0, head = rds_conn_hash; i < ARRAY_SIZE(rds_conn_hash);
             i++, head++) {
-               hlist_for_each_entry_safe(conn, pos, tmp, head, c_hash_node) {
+               hlist_for_each_entry_rcu(conn, pos, head, c_hash_node) {
 
                        /* XXX no c_lock usage.. */
                        if (!visitor(conn, buffer))
@@ -458,8 +459,7 @@ void rds_for_each_conn_info(struct socket *sock, unsigned int len,
                        lens->nr++;
                }
        }
-
-       spin_unlock_irqrestore(&rds_conn_lock, flags);
+       rcu_read_unlock();
 }
 EXPORT_SYMBOL_GPL(rds_for_each_conn_info);