ceph: set i_head_snapc when getting CEPH_CAP_FILE_WR reference
authorYan, Zheng <zyan@redhat.com>
Thu, 30 Apr 2015 06:40:54 +0000 (14:40 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 25 Jun 2015 08:49:28 +0000 (11:49 +0300)
In most cases that snap context is needed, we are holding
reference of CEPH_CAP_FILE_WR. So we can set ceph inode's
i_head_snapc when getting the CEPH_CAP_FILE_WR reference,
and make codes get snap context from i_head_snapc. This makes
the code simpler.

Another benefit of this change is that we can handle snap
notification more elegantly. Especially when snap context
is updated while someone else is doing write. The old queue
cap_snap code may set cap_snap's context to ether the old
context or the new snap context, depending on if i_head_snapc
is set. The new queue capp_snap code always set cap_snap's
context to the old snap context.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/addr.c
fs/ceph/caps.c
fs/ceph/file.c
fs/ceph/snap.c

index ccc4325aa5c5793cc8c06e6a22c34643a6c63c0b..5f53ac0d9d7c37eb26dd9606d464f3bb69c2cb5e 100644 (file)
@@ -87,17 +87,21 @@ static int ceph_set_page_dirty(struct page *page)
        inode = mapping->host;
        ci = ceph_inode(inode);
 
-       /*
-        * Note that we're grabbing a snapc ref here without holding
-        * any locks!
-        */
-       snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
-
        /* dirty the head */
        spin_lock(&ci->i_ceph_lock);
-       if (ci->i_head_snapc == NULL)
-               ci->i_head_snapc = ceph_get_snap_context(snapc);
-       ++ci->i_wrbuffer_ref_head;
+       BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
+       if (__ceph_have_pending_cap_snap(ci)) {
+               struct ceph_cap_snap *capsnap =
+                               list_last_entry(&ci->i_cap_snaps,
+                                               struct ceph_cap_snap,
+                                               ci_item);
+               snapc = ceph_get_snap_context(capsnap->context);
+               capsnap->dirty_pages++;
+       } else {
+               BUG_ON(!ci->i_head_snapc);
+               snapc = ceph_get_snap_context(ci->i_head_snapc);
+               ++ci->i_wrbuffer_ref_head;
+       }
        if (ci->i_wrbuffer_ref == 0)
                ihold(inode);
        ++ci->i_wrbuffer_ref;
@@ -1033,7 +1037,6 @@ static int ceph_update_writeable_page(struct file *file,
 {
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        loff_t page_off = pos & PAGE_CACHE_MASK;
        int pos_in_page = pos & ~PAGE_CACHE_MASK;
        int end_in_page = pos_in_page + len;
@@ -1045,10 +1048,6 @@ retry_locked:
        /* writepages currently holds page lock, but if we change that later, */
        wait_on_page_writeback(page);
 
-       /* check snap context */
-       BUG_ON(!ci->i_snap_realm);
-       down_read(&mdsc->snap_rwsem);
-       BUG_ON(!ci->i_snap_realm->cached_context);
        snapc = page_snap_context(page);
        if (snapc && snapc != ci->i_head_snapc) {
                /*
@@ -1056,7 +1055,6 @@ retry_locked:
                 * context!  is it writeable now?
                 */
                oldest = get_oldest_context(inode, NULL);
-               up_read(&mdsc->snap_rwsem);
 
                if (snapc->seq > oldest->seq) {
                        ceph_put_snap_context(oldest);
@@ -1113,7 +1111,6 @@ retry_locked:
        }
 
        /* we need to read it. */
-       up_read(&mdsc->snap_rwsem);
        r = readpage_nounlock(file, page);
        if (r < 0)
                goto fail_nosnap;
@@ -1158,16 +1155,13 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
 
 /*
  * we don't do anything in here that simple_write_end doesn't do
- * except adjust dirty page accounting and drop read lock on
- * mdsc->snap_rwsem.
+ * except adjust dirty page accounting
  */
 static int ceph_write_end(struct file *file, struct address_space *mapping,
                          loff_t pos, unsigned len, unsigned copied,
                          struct page *page, void *fsdata)
 {
        struct inode *inode = file_inode(file);
-       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_mds_client *mdsc = fsc->mdsc;
        unsigned from = pos & (PAGE_CACHE_SIZE - 1);
        int check_cap = 0;
 
@@ -1189,7 +1183,6 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
        set_page_dirty(page);
 
        unlock_page(page);
-       up_read(&mdsc->snap_rwsem);
        page_cache_release(page);
 
        if (check_cap)
@@ -1315,7 +1308,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
        struct inode *inode = file_inode(vma->vm_file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_file_info *fi = vma->vm_file->private_data;
-       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        struct page *page = vmf->page;
        loff_t off = page_offset(page);
        loff_t size = i_size_read(inode);
@@ -1374,7 +1366,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
        if (ret == 0) {
                /* success.  we'll keep the page locked. */
                set_page_dirty(page);
-               up_read(&mdsc->snap_rwsem);
                ret = VM_FAULT_LOCKED;
        } else {
                if (ret == -ENOMEM)
index 7c8e93aeb01e8e90d5625ada31b3642221e32982..feb8ec92f1b4d22b203fae1dcab42dbdfce6bee8 100644 (file)
@@ -2073,7 +2073,8 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
  *
  * Protected by i_ceph_lock.
  */
-static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+static void __take_cap_refs(struct ceph_inode_info *ci, int got,
+                           bool snap_rwsem_locked)
 {
        if (got & CEPH_CAP_PIN)
                ci->i_pin_ref++;
@@ -2081,8 +2082,14 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
                ci->i_rd_ref++;
        if (got & CEPH_CAP_FILE_CACHE)
                ci->i_rdcache_ref++;
-       if (got & CEPH_CAP_FILE_WR)
+       if (got & CEPH_CAP_FILE_WR) {
+               if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
+                       BUG_ON(!snap_rwsem_locked);
+                       ci->i_head_snapc = ceph_get_snap_context(
+                                       ci->i_snap_realm->cached_context);
+               }
                ci->i_wr_ref++;
+       }
        if (got & CEPH_CAP_FILE_BUFFER) {
                if (ci->i_wb_ref == 0)
                        ihold(&ci->vfs_inode);
@@ -2100,16 +2107,19 @@ static void __take_cap_refs(struct ceph_inode_info *ci, int got)
  * requested from the MDS.
  */
 static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
-                           loff_t endoff, int *got, int *check_max, int *err)
+                           loff_t endoff, bool nonblock, int *got, int *err)
 {
        struct inode *inode = &ci->vfs_inode;
+       struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
        int ret = 0;
        int have, implemented;
        int file_wanted;
+       bool snap_rwsem_locked = false;
 
        dout("get_cap_refs %p need %s want %s\n", inode,
             ceph_cap_string(need), ceph_cap_string(want));
 
+again:
        spin_lock(&ci->i_ceph_lock);
 
        /* make sure file is actually open */
@@ -2125,6 +2135,10 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
        /* finish pending truncate */
        while (ci->i_truncate_pending) {
                spin_unlock(&ci->i_ceph_lock);
+               if (snap_rwsem_locked) {
+                       up_read(&mdsc->snap_rwsem);
+                       snap_rwsem_locked = false;
+               }
                __ceph_do_pending_vmtruncate(inode);
                spin_lock(&ci->i_ceph_lock);
        }
@@ -2136,7 +2150,7 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
                        dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
                             inode, endoff, ci->i_max_size);
                        if (endoff > ci->i_requested_max_size) {
-                               *check_max = 1;
+                               *err = -EAGAIN;
                                ret = 1;
                        }
                        goto out_unlock;
@@ -2164,8 +2178,29 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
                     inode, ceph_cap_string(have), ceph_cap_string(not),
                     ceph_cap_string(revoking));
                if ((revoking & not) == 0) {
+                       if (!snap_rwsem_locked &&
+                           !ci->i_head_snapc &&
+                           (need & CEPH_CAP_FILE_WR)) {
+                               if (!down_read_trylock(&mdsc->snap_rwsem)) {
+                                       /*
+                                        * we can not call down_read() when
+                                        * task isn't in TASK_RUNNING state
+                                        */
+                                       if (nonblock) {
+                                               *err = -EAGAIN;
+                                               ret = 1;
+                                               goto out_unlock;
+                                       }
+
+                                       spin_unlock(&ci->i_ceph_lock);
+                                       down_read(&mdsc->snap_rwsem);
+                                       snap_rwsem_locked = true;
+                                       goto again;
+                               }
+                               snap_rwsem_locked = true;
+                       }
                        *got = need | (have & want);
-                       __take_cap_refs(ci, *got);
+                       __take_cap_refs(ci, *got, true);
                        ret = 1;
                }
        } else {
@@ -2189,6 +2224,8 @@ static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
        }
 out_unlock:
        spin_unlock(&ci->i_ceph_lock);
+       if (snap_rwsem_locked)
+               up_read(&mdsc->snap_rwsem);
 
        dout("get_cap_refs %p ret %d got %s\n", inode,
             ret, ceph_cap_string(*got));
@@ -2231,54 +2268,70 @@ static void check_max_size(struct inode *inode, loff_t endoff)
 int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
                  loff_t endoff, int *got, struct page **pinned_page)
 {
-       int _got, check_max, ret, err = 0;
+       int _got, ret, err = 0;
 
        ret = ceph_pool_perm_check(ci, need);
        if (ret < 0)
                return ret;
 
-retry:
-       if (endoff > 0)
-               check_max_size(&ci->vfs_inode, endoff);
-       _got = 0;
-       check_max = 0;
-       ret = wait_event_interruptible(ci->i_cap_wq,
-                               try_get_cap_refs(ci, need, want, endoff,
-                                                &_got, &check_max, &err));
-       if (err)
-               ret = err;
-       if (ret < 0)
-               return ret;
+       while (true) {
+               if (endoff > 0)
+                       check_max_size(&ci->vfs_inode, endoff);
 
-       if (check_max)
-               goto retry;
+               err = 0;
+               _got = 0;
+               ret = try_get_cap_refs(ci, need, want, endoff,
+                                      false, &_got, &err);
+               if (ret) {
+                       if (err == -EAGAIN)
+                               continue;
+                       if (err < 0)
+                               return err;
+               } else {
+                       ret = wait_event_interruptible(ci->i_cap_wq,
+                                       try_get_cap_refs(ci, need, want, endoff,
+                                                        true, &_got, &err));
+                       if (err == -EAGAIN)
+                               continue;
+                       if (err < 0)
+                               ret = err;
+                       if (ret < 0)
+                               return ret;
+               }
 
-       if (ci->i_inline_version != CEPH_INLINE_NONE &&
-           (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
-           i_size_read(&ci->vfs_inode) > 0) {
-               struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
-               if (page) {
-                       if (PageUptodate(page)) {
-                               *pinned_page = page;
-                               goto out;
+               if (ci->i_inline_version != CEPH_INLINE_NONE &&
+                   (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+                   i_size_read(&ci->vfs_inode) > 0) {
+                       struct page *page =
+                               find_get_page(ci->vfs_inode.i_mapping, 0);
+                       if (page) {
+                               if (PageUptodate(page)) {
+                                       *pinned_page = page;
+                                       break;
+                               }
+                               page_cache_release(page);
                        }
-                       page_cache_release(page);
-               }
-               /*
-                * drop cap refs first because getattr while holding
-                * caps refs can cause deadlock.
-                */
-               ceph_put_cap_refs(ci, _got);
-               _got = 0;
+                       /*
+                        * drop cap refs first because getattr while
+                        * holding * caps refs can cause deadlock.
+                        */
+                       ceph_put_cap_refs(ci, _got);
+                       _got = 0;
 
-               /* getattr request will bring inline data into page cache */
-               ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
-                                       CEPH_STAT_CAP_INLINE_DATA, true);
-               if (ret < 0)
-                       return ret;
-               goto retry;
+                       /*
+                        * getattr request will bring inline data into
+                        * page cache
+                        */
+                       ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+                                               CEPH_STAT_CAP_INLINE_DATA,
+                                               true);
+                       if (ret < 0)
+                               return ret;
+                       continue;
+               }
+               break;
        }
-out:
+
        *got = _got;
        return 0;
 }
@@ -2290,7 +2343,7 @@ out:
 void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
 {
        spin_lock(&ci->i_ceph_lock);
-       __take_cap_refs(ci, caps);
+       __take_cap_refs(ci, caps, false);
        spin_unlock(&ci->i_ceph_lock);
 }
 
@@ -2341,6 +2394,13 @@ void ceph_put_cap_refs(struct ceph_inode_info *ci, int had)
                                        wake = 1;
                                }
                        }
+                       if (ci->i_wrbuffer_ref_head == 0 &&
+                           ci->i_dirty_caps == 0 &&
+                           ci->i_flushing_caps == 0) {
+                               BUG_ON(!ci->i_head_snapc);
+                               ceph_put_snap_context(ci->i_head_snapc);
+                               ci->i_head_snapc = NULL;
+                       }
                        /* see comment in __ceph_remove_cap() */
                        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
                                drop_inode_snap_realm(ci);
@@ -2384,7 +2444,9 @@ void ceph_put_wrbuffer_cap_refs(struct ceph_inode_info *ci, int nr,
        if (ci->i_head_snapc == snapc) {
                ci->i_wrbuffer_ref_head -= nr;
                if (ci->i_wrbuffer_ref_head == 0 &&
-                   ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
+                   ci->i_wr_ref == 0 &&
+                   ci->i_dirty_caps == 0 &&
+                   ci->i_flushing_caps == 0) {
                        BUG_ON(!ci->i_head_snapc);
                        ceph_put_snap_context(ci->i_head_snapc);
                        ci->i_head_snapc = NULL;
@@ -2775,7 +2837,8 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
                        dout(" inode %p now clean\n", inode);
                        BUG_ON(!list_empty(&ci->i_dirty_item));
                        drop = 1;
-                       if (ci->i_wrbuffer_ref_head == 0) {
+                       if (ci->i_wr_ref == 0 &&
+                           ci->i_wrbuffer_ref_head == 0) {
                                BUG_ON(!ci->i_head_snapc);
                                ceph_put_snap_context(ci->i_head_snapc);
                                ci->i_head_snapc = NULL;
index 777eb21d556f08618fce978ba6735c30276e6e0b..0a76a370d7987a184bcf3535bbfe990111bea892 100644 (file)
@@ -557,13 +557,13 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+                      struct ceph_snap_context *snapc)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_snap_context *snapc;
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
@@ -600,7 +600,6 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                size_t start;
                ssize_t n;
 
-               snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &len, 0,
@@ -674,13 +673,13 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
  * objects, rollback on failure, etc.)
  */
 static ssize_t
-ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+               struct ceph_snap_context *snapc)
 {
        struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
-       struct ceph_snap_context *snapc;
        struct ceph_vino vino;
        struct ceph_osd_request *req;
        struct page **pages;
@@ -717,7 +716,6 @@ ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
                size_t left;
                int n;
 
-               snapc = ci->i_snap_realm->cached_context;
                vino = ceph_vino(inode);
                req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
                                            vino, pos, &len, 0, 1,
@@ -996,14 +994,30 @@ retry_snap:
 
        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
            (iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+               struct ceph_snap_context *snapc;
                struct iov_iter data;
                mutex_unlock(&inode->i_mutex);
+
+               spin_lock(&ci->i_ceph_lock);
+               if (__ceph_have_pending_cap_snap(ci)) {
+                       struct ceph_cap_snap *capsnap =
+                                       list_last_entry(&ci->i_cap_snaps,
+                                                       struct ceph_cap_snap,
+                                                       ci_item);
+                       snapc = ceph_get_snap_context(capsnap->context);
+               } else {
+                       BUG_ON(!ci->i_head_snapc);
+                       snapc = ceph_get_snap_context(ci->i_head_snapc);
+               }
+               spin_unlock(&ci->i_ceph_lock);
+
                /* we might need to revert back to that point */
                data = *from;
                if (iocb->ki_flags & IOCB_DIRECT)
-                       written = ceph_sync_direct_write(iocb, &data, pos);
+                       written = ceph_sync_direct_write(iocb, &data, pos,
+                                                        snapc);
                else
-                       written = ceph_sync_write(iocb, &data, pos);
+                       written = ceph_sync_write(iocb, &data, pos, snapc);
                if (written == -EOLDSNAPC) {
                        dout("aio_write %p %llx.%llx %llu~%u"
                                "got EOLDSNAPC, retrying\n",
@@ -1014,6 +1028,7 @@ retry_snap:
                }
                if (written > 0)
                        iov_iter_advance(from, written);
+               ceph_put_snap_context(snapc);
        } else {
                loff_t old_size = inode->i_size;
                /*
index b2a945345d2b97633d1c0ec78adb48c68fdcb21b..5bfdab9a465ed13dd8612a9bec41ebf6976ecd0b 100644 (file)
@@ -455,6 +455,7 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
 {
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap_snap *capsnap;
+       struct ceph_snap_context *old_snapc;
        int used, dirty;
 
        capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
@@ -467,6 +468,8 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
        used = __ceph_caps_used(ci);
        dirty = __ceph_caps_dirty(ci);
 
+       old_snapc = ci->i_head_snapc;
+
        /*
         * If there is a write in progress, treat that as a dirty Fw,
         * even though it hasn't completed yet; by the time we finish
@@ -481,76 +484,79 @@ void ceph_queue_cap_snap(struct ceph_inode_info *ci)
                   writes in progress now were started before the previous
                   cap_snap.  lucky us. */
                dout("queue_cap_snap %p already pending\n", inode);
-               kfree(capsnap);
-       } else if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
+               goto update_snapc;
+       }
+       if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
                dout("queue_cap_snap %p empty snapc\n", inode);
-               kfree(capsnap);
-       } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
-                           CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
-               struct ceph_snap_context *snapc = ci->i_head_snapc;
+               goto update_snapc;
+       }
+       if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
+                      CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
+               dout("queue_cap_snap %p nothing dirty|writing\n", inode);
+               goto update_snapc;
+       }
 
-               /*
-                * if we are a sync write, we may need to go to the snaprealm
-                * to get the current snapc.
-                */
-               if (!snapc)
-                       snapc = ci->i_snap_realm->cached_context;
+       BUG_ON(!old_snapc);
 
-               dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
-                    inode, capsnap, snapc, ceph_cap_string(dirty));
-               ihold(inode);
+       dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
+            inode, capsnap, old_snapc, ceph_cap_string(dirty));
+       ihold(inode);
 
-               atomic_set(&capsnap->nref, 1);
-               capsnap->ci = ci;
-               INIT_LIST_HEAD(&capsnap->ci_item);
-               INIT_LIST_HEAD(&capsnap->flushing_item);
-
-               capsnap->follows = snapc->seq;
-               capsnap->issued = __ceph_caps_issued(ci, NULL);
-               capsnap->dirty = dirty;
-
-               capsnap->mode = inode->i_mode;
-               capsnap->uid = inode->i_uid;
-               capsnap->gid = inode->i_gid;
-
-               if (dirty & CEPH_CAP_XATTR_EXCL) {
-                       __ceph_build_xattrs_blob(ci);
-                       capsnap->xattr_blob =
-                               ceph_buffer_get(ci->i_xattrs.blob);
-                       capsnap->xattr_version = ci->i_xattrs.version;
-               } else {
-                       capsnap->xattr_blob = NULL;
-                       capsnap->xattr_version = 0;
-               }
+       atomic_set(&capsnap->nref, 1);
+       capsnap->ci = ci;
+       INIT_LIST_HEAD(&capsnap->ci_item);
+       INIT_LIST_HEAD(&capsnap->flushing_item);
 
-               capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+       capsnap->follows = old_snapc->seq;
+       capsnap->issued = __ceph_caps_issued(ci, NULL);
+       capsnap->dirty = dirty;
 
-               /* dirty page count moved from _head to this cap_snap;
-                  all subsequent writes page dirties occur _after_ this
-                  snapshot. */
-               capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
-               ci->i_wrbuffer_ref_head = 0;
-               capsnap->context = snapc;
-               ci->i_head_snapc =
-                       ceph_get_snap_context(ci->i_snap_realm->cached_context);
-               dout(" new snapc is %p\n", ci->i_head_snapc);
-               list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
-
-               if (used & CEPH_CAP_FILE_WR) {
-                       dout("queue_cap_snap %p cap_snap %p snapc %p"
-                            " seq %llu used WR, now pending\n", inode,
-                            capsnap, snapc, snapc->seq);
-                       capsnap->writing = 1;
-               } else {
-                       /* note mtime, size NOW. */
-                       __ceph_finish_cap_snap(ci, capsnap);
-               }
+       capsnap->mode = inode->i_mode;
+       capsnap->uid = inode->i_uid;
+       capsnap->gid = inode->i_gid;
+
+       if (dirty & CEPH_CAP_XATTR_EXCL) {
+               __ceph_build_xattrs_blob(ci);
+               capsnap->xattr_blob =
+                       ceph_buffer_get(ci->i_xattrs.blob);
+               capsnap->xattr_version = ci->i_xattrs.version;
        } else {
-               dout("queue_cap_snap %p nothing dirty|writing\n", inode);
-               kfree(capsnap);
+               capsnap->xattr_blob = NULL;
+               capsnap->xattr_version = 0;
        }
 
+       capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
+       /* dirty page count moved from _head to this cap_snap;
+          all subsequent writes page dirties occur _after_ this
+          snapshot. */
+       capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
+       ci->i_wrbuffer_ref_head = 0;
+       capsnap->context = old_snapc;
+       list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
+       old_snapc = NULL;
+
+       if (used & CEPH_CAP_FILE_WR) {
+               dout("queue_cap_snap %p cap_snap %p snapc %p"
+                    " seq %llu used WR, now pending\n", inode,
+                    capsnap, old_snapc, old_snapc->seq);
+               capsnap->writing = 1;
+       } else {
+               /* note mtime, size NOW. */
+               __ceph_finish_cap_snap(ci, capsnap);
+       }
+       capsnap = NULL;
+
+update_snapc:
+       if (ci->i_head_snapc) {
+               ci->i_head_snapc = ceph_get_snap_context(
+                               ci->i_snap_realm->cached_context);
+               dout(" new snapc is %p\n", ci->i_head_snapc);
+       }
        spin_unlock(&ci->i_ceph_lock);
+
+       kfree(capsnap);
+       ceph_put_snap_context(old_snapc);
 }
 
 /*