ocfs2/cluster: Check slots for unconfigured live nodes
authorSunil Mushran <sunil.mushran@oracle.com>
Fri, 8 Oct 2010 00:00:16 +0000 (17:00 -0700)
committerSunil Mushran <sunil.mushran@oracle.com>
Fri, 8 Oct 2010 00:00:16 +0000 (17:00 -0700)
o2hb currently checks slots for configured nodes only. This patch makes
it check the slots for the live nodes too to take care of a race in which
a node is removed from the configuration but not from the live map.

Signed-off-by: Sunil Mushran <sunil.mushran@oracle.com>
fs/ocfs2/cluster/heartbeat.c
fs/ocfs2/cluster/tcp.c

index 12bb12ba86403c3a0a317c8cd9f204147c1cd379..a8f10649674ddd2f4a3fc56d05dde7e2bb1f3d5d 100644 (file)
@@ -541,6 +541,8 @@ static void o2hb_queue_node_event(struct o2hb_node_event *event,
 {
        assert_spin_locked(&o2hb_live_lock);
 
+       BUG_ON((!node) && (type != O2HB_NODE_DOWN_CB));
+
        event->hn_event_type = type;
        event->hn_node = node;
        event->hn_node_num = node_num;
@@ -593,14 +595,22 @@ static int o2hb_check_slot(struct o2hb_region *reg,
        u64 cputime;
        unsigned int dead_ms = o2hb_dead_threshold * O2HB_REGION_TIMEOUT_MS;
        unsigned int slot_dead_ms;
+       int tmp;
 
        memcpy(hb_block, slot->ds_raw_block, reg->hr_block_bytes);
 
-       /* Is this correct? Do we assume that the node doesn't exist
-        * if we're not configured for him? */
+       /*
+        * If a node is no longer configured but is still in the livemap, we
+        * may need to clear that bit from the livemap.
+        */
        node = o2nm_get_node_by_num(slot->ds_node_num);
-       if (!node)
-               return 0;
+       if (!node) {
+               spin_lock(&o2hb_live_lock);
+               tmp = test_bit(slot->ds_node_num, o2hb_live_node_bitmap);
+               spin_unlock(&o2hb_live_lock);
+               if (!tmp)
+                       return 0;
+       }
 
        if (!o2hb_verify_crc(reg, hb_block)) {
                /* all paths from here will drop o2hb_live_lock for
@@ -717,8 +727,9 @@ fire_callbacks:
                if (list_empty(&o2hb_live_slots[slot->ds_node_num])) {
                        clear_bit(slot->ds_node_num, o2hb_live_node_bitmap);
 
-                       o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB, node,
-                                             slot->ds_node_num);
+                       /* node can be null */
+                       o2hb_queue_node_event(&event, O2HB_NODE_DOWN_CB,
+                                             node, slot->ds_node_num);
 
                        changed = 1;
                }
@@ -738,7 +749,8 @@ out:
 
        o2hb_run_event_list(&event);
 
-       o2nm_node_put(node);
+       if (node)
+               o2nm_node_put(node);
        return changed;
 }
 
@@ -765,6 +777,7 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
 {
        int i, ret, highest_node, change = 0;
        unsigned long configured_nodes[BITS_TO_LONGS(O2NM_MAX_NODES)];
+       unsigned long live_node_bitmap[BITS_TO_LONGS(O2NM_MAX_NODES)];
        struct o2hb_bio_wait_ctxt write_wc;
 
        ret = o2nm_configured_node_map(configured_nodes,
@@ -774,6 +787,17 @@ static int o2hb_do_disk_heartbeat(struct o2hb_region *reg)
                return ret;
        }
 
+       /*
+        * If a node is not configured but is in the livemap, we still need
+        * to read the slot so as to be able to remove it from the livemap.
+        */
+       o2hb_fill_node_map(live_node_bitmap, sizeof(live_node_bitmap));
+       i = -1;
+       while ((i = find_next_bit(live_node_bitmap,
+                                 O2NM_MAX_NODES, i + 1)) < O2NM_MAX_NODES) {
+               set_bit(i, configured_nodes);
+       }
+
        highest_node = o2hb_highest_node(configured_nodes, O2NM_MAX_NODES);
        if (highest_node >= O2NM_MAX_NODES) {
                mlog(ML_NOTICE, "ocfs2_heartbeat: no configured nodes found!\n");
index cbe2f057cc2826cb6af4ed833319fe27c4bd565f..9aa426e4212330994255aaa25c85a9f60ec3874b 100644 (file)
@@ -1696,6 +1696,9 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
 {
        o2quo_hb_down(node_num);
 
+       if (!node)
+               return;
+
        if (node_num != o2nm_this_node())
                o2net_disconnect_node(node);
 
@@ -1709,6 +1712,8 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
 
        o2quo_hb_up(node_num);
 
+       BUG_ON(!node);
+
        /* ensure an immediate connect attempt */
        nn->nn_last_connect_attempt = jiffies -
                (msecs_to_jiffies(o2net_reconnect_delay()) + 1);