ceph: do caps accounting per mds_client
authorYehuda Sadeh <yehuda@hq.newdream.net>
Thu, 17 Jun 2010 23:16:12 +0000 (16:16 -0700)
committerSage Weil <sage@newdream.net>
Mon, 2 Aug 2010 03:11:40 +0000 (20:11 -0700)
Caps-related accounting is now being done per mds client instead
of just being global. This lays the groundwork for a later revision
of the preallocated caps reservation list.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/caps.c
fs/ceph/mds_client.c
fs/ceph/mds_client.h
fs/ceph/super.c
fs/ceph/super.h

index d992880d21d4264288ac411c7ff38e14e322950a..47068b10baf833a1cc70c256a937bcc9128e8998 100644 (file)
@@ -113,58 +113,41 @@ const char *ceph_cap_string(int caps)
        return cap_str[i];
 }
 
-/*
- * Cap reservations
- *
- * Maintain a global pool of preallocated struct ceph_caps, referenced
- * by struct ceph_caps_reservations.  This ensures that we preallocate
- * memory needed to successfully process an MDS response.  (If an MDS
- * sends us cap information and we fail to process it, we will have
- * problems due to the client and MDS being out of sync.)
- *
- * Reservations are 'owned' by a ceph_cap_reservation context.
- */
-static spinlock_t caps_list_lock;
-static struct list_head caps_list;  /* unused (reserved or unreserved) */
-static int caps_total_count;        /* total caps allocated */
-static int caps_use_count;          /* in use */
-static int caps_reserve_count;      /* unused, reserved */
-static int caps_avail_count;        /* unused, unreserved */
-static int caps_min_count;          /* keep at least this many (unreserved) */
-
-void __init ceph_caps_init(void)
+void ceph_caps_init(struct ceph_mds_client *mdsc)
 {
-       INIT_LIST_HEAD(&caps_list);
-       spin_lock_init(&caps_list_lock);
+       INIT_LIST_HEAD(&mdsc->caps_list);
+       spin_lock_init(&mdsc->caps_list_lock);
 }
 
-void ceph_caps_finalize(void)
+void ceph_caps_finalize(struct ceph_mds_client *mdsc)
 {
        struct ceph_cap *cap;
 
-       spin_lock(&caps_list_lock);
-       while (!list_empty(&caps_list)) {
-               cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+       spin_lock(&mdsc->caps_list_lock);
+       while (!list_empty(&mdsc->caps_list)) {
+               cap = list_first_entry(&mdsc->caps_list,
+                                      struct ceph_cap, caps_item);
                list_del(&cap->caps_item);
                kmem_cache_free(ceph_cap_cachep, cap);
        }
-       caps_total_count = 0;
-       caps_avail_count = 0;
-       caps_use_count = 0;
-       caps_reserve_count = 0;
-       caps_min_count = 0;
-       spin_unlock(&caps_list_lock);
+       mdsc->caps_total_count = 0;
+       mdsc->caps_avail_count = 0;
+       mdsc->caps_use_count = 0;
+       mdsc->caps_reserve_count = 0;
+       mdsc->caps_min_count = 0;
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
-void ceph_adjust_min_caps(int delta)
+void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta)
 {
-       spin_lock(&caps_list_lock);
-       caps_min_count += delta;
-       BUG_ON(caps_min_count < 0);
-       spin_unlock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_min_count += delta;
+       BUG_ON(mdsc->caps_min_count < 0);
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
-int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
+int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+                     struct ceph_cap_reservation *ctx, int need)
 {
        int i;
        struct ceph_cap *cap;
@@ -176,16 +159,17 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
        dout("reserve caps ctx=%p need=%d\n", ctx, need);
 
        /* first reserve any caps that are already allocated */
-       spin_lock(&caps_list_lock);
-       if (caps_avail_count >= need)
+       spin_lock(&mdsc->caps_list_lock);
+       if (mdsc->caps_avail_count >= need)
                have = need;
        else
-               have = caps_avail_count;
-       caps_avail_count -= have;
-       caps_reserve_count += have;
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+               have = mdsc->caps_avail_count;
+       mdsc->caps_avail_count -= have;
+       mdsc->caps_reserve_count += have;
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                        mdsc->caps_reserve_count +
+                                        mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 
        for (i = have; i < need; i++) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
@@ -198,19 +182,20 @@ int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need)
        }
        BUG_ON(have + alloc != need);
 
