libceph: use keepalive2 to verify the mon session is alive
authorYan, Zheng <zyan@redhat.com>
Tue, 1 Sep 2015 09:19:38 +0000 (17:19 +0800)
committerIlya Dryomov <idryomov@gmail.com>
Tue, 8 Sep 2015 20:14:30 +0000 (23:14 +0300)
Signed-off-by: Yan, Zheng <zyan@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/libceph.h
include/linux/ceph/messenger.h
include/linux/ceph/msgr.h
net/ceph/ceph_common.c
net/ceph/messenger.c
net/ceph/mon_client.c

index 9ebee53d3bf586ef80690fa778c0a3e030e410f1..397c5cd09794854ebc8891fac59b4dfa50e7e141 100644 (file)
@@ -46,6 +46,7 @@ struct ceph_options {
        unsigned long mount_timeout;            /* jiffies */
        unsigned long osd_idle_ttl;             /* jiffies */
        unsigned long osd_keepalive_timeout;    /* jiffies */
+       unsigned long monc_ping_timeout;        /* jiffies */
 
        /*
         * any type that can't be simply compared or doesn't need need
@@ -66,6 +67,7 @@ struct ceph_options {
 #define CEPH_MOUNT_TIMEOUT_DEFAULT     msecs_to_jiffies(60 * 1000)
 #define CEPH_OSD_KEEPALIVE_DEFAULT     msecs_to_jiffies(5 * 1000)
 #define CEPH_OSD_IDLE_TTL_DEFAULT      msecs_to_jiffies(60 * 1000)
+#define CEPH_MONC_PING_TIMEOUT_DEFAULT msecs_to_jiffies(30 * 1000)
 
 #define CEPH_MSG_MAX_FRONT_LEN (16*1024*1024)
 #define CEPH_MSG_MAX_MIDDLE_LEN        (16*1024*1024)
index 37753278987ac654a5e00797e6d6b81e4b2353c0..7e1252e97a30bc92d4065faf4ee5ab38d7839156 100644 (file)
@@ -248,6 +248,8 @@ struct ceph_connection {
        int in_base_pos;     /* bytes read */
        __le64 in_temp_ack;  /* for reading an ack */
 
+       struct timespec last_keepalive_ack;
+
        struct delayed_work work;           /* send|recv work */
        unsigned long       delay;          /* current delay interval */
 };
@@ -285,6 +287,8 @@ extern void ceph_msg_revoke(struct ceph_msg *msg);
 extern void ceph_msg_revoke_incoming(struct ceph_msg *msg);
 
 extern void ceph_con_keepalive(struct ceph_connection *con);
+extern bool ceph_con_keepalive_expired(struct ceph_connection *con,
+                                      unsigned long interval);
 
 extern void ceph_msg_data_add_pages(struct ceph_msg *msg, struct page **pages,
                                size_t length, size_t alignment);
