[PATCH] RPC: separate TCP and UDP transport connection logic
authorChuck Lever <cel@citi.umich.edu>
Thu, 11 Aug 2005 20:25:53 +0000 (16:25 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Fri, 23 Sep 2005 16:38:29 +0000 (12:38 -0400)
 Create separate connection worker functions for managing UDP and TCP
 transport sockets.  This eliminates several dependencies on "xprt->stream".

 Test-plan:
 Destructive testing (unplugging the network temporarily).  Connectathon with
 v2, v3, and v4.

 Version: Thu, 11 Aug 2005 16:08:18 -0400

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
net/sunrpc/xprtsock.c

index 70a772d7a7966d9045bc535597f1011cca1d7243..f91529787b9bc885bffc66ed8a26a7831b53a844 100644 (file)
@@ -836,102 +836,118 @@ static int xs_bindresvport(struct rpc_xprt *xprt, struct socket *sock)
        return err;
 }
 
-static struct socket *xs_create(struct rpc_xprt *xprt, int proto, int resvport)
+/**
+ * xs_udp_connect_worker - set up a UDP socket
+ * @args: RPC transport to connect
+ *
+ * Invoked by a work queue tasklet.
+ */
+static void xs_udp_connect_worker(void *args)
 {
-       struct socket *sock;
-       int type, err;
-
-       dprintk("RPC:      xs_create(%s %d)\n",
-                          (proto == IPPROTO_UDP)? "udp" : "tcp", proto);
+       struct rpc_xprt *xprt = (struct rpc_xprt *) args;
+       struct socket *sock = xprt->sock;
+       int err, status = -EIO;
 
-       type = (proto == IPPROTO_UDP)? SOCK_DGRAM : SOCK_STREAM;
+       if (xprt->shutdown || xprt->addr.sin_port == 0)
+               goto out;
 
-       if ((err = sock_create_kern(PF_INET, type, proto, &sock)) < 0) {
-               dprintk("RPC:      can't create socket (%d).\n", -err);
-               return NULL;
-       }
+       dprintk("RPC:      xs_udp_connect_worker for xprt %p\n", xprt);
 
-       /* If the caller has the capability, bind to a reserved port */
-       if (resvport && xs_bindresvport(xprt, sock) < 0)
-               goto failed;
+       /* Start by resetting any existing state */
+       xs_close(xprt);
 
-       return sock;
+       if ((err = sock_create_kern(PF_INET, SOCK_DGRAM, IPPROTO_UDP, &sock)) < 0) {
+               dprintk("RPC:      can't create UDP transport socket (%d).\n", -err);
+               goto out;
+       }
 
-failed:
-       sock_release(sock);
-       return NULL;
-}
+       if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
+               sock_release(sock);
+               goto out;
+       }
 
-static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
-{
-       struct sock *sk = sock->sk;
+       if (!xprt->inet) {
+               struct sock *sk = sock->sk;
 
-       if (xprt->inet)
-               return;
+               write_lock_bh(&sk->sk_callback_lock);
 
-       write_lock_bh(&sk->sk_callback_lock);
-       sk->sk_user_data = xprt;
-       xprt->old_data_ready = sk->sk_data_ready;
-       xprt->old_state_change = sk->sk_state_change;
-       xprt->old_write_space = sk->sk_write_space;
-       if (xprt->prot == IPPROTO_UDP) {
+               sk->sk_user_data = xprt;
+               xprt->old_data_ready = sk->sk_data_ready;
+               xprt->old_state_change = sk->sk_state_change;
+               xprt->old_write_space = sk->sk_write_space;
                sk->sk_data_ready = xs_udp_data_ready;
                sk->sk_write_space = xs_udp_write_space;
                sk->sk_no_check = UDP_CSUM_NORCV;
+
                xprt_set_connected(xprt);
-       } else {
-               tcp_sk(sk)->nonagle = 1;        /* disable Nagle's algorithm */
-               sk->sk_data_ready = xs_tcp_data_ready;
-               sk->sk_state_change = xs_tcp_state_change;
-               sk->sk_write_space = xs_tcp_write_space;
-               xprt_clear_connected(xprt);
-       }
 
-       /* Reset to new socket */
-       xprt->sock = sock;
-       xprt->inet = sk;
-       write_unlock_bh(&sk->sk_callback_lock);
+               /* Reset to new socket */
+               xprt->sock = sock;
+               xprt->inet = sk;
 
-       return;
+               write_unlock_bh(&sk->sk_callback_lock);
+       }
+       xs_set_buffer_size(xprt);
+       status = 0;
+out:
+       xprt_wake_pending_tasks(xprt, status);
+       xprt_clear_connecting(xprt);
 }
 
 /**
- * xs_connect_worker - try to connect a socket to a remote endpoint
+ * xs_tcp_connect_worker - connect a TCP socket to a remote endpoint
  * @args: RPC transport to connect
  *
  * Invoked by a work queue tasklet.
  */
