ldlm_lock_decref_internal_nolock(lock, mode);
- if (ldlm_is_local(lock) &&
+ if ((ldlm_is_local(lock) || lock->l_req_mode == LCK_GROUP) &&
!lock->l_readers && !lock->l_writers) {
/* If this is a local lock on a server namespace and this was
* the last reference, cancel the lock.
- */
- CDEBUG(D_INFO, "forcing cancel of local lock\n");
- ldlm_set_cbpending(lock);
- }
-
- if (!lock->l_readers && !lock->l_writers &&
- (ldlm_is_cbpending(lock) || lock->l_req_mode == LCK_GROUP)) {
- /* If we received a blocked AST and this was the last reference,
- * run the callback.
+ *
* Group locks are special:
* They must not go in LRU, but they are not called back
* like non-group locks, instead they are manually released.
* they are manually released, so we remove them when they have
* no more reader or writer references. - LU-6368
*/
+ ldlm_set_cbpending(lock);
+ }
+
+ if (!lock->l_readers && !lock->l_writers && ldlm_is_cbpending(lock)) {
+ /* If we received a blocked AST and this was the last reference,
+ * run the callback.
+ */
LDLM_DEBUG(lock, "final decref done on cbpending lock");
LDLM_LOCK_GET(lock); /* dropped by bl thread */
return rc;
}
+static bool is_bl_done(struct ldlm_lock *lock)
+{
+ bool bl_done = true;
+
+ if (!ldlm_is_bl_done(lock)) {
+ lock_res_and_lock(lock);
+ bl_done = ldlm_is_bl_done(lock);
+ unlock_res_and_lock(lock);
+ }
+
+ return bl_done;
+}
+
/**
* Helper function to call blocking AST for LDLM lock \a lock in a
* "cancelling" mode.
} else {
LDLM_DEBUG(lock, "no blocking ast");
}
+ /* only canceller can set bl_done bit */
+ ldlm_set_bl_done(lock);
+ wake_up_all(&lock->l_waitq);
+ } else if (!ldlm_is_bl_done(lock)) {
+ struct l_wait_info lwi = { 0 };
+
+ /*
+ * The lock is guaranteed to have been canceled once
+ * returning from this function.
+ */
+ unlock_res_and_lock(lock);
+ l_wait_event(lock->l_waitq, is_bl_done(lock), &lwi);
+ lock_res_and_lock(lock);
}
- ldlm_set_bl_done(lock);
}
/**
struct ldlm_lock *lock;
LIST_HEAD(cancels);
- /* concurrent cancels on the same handle can happen */
- lock = ldlm_handle2lock_long(lockh, LDLM_FL_CANCELING);
+ lock = ldlm_handle2lock_long(lockh, 0);
if (!lock) {
LDLM_DEBUG_NOLOCK("lock is already being destroyed");
return 0;
}
+ lock_res_and_lock(lock);
+ /* Lock is being canceled and the caller doesn't want to wait */
+ if (ldlm_is_canceling(lock) && (cancel_flags & LCF_ASYNC)) {
+ unlock_res_and_lock(lock);
+ LDLM_LOCK_RELEASE(lock);
+ return 0;
+ }
+
+ ldlm_set_canceling(lock);
+ unlock_res_and_lock(lock);
+
rc = ldlm_cli_cancel_local(lock);
if (rc == LDLM_FL_LOCAL_ONLY || cancel_flags & LCF_LOCAL) {
LDLM_LOCK_RELEASE(lock);