ocfs2: Remove ->unblock lockres operation
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / fs / ocfs2 / dlmglue.c
CommitLineData
ccd979bd
MF
1/* -*- mode: c; c-basic-offset: 8; -*-
2 * vim: noexpandtab sw=8 ts=8 sts=0:
3 *
4 * dlmglue.c
5 *
6 * Code which implements an OCFS2 specific interface to our DLM.
7 *
8 * Copyright (C) 2003, 2004 Oracle. All rights reserved.
9 *
10 * This program is free software; you can redistribute it and/or
11 * modify it under the terms of the GNU General Public
12 * License as published by the Free Software Foundation; either
13 * version 2 of the License, or (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18 * General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public
21 * License along with this program; if not, write to the
22 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
23 * Boston, MA 021110-1307, USA.
24 */
25
26#include <linux/types.h>
27#include <linux/slab.h>
28#include <linux/highmem.h>
29#include <linux/mm.h>
30#include <linux/smp_lock.h>
31#include <linux/crc32.h>
32#include <linux/kthread.h>
33#include <linux/pagemap.h>
34#include <linux/debugfs.h>
35#include <linux/seq_file.h>
36
37#include <cluster/heartbeat.h>
38#include <cluster/nodemanager.h>
39#include <cluster/tcp.h>
40
41#include <dlm/dlmapi.h>
42
43#define MLOG_MASK_PREFIX ML_DLM_GLUE
44#include <cluster/masklog.h>
45
46#include "ocfs2.h"
47
48#include "alloc.h"
d680efe9 49#include "dcache.h"
ccd979bd
MF
50#include "dlmglue.h"
51#include "extent_map.h"
52#include "heartbeat.h"
53#include "inode.h"
54#include "journal.h"
55#include "slot_map.h"
56#include "super.h"
57#include "uptodate.h"
58#include "vote.h"
59
60#include "buffer_head_io.h"
61
/*
 * A mask waiter sleeps until the lockres' l_flags, masked by mw_mask,
 * equals mw_goal.  Waiters are queued on ocfs2_lock_res.l_mask_waiters
 * and completed from lockres_set_flags().
 */
struct ocfs2_mask_waiter {
	struct list_head	mw_item;	/* entry on l_mask_waiters */
	int			mw_status;	/* result for the waiter (0 when goal reached) */
	struct completion	mw_complete;	/* the waiter blocks here */
	unsigned long		mw_mask;	/* which l_flags bits matter */
	unsigned long		mw_goal;	/* wanted value of masked bits */
};
69
54a7e755
MF
70static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres);
71static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres);
ccd979bd 72
/*
 * Return value from ->downconvert_worker functions.
 *
 * These control the precise actions of ocfs2_unblock_lock()
 * and ocfs2_process_blocked_lock()
 */
enum ocfs2_unblock_action {
	UNBLOCK_CONTINUE	= 0, /* Continue downconvert */
	UNBLOCK_CONTINUE_POST	= 1, /* Continue downconvert, fire
				      * ->post_unlock callback */
	UNBLOCK_STOP_POST	= 2, /* Do not downconvert, fire
				      * ->post_unlock() callback. */
};

/* Result of one pass over a blocked lock: whether it must be
 * requeued, and which unblock action the worker chose. */
struct ocfs2_unblock_ctl {
	int requeue;
	enum ocfs2_unblock_action unblock_action;
};
92
810d5aeb
MF
93static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
94 int new_level);
95static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres);
96
cc567d89
MF
97static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
98 int blocking);
99
cc567d89
MF
100static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
101 int blocking);
d680efe9
MF
102
103static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
104 struct ocfs2_lock_res *lockres);
ccd979bd 105
f625c979
MF
/*
 * OCFS2 Lock Resource Operations
 *
 * These fine tune the behavior of the generic dlmglue locking infrastructure.
 */
struct ocfs2_lock_res_ops {
	/*
	 * Translate an ocfs2_lock_res * into an ocfs2_super *. Define
	 * this callback if ->l_priv is not an ocfs2_super pointer
	 */
	struct ocfs2_super * (*get_osb)(struct ocfs2_lock_res *);

	/*
	 * Optional.  Fired after an unblock when the downconvert
	 * worker returned one of the UNBLOCK_*_POST actions (see the
	 * enum ocfs2_unblock_action comments).
	 */
	void (*post_unlock)(struct ocfs2_super *, struct ocfs2_lock_res *);

	/*
	 * Allow a lock type to add checks to determine whether it is
	 * safe to downconvert a lock. Return 0 to re-queue the
	 * downconvert at a later time, nonzero to continue.
	 *
	 * For most locks, the default checks that there are no
	 * incompatible holders are sufficient.
	 *
	 * Called with the lockres spinlock held.
	 */
	int (*check_downconvert)(struct ocfs2_lock_res *, int);

	/*
	 * Allows a lock type to populate the lock value block. This
	 * is called on downconvert, and when we drop a lock.
	 *
	 * Locks that want to use this should set LOCK_TYPE_USES_LVB
	 * in the flags field.
	 *
	 * Called with the lockres spinlock held.
	 */
	void (*set_lvb)(struct ocfs2_lock_res *);

	/*
	 * Called from the downconvert thread when it is determined
	 * that a lock will be downconverted. This is called without
	 * any locks held so the function can do work that might
	 * schedule (syncing out data, etc).
	 *
	 * This should return any one of the ocfs2_unblock_action
	 * values, depending on what it wants the thread to do.
	 */
	int (*downconvert_worker)(struct ocfs2_lock_res *, int);

	/*
	 * LOCK_TYPE_* flags which describe the specific requirements
	 * of a lock type. Descriptions of each individual flag follow.
	 */
	int flags;
};
160
f625c979
MF
/*
 * Some locks want to "refresh" potentially stale data when a
 * meaningful (PRMODE or EXMODE) lock level is first obtained. If this
 * flag is set, the OCFS2_LOCK_NEEDS_REFRESH flag will be set on the
 * individual lockres l_flags member from the ast function. It is
 * expected that the locking wrapper will clear the
 * OCFS2_LOCK_NEEDS_REFRESH flag when done.
 */
#define LOCK_TYPE_REQUIRES_REFRESH 0x1

/*
 * Indicate that a lock type makes use of the lock value block. The
 * ->set_lvb lock type callback must be defined.
 */
#define LOCK_TYPE_USES_LVB 0x2

/*
 * Per-lock-type ops tables.  A callback left unset (NULL) gets the
 * generic dlmglue behavior for that hook.
 */
static struct ocfs2_lock_res_ops ocfs2_inode_rw_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_inode_meta_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.check_downconvert = ocfs2_check_meta_downconvert,
	.set_lvb	= ocfs2_set_meta_lvb,
	.flags		= LOCK_TYPE_REQUIRES_REFRESH|LOCK_TYPE_USES_LVB,
};

static struct ocfs2_lock_res_ops ocfs2_inode_data_lops = {
	.get_osb	= ocfs2_get_inode_osb,
	.downconvert_worker = ocfs2_data_convert_worker,
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_super_lops = {
	.flags		= LOCK_TYPE_REQUIRES_REFRESH,
};

static struct ocfs2_lock_res_ops ocfs2_rename_lops = {
	.flags		= 0,
};

static struct ocfs2_lock_res_ops ocfs2_dentry_lops = {
	.get_osb	= ocfs2_get_dentry_osb,
	.post_unlock	= ocfs2_dentry_post_unlock,
	.downconvert_worker = ocfs2_dentry_convert_worker,
	.flags		= 0,
};
209
ccd979bd
MF
210static inline int ocfs2_is_inode_lock(struct ocfs2_lock_res *lockres)
211{
212 return lockres->l_type == OCFS2_LOCK_TYPE_META ||
213 lockres->l_type == OCFS2_LOCK_TYPE_DATA ||
214 lockres->l_type == OCFS2_LOCK_TYPE_RW;
215}
216
ccd979bd
MF
217static inline struct inode *ocfs2_lock_res_inode(struct ocfs2_lock_res *lockres)
218{
219 BUG_ON(!ocfs2_is_inode_lock(lockres));
220
221 return (struct inode *) lockres->l_priv;
222}
223
d680efe9
MF
224static inline struct ocfs2_dentry_lock *ocfs2_lock_res_dl(struct ocfs2_lock_res *lockres)
225{
226 BUG_ON(lockres->l_type != OCFS2_LOCK_TYPE_DENTRY);
227
228 return (struct ocfs2_dentry_lock *)lockres->l_priv;
229}
230
54a7e755
MF
231static inline struct ocfs2_super *ocfs2_get_lockres_osb(struct ocfs2_lock_res *lockres)
232{
233 if (lockres->l_ops->get_osb)
234 return lockres->l_ops->get_osb(lockres);
235
236 return (struct ocfs2_super *)lockres->l_priv;
237}
238
ccd979bd
MF
/* Forward declarations for the generic dlmglue machinery below. */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags);
static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
						     int wanted);
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level);
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres);
static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres);
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres, int level);
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres);
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert);
/* Log a dlm api failure with both its symbolic name and message. */
#define ocfs2_log_dlm_error(_func, _stat, _lockres) do {	\
	mlog(ML_ERROR, "Dlm error \"%s\" while calling %s on "	\
	     "resource %s: %s\n", dlm_errname(_stat), _func,	\
	     _lockres->l_name, dlm_errmsg(_stat));		\
} while (0)
static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres);
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh);
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb);
static inline int ocfs2_highest_compat_lock_level(int level);
ccd979bd 267
ccd979bd
MF
/*
 * Build the canonical dlm lock name for (type, blkno, generation):
 * one lock-type character, a pad string, 16 hex digits of block
 * number and 8 of generation.  "name" must hold at least
 * OCFS2_LOCK_ID_MAX_LEN bytes; the result is always exactly
 * OCFS2_LOCK_ID_MAX_LEN - 1 characters plus NUL.
 */
static void ocfs2_build_lock_name(enum ocfs2_lock_type type,
				  u64 blkno,
				  u32 generation,
				  char *name)
{
	int len;

	mlog_entry_void();

	BUG_ON(type >= OCFS2_NUM_LOCK_TYPES);

	len = snprintf(name, OCFS2_LOCK_ID_MAX_LEN, "%c%s%016llx%08x",
		       ocfs2_lock_type_char(type), OCFS2_LOCK_ID_PAD,
		       (long long)blkno, generation);

	/* Fixed-width format: any other length means the buffer size
	 * and the format string have gone out of sync. */
	BUG_ON(len != (OCFS2_LOCK_ID_MAX_LEN - 1));

	mlog(0, "built lock resource with name: %s\n", name);

	mlog_exit_void();
}
289
/* Protects the debugfs lockres tracking list manipulated below. */
static DEFINE_SPINLOCK(ocfs2_dlm_tracking_lock);

/* Put a lockres on the per-osb debug tracking list. */
static void ocfs2_add_lockres_tracking(struct ocfs2_lock_res *res,
				       struct ocfs2_dlm_debug *dlm_debug)
{
	mlog(0, "Add tracking for lockres %s\n", res->l_name);

	spin_lock(&ocfs2_dlm_tracking_lock);
	list_add(&res->l_debug_list, &dlm_debug->d_lockres_tracking);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}

/* Take a lockres off the tracking list.  The list_empty() check makes
 * this safe on a lockres that was never (or already) removed. */
static void ocfs2_remove_lockres_tracking(struct ocfs2_lock_res *res)
{
	spin_lock(&ocfs2_dlm_tracking_lock);
	if (!list_empty(&res->l_debug_list))
		list_del_init(&res->l_debug_list);
	spin_unlock(&ocfs2_dlm_tracking_lock);
}
309
/*
 * Second-stage lockres init: record type/ops/priv, reset the dlm
 * state machine fields to "no lock held, nothing pending", and add
 * the resource to the debug tracking list.
 */
static void ocfs2_lock_res_init_common(struct ocfs2_super *osb,
				       struct ocfs2_lock_res *res,
				       enum ocfs2_lock_type type,
				       struct ocfs2_lock_res_ops *ops,
				       void *priv)
{
	res->l_type = type;
	res->l_ops = ops;
	res->l_priv = priv;

	res->l_level = LKM_IVMODE;
	res->l_requested = LKM_IVMODE;
	res->l_blocking = LKM_IVMODE;
	res->l_action = OCFS2_AST_INVALID;
	res->l_unlock_action = OCFS2_UNLOCK_INVALID;

	res->l_flags = OCFS2_LOCK_INITIALIZED;

	ocfs2_add_lockres_tracking(res, osb->osb_dlm_debug);
}
330
/*
 * First-stage init, run once per embedding object.  Zeroes the whole
 * lockres - including the dlm lock status block - then sets up the
 * spinlock, waitqueue and list heads.
 */
void ocfs2_lock_res_init_once(struct ocfs2_lock_res *res)
{
	/* This also clears out the lock status block */
	memset(res, 0, sizeof(struct ocfs2_lock_res));
	spin_lock_init(&res->l_lock);
	init_waitqueue_head(&res->l_event);
	INIT_LIST_HEAD(&res->l_blocked_list);
	INIT_LIST_HEAD(&res->l_mask_waiters);
}
340
/*
 * Initialize one of the per-inode lockres (RW, META or DATA): pick
 * the matching ops table, build the lock name from the inode's block
 * number and generation, then finish common init with the inode as
 * l_priv.  BUGs on any other lock type.
 */
void ocfs2_inode_lock_res_init(struct ocfs2_lock_res *res,
			       enum ocfs2_lock_type type,
			       unsigned int generation,
			       struct inode *inode)
{
	struct ocfs2_lock_res_ops *ops;

	switch(type) {
	case OCFS2_LOCK_TYPE_RW:
		ops = &ocfs2_inode_rw_lops;
		break;
	case OCFS2_LOCK_TYPE_META:
		ops = &ocfs2_inode_meta_lops;
		break;
	case OCFS2_LOCK_TYPE_DATA:
		ops = &ocfs2_inode_data_lops;
		break;
	default:
		mlog_bug_on_msg(1, "type: %d\n", type);
		ops = NULL; /* thanks, gcc */
		break;
	};

