libceph: add some fine ASCII art
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / net / ceph / messenger.c
index d636903ad4b24a58a59399ec149ada768d23307c..dcc50e4cd5cd704cb05d9191ff1eed22526d1835 100644 (file)
  * the sender.
  */
 
-/* State values for ceph_connection->sock_state; NEW is assumed to be 0 */
+/*
+ * We track the state of the socket on a given connection using
+ * values defined below.  The transition to a new socket state is
+ * handled by a function which verifies we aren't coming from an
+ * unexpected state.
+ *
+ *      --------
+ *      | NEW* |  transient initial state
+ *      --------
+ *          | con_sock_state_init()
+ *          v
+ *      ----------
+ *      | CLOSED |  initialized, but no socket (and no
+ *      ----------  TCP connection)
+ *       ^      \
+ *       |       \ con_sock_state_connecting()
+ *       |        ----------------------
+ *       |                              \
+ *       + con_sock_state_closed()       \
+ *       |\                               \
+ *       | \                               \
+ *       |  -----------                     \
+ *       |  | CLOSING |  socket event;       \
+ *       |  -----------  await close          \
+ *       |       ^                            |
+ *       |       |                            |
+ *       |       + con_sock_state_closing()   |
+ *       |      / \                           |
+ *       |     /   ---------------            |
+ *       |    /                   \           v
+ *       |   /                    --------------
+ *       |  /    -----------------| CONNECTING |  socket created, TCP
+ *       |  |   /                 --------------  connect initiated
+ *       |  |   | con_sock_state_connected()
+ *       |  |   v
+ *      -------------
+ *      | CONNECTED |  TCP connection established
+ *      -------------
+ *
+ * State values for ceph_connection->sock_state; NEW is assumed to be 0.
+ */
 
 #define CON_SOCK_STATE_NEW             0       /* -> CLOSED */
 #define CON_SOCK_STATE_CLOSED          1       /* -> CONNECTING */
@@ -261,13 +301,8 @@ static void ceph_sock_state_change(struct sock *sk)
        case TCP_CLOSE_WAIT:
                dout("%s TCP_CLOSE_WAIT\n", __func__);
                con_sock_state_closing(con);
-               if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) {
-                       if (test_bit(CONNECTING, &con->state))
-                               con->error_msg = "connection failed";
-                       else
-                               con->error_msg = "socket closed";
-                       queue_con(con);
-               }
+               set_bit(SOCK_CLOSED, &con->flags);
+               queue_con(con);
                break;
        case TCP_ESTABLISHED:
                dout("%s TCP_ESTABLISHED\n", __func__);
@@ -321,6 +356,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
 
        dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
 
+       con_sock_state_connecting(con);
        ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
                                 O_NONBLOCK);
        if (ret == -EINPROGRESS) {
@@ -336,8 +372,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
                return ret;
        }
        con->sock = sock;
-       con_sock_state_connecting(con);
-
        return 0;
 }
 
@@ -398,11 +432,17 @@ static int con_close_socket(struct ceph_connection *con)
        dout("con_close_socket on %p sock %p\n", con, con->sock);
        if (!con->sock)
                return 0;
-       set_bit(SOCK_CLOSED, &con->state);
        rc = con->sock->ops->shutdown(con->sock, SHUT_RDWR);
        sock_release(con->sock);
        con->sock = NULL;
-       clear_bit(SOCK_CLOSED, &con->state);
+
+       /*
+        * Forcibly clear the SOCK_CLOSED flag.  It gets set
+        * independent of the connection mutex, and we could have
+        * received a socket close event before we had the chance to
+        * shut the socket down.
+        */
+       clear_bit(SOCK_CLOSED, &con->flags);
        con_sock_state_closed(con);
        return rc;
 }
@@ -415,7 +455,7 @@ static void ceph_msg_remove(struct ceph_msg *msg)
 {
        list_del_init(&msg->list_head);
        BUG_ON(msg->con == NULL);
-       ceph_con_put(msg->con);
+       msg->con->ops->put(msg->con);
        msg->con = NULL;
 
        ceph_msg_put(msg);
@@ -441,7 +481,7 @@ static void reset_connection(struct ceph_connection *con)
                con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               ceph_con_put(con->in_msg->con);
+               con->ops->put(con);
        }
 
        con->connect_seq = 0;
