staging: zcache: support multiple clients, prep for KVM and RAMster
authorDan Magenheimer <dan.magenheimer@oracle.com>
Thu, 7 Jul 2011 14:37:19 +0000 (07:37 -0700)
committerGreg Kroah-Hartman <gregkh@suse.de>
Fri, 8 Jul 2011 21:18:53 +0000 (14:18 -0700)
This is version 3 of an update to zcache, incorporating feedback from the list.
This patch adds support to the in-kernel transcendent memory ("tmem") code
and the zcache driver for multiple clients, which will be needed for both
RAMster and KVM support.  It also adds additional tmem callbacks to support
RAMster and corresponding no-op stubs in the zcache driver.  In v2, I've
also taken the liberty of adding some additional sysfs variables to
both surface information and allow policy control.  Those experimenting
with zcache should find them useful.  V3 clarifies some code walking
and declaring arrays.

Signed-off-by: Dan Magenheimer <dan.magenheimer@oracle.com>
[v3: error27@gmail.com: fix array bounds/walking]
[v2: konrad.wilk@oracle.com: fix bools, add check for NULL, fix a comment]
[v2: sjenning@linux.vnet.ibm.com: add info/tunables for poor compression]
[v2: marcusklemm@googlemail.com: add tunable for max persistent pages]
Acked-by: Dan Carpenter <error27@gmail.com>
Cc: Nitin Gupta <ngupta@vflare.org>
Cc: linux-mm@kvack.org
Cc: kvm@vger.kernel.org
Signed-off-by: Greg Kroah-Hartman <gregkh@suse.de>
drivers/staging/zcache/tmem.c
drivers/staging/zcache/tmem.h
drivers/staging/zcache/zcache.c