	ocfs2_build_lock_name(type, OCFS2_I(inode)->ip_blkno,
			      generation, res->l_name);
	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), res, type, ops, inode);
}
368
54a7e755
MF
369static struct ocfs2_super *ocfs2_get_inode_osb(struct ocfs2_lock_res *lockres)
370{
371 struct inode *inode = ocfs2_lock_res_inode(lockres);
372
373 return OCFS2_SB(inode->i_sb);
374}
375
d680efe9
MF
/*
 * Dentry lock names embed the target inode's block number as raw
 * big-endian bytes (see ocfs2_dentry_lock_res_init()); memcpy it back
 * out rather than dereferencing a possibly unaligned pointer.
 */
static __u64 ocfs2_get_dentry_lock_ino(struct ocfs2_lock_res *lockres)
{
	__be64 inode_blkno_be;

	memcpy(&inode_blkno_be, &lockres->l_name[OCFS2_DENTRY_LOCK_INO_START],
	       sizeof(__be64));

	return be64_to_cpu(inode_blkno_be);
}
385
54a7e755
MF
386static struct ocfs2_super *ocfs2_get_dentry_osb(struct ocfs2_lock_res *lockres)
387{
388 struct ocfs2_dentry_lock *dl = lockres->l_priv;
389
390 return OCFS2_SB(dl->dl_inode->i_sb);
391}
392
d680efe9
MF
/*
 * Initialize a dentry lockres.  The lock name encodes both the parent
 * directory's block number (as hex text) and the target inode's block
 * number (as raw big-endian bytes) - see the comment below.
 */
void ocfs2_dentry_lock_res_init(struct ocfs2_dentry_lock *dl,
				u64 parent, struct inode *inode)
{
	int len;
	u64 inode_blkno = OCFS2_I(inode)->ip_blkno;
	__be64 inode_blkno_be = cpu_to_be64(inode_blkno);
	struct ocfs2_lock_res *lockres = &dl->dl_lockres;

	ocfs2_lock_res_init_once(lockres);

	/*
	 * Unfortunately, the standard lock naming scheme won't work
	 * here because we have two 16 byte values to use. Instead,
	 * we'll stuff the inode number as a binary value. We still
	 * want error prints to show something without garbling the
	 * display, so drop a null byte in there before the inode
	 * number. A future version of OCFS2 will likely use all
	 * binary lock names. The stringified names have been a
	 * tremendous aid in debugging, but now that the debugfs
	 * interface exists, we can mangle things there if need be.
	 *
	 * NOTE: We also drop the standard "pad" value (the total lock
	 * name size stays the same though - the last part is all
	 * zeros due to the memset in ocfs2_lock_res_init_once()
	 */
	len = snprintf(lockres->l_name, OCFS2_DENTRY_LOCK_INO_START,
		       "%c%016llx",
		       ocfs2_lock_type_char(OCFS2_LOCK_TYPE_DENTRY),
		       (long long)parent);

	BUG_ON(len != (OCFS2_DENTRY_LOCK_INO_START - 1));

	memcpy(&lockres->l_name[OCFS2_DENTRY_LOCK_INO_START], &inode_blkno_be,
	       sizeof(__be64));

	ocfs2_lock_res_init_common(OCFS2_SB(inode->i_sb), lockres,
				   OCFS2_LOCK_TYPE_DENTRY, &ocfs2_dentry_lops,
				   dl);
}
432
/* The super and rename lockres are embedded in the osb rather than a
 * slab object, so each gets its one-time init here. */
static void ocfs2_super_lock_res_init(struct ocfs2_lock_res *res,
				      struct ocfs2_super *osb)
{
	/* Superblock lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_SUPER, OCFS2_SUPER_BLOCK_BLKNO,
			      0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_SUPER,
				   &ocfs2_super_lops, osb);
}

static void ocfs2_rename_lock_res_init(struct ocfs2_lock_res *res,
				       struct ocfs2_super *osb)
{
	/* Rename lockres doesn't come from a slab so we call init
	 * once on it manually. */
	ocfs2_lock_res_init_once(res);
	ocfs2_build_lock_name(OCFS2_LOCK_TYPE_RENAME, 0, 0, res->l_name);
	ocfs2_lock_res_init_common(osb, res, OCFS2_LOCK_TYPE_RENAME,
				   &ocfs2_rename_lops, osb);
}
455
/*
 * Tear down a lockres.  BUGs loudly if the resource is still in use -
 * on the blocked list, with mask waiters, spinlock held, or holders
 * outstanding - since freeing it then would corrupt dlmglue state.
 * A never-initialized lockres is a no-op.
 */
void ocfs2_lock_res_free(struct ocfs2_lock_res *res)
{
	mlog_entry_void();

	if (!(res->l_flags & OCFS2_LOCK_INITIALIZED))
		return;

	ocfs2_remove_lockres_tracking(res);

	mlog_bug_on_msg(!list_empty(&res->l_blocked_list),
			"Lockres %s is on the blocked list\n",
			res->l_name);
	mlog_bug_on_msg(!list_empty(&res->l_mask_waiters),
			"Lockres %s has mask waiters pending\n",
			res->l_name);
	mlog_bug_on_msg(spin_is_locked(&res->l_lock),
			"Lockres %s is locked\n",
			res->l_name);
	mlog_bug_on_msg(res->l_ro_holders,
			"Lockres %s has %u ro holders\n",
			res->l_name, res->l_ro_holders);
	mlog_bug_on_msg(res->l_ex_holders,
			"Lockres %s has %u ex holders\n",
			res->l_name, res->l_ex_holders);

	/* Need to clear out the lock status block for the dlm */
	memset(&res->l_lksb, 0, sizeof(res->l_lksb));

	res->l_flags = 0UL;
	mlog_exit_void();
}
487
488static inline void ocfs2_inc_holders(struct ocfs2_lock_res *lockres,
489 int level)
490{
491 mlog_entry_void();
492
493 BUG_ON(!lockres);
494
495 switch(level) {
496 case LKM_EXMODE:
497 lockres->l_ex_holders++;
498 break;
499 case LKM_PRMODE:
500 lockres->l_ro_holders++;
501 break;
502 default:
503 BUG();
504 }
505
506 mlog_exit_void();
507}
508
509static inline void ocfs2_dec_holders(struct ocfs2_lock_res *lockres,
510 int level)
511{
512 mlog_entry_void();
513
514 BUG_ON(!lockres);
515
516 switch(level) {
517 case LKM_EXMODE:
518 BUG_ON(!lockres->l_ex_holders);
519 lockres->l_ex_holders--;
520 break;
521 case LKM_PRMODE:
522 BUG_ON(!lockres->l_ro_holders);
523 lockres->l_ro_holders--;
524 break;
525 default:
526 BUG();
527 }
528 mlog_exit_void();
529}
530
531/* WARNING: This function lives in a world where the only three lock
532 * levels are EX, PR, and NL. It *will* have to be adjusted when more
533 * lock types are added. */
534static inline int ocfs2_highest_compat_lock_level(int level)
535{
536 int new_level = LKM_EXMODE;
537
538 if (level == LKM_EXMODE)
539 new_level = LKM_NLMODE;
540 else if (level == LKM_PRMODE)
541 new_level = LKM_PRMODE;
542 return new_level;
543}
544
545static void lockres_set_flags(struct ocfs2_lock_res *lockres,
546 unsigned long newflags)
547{
548 struct list_head *pos, *tmp;
549 struct ocfs2_mask_waiter *mw;
550
551 assert_spin_locked(&lockres->l_lock);
552
553 lockres->l_flags = newflags;
554
555 list_for_each_safe(pos, tmp, &lockres->l_mask_waiters) {
556 mw = list_entry(pos, struct ocfs2_mask_waiter, mw_item);
557 if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
558 continue;
559
560 list_del_init(&mw->mw_item);
561 mw->mw_status = 0;
562 complete(&mw->mw_complete);
563 }
564}
/* Set bits in l_flags, waking any newly-satisfied mask waiters. */
static void lockres_or_flags(struct ocfs2_lock_res *lockres, unsigned long or)
{
	lockres_set_flags(lockres, lockres->l_flags | or);
}
/* Clear bits in l_flags, waking any newly-satisfied mask waiters. */
static void lockres_clear_flags(struct ocfs2_lock_res *lockres,
				unsigned long clear)
{
	lockres_set_flags(lockres, lockres->l_flags & ~clear);
}
574
/*
 * AST side of a downconvert: the dlm granted the lower level we
 * requested.  Adopt it and, if the new level no longer conflicts with
 * the remote request we were blocked on, clear BLOCKED as well.
 * Called with the lockres spinlock held.
 */
static inline void ocfs2_generic_handle_downconvert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	lockres->l_level = lockres->l_requested;
	/* Compatible with the blocking level now?  Then nothing remains
	 * blocked on us. */
	if (lockres->l_level <=
	    ocfs2_highest_compat_lock_level(lockres->l_blocking)) {
		lockres->l_blocking = LKM_NLMODE;
		lockres_clear_flags(lockres, OCFS2_LOCK_BLOCKED);
	}
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
594
/*
 * AST side of an upconvert: adopt the requested level and clear BUSY.
 * Coming up from NL on a refresh-style lock type flags the lockres as
 * needing a refresh.  Called with the lockres spinlock held.
 */
static inline void ocfs2_generic_handle_convert_action(struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BUSY));
	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_ATTACHED));

	/* Convert from RO to EX doesn't really need anything as our
	 * information is already up to data. Convert from NL to
	 * *anything* however should mark ourselves as needing an
	 * update */
	if (lockres->l_level == LKM_NLMODE &&
	    lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
		lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);

	lockres->l_level = lockres->l_requested;
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);

	mlog_exit_void();
}
615
616static inline void ocfs2_generic_handle_attach_action(struct ocfs2_lock_res *lockres)
617{
618 mlog_entry_void();
619
620 BUG_ON((!lockres->l_flags & OCFS2_LOCK_BUSY));
621 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
622
623 if (lockres->l_requested > LKM_NLMODE &&
f625c979
MF
624 !(lockres->l_flags & OCFS2_LOCK_LOCAL) &&
625 lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
ccd979bd
MF
626 lockres_or_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
627
628 lockres->l_level = lockres->l_requested;
629 lockres_or_flags(lockres, OCFS2_LOCK_ATTACHED);
630 lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
631
632 mlog_exit_void();
633}
634
ccd979bd
MF
/*
 * Record that a remote request at "level" is blocked on this lock.
 * Returns nonzero if a (new or lower) downconvert must be scheduled.
 * Called with the lockres spinlock held.
 */
static int ocfs2_generic_handle_bast(struct ocfs2_lock_res *lockres,
				     int level)
{
	int needs_downconvert = 0;
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	lockres_or_flags(lockres, OCFS2_LOCK_BLOCKED);

	if (level > lockres->l_blocking) {
		/* only schedule a downconvert if we haven't already scheduled
		 * one that goes low enough to satisfy the level we're
		 * blocking. this also catches the case where we get
		 * duplicate BASTs */
		if (ocfs2_highest_compat_lock_level(level) <
		    ocfs2_highest_compat_lock_level(lockres->l_blocking))
			needs_downconvert = 1;

		lockres->l_blocking = level;
	}

	mlog_exit(needs_downconvert);
	return needs_downconvert;
}
660
/*
 * dlm blocking AST: a remote node wants a level that conflicts with
 * our grant.  Record the blocking level and, if needed, queue the
 * lockres for the vote thread to downconvert, then kick that thread.
 * Runs in dlm callback context.
 */
static void ocfs2_blocking_ast(void *opaque, int level)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct ocfs2_super *osb = ocfs2_get_lockres_osb(lockres);
	int needs_downconvert;
	unsigned long flags;

	/* A blocking request at NL would conflict with nothing. */
	BUG_ON(level <= LKM_NLMODE);

	mlog(0, "BAST fired for lockres %s, blocking %d, level %d type %s\n",
	     lockres->l_name, level, lockres->l_level,
	     ocfs2_lock_type_string(lockres->l_type));

	spin_lock_irqsave(&lockres->l_lock, flags);
	needs_downconvert = ocfs2_generic_handle_bast(lockres, level);
	if (needs_downconvert)
		ocfs2_schedule_blocked_lock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	ocfs2_kick_vote_thread(osb);
}
684
/*
 * dlm completion AST: a create or convert we issued finished.
 * Dispatch on l_action to fold the result into the lockres state
 * machine, then wake anybody waiting on it.  A bad lksb status leaves
 * the state untouched so the issuing path can recover.
 */