@@ -462,6 +502,8 @@ void ceph_con_close(struct ceph_connection *con)
        dout("con_close %p peer %s\n", con,
             ceph_pr_addr(&con->peer_addr.in_addr));
        clear_bit(NEGOTIATING, &con->state);
+       clear_bit(CONNECTING, &con->state);
+       clear_bit(CONNECTED, &con->state);
        clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
        set_bit(CLOSED, &con->state);
 
@@ -501,30 +543,6 @@ bool ceph_con_opened(struct ceph_connection *con)
        return con->connect_seq > 0;
 }
 
-/*
- * generic get/put
- */
-struct ceph_connection *ceph_con_get(struct ceph_connection *con)
-{
-       int nref = __atomic_add_unless(&con->nref, 1, 0);
-
-       dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
-
-       return nref ? con : NULL;
-}
-
-void ceph_con_put(struct ceph_connection *con)
-{
-       int nref = atomic_dec_return(&con->nref);
-
-       BUG_ON(nref < 0);
-       if (nref == 0) {
-               BUG_ON(con->sock);
-               kfree(con);
-       }
-       dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
-}
-
 /*
  * initialize a new connection.
  */
@@ -536,7 +554,6 @@ void ceph_con_init(struct ceph_connection *con, void *private,
        memset(con, 0, sizeof(*con));
        con->private = private;
        con->ops = ops;
-       atomic_set(&con->nref, 1);
        con->msgr = msgr;
 
        con_sock_state_init(con);
@@ -591,6 +608,53 @@ static void con_out_kvec_add(struct ceph_connection *con,
        con->out_kvec_bytes += size;
 }
 
+#ifdef CONFIG_BLOCK
+static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+{      /* load *iter and *seg from bio; a NULL bio yields NULL and 0 */
+       if (!bio) {             /* nothing to walk: hand back an empty iterator */
+               *iter = NULL;
+               *seg = 0;
+               return;
+       }
+       *iter = bio;            /* begin with the bio that was passed in... */
+       *seg = bio->bi_idx;     /* ...at the index stored in its bi_idx */
+}
+
+static void iter_bio_next(struct bio **bio_iter, int *seg)
+{      /* advance *seg by one, moving on to bi_next when this bio runs out */
+       if (*bio_iter == NULL)  /* empty iterator: nothing to advance */
+               return;
+
+       BUG_ON(*seg >= (*bio_iter)->bi_vcnt);   /* index must be below bi_vcnt */
+
+       (*seg)++;
+       if (*seg == (*bio_iter)->bi_vcnt)       /* stepped past this bio's last entry */
+               init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
+}
+#endif
+
+static void prepare_write_message_data(struct ceph_connection *con)
+{      /* reset con->out_msg_pos so the data of con->out_msg is sent from its start */
+       struct ceph_msg *msg = con->out_msg;
+
+       BUG_ON(!msg);
+       BUG_ON(!msg->hdr.data_len);     /* only valid for a message that has data */
+
+       /* initialize page iterator; page_alignment is applied only when msg->pages is set */
+       con->out_msg_pos.page = 0;
+       if (msg->pages)
+               con->out_msg_pos.page_pos = msg->page_alignment;
+       else
+               con->out_msg_pos.page_pos = 0;
+#ifdef CONFIG_BLOCK
+       if (msg->bio)           /* bio-backed data: restart the bio iterator as well */
+               init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
+#endif
+       con->out_msg_pos.data_pos = 0;
+       con->out_msg_pos.did_page_crc = false;
+       con->out_more = 1;  /* data + footer will follow */
+}
+
 /*
  * Prepare footer for currently outgoing message, and finish things
  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
@@ -600,6 +664,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
        struct ceph_msg *m = con->out_msg;
        int v = con->out_kvec_left;
 
+       m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
+
        dout("prepare_write_message_footer %p\n", con);
        con->out_kvec_is_msg = true;
        con->out_kvec[v].iov_base = &m->footer;
@@ -669,7 +735,7 @@ static void prepare_write_message(struct ceph_connection *con)
        /* fill in crc (except data pages), footer */
        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
        con->out_msg->hdr.crc = cpu_to_le32(crc);
-       con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
+       con->out_msg->footer.flags = 0;
 
        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
        con->out_msg->footer.front_crc = cpu_to_le32(crc);
@@ -679,26 +745,17 @@ static void prepare_write_message(struct ceph_connection *con)
                con->out_msg->footer.middle_crc = cpu_to_le32(crc);
        } else
                con->out_msg->footer.middle_crc = 0;
