rxrpc: Maintain an extra ref on a conn for the cache list
author David Howells <dhowells@redhat.com>
Thu, 30 Jun 2016 09:45:22 +0000 (10:45 +0100)
committer David Howells <dhowells@redhat.com>
Wed, 6 Jul 2016 09:50:04 +0000 (10:50 +0100)
Overhaul the usage count accounting for the rxrpc_connection struct to make
it easier to implement RCU access from the data_ready handler.

The problem is that currently we're using a lock to prevent the garbage
collector from trying to clean up a connection that we're contemplating
unidling.  We could just stick incoming packets on the connection we find,
but we've then got a problem that we may race when dispatching a work item
to process it as we need to give that a ref to prevent the rxrpc_connection
struct from disappearing in the meantime.

Further, incoming packets may get discarded if attached to an
rxrpc_connection struct that is going away.  Whilst this is not a total
disaster - the client will presumably resend - it would delay processing of
the call.  This would affect the AFS client filesystem's service manager
operation.

To this end:

 (1) We now maintain an extra count on the connection usage count whilst it
     is on the connection list.  This means it is not in use when its
     refcount is 1.

 (2) When trying to reuse an old connection, we only increment the refcount
     if it is greater than 0.  If it is 0, we replace it in the tree with a
     new candidate connection.

 (3) Two connection flags are added to indicate whether or not a connection
     is in the local's client connection tree (used by sendmsg) or the
     peer's service connection tree (used by data_ready).  This makes sure
     that we don't try and remove a connection if it got replaced.

     The flags are tested under lock with the removal operation to prevent
     the reaper from killing the rxrpc_connection struct whilst someone
     else is trying to effect a replacement.

     This could probably be alleviated by using memory barriers between the
     flag set/test and the rb_tree ops.  The rb_tree op would still need to
     be under the lock, however.

 (4) When trying to reap an old connection, we try to flip the usage count
     from 1 to 0.  If it's not 1 at that point, then it must've come back
     to life temporarily and we ignore it.

Signed-off-by: David Howells <dhowells@redhat.com>
net/rxrpc/ar-internal.h
net/rxrpc/conn_client.c
net/rxrpc/conn_object.c
net/rxrpc/conn_service.c

