SUNRPC: Add helpers to prevent socket create from racing
Author:     Trond Myklebust <trond.myklebust@primarydata.com>
AuthorDate: Sun, 8 Feb 2015 23:19:25 +0000 (18:19 -0500)
Commit:     Trond Myklebust <trond.myklebust@primarydata.com>
CommitDate: Mon, 9 Feb 2015 02:47:29 +0000 (21:47 -0500)
The socket lock is currently held by the task that is requesting that
the connection be established. While that is efficient when the
connection happens quickly, it is racy when it does not. What we really
want is for the connect helper to be able to block access to the socket
while it is being set up.

This patch does so by transferring the socket lock from the task that
requests the connect attempt to the transport's connect worker, and
then releasing that lock once the connection attempt completes.
This scheme also gives us automatic protection against collisions with
the RPC close code, so we can kill the cancel_delayed_work_sync()
call in xs_close().
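
To illustrate the idea, here is a minimal userspace sketch of the
handoff (the fake_* names are invented for this example; the kernel
code below uses xprt->transport_lock, the XPRT_LOCKED bit and
ops->release_xprt() rather than a bare mutex). The requesting task,
which already owns the lock as xprt->snd_task, re-keys ownership to an
opaque cookie so the lock stays held while the task itself sleeps; the
connect worker later releases it through the same cookie:

/*
 * Userspace analogue of the lock handoff (illustrative only).
 * A logical lock is identified by an "owner" pointer: transferring
 * the lock means re-keying the owner inside a short critical section,
 * so the lock never becomes free in between.
 */
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct fake_xprt {
	pthread_mutex_t transport_lock;	/* stands in for xprt->transport_lock */
	void *snd_task;			/* logical lock owner, NULL when free */
};

static bool fake_lock_connect(struct fake_xprt *xprt, void *task, void *cookie)
{
	bool ret = false;

	pthread_mutex_lock(&xprt->transport_lock);
	/* Only the current owner may hand the lock over. */
	if (xprt->snd_task == task && task != NULL) {
		xprt->snd_task = cookie;	/* the worker now owns it */
		ret = true;
	}
	pthread_mutex_unlock(&xprt->transport_lock);
	return ret;
}

static void fake_unlock_connect(struct fake_xprt *xprt, void *cookie)
{
	pthread_mutex_lock(&xprt->transport_lock);
	/* Release only if the worker still owns the lock. */
	if (xprt->snd_task == cookie)
		xprt->snd_task = NULL;
	pthread_mutex_unlock(&xprt->transport_lock);
}

int main(void)
{
	struct fake_xprt xprt = { PTHREAD_MUTEX_INITIALIZER, NULL };
	int task, cookie;	/* addresses serve as opaque identities */

	xprt.snd_task = &task;	/* the requesting task holds the lock */
	printf("handoff ok: %d\n", fake_lock_connect(&xprt, &task, &cookie));
	fake_unlock_connect(&xprt, &cookie);	/* worker done: lock freed */
	printf("owner now: %p\n", xprt.snd_task);
	return 0;
}

Keying the lock to a cookie rather than a task pointer lets the
workqueue callback, which runs in a context with no rpc_task of its
own, identify itself as the lock holder when it is done.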

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
include/linux/sunrpc/xprt.h
net/sunrpc/xprt.c
net/sunrpc/xprtsock.c

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index 9d27ac45b909556ada39af3b4f0687b20e6f2d80..2926e618dbc603cdcdbf1d55d61fbefdc61e2980 100644
--- a/include/linux/sunrpc/xprt.h
+++ b/include/linux/sunrpc/xprt.h
@@ -347,6 +347,9 @@ void                        xprt_force_disconnect(struct rpc_xprt *xprt);
 void                   xprt_conditional_disconnect(struct rpc_xprt *xprt, unsigned int cookie);
 int                    xs_swapper(struct rpc_xprt *xprt, int enable);
 
+bool                   xprt_lock_connect(struct rpc_xprt *, struct rpc_task *, void *);
+void                   xprt_unlock_connect(struct rpc_xprt *, void *);
+
 /*
  * Reserved bit positions in xprt->state
  */
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index ebbefad21a370b7d4bb88b6c442d3fc04bfe1098..ff3574df834491c14afbe46dd9d8f43967689b1f 100644
--- a/net/sunrpc/xprt.c
+++ b/net/sunrpc/xprt.c
@@ -690,6 +690,37 @@ out_abort:
        spin_unlock(&xprt->transport_lock);
 }
 
+bool xprt_lock_connect(struct rpc_xprt *xprt,
+               struct rpc_task *task,
+               void *cookie)
+{
+       bool ret = false;
+
+       spin_lock_bh(&xprt->transport_lock);
+       if (!test_bit(XPRT_LOCKED, &xprt->state))
+               goto out;
+       if (xprt->snd_task != task)
+               goto out;
+       xprt->snd_task = cookie;
+       ret = true;
+out:
+       spin_unlock_bh(&xprt->transport_lock);
+       return ret;
+}
+
+void xprt_unlock_connect(struct rpc_xprt *xprt, void *cookie)
+{
+       spin_lock_bh(&xprt->transport_lock);
+       if (xprt->snd_task != cookie)
+               goto out;
+       if (!test_bit(XPRT_LOCKED, &xprt->state))
+               goto out;
+       xprt->snd_task = NULL;
+       xprt->ops->release_xprt(xprt, NULL);
+out:
+       spin_unlock_bh(&xprt->transport_lock);
+}
+
 /**
  * xprt_connect - schedule a transport connect operation
  * @task: RPC task that is requesting the connect
@@ -712,9 +743,7 @@ void xprt_connect(struct rpc_task *task)
        if (test_and_clear_bit(XPRT_CLOSE_WAIT, &xprt->state))
                xprt->ops->close(xprt);
 
-       if (xprt_connected(xprt))
-               xprt_release_write(xprt, task);
-       else {
+       if (!xprt_connected(xprt)) {
                task->tk_rqstp->rq_bytes_sent = 0;
                task->tk_timeout = task->tk_rqstp->rq_timeout;
                rpc_sleep_on(&xprt->pending, task, xprt_connect_status);
@@ -726,6 +755,7 @@ void xprt_connect(struct rpc_task *task)
                xprt->stat.connect_start = jiffies;
                xprt->ops->connect(xprt, task);
        }
+       xprt_release_write(xprt, task);
 }
 
 static void xprt_connect_status(struct rpc_task *task)
@@ -758,7 +788,6 @@ static void xprt_connect_status(struct rpc_task *task)
                dprintk("RPC: %5u xprt_connect_status: error %d connecting to "
                                "server %s\n", task->tk_pid, -task->tk_status,
                                xprt->servername);
-               xprt_release_write(xprt, task);
                task->tk_status = -EIO;
        }
 }
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 0fa7ed93dc20537ca06479207aa8d48d48f22f92..e57d8ed2c4d8793bac3aaeaf03a1875f1217b24e 100644
--- a/net/sunrpc/xprtsock.c
+++ b/net/sunrpc/xprtsock.c
@@ -852,8 +852,6 @@ static void xs_close(struct rpc_xprt *xprt)
 
        dprintk("RPC:       xs_close xprt %p\n", xprt);
 
-       cancel_delayed_work_sync(&transport->connect_worker);
-
        xs_reset_transport(transport);
        xprt->reestablish_timeout = 0;
 
@@ -2101,6 +2099,7 @@ static void xs_udp_setup_socket(struct work_struct *work)
        trace_rpc_socket_connect(xprt, sock, 0);
        status = 0;
 out:
+       xprt_unlock_connect(xprt, transport);
        xprt_clear_connecting(xprt);
        xprt_wake_pending_tasks(xprt, status);
 }
@@ -2286,6 +2285,7 @@ static void xs_tcp_setup_socket(struct work_struct *work)
        case 0:
        case -EINPROGRESS:
        case -EALREADY:
+               xprt_unlock_connect(xprt, transport);
                xprt_clear_connecting(xprt);
                return;
        case -EINVAL:
@@ -2303,6 +2303,7 @@ static void xs_tcp_setup_socket(struct work_struct *work)
 out_eagain:
        status = -EAGAIN;
 out:
+       xprt_unlock_connect(xprt, transport);
        xprt_clear_connecting(xprt);
        xprt_wake_pending_tasks(xprt, status);
 }
@@ -2325,6 +2326,8 @@ static void xs_connect(struct rpc_xprt *xprt, struct rpc_task *task)
 {
        struct sock_xprt *transport = container_of(xprt, struct sock_xprt, xprt);
 
+       WARN_ON_ONCE(!xprt_lock_connect(xprt, task, transport));
+
        if (transport->sock != NULL && !RPC_IS_SOFTCONN(task)) {
                dprintk("RPC:       xs_connect delayed xprt %p for %lu "
                                "seconds\n",