-       con->out_msg->footer.data_crc = 0;
-       dout("prepare_write_message front_crc %u data_crc %u\n",
+       dout("%s front_crc %u middle_crc %u\n", __func__,
             le32_to_cpu(con->out_msg->footer.front_crc),
             le32_to_cpu(con->out_msg->footer.middle_crc));
 
        /* is there a data payload? */
-       if (le32_to_cpu(m->hdr.data_len) > 0) {
-               /* initialize page iterator */
-               con->out_msg_pos.page = 0;
-               if (m->pages)
-                       con->out_msg_pos.page_pos = m->page_alignment;
-               else
-                       con->out_msg_pos.page_pos = 0;
-               con->out_msg_pos.data_pos = 0;
-               con->out_msg_pos.did_page_crc = false;
-               con->out_more = 1;  /* data + footer will follow */
-       } else {
+       con->out_msg->footer.data_crc = 0;
+       if (m->hdr.data_len)
+               prepare_write_message_data(con);
+       else
                /* no, queue up footer too and be done */
                prepare_write_message_footer(con);
-       }
 
        set_bit(WRITE_PENDING, &con->flags);
 }
@@ -786,7 +843,7 @@ static void prepare_write_banner(struct ceph_connection *con)
 
 static int prepare_write_connect(struct ceph_connection *con)
 {
-       unsigned global_seq = get_global_seq(con->msgr, 0);
+       unsigned int global_seq = get_global_seq(con->msgr, 0);
        int proto;
        int auth_proto;
        struct ceph_auth_handshake *auth;
@@ -824,6 +881,7 @@ static int prepare_write_connect(struct ceph_connection *con)
        con->out_connect.authorizer_len = auth ?
                cpu_to_le32(auth->authorizer_buf_len) : 0;
 
+       con_out_kvec_reset(con);
        con_out_kvec_add(con, sizeof (con->out_connect),
                                        &con->out_connect);
        if (auth && auth->authorizer_buf_len)
@@ -879,30 +937,34 @@ out:
        return ret;  /* done! */
 }
 
-#ifdef CONFIG_BLOCK
-static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
+static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
+                       size_t len, size_t sent, bool in_trail)
 {
-       if (!bio) {
-               *iter = NULL;
-               *seg = 0;
-               return;
-       }
-       *iter = bio;
-       *seg = bio->bi_idx;
-}
+       struct ceph_msg *msg = con->out_msg;
 
