libceph: make message data be a pointer
authorAlex Elder <elder@inktank.com>
Tue, 12 Mar 2013 04:34:24 +0000 (23:34 -0500)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:17:37 +0000 (21:17 -0700)
Begin the transition from a single message data item to a list of
them by replacing the "data" structure in a message with a pointer
to a ceph_msg_data structure.

A null pointer will indicate the message has no data; replace the
use of ceph_msg_has_data() with a simple check for a null pointer.

Create functions ceph_msg_data_create() and ceph_msg_data_destroy()
to dynamically allocate and free a data item structure of a given type.

When a message has its data item "set," allocate one of these to
hold the data description, and free it when the last reference to
the message is dropped.

This partially resolves:
    http://tracker.ceph.com/issues/4429

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
include/linux/ceph/messenger.h
net/ceph/messenger.c

index 686df5bfa717aa7543b47fa54682554ebd4000b3..3181321bed6d98c79289678adbd05ce157c35f6a 100644 (file)
@@ -64,8 +64,6 @@ struct ceph_messenger {
        u32 required_features;
 };
 
-#define ceph_msg_has_data(m)           ((m)->data.type != CEPH_MSG_DATA_NONE)
-
 enum ceph_msg_data_type {
        CEPH_MSG_DATA_NONE,     /* message contains no data payload */
        CEPH_MSG_DATA_PAGES,    /* data source/destination is a page array */
@@ -141,8 +139,7 @@ struct ceph_msg {
        struct kvec front;              /* unaligned blobs of message */
        struct ceph_buffer *middle;
 
-       /* data payload */
-       struct ceph_msg_data    data;
+       struct ceph_msg_data    *data;  /* data payload */
 
        struct ceph_connection *con;
        struct list_head list_head;     /* links for connection lists */
index dd4b8226a48a269b61d9f3861a0b58e66302417e..d4e46d8a088ccefe94cba6a40de8a3012b12b7b1 100644 (file)
@@ -1086,7 +1086,7 @@ static void prepare_message_data(struct ceph_msg *msg)
 
        /* Initialize data cursor */
 
-       ceph_msg_data_cursor_init(&msg->data, data_len);
+       ceph_msg_data_cursor_init(msg->data, data_len);
 }
 
 /*
@@ -1406,13 +1406,13 @@ static u32 ceph_crc32c_page(u32 crc, struct page *page,
 static int write_partial_message_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
-       struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
+       struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
        bool do_datacrc = !con->msgr->nocrc;
        u32 crc;
 
        dout("%s %p msg %p\n", __func__, con, msg);
 
-       if (WARN_ON(!ceph_msg_has_data(msg)))
+       if (WARN_ON(!msg->data))
                return -EINVAL;
 
        /*
@@ -1432,7 +1432,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                bool need_crc;
                int ret;
 
-               page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+               page = ceph_msg_data_next(msg->data, &page_offset, &length,
                                                        &last_piece);
                ret = ceph_tcp_sendpage(con->sock, page, page_offset,
                                      length, last_piece);
@@ -1444,7 +1444,7 @@ static int write_partial_message_data(struct ceph_connection *con)
                }
                if (do_datacrc && cursor->need_crc)
                        crc = ceph_crc32c_page(crc, page, page_offset, length);
-               need_crc = ceph_msg_data_advance(&msg->data, (size_t) ret);
+               need_crc = ceph_msg_data_advance(msg->data, (size_t)ret);
        }
 
        dout("%s %p msg %p done\n", __func__, con, msg);
@@ -2104,7 +2104,7 @@ static int read_partial_message_section(struct ceph_connection *con,
 static int read_partial_msg_data(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->in_msg;
-       struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
+       struct ceph_msg_data_cursor *cursor = &msg->data->cursor;
        const bool do_datacrc = !con->msgr->nocrc;
        struct page *page;
        size_t page_offset;
@@ -2113,13 +2113,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
        int ret;
 
        BUG_ON(!msg);
-       if (WARN_ON(!ceph_msg_has_data(msg)))
+       if (!msg->data)
                return -EIO;
 
        if (do_datacrc)
                crc = con->in_data_crc;
        while (cursor->resid) {
-               page = ceph_msg_data_next(&msg->data, &page_offset, &length,
+               page = ceph_msg_data_next(msg->data, &page_offset, &length,
                                                        NULL);
                ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
                if (ret <= 0) {
@@ -2131,7 +2131,7 @@ static int read_partial_msg_data(struct ceph_connection *con)
 
                if (do_datacrc)
                        crc = ceph_crc32c_page(crc, page, page_offset, ret);
-               (void) ceph_msg_data_advance(&msg->data, (size_t) ret);
+               (void) ceph_msg_data_advance(msg->data, (size_t)ret);
        }
        if (do_datacrc)
                con->in_data_crc = crc;
@@ -2947,44 +2947,80 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
-static void ceph_msg_data_init(struct ceph_msg_data *data)
+static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
 {
-       data->type = CEPH_MSG_DATA_NONE;
+       struct ceph_msg_data *data;
+
+       if (WARN_ON(!ceph_msg_data_type_valid(type)))
+               return NULL;
+
+       data = kzalloc(sizeof (*data), GFP_NOFS);
+       if (data)
+               data->type = type;
+
+       return data;
+}
+
+static void ceph_msg_data_destroy(struct ceph_msg_data *data)
+{
+       if (!data)
+               return;
+
+       if (data->type == CEPH_MSG_DATA_PAGELIST) {
+               ceph_pagelist_release(data->pagelist);
+               kfree(data->pagelist);
+       }
+       kfree(data);
 }
 
 void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
                size_t length, size_t alignment)
 {
+       struct ceph_msg_data *data;
+
        BUG_ON(!pages);
        BUG_ON(!length);
-       BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+       BUG_ON(msg->data != NULL);
+
+       data = ceph_msg_data_create(CEPH_MSG_DATA_PAGES);
+       BUG_ON(!data);
+       data->pages = pages;
+       data->length = length;
+       data->alignment = alignment & ~PAGE_MASK;
 
-       msg->data.type = CEPH_MSG_DATA_PAGES;
-       msg->data.pages = pages;
-       msg->data.length = length;
-       msg->data.alignment = alignment & ~PAGE_MASK;
+       msg->data = data;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pages);
 
 void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
                                struct ceph_pagelist *pagelist)
 {
+       struct ceph_msg_data *data;
+
        BUG_ON(!pagelist);
        BUG_ON(!pagelist->length);
-       BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+       BUG_ON(msg->data != NULL);
 
-       msg->data.type = CEPH_MSG_DATA_PAGELIST;
-       msg->data.pagelist = pagelist;
+       data = ceph_msg_data_create(CEPH_MSG_DATA_PAGELIST);
+       BUG_ON(!data);
+       data->pagelist = pagelist;
+
+       msg->data = data;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
 
 void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
 {
+       struct ceph_msg_data *data;
+
        BUG_ON(!bio);
-       BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
+       BUG_ON(msg->data != NULL);
 
-       msg->data.type = CEPH_MSG_DATA_BIO;
-       msg->data.bio = bio;
+       data = ceph_msg_data_create(CEPH_MSG_DATA_BIO);
+       BUG_ON(!data);
+       data->bio = bio;
+
+       msg->data = data;
 }
 EXPORT_SYMBOL(ceph_msg_data_set_bio);
 
@@ -3008,8 +3044,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
        INIT_LIST_HEAD(&m->list_head);
        kref_init(&m->kref);
 
-       ceph_msg_data_init(&m->data);
-
        /* front */
        m->front_max = front_len;
        if (front_len) {
@@ -3163,14 +3197,8 @@ void ceph_msg_last_put(struct kref *kref)
                ceph_buffer_put(m->middle);
                m->middle = NULL;
        }
-       if (ceph_msg_has_data(m)) {
-               if (m->data.type == CEPH_MSG_DATA_PAGELIST) {
-                       ceph_pagelist_release(m->data.pagelist);
-                       kfree(m->data.pagelist);
-               }
-               memset(&m->data, 0, sizeof m->data);
-               ceph_msg_data_init(&m->data);
-       }
+       ceph_msg_data_destroy(m->data);
+       m->data = NULL;
 
        if (m->pool)
                ceph_msgpool_put(m->pool, m);
@@ -3182,7 +3210,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
 void ceph_msg_dump(struct ceph_msg *msg)
 {
        pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
-                msg->front_max, msg->data.length);
+                msg->front_max, msg->data->length);
        print_hex_dump(KERN_DEBUG, "header: ",
                       DUMP_PREFIX_OFFSET, 16, 1,
                       &msg->hdr, sizeof(msg->hdr), true);