ocfs2: teach dlm_restart_lock_mastery() to wait on recovery
authorKurt Hackel <kurt.hackel@oracle.com>
Mon, 1 May 2006 18:49:52 +0000 (11:49 -0700)
committerMark Fasheh <mark.fasheh@oracle.com>
Mon, 26 Jun 2006 21:43:03 +0000 (14:43 -0700)
Change behavior of dlm_restart_lock_mastery() when a node goes down.  Dump
all responses that have been collected and start over.

Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
fs/ocfs2/dlm/dlmmaster.c

index e5d7271d503834ede247b6d21170591a2c85e536..915283fb48c3de42a2865d54d344593e87d609d6 100644 (file)
@@ -867,6 +867,7 @@ lookup:
        spin_unlock(&dlm->master_lock);
        spin_unlock(&dlm->spinlock);
 
+redo_request:
        while (wait_on_recovery) {
                /* any cluster changes that occurred after dropping the
                 * dlm spinlock would be detectable be a change on the mle,
@@ -904,7 +905,6 @@ lookup:
        if (blocked)
                goto wait;
 
-redo_request:
        ret = -EINVAL;
        dlm_node_iter_init(mle->vote_map, &iter);
        while ((nodenum = dlm_node_iter_next(&iter)) >= 0) {
@@ -929,6 +929,7 @@ wait:
        /* keep going until the response map includes all nodes */
        ret = dlm_wait_for_lock_mastery(dlm, res, mle, &blocked);
        if (ret < 0) {
+               wait_on_recovery = 1;
                mlog(0, "%s:%.*s: node map changed, redo the "
                     "master request now, blocked=%d\n",
                     dlm->name, res->lockname.len,
@@ -1210,18 +1211,6 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                        set_bit(node, mle->vote_map);
                } else {
                        mlog(ML_ERROR, "node down! %d\n", node);
-
-                       /* if the node wasn't involved in mastery skip it,
-                        * but clear it out from the maps so that it will
-                        * not affect mastery of this lockres */
-                       clear_bit(node, mle->response_map);
-                       clear_bit(node, mle->vote_map);
-                       if (!test_bit(node, mle->maybe_map))
-                               goto next;
-
-                       /* if we're already blocked on lock mastery, and the
-                        * dead node wasn't the expected master, or there is
-                        * another node in the maybe_map, keep waiting */
                        if (blocked) {
                                int lowest = find_next_bit(mle->maybe_map,
                                                       O2NM_MAX_NODES, 0);
@@ -1229,54 +1218,53 @@ static int dlm_restart_lock_mastery(struct dlm_ctxt *dlm,
                                /* act like it was never there */
                                clear_bit(node, mle->maybe_map);
 
-                               if (node != lowest)
-                                       goto next;
-
-                               mlog(ML_ERROR, "expected master %u died while "
-                                    "this node was blocked waiting on it!\n",
-                                    node);
-                               lowest = find_next_bit(mle->maybe_map,
-                                                      O2NM_MAX_NODES,
-                                                      lowest+1);
-                               if (lowest < O2NM_MAX_NODES) {
-                                       mlog(0, "still blocked. waiting "
-                                            "on %u now\n", lowest);
-                                       goto next;
+                               if (node == lowest) {
+                                       mlog(0, "expected master %u died"
+                                           " while this node was blocked "
+                                           "waiting on it!\n", node);
+                                       lowest = find_next_bit(mle->maybe_map,
+                                                       O2NM_MAX_NODES,
+                                                       lowest+1);
+                                       if (lowest < O2NM_MAX_NODES) {
+                                               mlog(0, "%s:%.*s:still "
+                                                    "blocked. waiting on %u "
+                                                    "now\n", dlm->name,
+                                                    res->lockname.len,
+                                                    res->lockname.name,
+                                                    lowest);
+                                       } else {
+                                               /* mle is an MLE_BLOCK, but
+                                                * there is now nothing left to
+                                                * block on.  we need to return
+                                                * all the way back out and try
+                                                * again with an MLE_MASTER.
+                                                * dlm_do_local_recovery_cleanup
+                                                * has already run, so the mle
+                                                * refcount is ok */
+                                               mlog(0, "%s:%.*s: no "
+                                                    "longer blocking. try to "
+                                                    "master this here\n",
+                                                    dlm->name,
+                                                    res->lockname.len,
+                                                    res->lockname.name);
+                                               mle->type = DLM_MLE_MASTER;
+                                               mle->u.res = res;
+                                       }
                                }
-
-                               /* mle is an MLE_BLOCK, but there is now
-                                * nothing left to block on.  we need to return
-                                * all the way back out and try again with
-                                * an MLE_MASTER. dlm_do_local_recovery_cleanup
-                                * has already run, so the mle refcount is ok */
-                               mlog(0, "no longer blocking. we can "
-                                    "try to master this here\n");
-                               mle->type = DLM_MLE_MASTER;
-                               memset(mle->maybe_map, 0,
-                                      sizeof(mle->maybe_map));
-                               memset(mle->response_map, 0,
-                                      sizeof(mle->maybe_map));
-                               memcpy(mle->vote_map, mle->node_map,
-                                      sizeof(mle->node_map));
-                               mle->u.res = res;
-                               set_bit(dlm->node_num, mle->maybe_map);
-
-                               ret = -EAGAIN;
-                               goto next;
                        }
 
-                       clear_bit(node, mle->maybe_map);
-                       if (node > dlm->node_num)
-                               goto next;
-
-                       mlog(0, "dead node in map!\n");
-                       /* yuck. go back and re-contact all nodes
-                        * in the vote_map, removing this node. */
-                       memset(mle->response_map, 0,
-                              sizeof(mle->response_map));
+                       /* now blank out everything, as if we had never
+                        * contacted anyone */
+                       memset(mle->maybe_map, 0, sizeof(mle->maybe_map));
+                       memset(mle->response_map, 0, sizeof(mle->response_map));
+                       /* reset the vote_map to the current node_map */
+                       memcpy(mle->vote_map, mle->node_map,
+                              sizeof(mle->node_map));
+                       /* put myself into the maybe map */
+                       if (mle->type != DLM_MLE_BLOCK)
+                               set_bit(dlm->node_num, mle->maybe_map);
                }
                ret = -EAGAIN;
-next:
                node = dlm_bitmap_diff_iter_next(&bdi, &sc);
        }
        return ret;