libceph: don't mark footer complete before it is
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / net / ceph / messenger.c
index 88ac083bb995f512186b85a669e7c93da801d8ef..5354d59ba8b94305d0c3afbbb2adf9b1280d8d50 100644 (file)
@@ -321,6 +321,7 @@ static int ceph_tcp_connect(struct ceph_connection *con)
 
        dout("connect %s\n", ceph_pr_addr(&con->peer_addr.in_addr));
 
+       con_sock_state_connecting(con);
        ret = sock->ops->connect(sock, (struct sockaddr *)paddr, sizeof(*paddr),
                                 O_NONBLOCK);
        if (ret == -EINPROGRESS) {
@@ -336,8 +337,6 @@ static int ceph_tcp_connect(struct ceph_connection *con)
                return ret;
        }
        con->sock = sock;
-       con_sock_state_connecting(con);
-
        return 0;
 }
 
@@ -415,7 +414,7 @@ static void ceph_msg_remove(struct ceph_msg *msg)
 {
        list_del_init(&msg->list_head);
        BUG_ON(msg->con == NULL);
-       ceph_con_put(msg->con);
+       msg->con->ops->put(msg->con);
        msg->con = NULL;
 
        ceph_msg_put(msg);
@@ -441,7 +440,7 @@ static void reset_connection(struct ceph_connection *con)
                con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               ceph_con_put(con->in_msg->con);
+               con->ops->put(con);
        }
 
        con->connect_seq = 0;
@@ -501,30 +500,6 @@ bool ceph_con_opened(struct ceph_connection *con)
        return con->connect_seq > 0;
 }
 
-/*
- * generic get/put
- */
-struct ceph_connection *ceph_con_get(struct ceph_connection *con)
-{
-       int nref = __atomic_add_unless(&con->nref, 1, 0);
-
-       dout("con_get %p nref = %d -> %d\n", con, nref, nref + 1);
-
-       return nref ? con : NULL;
-}
-
-void ceph_con_put(struct ceph_connection *con)
-{
-       int nref = atomic_dec_return(&con->nref);
-
-       BUG_ON(nref < 0);
-       if (nref == 0) {
-               BUG_ON(con->sock);
-               kfree(con);
-       }
-       dout("con_put %p nref = %d -> %d\n", con, nref + 1, nref);
-}
-
 /*
  * initialize a new connection.
  */
@@ -536,7 +511,6 @@ void ceph_con_init(struct ceph_connection *con, void *private,
        memset(con, 0, sizeof(*con));
        con->private = private;
        con->ops = ops;
-       atomic_set(&con->nref, 1);
        con->msgr = msgr;
 
        con_sock_state_init(con);
@@ -591,6 +565,24 @@ static void con_out_kvec_add(struct ceph_connection *con,
        con->out_kvec_bytes += size;
 }
 
+static void prepare_write_message_data(struct ceph_connection *con)
+{
+       struct ceph_msg *msg = con->out_msg;
+
+       BUG_ON(!msg);
+       BUG_ON(!msg->hdr.data_len);
+
+       /* initialize page iterator */
+       con->out_msg_pos.page = 0;
+       if (msg->pages)
+               con->out_msg_pos.page_pos = msg->page_alignment;
+       else
+               con->out_msg_pos.page_pos = 0;
+       con->out_msg_pos.data_pos = 0;
+       con->out_msg_pos.did_page_crc = false;
+       con->out_more = 1;  /* data + footer will follow */
+}
+
 /*
  * Prepare footer for currently outgoing message, and finish things
  * off.  Assumes out_kvec* are already valid.. we just add on to the end.
@@ -600,6 +592,8 @@ static void prepare_write_message_footer(struct ceph_connection *con)
        struct ceph_msg *m = con->out_msg;
        int v = con->out_kvec_left;
 
+       m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
+
        dout("prepare_write_message_footer %p\n", con);
        con->out_kvec_is_msg = true;
        con->out_kvec[v].iov_base = &m->footer;
@@ -649,6 +643,10 @@ static void prepare_write_message(struct ceph_connection *con)
                m->hdr.seq = cpu_to_le64(++con->out_seq);
                m->needs_out_seq = false;
        }
+#ifdef CONFIG_BLOCK
+       else
+               m->bio_iter = NULL;
+#endif
 
        dout("prepare_write_message %p seq %lld type %d len %d+%d+%d %d pgs\n",
             m, con->out_seq, le16_to_cpu(m->hdr.type),
@@ -669,7 +667,7 @@ static void prepare_write_message(struct ceph_connection *con)
        /* fill in crc (except data pages), footer */
        crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
        con->out_msg->hdr.crc = cpu_to_le32(crc);
-       con->out_msg->footer.flags = CEPH_MSG_FOOTER_COMPLETE;
+       con->out_msg->footer.flags = 0;
 
        crc = crc32c(0, m->front.iov_base, m->front.iov_len);
        con->out_msg->footer.front_crc = cpu_to_le32(crc);
@@ -679,26 +677,17 @@ static void prepare_write_message(struct ceph_connection *con)
                con->out_msg->footer.middle_crc = cpu_to_le32(crc);
        } else
                con->out_msg->footer.middle_crc = 0;
