ceph: refactor messages data section allocation
authorYehuda Sadeh <yehuda@hq.newdream.net>
Mon, 11 Jan 2010 18:32:02 +0000 (10:32 -0800)
committerSage Weil <sage@newdream.net>
Mon, 25 Jan 2010 20:57:43 +0000 (12:57 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
fs/ceph/messenger.c

index 25de15c006b1ac3c384cb45a4b0ca8f0cecf9923..e8742cc9ecdfdccbef0457ba5f2ebe3339ae9a99 100644 (file)
@@ -1315,7 +1315,7 @@ static int read_partial_message(struct ceph_connection *con)
        struct ceph_msg *m = con->in_msg;
        void *p;
        int ret;
-       int to, want, left;
+       int to, left;
        unsigned front_len, middle_len, data_len, data_off;
        int datacrc = con->msgr->nocrc;
        int skip;
@@ -1351,6 +1351,7 @@ static int read_partial_message(struct ceph_connection *con)
        data_len = le32_to_cpu(con->in_hdr.data_len);
        if (data_len > CEPH_MSG_MAX_DATA_LEN)
                return -EIO;
+       data_off = le16_to_cpu(con->in_hdr.data_off);
 
        /* allocate message? */
        if (!con->in_msg) {
@@ -1375,7 +1376,10 @@ static int read_partial_message(struct ceph_connection *con)
                m->front.iov_len = 0;    /* haven't read it yet */
                if (m->middle)
                        m->middle->vec.iov_len = 0;
-               memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
+
+               con->in_msg_pos.page = 0;
+               con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
+               con->in_msg_pos.data_pos = 0;
        }
 
        /* front */
@@ -1393,31 +1397,6 @@ static int read_partial_message(struct ceph_connection *con)
        }
 
        /* (page) data */
-       data_off = le16_to_cpu(m->hdr.data_off);
-       if (data_len == 0)
-               goto no_data;
-
-       if (m->nr_pages == 0) {
-               con->in_msg_pos.page = 0;
-               con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
-               con->in_msg_pos.data_pos = 0;
-               /* find pages for data payload */
-               want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
-               ret = -1;
-               mutex_unlock(&con->mutex);
-               if (con->ops->prepare_pages)
-                       ret = con->ops->prepare_pages(con, m, want);
-               mutex_lock(&con->mutex);
-               if (ret < 0) {
-                       dout("%p prepare_pages failed, skipping payload\n", m);
-                       con->in_base_pos = -data_len - sizeof(m->footer);
-                       ceph_msg_put(con->in_msg);
-                       con->in_msg = NULL;
-                       con->in_tag = CEPH_MSGR_TAG_READY;
-                       return 0;
-               }
-               BUG_ON(m->nr_pages < want);
-       }
        while (con->in_msg_pos.data_pos < data_len) {
                left = min((int)(data_len - con->in_msg_pos.data_pos),
                           (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
@@ -1440,7 +1419,6 @@ static int read_partial_message(struct ceph_connection *con)
                }
        }
 
-no_data:
        /* footer */
        to = sizeof(m->hdr) + sizeof(m->footer);
        while (con->in_base_pos < to) {
@@ -2136,6 +2114,25 @@ static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
        return 0;
 }
 
+static int ceph_alloc_data_section(struct ceph_connection *con, struct ceph_msg *msg)
+{
+       int ret;
+       int want;
+       int data_len = le32_to_cpu(msg->hdr.data_len);
+       unsigned data_off = le16_to_cpu(msg->hdr.data_off);
+
+       want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
+       ret = -1;
+       mutex_unlock(&con->mutex);
+       if (con->ops->prepare_pages)
+               ret = con->ops->prepare_pages(con, msg, want);
+       mutex_lock(&con->mutex);
+
+       BUG_ON(msg->nr_pages < want);
+
+       return ret;
+}
+
 /*
  * Generic message allocator, for incoming messages.
  */
@@ -2146,6 +2143,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
        int type = le16_to_cpu(hdr->type);
        int front_len = le32_to_cpu(hdr->front_len);
        int middle_len = le32_to_cpu(hdr->middle_len);
+       int data_len = le32_to_cpu(hdr->data_len);
        struct ceph_msg *msg = NULL;
        int ret;
 
@@ -2166,6 +2164,7 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                        return ERR_PTR(-ENOMEM);
                }
        }
+       memcpy(&msg->hdr, &con->in_hdr, sizeof(con->in_hdr));
 
        if (middle_len) {
                ret = ceph_alloc_middle(con, msg);
@@ -2175,6 +2174,18 @@ static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
                        return msg;
                }
        }
+
+       if (data_len) {
+               ret = ceph_alloc_data_section(con, msg);
+
+               if (ret < 0) {
+                       *skip = 1;
+                       ceph_msg_put(msg);
+                       return NULL;
+               }
+       }
+
+
        return msg;
 }