GFS2: Use lockref for glocks
authorSteven Whitehouse <swhiteho@redhat.com>
Tue, 15 Oct 2013 14:18:08 +0000 (15:18 +0100)
committerSteven Whitehouse <swhiteho@redhat.com>
Tue, 15 Oct 2013 14:18:08 +0000 (15:18 +0100)
Currently glocks have an atomic reference count and also a spinlock
which covers various internal fields, such as the state. This intent of
this patch is to replace the spinlock and the atomic reference count
with a lockref structure. This contains a spinlock which we can continue
to use as before, and a reference counter which is used in conjuction
with the spinlock to replace the previous atomic counter.

As a result of this there are some new rules for reference counting on
glocks. We need to distinguish between reference count changes under
gl_spin (which are now just increment or decrement of the new counter,
provided the count cannot hit zero) and those which are outside of
gl_spin, but which now take gl_spin internally.

The conversion is relatively straight forward. There is probably some
further clean up which can be done, but the priority at this stage is to
make the change in as simple a manner as possible.

A consequence of this change is that the reference count is being
decoupled from the lru list processing. This should allow future
adoption of the lru_list code with glocks in due course.

The reason for using the "dead" state and not just relying on 0 being
the "invalid state" is so that in due course 0 ref counts can be
allowable. The intent is to eventually be able to remove the ref count
changes which are currently hidden away in state_change().

Signed-off-by: Steven Whitehouse <swhiteho@redhat.com>
fs/gfs2/glock.c
fs/gfs2/glock.h
fs/gfs2/glops.c
fs/gfs2/incore.h
include/linux/lockref.h
lib/lockref.c

index c2f41b4d00b9872ccfb4f5f23f988d46f154df32..e66a8009aff16d66b1179bba0ffc3a266ff923f6 100644 (file)
@@ -31,6 +31,7 @@
 #include <linux/bit_spinlock.h>
 #include <linux/percpu.h>
 #include <linux/list_sort.h>
+#include <linux/lockref.h>
 
 #include "gfs2.h"
 #include "incore.h"
@@ -129,10 +130,10 @@ void gfs2_glock_free(struct gfs2_glock *gl)
  *
  */
 
