libceph: implement pages array cursor
authorAlex Elder <elder@inktank.com>
Thu, 7 Mar 2013 21:38:28 +0000 (15:38 -0600)
committerSage Weil <sage@inktank.com>
Thu, 2 May 2013 04:17:01 +0000 (21:17 -0700)
Implement and use cursor routines for page array message data items
for outbound message data.

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
include/linux/ceph/messenger.h
net/ceph/messenger.c

index 76b4645e2dff89fd69ca85619743085da2e9eaaf..b53b9ef650091f248e8acc1fde0cc3381686eefb 100644 (file)
@@ -105,6 +105,12 @@ struct ceph_msg_data_cursor {
                        unsigned int    vector_offset;  /* bytes from vector */
                };
 #endif /* CONFIG_BLOCK */
+               struct {                                /* pages */
+                       size_t          resid;          /* bytes from array */
+                       unsigned int    page_offset;    /* offset in page */
+                       unsigned short  page_index;     /* index in array */
+                       unsigned short  page_count;     /* pages in array */
+               };
                struct {                                /* pagelist */
                        struct page     *page;          /* page from list */
                        size_t          offset;         /* bytes from list */
index 209990a853e5312403dea346f5668bd8d336fe8e..d611156808b3c7fd3417fea073935c3f73dec234 100644 (file)
@@ -830,6 +830,79 @@ static bool ceph_msg_data_bio_advance(struct ceph_msg_data *data, size_t bytes)
 }
 #endif
 
+/*
+ * For a page array, a piece comes from the first page in the array
+ * that has not already been fully consumed.
+ */
+static void ceph_msg_data_pages_cursor_init(struct ceph_msg_data *data)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+       int page_count;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+       BUG_ON(!data->pages);
+       BUG_ON(!data->length);
+
+       page_count = calc_pages_for(data->alignment, (u64)data->length);
+       BUG_ON(page_count > (int) USHRT_MAX);
+       cursor->resid = data->length;
+       cursor->page_offset = data->alignment & ~PAGE_MASK;
+       cursor->page_index = 0;
+       cursor->page_count = (unsigned short) page_count;
+       cursor->last_piece = cursor->page_count == 1;
+}
+
+static struct page *ceph_msg_data_pages_next(struct ceph_msg_data *data,
+                                       size_t *page_offset,
+                                       size_t *length)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+       BUG_ON(cursor->page_index >= cursor->page_count);
+       BUG_ON(cursor->page_offset >= PAGE_SIZE);
+       BUG_ON(!cursor->resid);
+
+       *page_offset = cursor->page_offset;
+       if (cursor->last_piece) {
+               BUG_ON(*page_offset + cursor->resid > PAGE_SIZE);
+               *length = cursor->resid;
+       } else {
+               *length = PAGE_SIZE - *page_offset;
+       }
+
+       return data->pages[cursor->page_index];
+}
+
+static bool ceph_msg_data_pages_advance(struct ceph_msg_data *data,
+                                               size_t bytes)
+{
+       struct ceph_msg_data_cursor *cursor = &data->cursor;
+
+       BUG_ON(data->type != CEPH_MSG_DATA_PAGES);
+
+       BUG_ON(cursor->page_offset + bytes > PAGE_SIZE);
+       BUG_ON(bytes > cursor->resid);
+
+       /* Advance the cursor page offset */
+
+       cursor->resid -= bytes;
+       cursor->page_offset += bytes;
+       if (!bytes || cursor->page_offset & ~PAGE_MASK)
+               return false;   /* more bytes to process in the current page */
+
+       /* Move on to the next page */
+
+       BUG_ON(cursor->page_index >= cursor->page_count);
+       cursor->page_offset = 0;
+       cursor->page_index++;
+       cursor->last_piece = cursor->page_index == cursor->page_count - 1;
+
+       return true;
+}
+
 /*
  * For a pagelist, a piece is whatever remains to be consumed in the
  * first page in the list, or the front of the next page.
@@ -932,13 +1005,15 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data)
        case CEPH_MSG_DATA_PAGELIST:
                ceph_msg_data_pagelist_cursor_init(data);
                break;
+       case CEPH_MSG_DATA_PAGES:
+               ceph_msg_data_pages_cursor_init(data);
+               break;
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
                ceph_msg_data_bio_cursor_init(data);
                break;
 #endif /* CONFIG_BLOCK */
        case CEPH_MSG_DATA_NONE:
-       case CEPH_MSG_DATA_PAGES:
        default:
                /* BUG(); */
                break;
@@ -961,13 +1036,15 @@ static struct page *ceph_msg_data_next(struct ceph_msg_data *data,
        case CEPH_MSG_DATA_PAGELIST:
                page = ceph_msg_data_pagelist_next(data, page_offset, length);
                break;
+       case CEPH_MSG_DATA_PAGES:
+               page = ceph_msg_data_pages_next(data, page_offset, length);
+               break;
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
                page = ceph_msg_data_bio_next(data, page_offset, length);
                break;
 #endif /* CONFIG_BLOCK */
        case CEPH_MSG_DATA_NONE:
-       case CEPH_MSG_DATA_PAGES:
        default:
                page = NULL;
                break;
@@ -993,13 +1070,15 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
        case CEPH_MSG_DATA_PAGELIST:
                new_piece = ceph_msg_data_pagelist_advance(data, bytes);
                break;
+       case CEPH_MSG_DATA_PAGES:
+               new_piece = ceph_msg_data_pages_advance(data, bytes);
+               break;
 #ifdef CONFIG_BLOCK
        case CEPH_MSG_DATA_BIO:
                new_piece = ceph_msg_data_bio_advance(data, bytes);
                break;
 #endif /* CONFIG_BLOCK */
        case CEPH_MSG_DATA_NONE:
-       case CEPH_MSG_DATA_PAGES:
        default:
                BUG();
                break;
@@ -1032,6 +1111,8 @@ static void prepare_message_data(struct ceph_msg *msg,
        if (ceph_msg_has_bio(msg))
                ceph_msg_data_cursor_init(&msg->b);
 #endif /* CONFIG_BLOCK */
+       if (ceph_msg_has_pages(msg))
+               ceph_msg_data_cursor_init(&msg->p);
        if (ceph_msg_has_pagelist(msg))
                ceph_msg_data_cursor_init(&msg->l);
        if (ceph_msg_has_trail(msg))
@@ -1330,6 +1411,8 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
        msg_pos->page_pos += sent;
        if (in_trail)
                need_crc = ceph_msg_data_advance(&msg->t, sent);
+       else if (ceph_msg_has_pages(msg))
+               need_crc = ceph_msg_data_advance(&msg->p, sent);
        else if (ceph_msg_has_pagelist(msg))
                need_crc = ceph_msg_data_advance(&msg->l, sent);
 #ifdef CONFIG_BLOCK
@@ -1435,7 +1518,9 @@ static int write_partial_message_data(struct ceph_connection *con)
                        page = ceph_msg_data_next(&msg->t, &page_offset,
                                                        &length, &last_piece);
                } else if (ceph_msg_has_pages(msg)) {
-                       page = msg->p.pages[msg_pos->page];
+                       use_cursor = true;
+                       page = ceph_msg_data_next(&msg->p, &page_offset,
+                                                       &length, &last_piece);
                } else if (ceph_msg_has_pagelist(msg)) {
                        use_cursor = true;
                        page = ceph_msg_data_next(&msg->l, &page_offset,