ocfs2: only recover one dead node at a time
authorKurt Hackel <kurt.hackel@oracle.com>
Fri, 28 Apr 2006 01:05:41 +0000 (18:05 -0700)
committerMark Fasheh <mark.fasheh@oracle.com>
Mon, 26 Jun 2006 21:42:48 +0000 (14:42 -0700)
Signed-off-by: Kurt Hackel <kurt.hackel@oracle.com>
Signed-off-by: Mark Fasheh <mark.fasheh@oracle.com>
fs/ocfs2/dlm/dlmrecovery.c

index ee0860bb67342c7152c051a2970982ed3f9d45cb..39488763728936e706dfb952f07e3f6b5963c0e5 100644 (file)
@@ -710,6 +710,14 @@ int dlm_request_all_locks_handler(struct o2net_msg *msg, u32 len, void *data)
        if (!dlm_grab(dlm))
                return -EINVAL;
 
+       if (lr->dead_node != dlm->reco.dead_node) {
+               mlog(ML_ERROR, "%s: node %u sent dead_node=%u, but local "
+                    "dead_node is %u\n", dlm->name, lr->node_idx,
+                    lr->dead_node, dlm->reco.dead_node);
+               /* this is a hack */
+               dlm_put(dlm);
+               return -ENOMEM;
+       }
        BUG_ON(lr->dead_node != dlm->reco.dead_node);
 
        item = kcalloc(1, sizeof(*item), GFP_KERNEL);
@@ -1504,7 +1512,7 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
        struct dlm_lock *newlock = NULL;
        struct dlm_lockstatus *lksb = NULL;
        int ret = 0;
-       int i;
+       int i, bad;
        struct list_head *iter;
        struct dlm_lock *lock = NULL;
 
@@ -1613,9 +1621,33 @@ static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
                 * relative to each other, but clearly *not*
                 * preserved relative to locks from other nodes.
                 */
+               bad = 0;
                spin_lock(&res->spinlock);
-               dlm_lock_get(newlock);
-               list_add_tail(&newlock->list, queue);
+               list_for_each_entry(lock, queue, list) {
+                       if (lock->ml.cookie == ml->cookie) {
+                               u64 c = lock->ml.cookie;
+                               mlog(ML_ERROR, "%s:%.*s: %u:%llu: lock already "
+                                    "exists on this lockres!\n", dlm->name,
+                                    res->lockname.len, res->lockname.name,
+                                    dlm_get_lock_cookie_node(c),
+                                    dlm_get_lock_cookie_seq(c));
+
+                               mlog(ML_NOTICE, "sent lock: type=%d, conv=%d, "
+                                    "node=%u, cookie=%u:%llu, queue=%d\n",
+                                    ml->type, ml->convert_type, ml->node,
+                                    dlm_get_lock_cookie_node(ml->cookie),
+                                    dlm_get_lock_cookie_seq(ml->cookie),
+                                    ml->list);
+
+                               __dlm_print_one_lock_resource(res);
+                               bad = 1;
+                               break;
+                       }
+               }
+               if (!bad) {
+                       dlm_lock_get(newlock);
+                       list_add_tail(&newlock->list, queue);
+               }
                spin_unlock(&res->spinlock);
        }
        mlog(0, "done running all the locks\n");