-void gfs2_glock_hold(struct gfs2_glock *gl)
+static void gfs2_glock_hold(struct gfs2_glock *gl)
 {
-       GLOCK_BUG_ON(gl, atomic_read(&gl->gl_ref) == 0);
-       atomic_inc(&gl->gl_ref);
+       GLOCK_BUG_ON(gl, __lockref_is_dead(&gl->gl_lockref));
+       lockref_get(&gl->gl_lockref);
 }
 
 /**
@@ -186,20 +187,6 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl)
        spin_unlock(&lru_lock);
 }
 
-/**
- * gfs2_glock_put_nolock() - Decrement reference count on glock
- * @gl: The glock to put
- *
- * This function should only be used if the caller has its own reference
- * to the glock, in addition to the one it is dropping.
- */
-
-void gfs2_glock_put_nolock(struct gfs2_glock *gl)
-{
-       if (atomic_dec_and_test(&gl->gl_ref))
-               GLOCK_BUG_ON(gl, 1);
-}
-
 /**
  * gfs2_glock_put() - Decrement reference count on glock
  * @gl: The glock to put
@@ -211,17 +198,22 @@ void gfs2_glock_put(struct gfs2_glock *gl)
        struct gfs2_sbd *sdp = gl->gl_sbd;
        struct address_space *mapping = gfs2_glock2aspace(gl);
 
-       if (atomic_dec_and_lock(&gl->gl_ref, &lru_lock)) {
-               __gfs2_glock_remove_from_lru(gl);
-               spin_unlock(&lru_lock);
-               spin_lock_bucket(gl->gl_hash);
-               hlist_bl_del_rcu(&gl->gl_list);
-               spin_unlock_bucket(gl->gl_hash);
-               GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
-               GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
-               trace_gfs2_glock_put(gl);
-               sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
-       }
+       if (lockref_put_or_lock(&gl->gl_lockref))
+               return;
+
+       lockref_mark_dead(&gl->gl_lockref);
+
+       spin_lock(&lru_lock);
+       __gfs2_glock_remove_from_lru(gl);
+       spin_unlock(&lru_lock);
+       spin_unlock(&gl->gl_lockref.lock);
+       spin_lock_bucket(gl->gl_hash);
+       hlist_bl_del_rcu(&gl->gl_list);
+       spin_unlock_bucket(gl->gl_hash);
+       GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
+       GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
+       trace_gfs2_glock_put(gl);
+       sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
 }
 
 /**
@@ -244,7 +236,7 @@ static struct gfs2_glock *search_bucket(unsigned int hash,
                        continue;
                if (gl->gl_sbd != sdp)
                        continue;
-               if (atomic_inc_not_zero(&gl->gl_ref))
+               if (lockref_get_not_dead(&gl->gl_lockref))
                        return gl;
        }
 
@@ -396,10 +388,11 @@ static void state_change(struct gfs2_glock *gl, unsigned int new_state)
        held2 = (new_state != LM_ST_UNLOCKED);
 
        if (held1 != held2) {
+               GLOCK_BUG_ON(gl, __lockref_is_dead(&gl->gl_lockref));
                if (held2)
-                       gfs2_glock_hold(gl);
+                       gl->gl_lockref.count++;
                else
-                       gfs2_glock_put_nolock(gl);
+                       gl->gl_lockref.count--;
        }
        if (held1 && held2 && list_empty(&gl->gl_holders))
                clear_bit(GLF_QUEUED, &gl->gl_flags);
@@ -626,9 +619,9 @@ out:
 out_sched:
        clear_bit(GLF_LOCK, &gl->gl_flags);
        smp_mb__after_clear_bit();
-       gfs2_glock_hold(gl);
+       gl->gl_lockref.count++;
        if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-               gfs2_glock_put_nolock(gl);
+               gl->gl_lockref.count--;
        return;
 
 out_unlock:
@@ -754,7 +747,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
        gl->gl_sbd = sdp;
        gl->gl_flags = 0;
        gl->gl_name = name;
-       atomic_set(&gl->gl_ref, 1);
+       gl->gl_lockref.count = 1;
        gl->gl_state = LM_ST_UNLOCKED;
        gl->gl_target = LM_ST_UNLOCKED;
        gl->gl_demote_state = LM_ST_EXCLUSIVE;
@@ -1356,10 +1349,10 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
                }
        }
 
-       spin_unlock(&gl->gl_spin);
+       gl->gl_lockref.count++;
        set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
-       smp_wmb();
-       gfs2_glock_hold(gl);
+       spin_unlock(&gl->gl_spin);
+
        if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
                gfs2_glock_put(gl);
 }
@@ -1404,15 +1397,19 @@ __acquires(&lru_lock)
        while(!list_empty(list)) {
                gl = list_entry(list->next, struct gfs2_glock, gl_lru);
                list_del_init(&gl->gl_lru);
+               if (!spin_trylock(&gl->gl_spin)) {
+                       list_add(&gl->gl_lru, &lru_list);
+                       atomic_inc(&lru_count);
+                       continue;
+               }
                clear_bit(GLF_LRU, &gl->gl_flags);
-               gfs2_glock_hold(gl);
                spin_unlock(&lru_lock);
-               spin_lock(&gl->gl_spin);
+               gl->gl_lockref.count++;
                if (demote_ok(gl))
                        handle_callback(gl, LM_ST_UNLOCKED, 0, false);
                WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags));
                if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-                       gfs2_glock_put_nolock(gl);
+                       gl->gl_lockref.count--;
                spin_unlock(&gl->gl_spin);
                spin_lock(&lru_lock);
        }
@@ -1493,7 +1490,7 @@ static void examine_bucket(glock_examiner examiner, const struct gfs2_sbd *sdp,
 
        rcu_read_lock();
        hlist_bl_for_each_entry_rcu(gl, pos, head, gl_list) {
-               if ((gl->gl_sbd == sdp) && atomic_inc_not_zero(&gl->gl_ref))
+               if ((gl->gl_sbd == sdp) && lockref_get_not_dead(&gl->gl_lockref))
                        examiner(gl);
        }
        rcu_read_unlock();
@@ -1746,7 +1743,7 @@ int gfs2_dump_glock(struct seq_file *seq, const struct gfs2_glock *gl)
                  state2str(gl->gl_demote_state), dtime,
                  atomic_read(&gl->gl_ail_count),
                  atomic_read(&gl->gl_revokes),
-                 atomic_read(&gl->gl_ref), gl->gl_hold_time);
+                 (int)gl->gl_lockref.count, gl->gl_hold_time);
 
        list_for_each_entry(gh, &gl->gl_holders, gh_list) {
                error = dump_holder(seq, gh);
@@ -1902,7 +1899,7 @@ static int gfs2_glock_iter_next(struct gfs2_glock_iter *gi)
                        gi->nhash = 0;
                }
        /* Skip entries for other sb and dead entries */