-static void iter_bio_next(struct bio **bio_iter, int *seg)
-{
-       if (*bio_iter == NULL)
-               return;
+       BUG_ON(!msg);
+       BUG_ON(!sent);
 
-       BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
+       con->out_msg_pos.data_pos += sent;
+       con->out_msg_pos.page_pos += sent;
+       if (sent < len)
+               return;
 
-       (*seg)++;
-       if (*seg == (*bio_iter)->bi_vcnt)
-               init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
-}
+       BUG_ON(sent != len);
+       con->out_msg_pos.page_pos = 0;
+       con->out_msg_pos.page++;
+       con->out_msg_pos.did_page_crc = false;
+       if (in_trail)
+               list_move_tail(&page->lru,
+                              &msg->trail->head);
+       else if (msg->pagelist)
+               list_move_tail(&page->lru,
+                              &msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+       else if (msg->bio)
+               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
 #endif
+}
 
 /*
  * Write as much message data payload as we can.  If we finish, queue
@@ -914,46 +976,41 @@ static void iter_bio_next(struct bio **bio_iter, int *seg)
 static int write_partial_msg_pages(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
-       unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+       unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
        bool do_datacrc = !con->msgr->nocrc;
        int ret;
        int total_max_write;
-       int in_trail = 0;
-       size_t trail_len = (msg->trail ? msg->trail->length : 0);
+       bool in_trail = false;
+       const size_t trail_len = (msg->trail ? msg->trail->length : 0);
+       const size_t trail_off = data_len - trail_len;
 
        dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
-            con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
+            con, msg, con->out_msg_pos.page, msg->nr_pages,
             con->out_msg_pos.page_pos);
 
-#ifdef CONFIG_BLOCK
-       if (msg->bio && !msg->bio_iter)
-               init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
-#endif
-
+       /*
+        * Iterate through each page that contains data to be
+        * written, and send as much as possible for each.
+        *
+        * If we are calculating the data crc (the default), we will
+        * need to map the page.  If we have no pages, they have
+        * been revoked, so use the zero page.
+        */
        while (data_len > con->out_msg_pos.data_pos) {
                struct page *page = NULL;
                int max_write = PAGE_SIZE;
                int bio_offset = 0;
 
-               total_max_write = data_len - trail_len -
-                       con->out_msg_pos.data_pos;
-
-               /*
-                * if we are calculating the data crc (the default), we need
-                * to map the page.  if our pages[] has been revoked, use the
-                * zero page.
-                */
-
-               /* have we reached the trail part of the data? */
-               if (con->out_msg_pos.data_pos >= data_len - trail_len) {
-                       in_trail = 1;
+               in_trail = in_trail || con->out_msg_pos.data_pos >= trail_off;
+               if (!in_trail)
+                       total_max_write = trail_off - con->out_msg_pos.data_pos;
 
+               if (in_trail) {
                        total_max_write = data_len - con->out_msg_pos.data_pos;
 
                        page = list_first_entry(&msg->trail->head,
                                                struct page, lru);
-                       max_write = PAGE_SIZE;
                } else if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
                } else if (msg->pagelist) {
@@ -976,15 +1033,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
                if (do_datacrc && !con->out_msg_pos.did_page_crc) {
                        void *base;
-                       u32 crc;
-                       u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+                       u32 crc = le32_to_cpu(msg->footer.data_crc);
                        char *kaddr;
 
                        kaddr = kmap(page);
                        BUG_ON(kaddr == NULL);
                        base = kaddr + con->out_msg_pos.page_pos + bio_offset;
-                       crc = crc32c(tmpcrc, base, len);
-                       con->out_msg->footer.data_crc = cpu_to_le32(crc);
+                       crc = crc32c(crc, base, len);
+                       msg->footer.data_crc = cpu_to_le32(crc);
                        con->out_msg_pos.did_page_crc = true;
                }
                ret = ceph_tcp_sendpage(con->sock, page,
@@ -997,30 +1053,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                if (ret <= 0)
                        goto out;
 
-               con->out_msg_pos.data_pos += ret;
-               con->out_msg_pos.page_pos += ret;
-               if (ret == len) {
-                       con->out_msg_pos.page_pos = 0;
-                       con->out_msg_pos.page++;
-                       con->out_msg_pos.did_page_crc = false;
-                       if (in_trail)
-                               list_move_tail(&page->lru,
-                                              &msg->trail->head);
-                       else if (msg->pagelist)
-                               list_move_tail(&page->lru,
-                                              &msg->pagelist->head);
-#ifdef CONFIG_BLOCK
-                       else if (msg->bio)
-                               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
-               }
+               out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
        }
 
        dout("write_partial_msg_pages %p msg %p done\n", con, msg);
 
        /* prepare and queue up footer, too */
        if (!do_datacrc)
-               con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
+               msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
        con_out_kvec_reset(con);
        prepare_write_message_footer(con);
        ret = 1;
@@ -1433,8 +1473,6 @@ static int process_banner(struct ceph_connection *con)
                     ceph_pr_addr(&con->msgr->inst.addr.in_addr));
        }
 
-       set_bit(NEGOTIATING, &con->state);
-       prepare_read_connect(con);
        return 0;
 }
 
@@ -1484,7 +1522,6 @@ static int process_connect(struct ceph_connection *con)
                        return -1;
                }
                con->auth_retry = 1;
-               con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1505,7 +1542,6 @@ static int process_connect(struct ceph_connection *con)
                       ENTITY_NAME(con->peer_name),
                       ceph_pr_addr(&con->peer_addr.in_addr));
                reset_connection(con);
-               con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1531,7 +1567,6 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->out_connect.connect_seq),
                     le32_to_cpu(con->in_connect.connect_seq));
                con->connect_seq = le32_to_cpu(con->in_connect.connect_seq);
-               con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1548,7 +1583,6 @@ static int process_connect(struct ceph_connection *con)
                     le32_to_cpu(con->in_connect.global_seq));
                get_global_seq(con->msgr,
                               le32_to_cpu(con->in_connect.global_seq));
