spin_unlock(&parent->d_lock);
/* make sure a dentry wasn't dropped while we didn't have parent lock */
- if (!ceph_dir_is_complete(dir)) {
+ if (!ceph_dir_is_complete_ordered(dir)) {
dout(" lost dir complete on %p; falling back to mds\n", dir);
dput(dentry);
err = -EAGAIN;
/* always start with . and .. */
if (ctx->pos == 0) {
- /* note dir version at start of readdir so we can tell
- * if any dentries get dropped */
- fi->dir_release_count = atomic_read(&ci->i_release_count);
-
dout("readdir off 0 -> '.'\n");
if (!dir_emit(ctx, ".", 1,
ceph_translate_ino(inode->i_sb, inode->i_ino),
if ((ctx->pos == 2 || fi->dentry) &&
!ceph_test_mount_opt(fsc, NOASYNCREADDIR) &&
ceph_snap(inode) != CEPH_SNAPDIR &&
- __ceph_dir_is_complete(ci) &&
+ __ceph_dir_is_complete_ordered(ci) &&
__ceph_caps_issued_mask(ci, CEPH_CAP_FILE_SHARED, 1)) {
u32 shared_gen = ci->i_shared_gen;
spin_unlock(&ci->i_ceph_lock);
/* proceed with a normal readdir */
+ if (ctx->pos == 2) {
+ /* note dir version at start of readdir so we can tell
+ * if any dentries get dropped */
+ fi->dir_release_count = atomic_read(&ci->i_release_count);
+ fi->dir_ordered_count = ci->i_ordered_count;
+ }
+
more:
/* do we have the correct frag content buffered? */
if (fi->frag != frag || fi->last_readdir == NULL) {
*/
spin_lock(&ci->i_ceph_lock);
if (atomic_read(&ci->i_release_count) == fi->dir_release_count) {
- dout(" marking %p complete\n", inode);
- __ceph_dir_set_complete(ci, fi->dir_release_count);
+ if (ci->i_ordered_count == fi->dir_ordered_count)
+ dout(" marking %p complete and ordered\n", inode);
+ else
+ dout(" marking %p complete\n", inode);
+ __ceph_dir_set_complete(ci, fi->dir_release_count,
+ fi->dir_ordered_count);
}
spin_unlock(&ci->i_ceph_lock);
ci->i_version = 0;
ci->i_time_warp_seq = 0;
ci->i_ceph_flags = 0;
+ ci->i_ordered_count = 0;
atomic_set(&ci->i_release_count, 1);
atomic_set(&ci->i_complete_count, 0);
ci->i_symlink = NULL;
(issued & CEPH_CAP_FILE_EXCL) == 0 &&
!__ceph_dir_is_complete(ci)) {
dout(" marking %p complete (empty)\n", inode);
- __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count));
+ __ceph_dir_set_complete(ci, atomic_read(&ci->i_release_count),
+ ci->i_ordered_count);
}
/* were we issued a capability? */
ceph_invalidate_dentry_lease(dn);
/* d_move screws up sibling dentries' offsets */
- ceph_dir_clear_complete(dir);
- ceph_dir_clear_complete(olddir);
+ ceph_dir_clear_ordered(dir);
+ ceph_dir_clear_ordered(olddir);
dout("dn %p gets new offset %lld\n", req->r_old_dentry,
ceph_dentry(req->r_old_dentry)->offset);
if (!rinfo->head->is_target) {
dout("fill_trace null dentry\n");
if (dn->d_inode) {
+ ceph_dir_clear_ordered(dir);
dout("d_delete %p\n", dn);
d_delete(dn);
} else {
/* attach proper inode */
if (!dn->d_inode) {
- ceph_dir_clear_complete(dir);
+ ceph_dir_clear_ordered(dir);
ihold(in);
dn = splice_dentry(dn, in, &have_lease);
if (IS_ERR(dn)) {
BUG_ON(!dir);
BUG_ON(ceph_snap(dir) != CEPH_SNAPDIR);
dout(" linking snapped dir %p to dn %p\n", in, dn);
- ceph_dir_clear_complete(dir);
+ ceph_dir_clear_ordered(dir);
ihold(in);
dn = splice_dentry(dn, in, NULL);
if (IS_ERR(dn)) {
u32 i_time_warp_seq;
unsigned i_ceph_flags;
+ int i_ordered_count;
atomic_t i_release_count;
atomic_t i_complete_count;
/*
* Ceph inode.
*/
-#define CEPH_I_NODELAY 4 /* do not delay cap release */
-#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
-#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
+#define CEPH_I_DIR_ORDERED 1 /* dentries in dir are ordered */
+#define CEPH_I_NODELAY 4 /* do not delay cap release */
+#define CEPH_I_FLUSH 8 /* do not delay flush of dirty metadata */
+#define CEPH_I_NOFLUSH 16 /* do not flush dirty caps */
static inline void __ceph_dir_set_complete(struct ceph_inode_info *ci,
- int release_count)
+ int release_count, int ordered_count)
{
atomic_set(&ci->i_complete_count, release_count);
+ if (ci->i_ordered_count == ordered_count)
+ ci->i_ceph_flags |= CEPH_I_DIR_ORDERED;
+ else
+ ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
}
static inline void __ceph_dir_clear_complete(struct ceph_inode_info *ci)
atomic_read(&ci->i_release_count);
}
+static inline bool __ceph_dir_is_complete_ordered(struct ceph_inode_info *ci)
+{
+ return __ceph_dir_is_complete(ci) &&
+ (ci->i_ceph_flags & CEPH_I_DIR_ORDERED);
+}
+
static inline void ceph_dir_clear_complete(struct inode *inode)
{
__ceph_dir_clear_complete(ceph_inode(inode));
}
-static inline bool ceph_dir_is_complete(struct inode *inode)
+static inline void ceph_dir_clear_ordered(struct inode *inode)
{
- return __ceph_dir_is_complete(ceph_inode(inode));
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ spin_lock(&ci->i_ceph_lock);
+ ci->i_ordered_count++;
+ ci->i_ceph_flags &= ~CEPH_I_DIR_ORDERED;
+ spin_unlock(&ci->i_ceph_lock);
}
+static inline bool ceph_dir_is_complete_ordered(struct inode *inode)
+{
+ struct ceph_inode_info *ci = ceph_inode(inode);
+ bool ret;
+ spin_lock(&ci->i_ceph_lock);
+ ret = __ceph_dir_is_complete_ordered(ci);
+ spin_unlock(&ci->i_ceph_lock);
+ return ret;
+}
/* find a specific frag @f */
extern struct ceph_inode_frag *__ceph_find_frag(struct ceph_inode_info *ci,
char *last_name; /* last entry in previous chunk */
struct dentry *dentry; /* next dentry (for dcache readdir) */
int dir_release_count;
+ int dir_ordered_count;
/* used for -o dirstat read() on directory thing */
char *dir_info;