static int ocfs2_recover_orphans(struct ocfs2_super *osb,
int slot);
static int ocfs2_commit_thread(void *arg);
+static void ocfs2_queue_recovery_completion(struct ocfs2_journal *journal,
+ int slot_num,
+ struct ocfs2_dinode *la_dinode,
+ struct ocfs2_dinode *tl_dinode,
+ struct ocfs2_quota_recovery *qrec);
static inline int ocfs2_wait_on_mount(struct ocfs2_super *osb)
{
return __ocfs2_wait_on_mount(osb, 1);
}
+/*
+ * This replay_map is to track online/offline slots, so we could recover
+ * offline slots during recovery and mount
+ */
+
+enum ocfs2_replay_state {
+ REPLAY_UNNEEDED = 0, /* Replay is not needed, so ignore this map */
+ REPLAY_NEEDED, /* Replay slots marked in rm_replay_slots */
+ REPLAY_DONE /* Replay was already queued */
+};
+
+struct ocfs2_replay_map {
+ unsigned int rm_slots;
+ enum ocfs2_replay_state rm_state;
+ unsigned char rm_replay_slots[0];
+};
+
+void ocfs2_replay_map_set_state(struct ocfs2_super *osb, int state)
+{
+ if (!osb->replay_map)
+ return;
+
+ /* If we've already queued the replay, we don't have any more to do */
+ if (osb->replay_map->rm_state == REPLAY_DONE)
+ return;
+
+ osb->replay_map->rm_state = state;
+}
+
+int ocfs2_compute_replay_slots(struct ocfs2_super *osb)
+{
+ struct ocfs2_replay_map *replay_map;
+ int i, node_num;
+
+ /* If replay map is already set, we don't do it again */
+ if (osb->replay_map)
+ return 0;
+
+ replay_map = kzalloc(sizeof(struct ocfs2_replay_map) +
+ (osb->max_slots * sizeof(char)), GFP_KERNEL);
+
+ if (!replay_map) {
+ mlog_errno(-ENOMEM);
+ return -ENOMEM;
+ }
+
+ spin_lock(&osb->osb_lock);
+
+ replay_map->rm_slots = osb->max_slots;
+ replay_map->rm_state = REPLAY_UNNEEDED;
+
+ /* set rm_replay_slots for offline slot(s) */
+ for (i = 0; i < replay_map->rm_slots; i++) {
+ if (ocfs2_slot_to_node_num_locked(osb, i, &node_num) == -ENOENT)
+ replay_map->rm_replay_slots[i] = 1;
+ }
+
+ osb->replay_map = replay_map;
+ spin_unlock(&osb->osb_lock);
+ return 0;
+}
+
+void ocfs2_queue_replay_slots(struct ocfs2_super *osb)
+{
+ struct ocfs2_replay_map *replay_map = osb->replay_map;
+ int i;
+
+ if (!replay_map)
+ return;
+
+ if (replay_map->rm_state != REPLAY_NEEDED)
+ return;
+
+ for (i = 0; i < replay_map->rm_slots; i++)
+ if (replay_map->rm_replay_slots[i])
+ ocfs2_queue_recovery_completion(osb->journal, i, NULL,
+ NULL, NULL);
+ replay_map->rm_state = REPLAY_DONE;
+}
+
+void ocfs2_free_replay_slots(struct ocfs2_super *osb)
+{
+ struct ocfs2_replay_map *replay_map = osb->replay_map;
+
+ if (!osb->replay_map)
+ return;
+
+ kfree(replay_map);
+ osb->replay_map = NULL;
+}
+
int ocfs2_recovery_init(struct ocfs2_super *osb)
{
struct ocfs2_recovery_map *rm;
}
/* Called by the mount code to queue recovery the last part of
- * recovery for it's own slot. */
+ * recovery for it's own and offline slot(s). */
void ocfs2_complete_mount_recovery(struct ocfs2_super *osb)
{
struct ocfs2_journal *journal = osb->journal;
- if (osb->dirty) {
- /* No need to queue up our truncate_log as regular
- * cleanup will catch that. */
- ocfs2_queue_recovery_completion(journal,
- osb->slot_num,
- osb->local_alloc_copy,
- NULL,
- NULL);
- ocfs2_schedule_truncate_log_flush(osb, 0);
+ /* No need to queue up our truncate_log as regular cleanup will catch
+ * that */
+ ocfs2_queue_recovery_completion(journal, osb->slot_num,
+ osb->local_alloc_copy, NULL, NULL);
+ ocfs2_schedule_truncate_log_flush(osb, 0);
- osb->local_alloc_copy = NULL;
- osb->dirty = 0;
- }
+ osb->local_alloc_copy = NULL;
+ osb->dirty = 0;
+
+ /* queue to recover orphan slots for all offline slots */
+ ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
+ ocfs2_queue_replay_slots(osb);
+ ocfs2_free_replay_slots(osb);
}
void ocfs2_complete_quota_recovery(struct ocfs2_super *osb)
goto bail;
}
+ status = ocfs2_compute_replay_slots(osb);
+ if (status < 0)
+ mlog_errno(status);
+
+ /* queue recovery for our own slot */
+ ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
+ NULL, NULL);
+
spin_lock(&osb->osb_lock);
while (rm->rm_used) {
/* It's always safe to remove entry zero, as we won't
ocfs2_super_unlock(osb, 1);
- /* We always run recovery on our own orphan dir - the dead
- * node(s) may have disallowd a previos inode delete. Re-processing
- * is therefore required. */
- ocfs2_queue_recovery_completion(osb->journal, osb->slot_num, NULL,
- NULL, NULL);
+ /* queue recovery for offline slots */
+ ocfs2_queue_replay_slots(osb);
bail:
mutex_lock(&osb->recovery_lock);
goto restart;
}
+ ocfs2_free_replay_slots(osb);
osb->recovery_thread_task = NULL;
mb(); /* sync with ocfs2_recovery_thread_running */
wake_up(&osb->recovery_event);
goto done;
}
+ /* we need to run complete recovery for offline orphan slots */
+ ocfs2_replay_map_set_state(osb, REPLAY_NEEDED);
+
mlog(ML_NOTICE, "Recovering node %d from slot %d on device (%u,%u)\n",
node_num, slot_num,
MAJOR(osb->sb->s_dev), MINOR(osb->sb->s_dev));