static void ceph_msg_remove(struct ceph_msg *msg)
{
list_del_init(&msg->list_head);
- BUG_ON(msg->con == NULL);
- msg->con->ops->put(msg->con);
- msg->con = NULL;
ceph_msg_put(msg);
}
if (con->in_msg) {
BUG_ON(con->in_msg->con != con);
- con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
- con->ops->put(con);
}
con->connect_seq = 0;
con->out_seq = 0;
if (con->out_msg) {
+ BUG_ON(con->out_msg->con != con);
ceph_msg_put(con->out_msg);
con->out_msg = NULL;
}
*/
static void process_message(struct ceph_connection *con)
{
- struct ceph_msg *msg;
+ struct ceph_msg *msg = con->in_msg;
BUG_ON(con->in_msg->con != con);
- con->in_msg->con = NULL;
- msg = con->in_msg;
con->in_msg = NULL;
- con->ops->put(con);
/* if first message, set peer_name */
if (con->peer_name.type == 0)
if (con->in_msg) {
BUG_ON(con->in_msg->con != con);
- con->in_msg->con = NULL;
ceph_msg_put(con->in_msg);
con->in_msg = NULL;
- con->ops->put(con);
}
/* Requeue anything that hasn't been acked */
}
EXPORT_SYMBOL(ceph_messenger_fini);
+static void msg_con_set(struct ceph_msg *msg, struct ceph_connection *con)
+{
+ if (msg->con)
+ msg->con->ops->put(msg->con);
+
+ msg->con = con ? con->ops->get(con) : NULL;
+ BUG_ON(msg->con != con);
+}
+
static void clear_standby(struct ceph_connection *con)
{
/* come back from STANDBY? */
return;
}
- BUG_ON(msg->con != NULL);
- msg->con = con->ops->get(con);
- BUG_ON(msg->con == NULL);
+ msg_con_set(msg, con);
BUG_ON(!list_empty(&msg->list_head));
list_add_tail(&msg->list_head, &con->out_queue);
{
struct ceph_connection *con = msg->con;
- if (!con)
+ if (!con) {
+ dout("%s msg %p null con\n", __func__, msg);
return; /* Message not in our possession */
+ }
mutex_lock(&con->mutex);
if (!list_empty(&msg->list_head)) {
dout("%s %p msg %p - was on queue\n", __func__, con, msg);
list_del_init(&msg->list_head);
- BUG_ON(msg->con == NULL);
- msg->con->ops->put(msg->con);
- msg->con = NULL;
msg->hdr.seq = 0;
ceph_msg_put(msg);
*/
void ceph_msg_revoke_incoming(struct ceph_msg *msg)
{
- struct ceph_connection *con;
+ struct ceph_connection *con = msg->con;
- BUG_ON(msg == NULL);
- if (!msg->con) {
+ if (!con) {
dout("%s msg %p null con\n", __func__, msg);
-
return; /* Message not in our possession */
}
- con = msg->con;
mutex_lock(&con->mutex);
if (con->in_msg == msg) {
unsigned int front_len = le32_to_cpu(con->in_hdr.front_len);
}
if (msg) {
BUG_ON(*skip);
+ msg_con_set(msg, con);
con->in_msg = msg;
- con->in_msg->con = con->ops->get(con);
- BUG_ON(con->in_msg->con == NULL);
} else {
/*
* Null message pointer means either we should skip
dout("%s %p\n", __func__, m);
WARN_ON(!list_empty(&m->list_head));
+ msg_con_set(m, NULL);
+
/* drop middle, data, if any */
if (m->middle) {
ceph_buffer_put(m->middle);