ret = o2nm_cluster_attr_write(page, count, &val);
if (ret > 0) {
- if (val <= cluster->cl_keepalive_delay_ms) {
+ if (cluster->cl_idle_timeout_ms != val
+ && o2net_num_connected_peers()) {
+ mlog(ML_NOTICE,
+ "o2net: cannot change idle timeout after "
+ "the first peer has agreed to it."
+ " %d connected peers\n",
+ o2net_num_connected_peers());
+ ret = -EINVAL;
+ } else if (val <= cluster->cl_keepalive_delay_ms) {
mlog(ML_NOTICE, "o2net: idle timeout must be larger "
"than keepalive delay\n");
- return -EINVAL;
+ ret = -EINVAL;
+ } else {
+ cluster->cl_idle_timeout_ms = val;
}
- cluster->cl_idle_timeout_ms = val;
}
return ret;
ret = o2nm_cluster_attr_write(page, count, &val);
if (ret > 0) {
- if (val >= cluster->cl_idle_timeout_ms) {
+ if (cluster->cl_keepalive_delay_ms != val
+ && o2net_num_connected_peers()) {
+ mlog(ML_NOTICE,
+ "o2net: cannot change keepalive delay after"
+ " the first peer has agreed to it."
+ " %d connected peers\n",
+ o2net_num_connected_peers());
+ ret = -EINVAL;
+ } else if (val >= cluster->cl_idle_timeout_ms) {
mlog(ML_NOTICE, "o2net: keepalive delay must be "
"smaller than idle timeout\n");
- return -EINVAL;
+ ret = -EINVAL;
+ } else {
+ cluster->cl_keepalive_delay_ms = val;
}
- cluster->cl_keepalive_delay_ms = val;
}
return ret;
sc_put(sc);
}
+static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
+
+int o2net_num_connected_peers(void)
+{
+ return atomic_read(&o2net_connected_peers);
+}
+
static void o2net_set_nn_state(struct o2net_node *nn,
struct o2net_sock_container *sc,
unsigned valid, int err)
assert_spin_locked(&nn->nn_lock);
+ if (old_sc && !sc)
+ atomic_dec(&o2net_connected_peers);
+ else if (!old_sc && sc)
+ atomic_inc(&o2net_connected_peers);
+
/* the node num comparison and single connect/accept path should stop
* an non-null sc from being overwritten with another */
BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
return -1;
}
+ /*
+ * Ensure timeouts are consistent with other nodes, otherwise
+ * we can end up with one node thinking that the other must be down,
+ * but isn't. This can ultimately cause corruption.
+ */
+ if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
+ o2net_idle_timeout(sc->sc_node)) {
+ mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
+ "%u ms, but we use %u ms locally. disconnecting\n",
+ SC_NODEF_ARGS(sc),
+ be32_to_cpu(hand->o2net_idle_timeout_ms),
+ o2net_idle_timeout(sc->sc_node));
+ o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+ return -1;
+ }
+
+ if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
+ o2net_keepalive_delay(sc->sc_node)) {
+ mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
+ "%u ms, but we use %u ms locally. disconnecting\n",
+ SC_NODEF_ARGS(sc),
+ be32_to_cpu(hand->o2net_keepalive_delay_ms),
+ o2net_keepalive_delay(sc->sc_node));
+ o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+ return -1;
+ }
+
+ if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
+ O2HB_MAX_WRITE_TIMEOUT_MS) {
+ mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
+ "%u ms, but we use %u ms locally. disconnecting\n",
+ SC_NODEF_ARGS(sc),
+ be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
+ O2HB_MAX_WRITE_TIMEOUT_MS);
+ o2net_ensure_shutdown(nn, sc, -ENOTCONN);
+ return -1;
+ }
+
sc->sc_handshake_ok = 1;
spin_lock(&nn->nn_lock);
sclog(sc, "receiving\n");
do_gettimeofday(&sc->sc_tv_advance_start);
+ if (unlikely(sc->sc_handshake_ok == 0)) {
+ if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
+ data = page_address(sc->sc_page) + sc->sc_page_off;
+ datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
+ ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
+ if (ret > 0)
+ sc->sc_page_off += ret;
+ }
+
+ if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
+ o2net_check_handshake(sc);
+ if (unlikely(sc->sc_handshake_ok == 0))
+ ret = -EPROTO;
+ }
+ goto out;
+ }
+
/* do we need more header? */
if (sc->sc_page_off < sizeof(struct o2net_msg)) {
data = page_address(sc->sc_page) + sc->sc_page_off;
ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
if (ret > 0) {
sc->sc_page_off += ret;
-
- /* this working relies on the handshake being
- * smaller than the normal message header */
- if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
- !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
- ret = -EPROTO;
- goto out;
- }
-
/* only swab incoming here.. we can
* only get here once as we cross from
* being under to over */
return ret;
}
+static void o2net_initialize_handshake(void)
+{
+ o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
+ O2HB_MAX_WRITE_TIMEOUT_MS);
+ o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
+ o2net_idle_timeout(NULL));
+ o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
+ o2net_keepalive_delay(NULL));
+ o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
+ o2net_reconnect_delay(NULL));
+}
+
/* ------------------------------------------------------------ */
/* called when a connect completes and after a sock is accepted. the
(unsigned long long)O2NET_PROTOCOL_VERSION,
(unsigned long long)be64_to_cpu(o2net_hand->connector_id));
+ o2net_initialize_handshake();
o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
sc_put(sc);
}
if (node_num != o2nm_this_node())
o2net_disconnect_node(node);
+
+ BUG_ON(atomic_read(&o2net_connected_peers) < 0);
}
static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
o2net_register_callbacks(sc->sc_sock->sk, sc);
o2net_sc_queue_work(sc, &sc->sc_rx_work);
+ o2net_initialize_handshake();
o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
out: