ceph: allocate middle of message before stating to read
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 8 Jan 2010 21:58:34 +0000 (13:58 -0800)
committerSage Weil <sage@newdream.net>
Mon, 25 Jan 2010 20:57:37 +0000 (12:57 -0800)
Both front and middle parts of the message are now being
allocated at the ceph_alloc_msg().

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
fs/ceph/mds_client.c
fs/ceph/messenger.c
fs/ceph/messenger.h
fs/ceph/mon_client.c
fs/ceph/osd_client.c

index 623c67cd484ba35f8dc43c323a8a3c381ebd5a2d..93998a0678c4ea20ac5f888089af6a2bddfe14e5 100644 (file)
@@ -2953,8 +2953,6 @@ const static struct ceph_connection_operations mds_con_ops = {
        .get_authorizer = get_authorizer,
        .verify_authorizer_reply = verify_authorizer_reply,
        .peer_reset = peer_reset,
-       .alloc_msg = ceph_alloc_msg,
-       .alloc_middle = ceph_alloc_middle,
 };
 
 
index 1360708d7505b8a2b748c71af8683c931ef88b92..25de15c006b1ac3c384cb45a4b0ca8f0cecf9923 100644 (file)
@@ -1279,8 +1279,34 @@ static void process_ack(struct ceph_connection *con)
 
 
 
+static int read_partial_message_section(struct ceph_connection *con,
+                                       struct kvec *section, unsigned int sec_len,
+                                       u32 *crc)
+{
+       int left;
+       int ret;
+
+       BUG_ON(!section);
+
+       while (section->iov_len < sec_len) {
+               BUG_ON(section->iov_base == NULL);
+               left = sec_len - section->iov_len;
+               ret = ceph_tcp_recvmsg(con->sock, (char *)section->iov_base +
+                                      section->iov_len, left);
+               if (ret <= 0)
+                       return ret;
+               section->iov_len += ret;
+               if (section->iov_len == sec_len)
+                       *crc = crc32c(0, section->iov_base,
+                                     section->iov_len);
+       }
 
+       return 1;
+}
 
+static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+                               struct ceph_msg_header *hdr,
+                               int *skip);
 /*
  * read (part of) a message.
  */
@@ -1292,6 +1318,7 @@ static int read_partial_message(struct ceph_connection *con)
        int to, want, left;
        unsigned front_len, middle_len, data_len, data_off;
        int datacrc = con->msgr->nocrc;
+       int skip;
 
        dout("read_partial_message con %p msg %p\n", con, m);
 
@@ -1315,7 +1342,6 @@ static int read_partial_message(struct ceph_connection *con)
                        }
                }
        }
-
        front_len = le32_to_cpu(con->in_hdr.front_len);
        if (front_len > CEPH_MSG_MAX_FRONT_LEN)
                return -EIO;
