libceph: introduce reference counted string
authorYan, Zheng <zyan@redhat.com>
Fri, 5 Feb 2016 07:36:22 +0000 (15:36 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Thu, 28 Jul 2016 00:55:37 +0000 (02:55 +0200)
The data structure is for storing namesapce string. It allows namespace
string to be shared between cephfs inodes with same layout. This data
structure can also be referenced by OSD request.

Signed-off-by: Yan, Zheng <zyan@redhat.com>
include/linux/ceph/libceph.h
include/linux/ceph/string_table.h [new file with mode: 0644]
net/ceph/Makefile
net/ceph/ceph_common.c
net/ceph/string_table.c [new file with mode: 0644]

index 690985daad1c6e9ed6d18b7515522f96ef3411c6..6afc5d98fad17c0b03cec08ad281e2090cb9d45a 100644 (file)
@@ -21,6 +21,7 @@
 #include <linux/ceph/mon_client.h>
 #include <linux/ceph/osd_client.h>
 #include <linux/ceph/ceph_fs.h>
+#include <linux/ceph/string_table.h>
 
 /*
  * mount options
diff --git a/include/linux/ceph/string_table.h b/include/linux/ceph/string_table.h
new file mode 100644 (file)
index 0000000..1b02c96
--- /dev/null
@@ -0,0 +1,62 @@
+#ifndef _FS_CEPH_STRING_TABLE_H
+#define _FS_CEPH_STRING_TABLE_H
+
+#include <linux/types.h>
+#include <linux/kref.h>
+#include <linux/rbtree.h>
+#include <linux/rcupdate.h>
+
+struct ceph_string {
+       struct kref kref;
+       union {
+               struct rb_node node;
+               struct rcu_head rcu;
+       };
+       size_t len;
+       char str[];
+};
+
+extern void ceph_release_string(struct kref *ref);
+extern struct ceph_string *ceph_find_or_create_string(const char *str,
+                                                     size_t len);
+extern bool ceph_strings_empty(void);
+
+static inline struct ceph_string *ceph_get_string(struct ceph_string *str)
+{
+       kref_get(&str->kref);
+       return str;
+}
+
+static inline void ceph_put_string(struct ceph_string *str)
+{
+       if (!str)
+               return;
+       kref_put(&str->kref, ceph_release_string);
+}
+
+static inline int ceph_compare_string(struct ceph_string *cs,
+                                     const char* str, size_t len)
+{
+       size_t cs_len = cs ? cs->len : 0;
+       if (cs_len != len)
+               return cs_len - len;
+       if (len == 0)
+               return 0;
+       return strncmp(cs->str, str, len);
+}
+
+#define ceph_try_get_string(x)                                 \
+({                                                             \
+       struct ceph_string *___str;                             \
+       rcu_read_lock();                                        \
+       for (;;) {                                              \
+               ___str = rcu_dereference(x);                    \
+               if (!___str ||                                  \
+                   kref_get_unless_zero(&___str->kref))        \
+                       break;                                  \
+       }                                                       \
+       rcu_read_unlock();                                      \
+       (___str);                                               \
+})
+
+#endif
index 958d9856912c2373ef2726322d78d281f1a9c583..84cbed630c4b1e75d0dd05dabb043793ab11f5bb 100644 (file)
@@ -11,5 +11,5 @@ libceph-y := ceph_common.o messenger.o msgpool.o buffer.o pagelist.o \
        crypto.o armor.o \
        auth_x.o \
        ceph_fs.o ceph_strings.o ceph_hash.o \
-       pagevec.o snapshot.o
+       pagevec.o snapshot.o string_table.o
 
index 55d2bfee16d798f6f01ae44c1052e824e590a0d8..bddfcf6f09c278afccb044183b5cc4c6e7449995 100644 (file)
@@ -747,6 +747,8 @@ out:
 static void __exit exit_ceph_lib(void)
 {
        dout("exit_ceph_lib\n");
+       WARN_ON(!ceph_strings_empty());
+
        ceph_osdc_cleanup();
        ceph_msgr_exit();
        ceph_crypto_shutdown();
diff --git a/net/ceph/string_table.c b/net/ceph/string_table.c
new file mode 100644 (file)
index 0000000..ca53c83
--- /dev/null
@@ -0,0 +1,111 @@
+#include <linux/slab.h>
+#include <linux/gfp.h>
+#include <linux/string.h>
+#include <linux/spinlock.h>
+#include <linux/ceph/string_table.h>
+
+static DEFINE_SPINLOCK(string_tree_lock);
+static struct rb_root string_tree = RB_ROOT;
+
+struct ceph_string *ceph_find_or_create_string(const char* str, size_t len)
+{
+       struct ceph_string *cs, *exist;
+       struct rb_node **p, *parent;
+       int ret;
+
+       exist = NULL;
+       spin_lock(&string_tree_lock);
+       p = &string_tree.rb_node;
+       while (*p) {
+               exist = rb_entry(*p, struct ceph_string, node);
+               ret = ceph_compare_string(exist, str, len);
+               if (ret > 0)
+                       p = &(*p)->rb_left;
+               else if (ret < 0)
+                       p = &(*p)->rb_right;
+               else
+                       break;
+               exist = NULL;
+       }
+       if (exist && !kref_get_unless_zero(&exist->kref)) {
+               rb_erase(&exist->node, &string_tree);
+               RB_CLEAR_NODE(&exist->node);
+               exist = NULL;
+       }
+       spin_unlock(&string_tree_lock);
+       if (exist)
+               return exist;
+
+       cs = kmalloc(sizeof(*cs) + len + 1, GFP_NOFS);
+       if (!cs)
+               return NULL;
+
+       kref_init(&cs->kref);
+       cs->len = len;
+       memcpy(cs->str, str, len);
+       cs->str[len] = 0;
+
+retry:
+       exist = NULL;
+       parent = NULL;
+       p = &string_tree.rb_node;
+       spin_lock(&string_tree_lock);
+       while (*p) {
+               parent = *p;
+               exist = rb_entry(*p, struct ceph_string, node);
+               ret = ceph_compare_string(exist, str, len);
+               if (ret > 0)
+                       p = &(*p)->rb_left;
+               else if (ret < 0)
+                       p = &(*p)->rb_right;
+               else
+                       break;
+               exist = NULL;
+       }
+       ret = 0;
+       if (!exist) {
+               rb_link_node(&cs->node, parent, p);
+               rb_insert_color(&cs->node, &string_tree);
+       } else if (!kref_get_unless_zero(&exist->kref)) {
+               rb_erase(&exist->node, &string_tree);
+               RB_CLEAR_NODE(&exist->node);
+               ret = -EAGAIN;
+       }
+       spin_unlock(&string_tree_lock);
+       if (ret == -EAGAIN)
+               goto retry;
+
+       if (exist) {
+               kfree(cs);
+               cs = exist;
+       }
+
+       return cs;
+}
+EXPORT_SYMBOL(ceph_find_or_create_string);
+
+static void ceph_free_string(struct rcu_head *head)
+{
+       struct ceph_string *cs = container_of(head, struct ceph_string, rcu);
+       kfree(cs);
+}
+
+void ceph_release_string(struct kref *ref)
+{
+       struct ceph_string *cs = container_of(ref, struct ceph_string, kref);
+
+       spin_lock(&string_tree_lock);
+       if (!RB_EMPTY_NODE(&cs->node)) {
+               rb_erase(&cs->node, &string_tree);
+               RB_CLEAR_NODE(&cs->node);
+       }
+       spin_unlock(&string_tree_lock);
+
+       call_rcu(&cs->rcu, ceph_free_string);
+}
+EXPORT_SYMBOL(ceph_release_string);
+
+bool ceph_strings_empty(void)
+{
+       return RB_EMPTY_ROOT(&string_tree);
+}