static void ocfs2_locking_ast(void *opaque)
{
	struct ocfs2_lock_res *lockres = opaque;
	struct dlm_lockstatus *lksb = &lockres->l_lksb;
	unsigned long flags;

	spin_lock_irqsave(&lockres->l_lock, flags);

	if (lksb->status != DLM_NORMAL) {
		mlog(ML_ERROR, "lockres %s: lksb status value of %u!\n",
		     lockres->l_name, lksb->status);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_action) {
	case OCFS2_AST_ATTACH:
		ocfs2_generic_handle_attach_action(lockres);
		/* The create completed; LOCAL only matters for attach. */
		lockres_clear_flags(lockres, OCFS2_LOCK_LOCAL);
		break;
	case OCFS2_AST_CONVERT:
		ocfs2_generic_handle_convert_action(lockres);
		break;
	case OCFS2_AST_DOWNCONVERT:
		ocfs2_generic_handle_downconvert_action(lockres);
		break;
	default:
		mlog(ML_ERROR, "lockres %s: ast fired with invalid action: %u "
		     "lockres flags = 0x%lx, unlock action: %u\n",
		     lockres->l_name, lockres->l_action, lockres->l_flags,
		     lockres->l_unlock_action);
		BUG();
	}

	/* set it to something invalid so if we get called again we
	 * can catch it. */
	lockres->l_action = OCFS2_AST_INVALID;

	wake_up(&lockres->l_event);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
726
ccd979bd
MF
/*
 * Undo the BUSY state after a dlm call failed outright: reset the
 * pending action ("convert" also covers create, which shares
 * l_action) and wake waiters so they can retry or bail.
 */
static inline void ocfs2_recover_from_dlm_error(struct ocfs2_lock_res *lockres,
						int convert)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
	if (convert)
		lockres->l_action = OCFS2_AST_INVALID;
	else
		lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);
	mlog_exit_void();
}
744
/* Note: If we detect another process working on the lock (i.e.,
 * OCFS2_LOCK_BUSY), we'll bail out returning 0. It's up to the caller
 * to do the right thing in that case.
 *
 * Issues the initial dlmlock() for a lockres at "level" and marks it
 * BUSY until ocfs2_locking_ast() completes the attach.  Returns 0 on
 * success (or when the lock already exists / is in flight), -EINVAL
 * if the dlm rejected the request.
 */
static int ocfs2_lock_create(struct ocfs2_super *osb,
			     struct ocfs2_lock_res *lockres,
			     int level,
			     int dlm_flags)
{
	int ret = 0;
	enum dlm_status status;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "lock %s, level = %d, flags = %d\n", lockres->l_name, level,
	     dlm_flags);

	spin_lock_irqsave(&lockres->l_lock, flags);
	if ((lockres->l_flags & OCFS2_LOCK_ATTACHED) ||
	    (lockres->l_flags & OCFS2_LOCK_BUSY)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	lockres->l_action = OCFS2_AST_ATTACH;
	lockres->l_requested = level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = dlmlock(osb->dlm,
			 level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 ocfs2_blocking_ast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		/* Clear BUSY and the pending attach action. */
		ocfs2_recover_from_dlm_error(lockres, 1);
	}

	mlog(0, "lock %s, successfull return from dlmlock\n", lockres->l_name);

bail:
	mlog_exit(ret);
	return ret;
}
796
797static inline int ocfs2_check_wait_flag(struct ocfs2_lock_res *lockres,
798 int flag)
799{
800 unsigned long flags;
801 int ret;
802
803 spin_lock_irqsave(&lockres->l_lock, flags);
804 ret = lockres->l_flags & flag;
805 spin_unlock_irqrestore(&lockres->l_lock, flags);
806
807 return ret;
808}
809
/* Sleep until no dlm operation is in flight for this lockres. */
static inline void ocfs2_wait_on_busy_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_BUSY));
}

/* Sleep until another thread finishes refreshing this lockres. */
static inline void ocfs2_wait_on_refreshing_lock(struct ocfs2_lock_res *lockres)

{
	wait_event(lockres->l_event,
		   !ocfs2_check_wait_flag(lockres, OCFS2_LOCK_REFRESHING));
}
823
824/* predict what lock level we'll be dropping down to on behalf
825 * of another node, and return true if the currently wanted
826 * level will be compatible with it. */
827static inline int ocfs2_may_continue_on_blocked_lock(struct ocfs2_lock_res *lockres,
828 int wanted)
829{
830 BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));
831
832 return wanted <= ocfs2_highest_compat_lock_level(lockres->l_blocking);
833}
834
/* Prepare a mask waiter for queueing on a lockres. */
static void ocfs2_init_mask_waiter(struct ocfs2_mask_waiter *mw)
{
	INIT_LIST_HEAD(&mw->mw_item);
	init_completion(&mw->mw_complete);
}

/* Block until lockres_set_flags() completes us; returns the status
 * recorded at wakeup (lockres_set_flags() stores 0). */
static int ocfs2_wait_for_mask(struct ocfs2_mask_waiter *mw)
{
	wait_for_completion(&mw->mw_complete);
	/* Re-arm the completion in case we want to wait on it again */
	INIT_COMPLETION(mw->mw_complete);
	return mw->mw_status;
}
848
/* Queue a waiter to be completed once (l_flags & mask) == goal.
 * Called with the lockres spinlock held. */
static void lockres_add_mask_waiter(struct ocfs2_lock_res *lockres,
				    struct ocfs2_mask_waiter *mw,
				    unsigned long mask,
				    unsigned long goal)
{
	BUG_ON(!list_empty(&mw->mw_item));

	assert_spin_locked(&lockres->l_lock);

	list_add_tail(&mw->mw_item, &lockres->l_mask_waiters);
	mw->mw_mask = mask;
	mw->mw_goal = goal;
}

/* returns 0 if the mw that was removed was already satisfied, -EBUSY
 * if the mask still hadn't reached its goal */
static int lockres_remove_mask_waiter(struct ocfs2_lock_res *lockres,
				      struct ocfs2_mask_waiter *mw)
{
	unsigned long flags;
	int ret = 0;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!list_empty(&mw->mw_item)) {
		if ((lockres->l_flags & mw->mw_mask) != mw->mw_goal)
			ret = -EBUSY;

		/* Re-arm so the waiter can be queued again later. */
		list_del_init(&mw->mw_item);
		init_completion(&mw->mw_complete);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	return ret;

}
884
/*
 * Acquire a cluster lock at "level", creating the dlm lock on first
 * use and upconverting as needed.  May sleep waiting for in-flight
 * dlm operations or for remote blocking requests to be serviced;
 * with OCFS2_LOCK_NONBLOCK in arg_flags the call returns -EAGAIN
 * instead of sleeping.  Returns 0 with a holder reference taken,
 * -ERESTARTSYS on a pending signal (unless mounted NOINTR or we have
 * already entered the dlm), -EAGAIN for NOQUEUE/NONBLOCK back-offs,
 * or -EINVAL on dlm failure.
 */
static int ocfs2_cluster_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      int level,
			      int lkm_flags,
			      int arg_flags)
{
	struct ocfs2_mask_waiter mw;
	enum dlm_status status;
	int wait, catch_signals = !(osb->s_mount_opt & OCFS2_MOUNT_NOINTR);
	int ret = 0; /* gcc doesn't realize wait = 1 guarantees ret is set */
	unsigned long flags;

	mlog_entry_void();

	ocfs2_init_mask_waiter(&mw);

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
		lkm_flags |= LKM_VALBLK;

again:
	wait = 0;

	if (catch_signals && signal_pending(current)) {
		ret = -ERESTARTSYS;
		goto out;
	}

	spin_lock_irqsave(&lockres->l_lock, flags);

	mlog_bug_on_msg(lockres->l_flags & OCFS2_LOCK_FREEING,
			"Cluster lock called on freeing lockres %s! flags "
			"0x%lx\n", lockres->l_name, lockres->l_flags);

	/* We only compare against the currently granted level
	 * here. If the lock is blocked waiting on a downconvert,
	 * we'll get caught below. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY &&
	    level > lockres->l_level) {
		/* is someone sitting in dlm_lock? If so, wait on
		 * them. */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BUSY, 0);
		wait = 1;
		goto unlock;
	}

	if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
		/* lock has not been created yet. */
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		/* Create at NL first; the convert below raises it. */
		ret = ocfs2_lock_create(osb, lockres, LKM_NLMODE, 0);
		if (ret < 0) {
			mlog_errno(ret);
			goto out;
		}
		goto again;
	}

	if (lockres->l_flags & OCFS2_LOCK_BLOCKED &&
	    !ocfs2_may_continue_on_blocked_lock(lockres, level)) {
		/* is the lock is currently blocked on behalf of
		 * another node */
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_BLOCKED, 0);
		wait = 1;
		goto unlock;
	}

	if (level > lockres->l_level) {
		if (lockres->l_action != OCFS2_AST_INVALID)
			mlog(ML_ERROR, "lockres %s has action %u pending\n",
			     lockres->l_name, lockres->l_action);

		lockres->l_action = OCFS2_AST_CONVERT;
		lockres->l_requested = level;
		lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		BUG_ON(level == LKM_IVMODE);
		BUG_ON(level == LKM_NLMODE);

		mlog(0, "lock %s, convert from %d to level = %d\n",
		     lockres->l_name, lockres->l_level, level);

		/* call dlm_lock to upgrade lock now */
		status = dlmlock(osb->dlm,
				 level,
				 &lockres->l_lksb,
				 lkm_flags|LKM_CONVERT,
				 lockres->l_name,
				 OCFS2_LOCK_ID_MAX_LEN - 1,
				 ocfs2_locking_ast,
				 lockres,
				 ocfs2_blocking_ast);
		if (status != DLM_NORMAL) {
			if ((lkm_flags & LKM_NOQUEUE) &&
			    (status == DLM_NOTQUEUED))
				ret = -EAGAIN;
			else {
				ocfs2_log_dlm_error("dlmlock", status,
						    lockres);
				ret = -EINVAL;
			}
			ocfs2_recover_from_dlm_error(lockres, 1);
			goto out;
		}

		mlog(0, "lock %s, successfull return from dlmlock\n",
		     lockres->l_name);

		/* At this point we've gone inside the dlm and need to
		 * complete our work regardless. */
		catch_signals = 0;

		/* wait for busy to clear and carry on */
		goto again;
	}

	/* Ok, if we get here then we're good to go. */
	ocfs2_inc_holders(lockres, level);

	ret = 0;
unlock:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
out:
	/*
	 * This is helping work around a lock inversion between the page lock
	 * and dlm locks. One path holds the page lock while calling aops
	 * which block acquiring dlm locks. The voting thread holds dlm
	 * locks while acquiring page locks while down converting data locks.
	 * This block is helping an aop path notice the inversion and back
	 * off to unlock its page lock before trying the dlm lock again.
	 */
	if (wait && arg_flags & OCFS2_LOCK_NONBLOCK &&
	    mw.mw_mask & (OCFS2_LOCK_BUSY|OCFS2_LOCK_BLOCKED)) {
		wait = 0;
		if (lockres_remove_mask_waiter(lockres, &mw))
			ret = -EAGAIN;
		else
			goto again;
	}
	if (wait) {
		ret = ocfs2_wait_for_mask(&mw);
		if (ret == 0)
			goto again;
		mlog_errno(ret);
	}

	mlog_exit(ret);
	return ret;
}
1034
/*
 * Drop one holder reference on @lockres at @level and give the vote
 * thread a chance to run if another node is blocked waiting on us.
 * Both the holder decrement and the vote check happen under l_lock so
 * they are atomic with respect to the blocking AST.
 */
