static int dlm_recovery_thread(void *data);
void dlm_complete_recovery_thread(struct dlm_ctxt *dlm);
int dlm_launch_recovery_thread(struct dlm_ctxt *dlm);
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm);
static int dlm_do_recovery(struct dlm_ctxt *dlm);
static int dlm_pick_recovery_master(struct dlm_ctxt *dlm);
u8 send_to,
struct dlm_lock_resource *res,
int total_locks);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 *real_master);
static int dlm_process_recovery_data(struct dlm_ctxt *dlm,
struct dlm_lock_resource *res,
struct dlm_migratable_lockres *mres);
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 nodenum, u8 *real_master);
static int dlm_send_finalize_reco_message(struct dlm_ctxt *dlm);
static int dlm_send_all_done_msg(struct dlm_ctxt *dlm,
u8 dead_node, u8 send_to);
* RECOVERY THREAD
*/
-static void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
+void dlm_kick_recovery_thread(struct dlm_ctxt *dlm)
{
/* wake the recovery thread
* this will wake the reco thread in one of three places
return dead;
}
+int dlm_wait_for_node_death(struct dlm_ctxt *dlm, u8 node, int timeout)
+{
+ if (timeout) {
+ mlog(ML_NOTICE, "%s: waiting %dms for notification of "
+ "death of node %u\n", dlm->name, timeout, node);
+ wait_event_timeout(dlm->dlm_reco_thread_wq,
+ dlm_is_node_dead(dlm, node),
+ msecs_to_jiffies(timeout));
+ } else {
+ mlog(ML_NOTICE, "%s: waiting indefinitely for notification "
+ "of death of node %u\n", dlm->name, node);
+ wait_event(dlm->dlm_reco_thread_wq,
+ dlm_is_node_dead(dlm, node));
+ }
+ /* for now, return 0 */
+ return 0;
+}
+
/* callers of the top-level api calls (dlmlock/dlmunlock) should
* block on the dlm->reco.event when recovery is in progress.
* the dlm recovery thread will set this state when it begins
dlm->name, dlm->reco.dead_node, dlm->reco.new_master,
dead_node, reco_master);
mlog(ML_ERROR, "%s: name=%.*s master=%u locks=%u/%u flags=%u "
- "entry[0]={c=%"MLFu64",l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
+ "entry[0]={c=%u:%llu,l=%u,f=%u,t=%d,ct=%d,hb=%d,n=%u}\n",
dlm->name, mres->lockname_len, mres->lockname, mres->master,
mres->num_locks, mres->total_locks, mres->flags,
- mres->ml[0].cookie, mres->ml[0].list, mres->ml[0].flags,
+ dlm_get_lock_cookie_node(mres->ml[0].cookie),
+ dlm_get_lock_cookie_seq(mres->ml[0].cookie),
+ mres->ml[0].list, mres->ml[0].flags,
mres->ml[0].type, mres->ml[0].convert_type,
mres->ml[0].highest_blocked, mres->ml[0].node);
BUG();
mlog(0, "found lockres owned by dead node while "
"doing recovery for node %u. sending it.\n",
dead_node);
- list_del_init(&res->recovering);
- list_add_tail(&res->recovering, list);
+ list_move_tail(&res->recovering, list);
} else if (res->owner == DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "found UNKNOWN owner while doing recovery "
"for node %u. sending it.\n", dead_node);
- list_del_init(&res->recovering);
- list_add_tail(&res->recovering, list);
+ list_move_tail(&res->recovering, list);
}
}
spin_unlock(&dlm->spinlock);
-static int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 *real_master)
+int dlm_lockres_master_requery(struct dlm_ctxt *dlm,
+ struct dlm_lock_resource *res, u8 *real_master)
{
struct dlm_node_iter iter;
int nodenum;
ret = dlm_do_master_requery(dlm, res, nodenum, real_master);
if (ret < 0) {
mlog_errno(ret);
- BUG();
- /* TODO: need to figure a way to restart this */
+ if (!dlm_is_host_down(ret))
+ BUG();
+ /* host is down, so answer for that node would be
+ * DLM_LOCK_RES_OWNER_UNKNOWN. continue. */
}
if (*real_master != DLM_LOCK_RES_OWNER_UNKNOWN) {
mlog(0, "lock master is %u\n", *real_master);
}
-static int dlm_do_master_requery(struct dlm_ctxt *dlm,
- struct dlm_lock_resource *res,
- u8 nodenum, u8 *real_master)
+int dlm_do_master_requery(struct dlm_ctxt *dlm, struct dlm_lock_resource *res,
+ u8 nodenum, u8 *real_master)
{
int ret = -EINVAL;
struct dlm_master_requery req;
struct dlm_ctxt *dlm = data;
struct dlm_master_requery *req = (struct dlm_master_requery *)msg->buf;
struct dlm_lock_resource *res = NULL;
+ unsigned int hash;
int master = DLM_LOCK_RES_OWNER_UNKNOWN;
u32 flags = DLM_ASSERT_MASTER_REQUERY;
return master;
}
+ hash = dlm_lockid_hash(req->name, req->namelen);
+
spin_lock(&dlm->spinlock);
- res = __dlm_lookup_lockres(dlm, req->name, req->namelen);
+ res = __dlm_lookup_lockres(dlm, req->name, req->namelen, hash);
if (res) {
spin_lock(&res->spinlock);
master = res->owner;
/* lock is always created locally first, and
* destroyed locally last. it must be on the list */
if (!lock) {
+ u64 c = ml->cookie;
mlog(ML_ERROR, "could not find local lock "
- "with cookie %"MLFu64"!\n",
- ml->cookie);
+ "with cookie %u:%llu!\n",
+ dlm_get_lock_cookie_node(c),
+ dlm_get_lock_cookie_seq(c));
BUG();
}
BUG_ON(lock->ml.node != ml->node);
/* move the lock to its proper place */
/* do not alter lock refcount. switching lists. */
- list_del_init(&lock->list);
- list_add_tail(&lock->list, queue);
+ list_move_tail(&lock->list, queue);
spin_unlock(&res->spinlock);
mlog(0, "just reordered a local lock!\n");
u8 dead_node, u8 new_master)
{
int i;
- struct list_head *iter, *iter2, *bucket;
+ struct list_head *iter, *iter2;
+ struct hlist_node *hash_iter;
+ struct hlist_head *bucket;
+
struct dlm_lock_resource *res;
mlog_entry_void();
* for now we need to run the whole hash, clear
* the RECOVERING state and set the owner
* if necessary */
- for (i=0; i<DLM_HASH_SIZE; i++) {
- bucket = &(dlm->resources[i]);
- list_for_each(iter, bucket) {
- res = list_entry (iter, struct dlm_lock_resource, list);
+ for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+ bucket = &(dlm->lockres_hash[i]);
+ hlist_for_each_entry(res, hash_iter, bucket, hash_node) {
if (res->state & DLM_LOCK_RES_RECOVERING) {
if (res->owner == dead_node) {
mlog(0, "(this=%u) res %.*s owner=%u "
} else
continue;
+ if (!list_empty(&res->recovering)) {
+ mlog(0, "%s:%.*s: lockres was "
+ "marked RECOVERING, owner=%u\n",
+ dlm->name, res->lockname.len,
+ res->lockname.name, res->owner);
+ list_del_init(&res->recovering);
+ }
spin_lock(&res->spinlock);
dlm_change_lockres_owner(dlm, res, new_master);
res->state &= ~DLM_LOCK_RES_RECOVERING;
static void dlm_do_local_recovery_cleanup(struct dlm_ctxt *dlm, u8 dead_node)
{
- struct list_head *iter;
+ struct hlist_node *iter;
struct dlm_lock_resource *res;
int i;
- struct list_head *bucket;
+ struct hlist_head *bucket;
struct dlm_lock *lock;
* can be kicked again to see if any ASTs or BASTs
* need to be fired as a result.
*/
- for (i=0; i<DLM_HASH_SIZE; i++) {
- bucket = &(dlm->resources[i]);
- list_for_each(iter, bucket) {
- res = list_entry (iter, struct dlm_lock_resource, list);
+ for (i = 0; i < DLM_HASH_BUCKETS; i++) {
+ bucket = &(dlm->lockres_hash[i]);
+ hlist_for_each_entry(res, iter, bucket, hash_node) {
/* always prune any $RECOVERY entries for dead nodes,
* otherwise hangs can occur during later recovery */
if (dlm_is_recovery_lock(res->lockname.name,
dlm->reco.new_master);
status = -EEXIST;
} else {
+ status = 0;
+
+ /* see if recovery was already finished elsewhere */
+ spin_lock(&dlm->spinlock);
+ if (dlm->reco.dead_node == O2NM_INVALID_NODE_NUM) {
+ status = -EINVAL;
+ mlog(0, "%s: got reco EX lock, but "
+ "node got recovered already\n", dlm->name);
+ if (dlm->reco.new_master != O2NM_INVALID_NODE_NUM) {
+ mlog(ML_ERROR, "%s: new master is %u "
+ "but no dead node!\n",
+ dlm->name, dlm->reco.new_master);
+ BUG();
+ }
+ }
+ spin_unlock(&dlm->spinlock);
+ }
+
+ /* if this node has actually become the recovery master,
+ * set the master and send the messages to begin recovery */
+ if (!status) {
+ mlog(0, "%s: dead=%u, this=%u, sending "
+ "begin_reco now\n", dlm->name,
+ dlm->reco.dead_node, dlm->node_num);
status = dlm_send_begin_reco_message(dlm,
dlm->reco.dead_node);
/* this always succeeds */
mlog(0, "%u not in domain/live_nodes map "
"so setting it in reco map manually\n",
br->dead_node);
- set_bit(br->dead_node, dlm->recovery_map);
+ /* force the recovery cleanup in __dlm_hb_node_down
+ * both of these will be cleared in a moment */
+ set_bit(br->dead_node, dlm->domain_map);
+ set_bit(br->dead_node, dlm->live_nodes_map);
__dlm_hb_node_down(dlm, br->dead_node);
}
spin_unlock(&dlm->spinlock);