RDS: IB: split send completion handling and do batch ack
author Santosh Shilimkar <santosh.shilimkar@oracle.com>
Sun, 6 Sep 2015 06:18:51 +0000 (02:18 -0400)
committer Santosh Shilimkar <santosh.shilimkar@oracle.com>
Mon, 5 Oct 2015 18:19:01 +0000 (11:19 -0700)
Similar to what we did with receive CQ completion handling, we split
the transmit completion handler so that it lets us implement batched
work completion handling.

We re-use the poll_cq routine and make use of RDS_IB_SEND_OP to
identify the send vs receive completion event handler invocation.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
net/rds/ib.h
net/rds/ib_cm.c
net/rds/ib_send.c
net/rds/ib_stats.c
net/rds/send.c

index 727759b305795c03df6e37dc99af5a7b24b4921d..3a8cd31d4048eda5f14f3533fc1aa3c0719f1bbf 100644 (file)
@@ -25,6 +25,7 @@
 #define RDS_IB_RECYCLE_BATCH_COUNT     32
 
 #define RDS_IB_WC_MAX                  32
+#define RDS_IB_SEND_OP                 BIT_ULL(63)
 
 extern struct rw_semaphore rds_ib_devices_lock;
 extern struct list_head rds_ib_devices;
@@ -118,9 +119,11 @@ struct rds_ib_connection {
        struct ib_pd            *i_pd;
        struct ib_cq            *i_send_cq;
        struct ib_cq            *i_recv_cq;
+       struct ib_wc            i_send_wc[RDS_IB_WC_MAX];
        struct ib_wc            i_recv_wc[RDS_IB_WC_MAX];
 
        /* interrupt handling */
+       struct tasklet_struct   i_send_tasklet;
        struct tasklet_struct   i_recv_tasklet;
 
        /* tx */
@@ -217,7 +220,6 @@ struct rds_ib_device {
 struct rds_ib_statistics {
        uint64_t        s_ib_connect_raced;
        uint64_t        s_ib_listen_closed_stale;
-       uint64_t        s_ib_tx_cq_call;
        uint64_t        s_ib_evt_handler_call;
        uint64_t        s_ib_tasklet_call;
        uint64_t        s_ib_tx_cq_event;
@@ -371,7 +373,7 @@ extern wait_queue_head_t rds_ib_ring_empty_wait;
 void rds_ib_xmit_complete(struct rds_connection *conn);
 int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
                unsigned int hdr_off, unsigned int sg, unsigned int off);
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc);
 void rds_ib_send_init_ring(struct rds_ib_connection *ic);
 void rds_ib_send_clear_ring(struct rds_ib_connection *ic);
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
index 28e0979720b2b030291d62a4a7bb37baaa0b00d7..8f51d0d26578ec02908380a06d35ad0c574082c5 100644 (file)
@@ -250,11 +250,34 @@ static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
                        rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
                                 (unsigned long long)wc->wr_id, wc->status,
                                 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
-                       rds_ib_recv_cqe_handler(ic, wc, ack_state);
+
+                       if (wc->wr_id & RDS_IB_SEND_OP)
+                               rds_ib_send_cqe_handler(ic, wc);
+                       else
+                               rds_ib_recv_cqe_handler(ic, wc, ack_state);
                }
        }
 }
 
+static void rds_ib_tasklet_fn_send(unsigned long data)
+{
+       struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+       struct rds_connection *conn = ic->conn;
+       struct rds_ib_ack_state state;
+       /*
+        * Send-side tasklet body.  @data is the rds_ib_connection pointer
+        * that was handed to tasklet_init() for i_send_tasklet.  Counts the
+        * invocation, drains the send CQ, re-arms it, and then tries to
+        * restart transmission.
+        */
+       rds_ib_stats_inc(s_ib_tasklet_call);
+       /*
+        * poll_cq() takes an ack state argument; pass it a zeroed one.
+        * The CQ is polled once, notification is requested for the next
+        * completion, and the CQ is polled a second time.
+        * NOTE(review): the second poll presumably picks up completions
+        * that arrived before the notification request took effect --
+        * confirm against the verbs documentation.
+        */
+       memset(&state, 0, sizeof(state));
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+       ib_req_notify_cq(ic->i_send_cq, IB_CQ_NEXT_COMP);
+       poll_cq(ic, ic->i_send_cq, ic->i_send_wc, &state);
+       /*
+        * Call rds_send_xmit() only when the connection is up and either
+        * RDS_LL_SEND_FULL is clear or bit 0 of c_map_queued is set.
+        */
+       if (rds_conn_up(conn) &&
+           (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+           test_bit(0, &conn->c_map_queued)))
+               rds_send_xmit(ic->conn);
+}
+
 static void rds_ib_tasklet_fn_recv(unsigned long data)
 {
        struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
@@ -304,6 +327,18 @@ static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
        }
 }
 