static void ocfs2_cluster_unlock(struct ocfs2_super *osb,
				 struct ocfs2_lock_res *lockres,
				 int level)
{
	unsigned long flags;

	mlog_entry_void();
	spin_lock_irqsave(&lockres->l_lock, flags);
	ocfs2_dec_holders(lockres, level);
	ocfs2_vote_on_unlock(osb, lockres);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	mlog_exit_void();
}
1048
d680efe9
MF
1049int ocfs2_create_new_lock(struct ocfs2_super *osb,
1050 struct ocfs2_lock_res *lockres,
24c19ef4
MF
1051 int ex,
1052 int local)
ccd979bd 1053{
d680efe9 1054 int level = ex ? LKM_EXMODE : LKM_PRMODE;
ccd979bd 1055 unsigned long flags;
24c19ef4 1056 int lkm_flags = local ? LKM_LOCAL : 0;
ccd979bd
MF
1057
1058 spin_lock_irqsave(&lockres->l_lock, flags);
1059 BUG_ON(lockres->l_flags & OCFS2_LOCK_ATTACHED);
1060 lockres_or_flags(lockres, OCFS2_LOCK_LOCAL);
1061 spin_unlock_irqrestore(&lockres->l_lock, flags);
1062
24c19ef4 1063 return ocfs2_lock_create(osb, lockres, level, lkm_flags);
ccd979bd
MF
1064}
1065
1066/* Grants us an EX lock on the data and metadata resources, skipping
1067 * the normal cluster directory lookup. Use this ONLY on newly created
1068 * inodes which other nodes can't possibly see, and which haven't been
1069 * hashed in the inode hash yet. This can give us a good performance
1070 * increase as it'll skip the network broadcast normally associated
1071 * with creating a new lock resource. */
1072int ocfs2_create_new_inode_locks(struct inode *inode)
1073{
1074 int ret;
d680efe9 1075 struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
ccd979bd
MF
1076
1077 BUG_ON(!inode);
1078 BUG_ON(!ocfs2_inode_is_new(inode));
1079
1080 mlog_entry_void();
1081
b0697053 1082 mlog(0, "Inode %llu\n", (unsigned long long)OCFS2_I(inode)->ip_blkno);
ccd979bd
MF
1083
1084 /* NOTE: That we don't increment any of the holder counts, nor
1085 * do we add anything to a journal handle. Since this is
1086 * supposed to be a new inode which the cluster doesn't know
1087 * about yet, there is no need to. As far as the LVB handling
1088 * is concerned, this is basically like acquiring an EX lock
1089 * on a resource which has an invalid one -- we'll set it
1090 * valid when we release the EX. */
1091
24c19ef4 1092 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_rw_lockres, 1, 1);
ccd979bd
MF
1093 if (ret) {
1094 mlog_errno(ret);
1095 goto bail;
1096 }
1097
24c19ef4
MF
1098 /*
1099 * We don't want to use LKM_LOCAL on a meta data lock as they
1100 * don't use a generation in their lock names.
1101 */
1102 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_meta_lockres, 1, 0);
ccd979bd
MF
1103 if (ret) {
1104 mlog_errno(ret);
1105 goto bail;
1106 }
1107
24c19ef4 1108 ret = ocfs2_create_new_lock(osb, &OCFS2_I(inode)->ip_data_lockres, 1, 1);
ccd979bd
MF
1109 if (ret) {
1110 mlog_errno(ret);
1111 goto bail;
1112 }
1113
1114bail:
1115 mlog_exit(ret);
1116 return ret;
1117}
1118
1119int ocfs2_rw_lock(struct inode *inode, int write)
1120{
1121 int status, level;
1122 struct ocfs2_lock_res *lockres;
1123
1124 BUG_ON(!inode);
1125
1126 mlog_entry_void();
1127
b0697053
MF
1128 mlog(0, "inode %llu take %s RW lock\n",
1129 (unsigned long long)OCFS2_I(inode)->ip_blkno,
ccd979bd
MF
1130 write ? "EXMODE" : "PRMODE");
1131
1132 lockres = &OCFS2_I(inode)->ip_rw_lockres;
1133
1134 level = write ? LKM_EXMODE : LKM_PRMODE;
1135
1136 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level, 0,
1137 0);
1138 if (status < 0)
1139 mlog_errno(status);
1140
1141 mlog_exit(status);
1142 return status;
1143}
1144
1145void ocfs2_rw_unlock(struct inode *inode, int write)
1146{
1147 int level = write ? LKM_EXMODE : LKM_PRMODE;
1148 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_rw_lockres;
1149
1150 mlog_entry_void();
1151
b0697053
MF
1152 mlog(0, "inode %llu drop %s RW lock\n",
1153 (unsigned long long)OCFS2_I(inode)->ip_blkno,
ccd979bd
MF
1154 write ? "EXMODE" : "PRMODE");
1155
1156 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1157
1158 mlog_exit_void();
1159}
1160
1161int ocfs2_data_lock_full(struct inode *inode,
1162 int write,
1163 int arg_flags)
1164{
1165 int status = 0, level;
1166 struct ocfs2_lock_res *lockres;
1167
1168 BUG_ON(!inode);
1169
1170 mlog_entry_void();
1171
b0697053
MF
1172 mlog(0, "inode %llu take %s DATA lock\n",
1173 (unsigned long long)OCFS2_I(inode)->ip_blkno,
ccd979bd
MF
1174 write ? "EXMODE" : "PRMODE");
1175
1176 /* We'll allow faking a readonly data lock for
1177 * rodevices. */
1178 if (ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb))) {
1179 if (write) {
1180 status = -EROFS;
1181 mlog_errno(status);
1182 }
1183 goto out;
1184 }
1185
1186 lockres = &OCFS2_I(inode)->ip_data_lockres;
1187
1188 level = write ? LKM_EXMODE : LKM_PRMODE;
1189
1190 status = ocfs2_cluster_lock(OCFS2_SB(inode->i_sb), lockres, level,
1191 0, arg_flags);
1192 if (status < 0 && status != -EAGAIN)
1193 mlog_errno(status);
1194
1195out:
1196 mlog_exit(status);
1197 return status;
1198}
1199
1200/* see ocfs2_meta_lock_with_page() */
1201int ocfs2_data_lock_with_page(struct inode *inode,
1202 int write,
1203 struct page *page)
1204{
1205 int ret;
1206
1207 ret = ocfs2_data_lock_full(inode, write, OCFS2_LOCK_NONBLOCK);
1208 if (ret == -EAGAIN) {
1209 unlock_page(page);
1210 if (ocfs2_data_lock(inode, write) == 0)
1211 ocfs2_data_unlock(inode, write);
1212 ret = AOP_TRUNCATED_PAGE;
1213 }
1214
1215 return ret;
1216}
1217
1218static void ocfs2_vote_on_unlock(struct ocfs2_super *osb,
1219 struct ocfs2_lock_res *lockres)
1220{
1221 int kick = 0;
1222
1223 mlog_entry_void();
1224
1225 /* If we know that another node is waiting on our lock, kick
1226 * the vote thread * pre-emptively when we reach a release
1227 * condition. */
1228 if (lockres->l_flags & OCFS2_LOCK_BLOCKED) {
1229 switch(lockres->l_blocking) {
1230 case LKM_EXMODE:
1231 if (!lockres->l_ex_holders && !lockres->l_ro_holders)
1232 kick = 1;
1233 break;
1234 case LKM_PRMODE:
1235 if (!lockres->l_ex_holders)
1236 kick = 1;
1237 break;
1238 default:
1239 BUG();
1240 }
1241 }
1242
1243 if (kick)
1244 ocfs2_kick_vote_thread(osb);
1245
1246 mlog_exit_void();
1247}
1248
1249void ocfs2_data_unlock(struct inode *inode,
1250 int write)
1251{
1252 int level = write ? LKM_EXMODE : LKM_PRMODE;
1253 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_data_lockres;
1254
1255 mlog_entry_void();
1256
b0697053
MF
1257 mlog(0, "inode %llu drop %s DATA lock\n",
1258 (unsigned long long)OCFS2_I(inode)->ip_blkno,
ccd979bd
MF
1259 write ? "EXMODE" : "PRMODE");
1260
1261 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1262 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1263
1264 mlog_exit_void();
1265}
1266
1267#define OCFS2_SEC_BITS 34
1268#define OCFS2_SEC_SHIFT (64 - 34)
1269#define OCFS2_NSEC_MASK ((1ULL << OCFS2_SEC_SHIFT) - 1)
1270
1271/* LVB only has room for 64 bits of time here so we pack it for
1272 * now. */
1273static u64 ocfs2_pack_timespec(struct timespec *spec)
1274{
1275 u64 res;
1276 u64 sec = spec->tv_sec;
1277 u32 nsec = spec->tv_nsec;
1278
1279 res = (sec << OCFS2_SEC_SHIFT) | (nsec & OCFS2_NSEC_MASK);
1280
1281 return res;
1282}
1283
/* Call this with the lockres locked. I am reasonably sure we don't
 * need ip_lock in this function as anyone who would be changing those
 * values is supposed to be blocked in ocfs2_meta_lock right now.
 *
 * Copies the inode's cached metadata into the lock value block so
 * other nodes can refresh from the LVB without a disk read. */
static void __ocfs2_stuff_meta_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/*
	 * Invalidate the LVB of a deleted inode - this way other
	 * nodes are forced to go to disk and discover the new inode
	 * status.
	 */
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		lvb->lvb_version = 0;
		goto out;
	}

	/* All multi-byte fields go out big-endian so every node reads
	 * the same layout regardless of host byte order. */
	lvb->lvb_version   = OCFS2_LVB_VERSION;
	lvb->lvb_isize	   = cpu_to_be64(i_size_read(inode));
	lvb->lvb_iclusters = cpu_to_be32(oi->ip_clusters);
	lvb->lvb_iuid      = cpu_to_be32(inode->i_uid);
	lvb->lvb_igid      = cpu_to_be32(inode->i_gid);
	lvb->lvb_imode     = cpu_to_be16(inode->i_mode);
	lvb->lvb_inlink    = cpu_to_be16(inode->i_nlink);
	lvb->lvb_iatime_packed  =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_atime));
	lvb->lvb_ictime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_ctime));
	lvb->lvb_imtime_packed =
		cpu_to_be64(ocfs2_pack_timespec(&inode->i_mtime));
	lvb->lvb_iattr    = cpu_to_be32(oi->ip_attr);
	lvb->lvb_igeneration = cpu_to_be32(inode->i_generation);

out:
	mlog_meta_lvb(0, lockres);

	mlog_exit_void();
}
1328
1329static void ocfs2_unpack_timespec(struct timespec *spec,
1330 u64 packed_time)
1331{
1332 spec->tv_sec = packed_time >> OCFS2_SEC_SHIFT;
1333 spec->tv_nsec = packed_time & OCFS2_NSEC_MASK;
1334}
1335
/* Repopulate the in-memory inode from the metadata another node left
 * in the lock value block, saving a disk read.  Counterpart of
 * __ocfs2_stuff_meta_lvb(). */
static void ocfs2_refresh_inode_from_lvb(struct inode *inode)
{
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres = &oi->ip_meta_lockres;
	struct ocfs2_meta_lvb *lvb;

	mlog_entry_void();

	mlog_meta_lvb(0, lockres);

	lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	/* We're safe here without the lockres lock... */
	spin_lock(&oi->ip_lock);
	oi->ip_clusters = be32_to_cpu(lvb->lvb_iclusters);
	i_size_write(inode, be64_to_cpu(lvb->lvb_isize));

	oi->ip_attr = be32_to_cpu(lvb->lvb_iattr);
	/* Propagate attribute bits (immutable etc.) to the VFS flags. */
	ocfs2_set_inode_flags(inode);

	/* fast-symlinks are a special case: their target lives in the
	 * inode itself, so they consume no data blocks. */
	if (S_ISLNK(inode->i_mode) && !oi->ip_clusters)
		inode->i_blocks = 0;
	else
		inode->i_blocks =
			ocfs2_align_bytes_to_sectors(i_size_read(inode));

	inode->i_uid = be32_to_cpu(lvb->lvb_iuid);
	inode->i_gid = be32_to_cpu(lvb->lvb_igid);
	inode->i_mode = be16_to_cpu(lvb->lvb_imode);
	inode->i_nlink = be16_to_cpu(lvb->lvb_inlink);
	ocfs2_unpack_timespec(&inode->i_atime,
			      be64_to_cpu(lvb->lvb_iatime_packed));
	ocfs2_unpack_timespec(&inode->i_mtime,
			      be64_to_cpu(lvb->lvb_imtime_packed));
	ocfs2_unpack_timespec(&inode->i_ctime,
			      be64_to_cpu(lvb->lvb_ictime_packed));
	spin_unlock(&oi->ip_lock);

	mlog_exit_void();
}
1377
f9e2d82e
MF
1378static inline int ocfs2_meta_lvb_is_trustable(struct inode *inode,
1379 struct ocfs2_lock_res *lockres)
ccd979bd
MF
1380{
1381 struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;
1382
f9e2d82e
MF
1383 if (lvb->lvb_version == OCFS2_LVB_VERSION
1384 && be32_to_cpu(lvb->lvb_igeneration) == inode->i_generation)
ccd979bd
MF
1385 return 1;
1386 return 0;
1387}
1388
/* Determine whether a lock resource needs to be refreshed, and
 * arbitrate who gets to refresh it.
 *
 * 0 means no refresh needed.
 *
 * > 0 means you need to refresh this and you MUST call
 * ocfs2_complete_lock_res_refresh afterwards. */
static int ocfs2_should_refresh_lock_res(struct ocfs2_lock_res *lockres)
{
	unsigned long flags;
	int status = 0;

	mlog_entry_void();

refresh_check:
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (!(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH)) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		goto bail;
	}

	/* Someone else is already refreshing: wait for them to finish,
	 * then re-check -- they may have cleared NEEDS_REFRESH, or
	 * failed, leaving it set for us. */
	if (lockres->l_flags & OCFS2_LOCK_REFRESHING) {
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		ocfs2_wait_on_refreshing_lock(lockres);
		goto refresh_check;
	}

	/* Ok, I'll be the one to refresh this lock. */
	lockres_or_flags(lockres, OCFS2_LOCK_REFRESHING);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = 1;
bail:
	mlog_exit(status);
	return status;
}
1426
/* If status is non zero, I'll mark it as not being in refresh
 * anymore, but I won't clear the needs refresh flag -- the next
 * locker will retry the refresh. */
static inline void ocfs2_complete_lock_res_refresh(struct ocfs2_lock_res *lockres,
						   int status)
{
	unsigned long flags;
	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres_clear_flags(lockres, OCFS2_LOCK_REFRESHING);
	if (!status)
		lockres_clear_flags(lockres, OCFS2_LOCK_NEEDS_REFRESH);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* Wake anyone parked in ocfs2_wait_on_refreshing_lock(). */
	wake_up(&lockres->l_event);

	mlog_exit_void();
}
1445
/* may or may not return a bh if it went to disk.
 *
 * After acquiring the meta lock, bring the cached inode up to date:
 * prefer the LVB left by the last EX holder, fall back to reading the
 * dinode from disk.  Returns -ENOENT for inodes deleted while we
 * waited on the lock. */
static int ocfs2_meta_lock_update(struct inode *inode,
				  struct buffer_head **bh)
{
	int status = 0;
	struct ocfs2_inode_info *oi = OCFS2_I(inode);
	struct ocfs2_lock_res *lockres;
	struct ocfs2_dinode *fe;

	mlog_entry_void();

	spin_lock(&oi->ip_lock);
	if (oi->ip_flags & OCFS2_INODE_DELETED) {
		mlog(0, "Orphaned inode %llu was deleted while we "
		     "were waiting on a lock. ip_flags = 0x%x\n",
		     (unsigned long long)oi->ip_blkno, oi->ip_flags);
		spin_unlock(&oi->ip_lock);
		status = -ENOENT;
		goto bail;
	}
	spin_unlock(&oi->ip_lock);

	lockres = &oi->ip_meta_lockres;

	/* Returns non-zero only if we won the race to be the refresher;
	 * in that case ocfs2_complete_lock_res_refresh() below is mandatory. */
	if (!ocfs2_should_refresh_lock_res(lockres))
		goto bail;

	/* This will discard any caching information we might have had
	 * for the inode metadata. */
	ocfs2_metadata_cache_purge(inode);

	/* will do nothing for inode types that don't use the extent
	 * map (directories, bitmap files, etc) */
	ocfs2_extent_map_trunc(inode, 0);

