ceph: track pending caps flushing accurately
author Yan, Zheng <zyan@redhat.com>
Tue, 9 Jun 2015 07:48:57 +0000 (15:48 +0800)
committer Ilya Dryomov <idryomov@gmail.com>
Thu, 25 Jun 2015 08:49:30 +0000 (11:49 +0300)
Previously we did not track an accurate TID for flushing caps. When
the MDS fails over, we have no choice but to re-send all flushing caps
with a new TID. This can cause problems because the MDS may have already
flushed some caps and issued the same caps to another client.
The re-sent cap flush has a new TID, which makes the MDS unable to
detect whether it has already processed the cap flush.

This patch adds code to track pending cap flushes accurately.
When a cap flush needs to be re-sent, we use its original flush
TID.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
fs/ceph/caps.c
fs/ceph/inode.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.h

index dc988337f8413cb253b00c27ec14cace933dddf8..9a25f8d66fbcd45e1aa7fe67dfe5b3d65974c994 100644 (file)
@@ -1097,7 +1097,8 @@ void ceph_queue_caps_release(struct inode *inode)
  * caller should hold snap_rwsem (read), s_mutex.
  */
 static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
-                     int op, int used, int want, int retain, int flushing)
+                     int op, int used, int want, int retain, int flushing,
+                     u64 flush_tid)
        __releases(cap->ci->i_ceph_lock)
 {
        struct ceph_inode_info *ci = cap->ci;
@@ -1115,8 +1116,6 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        u64 xattr_version = 0;
        struct ceph_buffer *xattr_blob = NULL;
        int delayed = 0;
-       u64 flush_tid = 0;
-       int i;
        int ret;
        bool inline_data;
 
@@ -1160,24 +1159,7 @@ static int __send_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap,
        cap->implemented &= cap->issued | used;
        cap->mds_wanted = want;
 
-       if (flushing) {
-               /*
-                * assign a tid for flush operations so we can avoid
-                * flush1 -> dirty1 -> flush2 -> flushack1 -> mark
-                * clean type races.  track latest tid for every bit
-                * so we can handle flush AxFw, flush Fw, and have the
-                * first ack clean Ax.
-                */
-               flush_tid = ++ci->i_cap_flush_last_tid;
-               dout(" cap_flush_tid %d\n", (int)flush_tid);
-               for (i = 0; i < CEPH_CAP_BITS; i++)
-                       if (flushing & (1 << i))
-                               ci->i_cap_flush_tid[i] = flush_tid;
-
-               follows = ci->i_head_snapc->seq;
-       } else {
-               follows = 0;
-       }
+       follows = flushing ? ci->i_head_snapc->seq : 0;
 
        keep = cap->implemented;
        seq = cap->seq;
@@ -1311,7 +1293,10 @@ retry:
                        goto retry;
                }
 
-               capsnap->flush_tid = ++ci->i_cap_flush_last_tid;
+               spin_lock(&mdsc->cap_dirty_lock);
+               capsnap->flush_tid = ++mdsc->last_cap_flush_tid;
+               spin_unlock(&mdsc->cap_dirty_lock);
+
                atomic_inc(&capsnap->nref);
                if (list_empty(&capsnap->flushing_item))
                        list_add_tail(&capsnap->flushing_item,
@@ -1407,6 +1392,29 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
        return dirty;
 }
 