index e954d405b1382553fa25feb61d04aa22d91cf2d0..975e34bcd722aecf4ed753a9b86ad25e90c5d19c 100644 (file)
@@ -142,6 +142,7 @@ static void tmem_obj_init(struct tmem_obj *obj, struct tmem_hashbucket *hb,
        obj->oid = *oidp;
        obj->objnode_count = 0;
        obj->pampd_count = 0;
+       (*tmem_pamops.new_obj)(obj);
        SET_SENTINEL(obj, OBJ);
        while (*new) {
                BUG_ON(RB_EMPTY_NODE(*new));
@@ -274,7 +275,7 @@ static void tmem_objnode_free(struct tmem_objnode *objnode)
 /*
  * lookup index in object and return associated pampd (or NULL if not found)
  */
-static void *tmem_pampd_lookup_in_obj(struct tmem_obj *obj, uint32_t index)
+static void **__tmem_pampd_lookup_in_obj(struct tmem_obj *obj, uint32_t index)
 {
        unsigned int height, shift;
        struct tmem_objnode **slot = NULL;
@@ -303,9 +304,33 @@ static void *tmem_pampd_lookup_in_obj(struct tmem_obj *obj, uint32_t index)
                height--;
        }
 out:
+       return slot != NULL ? (void **)slot : NULL;
+}
+
+static void *tmem_pampd_lookup_in_obj(struct tmem_obj *obj, uint32_t index)
+{
+       struct tmem_objnode **slot;
+
+       slot = (struct tmem_objnode **)__tmem_pampd_lookup_in_obj(obj, index);
        return slot != NULL ? *slot : NULL;
 }
 
+static void *tmem_pampd_replace_in_obj(struct tmem_obj *obj, uint32_t index,
+                                       void *new_pampd)
+{
+       struct tmem_objnode **slot;
+       void *ret = NULL;
+
+       slot = (struct tmem_objnode **)__tmem_pampd_lookup_in_obj(obj, index);
+       if ((slot != NULL) && (*slot != NULL)) {
+               void *old_pampd = *(void **)slot;
+               *(void **)slot = new_pampd;
+               (*tmem_pamops.free)(old_pampd, obj->pool, NULL, 0);
+               ret = new_pampd;
+       }
+       return ret;
+}
+
 static int tmem_pampd_add_to_obj(struct tmem_obj *obj, uint32_t index,
                                        void *pampd)
 {
@@ -456,7 +481,7 @@ static void tmem_objnode_node_destroy(struct tmem_obj *obj,
                        if (ht == 1) {
                                obj->pampd_count--;
                                (*tmem_pamops.free)(objnode->slots[i],
-                                                               obj->pool);
+                                               obj->pool, NULL, 0);
                                objnode->slots[i] = NULL;
                                continue;
                        }
@@ -473,7 +498,7 @@ static void tmem_pampd_destroy_all_in_obj(struct tmem_obj *obj)
                return;
        if (obj->objnode_tree_height == 0) {
                obj->pampd_count--;
-               (*tmem_pamops.free)(obj->objnode_tree_root, obj->pool);
+               (*tmem_pamops.free)(obj->objnode_tree_root, obj->pool, NULL, 0);
        } else {
                tmem_objnode_node_destroy(obj, obj->objnode_tree_root,
                                        obj->objnode_tree_height);
@@ -481,6 +506,7 @@ static void tmem_pampd_destroy_all_in_obj(struct tmem_obj *obj)
                obj->objnode_tree_height = 0;
        }
        obj->objnode_tree_root = NULL;
+       (*tmem_pamops.free_obj)(obj->pool, obj);
 }
 
 /*
@@ -503,15 +529,13 @@ static void tmem_pampd_destroy_all_in_obj(struct tmem_obj *obj)
  * always flushes for simplicity.
  */
 int tmem_put(struct tmem_pool *pool, struct tmem_oid *oidp, uint32_t index,
-               struct page *page)
+               char *data, size_t size, bool raw, bool ephemeral)
 {
        struct tmem_obj *obj = NULL, *objfound = NULL, *objnew = NULL;
        void *pampd = NULL, *pampd_del = NULL;
        int ret = -ENOMEM;
-       bool ephemeral;
        struct tmem_hashbucket *hb;
 
-       ephemeral = is_ephemeral(pool);
        hb = &pool->hashbucket[tmem_oid_hash(oidp)];
        spin_lock(&hb->lock);
        obj = objfound = tmem_obj_find(hb, oidp);
@@ -521,7 +545,7 @@ int tmem_put(struct tmem_pool *pool, struct tmem_oid *oidp, uint32_t index,
                        /* if found, is a dup put, flush the old one */
                        pampd_del = tmem_pampd_delete_from_obj(obj, index);
                        BUG_ON(pampd_del != pampd);
-                       (*tmem_pamops.free)(pampd, pool);
+                       (*tmem_pamops.free)(pampd, pool, oidp, index);
                        if (obj->pampd_count == 0) {
                                objnew = obj;
                                objfound = NULL;
@@ -538,7 +562,8 @@ int tmem_put(struct tmem_pool *pool, struct tmem_oid *oidp, uint32_t index,
        }
        BUG_ON(obj == NULL);
        BUG_ON(((objnew != obj) && (objfound != obj)) || (objnew == objfound));
-       pampd = (*tmem_pamops.create)(obj->pool, &obj->oid, index, page);
+       pampd = (*tmem_pamops.create)(data, size, raw, ephemeral,
+                                       obj->pool, &obj->oid, index);
        if (unlikely(pampd == NULL))
                goto free;
        ret = tmem_pampd_add_to_obj(obj, index, pampd);
@@ -551,7 +576,7 @@ delete_and_free:
        (void)tmem_pampd_delete_from_obj(obj, index);
 free:
        if (pampd)
-               (*tmem_pamops.free)(pampd, pool);
+               (*tmem_pamops.free)(pampd, pool, NULL, 0);
        if (objnew) {
                tmem_obj_free(objnew, hb);
                (*tmem_hostops.obj_free)(objnew, pool);
@@ -573,41 +598,52 @@ out:
  * "put" done with the same handle).
 
  */
-int tmem_get(struct tmem_pool *pool, struct tmem_oid *oidp,
-                               uint32_t index, struct page *page)
+int tmem_get(struct tmem_pool *pool, struct tmem_oid *oidp, uint32_t index,
+               char *data, size_t *size, bool raw, int get_and_free)
 {
        struct tmem_obj *obj;
        void *pampd;
        bool ephemeral = is_ephemeral(pool);
        uint32_t ret = -1;
        struct tmem_hashbucket *hb;
+       bool free = (get_and_free == 1) || ((get_and_free == 0) && ephemeral);
+       bool lock_held = false;
 
        hb = &pool->hashbucket[tmem_oid_hash(oidp)];
        spin_lock(&hb->lock);
+       lock_held = true;
        obj = tmem_obj_find(hb, oidp);
        if (obj == NULL)
                goto out;
-       ephemeral = is_ephemeral(pool);
-       if (ephemeral)
+       if (free)
                pampd = tmem_pampd_delete_from_obj(obj, index);
        else
                pampd = tmem_pampd_lookup_in_obj(obj, index);
        if (pampd == NULL)
                goto out;
-       ret = (*tmem_pamops.get_data)(page, pampd, pool);
-       if (ret < 0)
-               goto out;
-       if (ephemeral) {
-               (*tmem_pamops.free)(pampd, pool);
+       if (free) {
                if (obj->pampd_count == 0) {
                        tmem_obj_free(obj, hb);
                        (*tmem_hostops.obj_free)(obj, pool);
                        obj = NULL;
                }
        }
+       if (tmem_pamops.is_remote(pampd)) {
+               lock_held = false;
+               spin_unlock(&hb->lock);
+       }
+       if (free)
+               ret = (*tmem_pamops.get_data_and_free)(
+                               data, size, raw, pampd, pool, oidp, index);
+       else
+               ret = (*tmem_pamops.get_data)(
+                               data, size, raw, pampd, pool, oidp, index);
+       if (ret < 0)
+               goto out;
        ret = 0;
 out:
-       spin_unlock(&hb->lock);
+       if (lock_held)
+               spin_unlock(&hb->lock);
        return ret;
 }
 
@@ -632,7 +668,7 @@ int tmem_flush_page(struct tmem_pool *pool,
        pampd = tmem_pampd_delete_from_obj(obj, index);
        if (pampd == NULL)
                goto out;
-       (*tmem_pamops.free)(pampd, pool);
+       (*tmem_pamops.free)(pampd, pool, oidp, index);
        if (obj->pampd_count == 0) {
                tmem_obj_free(obj, hb);
                (*tmem_hostops.obj_free)(obj, pool);
@@ -644,6 +680,30 @@ out:
        return ret;
 }
 
+/*
+ * If a page in tmem matches the handle, replace the page so that any
+ * subsequent "get" gets the new page.  Returns 0 if
+ * there was a page to replace, else returns -1.
+ */
+int tmem_replace(struct tmem_pool *pool, struct tmem_oid *oidp,
+                       uint32_t index, void *new_pampd)
+{
+       struct tmem_obj *obj;
+       int ret = -1;
+       struct tmem_hashbucket *hb;
+
+       hb = &pool->hashbucket[tmem_oid_hash(oidp)];
+       spin_lock(&hb->lock);
+       obj = tmem_obj_find(hb, oidp);
+       if (obj == NULL)
+               goto out;
+       new_pampd = tmem_pampd_replace_in_obj(obj, index, new_pampd);
+       ret = (*tmem_pamops.replace_in_obj)(new_pampd, obj);
+out:
+       spin_unlock(&hb->lock);
+       return ret;
+}
+
 /*
  * "Flush" all pages in tmem matching this oid.
  */
index 2e07e217d51feae40e3da57133a42fac728a24cd..ed147c4b110d41b8f7febbb9b5efcb0ce3666f0e 100644 (file)
@@ -147,6 +147,7 @@ struct tmem_obj {
        unsigned int objnode_tree_height;
        unsigned long objnode_count;
        long pampd_count;
+       void *extra; /* for private use by pampd implementation */
        DECL_SENTINEL
 };
 
@@ -166,10 +167,18 @@ struct tmem_objnode {
 
 /* pampd abstract datatype methods provided by the PAM implementation */
 struct tmem_pamops {
-       void *(*create)(struct tmem_pool *, struct tmem_oid *, uint32_t,
-                       struct page *);
-       int (*get_data)(struct page *, void *, struct tmem_pool *);
-       void (*free)(void *, struct tmem_pool *);
+       void *(*create)(char *, size_t, bool, int,
+                       struct tmem_pool *, struct tmem_oid *, uint32_t);
+       int (*get_data)(char *, size_t *, bool, void *, struct tmem_pool *,
+                               struct tmem_oid *, uint32_t);
+       int (*get_data_and_free)(char *, size_t *, bool, void *,
+                               struct tmem_pool *, struct tmem_oid *,
+                               uint32_t);
+       void (*free)(void *, struct tmem_pool *, struct tmem_oid *, uint32_t);
+       void (*free_obj)(struct tmem_pool *, struct tmem_obj *);
+       bool (*is_remote)(void *);
+       void (*new_obj)(struct tmem_obj *);
+       int (*replace_in_obj)(void *, struct tmem_obj *);
 };
 extern void tmem_register_pamops(struct tmem_pamops *m);
 
@@ -184,9 +193,11 @@ extern void tmem_register_hostops(struct tmem_hostops *m);
 
 /* core tmem accessor functions */
 extern int tmem_put(struct tmem_pool *, struct tmem_oid *, uint32_t index,
-                       struct page *page);
+                       char *, size_t, bool, bool);
 extern int tmem_get(struct tmem_pool *, struct tmem_oid *, uint32_t index,
-                       struct page *page);
+                       char *, size_t *, bool, int);
+extern int tmem_replace(struct tmem_pool *, struct tmem_oid *, uint32_t index,
+                       void *);
 extern int tmem_flush_page(struct tmem_pool *, struct tmem_oid *,
                        uint32_t index);
 extern int tmem_flush_object(struct tmem_pool *, struct tmem_oid *);
index 77ac2d4d3ef17dc72e9d2633f8f9846ff766c2c8..65a81a0d7c4921a8515cc92757b27977c5958f05 100644 (file)
        (__GFP_FS | __GFP_NORETRY | __GFP_NOWARN | __GFP_NOMEMALLOC)
 #endif
 
+#define MAX_POOLS_PER_CLIENT 16
+
+#define MAX_CLIENTS 16
+#define LOCAL_CLIENT ((uint16_t)-1)
+struct zcache_client {
+       struct tmem_pool *tmem_pools[MAX_POOLS_PER_CLIENT];
+       struct xv_pool *xvpool;
+       bool allocated;
+       atomic_t refcount;
+};
+
+static struct zcache_client zcache_host;
+static struct zcache_client zcache_clients[MAX_CLIENTS];
+
+static inline uint16_t get_client_id_from_client(struct zcache_client *cli)
+{
+       BUG_ON(cli == NULL);
+       if (cli == &zcache_host)
+               return LOCAL_CLIENT;
+       return cli - &zcache_clients[0];
+}
+
+static inline bool is_local_client(struct zcache_client *cli)
+{
+       return cli == &zcache_host;
+}
+
 /**********
  * Compression buddies ("zbud") provides for packing two (or, possibly
  * in the future, more) compressed ephemeral pages into a single "raw"
@@ -72,7 +99,8 @@
 #define ZBUD_MAX_BUDS 2
 
 struct zbud_hdr {
-       uint32_t pool_id;
+       uint16_t client_id;
+       uint16_t pool_id;
        struct tmem_oid oid;
        uint32_t index;
        uint16_t size; /* compressed size in bytes, zero means unused */
@@ -120,6 +148,7 @@ static unsigned long zcache_zbud_curr_zbytes;
 static unsigned long zcache_zbud_cumul_zpages;
 static unsigned long zcache_zbud_cumul_zbytes;
 static unsigned long zcache_compress_poor;
+static unsigned long zcache_mean_compress_poor;
 
 /* forward references */
 static void *zcache_get_free_page(void);
@@ -294,7 +323,8 @@ static void zbud_free_and_delist(struct zbud_hdr *zh)
        }
 }
 
-static struct zbud_hdr *zbud_create(uint32_t pool_id, struct tmem_oid *oid,
+static struct zbud_hdr *zbud_create(uint16_t client_id, uint16_t pool_id,
+                                       struct tmem_oid *oid,
                                        uint32_t index, struct page *page,
                                        void *cdata, unsigned size)
 {
@@ -353,6 +383,7 @@ init_zh:
        zh->index = index;
        zh->oid = *oid;
        zh->pool_id = pool_id;
+       zh->client_id = client_id;
        /* can wait to copy the data until the list locks are dropped */
        spin_unlock(&zbud_budlists_spinlock);
 
@@ -407,7 +438,8 @@ static unsigned long zcache_evicted_raw_pages;
 static unsigned long zcache_evicted_buddied_pages;
 static unsigned long zcache_evicted_unbuddied_pages;
 
-static struct tmem_pool *zcache_get_pool_by_id(uint32_t poolid);
+static struct tmem_pool *zcache_get_pool_by_id(uint16_t cli_id,
+                                               uint16_t poolid);
 static void zcache_put_pool(struct tmem_pool *pool);
 
 /*
@@ -417,7 +449,8 @@ static void zbud_evict_zbpg(struct zbud_page *zbpg)
 {
        struct zbud_hdr *zh;
        int i, j;
-       uint32_t pool_id[ZBUD_MAX_BUDS], index[ZBUD_MAX_BUDS];
+       uint32_t pool_id[ZBUD_MAX_BUDS], client_id[ZBUD_MAX_BUDS];
+       uint32_t index[ZBUD_MAX_BUDS];
        struct tmem_oid oid[ZBUD_MAX_BUDS];
        struct tmem_pool *pool;
 
@@ -426,6 +459,7 @@ static void zbud_evict_zbpg(struct zbud_page *zbpg)
        for (i = 0, j = 0; i < ZBUD_MAX_BUDS; i++) {
                zh = &zbpg->buddy[i];
                if (zh->size) {
+                       client_id[j] = zh->client_id;
                        pool_id[j] = zh->pool_id;
                        oid[j] = zh->oid;
                        index[j] = zh->index;
@@ -435,7 +469,7 @@ static void zbud_evict_zbpg(struct zbud_page *zbpg)
        }
        spin_unlock(&zbpg->lock);
        for (i = 0; i < j; i++) {
-               pool = zcache_get_pool_by_id(pool_id[i]);
+               pool = zcache_get_pool_by_id(client_id[i], pool_id[i]);
                if (pool != NULL) {
                        tmem_flush_page(pool, &oid[i], index[i]);
                        zcache_put_pool(pool);
@@ -552,9 +586,8 @@ static int zbud_show_unbuddied_list_counts(char *buf)
        int i;
        char *p = buf;
 
-       for (i = 0; i < NCHUNKS - 1; i++)
+       for (i = 0; i < NCHUNKS; i++)
                p += sprintf(p, "%u ", zbud_unbuddied[i].count);
-       p += sprintf(p, "%d\n", zbud_unbuddied[i].count);
        return p - buf;
 }
 
@@ -602,7 +635,23 @@ struct zv_hdr {
        DECL_SENTINEL
 };
 
-static const int zv_max_page_size = (PAGE_SIZE / 8) * 7;
+/* rudimentary policy limits */
+/* total number of persistent pages may not exceed this percentage */
+static unsigned int zv_page_count_policy_percent = 75;
+/*
+ * byte count defining poor compression; pages with greater zsize will be
+ * rejected
+ */
+static unsigned int zv_max_zsize = (PAGE_SIZE / 8) * 7;
+/*
+ * byte count defining poor *mean* compression; pages with greater zsize
+ * will be rejected until sufficient better-compressed pages are accepted
+ * driving the man below this threshold
+ */
+static unsigned int zv_max_mean_zsize = (PAGE_SIZE / 8) * 5;
+
+static unsigned long zv_curr_dist_counts[NCHUNKS];
+static unsigned long zv_cumul_dist_counts[NCHUNKS];
 
 static struct zv_hdr *zv_create(struct xv_pool *xvpool, uint32_t pool_id,
                                struct tmem_oid *oid, uint32_t index,
@@ -611,13 +660,18 @@ static struct zv_hdr *zv_create(struct xv_pool *xvpool, uint32_t pool_id,
        struct page *page;
        struct zv_hdr *zv = NULL;
        uint32_t offset;
+       int alloc_size = clen + sizeof(struct zv_hdr);
+       int chunks = (alloc_size + (CHUNK_SIZE - 1)) >> CHUNK_SHIFT;
        int ret;
 
        BUG_ON(!irqs_disabled());
-       ret = xv_malloc(xvpool, clen + sizeof(struct zv_hdr),
+       BUG_ON(chunks >= NCHUNKS);
+       ret = xv_malloc(xvpool, alloc_size,
                        &page, &offset, ZCACHE_GFP_MASK);
        if (unlikely(ret))
                goto out;
+       zv_curr_dist_counts[chunks]++;
+       zv_cumul_dist_counts[chunks]++;
        zv = kmap_atomic(page, KM_USER0) + offset;
        zv->index = index;
        zv->oid = *oid;
@@ -634,11 +688,14 @@ static void zv_free(struct xv_pool *xvpool, struct zv_hdr *zv)
        unsigned long flags;
        struct page *page;
        uint32_t offset;
-       uint16_t size;
+       uint16_t size = xv_get_object_size(zv);
+       int chunks = (size + (CHUNK_SIZE - 1)) >> CHUNK_SHIFT;
 
        ASSERT_SENTINEL(zv, ZVH);
-       size = xv_get_object_size(zv) - sizeof(*zv);
-       BUG_ON(size == 0 || size > zv_max_page_size);
+       BUG_ON(chunks >= NCHUNKS);
+       zv_curr_dist_counts[chunks]--;
+       size -= sizeof(*zv);
+       BUG_ON(size == 0);
        INVERT_SENTINEL(zv, ZVH);
        page = virt_to_page(zv);
        offset = (unsigned long)zv & ~PAGE_MASK;
@@ -656,7 +713,7 @@ static void zv_decompress(struct page *page, struct zv_hdr *zv)
 
        ASSERT_SENTINEL(zv, ZVH);
        size = xv_get_object_size(zv) - sizeof(*zv);
-       BUG_ON(size == 0 || size > zv_max_page_size);
+       BUG_ON(size == 0);
        to_va = kmap_atomic(page, KM_USER0);
        ret = lzo1x_decompress_safe((char *)zv + sizeof(*zv),
                                        size, to_va, &clen);
@@ -665,6 +722,159 @@ static void zv_decompress(struct page *page, struct zv_hdr *zv)
        BUG_ON(clen != PAGE_SIZE);
 }
 
+#ifdef CONFIG_SYSFS
+/*
+ * show a distribution of compression stats for zv pages.
+ */
+
+static int zv_curr_dist_counts_show(char *buf)
+{
+       unsigned long i, n, chunks = 0, sum_total_chunks = 0;
+       char *p = buf;
+
+       for (i = 0; i < NCHUNKS; i++) {
+               n = zv_curr_dist_counts[i];
+               p += sprintf(p, "%lu ", n);
+               chunks += n;
+               sum_total_chunks += i * n;
+       }
+       p += sprintf(p, "mean:%lu\n",
+               chunks == 0 ? 0 : sum_total_chunks / chunks);
+       return p - buf;
+}
+
+static int zv_cumul_dist_counts_show(char *buf)
+{
+       unsigned long i, n, chunks = 0, sum_total_chunks = 0;
+       char *p = buf;
+
+       for (i = 0; i < NCHUNKS; i++) {
+               n = zv_cumul_dist_counts[i];
+               p += sprintf(p, "%lu ", n);
+               chunks += n;
+               sum_total_chunks += i * n;
+       }
+       p += sprintf(p, "mean:%lu\n",
+               chunks == 0 ? 0 : sum_total_chunks / chunks);
+       return p - buf;
+}
+
+/*
+ * setting zv_max_zsize via sysfs causes all persistent (e.g. swap)
+ * pages that don't compress to less than this value (including metadata
+ * overhead) to be rejected.  We don't allow the value to get too close
+ * to PAGE_SIZE.
+ */
+static ssize_t zv_max_zsize_show(struct kobject *kobj,
+                                   struct kobj_attribute *attr,
+                                   char *buf)
+{
+       return sprintf(buf, "%u\n", zv_max_zsize);
+}
+
+static ssize_t zv_max_zsize_store(struct kobject *kobj,
+                                   struct kobj_attribute *attr,
+                                   const char *buf, size_t count)
+{
+       unsigned long val;
+       int err;
+
+       if (!capable(CAP_SYS_ADMIN))
+               return -EPERM;
+
+       err = strict_strtoul(buf, 10, &val);
+       if (err || (val == 0) || (val > (PAGE_SIZE / 8) * 7))
+               return -EINVAL;
+       zv_max_zsize = val;
+       return count;
+}
+
+/*
+ * setting zv_max_mean_zsize via sysfs causes all persistent (e.g. swap)
+ * pages that don't compress to less than this value (including metadata
+ * overhead) to be rejected UNLESS the mean compression is also smaller
+ * than this value.  In other words, we are load-balancing-by-zsize the
+ * accepted pages.  Again, we don't allow the value to get too close
+ * to PAGE_SIZE.
+ */
+static ssize_t zv_max_mean_zsize_show(struct kobject *kobj,
+                                   struct kobj_attribute *attr,
+                                   char *buf)
+{
+       return sprintf(buf, "%u\n", zv_max_mean_zsize);
+}
+
+static ssize_t zv_max_mean_zsize_store(struct kobject *kobj,
+                                   struct kobj_attribute *attr,
+                                   const char *buf, size_t count)
+{
+       unsigned long val;
+       int err;
+
+       if (!capable(CAP_SYS_ADMIN))
+               return -EPERM;
+
+       err = strict_strtoul(buf, 10, &val);
+       if (err || (val == 0) || (val > (PAGE_SIZE / 8) * 7))
+               return -EINVAL;
+       zv_max_mean_zsize = val;
+       return count;
+}
+
+/*
+ * setting zv_page_count_policy_percent via sysfs sets an upper bound of
+ * persistent (e.g. swap) pages that will be retained according to:
+ *     (zv_page_count_policy_percent * totalram_pages) / 100)
+ * when that limit is reached, further puts will be rejected (until
+ * some pages have been flushed).  Note that, due to compression,
+ * this number may exceed 100; it defaults to 75 and we set an
+ * arbitary limit of 150.  A poor choice will almost certainly result
+ * in OOM's, so this value should only be changed prudently.
+ */
+static ssize_t zv_page_count_policy_percent_show(struct kobject *kobj,
+                                                struct kobj_attribute *attr,
+                                                char *buf)
+{
+       return sprintf(buf, "%u\n", zv_page_count_policy_percent);
+}
+
+static ssize_t zv_page_count_policy_percent_store(struct kobject *kobj,
+                                                 struct kobj_attribute *attr,
+                                                 const char *buf, size_t count)
+{
+       unsigned long val;
+       int err;
+
+       if (!capable(CAP_SYS_ADMIN))
+               return -EPERM;
+
+       err = strict_strtoul(buf, 10, &val);
+       if (err || (val == 0) || (val > 150))
+               return -EINVAL;
+       zv_page_count_policy_percent = val;
+       return count;
+}
+
+static struct kobj_attribute zcache_zv_max_zsize_attr = {
+               .attr = { .name = "zv_max_zsize", .mode = 0644 },
+               .show = zv_max_zsize_show,
+               .store = zv_max_zsize_store,
+};
+
+static struct kobj_attribute zcache_zv_max_mean_zsize_attr = {
+               .attr = { .name = "zv_max_mean_zsize", .mode = 0644 },
+               .show = zv_max_mean_zsize_show,
+               .store = zv_max_mean_zsize_store,
+};
+
+static struct kobj_attribute zcache_zv_page_count_policy_percent_attr = {
+               .attr = { .name = "zv_page_count_policy_percent",
+                         .mode = 0644 },
+               .show = zv_page_count_policy_percent_show,
+               .store = zv_page_count_policy_percent_store,
+};
+#endif
+
 /*
  * zcache core code starts here
  */
@@ -677,36 +887,70 @@ static unsigned long zcache_flobj_found;
 static unsigned long zcache_failed_eph_puts;
 static unsigned long zcache_failed_pers_puts;
 
-#define MAX_POOLS_PER_CLIENT 16
-
-static struct {
-       struct tmem_pool *tmem_pools[MAX_POOLS_PER_CLIENT];
-       struct xv_pool *xvpool;
-} zcache_client;
-
 /*
  * Tmem operations assume the poolid implies the invoking client.
- * Zcache only has one client (the kernel itself), so translate
- * the poolid into the tmem_pool allocated for it.  A KVM version
+ * Zcache only has one client (the kernel itself): LOCAL_CLIENT.
+ * RAMster has each client numbered by cluster node, and a KVM version
  * of zcache would have one client per guest and each client might
  * have a poolid==N.
  */
-static struct tmem_pool *zcache_get_pool_by_id(uint32_t poolid)
+static struct tmem_pool *zcache_get_pool_by_id(uint16_t cli_id, uint16_t poolid)
 {
        struct tmem_pool *pool = NULL;
+       struct zcache_client *cli = NULL;
 
-       if (poolid >= 0) {
-               pool = zcache_client.tmem_pools[poolid];
+       if (cli_id == LOCAL_CLIENT)
+               cli = &zcache_host;
+       else {
+               if (cli_id >= MAX_CLIENTS)
+                       goto out;
+               cli = &zcache_clients[cli_id];
+               if (cli == NULL)
+                       goto out;
+               atomic_inc(&cli->refcount);
+       }
+       if (poolid < MAX_POOLS_PER_CLIENT) {
+               pool = cli->tmem_pools[poolid];
                if (pool != NULL)
                        atomic_inc(&pool->refcount);
        }
+out:
        return pool;
 }
 
 static void zcache_put_pool(struct tmem_pool *pool)
 {
-       if (pool != NULL)
-               atomic_dec(&pool->refcount);
+       struct zcache_client *cli = NULL;
+
+       if (pool == NULL)
+               BUG();
+       cli = pool->client;
+       atomic_dec(&pool->refcount);
+       atomic_dec(&cli->refcount);
+}
+
+int zcache_new_client(uint16_t cli_id)
+{
+       struct zcache_client *cli = NULL;
+       int ret = -1;
+
+       if (cli_id == LOCAL_CLIENT)
+               cli = &zcache_host;
+       else if ((unsigned int)cli_id < MAX_CLIENTS)
+               cli = &zcache_clients[cli_id];
+       if (cli == NULL)
+               goto out;
+       if (cli->allocated)
+               goto out;
+       cli->allocated = 1;
+#ifdef CONFIG_FRONTSWAP
+       cli->xvpool = xv_create_pool();
+       if (cli->xvpool == NULL)
+               goto out;
+#endif
+       ret = 0;
+out:
+       return ret;
 }
 
 /* counters for debugging */
@@ -901,48 +1145,59 @@ static unsigned long zcache_curr_pers_pampd_count_max;
 /* forward reference */
 static int zcache_compress(struct page *from, void **out_va, size_t *out_len);
 
-static void *zcache_pampd_create(struct tmem_pool *pool, struct tmem_oid *oid,
-                                uint32_t index, struct page *page)
+static void *zcache_pampd_create(char *data, size_t size, bool raw, int eph,
+                               struct tmem_pool *pool, struct tmem_oid *oid,
+                                uint32_t index)
 {
        void *pampd = NULL, *cdata;
        size_t clen;
        int ret;
-       bool ephemeral = is_ephemeral(pool);
        unsigned long count;
+       struct page *page = virt_to_page(data);
+       struct zcache_client *cli = pool->client;
+       uint16_t client_id = get_client_id_from_client(cli);
+       unsigned long zv_mean_zsize;
+       unsigned long curr_pers_pampd_count;
 
-       if (ephemeral) {
+       if (eph) {
                ret = zcache_compress(page, &cdata, &clen);
                if (ret == 0)
-
                        goto out;
                if (clen == 0 || clen > zbud_max_buddy_size()) {
                        zcache_compress_poor++;
                        goto out;
                }
-               pampd = (void *)zbud_create(pool->pool_id, oid, index,
-                                               page, cdata, clen);
+               pampd = (void *)zbud_create(client_id, pool->pool_id, oid,
+                                               index, page, cdata, clen);
                if (pampd != NULL) {
                        count = atomic_inc_return(&zcache_curr_eph_pampd_count);
                        if (count > zcache_curr_eph_pampd_count_max)
                                zcache_curr_eph_pampd_count_max = count;
                }
        } else {
-               /*
-                * FIXME: This is all the "policy" there is for now.
-                * 3/4 totpages should allow ~37% of RAM to be filled with
-                * compressed frontswap pages
-                */
-               if (atomic_read(&zcache_curr_pers_pampd_count) >
-                                                       3 * totalram_pages / 4)
+               curr_pers_pampd_count =
+                       atomic_read(&zcache_curr_pers_pampd_count);
+               if (curr_pers_pampd_count >
+                   (zv_page_count_policy_percent * totalram_pages) / 100)
                        goto out;
                ret = zcache_compress(page, &cdata, &clen);
                if (ret == 0)
                        goto out;
-               if (clen > zv_max_page_size) {
+               /* reject if compression is too poor */
+               if (clen > zv_max_zsize) {
                        zcache_compress_poor++;
                        goto out;
                }
-               pampd = (void *)zv_create(zcache_client.xvpool, pool->pool_id,
+               /* reject if mean compression is too poor */
+               if ((clen > zv_max_mean_zsize) && (curr_pers_pampd_count > 0)) {
+                       zv_mean_zsize = xv_get_total_size_bytes(cli->xvpool) /
+                                               curr_pers_pampd_count;
+                       if (zv_mean_zsize > zv_max_mean_zsize) {
+                               zcache_mean_compress_poor++;
+                               goto out;
+                       }
+               }
+               pampd = (void *)zv_create(cli->xvpool, pool->pool_id,
                                                oid, index, cdata, clen);
                if (pampd == NULL)
                        goto out;
@@ -958,15 +1213,31 @@ out:
  * fill the pageframe corresponding to the struct page with the data
  * from the passed pampd
  */
-static int zcache_pampd_get_data(struct page *page, void *pampd,
-                                               struct tmem_pool *pool)
+static int zcache_pampd_get_data(char *data, size_t *bufsize, bool raw,
+                                       void *pampd, struct tmem_pool *pool,
+                                       struct tmem_oid *oid, uint32_t index)
 {
        int ret = 0;
 
-       if (is_ephemeral(pool))
-               ret = zbud_decompress(page, pampd);
-       else
-               zv_decompress(page, pampd);
+       BUG_ON(is_ephemeral(pool));
+       zv_decompress(virt_to_page(data), pampd);
+       return ret;
+}
+
+/*
+ * fill the pageframe corresponding to the struct page with the data
+ * from the passed pampd
+ */
+static int zcache_pampd_get_data_and_free(char *data, size_t *bufsize, bool raw,
+                                       void *pampd, struct tmem_pool *pool,
+                                       struct tmem_oid *oid, uint32_t index)
+{
+       int ret = 0;
+
+       BUG_ON(!is_ephemeral(pool));
+       zbud_decompress(virt_to_page(data), pampd);
+       zbud_free_and_delist((struct zbud_hdr *)pampd);
+       atomic_dec(&zcache_curr_eph_pampd_count);
        return ret;
 }
 
@@ -974,23 +1245,49 @@ static int zcache_pampd_get_data(struct page *page, void *pampd,
  * free the pampd and remove it from any zcache lists
  * pampd must no longer be pointed to from any tmem data structures!
  */
-static void zcache_pampd_free(void *pampd, struct tmem_pool *pool)
+static void zcache_pampd_free(void *pampd, struct tmem_pool *pool,
+                               struct tmem_oid *oid, uint32_t index)
 {
+       struct zcache_client *cli = pool->client;
+
        if (is_ephemeral(pool)) {
                zbud_free_and_delist((struct zbud_hdr *)pampd);
                atomic_dec(&zcache_curr_eph_pampd_count);
                BUG_ON(atomic_read(&zcache_curr_eph_pampd_count) < 0);
        } else {
-               zv_free(zcache_client.xvpool, (struct zv_hdr *)pampd);
+               zv_free(cli->xvpool, (struct zv_hdr *)pampd);
                atomic_dec(&zcache_curr_pers_pampd_count);
                BUG_ON(atomic_read(&zcache_curr_pers_pampd_count) < 0);
        }
 }
 
+static void zcache_pampd_free_obj(struct tmem_pool *pool, struct tmem_obj *obj)
+{
+}
+
+static void zcache_pampd_new_obj(struct tmem_obj *obj)
+{
+}
+
+static int zcache_pampd_replace_in_obj(void *pampd, struct tmem_obj *obj)
+{
+       return -1;
+}
+
+static bool zcache_pampd_is_remote(void *pampd)
+{
+       return 0;
+}
+
 static struct tmem_pamops zcache_pamops = {
        .create = zcache_pampd_create,
        .get_data = zcache_pampd_get_data,
+       .get_data_and_free = zcache_pampd_get_data_and_free,
        .free = zcache_pampd_free,
+       .free_obj = zcache_pampd_free_obj,
+       .new_obj = zcache_pampd_new_obj,
+       .replace_in_obj = zcache_pampd_replace_in_obj,
+       .is_remote = zcache_pampd_is_remote,
 };
 
 /*
@@ -1122,6 +1419,7 @@ ZCACHE_SYSFS_RO(put_to_flush);
 ZCACHE_SYSFS_RO(aborted_preload);
 ZCACHE_SYSFS_RO(aborted_shrink);
 ZCACHE_SYSFS_RO(compress_poor);
+ZCACHE_SYSFS_RO(mean_compress_poor);
 ZCACHE_SYSFS_RO_ATOMIC(zbud_curr_raw_pages);
 ZCACHE_SYSFS_RO_ATOMIC(zbud_curr_zpages);
 ZCACHE_SYSFS_RO_ATOMIC(curr_obj_count);
@@ -1130,6 +1428,10 @@ ZCACHE_SYSFS_RO_CUSTOM(zbud_unbuddied_list_counts,
                        zbud_show_unbuddied_list_counts);
 ZCACHE_SYSFS_RO_CUSTOM(zbud_cumul_chunk_counts,
                        zbud_show_cumul_chunk_counts);
+ZCACHE_SYSFS_RO_CUSTOM(zv_curr_dist_counts,
+                       zv_curr_dist_counts_show);
+ZCACHE_SYSFS_RO_CUSTOM(zv_cumul_dist_counts,
+                       zv_cumul_dist_counts_show);
 
 static struct attribute *zcache_attrs[] = {
        &zcache_curr_obj_count_attr.attr,
@@ -1143,6 +1445,7 @@ static struct attribute *zcache_attrs[] = {
        &zcache_failed_eph_puts_attr.attr,
        &zcache_failed_pers_puts_attr.attr,
        &zcache_compress_poor_attr.attr,
+       &zcache_mean_compress_poor_attr.attr,
        &zcache_zbud_curr_raw_pages_attr.attr,
        &zcache_zbud_curr_zpages_attr.attr,
        &zcache_zbud_curr_zbytes_attr.attr,
@@ -1160,6 +1463,11 @@ static struct attribute *zcache_attrs[] = {
        &zcache_aborted_shrink_attr.attr,
        &zcache_zbud_unbuddied_list_counts_attr.attr,
        &zcache_zbud_cumul_chunk_counts_attr.attr,
+       &zcache_zv_curr_dist_counts_attr.attr,
+       &zcache_zv_cumul_dist_counts_attr.attr,
+       &zcache_zv_max_zsize_attr.attr,
+       &zcache_zv_max_mean_zsize_attr.attr,
+       &zcache_zv_page_count_policy_percent_attr.attr,
        NULL,
 };
 
@@ -1212,19 +1520,20 @@ static struct shrinker zcache_shrinker = {
  * zcache shims between cleancache/frontswap ops and tmem
  */
 
-static int zcache_put_page(int pool_id, struct tmem_oid *oidp,
+static int zcache_put_page(int cli_id, int pool_id, struct tmem_oid *oidp,
                                uint32_t index, struct page *page)
 {
        struct tmem_pool *pool;
        int ret = -1;
 
        BUG_ON(!irqs_disabled());
-       pool = zcache_get_pool_by_id(pool_id);
+       pool = zcache_get_pool_by_id(cli_id, pool_id);
        if (unlikely(pool == NULL))
                goto out;
        if (!zcache_freeze && zcache_do_preload(pool) == 0) {
                /* preload does preempt_disable on success */
-               ret = tmem_put(pool, oidp, index, page);
+               ret = tmem_put(pool, oidp, index, page_address(page),
+                               PAGE_SIZE, 0, is_ephemeral(pool));
                if (ret < 0) {
                        if (is_ephemeral(pool))
                                zcache_failed_eph_puts++;
@@ -1244,25 +1553,28 @@ out:
        return ret;
 }
 
-static int zcache_get_page(int pool_id, struct tmem_oid *oidp,
+static int zcache_get_page(int cli_id, int pool_id, struct tmem_oid *oidp,
                                uint32_t index, struct page *page)
 {
        struct tmem_pool *pool;
        int ret = -1;
        unsigned long flags;
+       size_t size = PAGE_SIZE;
 
        local_irq_save(flags);
-       pool = zcache_get_pool_by_id(pool_id);
+       pool = zcache_get_pool_by_id(cli_id, pool_id);
        if (likely(pool != NULL)) {
                if (atomic_read(&pool->obj_count) > 0)
-                       ret = tmem_get(pool, oidp, index, page);
+                       ret = tmem_get(pool, oidp, index, page_address(page),
+                                       &size, 0, is_ephemeral(pool));
                zcache_put_pool(pool);
        }
        local_irq_restore(flags);
        return ret;
 }
 
-static int zcache_flush_page(int pool_id, struct tmem_oid *oidp, uint32_t index)
+static int zcache_flush_page(int cli_id, int pool_id,
+                               struct tmem_oid *oidp, uint32_t index)
 {
        struct tmem_pool *pool;
        int ret = -1;
@@ -1270,7 +1582,7 @@ static int zcache_flush_page(int pool_id, struct tmem_oid *oidp, uint32_t index)
 
        local_irq_save(flags);
        zcache_flush_total++;
-       pool = zcache_get_pool_by_id(pool_id);
+       pool = zcache_get_pool_by_id(cli_id, pool_id);
        if (likely(pool != NULL)) {
                if (atomic_read(&pool->obj_count) > 0)
                        ret = tmem_flush_page(pool, oidp, index);
@@ -1282,7 +1594,8 @@ static int zcache_flush_page(int pool_id, struct tmem_oid *oidp, uint32_t index)
        return ret;
 }
 
-static int zcache_flush_object(int pool_id, struct tmem_oid *oidp)
+static int zcache_flush_object(int cli_id, int pool_id,
+                               struct tmem_oid *oidp)
 {
        struct tmem_pool *pool;
        int ret = -1;
@@ -1290,7 +1603,7 @@ static int zcache_flush_object(int pool_id, struct tmem_oid *oidp)
 
        local_irq_save(flags);
        zcache_flobj_total++;
-       pool = zcache_get_pool_by_id(pool_id);
+       pool = zcache_get_pool_by_id(cli_id, pool_id);
        if (likely(pool != NULL)) {
                if (atomic_read(&pool->obj_count) > 0)
                        ret = tmem_flush_object(pool, oidp);
@@ -1302,34 +1615,52 @@ static int zcache_flush_object(int pool_id, struct tmem_oid *oidp)
        return ret;
 }
 
-static int zcache_destroy_pool(int pool_id)
+static int zcache_destroy_pool(int cli_id, int pool_id)
 {
        struct tmem_pool *pool = NULL;
+       struct zcache_client *cli = NULL;
        int ret = -1;
 
        if (pool_id < 0)
                goto out;
-       pool = zcache_client.tmem_pools[pool_id];
+       if (cli_id == LOCAL_CLIENT)
+               cli = &zcache_host;
+       else if ((unsigned int)cli_id < MAX_CLIENTS)
+               cli = &zcache_clients[cli_id];
+       if (cli == NULL)
+               goto out;
+       atomic_inc(&cli->refcount);
+       pool = cli->tmem_pools[pool_id];
        if (pool == NULL)
                goto out;
-       zcache_client.tmem_pools[pool_id] = NULL;
+       cli->tmem_pools[pool_id] = NULL;
        /* wait for pool activity on other cpus to quiesce */
        while (atomic_read(&pool->refcount) != 0)
                ;
+       atomic_dec(&cli->refcount);
        local_bh_disable();
        ret = tmem_destroy_pool(pool);
        local_bh_enable();
        kfree(pool);
-       pr_info("zcache: destroyed pool id=%d\n", pool_id);
+       pr_info("zcache: destroyed pool id=%d, cli_id=%d\n",
+                       pool_id, cli_id);
 out:
        return ret;
 }
 
-static int zcache_new_pool(uint32_t flags)
+static int zcache_new_pool(uint16_t cli_id, uint32_t flags)
 {
        int poolid = -1;
        struct tmem_pool *pool;
+       struct zcache_client *cli = NULL;
 
+       if (cli_id == LOCAL_CLIENT)
+               cli = &zcache_host;
+       else if ((unsigned int)cli_id < MAX_CLIENTS)
+               cli = &zcache_clients[cli_id];
+       if (cli == NULL)
+               goto out;
+       atomic_inc(&cli->refcount);
        pool = kmalloc(sizeof(struct tmem_pool), GFP_KERNEL);
        if (pool == NULL) {
                pr_info("zcache: pool creation failed: out of memory\n");
@@ -1337,7 +1668,7 @@ static int zcache_new_pool(uint32_t flags)
        }
 
        for (poolid = 0; poolid < MAX_POOLS_PER_CLIENT; poolid++)
-               if (zcache_client.tmem_pools[poolid] == NULL)
+               if (cli->tmem_pools[poolid] == NULL)
                        break;
        if (poolid >= MAX_POOLS_PER_CLIENT) {
                pr_info("zcache: pool creation failed: max exceeded\n");
@@ -1346,14 +1677,16 @@ static int zcache_new_pool(uint32_t flags)
                goto out;
        }
        atomic_set(&pool->refcount, 0);
-       pool->client = &zcache_client;
+       pool->client = cli;
        pool->pool_id = poolid;
        tmem_new_pool(pool, flags);
-       zcache_client.tmem_pools[poolid] = pool;
-       pr_info("zcache: created %s tmem pool, id=%d\n",
+       cli->tmem_pools[poolid] = pool;
+       pr_info("zcache: created %s tmem pool, id=%d, client=%d\n",
                flags & TMEM_POOL_PERSIST ? "persistent" : "ephemeral",
-               poolid);
+               poolid, cli_id);
 out:
+       if (cli != NULL)
+               atomic_dec(&cli->refcount);
        return poolid;
 }
 
@@ -1374,7 +1707,7 @@ static void zcache_cleancache_put_page(int pool_id,
        struct tmem_oid oid = *(struct tmem_oid *)&key;
 
        if (likely(ind == index))
-               (void)zcache_put_page(pool_id, &oid, index, page);
+               (void)zcache_put_page(LOCAL_CLIENT, pool_id, &oid, index, page);
 }
 
 static int zcache_cleancache_get_page(int pool_id,
@@ -1386,7 +1719,7 @@ static int zcache_cleancache_get_page(int pool_id,
        int ret = -1;
 
        if (likely(ind == index))
-               ret = zcache_get_page(pool_id, &oid, index, page);
+               ret = zcache_get_page(LOCAL_CLIENT, pool_id, &oid, index, page);
        return ret;
 }
 
@@ -1398,7 +1731,7 @@ static void zcache_cleancache_flush_page(int pool_id,
        struct tmem_oid oid = *(struct tmem_oid *)&key;
 
        if (likely(ind == index))
-               (void)zcache_flush_page(pool_id, &oid, ind);
+               (void)zcache_flush_page(LOCAL_CLIENT, pool_id, &oid, ind);
 }
 
 static void zcache_cleancache_flush_inode(int pool_id,
@@ -1406,13 +1739,13 @@ static void zcache_cleancache_flush_inode(int pool_id,
 {
        struct tmem_oid oid = *(struct tmem_oid *)&key;
 
-       (void)zcache_flush_object(pool_id, &oid);
+       (void)zcache_flush_object(LOCAL_CLIENT, pool_id, &oid);
 }
 
 static void zcache_cleancache_flush_fs(int pool_id)
 {
        if (pool_id >= 0)
-               (void)zcache_destroy_pool(pool_id);
+               (void)zcache_destroy_pool(LOCAL_CLIENT, pool_id);
 }
 
 static int zcache_cleancache_init_fs(size_t pagesize)
@@ -1420,7 +1753,7 @@ static int zcache_cleancache_init_fs(size_t pagesize)
        BUG_ON(sizeof(struct cleancache_filekey) !=
                                sizeof(struct tmem_oid));
        BUG_ON(pagesize != PAGE_SIZE);
-       return zcache_new_pool(0);
+       return zcache_new_pool(LOCAL_CLIENT, 0);
 }
 
 static int zcache_cleancache_init_shared_fs(char *uuid, size_t pagesize)
@@ -1429,7 +1762,7 @@ static int zcache_cleancache_init_shared_fs(char *uuid, size_t pagesize)
        BUG_ON(sizeof(struct cleancache_filekey) !=
                                sizeof(struct tmem_oid));
        BUG_ON(pagesize != PAGE_SIZE);
-       return zcache_new_pool(0);
+       return zcache_new_pool(LOCAL_CLIENT, 0);
 }
 
 static struct cleancache_ops zcache_cleancache_ops = {
@@ -1483,8 +1816,8 @@ static int zcache_frontswap_put_page(unsigned type, pgoff_t offset,
        BUG_ON(!PageLocked(page));
        if (likely(ind64 == ind)) {
                local_irq_save(flags);
-               ret = zcache_put_page(zcache_frontswap_poolid, &oid,
-                                       iswiz(ind), page);
+               ret = zcache_put_page(LOCAL_CLIENT, zcache_frontswap_poolid,
+                                       &oid, iswiz(ind), page);
                local_irq_restore(flags);
        }
        return ret;
@@ -1502,8 +1835,8 @@ static int zcache_frontswap_get_page(unsigned type, pgoff_t offset,
 
        BUG_ON(!PageLocked(page));
        if (likely(ind64 == ind))
-               ret = zcache_get_page(zcache_frontswap_poolid, &oid,
-                                       iswiz(ind), page);
+               ret = zcache_get_page(LOCAL_CLIENT, zcache_frontswap_poolid,
+                                       &oid, iswiz(ind), page);
        return ret;
 }
 
@@ -1515,8 +1848,8 @@ static void zcache_frontswap_flush_page(unsigned type, pgoff_t offset)
        struct tmem_oid oid = oswiz(type, ind);
 
        if (likely(ind64 == ind))
-               (void)zcache_flush_page(zcache_frontswap_poolid, &oid,
-                                       iswiz(ind));
+               (void)zcache_flush_page(LOCAL_CLIENT, zcache_frontswap_poolid,
+                                       &oid, iswiz(ind));
 }
 
 /* flush all pages from the passed swaptype */
@@ -1527,7 +1860,8 @@ static void zcache_frontswap_flush_area(unsigned type)
 
        for (ind = SWIZ_MASK; ind >= 0; ind--) {
                oid = oswiz(type, ind);
-               (void)zcache_flush_object(zcache_frontswap_poolid, &oid);
+               (void)zcache_flush_object(LOCAL_CLIENT,
+                                               zcache_frontswap_poolid, &oid);
        }
 }
 
@@ -1535,7 +1869,8 @@ static void zcache_frontswap_init(unsigned ignored)
 {
        /* a single tmem poolid is used for all frontswap "types" (swapfiles) */
        if (zcache_frontswap_poolid < 0)
-               zcache_frontswap_poolid = zcache_new_pool(TMEM_POOL_PERSIST);
+               zcache_frontswap_poolid =
+                       zcache_new_pool(LOCAL_CLIENT, TMEM_POOL_PERSIST);
 }
 
 static struct frontswap_ops zcache_frontswap_ops = {
@@ -1624,6 +1959,11 @@ static int __init zcache_init(void)
                                sizeof(struct tmem_objnode), 0, 0, NULL);
        zcache_obj_cache = kmem_cache_create("zcache_obj",
                                sizeof(struct tmem_obj), 0, 0, NULL);
+       ret = zcache_new_client(LOCAL_CLIENT);
+       if (ret) {
+               pr_err("zcache: can't create client\n");
+               goto out;
+       }
 #endif
 #ifdef CONFIG_CLEANCACHE
        if (zcache_enabled && use_cleancache) {
@@ -1642,11 +1982,6 @@ static int __init zcache_init(void)
        if (zcache_enabled && use_frontswap) {
                struct frontswap_ops old_ops;
 
-               zcache_client.xvpool = xv_create_pool();
-               if (zcache_client.xvpool == NULL) {
-                       pr_err("zcache: can't create xvpool\n");
-                       goto out;
-               }
                old_ops = zcache_frontswap_register_ops();
                pr_info("zcache: frontswap enabled using kernel "
                        "transcendent memory and xvmalloc\n");