	if (ocfs2_meta_lvb_is_trustable(inode, lockres)) {
		mlog(0, "Trusting LVB on inode %llu\n",
		     (unsigned long long)oi->ip_blkno);
		ocfs2_refresh_inode_from_lvb(inode);
	} else {
		/* Boo, we have to go to disk. */
		/* read bh, cast, ocfs2_refresh_inode */
		status = ocfs2_read_block(OCFS2_SB(inode->i_sb), oi->ip_blkno,
					  bh, OCFS2_BH_CACHED, inode);
		if (status < 0) {
			mlog_errno(status);
			goto bail_refresh;
		}
		fe = (struct ocfs2_dinode *) (*bh)->b_data;

		/* This is a good chance to make sure we're not
		 * locking an invalid object.
		 *
		 * We bug on a stale inode here because we checked
		 * above whether it was wiped from disk. The wiping
		 * node provides a guarantee that we receive that
		 * message and can mark the inode before dropping any
		 * locks associated with it. */
		if (!OCFS2_IS_VALID_DINODE(fe)) {
			OCFS2_RO_ON_INVALID_DINODE(inode->i_sb, fe);
			status = -EIO;
			goto bail_refresh;
		}
		mlog_bug_on_msg(inode->i_generation !=
				le32_to_cpu(fe->i_generation),
				"Invalid dinode %llu disk generation: %u "
				"inode->i_generation: %u\n",
				(unsigned long long)oi->ip_blkno,
				le32_to_cpu(fe->i_generation),
				inode->i_generation);
		mlog_bug_on_msg(le64_to_cpu(fe->i_dtime) ||
				!(fe->i_flags & cpu_to_le32(OCFS2_VALID_FL)),
				"Stale dinode %llu dtime: %llu flags: 0x%x\n",
				(unsigned long long)oi->ip_blkno,
				(unsigned long long)le64_to_cpu(fe->i_dtime),
				le32_to_cpu(fe->i_flags));

		ocfs2_refresh_inode(inode, fe);
	}

	status = 0;
bail_refresh:
	ocfs2_complete_lock_res_refresh(lockres, status);
bail:
	mlog_exit(status);
	return status;
}
1533
1534static int ocfs2_assign_bh(struct inode *inode,
1535 struct buffer_head **ret_bh,
1536 struct buffer_head *passed_bh)
1537{
1538 int status;
1539
1540 if (passed_bh) {
1541 /* Ok, the update went to disk for us, use the
1542 * returned bh. */
1543 *ret_bh = passed_bh;
1544 get_bh(*ret_bh);
1545
1546 return 0;
1547 }
1548
1549 status = ocfs2_read_block(OCFS2_SB(inode->i_sb),
1550 OCFS2_I(inode)->ip_blkno,
1551 ret_bh,
1552 OCFS2_BH_CACHED,
1553 inode);
1554 if (status < 0)
1555 mlog_errno(status);
1556
1557 return status;
1558}
1559
/*
 * returns < 0 error if the callback will never be called, otherwise
 * the result of the lock will be communicated via the callback.
 *
 * Takes the per-inode META cluster lock (PR or EX), waits out any
 * recovery in progress, refreshes the cached inode from LVB or disk,
 * and optionally hands back the dinode buffer_head and records the
 * lock in @handle.  On any failure after the cluster lock was taken,
 * the lock is dropped before returning.
 */
int ocfs2_meta_lock_full(struct inode *inode,
			 struct ocfs2_journal_handle *handle,
			 struct buffer_head **ret_bh,
			 int ex,
			 int arg_flags)
{
	int status, level, dlm_flags, acquired;
	struct ocfs2_lock_res *lockres;
	struct ocfs2_super *osb = OCFS2_SB(inode->i_sb);
	struct buffer_head *local_bh = NULL;

	BUG_ON(!inode);

	mlog_entry_void();

	mlog(0, "inode %llu, take %s META lock\n",
	     (unsigned long long)OCFS2_I(inode)->ip_blkno,
	     ex ? "EXMODE" : "PRMODE");

	status = 0;
	acquired = 0;
	/* We'll allow faking a readonly metadata lock for
	 * rodevices. */
	if (ocfs2_is_hard_readonly(osb)) {
		if (ex)
			status = -EROFS;
		goto bail;
	}

	/* Don't take the lock while another node's journal is being
	 * replayed -- unless we ARE the recovery path. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	/* NOTE(review): redundant -- acquired was already zeroed above. */
	acquired = 0;
	lockres = &OCFS2_I(inode)->ip_meta_lockres;
	level = ex ? LKM_EXMODE : LKM_PRMODE;
	dlm_flags = 0;
	if (arg_flags & OCFS2_META_LOCK_NOQUEUE)
		dlm_flags |= LKM_NOQUEUE;

	status = ocfs2_cluster_lock(osb, lockres, level, dlm_flags, arg_flags);
	if (status < 0) {
		/* -EAGAIN (NONBLOCK) and -EIOCBRETRY are expected; don't log. */
		if (status != -EAGAIN && status != -EIOCBRETRY)
			mlog_errno(status);
		goto bail;
	}

	/* Notify the error cleanup path to drop the cluster lock. */
	acquired = 1;

	/* We wait twice because a node may have died while we were in
	 * the lower dlm layers. The second time though, we've
	 * committed to owning this lock so we don't allow signals to
	 * abort the operation. */
	if (!(arg_flags & OCFS2_META_LOCK_RECOVERY))
		wait_event(osb->recovery_event,
			   ocfs2_node_map_is_empty(osb, &osb->recovery_map));

	/*
	 * We only see this flag if we're being called from
	 * ocfs2_read_locked_inode(). It means we're locking an inode
	 * which hasn't been populated yet, so clear the refresh flag
	 * and let the caller handle it.
	 */
	if (inode->i_state & I_NEW) {
		status = 0;
		ocfs2_complete_lock_res_refresh(lockres, 0);
		goto bail;
	}

	/* This is fun. The caller may want a bh back, or it may
	 * not. ocfs2_meta_lock_update definitely wants one in, but
	 * may or may not read one, depending on what's in the
	 * LVB. The result of all of this is that we've *only* gone to
	 * disk if we have to, so the complexity is worthwhile. */
	status = ocfs2_meta_lock_update(inode, &local_bh);
	if (status < 0) {
		if (status != -ENOENT)
			mlog_errno(status);
		goto bail;
	}

	if (ret_bh) {
		status = ocfs2_assign_bh(inode, ret_bh, local_bh);
		if (status < 0) {
			mlog_errno(status);
			goto bail;
		}
	}

	if (handle) {
		status = ocfs2_handle_add_lock(handle, inode);
		if (status < 0)
			mlog_errno(status);
	}

bail:
	if (status < 0) {
		/* Error path: release everything we handed out or took. */
		if (ret_bh && (*ret_bh)) {
			brelse(*ret_bh);
			*ret_bh = NULL;
		}
		if (acquired)
			ocfs2_meta_unlock(inode, ex);
	}

	if (local_bh)
		brelse(local_bh);

	mlog_exit(status);
	return status;
}
1676
1677/*
1678 * This is working around a lock inversion between tasks acquiring DLM locks
1679 * while holding a page lock and the vote thread which blocks dlm lock acquiry
1680 * while acquiring page locks.
1681 *
 1682 * ** These _with_page variants are only intended to be called from aop
1683 * methods that hold page locks and return a very specific *positive* error
1684 * code that aop methods pass up to the VFS -- test for errors with != 0. **
1685 *
1686 * The DLM is called such that it returns -EAGAIN if it would have blocked
1687 * waiting for the vote thread. In that case we unlock our page so the vote
1688 * thread can make progress. Once we've done this we have to return
1689 * AOP_TRUNCATED_PAGE so the aop method that called us can bubble that back up
1690 * into the VFS who will then immediately retry the aop call.
1691 *
1692 * We do a blocking lock and immediate unlock before returning, though, so that
1693 * the lock has a great chance of being cached on this node by the time the VFS
1694 * calls back to retry the aop. This has a potential to livelock as nodes
1695 * ping locks back and forth, but that's a risk we're willing to take to avoid
1696 * the lock inversion simply.
1697 */
1698int ocfs2_meta_lock_with_page(struct inode *inode,
1699 struct ocfs2_journal_handle *handle,
1700 struct buffer_head **ret_bh,
1701 int ex,
1702 struct page *page)
1703{
1704 int ret;
1705
1706 ret = ocfs2_meta_lock_full(inode, handle, ret_bh, ex,
1707 OCFS2_LOCK_NONBLOCK);
1708 if (ret == -EAGAIN) {
1709 unlock_page(page);
1710 if (ocfs2_meta_lock(inode, handle, ret_bh, ex) == 0)
1711 ocfs2_meta_unlock(inode, ex);
1712 ret = AOP_TRUNCATED_PAGE;
1713 }
1714
1715 return ret;
1716}
1717
1718void ocfs2_meta_unlock(struct inode *inode,
1719 int ex)
1720{
1721 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1722 struct ocfs2_lock_res *lockres = &OCFS2_I(inode)->ip_meta_lockres;
1723
1724 mlog_entry_void();
1725
b0697053
MF
1726 mlog(0, "inode %llu drop %s META lock\n",
1727 (unsigned long long)OCFS2_I(inode)->ip_blkno,
ccd979bd
MF
1728 ex ? "EXMODE" : "PRMODE");
1729
1730 if (!ocfs2_is_hard_readonly(OCFS2_SB(inode->i_sb)))
1731 ocfs2_cluster_unlock(OCFS2_SB(inode->i_sb), lockres, level);
1732
1733 mlog_exit_void();
1734}
1735
/* Take the cluster-wide superblock lock and, if we win the refresh
 * arbitration, re-read the slot map so we see any membership changes. */
int ocfs2_super_lock(struct ocfs2_super *osb,
		     int ex)
{
	int status;
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;
	struct buffer_head *bh;
	struct ocfs2_slot_info *si = osb->slot_info;

	mlog_entry_void();

	if (ocfs2_is_hard_readonly(osb))
		return -EROFS;

	status = ocfs2_cluster_lock(osb, lockres, level, 0, 0);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* The super block lock path is really in the best position to
	 * know when resources covered by the lock need to be
	 * refreshed, so we do it here. Of course, making sense of
	 * everything is up to the caller :) */
	status = ocfs2_should_refresh_lock_res(lockres);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}
	if (status) {
		/* We won the refresh arbitration: re-read the slot map
		 * block and must complete the refresh whatever happens. */
		bh = si->si_bh;
		status = ocfs2_read_block(osb, bh->b_blocknr, &bh, 0,
					  si->si_inode);
		if (status == 0)
			ocfs2_update_slot_info(si);

		ocfs2_complete_lock_res_refresh(lockres, status);

		if (status < 0)
			mlog_errno(status);
	}
bail:
	mlog_exit(status);
	return status;
}
1781
/* Release the cluster-wide superblock lock taken by ocfs2_super_lock(). */
void ocfs2_super_unlock(struct ocfs2_super *osb,
			int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_lock_res *lockres = &osb->osb_super_lockres;

	ocfs2_cluster_unlock(osb, lockres, level);
}
1790
1791int ocfs2_rename_lock(struct ocfs2_super *osb)
1792{
1793 int status;
1794 struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;
1795
1796 if (ocfs2_is_hard_readonly(osb))
1797 return -EROFS;
1798
1799 status = ocfs2_cluster_lock(osb, lockres, LKM_EXMODE, 0, 0);
1800 if (status < 0)
1801 mlog_errno(status);
1802
1803 return status;
1804}
1805
/* Release the cluster-wide rename lock taken by ocfs2_rename_lock(). */
void ocfs2_rename_unlock(struct ocfs2_super *osb)
{
	struct ocfs2_lock_res *lockres = &osb->osb_rename_lockres;

	ocfs2_cluster_unlock(osb, lockres, LKM_EXMODE);
}
1812
d680efe9
MF
1813int ocfs2_dentry_lock(struct dentry *dentry, int ex)
1814{
1815 int ret;
1816 int level = ex ? LKM_EXMODE : LKM_PRMODE;
1817 struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
1818 struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);
1819
1820 BUG_ON(!dl);
1821
1822 if (ocfs2_is_hard_readonly(osb))
1823 return -EROFS;
1824
1825 ret = ocfs2_cluster_lock(osb, &dl->dl_lockres, level, 0, 0);
1826 if (ret < 0)
1827 mlog_errno(ret);
1828
1829 return ret;
1830}
1831
/* Release the per-dentry cluster lock taken by ocfs2_dentry_lock(). */
void ocfs2_dentry_unlock(struct dentry *dentry, int ex)
{
	int level = ex ? LKM_EXMODE : LKM_PRMODE;
	struct ocfs2_dentry_lock *dl = dentry->d_fsdata;
	struct ocfs2_super *osb = OCFS2_SB(dentry->d_sb);

	ocfs2_cluster_unlock(osb, &dl->dl_lockres, level);
}
1840
ccd979bd
MF
1841/* Reference counting of the dlm debug structure. We want this because
1842 * open references on the debug inodes can live on after a mount, so
1843 * we can't rely on the ocfs2_super to always exist. */
1844static void ocfs2_dlm_debug_free(struct kref *kref)
1845{
1846 struct ocfs2_dlm_debug *dlm_debug;
1847
1848 dlm_debug = container_of(kref, struct ocfs2_dlm_debug, d_refcnt);
1849
1850 kfree(dlm_debug);
1851}
1852
/* Drop a reference on the debug structure; frees it on the last put.
 * NULL-safe for convenience of teardown paths. */
void ocfs2_put_dlm_debug(struct ocfs2_dlm_debug *dlm_debug)
{
	if (dlm_debug)
		kref_put(&dlm_debug->d_refcnt, ocfs2_dlm_debug_free);
}
1858
/* Take an additional reference on the debug structure. */
static void ocfs2_get_dlm_debug(struct ocfs2_dlm_debug *debug)
{
	kref_get(&debug->d_refcnt);
}
1863
1864struct ocfs2_dlm_debug *ocfs2_new_dlm_debug(void)
1865{
1866 struct ocfs2_dlm_debug *dlm_debug;
1867
1868 dlm_debug = kmalloc(sizeof(struct ocfs2_dlm_debug), GFP_KERNEL);
1869 if (!dlm_debug) {
1870 mlog_errno(-ENOMEM);
1871 goto out;
1872 }
1873
1874 kref_init(&dlm_debug->d_refcnt);
1875 INIT_LIST_HEAD(&dlm_debug->d_lockres_tracking);
1876 dlm_debug->d_locking_state = NULL;
1877out:
1878 return dlm_debug;
1879}
1880
/* Access to this is arbitrated for us via seq_file->sem. */
struct ocfs2_dlm_seq_priv {
	struct ocfs2_dlm_debug *p_dlm_debug;	/* ref held for the life of the open file */
	struct ocfs2_lock_res p_iter_res;	/* dummy cursor linked into the tracking list */
	struct ocfs2_lock_res p_tmp_res;	/* stable copy of the lockres being shown */
};
1887
/* Walk the global tracking list from @start and return the next real
 * lockres, or NULL at the end of the list.  Caller must hold
 * ocfs2_dlm_tracking_lock. */