-       spin_lock(&caps_list_lock);
-       caps_total_count += alloc;
-       caps_reserve_count += alloc;
-       list_splice(&newcaps, &caps_list);
+       spin_lock(&mdsc->caps_list_lock);
+       mdsc->caps_total_count += alloc;
+       mdsc->caps_reserve_count += alloc;
+       list_splice(&newcaps, &mdsc->caps_list);
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                        mdsc->caps_reserve_count +
+                                        mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 
        ctx->count = need;
        dout("reserve caps ctx=%p %d = %d used + %d resv + %d avail\n",
-            ctx, caps_total_count, caps_use_count, caps_reserve_count,
-            caps_avail_count);
+            ctx, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
        return 0;
 
 out_alloc_count:
@@ -220,26 +205,29 @@ out_alloc_count:
        return ret;
 }
 
-int ceph_unreserve_caps(struct ceph_cap_reservation *ctx)
+int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+                       struct ceph_cap_reservation *ctx)
 {
        dout("unreserve caps ctx=%p count=%d\n", ctx, ctx->count);
        if (ctx->count) {
-               spin_lock(&caps_list_lock);
-               BUG_ON(caps_reserve_count < ctx->count);
-               caps_reserve_count -= ctx->count;
-               caps_avail_count += ctx->count;
+               spin_lock(&mdsc->caps_list_lock);
+               BUG_ON(mdsc->caps_reserve_count < ctx->count);
+               mdsc->caps_reserve_count -= ctx->count;
+               mdsc->caps_avail_count += ctx->count;
                ctx->count = 0;
                dout("unreserve caps %d = %d used + %d resv + %d avail\n",
-                    caps_total_count, caps_use_count, caps_reserve_count,
-                    caps_avail_count);
-               BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-                      caps_avail_count);
-               spin_unlock(&caps_list_lock);
+                    mdsc->caps_total_count, mdsc->caps_use_count,
+                    mdsc->caps_reserve_count, mdsc->caps_avail_count);
+               BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+                                                mdsc->caps_reserve_count +
+                                                mdsc->caps_avail_count);
+               spin_unlock(&mdsc->caps_list_lock);
        }
        return 0;
 }
 
-static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
+static struct ceph_cap *get_cap(struct ceph_mds_client *mdsc,
+                               struct ceph_cap_reservation *ctx)
 {
        struct ceph_cap *cap = NULL;
 
@@ -247,71 +235,74 @@ static struct ceph_cap *get_cap(struct ceph_cap_reservation *ctx)
        if (!ctx) {
                cap = kmem_cache_alloc(ceph_cap_cachep, GFP_NOFS);
                if (cap) {
-                       caps_use_count++;
-                       caps_total_count++;
+                       mdsc->caps_use_count++;
+                       mdsc->caps_total_count++;
                }
                return cap;
        }
 
-       spin_lock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
        dout("get_cap ctx=%p (%d) %d = %d used + %d resv + %d avail\n",
-            ctx, ctx->count, caps_total_count, caps_use_count,
-            caps_reserve_count, caps_avail_count);
+            ctx, ctx->count, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
        BUG_ON(!ctx->count);
-       BUG_ON(ctx->count > caps_reserve_count);
-       BUG_ON(list_empty(&caps_list));
+       BUG_ON(ctx->count > mdsc->caps_reserve_count);
+       BUG_ON(list_empty(&mdsc->caps_list));
 
        ctx->count--;
-       caps_reserve_count--;
-       caps_use_count++;
+       mdsc->caps_reserve_count--;
+       mdsc->caps_use_count++;
 
-       cap = list_first_entry(&caps_list, struct ceph_cap, caps_item);
+       cap = list_first_entry(&mdsc->caps_list, struct ceph_cap, caps_item);
        list_del(&cap->caps_item);
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+              mdsc->caps_reserve_count + mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
        return cap;
 }
 
