From 58bb3b374b07a2a43315213f00a48a5ffd6d0915 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 23 Dec 2009 12:12:31 -0800 Subject: [PATCH] ceph: support ceph_pagelist for message payload The ceph_pagelist is a simple list of whole pages, strung together via their lru list_head. It facilitates encoding to a "buffer" of unknown size. Allow its use in place of the ceph_msg page vector. This will be used to fix the huge buffer preallocation woes of MDS reconnection. Signed-off-by: Sage Weil --- fs/ceph/Makefile | 2 +- fs/ceph/messenger.c | 24 ++++++++++++++++---- fs/ceph/messenger.h | 1 + fs/ceph/pagelist.c | 54 +++++++++++++++++++++++++++++++++++++++++++++ fs/ceph/pagelist.h | 54 +++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 130 insertions(+), 5 deletions(-) create mode 100644 fs/ceph/pagelist.c create mode 100644 fs/ceph/pagelist.h diff --git a/fs/ceph/Makefile b/fs/ceph/Makefile index 827629c85768..47caf2f1b75a 100644 --- a/fs/ceph/Makefile +++ b/fs/ceph/Makefile @@ -8,7 +8,7 @@ obj-$(CONFIG_CEPH_FS) += ceph.o ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \ export.o caps.o snap.o xattr.o \ - messenger.o msgpool.o buffer.o \ + messenger.o msgpool.o buffer.o pagelist.o \ mds_client.o mdsmap.o \ mon_client.o \ osd_client.o osdmap.o crush/crush.o crush/mapper.o crush/hash.o \ diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c index 68052f664280..c1106e8360f0 100644 --- a/fs/ceph/messenger.c +++ b/fs/ceph/messenger.c @@ -13,6 +13,7 @@ #include "super.h" #include "messenger.h" #include "decode.h" +#include "pagelist.h" /* * Ceph uses the messenger to exchange ceph_msg messages with other @@ -728,6 +729,11 @@ static int write_partial_msg_pages(struct ceph_connection *con) page = msg->pages[con->out_msg_pos.page]; if (crc) kaddr = kmap(page); + } else if (msg->pagelist) { + page = list_first_entry(&msg->pagelist->head, + struct page, lru); + if (crc) + kaddr = kmap(page); } else { page = con->msgr->zero_page; if (crc) @@ -750,7 +756,7 @@ static int write_partial_msg_pages(struct ceph_connection *con) MSG_DONTWAIT | MSG_NOSIGNAL | MSG_MORE); - if (crc && msg->pages) + if (crc && (msg->pages || msg->pagelist)) kunmap(page); if (ret <= 0) @@ -762,6 +768,9 @@ static int write_partial_msg_pages(struct ceph_connection *con) con->out_msg_pos.page_pos = 0; con->out_msg_pos.page++; con->out_msg_pos.did_page_crc = 0; + if (msg->pagelist) + list_move_tail(&page->lru, + &msg->pagelist->head); } } @@ -1051,13 +1060,13 @@ static int process_banner(struct ceph_connection *con) &con->actual_peer_addr) && !(addr_is_blank(&con->actual_peer_addr.in_addr) && con->actual_peer_addr.nonce == con->peer_addr.nonce)) { - pr_err("wrong peer, want %s/%d, " - "got %s/%d, wtf\n", + pr_warning("wrong peer, want %s/%d, " + "got %s/%d\n", pr_addr(&con->peer_addr.in_addr), con->peer_addr.nonce, pr_addr(&con->actual_peer_addr.in_addr), con->actual_peer_addr.nonce); - con->error_msg = "protocol error, wrong peer"; + con->error_msg = "wrong peer at address"; return -1; } @@ -2096,6 +2105,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, /* data */ m->nr_pages = calc_pages_for(page_off, page_len); m->pages = pages; + m->pagelist = NULL; dout("ceph_msg_new %p page %d~%d -> %d\n", m, page_off, page_len, m->nr_pages); @@ -2181,6 +2191,12 @@ void ceph_msg_last_put(struct kref *kref) m->nr_pages = 0; m->pages = NULL; + if (m->pagelist) { + ceph_pagelist_release(m->pagelist); + kfree(m->pagelist); + m->pagelist = NULL; + } + if (m->pool) ceph_msgpool_put(m->pool, m); else diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h index 7e2aab1d3ce2..a7b684145092 100644 --- a/fs/ceph/messenger.h +++ b/fs/ceph/messenger.h @@ -85,6 +85,7 @@ struct ceph_msg { struct ceph_buffer *middle; struct page **pages; /* data payload. NOT OWNER. */ unsigned nr_pages; /* size of page array */ + struct ceph_pagelist *pagelist; /* instead of pages */ struct list_head list_head; struct kref kref; bool front_is_vmalloc; diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c new file mode 100644 index 000000000000..370e93695474 --- /dev/null +++ b/fs/ceph/pagelist.c @@ -0,0 +1,54 @@ + +#include +#include + +#include "pagelist.h" + +int ceph_pagelist_release(struct ceph_pagelist *pl) +{ + if (pl->mapped_tail) + kunmap(pl->mapped_tail); + while (!list_empty(&pl->head)) { + struct page *page = list_first_entry(&pl->head, struct page, + lru); + list_del(&page->lru); + __free_page(page); + } + return 0; +} + +static int ceph_pagelist_addpage(struct ceph_pagelist *pl) +{ + struct page *page = alloc_page(GFP_NOFS); + if (!page) + return -ENOMEM; + pl->room += PAGE_SIZE; + list_add_tail(&page->lru, &pl->head); + if (pl->mapped_tail) + kunmap(pl->mapped_tail); + pl->mapped_tail = kmap(page); + return 0; +} + +int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len) +{ + while (pl->room < len) { + size_t bit = pl->room; + int ret; + + memcpy(pl->mapped_tail + (pl->length & ~PAGE_CACHE_MASK), + buf, bit); + pl->length += bit; + pl->room -= bit; + buf += bit; + len -= bit; + ret = ceph_pagelist_addpage(pl); + if (ret) + return ret; + } + + memcpy(pl->mapped_tail + (pl->length & ~PAGE_CACHE_MASK), buf, len); + pl->length += len; + pl->room -= len; + return 0; +} diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h new file mode 100644 index 000000000000..e8a4187e1087 --- /dev/null +++ b/fs/ceph/pagelist.h @@ -0,0 +1,54 @@ +#ifndef __FS_CEPH_PAGELIST_H +#define __FS_CEPH_PAGELIST_H + +#include + +struct ceph_pagelist { + struct list_head head; + void *mapped_tail; + size_t length; + size_t room; +}; + +static inline void ceph_pagelist_init(struct ceph_pagelist *pl) +{ + INIT_LIST_HEAD(&pl->head); + pl->mapped_tail = NULL; + pl->length = 0; + pl->room = 0; +} +extern int ceph_pagelist_release(struct ceph_pagelist *pl); + +extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l); + +static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v) +{ + __le64 ev = cpu_to_le64(v); + return ceph_pagelist_append(pl, &ev, sizeof(ev)); +} +static inline int ceph_pagelist_encode_32(struct ceph_pagelist *pl, u32 v) +{ + __le32 ev = cpu_to_le32(v); + return ceph_pagelist_append(pl, &ev, sizeof(ev)); +} +static inline int ceph_pagelist_encode_16(struct ceph_pagelist *pl, u16 v) +{ + __le16 ev = cpu_to_le16(v); + return ceph_pagelist_append(pl, &ev, sizeof(ev)); +} +static inline int ceph_pagelist_encode_8(struct ceph_pagelist *pl, u8 v) +{ + return ceph_pagelist_append(pl, &v, 1); +} +static inline int ceph_pagelist_encode_string(struct ceph_pagelist *pl, + char *s, size_t len) +{ + int ret = ceph_pagelist_encode_32(pl, len); + if (ret) + return ret; + if (len) + return ceph_pagelist_append(pl, s, len); + return 0; +} + +#endif -- 2.20.1