ceph: fetch inline data when getting Fcr cap refs
authorYan, Zheng <zyan@redhat.com>
Fri, 14 Nov 2014 14:10:07 +0000 (22:10 +0800)
committerIlya Dryomov <idryomov@redhat.com>
Wed, 17 Dec 2014 17:09:52 +0000 (20:09 +0300)
we can't use getattr to fetch inline data after getting Fcr caps,
because it can cause deadlock. The solution is try bringing inline
data to page cache when not holding any cap, and hope the inline
data page is still there after getting the Fcr caps. If the page
is still there, pin it in page cache for later IO.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/file.c

index 4a3f55f27ab4d524fc1950935494cee94fe6e8e2..5d2b88e3ff0bb4784f77a5a683bfe6ac79ca7436 100644 (file)
@@ -1207,6 +1207,7 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
        struct inode *inode = file_inode(vma->vm_file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *fi = vma->vm_file->private_data;
+       struct page *pinned_page = NULL;
        loff_t off = vmf->pgoff << PAGE_CACHE_SHIFT;
        int want, got, ret;
 
@@ -1218,7 +1219,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
                want = CEPH_CAP_FILE_CACHE;
        while (1) {
                got = 0;
-               ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+               ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1,
+                                   &got, &pinned_page);
                if (ret == 0)
                        break;
                if (ret != -ERESTARTSYS) {
@@ -1233,6 +1235,8 @@ static int ceph_filemap_fault(struct vm_area_struct *vma, struct vm_fault *vmf)
 
        dout("filemap_fault %p %llu~%zd dropping cap refs on %s ret %d\n",
             inode, off, (size_t)PAGE_CACHE_SIZE, ceph_cap_string(got), ret);
+       if (pinned_page)
+               page_cache_release(pinned_page);
        ceph_put_cap_refs(ci, got);
 
        return ret;
@@ -1266,7 +1270,8 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
                want = CEPH_CAP_FILE_BUFFER;
        while (1) {
                got = 0;
-               ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, off + len);
+               ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, off + len,
+                                   &got, NULL);
                if (ret == 0)
                        break;
                if (ret != -ERESTARTSYS) {
index 6372eb9ce491f5ef9c58a9c05e6304db8247acf1..795afe304871b66f7110343268f64b0380703b13 100644 (file)
@@ -2057,15 +2057,17 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
  * requested from the MDS.
  */
 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-                           int *got, loff_t endoff, int *check_max, int *err)
+                           loff_t endoff, int *got, struct page **pinned_page,
+                           int *check_max, int *err)
 {
        struct inode *inode = &ci->vfs_inode;
        int ret = 0;
-       int have, implemented;
+       int have, implemented, _got = 0;
        int file_wanted;
 
        dout("get_cap_refs %p need %s want %s\n", inode,
             ceph_cap_string(need), ceph_cap_string(want));
+again:
        spin_lock(&ci->i_ceph_lock);
 
        /* make sure file is actually open */
@@ -2075,7 +2077,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
                     ceph_cap_string(need), ceph_cap_string(file_wanted));
                *err = -EBADF;
                ret = 1;
-               goto out;
+               goto out_unlock;
        }
 
        /* finish pending truncate */
@@ -2095,7 +2097,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
                                *check_max = 1;
                                ret = 1;
                        }
-                       goto out;
+                       goto out_unlock;
                }
                /*
                 * If a sync write is in progress, we must wait, so that we
@@ -2103,7 +2105,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
                 */
                if (__ceph_have_pending_cap_snap(ci)) {
                        dout("get_cap_refs %p cap_snap_pending\n", inode);
-                       goto out;
+                       goto out_unlock;
                }
        }
 
@@ -2120,18 +2122,50 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
                     inode, ceph_cap_string(have), ceph_cap_string(not),
                     ceph_cap_string(revoking));
                if ((revoking & not) == 0) {
-                       *got = need | (have & want);
-                       __take_cap_refs(ci, *got);
+                       _got = need | (have & want);
+                       __take_cap_refs(ci, _got);
                        ret = 1;
                }
        } else {
                dout("get_cap_refs %p have %s needed %s\n", inode,
                     ceph_cap_string(have), ceph_cap_string(need));
        }