+static void rds_ib_cq_comp_handler_send(struct ib_cq *cq, void *context)
+{
+       struct rds_connection *conn = context;
+       struct rds_ib_connection *ic = conn->c_transport_data;
+       /*
+        * Completion callback given to ib_create_cq() for the send CQ.
+        * @cq is only used for the debug message; @context is the
+        * rds_connection supplied when the CQ was created.  No completions
+        * are polled here: the handler bumps s_ib_evt_handler_call and
+        * schedules i_send_tasklet.
+        */
+       rdsdebug("conn %p cq %p\n", conn, cq);
+
+       rds_ib_stats_inc(s_ib_evt_handler_call);
+
+       tasklet_schedule(&ic->i_send_tasklet);
+}
+
 /*
  * This needs to be very careful to not leave IS_ERR pointers around for
  * cleanup to trip over.
@@ -337,7 +372,8 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
        ic->i_pd = rds_ibdev->pd;
 
        cq_attr.cqe = ic->i_send_ring.w_nr + 1;
-       ic->i_send_cq = ib_create_cq(dev, rds_ib_send_cq_comp_handler,
+
+       ic->i_send_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_send,
                                     rds_ib_cq_event_handler, conn,
                                     &cq_attr);
        if (IS_ERR(ic->i_send_cq)) {
@@ -703,6 +739,7 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
                wait_event(rds_ib_ring_empty_wait,
                           rds_ib_ring_empty(&ic->i_recv_ring) &&
                           (atomic_read(&ic->i_signaled_sends) == 0));
+               tasklet_kill(&ic->i_send_tasklet);
                tasklet_kill(&ic->i_recv_tasklet);
 
                /* first destroy the ib state that generates callbacks */
@@ -809,8 +846,10 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
        }
 
        INIT_LIST_HEAD(&ic->ib_node);
+       tasklet_init(&ic->i_send_tasklet, rds_ib_tasklet_fn_send,
+                    (unsigned long)ic);
        tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
-                    (unsigned long) ic);
+                    (unsigned long)ic);
        mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
        spin_lock_init(&ic->i_ack_lock);
index 4e88047086b6e10c62485f95b63f397900c32b28..670882c752e9470e6016fc51b0375006f4a94780 100644 (file)
@@ -195,7 +195,7 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 
                send->s_op = NULL;
 
-               send->s_wr.wr_id = i;
+               send->s_wr.wr_id = i | RDS_IB_SEND_OP;
                send->s_wr.sg_list = send->s_sge;
                send->s_wr.ex.imm_data = 0;
 
@@ -237,81 +237,73 @@ static void rds_ib_sub_signaled(struct rds_ib_connection *ic, int nr)
  * unallocs the next free entry in the ring it doesn't alter which is
  * the next to be freed, which is what this is concerned with.
  */
-void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
+void rds_ib_send_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc)
 {
-       struct rds_connection *conn = context;
-       struct rds_ib_connection *ic = conn->c_transport_data;
        struct rds_message *rm = NULL;
-       struct ib_wc wc;
+       struct rds_connection *conn = ic->conn;
        struct rds_ib_send_work *send;
        u32 completed;
        u32 oldest;
        u32 i = 0;
-       int ret;
        int nr_sig = 0;
 
-       rdsdebug("cq %p conn %p\n", cq, conn);
-       rds_ib_stats_inc(s_ib_tx_cq_call);
-       ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
-       if (ret)
-               rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
-
-       while (ib_poll_cq(cq, 1, &wc) > 0) {
-               rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-                        (unsigned long long)wc.wr_id, wc.status,
-                        ib_wc_status_msg(wc.status), wc.byte_len,
-                        be32_to_cpu(wc.ex.imm_data));
-               rds_ib_stats_inc(s_ib_tx_cq_event);
-
-               if (wc.wr_id == RDS_IB_ACK_WR_ID) {
-                       if (time_after(jiffies, ic->i_ack_queued + HZ/2))
-                               rds_ib_stats_inc(s_ib_tx_stalled);
-                       rds_ib_ack_send_complete(ic);
-                       continue;
-               }
 
-               oldest = rds_ib_ring_oldest(&ic->i_send_ring);
+       rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+                (unsigned long long)wc->wr_id, wc->status,
+                ib_wc_status_msg(wc->status), wc->byte_len,
+                be32_to_cpu(wc->ex.imm_data));
+       rds_ib_stats_inc(s_ib_tx_cq_event);
 
