libceph: implement bio message data item cursor
authorAlex Elder <elder@inktank.com>
Thu, 7 Mar 2013 05:39:39 +0000 (23:39 -0600)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:16:59 +0000 (21:16 -0700)
Implement and use cursor routines for bio message data items for
outbound message data.

(See the previous commit for reasoning in support of the changes
in out_msg_pos_next().)

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
include/linux/ceph/messenger.h
net/ceph/messenger.c

index 716c3fdeb257a82f221f650e9ac8e6047a6dbc74..76b4645e2dff89fd69ca85619743085da2e9eaaf 100644 (file)
@@ -98,6 +98,13 @@ static __inline__ bool ceph_msg_data_type_valid(enum ceph_msg_data_type type)
 struct ceph_msg_data_cursor {
        bool            last_piece;     /* now at last piece of data item */
        union {
+#ifdef CONFIG_BLOCK
+               struct {                                /* bio */
+                       struct bio      *bio;           /* bio from list */
+                       unsigned int    vector_index;   /* vector from bio */
+                       unsigned int    vector_offset;  /* bytes from vector */
+               };
+#endif /* CONFIG_BLOCK */
                struct {                                /* pagelist */
                        struct page     *page;          /* page from list */
                        size_t          offset;         /* bytes from list */
index 30c8792be180d48feb36c6138d45c05cb5e392ae..209990a853e5312403dea346f5668bd8d336fe8e 100644 (file)
@@ -739,6 +739,95 @@ static void iter_bio_next(struct bio **bio_iter, unsigned int *seg)
        if (*seg == (*bio_iter)->bi_vcnt)
                init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
 }
+
+/*
+ * For a bio data item, a piece is whatever remains of the next
+ * entry in the current bio iovec, or the first entry in the next
+ * bio in the list.
+ */
+static void ceph_msg_data_bio_cursor_init(struct ceph_msg_data *data)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct bio *bio;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+       bio = data->bio;
+       BUG_ON(!bio);
+       BUG_ON(!bio->bi_vcnt);
+       /* resid = bio->bi_size */
+
+       cursor->bio = bio;
+       cursor->vector_index = 0;
+       cursor->vector_offset = 0;
+       cursor->last_piece = !bio->bi_next && bio->bi_vcnt == 1;
+}
+
+static struct page *ceph_msg_data_bio_next(struct ceph_msg_data *data,
+                                               size_t *page_offset,
+                                               size_t *length)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct bio *bio;
+       struct bio_vec *bio_vec;
+       unsigned int index;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+       bio = cursor->bio;
+       BUG_ON(!bio);
+
+       index = cursor->vector_index;
+       BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+
+       bio_vec = &bio->bi_io_vec[index];
+       BUG_ON(cursor->vector_offset >= bio_vec->bv_len);
+       *page_offset = (size_t) (bio_vec->bv_offset + cursor->vector_offset);
+       BUG_ON(*page_offset >= PAGE_SIZE);
+       *length = (size_t) (bio_vec->bv_len - cursor->vector_offset);
+       BUG_ON(*length > PAGE_SIZE);
+
+       return bio_vec->bv_page;
+}
+
+static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       struct bio *bio;
+       struct bio_vec *bio_vec;
+       unsigned int index;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_BIO);
+
+       bio = cursor->bio;
+       BUG_ON(!bio);
+
+       index = cursor->vector_index;
+       BUG_ON(index >= (unsigned int) bio->bi_vcnt);
+       bio_vec = &bio->bi_io_vec[index];
+       BUG_ON(cursor->vector_offset + bytes > bio_vec->bv_len);
+
+       /* Advance the cursor offset */
+
+       cursor->vector_offset += bytes;
+       if (cursor->vector_offset < bio_vec->bv_len)
+               return false;   /* more bytes to process in this segment */
+
+       /* Move on to the next segment, and possibly the next bio */
+
+       if (++cursor->vector_index == (unsigned int) bio->bi_vcnt) {
+               bio = bio->bi_next;
+               cursor->bio = bio;
+               cursor->vector_index = 0;
+       }
+       cursor->vector_offset = 0;
+
+       if (!cursor->last_piece && bio && !bio->bi_next)
+               if (cursor->vector_index == (unsigned int) bio->bi_vcnt - 1)
+                       cursor->last_piece = true;
+
+       return true;
+}
 #endif
 
 /*
@@ -843,11 +932,13 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
        case CEPH_MSG_DATA_PAGELIST:
                ceph_msg_data_pagelist_cursor_init(data);
                break;
-       case CEPH_MSG_DATA_NONE:
-       case CEPH_MSG_DATA_PAGES:
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
+               ceph_msg_data_bio_cursor_init(data);
+               break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_NONE:
+       case CEPH_MSG_DATA_PAGES:
        default:
                /* BUG(); */
                break;
