ocfs2/dlm: fix race between dispatched_work and dlm_lockres_grab_inflight_worker
authorJoseph Qi <joseph.qi@huawei.com>
Fri, 19 Dec 2014 00:17:34 +0000 (16:17 -0800)
committerLinus Torvalds <torvalds@linux-foundation.org>
Fri, 19 Dec 2014 03:08:11 +0000 (19:08 -0800)
Commit ac4fef4d23ed ("ocfs2/dlm: do not purge lockres that is queued for
assert master") may have the following possible race case:

  dlm_dispatch_assert_master       dlm_wq
  ========================================================================
  queue_work(dlm->quedlm_worker,
      &dlm->dispatched_work);
                                 dispatch work,
                                 dlm_lockres_drop_inflight_worker
                                 *BUG_ON(res->inflight_assert_workers == 0)*
  dlm_lockres_grab_inflight_worker
  inflight_assert_workers++

So ensure inflight_assert_workers to be increased first.

Signed-off-by: Joseph Qi <joseph.qi@huawei.com>
Signed-off-by: Xue jiufei <xuejiufei@huawei.com>
Cc: Joel Becker <jlbec@evilplan.org>
Reviewed-by: Mark Fasheh <mfasheh@suse.de>
Signed-off-by: Andrew Morton <akpm@linux-foundation.org>
Signed-off-by: Linus Torvalds <torvalds@linux-foundation.org>
fs/ocfs2/dlm/dlmmaster.c

index 3689b35920422304b47fa92f6c65ced6a9b316c4..a6944b25fd5b5ddba427ac66c8f50fb582cf1451 100644 (file)
@@ -695,14 +695,6 @@ void __dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
                        res->inflight_assert_workers);
 }
 
-static void dlm_lockres_grab_inflight_worker(struct dlm_ctxt *dlm,
-               struct dlm_lock_resource *res)
-{
-       spin_lock(&res->spinlock);
-       __dlm_lockres_grab_inflight_worker(dlm, res);
-       spin_unlock(&res->spinlock);
-}
-
 static void __dlm_lockres_drop_inflight_worker(struct dlm_ctxt *dlm,
                struct dlm_lock_resource *res)
 {
@@ -1646,6 +1638,7 @@ send_response:
                }
                mlog(0, "%u is the owner of %.*s, cleaning everyone else\n",
                             dlm->node_num, res->lockname.len, res->lockname.name);
+               spin_lock(&res->spinlock);
                ret = dlm_dispatch_assert_master(dlm, res, 0, request->node_idx,
                                                 DLM_ASSERT_MASTER_MLE_CLEANUP);
                if (ret < 0) {
@@ -1653,7 +1646,8 @@ send_response:
                        response = DLM_MASTER_RESP_ERROR;
                        dlm_lockres_put(res);
                } else
-                       dlm_lockres_grab_inflight_worker(dlm, res);
+                       __dlm_lockres_grab_inflight_worker(dlm, res);
+               spin_unlock(&res->spinlock);
        } else {
                if (res)
                        dlm_lockres_put(res);