ceph: fix security xattr deadlock
authorYan, Zheng <zyan@redhat.com>
Mon, 7 Mar 2016 02:34:50 +0000 (10:34 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Fri, 25 Mar 2016 17:51:55 +0000 (18:51 +0100)
When security is enabled, security module can call filesystem's
getxattr/setxattr callbacks during d_instantiate(). For cephfs,
d_instantiate() is usually called by MDS' dispatch thread, while
handling MDS reply. If the MDS reply does not include xattrs and
corresponding caps, getxattr/setxattr need to send a new request
to MDS and waits for the reply. This makes MDS' dispatch sleep,
nobody handles later MDS replies.

The fix is make sure lookup/atomic_open reply include xattrs and
corresponding caps. So getxattr can be handled by cached xattrs.
This requires some modification to both MDS and request message.
(Client tells MDS what caps it wants; MDS encodes proper caps in
the reply)

Smack security module may call setxattr during d_instantiate().
Unlike getxattr, we can't force MDS to issue CEPH_CAP_XATTR_EXCL
to us. So just make setxattr return error when called by MDS'
dispatch thread.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/dir.c
fs/ceph/export.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/super.h
fs/ceph/xattr.c
include/linux/ceph/ceph_fs.h

index fd11fb231a2ea796e86eae933265a488ad6774e5..b9f50a388aee30d9b341c90c736d4eee0e3f7510 100644 (file)
@@ -624,6 +624,7 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct ceph_mds_request *req;
        int op;
+       int mask;
        int err;
 
        dout("lookup %p dentry %p '%pd'\n",
@@ -666,8 +667,12 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
                return ERR_CAST(req);
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
-       /* we only need inode linkage */
-       req->r_args.getattr.mask = cpu_to_le32(CEPH_STAT_CAP_INODE);
+
+       mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+       if (ceph_security_xattr_wanted(dir))
+               mask |= CEPH_CAP_XATTR_SHARED;
+       req->r_args.getattr.mask = cpu_to_le32(mask);
+
        req->r_locked_dir = dir;
        err = ceph_mdsc_do_request(mdsc, NULL, req);
        err = ceph_handle_snapdir(req, dentry, err);
index 3b31723573268e924346c1bffddb7361390b2a8d..6e72c98162d5351a9ad27b581eb17737eb4a1203 100644 (file)
@@ -71,12 +71,18 @@ static struct dentry *__fh_to_dentry(struct super_block *sb, u64 ino)
        inode = ceph_find_inode(sb, vino);
        if (!inode) {
                struct ceph_mds_request *req;
+               int mask;
 
                req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPINO,
                                               USE_ANY_MDS);
                if (IS_ERR(req))
                        return ERR_CAST(req);
 
+               mask = CEPH_STAT_CAP_INODE;
+               if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+                       mask |= CEPH_CAP_XATTR_SHARED;
+               req->r_args.getattr.mask = cpu_to_le32(mask);
+
                req->r_ino1 = vino;
                req->r_num_caps = 1;
                err = ceph_mdsc_do_request(mdsc, NULL, req);
@@ -128,6 +134,7 @@ static struct dentry *__get_parent(struct super_block *sb,
        struct ceph_mds_request *req;
        struct inode *inode;
        struct dentry *dentry;
+       int mask;
        int err;
 
        req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_LOOKUPPARENT,
@@ -144,6 +151,12 @@ static struct dentry *__get_parent(struct super_block *sb,
                        .snap = CEPH_NOSNAP,
                };
        }
+
+       mask = CEPH_STAT_CAP_INODE;
+       if (ceph_security_xattr_wanted(d_inode(sb->s_root)))
+               mask |= CEPH_CAP_XATTR_SHARED;
+       req->r_args.getattr.mask = cpu_to_le32(mask);
+
        req->r_num_caps = 1;
        err = ceph_mdsc_do_request(mdsc, NULL, req);
        inode = req->r_target_inode;
index 389adacbc71935f2aa38e417fbc8ea7c40970126..334a75170a3badae4f0f130548cfc656d234f94a 100644 (file)
@@ -300,6 +300,7 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
        struct ceph_mds_request *req;
        struct dentry *dn;
        struct ceph_acls_info acls = {};
+       int mask;
        int err;
 
        dout("atomic_open %p dentry %p '%pd' %s flags %d mode 0%o\n",
@@ -335,6 +336,12 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                        acls.pagelist = NULL;
                }
        }
+
+       mask = CEPH_STAT_CAP_INODE | CEPH_CAP_AUTH_SHARED;
+       if (ceph_security_xattr_wanted(dir))
+               mask |= CEPH_CAP_XATTR_SHARED;
+       req->r_args.open.mask = cpu_to_le32(mask);
+
        req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
index 66edef12c6f2516f765353646f93ef8aae629831..8b136dc0bc1396688998c968a54a086b87d00782 100644 (file)
@@ -1389,7 +1389,7 @@ int ceph_readdir_prepopulate(struct ceph_mds_request *req,
        struct qstr dname;
        struct dentry *dn;
        struct inode *in;
-       int err = 0, ret, i;
+       int err = 0, skipped = 0, ret, i;
        struct inode *snapdir = NULL;
        struct ceph_mds_request_head *rhead = req->r_request->front.iov_base;
        struct ceph_dentry_info *di;
@@ -1501,7 +1501,17 @@ retry_lookup:
                }
 
                if (d_really_is_negative(dn)) {
-                       struct dentry *realdn = splice_dentry(dn, in);
+                       struct dentry *realdn;
+
+                       if (ceph_security_xattr_deadlock(in)) {
+                               dout(" skip splicing dn %p to inode %p"
+                                    " (security xattr deadlock)\n", dn, in);
+                               iput(in);
+                               skipped++;
+                               goto next_item;
+                       }
+
+                       realdn = splice_dentry(dn, in);
                        if (IS_ERR(realdn)) {
                                err = PTR_ERR(realdn);
                                d_drop(dn);
@@ -1518,7 +1528,7 @@ retry_lookup:
                                    req->r_session,
                                    req->r_request_started);
 
-               if (err == 0 && cache_ctl.index >= 0) {
+               if (err == 0 && skipped == 0 && cache_ctl.index >= 0) {
                        ret = fill_readdir_cache(d_inode(parent), dn,
                                                 &cache_ctl, req);
                        if (ret < 0)
@@ -1529,7 +1539,7 @@ next_item:
                        dput(dn);
        }
 out:
-       if (err == 0) {
+       if (err == 0 && skipped == 0) {
                req->r_did_prepopulate = true;
                req->r_readdir_cache_idx = cache_ctl.index;
        }
index aa43dcb5f9b95af6d76f488f57c47363f56e939b..44852c3ae5311a3367ca081685f8baa7ecd60bed 100644 (file)
@@ -2540,6 +2540,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
 
        /* insert trace into our cache */
        mutex_lock(&req->r_fill_mutex);
+       current->journal_info = req;
        err = ceph_fill_trace(mdsc->fsc->sb, req, req->r_session);
        if (err == 0) {
                if (result == 0 && (req->r_op == CEPH_MDS_OP_READDIR ||
@@ -2547,6 +2548,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
                        ceph_readdir_prepopulate(req, req->r_session);
                ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
        }
+       current->journal_info = NULL;
        mutex_unlock(&req->r_fill_mutex);
 
        up_read(&mdsc->snap_rwsem);
index 57ac43d64322cc04e9222502d7553ac2d4e316a6..2d48138da58efa7a86c27c53b9344fbbc7b50c2f 100644 (file)
@@ -468,7 +468,7 @@ static inline struct inode *ceph_find_inode(struct super_block *sb,
 #define CEPH_I_POOL_PERM       (1 << 4)  /* pool rd/wr bits are valid */
 #define CEPH_I_POOL_RD         (1 << 5)  /* can read from pool */
 #define CEPH_I_POOL_WR         (1 << 6)  /* can write to pool */
-
+#define CEPH_I_SEC_INITED      (1 << 7)  /* security initialized */
 
 static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
                                           long long release_count,
@@ -804,6 +804,20 @@ extern void __init ceph_xattr_init(void);
 extern void ceph_xattr_exit(void);
 extern const struct xattr_handler *ceph_xattr_handlers[];
 
+#ifdef CONFIG_SECURITY
+extern bool ceph_security_xattr_deadlock(struct inode *in);
+extern bool ceph_security_xattr_wanted(struct inode *in);
+#else
+static inline bool ceph_security_xattr_deadlock(struct inode *in)
+{
+       return false;
+}
+static inline bool ceph_security_xattr_wanted(struct inode *in)
+{
+       return false;
+}
+#endif
+
 /* acl.c */
 struct ceph_acls_info {
        void *default_acl;
index 139cdef8eb41bb8de5b6cc8b312ae1e464a4f94c..9410abdef3cec5fdb0ca73a905c503c10bc994fb 100644 (file)
@@ -714,13 +714,31 @@ void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
        }
 }
 
+static inline int __get_request_mask(struct inode *in) {
+       struct ceph_mds_request *req = current->journal_info;
+       int mask = 0;
+       if (req && req->r_target_inode == in) {
+               if (req->r_op == CEPH_MDS_OP_LOOKUP ||
+                   req->r_op == CEPH_MDS_OP_LOOKUPINO ||
+                   req->r_op == CEPH_MDS_OP_LOOKUPPARENT ||
+                   req->r_op == CEPH_MDS_OP_GETATTR) {
+                       mask = le32_to_cpu(req->r_args.getattr.mask);
+               } else if (req->r_op == CEPH_MDS_OP_OPEN ||
+                          req->r_op == CEPH_MDS_OP_CREATE) {
+                       mask = le32_to_cpu(req->r_args.open.mask);
+               }
+       }
+       return mask;
+}
+
 ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
                      size_t size)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int err;
        struct ceph_inode_xattr *xattr;
        struct ceph_vxattr *vxattr = NULL;
+       int req_mask;
+       int err;
 
        if (!ceph_is_valid_xattr(name))
                return -ENODATA;
@@ -734,13 +752,24 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
                return err;
        }
 
+       req_mask = __get_request_mask(inode);
+
        spin_lock(&ci->i_ceph_lock);
        dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
             ci->i_xattrs.version, ci->i_xattrs.index_version);
 
        if (ci->i_xattrs.version == 0 ||
-           !__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1)) {
+           !((req_mask & CEPH_CAP_XATTR_SHARED) ||
+             __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1))) {
                spin_unlock(&ci->i_ceph_lock);
+
+               /* security module gets xattr while filling trace */
+               if (current->journal_info != NULL) {
+                       pr_warn_ratelimited("sync getxattr %p "
+                                           "during filling trace\n", inode);
+                       return -EBUSY;
+               }
+
                /* get xattrs from mds (if we don't already have them) */
                err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR, true);
                if (err)
@@ -767,6 +796,9 @@ ssize_t __ceph_getxattr(struct inode *inode, const char *name, void *value,
 
        memcpy(value, xattr->val, xattr->val_len);
 
+       if (current->journal_info != NULL &&
+           !strncmp(name, XATTR_SECURITY_PREFIX, XATTR_SECURITY_PREFIX_LEN))
+               ci->i_ceph_flags |= CEPH_I_SEC_INITED;
 out:
        spin_unlock(&ci->i_ceph_lock);
        return err;
@@ -1017,7 +1049,15 @@ do_sync:
 do_sync_unlocked:
        if (lock_snap_rwsem)
                up_read(&mdsc->snap_rwsem);
-       err = ceph_sync_setxattr(dentry, name, value, size, flags);
+
+       /* security module set xattr while filling trace */
+       if (current->journal_info != NULL) {
+               pr_warn_ratelimited("sync setxattr %p "
+                                   "during filling trace\n", inode);
+               err = -EBUSY;
+       } else {
+               err = ceph_sync_setxattr(dentry, name, value, size, flags);
+       }
 out:
        ceph_free_cap_flush(prealloc_cf);
        kfree(newname);
@@ -1166,3 +1206,25 @@ int ceph_removexattr(struct dentry *dentry, const char *name)
 
        return __ceph_removexattr(dentry, name);
 }
+
+#ifdef CONFIG_SECURITY
+bool ceph_security_xattr_wanted(struct inode *in)
+{
+       return in->i_security != NULL;
+}
+
+bool ceph_security_xattr_deadlock(struct inode *in)
+{
+       struct ceph_inode_info *ci;
+       bool ret;
+       if (in->i_security == NULL)
+               return false;
+       ci = ceph_inode(in);
+       spin_lock(&ci->i_ceph_lock);
+       ret = !(ci->i_ceph_flags & CEPH_I_SEC_INITED) &&
+             !(ci->i_xattrs.version > 0 &&
+               __ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 0));
+       spin_unlock(&ci->i_ceph_lock);
+       return ret;
+}
+#endif
index bf74005eedec75834330ce43162f767385f064ed..37f28bf55ce426bac3a8a533101f4afed21be53f 100644 (file)
@@ -376,7 +376,8 @@ union ceph_mds_request_args {
                __le32 stripe_count;         /* ... */
                __le32 object_size;
                __le32 file_replication;
-               __le32 unused;               /* used to be preferred osd */
+               __le32 mask;                 /* CEPH_CAP_* */
+               __le32 old_size;
        } __attribute__ ((packed)) open;
        struct {
                __le32 flags;