static struct ocfs2_lock_res *ocfs2_dlm_next_res(struct ocfs2_lock_res *start,
						 struct ocfs2_dlm_seq_priv *priv)
{
	struct ocfs2_lock_res *iter, *ret = NULL;
	struct ocfs2_dlm_debug *dlm_debug = priv->p_dlm_debug;

	assert_spin_locked(&ocfs2_dlm_tracking_lock);

	list_for_each_entry(iter, &start->l_debug_list, l_debug_list) {
		/* discover the head of the list */
		if (&iter->l_debug_list == &dlm_debug->d_lockres_tracking) {
			mlog(0, "End of list found, %p\n", ret);
			break;
		}

		/* We track our "dummy" iteration lockres' by a NULL
		 * l_ops field. */
		if (iter->l_ops != NULL) {
			ret = iter;
			break;
		}
	}

	return ret;
}
1913
/* seq_file .start: find the first real lockres after our dummy cursor
 * and hand back a private snapshot of it. */
static void *ocfs2_dlm_seq_start(struct seq_file *m, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(&priv->p_iter_res, priv);
	if (iter) {
		/* Since lockres' have the lifetime of their container
		 * (which can be inodes, ocfs2_supers, etc) we want to
		 * copy this out to a temporary lockres while still
		 * under the spinlock. Obviously after this we can't
		 * trust any pointers on the copy returned, but that's
		 * ok as the information we want isn't typically held
		 * in them. */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1936
/* seq_file .stop: nothing to release -- .start/.next drop the
 * tracking lock before returning. */
static void ocfs2_dlm_seq_stop(struct seq_file *m, void *v)
{
}
1940
/* seq_file .next: advance to the following real lockres, re-linking
 * our dummy cursor just after it so iteration can resume there even
 * if other lockres' come and go between reads. */
static void *ocfs2_dlm_seq_next(struct seq_file *m, void *v, loff_t *pos)
{
	struct ocfs2_dlm_seq_priv *priv = m->private;
	struct ocfs2_lock_res *iter = v;
	struct ocfs2_lock_res *dummy = &priv->p_iter_res;

	spin_lock(&ocfs2_dlm_tracking_lock);
	iter = ocfs2_dlm_next_res(iter, priv);
	list_del_init(&dummy->l_debug_list);
	if (iter) {
		list_add(&dummy->l_debug_list, &iter->l_debug_list);
		/* Snapshot under the lock -- see ocfs2_dlm_seq_start(). */
		priv->p_tmp_res = *iter;
		iter = &priv->p_tmp_res;
	}
	spin_unlock(&ocfs2_dlm_tracking_lock);

	return iter;
}
1959
1960/* So that debugfs.ocfs2 can determine which format is being used */
1961#define OCFS2_DLM_DEBUG_STR_VERSION 1
1962static int ocfs2_dlm_seq_show(struct seq_file *m, void *v)
1963{
1964 int i;
1965 char *lvb;
1966 struct ocfs2_lock_res *lockres = v;
1967
1968 if (!lockres)
1969 return -EINVAL;
1970
d680efe9
MF
1971 seq_printf(m, "0x%x\t", OCFS2_DLM_DEBUG_STR_VERSION);
1972
1973 if (lockres->l_type == OCFS2_LOCK_TYPE_DENTRY)
1974 seq_printf(m, "%.*s%08x\t", OCFS2_DENTRY_LOCK_INO_START - 1,
1975 lockres->l_name,
1976 (unsigned int)ocfs2_get_dentry_lock_ino(lockres));
1977 else
1978 seq_printf(m, "%.*s\t", OCFS2_LOCK_ID_MAX_LEN, lockres->l_name);
1979
1980 seq_printf(m, "%d\t"
ccd979bd
MF
1981 "0x%lx\t"
1982 "0x%x\t"
1983 "0x%x\t"
1984 "%u\t"
1985 "%u\t"
1986 "%d\t"
1987 "%d\t",
ccd979bd
MF
1988 lockres->l_level,
1989 lockres->l_flags,
1990 lockres->l_action,
1991 lockres->l_unlock_action,
1992 lockres->l_ro_holders,
1993 lockres->l_ex_holders,
1994 lockres->l_requested,
1995 lockres->l_blocking);
1996
1997 /* Dump the raw LVB */
1998 lvb = lockres->l_lksb.lvb;
1999 for(i = 0; i < DLM_LVB_LEN; i++)
2000 seq_printf(m, "0x%x\t", lvb[i]);
2001
2002 /* End the line */
2003 seq_printf(m, "\n");
2004 return 0;
2005}
2006
/* seq_file iterator over all tracked lock resources (debugfs "locking_state"). */
static struct seq_operations ocfs2_dlm_seq_ops = {
	.start =	ocfs2_dlm_seq_start,
	.stop =		ocfs2_dlm_seq_stop,
	.next =		ocfs2_dlm_seq_next,
	.show =		ocfs2_dlm_seq_show,
};
2013
/* debugfs release: unhook our dummy iteration cursor from the global
 * tracking list, drop our debug ref, then free the seq_file state. */
static int ocfs2_dlm_debug_release(struct inode *inode, struct file *file)
{
	struct seq_file *seq = (struct seq_file *) file->private_data;
	struct ocfs2_dlm_seq_priv *priv = seq->private;
	struct ocfs2_lock_res *res = &priv->p_iter_res;

	ocfs2_remove_lockres_tracking(res);
	ocfs2_put_dlm_debug(priv->p_dlm_debug);
	/* seq_release_private() also kfrees priv for us. */
	return seq_release_private(inode, file);
}
2024
2025static int ocfs2_dlm_debug_open(struct inode *inode, struct file *file)
2026{
2027 int ret;
2028 struct ocfs2_dlm_seq_priv *priv;
2029 struct seq_file *seq;
2030 struct ocfs2_super *osb;
2031
2032 priv = kzalloc(sizeof(struct ocfs2_dlm_seq_priv), GFP_KERNEL);
2033 if (!priv) {
2034 ret = -ENOMEM;
2035 mlog_errno(ret);
2036 goto out;
2037 }
2038 osb = (struct ocfs2_super *) inode->u.generic_ip;
2039 ocfs2_get_dlm_debug(osb->osb_dlm_debug);
2040 priv->p_dlm_debug = osb->osb_dlm_debug;
2041 INIT_LIST_HEAD(&priv->p_iter_res.l_debug_list);
2042
2043 ret = seq_open(file, &ocfs2_dlm_seq_ops);
2044 if (ret) {
2045 kfree(priv);
2046 mlog_errno(ret);
2047 goto out;
2048 }
2049
2050 seq = (struct seq_file *) file->private_data;
2051 seq->private = priv;
2052
2053 ocfs2_add_lockres_tracking(&priv->p_iter_res,
2054 priv->p_dlm_debug);
2055
2056out:
2057 return ret;
2058}
2059
/* File operations for the debugfs "locking_state" file. */
static const struct file_operations ocfs2_dlm_debug_fops = {
	.open =		ocfs2_dlm_debug_open,
	.release =	ocfs2_dlm_debug_release,
	.read =		seq_read,
	.llseek =	seq_lseek,
};
2066
2067static int ocfs2_dlm_init_debug(struct ocfs2_super *osb)
2068{
2069 int ret = 0;
2070 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2071
2072 dlm_debug->d_locking_state = debugfs_create_file("locking_state",
2073 S_IFREG|S_IRUSR,
2074 osb->osb_debug_root,
2075 osb,
2076 &ocfs2_dlm_debug_fops);
2077 if (!dlm_debug->d_locking_state) {
2078 ret = -EINVAL;
2079 mlog(ML_ERROR,
2080 "Unable to create locking state debugfs file.\n");
2081 goto out;
2082 }
2083
2084 ocfs2_get_dlm_debug(dlm_debug);
2085out:
2086 return ret;
2087}
2088
2089static void ocfs2_dlm_shutdown_debug(struct ocfs2_super *osb)
2090{
2091 struct ocfs2_dlm_debug *dlm_debug = osb->osb_dlm_debug;
2092
2093 if (dlm_debug) {
2094 debugfs_remove(dlm_debug->d_locking_state);
2095 ocfs2_put_dlm_debug(dlm_debug);
2096 }
2097}
2098
/*
 * Bring up everything this node needs to talk to the DLM: the debugfs
 * state file, the vote thread, the DLM domain itself, and the two
 * superblock-global lock resources.  Returns 0 or a negative errno;
 * on failure everything already started is torn back down.
 */
int ocfs2_dlm_init(struct ocfs2_super *osb)
{
	int status;
	u32 dlm_key;
	struct dlm_ctxt *dlm;

	mlog_entry_void();

	status = ocfs2_dlm_init_debug(osb);
	if (status < 0) {
		mlog_errno(status);
		goto bail;
	}

	/* launch vote thread */
	osb->vote_task = kthread_run(ocfs2_vote_thread, osb, "ocfs2vote");
	if (IS_ERR(osb->vote_task)) {
		status = PTR_ERR(osb->vote_task);
		/* NULL so the error path below knows there is no
		 * thread to stop. */
		osb->vote_task = NULL;
		mlog_errno(status);
		goto bail;
	}

	/* used by the dlm code to make message headers unique, each
	 * node in this domain must agree on this. */
	dlm_key = crc32_le(0, osb->uuid_str, strlen(osb->uuid_str));

	/* for now, uuid == domain */
	dlm = dlm_register_domain(osb->uuid_str, dlm_key);
	if (IS_ERR(dlm)) {
		status = PTR_ERR(dlm);
		mlog_errno(status);
		goto bail;
	}

	ocfs2_super_lock_res_init(&osb->osb_super_lockres, osb);
	ocfs2_rename_lock_res_init(&osb->osb_rename_lockres, osb);

	dlm_register_eviction_cb(dlm, &osb->osb_eviction_cb);

	osb->dlm = dlm;

	status = 0;
bail:
	if (status < 0) {
		/* Unwind: only the debug file and vote thread can be
		 * live when we arrive here with an error. */
		ocfs2_dlm_shutdown_debug(osb);
		if (osb->vote_task)
			kthread_stop(osb->vote_task);
	}

	mlog_exit(status);
	return status;
}
2152
/*
 * Tear down this node's DLM attachment.  Order matters: stop eviction
 * callbacks first, drop the superblock-global locks, stop the vote
 * thread, free the lock resources, and only then leave the domain and
 * remove the debugfs state.
 */
void ocfs2_dlm_shutdown(struct ocfs2_super *osb)
{
	mlog_entry_void();

	dlm_unregister_eviction_cb(&osb->osb_eviction_cb);

	ocfs2_drop_osb_locks(osb);

	if (osb->vote_task) {
		kthread_stop(osb->vote_task);
		osb->vote_task = NULL;
	}

	ocfs2_lock_res_free(&osb->osb_super_lockres);
	ocfs2_lock_res_free(&osb->osb_rename_lockres);

	dlm_unregister_domain(osb->dlm);
	osb->dlm = NULL;

	ocfs2_dlm_shutdown_debug(osb);

	mlog_exit_void();
}
2176
/*
 * DLM callback fired when an unlock or convert-cancel request has
 * completed.  Runs from ast context, so only spinlocks may be taken.
 * Clears the unlock bookkeeping on the lockres and wakes anyone
 * sleeping on l_event (e.g. ocfs2_wait_on_busy_lock()).
 */
static void ocfs2_unlock_ast(void *opaque, enum dlm_status status)
{
	struct ocfs2_lock_res *lockres = opaque;
	unsigned long flags;

	mlog_entry_void();

	mlog(0, "UNLOCK AST called on lock %s, action = %d\n", lockres->l_name,
	     lockres->l_unlock_action);

	spin_lock_irqsave(&lockres->l_lock, flags);
	/* We tried to cancel a convert request, but it was already
	 * granted. All we want to do here is clear our unlock
	 * state. The wake_up call done at the bottom is redundant
	 * (ocfs2_prepare_cancel_convert doesn't sleep on this) but doesn't
	 * hurt anything anyway */
	if (status == DLM_CANCELGRANT &&
	    lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		mlog(0, "Got cancelgrant for %s\n", lockres->l_name);

		/* We don't clear the busy flag in this case as it
		 * should have been cleared by the ast which the dlm
		 * has called. */
		goto complete_unlock;
	}

	if (status != DLM_NORMAL) {
		mlog(ML_ERROR, "Dlm passes status %d for lock %s, "
		     "unlock_action %d\n", status, lockres->l_name,
		     lockres->l_unlock_action);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		return;
	}

	switch(lockres->l_unlock_action) {
	case OCFS2_UNLOCK_CANCEL_CONVERT:
		mlog(0, "Cancel convert success for %s\n", lockres->l_name);
		lockres->l_action = OCFS2_AST_INVALID;
		break;
	case OCFS2_UNLOCK_DROP_LOCK:
		/* The lock is gone from the DLM's point of view. */
		lockres->l_level = LKM_IVMODE;
		break;
	default:
		BUG();
	}

	lockres_clear_flags(lockres, OCFS2_LOCK_BUSY);
complete_unlock:
	lockres->l_unlock_action = OCFS2_UNLOCK_INVALID;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	wake_up(&lockres->l_event);

	mlog_exit_void();
}
2232
/* Callback invoked (under the lockres spinlock) just before a lock is
 * finally dropped — gives a lock type a last chance to do work, e.g.
 * stuffing the metadata LVB, while the lock is quiescent. */
typedef void (ocfs2_pre_drop_cb_t)(struct ocfs2_lock_res *, void *);

struct drop_lock_cb {
	ocfs2_pre_drop_cb_t *drop_func;	/* called with drop_data as 2nd arg */
	void *drop_data;
};
2239
2240static int ocfs2_drop_lock(struct ocfs2_super *osb,
2241 struct ocfs2_lock_res *lockres,
2242 struct drop_lock_cb *dcb)
2243{
2244 enum dlm_status status;
2245 unsigned long flags;
b80fc012 2246 int lkm_flags = 0;
ccd979bd
MF
2247
2248 /* We didn't get anywhere near actually using this lockres. */
2249 if (!(lockres->l_flags & OCFS2_LOCK_INITIALIZED))
2250 goto out;
2251
b80fc012
MF
2252 if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB)
2253 lkm_flags |= LKM_VALBLK;
2254
ccd979bd
MF
2255 spin_lock_irqsave(&lockres->l_lock, flags);
2256
2257 mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_FREEING),
2258 "lockres %s, flags 0x%lx\n",
2259 lockres->l_name, lockres->l_flags);
2260
2261 while (lockres->l_flags & OCFS2_LOCK_BUSY) {
2262 mlog(0, "waiting on busy lock \"%s\": flags = %lx, action = "
2263 "%u, unlock_action = %u\n",
2264 lockres->l_name, lockres->l_flags, lockres->l_action,
2265 lockres->l_unlock_action);
2266
2267 spin_unlock_irqrestore(&lockres->l_lock, flags);
2268
2269 /* XXX: Today we just wait on any busy
2270 * locks... Perhaps we need to cancel converts in the
2271 * future? */
2272 ocfs2_wait_on_busy_lock(lockres);
2273
2274 spin_lock_irqsave(&lockres->l_lock, flags);
2275 }
2276
2277 if (dcb)
2278 dcb->drop_func(lockres, dcb->drop_data);
2279
2280 if (lockres->l_flags & OCFS2_LOCK_BUSY)
2281 mlog(ML_ERROR, "destroying busy lock: \"%s\"\n",
2282 lockres->l_name);
2283 if (lockres->l_flags & OCFS2_LOCK_BLOCKED)
2284 mlog(0, "destroying blocked lock: \"%s\"\n", lockres->l_name);
2285
2286 if (!(lockres->l_flags & OCFS2_LOCK_ATTACHED)) {
2287 spin_unlock_irqrestore(&lockres->l_lock, flags);
2288 goto out;
2289 }
2290
2291 lockres_clear_flags(lockres, OCFS2_LOCK_ATTACHED);
2292
2293 /* make sure we never get here while waiting for an ast to
2294 * fire. */
2295 BUG_ON(lockres->l_action != OCFS2_AST_INVALID);
2296
2297 /* is this necessary? */
2298 lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
2299 lockres->l_unlock_action = OCFS2_UNLOCK_DROP_LOCK;
2300 spin_unlock_irqrestore(&lockres->l_lock, flags);
2301
2302 mlog(0, "lock %s\n", lockres->l_name);
2303
b80fc012 2304 status = dlmunlock(osb->dlm, &lockres->l_lksb, lkm_flags,
2a45f2d1 2305 ocfs2_unlock_ast, lockres);
ccd979bd
MF
2306 if (status != DLM_NORMAL) {
2307 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2308 mlog(ML_ERROR, "lockres flags: %lu\n", lockres->l_flags);
2309 dlm_print_one_lock(lockres->l_lksb.lockid);
2310 BUG();
2311 }
2312 mlog(0, "lock %s, successfull return from dlmunlock\n",
2313 lockres->l_name);
2314
2315 ocfs2_wait_on_busy_lock(lockres);
2316out:
2317 mlog_exit(0);
2318 return 0;
2319}
2320
/* Mark the lockres as being dropped. It will no longer be
 * queued if blocking, but we still may have to wait on it
 * being dequeued from the vote thread before we can consider
 * it safe to drop.
 *
 * You can *not* attempt to call cluster_lock on this lockres anymore. */