-       con->out_msg->footer.data_crc = 0;
-       dout("prepare_write_message front_crc %u data_crc %u\n",
+       dout("%s front_crc %u middle_crc %u\n", __func__,
             le32_to_cpu(con->out_msg->footer.front_crc),
             le32_to_cpu(con->out_msg->footer.middle_crc));
 
        /* is there a data payload? */
-       if (le32_to_cpu(m->hdr.data_len) > 0) {
-               /* initialize page iterator */
-               con->out_msg_pos.page = 0;
-               if (m->pages)
-                       con->out_msg_pos.page_pos = m->page_alignment;
-               else
-                       con->out_msg_pos.page_pos = 0;
-               con->out_msg_pos.data_pos = 0;
-               con->out_msg_pos.did_page_crc = false;
-               con->out_more = 1;  /* data + footer will follow */
-       } else {
+       con->out_msg->footer.data_crc = 0;
+       if (m->hdr.data_len)
+               prepare_write_message_data(con);
+       else
                /* no, queue up footer too and be done */
                prepare_write_message_footer(con);
-       }
 
        set_bit(WRITE_PENDING, &con->flags);
 }
@@ -786,7 +775,7 @@ static void prepare_write_banner(struct ceph_connection *con)
 
 static int prepare_write_connect(struct ceph_connection *con)
 {
-       unsigned global_seq = get_global_seq(con->msgr, 0);
+       unsigned int global_seq = get_global_seq(con->msgr, 0);
        int proto;
        int auth_proto;
        struct ceph_auth_handshake *auth;
@@ -904,6 +893,33 @@ static void iter_bio_next(struct bio **bio_iter, int *seg)
 }
 #endif
 
