ceph: take snap_rwsem when accessing snap realm's cached_context
authorYan, Zheng <zyan@redhat.com>
Fri, 1 May 2015 09:49:16 +0000 (17:49 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 25 Jun 2015 08:49:28 +0000 (11:49 +0300)
When ceph inode's i_head_snapc is NULL, __ceph_mark_dirty_caps()
accesses snap realm's cached_context. So we need take read lock
of snap_rwsem.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/xattr.c

index f1dbcae7c75c4d72e18f1c425a4b70e75a73a4b9..900c05fd77d86db82d7152b7c4d731186807fa83 100644 (file)
@@ -1413,9 +1413,11 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
             ceph_cap_string(was | mask));
        ci->i_dirty_caps |= mask;
        if (was == 0) {
-               if (!ci->i_head_snapc)
+               if (!ci->i_head_snapc) {
+                       WARN_ON_ONCE(!rwsem_is_locked(&mdsc->snap_rwsem));
                        ci->i_head_snapc = ceph_get_snap_context(
                                ci->i_snap_realm->cached_context);
+               }
                dout(" inode %p now dirty snapc %p auth cap %p\n",
                     &ci->vfs_inode, ci->i_head_snapc, ci->i_auth_cap);
                BUG_ON(!list_empty(&ci->i_dirty_item));
index 1a68c0e38a52d86c7973a182392a7a4650a05d28..1c991df276c96e093f88cb9cf795df4fb82edfbf 100644 (file)
@@ -1727,6 +1727,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
        int mask = 0;
        int err = 0;
        int inode_dirty_flags = 0;
+       bool lock_snap_rwsem = false;
 
        if (ceph_snap(inode) != CEPH_NOSNAP)
                return -EROFS;
@@ -1742,6 +1743,18 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 
        spin_lock(&ci->i_ceph_lock);
        issued = __ceph_caps_issued(ci, NULL);
+
+       if (!ci->i_head_snapc &&
+           (issued & (CEPH_CAP_ANY_EXCL | CEPH_CAP_FILE_WR))) {
+               lock_snap_rwsem = true;
+               if (!down_read_trylock(&mdsc->snap_rwsem)) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       down_read(&mdsc->snap_rwsem);
+                       spin_lock(&ci->i_ceph_lock);
+                       issued = __ceph_caps_issued(ci, NULL);
+               }
+       }
+
        dout("setattr %p issued %s\n", inode, ceph_cap_string(issued));
 
        if (ia_valid & ATTR_UID) {
@@ -1890,6 +1903,8 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
 
        release &= issued;
        spin_unlock(&ci->i_ceph_lock);
+       if (lock_snap_rwsem)
+               up_read(&mdsc->snap_rwsem);
 
        if (inode_dirty_flags)
                __mark_inode_dirty(inode, inode_dirty_flags);
index cd7ffad4041d81b605bdfbc97a4bc0072c19ca99..c6f7d9b8208557b78b4b74b91ad1ae46e8e545bd 100644 (file)
@@ -911,6 +911,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
        struct inode *inode = d_inode(dentry);
        struct ceph_vxattr *vxattr;
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
        int issued;
        int err;
        int dirty = 0;
@@ -920,6 +921,7 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
        char *newval = NULL;
        struct ceph_inode_xattr *xattr = NULL;
        int required_blob_size;
+       bool lock_snap_rwsem = false;
 
        if (!ceph_is_valid_xattr(name))
                return -EOPNOTSUPP;
@@ -951,9 +953,20 @@ int __ceph_setxattr(struct dentry *dentry, const char *name,
        spin_lock(&ci->i_ceph_lock);
 retry:
        issued = __ceph_caps_issued(ci, NULL);
-       dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
        if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
                goto do_sync;
+
+       if (!lock_snap_rwsem && !ci->i_head_snapc) {
+               lock_snap_rwsem = true;
+               if (!down_read_trylock(&mdsc->snap_rwsem)) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       down_read(&mdsc->snap_rwsem);
+                       spin_lock(&ci->i_ceph_lock);
+                       goto retry;
+               }
+       }
+
+       dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
        __build_xattrs(inode);
 
        required_blob_size = __get_required_blob_size(ci, name_len, val_len);
@@ -966,7 +979,7 @@ retry:
                dout(" preaallocating new blob size=%d\n", required_blob_size);
                blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
                if (!blob)
-                       goto out;
+                       goto do_sync_unlocked;
                spin_lock(&ci->i_ceph_lock);
                if (ci->i_xattrs.prealloc_blob)
                        ceph_buffer_put(ci->i_xattrs.prealloc_blob);
@@ -984,6 +997,8 @@ retry:
        }
 
        spin_unlock(&ci->i_ceph_lock);
