ceph: use connection mutex to protect read and write stages
authorSage Weil <sage@newdream.net>
Tue, 22 Dec 2009 18:43:42 +0000 (10:43 -0800)
committerSage Weil <sage@newdream.net>
Wed, 23 Dec 2009 16:17:19 +0000 (08:17 -0800)
Use a single mutex (previously out_mutex) to protect both read and write
activity from concurrent ceph_con_* calls.  Drop the mutex when doing
callbacks to avoid nested locking (the callback may need to call something
like ceph_con_close).

Signed-off-by: Sage Weil <sage@newdream.net>
fs/ceph/messenger.c
fs/ceph/messenger.h

index 2e4e9773c46bf998ba62cba4fef6ad4b4da58607..c03b4185c143c3deb74a24baacced90ee760ae80 100644 (file)
@@ -316,7 +316,6 @@ static void reset_connection(struct ceph_connection *con)
 {
        /* reset connection, out_queue, msg_ and connect_seq */
        /* discard existing out_queue and msg_seq */
-       mutex_lock(&con->out_mutex);
        ceph_msg_remove_list(&con->out_queue);
        ceph_msg_remove_list(&con->out_sent);
 
@@ -332,7 +331,6 @@ static void reset_connection(struct ceph_connection *con)
                con->out_msg = NULL;
        }
        con->in_seq = 0;
-       mutex_unlock(&con->out_mutex);
 }
 
 /*
@@ -343,7 +341,9 @@ void ceph_con_close(struct ceph_connection *con)
        dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
        set_bit(CLOSED, &con->state);  /* in case there's queued work */
        clear_bit(STANDBY, &con->state);  /* avoid connect_seq bump */
+       mutex_lock(&con->mutex);
        reset_connection(con);
+       mutex_unlock(&con->mutex);
        queue_con(con);
 }
 
@@ -392,7 +392,7 @@ void ceph_con_init(struct ceph_messenger *msgr, struct ceph_connection *con)
        memset(con, 0, sizeof(*con));
        atomic_set(&con->nref, 1);
        con->msgr = msgr;
-       mutex_init(&con->out_mutex);
+       mutex_init(&con->mutex);
        INIT_LIST_HEAD(&con->out_queue);
        INIT_LIST_HEAD(&con->out_sent);
        INIT_DELAYED_WORK(&con->work, con_work);
@@ -571,11 +571,13 @@ static void prepare_connect_authorizer(struct ceph_connection *con)
        int auth_len = 0;
        int auth_protocol = 0;
 
+       mutex_unlock(&con->mutex);
        if (con->ops->get_authorizer)
                con->ops->get_authorizer(con, &auth_buf, &auth_len,
                                         &auth_protocol, &con->auth_reply_buf,
                                         &con->auth_reply_buf_len,
                                         con->auth_retry);
+       mutex_lock(&con->mutex);
 
        con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
        con->out_connect.authorizer_len = cpu_to_le32(auth_len);
@@ -1094,10 +1096,13 @@ static int process_connect(struct ceph_connection *con)
                       le32_to_cpu(con->out_connect.protocol_version),
                       le32_to_cpu(con->in_reply.protocol_version));
                con->error_msg = "protocol version mismatch";
-               if (con->ops->bad_proto)
-                       con->ops->bad_proto(con);
                reset_connection(con);
                set_bit(CLOSED, &con->state);  /* in case there's queued work */
+
+               mutex_unlock(&con->mutex);
+               if (con->ops->bad_proto)
+                       con->ops->bad_proto(con);
+               mutex_lock(&con->mutex);
                return -1;
 
        case CEPH_MSGR_TAG_BADAUTHORIZER:
@@ -1133,9 +1138,11 @@ static int process_connect(struct ceph_connection *con)
                prepare_read_connect(con);
 
                /* Tell ceph about it. */
+               mutex_unlock(&con->mutex);
                pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
                if (con->ops->peer_reset)
                        con->ops->peer_reset(con);
+               mutex_lock(&con->mutex);
                break;
 
        case CEPH_MSGR_TAG_RETRY_SESSION:
@@ -1221,7 +1228,6 @@ static void process_ack(struct ceph_connection *con)
        u64 ack = le64_to_cpu(con->in_temp_ack);
        u64 seq;
 
-       mutex_lock(&con->out_mutex);
        while (!list_empty(&con->out_sent)) {
                m = list_first_entry(&con->out_sent, struct ceph_msg,
                                     list_head);
@@ -1232,7 +1238,6 @@ static void process_ack(struct ceph_connection *con)
                     le16_to_cpu(m->hdr.type), m);
                ceph_msg_remove(m);
        }
-       mutex_unlock(&con->out_mutex);
        prepare_read_tag(con);
 }
 
@@ -1366,8 +1371,10 @@ static int read_partial_message(struct ceph_connection *con)
                /* find pages for data payload */
                want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
                ret = -1;
+               mutex_unlock(&con->mutex);
                if (con->ops->prepare_pages)
                        ret = con->ops->prepare_pages(con, m, want);