@@ -1330,8 +1356,8 @@ static int read_partial_message(struct ceph_connection *con)
        if (!con->in_msg) {
                dout("got hdr type %d front %d data %d\n", con->in_hdr.type,
                     con->in_hdr.front_len, con->in_hdr.data_len);
-               con->in_msg = con->ops->alloc_msg(con, &con->in_hdr);
-               if (!con->in_msg) {
+               con->in_msg = ceph_alloc_msg(con, &con->in_hdr, &skip);
+               if (skip) {
                        /* skip this message */
                        pr_err("alloc_msg returned NULL, skipping message\n");
                        con->in_base_pos = -front_len - middle_len - data_len -
@@ -1342,56 +1368,28 @@ static int read_partial_message(struct ceph_connection *con)
                if (IS_ERR(con->in_msg)) {
                        ret = PTR_ERR(con->in_msg);
                        con->in_msg = NULL;
-                       con->error_msg = "out of memory for incoming message";
+                       con->error_msg = "error allocating memory for incoming message";
                        return ret;
                }
                m = con->in_msg;
                m->front.iov_len = 0;    /* haven't read it yet */
+               if (m->middle)
+                       m->middle->vec.iov_len = 0;
                memcpy(&m->hdr, &con->in_hdr, sizeof(con->in_hdr));
        }
 
        /* front */
-       while (m->front.iov_len < front_len) {
-               BUG_ON(m->front.iov_base == NULL);
-               left = front_len - m->front.iov_len;
-               ret = ceph_tcp_recvmsg(con->sock, (char *)m->front.iov_base +
-                                      m->front.iov_len, left);
-               if (ret <= 0)
-                       return ret;
-               m->front.iov_len += ret;
-               if (m->front.iov_len == front_len)
-                       con->in_front_crc = crc32c(0, m->front.iov_base,
-                                                     m->front.iov_len);
-       }
+       ret = read_partial_message_section(con, &m->front, front_len,
+                                          &con->in_front_crc);
+       if (ret <= 0)
+               return ret;
 
        /* middle */
-       while (middle_len > 0 && (!m->middle ||
-                                 m->middle->vec.iov_len < middle_len)) {
-               if (m->middle == NULL) {
-                       ret = -EOPNOTSUPP;
-                       if (con->ops->alloc_middle)
-                               ret = con->ops->alloc_middle(con, m);
-                       if (ret < 0) {
-                               pr_err("alloc_middle fail skipping payload\n");
-                               con->in_base_pos = -middle_len - data_len
-                                       - sizeof(m->footer);
-                               ceph_msg_put(con->in_msg);
-                               con->in_msg = NULL;
-                               con->in_tag = CEPH_MSGR_TAG_READY;
-                               return 0;
-                       }
-                       m->middle->vec.iov_len = 0;
-               }
-               left = middle_len - m->middle->vec.iov_len;
-               ret = ceph_tcp_recvmsg(con->sock,
-                                      (char *)m->middle->vec.iov_base +
-                                      m->middle->vec.iov_len, left);
+       if (m->middle) {
+               ret = read_partial_message_section(con, &m->middle->vec, middle_len,
+                                                  &con->in_middle_crc);
                if (ret <= 0)
                        return ret;
-               m->middle->vec.iov_len += ret;
-               if (m->middle->vec.iov_len == middle_len)
-                       con->in_middle_crc = crc32c(0, m->middle->vec.iov_base,
-                                                     m->middle->vec.iov_len);
        }
 
        /* (page) data */
@@ -2115,24 +2113,6 @@ out:
        return ERR_PTR(-ENOMEM);
 }
 
-/*
- * Generic message allocator, for incoming messages.
- */
-struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
-                               struct ceph_msg_header *hdr)
-{
-       int type = le16_to_cpu(hdr->type);
-       int front_len = le32_to_cpu(hdr->front_len);
-       struct ceph_msg *msg = ceph_msg_new(type, front_len, 0, 0, NULL);
-
-       if (!msg) {
-               pr_err("unable to allocate msg type %d len %d\n",
-                      type, front_len);
-               return ERR_PTR(-ENOMEM);
-       }
-       return msg;
-}
-
 /*
  * Allocate "middle" portion of a message, if it is needed and wasn't
  * allocated by alloc_msg.  This allows us to read a small fixed-size
@@ -2140,7 +2120,7 @@ struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
  * propagate the error to the caller based on info in the front) when
  * the middle is too large.
  */
-int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
+static int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
 {
        int type = le16_to_cpu(msg->hdr.type);
        int middle_len = le32_to_cpu(msg->hdr.middle_len);
@@ -2156,6 +2136,48 @@ int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg)
        return 0;
 }
 