+       if (lock_snap_rwsem)
+               up_read(&mdsc->snap_rwsem);
        if (dirty)
                __mark_inode_dirty(inode, dirty);
        return err;
@@ -991,6 +1006,8 @@ retry:
 do_sync:
        spin_unlock(&ci->i_ceph_lock);
 do_sync_unlocked:
+       if (lock_snap_rwsem)
+               up_read(&mdsc->snap_rwsem);
        err = ceph_sync_setxattr(dentry, name, value, size, flags);
 out:
        kfree(newname);
@@ -1044,10 +1061,12 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
        struct inode *inode = d_inode(dentry);
        struct ceph_vxattr *vxattr;
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_mds_client *mdsc = ceph_sb_to_client(dentry->d_sb)->mdsc;
        int issued;
        int err;
        int required_blob_size;
        int dirty;
+       bool lock_snap_rwsem = false;
 
        if (!ceph_is_valid_xattr(name))
                return -EOPNOTSUPP;
@@ -1064,10 +1083,21 @@ int __ceph_removexattr(struct dentry *dentry, const char *name)
        spin_lock(&ci->i_ceph_lock);
 retry:
        issued = __ceph_caps_issued(ci, NULL);
-       dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
-
        if (ci->i_xattrs.version == 0 || !(issued & CEPH_CAP_XATTR_EXCL))
                goto do_sync;
+
+       if (!lock_snap_rwsem && !ci->i_head_snapc) {
+               lock_snap_rwsem = true;
+               if (!down_read_trylock(&mdsc->snap_rwsem)) {
+                       spin_unlock(&ci->i_ceph_lock);
+                       down_read(&mdsc->snap_rwsem);
+                       spin_lock(&ci->i_ceph_lock);
+                       goto retry;
+               }
+       }
+
+       dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
+
        __build_xattrs(inode);
 
        required_blob_size = __get_required_blob_size(ci, 0, 0);
@@ -1080,7 +1110,7 @@ retry:
                dout(" preaallocating new blob size=%d\n", required_blob_size);
                blob = ceph_buffer_new(required_blob_size, GFP_NOFS);
                if (!blob)
-                       goto out;
+                       goto do_sync_unlocked;
                spin_lock(&ci->i_ceph_lock);
                if (ci->i_xattrs.prealloc_blob)
                        ceph_buffer_put(ci->i_xattrs.prealloc_blob);
@@ -1094,14 +1124,17 @@ retry:
        ci->i_xattrs.dirty = true;
        inode->i_ctime = CURRENT_TIME;
        spin_unlock(&ci->i_ceph_lock);
+       if (lock_snap_rwsem)
+               up_read(&mdsc->snap_rwsem);
        if (dirty)
                __mark_inode_dirty(inode, dirty);
        return err;
 do_sync:
        spin_unlock(&ci->i_ceph_lock);
 do_sync_unlocked:
+       if (lock_snap_rwsem)
+               up_read(&mdsc->snap_rwsem);
        err = ceph_send_removexattr(dentry, name);
-out:
        return err;
 }