-static void xs_connect_worker(void *args)
+static void xs_tcp_connect_worker(void *args)
 {
        struct rpc_xprt *xprt = (struct rpc_xprt *)args;
        struct socket *sock = xprt->sock;
-       int status = -EIO;
+       int err, status = -EIO;
 
        if (xprt->shutdown || xprt->addr.sin_port == 0)
                goto out;
 
-       dprintk("RPC:      xs_connect_worker xprt %p\n", xprt);
+       dprintk("RPC:      xs_tcp_connect_worker for xprt %p\n", xprt);
 
-       /*
-        * Start by resetting any existing state
-        */
+       /* Start by resetting any existing socket state */
        xs_close(xprt);
-       sock = xs_create(xprt, xprt->prot, xprt->resvport);
-       if (sock == NULL) {
-               /* couldn't create socket or bind to reserved port;
-                * this is likely a permanent error, so cause an abort */
+
+       if ((err = sock_create_kern(PF_INET, SOCK_STREAM, IPPROTO_TCP, &sock)) < 0) {
+               dprintk("RPC:      can't create TCP transport socket (%d).\n", -err);
                goto out;
        }
-       xs_bind(xprt, sock);
-       xs_set_buffer_size(xprt);
 
-       status = 0;
-       if (!xprt->stream)
+       if (xprt->resvport && xs_bindresvport(xprt, sock) < 0) {
+               sock_release(sock);
                goto out;
+       }
 
-       /*
-        * Tell the socket layer to start connecting...
-        */
+       if (!xprt->inet) {
+               struct sock *sk = sock->sk;
+
+               write_lock_bh(&sk->sk_callback_lock);
+
+               sk->sk_user_data = xprt;
+               xprt->old_data_ready = sk->sk_data_ready;
+               xprt->old_state_change = sk->sk_state_change;
+               xprt->old_write_space = sk->sk_write_space;
+               sk->sk_data_ready = xs_tcp_data_ready;
+               sk->sk_state_change = xs_tcp_state_change;
+               sk->sk_write_space = xs_tcp_write_space;
+               tcp_sk(sk)->nonagle = 1;
+
+               xprt_clear_connected(xprt);
+
+               /* Reset to new socket */
+               xprt->sock = sock;
+               xprt->inet = sk;
+
+               write_unlock_bh(&sk->sk_callback_lock);
+       }
+
+       /* Tell the socket layer to start connecting... */
        status = sock->ops->connect(sock, (struct sockaddr *) &xprt->addr,
                        sizeof(xprt->addr), O_NONBLOCK);
        dprintk("RPC: %p  connect status %d connected %d sock state %d\n",
@@ -959,18 +975,20 @@ static void xs_connect(struct rpc_task *task)
 {
        struct rpc_xprt *xprt = task->tk_xprt;
 
-       if (!xprt_test_and_set_connecting(xprt)) {
-               if (xprt->sock != NULL) {
-                       dprintk("RPC:      xs_connect delayed xprt %p\n", xprt);
-                       schedule_delayed_work(&xprt->connect_worker,
+       if (xprt_test_and_set_connecting(xprt))
+               return;
+
+       if (xprt->sock != NULL) {
+               dprintk("RPC:      xs_connect delayed xprt %p\n", xprt);
+               schedule_delayed_work(&xprt->connect_worker,
                                        RPC_REESTABLISH_TIMEOUT);
-               } else {
-                       dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
-                       schedule_work(&xprt->connect_worker);
-                       /* flush_scheduled_work can sleep... */
-                       if (!RPC_IS_ASYNC(task))
-                               flush_scheduled_work();
-               }
+       } else {
+               dprintk("RPC:      xs_connect scheduled xprt %p\n", xprt);
+               schedule_work(&xprt->connect_worker);
+
+               /* flush_scheduled_work can sleep... */
+               if (!RPC_IS_ASYNC(task))
+                       flush_scheduled_work();
        }
 }
 
@@ -1013,7 +1031,7 @@ int xs_setup_udp(struct rpc_xprt *xprt, struct rpc_timeout *to)
        /* XXX: header size can vary due to auth type, IPv6, etc. */
        xprt->max_payload = (1U << 16) - (MAX_HEADER << 3);
 
-       INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt);
+       INIT_WORK(&xprt->connect_worker, xs_udp_connect_worker, xprt);
 
        xprt->ops = &xs_ops;
 
@@ -1052,7 +1070,7 @@ int xs_setup_tcp(struct rpc_xprt *xprt, struct rpc_timeout *to)
        xprt->resvport = capable(CAP_NET_BIND_SERVICE) ? 1 : 0;
        xprt->max_payload = (1U << 31) - 1;
 
-       INIT_WORK(&xprt->connect_worker, xs_connect_worker, xprt);
+       INIT_WORK(&xprt->connect_worker, xs_tcp_connect_worker, xprt);
 
        xprt->ops = &xs_ops;