-out:
+out_unlock:
        spin_unlock(&ci->i_ceph_lock);
+
+       if (ci->i_inline_version != CEPH_INLINE_NONE &&
+           (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+           i_size_read(inode) > 0) {
+               int ret1;
+               struct page *page = find_get_page(inode->i_mapping, 0);
+               if (page) {
+                       if (PageUptodate(page)) {
+                               *pinned_page = page;
+                               goto out;
+                       }
+                       page_cache_release(page);
+               }
+               /*
+                * drop cap refs first because getattr while holding
+                * caps refs can cause deadlock.
+                */
+               ceph_put_cap_refs(ci, _got);
+               _got = 0;
+
+               /* getattr request will bring inline data into page cache */
+               ret1 = __ceph_do_getattr(inode, NULL,
+                                        CEPH_STAT_CAP_INLINE_DATA, true);
+               if (ret1 >= 0) {
+                       ret = 0;
+                       goto again;
+               }
+               *err = ret1;
+               ret = 1;
+       }
+out:
        dout("get_cap_refs %p ret %d got %s\n", inode,
-            ret, ceph_cap_string(*got));
+            ret, ceph_cap_string(_got));
+       *got = _got;
        return ret;
 }
 
@@ -2168,8 +2202,8 @@ static void check_max_size(struct inode *inode, loff_t endoff)
  * due to a small max_size, make sure we check_max_size (and possibly
  * ask the mds) so we don't get hung up indefinitely.
  */
-int ceph_get_caps(struct ceph_inode_info *ci, int need, int want, int *got,
-                 loff_t endoff)
+int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
+                 loff_t endoff, int *got, struct page **pinned_page)
 {
        int check_max, ret, err;
 
@@ -2179,8 +2213,8 @@ retry:
        check_max = 0;
        err = 0;
        ret = wait_event_interruptible(ci->i_cap_wq,
-                                      try_get_cap_refs(ci, need, want,
-                                                       got, endoff,
+                                      try_get_cap_refs(ci, need, want, endoff,
+                                                       got, pinned_page,
                                                        &check_max, &err));
        if (err)
                ret = err;
index c03ac4c4bcd102502ec1142bd25ffae0c70eae34..861b9954a63a0bf59fc23b6be9ff24f06949ebb0 100644 (file)
@@ -805,6 +805,7 @@ static ssize_t ceph_read_iter(struct kiocb *iocb, struct iov_iter *to)
        size_t len = iocb->ki_nbytes;
        struct inode *inode = file_inode(filp);
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct page *pinned_page = NULL;
        ssize_t ret;
        int want, got = 0;
        int checkeof = 0, read = 0;
@@ -817,7 +818,7 @@ again:
                want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
        else
                want = CEPH_CAP_FILE_CACHE;
-       ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, -1, &got, &pinned_page);
        if (ret < 0)
                return ret;
 
@@ -840,6 +841,10 @@ again:
        }
        dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
             inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
+       if (pinned_page) {
+               page_cache_release(pinned_page);
+               pinned_page = NULL;
+       }
        ceph_put_cap_refs(ci, got);
 
        if (checkeof && ret >= 0) {
@@ -924,7 +929,8 @@ retry_snap:
        else
                want = CEPH_CAP_FILE_BUFFER;
        got = 0;
-       err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, pos + count);
+       err = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, pos + count,
+                           &got, NULL);
        if (err < 0)
                goto out;
 
@@ -1225,7 +1231,7 @@ static long ceph_fallocate(struct file *file, int mode,
        else
                want = CEPH_CAP_FILE_BUFFER;
 
-       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, &got, endoff);
+       ret = ceph_get_caps(ci, CEPH_CAP_FILE_WR, want, endoff, &got, NULL);
        if (ret < 0)
                goto unlock;