gfs2: gfs2_glock_get: Wait on freeing glocks
authorAndreas Gruenbacher <agruenba@redhat.com>
Tue, 1 Aug 2017 16:18:26 +0000 (11:18 -0500)
committerBob Peterson <rpeterso@redhat.com>
Thu, 10 Aug 2017 15:39:31 +0000 (10:39 -0500)
Keep glocks in their hash table until they are freed instead of removing
them when their last reference is dropped.  This allows to wait for any
previous instances of a glock to go away in gfs2_glock_get before
creating a new glocks.

Special thanks to Andy Price for finding and fixing a problem which also
required us to delete the rcu_read_unlock from the error case in function
gfs2_glock_get.

Signed-off-by: Andreas Gruenbacher <agruenba@redhat.com>
Signed-off-by: Bob Peterson <rpeterso@redhat.com>
fs/gfs2/glock.c

index 1029340fc8ba46f1bba1a26a9bcbeb403e13b59e..11d48b9640477e76109c5cc475ceb2d5fb2a8323 100644 (file)
@@ -15,6 +15,7 @@
 #include <linux/buffer_head.h>
 #include <linux/delay.h>
 #include <linux/sort.h>
+#include <linux/hash.h>
 #include <linux/jhash.h>
 #include <linux/kallsyms.h>
 #include <linux/gfs2_ondisk.h>