-void ceph_put_cap(struct ceph_cap *cap)
+void ceph_put_cap(struct ceph_mds_client *mdsc, struct ceph_cap *cap)
 {
-       spin_lock(&caps_list_lock);
+       spin_lock(&mdsc->caps_list_lock);
        dout("put_cap %p %d = %d used + %d resv + %d avail\n",
-            cap, caps_total_count, caps_use_count,
-            caps_reserve_count, caps_avail_count);
-       caps_use_count--;
+            cap, mdsc->caps_total_count, mdsc->caps_use_count,
+            mdsc->caps_reserve_count, mdsc->caps_avail_count);
+       mdsc->caps_use_count--;
        /*
         * Keep some preallocated caps around (ceph_min_count), to
         * avoid lots of free/alloc churn.
         */
-       if (caps_avail_count >= caps_reserve_count + caps_min_count) {
-               caps_total_count--;
+       if (mdsc->caps_avail_count >= mdsc->caps_reserve_count +
+                                     mdsc->caps_min_count) {
+               mdsc->caps_total_count--;
                kmem_cache_free(ceph_cap_cachep, cap);
        } else {
-               caps_avail_count++;
-               list_add(&cap->caps_item, &caps_list);
+               mdsc->caps_avail_count++;
+               list_add(&cap->caps_item, &mdsc->caps_list);
        }
 
-       BUG_ON(caps_total_count != caps_use_count + caps_reserve_count +
-              caps_avail_count);
-       spin_unlock(&caps_list_lock);
+       BUG_ON(mdsc->caps_total_count != mdsc->caps_use_count +
+              mdsc->caps_reserve_count + mdsc->caps_avail_count);
+       spin_unlock(&mdsc->caps_list_lock);
 }
 
 void ceph_reservation_status(struct ceph_client *client,
                             int *total, int *avail, int *used, int *reserved,
                             int *min)
 {
+       struct ceph_mds_client *mdsc = &client->mdsc;
+
        if (total)
-               *total = caps_total_count;
+               *total = mdsc->caps_total_count;
        if (avail)
-               *avail = caps_avail_count;
+               *avail = mdsc->caps_avail_count;
        if (used)
-               *used = caps_use_count;
+               *used = mdsc->caps_use_count;
        if (reserved)
-               *reserved = caps_reserve_count;
+               *reserved = mdsc->caps_reserve_count;
        if (min)
-               *min = caps_min_count;
+               *min = mdsc->caps_min_count;
 }
 
 /*
@@ -540,7 +531,7 @@ retry:
                        new_cap = NULL;
                } else {
                        spin_unlock(&inode->i_lock);
-                       new_cap = get_cap(caps_reservation);
+                       new_cap = get_cap(mdsc, caps_reservation);
                        if (new_cap == NULL)
                                return -ENOMEM;
                        goto retry;
@@ -898,7 +889,7 @@ void __ceph_remove_cap(struct ceph_cap *cap)
                ci->i_auth_cap = NULL;
 
        if (removed)
-               ceph_put_cap(cap);
+               ceph_put_cap(mdsc, cap);
 
        if (!__ceph_is_any_caps(ci) && ci->i_snap_realm) {
                struct ceph_snap_realm *realm = ci->i_snap_realm;
index 6e40db2a00141a8a4fc3b8360b2c0793e64d0b85..641a8a37e7b3f790304a398fd44f6e3d5b964053 100644 (file)
@@ -449,7 +449,7 @@ void ceph_mdsc_release_request(struct kref *kref)
        kfree(req->r_path1);
        kfree(req->r_path2);
        put_request_session(req);
-       ceph_unreserve_caps(&req->r_caps_reservation);
+       ceph_unreserve_caps(req->r_mdsc, &req->r_caps_reservation);
        kfree(req);
 }
 
@@ -512,7 +512,8 @@ static void __register_request(struct ceph_mds_client *mdsc,
 {
        req->r_tid = ++mdsc->last_tid;
        if (req->r_num_caps)
-               ceph_reserve_caps(&req->r_caps_reservation, req->r_num_caps);
+               ceph_reserve_caps(mdsc, &req->r_caps_reservation,
+                                 req->r_num_caps);
        dout("__register_request %p tid %lld\n", req, req->r_tid);
        ceph_mdsc_get_request(req);
        __insert_request(mdsc, req);
@@ -764,7 +765,7 @@ static int iterate_session_caps(struct ceph_mds_session *session,
                        last_inode = NULL;
                }
                if (old_cap) {
-                       ceph_put_cap(old_cap);
+                       ceph_put_cap(session->s_mdsc, old_cap);
                        old_cap = NULL;
                }
 
@@ -793,7 +794,7 @@ out:
        if (last_inode)
                iput(last_inode);
        if (old_cap)
-               ceph_put_cap(old_cap);
+               ceph_put_cap(session->s_mdsc, old_cap);
 
        return ret;
 }
@@ -1251,6 +1252,7 @@ ceph_mdsc_create_request(struct ceph_mds_client *mdsc, int op, int mode)
                return ERR_PTR(-ENOMEM);
 
        mutex_init(&req->r_fill_mutex);
+       req->r_mdsc = mdsc;
        req->r_started = jiffies;
        req->r_resend_mds = -1;
        INIT_LIST_HEAD(&req->r_unsafe_dir_item);
@@ -1986,7 +1988,7 @@ static void handle_reply(struct ceph_mds_session *session, struct ceph_msg *msg)
        if (err == 0) {
                if (result == 0 && rinfo->dir_nr)
                        ceph_readdir_prepopulate(req, req->r_session);
-               ceph_unreserve_caps(&req->r_caps_reservation);
+               ceph_unreserve_caps(mdsc, &req->r_caps_reservation);
        }
        mutex_unlock(&req->r_fill_mutex);
 
@@ -2767,6 +2769,9 @@ int ceph_mdsc_init(struct ceph_mds_client *mdsc, struct ceph_client *client)
        spin_lock_init(&mdsc->dentry_lru_lock);
        INIT_LIST_HEAD(&mdsc->dentry_lru);
 
+       ceph_caps_init(mdsc);
+       ceph_adjust_min_caps(mdsc, client->min_caps);
+
        return 0;
 }
 
@@ -2962,6 +2967,7 @@ void ceph_mdsc_stop(struct ceph_mds_client *mdsc)
        if (mdsc->mdsmap)
                ceph_mdsmap_destroy(mdsc->mdsmap);
        kfree(mdsc->sessions);
+       ceph_caps_finalize(mdsc);
 }
 
 
index e389902db13148bf3938499ca6e6ca5b10082830..8f2126321f2d2ab6d5baac7a81b46537a7a615b8 100644 (file)
@@ -151,6 +151,7 @@ typedef void (*ceph_mds_request_callback_t) (struct ceph_mds_client *mdsc,
 struct ceph_mds_request {
        u64 r_tid;                   /* transaction id */
        struct rb_node r_node;