-               completed = rds_ib_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
+       if (wc->wr_id == RDS_IB_ACK_WR_ID) {
+               if (time_after(jiffies, ic->i_ack_queued + HZ / 2))
+                       rds_ib_stats_inc(s_ib_tx_stalled);
+               rds_ib_ack_send_complete(ic);
+               return;
+       }
 
-               for (i = 0; i < completed; i++) {
-                       send = &ic->i_sends[oldest];
-                       if (send->s_wr.send_flags & IB_SEND_SIGNALED)
-                               nr_sig++;
+       oldest = rds_ib_ring_oldest(&ic->i_send_ring);
 
-                       rm = rds_ib_send_unmap_op(ic, send, wc.status);
+       completed = rds_ib_ring_completed(&ic->i_send_ring,
+                                         (wc->wr_id & ~RDS_IB_SEND_OP),
+                                         oldest);
 
-                       if (time_after(jiffies, send->s_queued + HZ/2))
-                               rds_ib_stats_inc(s_ib_tx_stalled);
+       for (i = 0; i < completed; i++) {
+               send = &ic->i_sends[oldest];
+               if (send->s_wr.send_flags & IB_SEND_SIGNALED)
+                       nr_sig++;
 
-                       if (send->s_op) {
-                               if (send->s_op == rm->m_final_op) {
-                                       /* If anyone waited for this message to get flushed out, wake
-                                        * them up now */
-                                       rds_message_unmapped(rm);
-                               }
-                               rds_message_put(rm);
-                               send->s_op = NULL;
-                       }
+               rm = rds_ib_send_unmap_op(ic, send, wc->status);
 
-                       oldest = (oldest + 1) % ic->i_send_ring.w_nr;
-               }
+               if (time_after(jiffies, send->s_queued + HZ / 2))
+                       rds_ib_stats_inc(s_ib_tx_stalled);
 
-               rds_ib_ring_free(&ic->i_send_ring, completed);
-               rds_ib_sub_signaled(ic, nr_sig);
-               nr_sig = 0;
-
-               if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
-                   test_bit(0, &conn->c_map_queued))
-                       queue_delayed_work(rds_wq, &conn->c_send_w, 0);
-
-               /* We expect errors as the qp is drained during shutdown */
-               if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
-                       rds_ib_conn_error(conn, "send completion on %pI4 had status "
-                                         "%u (%s), disconnecting and reconnecting\n",
-                                         &conn->c_faddr, wc.status,
-                                         ib_wc_status_msg(wc.status));
+               if (send->s_op) {
+                       if (send->s_op == rm->m_final_op) {
+                               /* If anyone waited for this message to get
+                                * flushed out, wake them up now
+                                */
+                               rds_message_unmapped(rm);
+                       }
+                       rds_message_put(rm);
+                       send->s_op = NULL;
                }
+
+               oldest = (oldest + 1) % ic->i_send_ring.w_nr;
+       }
+
+       rds_ib_ring_free(&ic->i_send_ring, completed);
+       rds_ib_sub_signaled(ic, nr_sig);
+       nr_sig = 0;
+
+       if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
+           test_bit(0, &conn->c_map_queued))
+               queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+
+       /* We expect errors as the qp is drained during shutdown */
+       if (wc->status != IB_WC_SUCCESS && rds_conn_up(conn)) {
+               rds_ib_conn_error(conn, "send completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+                                 &conn->c_faddr, wc->status,
+                                 ib_wc_status_msg(wc->status));
        }
 }
 
index bdf6115ef6e1b8ec82ef7f923f963f922ba83a86..8c8b84f7bfbb9a39fb191c5f7a50df36b6f3e3f2 100644 (file)
@@ -43,7 +43,6 @@ static const char *const rds_ib_stat_names[] = {
        "ib_connect_raced",
        "ib_listen_closed_stale",
        "s_ib_evt_handler_call",
-       "ib_tx_cq_call",
        "ib_tasklet_call",
        "ib_tx_cq_event",
        "ib_tx_ring_full",
index a081a6478e67ab6df2f12075572df28a971939ac..ee49c2556f4715ee7ad16cc4a4e376b9467af842 100644 (file)
@@ -432,6 +432,7 @@ over_batch:
 out:
        return ret;
 }
+EXPORT_SYMBOL_GPL(rds_send_xmit);
 
 static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
 {