@@ -870,11 +961,13 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
        case CEPH_MSG_DATA_PAGELIST:
                page = ceph_msg_data_pagelist_next(data, page_offset, length);
                break;
-       case CEPH_MSG_DATA_NONE:
-       case CEPH_MSG_DATA_PAGES:
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
+               page = ceph_msg_data_bio_next(data, page_offset, length);
+               break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_NONE:
+       case CEPH_MSG_DATA_PAGES:
        default:
                page = NULL;
                break;
@@ -900,11 +993,13 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
        case CEPH_MSG_DATA_PAGELIST:
                new_piece = ceph_msg_data_pagelist_advance(data, bytes);
                break;
-       case CEPH_MSG_DATA_NONE:
-       case CEPH_MSG_DATA_PAGES:
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
+               new_piece = ceph_msg_data_bio_advance(data, bytes);
+               break;
 #endif /* CONFIG_BLOCK */
+       case CEPH_MSG_DATA_NONE:
+       case CEPH_MSG_DATA_PAGES:
        default:
                BUG();
                break;
@@ -933,6 +1028,10 @@ static void prepare_message_data(struct ceph_msg *msg,
 
        /* Initialize data cursors */
 
+#ifdef CONFIG_BLOCK
+       if (ceph_msg_has_bio(msg))
+               ceph_msg_data_cursor_init(&msg->b);
+#endif /* CONFIG_BLOCK */
        if (ceph_msg_has_pagelist(msg))
                ceph_msg_data_cursor_init(&msg->l);
        if (ceph_msg_has_trail(msg))
@@ -1233,6 +1332,10 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
                need_crc = ceph_msg_data_advance(&msg->t, sent);
        else if (ceph_msg_has_pagelist(msg))
                need_crc = ceph_msg_data_advance(&msg->l, sent);
+#ifdef CONFIG_BLOCK
+       else if (ceph_msg_has_bio(msg))
+               need_crc = ceph_msg_data_advance(&msg->b, sent);
+#endif /* CONFIG_BLOCK */
        BUG_ON(need_crc && sent != len);
 
        if (sent < len)
@@ -1242,10 +1345,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
        msg_pos->page_pos = 0;
        msg_pos->page++;
        msg_pos->did_page_crc = false;
-#ifdef CONFIG_BLOCK
-       if (ceph_msg_has_bio(msg))
-               iter_bio_next(&msg->b.bio_iter, &msg->b.bio_seg);
-#endif
 }
 
 static void in_msg_pos_next(struct ceph_connection *con, size_t len,
@@ -1323,8 +1422,6 @@ static int write_partial_message_data(struct ceph_connection *con)
                struct page *page = NULL;
                size_t page_offset;
                size_t length;
-               int max_write = PAGE_SIZE;
-               int bio_offset = 0;
                bool use_cursor = false;
                bool last_piece = true; /* preserve existing behavior */
 
@@ -1345,21 +1442,19 @@ static int write_partial_message_data(struct ceph_connection *con)
                                                        &length, &last_piece);
 #ifdef CONFIG_BLOCK
                } else if (ceph_msg_has_bio(msg)) {
-                       struct bio_vec *bv;
-
-                       bv = bio_iovec_idx(msg->b.bio_iter, msg->b.bio_seg);
-                       page = bv->bv_page;
-                       bio_offset = bv->bv_offset;
-                       max_write = bv->bv_len;
+                       use_cursor = true;
+                       page = ceph_msg_data_next(&msg->b, &page_offset,
+                                                       &length, &last_piece);
 #endif
                } else {
                        page = zero_page;
                }
-               if (!use_cursor)
-                       length = min_t(int, max_write - msg_pos->page_pos,
+               if (!use_cursor) {
+                       length = min_t(int, PAGE_SIZE - msg_pos->page_pos,
                                            total_max_write);
 
-               page_offset = msg_pos->page_pos + bio_offset;
+                       page_offset = msg_pos->page_pos;
+               }
                if (do_datacrc && !msg_pos->did_page_crc) {
                        u32 crc = le32_to_cpu(msg->footer.data_crc);