-       } while (gi->sdp != gi->gl->gl_sbd || atomic_read(&gi->gl->gl_ref) == 0);
+       } while (gi->sdp != gi->gl->gl_sbd || __lockref_is_dead(&gl->gl_lockref));
 
        return 0;
 }
index 69f66e3d22bf512787b3b9db403e0c8c91babd59..6647d77366ba097c4c98f482bcef1ab980704baa 100644 (file)
@@ -181,8 +181,6 @@ static inline struct address_space *gfs2_glock2aspace(struct gfs2_glock *gl)
 extern int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
                          const struct gfs2_glock_operations *glops,
                          int create, struct gfs2_glock **glp);
-extern void gfs2_glock_hold(struct gfs2_glock *gl);
-extern void gfs2_glock_put_nolock(struct gfs2_glock *gl);
 extern void gfs2_glock_put(struct gfs2_glock *gl);
 extern void gfs2_holder_init(struct gfs2_glock *gl, unsigned int state,
                             unsigned flags, struct gfs2_holder *gh);
index e2e0a90396e7823da9aa47c44a727d5e75751cc6..db908f697139cfffbca462d3a7528e13139576b5 100644 (file)
@@ -525,9 +525,9 @@ static void iopen_go_callback(struct gfs2_glock *gl, bool remote)
 
        if (gl->gl_demote_state == LM_ST_UNLOCKED &&
            gl->gl_state == LM_ST_SHARED && ip) {
-               gfs2_glock_hold(gl);
+               gl->gl_lockref.count++;
                if (queue_work(gfs2_delete_workqueue, &gl->gl_delete) == 0)
-                       gfs2_glock_put_nolock(gl);
+                       gl->gl_lockref.count--;
        }
 }
 
index 2ab4f8d8f4c4b092a92f636eafece065dd7957f6..bb88e417231f88f95c4801d582af0cf254cf8f7e 100644 (file)
@@ -21,6 +21,7 @@
 #include <linux/rbtree.h>
 #include <linux/ktime.h>
 #include <linux/percpu.h>
+#include <linux/lockref.h>
 
 #define DIO_WAIT       0x00000010
 #define DIO_METADATA   0x00000020
@@ -321,9 +322,9 @@ struct gfs2_glock {
        struct gfs2_sbd *gl_sbd;
        unsigned long gl_flags;         /* GLF_... */
        struct lm_lockname gl_name;
-       atomic_t gl_ref;
 
-       spinlock_t gl_spin;
+       struct lockref gl_lockref;
+#define gl_spin gl_lockref.lock
 
        /* State fields protected by gl_spin */
        unsigned int gl_state:2,        /* Current state */
index f279ed9a91631cca7f236d20ef5a29d152332932..13dfd36a329474df228102dd49bafbdf1b5c687d 100644 (file)
@@ -36,4 +36,10 @@ extern int lockref_put_or_lock(struct lockref *);
 extern void lockref_mark_dead(struct lockref *);
 extern int lockref_get_not_dead(struct lockref *);
 
+/* Must be called under spinlock for reliable results */
+static inline int __lockref_is_dead(const struct lockref *l)
+{
+       return ((int)l->count < 0);
+}
+
 #endif /* __LINUX_LOCKREF_H */
index e2cd2c0a882126c58e04e47102fb4975c5247d3c..8ff162fe3413f890e9b7a4bf4c0b6e80336e2ca5 100644 (file)
@@ -136,6 +136,7 @@ void lockref_mark_dead(struct lockref *lockref)
        assert_spin_locked(&lockref->lock);
        lockref->count = -128;
 }
+EXPORT_SYMBOL(lockref_mark_dead);
 
 /**
  * lockref_get_not_dead - Increments count unless the ref is dead