RDS: Pass rds_conn_path to rds_send_xmit()
authorSowmini Varadhan <sowmini.varadhan@oracle.com>
Mon, 13 Jun 2016 16:44:34 +0000 (09:44 -0700)
committerDavid S. Miller <davem@davemloft.net>
Wed, 15 Jun 2016 06:50:43 +0000 (23:50 -0700)
Pass a struct rds_conn_path to rds_send_xmit so that MP capable
transports can transmit packets on something other than c_path[0].
The eventual goal for MP capable transports is to hash the rds
socket to a path based on the bound local address/port, and use
this path as the argument to rds_send_xmit()

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/rds/ib_cm.c
net/rds/rds.h
net/rds/send.c
net/rds/threads.c

index 4de5a35f5c4054aec0ec64cfd587fe1be0d520cf..334287602b780b283a6b557dbc2db77337ee2e45 100644 (file)
@@ -274,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data)
        if (rds_conn_up(conn) &&
            (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
            test_bit(0, &conn->c_map_queued)))
-               rds_send_xmit(ic->conn);
+               rds_send_xmit(&ic->conn->c_path[0]);
 }
 
 static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
index 2cffd37a550f5ce5bcf27963849e5f8d69fde871..b6072eb05fb637e59445d7b27194a6767e301373 100644 (file)
@@ -457,7 +457,9 @@ struct rds_transport {
        int (*conn_connect)(struct rds_connection *conn);
        void (*conn_shutdown)(struct rds_connection *conn);
        void (*xmit_prepare)(struct rds_connection *conn);
+       void (*xmit_path_prepare)(struct rds_conn_path *cp);
        void (*xmit_complete)(struct rds_connection *conn);
+       void (*xmit_path_complete)(struct rds_conn_path *cp);
        int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
                    unsigned int hdr_off, unsigned int sg, unsigned int off);
        int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
@@ -780,7 +782,7 @@ void rds_inc_info_copy(struct rds_incoming *inc,
 /* send.c */
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
 void rds_send_reset(struct rds_connection *conn);
-int rds_send_xmit(struct rds_connection *conn);
+int rds_send_xmit(struct rds_conn_path *cp);
 struct sockaddr_in;
 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest);
 typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack);
index 076ee413d21cb6387f137a4444d413c3e62ece89..966311d135af4c5e869e242b29a864524b6ed9c6 100644 (file)
@@ -107,14 +107,14 @@ void rds_send_reset(struct rds_connection *conn)
 }
 EXPORT_SYMBOL_GPL(rds_send_reset);
 
-static int acquire_in_xmit(struct rds_connection *conn)
+static int acquire_in_xmit(struct rds_conn_path *cp)
 {
-       return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+       return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 }
 
-static void release_in_xmit(struct rds_connection *conn)
+static void release_in_xmit(struct rds_conn_path *cp)
 {
-       clear_bit(RDS_IN_XMIT, &conn->c_flags);
+       clear_bit(RDS_IN_XMIT, &cp->cp_flags);
        smp_mb__after_atomic();
        /*
         * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
@@ -122,8 +122,8 @@ static void release_in_xmit(struct rds_connection *conn)
         * the system-wide hashed waitqueue buckets in the fast path only to
         * almost never find waiters.
         */
-       if (waitqueue_active(&conn->c_waitq))
-               wake_up_all(&conn->c_waitq);
+       if (waitqueue_active(&cp->cp_waitq))
+               wake_up_all(&cp->cp_waitq);
 }
 
 /*
@@ -140,8 +140,9 @@ static void release_in_xmit(struct rds_connection *conn)
  *      - small message latency is higher behind queued large messages
  *      - large message latency isn't starved by intervening small sends
  */