void ocfs2_mark_lockres_freeing(struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_mask_waiter mw;
	unsigned long flags;

	ocfs2_init_mask_waiter(&mw);

	spin_lock_irqsave(&lockres->l_lock, flags);
	lockres->l_flags |= OCFS2_LOCK_FREEING;
	/* Wait for the vote thread to finish with the lockres:
	 * OCFS2_LOCK_QUEUED is cleared once it has been dequeued. */
	while (lockres->l_flags & OCFS2_LOCK_QUEUED) {
		lockres_add_mask_waiter(lockres, &mw, OCFS2_LOCK_QUEUED, 0);
		spin_unlock_irqrestore(&lockres->l_lock, flags);

		mlog(0, "Waiting on lockres %s\n", lockres->l_name);

		status = ocfs2_wait_for_mask(&mw);
		if (status)
			mlog_errno(status);

		spin_lock_irqsave(&lockres->l_lock, flags);
	}
	spin_unlock_irqrestore(&lockres->l_lock, flags);
}
2351
d680efe9
MF
2352void ocfs2_simple_drop_lockres(struct ocfs2_super *osb,
2353 struct ocfs2_lock_res *lockres)
ccd979bd 2354{
d680efe9 2355 int ret;
ccd979bd 2356
d680efe9
MF
2357 ocfs2_mark_lockres_freeing(lockres);
2358 ret = ocfs2_drop_lock(osb, lockres, NULL);
2359 if (ret)
2360 mlog_errno(ret);
2361}
ccd979bd 2362
d680efe9
MF
/* Drop the two superblock-global cluster locks during unmount. */
static void ocfs2_drop_osb_locks(struct ocfs2_super *osb)
{
	ocfs2_simple_drop_lockres(osb, &osb->osb_super_lockres);
	ocfs2_simple_drop_lockres(osb, &osb->osb_rename_lockres);
}
2368
2369static void ocfs2_meta_pre_drop(struct ocfs2_lock_res *lockres, void *data)
2370{
2371 struct inode *inode = data;
2372
2373 /* the metadata lock requires a bit more work as we have an
2374 * LVB to worry about. */
2375 if (lockres->l_flags & OCFS2_LOCK_ATTACHED &&
2376 lockres->l_level == LKM_EXMODE &&
2377 !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
2378 __ocfs2_stuff_meta_lvb(inode);
2379}
2380
/*
 * Drop all three cluster locks attached to an inode (data, meta, rw).
 * The metadata lock gets a pre-drop callback so a valid LVB can be
 * published before it goes away.  All three drops are always
 * attempted; the first error seen is returned.
 */
int ocfs2_drop_inode_locks(struct inode *inode)
{
	int status, err;
	struct drop_lock_cb meta_dcb = { ocfs2_meta_pre_drop, inode, };

	mlog_entry_void();

	/* No need to call ocfs2_mark_lockres_freeing here -
	 * ocfs2_clear_inode has done it for us. */

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_data_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);

	status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_meta_lockres,
			      &meta_dcb);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	err = ocfs2_drop_lock(OCFS2_SB(inode->i_sb),
			      &OCFS2_I(inode)->ip_rw_lockres,
			      NULL);
	if (err < 0)
		mlog_errno(err);
	if (err < 0 && !status)
		status = err;

	mlog_exit(status);
	return status;
}
2418
/*
 * Record that a downconvert to new_level is in flight.  The caller
 * must hold the lockres spinlock; the actual DLM call happens later
 * in ocfs2_downconvert_lock().
 */
static void ocfs2_prepare_downconvert(struct ocfs2_lock_res *lockres,
				      int new_level)
{
	assert_spin_locked(&lockres->l_lock);

	BUG_ON(lockres->l_blocking <= LKM_NLMODE);

	if (lockres->l_level <= new_level) {
		/* A downconvert must strictly lower the lock level. */
		mlog(ML_ERROR, "lockres->l_level (%u) <= new_level (%u)\n",
		     lockres->l_level, new_level);
		BUG();
	}

	mlog(0, "lock %s, new_level = %d, l_blocking = %d\n",
	     lockres->l_name, new_level, lockres->l_blocking);

	lockres->l_action = OCFS2_AST_DOWNCONVERT;
	lockres->l_requested = new_level;
	lockres_or_flags(lockres, OCFS2_LOCK_BUSY);
}
2439
/*
 * Issue the DLM convert for a downconvert set up by
 * ocfs2_prepare_downconvert().  When lvb is nonzero, LKM_VALBLK asks
 * the DLM to carry our lock value block along with the convert.
 * Returns 0, or -EINVAL on DLM error (after undoing the prepared
 * lockres state).
 */
static int ocfs2_downconvert_lock(struct ocfs2_super *osb,
				  struct ocfs2_lock_res *lockres,
				  int new_level,
				  int lvb)
{
	int ret, dlm_flags = LKM_CONVERT;
	enum dlm_status status;

	mlog_entry_void();

	if (lvb)
		dlm_flags |= LKM_VALBLK;

	status = dlmlock(osb->dlm,
			 new_level,
			 &lockres->l_lksb,
			 dlm_flags,
			 lockres->l_name,
			 OCFS2_LOCK_ID_MAX_LEN - 1,
			 ocfs2_locking_ast,
			 lockres,
			 ocfs2_blocking_ast);
	if (status != DLM_NORMAL) {
		ocfs2_log_dlm_error("dlmlock", status, lockres);
		ret = -EINVAL;
		ocfs2_recover_from_dlm_error(lockres, 1);
		goto bail;
	}

	ret = 0;
bail:
	mlog_exit(ret);
	return ret;
}
2474
/* returns 1 when the caller should unlock and call dlmunlock */
static int ocfs2_prepare_cancel_convert(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	assert_spin_locked(&lockres->l_lock);

	mlog_entry_void();
	mlog(0, "lock %s\n", lockres->l_name);

	if (lockres->l_unlock_action == OCFS2_UNLOCK_CANCEL_CONVERT) {
		/* If we're already trying to cancel a lock conversion
		 * then just drop the spinlock and allow the caller to
		 * requeue this lock. */

		mlog(0, "Lockres %s, skip convert\n", lockres->l_name);
		return 0;
	}

	/* were we in a convert when we got the bast fire? */
	BUG_ON(lockres->l_action != OCFS2_AST_CONVERT &&
	       lockres->l_action != OCFS2_AST_DOWNCONVERT);
	/* set things up for the unlockast to know to just
	 * clear out the ast_action and unset busy, etc. */
	lockres->l_unlock_action = OCFS2_UNLOCK_CANCEL_CONVERT;

	/* BUSY must be set: a convert is outstanding. */
	mlog_bug_on_msg(!(lockres->l_flags & OCFS2_LOCK_BUSY),
			"lock %s, invalid flags: 0x%lx\n",
			lockres->l_name, lockres->l_flags);

	return 1;
}
2506
2507static int ocfs2_cancel_convert(struct ocfs2_super *osb,
2508 struct ocfs2_lock_res *lockres)
2509{
2510 int ret;
2511 enum dlm_status status;
2512
2513 mlog_entry_void();
2514 mlog(0, "lock %s\n", lockres->l_name);
2515
2516 ret = 0;
2517 status = dlmunlock(osb->dlm,
2518 &lockres->l_lksb,
2519 LKM_CANCEL,
2a45f2d1 2520 ocfs2_unlock_ast,
ccd979bd
MF
2521 lockres);
2522 if (status != DLM_NORMAL) {
2523 ocfs2_log_dlm_error("dlmunlock", status, lockres);
2524 ret = -EINVAL;
2525 ocfs2_recover_from_dlm_error(lockres, 0);
2526 }
2527
2528 mlog(0, "lock %s return from dlmunlock\n", lockres->l_name);
2529
ccd979bd
MF
2530 mlog_exit(ret);
2531 return ret;
2532}
2533
b5e500e2
MF
/*
 * Core downconvert state machine, called from the vote thread for a
 * lockres with OCFS2_LOCK_BLOCKED set.  Decides between cancelling an
 * in-flight convert, requeueing (holders still present, refresh in
 * progress, or the type-specific check said no), or actually
 * downconverting — optionally running the lock type's
 * downconvert_worker and stuffing the LVB first.  Results are
 * reported through *ctl.
 */
static int ocfs2_unblock_lock(struct ocfs2_super *osb,
			      struct ocfs2_lock_res *lockres,
			      struct ocfs2_unblock_ctl *ctl)
{
	unsigned long flags;
	int blocking;
	int new_level;
	int ret = 0;
	int set_lvb = 0;

	mlog_entry_void();

	spin_lock_irqsave(&lockres->l_lock, flags);

	BUG_ON(!(lockres->l_flags & OCFS2_LOCK_BLOCKED));

recheck:
	/* A convert is already outstanding — try to cancel it and
	 * requeue; the bast will be handled on a later pass. */
	if (lockres->l_flags & OCFS2_LOCK_BUSY) {
		ctl->requeue = 1;
		ret = ocfs2_prepare_cancel_convert(osb, lockres);
		spin_unlock_irqrestore(&lockres->l_lock, flags);
		if (ret) {
			ret = ocfs2_cancel_convert(osb, lockres);
			if (ret < 0)
				mlog_errno(ret);
		}
		goto leave;
	}

	/* if we're blocking an exclusive and we have *any* holders,
	 * then requeue. */
	if ((lockres->l_blocking == LKM_EXMODE)
	    && (lockres->l_ex_holders || lockres->l_ro_holders))
		goto leave_requeue;

	/* If it's a PR we're blocking, then only
	 * requeue if we've got any EX holders */
	if (lockres->l_blocking == LKM_PRMODE &&
	    lockres->l_ex_holders)
		goto leave_requeue;

