ceph: Implement writev/pwritev for sync operation.
authormajianpeng <majianpeng@gmail.com>
Thu, 12 Sep 2013 05:54:26 +0000 (13:54 +0800)
committerSage Weil <sage@inktank.com>
Fri, 13 Dec 2013 17:13:17 +0000 (09:13 -0800)
For writev/pwritev sync-operatoin, ceph only do the first iov.

I divided the write-sync-operation into two functions. One for
direct-write, other for none-direct-sync-write. This is because for
none-direct-sync-write we can merge iovs to one. But for direct-write,
we can't merge iovs.

Signed-off-by: Jianpeng Ma <majianpeng@gmail.com>
Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com>
Signed-off-by: Sage Weil <sage@inktank.com>
fs/ceph/file.c

index 3de89829e2a162ab6bce2a58296b25aef9235c43..5cf034e915bb1ba5053b8d49786a2956148d3b72 100644 (file)
@@ -489,83 +489,79 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
        }
 }
 
+
 /*
- * Synchronous write, straight from __user pointer or user pages (if
- * O_DIRECT).
+ * Synchronous write, straight from __user pointer or user pages.
  *
  * If write spans object boundary, just do multiple writes.  (For a
  * correct atomic write, we should e.g. take write locks on all
  * objects, rollback on failure, etc.)
  */
-static ssize_t ceph_sync_write(struct file *file, const char __user *data,
-                              size_t left, loff_t pos, loff_t *ppos)
+static ssize_t
+ceph_sync_direct_write(struct kiocb *iocb, const struct iovec *iov,
+                      unsigned long nr_segs, size_t count)
 {
+       struct file *file = iocb->ki_filp;
        struct inode *inode = file_inode(file);
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
        struct ceph_snap_context *snapc;
        struct ceph_vino vino;
        struct ceph_osd_request *req;
-       int num_ops = 1;
        struct page **pages;
        int num_pages;
-       u64 len;
        int written = 0;
        int flags;
        int check_caps = 0;
-       int page_align, io_align;
-       unsigned long buf_align;
+       int page_align;
        int ret;
        struct timespec mtime = CURRENT_TIME;
-       bool own_pages = false;
+       loff_t pos = iocb->ki_pos;
+       struct iov_iter i;
 
        if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
                return -EROFS;
 
-       dout("sync_write on file %p %lld~%u %s\n", file, pos,
-            (unsigned)left, (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
+       dout("sync_direct_write on file %p %lld~%u\n", file, pos,
+            (unsigned)count);
 
-       ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + left);
+       ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
        if (ret < 0)
                return ret;
 
        ret = invalidate_inode_pages2_range(inode->i_mapping,
                                            pos >> PAGE_CACHE_SHIFT,
-                                           (pos + left) >> PAGE_CACHE_SHIFT);
+                                           (pos + count) >> PAGE_CACHE_SHIFT);
        if (ret < 0)
                dout("invalidate_inode_pages2_range returned %d\n", ret);
 
        flags = CEPH_OSD_FLAG_ORDERSNAP |
                CEPH_OSD_FLAG_ONDISK |
                CEPH_OSD_FLAG_WRITE;
-       if ((file->f_flags & (O_SYNC|O_DIRECT)) == 0)
-               flags |= CEPH_OSD_FLAG_ACK;
-       else
-               num_ops++;      /* Also include a 'startsync' command. */
 
-       /*
-        * we may need to do multiple writes here if we span an object
-        * boundary.  this isn't atomic, unfortunately.  :(
-        */
-more:
-       io_align = pos & ~PAGE_MASK;
-       buf_align = (unsigned long)data & ~PAGE_MASK;
-       len = left;
-
-       snapc = ci->i_snap_realm->cached_context;
-       vino = ceph_vino(inode);
-       req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
-                                   vino, pos, &len, num_ops,
-                                   CEPH_OSD_OP_WRITE, flags, snapc,
-                                   ci->i_truncate_seq, ci->i_truncate_size,
-                                   false);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
+       iov_iter_init(&i, iov, nr_segs, count, 0);
+
+       while (iov_iter_count(&i) > 0) {
+               void __user *data = i.iov->iov_base + i.iov_offset;
+               u64 len = i.iov->iov_len - i.iov_offset;
+
+               page_align = (unsigned long)data & ~PAGE_MASK;
+
+               snapc = ci->i_snap_realm->cached_context;
+               vino = ceph_vino(inode);
+               req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+                                           vino, pos, &len,
+                                           2,/*include a 'startsync' command*/
+                                           CEPH_OSD_OP_WRITE, flags, snapc,
+                                           ci->i_truncate_seq,
+                                           ci->i_truncate_size,
+                                           false);
+               if (IS_ERR(req)) {
+                       ret = PTR_ERR(req);
+                       goto out;
+               }
 