+static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
+                       size_t len, size_t sent, bool in_trail)
+{
+       struct ceph_msg *msg = con->out_msg;
+
+       BUG_ON(!msg);
+       BUG_ON(!sent);
+
+       con->out_msg_pos.data_pos += sent;
+       con->out_msg_pos.page_pos += sent;
+       if (sent == len) {
+               con->out_msg_pos.page_pos = 0;
+               con->out_msg_pos.page++;
+               con->out_msg_pos.did_page_crc = false;
+               if (in_trail)
+                       list_move_tail(&page->lru,
+                                      &msg->trail->head);
+               else if (msg->pagelist)
+                       list_move_tail(&page->lru,
+                                      &msg->pagelist->head);
+#ifdef CONFIG_BLOCK
+               else if (msg->bio)
+                       iter_bio_next(&msg->bio_iter, &msg->bio_seg);
+#endif
+       }
+}
+
 /*
  * Write as much message data payload as we can.  If we finish, queue
  * up the footer.
@@ -914,16 +930,16 @@ static void iter_bio_next(struct bio **bio_iter, int *seg)
 static int write_partial_msg_pages(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->out_msg;
-       unsigned data_len = le32_to_cpu(msg->hdr.data_len);
+       unsigned int data_len = le32_to_cpu(msg->hdr.data_len);
        size_t len;
        bool do_datacrc = !con->msgr->nocrc;
        int ret;
        int total_max_write;
-       int in_trail = 0;
+       bool in_trail = false;
        size_t trail_len = (msg->trail ? msg->trail->length : 0);
 
        dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
-            con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
+            con, msg, con->out_msg_pos.page, msg->nr_pages,
             con->out_msg_pos.page_pos);
 
 #ifdef CONFIG_BLOCK
@@ -947,13 +963,12 @@ static int write_partial_msg_pages(struct ceph_connection *con)
 
                /* have we reached the trail part of the data? */
                if (con->out_msg_pos.data_pos >= data_len - trail_len) {
-                       in_trail = 1;
+                       in_trail = true;
 
                        total_max_write = data_len - con->out_msg_pos.data_pos;
 
                        page = list_first_entry(&msg->trail->head,
                                                struct page, lru);
-                       max_write = PAGE_SIZE;
                } else if (msg->pages) {
                        page = msg->pages[con->out_msg_pos.page];
                } else if (msg->pagelist) {
@@ -977,14 +992,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                if (do_datacrc && !con->out_msg_pos.did_page_crc) {
                        void *base;
                        u32 crc;
-                       u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
+                       u32 tmpcrc = le32_to_cpu(msg->footer.data_crc);
                        char *kaddr;
 
                        kaddr = kmap(page);
                        BUG_ON(kaddr == NULL);
                        base = kaddr + con->out_msg_pos.page_pos + bio_offset;
                        crc = crc32c(tmpcrc, base, len);
-                       con->out_msg->footer.data_crc = cpu_to_le32(crc);
+                       msg->footer.data_crc = cpu_to_le32(crc);
                        con->out_msg_pos.did_page_crc = true;
                }
                ret = ceph_tcp_sendpage(con->sock, page,
@@ -997,30 +1012,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
                if (ret <= 0)
                        goto out;
 
-               con->out_msg_pos.data_pos += ret;
-               con->out_msg_pos.page_pos += ret;
-               if (ret == len) {
-                       con->out_msg_pos.page_pos = 0;
-                       con->out_msg_pos.page++;
-                       con->out_msg_pos.did_page_crc = false;
-                       if (in_trail)
-                               list_move_tail(&page->lru,
-                                              &msg->trail->head);
-                       else if (msg->pagelist)
-                               list_move_tail(&page->lru,
-                                              &msg->pagelist->head);
-#ifdef CONFIG_BLOCK
-                       else if (msg->bio)
-                               iter_bio_next(&msg->bio_iter, &msg->bio_seg);
-#endif
-               }
+               out_msg_pos_next(con, page, len, (size_t) ret, in_trail);
        }
 
        dout("write_partial_msg_pages %p msg %p done\n", con, msg);
 
        /* prepare and queue up footer, too */
        if (!do_datacrc)
-               con->out_msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
+               msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
        con_out_kvec_reset(con);
        prepare_write_message_footer(con);
        ret = 1;
@@ -1670,7 +1669,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
 
 static int read_partial_message_pages(struct ceph_connection *con,
                                      struct page **pages,
-                                     unsigned data_len, bool do_datacrc)
+                                     unsigned int data_len, bool do_datacrc)
 {
        void *p;
        int ret;
@@ -1703,7 +1702,7 @@ static int read_partial_message_pages(struct ceph_connection *con,
 #ifdef CONFIG_BLOCK
 static int read_partial_message_bio(struct ceph_connection *con,
                                    struct bio **bio_iter, int *bio_seg,
-                                   unsigned data_len, bool do_datacrc)
+                                   unsigned int data_len, bool do_datacrc)
 {
        struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
        void *p;
@@ -1746,7 +1745,7 @@ static int read_partial_message(struct ceph_connection *con)
        int size;
        int end;
        int ret;
-       unsigned front_len, middle_len, data_len;
+       unsigned int front_len, middle_len, data_len;
        bool do_datacrc = !con->msgr->nocrc;
        u64 seq;
        u32 crc;
@@ -1916,7 +1915,7 @@ static void process_message(struct ceph_connection *con)
        con->in_msg->con = NULL;
        msg = con->in_msg;
        con->in_msg = NULL;
-       ceph_con_put(con);
+       con->ops->put(con);
 
        /* if first message, set peer_name */
        if (con->peer_name.type == 0)
@@ -1948,8 +1947,7 @@ static int try_write(struct ceph_connection *con)
 {
        int ret = 1;
 
-       dout("try_write start %p state %lu nref %d\n", con, con->state,
-            atomic_read(&con->nref));
+       dout("try_write start %p state %lu\n", con, con->state);
 
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
@@ -2278,7 +2276,7 @@ static void ceph_fault(struct ceph_connection *con)
                con->in_msg->con = NULL;
                ceph_msg_put(con->in_msg);
                con->in_msg = NULL;
-               ceph_con_put(con);
+               con->ops->put(con);
        }
 
        /* Requeue anything that hasn't been acked */
@@ -2397,7 +2395,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        mutex_lock(&con->mutex);
 
        BUG_ON(msg->con != NULL);
-       msg->con = ceph_con_get(con);
+       msg->con = con->ops->get(con);
        BUG_ON(msg->con == NULL);
 
        BUG_ON(!list_empty(&msg->list_head));
@@ -2421,14 +2419,19 @@ EXPORT_SYMBOL(ceph_con_send);
 /*
  * Revoke a message that was previously queued for send
  */
-void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke(struct ceph_msg *msg)
 {
+       struct ceph_connection *con = msg->con;
+
+       if (!con)
+               return;         /* Message not in our possession */
+
        mutex_lock(&con->mutex);
        if (!list_empty(&msg->list_head)) {
                dout("%s %p msg %p - was on queue\n", __func__, con, msg);
                list_del_init(&msg->list_head);
                BUG_ON(msg->con == NULL);
-               ceph_con_put(msg->con);
+               msg->con->ops->put(msg->con);
                msg->con = NULL;
                msg->hdr.seq = 0;
 
@@ -2451,17 +2454,27 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 /*
  * Revoke a message that we may be reading data into
  */
-void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
+void ceph_msg_revoke_incoming(struct ceph_msg *msg)
 {
+       struct ceph_connection *con;
+
+       BUG_ON(msg == NULL);
+       if (!msg->con) {
+               dout("%s msg %p null con\n", __func__, msg);
+
+               return;         /* Message not in our possession */
+       }
+
+       con = msg->con;
        mutex_lock(&con->mutex);
-       if (con->in_msg && con->in_msg == msg) {
-               unsigned front_len = le32_to_cpu(con->in_hdr.front_len);
-               unsigned middle_len = le32_to_cpu(con->in_hdr.middle_len);
-               unsigned data_len = le32_to_cpu(con->in_hdr.data_len);
+       if (con->in_msg == msg) {
+               unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
+               unsigned int middle_len = le32_to_cpu(con->in_hdr.middle_len);
+               unsigned int data_len = le32_to_cpu(con->in_hdr.data_len);
 
                /* skip rest of message */
-               dout("con_revoke_pages %p msg %p revoked\n", con, msg);
-                       con->in_base_pos = con->in_base_pos -
+               dout("%s %p msg %p revoked\n", __func__, con, msg);
+               con->in_base_pos = con->in_base_pos -
                                sizeof(struct ceph_msg_header) -
                                front_len -
                                middle_len -
@@ -2472,8 +2485,8 @@ void ceph_con_revoke_message(struct ceph_connection *con, struct ceph_msg *msg)
                con->in_tag = CEPH_MSGR_TAG_READY;
                con->in_seq++;
        } else {
-               dout("con_revoke_pages %p msg %p pages %p no-op\n",
-                    con, con->in_msg, msg);
+               dout("%s %p in_msg %p msg %p no-op\n",
+                    __func__, con, con->in_msg, msg);
        }
        mutex_unlock(&con->mutex);
 }
@@ -2628,7 +2641,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
                con->in_msg = con->ops->alloc_msg(con, hdr, &skip);
                mutex_lock(&con->mutex);
                if (con->in_msg) {
-                       con->in_msg->con = ceph_con_get(con);
+                       con->in_msg->con = con->ops->get(con);
                        BUG_ON(con->in_msg->con == NULL);
                }
                if (skip)
@@ -2644,7 +2657,7 @@ static bool ceph_con_in_msg_alloc(struct ceph_connection *con,
                               type, front_len);
                        return false;
                }
-               con->in_msg->con = ceph_con_get(con);
+               con->in_msg->con = con->ops->get(con);
                BUG_ON(con->in_msg->con == NULL);
                con->in_msg->page_alignment = le16_to_cpu(hdr->data_off);
        }