	/*
	 * Can we get a lock in this state if the holder counts are
	 * zero? The meta data unblock code used to check this.
	 */
	if ((lockres->l_ops->flags & LOCK_TYPE_REQUIRES_REFRESH)
	    && (lockres->l_flags & OCFS2_LOCK_REFRESHING))
		goto leave_requeue;

	new_level = ocfs2_highest_compat_lock_level(lockres->l_blocking);

	/* Give the lock type a veto (e.g. meta waits for checkpoint). */
	if (lockres->l_ops->check_downconvert
	    && !lockres->l_ops->check_downconvert(lockres, new_level))
		goto leave_requeue;

	/* If we get here, then we know that there are no more
	 * incompatible holders (and anyone asking for an incompatible
	 * lock is blocked). We can now downconvert the lock */
	if (!lockres->l_ops->downconvert_worker)
		goto downconvert;

	/* Some lockres types want to do a bit of work before
	 * downconverting a lock. Allow that here. The worker function
	 * may sleep, so we save off a copy of what we're blocking as
	 * it may change while we're not holding the spin lock. */
	blocking = lockres->l_blocking;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	ctl->unblock_action = lockres->l_ops->downconvert_worker(lockres, blocking);

	if (ctl->unblock_action == UNBLOCK_STOP_POST)
		goto leave;

	spin_lock_irqsave(&lockres->l_lock, flags);
	if (blocking != lockres->l_blocking) {
		/* If this changed underneath us, then we can't drop
		 * it just yet. */
		goto recheck;
	}

downconvert:
	ctl->requeue = 0;

	if (lockres->l_ops->flags & LOCK_TYPE_USES_LVB) {
		if (lockres->l_level == LKM_EXMODE)
			set_lvb = 1;

		/*
		 * We only set the lvb if the lock has been fully
		 * refreshed - otherwise we risk setting stale
		 * data. Otherwise, there's no need to actually clear
		 * out the lvb here as it's value is still valid.
		 */
		if (set_lvb && !(lockres->l_flags & OCFS2_LOCK_NEEDS_REFRESH))
			lockres->l_ops->set_lvb(lockres);
	}

	ocfs2_prepare_downconvert(lockres, new_level);
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ret = ocfs2_downconvert_lock(osb, lockres, new_level, set_lvb);
leave:
	mlog_exit(ret);
	return ret;

leave_requeue:
	spin_unlock_irqrestore(&lockres->l_lock, flags);
	ctl->requeue = 1;

	mlog_exit(0);
	return 0;
}
2645
d680efe9
MF
/*
 * Downconvert worker for the inode data lock: flush dirty pages to
 * disk before another node is allowed in, and also drop our page
 * cache entirely when the other node wants EX.
 */
static int ocfs2_data_convert_worker(struct ocfs2_lock_res *lockres,
				     int blocking)
{
	struct inode *inode;
	struct address_space *mapping;

	inode = ocfs2_lock_res_inode(lockres);
	mapping = inode->i_mapping;

	if (filemap_fdatawrite(mapping)) {
		mlog(ML_ERROR, "Could not sync inode %llu for downconvert!",
		     (unsigned long long)OCFS2_I(inode)->ip_blkno);
	}
	sync_mapping_buffers(mapping);
	if (blocking == LKM_EXMODE) {
		truncate_inode_pages(mapping, 0);
		unmap_mapping_range(mapping, 0, 0, 0);
	} else {
		/* We only need to wait on the I/O if we're not also
		 * truncating pages because truncate_inode_pages waits
		 * for us above. We don't truncate pages if we're
		 * blocking anything < EXMODE because we want to keep
		 * them around in that case. */
		filemap_fdatawait(mapping);
	}

	return UNBLOCK_CONTINUE;
}
2674
810d5aeb
MF
2675static int ocfs2_check_meta_downconvert(struct ocfs2_lock_res *lockres,
2676 int new_level)
2677{
2678 struct inode *inode = ocfs2_lock_res_inode(lockres);
2679 int checkpointed = ocfs2_inode_fully_checkpointed(inode);
2680
2681 BUG_ON(new_level != LKM_NLMODE && new_level != LKM_PRMODE);
2682 BUG_ON(lockres->l_level != LKM_EXMODE && !checkpointed);
2683
2684 if (checkpointed)
2685 return 1;
2686
2687 ocfs2_start_checkpoint(OCFS2_SB(inode->i_sb));
2688 return 0;
2689}
2690
/* set_lvb hook for the metadata lock: write current inode state into
 * the LVB before a downconvert. */
static void ocfs2_set_meta_lvb(struct ocfs2_lock_res *lockres)
{
	__ocfs2_stuff_meta_lvb(ocfs2_lock_res_inode(lockres));
}
2697
d680efe9
MF
/*
 * Does the final reference drop on our dentry lock.  Today this runs
 * from the vote thread; the dlmglue API could be simplified by
 * pushing it off to ocfs2_wq instead.
 */
static void ocfs2_dentry_post_unlock(struct ocfs2_super *osb,
				     struct ocfs2_lock_res *lockres)
{
	ocfs2_dentry_lock_put(osb, ocfs2_lock_res_dl(lockres));
}
2709
/*
 * d_delete() matching dentries before the lock downconvert.
 *
 * At this point, any process waiting to destroy the
 * dentry_lock due to last ref count is stopped by the
 * OCFS2_LOCK_QUEUED flag.
 *
 * We have two potential problems
 *
 * 1) If we do the last reference drop on our dentry_lock (via dput)
 *    we'll wind up in ocfs2_release_dentry_lock(), waiting on
 *    the downconvert to finish. Instead we take an elevated
 *    reference and push the drop until after we've completed our
 *    unblock processing.
 *
 * 2) There might be another process with a final reference,
 *    waiting on us to finish processing. If this is the case, we
 *    detect it and exit out - there's no more dentries anyway.
 */
static int ocfs2_dentry_convert_worker(struct ocfs2_lock_res *lockres,
				       int blocking)
{
	struct ocfs2_dentry_lock *dl = ocfs2_lock_res_dl(lockres);
	struct ocfs2_inode_info *oi = OCFS2_I(dl->dl_inode);
	struct dentry *dentry;
	unsigned long flags;
	int extra_ref = 0;

	/*
	 * This node is blocking another node from getting a read
	 * lock. This happens when we've renamed within a
	 * directory. We've forced the other nodes to d_delete(), but
	 * we never actually dropped our lock because it's still
	 * valid. The downconvert code will retain a PR for this node,
	 * so there's no further work to do.
	 */
	if (blocking == LKM_PRMODE)
		return UNBLOCK_CONTINUE;

	/*
	 * Mark this inode as potentially orphaned. The code in
	 * ocfs2_delete_inode() will figure out whether it actually
	 * needs to be freed or not.
	 */
	spin_lock(&oi->ip_lock);
	oi->ip_flags |= OCFS2_INODE_MAYBE_ORPHANED;
	spin_unlock(&oi->ip_lock);

	/*
	 * Yuck. We need to make sure however that the check of
	 * OCFS2_LOCK_FREEING and the extra reference are atomic with
	 * respect to a reference decrement or the setting of that
	 * flag.
	 */
	spin_lock_irqsave(&lockres->l_lock, flags);
	spin_lock(&dentry_attach_lock);
	if (!(lockres->l_flags & OCFS2_LOCK_FREEING)
	    && dl->dl_count) {
		/* See problem (1) above: hold the drop until after
		 * unblock processing completes. */
		dl->dl_count++;
		extra_ref = 1;
	}
	spin_unlock(&dentry_attach_lock);
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	mlog(0, "extra_ref = %d\n", extra_ref);

	/*
	 * We have a process waiting on us in ocfs2_dentry_iput(),
	 * which means we can't have any more outstanding
	 * aliases. There's no need to do any more work.
	 */
	if (!extra_ref)
		return UNBLOCK_CONTINUE;

	spin_lock(&dentry_attach_lock);
	while (1) {
		dentry = ocfs2_find_local_alias(dl->dl_inode,
						dl->dl_parent_blkno, 1);
		if (!dentry)
			break;
		spin_unlock(&dentry_attach_lock);

		mlog(0, "d_delete(%.*s);\n", dentry->d_name.len,
		     dentry->d_name.name);

		/*
		 * The following dcache calls may do an
		 * iput(). Normally we don't want that from the
		 * downconverting thread, but in this case it's ok
		 * because the requesting node already has an
		 * exclusive lock on the inode, so it can't be queued
		 * for a downconvert.
		 */
		d_delete(dentry);
		dput(dentry);

		spin_lock(&dentry_attach_lock);
	}
	spin_unlock(&dentry_attach_lock);

	/*
	 * If we are the last holder of this dentry lock, there is no
	 * reason to downconvert so skip straight to the unlock.
	 */
	if (dl->dl_count == 1)
		return UNBLOCK_STOP_POST;

	return UNBLOCK_CONTINUE_POST;
}
2819
ccd979bd
MF
/*
 * Process one lockres queued to the vote thread because another node
 * is blocked on it: run the downconvert state machine, requeue if
 * needed, and fire the lock type's post_unlock hook.
 */
void ocfs2_process_blocked_lock(struct ocfs2_super *osb,
				struct ocfs2_lock_res *lockres)
{
	int status;
	struct ocfs2_unblock_ctl ctl = {0, 0,};
	unsigned long flags;

	/* Our reference to the lockres in this function can be
	 * considered valid until we remove the OCFS2_LOCK_QUEUED
	 * flag. */

	mlog_entry_void();

	BUG_ON(!lockres);
	BUG_ON(!lockres->l_ops);

	mlog(0, "lockres %s blocked.\n", lockres->l_name);

	/* Detect whether a lock has been marked as going away while
	 * the vote thread was processing other things. A lock can
	 * still be marked with OCFS2_LOCK_FREEING after this check,
	 * but short circuiting here will still save us some
	 * performance. */
	spin_lock_irqsave(&lockres->l_lock, flags);
	if (lockres->l_flags & OCFS2_LOCK_FREEING)
		goto unqueue;
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	status = ocfs2_unblock_lock(osb, lockres, &ctl);
	if (status < 0)
		mlog_errno(status);

	spin_lock_irqsave(&lockres->l_lock, flags);
unqueue:
	if (lockres->l_flags & OCFS2_LOCK_FREEING || !ctl.requeue) {
		lockres_clear_flags(lockres, OCFS2_LOCK_QUEUED);
	} else
		ocfs2_schedule_blocked_lock(osb, lockres);

	mlog(0, "lockres %s, requeue = %s.\n", lockres->l_name,
	     ctl.requeue ? "yes" : "no");
	spin_unlock_irqrestore(&lockres->l_lock, flags);

	/* e.g. the dentry lock drops its deferred reference here. */
	if (ctl.unblock_action != UNBLOCK_CONTINUE
	    && lockres->l_ops->post_unlock)
		lockres->l_ops->post_unlock(osb, lockres);

	mlog_exit_void();
}
2869
/*
 * Queue a blocked lockres onto the vote thread's worklist.  Caller
 * must hold the lockres spinlock.  A lockres marked FREEING is never
 * queued — its destruction will release the resource soon enough.
 */
static void ocfs2_schedule_blocked_lock(struct ocfs2_super *osb,
					struct ocfs2_lock_res *lockres)
{
	mlog_entry_void();

	assert_spin_locked(&lockres->l_lock);

	if (lockres->l_flags & OCFS2_LOCK_FREEING) {
		/* Do not schedule a lock for downconvert when it's on
		 * the way to destruction - any nodes wanting access
		 * to the resource will get it soon. */
		mlog(0, "Lockres %s won't be scheduled: flags 0x%lx\n",
		     lockres->l_name, lockres->l_flags);
		return;
	}

	lockres_or_flags(lockres, OCFS2_LOCK_QUEUED);

	/* list_empty() check keeps a lock already on the worklist
	 * from being queued twice. */
	spin_lock(&osb->vote_task_lock);
	if (list_empty(&lockres->l_blocked_list)) {
		list_add_tail(&lockres->l_blocked_list,
			      &osb->blocked_lock_list);
		osb->blocked_lock_count++;
	}
	spin_unlock(&osb->vote_task_lock);

	mlog_exit_void();
}
2898
/* This aids in debugging situations where a bad LVB might be involved.
 * LVB fields are stored big-endian on the wire, hence the
 * be{16,32,64}_to_cpu conversions before printing. */
void ocfs2_dump_meta_lvb_info(u64 level,
			      const char *function,
			      unsigned int line,
			      struct ocfs2_lock_res *lockres)
{
	struct ocfs2_meta_lvb *lvb = (struct ocfs2_meta_lvb *) lockres->l_lksb.lvb;

	mlog(level, "LVB information for %s (called from %s:%u):\n",
	     lockres->l_name, function, line);
	mlog(level, "version: %u, clusters: %u, generation: 0x%x\n",
	     lvb->lvb_version, be32_to_cpu(lvb->lvb_iclusters),
	     be32_to_cpu(lvb->lvb_igeneration));
	mlog(level, "size: %llu, uid %u, gid %u, mode 0x%x\n",
	     (unsigned long long)be64_to_cpu(lvb->lvb_isize),
	     be32_to_cpu(lvb->lvb_iuid), be32_to_cpu(lvb->lvb_igid),
	     be16_to_cpu(lvb->lvb_imode));
	mlog(level, "nlink %u, atime_packed 0x%llx, ctime_packed 0x%llx, "
	     "mtime_packed 0x%llx iattr 0x%x\n", be16_to_cpu(lvb->lvb_inlink),
	     (long long)be64_to_cpu(lvb->lvb_iatime_packed),
	     (long long)be64_to_cpu(lvb->lvb_ictime_packed),
	     (long long)be64_to_cpu(lvb->lvb_imtime_packed),
	     be32_to_cpu(lvb->lvb_iattr));
}