Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi...
author Linus Torvalds <torvalds@linux-foundation.org>
Tue, 7 May 2013 17:12:32 +0000 (10:12 -0700)
committer Linus Torvalds <torvalds@linux-foundation.org>
Tue, 7 May 2013 17:12:32 +0000 (10:12 -0700)
Pull fuse updates from Miklos Szeredi:
 "This contains two patchsets from Maxim Patlasov.

  The first reworks the request throttling so that only async requests
  are throttled.  Wakeup of waiting async requests is also optimized.

  The second series adds support for async processing of direct IO which
  optimizes direct IO and enables the use of the AIO userspace
  interface."

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/mszeredi/fuse:
  fuse: add flag to turn on async direct IO
  fuse: truncate file if async dio failed
  fuse: optimize short direct reads
  fuse: enable asynchronous processing direct IO
  fuse: make fuse_direct_io() aware about AIO
  fuse: add support of async IO
  fuse: move fuse_release_user_pages() up
  fuse: optimize wake_up
  fuse: implement exclusive wakeup for blocked_waitq
  fuse: skip blocking on allocations of synchronous requests
  fuse: add flag fc->initialized
  fuse: make request allocations for background processing explicit
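
The userspace-visible switch for the second series lives outside the two files
diffed below: "fuse: add flag to turn on async direct IO" introduces a
FUSE_ASYNC_DIO INIT flag that the server must echo back before the kernel sets
fc->async_dio and takes the async path. A minimal server-side sketch of that
negotiation, assuming the fuse_init_in/fuse_init_out layout from
include/uapi/linux/fuse.h on a kernel carrying this series; handle_init and
the surrounding request loop are hypothetical:

#include <linux/fuse.h>

/* Sketch only: fill in a FUSE_INIT reply, opting in to async direct IO. */
static void handle_init(const struct fuse_init_in *in,
			struct fuse_init_out *out)
{
	out->major         = FUSE_KERNEL_VERSION;
	out->minor         = FUSE_KERNEL_MINOR_VERSION;
	out->max_readahead = in->max_readahead;
	out->max_write     = 128 * 1024;
	/* echo back only what we support; FUSE_ASYNC_DIO is what makes
	 * fuse_direct_IO() below run with io->async set */
	out->flags         = in->flags & (FUSE_ASYNC_READ | FUSE_ASYNC_DIO);
}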

fs/fuse/dev.c
fs/fuse/file.c

diff --combined fs/fuse/dev.c
index 9bfd1a3214e663fb579ad3523f0f22eb082e5926,07bdb14ae7a3a40ddc5b2d59c657f8210aaa9214..a6c1664e330b0e8610eb3faf3d14df6c8ed3eec2
@@@ -111,7 -111,7 +111,7 @@@ static void restore_sigs(sigset_t *olds
        sigprocmask(SIG_SETMASK, oldset, NULL);
  }
  
- static void __fuse_get_request(struct fuse_req *req)
+ void __fuse_get_request(struct fuse_req *req)
  {
        atomic_inc(&req->count);
  }
@@@ -130,20 -130,30 +130,30 @@@ static void fuse_req_init_context(struc
        req->in.h.pid = current->pid;
  }
  
- struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
+ static bool fuse_block_alloc(struct fuse_conn *fc, bool for_background)
+ {
+       return !fc->initialized || (for_background && fc->blocked);
+ }
+
+ static struct fuse_req *__fuse_get_req(struct fuse_conn *fc, unsigned npages,
+                                      bool for_background)
  {
        struct fuse_req *req;
-       sigset_t oldset;
-       int intr;
        int err;
        atomic_inc(&fc->num_waiting);
-       block_sigs(&oldset);
-       intr = wait_event_interruptible(fc->blocked_waitq, !fc->blocked);
-       restore_sigs(&oldset);
-       err = -EINTR;
-       if (intr)
-               goto out;
+       if (fuse_block_alloc(fc, for_background)) {
+               sigset_t oldset;
+               int intr;
+               block_sigs(&oldset);
+               intr = wait_event_interruptible_exclusive(fc->blocked_waitq,
+                               !fuse_block_alloc(fc, for_background));
+               restore_sigs(&oldset);
+               err = -EINTR;
+               if (intr)
+                       goto out;
+       }
  
        err = -ENOTCONN;
        if (!fc->connected)
                goto out;
  
        req = fuse_request_alloc(npages);
        err = -ENOMEM;
-       if (!req)
+       if (!req) {
+               if (for_background)
+                       wake_up(&fc->blocked_waitq);
                goto out;
+       }
  
        fuse_req_init_context(req);
        req->waiting = 1;
+       req->background = for_background;
        return req;
  
   out:
        atomic_dec(&fc->num_waiting);
        return ERR_PTR(err);
  }
+
+ struct fuse_req *fuse_get_req(struct fuse_conn *fc, unsigned npages)
+ {
+       return __fuse_get_req(fc, npages, false);
+ }
  EXPORT_SYMBOL_GPL(fuse_get_req);
  
+ struct fuse_req *fuse_get_req_for_background(struct fuse_conn *fc,
+                                            unsigned npages)
+ {
+       return __fuse_get_req(fc, npages, true);
+ }
+ EXPORT_SYMBOL_GPL(fuse_get_req_for_background);
+
  /*
   * Return request in fuse_file->reserved_req.  However that may
   * currently be in use.  If that is the case, wait for it to become
@@@ -225,19 -251,31 +251,31 @@@ struct fuse_req *fuse_get_req_nofail_no
        struct fuse_req *req;
  
        atomic_inc(&fc->num_waiting);
-       wait_event(fc->blocked_waitq, !fc->blocked);
+       wait_event(fc->blocked_waitq, fc->initialized);
        req = fuse_request_alloc(0);
        if (!req)
                req = get_reserved_req(fc, file);
  
        fuse_req_init_context(req);
        req->waiting = 1;
+       req->background = 0;
        return req;
  }
  
  void fuse_put_request(struct fuse_conn *fc, struct fuse_req *req)
  {
        if (atomic_dec_and_test(&req->count)) {
+               if (unlikely(req->background)) {
+                       /*
+                        * We get here in the unlikely case that a background
+                        * request was allocated but not sent
+                        */
+                       spin_lock(&fc->lock);
+                       if (!fc->blocked)
+                               wake_up(&fc->blocked_waitq);
+                       spin_unlock(&fc->lock);
+               }
                if (req->waiting)
                        atomic_dec(&fc->num_waiting);
  
@@@ -335,10 -373,15 +373,15 @@@ __releases(fc->lock
        list_del(&req->intr_entry);
        req->state = FUSE_REQ_FINISHED;
        if (req->background) {
-               if (fc->num_background == fc->max_background) {
+               req->background = 0;
+               if (fc->num_background == fc->max_background)
                        fc->blocked = 0;
-                       wake_up_all(&fc->blocked_waitq);
-               }
+               /* Wake up next waiter, if any */
+               if (!fc->blocked && waitqueue_active(&fc->blocked_waitq))
+                       wake_up(&fc->blocked_waitq);
                if (fc->num_background == fc->congestion_threshold &&
                    fc->connected && fc->bdi_initialized) {
                        clear_bdi_congested(&fc->bdi, BLK_RW_SYNC);
@@@ -442,6 -485,7 +485,7 @@@ __acquires(fc->lock
  
  static void __fuse_request_send(struct fuse_conn *fc, struct fuse_req *req)
  {
+       BUG_ON(req->background);
        spin_lock(&fc->lock);
        if (!fc->connected)
                req->out.h.error = -ENOTCONN;
@@@ -469,7 -513,7 +513,7 @@@ EXPORT_SYMBOL_GPL(fuse_request_send)
  static void fuse_request_send_nowait_locked(struct fuse_conn *fc,
                                            struct fuse_req *req)
  {
-       req->background = 1;
+       BUG_ON(!req->background);
        fc->num_background++;
        if (fc->num_background == fc->max_background)
                fc->blocked = 1;
@@@ -1319,7 -1363,7 +1363,7 @@@ static ssize_t fuse_dev_splice_read(str
                page_nr++;
                ret += buf->len;
  
 -              if (pipe->inode)
 +              if (pipe->files)
                        do_wakeup = 1;
        }
  
@@@ -2071,6 -2115,7 +2115,7 @@@ void fuse_abort_conn(struct fuse_conn *
        if (fc->connected) {
                fc->connected = 0;
                fc->blocked = 0;
+               fc->initialized = 1;
                end_io_requests(fc);
                end_queued_requests(fc);
                end_polls(fc);
@@@ -2089,6 -2134,7 +2134,7 @@@ int fuse_dev_release(struct inode *inod
                spin_lock(&fc->lock);
                fc->connected = 0;
                fc->blocked = 0;
+               fc->initialized = 1;
                end_queued_requests(fc);
                end_polls(fc);
                wake_up_all(&fc->blocked_waitq);
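
Taken together, the dev.c changes make background allocation explicit: async
callers must allocate with fuse_get_req_for_background(), which sets
req->background, and the background submission path now BUG()s on a request
without the flag, while __fuse_request_send() BUG()s when it is set. A hedged
sketch of a caller following the new contract; everything except the fuse_*
calls is hypothetical:

/* Sketch: submit one async (background) request under the new rules. */
static int example_submit_background(struct fuse_conn *fc, unsigned npages)
{
	struct fuse_req *req;

	req = fuse_get_req_for_background(fc, npages);
	if (IS_ERR(req))
		return PTR_ERR(req);	/* -EINTR, -ENOTCONN or -ENOMEM */

	/* ... set up req->in, req->pages, req->end here ... */

	/* req->background was set by the allocator, so this passes
	 * BUG_ON(!req->background) and goes through bg_queue */
	fuse_request_send_background(fc, req);
	return 0;
}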
diff --combined fs/fuse/file.c
index d15c6f21c17f6eefe1d90f16e77d59afe045394b,6ab7ca43f9e07aadde0ef49058b07910fe5b784d..4655e59d545b88f7d1494652dd580968a1ff7572
@@@ -126,11 -126,13 +126,13 @@@ static void fuse_file_put(struct fuse_f
                struct fuse_req *req = ff->reserved_req;
  
                if (sync) {
+                       req->background = 0;
                        fuse_request_send(ff->fc, req);
                        path_put(&req->misc.release.path);
                        fuse_put_request(ff->fc, req);
                } else {
                        req->end = fuse_release_end;
+                       req->background = 1;
                        fuse_request_send_background(ff->fc, req);
                }
                kfree(ff);
@@@ -282,6 -284,7 +284,7 @@@ void fuse_sync_release(struct fuse_fil
        WARN_ON(atomic_read(&ff->count) > 1);
        fuse_prepare_release(ff, flags, FUSE_RELEASE);
        ff->reserved_req->force = 1;
+       ff->reserved_req->background = 0;
        fuse_request_send(ff->fc, ff->reserved_req);
        fuse_put_request(ff->fc, ff->reserved_req);
        kfree(ff);
@@@ -491,9 -494,115 +494,115 @@@ void fuse_read_fill(struct fuse_req *re
        req->out.args[0].size = count;
  }
  
- static size_t fuse_send_read(struct fuse_req *req, struct file *file,
+ static void fuse_release_user_pages(struct fuse_req *req, int write)
+ {
+       unsigned i;
+       for (i = 0; i < req->num_pages; i++) {
+               struct page *page = req->pages[i];
+               if (write)
+                       set_page_dirty_lock(page);
+               put_page(page);
+       }
+ }
+
+ /**
+  * In case of short read, the caller sets 'pos' to the position of
+  * actual end of fuse request in IO request. Otherwise, if bytes_requested
+  * == bytes_transferred or rw == WRITE, the caller sets 'pos' to -1.
+  *
+  * An example:
+  * User requested DIO read of 64K. It was split into two 32K fuse requests,
+  * both submitted asynchronously. The first of them was ACKed by userspace as
+  * fully completed (req->out.args[0].size == 32K) resulting in pos == -1. The
+  * second request was ACKed as short, e.g. only 1K was read, resulting in
+  * pos == 33K.
+  *
+  * Thus, when all fuse requests are completed, the minimal non-negative 'pos'
+  * will be equal to the length of the longest contiguous fragment of
+  * transferred data starting from the beginning of IO request.
+  */
+ static void fuse_aio_complete(struct fuse_io_priv *io, int err, ssize_t pos)
+ {
+       int left;
+       spin_lock(&io->lock);
+       if (err)
+               io->err = io->err ? : err;
+       else if (pos >= 0 && (io->bytes < 0 || pos < io->bytes))
+               io->bytes = pos;
+       left = --io->reqs;
+       spin_unlock(&io->lock);
+       if (!left) {
+               long res;
+               if (io->err)
+                       res = io->err;
+               else if (io->bytes >= 0 && io->write)
+                       res = -EIO;
+               else {
+                       res = io->bytes < 0 ? io->size : io->bytes;
+                       if (!is_sync_kiocb(io->iocb)) {
+                               struct path *path = &io->iocb->ki_filp->f_path;
+                               struct inode *inode = path->dentry->d_inode;
+                               struct fuse_conn *fc = get_fuse_conn(inode);
+                               struct fuse_inode *fi = get_fuse_inode(inode);
+                               spin_lock(&fc->lock);
+                               fi->attr_version = ++fc->attr_version;
+                               spin_unlock(&fc->lock);
+                       }
+               }
+               aio_complete(io->iocb, res, 0);
+               kfree(io);
+       }
+ }
+
+ static void fuse_aio_complete_req(struct fuse_conn *fc, struct fuse_req *req)
+ {
+       struct fuse_io_priv *io = req->io;
+       ssize_t pos = -1;
+       fuse_release_user_pages(req, !io->write);
+       if (io->write) {
+               if (req->misc.write.in.size != req->misc.write.out.size)
+                       pos = req->misc.write.in.offset - io->offset +
+                               req->misc.write.out.size;
+       } else {
+               if (req->misc.read.in.size != req->out.args[0].size)
+                       pos = req->misc.read.in.offset - io->offset +
+                               req->out.args[0].size;
+       }
+       fuse_aio_complete(io, req->out.h.error, pos);
+ }
+
+ static size_t fuse_async_req_send(struct fuse_conn *fc, struct fuse_req *req,
+               size_t num_bytes, struct fuse_io_priv *io)
+ {
+       spin_lock(&io->lock);
+       io->size += num_bytes;
+       io->reqs++;
+       spin_unlock(&io->lock);
+       req->io = io;
+       req->end = fuse_aio_complete_req;
+       __fuse_get_request(req);
+       fuse_request_send_background(fc, req);
+       return num_bytes;
+ }
+
+ static size_t fuse_send_read(struct fuse_req *req, struct fuse_io_priv *io,
                             loff_t pos, size_t count, fl_owner_t owner)
  {
+       struct file *file = io->file;
        struct fuse_file *ff = file->private_data;
        struct fuse_conn *fc = ff->fc;
  
                inarg->read_flags |= FUSE_READ_LOCKOWNER;
                inarg->lock_owner = fuse_lock_owner_id(fc, owner);
        }
+       if (io->async)
+               return fuse_async_req_send(fc, req, count, io);
        fuse_request_send(fc, req);
        return req->out.args[0].size;
  }
@@@ -524,6 -637,7 +637,7 @@@ static void fuse_read_update_size(struc
  
  static int fuse_readpage(struct file *file, struct page *page)
  {
+       struct fuse_io_priv io = { .async = 0, .file = file };
        struct inode *inode = page->mapping->host;
        struct fuse_conn *fc = get_fuse_conn(inode);
        struct fuse_req *req;
        req->num_pages = 1;
        req->pages[0] = page;
        req->page_descs[0].length = count;
-       num_read = fuse_send_read(req, file, pos, count, NULL);
+       num_read = fuse_send_read(req, &io, pos, count, NULL);
        err = req->out.h.error;
        fuse_put_request(fc, req);
  
@@@ -661,7 -775,12 +775,12 @@@ static int fuse_readpages_fill(void *_d
                int nr_alloc = min_t(unsigned, data->nr_pages,
                                     FUSE_MAX_PAGES_PER_REQ);
                fuse_send_readpages(req, data->file);
-               data->req = req = fuse_get_req(fc, nr_alloc);
+               if (fc->async_read)
+                       req = fuse_get_req_for_background(fc, nr_alloc);
+               else
+                       req = fuse_get_req(fc, nr_alloc);
+               data->req = req;
                if (IS_ERR(req)) {
                        unlock_page(page);
                        return PTR_ERR(req);
@@@ -696,7 -815,10 +815,10 @@@ static int fuse_readpages(struct file *
  
        data.file = file;
        data.inode = inode;
-       data.req = fuse_get_req(fc, nr_alloc);
+       if (fc->async_read)
+               data.req = fuse_get_req_for_background(fc, nr_alloc);
+       else
+               data.req = fuse_get_req(fc, nr_alloc);
        data.nr_pages = nr_pages;
        err = PTR_ERR(data.req);
        if (IS_ERR(data.req))
@@@ -758,9 -880,10 +880,10 @@@ static void fuse_write_fill(struct fuse
        req->out.args[0].value = outarg;
  }
  
- static size_t fuse_send_write(struct fuse_req *req, struct file *file,
+ static size_t fuse_send_write(struct fuse_req *req, struct fuse_io_priv *io,
                              loff_t pos, size_t count, fl_owner_t owner)
  {
+       struct file *file = io->file;
        struct fuse_file *ff = file->private_data;
        struct fuse_conn *fc = ff->fc;
        struct fuse_write_in *inarg = &req->misc.write.in;
                inarg->write_flags |= FUSE_WRITE_LOCKOWNER;
                inarg->lock_owner = fuse_lock_owner_id(fc, owner);
        }
+       if (io->async)
+               return fuse_async_req_send(fc, req, count, io);
        fuse_request_send(fc, req);
        return req->misc.write.out.size;
  }
@@@ -794,11 -921,12 +921,12 @@@ static size_t fuse_send_write_pages(str
        size_t res;
        unsigned offset;
        unsigned i;
+       struct fuse_io_priv io = { .async = 0, .file = file };
  
        for (i = 0; i < req->num_pages; i++)
                fuse_wait_on_page_writeback(inode, req->pages[i]->index);
  
-       res = fuse_send_write(req, file, pos, count, NULL);
+       res = fuse_send_write(req, &io, pos, count, NULL);
  
        offset = req->page_descs[0].offset;
        count = res;
@@@ -971,6 -1099,7 +1099,6 @@@ static ssize_t fuse_file_aio_write(stru
                return err;
  
        count = ocount;
 -      sb_start_write(inode->i_sb);
        mutex_lock(&inode->i_mutex);
  
        /* We can write back this queue in page reclaim */
  out:
        current->backing_dev_info = NULL;
        mutex_unlock(&inode->i_mutex);
 -      sb_end_write(inode->i_sb);
  
        return written ? written : err;
  }
  
- static void fuse_release_user_pages(struct fuse_req *req, int write)
- {
-       unsigned i;
-       for (i = 0; i < req->num_pages; i++) {
-               struct page *page = req->pages[i];
-               if (write)
-                       set_page_dirty_lock(page);
-               put_page(page);
-       }
- }
-
  static inline void fuse_page_descs_length_init(struct fuse_req *req,
                unsigned index, unsigned nr_pages)
  {
@@@ -1146,10 -1264,11 +1262,11 @@@ static inline int fuse_iter_npages(cons
        return min(npages, FUSE_MAX_PAGES_PER_REQ);
  }
  
- ssize_t fuse_direct_io(struct file *file, const struct iovec *iov,
+ ssize_t fuse_direct_io(struct fuse_io_priv *io, const struct iovec *iov,
                       unsigned long nr_segs, size_t count, loff_t *ppos,
                       int write)
  {
+       struct file *file = io->file;
        struct fuse_file *ff = file->private_data;
        struct fuse_conn *fc = ff->fc;
        size_t nmax = write ? fc->max_write : fc->max_read;
                }
  
                if (write)
-                       nres = fuse_send_write(req, file, pos, nbytes, owner);
+                       nres = fuse_send_write(req, io, pos, nbytes, owner);
                else
-                       nres = fuse_send_read(req, file, pos, nbytes, owner);
+                       nres = fuse_send_read(req, io, pos, nbytes, owner);
  
-               fuse_release_user_pages(req, !write);
+               if (!io->async)
+                       fuse_release_user_pages(req, !write);
                if (req->out.h.error) {
                        if (!res)
                                res = req->out.h.error;
  }
  EXPORT_SYMBOL_GPL(fuse_direct_io);
  
- static ssize_t __fuse_direct_read(struct file *file, const struct iovec *iov,
-                                 unsigned long nr_segs, loff_t *ppos)
+ static ssize_t __fuse_direct_read(struct fuse_io_priv *io,
+                                 const struct iovec *iov,
+                                 unsigned long nr_segs, loff_t *ppos,
+                                 size_t count)
  {
        ssize_t res;
+       struct file *file = io->file;
        struct inode *inode = file_inode(file);
  
        if (is_bad_inode(inode))
                return -EIO;
  
-       res = fuse_direct_io(file, iov, nr_segs, iov_length(iov, nr_segs),
-                            ppos, 0);
+       res = fuse_direct_io(io, iov, nr_segs, count, ppos, 0);
  
        fuse_invalidate_attr(inode);
  
  static ssize_t fuse_direct_read(struct file *file, char __user *buf,
                                     size_t count, loff_t *ppos)
  {
+       struct fuse_io_priv io = { .async = 0, .file = file };
        struct iovec iov = { .iov_base = buf, .iov_len = count };
-       return __fuse_direct_read(file, &iov, 1, ppos);
+       return __fuse_direct_read(&io, &iov, 1, ppos, count);
  }
  
- static ssize_t __fuse_direct_write(struct file *file, const struct iovec *iov,
+ static ssize_t __fuse_direct_write(struct fuse_io_priv *io,
+                                  const struct iovec *iov,
                                   unsigned long nr_segs, loff_t *ppos)
  {
+       struct file *file = io->file;
        struct inode *inode = file_inode(file);
        size_t count = iov_length(iov, nr_segs);
        ssize_t res;
  
        res = generic_write_checks(file, ppos, &count, 0);
-       if (!res) {
-               res = fuse_direct_io(file, iov, nr_segs, count, ppos, 1);
-               if (res > 0)
-                       fuse_write_update_size(inode, *ppos);
-       }
+       if (!res)
+               res = fuse_direct_io(io, iov, nr_segs, count, ppos, 1);
  
        fuse_invalidate_attr(inode);
  
@@@ -1258,13 -1380,16 +1378,16 @@@ static ssize_t fuse_direct_write(struc
        struct iovec iov = { .iov_base = (void __user *)buf, .iov_len = count };
        struct inode *inode = file_inode(file);
        ssize_t res;
+       struct fuse_io_priv io = { .async = 0, .file = file };
  
        if (is_bad_inode(inode))
                return -EIO;
  
        /* Don't allow parallel writes to the same file */
        mutex_lock(&inode->i_mutex);
-       res = __fuse_direct_write(file, &iov, 1, ppos);
+       res = __fuse_direct_write(&io, &iov, 1, ppos);
+       if (res > 0)
+               fuse_write_update_size(inode, *ppos);
        mutex_unlock(&inode->i_mutex);
  
        return res;
@@@ -1373,6 -1498,7 +1496,7 @@@ static int fuse_writepage_locked(struc
        if (!req)
                goto err;
  
+       req->background = 1; /* writeback always goes to bg_queue */
        tmp_page = alloc_page(GFP_NOFS | __GFP_HIGHMEM);
        if (!tmp_page)
                goto err_free;
@@@ -2226,21 -2352,93 +2350,93 @@@ int fuse_notify_poll_wakeup(struct fuse
        return 0;
  }
  
+ static void fuse_do_truncate(struct file *file)
+ {
+       struct inode *inode = file->f_mapping->host;
+       struct iattr attr;
+       attr.ia_valid = ATTR_SIZE;
+       attr.ia_size = i_size_read(inode);
+       attr.ia_file = file;
+       attr.ia_valid |= ATTR_FILE;
+       fuse_do_setattr(inode, &attr, file);
+ }
+
  static ssize_t
  fuse_direct_IO(int rw, struct kiocb *iocb, const struct iovec *iov,
                        loff_t offset, unsigned long nr_segs)
  {
        ssize_t ret = 0;
-       struct file *file = NULL;
+       struct file *file = iocb->ki_filp;
+       struct fuse_file *ff = file->private_data;
        loff_t pos = 0;
+       struct inode *inode;
+       loff_t i_size;
+       size_t count = iov_length(iov, nr_segs);
+       struct fuse_io_priv *io;
  
-       file = iocb->ki_filp;
        pos = offset;
+       inode = file->f_mapping->host;
+       i_size = i_size_read(inode);
+
+       /* optimization for short read */
+       if (rw != WRITE && offset + count > i_size) {
+               if (offset >= i_size)
+                       return 0;
+               count = i_size - offset;
+       }
+
+       io = kmalloc(sizeof(struct fuse_io_priv), GFP_KERNEL);
+       if (!io)
+               return -ENOMEM;
+
+       spin_lock_init(&io->lock);
+       io->reqs = 1;
+       io->bytes = -1;
+       io->size = 0;
+       io->offset = offset;
+       io->write = (rw == WRITE);
+       io->err = 0;
+       io->file = file;
+       /*
+        * By default, we want to optimize all I/Os with async request
+        * submission to the client filesystem if supported.
+        */
+       io->async = ff->fc->async_dio;
+       io->iocb = iocb;
+       /*
+        * We cannot asynchronously extend the size of a file. We have no method
+        * to wait on real async I/O requests, so we must submit this request
+        * synchronously.
+        */
+       if (!is_sync_kiocb(iocb) && (offset + count > i_size))
+               io->async = false;
  
        if (rw == WRITE)
-               ret = __fuse_direct_write(file, iov, nr_segs, &pos);
+               ret = __fuse_direct_write(io, iov, nr_segs, &pos);
        else
-               ret = __fuse_direct_read(file, iov, nr_segs, &pos);
+               ret = __fuse_direct_read(io, iov, nr_segs, &pos, count);
+       if (io->async) {
+               fuse_aio_complete(io, ret < 0 ? ret : 0, -1);
+               /* we have a non-extending, async request, so return */
+               if (ret > 0 && !is_sync_kiocb(iocb))
+                       return -EIOCBQUEUED;
+               ret = wait_on_sync_kiocb(iocb);
+       } else {
+               kfree(io);
+       }
+       if (rw == WRITE) {
+               if (ret > 0)
+                       fuse_write_update_size(inode, pos);
+               else if (ret < 0 && offset + count > i_size)
+                       fuse_do_truncate(file);
+       }
  
        return ret;
  }
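
What the series buys userspace, in one hedged example: once the server has
negotiated FUSE_ASYNC_DIO, a direct read submitted through the AIO interface
no longer blocks inside fuse_direct_IO(); for a non-extending request the
kiocb is queued (-EIOCBQUEUED) and completed from fuse_aio_complete(). A
libaio sketch; the mount point and file name are made up, build with -laio:

#define _GNU_SOURCE		/* O_DIRECT */
#include <fcntl.h>
#include <libaio.h>
#include <stdio.h>
#include <stdlib.h>

int main(void)
{
	io_context_t ctx = 0;
	struct iocb cb, *cbs[1] = { &cb };
	struct io_event ev;
	void *buf;
	int fd;

	fd = open("/mnt/fuse/data", O_RDONLY | O_DIRECT);
	if (fd < 0 || io_setup(1, &ctx) < 0)
		return 1;
	if (posix_memalign(&buf, 4096, 65536))
		return 1;

	/* one 64K direct read; with fc->async_dio set the kernel splits
	 * it into fuse requests of at most max_read bytes, sends them as
	 * background requests, and completes the iocb when the last one
	 * finishes (see fuse_aio_complete() above) */
	io_prep_pread(&cb, fd, buf, 65536, 0);
	if (io_submit(ctx, 1, cbs) != 1)
		return 1;
	if (io_getevents(ctx, 1, 1, &ev, NULL) != 1)
		return 1;
	printf("read %lld bytes\n", (long long)ev.res);

	io_destroy(ctx);
	free(buf);
	return 0;
}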