ocfs2: rework ocfs2_buffered_write_cluster()
authorMark Fasheh <mark.fasheh@oracle.com>
Wed, 9 May 2007 00:47:32 +0000 (17:47 -0700)
committerMark Fasheh <mark.fasheh@oracle.com>
Wed, 11 Jul 2007 00:31:46 +0000 (17:31 -0700)
Use some ideas from the new-aops patch series and turn
ocfs2_buffered_write_cluster() into a 2 stage operation with the caller
copying data in between. The code now understands multiple cluster writes as
a result of having to deal with a full page write for greater than 4k pages.

This sets us up to easily call into the write path during ->page_mkwrite().

Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
fs/ocfs2/aops.c
fs/ocfs2/aops.h
fs/ocfs2/file.c

index a480b09c79b916de88252129919bedbbfb10850a..3e5758ebd9326e262fb40d7ad0595378bc8027f6 100644 (file)
@@ -684,6 +684,8 @@ int ocfs2_map_page_blocks(struct page *page, u64 *p_blkno,
             bh = bh->b_this_page, block_start += bsize) {
                block_end = block_start + bsize;
 
+               clear_buffer_new(bh);
+
                /*
                 * Ignore blocks outside of our i/o range -
                 * they may belong to unallocated clusters.
@@ -698,9 +700,8 @@ int ocfs2_map_page_blocks(struct page *page, u64 *p_blkno,
                 * For an allocating write with cluster size >= page
                 * size, we always write the entire page.
                 */
-
-               if (buffer_new(bh))
-                       clear_buffer_new(bh);
+               if (new)
+                       set_buffer_new(bh);
 
                if (!buffer_mapped(bh)) {
                        map_bh(bh, inode->i_sb, *p_blkno);
@@ -761,217 +762,232 @@ next_bh:
        return ret;
 }
 
+#if (PAGE_CACHE_SIZE >= OCFS2_MAX_CLUSTERSIZE)
+#define OCFS2_MAX_CTXT_PAGES   1
+#else
+#define OCFS2_MAX_CTXT_PAGES   (OCFS2_MAX_CLUSTERSIZE / PAGE_CACHE_SIZE)
+#endif
+
+#define OCFS2_MAX_CLUSTERS_PER_PAGE    (PAGE_CACHE_SIZE / OCFS2_MIN_CLUSTERSIZE)
+
 /*
- * This will copy user data from the buffer page in the splice
- * context.
- *
- * For now, we ignore SPLICE_F_MOVE as that would require some extra
- * communication out all the way to ocfs2_write().
+ * Describe the state of a single cluster to be written to.
  */
-int ocfs2_map_and_write_splice_data(struct inode *inode,
-                                 struct ocfs2_write_ctxt *wc, u64 *p_blkno,
-                                 unsigned int *ret_from, unsigned int *ret_to)
-{
-       int ret;
-       unsigned int to, from, cluster_start, cluster_end;
-       char *src, *dst;
-       struct ocfs2_splice_write_priv *sp = wc->w_private;
-       struct pipe_buffer *buf = sp->s_buf;
-       unsigned long bytes, src_from;
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+struct ocfs2_write_cluster_desc {
+       u32             c_cpos;
+       u32             c_phys;
+       /*
+        * Give this a unique field because c_phys eventually gets
+        * filled.
+        */
+       unsigned        c_new;
+};
 
-       ocfs2_figure_cluster_boundaries(osb, wc->w_cpos, &cluster_start,
-                                       &cluster_end);
+struct ocfs2_write_ctxt {
+       /* Logical cluster position / len of write */
+       u32                             w_cpos;
+       u32                             w_clen;
 
-       from = sp->s_offset;
-       src_from = sp->s_buf_offset;
-       bytes = wc->w_count;
+       struct ocfs2_write_cluster_desc w_desc[OCFS2_MAX_CLUSTERS_PER_PAGE];
 
-       if (wc->w_large_pages) {
-               /*
-                * For cluster size < page size, we have to
-                * calculate pos within the cluster and obey
-                * the rightmost boundary.
-                */
-               bytes = min(bytes, (unsigned long)(osb->s_clustersize
-                                  - (wc->w_pos & (osb->s_clustersize - 1))));
-       }
-       to = from + bytes;
+       /*
+        * This is true if page_size > cluster_size.
+        *
+        * It triggers a set of special cases during write which might
+        * have to deal with allocating writes to partial pages.
+        */
+       unsigned int                    w_large_pages;
 
-       BUG_ON(from > PAGE_CACHE_SIZE);
-       BUG_ON(to > PAGE_CACHE_SIZE);
-       BUG_ON(from < cluster_start);
-       BUG_ON(to > cluster_end);
+       /*
+        * Pages involved in this write.
+        *
+        * w_target_page is the page being written to by the user.
+        *
+        * w_pages is an array of pages which always contains
+        * w_target_page, and in the case of an allocating write with
+        * page_size < cluster size, it will contain zero'd and mapped
+        * pages adjacent to w_target_page which need to be written
+        * out in so that future reads from that region will get
+        * zero's.
+        */
+       struct page                     *w_pages[OCFS2_MAX_CTXT_PAGES];
+       unsigned int                    w_num_pages;
+       struct page                     *w_target_page;
 
-       if (wc->w_this_page_new)
-               ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-                                           cluster_start, cluster_end, 1);
-       else
-               ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-                                           from, to, 0);
-       if (ret) {
-               mlog_errno(ret);
-               goto out;
+       /*
+        * ocfs2_write_end() uses this to know what the real range to
+        * write in the target should be.
+        */
+       unsigned int                    w_target_from;
+       unsigned int                    w_target_to;
+
+       /*
+        * We could use journal_current_handle() but this is cleaner,
+        * IMHO -Mark
+        */
+       handle_t                        *w_handle;
+
+       struct buffer_head              *w_di_bh;
+};
+
+static void ocfs2_free_write_ctxt(struct ocfs2_write_ctxt *wc)
+{
+       int i;
+
+       for(i = 0; i < wc->w_num_pages; i++) {
+               if (wc->w_pages[i] == NULL)
+                       continue;
+
+               unlock_page(wc->w_pages[i]);
+               mark_page_accessed(wc->w_pages[i]);
+               page_cache_release(wc->w_pages[i]);
        }
 
-       src = buf->ops->map(sp->s_pipe, buf, 1);
-       dst = kmap_atomic(wc->w_this_page, KM_USER1);
-       memcpy(dst + from, src + src_from, bytes);
-       kunmap_atomic(wc->w_this_page, KM_USER1);
-       buf->ops->unmap(sp->s_pipe, buf, src);
+       brelse(wc->w_di_bh);
+       kfree(wc);
+}
+
+static int ocfs2_alloc_write_ctxt(struct ocfs2_write_ctxt **wcp,
+                                 struct ocfs2_super *osb, loff_t pos,
+                                 unsigned len)
+{
+       struct ocfs2_write_ctxt *wc;
+
+       wc = kzalloc(sizeof(struct ocfs2_write_ctxt), GFP_NOFS);
+       if (!wc)
+               return -ENOMEM;
 
-       wc->w_finished_copy = 1;
+       wc->w_cpos = pos >> osb->s_clustersize_bits;
+       wc->w_clen = ocfs2_clusters_for_bytes(osb->sb, len);
 
-       *ret_from = from;
-       *ret_to = to;
-out:
+       if (unlikely(PAGE_CACHE_SHIFT > osb->s_clustersize_bits))
+               wc->w_large_pages = 1;
+       else
+               wc->w_large_pages = 0;
+
+       *wcp = wc;
 
-       return bytes ? (unsigned int)bytes : ret;
+       return 0;
 }
 
 /*
- * This will copy user data from the iovec in the buffered write
- * context.
+ * If a page has any new buffers, zero them out here, and mark them uptodate
+ * and dirty so they'll be written out (in order to prevent uninitialised
+ * block data from leaking). And clear the new bit.
  */
-int ocfs2_map_and_write_user_data(struct inode *inode,
-                                 struct ocfs2_write_ctxt *wc, u64 *p_blkno,
-                                 unsigned int *ret_from, unsigned int *ret_to)
+static void ocfs2_zero_new_buffers(struct page *page, unsigned from, unsigned to)
 {
-       int ret;
-       unsigned int to, from, cluster_start, cluster_end;
-       unsigned long bytes, src_from;
-       char *dst;
-       struct ocfs2_buffered_write_priv *bp = wc->w_private;
-       const struct iovec *cur_iov = bp->b_cur_iov;
-       char __user *buf;
-       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+       unsigned int block_start, block_end;
+       struct buffer_head *head, *bh;
 
-       ocfs2_figure_cluster_boundaries(osb, wc->w_cpos, &cluster_start,
-                                       &cluster_end);
+       BUG_ON(!PageLocked(page));
+       if (!page_has_buffers(page))
+               return;
 
-       buf = cur_iov->iov_base + bp->b_cur_off;
-       src_from = (unsigned long)buf & ~PAGE_CACHE_MASK;
+       bh = head = page_buffers(page);
+       block_start = 0;
+       do {
+               block_end = block_start + bh->b_size;
+
+               if (buffer_new(bh)) {
+                       if (block_end > from && block_start < to) {
+                               if (!PageUptodate(page)) {
+                                       unsigned start, end;
+                                       void *kaddr;
+
+                                       start = max(from, block_start);
+                                       end = min(to, block_end);
+
+                                       kaddr = kmap_atomic(page, KM_USER0);
+                                       memset(kaddr+start, 0, end - start);
+                                       flush_dcache_page(page);
+                                       kunmap_atomic(kaddr, KM_USER0);
+                                       set_buffer_uptodate(bh);
+                               }
+
+                               clear_buffer_new(bh);
+                               mark_buffer_dirty(bh);
+                       }
+               }
 
-       from = wc->w_pos & (PAGE_CACHE_SIZE - 1);
+               block_start = block_end;
+               bh = bh->b_this_page;
+       } while (bh != head);
+}
+
+/*
+ * Only called when we have a failure during allocating write to write
+ * zero's to the newly allocated region.
+ */
+static void ocfs2_write_failure(struct inode *inode,
+                               struct ocfs2_write_ctxt *wc,
+                               loff_t user_pos, unsigned user_len)
+{
+       int i;
+       unsigned from, to;
+       struct page *tmppage;
+
+       ocfs2_zero_new_buffers(wc->w_target_page, user_pos, user_len);
 
-       /*
-        * This is a lot of comparisons, but it reads quite
-        * easily, which is important here.
-        */
-       /* Stay within the src page */
-       bytes = PAGE_SIZE - src_from;
-       /* Stay within the vector */
-       bytes = min(bytes,
-                   (unsigned long)(cur_iov->iov_len - bp->b_cur_off));
-       /* Stay within count */
-       bytes = min(bytes, (unsigned long)wc->w_count);
-       /*
-        * For clustersize > page size, just stay within
-        * target page, otherwise we have to calculate pos
-        * within the cluster and obey the rightmost
-        * boundary.
-        */
        if (wc->w_large_pages) {
-               /*
-                * For cluster size < page size, we have to
-                * calculate pos within the cluster and obey
-                * the rightmost boundary.
-                */
-               bytes = min(bytes, (unsigned long)(osb->s_clustersize
-                                  - (wc->w_pos & (osb->s_clustersize - 1))));
+               from = wc->w_target_from;
+               to = wc->w_target_to;
        } else {
-               /*
-                * cluster size > page size is the most common
-                * case - we just stay within the target page
-                * boundary.
-                */
-               bytes = min(bytes, PAGE_CACHE_SIZE - from);
+               from = 0;
+               to = PAGE_CACHE_SIZE;
        }
 
-       to = from + bytes;
+       for(i = 0; i < wc->w_num_pages; i++) {
+               tmppage = wc->w_pages[i];
 
-       BUG_ON(from > PAGE_CACHE_SIZE);
-       BUG_ON(to > PAGE_CACHE_SIZE);
-       BUG_ON(from < cluster_start);
-       BUG_ON(to > cluster_end);
+               if (ocfs2_should_order_data(inode))
+                       walk_page_buffers(wc->w_handle, page_buffers(tmppage),
+                                         from, to, NULL,
+                                         ocfs2_journal_dirty_data);
 
-       if (wc->w_this_page_new)
-               ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-                                           cluster_start, cluster_end, 1);
-       else
-               ret = ocfs2_map_page_blocks(wc->w_this_page, p_blkno, inode,
-                                           from, to, 0);
-       if (ret) {
-               mlog_errno(ret);
-               goto out;
+               block_commit_write(tmppage, from, to);
        }
-
-       dst = kmap(wc->w_this_page);
-       memcpy(dst + from, bp->b_src_buf + src_from, bytes);
-       kunmap(wc->w_this_page);
-
-       /*
-        * XXX: This is slow, but simple. The caller of
-        * ocfs2_buffered_write_cluster() is responsible for
-        * passing through the iovecs, so it's difficult to
-        * predict what our next step is in here after our
-        * initial write. A future version should be pushing
-        * that iovec manipulation further down.
-        *
-        * By setting this, we indicate that a copy from user
-        * data was done, and subsequent calls for this
-        * cluster will skip copying more data.
-        */
-       wc->w_finished_copy = 1;
-
-       *ret_from = from;
-       *ret_to = to;
-out:
-
-       return bytes ? (unsigned int)bytes : ret;
 }
 
-/*
- * Map, fill and write a page to disk.
- *
- * The work of copying data is done via callback.  Newly allocated
- * pages which don't take user data will be zero'd (set 'new' to
- * indicate an allocating write)
- *
- * Returns a negative error code or the number of bytes copied into
- * the page.
- */
-static int ocfs2_write_data_page(struct inode *inode, handle_t *handle,
-                                u64 *p_blkno, struct page *page,
-                                struct ocfs2_write_ctxt *wc, int new)
+static int ocfs2_prepare_page_for_write(struct inode *inode, u64 *p_blkno,
+                                       struct ocfs2_write_ctxt *wc,
+                                       struct page *page, u32 cpos,
+                                       loff_t user_pos, unsigned user_len,
+                                       int new)
 {
-       int ret, copied = 0;
-       unsigned int from = 0, to = 0;
+       int ret;
+       unsigned int map_from = 0, map_to = 0;
        unsigned int cluster_start, cluster_end;
-       unsigned int zero_from = 0, zero_to = 0;
+       unsigned int user_data_from = 0, user_data_to = 0;
 
-       ocfs2_figure_cluster_boundaries(OCFS2_SB(inode->i_sb), wc->w_cpos,
+       ocfs2_figure_cluster_boundaries(OCFS2_SB(inode->i_sb), cpos,
                                        &cluster_start, &cluster_end);
 
-       if ((wc->w_pos >> PAGE_CACHE_SHIFT) == page->index
-           && !wc->w_finished_copy) {
-
-               wc->w_this_page = page;
-               wc->w_this_page_new = new;
-               ret = wc->w_write_data_page(inode, wc, p_blkno, &from, &to);
-               if (ret < 0) {
+       if (page == wc->w_target_page) {
+               map_from = user_pos & (PAGE_CACHE_SIZE - 1);
+               map_to = map_from + user_len;
+
+               if (new)
+                       ret = ocfs2_map_page_blocks(page, p_blkno, inode,
+                                                   cluster_start, cluster_end,
+                                                   new);
+               else
+                       ret = ocfs2_map_page_blocks(page, p_blkno, inode,
+                                                   map_from, map_to, new);
+               if (ret) {
                        mlog_errno(ret);
                        goto out;
                }
 
-               copied = ret;
-
-               zero_from = from;
-               zero_to = to;
+               user_data_from = map_from;
+               user_data_to = map_to;
                if (new) {
-                       from = cluster_start;
-                       to = cluster_end;
+                       map_from = cluster_start;
+                       map_to = cluster_end;
                }
+
+               wc->w_target_from = map_from;
+               wc->w_target_to = map_to;
        } else {
                /*
                 * If we haven't allocated the new page yet, we
@@ -980,11 +996,11 @@ static int ocfs2_write_data_page(struct inode *inode, handle_t *handle,
                 */
                BUG_ON(!new);
 
-               from = cluster_start;
-               to = cluster_end;
+               map_from = cluster_start;
+               map_to = cluster_end;
 
                ret = ocfs2_map_page_blocks(page, p_blkno, inode,
-                                           cluster_start, cluster_end, 1);
+                                           cluster_start, cluster_end, new);
                if (ret) {
                        mlog_errno(ret);
                        goto out;
@@ -1003,108 +1019,84 @@ static int ocfs2_write_data_page(struct inode *inode, handle_t *handle,
         */
        if (new && !PageUptodate(page))
                ocfs2_clear_page_regions(page, OCFS2_SB(inode->i_sb),
-                                        wc->w_cpos, zero_from, zero_to);
+                                        cpos, user_data_from, user_data_to);
 
        flush_dcache_page(page);
 
-       if (ocfs2_should_order_data(inode)) {
-               ret = walk_page_buffers(handle,
-                                       page_buffers(page),
-                                       from, to, NULL,
-                                       ocfs2_journal_dirty_data);
-               if (ret < 0)
-                       mlog_errno(ret);
-       }
-
-       /*
-        * We don't use generic_commit_write() because we need to
-        * handle our own i_size update.
-        */
-       ret = block_commit_write(page, from, to);
-       if (ret)
-               mlog_errno(ret);
 out:
-
-       return copied ? copied : ret;
+       return ret;
 }
 
 /*
- * Do the actual write of some data into an inode. Optionally allocate
- * in order to fulfill the write.
- *
- * cpos is the logical cluster offset within the file to write at
- *
- * 'phys' is the physical mapping of that offset. a 'phys' value of
- * zero indicates that allocation is required. In this case, data_ac
- * and meta_ac should be valid (meta_ac can be null if metadata
- * allocation isn't required).
+ * This function will only grab one clusters worth of pages.
  */
-static ssize_t ocfs2_write(struct file *file, u32 phys, handle_t *handle,
-                          struct buffer_head *di_bh,
-                          struct ocfs2_alloc_context *data_ac,
-                          struct ocfs2_alloc_context *meta_ac,
-                          struct ocfs2_write_ctxt *wc)
+static int ocfs2_grab_pages_for_write(struct address_space *mapping,
+                                     struct ocfs2_write_ctxt *wc,
+                                     u32 cpos, loff_t user_pos, int new)
 {
-       int ret, i, numpages = 1, new;
-       unsigned int copied = 0;
-       u32 tmp_pos;
-       u64 v_blkno, p_blkno;
-       struct address_space *mapping = file->f_mapping;
+       int ret = 0, i;
+       unsigned long start, target_index, index;
        struct inode *inode = mapping->host;
-       unsigned long index, start;
-       struct page **cpages;
 
-       new = phys == 0 ? 1 : 0;
+       target_index = user_pos >> PAGE_CACHE_SHIFT;
 
        /*
         * Figure out how many pages we'll be manipulating here. For
         * non allocating write, we just change the one
         * page. Otherwise, we'll need a whole clusters worth.
         */
-       if (new)
-               numpages = ocfs2_pages_per_cluster(inode->i_sb);
-
-       cpages = kzalloc(sizeof(*cpages) * numpages, GFP_NOFS);
-       if (!cpages) {
-               ret = -ENOMEM;
-               mlog_errno(ret);
-               return ret;
-       }
-
-       /*
-        * Fill our page array first. That way we've grabbed enough so
-        * that we can zero and flush if we error after adding the
-        * extent.
-        */
        if (new) {
-               start = ocfs2_align_clusters_to_page_index(inode->i_sb,
-                                                          wc->w_cpos);
-               v_blkno = ocfs2_clusters_to_blocks(inode->i_sb, wc->w_cpos);
+               wc->w_num_pages = ocfs2_pages_per_cluster(inode->i_sb);
+               start = ocfs2_align_clusters_to_page_index(inode->i_sb, cpos);
        } else {
-               start = wc->w_pos >> PAGE_CACHE_SHIFT;
-               v_blkno = wc->w_pos >> inode->i_sb->s_blocksize_bits;
+               wc->w_num_pages = 1;
+               start = target_index;
        }
 
-       for(i = 0; i < numpages; i++) {
+       for(i = 0; i < wc->w_num_pages; i++) {
                index = start + i;
 
-               cpages[i] = find_or_create_page(mapping, index, GFP_NOFS);
-               if (!cpages[i]) {
+               wc->w_pages[i] = find_or_create_page(mapping, index, GFP_NOFS);
+               if (!wc->w_pages[i]) {
                        ret = -ENOMEM;
                        mlog_errno(ret);
                        goto out;
                }
+
+               if (index == target_index)
+                       wc->w_target_page = wc->w_pages[i];
        }
+out:
+       return ret;
+}
+
+/*
+ * Prepare a single cluster for write one cluster into the file.
+ */
+static int ocfs2_write_cluster(struct address_space *mapping,
+                              u32 phys, struct ocfs2_alloc_context *data_ac,
+                              struct ocfs2_alloc_context *meta_ac,
+                              struct ocfs2_write_ctxt *wc, u32 cpos,
+                              loff_t user_pos, unsigned user_len)
+{
+       int ret, i, new;
+       u64 v_blkno, p_blkno;
+       struct inode *inode = mapping->host;
+
+       new = phys == 0 ? 1 : 0;
 
        if (new) {
+               u32 tmp_pos;
+
                /*
                 * This is safe to call with the page locks - it won't take
                 * any additional semaphores or cluster locks.
                 */
-               tmp_pos = wc->w_cpos;
+               tmp_pos = cpos;
                ret = ocfs2_do_extend_allocation(OCFS2_SB(inode->i_sb), inode,
-                                                &tmp_pos, 1, di_bh, handle,
-                                                data_ac, meta_ac, NULL);
+                                                &tmp_pos, 1, wc->w_di_bh,
+                                                wc->w_handle, data_ac,
+                                                meta_ac, NULL);
                /*
                 * This shouldn't happen because we must have already
                 * calculated the correct meta data allocation required. The
@@ -1121,103 +1113,132 @@ static ssize_t ocfs2_write(struct file *file, u32 phys, handle_t *handle,
                        mlog_errno(ret);
                        goto out;
                }
+
+               v_blkno = ocfs2_clusters_to_blocks(inode->i_sb, cpos);
+       } else {
+               v_blkno = user_pos >> inode->i_sb->s_blocksize_bits;
        }
 
+       /*
+        * The only reason this should fail is due to an inability to
+        * find the extent added.
+        */
        ret = ocfs2_extent_map_get_blocks(inode, v_blkno, &p_blkno, NULL,
                                          NULL);
        if (ret < 0) {
-
-               /*
-                * XXX: Should we go readonly here?
-                */
-
-               mlog_errno(ret);
+               ocfs2_error(inode->i_sb, "Corrupting extend for inode %llu, "
+                           "at logical block %llu",
+                           (unsigned long long)OCFS2_I(inode)->ip_blkno,
+                           (unsigned long long)v_blkno);
                goto out;
        }
 
        BUG_ON(p_blkno == 0);
 
-       for(i = 0; i < numpages; i++) {
-               ret = ocfs2_write_data_page(inode, handle, &p_blkno, cpages[i],
-                                           wc, new);
-               if (ret < 0) {
-                       mlog_errno(ret);
-                       goto out;
-               }
+       for(i = 0; i < wc->w_num_pages; i++) {
+               int tmpret;
 
-               copied += ret;
+               tmpret = ocfs2_prepare_page_for_write(inode, &p_blkno, wc,
+                                                     wc->w_pages[i], cpos,
+                                                     user_pos, user_len, new);
+               if (tmpret) {
+                       mlog_errno(tmpret);
+                       if (ret == 0)
+                               tmpret = ret;
+               }
        }
 
+       /*
+        * We only have cleanup to do in case of allocating write.
+        */
+       if (ret && new)
+               ocfs2_write_failure(inode, wc, user_pos, user_len);
+
 out:
-       for(i = 0; i < numpages; i++) {
-               unlock_page(cpages[i]);
-               mark_page_accessed(cpages[i]);
-               page_cache_release(cpages[i]);
-       }
-       kfree(cpages);
 
-       return copied ? copied : ret;
+       return ret;
 }
 
-static void ocfs2_write_ctxt_init(struct ocfs2_write_ctxt *wc,
-                                 struct ocfs2_super *osb, loff_t pos,
-                                 size_t count, ocfs2_page_writer *cb,
-                                 void *cb_priv)
+/*
+ * ocfs2_write_end() wants to know which parts of the target page it
+ * should complete the write on. It's easiest to compute them ahead of
+ * time when a more complete view of the write is available.
+ */
+static void ocfs2_set_target_boundaries(struct ocfs2_super *osb,
+                                       struct ocfs2_write_ctxt *wc,
+                                       loff_t pos, unsigned len, int alloc)
 {
-       wc->w_count = count;
-       wc->w_pos = pos;
-       wc->w_cpos = wc->w_pos >> osb->s_clustersize_bits;
-       wc->w_finished_copy = 0;
+       struct ocfs2_write_cluster_desc *desc;
 
-       if (unlikely(PAGE_CACHE_SHIFT > osb->s_clustersize_bits))
-               wc->w_large_pages = 1;
-       else
-               wc->w_large_pages = 0;
+       wc->w_target_from = pos & (PAGE_CACHE_SIZE - 1);
+       wc->w_target_to = wc->w_target_from + len;
+
+       if (alloc == 0)
+               return;
+
+       /*
+        * Allocating write - we may have different boundaries based
+        * on page size and cluster size.
+        *
+        * NOTE: We can no longer compute one value from the other as
+        * the actual write length and user provided length may be
+        * different.
+        */
 
-       wc->w_write_data_page = cb;
-       wc->w_private = cb_priv;
+       if (wc->w_large_pages) {
+               /*
+                * We only care about the 1st and last cluster within
+                * our range and whether they are holes or not. Either
+                * value may be extended out to the start/end of a
+                * newly allocated cluster.
+                */
+               desc = &wc->w_desc[0];
+               if (desc->c_new)
+                       ocfs2_figure_cluster_boundaries(osb,
+                                                       desc->c_cpos,
+                                                       &wc->w_target_from,
+                                                       NULL);
+
+               desc = &wc->w_desc[wc->w_clen - 1];
+               if (desc->c_new)
+                       ocfs2_figure_cluster_boundaries(osb,
+                                                       desc->c_cpos,
+                                                       NULL,
+                                                       &wc->w_target_to);
+       } else {
+               wc->w_target_from = 0;
+               wc->w_target_to = PAGE_CACHE_SIZE;
+       }
 }
 
-/*
- * Write a cluster to an inode. The cluster may not be allocated yet,
- * in which case it will be. This only exists for buffered writes -
- * O_DIRECT takes a more "traditional" path through the kernel.
- *
- * The caller is responsible for incrementing pos, written counts, etc
- *
- * For file systems that don't support sparse files, pre-allocation
- * and page zeroing up until cpos should be done prior to this
- * function call.
- *
- * Callers should be holding i_sem, and the rw cluster lock.
- *
- * Returns the number of user bytes written, or less than zero for
- * error.
- */
-ssize_t ocfs2_buffered_write_cluster(struct file *file, loff_t pos,
-                                    size_t count, ocfs2_page_writer *actor,
-                                    void *priv)
+int ocfs2_write_begin(struct file *file, struct address_space *mapping,
+                     loff_t pos, unsigned len, unsigned flags,
+                     struct page **pagep, void **fsdata)
 {
-       int ret, credits = OCFS2_INODE_UPDATE_CREDITS;
-       ssize_t written = 0;
-       u32 phys;
-       struct inode *inode = file->f_mapping->host;
+       int ret, i, credits = OCFS2_INODE_UPDATE_CREDITS;
+       unsigned int num_clusters = 0, clusters_to_alloc = 0;
+       u32 phys = 0;
+       struct ocfs2_write_ctxt *wc;
+       struct inode *inode = mapping->host;
        struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
-       struct buffer_head *di_bh = NULL;
        struct ocfs2_dinode *di;
        struct ocfs2_alloc_context *data_ac = NULL;
        struct ocfs2_alloc_context *meta_ac = NULL;
        handle_t *handle;
-       struct ocfs2_write_ctxt wc;
+       struct ocfs2_write_cluster_desc *desc;
 
-       ocfs2_write_ctxt_init(&wc, osb, pos, count, actor, priv);
+       ret = ocfs2_alloc_write_ctxt(&wc, osb, pos, len);
+       if (ret) {
+               mlog_errno(ret);
+               return ret;
+       }
 
-       ret = ocfs2_meta_lock(inode, &di_bh, 1);
+       ret = ocfs2_meta_lock(inode, &wc->w_di_bh, 1);
        if (ret) {
                mlog_errno(ret);
                goto out;
        }
-       di = (struct ocfs2_dinode *)di_bh->b_data;
+       di = (struct ocfs2_dinode *)wc->w_di_bh->b_data;
 
        /*
         * Take alloc sem here to prevent concurrent lookups. That way
@@ -1228,23 +1249,60 @@ ssize_t ocfs2_buffered_write_cluster(struct file *file, loff_t pos,
         */
        down_write(&OCFS2_I(inode)->ip_alloc_sem);
 
-       ret = ocfs2_get_clusters(inode, wc.w_cpos, &phys, NULL, NULL);
-       if (ret) {
-               mlog_errno(ret);
-               goto out_meta;
+       for (i = 0; i < wc->w_clen; i++) {
+               desc = &wc->w_desc[i];
+               desc->c_cpos = wc->w_cpos + i;
+
+               if (num_clusters == 0) {
+                       ret = ocfs2_get_clusters(inode, desc->c_cpos, &phys,
+                                                &num_clusters, NULL);
+                       if (ret) {
+                               mlog_errno(ret);
+                               goto out_meta;
+                       }
+               } else if (phys) {
+                       /*
+                        * Only increment phys if it doesn't describe
+                        * a hole.
+                        */
+                       phys++;
+               }
+
+               desc->c_phys = phys;
+               if (phys == 0) {
+                       desc->c_new = 1;
+                       clusters_to_alloc++;
+               }
+
+               num_clusters--;
        }
 
-       /* phys == 0 means that allocation is required. */
-       if (phys == 0) {
-               ret = ocfs2_lock_allocators(inode, di, 1, &data_ac, &meta_ac);
+       /*
+        * We set w_target_from, w_target_to here so that
+        * ocfs2_write_end() knows which range in the target page to
+        * write out. An allocation requires that we write the entire
+        * cluster range.
+        */
+       if (clusters_to_alloc > 0) {
+               /*
+                * XXX: We are stretching the limits of
+                * ocfs2_lock_allocators(). It greately over-estimates
+                * the work to be done.
+                */
+               ret = ocfs2_lock_allocators(inode, di, clusters_to_alloc,
+                                           &data_ac, &meta_ac);
                if (ret) {
                        mlog_errno(ret);
                        goto out_meta;
                }
 
-               credits = ocfs2_calc_extend_credits(inode->i_sb, di, 1);
+               credits = ocfs2_calc_extend_credits(inode->i_sb, di,
+                                                   clusters_to_alloc);
+
        }
 
+       ocfs2_set_target_boundaries(osb, wc, pos, len, clusters_to_alloc);
+
        ret = ocfs2_data_lock(inode, 1);
        if (ret) {
                mlog_errno(ret);
@@ -1258,36 +1316,50 @@ ssize_t ocfs2_buffered_write_cluster(struct file *file, loff_t pos,
                goto out_data;
        }
 
-       written = ocfs2_write(file, phys, handle, di_bh, data_ac,
-                             meta_ac, &wc);
-       if (written < 0) {
-               ret = written;
+       wc->w_handle = handle;
+
+       /*
+        * We don't want this to fail in ocfs2_write_end(), so do it
+        * here.
+        */
+       ret = ocfs2_journal_access(handle, inode, wc->w_di_bh,
+                                  OCFS2_JOURNAL_ACCESS_WRITE);
+       if (ret) {
                mlog_errno(ret);
                goto out_commit;
        }
 
-       ret = ocfs2_journal_access(handle, inode, di_bh,
-                                  OCFS2_JOURNAL_ACCESS_WRITE);
+       /*
+        * Fill our page array first. That way we've grabbed enough so
+        * that we can zero and flush if we error after adding the
+        * extent.
+        */
+       ret = ocfs2_grab_pages_for_write(mapping, wc, wc->w_cpos, pos,
+                                        clusters_to_alloc);
        if (ret) {
                mlog_errno(ret);
                goto out_commit;
        }
 
-       pos += written;
-       if (pos > inode->i_size) {
-               i_size_write(inode, pos);
-               mark_inode_dirty(inode);
+       for (i = 0; i < wc->w_clen; i++) {
+               desc = &wc->w_desc[i];
+
+               ret = ocfs2_write_cluster(mapping, desc->c_phys, data_ac,
+                                         meta_ac, wc, desc->c_cpos, pos, len);
+               if (ret) {
+                       mlog_errno(ret);
+                       goto out_commit;
+               }
        }
-       inode->i_blocks = ocfs2_inode_sector_count(inode);
-       di->i_size = cpu_to_le64((u64)i_size_read(inode));
-       inode->i_mtime = inode->i_ctime = CURRENT_TIME;
-       di->i_mtime = di->i_ctime = cpu_to_le64(inode->i_mtime.tv_sec);
-       di->i_mtime_nsec = di->i_ctime_nsec = cpu_to_le32(inode->i_mtime.tv_nsec);
 
-       ret = ocfs2_journal_dirty(handle, di_bh);
-       if (ret)
-               mlog_errno(ret);
+       if (data_ac)
+               ocfs2_free_alloc_context(data_ac);
+       if (meta_ac)
+               ocfs2_free_alloc_context(meta_ac);
 
+       *pagep = wc->w_target_page;
+       *fsdata = wc;
+       return 0;
 out_commit:
        ocfs2_commit_trans(osb, handle);
 
@@ -1299,13 +1371,85 @@ out_meta:
        ocfs2_meta_unlock(inode, 1);
 
 out:
-       brelse(di_bh);
+       ocfs2_free_write_ctxt(wc);
+
        if (data_ac)
                ocfs2_free_alloc_context(data_ac);
        if (meta_ac)
                ocfs2_free_alloc_context(meta_ac);
+       return ret;
+}
+
+int ocfs2_write_end(struct file *file, struct address_space *mapping,
+                   loff_t pos, unsigned len, unsigned copied,
+                   struct page *page, void *fsdata)
+{
+       int i;
+       unsigned from, to, start = pos & (PAGE_CACHE_SIZE - 1);
+       struct inode *inode = mapping->host;
+       struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
+       struct ocfs2_write_ctxt *wc = fsdata;
+       struct ocfs2_dinode *di = (struct ocfs2_dinode *)wc->w_di_bh->b_data;
+       handle_t *handle = wc->w_handle;
+       struct page *tmppage;
+
+       if (unlikely(copied < len)) {
+               if (!PageUptodate(wc->w_target_page))
+                       copied = 0;
+
+               ocfs2_zero_new_buffers(wc->w_target_page, start+copied,
+                                      start+len);
+       }
+       flush_dcache_page(wc->w_target_page);
+
+       for(i = 0; i < wc->w_num_pages; i++) {
+               tmppage = wc->w_pages[i];
+
+               if (tmppage == wc->w_target_page) {
+                       from = wc->w_target_from;
+                       to = wc->w_target_to;
+
+                       BUG_ON(from > PAGE_CACHE_SIZE ||
+                              to > PAGE_CACHE_SIZE ||
+                              to < from);
+               } else {
+                       /*
+                        * Pages adjacent to the target (if any) imply
+                        * a hole-filling write in which case we want
+                        * to flush their entire range.
+                        */
+                       from = 0;
+                       to = PAGE_CACHE_SIZE;
+               }
+
+               if (ocfs2_should_order_data(inode))
+                       walk_page_buffers(wc->w_handle, page_buffers(tmppage),
+                                         from, to, NULL,
+                                         ocfs2_journal_dirty_data);
+
+               block_commit_write(tmppage, from, to);
+       }
+
+       pos += copied;
+       if (pos > inode->i_size) {
+               i_size_write(inode, pos);
+               mark_inode_dirty(inode);
+       }
+       inode->i_blocks = ocfs2_inode_sector_count(inode);
+       di->i_size = cpu_to_le64((u64)i_size_read(inode));
+       inode->i_mtime = inode->i_ctime = CURRENT_TIME;
+       di->i_mtime = di->i_ctime = cpu_to_le64(inode->i_mtime.tv_sec);
+       di->i_mtime_nsec = di->i_ctime_nsec = cpu_to_le32(inode->i_mtime.tv_nsec);
+
+       ocfs2_journal_dirty(handle, wc->w_di_bh);
+
+       ocfs2_commit_trans(osb, handle);
+       ocfs2_data_unlock(inode, 1);
+       up_write(&OCFS2_I(inode)->ip_alloc_sem);
+       ocfs2_meta_unlock(inode, 1);
+       ocfs2_free_write_ctxt(wc);
 
-       return written ? written : ret;
+       return copied;
 }
 
 const struct address_space_operations ocfs2_aops = {
index 45821d479b5a3662028d99847dc64835f39b92fb..bdcdd1ae63a9c418dd41255bd784d8d482e7ca18 100644 (file)
@@ -42,57 +42,13 @@ int walk_page_buffers(      handle_t *handle,
                        int (*fn)(      handle_t *handle,
                                        struct buffer_head *bh));
 
-struct ocfs2_write_ctxt;
-typedef int (ocfs2_page_writer)(struct inode *, struct ocfs2_write_ctxt *,
-                               u64 *, unsigned int *, unsigned int *);
+int ocfs2_write_begin(struct file *file, struct address_space *mapping,
+                     loff_t pos, unsigned len, unsigned flags,
+                     struct page **pagep, void **fsdata);
 
-ssize_t ocfs2_buffered_write_cluster(struct file *file, loff_t pos,
-                                    size_t count, ocfs2_page_writer *actor,
-                                    void *priv);
-
-struct ocfs2_write_ctxt {
-       size_t                          w_count;
-       loff_t                          w_pos;
-       u32                             w_cpos;
-       unsigned int                    w_finished_copy;
-
-       /* This is true if page_size > cluster_size */
-       unsigned int                    w_large_pages;
-
-       /* Filler callback and private data */
-       ocfs2_page_writer               *w_write_data_page;
-       void                            *w_private;
-
-       /* Only valid for the filler callback */
-       struct page                     *w_this_page;
-       unsigned int                    w_this_page_new;
-};
-
-struct ocfs2_buffered_write_priv {
-       char                            *b_src_buf;
-       const struct iovec              *b_cur_iov; /* Current iovec */
-       size_t                          b_cur_off; /* Offset in the
-                                                   * current iovec */
-};
-int ocfs2_map_and_write_user_data(struct inode *inode,
-                                 struct ocfs2_write_ctxt *wc,
-                                 u64 *p_blkno,
-                                 unsigned int *ret_from,
-                                 unsigned int *ret_to);
-
-struct ocfs2_splice_write_priv {
-       struct splice_desc              *s_sd;
-       struct pipe_buffer              *s_buf;
-       struct pipe_inode_info          *s_pipe;
-       /* Neither offset value is ever larger than one page */
-       unsigned int                    s_offset;
-       unsigned int                    s_buf_offset;
-};
-int ocfs2_map_and_write_splice_data(struct inode *inode,
-                                   struct ocfs2_write_ctxt *wc,
-                                   u64 *p_blkno,
-                                   unsigned int *ret_from,
-                                   unsigned int *ret_to);
+int ocfs2_write_end(struct file *file, struct address_space *mapping,
+                   loff_t pos, unsigned len, unsigned copied,
+                   struct page *page, void *fsdata);
 
 /* all ocfs2_dio_end_io()'s fault */
 #define ocfs2_iocb_is_rw_locked(iocb) \
index 566f9b70ec9122567c986455fcd22f5f888bead9..4c850d00c26975ee7dd6f165ba704e9b55cbb290 100644 (file)
@@ -1335,15 +1335,16 @@ ocfs2_set_next_iovec(const struct iovec **iovp, size_t *basep, size_t bytes)
        *basep = base;
 }
 
-static struct page * ocfs2_get_write_source(struct ocfs2_buffered_write_priv *bp,
+static struct page * ocfs2_get_write_source(char **ret_src_buf,
                                            const struct iovec *cur_iov,
                                            size_t iov_offset)
 {
        int ret;
-       char *buf;
+       char *buf = cur_iov->iov_base + iov_offset;
        struct page *src_page = NULL;
+       unsigned long off;
 
-       buf = cur_iov->iov_base + iov_offset;
+       off = (unsigned long)(buf) & ~PAGE_CACHE_MASK;
 
        if (!segment_eq(get_fs(), KERNEL_DS)) {
                /*
@@ -1355,18 +1356,17 @@ static struct page * ocfs2_get_write_source(struct ocfs2_buffered_write_priv *bp
                                     (unsigned long)buf & PAGE_CACHE_MASK, 1,
                                     0, 0, &src_page, NULL);
                if (ret == 1)
-                       bp->b_src_buf = kmap(src_page);
+                       *ret_src_buf = kmap(src_page) + off;
                else
                        src_page = ERR_PTR(-EFAULT);
        } else {
-               bp->b_src_buf = buf;
+               *ret_src_buf = buf;
        }
 
        return src_page;
 }
 
-static void ocfs2_put_write_source(struct ocfs2_buffered_write_priv *bp,
-                                  struct page *page)
+static void ocfs2_put_write_source(struct page *page)
 {
        if (page) {
                kunmap(page);
@@ -1382,10 +1382,12 @@ static ssize_t ocfs2_file_buffered_write(struct file *file, loff_t *ppos,
 {
        int ret = 0;
        ssize_t copied, total = 0;
-       size_t iov_offset = 0;
+       size_t iov_offset = 0, bytes;
+       loff_t pos;
        const struct iovec *cur_iov = iov;
-       struct ocfs2_buffered_write_priv bp;
-       struct page *page;
+       struct page *user_page, *page;
+       char *buf, *dst;
+       void *fsdata;
 
        /*
         * handle partial DIO write.  Adjust cur_iov if needed.
@@ -1393,21 +1395,38 @@ static ssize_t ocfs2_file_buffered_write(struct file *file, loff_t *ppos,
        ocfs2_set_next_iovec(&cur_iov, &iov_offset, o_direct_written);
 
        do {
-               bp.b_cur_off = iov_offset;
-               bp.b_cur_iov = cur_iov;
+               pos = *ppos;
 
-               page = ocfs2_get_write_source(&bp, cur_iov, iov_offset);
-               if (IS_ERR(page)) {
-                       ret = PTR_ERR(page);
+               user_page = ocfs2_get_write_source(&buf, cur_iov, iov_offset);
+               if (IS_ERR(user_page)) {
+                       ret = PTR_ERR(user_page);
                        goto out;
                }
 
-               copied = ocfs2_buffered_write_cluster(file, *ppos, count,
-                                                     ocfs2_map_and_write_user_data,
-                                                     &bp);
+               /* Stay within our page boundaries */
+               bytes = min((PAGE_CACHE_SIZE - ((unsigned long)pos & ~PAGE_CACHE_MASK)),
+                           (PAGE_CACHE_SIZE - ((unsigned long)buf & ~PAGE_CACHE_MASK)));
+               /* Stay within the vector boundary */
+               bytes = min_t(size_t, bytes, cur_iov->iov_len - iov_offset);
+               /* Stay within count */
+               bytes = min(bytes, count);
+
+               page = NULL;
+               ret = ocfs2_write_begin(file, file->f_mapping, pos, bytes, 0,
+                                       &page, &fsdata);
+               if (ret) {
+                       mlog_errno(ret);
+                       goto out;
+               }
 
-               ocfs2_put_write_source(&bp, page);
+               dst = kmap_atomic(page, KM_USER0);
+               memcpy(dst + (pos & (PAGE_CACHE_SIZE - 1)), buf, bytes);
+               kunmap_atomic(dst, KM_USER0);
+               flush_dcache_page(page);
+               ocfs2_put_write_source(user_page);
 
+               copied = ocfs2_write_end(file, file->f_mapping, pos, bytes,
+                                        bytes, page, fsdata);
                if (copied < 0) {
                        mlog_errno(copied);
                        ret = copied;
@@ -1415,7 +1434,7 @@ static ssize_t ocfs2_file_buffered_write(struct file *file, loff_t *ppos,
                }
 
                total += copied;
-               *ppos = *ppos + copied;
+               *ppos = pos + copied;
                count -= copied;
 
                ocfs2_set_next_iovec(&cur_iov, &iov_offset, copied);
@@ -1585,52 +1604,46 @@ static int ocfs2_splice_write_actor(struct pipe_inode_info *pipe,
                                    struct pipe_buffer *buf,
                                    struct splice_desc *sd)
 {
-       int ret, count, total = 0;
+       int ret, count;
        ssize_t copied = 0;
-       struct ocfs2_splice_write_priv sp;
+       struct file *file = sd->u.file;
+       unsigned int offset;
+       struct page *page = NULL;
+       void *fsdata;
+       char *src, *dst;
 
        ret = buf->ops->confirm(pipe, buf);
        if (ret)
                goto out;
 
-       sp.s_sd = sd;
-       sp.s_buf = buf;
-       sp.s_pipe = pipe;
-       sp.s_offset = sd->pos & ~PAGE_CACHE_MASK;
-       sp.s_buf_offset = buf->offset;
-
+       offset = sd->pos & ~PAGE_CACHE_MASK;
        count = sd->len;
-       if (count + sp.s_offset > PAGE_CACHE_SIZE)
-               count = PAGE_CACHE_SIZE - sp.s_offset;
+       if (count + offset > PAGE_CACHE_SIZE)
+               count = PAGE_CACHE_SIZE - offset;
 
-       do {
-               /*
-                * splice wants us to copy up to one page at a
-                * time. For pagesize > cluster size, this means we
-                * might enter ocfs2_buffered_write_cluster() more
-                * than once, so keep track of our progress here.
-                */
-               copied = ocfs2_buffered_write_cluster(sd->u.file,
-                                                     (loff_t)sd->pos + total,
-                                                     count,
-                                                     ocfs2_map_and_write_splice_data,
-                                                     &sp);
-               if (copied < 0) {
-                       mlog_errno(copied);
-                       ret = copied;
-                       goto out;
-               }
+       ret = ocfs2_write_begin(file, file->f_mapping, sd->pos, count, 0,
+                               &page, &fsdata);
+       if (ret) {
+               mlog_errno(ret);
+               goto out;
+       }
 
-               count -= copied;
-               sp.s_offset += copied;
-               sp.s_buf_offset += copied;
-               total += copied;
-       } while (count);
+       src = buf->ops->map(pipe, buf, 1);
+       dst = kmap_atomic(page, KM_USER1);
+       memcpy(dst + offset, src + buf->offset, count);
+       kunmap_atomic(page, KM_USER1);
+       buf->ops->unmap(pipe, buf, src);
 
-       ret = 0;
+       copied = ocfs2_write_end(file, file->f_mapping, sd->pos, count, count,
+                                page, fsdata);
+       if (copied < 0) {
+               mlog_errno(copied);
+               ret = copied;
+               goto out;
+       }
 out:
 
-       return total ? total : ret;
+       return copied ? copied : ret;
 }
 
 static ssize_t __ocfs2_file_splice_write(struct pipe_inode_info *pipe,