+static void __add_cap_flushing_to_inode(struct ceph_inode_info *ci,
+                                       struct ceph_cap_flush *cf)
+{
+       struct rb_node **p = &ci->i_cap_flush_tree.rb_node;  /* link slot being examined, starting at the root */
+       struct rb_node *parent = NULL;
+       struct ceph_cap_flush *other = NULL;
+
+       while (*p) {  /* descend until an empty slot for cf->tid is found */
+               parent = *p;
+               other = rb_entry(parent, struct ceph_cap_flush, i_node);
+
+               if (cf->tid < other->tid)  /* tree is ordered by flush tid */
+                       p = &(*p)->rb_left;
+               else if (cf->tid > other->tid)
+                       p = &(*p)->rb_right;
+               else
+                       BUG();  /* the same tid must never be inserted twice */
+       }
+
+       rb_link_node(&cf->i_node, parent, p);  /* hang cf off the slot found above */
+       rb_insert_color(&cf->i_node, &ci->i_cap_flush_tree);  /* then rebalance the tree */
+}
+
 /*
  * Add dirty inode to the flushing list.  Assigned a seq number so we
  * can wait for caps to flush without starving.
@@ -1414,10 +1422,12 @@ int __ceph_mark_dirty_caps(struct ceph_inode_info *ci, int mask)
  * Called under i_ceph_lock.
  */
 static int __mark_caps_flushing(struct inode *inode,
-                                struct ceph_mds_session *session)
+                               struct ceph_mds_session *session,
+                               u64 *flush_tid)
 {
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_cap_flush *cf;
        int flushing;
 
        BUG_ON(ci->i_dirty_caps == 0);
@@ -1432,9 +1442,14 @@ static int __mark_caps_flushing(struct inode *inode,
        ci->i_dirty_caps = 0;
        dout(" inode %p now !dirty\n", inode);
 
+       cf = kmalloc(sizeof(*cf), GFP_ATOMIC);
+       cf->caps = flushing;
+
        spin_lock(&mdsc->cap_dirty_lock);
        list_del_init(&ci->i_dirty_item);
 
+       cf->tid = ++mdsc->last_cap_flush_tid;
+
        if (list_empty(&ci->i_flushing_item)) {
                ci->i_cap_flush_seq = ++mdsc->cap_flush_seq;
                list_add_tail(&ci->i_flushing_item, &session->s_cap_flushing);
@@ -1448,6 +1463,9 @@ static int __mark_caps_flushing(struct inode *inode,
        }
        spin_unlock(&mdsc->cap_dirty_lock);
 
+       __add_cap_flushing_to_inode(ci, cf);
+
+       *flush_tid = cf->tid;
        return flushing;
 }
 
@@ -1493,6 +1511,7 @@ void ceph_check_caps(struct ceph_inode_info *ci, int flags,
        struct ceph_mds_client *mdsc = fsc->mdsc;
        struct inode *inode = &ci->vfs_inode;
        struct ceph_cap *cap;
+       u64 flush_tid;
        int file_wanted, used, cap_used;
        int took_snap_rwsem = 0;             /* true if mdsc->snap_rwsem held */
        int issued, implemented, want, retain, revoking, flushing = 0;
@@ -1711,17 +1730,20 @@ ack:
                        took_snap_rwsem = 1;
                }
 
-               if (cap == ci->i_auth_cap && ci->i_dirty_caps)
-                       flushing = __mark_caps_flushing(inode, session);
-               else
+               if (cap == ci->i_auth_cap && ci->i_dirty_caps) {
+                       flushing = __mark_caps_flushing(inode, session,
+                                                       &flush_tid);
+               } else {
                        flushing = 0;
+                       flush_tid = 0;
+               }
 
                mds = cap->mds;  /* remember mds, so we don't repeat */
                sent++;
 
                /* __send_cap drops i_ceph_lock */
                delayed += __send_cap(mdsc, cap, CEPH_CAP_OP_UPDATE, cap_used,
-                                     want, retain, flushing);
+                                     want, retain, flushing, flush_tid);
                goto retry; /* retake i_ceph_lock and restart our cap scan. */
        }
 
@@ -1750,12 +1772,13 @@ ack:
 /*
  * Try to flush dirty caps back to the auth mds.
  */
-static int try_flush_caps(struct inode *inode, u16 flush_tid[])
+static int try_flush_caps(struct inode *inode, u64 *ptid)
 {
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_session *session = NULL;
        int flushing = 0;
+       u64 flush_tid = 0;
 
 retry:
        spin_lock(&ci->i_ceph_lock);
@@ -1780,46 +1803,52 @@ retry:
                if (cap->session->s_state < CEPH_MDS_SESSION_OPEN)
                        goto out;
 
-               flushing = __mark_caps_flushing(inode, session);
+               flushing = __mark_caps_flushing(inode, session, &flush_tid);
 
                /* __send_cap drops i_ceph_lock */
                delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH, used, want,
-                                    cap->issued | cap->implemented, flushing);
+                                    (cap->issued | cap->implemented),
+                                    flushing, flush_tid);
 
