libceph: handle connection reopen race with callbacks
[GitHub/mt8127/android_kernel_alcatel_ttab.git] / net / ceph / messenger.c
index 05f357828a2fb8de1bb2be9f4103c90058484259..b140dd3515de1b46177e862a495067a98e7b1837 100644 (file)
@@ -598,7 +598,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
  * Connection negotiation.
  */
 
-static void prepare_connect_authorizer(struct ceph_connection *con)
+static int prepare_connect_authorizer(struct ceph_connection *con)
 {
        void *auth_buf;
        int auth_len = 0;
@@ -612,6 +612,10 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
                                         con->auth_retry);
        mutex_lock(&con->mutex);
 
+       if (test_bit(CLOSED, &con->state) ||
+           test_bit(OPENING, &con->state))
+               return -EAGAIN;
+
        con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
        con->out_connect.authorizer_len = cpu_to_le32(auth_len);
 
@@ -619,6 +623,8 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
        con->out_kvec[con->out_kvec_left].iov_len = auth_len;
        con->out_kvec_left++;
        con->out_kvec_bytes += auth_len;
+
+       return 0;
 }
 
 /*
@@ -640,9 +646,9 @@ static void prepare_write_banner(struct ceph_messenger *msgr,
        set_bit(WRITE_PENDING, &con->state);
 }
 
-static void prepare_write_connect(struct ceph_messenger *msgr,
-                                 struct ceph_connection *con,
-                                 int after_banner)
+static int prepare_write_connect(struct ceph_messenger *msgr,
+                                struct ceph_connection *con,
+                                int after_banner)
 {
        unsigned global_seq = get_global_seq(con->msgr, 0);
        int proto;
@@ -683,7 +689,7 @@ static void prepare_write_connect(struct ceph_messenger *msgr,
        con->out_more = 0;
        set_bit(WRITE_PENDING, &con->state);
 
-       prepare_connect_authorizer(con);
+       return prepare_connect_authorizer(con);
 }
 
 
@@ -1216,6 +1222,7 @@ static int process_connect(struct ceph_connection *con)
        u64 sup_feat = con->msgr->supported_features;
        u64 req_feat = con->msgr->required_features;
        u64 server_feat = le64_to_cpu(con->in_reply.features);
+       int ret;
 
        dout("process_connect on %p tag %d\n", con, (int)con->in_tag);
 
@@ -1250,7 +1257,9 @@ static int process_connect(struct ceph_connection *con)
                        return -1;
                }
                con->auth_retry = 1;
-               prepare_write_connect(con->msgr, con, 0);
+               ret = prepare_write_connect(con->msgr, con, 0);
+               if (ret < 0)
+                       return ret;
                prepare_read_connect(con);
                break;
 
@@ -1277,6 +1286,9 @@ static int process_connect(struct ceph_connection *con)
                if (con->ops->peer_reset)
                        con->ops->peer_reset(con);
                mutex_lock(&con->mutex);
+               if (test_bit(CLOSED, &con->state) ||
+                   test_bit(OPENING, &con->state))
+                       return -EAGAIN;
                break;
 
        case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1810,6 +1822,17 @@ static int try_read(struct ceph_connection *con)
 more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
             con->in_base_pos);
+
+       /*
+        * process_connect and process_message drop and re-take
+        * con->mutex.  make sure we handle a racing close or reopen.
+        */
+       if (test_bit(CLOSED, &con->state) ||
+           test_bit(OPENING, &con->state)) {
+               ret = -EAGAIN;
+               goto out;
+       }
+
        if (test_bit(CONNECTING, &con->state)) {
                if (!test_bit(NEGOTIATING, &con->state)) {
                        dout("try_read connecting\n");
@@ -1938,8 +1961,10 @@ static void con_work(struct work_struct *work)
 {
        struct ceph_connection *con = container_of(work, struct ceph_connection,
                                                   work.work);
+       int ret;
 
        mutex_lock(&con->mutex);
+restart:
        if (test_and_clear_bit(BACKOFF, &con->state)) {
                dout("con_work %p backing off\n", con);
                if (queue_delayed_work(ceph_msgr_wq, &con->work,
@@ -1969,18 +1994,31 @@ static void con_work(struct work_struct *work)
                con_close_socket(con);
        }
 
-       if (test_and_clear_bit(SOCK_CLOSED, &con->state) ||
-           try_read(con) < 0 ||
-           try_write(con) < 0) {
-               mutex_unlock(&con->mutex);
-               ceph_fault(con);     /* error/fault path */
-               goto done_unlocked;
-       }
+       if (test_and_clear_bit(SOCK_CLOSED, &con->state))
+               goto fault;
+
+       ret = try_read(con);
+       if (ret == -EAGAIN)
+               goto restart;
+       if (ret < 0)
+               goto fault;
+
+       ret = try_write(con);
+       if (ret == -EAGAIN)
+               goto restart;
+       if (ret < 0)
+               goto fault;
 
 done:
        mutex_unlock(&con->mutex);
 done_unlocked:
        con->ops->put(con);
+       return;
+
+fault:
+       mutex_unlock(&con->mutex);
+       ceph_fault(con);     /* error/fault path */
+       goto done_unlocked;
 }
 
 
@@ -2267,6 +2305,19 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
        m->more_to_follow = false;
        m->pool = NULL;
 
+       /* middle */
+       m->middle = NULL;
+
+       /* data */
+       m->nr_pages = 0;
+       m->page_alignment = 0;
+       m->pages = NULL;
+       m->pagelist = NULL;
+       m->bio = NULL;
+       m->bio_iter = NULL;
+       m->bio_seg = 0;
+       m->trail = NULL;
+
        /* front */
        if (front_len) {
                if (front_len > PAGE_CACHE_SIZE) {
@@ -2286,19 +2337,6 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
        }
        m->front.iov_len = front_len;
 
-       /* middle */
-       m->middle = NULL;
-
-       /* data */
-       m->nr_pages = 0;
-       m->page_alignment = 0;
-       m->pages = NULL;
-       m->pagelist = NULL;
-       m->bio = NULL;
-       m->bio_iter = NULL;
-       m->bio_seg = 0;
-       m->trail = NULL;
-
        dout("ceph_msg_new %p front %d\n", m, front_len);
        return m;