+               mutex_lock(&con->mutex);
                if (ret < 0) {
                        dout("%p prepare_pages failed, skipping payload\n", m);
                        con->in_base_pos = -data_len - sizeof(m->footer);
@@ -1454,9 +1461,8 @@ static void process_message(struct ceph_connection *con)
        if (con->peer_name.type == 0)
                con->peer_name = msg->hdr.src.name;
 
-       mutex_lock(&con->out_mutex);
        con->in_seq++;
-       mutex_unlock(&con->out_mutex);
+       mutex_unlock(&con->mutex);
 
        dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
             msg, le64_to_cpu(msg->hdr.seq),
@@ -1467,6 +1473,8 @@ static void process_message(struct ceph_connection *con)
             le32_to_cpu(msg->hdr.data_len),
             con->in_front_crc, con->in_middle_crc, con->in_data_crc);
        con->ops->dispatch(con, msg);
+
+       mutex_lock(&con->mutex);
        prepare_read_tag(con);
 }
 
@@ -1483,7 +1491,7 @@ static int try_write(struct ceph_connection *con)
        dout("try_write start %p state %lu nref %d\n", con, con->state,
             atomic_read(&con->nref));
 
-       mutex_lock(&con->out_mutex);
+       mutex_lock(&con->mutex);
 more:
        dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
 
@@ -1576,7 +1584,7 @@ do_next:
 done:
        ret = 0;
 out:
-       mutex_unlock(&con->out_mutex);
+       mutex_unlock(&con->mutex);
        dout("try_write done on %p\n", con);
        return ret;
 }
@@ -1600,6 +1608,8 @@ static int try_read(struct ceph_connection *con)
        dout("try_read start on %p\n", con);
        msgr = con->msgr;
 
+       mutex_lock(&con->mutex);
+
 more:
        dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
             con->in_base_pos);
@@ -1693,6 +1703,7 @@ more:
 done:
        ret = 0;
 out:
+       mutex_unlock(&con->mutex);
        dout("try_read done on %p\n", con);
        return ret;
 
@@ -1818,6 +1829,8 @@ static void ceph_fault(struct ceph_connection *con)
 
        clear_bit(BUSY, &con->state);  /* to avoid an improbable race */
 
+       mutex_lock(&con->mutex);
+
        con_close_socket(con);
 
        if (con->in_msg) {
@@ -1827,24 +1840,24 @@ static void ceph_fault(struct ceph_connection *con)
 
        /* If there are no messages in the queue, place the connection
         * in a STANDBY state (i.e., don't try to reconnect just yet). */
-       mutex_lock(&con->out_mutex);
        if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
                dout("fault setting STANDBY\n");
                set_bit(STANDBY, &con->state);
-               mutex_unlock(&con->out_mutex);
+               mutex_unlock(&con->mutex);
                goto out;
        }
 
        /* Requeue anything that hasn't been acked, and retry after a
         * delay. */
        list_splice_init(&con->out_sent, &con->out_queue);
-       mutex_unlock(&con->out_mutex);
 
        if (con->delay == 0)
                con->delay = BASE_DELAY_INTERVAL;
        else if (con->delay < MAX_DELAY_INTERVAL)
                con->delay *= 2;
 
+       mutex_unlock(&con->mutex);
+
        /* explicitly schedule work to try to reconnect again later. */
        dout("fault queueing %p delay %lu\n", con, con->delay);
        con->ops->get(con);
@@ -1920,7 +1933,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
        msg->hdr.dst_erank = con->peer_addr.erank;
 
        /* queue */
-       mutex_lock(&con->out_mutex);
+       mutex_lock(&con->mutex);
        BUG_ON(!list_empty(&msg->list_head));
        list_add_tail(&msg->list_head, &con->out_queue);
        dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
@@ -1929,7 +1942,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
             le32_to_cpu(msg->hdr.front_len),
             le32_to_cpu(msg->hdr.middle_len),
             le32_to_cpu(msg->hdr.data_len));
-       mutex_unlock(&con->out_mutex);
+       mutex_unlock(&con->mutex);
 
        /* if there wasn't anything waiting to send before, queue
         * new work */
@@ -1942,7 +1955,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
  */
 void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
 {
-       mutex_lock(&con->out_mutex);
+       mutex_lock(&con->mutex);
        if (!list_empty(&msg->list_head)) {
                dout("con_revoke %p msg %p\n", con, msg);
                list_del_init(&msg->list_head);
@@ -1959,7 +1972,7 @@ void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
        } else {
                dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
        }
-       mutex_unlock(&con->out_mutex);
+       mutex_unlock(&con->mutex);
 }
 
 /*
index e04c214b4f6f000fb14a7230444656d0ea3f0668..94b55de90331aa29f2f5eb58162e7285da1ff46a 100644 (file)
@@ -155,8 +155,9 @@ struct ceph_connection {
        void *auth_reply_buf;   /* where to put the authorizer reply */
        int auth_reply_buf_len;
 
+       struct mutex mutex;
+
        /* out queue */
-       struct mutex out_mutex;
        struct list_head out_queue;
        struct list_head out_sent;   /* sending or sent but unacked */
        u64 out_seq;                 /* last message queued for send */