ceph: fix race between page writeback and truncate
authorYan, Zheng <zheng.z.yan@intel.com>
Fri, 31 May 2013 08:48:29 +0000 (16:48 +0800)
committerSage Weil <sage@inktank.com>
Wed, 3 Jul 2013 22:32:47 +0000 (15:32 -0700)
The client can receive truncate request from MDS at any time.
So the page writeback code need to get i_size, truncate_seq and
truncate_size atomically

Signed-off-by: Yan, Zheng <zheng.z.yan@intel.com>
Reviewed-by: Sage Weil <sage@inktank.com>
fs/ceph/addr.c

index 3e68ac1010407b23617cf1f6e019834d2eee1670..3500b74c32ed158e6db2119311f0a64c1e6cd0b5 100644 (file)
@@ -438,13 +438,12 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        struct ceph_inode_info *ci;
        struct ceph_fs_client *fsc;
        struct ceph_osd_client *osdc;
-       loff_t page_off = page_offset(page);
-       int len = PAGE_CACHE_SIZE;
-       loff_t i_size;
-       int err = 0;
        struct ceph_snap_context *snapc, *oldest;
-       u64 snap_size = 0;
+       loff_t page_off = page_offset(page);
        long writeback_stat;
+       u64 truncate_size, snap_size = 0;
+       u32 truncate_seq;
+       int err = 0, len = PAGE_CACHE_SIZE;
 
        dout("writepage %p idx %lu\n", page, page->index);
 
@@ -474,13 +473,20 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        }
        ceph_put_snap_context(oldest);
 
+       spin_lock(&ci->i_ceph_lock);
+       truncate_seq = ci->i_truncate_seq;
+       truncate_size = ci->i_truncate_size;
+       if (!snap_size)
+               snap_size = i_size_read(inode);
+       spin_unlock(&ci->i_ceph_lock);
+
        /* is this a partial page at end of file? */
-       if (snap_size)
-               i_size = snap_size;
-       else
-               i_size = i_size_read(inode);
-       if (i_size < page_off + len)
-               len = i_size - page_off;
+       if (page_off >= snap_size) {
+               dout("%p page eof %llu\n", page, snap_size);
+               goto out;
+       }
+       if (snap_size < page_off + len)
+               len = snap_size - page_off;
 
        dout("writepage %p page %p index %lu on %llu~%u snapc %p\n",
             inode, page, page->index, page_off, len, snapc);
@@ -494,7 +500,7 @@ static int writepage_nounlock(struct page *page, struct writeback_control *wbc)
        err = ceph_osdc_writepages(osdc, ceph_vino(inode),
                                   &ci->i_layout, snapc,
                                   page_off, len,
-                                  ci->i_truncate_seq, ci->i_truncate_size,
+                                  truncate_seq, truncate_size,
                                   &inode->i_mtime, &page, 1);
        if (err < 0) {
                dout("writepage setting page/mapping error %d %p\n", err, page);
@@ -631,25 +637,6 @@ static void writepages_finish(struct ceph_osd_request *req,
        ceph_osdc_put_request(req);
 }
 
-static struct ceph_osd_request *
-ceph_writepages_osd_request(struct inode *inode, u64 offset, u64 *len,
-                               struct ceph_snap_context *snapc, int num_ops)
-{
-       struct ceph_fs_client *fsc;
-       struct ceph_inode_info *ci;
-       struct ceph_vino vino;
-
-       fsc = ceph_inode_to_client(inode);
-       ci = ceph_inode(inode);
-       vino = ceph_vino(inode);
-       /* BUG_ON(vino.snap != CEPH_NOSNAP); */
-
-       return ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                       vino, offset, len, num_ops, CEPH_OSD_OP_WRITE,
-                       CEPH_OSD_FLAG_WRITE|CEPH_OSD_FLAG_ONDISK,
-                       snapc, ci->i_truncate_seq, ci->i_truncate_size, true);
-}
-
 /*
  * initiate async writeback
  */
@@ -658,7 +645,8 @@ static int ceph_writepages_start(struct address_space *mapping,
 {
        struct inode *inode = mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       struct ceph_fs_client *fsc;
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_vino vino = ceph_vino(inode);
        pgoff_t index, start, end;
        int range_whole = 0;
        int should_loop = 1;
@@ -670,7 +658,8 @@ static int ceph_writepages_start(struct address_space *mapping,
        unsigned wsize = 1 << inode->i_blkbits;
        struct ceph_osd_request *req = NULL;
        int do_sync;
-       u64 snap_size;
+       u64 truncate_size, snap_size;
+       u32 truncate_seq;
 
        /*
         * Include a 'sync' in the OSD request if this is a data
@@ -685,7 +674,6 @@ static int ceph_writepages_start(struct address_space *mapping,
             wbc->sync_mode == WB_SYNC_NONE ? "NONE" :
             (wbc->sync_mode == WB_SYNC_ALL ? "ALL" : "HOLD"));
 
-       fsc = ceph_inode_to_client(inode);
        if (fsc->mount_state == CEPH_MOUNT_SHUTDOWN) {
                pr_warning("writepage_start %p on forced umount\n", inode);
                return -EIO; /* we're in a forced umount, don't write! */
@@ -728,6 +716,14 @@ retry:
                snap_size = i_size_read(inode);
        dout(" oldest snapc is %p seq %lld (%d snaps)\n",
             snapc, snapc->seq, snapc->num_snaps);
+
+       spin_lock(&ci->i_ceph_lock);
+       truncate_seq = ci->i_truncate_seq;
+       truncate_size = ci->i_truncate_size;
+       if (!snap_size)
+               snap_size = i_size_read(inode);
+       spin_unlock(&ci->i_ceph_lock);
+
        if (last_snapc && snapc != last_snapc) {
                /* if we switched to a newer snapc, restart our scan at the
                 * start of the original file range. */
@@ -739,7 +735,6 @@ retry:
 
        while (!done && index <= end) {
                int num_ops = do_sync ? 2 : 1;
-               struct ceph_vino vino;
                unsigned i;
                int first;
                pgoff_t next;
@@ -833,17 +828,18 @@ get_more_pages:
                         * that it will use.
                         */
                        if (locked_pages == 0) {
-                               size_t size;
-
                                BUG_ON(pages);
-
                                /* prepare async write request */
                                offset = (u64)page_offset(page);
                                len = wsize;
-                               req = ceph_writepages_osd_request(inode,
-                                                       offset, &len, snapc,
-                                                       num_ops);
-
+                               req = ceph_osdc_new_request(&fsc->client->osdc,
+                                                       &ci->i_layout, vino,
+                                                       offset, &len, num_ops,
+                                                       CEPH_OSD_OP_WRITE,
+                                                       CEPH_OSD_FLAG_WRITE |
+                                                       CEPH_OSD_FLAG_ONDISK,
+                                                       snapc, truncate_seq,
+                                                       truncate_size, true);
                                if (IS_ERR(req)) {
                                        rc = PTR_ERR(req);
                                        unlock_page(page);
@@ -854,8 +850,8 @@ get_more_pages:
                                req->r_inode = inode;
 
                                max_pages = calc_pages_for(0, (u64)len);
-                               size = max_pages * sizeof (*pages);
-                               pages = kmalloc(size, GFP_NOFS);
+                               pages = kmalloc(max_pages * sizeof (*pages),
+                                               GFP_NOFS);
                                if (!pages) {
                                        pool = fsc->wb_pagevec_pool;
                                        pages = mempool_alloc(pool, GFP_NOFS);