NFS: Replace atomic_t variables in nfs_direct_req with a single spin lock
authorChuck Lever <cel@netapp.com>
Mon, 20 Mar 2006 18:44:34 +0000 (13:44 -0500)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Mon, 20 Mar 2006 18:44:34 +0000 (13:44 -0500)
Three atomic_t variables cause a lot of bus locking.  Because they are all
used in the same places in the code, just use a single spin lock.

Now that the atomic_t variables are gone, we can remove the request size
limitation since the code no longer depends on the limited width of atomic_t
on some platforms.

Test plan:
Compile with CONFIG_NFS and CONFIG_NFS_DIRECTIO enabled.  Millions of fsx
operations, iozone, OraSim.

Signed-off-by: Chuck Lever <cel@netapp.com>
Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
fs/nfs/direct.c

index bcbc213b4033575bfbe3b617ad71dfd3626263f2..3de7c4b0796883f87c8d2f1744d8e2befa199c3b 100644 (file)
@@ -58,7 +58,6 @@
 #include "iostat.h"
 
 #define NFSDBG_FACILITY                NFSDBG_VFS
-#define MAX_DIRECTIO_SIZE      (4096UL << PAGE_SHIFT)
 
 static void nfs_free_user_pages(struct page **pages, int npages, int do_dirty);
 static kmem_cache_t *nfs_direct_cachep;
@@ -68,6 +67,8 @@ static kmem_cache_t *nfs_direct_cachep;
  */
 struct nfs_direct_req {
        struct kref             kref;           /* release manager */
+
+       /* I/O parameters */
        struct list_head        list;           /* nfs_read/write_data structs */
        struct file *           filp;           /* file descriptor */
        struct kiocb *          iocb;           /* controlling i/o request */
@@ -75,12 +76,14 @@ struct nfs_direct_req {
        struct inode *          inode;          /* target file of i/o */
        struct page **          pages;          /* pages in our buffer */
        unsigned int            npages;         /* count of pages */
-       atomic_t                complete,       /* i/os we're waiting for */
-                               count,          /* bytes actually processed */
+
+       /* completion state */
+       spinlock_t              lock;           /* protect completion state */
+       int                     outstanding;    /* i/os we're waiting for */
+       ssize_t                 count,          /* bytes actually processed */
                                error;          /* any reported error */
 };
 
-
 /**
  * nfs_direct_IO - NFS address space operation for direct I/O
  * @rw: direction (read or write)
@@ -110,12 +113,6 @@ static inline int nfs_get_user_pages(int rw, unsigned long user_addr, size_t siz
        unsigned long page_count;
        size_t array_size;
 
-       /* set an arbitrary limit to prevent type overflow */
-       if (size > MAX_DIRECTIO_SIZE) {
-               *pages = NULL;
-               return -EFBIG;
-       }
-
        page_count = (user_addr + size + PAGE_SIZE - 1) >> PAGE_SHIFT;
        page_count -= user_addr >> PAGE_SHIFT;
 
@@ -164,8 +161,10 @@ static inline struct nfs_direct_req *nfs_direct_req_alloc(void)
        init_waitqueue_head(&dreq->wait);
        INIT_LIST_HEAD(&dreq->list);
        dreq->iocb = NULL;
-       atomic_set(&dreq->count, 0);
-       atomic_set(&dreq->error, 0);
+       spin_lock_init(&dreq->lock);
+       dreq->outstanding = 0;
+       dreq->count = 0;
+       dreq->error = 0;
 
        return dreq;
 }