-int rds_send_xmit(struct rds_connection *conn)
+int rds_send_xmit(struct rds_conn_path *cp)
 {
+       struct rds_connection *conn = cp->cp_conn;
        struct rds_message *rm;
        unsigned long flags;
        unsigned int tmp;
@@ -161,7 +162,7 @@ restart:
         * avoids blocking the caller and trading per-connection data between
         * caches per message.
         */
-       if (!acquire_in_xmit(conn)) {
+       if (!acquire_in_xmit(cp)) {
                rds_stats_inc(s_send_lock_contention);
                ret = -ENOMEM;
                goto out;
@@ -175,21 +176,25 @@ restart:
         * The acquire_in_xmit() check above ensures that only one
         * caller can increment c_send_gen at any time.
         */
-       conn->c_send_gen++;
-       send_gen = conn->c_send_gen;
+       cp->cp_send_gen++;
+       send_gen = cp->cp_send_gen;
 
        /*
         * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
         * we do the opposite to avoid races.
         */
-       if (!rds_conn_up(conn)) {
-               release_in_xmit(conn);
+       if (!rds_conn_path_up(cp)) {
+               release_in_xmit(cp);
                ret = 0;
                goto out;
        }
 
-       if (conn->c_trans->xmit_prepare)
+       if (conn->c_trans->t_mp_capable) {
+               if (conn->c_trans->xmit_path_prepare)
+                       conn->c_trans->xmit_path_prepare(cp);
+       } else if (conn->c_trans->xmit_prepare) {
                conn->c_trans->xmit_prepare(conn);
+       }
 
        /*
         * spin trying to push headers and data down the connection until
@@ -197,7 +202,7 @@ restart:
         */
        while (1) {
 
-               rm = conn->c_xmit_rm;
+               rm = cp->cp_xmit_rm;
 
                /*
                 * If between sending messages, we can send a pending congestion
@@ -210,14 +215,16 @@ restart:
                                break;
                        }
                        rm->data.op_active = 1;
+                       rm->m_inc.i_conn_path = cp;
+                       rm->m_inc.i_conn = cp->cp_conn;
 
-                       conn->c_xmit_rm = rm;
+                       cp->cp_xmit_rm = rm;
                }
 
                /*
                 * If not already working on one, grab the next message.
                 *
-                * c_xmit_rm holds a ref while we're sending this message down
+                * cp_xmit_rm holds a ref while we're sending this message down
                 * the connction.  We can use this ref while holding the
                 * send_sem.. rds_send_reset() is serialized with it.
                 */
@@ -234,10 +241,10 @@ restart:
                        if (batch_count >= send_batch_count)
                                goto over_batch;
 
-                       spin_lock_irqsave(&conn->c_lock, flags);
+                       spin_lock_irqsave(&cp->cp_lock, flags);
 
-                       if (!list_empty(&conn->c_send_queue)) {
-                               rm = list_entry(conn->c_send_queue.next,
+                       if (!list_empty(&cp->cp_send_queue)) {
+                               rm = list_entry(cp->cp_send_queue.next,
                                                struct rds_message,
                                                m_conn_item);
                                rds_message_addref(rm);
@@ -246,10 +253,11 @@ restart:
                                 * Move the message from the send queue to the retransmit
                                 * list right away.
                                 */
-                               list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+                               list_move_tail(&rm->m_conn_item,
+                                              &cp->cp_retrans);
                        }
 
-                       spin_unlock_irqrestore(&conn->c_lock, flags);
+                       spin_unlock_irqrestore(&cp->cp_lock, flags);
 
                        if (!rm)
                                break;
@@ -263,32 +271,34 @@ restart:
                         */
                        if (rm->rdma.op_active &&
                            test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-                               spin_lock_irqsave(&conn->c_lock, flags);
+                               spin_lock_irqsave(&cp->cp_lock, flags);
                                if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
                                        list_move(&rm->m_conn_item, &to_be_dropped);
-                               spin_unlock_irqrestore(&conn->c_lock, flags);
+                               spin_unlock_irqrestore(&cp->cp_lock, flags);
                                continue;
                        }
 
                        /* Require an ACK every once in a while */
                        len = ntohl(rm->m_inc.i_hdr.h_len);
-                       if (conn->c_unacked_packets == 0 ||
-                           conn->c_unacked_bytes < len) {
+                       if (cp->cp_unacked_packets == 0 ||
+                           cp->cp_unacked_bytes < len) {
                                __set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 
-                               conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
-                               conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+                               cp->cp_unacked_packets =
+                                       rds_sysctl_max_unacked_packets;
+                               cp->cp_unacked_bytes =
+                                       rds_sysctl_max_unacked_bytes;
                                rds_stats_inc(s_send_ack_required);
                        } else {
-                               conn->c_unacked_bytes -= len;
-                               conn->c_unacked_packets--;
+                               cp->cp_unacked_bytes -= len;
+                               cp->cp_unacked_packets--;
                        }
 
-                       conn->c_xmit_rm = rm;
+                       cp->cp_xmit_rm = rm;
                }
 
                /* The transport either sends the whole rdma or none of it */
-               if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+               if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
                        rm->m_final_op = &rm->rdma;
                        /* The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue
@@ -300,11 +310,11 @@ restart:
                                wake_up_interruptible(&rm->m_flush_wait);
                                break;
                        }
-                       conn->c_xmit_rdma_sent = 1;
+                       cp->cp_xmit_rdma_sent = 1;
 
                }
 
-               if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+               if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
                        rm->m_final_op = &rm->atomic;
                        /* The transport owns the mapped memory for now.
                         * You can't unmap it while it's on the send queue
@@ -316,7 +326,7 @@ restart:
                                wake_up_interruptible(&rm->m_flush_wait);
                                break;
                        }
-                       conn->c_xmit_atomic_sent = 1;
+                       cp->cp_xmit_atomic_sent = 1;
 
                }
 
@@ -342,41 +352,42 @@ restart:
                                rm->data.op_active = 0;
                }
 
-               if (rm->data.op_active && !conn->c_xmit_data_sent) {
+               if (rm->data.op_active && !cp->cp_xmit_data_sent) {
                        rm->m_final_op = &rm->data;
+
                        ret = conn->c_trans->xmit(conn, rm,
-                                                 conn->c_xmit_hdr_off,
-                                                 conn->c_xmit_sg,
-                                                 conn->c_xmit_data_off);
+                                                 cp->cp_xmit_hdr_off,
+                                                 cp->cp_xmit_sg,
+                                                 cp->cp_xmit_data_off);
                        if (ret <= 0)
                                break;
 
-                       if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
+                       if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
                                tmp = min_t(int, ret,
                                            sizeof(struct rds_header) -
-                                           conn->c_xmit_hdr_off);
-                               conn->c_xmit_hdr_off += tmp;
+                                           cp->cp_xmit_hdr_off);
+                               cp->cp_xmit_hdr_off += tmp;
                                ret -= tmp;
                        }
 
-                       sg = &rm->data.op_sg[conn->c_xmit_sg];
+                       sg = &rm->data.op_sg[cp->cp_xmit_sg];
                        while (ret) {
                                tmp = min_t(int, ret, sg->length -
-                                                     conn->c_xmit_data_off);
-                               conn->c_xmit_data_off += tmp;
+                                                     cp->cp_xmit_data_off);
+                               cp->cp_xmit_data_off += tmp;
                                ret -= tmp;
-                               if (conn->c_xmit_data_off == sg->length) {
-                                       conn->c_xmit_data_off = 0;
+                               if (cp->cp_xmit_data_off == sg->length) {
+                                       cp->cp_xmit_data_off = 0;
                                        sg++;
-                                       conn->c_xmit_sg++;
-                                       BUG_ON(ret != 0 &&
-                                              conn->c_xmit_sg == rm->data.op_nents);
+                                       cp->cp_xmit_sg++;
+                                       BUG_ON(ret != 0 && cp->cp_xmit_sg ==
+                                              rm->data.op_nents);
                                }
                        }
 
-                       if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-                           (conn->c_xmit_sg == rm->data.op_nents))
-                               conn->c_xmit_data_sent = 1;
+                       if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
+                           (cp->cp_xmit_sg == rm->data.op_nents))
+                               cp->cp_xmit_data_sent = 1;
                }
 
                /*
@@ -384,23 +395,27 @@ restart:
                 * if there is a data op. Thus, if the data is sent (or there was
                 * none), then we're done with the rm.
                 */
-               if (!rm->data.op_active || conn->c_xmit_data_sent) {
-                       conn->c_xmit_rm = NULL;
-                       conn->c_xmit_sg = 0;
-                       conn->c_xmit_hdr_off = 0;
-                       conn->c_xmit_data_off = 0;
-                       conn->c_xmit_rdma_sent = 0;
-                       conn->c_xmit_atomic_sent = 0;
-                       conn->c_xmit_data_sent = 0;
+               if (!rm->data.op_active || cp->cp_xmit_data_sent) {
+                       cp->cp_xmit_rm = NULL;
+                       cp->cp_xmit_sg = 0;
+                       cp->cp_xmit_hdr_off = 0;
+                       cp->cp_xmit_data_off = 0;
+                       cp->cp_xmit_rdma_sent = 0;
+                       cp->cp_xmit_atomic_sent = 0;
+                       cp->cp_xmit_data_sent = 0;
 
                        rds_message_put(rm);
                }
        }
 
 over_batch:
-       if (conn->c_trans->xmit_complete)
+       if (conn->c_trans->t_mp_capable) {
+               if (conn->c_trans->xmit_path_complete)
+                       conn->c_trans->xmit_path_complete(cp);
+       } else if (conn->c_trans->xmit_complete) {
                conn->c_trans->xmit_complete(conn);
-       release_in_xmit(conn);
+       }
+       release_in_xmit(cp);
 
        /* Nuke any messages we decided not to retransmit. */
        if (!list_empty(&to_be_dropped)) {
@@ -428,12 +443,12 @@ over_batch:
        if (ret == 0) {
                smp_mb();
                if ((test_bit(0, &conn->c_map_queued) ||
-                    !list_empty(&conn->c_send_queue)) &&
-                   send_gen == conn->c_send_gen) {
+                    !list_empty(&cp->cp_send_queue)) &&
+                   send_gen == cp->cp_send_gen) {
                        rds_stats_inc(s_send_lock_queue_raced);
                        if (batch_count < send_batch_count)
                                goto restart;
-                       queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+                       queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
                }
        }
 out:
@@ -1110,9 +1125,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
         */
        rds_stats_inc(s_send_queued);
 
-       ret = rds_send_xmit(conn);
+       ret = rds_send_xmit(cpath);
        if (ret == -ENOMEM || ret == -EAGAIN)
-               queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+               queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
 
        rds_message_put(rm);
        return payload_len;
index 6d0979b8dc6366169f2fda404f397d08b90fa52a..50d26576dee7acd8f1ad4ed29d384a8a8e778225 100644 (file)
@@ -177,7 +177,7 @@ void rds_send_worker(struct work_struct *work)
 
        if (rds_conn_path_state(cp) == RDS_CONN_UP) {
                clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
-               ret = rds_send_xmit(cp->cp_conn);
+               ret = rds_send_xmit(cp);
                cond_resched();
                rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
                switch (ret) {