index 1c1887206ffa928264acbc1cf63aa3b36717b3b9..0fe2656ac415711cce5bfc53f3c9ed753bf3ed26 100644 (file)
@@ -84,10 +84,12 @@ struct ceph_entity_inst {
 #define CEPH_MSGR_TAG_MSG           7  /* message */
 #define CEPH_MSGR_TAG_ACK           8  /* message ack */
 #define CEPH_MSGR_TAG_KEEPALIVE     9  /* just a keepalive byte! */
-#define CEPH_MSGR_TAG_BADPROTOVER  10  /* bad protocol version */
+#define CEPH_MSGR_TAG_BADPROTOVER   10 /* bad protocol version */
 #define CEPH_MSGR_TAG_BADAUTHORIZER 11 /* bad authorizer */
 #define CEPH_MSGR_TAG_FEATURES      12 /* insufficient features */
 #define CEPH_MSGR_TAG_SEQ           13 /* 64-bit int follows with seen seq number */
+#define CEPH_MSGR_TAG_KEEPALIVE2    14 /* keepalive2 byte + ceph_timespec */
+#define CEPH_MSGR_TAG_KEEPALIVE2_ACK 15 /* keepalive2 reply */
 
 
 /*
index f30329f726418bdc2e82bb5aced4c3634e215850..3f56eefc2a073e9510bea7c4a8637009e1503466 100644 (file)
@@ -357,6 +357,7 @@ ceph_parse_options(char *options, const char *dev_name,
        opt->osd_keepalive_timeout = CEPH_OSD_KEEPALIVE_DEFAULT;
        opt->mount_timeout = CEPH_MOUNT_TIMEOUT_DEFAULT;
        opt->osd_idle_ttl = CEPH_OSD_IDLE_TTL_DEFAULT;
+       opt->monc_ping_timeout = CEPH_MONC_PING_TIMEOUT_DEFAULT;
 
        /* get mon ip(s) */
        /* ip1[:port1][,ip2[:port2]...] */
index 101ab6285fbae8f87208623366ef9c5ed75869e2..36757d46ac406d300b20a92790fea1b957e27108 100644 (file)
@@ -163,6 +163,7 @@ static struct kmem_cache    *ceph_msg_data_cache;
 static char tag_msg = CEPH_MSGR_TAG_MSG;
 static char tag_ack = CEPH_MSGR_TAG_ACK;
 static char tag_keepalive = CEPH_MSGR_TAG_KEEPALIVE;
+static char tag_keepalive2 = CEPH_MSGR_TAG_KEEPALIVE2;
 
 #ifdef CONFIG_LOCKDEP
 static struct lock_class_key socket_class;
@@ -1351,7 +1352,15 @@ static void prepare_write_keepalive(struct ceph_connection *con)
 {
        dout("prepare_write_keepalive %p\n", con);
        con_out_kvec_reset(con);
-       con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
+       if (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2) {
+               struct timespec ts = CURRENT_TIME;
+               struct ceph_timespec ceph_ts;
+               ceph_encode_timespec(&ceph_ts, &ts);
+               con_out_kvec_add(con, sizeof(tag_keepalive2), &tag_keepalive2);
+               con_out_kvec_add(con, sizeof(ceph_ts), &ceph_ts);
+       } else {
+               con_out_kvec_add(con, sizeof(tag_keepalive), &tag_keepalive);
+       }
        con_flag_set(con, CON_FLAG_WRITE_PENDING);
 }
 
@@ -1625,6 +1634,12 @@ static void prepare_read_tag(struct ceph_connection *con)
        con->in_tag = CEPH_MSGR_TAG_READY;
 }
 
+static void prepare_read_keepalive_ack(struct ceph_connection *con)
+{
+       dout("prepare_read_keepalive_ack %p\n", con);
+       con->in_base_pos = 0;
+}
+
 /*
  * Prepare to read a message.
  */
@@ -2457,6 +2472,17 @@ static void process_message(struct ceph_connection *con)
        mutex_lock(&con->mutex);
 }
 
+static int read_keepalive_ack(struct ceph_connection *con)
+{
+       struct ceph_timespec ceph_ts;
+       size_t size = sizeof(ceph_ts);
+       int ret = read_partial(con, size, size, &ceph_ts);
+       if (ret <= 0)
+               return ret;
+       ceph_decode_timespec(&con->last_keepalive_ack, &ceph_ts);
+       prepare_read_tag(con);
+       return 1;
+}
 
 /*
  * Write something to the socket.  Called in a worker thread when the
@@ -2526,6 +2552,10 @@ more_kvec:
 
 do_next:
        if (con->state == CON_STATE_OPEN) {
+               if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
+                       prepare_write_keepalive(con);
+                       goto more;
+               }
                /* is anything else pending? */
                if (!list_empty(&con->out_queue)) {
                        prepare_write_message(con);
@@ -2535,10 +2565,6 @@ do_next:
                        prepare_write_ack(con);
                        goto more;
                }
-               if (con_flag_test_and_clear(con, CON_FLAG_KEEPALIVE_PENDING)) {
-                       prepare_write_keepalive(con);
-                       goto more;
-               }
        }
 
        /* Nothing to do! */
@@ -2641,6 +2667,9 @@ more:
                case CEPH_MSGR_TAG_ACK:
                        prepare_read_ack(con);
                        break;
