ceph: fix iterate_caps removal race
authorSage Weil <sage@newdream.net>
Tue, 16 Feb 2010 19:39:45 +0000 (11:39 -0800)
committerSage Weil <sage@newdream.net>
Wed, 17 Feb 2010 18:02:47 +0000 (10:02 -0800)
We need to be able to iterate over all caps on a session with a
possibly slow callback on each cap.  To allow this, we used to
prevent cap reordering while we were iterating.  However, we were
not safe from races with removal: removing the 'next' cap would
make the next pointer from list_for_each_entry_safe be invalid,
and cause a lock up or similar badness.

Instead, we keep an iterator pointer in the session pointing to
the current cap.  As before, we avoid reordering.  For removal,
if the cap isn't the current cap we are iterating over, we are
fine.  If it is, we clear cap->ci (to mark the cap as pending
removal) but leave it in the session list.  In iterate_caps, we
can safely finish removal and get the next cap pointer.

While we're at it, clean up put_cap to not take a cap reservation
context, as it was never used.

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h

index f94b56faba3b5627f5505615f5ba108d6e3a90c3..4958a2ef3e0482a7e338630973c74b42a7a385c4 100644 (file)
@@ -266,12 +266,11 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
        return cap;
 }
 
-static void put_cap(struct ceph_cap *cap,
-                   struct ceph_cap_reservation *ctx)
+void ceph_put_cap(struct ceph_cap *cap)
 {
        spin_lock(&caps_list_lock);
-       dout("put_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-            ctx, ctx ? ctx->count : 0, caps_total_count, caps_use_count,
+       dout("put_cap %p %d = %d used + %d resv + %d avail\n",
+            cap, caps_total_count, caps_use_count,
             caps_reserve_count, caps_avail_count);
        caps_use_count--;
        /*
@@ -282,12 +281,7 @@ static void put_cap(struct ceph_cap *cap,
                caps_total_count--;
                kmem_cache_free(ceph_cap_cachep, cap);
        } else {
-               if (ctx) {
-                       ctx->count++;
-                       caps_reserve_count++;
-               } else {
-                       caps_avail_count++;
-               }
+               caps_avail_count++;
                list_add(&cap->caps_item, &caps_list);
        }
 
@@ -709,7 +703,7 @@ static void __touch_cap(struct ceph_cap *cap)
        struct ceph_mds_session *s = cap->session;
 
        spin_lock(&s->s_cap_lock);
-       if (!s->s_iterating_caps) {
+       if (s->s_cap_iterator == NULL) {
                dout("__touch_cap %p cap %p mds%d\n", &cap->ci->vfs_inode, cap,
                     s->s_mds);
                list_move_tail(&cap->session_caps, &s->s_caps);
@@ -865,8 +859,7 @@ static int __ceph_is_any_caps(struct ceph_inode_info *ci)
  * caller should hold i_lock, and session s_mutex.
  * returns true if this is the last cap.  if so, caller should iput.
  */
-void __ceph_remove_cap(struct ceph_cap *cap,
-                      struct ceph_cap_reservation *ctx)
+void __ceph_remove_cap(struct ceph_cap *cap)
 {
        struct ceph_mds_session *session = cap->session;
        struct ceph_inode_info *ci = cap->ci;
@@ -874,19 +867,27 @@ void __ceph_remove_cap(struct ceph_cap *cap,
 
        dout("__ceph_remove_cap %p from %p\n", cap, &ci->vfs_inode);
 
-       /* remove from session list */
-       spin_lock(&session->s_cap_lock);
-       list_del_init(&cap->session_caps);
-       session->s_nr_caps--;
-       spin_unlock(&session->s_cap_lock);
-
        /* remove from inode list */
        rb_erase(&cap->ci_node, &ci->i_caps);
-       cap->session = NULL;
+       cap->ci = NULL;
        if (ci->i_auth_cap == cap)
                ci->i_auth_cap = NULL;
 
-       put_cap(cap, ctx);
+       /* remove from session list */
+       spin_lock(&session->s_cap_lock);
+       if (session->s_cap_iterator == cap) {
+               /* not yet, we are iterating over this very cap */
+               dout("__ceph_remove_cap  delaying %p removal from session %p\n",
+                    cap, cap->session);
+       } else {
+               list_del_init(&cap->session_caps);
+               session->s_nr_caps--;
+               cap->session = NULL;
+       }
+       spin_unlock(&session->s_cap_lock);
+
+       if (cap->session == NULL)
+               ceph_put_cap(cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
                struct ceph_snap_realm *realm = ci->i_snap_realm;
@@ -1022,7 +1023,7 @@ void ceph_queue_caps_release(struct inode *inode)
                }
                spin_unlock(&session->s_cap_lock);
                p = rb_next(p);
-               __ceph_remove_cap(cap, NULL);
+               __ceph_remove_cap(cap);
 
        }
        spin_unlock(&inode->i_lock);
@@ -2521,7 +2522,7 @@ static void handle_cap_export(struct inode *inode, struct ceph_mds_caps *ex,
                        ci->i_cap_exporting_mseq = mseq;
                        ci->i_cap_exporting_issued = cap->issued;
                }
-               __ceph_remove_cap(cap, NULL);
+               __ceph_remove_cap(cap);
        } else {
                WARN_ON(!cap);
        }
index 02834cecc3a0641773c9233b8c45fa0bc233826c..124c0c17a14aca936bbdd275ba73ef35c7ac888d 100644 (file)
@@ -344,7 +344,7 @@ static struct ceph_mds_session *register_session(struct ceph_mds_client *mdsc,
        INIT_LIST_HEAD(&s->s_waiting);
        INIT_LIST_HEAD(&s->s_unsafe);
        s->s_num_cap_releases = 0;
-       s->s_iterating_caps = false;
+       s->s_cap_iterator = NULL;
        INIT_LIST_HEAD(&s->s_cap_releases);
        INIT_LIST_HEAD(&s->s_cap_releases_done);
        INIT_LIST_HEAD(&s->s_cap_flushing);
@@ -729,28 +729,61 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                                 int (*cb)(struct inode *, struct ceph_cap *,
                                            void *), void *arg)
 {
-       struct ceph_cap *cap, *ncap;
-       struct inode *inode;
+       struct list_head *p;
+       struct ceph_cap *cap;
+       struct inode *inode, *last_inode = NULL;
+       struct ceph_cap *old_cap = NULL;
        int ret;
 
        dout("iterate_session_caps %p mds%d\n", session, session->s_mds);
        spin_lock(&session->s_cap_lock);
-       session->s_iterating_caps = true;
-       list_for_each_entry_safe(cap, ncap, &session->s_caps, session_caps) {
+       p = session->s_caps.next;
+       while (p != &session->s_caps) {
+               cap = list_entry(p, struct ceph_cap, session_caps);
                inode = igrab(&cap->ci->vfs_inode);
-               if (!inode)
+               if (!inode) {
+                       p = p->next;
                        continue;
+               }
+               session->s_cap_iterator = cap;
                spin_unlock(&session->s_cap_lock);
+
+               if (last_inode) {
+                       iput(last_inode);
+                       last_inode = NULL;
+               }
+               if (old_cap) {
+                       ceph_put_cap(old_cap);
+                       old_cap = NULL;
+               }
+
                ret = cb(inode, cap, arg);
-               iput(inode);
+               last_inode = inode;
+
                spin_lock(&session->s_cap_lock);
+               p = p->next;
+               if (cap->ci == NULL) {
+                       dout("iterate_session_caps  finishing cap %p removal\n",
+                            cap);
+                       BUG_ON(cap->session != session);
+                       list_del_init(&cap->session_caps);
+                       session->s_nr_caps--;
+                       cap->session = NULL;
+                       old_cap = cap;  /* put_cap it w/o locks held */
+               }
                if (ret < 0)
                        goto out;
        }
        ret = 0;
 out:
-       session->s_iterating_caps = false;
+       session->s_cap_iterator = NULL;
        spin_unlock(&session->s_cap_lock);
+
+       if (last_inode)
+               iput(last_inode);
+       if (old_cap)
+               ceph_put_cap(old_cap);
+
        return ret;
 }
 
@@ -942,7 +975,7 @@ static int trim_caps_cb(struct inode *inode, struct ceph_cap *cap, void *arg)
        session->s_trim_caps--;
        if (oissued) {
                /* we aren't the only cap.. just remove us */
-               __ceph_remove_cap(cap, NULL);
+               __ceph_remove_cap(cap);
        } else {
                /* try to drop referring dentries */
                spin_unlock(&inode->i_lock);
index 9d6b9017387989f409f541d792cf7a9d1deca720..961cc6f65878df8bd536683a12911581f8cca036 100644 (file)
@@ -114,7 +114,7 @@ struct ceph_mds_session {
        int               s_num_cap_releases;
        struct list_head  s_cap_releases; /* waiting cap_release messages */
        struct list_head  s_cap_releases_done; /* ready to send */
-       bool              s_iterating_caps;
+       struct ceph_cap  *s_cap_iterator;
 
        /* protected by mutex */
        struct list_head  s_cap_flushing;     /* inodes w/ flushing caps */
index 3b5faf9980f825fd0e6148a9c5f6ec3924fcc119..384f0e2e7c68e9267913fc16309a584bc011ccd3 100644 (file)
@@ -795,15 +795,15 @@ extern int ceph_add_cap(struct inode *inode,
                        int fmode, unsigned issued, unsigned wanted,
                        unsigned cap, unsigned seq, u64 realmino, int flags,
                        struct ceph_cap_reservation *caps_reservation);
-extern void __ceph_remove_cap(struct ceph_cap *cap,
-                             struct ceph_cap_reservation *ctx);
+extern void __ceph_remove_cap(struct ceph_cap *cap);
 static inline void ceph_remove_cap(struct ceph_cap *cap)
 {
        struct inode *inode = &cap->ci->vfs_inode;
        spin_lock(&inode->i_lock);
-       __ceph_remove_cap(cap, NULL);
+       __ceph_remove_cap(cap);
        spin_unlock(&inode->i_lock);
 }
+extern void ceph_put_cap(struct ceph_cap *cap);
 
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, int unused);