ceph: fix splice read for no Fc capability case
authorYan, Zheng <zyan@redhat.com>
Tue, 8 Nov 2016 13:54:34 +0000 (21:54 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 12 Dec 2016 22:54:27 +0000 (23:54 +0100)
When the iov_iter type is ITER_PIPE, copy_page_to_iter() increases
the page's reference count and adds the page to a pipe_buffer. It also
sets the pipe_buffer's ops to page_cache_pipe_buf_ops. The confirm
callback in page_cache_pipe_buf_ops expects the page to be from the
page cache and uptodate; otherwise it returns an error.

In the ceph_sync_read() case, the pages are not from the page cache, so
we can't call copy_page_to_iter() when the iov_iter type is ITER_PIPE.
The fix is to use iov_iter_get_pages_alloc() to allocate pages
for the pipe (the code is similar to default_file_splice_read()).

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/file.c

index ae3cec5724d654a8fc80a7d49b99e0ad456f8ec2..12ce2b562d1492d74200a0151721a48dfd1b4bbc 100644 (file)
@@ -458,71 +458,60 @@ enum {
  * only return a short read to the caller if we hit EOF.
  */
 static int striped_read(struct inode *inode,
-                       u64 off, u64 len,
+                       u64 pos, u64 len,
                        struct page **pages, int num_pages,
-                       int *checkeof)
+                       int page_align, int *checkeof)
 {
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       u64 pos, this_len, left;
+       u64 this_len;
        loff_t i_size;
-       int page_align, pages_left;
-       int read, ret;
-       struct page **page_pos;
+       int page_idx;
+       int ret, read = 0;
        bool hit_stripe, was_short;
 
        /*
         * we may need to do multiple reads.  not atomic, unfortunately.
         */
-       pos = off;
-       left = len;
-       page_pos = pages;
-       pages_left = num_pages;
-       read = 0;
-
 more:
-       page_align = pos & ~PAGE_MASK;
-       this_len = left;
+       this_len = len;
+       page_idx = (page_align + read) >> PAGE_SHIFT;
        ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
                                  &ci->i_layout, pos, &this_len,
-                                 ci->i_truncate_seq,
-                                 ci->i_truncate_size,
-                                 page_pos, pages_left, page_align);
+                                 ci->i_truncate_seq, ci->i_truncate_size,
+                                 pages + page_idx, num_pages - page_idx,
+                                 ((page_align + read) & ~PAGE_MASK));
        if (ret == -ENOENT)
                ret = 0;
-       hit_stripe = this_len < left;
+       hit_stripe = this_len < len;
        was_short = ret >= 0 && ret < this_len;
-       dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
+       dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, len, read,
             ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
 
        i_size = i_size_read(inode);
        if (ret >= 0) {
-               int didpages;
                if (was_short && (pos + ret < i_size)) {
                        int zlen = min(this_len - ret, i_size - pos - ret);
-                       int zoff = (off & ~PAGE_MASK) + read + ret;
+                       int zoff = page_align + read + ret;
                        dout(" zero gap %llu to %llu\n",
-                               pos + ret, pos + ret + zlen);
+                            pos + ret, pos + ret + zlen);
                        ceph_zero_page_vector_range(zoff, zlen, pages);
                        ret += zlen;
                }
 
-               didpages = (page_align + ret) >> PAGE_SHIFT;
+               read += ret;
                pos += ret;
-               read = pos - off;
-               left -= ret;
-               page_pos += didpages;
-               pages_left -= didpages;
+               len -= ret;
 
                /* hit stripe and need continue*/
-               if (left && hit_stripe && pos < i_size)
+               if (len && hit_stripe && pos < i_size)
                        goto more;
        }
 
        if (read > 0) {
                ret = read;
                /* did we bounce off eof? */
-               if (pos + left > i_size)
+               if (pos + len > i_size)
                        *checkeof = CHECK_EOF;
        }
 
@@ -536,15 +525,16 @@ more:
  *
  * If the read spans object boundary, just do multiple reads.
  */
-static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
-                               int *checkeof)
+static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *to,
+                             int *checkeof)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct page **pages;
        u64 off = iocb->ki_pos;
-       int num_pages, ret;
-       size_t len = iov_iter_count(i);
+       int num_pages;
+       ssize_t ret;
+       size_t len = iov_iter_count(to);
 
        dout("sync_read on file %p %llu~%u %s\n", file, off,
             (unsigned)len,
@@ -563,35 +553,56 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
        if (ret < 0)
                return ret;
 
-       num_pages = calc_pages_for(off, len);
-       pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
-       if (IS_ERR(pages))
-               return PTR_ERR(pages);
-       ret = striped_read(inode, off, len, pages,
-                               num_pages, checkeof);
-       if (ret > 0) {
-               int l, k = 0;
-               size_t left = ret;
-
-               while (left) {
-                       size_t page_off = off & ~PAGE_MASK;
-                       size_t copy = min_t(size_t, left,
-                                           PAGE_SIZE - page_off);
-                       l = copy_page_to_iter(pages[k++], page_off, copy, i);
-                       off += l;
-                       left -= l;
-                       if (l < copy)
-                               break;
+       if (unlikely(to->type & ITER_PIPE)) {
+               size_t page_off;
+               ret = iov_iter_get_pages_alloc(to, &pages, len,
+                                              &page_off);
+               if (ret <= 0)
+                       return -ENOMEM;
+               num_pages = DIV_ROUND_UP(ret + page_off, PAGE_SIZE);
+
+               ret = striped_read(inode, off, ret, pages, num_pages,
+                                  page_off, checkeof);
+               if (ret > 0) {
+                       iov_iter_advance(to, ret);
+                       off += ret;
+               } else {
+                       iov_iter_advance(to, 0);
+               }
+               ceph_put_page_vector(pages, num_pages, false);
+       } else {
+               num_pages = calc_pages_for(off, len);
+               pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
+               if (IS_ERR(pages))
+                       return PTR_ERR(pages);
+
+               ret = striped_read(inode, off, len, pages, num_pages,
+                                  (off & ~PAGE_MASK), checkeof);
+               if (ret > 0) {
+                       int l, k = 0;
+                       size_t left = ret;
+
+                       while (left) {
+                               size_t page_off = off & ~PAGE_MASK;
+                               size_t copy = min_t(size_t, left,
+                                                   PAGE_SIZE - page_off);
+                               l = copy_page_to_iter(pages[k++], page_off,
+                                                     copy, to);
+                               off += l;
+                               left -= l;
+                               if (l < copy)
+                                       break;
+                       }
                }
+               ceph_release_page_vector(pages, num_pages);
        }
-       ceph_release_page_vector(pages, num_pages);
 
        if (off > iocb->ki_pos) {
                ret = off - iocb->ki_pos;
                iocb->ki_pos = off;
        }
 
-       dout("sync_read result %d\n", ret);
+       dout("sync_read result %zd\n", ret);
        return ret;
 }
 
@@ -1771,6 +1782,7 @@ const struct file_operations ceph_file_fops = {
        .fsync = ceph_fsync,
        .lock = ceph_lock,
        .flock = ceph_flock,
+       .splice_read = generic_file_splice_read,
        .splice_write = iter_file_splice_write,
        .unlocked_ioctl = ceph_ioctl,
        .compat_ioctl   = ceph_ioctl,