index ad48f851b40cb2a1562a0bf96ba49c8362fe74be..d8e4d6e6a030dc4f472aa72257e9e485dacdd68d 100644 (file)
@@ -258,6 +258,8 @@ struct rxrpc_conn_parameters {
  */
 enum rxrpc_conn_flag {
        RXRPC_CONN_HAS_IDR,             /* Has a client conn ID assigned */
+       RXRPC_CONN_IN_SERVICE_CONNS,    /* Conn is in peer->service_conns */
+       RXRPC_CONN_IN_CLIENT_CONNS,     /* Conn is in local->client_conns */
 };
 
 /*
@@ -544,10 +546,10 @@ void __exit rxrpc_destroy_all_calls(void);
  */
 extern struct idr rxrpc_client_conn_ids;
 
-void rxrpc_put_client_connection_id(struct rxrpc_connection *);
 void rxrpc_destroy_client_conn_ids(void);
 int rxrpc_connect_call(struct rxrpc_call *, struct rxrpc_conn_parameters *,
                       struct sockaddr_rxrpc *, gfp_t);
+void rxrpc_unpublish_client_conn(struct rxrpc_connection *);
 
 /*
  * conn_event.c
@@ -609,6 +611,7 @@ static inline void rxrpc_queue_conn(struct rxrpc_connection *conn)
 struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *,
                                                   struct sockaddr_rxrpc *,
                                                   struct sk_buff *);
+void rxrpc_unpublish_service_conn(struct rxrpc_connection *);
 
 /*
  * input.c
index 9180164a51aa9ce9d3809d9320a22174d5e1059d..aa21462f3236f15f5015558d0b408f54ed91a4aa 100644 (file)
@@ -84,7 +84,7 @@ error:
 /*
  * Release a connection ID for a client connection from the global pool.
  */
-void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
+static void rxrpc_put_client_connection_id(struct rxrpc_connection *conn)
 {
        if (test_bit(RXRPC_CONN_HAS_IDR, &conn->flags)) {
                spin_lock(&rxrpc_conn_id_lock);
@@ -278,12 +278,13 @@ int rxrpc_connect_call(struct rxrpc_call *call,
         * lock before dropping the client conn lock.
         */
        _debug("new conn");
+       set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
+       rb_link_node(&candidate->client_node, parent, pp);
+       rb_insert_color(&candidate->client_node, &local->client_conns);
+attached:
        conn = candidate;
        candidate = NULL;
 
-       rb_link_node(&conn->client_node, parent, pp);
-       rb_insert_color(&conn->client_node, &local->client_conns);
-
        atomic_set(&conn->avail_chans, RXRPC_MAXCALLS - 1);
        spin_lock(&conn->channel_lock);
        spin_unlock(&local->client_conns_lock);
@@ -307,13 +308,22 @@ found_channel:
        _leave(" = %p {u=%d}", conn, atomic_read(&conn->usage));
        return 0;
 
-       /* We found a suitable connection already in existence.  Discard any
-        * candidate we may have allocated, and try to get a channel on this
-        * one.
+       /* We found a potentially suitable connection already in existence.  If
+        * we can reuse it (ie. its usage count hasn't been reduced to 0 by the
+        * reaper), discard any candidate we may have allocated, and try to get
+        * a channel on this one, otherwise we have to replace it.
         */
 found_extant_conn:
        _debug("found conn");
-       rxrpc_get_connection(conn);
+       if (!rxrpc_get_connection_maybe(conn)) {
+               set_bit(RXRPC_CONN_IN_CLIENT_CONNS, &candidate->flags);
+               rb_replace_node(&conn->client_node,
+                               &candidate->client_node,
+                               &local->client_conns);
+               clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags);
+               goto attached;
+       }
+
        spin_unlock(&local->client_conns_lock);
 
        rxrpc_put_connection(candidate);
@@ -357,3 +367,19 @@ interrupted:
        _leave(" = -ERESTARTSYS");
        return -ERESTARTSYS;
 }
+
+/*
+ * Remove a client connection from the local endpoint's tree, if still there,
+ * so new client calls can't reuse it, and release its client connection ID.
+ */
+void rxrpc_unpublish_client_conn(struct rxrpc_connection *conn)
+{
+       struct rxrpc_local *local = conn->params.local;
+
+       spin_lock(&local->client_conns_lock);
+       if (test_and_clear_bit(RXRPC_CONN_IN_CLIENT_CONNS, &conn->flags))
+               rb_erase(&conn->client_node, &local->client_conns);
+       spin_unlock(&local->client_conns_lock);
+
+       rxrpc_put_client_connection_id(conn);
+}
index 8379e3748d131cb8bfe1fd71d5790aae804aa5ca..89bc6480b4e283e79e5da8cf26e8598f167d4d03 100644 (file)
@@ -49,7 +49,10 @@ struct rxrpc_connection *rxrpc_alloc_connection(gfp_t gfp)
                skb_queue_head_init(&conn->rx_queue);
                conn->security = &rxrpc_no_security;
                spin_lock_init(&conn->state_lock);
-               atomic_set(&conn->usage, 1);
+               /* We maintain an extra ref on the connection whilst it is
+                * on the rxrpc_connections list.
+                */
+               atomic_set(&conn->usage, 2);
                conn->debug_id = atomic_inc_return(&rxrpc_debug_id);
                atomic_set(&conn->avail_chans, RXRPC_MAXCALLS);
                conn->size_align = 4;
@@ -111,7 +114,7 @@ struct rxrpc_connection *rxrpc_find_connection(struct rxrpc_local *local,
        return NULL;
 
 found:
-       rxrpc_get_connection(conn);
+       conn = rxrpc_get_connection_maybe(conn);
        read_unlock_bh(&peer->conn_lock);
        _leave(" = %p", conn);
        return conn;
@@ -173,10 +176,10 @@ void rxrpc_put_connection(struct rxrpc_connection *conn)
        _enter("%p{u=%d,d=%d}",
               conn, atomic_read(&conn->usage), conn->debug_id);
 
-       ASSERTCMP(atomic_read(&conn->usage), >, 0);
+       ASSERTCMP(atomic_read(&conn->usage), >, 1);
 
        conn->put_time = ktime_get_seconds();
-       if (atomic_dec_and_test(&conn->usage)) {
+       if (atomic_dec_return(&conn->usage) == 1) {
                _debug("zombie");
                rxrpc_queue_delayed_work(&rxrpc_connection_reap, 0);
        }
@@ -216,59 +219,41 @@ static void rxrpc_destroy_connection(struct rcu_head *rcu)
 static void rxrpc_connection_reaper(struct work_struct *work)
 {
        struct rxrpc_connection *conn, *_p;
-       struct rxrpc_peer *peer;
-       unsigned long now, earliest, reap_time;
+       unsigned long reap_older_than, earliest, put_time, now;
 
        LIST_HEAD(graveyard);
 
        _enter("");
 
        now = ktime_get_seconds();
+       reap_older_than =  now - rxrpc_connection_expiry;
        earliest = ULONG_MAX;
 
        write_lock(&rxrpc_connection_lock);
        list_for_each_entry_safe(conn, _p, &rxrpc_connections, link) {
-               _debug("reap CONN %d { u=%d,t=%ld }",
-                      conn->debug_id, atomic_read(&conn->usage),
-                      (long) now - (long) conn->put_time);
-
-               if (likely(atomic_read(&conn->usage) > 0))
+               ASSERTCMP(atomic_read(&conn->usage), >, 0);
+               if (likely(atomic_read(&conn->usage) > 1))
                        continue;
 
-               if (rxrpc_conn_is_client(conn)) {
-                       struct rxrpc_local *local = conn->params.local;
-                       spin_lock(&local->client_conns_lock);
-                       reap_time = conn->put_time + rxrpc_connection_expiry;
-
-                       if (atomic_read(&conn->usage) > 0) {
-                               ;
-                       } else if (reap_time <= now) {
-                               list_move_tail(&conn->link, &graveyard);
-                               rxrpc_put_client_connection_id(conn);
-                               rb_erase(&conn->client_node,
-                                        &local->client_conns);
-                       } else if (reap_time < earliest) {
-                               earliest = reap_time;
-                       }
-
-                       spin_unlock(&local->client_conns_lock);
-               } else {
-                       peer = conn->params.peer;
-                       write_lock_bh(&peer->conn_lock);
-                       reap_time = conn->put_time + rxrpc_connection_expiry;
-
-                       if (atomic_read(&conn->usage) > 0) {
-                               ;
-                       } else if (reap_time <= now) {
-                               list_move_tail(&conn->link, &graveyard);
-                               rb_erase(&conn->service_node,
-                                        &peer->service_conns);
-                       } else if (reap_time < earliest) {
-                               earliest = reap_time;
-                       }
-
-                       write_unlock_bh(&peer->conn_lock);
+               put_time = READ_ONCE(conn->put_time);
+               if (time_after(put_time, reap_older_than)) {
+                       if (time_before(put_time, earliest))
+                               earliest = put_time;
+                       continue;
                }
+
+               /* The usage count sits at 1 whilst the object is unused on the
+                * list; we reduce that to 0 to make the object unavailable.
+                */
+               if (atomic_cmpxchg(&conn->usage, 1, 0) != 1)
+                       continue;
+
+               if (rxrpc_conn_is_client(conn))
+                       rxrpc_unpublish_client_conn(conn);
+               else
+                       rxrpc_unpublish_service_conn(conn);
+
+               list_move_tail(&conn->link, &graveyard);
        }
        write_unlock(&rxrpc_connection_lock);
 
@@ -279,7 +264,6 @@ static void rxrpc_connection_reaper(struct work_struct *work)
                                         (earliest - now) * HZ);
        }
 
-       /* then destroy all those pulled out */
        while (!list_empty(&graveyard)) {
                conn = list_entry(graveyard.next, struct rxrpc_connection,
                                  link);
index a42b210c40a551a4fc2684903d251319055d495b..77a509e6003a67c6c0c6c8f31786fc1df673bd3c 100644 (file)
@@ -104,10 +104,12 @@ struct rxrpc_connection *rxrpc_incoming_connection(struct rxrpc_local *local,
        }
 
        /* we can now add the new candidate to the list */
+       set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
+       rb_link_node(&candidate->service_node, p, pp);
+       rb_insert_color(&candidate->service_node, &peer->service_conns);
+attached:
        conn = candidate;
        candidate = NULL;
-       rb_link_node(&conn->service_node, p, pp);
-       rb_insert_color(&conn->service_node, &peer->service_conns);
        rxrpc_get_peer(peer);
        rxrpc_get_local(local);
 
@@ -128,11 +130,19 @@ success:
 
        /* we found the connection in the list immediately */
 found_extant_connection:
+       if (!rxrpc_get_connection_maybe(conn)) {
+               set_bit(RXRPC_CONN_IN_SERVICE_CONNS, &candidate->flags);
+               rb_replace_node(&conn->service_node,
+                               &candidate->service_node,
+                               &peer->service_conns);
+               clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags);
+               goto attached;
+       }
+
        if (sp->hdr.securityIndex != conn->security_ix) {
                read_unlock_bh(&peer->conn_lock);
-               goto security_mismatch;
+               goto security_mismatch_put;
        }
-       rxrpc_get_connection(conn);
        read_unlock_bh(&peer->conn_lock);
        goto success;
 
@@ -147,8 +157,24 @@ found_extant_second:
        kfree(candidate);
        goto success;
 
+security_mismatch_put:
+       rxrpc_put_connection(conn);
 security_mismatch:
        kfree(candidate);
        _leave(" = -EKEYREJECTED");
        return ERR_PTR(-EKEYREJECTED);
 }
+
+/*
+ * Remove the service connection from the peer's tree, if still there, thereby
+ * removing it as a target for incoming packets.
+ */
+void rxrpc_unpublish_service_conn(struct rxrpc_connection *conn)
+{
+       struct rxrpc_peer *peer = conn->params.peer;
+
+       write_lock_bh(&peer->conn_lock);
+       if (test_and_clear_bit(RXRPC_CONN_IN_SERVICE_CONNS, &conn->flags))
+               rb_erase(&conn->service_node, &peer->service_conns);
+       write_unlock_bh(&peer->conn_lock);
+}