+       struct ceph_mds_client *r_mdsc;
 
        int r_op;                    /* mds op code */
        int r_mds;
@@ -267,6 +268,27 @@ struct ceph_mds_client {
        spinlock_t        cap_dirty_lock;   /* protects above items */
        wait_queue_head_t cap_flushing_wq;
 
+       /*
+        * Cap reservations
+        *
+        * Maintain a global pool of preallocated struct ceph_caps, referenced
+        * by struct ceph_caps_reservations.  This ensures that we preallocate
+        * memory needed to successfully process an MDS response.  (If an MDS
+        * sends us cap information and we fail to process it, we will have
+        * problems due to the client and MDS being out of sync.)
+        *
+        * Reservations are 'owned' by a ceph_cap_reservation context.
+        */
+       spinlock_t      caps_list_lock;
+       struct          list_head caps_list; /* unused (reserved or
+                                               unreserved) */
+       int             caps_total_count;    /* total caps allocated */
+       int             caps_use_count;      /* in use */
+       int             caps_reserve_count;  /* unused, reserved */
+       int             caps_avail_count;    /* unused, unreserved */
+       int             caps_min_count;      /* keep at least this many
+                                               (unreserved) */
+
 #ifdef CONFIG_DEBUG_FS
        struct dentry     *debugfs_file;
 #endif
index fa87f51e38e112a38f8a95d87ac4c96e02e7d7ef..1a0bb4863a5d9feff183006b9b467d4908223595 100644 (file)
@@ -630,7 +630,6 @@ static struct ceph_client *ceph_create_client(struct ceph_mount_args *args)
 
        /* caps */
        client->min_caps = args->max_readdir;
-       ceph_adjust_min_caps(client->min_caps);
 
        /* subsystems */
        err = ceph_monc_init(&client->monc, client);
@@ -680,8 +679,6 @@ static void ceph_destroy_client(struct ceph_client *client)
 
        ceph_monc_stop(&client->monc);
 
-       ceph_adjust_min_caps(-client->min_caps);
-
        ceph_debugfs_client_cleanup(client);
        destroy_workqueue(client->wb_wq);
        destroy_workqueue(client->pg_inv_wq);
@@ -1043,8 +1040,6 @@ static int __init init_ceph(void)
        if (ret)
                goto out_msgr;
 
-       ceph_caps_init();
-
        ret = register_filesystem(&ceph_fs_type);
        if (ret)
                goto out_icache;
@@ -1069,7 +1064,6 @@ static void __exit exit_ceph(void)
 {
        dout("exit_ceph\n");
        unregister_filesystem(&ceph_fs_type);
-       ceph_caps_finalize();
        destroy_caches();
        ceph_msgr_exit();
        ceph_debugfs_cleanup();
index 10a4a406e887506b4d104e8052f9dd50b07fba5f..44d10cb0aeca5ea8561b6f3eeb9f804e35b30401 100644 (file)
@@ -560,11 +560,13 @@ static inline int __ceph_caps_wanted(struct ceph_inode_info *ci)
 /* what the mds thinks we want */
 extern int __ceph_caps_mds_wanted(struct ceph_inode_info *ci);
 
-extern void ceph_caps_init(void);
-extern void ceph_caps_finalize(void);
-extern void ceph_adjust_min_caps(int delta);
-extern int ceph_reserve_caps(struct ceph_cap_reservation *ctx, int need);
-extern int ceph_unreserve_caps(struct ceph_cap_reservation *ctx);
+extern void ceph_caps_init(struct ceph_mds_client *mdsc);
+extern void ceph_caps_finalize(struct ceph_mds_client *mdsc);
+extern void ceph_adjust_min_caps(struct ceph_mds_client *mdsc, int delta);
+extern int ceph_reserve_caps(struct ceph_mds_client *mdsc,
+                            struct ceph_cap_reservation *ctx, int need);
+extern int ceph_unreserve_caps(struct ceph_mds_client *mdsc,
+                              struct ceph_cap_reservation *ctx);
 extern void ceph_reservation_status(struct ceph_client *client,
                                    int *total, int *avail, int *used,
                                    int *reserved, int *min);
@@ -806,7 +808,8 @@ static inline void ceph_remove_cap(struct ceph_cap *cap)
        __ceph_remove_cap(cap);
        spin_unlock(&inode->i_lock);
 }
-extern void ceph_put_cap(struct ceph_cap *cap);
+extern void ceph_put_cap(struct ceph_mds_client *mdsc,
+                        struct ceph_cap *cap);
 
 extern void ceph_queue_caps_release(struct inode *inode);
 extern int ceph_write_inode(struct inode *inode, struct writeback_control *wbc);