-       /* write from beginning of first page, regardless of io alignment */
-       page_align = file->f_flags & O_DIRECT ? buf_align : io_align;
-       num_pages = calc_pages_for(page_align, len);
-       if (file->f_flags & O_DIRECT) {
+               num_pages = calc_pages_for(page_align, len);
                pages = ceph_get_direct_page_vector(data, num_pages, false);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
@@ -577,60 +573,175 @@ more:
                 * may block.
                 */
                truncate_inode_pages_range(inode->i_mapping, pos,
-                                          (pos+len) | (PAGE_CACHE_SIZE-1));
-       } else {
+                                  (pos+len) | (PAGE_CACHE_SIZE-1));
+               osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
+                                               false, false);
+
+               /* BUG_ON(vino.snap != CEPH_NOSNAP); */
+               ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
+
+               ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+               if (!ret)
+                       ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+
+               ceph_put_page_vector(pages, num_pages, false);
+
+out:
+               ceph_osdc_put_request(req);
+               if (ret == 0) {
+                       pos += len;
+                       written += len;
+                       iov_iter_advance(&i, (size_t)len);
+
+                       if (pos > i_size_read(inode)) {
+                               check_caps = ceph_inode_set_size(inode, pos);
+                               if (check_caps)
+                                       ceph_check_caps(ceph_inode(inode),
+                                                       CHECK_CAPS_AUTHONLY,
+                                                       NULL);
+                       }
+               } else
+                       break;
+       }
+
+       if (ret != -EOLDSNAPC && written > 0) {
+               iocb->ki_pos = pos;
+               ret = written;
+       }
+       return ret;
+}
+
+
+/*
+ * Synchronous write, straight from __user pointer or user pages.
+ *
+ * If write spans object boundary, just do multiple writes.  (For a
+ * correct atomic write, we should e.g. take write locks on all
+ * objects, rollback on failure, etc.)
+ */
+static ssize_t ceph_sync_write(struct kiocb *iocb, const struct iovec *iov,
+                              unsigned long nr_segs, size_t count)
+{
+       struct file *file = iocb->ki_filp;
+       struct inode *inode = file_inode(file);
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
+       struct ceph_snap_context *snapc;
+       struct ceph_vino vino;
+       struct ceph_osd_request *req;
+       struct page **pages;
+       u64 len;
+       int num_pages;
+       int written = 0;
+       int flags;
+       int check_caps = 0;
+       int ret;
+       struct timespec mtime = CURRENT_TIME;
+       loff_t pos = iocb->ki_pos;
+       struct iov_iter i;
+
+       if (ceph_snap(file_inode(file)) != CEPH_NOSNAP)
+               return -EROFS;
+
+       dout("sync_write on file %p %lld~%u\n", file, pos, (unsigned)count);
+
+       ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
+       if (ret < 0)
+               return ret;
+
+       ret = invalidate_inode_pages2_range(inode->i_mapping,
+                                           pos >> PAGE_CACHE_SHIFT,
+                                           (pos + count) >> PAGE_CACHE_SHIFT);
+       if (ret < 0)
+               dout("invalidate_inode_pages2_range returned %d\n", ret);
+
+       flags = CEPH_OSD_FLAG_ORDERSNAP |
+               CEPH_OSD_FLAG_ONDISK |
+               CEPH_OSD_FLAG_WRITE |
+               CEPH_OSD_FLAG_ACK;
+
+       iov_iter_init(&i, iov, nr_segs, count, 0);
+
+       while ((len = iov_iter_count(&i)) > 0) {
+               size_t left;
+               int n;
+
+               snapc = ci->i_snap_realm->cached_context;
+               vino = ceph_vino(inode);
+               req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
+                                           vino, pos, &len, 1,
+                                           CEPH_OSD_OP_WRITE, flags, snapc,
+                                           ci->i_truncate_seq,
+                                           ci->i_truncate_size,
+                                           false);
+               if (IS_ERR(req)) {
+                       ret = PTR_ERR(req);
+                       goto out;
+               }
+
+               /*
+                * write from beginning of first page,
+                * regardless of io alignment
+                */
+               num_pages = (len + PAGE_CACHE_SIZE - 1) >> PAGE_CACHE_SHIFT;
+
                pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
                if (IS_ERR(pages)) {
                        ret = PTR_ERR(pages);
                        goto out;
                }
-               ret = ceph_copy_user_to_page_vector(pages, data, pos, len);
+
+               left = len;
+               for (n = 0; n < num_pages; n++) {
+                       size_t plen = min(left, PAGE_SIZE);
+                       ret = iov_iter_copy_from_user(pages[n], &i, 0, plen);
+                       if (ret != plen) {
+                               ret = -EFAULT;
+                               break;
+                       }
+                       left -= ret;
+                       iov_iter_advance(&i, ret);
+               }
+
                if (ret < 0) {
                        ceph_release_page_vector(pages, num_pages);
                        goto out;
                }
 
-               if ((file->f_flags & O_SYNC) == 0) {
-                       /* get a second commit callback */
-                       req->r_unsafe_callback = ceph_sync_write_unsafe;
-                       req->r_inode = inode;
-                       own_pages = true;
-               }
-       }
-       osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
-                                       false, own_pages);
+               /* get a second commit callback */
+               req->r_unsafe_callback = ceph_sync_write_unsafe;
+               req->r_inode = inode;
 