-               spin_lock(&ci->i_ceph_lock);
-               if (delayed)
+               if (delayed) {
+                       spin_lock(&ci->i_ceph_lock);
                        __cap_delay_requeue(mdsc, ci);
+                       spin_unlock(&ci->i_ceph_lock);
+               }
+       } else {
+               struct rb_node *n = rb_last(&ci->i_cap_flush_tree);
+               if (n) {
+                       struct ceph_cap_flush *cf =
+                               rb_entry(n, struct ceph_cap_flush, i_node);
+                       flush_tid = cf->tid;
+               }
+               flushing = ci->i_flushing_caps;
+               spin_unlock(&ci->i_ceph_lock);
        }
-
-       flushing = ci->i_flushing_caps;
-       if (flushing)
-               memcpy(flush_tid, ci->i_cap_flush_tid,
-                      sizeof(ci->i_cap_flush_tid));
 out:
-       spin_unlock(&ci->i_ceph_lock);
        if (session)
                mutex_unlock(&session->s_mutex);
+
+       *ptid = flush_tid;
        return flushing;
 }
 
 /*
  * Return true if we've flushed caps through the given flush_tid.
  */
-static int caps_are_flushed(struct inode *inode, u16 flush_tid[])
+static int caps_are_flushed(struct inode *inode, u64 flush_tid)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       int i, ret = 1;
+       struct ceph_cap_flush *cf;
+       struct rb_node *n;
+       int ret = 1;
 
        spin_lock(&ci->i_ceph_lock);