+/*
+ * Generic message allocator, for incoming messages.
+ */
+static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
+                               struct ceph_msg_header *hdr,
+                               int *skip)
+{
+       int type = le16_to_cpu(hdr->type);
+       int front_len = le32_to_cpu(hdr->front_len);
+       int middle_len = le32_to_cpu(hdr->middle_len);
+       struct ceph_msg *msg = NULL;
+       int ret;
+
+       if (con->ops->alloc_msg) {
+               msg = con->ops->alloc_msg(con, hdr, skip);
+               if (IS_ERR(msg))
+                       return msg;
+
+               if (*skip)
+                       return NULL;
+       }
+       if (!msg) {
+               *skip = 0;
+               msg = ceph_msg_new(type, front_len, 0, 0, NULL);
+               if (!msg) {
+                       pr_err("unable to allocate msg type %d len %d\n",
+                              type, front_len);
+                       return ERR_PTR(-ENOMEM);
+               }
+       }
+
+       if (middle_len) {
+               ret = ceph_alloc_middle(con, msg);
+
+               if (ret < 0) {
+                       ceph_msg_put(msg);
+                       return msg;
+               }
+       }
+       return msg;
+}
+
 
 /*
  * Free a generically kmalloc'd message.
index a7b6841450924557a3ca6396f05b80dcaa5d43bb..b6bec59056d7eb34b8960145baa3ba2d640edde9 100644 (file)
@@ -44,9 +44,8 @@ struct ceph_connection_operations {
        void (*peer_reset) (struct ceph_connection *con);
 
        struct ceph_msg * (*alloc_msg) (struct ceph_connection *con,
-                                       struct ceph_msg_header *hdr);
-       int (*alloc_middle) (struct ceph_connection *con,
-                            struct ceph_msg *msg);
+                                       struct ceph_msg_header *hdr,
+                                       int *skip);
        /* an incoming message has a data payload; tell me what pages I
         * should read the data into. */
        int (*prepare_pages) (struct ceph_connection *con, struct ceph_msg *m,
@@ -242,10 +241,6 @@ extern struct ceph_msg *ceph_msg_new(int type, int front_len,
                                     struct page **pages);
 extern void ceph_msg_kfree(struct ceph_msg *m);
 
-extern struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
-                                      struct ceph_msg_header *hdr);
-extern int ceph_alloc_middle(struct ceph_connection *con, struct ceph_msg *msg);
-
 
 static inline struct ceph_msg *ceph_msg_get(struct ceph_msg *msg)
 {
index 223e8bc207e39aeffb6983ec1b7ab94a51024b15..6c00b37cc5541e3ac252754d23586986c6955567 100644 (file)
@@ -692,21 +692,33 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
  * Allocate memory for incoming message
  */
 static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
-                                     struct ceph_msg_header *hdr)
+                                     struct ceph_msg_header *hdr,
+                                     int *skip)
 {
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
-       int front = le32_to_cpu(hdr->front_len);
+       int front_len = le32_to_cpu(hdr->front_len);
+       struct ceph_msg *m;
 
+       *skip = 0;
        switch (type) {
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
-               return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front);
+               m = ceph_msgpool_get(&monc->msgpool_subscribe_ack, front_len);
+               break;
        case CEPH_MSG_STATFS_REPLY:
-               return ceph_msgpool_get(&monc->msgpool_statfs_reply, front);
+               m = ceph_msgpool_get(&monc->msgpool_statfs_reply, front_len);
+               break;
        case CEPH_MSG_AUTH_REPLY:
-               return ceph_msgpool_get(&monc->msgpool_auth_reply, front);
+               m = ceph_msgpool_get(&monc->msgpool_auth_reply, front_len);
+               break;
+       default:
+               return NULL;
        }
-       return ceph_alloc_msg(con, hdr);
+
+       if (!m)
+               *skip = 1;
+
+       return m;
 }
 
 /*
@@ -749,5 +761,4 @@ const static struct ceph_connection_operations mon_con_ops = {
        .dispatch = dispatch,
        .fault = mon_fault,
        .alloc_msg = mon_alloc_msg,
-       .alloc_middle = ceph_alloc_middle,
 };
index 8417e21a3cb230836c42fadcc22768ca339d74f0..545e9361799399caf79c7ea9bdd3d3ed56916fb9 100644 (file)
@@ -1304,18 +1304,28 @@ static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
 }
 
 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
-                                 struct ceph_msg_header *hdr)
+                                 struct ceph_msg_header *hdr,
+                                 int *skip)
 {
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
        int type = le16_to_cpu(hdr->type);
        int front = le32_to_cpu(hdr->front_len);
+       struct ceph_msg *m;
 
+       *skip = 0;
        switch (type) {
        case CEPH_MSG_OSD_OPREPLY:
-               return ceph_msgpool_get(&osdc->msgpool_op_reply, front);
+               m = ceph_msgpool_get(&osdc->msgpool_op_reply, front);
+               break;
+       default:
+               return NULL;
        }
-       return ceph_alloc_msg(con, hdr);
+
+       if (!m)
+               *skip = 1;
+
+       return m;
 }
 
 /*
@@ -1390,6 +1400,5 @@ const static struct ceph_connection_operations osd_con_ops = {
        .verify_authorizer_reply = verify_authorizer_reply,
        .alloc_msg = alloc_msg,
        .fault = osd_reset,
-       .alloc_middle = ceph_alloc_middle,
        .prepare_pages = prepare_pages,
 };