inode = mapping->host;
ci = ceph_inode(inode);
- /*
- * Note that we're grabbing a snapc ref here without holding
- * any locks!
- */
- snapc = ceph_get_snap_context(ci->i_snap_realm->cached_context);
-
/* dirty the head */
spin_lock(&ci->i_ceph_lock);
- if (ci->i_head_snapc == NULL)
- ci->i_head_snapc = ceph_get_snap_context(snapc);
- ++ci->i_wrbuffer_ref_head;
+ BUG_ON(ci->i_wr_ref == 0); // caller should hold Fw reference
+ if (__ceph_have_pending_cap_snap(ci)) {
+ struct ceph_cap_snap *capsnap =
+ list_last_entry(&ci->i_cap_snaps,
+ struct ceph_cap_snap,
+ ci_item);
+ snapc = ceph_get_snap_context(capsnap->context);
+ capsnap->dirty_pages++;
+ } else {
+ BUG_ON(!ci->i_head_snapc);
+ snapc = ceph_get_snap_context(ci->i_head_snapc);
+ ++ci->i_wrbuffer_ref_head;
+ }
if (ci->i_wrbuffer_ref == 0)
ihold(inode);
++ci->i_wrbuffer_ref;
{
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
- struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
loff_t page_off = pos & PAGE_CACHE_MASK;
int pos_in_page = pos & ~PAGE_CACHE_MASK;
int end_in_page = pos_in_page + len;
/* writepages currently holds page lock, but if we change that later, */
wait_on_page_writeback(page);
- /* check snap context */
- BUG_ON(!ci->i_snap_realm);
- down_read(&mdsc->snap_rwsem);
- BUG_ON(!ci->i_snap_realm->cached_context);
snapc = page_snap_context(page);
if (snapc && snapc != ci->i_head_snapc) {
/*
* context! is it writeable now?
*/
oldest = get_oldest_context(inode, NULL);
- up_read(&mdsc->snap_rwsem);
if (snapc->seq > oldest->seq) {
ceph_put_snap_context(oldest);
}
/* we need to read it. */
- up_read(&mdsc->snap_rwsem);
r = readpage_nounlock(file, page);
if (r < 0)
goto fail_nosnap;
/*
* we don't do anything in here that simple_write_end doesn't do
- * except adjust dirty page accounting and drop read lock on
- * mdsc->snap_rwsem.
+ * except adjust dirty page accounting
*/
static int ceph_write_end(struct file *file, struct address_space *mapping,
loff_t pos, unsigned len, unsigned copied,
struct page *page, void *fsdata)
{
struct inode *inode = file_inode(file);
- struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- struct ceph_mds_client *mdsc = fsc->mdsc;
unsigned from = pos & (PAGE_CACHE_SIZE - 1);
int check_cap = 0;
set_page_dirty(page);
unlock_page(page);
- up_read(&mdsc->snap_rwsem);
page_cache_release(page);
if (check_cap)
struct inode *inode = file_inode(vma->vm_file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_file_info *fi = vma->vm_file->private_data;
- struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
struct page *page = vmf->page;
loff_t off = page_offset(page);
loff_t size = i_size_read(inode);
if (ret == 0) {
/* success. we'll keep the page locked. */
set_page_dirty(page);
- up_read(&mdsc->snap_rwsem);
ret = VM_FAULT_LOCKED;
} else {
if (ret == -ENOMEM)
*
* Protected by i_ceph_lock.
*/
-static void __take_cap_refs(struct ceph_inode_info *ci, int got)
+static void __take_cap_refs(struct ceph_inode_info *ci, int got,
+ bool snap_rwsem_locked)
{
if (got & CEPH_CAP_PIN)
ci->i_pin_ref++;
ci->i_rd_ref++;
if (got & CEPH_CAP_FILE_CACHE)
ci->i_rdcache_ref++;
- if (got & CEPH_CAP_FILE_WR)
+ if (got & CEPH_CAP_FILE_WR) {
+ if (ci->i_wr_ref == 0 && !ci->i_head_snapc) {
+ BUG_ON(!snap_rwsem_locked);
+ ci->i_head_snapc = ceph_get_snap_context(
+ ci->i_snap_realm->cached_context);
+ }
ci->i_wr_ref++;
+ }
if (got & CEPH_CAP_FILE_BUFFER) {
if (ci->i_wb_ref == 0)
ihold(&ci->vfs_inode);
* requested from the MDS.
*/
static int try_get_cap_refs(struct ceph_inode_info *ci, int need, int want,
- loff_t endoff, int *got, int *check_max, int *err)
+ loff_t endoff, bool nonblock, int *got, int *err)
{
struct inode *inode = &ci->vfs_inode;
+ struct ceph_mds_client *mdsc = ceph_inode_to_client(inode)->mdsc;
int ret = 0;
int have, implemented;
int file_wanted;
+ bool snap_rwsem_locked = false;
dout("get_cap_refs %p need %s want %s\n", inode,
ceph_cap_string(need), ceph_cap_string(want));
+again:
spin_lock(&ci->i_ceph_lock);
/* make sure file is actually open */
/* finish pending truncate */
while (ci->i_truncate_pending) {
spin_unlock(&ci->i_ceph_lock);
+ if (snap_rwsem_locked) {
+ up_read(&mdsc->snap_rwsem);
+ snap_rwsem_locked = false;
+ }
__ceph_do_pending_vmtruncate(inode);
spin_lock(&ci->i_ceph_lock);
}
dout("get_cap_refs %p endoff %llu > maxsize %llu\n",
inode, endoff, ci->i_max_size);
if (endoff > ci->i_requested_max_size) {
- *check_max = 1;
+ *err = -EAGAIN;
ret = 1;
}
goto out_unlock;
inode, ceph_cap_string(have), ceph_cap_string(not),
ceph_cap_string(revoking));
if ((revoking & not) == 0) {
+ if (!snap_rwsem_locked &&
+ !ci->i_head_snapc &&
+ (need & CEPH_CAP_FILE_WR)) {
+ if (!down_read_trylock(&mdsc->snap_rwsem)) {
+ /*
+ * we can not call down_read() when
+ * task isn't in TASK_RUNNING state
+ */
+ if (nonblock) {
+ *err = -EAGAIN;
+ ret = 1;
+ goto out_unlock;
+ }
+
+ spin_unlock(&ci->i_ceph_lock);
+ down_read(&mdsc->snap_rwsem);
+ snap_rwsem_locked = true;
+ goto again;
+ }
+ snap_rwsem_locked = true;
+ }
*got = need | (have & want);
- __take_cap_refs(ci, *got);
+ __take_cap_refs(ci, *got, true);
ret = 1;
}
} else {
}
out_unlock:
spin_unlock(&ci->i_ceph_lock);
+ if (snap_rwsem_locked)
+ up_read(&mdsc->snap_rwsem);
dout("get_cap_refs %p ret %d got %s\n", inode,
ret, ceph_cap_string(*got));
int ceph_get_caps(struct ceph_inode_info *ci, int need, int want,
loff_t endoff, int *got, struct page **pinned_page)
{
- int _got, check_max, ret, err = 0;
+ int _got, ret, err = 0;
ret = ceph_pool_perm_check(ci, need);
if (ret < 0)
return ret;
-retry:
- if (endoff > 0)
- check_max_size(&ci->vfs_inode, endoff);
- _got = 0;
- check_max = 0;
- ret = wait_event_interruptible(ci->i_cap_wq,
- try_get_cap_refs(ci, need, want, endoff,
- &_got, &check_max, &err));
- if (err)
- ret = err;
- if (ret < 0)
- return ret;
+ while (true) {
+ if (endoff > 0)
+ check_max_size(&ci->vfs_inode, endoff);
- if (check_max)
- goto retry;
+ err = 0;
+ _got = 0;
+ ret = try_get_cap_refs(ci, need, want, endoff,
+ false, &_got, &err);
+ if (ret) {
+ if (err == -EAGAIN)
+ continue;
+ if (err < 0)
+ return err;
+ } else {
+ ret = wait_event_interruptible(ci->i_cap_wq,
+ try_get_cap_refs(ci, need, want, endoff,
+ true, &_got, &err));
+ if (err == -EAGAIN)
+ continue;
+ if (err < 0)
+ ret = err;
+ if (ret < 0)
+ return ret;
+ }
- if (ci->i_inline_version != CEPH_INLINE_NONE &&
- (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
- i_size_read(&ci->vfs_inode) > 0) {
- struct page *page = find_get_page(ci->vfs_inode.i_mapping, 0);
- if (page) {
- if (PageUptodate(page)) {
- *pinned_page = page;
- goto out;
+ if (ci->i_inline_version != CEPH_INLINE_NONE &&
+ (_got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) &&
+ i_size_read(&ci->vfs_inode) > 0) {
+ struct page *page =
+ find_get_page(ci->vfs_inode.i_mapping, 0);
+ if (page) {
+ if (PageUptodate(page)) {
+ *pinned_page = page;
+ break;
+ }
+ page_cache_release(page);
}
- page_cache_release(page);
- }
- /*
- * drop cap refs first because getattr while holding
- * caps refs can cause deadlock.
- */
- ceph_put_cap_refs(ci, _got);
- _got = 0;
+ /*
+ * drop cap refs first because getattr while
+ * holding * caps refs can cause deadlock.
+ */
+ ceph_put_cap_refs(ci, _got);
+ _got = 0;
- /* getattr request will bring inline data into page cache */
- ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
- CEPH_STAT_CAP_INLINE_DATA, true);
- if (ret < 0)
- return ret;
- goto retry;
+ /*
+ * getattr request will bring inline data into
+ * page cache
+ */
+ ret = __ceph_do_getattr(&ci->vfs_inode, NULL,
+ CEPH_STAT_CAP_INLINE_DATA,
+ true);
+ if (ret < 0)
+ return ret;
+ continue;
+ }
+ break;
}
-out:
+
*got = _got;
return 0;
}
void ceph_get_cap_refs(struct ceph_inode_info *ci, int caps)
{
spin_lock(&ci->i_ceph_lock);
- __take_cap_refs(ci, caps);
+ __take_cap_refs(ci, caps, false);
spin_unlock(&ci->i_ceph_lock);
}
wake = 1;
}
}
+ if (ci->i_wrbuffer_ref_head == 0 &&
+ ci->i_dirty_caps == 0 &&
+ ci->i_flushing_caps == 0) {
+ BUG_ON(!ci->i_head_snapc);
+ ceph_put_snap_context(ci->i_head_snapc);
+ ci->i_head_snapc = NULL;
+ }
/* see comment in __ceph_remove_cap() */
if (!__ceph_is_any_caps(ci) && ci->i_snap_realm)
drop_inode_snap_realm(ci);
if (ci->i_head_snapc == snapc) {
ci->i_wrbuffer_ref_head -= nr;
if (ci->i_wrbuffer_ref_head == 0 &&
- ci->i_dirty_caps == 0 && ci->i_flushing_caps == 0) {
+ ci->i_wr_ref == 0 &&
+ ci->i_dirty_caps == 0 &&
+ ci->i_flushing_caps == 0) {
BUG_ON(!ci->i_head_snapc);
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
dout(" inode %p now clean\n", inode);
BUG_ON(!list_empty(&ci->i_dirty_item));
drop = 1;
- if (ci->i_wrbuffer_ref_head == 0) {
+ if (ci->i_wr_ref == 0 &&
+ ci->i_wrbuffer_ref_head == 0) {
BUG_ON(!ci->i_head_snapc);
ceph_put_snap_context(ci->i_head_snapc);
ci->i_head_snapc = NULL;
* objects, rollback on failure, etc.)
*/
static ssize_t
-ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+ struct ceph_snap_context *snapc)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- struct ceph_snap_context *snapc;
struct ceph_vino vino;
struct ceph_osd_request *req;
struct page **pages;
size_t start;
ssize_t n;
- snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len, 0,
* objects, rollback on failure, etc.)
*/
static ssize_t
-ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos)
+ceph_sync_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
+ struct ceph_snap_context *snapc)
{
struct file *file = iocb->ki_filp;
struct inode *inode = file_inode(file);
struct ceph_inode_info *ci = ceph_inode(inode);
struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
- struct ceph_snap_context *snapc;
struct ceph_vino vino;
struct ceph_osd_request *req;
struct page **pages;
size_t left;
int n;
- snapc = ci->i_snap_realm->cached_context;
vino = ceph_vino(inode);
req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
vino, pos, &len, 0, 1,
if ((got & (CEPH_CAP_FILE_BUFFER|CEPH_CAP_FILE_LAZYIO)) == 0 ||
(iocb->ki_flags & IOCB_DIRECT) || (fi->flags & CEPH_F_SYNC)) {
+ struct ceph_snap_context *snapc;
struct iov_iter data;
mutex_unlock(&inode->i_mutex);
+
+ spin_lock(&ci->i_ceph_lock);
+ if (__ceph_have_pending_cap_snap(ci)) {
+ struct ceph_cap_snap *capsnap =
+ list_last_entry(&ci->i_cap_snaps,
+ struct ceph_cap_snap,
+ ci_item);
+ snapc = ceph_get_snap_context(capsnap->context);
+ } else {
+ BUG_ON(!ci->i_head_snapc);
+ snapc = ceph_get_snap_context(ci->i_head_snapc);
+ }
+ spin_unlock(&ci->i_ceph_lock);
+
/* we might need to revert back to that point */
data = *from;
if (iocb->ki_flags & IOCB_DIRECT)
- written = ceph_sync_direct_write(iocb, &data, pos);
+ written = ceph_sync_direct_write(iocb, &data, pos,
+ snapc);
else
- written = ceph_sync_write(iocb, &data, pos);
+ written = ceph_sync_write(iocb, &data, pos, snapc);
if (written == -EOLDSNAPC) {
dout("aio_write %p %llx.%llx %llu~%u"
"got EOLDSNAPC, retrying\n",
}
if (written > 0)
iov_iter_advance(from, written);
+ ceph_put_snap_context(snapc);
} else {
loff_t old_size = inode->i_size;
/*
{
struct inode *inode = &ci->vfs_inode;
struct ceph_cap_snap *capsnap;
+ struct ceph_snap_context *old_snapc;
int used, dirty;
capsnap = kzalloc(sizeof(*capsnap), GFP_NOFS);
used = __ceph_caps_used(ci);
dirty = __ceph_caps_dirty(ci);
+ old_snapc = ci->i_head_snapc;
+
/*
* If there is a write in progress, treat that as a dirty Fw,
* even though it hasn't completed yet; by the time we finish
writes in progress now were started before the previous
cap_snap. lucky us. */
dout("queue_cap_snap %p already pending\n", inode);
- kfree(capsnap);
- } else if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
+ goto update_snapc;
+ }
+ if (ci->i_snap_realm->cached_context == ceph_empty_snapc) {
dout("queue_cap_snap %p empty snapc\n", inode);
- kfree(capsnap);
- } else if (dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
- CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR)) {
- struct ceph_snap_context *snapc = ci->i_head_snapc;
+ goto update_snapc;
+ }
+ if (!(dirty & (CEPH_CAP_AUTH_EXCL|CEPH_CAP_XATTR_EXCL|
+ CEPH_CAP_FILE_EXCL|CEPH_CAP_FILE_WR))) {
+ dout("queue_cap_snap %p nothing dirty|writing\n", inode);
+ goto update_snapc;
+ }
- /*
- * if we are a sync write, we may need to go to the snaprealm
- * to get the current snapc.
- */
- if (!snapc)
- snapc = ci->i_snap_realm->cached_context;
+ BUG_ON(!old_snapc);
- dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
- inode, capsnap, snapc, ceph_cap_string(dirty));
- ihold(inode);
+ dout("queue_cap_snap %p cap_snap %p queuing under %p %s\n",
+ inode, capsnap, old_snapc, ceph_cap_string(dirty));
+ ihold(inode);
- atomic_set(&capsnap->nref, 1);
- capsnap->ci = ci;
- INIT_LIST_HEAD(&capsnap->ci_item);
- INIT_LIST_HEAD(&capsnap->flushing_item);
-
- capsnap->follows = snapc->seq;
- capsnap->issued = __ceph_caps_issued(ci, NULL);
- capsnap->dirty = dirty;
-
- capsnap->mode = inode->i_mode;
- capsnap->uid = inode->i_uid;
- capsnap->gid = inode->i_gid;
-
- if (dirty & CEPH_CAP_XATTR_EXCL) {
- __ceph_build_xattrs_blob(ci);
- capsnap->xattr_blob =
- ceph_buffer_get(ci->i_xattrs.blob);
- capsnap->xattr_version = ci->i_xattrs.version;
- } else {
- capsnap->xattr_blob = NULL;
- capsnap->xattr_version = 0;
- }
+ atomic_set(&capsnap->nref, 1);
+ capsnap->ci = ci;
+ INIT_LIST_HEAD(&capsnap->ci_item);
+ INIT_LIST_HEAD(&capsnap->flushing_item);
- capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+ capsnap->follows = old_snapc->seq;
+ capsnap->issued = __ceph_caps_issued(ci, NULL);
+ capsnap->dirty = dirty;
- /* dirty page count moved from _head to this cap_snap;
- all subsequent writes page dirties occur _after_ this
- snapshot. */
- capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
- ci->i_wrbuffer_ref_head = 0;
- capsnap->context = snapc;
- ci->i_head_snapc =
- ceph_get_snap_context(ci->i_snap_realm->cached_context);
- dout(" new snapc is %p\n", ci->i_head_snapc);
- list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
-
- if (used & CEPH_CAP_FILE_WR) {
- dout("queue_cap_snap %p cap_snap %p snapc %p"
- " seq %llu used WR, now pending\n", inode,
- capsnap, snapc, snapc->seq);
- capsnap->writing = 1;
- } else {
- /* note mtime, size NOW. */
- __ceph_finish_cap_snap(ci, capsnap);
- }
+ capsnap->mode = inode->i_mode;
+ capsnap->uid = inode->i_uid;
+ capsnap->gid = inode->i_gid;
+
+ if (dirty & CEPH_CAP_XATTR_EXCL) {
+ __ceph_build_xattrs_blob(ci);
+ capsnap->xattr_blob =
+ ceph_buffer_get(ci->i_xattrs.blob);
+ capsnap->xattr_version = ci->i_xattrs.version;
} else {
- dout("queue_cap_snap %p nothing dirty|writing\n", inode);
- kfree(capsnap);
+ capsnap->xattr_blob = NULL;
+ capsnap->xattr_version = 0;
}
+ capsnap->inline_data = ci->i_inline_version != CEPH_INLINE_NONE;
+
+ /* dirty page count moved from _head to this cap_snap;
+ all subsequent writes page dirties occur _after_ this
+ snapshot. */
+ capsnap->dirty_pages = ci->i_wrbuffer_ref_head;
+ ci->i_wrbuffer_ref_head = 0;
+ capsnap->context = old_snapc;
+ list_add_tail(&capsnap->ci_item, &ci->i_cap_snaps);
+ old_snapc = NULL;
+
+ if (used & CEPH_CAP_FILE_WR) {
+ dout("queue_cap_snap %p cap_snap %p snapc %p"
+ " seq %llu used WR, now pending\n", inode,
+ capsnap, old_snapc, old_snapc->seq);
+ capsnap->writing = 1;
+ } else {
+ /* note mtime, size NOW. */
+ __ceph_finish_cap_snap(ci, capsnap);
+ }
+ capsnap = NULL;
+
+update_snapc:
+ if (ci->i_head_snapc) {
+ ci->i_head_snapc = ceph_get_snap_context(
+ ci->i_snap_realm->cached_context);
+ dout(" new snapc is %p\n", ci->i_head_snapc);
+ }
spin_unlock(&ci->i_ceph_lock);
+
+ kfree(capsnap);
+ ceph_put_snap_context(old_snapc);
}
/*