From 6b0c7440bcb4b7e5a64836132caf56bf19a33f6e Mon Sep 17 00:00:00 2001 From: Andreas Gruenbacher Date: Fri, 30 Jun 2017 08:10:01 -0500 Subject: [PATCH] gfs2: Clean up glock work enqueuing This patch adds a standardized queueing mechanism for glock work with spin_lock protection to prevent races. Signed-off-by: Andreas Gruenbacher Signed-off-by: Bob Peterson --- fs/gfs2/glock.c | 124 +++++++++++++++++++++++++++++------------------- 1 file changed, 74 insertions(+), 50 deletions(-) diff --git a/fs/gfs2/glock.c b/fs/gfs2/glock.c index 959a19ced4d5..6cd71c50b8bd 100644 --- a/fs/gfs2/glock.c +++ b/fs/gfs2/glock.c @@ -152,20 +152,34 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl) spin_unlock(&lru_lock); } -/** - * gfs2_glock_put() - Decrement reference count on glock - * @gl: The glock to put - * +/* + * Enqueue the glock on the work queue. Passes one glock reference on to the + * work queue. */ +static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { + if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) { + /* + * We are holding the lockref spinlock, and the work was still + * queued above. The queued work (glock_work_func) takes that + * spinlock before dropping its glock reference(s), so it + * cannot have dropped them in the meantime. + */ + GLOCK_BUG_ON(gl, gl->gl_lockref.count < 2); + gl->gl_lockref.count--; + } +} -void gfs2_glock_put(struct gfs2_glock *gl) +static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) { + spin_lock(&gl->gl_lockref.lock); + __gfs2_glock_queue_work(gl, delay); + spin_unlock(&gl->gl_lockref.lock); +} + +static void __gfs2_glock_put(struct gfs2_glock *gl) { struct gfs2_sbd *sdp = gl->gl_name.ln_sbd; struct address_space *mapping = gfs2_glock2aspace(gl); - if (lockref_put_or_lock(&gl->gl_lockref)) - return; - lockref_mark_dead(&gl->gl_lockref); gfs2_glock_remove_from_lru(gl); @@ -177,6 +191,20 @@ void gfs2_glock_put(struct gfs2_glock *gl) sdp->sd_lockstruct.ls_ops->lm_put_lock(gl); } +/** + * gfs2_glock_put() - Decrement reference count on glock + * @gl: The glock to put + * + */ + +void gfs2_glock_put(struct gfs2_glock *gl) +{ + if (lockref_put_or_lock(&gl->gl_lockref)) + return; + + __gfs2_glock_put(gl); +} + /** * may_grant - check if its ok to grant a new lock * @gl: The glock @@ -482,8 +510,7 @@ __acquires(&gl->gl_lockref.lock) target == LM_ST_UNLOCKED && test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) { finish_xmote(gl, target); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); + gfs2_glock_queue_work(gl, 0); } else if (ret) { pr_err("lm_lock ret %d\n", ret); @@ -492,8 +519,7 @@ __acquires(&gl->gl_lockref.lock) } } else { /* lock_nolock */ finish_xmote(gl, target); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); + gfs2_glock_queue_work(gl, 0); } spin_lock(&gl->gl_lockref.lock); @@ -565,8 +591,7 @@ out_sched: clear_bit(GLF_LOCK, &gl->gl_flags); smp_mb__after_atomic(); gl->gl_lockref.count++; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gl->gl_lockref.count--; + __gfs2_glock_queue_work(gl, 0); return; out_unlock: @@ -601,11 +626,11 @@ static void glock_work_func(struct work_struct *work) { unsigned long delay = 0; struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work); - int drop_ref = 0; + unsigned int drop_refs = 1; if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) { finish_xmote(gl, gl->gl_reply); - drop_ref = 1; + drop_refs++; } spin_lock(&gl->gl_lockref.lock); if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && @@ -623,17 +648,25 @@ static void glock_work_func(struct work_struct *work) } } run_queue(gl, 0); - spin_unlock(&gl->gl_lockref.lock); - if (!delay) - gfs2_glock_put(gl); - else { + if (delay) { + /* Keep one glock reference for the work we requeue. */ + drop_refs--; if (gl->gl_name.ln_type != LM_TYPE_INODE) delay = 0; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0) - gfs2_glock_put(gl); + __gfs2_glock_queue_work(gl, delay); } - if (drop_ref) - gfs2_glock_put(gl); + + /* + * Drop the remaining glock references manually here. (Mind that + * __gfs2_glock_queue_work depends on the lockref spinlock begin held + * here as well.) + */ + gl->gl_lockref.count -= drop_refs; + if (!gl->gl_lockref.count) { + __gfs2_glock_put(gl); + return; + } + spin_unlock(&gl->gl_lockref.lock); } /** @@ -986,8 +1019,7 @@ int gfs2_glock_nq(struct gfs2_holder *gh) test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) { set_bit(GLF_REPLY_PENDING, &gl->gl_flags); gl->gl_lockref.count++; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gl->gl_lockref.count--; + __gfs2_glock_queue_work(gl, 0); } run_queue(gl, 1); spin_unlock(&gl->gl_lockref.lock); @@ -1047,17 +1079,15 @@ void gfs2_glock_dq(struct gfs2_holder *gh) gfs2_glock_add_to_lru(gl); trace_gfs2_glock_queue(gh, 0); + if (unlikely(!fast_path)) { + gl->gl_lockref.count++; + if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && + !test_bit(GLF_DEMOTE, &gl->gl_flags) && + gl->gl_name.ln_type == LM_TYPE_INODE) + delay = gl->gl_hold_time; + __gfs2_glock_queue_work(gl, delay); + } spin_unlock(&gl->gl_lockref.lock); - if (likely(fast_path)) - return; - - gfs2_glock_hold(gl); - if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) && - !test_bit(GLF_DEMOTE, &gl->gl_flags) && - gl->gl_name.ln_type == LM_TYPE_INODE) - delay = gl->gl_hold_time; - if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0) - gfs2_glock_put(gl); } void gfs2_glock_dq_wait(struct gfs2_holder *gh) @@ -1233,9 +1263,8 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state) spin_lock(&gl->gl_lockref.lock); handle_callback(gl, state, delay, true); + __gfs2_glock_queue_work(gl, delay); spin_unlock(&gl->gl_lockref.lock); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0) - gfs2_glock_put(gl); } /** @@ -1294,10 +1323,8 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret) gl->gl_lockref.count++; set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + __gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); - - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); } static int glock_cmp(void *priv, struct list_head *a, struct list_head *b) @@ -1355,8 +1382,7 @@ add_back_to_lru: if (demote_ok(gl)) handle_callback(gl, LM_ST_UNLOCKED, 0, false); WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags)); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gl->gl_lockref.count--; + __gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); cond_resched_lock(&lru_lock); } @@ -1462,13 +1488,12 @@ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp) static void thaw_glock(struct gfs2_glock *gl) { - if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) - goto out; - set_bit(GLF_REPLY_PENDING, &gl->gl_flags); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) { -out: + if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) { gfs2_glock_put(gl); + return; } + set_bit(GLF_REPLY_PENDING, &gl->gl_flags); + gfs2_glock_queue_work(gl, 0); } /** @@ -1484,9 +1509,8 @@ static void clear_glock(struct gfs2_glock *gl) spin_lock(&gl->gl_lockref.lock); if (gl->gl_state != LM_ST_UNLOCKED) handle_callback(gl, LM_ST_UNLOCKED, 0, false); + __gfs2_glock_queue_work(gl, 0); spin_unlock(&gl->gl_lockref.lock); - if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) - gfs2_glock_put(gl); } /** -- 2.20.1