-               con_out_kvec_reset(con);
                ret = prepare_write_connect(con);
                if (ret < 0)
                        return ret;
@@ -1566,7 +1600,8 @@ static int process_connect(struct ceph_connection *con)
                        fail_protocol(con);
                        return -1;
                }
-               clear_bit(CONNECTING, &con->state);
+               clear_bit(NEGOTIATING, &con->state);
+               set_bit(CONNECTED, &con->state);
                con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
                con->connect_seq++;
                con->peer_features = server_feat;
@@ -1670,7 +1705,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
 
 static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
-                                     unsigned data_len, bool do_datacrc)
+                                     unsigned int data_len, bool do_datacrc)
 {
        void *p;
        int ret;
@@ -1703,15 +1738,12 @@ static int read_partial_message_pages(struct ceph_connection *con,
 #ifdef CONFIG_BLOCK
 static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
-                                   unsigned data_len, bool do_datacrc)
+                                   unsigned int data_len, bool do_datacrc)
 {
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
        int ret, left;
 
-       if (IS_ERR(bv))
-               return PTR_ERR(bv);
-
        left = min((int)(data_len - con->in_msg_pos.data_pos),
                   (int)(bv->bv_len - con->in_msg_pos.page_pos));
 
@@ -1746,7 +1778,7 @@ static int read_partial_message(struct ceph_connection *con)
        int size;
        int end;
        int ret;
-       unsigned front_len, middle_len, data_len;
+       unsigned int front_len, middle_len, data_len;
        bool do_datacrc = !con->msgr->nocrc;
        u64 seq;
        u32 crc;
@@ -1916,7 +1948,7 @@ static void process_message(struct ceph_connection *con)
        con->in_msg->con = NULL;
        msg = con->in_msg;
        con->in_msg = NULL;
-       ceph_con_put(con);
+       con->ops->put(con);
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
@@ -1948,22 +1980,17 @@ static int try_write(struct ceph_connection *con)
 {
        int ret = 1;
 
-       dout("try_write start %p state %lu nref %d\n", con, con->state,
-            atomic_read(&con->nref));
+       dout("try_write start %p state %lu\n", con, con->state);
 
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
        /* open the socket first? */
        if (con->sock == NULL) {
-               clear_bit(NEGOTIATING, &con->state);
                set_bit(CONNECTING, &con->state);
 
                con_out_kvec_reset(con);
                prepare_write_banner(con);
-               ret = prepare_write_connect(con);
-               if (ret < 0)
-                       goto out;
                prepare_read_banner(con);
 
                BUG_ON(con->in_msg);
@@ -2011,7 +2038,8 @@ more_kvec:
        }
 
 do_next:
-       if (!test_bit(CONNECTING, &con->state)) {
+       if (!test_bit(CONNECTING, &con->state) &&
+                       !test_bit(NEGOTIATING, &con->state)) {
                /* is anything else pending? */
                if (!list_empty(&con->out_queue)) {
                        prepare_write_message(con);
@@ -2068,15 +2096,29 @@ more:
        }
 
        if (test_bit(CONNECTING, &con->state)) {
-               if (!test_bit(NEGOTIATING, &con->state)) {
-                       dout("try_read connecting\n");
-                       ret = read_partial_banner(con);
-                       if (ret <= 0)
-                               goto out;
-                       ret = process_banner(con);
-                       if (ret < 0)
-                               goto out;
-               }
+               dout("try_read connecting\n");
+               ret = read_partial_banner(con);
+               if (ret <= 0)
+                       goto out;
+               ret = process_banner(con);
+               if (ret < 0)
+                       goto out;
+
+               clear_bit(CONNECTING, &con->state);
+               set_bit(NEGOTIATING, &con->state);
+
+               /* Banner is good, exchange connection info */
+               ret = prepare_write_connect(con);
+               if (ret < 0)
+                       goto out;
+               prepare_read_connect(con);
+
+               /* Send connection info before awaiting response */
+               goto out;
+       }
+
+       if (test_bit(NEGOTIATING, &con->state)) {
+               dout("try_read negotiating\n");
                ret = read_partial_connect(con);
                if (ret <= 0)
                        goto out;
@@ -2119,6 +2161,7 @@ more:
                        prepare_read_ack(con);
                        break;
                case CEPH_MSGR_TAG_CLOSE:
+                       clear_bit(CONNECTED, &con->state);
                        set_bit(CLOSED, &con->state);   /* fixme */
                        goto out;
                default:
@@ -2194,6 +2237,18 @@ static void con_work(struct work_struct *work)
 
        mutex_lock(&con->mutex);
 restart:
+       if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
+               if (test_and_clear_bit(CONNECTED, &con->state))
+                       con->error_msg = "socket closed";
+               else if (test_and_clear_bit(NEGOTIATING, &con->state))
+                       con->error_msg = "negotiation failed";
+               else if (test_and_clear_bit(CONNECTING, &con->state))
+                       con->error_msg = "connection failed";
+               else
+                       con->error_msg = "unrecognized con state";
+               goto fault;
+       }
+
        if (test_and_clear_bit(BACKOFF, &con->flags)) {
                dout("con_work %p backing off\n", con);
                if (queue_delayed_work(ceph_msgr_wq, &con->work,
@@ -2223,9 +2278,6 @@ restart:
                con_close_socket(con);
        }
 
-       if (test_and_clear_bit(SOCK_CLOSED, &con->flags))
-               goto fault;
-
        ret = try_read(con);
        if (ret == -EAGAIN)
                goto restart;
@@ -2278,7 +2330,7 @@ static void ceph_fault(struct ceph_connection *con)
                con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               ceph_con_put(con);
+               con->ops->put(con);
        }
 
        /* Requeue anything that hasn't been acked */
@@ -2397,7 +2449,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        mutex_lock(&con->mutex);
 
        BUG_ON(msg->con != NULL);
-       msg->con = ceph_con_get(con);
+       msg->con = con->ops->get(con);
        BUG_ON(msg->con == NULL);
 
        BUG_ON(!list_empty(&msg->list_head));
@@ -2433,7 +2485,7 @@ void ceph_msg_revoke(struct ceph_msg *msg)
                dout("%s %p msg %p - was on queue\n", __func__, con, msg);
                list_del_init(&msg->list_head);
                BUG_ON(msg->con == NULL);
-               ceph_con_put(msg->con);
+               msg->con->ops->put(msg->con);
                msg->con = NULL;
                msg->hdr.seq = 0;
 
@@ -2456,17 +2508,27 @@ void ceph_msg_revoke(struct ceph_msg *msg)
 /*
  * Revoke a message that we may be reading data into
  */
-void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 {
+       struct ceph_connection *con;
+
+       BUG_ON(msg == NULL);
+       if (!msg->con) {
+               dout("%s msg %p null con\n", __func__, msg);
+
+               return;         /* Message not in our possession */
+       }
+
+       con = msg->con;
        mutex_lock(&con->mutex);
-       if (con->in_msg && con->in_msg == msg) {
-               unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
-               unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
-               unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+       if (con->in_msg == msg) {
+               unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+               unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+               unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
 
                /* skip rest of message */
-               dout("con_revoke_pages %p msg %p revoked\n", con, msg);
-                       con->in_base_pos = con->in_base_pos -
+               dout("%s %p msg %p revoked\n", __func__, con, msg);
+               con->in_base_pos = con->in_base_pos -
                                sizeof(struct ceph_msg_header) -
                                front_len -
                                middle_len -
@@ -2477,8 +2539,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
                con->in_tag = CEPH_MSGR_TAG_READY;
                con->in_seq++;
        } else {
-               dout("con_revoke_pages %p msg %p pages %p no-op\n",
-                    con, con->in_msg, msg);
+               dout("%s %p in_msg %p msg %p no-op\n",
+                    __func__, con, con->in_msg, msg);
        }
        mutex_unlock(&con->mutex);
 }
@@ -2633,7 +2695,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
                con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
                mutex_lock(&con->mutex);
                if (con->in_msg) {
-                       con->in_msg->con = ceph_con_get(con);
+                       con->in_msg->con = con->ops->get(con);
                        BUG_ON(con->in_msg->con == NULL);
                }
                if (skip)
@@ -2649,7 +2711,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
                               type, front_len);
                        return false;
                }
-               con->in_msg->con = ceph_con_get(con);
+               con->in_msg->con = con->ops->get(con);
                BUG_ON(con->in_msg->con == NULL);
                con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
        }