+               case CEPH_MSGR_TAG_KEEPALIVE2_ACK:
+                       prepare_read_keepalive_ack(con);
+                       break;
                case CEPH_MSGR_TAG_CLOSE:
                        con_close_socket(con);
                        con->state = CON_STATE_CLOSED;
@@ -2684,6 +2713,12 @@ more:
                process_ack(con);
                goto more;
        }
+       if (con->in_tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK) {
+               ret = read_keepalive_ack(con);
+               if (ret <= 0)
+                       goto out;
+               goto more;
+       }
 
 out:
        dout("try_read done on %p ret %d\n", con, ret);
@@ -3101,6 +3136,20 @@ void ceph_con_keepalive(struct ceph_connection *con)
 }
 EXPORT_SYMBOL(ceph_con_keepalive);
 
+bool ceph_con_keepalive_expired(struct ceph_connection *con,
+                              unsigned long interval)
+{
+       if (interval > 0 &&
+           (con->peer_features & CEPH_FEATURE_MSGR_KEEPALIVE2)) {
+               struct timespec now = CURRENT_TIME;
+               struct timespec ts;
+               jiffies_to_timespec(interval, &ts);
+               ts = timespec_add(con->last_keepalive_ack, ts);
+               return timespec_compare(&now, &ts) >= 0;
+       }
+       return false;
+}
+
 static struct ceph_msg_data *ceph_msg_data_create(enum ceph_msg_data_type type)
 {
        struct ceph_msg_data *data;
index 9d6ff1215928cb69787a85421fdbab9e2a1c8bf5..edda01626a459efbfdaeebd46fd6a0b13a037879 100644 (file)
@@ -149,6 +149,10 @@ static int __open_session(struct ceph_mon_client *monc)
                              CEPH_ENTITY_TYPE_MON, monc->cur_mon,
                              &monc->monmap->mon_inst[monc->cur_mon].addr);
 
+               /* send an initial keepalive to ensure our timestamp is
+                * valid by the time we are in an OPENED state */
+               ceph_con_keepalive(&monc->con);
+
                /* initiatiate authentication handshake */
                ret = ceph_auth_build_hello(monc->auth,
                                            monc->m_auth->front.iov_base,
@@ -170,14 +174,19 @@ static bool __sub_expired(struct ceph_mon_client *monc)
  */
 static void __schedule_delayed(struct ceph_mon_client *monc)
 {
-       unsigned int delay;
+       struct ceph_options *opt = monc->client->options;
+       unsigned long delay;
 
-       if (monc->cur_mon < 0 || __sub_expired(monc))
+       if (monc->cur_mon < 0 || __sub_expired(monc)) {
                delay = 10 * HZ;
-       else
+       } else {
                delay = 20 * HZ;
-       dout("__schedule_delayed after %u\n", delay);
-       schedule_delayed_work(&monc->delayed_work, delay);
+               if (opt->monc_ping_timeout > 0)
+                       delay = min(delay, opt->monc_ping_timeout / 3);
+       }
+       dout("__schedule_delayed after %lu\n", delay);
+       schedule_delayed_work(&monc->delayed_work,
+                             round_jiffies_relative(delay));
 }
 
 /*
@@ -743,11 +752,23 @@ static void delayed_work(struct work_struct *work)
                __close_session(monc);
                __open_session(monc);  /* continue hunting */
        } else {
-               ceph_con_keepalive(&monc->con);
+               struct ceph_options *opt = monc->client->options;
+               int is_auth = ceph_auth_is_authenticated(monc->auth);
+               if (ceph_con_keepalive_expired(&monc->con,
+                                              opt->monc_ping_timeout)) {
+                       dout("monc keepalive timeout\n");
+                       is_auth = 0;
+                       __close_session(monc);
+                       monc->hunting = true;
+                       __open_session(monc);
+               }
 
-               __validate_auth(monc);
+               if (!monc->hunting) {
+                       ceph_con_keepalive(&monc->con);
+                       __validate_auth(monc);
+               }
 
-               if (ceph_auth_is_authenticated(monc->auth))
+               if (is_auth)
                        __send_subscribe(monc);
        }
        __schedule_delayed(monc);