ceph: try getting buffer capability for readahead/fadvise
authorYan, Zheng <zyan@redhat.com>
Tue, 25 Oct 2016 02:51:55 +0000 (10:51 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 12 Dec 2016 22:54:27 +0000 (23:54 +0100)
For readahead/fadvise cases, caller of ceph_readpages does not
hold buffer capability. Pages can be added to page cache while
there is no buffer capability. This can cause data integrity
issue.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/super.h

index ef3ebd780aff8bf8c9203ae366f3d4b688cd7af6..dbb5f7d6921635808eb19780ade7c1b3775254ef 100644 (file)
@@ -315,7 +315,32 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
        struct page **pages;
        pgoff_t next_index;
        int nr_pages = 0;
-       int ret;
+       int got = 0;
+       int ret = 0;
+
+       if (!current->journal_info) {
+               /* caller of readpages does not hold buffer and read caps
+                * (fadvise, madvise and readahead cases) */
+               int want = CEPH_CAP_FILE_CACHE;
+               ret = ceph_try_get_caps(ci, CEPH_CAP_FILE_RD, want, &got);
+               if (ret < 0) {
+                       dout("start_read %p, error getting cap\n", inode);
+               } else if (!(got & want)) {
+                       dout("start_read %p, no cache cap\n", inode);
+                       ret = 0;
+               }
+               if (ret <= 0) {
+                       if (got)
+                               ceph_put_cap_refs(ci, got);
+                       while (!list_empty(page_list)) {
+                               page = list_entry(page_list->prev,
+                                                 struct page, lru);
+                               list_del(&page->lru);
+                               put_page(page);
+                       }
+                       return ret;
+               }
+       }
 
        off = (u64) page_offset(page);
 
@@ -338,15 +363,18 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
                                    CEPH_OSD_FLAG_READ, NULL,
                                    ci->i_truncate_seq, ci->i_truncate_size,
                                    false);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
+       if (IS_ERR(req)) {
+               ret = PTR_ERR(req);
+               goto out;
+       }
 
        /* build page vector */
        nr_pages = calc_pages_for(0, len);
        pages = kmalloc(sizeof(*pages) * nr_pages, GFP_KERNEL);
-       ret = -ENOMEM;
-       if (!pages)
-               goto out;
+       if (!pages) {
+               ret = -ENOMEM;
+               goto out_put;
+       }
        for (i = 0; i < nr_pages; ++i) {
                page = list_entry(page_list->prev, struct page, lru);
                BUG_ON(PageLocked(page));
@@ -378,6 +406,12 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
        if (ret < 0)
                goto out_pages;
        ceph_osdc_put_request(req);
+
+       /* After adding locked pages to page cache, the inode holds cache cap.
+        * So we can drop our cap refs. */
+       if (got)
+               ceph_put_cap_refs(ci, got);
+
        return nr_pages;
 
 out_pages:
@@ -386,8 +420,11 @@ out_pages:
                unlock_page(pages[i]);
        }
        ceph_put_page_vector(pages, nr_pages, false);
-out:
+out_put:
        ceph_osdc_put_request(req);
+out:
+       if (got)
+               ceph_put_cap_refs(ci, got);
        return ret;
 }
 
@@ -424,7 +461,6 @@ static int ceph_readpages(struct file *file, struct address_space *mapping,
                rc = start_read(inode, page_list, max);
                if (rc < 0)
                        goto out;
-               BUG_ON(rc == 0);
        }
 out:
        ceph_fscache_readpages_cancel(inode, page_list);
@@ -1371,9 +1407,11 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
             inode, off, (size_t)PAGE_SIZE, ceph_cap_string(got));
 
        if ((got & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO)) ||
-           ci->i_inline_version == CEPH_INLINE_NONE)
+           ci->i_inline_version == CEPH_INLINE_NONE) {
+               current->journal_info = vma->vm_file;
                ret = filemap_fault(vma, vmf);
-       else
+               current->journal_info = NULL;
+       } else
                ret = -EAGAIN;
 
        dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
index 4037b389a7e901d910d3fd755994327aa9d692ab..edb407f38b40b82ef451b812843bb8331d74e802 100644 (file)
@@ -2479,6 +2479,27 @@ static void check_max_size(struct inode *inode, loff_t endoff)
                ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
 }
 
+int ceph_try_get_caps(struct ceph_inode_info *ci, int need, int want, int *got)
+{
+       int ret, err = 0;
+
+       BUG_ON(need & ~CEPH_CAP_FILE_RD);
+       BUG_ON(want & ~(CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO));
+       ret = ceph_pool_perm_check(ci, need);
+       if (ret < 0)
+               return ret;
+
+       ret = try_get_cap_refs(ci, need, want, 0, true, got, &err);
+       if (ret) {
+               if (err == -EAGAIN) {
+                       ret = 0;
+               } else if (err < 0) {
+                       ret = err;
+               }
+       }
+       return ret;
+}
+
 /*
  * Wait for caps, and take cap references.  If we can't get a WR cap
  * due to a small max_size, make sure we check_max_size (and possibly
index 10cd6acad44c15ec1615d8f22b94baf891eb4617..ae3cec5724d654a8fc80a7d49b99e0ad456f8ec2 100644 (file)
@@ -1249,8 +1249,9 @@ again:
                dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
                     inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
                     ceph_cap_string(got));
-
+               current->journal_info = filp;
                ret = generic_file_read_iter(iocb, to);
+               current->journal_info = NULL;
        }
        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
index 3e3fa916305968e837fa02ae473efe63b8f366d6..622d5dd9f61694c7e4b774342a83df5376f889fc 100644 (file)
@@ -905,6 +905,8 @@ extern int ceph_encode_dentry_release(void **p, struct dentry *dn,
 
 extern int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
                         loff_t endoff, int *got, struct page **pinned_page);
+extern int ceph_try_get_caps(struct ceph_inode_info *ci,
+                            int need, int want, int *got);
 
 /* for counting open files by mode */
 extern void __ceph_get_fmode(struct ceph_inode_info *ci, int mode);