[PATCH] RPC: separate TCP and UDP write space callbacks
author Chuck Lever <cel@citi.umich.edu>
Thu, 11 Aug 2005 20:25:50 +0000 (16:25 -0400)
committer Trond Myklebust <Trond.Myklebust@netapp.com>
Fri, 23 Sep 2005 16:38:28 +0000 (12:38 -0400)
 Split the socket write space callback function into a TCP version and a
 UDP version, eliminating one dependence on the "xprt->stream" variable.

 Keep the common pieces of this path in xprt.c so that other transports
 can use them too.
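
 For example, a transport other than the socket transport could reuse
 the common path from its own write space callback along these lines (a
 minimal sketch; the "foo" transport is hypothetical, and it assumes the
 rpc_xprt pointer is stashed in sk->sk_user_data, as xprtsock does):

 	static void foo_write_space(struct sock *sk)
 	{
 		struct rpc_xprt *xprt;

 		read_lock(&sk->sk_callback_lock);
 		/* the transport applies its own "enough space" test... */
 		if (sock_writeable(sk) && (xprt = sk->sk_user_data) != NULL)
 			/* ...then defers to the common wake-up path */
 			xprt_write_space(xprt);
 		read_unlock(&sk->sk_callback_lock);
 	}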

 Test-plan:
 Write-intensive workload on a single mount point.

 Version: Thu, 11 Aug 2005 16:07:51 -0400

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
include/linux/sunrpc/xprt.h
net/sunrpc/xprt.c
net/sunrpc/xprtsock.c

diff --git a/include/linux/sunrpc/xprt.h b/include/linux/sunrpc/xprt.h
index bfbc492ae36d5fe3a9cd94ffa5ff268455a0af08..e73174c7e4501d1b66edca5af2bb33e0cac6470b 100644
@@ -240,6 +240,8 @@ int                 xprt_destroy(struct rpc_xprt *xprt);
  * Transport switch helper functions
  */
 void                   xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status);
+void                   xprt_wait_for_buffer_space(struct rpc_task *task);
+void                   xprt_write_space(struct rpc_xprt *xprt);
 struct rpc_rqst *      xprt_lookup_rqst(struct rpc_xprt *xprt, u32 xid);
 void                   xprt_complete_rqst(struct rpc_xprt *xprt, struct rpc_rqst *req, int copied);
 void                   xprt_disconnect(struct rpc_xprt *xprt);
diff --git a/net/sunrpc/xprt.c b/net/sunrpc/xprt.c
index 247fa1ec870c3d97687b72540acea6cb0e895898..31ef7dc7eed6c492ed6502ef625f8b062d5753d7 100644
@@ -241,6 +241,40 @@ void xprt_wake_pending_tasks(struct rpc_xprt *xprt, int status)
                rpc_wake_up(&xprt->pending);
 }
 
+/**
+ * xprt_wait_for_buffer_space - wait for transport output buffer to clear
+ * @task: task to be put to sleep
+ *
+ */
+void xprt_wait_for_buffer_space(struct rpc_task *task)
+{
+       struct rpc_rqst *req = task->tk_rqstp;
+       struct rpc_xprt *xprt = req->rq_xprt;
+
+       task->tk_timeout = req->rq_timeout;
+       rpc_sleep_on(&xprt->pending, task, NULL, NULL);
+}
+
+/**
+ * xprt_write_space - wake the task waiting for transport output buffer space
+ * @xprt: transport with waiting tasks
+ *
+ * Can be called in a soft IRQ context, so xprt_write_space never sleeps.
+ */
+void xprt_write_space(struct rpc_xprt *xprt)
+{
+       if (unlikely(xprt->shutdown))
+               return;
+
+       spin_lock_bh(&xprt->transport_lock);
+       if (xprt->snd_task) {
+               dprintk("RPC:      write space: waking waiting task on xprt %p\n",
+                               xprt);
+               rpc_wake_up_task(xprt->snd_task);
+       }
+       spin_unlock_bh(&xprt->transport_lock);
+}
+
 static void xprt_reset_majortimeo(struct rpc_rqst *req)
 {
        struct rpc_timeout *to = &req->rq_xprt->timeout;
diff --git a/net/sunrpc/xprtsock.c b/net/sunrpc/xprtsock.c
index 7f0b9f7f167bcad1c846710effdc9b025988cdba..70a772d7a7966d9045bc535597f1011cca1d7243 100644
@@ -308,15 +308,13 @@ static int xs_send_request(struct rpc_task *task)
 
        if (status == -EAGAIN) {
                if (test_bit(SOCK_ASYNC_NOSPACE, &xprt->sock->flags)) {
-                       /* Protect against races with xs_write_space */
+                       /* Protect against races with write_space */
                        spin_lock_bh(&xprt->transport_lock);
                        /* Don't race with disconnect */
                        if (!xprt_connected(xprt))
                                task->tk_status = -ENOTCONN;
-                       else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags)) {
-                               task->tk_timeout = req->rq_timeout;
-                               rpc_sleep_on(&xprt->pending, task, NULL, NULL);
-                       }
+                       else if (test_bit(SOCK_NOSPACE, &xprt->sock->flags))
+                               xprt_wait_for_buffer_space(task);
                        spin_unlock_bh(&xprt->transport_lock);
                        return status;
                }
@@ -721,45 +719,68 @@ static void xs_tcp_state_change(struct sock *sk)
 }
 
 /**
- * xs_write_space - callback invoked when socket buffer space becomes
- *                         available
+ * xs_udp_write_space - callback invoked when socket buffer space
+ *                             becomes available
  * @sk: socket whose state has changed
  *
  * Called when more output buffer space is available for this socket.
  * We try not to wake our writers until they can make "significant"
- * progress, otherwise we'll waste resources thrashing sock_sendmsg
+ * progress, otherwise we'll waste resources thrashing kernel_sendmsg
  * with a bunch of small requests.
  */
-static void xs_write_space(struct sock *sk)
+static void xs_udp_write_space(struct sock *sk)
 {
-       struct rpc_xprt *xprt;
-       struct socket *sock;
-
        read_lock(&sk->sk_callback_lock);
-       if (!(xprt = xprt_from_sock(sk)) || !(sock = sk->sk_socket))
-               goto out;
-       if (xprt->shutdown)
-               goto out;
 
-       /* Wait until we have enough socket memory */
-       if (xprt->stream) {
-               /* from net/core/stream.c:sk_stream_write_space */
-               if (sk_stream_wspace(sk) < sk_stream_min_wspace(sk))
+       /* from net/core/sock.c:sock_def_write_space */
+       if (sock_writeable(sk)) {
+               struct socket *sock;
+               struct rpc_xprt *xprt;
+
+               if (unlikely(!(sock = sk->sk_socket)))
                        goto out;
-       } else {
-               /* from net/core/sock.c:sock_def_write_space */
-               if (!sock_writeable(sk))
+               if (unlikely(!(xprt = xprt_from_sock(sk))))
+                       goto out;
+               if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
                        goto out;
+
+               xprt_write_space(xprt);
        }
 
-       if (!test_and_clear_bit(SOCK_NOSPACE, &sock->flags))
-               goto out;
+ out:
+       read_unlock(&sk->sk_callback_lock);
+}
 
-       spin_lock_bh(&xprt->transport_lock);
-       if (xprt->snd_task)
-               rpc_wake_up_task(xprt->snd_task);
-       spin_unlock_bh(&xprt->transport_lock);
-out:
+/**
+ * xs_tcp_write_space - callback invoked when socket buffer space
+ *                             becomes available
+ * @sk: socket whose state has changed
+ *
+ * Called when more output buffer space is available for this socket.
+ * We try not to wake our writers until they can make "significant"
+ * progress, otherwise we'll waste resources thrashing kernel_sendmsg
+ * with a bunch of small requests.
+ */
+static void xs_tcp_write_space(struct sock *sk)
+{
+       read_lock(&sk->sk_callback_lock);
+
+       /* from net/core/stream.c:sk_stream_write_space */
+       if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
+               struct socket *sock;
+               struct rpc_xprt *xprt;
+
+               if (unlikely(!(sock = sk->sk_socket)))
+                       goto out;
+               if (unlikely(!(xprt = xprt_from_sock(sk))))
+                       goto out;
+               if (unlikely(!test_and_clear_bit(SOCK_NOSPACE, &sock->flags)))
+                       goto out;
+
+               xprt_write_space(xprt);
+       }
+
+ out:
        read_unlock(&sk->sk_callback_lock);
 }
 
@@ -855,15 +876,16 @@ static void xs_bind(struct rpc_xprt *xprt, struct socket *sock)
        xprt->old_write_space = sk->sk_write_space;
        if (xprt->prot == IPPROTO_UDP) {
                sk->sk_data_ready = xs_udp_data_ready;
+               sk->sk_write_space = xs_udp_write_space;
                sk->sk_no_check = UDP_CSUM_NORCV;
                xprt_set_connected(xprt);
        } else {
                tcp_sk(sk)->nonagle = 1;        /* disable Nagle's algorithm */
                sk->sk_data_ready = xs_tcp_data_ready;
                sk->sk_state_change = xs_tcp_state_change;
+               sk->sk_write_space = xs_tcp_write_space;
                xprt_clear_connected(xprt);
        }
-       sk->sk_write_space = xs_write_space;
 
        /* Reset to new socket */
        xprt->sock = sock;
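
 Note: the two "enough space" tests in the new callbacks come from the
 generic socket and stream layers. As a rough paraphrase of the 2.6-era
 helpers (illustrative only; see include/net/sock.h for the real
 definitions):

 	/* UDP test: less than half of the send buffer is in use */
 	static inline int sock_writeable(const struct sock *sk)
 	{
 		return atomic_read(&sk->sk_wmem_alloc) < (sk->sk_sndbuf / 2);
 	}

 	/* TCP test: free space is at least half of the data queued */
 	static inline int sk_stream_wspace(const struct sock *sk)
 	{
 		return sk->sk_sndbuf - sk->sk_wmem_queued;
 	}

 	static inline int sk_stream_min_wspace(const struct sock *sk)
 	{
 		return sk->sk_wmem_queued / 2;
 	}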