SUNRPC: Remove the global temporary write buffer in net/sunrpc/cache.c
authorTrond Myklebust <Trond.Myklebust@netapp.com>
Sun, 9 Aug 2009 19:14:28 +0000 (15:14 -0400)
committerTrond Myklebust <Trond.Myklebust@netapp.com>
Sun, 9 Aug 2009 19:14:28 +0000 (15:14 -0400)
While we do want to protect against multiple concurrent readers and writers
on each upcall/downcall pipe, we don't want to limit concurrent reading and
writing to separate caches.

This patch therefore replaces the static buffer 'write_buf', which can only
be used by one writer at a time, with use of the page cache as the
temporary buffer for downcalls. We still fall back to using the the old
global buffer if the downcall is larger than PAGE_CACHE_SIZE, since this is
apparently needed by the SPKM security context initialisation.

It then replaces the use of the global 'queue_io_mutex' with the
inode->i_mutex in cache_read() and cache_write().

Signed-off-by: Trond Myklebust <Trond.Myklebust@netapp.com>
net/sunrpc/cache.c

index 062d4f4307eb1fad0252b473fdd2a2f8038fba81..c8e7d2d07260b46a63d51640942797ee76a9e841 100644 (file)
@@ -27,6 +27,7 @@
 #include <linux/net.h>
 #include <linux/workqueue.h>
 #include <linux/mutex.h>
+#include <linux/pagemap.h>
 #include <asm/ioctls.h>
 #include <linux/sunrpc/types.h>
 #include <linux/sunrpc/cache.h>
@@ -702,13 +703,14 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
 {
        struct cache_reader *rp = filp->private_data;
        struct cache_request *rq;
-       struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+       struct inode *inode = filp->f_path.dentry->d_inode;
+       struct cache_detail *cd = PDE(inode)->data;
        int err;
 
        if (count == 0)
                return 0;
 
-       mutex_lock(&queue_io_mutex); /* protect against multiple concurrent
+       mutex_lock(&inode->i_mutex); /* protect against multiple concurrent
                              * readers on this file */
  again:
        spin_lock(&queue_lock);
@@ -721,7 +723,7 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
        }
        if (rp->q.list.next == &cd->queue) {
                spin_unlock(&queue_lock);
-               mutex_unlock(&queue_io_mutex);
+               mutex_unlock(&inode->i_mutex);
                BUG_ON(rp->offset);
                return 0;
        }
@@ -768,38 +770,81 @@ cache_read(struct file *filp, char __user *buf, size_t count, loff_t *ppos)
        }
        if (err == -EAGAIN)
                goto again;
-       mutex_unlock(&queue_io_mutex);
+       mutex_unlock(&inode->i_mutex);
        return err ? err :  count;
 }
 
-static char write_buf[8192]; /* protected by queue_io_mutex */
+static ssize_t cache_do_downcall(char *kaddr, const char __user *buf,
+                                size_t count, struct cache_detail *cd)
+{
+       ssize_t ret;
 
-static ssize_t
-cache_write(struct file *filp, const char __user *buf, size_t count,
-           loff_t *ppos)
+       if (copy_from_user(kaddr, buf, count))
+               return -EFAULT;
+       kaddr[count] = '\0';
+       ret = cd->cache_parse(cd, kaddr, count);
+       if (!ret)
+               ret = count;
+       return ret;
+}
+
+static ssize_t cache_slow_downcall(const char __user *buf,
+                                  size_t count, struct cache_detail *cd)
 {
-       int err;
-       struct cache_detail *cd = PDE(filp->f_path.dentry->d_inode)->data;
+       static char write_buf[8192]; /* protected by queue_io_mutex */
+       ssize_t ret = -EINVAL;
 
-       if (count == 0)
-               return 0;
        if (count >= sizeof(write_buf))
-               return -EINVAL;
-
+               goto out;
        mutex_lock(&queue_io_mutex);
+       ret = cache_do_downcall(write_buf, buf, count, cd);
+       mutex_unlock(&queue_io_mutex);
+out:
+       return ret;
+}
 
-       if (copy_from_user(write_buf, buf, count)) {
-               mutex_unlock(&queue_io_mutex);
-               return -EFAULT;
-       }
-       write_buf[count] = '\0';
-       if (cd->cache_parse)
-               err = cd->cache_parse(cd, write_buf, count);
-       else
-               err = -EINVAL;
+static ssize_t cache_downcall(struct address_space *mapping,
+                             const char __user *buf,
+                             size_t count, struct cache_detail *cd)
+{
+       struct page *page;
+       char *kaddr;
+       ssize_t ret = -ENOMEM;
+
+       if (count >= PAGE_CACHE_SIZE)
+               goto out_slow;
+
+       page = find_or_create_page(mapping, 0, GFP_KERNEL);
+       if (!page)
+               goto out_slow;
+
+       kaddr = kmap(page);
+       ret = cache_do_downcall(kaddr, buf, count, cd);
+       kunmap(page);
+       unlock_page(page);
+       page_cache_release(page);
+       return ret;
+out_slow:
+       return cache_slow_downcall(buf, count, cd);
+}
 
-       mutex_unlock(&queue_io_mutex);
-       return err ? err : count;
+static ssize_t
+cache_write(struct file *filp, const char __user *buf, size_t count,
+           loff_t *ppos)
+{
+       struct address_space *mapping = filp->f_mapping;
+       struct inode *inode = filp->f_path.dentry->d_inode;
+       struct cache_detail *cd = PDE(inode)->data;
+       ssize_t ret = -EINVAL;
+
+       if (!cd->cache_parse)
+               goto out;
+
+       mutex_lock(&inode->i_mutex);
+       ret = cache_downcall(mapping, buf, count, cd);
+       mutex_unlock(&inode->i_mutex);
+out:
+       return ret;
 }
 
 static DECLARE_WAIT_QUEUE_HEAD(queue_wait);