ceph: keep reserved replies on the request structure
authorYehuda Sadeh <yehuda@hq.newdream.net>
Thu, 14 Jan 2010 01:03:23 +0000 (17:03 -0800)
committerSage Weil <sage@newdream.net>
Mon, 25 Jan 2010 20:58:08 +0000 (12:58 -0800)
This includes treating all the data preallocation and revokation
at the same place, not having to have a special case for
the reserved pages.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
fs/ceph/messenger.c
fs/ceph/messenger.h
fs/ceph/osd_client.c
fs/ceph/osd_client.h

index f708803e6857b2c2d4f6dd7d54f217bdd7bf9055..81bc779adb9045dd17aea04525c9ff9ffb4fab33 100644 (file)
@@ -1985,30 +1985,30 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 }
 
 /*
- * Revoke a page vector that we may be reading data into
+ * Revoke a message that we may be reading data into
  */
-void ceph_con_revoke_pages(struct ceph_connection *con, struct page **pages)
+void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
 {
        mutex_lock(&con->mutex);
-       if (con->in_msg && con->in_msg->pages == pages) {
+       if (con->in_msg && con->in_msg == msg) {
+               unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
+               unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
                unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
 
                /* skip rest of message */
-               dout("con_revoke_pages %p msg %p pages %p revoked\n", con,
-                    con->in_msg, pages);
-               if (con->in_msg_pos.data_pos < data_len)
-                       con->in_base_pos = con->in_msg_pos.data_pos - data_len;
-               else
+               dout("con_revoke_pages %p msg %p revoked\n", con, msg);
                        con->in_base_pos = con->in_base_pos -
                                sizeof(struct ceph_msg_header) -
+                               front_len -
+                               middle_len -
+                               data_len -
                                sizeof(struct ceph_msg_footer);
-               con->in_msg->pages = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
                con->in_tag = CEPH_MSGR_TAG_READY;
        } else {
                dout("con_revoke_pages %p msg %p pages %p no-op\n",
-                    con, con->in_msg, pages);
+                    con, con->in_msg, msg);
        }
        mutex_unlock(&con->mutex);
 }
index dca2d32b40de7d39a0c034598a7200aa4805bc0b..c26a3d8aa78c7759c49227857748cbb715a67fb2 100644 (file)
@@ -226,8 +226,8 @@ extern void ceph_con_open(struct ceph_connection *con,
 extern void ceph_con_close(struct ceph_connection *con);
 extern void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg);
 extern void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg);
-extern void ceph_con_revoke_pages(struct ceph_connection *con,
-                                 struct page **pages);
+extern void ceph_con_revoke_message(struct ceph_connection *con,
+                                 struct ceph_msg *msg);
 extern void ceph_con_keepalive(struct ceph_connection *con);
 extern struct ceph_connection *ceph_con_get(struct ceph_connection *con);
 extern void ceph_con_put(struct ceph_connection *con);
index 44abe299c69f84e0ab8693aae776e87fc0b69291..df21068397139dd35cb20c89e6516f187f773cc7 100644 (file)
@@ -13,6 +13,8 @@
 #include "decode.h"
 #include "auth.h"
 
+#define OSD_REPLY_RESERVE_FRONT_LEN    512
+
 const static struct ceph_connection_operations osd_con_ops;
 
 static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
@@ -73,6 +75,16 @@ static void calc_layout(struct ceph_osd_client *osdc,
             req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
 }
 