-       /* BUG_ON(vino.snap != CEPH_NOSNAP); */
-       ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
+               osd_req_op_extent_osd_data_pages(req, 0, pages, len, 0,
+                                               false, true);
 
-       ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
-       if (!ret)
-               ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
+               /* BUG_ON(vino.snap != CEPH_NOSNAP); */
+               ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
 
-       if (file->f_flags & O_DIRECT)
-               ceph_put_page_vector(pages, num_pages, false);
-       else if (file->f_flags & O_SYNC)
-               ceph_release_page_vector(pages, num_pages);
+               ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);
+               if (!ret)
+                       ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
 
 out:
-       ceph_osdc_put_request(req);
-       if (ret == 0) {
-               pos += len;
-               written += len;
-               left -= len;
-               data += len;
-               if (left)
-                       goto more;
+               ceph_osdc_put_request(req);
+               if (ret == 0) {
+                       pos += len;
+                       written += len;
+
+                       if (pos > i_size_read(inode)) {
+                               check_caps = ceph_inode_set_size(inode, pos);
+                               if (check_caps)
+                                       ceph_check_caps(ceph_inode(inode),
+                                                       CHECK_CAPS_AUTHONLY,
+                                                       NULL);
+                       }
+               } else
+                       break;
+       }
 
+       if (ret != -EOLDSNAPC && written > 0) {
                ret = written;
-               *ppos = pos;
-               if (pos > i_size_read(inode))
-                       check_caps = ceph_inode_set_size(inode, pos);
-               if (check_caps)
-                       ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY,
-                                       NULL);
-       } else if (ret != -EOLDSNAPC && written > 0) {
-               ret = written;
+               iocb->ki_pos = pos;
        }
        return ret;
 }
@@ -772,11 +883,13 @@ retry_snap:
             inode, ceph_vinop(inode), pos, count, ceph_cap_string(got));
 
        if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
-           (iocb->ki_filp->f_flags & O_DIRECT) ||
-           (fi->flags & CEPH_F_SYNC)) {
+           (file->f_flags & O_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
                mutex_unlock(&inode->i_mutex);
-               written = ceph_sync_write(file, iov->iov_base, count,
-                                         pos, &iocb->ki_pos);
+               if (file->f_flags & O_DIRECT)
+                       written = ceph_sync_direct_write(iocb, iov,
+                                                        nr_segs, count);
+               else
+                       written = ceph_sync_write(iocb, iov, nr_segs, count);
                if (written == -EOLDSNAPC) {
                        dout("aio_write %p %llx.%llx %llu~%u"
                                "got EOLDSNAPC, retrying\n",