-       for (i = 0; i < CEPH_CAP_BITS; i++) {
-               if (!(ci->i_flushing_caps & (1 << i)))
-                       continue;
-               // tid only has 16 bits. we need to handle wrapping
-               if ((s16)(ci->i_cap_flush_tid[i] - flush_tid[i]) <= 0) {
-                       /* still flushing this bit */
+       n = rb_first(&ci->i_cap_flush_tree);
+       if (n) {
+               cf = rb_entry(n, struct ceph_cap_flush, i_node);
+               if (cf->tid <= flush_tid)
                        ret = 0;
-                       break;
-               }
        }
        spin_unlock(&ci->i_ceph_lock);
        return ret;
@@ -1922,7 +1951,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 {
        struct inode *inode = file->f_mapping->host;
        struct ceph_inode_info *ci = ceph_inode(inode);
-       u16 flush_tid[CEPH_CAP_BITS];
+       u64 flush_tid;
        int ret;
        int dirty;
 
@@ -1938,7 +1967,7 @@ int ceph_fsync(struct file *file, loff_t start, loff_t end, int datasync)
 
        mutex_lock(&inode->i_mutex);
 
-       dirty = try_flush_caps(inode, flush_tid);
+       dirty = try_flush_caps(inode, &flush_tid);
        dout("fsync dirty caps are %s\n", ceph_cap_string(dirty));
 
        ret = unsafe_dirop_wait(inode);
@@ -1967,14 +1996,14 @@ out:
 int ceph_write_inode(struct inode *inode, struct writeback_control *wbc)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
-       u16 flush_tid[CEPH_CAP_BITS];
+       u64 flush_tid;
        int err = 0;
        int dirty;
        int wait = wbc->sync_mode == WB_SYNC_ALL;
 
        dout("write_inode %p wait=%d\n", inode, wait);
        if (wait) {
-               dirty = try_flush_caps(inode, flush_tid);
+               dirty = try_flush_caps(inode, &flush_tid);
                if (dirty)
                        err = wait_event_interruptible(ci->i_cap_wq,
                                       caps_are_flushed(inode, flush_tid));
@@ -2022,6 +2051,51 @@ static void kick_flushing_capsnaps(struct ceph_mds_client *mdsc,
        }
 }
 
+static int __kick_flushing_caps(struct ceph_mds_client *mdsc,
+                               struct ceph_mds_session *session,
+                               struct ceph_inode_info *ci)
+{
+       struct inode *inode = &ci->vfs_inode;
+       struct ceph_cap *cap;
+       struct ceph_cap_flush *cf;
+       struct rb_node *n;
+       int delayed = 0;  /* OR of every __send_cap() result; returned to the caller */
+       u64 first_tid = 0;  /* lowest flush tid that has not been re-sent yet */
+
+       while (true) {  /* one pending flush is re-sent per pass */
+               spin_lock(&ci->i_ceph_lock);  /* re-taken each pass: __send_cap releases it */
+               cap = ci->i_auth_cap;
+               if (!(cap && cap->session == session)) {  /* auth cap missing or owned by another session */
+                       pr_err("%p auth cap %p not mds%d ???\n", inode,
+                                       cap, session->s_mds);
+                       spin_unlock(&ci->i_ceph_lock);
+                       break;
+               }
+
+               for (n = rb_first(&ci->i_cap_flush_tree); n; n = rb_next(n)) {  /* rescan in ascending tid order */
+                       cf = rb_entry(n, struct ceph_cap_flush, i_node);
+                       if (cf->tid >= first_tid)  /* first entry not yet re-sent */
+                               break;
+               }
+               if (!n) {  /* nothing left at or above first_tid: done */
+                       spin_unlock(&ci->i_ceph_lock);
+                       break;
+               }
+
+               cf = rb_entry(n, struct ceph_cap_flush, i_node);
+               first_tid = cf->tid + 1;  /* next pass resumes after this tid */
+
+               dout("kick_flushing_caps %p cap %p tid %llu %s\n", inode,
+                    cap, cf->tid, ceph_cap_string(cf->caps));
+               delayed |= __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
+                                     __ceph_caps_used(ci),
+                                     __ceph_caps_wanted(ci),
+                                     cap->issued | cap->implemented,
+                                     cf->caps, cf->tid);  /* re-send with the recorded caps and original tid */
+       }
+       return delayed;
+}
+
 void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
                             struct ceph_mds_session *session)
 {
@@ -2031,28 +2105,10 @@ void ceph_kick_flushing_caps(struct ceph_mds_client *mdsc,
 
        dout("kick_flushing_caps mds%d\n", session->s_mds);
        list_for_each_entry(ci, &session->s_cap_flushing, i_flushing_item) {
-               struct inode *inode = &ci->vfs_inode;
-               struct ceph_cap *cap;
-               int delayed = 0;
-
-               spin_lock(&ci->i_ceph_lock);
-               cap = ci->i_auth_cap;
-               if (cap && cap->session == session) {
-                       dout("kick_flushing_caps %p cap %p %s\n", inode,
-                            cap, ceph_cap_string(ci->i_flushing_caps));
-                       delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-                                            __ceph_caps_used(ci),
-                                            __ceph_caps_wanted(ci),
-                                            cap->issued | cap->implemented,
-                                            ci->i_flushing_caps);
-                       if (delayed) {
-                               spin_lock(&ci->i_ceph_lock);
-                               __cap_delay_requeue(mdsc, ci);
-                               spin_unlock(&ci->i_ceph_lock);
-                       }
-               } else {
-                       pr_err("%p auth cap %p not mds%d ???\n", inode,
-                              cap, session->s_mds);
+               int delayed = __kick_flushing_caps(mdsc, session, ci);
+               if (delayed) {
+                       spin_lock(&ci->i_ceph_lock);
+                       __cap_delay_requeue(mdsc, ci);
                        spin_unlock(&ci->i_ceph_lock);
                }
        }
@@ -2064,7 +2120,6 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_cap *cap;
-       int delayed = 0;
 
        spin_lock(&ci->i_ceph_lock);
        cap = ci->i_auth_cap;
@@ -2074,16 +2129,16 @@ static void kick_flushing_inode_caps(struct ceph_mds_client *mdsc,
        __ceph_flush_snaps(ci, &session, 1);
 
        if (ci->i_flushing_caps) {
+               int delayed;
+
                spin_lock(&mdsc->cap_dirty_lock);
                list_move_tail(&ci->i_flushing_item,
                               &cap->session->s_cap_flushing);
                spin_unlock(&mdsc->cap_dirty_lock);
 
-               delayed = __send_cap(mdsc, cap, CEPH_CAP_OP_FLUSH,
-                                    __ceph_caps_used(ci),
-                                    __ceph_caps_wanted(ci),
-                                    cap->issued | cap->implemented,
-                                    ci->i_flushing_caps);
+               spin_unlock(&ci->i_ceph_lock);
+
+               delayed = __kick_flushing_caps(mdsc, session, ci);
                if (delayed) {
                        spin_lock(&ci->i_ceph_lock);
                        __cap_delay_requeue(mdsc, ci);
@@ -2836,16 +2891,29 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
        struct ceph_mds_client *mdsc = ceph_sb_to_client(inode->i_sb)->mdsc;
+       struct ceph_cap_flush *cf;
+       struct rb_node *n;
+       LIST_HEAD(to_remove);
        unsigned seq = le32_to_cpu(m->seq);
        int dirty = le32_to_cpu(m->dirty);
        int cleaned = 0;
        int drop = 0;
-       int i;
 
-       for (i = 0; i < CEPH_CAP_BITS; i++)
-               if ((dirty & (1 << i)) &&
-                   (u16)flush_tid == ci->i_cap_flush_tid[i])
-                       cleaned |= 1 << i;
+       n = rb_first(&ci->i_cap_flush_tree);
+       while (n) {
+               cf = rb_entry(n, struct ceph_cap_flush, i_node);
+               n = rb_next(&cf->i_node);
+               if (cf->tid == flush_tid)
+                       cleaned = cf->caps;
+               if (cf->tid <= flush_tid) {
+                       rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+                       list_add_tail(&cf->list, &to_remove);
+               } else {
+                       cleaned &= ~cf->caps;
+                       if (!cleaned)
+                               break;
+               }
+       }
 
        dout("handle_cap_flush_ack inode %p mds%d seq %d on %s cleaned %s,"
             " flushing %s -> %s\n",
@@ -2890,6 +2958,13 @@ static void handle_cap_flush_ack(struct inode *inode, u64 flush_tid,
 
 out:
        spin_unlock(&ci->i_ceph_lock);
+
+       while (!list_empty(&to_remove)) {
+               cf = list_first_entry(&to_remove,
+                                     struct ceph_cap_flush, list);
+               list_del(&cf->list);
+               kfree(cf);
+       }
        if (drop)
                iput(inode);
 }
index 1c991df276c96e093f88cb9cf795df4fb82edfbf..6d3f19db8c8a1eaa341ba5eb2fb996eafbae98e2 100644 (file)
@@ -417,8 +417,7 @@ struct inode *ceph_alloc_inode(struct super_block *sb)
        INIT_LIST_HEAD(&ci->i_dirty_item);
        INIT_LIST_HEAD(&ci->i_flushing_item);
        ci->i_cap_flush_seq = 0;
-       ci->i_cap_flush_last_tid = 0;
-       memset(&ci->i_cap_flush_tid, 0, sizeof(ci->i_cap_flush_tid));
+       ci->i_cap_flush_tree = RB_ROOT;
        init_waitqueue_head(&ci->i_cap_wq);
        ci->i_hold_caps_min = 0;
        ci->i_hold_caps_max = 0;
index 8080d486a9913226ea9a4c8e2bec4a17e959e3c8..839901f51512a395470d3f67ab5c1aeba6062e9c 100644 (file)
@@ -1142,6 +1142,7 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                                  void *arg)
 {
        struct ceph_inode_info *ci = ceph_inode(inode);
+       LIST_HEAD(to_remove);
        int drop = 0;
 
        dout("removing cap %p, ci is %p, inode is %p\n",
@@ -1149,9 +1150,19 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
        spin_lock(&ci->i_ceph_lock);
        __ceph_remove_cap(cap, false);
        if (!ci->i_auth_cap) {
+               struct ceph_cap_flush *cf;
                struct ceph_mds_client *mdsc =
                        ceph_sb_to_client(inode->i_sb)->mdsc;
 
+               while (true) {
+                       struct rb_node *n = rb_first(&ci->i_cap_flush_tree);
+                       if (!n)
+                               break;
+                       cf = rb_entry(n, struct ceph_cap_flush, i_node);
+                       rb_erase(&cf->i_node, &ci->i_cap_flush_tree);
+                       list_add(&cf->list, &to_remove);
+               }
+
                spin_lock(&mdsc->cap_dirty_lock);
                if (!list_empty(&ci->i_dirty_item)) {
                        pr_warn_ratelimited(
@@ -1173,8 +1184,16 @@ static int remove_session_caps_cb(struct inode *inode, struct ceph_cap *cap,
                        drop = 1;
                }
                spin_unlock(&mdsc->cap_dirty_lock);
+
        }
        spin_unlock(&ci->i_ceph_lock);
+       while (!list_empty(&to_remove)) {
+               struct ceph_cap_flush *cf;
+               cf = list_first_entry(&to_remove,
+                                     struct ceph_cap_flush, list);
+               list_del(&cf->list);
+               kfree(cf);
+       }
        while (drop--)
                iput(inode);
        return 0;
@@ -3408,6 +3427,7 @@ int ceph_mdsc_init(struct ceph_fs_client *fsc)
        INIT_LIST_HEAD(&mdsc->snap_flush_list);
        spin_lock_init(&mdsc->snap_flush_lock);
        mdsc->cap_flush_seq = 0;
+       mdsc->last_cap_flush_tid = 1;
        INIT_LIST_HEAD(&mdsc->cap_dirty);
        INIT_LIST_HEAD(&mdsc->cap_dirty_migrating);
        mdsc->num_cap_flushing = 0;
index 509d6822e9b1b069db268f65b6b1c865997d7ea3..19f6084203f07d920e16b0605442a54c46e803e6 100644 (file)
@@ -307,6 +307,7 @@ struct ceph_mds_client {
        spinlock_t       snap_flush_lock;
 
        u64               cap_flush_seq;
+       u64               last_cap_flush_tid;
        struct list_head  cap_dirty;        /* inodes with dirty caps */
        struct list_head  cap_dirty_migrating; /* ...that are migration... */
        int               num_cap_flushing; /* # caps we are flushing */
index c4961353d058c2f2f2d2e9d8fdeff2926d153b40..cc597f52e04663091131f86c6a366e5d3fca77be 100644 (file)
@@ -186,6 +186,15 @@ static inline void ceph_put_cap_snap(struct ceph_cap_snap *capsnap)
        }
 }
 
+struct ceph_cap_flush {
+       u64 tid;  /* flush tid; sort key in ci->i_cap_flush_tree */
+       int caps;  /* cap bits being flushed under this tid */
+       union {
+               struct rb_node i_node;  /* used while linked in i_cap_flush_tree */
+               struct list_head list;  /* used after rb_erase, on a to_remove list awaiting kfree */
+       };
+};
+
 /*
  * The frag tree describes how a directory is fragmented, potentially across
  * multiple metadata servers.  It is also used to indicate points where
@@ -299,7 +308,7 @@ struct ceph_inode_info {
        /* we need to track cap writeback on a per-cap-bit basis, to allow
         * overlapping, pipelined cap flushes to the mds.  we can probably
         * reduce the tid to 8 bits if we're concerned about inode size. */
-       u16 i_cap_flush_last_tid, i_cap_flush_tid[CEPH_CAP_BITS];
+       struct rb_root i_cap_flush_tree;
        wait_queue_head_t i_cap_wq;      /* threads waiting on a capability */
        unsigned long i_hold_caps_min; /* jiffies */
        unsigned long i_hold_caps_max; /* jiffies */