+static void remove_replies(struct ceph_osd_request *req)
+{
+       int i;
+       int max = ARRAY_SIZE(req->replies);
+
+       for (i=0; i<max; i++) {
+               if (req->replies[i])
+                       ceph_msg_put(req->replies[i]);
+       }
+}
 
 /*
  * requests
@@ -87,12 +99,13 @@ void ceph_osdc_release_request(struct kref *kref)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);
-       if (req->r_con_filling_pages) {
+       remove_replies(req);
+       if (req->r_con_filling_msg) {
                dout("release_request revoking pages %p from con %p\n",
-                    req->r_pages, req->r_con_filling_pages);
-               ceph_con_revoke_pages(req->r_con_filling_pages,
-                                     req->r_pages);
-               ceph_con_put(req->r_con_filling_pages);
+                    req->r_pages, req->r_con_filling_msg);
+               ceph_con_revoke_message(req->r_con_filling_msg,
+                                     req->r_reply);
+               ceph_con_put(req->r_con_filling_msg);
        }
        if (req->r_own_pages)
                ceph_release_page_vector(req->r_pages,
@@ -104,6 +117,60 @@ void ceph_osdc_release_request(struct kref *kref)
                kfree(req);
 }
 
+static int alloc_replies(struct ceph_osd_request *req, int num_reply)
+{
+       int i;
+       int max = ARRAY_SIZE(req->replies);
+
+       BUG_ON(num_reply > max);
+
+       for (i=0; i<num_reply; i++) {
+               req->replies[i] = ceph_msg_new(0, OSD_REPLY_RESERVE_FRONT_LEN, 0, 0, NULL);
+               if (IS_ERR(req->replies[i])) {
+                       int j;
+                       int err = PTR_ERR(req->replies[i]);
+                       for (j = 0; j<=i; j++) {
+                               ceph_msg_put(req->replies[j]);
+                       }
+                       return err;
+               }
+       }
+
+       for (; i<max; i++) {
+               req->replies[i] = NULL;
+       }
+
+       req->cur_reply = 0;
+
+       return 0;
+}
+
+static struct ceph_msg *__get_next_reply(struct ceph_connection *con,
+                                      struct ceph_osd_request *req,
+                                      int front_len)
+{
+       struct ceph_msg *reply;
+       if (req->r_con_filling_msg) {
+               dout("revoking reply msg %p from old con %p\n", req->r_reply,
+                    req->r_con_filling_msg);
+               ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
+               ceph_con_put(req->r_con_filling_msg);
+               req->cur_reply = 0;
+       }
+       reply = req->replies[req->cur_reply];
+       if (!reply || front_len > OSD_REPLY_RESERVE_FRONT_LEN) {
+               /* maybe we can allocate it now? */
+               reply = ceph_msg_new(0, front_len, 0, 0, NULL);
+               if (!reply || IS_ERR(reply)) {
+                       pr_err(" reply alloc failed, front_len=%d\n", front_len);
+                       return ERR_PTR(-ENOMEM);
+               }
+       }
+       req->r_con_filling_msg = ceph_con_get(con);
+       req->r_reply = ceph_msg_get(reply); /* for duration of read over socket */
+       return ceph_msg_get(reply);
+}
+
 /*
  * build new request AND message, calculate layout, and adjust file
  * extent as needed.
@@ -147,7 +214,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (req == NULL)
                return ERR_PTR(-ENOMEM);
 
-       err = ceph_msgpool_resv(&osdc->msgpool_op_reply, num_reply);
+       err = alloc_replies(req, num_reply);
        if (err) {
                ceph_osdc_put_request(req);
                return ERR_PTR(-ENOMEM);
@@ -173,7 +240,6 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
        if (IS_ERR(msg)) {
-               ceph_msgpool_resv(&osdc->msgpool_op_reply, -num_reply);
                ceph_osdc_put_request(req);
                return ERR_PTR(PTR_ERR(msg));
        }
@@ -471,8 +537,6 @@ static void __unregister_request(struct ceph_osd_client *osdc,
        rb_erase(&req->r_node, &osdc->requests);
        osdc->num_requests--;
 
-       ceph_msgpool_resv(&osdc->msgpool_op_reply, -req->r_num_prealloc_reply);
-
        if (req->r_osd) {
                /* make sure the original request isn't in flight. */
                ceph_con_revoke(&req->r_osd->o_con, req->r_request);
@@ -724,12 +788,12 @@ static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
        flags = le32_to_cpu(rhead->flags);
 
        /*
-        * if this connection filled our pages, drop our reference now, to
+        * if this connection filled our message, drop our reference now, to
         * avoid a (safe but slower) revoke later.
         */
