{
/* reset connection, out_queue, msg_ and connect_seq */
/* discard existing out_queue and msg_seq */
- mutex_lock(&con->out_mutex);
ceph_msg_remove_list(&con->out_queue);
ceph_msg_remove_list(&con->out_sent);
con->out_msg = NULL;
}
con->in_seq = 0;
- mutex_unlock(&con->out_mutex);
}
/*
dout("con_close %p peer %s\n", con, pr_addr(&con->peer_addr.in_addr));
set_bit(CLOSED, &con->state); /* in case there's queued work */
clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
+ mutex_lock(&con->mutex);
reset_connection(con);
+ mutex_unlock(&con->mutex);
queue_con(con);
}
memset(con, 0, sizeof(*con));
atomic_set(&con->nref, 1);
con->msgr = msgr;
- mutex_init(&con->out_mutex);
+ mutex_init(&con->mutex);
INIT_LIST_HEAD(&con->out_queue);
INIT_LIST_HEAD(&con->out_sent);
INIT_DELAYED_WORK(&con->work, con_work);
int auth_len = 0;
int auth_protocol = 0;
+ mutex_unlock(&con->mutex);
if (con->ops->get_authorizer)
con->ops->get_authorizer(con, &auth_buf, &auth_len,
&auth_protocol, &con->auth_reply_buf,
&con->auth_reply_buf_len,
con->auth_retry);
+ mutex_lock(&con->mutex);
con->out_connect.authorizer_protocol = cpu_to_le32(auth_protocol);
con->out_connect.authorizer_len = cpu_to_le32(auth_len);
le32_to_cpu(con->out_connect.protocol_version),
le32_to_cpu(con->in_reply.protocol_version));
con->error_msg = "protocol version mismatch";
- if (con->ops->bad_proto)
- con->ops->bad_proto(con);
reset_connection(con);
set_bit(CLOSED, &con->state); /* in case there's queued work */
+
+ mutex_unlock(&con->mutex);
+ if (con->ops->bad_proto)
+ con->ops->bad_proto(con);
+ mutex_lock(&con->mutex);
return -1;
case CEPH_MSGR_TAG_BADAUTHORIZER:
prepare_read_connect(con);
/* Tell ceph about it. */
+ mutex_unlock(&con->mutex);
pr_info("reset on %s%lld\n", ENTITY_NAME(con->peer_name));
if (con->ops->peer_reset)
con->ops->peer_reset(con);
+ mutex_lock(&con->mutex);
break;
case CEPH_MSGR_TAG_RETRY_SESSION:
u64 ack = le64_to_cpu(con->in_temp_ack);
u64 seq;
- mutex_lock(&con->out_mutex);
while (!list_empty(&con->out_sent)) {
m = list_first_entry(&con->out_sent, struct ceph_msg,
list_head);
le16_to_cpu(m->hdr.type), m);
ceph_msg_remove(m);
}
- mutex_unlock(&con->out_mutex);
prepare_read_tag(con);
}
/* find pages for data payload */
want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
ret = -1;
+ mutex_unlock(&con->mutex);
if (con->ops->prepare_pages)
ret = con->ops->prepare_pages(con, m, want);
+ mutex_lock(&con->mutex);
if (ret < 0) {
dout("%p prepare_pages failed, skipping payload\n", m);
con->in_base_pos = -data_len - sizeof(m->footer);
if (con->peer_name.type == 0)
con->peer_name = msg->hdr.src.name;
- mutex_lock(&con->out_mutex);
con->in_seq++;
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
dout("===== %p %llu from %s%lld %d=%s len %d+%d (%u %u %u) =====\n",
msg, le64_to_cpu(msg->hdr.seq),
le32_to_cpu(msg->hdr.data_len),
con->in_front_crc, con->in_middle_crc, con->in_data_crc);
con->ops->dispatch(con, msg);
+
+ mutex_lock(&con->mutex);
prepare_read_tag(con);
}
dout("try_write start %p state %lu nref %d\n", con, con->state,
atomic_read(&con->nref));
- mutex_lock(&con->out_mutex);
+ mutex_lock(&con->mutex);
more:
dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
done:
ret = 0;
out:
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
dout("try_write done on %p\n", con);
return ret;
}
dout("try_read start on %p\n", con);
msgr = con->msgr;
+ mutex_lock(&con->mutex);
+
more:
dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
con->in_base_pos);
done:
ret = 0;
out:
+ mutex_unlock(&con->mutex);
dout("try_read done on %p\n", con);
return ret;
clear_bit(BUSY, &con->state); /* to avoid an improbable race */
+ mutex_lock(&con->mutex);
+
con_close_socket(con);
if (con->in_msg) {
/* If there are no messages in the queue, place the connection
* in a STANDBY state (i.e., don't try to reconnect just yet). */
- mutex_lock(&con->out_mutex);
if (list_empty(&con->out_queue) && !con->out_keepalive_pending) {
dout("fault setting STANDBY\n");
set_bit(STANDBY, &con->state);
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
goto out;
}
/* Requeue anything that hasn't been acked, and retry after a
* delay. */
list_splice_init(&con->out_sent, &con->out_queue);
- mutex_unlock(&con->out_mutex);
if (con->delay == 0)
con->delay = BASE_DELAY_INTERVAL;
else if (con->delay < MAX_DELAY_INTERVAL)
con->delay *= 2;
+ mutex_unlock(&con->mutex);
+
/* explicitly schedule work to try to reconnect again later. */
dout("fault queueing %p delay %lu\n", con, con->delay);
con->ops->get(con);
msg->hdr.dst_erank = con->peer_addr.erank;
/* queue */
- mutex_lock(&con->out_mutex);
+ mutex_lock(&con->mutex);
BUG_ON(!list_empty(&msg->list_head));
list_add_tail(&msg->list_head, &con->out_queue);
dout("----- %p to %s%lld %d=%s len %d+%d+%d -----\n", msg,
le32_to_cpu(msg->hdr.front_len),
le32_to_cpu(msg->hdr.middle_len),
le32_to_cpu(msg->hdr.data_len));
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
/* if there wasn't anything waiting to send before, queue
* new work */
*/
void ceph_con_revoke(struct ceph_connection *con, struct ceph_msg *msg)
{
- mutex_lock(&con->out_mutex);
+ mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
dout("con_revoke %p msg %p\n", con, msg);
list_del_init(&msg->list_head);
} else {
dout("con_revoke %p msg %p - not queued (sent?)\n", con, msg);
}
- mutex_unlock(&con->out_mutex);
+ mutex_unlock(&con->mutex);
}
/*