@@ -181,19 +180,18 @@ static void nfs_direct_req_release(struct kref *kref)
  */
 static ssize_t nfs_direct_wait(struct nfs_direct_req *dreq)
 {
-       int result = -EIOCBQUEUED;
+       ssize_t result = -EIOCBQUEUED;
 
        /* Async requests don't wait here */
        if (dreq->iocb)
                goto out;
 
-       result = wait_event_interruptible(dreq->wait,
-                                       (atomic_read(&dreq->complete) == 0));
+       result = wait_event_interruptible(dreq->wait, (dreq->outstanding == 0));
 
        if (!result)
-               result = atomic_read(&dreq->error);
+               result = dreq->error;
        if (!result)
-               result = atomic_read(&dreq->count);
+               result = dreq->count;
 
 out:
        kref_put(&dreq->kref, nfs_direct_req_release);
@@ -214,9 +212,9 @@ static void nfs_direct_complete(struct nfs_direct_req *dreq)
        nfs_free_user_pages(dreq->pages, dreq->npages, 1);
 
        if (dreq->iocb) {
-               long res = atomic_read(&dreq->error);
+               long res = (long) dreq->error;
                if (!res)
-                       res = atomic_read(&dreq->count);
+                       res = (long) dreq->count;
                aio_complete(dreq->iocb, res, 0);
        } else
                wake_up(&dreq->wait);
@@ -233,7 +231,6 @@ static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
 {
        struct list_head *list;
        struct nfs_direct_req *dreq;
-       unsigned int reads = 0;
        unsigned int rpages = (rsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
        dreq = nfs_direct_req_alloc();
@@ -259,13 +256,12 @@ static struct nfs_direct_req *nfs_direct_read_alloc(size_t nbytes, size_t rsize)
                list_add(&data->pages, list);
 
                data->req = (struct nfs_page *) dreq;
-               reads++;
+               dreq->outstanding++;
                if (nbytes <= rsize)
                        break;
                nbytes -= rsize;
        }
        kref_get(&dreq->kref);
-       atomic_set(&dreq->complete, reads);
        return dreq;
 }
 
@@ -276,13 +272,21 @@ static void nfs_direct_read_result(struct rpc_task *task, void *calldata)
 
        if (nfs_readpage_result(task, data) != 0)
                return;
+
+       spin_lock(&dreq->lock);
+
        if (likely(task->tk_status >= 0))
-               atomic_add(data->res.count, &dreq->count);
+               dreq->count += data->res.count;
        else
-               atomic_set(&dreq->error, task->tk_status);
+               dreq->error = task->tk_status;
+
+       if (--dreq->outstanding) {
+               spin_unlock(&dreq->lock);
+               return;
+       }
 
-       if (unlikely(atomic_dec_and_test(&dreq->complete)))
-               nfs_direct_complete(dreq);
+       spin_unlock(&dreq->lock);
+       nfs_direct_complete(dreq);
 }
 
 static const struct rpc_call_ops nfs_read_direct_ops = {
@@ -388,7 +392,6 @@ static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize
 {
        struct list_head *list;
        struct nfs_direct_req *dreq;
-       unsigned int writes = 0;
        unsigned int wpages = (wsize + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
 
        dreq = nfs_direct_req_alloc();
@@ -414,16 +417,19 @@ static struct nfs_direct_req *nfs_direct_write_alloc(size_t nbytes, size_t wsize
                list_add(&data->pages, list);
 
                data->req = (struct nfs_page *) dreq;
-               writes++;
+               dreq->outstanding++;
                if (nbytes <= wsize)
                        break;
                nbytes -= wsize;
        }
        kref_get(&dreq->kref);
-       atomic_set(&dreq->complete, writes);
        return dreq;
 }
 
+/*
+ * NB: Return the value of the first error return code.  Subsequent
+ *     errors after the first one are ignored.
+ */
 static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
 {
        struct nfs_write_data *data = calldata;
@@ -436,15 +442,22 @@ static void nfs_direct_write_result(struct rpc_task *task, void *calldata)
        if (unlikely(data->res.verf->committed != NFS_FILE_SYNC))
                status = -EIO;
 
+       spin_lock(&dreq->lock);
+
        if (likely(status >= 0))
-               atomic_add(data->res.count, &dreq->count);
+               dreq->count += data->res.count;
        else
-               atomic_set(&dreq->error, status);
+               dreq->error = status;
 
-       if (unlikely(atomic_dec_and_test(&dreq->complete))) {
-               nfs_end_data_update(data->inode);
-               nfs_direct_complete(dreq);
+       if (--dreq->outstanding) {
+               spin_unlock(&dreq->lock);
+               return;
        }
+
+       spin_unlock(&dreq->lock);
+
+       nfs_end_data_update(data->inode);
+       nfs_direct_complete(dreq);
 }
 
 static const struct rpc_call_ops nfs_write_direct_ops = {