ceph: warn on allocation from msgpool with larger front_len
authorSage Weil <sage@newdream.net>
Thu, 15 Oct 2009 00:36:07 +0000 (17:36 -0700)
committerSage Weil <sage@newdream.net>
Fri, 16 Oct 2009 01:14:43 +0000 (18:14 -0700)
Pass the front_len we need when pulling a message off a msgpool,
and WARN if it is greater than the pool's size.  Then try to
allocate a new message (to continue without failing).

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/mon_client.c
fs/ceph/msgpool.c
fs/ceph/msgpool.h
fs/ceph/osd_client.c

index d52e52968d01bbd69554909e961289a5a19f37d5..e6e954cac6b9303e8fda527f05107f43955a4fd4 100644 (file)
@@ -639,14 +639,15 @@ static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
 {
        struct ceph_mon_client *monc = con->private;
        int type = le16_to_cpu(hdr->type);
+       int front = le32_to_cpu(hdr->front_len);
 
        switch (type) {
        case CEPH_MSG_CLIENT_MOUNT_ACK:
-               return ceph_msgpool_get(&monc->msgpool_mount_ack);
+               return ceph_msgpool_get(&monc->msgpool_mount_ack, front);
        case CEPH_MSG_MON_SUBSCRIBE_ACK:
-               return ceph_msgpool_get(&monc->msgpool_subscribe_ack);
+               return ceph_msgpool_get(&monc->msgpool_subscribe_ack, front);
        case CEPH_MSG_STATFS_REPLY:
-               return ceph_msgpool_get(&monc->msgpool_statfs_reply);
+               return ceph_msgpool_get(&monc->msgpool_statfs_reply, front);
        }
        return ceph_alloc_msg(con, hdr);
 }
index 39d4d7ed82ce5d0e5ca8d30cbeb0674e8b9adc35..7599b33820768c16c9478581ec7cc6e9465c13b1 100644 (file)
@@ -101,14 +101,28 @@ int ceph_msgpool_resv(struct ceph_msgpool *pool, int delta)
        return ret;
 }
 
-struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool)
+struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool, int front_len)
 {
        wait_queue_t wait;
        struct ceph_msg *msg;
 
+       if (front_len && front_len > pool->front_len) {
+               pr_err("msgpool_get pool %p need front %d, pool size is %d\n",
+                      pool, front_len, pool->front_len);
+               WARN_ON(1);
+
+               /* try to alloc a fresh message */
+               msg = ceph_msg_new(0, front_len, 0, 0, NULL);
+               if (!IS_ERR(msg))
+                       return msg;
+       }
+
+       if (!front_len)
+               front_len = pool->front_len;
+
        if (pool->blocking) {
                /* mempool_t behavior; first try to alloc */
-               msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+               msg = ceph_msg_new(0, front_len, 0, 0, NULL);
                if (!IS_ERR(msg))
                        return msg;
        }
@@ -133,7 +147,7 @@ struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *pool)
                        WARN_ON(1);
 
                        /* maybe we can allocate it now? */
-                       msg = ceph_msg_new(0, pool->front_len, 0, 0, NULL);
+                       msg = ceph_msg_new(0, front_len, 0, 0, NULL);
                        if (!IS_ERR(msg))
                                return msg;
 
index 07a2decaa6d837ffd711ff1f14f5f774c3e42e81..bc834bfcd7207f42c26b335551e860effd2e6087 100644 (file)
@@ -20,7 +20,8 @@ extern int ceph_msgpool_init(struct ceph_msgpool *pool,
                             int front_len, int size, bool blocking);
 extern void ceph_msgpool_destroy(struct ceph_msgpool *pool);
 extern int ceph_msgpool_resv(struct ceph_msgpool *, int delta);
-extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *);
+extern struct ceph_msg *ceph_msgpool_get(struct ceph_msgpool *,
+                                        int front_len);
 extern void ceph_msgpool_put(struct ceph_msgpool *, struct ceph_msg *);
 
 #endif
index bbd9a5d23712c116f4aae6df27786d3cdeba5c9a..0a254054a82a1f68caefb67d5290c4b505521ffb 100644 (file)
@@ -161,7 +161,7 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
        if (snapc)
                msg_size += sizeof(u64) * snapc->num_snaps;
        if (use_mempool)
-               msg = ceph_msgpool_get(&osdc->msgpool_op);
+               msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
        if (IS_ERR(msg)) {
@@ -1271,10 +1271,11 @@ static struct ceph_msg *alloc_msg(struct ceph_connection *con,
        struct ceph_osd *osd = con->private;
        struct ceph_osd_client *osdc = osd->o_osdc;
        int type = le16_to_cpu(hdr->type);
+       int front = le32_to_cpu(hdr->front_len);
 
        switch (type) {
        case CEPH_MSG_OSD_OPREPLY:
-               return ceph_msgpool_get(&osdc->msgpool_op_reply);
+               return ceph_msgpool_get(&osdc->msgpool_op_reply, front);
        }
        return ceph_alloc_msg(con, hdr);
 }