SUNRPC: Eliminate side effects from rpc_malloc
authorChuck Lever <chuck.lever@oracle.com>
Thu, 29 Mar 2007 20:47:58 +0000 (16:47 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Tue, 1 May 2007 05:17:11 +0000 (22:17 -0700)
Currently rpc_malloc sets req->rq_buffer internally.  Make this a more
generic interface:  return a pointer to the new buffer (or NULL) and
make the caller set req->rq_buffer and req->rq_bufsize.  This looks much
more like kmalloc and eliminates the side effects.

To fix a potential deadlock, this patch also replaces GFP_NOFS with
GFP_NOWAIT in rpc_malloc.  This prevents async RPCs from sleeping outside
the RPC's task scheduler while allocating their buffer.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
include/linux/sunrpc/sched.h
include/linux/sunrpc/xprt.h
net/sunrpc/clnt.c
net/sunrpc/sched.c
net/sunrpc/xprt.c

index 3069ecca0129e9c46bf69223870c0788183ea3b3..2047fb202a137c048264b77036b3072849d21f3c 100644 (file)
@@ -264,7 +264,7 @@ struct rpc_task *rpc_wake_up_next(struct rpc_wait_queue *);
 void           rpc_wake_up_status(struct rpc_wait_queue *, int);
 void           rpc_delay(struct rpc_task *, unsigned long);
 void *         rpc_malloc(struct rpc_task *, size_t);
-void           rpc_free(struct rpc_task *);
+void           rpc_free(void *);
 int            rpciod_up(void);
 void           rpciod_down(void);
 int            __rpc_wait_for_completion_task(struct rpc_task *task, int (*)(void *));
index 7aa29502b187f76f0838c9954604e6fb2af23c33..745afc1d30684fa7950338e85636fd42dfe96cc2 100644 (file)
@@ -114,7 +114,7 @@ struct rpc_xprt_ops {
        void            (*set_port)(struct rpc_xprt *xprt, unsigned short port);
        void            (*connect)(struct rpc_task *task);
        void *          (*buf_alloc)(struct rpc_task *task, size_t size);
-       void            (*buf_free)(struct rpc_task *task);
+       void            (*buf_free)(void *buffer);
        int             (*send_request)(struct rpc_task *task);
        void            (*set_retrans_timeout)(struct rpc_task *task);
        void            (*timer)(struct rpc_task *task);
index 12487aafaab57418b330130bb54c7f4a10561818..e7dc09ecc47078b9b14faf39bc9b205f554116d7 100644 (file)
@@ -774,7 +774,8 @@ call_allocate(struct rpc_task *task)
        req->rq_rcvsize = RPC_REPHDRSIZE + slack + proc->p_replen;
        req->rq_rcvsize <<= 2;
 
-       xprt->ops->buf_alloc(task, req->rq_callsize + req->rq_rcvsize);
+       req->rq_buffer = xprt->ops->buf_alloc(task,
+                                       req->rq_callsize + req->rq_rcvsize);
        if (req->rq_buffer != NULL)
                return;
 
index 6d87320074b1556f98280dde419157f6288a2cb7..4a53e94f813415ac757f5180bf0d20a3323ba2d2 100644 (file)
@@ -741,50 +741,53 @@ static void rpc_async_schedule(struct work_struct *work)
  * @task: RPC task that will use this buffer
  * @size: requested byte size
  *
- * We try to ensure that some NFS reads and writes can always proceed
- * by using a mempool when allocating 'small' buffers.
+ * To prevent rpciod from hanging, this allocator never sleeps,
+ * returning NULL if the request cannot be serviced immediately.
+ * The caller can arrange to sleep in a way that is safe for rpciod.
+ *
+ * Most requests are 'small' (under 2KiB) and can be serviced from a
+ * mempool, ensuring that NFS reads and writes can always proceed,
+ * and that there is good locality of reference for these buffers.
+ *
  * In order to avoid memory starvation triggering more writebacks of
- * NFS requests, we use GFP_NOFS rather than GFP_KERNEL.
+ * NFS requests, we avoid using GFP_KERNEL.
  */
-void * rpc_malloc(struct rpc_task *task, size_t size)
+void *rpc_malloc(struct rpc_task *task, size_t size)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
-       gfp_t   gfp;
+       size_t *buf;
+       gfp_t gfp = RPC_IS_SWAPPER(task) ? GFP_ATOMIC : GFP_NOWAIT;
 
-       if (task->tk_flags & RPC_TASK_SWAPPER)
-               gfp = GFP_ATOMIC;
+       size += sizeof(size_t);
+       if (size <= RPC_BUFFER_MAXSIZE)
+               buf = mempool_alloc(rpc_buffer_mempool, gfp);
        else
-               gfp = GFP_NOFS;
-
-       if (size > RPC_BUFFER_MAXSIZE) {
-               req->rq_buffer = kmalloc(size, gfp);
-               if (req->rq_buffer)
-                       req->rq_bufsize = size;
-       } else {
-               req->rq_buffer = mempool_alloc(rpc_buffer_mempool, gfp);
-               if (req->rq_buffer)
-                       req->rq_bufsize = RPC_BUFFER_MAXSIZE;
-       }
-       return req->rq_buffer;
+               buf = kmalloc(size, gfp);
+       *buf = size;
+       dprintk("RPC: %5u allocated buffer of size %u at %p\n",
+                       task->tk_pid, size, buf);
+       return (void *) ++buf;
 }
 
 /**
  * rpc_free - free buffer allocated via rpc_malloc
- * @task: RPC task with a buffer to be freed
+ * @buffer: buffer to free
  *
  */
-void rpc_free(struct rpc_task *task)
+void rpc_free(void *buffer)
 {
-       struct rpc_rqst *req = task->tk_rqstp;
+       size_t size, *buf = (size_t *) buffer;
 
-       if (req->rq_buffer) {
-               if (req->rq_bufsize == RPC_BUFFER_MAXSIZE)
-                       mempool_free(req->rq_buffer, rpc_buffer_mempool);
-               else
-                       kfree(req->rq_buffer);
-               req->rq_buffer = NULL;
-               req->rq_bufsize = 0;
-       }
+       if (!buffer)
+               return;
+       size = *buf;
+       buf--;
+
+       dprintk("RPC:       freeing buffer of size %u at %p\n",
+                       size, buf);
+       if (size <= RPC_BUFFER_MAXSIZE)
+               mempool_free(buf, rpc_buffer_mempool);
+       else
+               kfree(buf);
 }
 
 /*
index 432ee92cf262c7f5687165fa95413c519e770c8c..81fe830da8aa639186c2cd2e45e87cd568191cda 100644 (file)
@@ -854,7 +854,7 @@ void xprt_release(struct rpc_task *task)
                mod_timer(&xprt->timer,
                                xprt->last_used + xprt->idle_timeout);
        spin_unlock_bh(&xprt->transport_lock);
-       xprt->ops->buf_free(task);
+       xprt->ops->buf_free(req->rq_buffer);
        task->tk_rqstp = NULL;
        if (req->rq_release_snd_buf)
                req->rq_release_snd_buf(req);