From c8fe9b17d055fe80e1a1591f5900ce41fbf6b796 Mon Sep 17 00:00:00 2001 From: "Yan, Zheng" Date: Wed, 23 Dec 2015 21:23:38 +0800 Subject: [PATCH] ceph: Asynchronous IO support The basic idea of AIO support is simple, just call kiocb::ki_complete() in OSD request's complete callback. But there are several special cases. when IO span multiple objects, we need to wait until all OSD requests are complete, then call kiocb::ki_complete(). Error handling in this case is tricky too. For simplify, AIO both span multiple objects and extends i_size are not allowed. Another special case is check EOF for reading (other client can write to the file and extend i_size concurrently). For simplify, the direct-IO/AIO code path does do the check, fallback to normal syn read instead. Signed-off-by: Yan, Zheng --- fs/ceph/file.c | 397 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 278 insertions(+), 119 deletions(-) diff --git a/fs/ceph/file.c b/fs/ceph/file.c index 3c68e6aee2f0..8e924b7dd498 100644 --- a/fs/ceph/file.c +++ b/fs/ceph/file.c @@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file) } enum { - CHECK_EOF = 1, - READ_INLINE = 2, + HAVE_RETRIED = 1, + CHECK_EOF = 2, + READ_INLINE = 3, }; /* @@ -411,17 +412,14 @@ enum { static int striped_read(struct inode *inode, u64 off, u64 len, struct page **pages, int num_pages, - int *checkeof, bool o_direct, - unsigned long buf_align) + int *checkeof) { struct ceph_fs_client *fsc = ceph_inode_to_client(inode); struct ceph_inode_info *ci = ceph_inode(inode); u64 pos, this_len, left; - int io_align, page_align; - int pages_left; - int read; + int page_align, pages_left; + int read, ret; struct page **page_pos; - int ret; bool hit_stripe, was_short; /* @@ -432,13 +430,9 @@ static int striped_read(struct inode *inode, page_pos = pages; pages_left = num_pages; read = 0; - io_align = off & ~PAGE_MASK; more: - if (o_direct) - page_align = (pos - io_align + buf_align) & ~PAGE_MASK; - else - page_align = pos & ~PAGE_MASK; + page_align = pos & ~PAGE_MASK; this_len = left; ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), &ci->i_layout, pos, &this_len, @@ -457,8 +451,7 @@ more: if (was_short && (pos + ret < inode->i_size)) { int zlen = min(this_len - ret, inode->i_size - pos - ret); - int zoff = (o_direct ? buf_align : io_align) + - read + ret; + int zoff = (off & ~PAGE_MASK) + read + ret; dout(" zero gap %llu to %llu\n", pos + ret, pos + ret + zlen); ceph_zero_page_vector_range(zoff, zlen, pages); @@ -521,54 +514,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, if (ret < 0) return ret; - if (iocb->ki_flags & IOCB_DIRECT) { - while (iov_iter_count(i)) { - size_t start; - ssize_t n; - - n = dio_get_pagev_size(i); - pages = dio_get_pages_alloc(i, n, &start, &num_pages); - if (IS_ERR(pages)) - return PTR_ERR(pages); - - ret = striped_read(inode, off, n, - pages, num_pages, checkeof, - 1, start); - - ceph_put_page_vector(pages, num_pages, true); - - if (ret <= 0) - break; - off += ret; - iov_iter_advance(i, ret); - if (ret < n) + num_pages = calc_pages_for(off, len); + pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); + if (IS_ERR(pages)) + return PTR_ERR(pages); + ret = striped_read(inode, off, len, pages, + num_pages, checkeof); + if (ret > 0) { + int l, k = 0; + size_t left = ret; + + while (left) { + size_t page_off = off & ~PAGE_MASK; + size_t copy = min_t(size_t, left, + PAGE_SIZE - page_off); + l = copy_page_to_iter(pages[k++], page_off, copy, i); + off += l; + left -= l; + if (l < copy) break; } - } else { - num_pages = calc_pages_for(off, len); - pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL); - if (IS_ERR(pages)) - return PTR_ERR(pages); - ret = striped_read(inode, off, len, pages, - num_pages, checkeof, 0, 0); - if (ret > 0) { - int l, k = 0; - size_t left = ret; - - while (left) { - size_t page_off = off & ~PAGE_MASK; - size_t copy = min_t(size_t, - PAGE_SIZE - page_off, left); - l = copy_page_to_iter(pages[k++], page_off, - copy, i); - off += l; - left -= l; - if (l < copy) - break; - } - } - ceph_release_page_vector(pages, num_pages); } + ceph_release_page_vector(pages, num_pages); if (off > iocb->ki_pos) { ret = off - iocb->ki_pos; @@ -579,6 +546,113 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i, return ret; } +struct ceph_aio_request { + struct kiocb *iocb; + size_t total_len; + int write; + int error; + struct list_head osd_reqs; + unsigned num_reqs; + atomic_t pending_reqs; + struct ceph_cap_flush *prealloc_cf; +}; + +static void ceph_aio_complete(struct inode *inode, + struct ceph_aio_request *aio_req) +{ + struct ceph_inode_info *ci = ceph_inode(inode); + int ret; + + if (!atomic_dec_and_test(&aio_req->pending_reqs)) + return; + + ret = aio_req->error; + if (!ret) + ret = aio_req->total_len; + + dout("ceph_aio_complete %p rc %d\n", inode, ret); + + if (ret >= 0 && aio_req->write) { + int dirty; + + loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len; + if (endoff > i_size_read(inode)) { + if (ceph_inode_set_size(inode, endoff)) + ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL); + } + + spin_lock(&ci->i_ceph_lock); + ci->i_inline_version = CEPH_INLINE_NONE; + dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR, + &aio_req->prealloc_cf); + spin_unlock(&ci->i_ceph_lock); + if (dirty) + __mark_inode_dirty(inode, dirty); + + } + + ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR : + CEPH_CAP_FILE_RD)); + + aio_req->iocb->ki_complete(aio_req->iocb, ret, 0); + + ceph_free_cap_flush(aio_req->prealloc_cf); + kfree(aio_req); +} + +static void ceph_aio_complete_req(struct ceph_osd_request *req, + struct ceph_msg *msg) +{ + int rc = req->r_result; + struct inode *inode = req->r_inode; + struct ceph_aio_request *aio_req = req->r_priv; + struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0); + int num_pages = calc_pages_for((u64)osd_data->alignment, + osd_data->length); + + dout("ceph_aio_complete_req %p rc %d bytes %llu\n", + inode, rc, osd_data->length); + + if (rc == -EOLDSNAPC) { + BUG_ON(1); + } + + if (!aio_req->write) { + if (rc == -ENOENT) + rc = 0; + if (rc >= 0 && osd_data->length > rc) { + int zoff = osd_data->alignment + rc; + int zlen = osd_data->length - rc; + /* + * If read is satisfied by single OSD request, + * it can pass EOF. Otherwise read is within + * i_size. + */ + if (aio_req->num_reqs == 1) { + loff_t i_size = i_size_read(inode); + loff_t endoff = aio_req->iocb->ki_pos + rc; + if (endoff < i_size) + zlen = min_t(size_t, zlen, + i_size - endoff); + aio_req->total_len = rc + zlen; + } + + if (zlen > 0) + ceph_zero_page_vector_range(zoff, zlen, + osd_data->pages); + } + } + + ceph_put_page_vector(osd_data->pages, num_pages, false); + ceph_osdc_put_request(req); + + if (rc < 0) + cmpxchg(&aio_req->error, 0, rc); + + ceph_aio_complete(inode, aio_req); + return; +} + /* * Write commit request unsafe callback, called to tell us when a * request is unsafe (that is, in flight--has been handed to the @@ -612,16 +686,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe) } -/* - * Synchronous write, straight from __user pointer or user pages. - * - * If write spans object boundary, just do multiple writes. (For a - * correct atomic write, we should e.g. take write locks on all - * objects, rollback on failure, etc.) - */ static ssize_t -ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, - struct ceph_snap_context *snapc) +ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter, + struct ceph_snap_context *snapc, + struct ceph_cap_flush **pcf) { struct file *file = iocb->ki_filp; struct inode *inode = file_inode(file); @@ -630,44 +698,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, struct ceph_vino vino; struct ceph_osd_request *req; struct page **pages; - int num_pages; - int written = 0; + struct ceph_aio_request *aio_req = NULL; + int num_pages = 0; int flags; - int check_caps = 0; int ret; struct timespec mtime = CURRENT_TIME; - size_t count = iov_iter_count(from); + size_t count = iov_iter_count(iter); + loff_t pos = iocb->ki_pos; + bool write = iov_iter_rw(iter) == WRITE; - if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) + if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP) return -EROFS; - dout("sync_direct_write on file %p %lld~%u\n", file, pos, - (unsigned)count); + dout("sync_direct_read_write (%s) on file %p %lld~%u\n", + (write ? "write" : "read"), file, pos, (unsigned)count); ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); if (ret < 0) return ret; - ret = invalidate_inode_pages2_range(inode->i_mapping, - pos >> PAGE_CACHE_SHIFT, - (pos + count) >> PAGE_CACHE_SHIFT); - if (ret < 0) - dout("invalidate_inode_pages2_range returned %d\n", ret); + if (write) { + ret = invalidate_inode_pages2_range(inode->i_mapping, + pos >> PAGE_CACHE_SHIFT, + (pos + count) >> PAGE_CACHE_SHIFT); + if (ret < 0) + dout("invalidate_inode_pages2_range returned %d\n", ret); - flags = CEPH_OSD_FLAG_ORDERSNAP | - CEPH_OSD_FLAG_ONDISK | - CEPH_OSD_FLAG_WRITE; + flags = CEPH_OSD_FLAG_ORDERSNAP | + CEPH_OSD_FLAG_ONDISK | + CEPH_OSD_FLAG_WRITE; + } else { + flags = CEPH_OSD_FLAG_READ; + } - while (iov_iter_count(from) > 0) { - u64 len = dio_get_pagev_size(from); - size_t start; - ssize_t n; + while (iov_iter_count(iter) > 0) { + u64 size = dio_get_pagev_size(iter); + size_t start = 0; + ssize_t len; vino = ceph_vino(inode); req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, - vino, pos, &len, 0, - 2,/*include a 'startsync' command*/ - CEPH_OSD_OP_WRITE, flags, snapc, + vino, pos, &size, 0, + /*include a 'startsync' command*/ + write ? 2 : 1, + write ? CEPH_OSD_OP_WRITE : + CEPH_OSD_OP_READ, + flags, snapc, ci->i_truncate_seq, ci->i_truncate_size, false); @@ -676,10 +752,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, break; } - osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); - - n = len; - pages = dio_get_pages_alloc(from, len, &start, &num_pages); + len = size; + pages = dio_get_pages_alloc(iter, len, &start, &num_pages); if (IS_ERR(pages)) { ceph_osdc_put_request(req); ret = PTR_ERR(pages); @@ -687,47 +761,126 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, } /* - * throw out any page cache pages in this range. this - * may block. + * To simplify error handling, allow AIO when IO within i_size + * or IO can be satisfied by single OSD request. */ - truncate_inode_pages_range(inode->i_mapping, pos, - (pos+n) | (PAGE_CACHE_SIZE-1)); - osd_req_op_extent_osd_data_pages(req, 0, pages, n, start, - false, false); + if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) && + (len == count || pos + count <= i_size_read(inode))) { + aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL); + if (aio_req) { + aio_req->iocb = iocb; + aio_req->write = write; + INIT_LIST_HEAD(&aio_req->osd_reqs); + if (write) { + swap(aio_req->prealloc_cf, *pcf); + } + } + /* ignore error */ + } + + if (write) { + /* + * throw out any page cache pages in this range. this + * may block. + */ + truncate_inode_pages_range(inode->i_mapping, pos, + (pos+len) | (PAGE_CACHE_SIZE - 1)); + + osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); + } + + + osd_req_op_extent_osd_data_pages(req, 0, pages, len, start, + false, false); - /* BUG_ON(vino.snap != CEPH_NOSNAP); */ ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); - ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); + if (aio_req) { + aio_req->total_len += len; + aio_req->num_reqs++; + atomic_inc(&aio_req->pending_reqs); + + req->r_callback = ceph_aio_complete_req; + req->r_inode = inode; + req->r_priv = aio_req; + list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs); + + pos += len; + iov_iter_advance(iter, len); + continue; + } + + ret = ceph_osdc_start_request(req->r_osdc, req, false); if (!ret) ret = ceph_osdc_wait_request(&fsc->client->osdc, req); + size = i_size_read(inode); + if (!write) { + if (ret == -ENOENT) + ret = 0; + if (ret >= 0 && ret < len && pos + ret < size) { + int zlen = min_t(size_t, len - ret, + size - pos - ret); + ceph_zero_page_vector_range(start + ret, zlen, + pages); + ret += zlen; + } + if (ret >= 0) + len = ret; + } + ceph_put_page_vector(pages, num_pages, false); ceph_osdc_put_request(req); - if (ret) + if (ret < 0) break; - pos += n; - written += n; - iov_iter_advance(from, n); - if (pos > i_size_read(inode)) { - check_caps = ceph_inode_set_size(inode, pos); - if (check_caps) + pos += len; + iov_iter_advance(iter, len); + + if (!write && pos >= size) + break; + + if (write && pos > size) { + if (ceph_inode_set_size(inode, pos)) ceph_check_caps(ceph_inode(inode), CHECK_CAPS_AUTHONLY, NULL); } } - if (ret != -EOLDSNAPC && written > 0) { + if (aio_req) { + if (aio_req->num_reqs == 0) { + kfree(aio_req); + return ret; + } + + ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR : + CEPH_CAP_FILE_RD); + + while (!list_empty(&aio_req->osd_reqs)) { + req = list_first_entry(&aio_req->osd_reqs, + struct ceph_osd_request, + r_unsafe_item); + list_del_init(&req->r_unsafe_item); + if (ret >= 0) + ret = ceph_osdc_start_request(req->r_osdc, + req, false); + if (ret < 0) { + req->r_result = ret; + ceph_aio_complete_req(req, NULL); + } + } + return -EIOCBQUEUED; + } + + if (ret != -EOLDSNAPC && pos > iocb->ki_pos) { + ret = pos - iocb->ki_pos; iocb->ki_pos = pos; - ret = written; } return ret; } - /* * Synchronous write, straight from __user pointer or user pages. * @@ -897,8 +1050,14 @@ again: ceph_cap_string(got)); if (ci->i_inline_version == CEPH_INLINE_NONE) { - /* hmm, this isn't really async... */ - ret = ceph_sync_read(iocb, to, &retry_op); + if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) { + ret = ceph_direct_read_write(iocb, to, + NULL, NULL); + if (ret >= 0 && ret < len) + retry_op = CHECK_EOF; + } else { + ret = ceph_sync_read(iocb, to, &retry_op); + } } else { retry_op = READ_INLINE; } @@ -916,7 +1075,7 @@ again: pinned_page = NULL; } ceph_put_cap_refs(ci, got); - if (retry_op && ret >= 0) { + if (retry_op > HAVE_RETRIED && ret >= 0) { int statret; struct page *page = NULL; loff_t i_size; @@ -973,7 +1132,7 @@ again: read += ret; len -= ret; - retry_op = 0; + retry_op = HAVE_RETRIED; goto again; } } @@ -1088,8 +1247,8 @@ retry_snap: /* we might need to revert back to that point */ data = *from; if (iocb->ki_flags & IOCB_DIRECT) - written = ceph_sync_direct_write(iocb, &data, pos, - snapc); + written = ceph_direct_read_write(iocb, &data, snapc, + &prealloc_cf); else written = ceph_sync_write(iocb, &data, pos, snapc); if (written == -EOLDSNAPC) { -- 2.20.1