-       if (req->r_con_filling_pages == con && req->r_pages == msg->pages) {
-               dout(" got pages, dropping con_filling_pages ref %p\n", con);
-               req->r_con_filling_pages = NULL;
+       if (req->r_con_filling_msg == con && req->r_reply == msg) {
+               dout(" got pages, dropping con_filling_msg ref %p\n", con);
+               req->r_con_filling_msg = NULL;
                ceph_con_put(con);
        }
 
@@ -998,7 +1062,7 @@ bad:
  * find those pages.
  *  0 = success, -1 failure.
  */
-static int prepare_pages(struct ceph_connection *con,
+static int __prepare_pages(struct ceph_connection *con,
                         struct ceph_msg_header *hdr,
                         struct ceph_osd_request *req,
                         u64 tid,
@@ -1017,20 +1081,10 @@ static int prepare_pages(struct ceph_connection *con,
 
        osdc = osd->o_osdc;
 
-       dout("prepare_pages on msg %p want %d\n", m, want);
-       dout("prepare_pages tid %llu has %d pages, want %d\n",
+       dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
             tid, req->r_num_pages, want);
        if (unlikely(req->r_num_pages < want))
                goto out;
-
-       if (req->r_con_filling_pages) {
-               dout("revoking pages %p from old con %p\n", req->r_pages,
-                    req->r_con_filling_pages);
-               ceph_con_revoke_pages(req->r_con_filling_pages, req->r_pages);
-               ceph_con_put(req->r_con_filling_pages);
-       }
-       req->r_con_filling_pages = ceph_con_get(con);
-       req->r_reply = ceph_msg_get(m); /* for duration of read over socket */
        m->pages = req->r_pages;
        m->nr_pages = req->r_num_pages;
        ret = 0; /* success */
@@ -1164,13 +1218,8 @@ int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
        err = ceph_msgpool_init(&osdc->msgpool_op, 4096, 10, true);
        if (err < 0)
                goto out_mempool;
-       err = ceph_msgpool_init(&osdc->msgpool_op_reply, 512, 0, false);
-       if (err < 0)
-               goto out_msgpool;
        return 0;
 
-out_msgpool:
-       ceph_msgpool_destroy(&osdc->msgpool_op);
 out_mempool:
        mempool_destroy(osdc->req_mempool);
 out:
@@ -1186,7 +1235,6 @@ void ceph_osdc_stop(struct ceph_osd_client *osdc)
        }
        mempool_destroy(osdc->req_mempool);
        ceph_msgpool_destroy(&osdc->msgpool_op);
-       ceph_msgpool_destroy(&osdc->msgpool_op_reply);
 }
 
 /*
@@ -1323,17 +1371,17 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        if (!req) {
                *skip = 1;
                m = NULL;
-               dout("prepare_pages unknown tid %llu\n", tid);
+               dout("alloc_msg unknown tid %llu\n", tid);
                goto out;
        }
-       m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
-       if (!m) {
+       m = __get_next_reply(con, req, front);
+       if (!m || IS_ERR(m)) {
                *skip = 1;
                goto out;
        }
 
        if (data_len > 0) {
-               err = prepare_pages(con, hdr, req, tid, m);
+               err = __prepare_pages(con, hdr, req, tid, m);
                if (err < 0) {
                        *skip = 1;
                        ceph_msg_put(m);
index 4162c6810a8f7c8debb5f6f26ed9a7d31675a7cb..8d533d9406ffca2eae77e5fbec592f6913e74dec 100644 (file)
@@ -44,7 +44,7 @@ struct ceph_osd_request {
        struct ceph_osd *r_osd;
        struct ceph_pg   r_pgid;
 
-       struct ceph_connection *r_con_filling_pages;
+       struct ceph_connection *r_con_filling_msg;
 
        struct ceph_msg  *r_request, *r_reply;
        int               r_result;
@@ -75,6 +75,9 @@ struct ceph_osd_request {
        struct page     **r_pages;            /* pages for data payload */
        int               r_pages_from_pool;
        int               r_own_pages;        /* if true, i own page list */
+
+       struct ceph_msg   *replies[2];
+       int               cur_reply;
 };
 
 struct ceph_osd_client {
@@ -98,8 +101,7 @@ struct ceph_osd_client {
 
        mempool_t              *req_mempool;
 
-       struct ceph_msgpool   msgpool_op;
-       struct ceph_msgpool   msgpool_op_reply;
+       struct ceph_msgpool     msgpool_op;
 };
 
 extern int ceph_osdc_init(struct ceph_osd_client *osdc,