ceph: add a new flag to indicate whether parent is locked
authorJeff Layton <jlayton@redhat.com>
Tue, 31 Jan 2017 15:28:26 +0000 (10:28 -0500)
committerIlya Dryomov <idryomov@gmail.com>
Mon, 20 Feb 2017 11:16:08 +0000 (12:16 +0100)
struct ceph_mds_request has an r_locked_dir pointer, which is set to
indicate the parent inode and that its i_rwsem is locked.  In some
critical places, we need to be able to indicate the parent inode to the
request handling code, even when its i_rwsem may not be locked.

Most of the code that operates on r_locked_dir doesn't require that the
i_rwsem be locked. We only really need it to handle manipulation of the
dcache. The rest (filling of the inode, updating dentry leases, etc.)
already has its own locking.

Add a new r_req_flags bit that indicates whether the parent is locked
when doing the request, and rename the pointer to "r_parent". For now,
all the places that set r_parent also set this flag, but that will
change in a later patch.

Signed-off-by: Jeff Layton <jlayton@redhat.com>
Reviewed-by: Yan, Zheng <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
fs/ceph/dir.c
fs/ceph/export.c
fs/ceph/file.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h

index 5fb86d71220e21fe91ac39f9ce0f89ea124011ed..c8562b8a8c7e1210afd571ca403da680f3cf9e5c 100644 (file)
@@ -752,7 +752,8 @@ static struct dentry *ceph_lookup(struct inode *dir, struct dentry *dentry,
                mask |= CEPH_CAP_XATTR_SHARED;
        req->r_args.getattr.mask = cpu_to_le32(mask);
 
-       req->r_locked_dir = dir;
+       req->r_parent = dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        err = ceph_mdsc_do_request(mdsc, NULL, req);
        err = ceph_handle_snapdir(req, dentry, err);
        dentry = ceph_finish_lookup(req, dentry, err);
@@ -813,7 +814,8 @@ static int ceph_mknod(struct inode *dir, struct dentry *dentry,
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
-       req->r_locked_dir = dir;
+       req->r_parent = dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_args.mknod.mode = cpu_to_le32(mode);
        req->r_args.mknod.rdev = cpu_to_le32(rdev);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
@@ -864,7 +866,8 @@ static int ceph_symlink(struct inode *dir, struct dentry *dentry,
                ceph_mdsc_put_request(req);
                goto out;
        }
-       req->r_locked_dir = dir;
+       req->r_parent = dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
@@ -913,7 +916,8 @@ static int ceph_mkdir(struct inode *dir, struct dentry *dentry, umode_t mode)
 
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
-       req->r_locked_dir = dir;
+       req->r_parent = dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_args.mkdir.mode = cpu_to_le32(mode);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
@@ -957,7 +961,8 @@ static int ceph_link(struct dentry *old_dentry, struct inode *dir,
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
        req->r_old_dentry = dget(old_dentry);
-       req->r_locked_dir = dir;
+       req->r_parent = dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        /* release LINK_SHARED on source inode (mds will lock it) */
@@ -1023,7 +1028,8 @@ static int ceph_unlink(struct inode *dir, struct dentry *dentry)
        }
        req->r_dentry = dget(dentry);
        req->r_num_caps = 2;
-       req->r_locked_dir = dir;
+       req->r_parent = dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_dentry_unless = CEPH_CAP_FILE_EXCL;
        req->r_inode_drop = drop_caps_for_unlink(inode);
@@ -1066,7 +1072,8 @@ static int ceph_rename(struct inode *old_dir, struct dentry *old_dentry,
        req->r_num_caps = 2;
        req->r_old_dentry = dget(old_dentry);
        req->r_old_dentry_dir = old_dir;
-       req->r_locked_dir = new_dir;
+       req->r_parent = new_dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_old_dentry_drop = CEPH_CAP_FILE_SHARED;
        req->r_old_dentry_unless = CEPH_CAP_FILE_EXCL;
        req->r_dentry_drop = CEPH_CAP_FILE_SHARED;
index 180bbef760f2c8c12fd94d458c246014634233dd..e8f11fa565c53ac58fddf402f6ade6320d47d490 100644 (file)
@@ -207,7 +207,8 @@ static int ceph_get_name(struct dentry *parent, char *name,
        req->r_inode = d_inode(child);
        ihold(d_inode(child));
        req->r_ino2 = ceph_vino(d_inode(parent));
-       req->r_locked_dir = d_inode(parent);
+       req->r_parent = d_inode(parent);
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        req->r_num_caps = 2;
        err = ceph_mdsc_do_request(mdsc, NULL, req);
 
index beb24f8bb917e266d269a4e7bf304dd2f318ac28..baad3b688ada9e43732c9dad86c2908ece2cab9d 100644 (file)
@@ -379,7 +379,8 @@ int ceph_atomic_open(struct inode *dir, struct dentry *dentry,
                mask |= CEPH_CAP_XATTR_SHARED;
        req->r_args.open.mask = cpu_to_le32(mask);
 
-       req->r_locked_dir = dir;           /* caller holds dir->i_mutex */
+       req->r_parent = dir;
+       set_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags);
        err = ceph_mdsc_do_request(mdsc,
                                   (flags & (O_CREAT|O_TRUNC)) ? dir : NULL,
                                   req);
index ebfb156aba89a08cefa6dd8ab89699d99625e4f3..35a8c453bea66c4fef8a8df4b54ba99d81d5577a 100644 (file)
@@ -1122,13 +1122,13 @@ int ceph_fill_trace(struct super_block *sb, struct ceph_mds_request *req)
 
        if (!rinfo->head->is_target && !rinfo->head->is_dentry) {
                dout("fill_trace reply is empty!\n");
-               if (rinfo->head->result == 0 && req->r_locked_dir)
+               if (rinfo->head->result == 0 && req->r_parent)
                        ceph_invalidate_dir_request(req);
                return 0;
        }
 
        if (rinfo->head->is_dentry) {
-               struct inode *dir = req->r_locked_dir;
+               struct inode *dir = req->r_parent;
 
                if (dir) {
                        err = fill_inode(dir, NULL,
@@ -1213,8 +1213,9 @@ retry_lookup:
         * ignore null lease/binding on snapdir ENOENT, or else we
         * will have trouble splicing in the virtual snapdir later
         */
-       if (rinfo->head->is_dentry && req->r_locked_dir &&
-           !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
+       if (rinfo->head->is_dentry &&
+            !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags) &&
+           test_bit(CEPH_MDS_R_PARENT_LOCKED, &req->r_req_flags) &&
            (rinfo->head->is_target || strncmp(req->r_dentry->d_name.name,
                                               fsc->mount_options->snapdir_name,
                                               req->r_dentry->d_name.len))) {
@@ -1223,7 +1224,7 @@ retry_lookup:
                 * mknod symlink mkdir  : null -> new inode
                 * unlink               : linked -> null
                 */
-               struct inode *dir = req->r_locked_dir;
+               struct inode *dir = req->r_parent;
                struct dentry *dn = req->r_dentry;
                bool have_dir_cap, have_lease;
 
@@ -1321,7 +1322,7 @@ retry_lookup:
                    req->r_op == CEPH_MDS_OP_MKSNAP) &&
                   !test_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags)) {
                struct dentry *dn = req->r_dentry;
-               struct inode *dir = req->r_locked_dir;
+               struct inode *dir = req->r_parent;
 
                /* fill out a snapdir LOOKUPSNAP dentry */
                BUG_ON(!dn);
index ccf75a3260e80dcca63e3e4406ffcc2cf7a30302..52521f33974561b85192e8f1ba6e601200e9e6f0 100644 (file)
@@ -547,8 +547,8 @@ void ceph_mdsc_release_request(struct kref *kref)
                ceph_put_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
                iput(req->r_inode);
        }
-       if (req->r_locked_dir)
-               ceph_put_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
+       if (req->r_parent)
+               ceph_put_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
        iput(req->r_target_inode);
        if (req->r_dentry)
                dput(req->r_dentry);
@@ -735,7 +735,7 @@ static int __choose_mds(struct ceph_mds_client *mdsc,
 
                rcu_read_lock();
                parent = req->r_dentry->d_parent;
-               dir = req->r_locked_dir ? : d_inode_rcu(parent);
+               dir = req->r_parent ? : d_inode_rcu(parent);
 
                if (!dir || dir->i_sb != mdsc->fsc->sb) {
                        /*  not this fs or parent went negative */
@@ -1894,7 +1894,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
        int ret;
 
        ret = set_request_path_attr(req->r_inode, req->r_dentry,
-                             req->r_locked_dir, req->r_path1, req->r_ino1.ino,
+                             req->r_parent, req->r_path1, req->r_ino1.ino,
                              &path1, &pathlen1, &ino1, &freepath1);
        if (ret < 0) {
                msg = ERR_PTR(ret);
@@ -1956,7 +1956,7 @@ static struct ceph_msg *create_request_message(struct ceph_mds_client *mdsc,
                      mds, req->r_inode_drop, req->r_inode_unless, 0);
        if (req->r_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_dentry,
-                               req->r_locked_dir, mds, req->r_dentry_drop,
+                               req->r_parent, mds, req->r_dentry_drop,
                                req->r_dentry_unless);
        if (req->r_old_dentry_drop)
                releases += ceph_encode_dentry_release(&p, req->r_old_dentry,
@@ -2095,14 +2095,14 @@ static int __prepare_send_request(struct ceph_mds_client *mdsc,
        rhead->oldest_client_tid = cpu_to_le64(__get_oldest_tid(mdsc));
        if (test_bit(CEPH_MDS_R_GOT_UNSAFE, &req->r_req_flags))
                flags |= CEPH_MDS_FLAG_REPLAY;
-       if (req->r_locked_dir)
+       if (req->r_parent)
                flags |= CEPH_MDS_FLAG_WANT_DENTRY;
        rhead->flags = cpu_to_le32(flags);
        rhead->num_fwd = req->r_num_fwd;
        rhead->num_retry = req->r_attempts - 1;
        rhead->ino = 0;
 
-       dout(" r_locked_dir = %p\n", req->r_locked_dir);
+       dout(" r_parent = %p\n", req->r_parent);
        return 0;
 }
 
@@ -2282,11 +2282,11 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
 
        dout("do_request on %p\n", req);
 
-       /* take CAP_PIN refs for r_inode, r_locked_dir, r_old_dentry */
+       /* take CAP_PIN refs for r_inode, r_parent, r_old_dentry */
        if (req->r_inode)
                ceph_get_cap_refs(ceph_inode(req->r_inode), CEPH_CAP_PIN);
-       if (req->r_locked_dir)
-               ceph_get_cap_refs(ceph_inode(req->r_locked_dir), CEPH_CAP_PIN);
+       if (req->r_parent)
+               ceph_get_cap_refs(ceph_inode(req->r_parent), CEPH_CAP_PIN);
        if (req->r_old_dentry_dir)
                ceph_get_cap_refs(ceph_inode(req->r_old_dentry_dir),
                                  CEPH_CAP_PIN);
@@ -2336,7 +2336,7 @@ int ceph_mdsc_do_request(struct ceph_mds_client *mdsc,
                set_bit(CEPH_MDS_R_ABORTED, &req->r_req_flags);
                mutex_unlock(&req->r_fill_mutex);
 
-               if (req->r_locked_dir &&
+               if (req->r_parent &&
                    (req->r_op & CEPH_MDS_OP_WRITE))
                        ceph_invalidate_dir_request(req);
        } else {
@@ -2355,7 +2355,7 @@ out:
  */
 void ceph_invalidate_dir_request(struct ceph_mds_request *req)
 {
-       struct inode *inode = req->r_locked_dir;
+       struct inode *inode = req->r_parent;
 
        dout("invalidate_dir_request %p (complete, lease(s))\n", inode);
 
index 409b0e3c3b7a7d3267ca0370ba93437dc88e80ef..ac0475a2daa749d3d689956cc45a2913c955ca8f 100644 (file)
@@ -202,7 +202,7 @@ struct ceph_mds_request {
        char *r_path1, *r_path2;
        struct ceph_vino r_ino1, r_ino2;
 
-       struct inode *r_locked_dir; /* dir (if any) i_mutex locked by vfs */
+       struct inode *r_parent;             /* parent dir inode */
        struct inode *r_target_inode;       /* resulting inode */
 
 #define CEPH_MDS_R_DIRECT_IS_HASH      (1) /* r_direct_hash is valid */
@@ -211,6 +211,7 @@ struct ceph_mds_request {
 #define CEPH_MDS_R_GOT_SAFE            (4) /* got a safe reply */
 #define CEPH_MDS_R_GOT_RESULT          (5) /* got a result */
 #define CEPH_MDS_R_DID_PREPOPULATE     (6) /* prepopulated readdir */
+#define CEPH_MDS_R_PARENT_LOCKED       (7) /* is r_parent->i_rwsem wlocked? */
        unsigned long   r_req_flags;
 
        struct mutex r_fill_mutex;