@@ -80,6 +81,66 @@ static struct rhashtable_params ht_parms = {
 
 static struct rhashtable gl_hash_table;
 
+#define GLOCK_WAIT_TABLE_BITS 12
+#define GLOCK_WAIT_TABLE_SIZE (1 << GLOCK_WAIT_TABLE_BITS)
+static wait_queue_head_t glock_wait_table[GLOCK_WAIT_TABLE_SIZE] __cacheline_aligned;
+
+struct wait_glock_queue {
+       struct lm_lockname *name;
+       wait_queue_entry_t wait;
+};
+
+static int glock_wake_function(wait_queue_entry_t *wait, unsigned int mode,
+                              int sync, void *key)
+{
+       struct wait_glock_queue *wait_glock =
+               container_of(wait, struct wait_glock_queue, wait);
+       struct lm_lockname *wait_name = wait_glock->name;
+       struct lm_lockname *wake_name = key;
+
+       if (wake_name->ln_sbd != wait_name->ln_sbd ||
+           wake_name->ln_number != wait_name->ln_number ||
+           wake_name->ln_type != wait_name->ln_type)
+               return 0;
+       return autoremove_wake_function(wait, mode, sync, key);
+}
+
+static wait_queue_head_t *glock_waitqueue(struct lm_lockname *name)
+{
+       u32 hash = jhash2((u32 *)name, sizeof(*name) / 4, 0);
+
+       return glock_wait_table + hash_32(hash, GLOCK_WAIT_TABLE_BITS);
+}
+
+static void prepare_to_wait_on_glock(wait_queue_head_t **wq,
+                                    struct wait_glock_queue *wait,
+                                    struct lm_lockname *name)
+{
+       wait->name = name;
+       init_wait(&wait->wait);
+       wait->wait.func = glock_wake_function;
+       *wq = glock_waitqueue(name);
+       prepare_to_wait(*wq, &wait->wait, TASK_UNINTERRUPTIBLE);
+}
+
+static void finish_wait_on_glock(wait_queue_head_t *wq,
+                                struct wait_glock_queue *wait)
+{
+       finish_wait(wq, &wait->wait);
+}
+
+/**
+ * wake_up_glock  -  Wake up waiters on a glock
+ * @gl: the glock
+ */
+static void wake_up_glock(struct gfs2_glock *gl)
+{
+       wait_queue_head_t *wq = glock_waitqueue(&gl->gl_name);
+
+       if (waitqueue_active(wq))
+               __wake_up(wq, TASK_NORMAL, 1, &gl->gl_name);
+}
+
 static void gfs2_glock_dealloc(struct rcu_head *rcu)
 {
        struct gfs2_glock *gl = container_of(rcu, struct gfs2_glock, gl_rcu);
@@ -96,6 +157,9 @@ void gfs2_glock_free(struct gfs2_glock *gl)
 {
        struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
 
+       rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms);
+       smp_mb();
+       wake_up_glock(gl);
        call_rcu(&gl->gl_rcu, gfs2_glock_dealloc);
        if (atomic_dec_and_test(&sdp->sd_glock_disposal))
                wake_up(&sdp->sd_glock_wait);
@@ -194,7 +258,6 @@ static void __gfs2_glock_put(struct gfs2_glock *gl)
 
        gfs2_glock_remove_from_lru(gl);
        spin_unlock(&gl->gl_lockref.lock);
-       rhashtable_remove_fast(&gl_hash_table, &gl->gl_node, ht_parms);
        GLOCK_BUG_ON(gl, !list_empty(&gl->gl_holders));
        GLOCK_BUG_ON(gl, mapping && mapping->nrpages);
        trace_gfs2_glock_put(gl);
@@ -679,6 +742,36 @@ static void glock_work_func(struct work_struct *work)
        spin_unlock(&gl->gl_lockref.lock);
 }
 
+static struct gfs2_glock *find_insert_glock(struct lm_lockname *name,
+                                           struct gfs2_glock *new)
+{
+       struct wait_glock_queue wait;
+       wait_queue_head_t *wq;
+       struct gfs2_glock *gl;
+
+again:
+       prepare_to_wait_on_glock(&wq, &wait, name);
+       rcu_read_lock();
+       if (new) {
+               gl = rhashtable_lookup_get_insert_fast(&gl_hash_table,
+                       &new->gl_node, ht_parms);
+               if (IS_ERR(gl))
+                       goto out;
+       } else {
+               gl = rhashtable_lookup_fast(&gl_hash_table,
+                       name, ht_parms);
+       }
+       if (gl && !lockref_get_not_dead(&gl->gl_lockref)) {
+               rcu_read_unlock();
+               schedule();
+               goto again;
+       }
+out:
+       rcu_read_unlock();
+       finish_wait_on_glock(wq, &wait);
+       return gl;
+}
+
 /**
  * gfs2_glock_get() - Get a glock, or create one if one doesn't exist
  * @sdp: The GFS2 superblock
@@ -705,15 +798,11 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
        struct kmem_cache *cachep;
        int ret = 0;
 
-       rcu_read_lock();
-       gl = rhashtable_lookup_fast(&gl_hash_table, &name, ht_parms);
-       if (gl && !lockref_get_not_dead(&gl->gl_lockref))
-               gl = NULL;
-       rcu_read_unlock();
-
-       *glp = gl;
-       if (gl)
+       gl = find_insert_glock(&name, NULL);
+       if (gl) {
+               *glp = gl;
                return 0;
+       }
        if (!create)
                return -ENOENT;
 
@@ -767,10 +856,7 @@ int gfs2_glock_get(struct gfs2_sbd *sdp, u64 number,
                mapping->writeback_index = 0;
        }
 
-again:
-       rcu_read_lock();
-       tmp = rhashtable_lookup_get_insert_fast(&gl_hash_table, &gl->gl_node,
-                                               ht_parms);
+       tmp = find_insert_glock(&name, gl);
        if (!tmp) {
                *glp = gl;
                goto out;
@@ -779,13 +865,7 @@ again:
                ret = PTR_ERR(tmp);
                goto out_free;
        }
-       if (lockref_get_not_dead(&tmp->gl_lockref)) {
-               *glp = tmp;
-               goto out_free;
-       }
-       rcu_read_unlock();
-       cond_resched();
-       goto again;
+       *glp = tmp;
 
 out_free:
        kfree(gl->gl_lksb.sb_lvbptr);
@@ -793,7 +873,6 @@ out_free:
        atomic_dec(&sdp->sd_glock_disposal);
 
 out:
-       rcu_read_unlock();
        return ret;
 }
 
@@ -1806,7 +1885,7 @@ static int gfs2_sbstats_seq_show(struct seq_file *seq, void *iter_ptr)
 
 int __init gfs2_glock_init(void)
 {
-       int ret;
+       int i, ret;
 
        ret = rhashtable_init(&gl_hash_table, &ht_parms);
        if (ret < 0)
@@ -1835,6 +1914,9 @@ int __init gfs2_glock_init(void)
                return ret;
        }
 
+       for (i = 0; i < GLOCK_WAIT_TABLE_SIZE; i++)
+               init_waitqueue_head(glock_wait_table + i);
+
        return 0;
 }