IPoIB/cm: Add connected mode support for devices without SRQs
authorPradeep Satyanarayana <pradeeps@linux.vnet.ibm.com>
Fri, 25 Jan 2008 22:15:24 +0000 (14:15 -0800)
committerRoland Dreier <rolandd@cisco.com>
Fri, 25 Jan 2008 22:15:24 +0000 (14:15 -0800)
Some IB adapters (notably IBM's eHCA) do not implement SRQs (shared
receive queues).  The current IPoIB connected mode support only works
on devices that support SRQs.

Fix this by adding support for using the receive queue of each
connected mode receive QP.  The disadvantage of this compared to using
an SRQ is that it means a full queue of receives must be posted for
each remote connected mode peer, which means that total memory usage
is potentially much higher than when using SRQs.  To manage this, add
a new module parameter "max_nonsrq_conn_qp" that limits the number of
connections allowed per interface.

The rest of the changes are fairly straightforward: we use a table of
struct ipoib_cm_rx to hold all the active connections, and put the
table index of the connection in the high bits of receive WR IDs.
This is needed because we cannot rely on the struct ib_wc.qp field for
non-SRQ receive completions.  Most of the rest of the changes just
test whether or not an SRQ is available, and post receives or find
received packets in the right place depending on the answer.

Cleaning up dead connections actually becomes simpler, because we do
not have to do the "last WQE reached" dance that is required to
destroy QPs attached to an SRQ.  We just move the QP to the error
state and wait for all pending receives to be flushed.

Signed-off-by: Pradeep Satyanarayana <pradeeps@linux.vnet.ibm.com>
[ Completely rewritten and split up, based on Pradeep's work.  Several
  bugs fixed and no doubt several bugs introduced.  - Roland ]

Signed-off-by: Roland Dreier <rolandd@cisco.com>
drivers/infiniband/ulp/ipoib/ipoib.h
drivers/infiniband/ulp/ipoib/ipoib_cm.c
drivers/infiniband/ulp/ipoib/ipoib_main.c
drivers/infiniband/ulp/ipoib/ipoib_verbs.c

index a376fb6ffa0edfb935e5d75fb76dde0226cc9c95..d35025f0652b5c85d2a48942c4ae6b473f024d25 100644 (file)
@@ -69,6 +69,7 @@ enum {
        IPOIB_TX_RING_SIZE        = 64,
        IPOIB_MAX_QUEUE_SIZE      = 8192,
        IPOIB_MIN_QUEUE_SIZE      = 2,
+       IPOIB_CM_MAX_CONN_QP      = 4096,
 
        IPOIB_NUM_WC              = 4,
 
@@ -188,10 +189,12 @@ enum ipoib_cm_state {
 struct ipoib_cm_rx {
        struct ib_cm_id        *id;
        struct ib_qp           *qp;
+       struct ipoib_cm_rx_buf *rx_ring;
        struct list_head        list;
        struct net_device      *dev;
        unsigned long           jiffies;
        enum ipoib_cm_state     state;
+       int                     recv_count;
 };
 
 struct ipoib_cm_tx {
@@ -234,6 +237,7 @@ struct ipoib_cm_dev_priv {
        struct ib_wc            ibwc[IPOIB_NUM_WC];
        struct ib_sge           rx_sge[IPOIB_CM_RX_SG];
        struct ib_recv_wr       rx_wr;
+       int                     nonsrq_conn_qp;
 };
 
 /*
@@ -461,6 +465,8 @@ void ipoib_drain_cq(struct net_device *dev);
 /* We don't support UC connections at the moment */
 #define IPOIB_CM_SUPPORTED(ha)   (ha[0] & (IPOIB_FLAGS_RC))
 
+extern int ipoib_max_conn_qp;
+
 static inline int ipoib_cm_admin_enabled(struct net_device *dev)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
@@ -491,6 +497,12 @@ static inline void ipoib_cm_set(struct ipoib_neigh *neigh, struct ipoib_cm_tx *t
        neigh->cm = tx;
 }
 
+static inline int ipoib_cm_has_srq(struct net_device *dev)
+{
+       struct ipoib_dev_priv *priv = netdev_priv(dev);
+       return !!priv->cm.srq;
+}
+
 void ipoib_cm_send(struct net_device *dev, struct sk_buff *skb, struct ipoib_cm_tx *tx);
 int ipoib_cm_dev_open(struct net_device *dev);
 void ipoib_cm_dev_stop(struct net_device *dev);
@@ -508,6 +520,8 @@ void ipoib_cm_handle_tx_wc(struct net_device *dev, struct ib_wc *wc);
 
 struct ipoib_cm_tx;
 
+#define ipoib_max_conn_qp 0
+
 static inline int ipoib_cm_admin_enabled(struct net_device *dev)
 {
        return 0;
@@ -533,6 +547,11 @@ static inline void ipoib_cm_set(struct ipoib_neigh *neigh, struct ipoib_cm_tx *t
 {
 }
 
+static inline int ipoib_cm_has_srq(struct net_device *dev)
+{
+       return 0;
+}
+
 static inline
 void ipoib_cm_send(struct net_device *dev, struct sk_buff *skb, struct ipoib_cm_tx *tx)
 {
index 75717a9cbcdc28ca29f9494cfc6ae9f6dd381e0d..fdf33cecc6d584af9701c9f9c2e0253479727a8c 100644 (file)
 #include <linux/icmpv6.h>
 #include <linux/delay.h>
 
+#include "ipoib.h"
+
+int ipoib_max_conn_qp = 128;
+
+module_param_named(max_nonsrq_conn_qp, ipoib_max_conn_qp, int, 0444);
+MODULE_PARM_DESC(max_nonsrq_conn_qp,
+                "Max number of connected-mode QPs per interface "
+                "(applied only if shared receive queue is not available)");
+
 #ifdef CONFIG_INFINIBAND_IPOIB_DEBUG_DATA
 static int data_debug_level;
 
@@ -47,8 +56,6 @@ MODULE_PARM_DESC(cm_data_debug_level,
                 "Enable data path debug tracing for connected mode if > 0");
 #endif
 
-#include "ipoib.h"
-
 #define IPOIB_CM_IETF_ID 0x1000000000000000ULL
 
 #define IPOIB_CM_RX_UPDATE_TIME (256 * HZ)
@@ -81,7 +88,7 @@ static void ipoib_cm_dma_unmap_rx(struct ipoib_dev_priv *priv, int frags,
                ib_dma_unmap_single(priv->ca, mapping[i + 1], PAGE_SIZE, DMA_FROM_DEVICE);
 }
 
-static int ipoib_cm_post_receive(struct net_device *dev, int id)
+static int ipoib_cm_post_receive_srq(struct net_device *dev, int id)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
        struct ib_recv_wr *bad_wr;
@@ -104,7 +111,33 @@ static int ipoib_cm_post_receive(struct net_device *dev, int id)
        return ret;
 }
 
-static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int frags,
+static int ipoib_cm_post_receive_nonsrq(struct net_device *dev,
+                                       struct ipoib_cm_rx *rx, int id)
+{
+       struct ipoib_dev_priv *priv = netdev_priv(dev);
+       struct ib_recv_wr *bad_wr;
+       int i, ret;
+
+       priv->cm.rx_wr.wr_id = id | IPOIB_OP_CM | IPOIB_OP_RECV;
+
+       for (i = 0; i < IPOIB_CM_RX_SG; ++i)
+               priv->cm.rx_sge[i].addr = rx->rx_ring[id].mapping[i];
+
+       ret = ib_post_recv(rx->qp, &priv->cm.rx_wr, &bad_wr);
+       if (unlikely(ret)) {
+               ipoib_warn(priv, "post recv failed for buf %d (%d)\n", id, ret);
+               ipoib_cm_dma_unmap_rx(priv, IPOIB_CM_RX_SG - 1,
+                                     rx->rx_ring[id].mapping);
+               dev_kfree_skb_any(rx->rx_ring[id].skb);
+               rx->rx_ring[id].skb = NULL;
+       }
+
+       return ret;
+}
+
+static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev,
+                                            struct ipoib_cm_rx_buf *rx_ring,
+                                            int id, int frags,
                                             u64 mapping[IPOIB_CM_RX_SG])
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
@@ -141,7 +174,7 @@ static struct sk_buff *ipoib_cm_alloc_rx_skb(struct net_device *dev, int id, int
                        goto partial_error;
        }
 
-       priv->cm.srq_ring[id].skb = skb;
+       rx_ring[id].skb = skb;
        return skb;
 
 partial_error:
@@ -224,12 +257,18 @@ static struct ib_qp *ipoib_cm_create_rx_qp(struct net_device *dev,
                .qp_type = IB_QPT_RC,
                .qp_context = p,
        };
+
+       if (!ipoib_cm_has_srq(dev)) {
+               attr.cap.max_recv_wr  = ipoib_recvq_size;
+               attr.cap.max_recv_sge = IPOIB_CM_RX_SG;
+       }
+
        return ib_create_qp(priv->pd, &attr);
 }
 
 static int ipoib_cm_modify_rx_qp(struct net_device *dev,
-                                 struct ib_cm_id *cm_id, struct ib_qp *qp,
-                                 unsigned psn)
+                                struct ib_cm_id *cm_id, struct ib_qp *qp,
+                                unsigned psn)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
        struct ib_qp_attr qp_attr;
@@ -282,6 +321,60 @@ static int ipoib_cm_modify_rx_qp(struct net_device *dev,
        return 0;
 }
 
+static int ipoib_cm_nonsrq_init_rx(struct net_device *dev, struct ib_cm_id *cm_id,
+                                  struct ipoib_cm_rx *rx)
+{
+       struct ipoib_dev_priv *priv = netdev_priv(dev);
+       int ret;
+       int i;
+
+       rx->rx_ring = kcalloc(ipoib_recvq_size, sizeof *rx->rx_ring, GFP_KERNEL);
+       if (!rx->rx_ring)
+               return -ENOMEM;
+
+       spin_lock_irq(&priv->lock);
+
+       if (priv->cm.nonsrq_conn_qp >= ipoib_max_conn_qp) {
+               spin_unlock_irq(&priv->lock);
+               ib_send_cm_rej(cm_id, IB_CM_REJ_NO_QP, NULL, 0, NULL, 0);
+               ret = -EINVAL;
+               goto err_free;
+       } else
+               ++priv->cm.nonsrq_conn_qp;
+
+       spin_unlock_irq(&priv->lock);
+
+       for (i = 0; i < ipoib_recvq_size; ++i) {
+               if (!ipoib_cm_alloc_rx_skb(dev, rx->rx_ring, i, IPOIB_CM_RX_SG - 1,
+                                          rx->rx_ring[i].mapping)) {
+                       ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
+                               ret = -ENOMEM;
+                               goto err_count;
+                       }
+               ret = ipoib_cm_post_receive_nonsrq(dev, rx, i);
+               if (ret) {
+                       ipoib_warn(priv, "ipoib_cm_post_receive_nonsrq "
+                                  "failed for buf %d\n", i);
+                       ret = -EIO;
+                       goto err_count;
+               }
+       }
+
+       rx->recv_count = ipoib_recvq_size;
+
+       return 0;
+
+err_count:
+       spin_lock_irq(&priv->lock);
+       --priv->cm.nonsrq_conn_qp;
+       spin_unlock_irq(&priv->lock);
+
+err_free:
+       ipoib_cm_free_rx_ring(dev, rx->rx_ring);
+
+       return ret;
+}
+
 static int ipoib_cm_send_rep(struct net_device *dev, struct ib_cm_id *cm_id,
                             struct ib_qp *qp, struct ib_cm_req_event_param *req,
                             unsigned psn)
@@ -297,7 +390,7 @@ static int ipoib_cm_send_rep(struct net_device *dev, struct ib_cm_id *cm_id,
        rep.private_data_len = sizeof data;
        rep.flow_control = 0;
        rep.rnr_retry_count = req->rnr_retry_count;
-       rep.srq = 1;
+       rep.srq = ipoib_cm_has_srq(dev);
        rep.qp_num = qp->qp_num;
        rep.starting_psn = psn;
        return ib_send_cm_rep(cm_id, &rep);
@@ -333,6 +426,12 @@ static int ipoib_cm_req_handler(struct ib_cm_id *cm_id, struct ib_cm_event *even
        if (ret)
                goto err_modify;
 
+       if (!ipoib_cm_has_srq(dev)) {
+               ret = ipoib_cm_nonsrq_init_rx(dev, cm_id, p);
+               if (ret)
+                       goto err_modify;
+       }
+
        spin_lock_irq(&priv->lock);
        queue_delayed_work(ipoib_workqueue,
                           &priv->cm.stale_task, IPOIB_CM_RX_DELAY);
@@ -417,12 +516,14 @@ static void skb_put_frags(struct sk_buff *skb, unsigned int hdr_space,
 void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
+       struct ipoib_cm_rx_buf *rx_ring;
        unsigned int wr_id = wc->wr_id & ~(IPOIB_OP_CM | IPOIB_OP_RECV);
        struct sk_buff *skb, *newskb;
        struct ipoib_cm_rx *p;
        unsigned long flags;
        u64 mapping[IPOIB_CM_RX_SG];
        int frags;
+       int has_srq;
 
        ipoib_dbg_data(priv, "cm recv completion: id %d, status: %d\n",
                       wr_id, wc->status);
@@ -440,18 +541,32 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
                return;
        }
 
-       skb  = priv->cm.srq_ring[wr_id].skb;
+       p = wc->qp->qp_context;
+
+       has_srq = ipoib_cm_has_srq(dev);
+       rx_ring = has_srq ? priv->cm.srq_ring : p->rx_ring;
+
+       skb = rx_ring[wr_id].skb;
 
        if (unlikely(wc->status != IB_WC_SUCCESS)) {
                ipoib_dbg(priv, "cm recv error "
                           "(status=%d, wrid=%d vend_err %x)\n",
                           wc->status, wr_id, wc->vendor_err);
                ++dev->stats.rx_dropped;
-               goto repost;
+               if (has_srq)
+                       goto repost;
+               else {
+                       if (!--p->recv_count) {
+                               spin_lock_irqsave(&priv->lock, flags);
+                               list_move(&p->list, &priv->cm.rx_reap_list);
+                               spin_unlock_irqrestore(&priv->lock, flags);
+                               queue_work(ipoib_workqueue, &priv->cm.rx_reap_task);
+                       }
+                       return;
+               }
        }
 
        if (unlikely(!(wr_id & IPOIB_CM_RX_UPDATE_MASK))) {
-               p = wc->qp->qp_context;
                if (p && time_after_eq(jiffies, p->jiffies + IPOIB_CM_RX_UPDATE_TIME)) {
                        spin_lock_irqsave(&priv->lock, flags);
                        p->jiffies = jiffies;
@@ -466,7 +581,7 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
        frags = PAGE_ALIGN(wc->byte_len - min(wc->byte_len,
                                              (unsigned)IPOIB_CM_HEAD_SIZE)) / PAGE_SIZE;
 
-       newskb = ipoib_cm_alloc_rx_skb(dev, wr_id, frags, mapping);
+       newskb = ipoib_cm_alloc_rx_skb(dev, rx_ring, wr_id, frags, mapping);
        if (unlikely(!newskb)) {
                /*
                 * If we can't allocate a new RX buffer, dump
@@ -477,8 +592,8 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
                goto repost;
        }
 
-       ipoib_cm_dma_unmap_rx(priv, frags, priv->cm.srq_ring[wr_id].mapping);
-       memcpy(priv->cm.srq_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
+       ipoib_cm_dma_unmap_rx(priv, frags, rx_ring[wr_id].mapping);
+       memcpy(rx_ring[wr_id].mapping, mapping, (frags + 1) * sizeof *mapping);
 
        ipoib_dbg_data(priv, "received %d bytes, SLID 0x%04x\n",
                       wc->byte_len, wc->slid);
@@ -499,9 +614,17 @@ void ipoib_cm_handle_rx_wc(struct net_device *dev, struct ib_wc *wc)
        netif_receive_skb(skb);
 
 repost:
-       if (unlikely(ipoib_cm_post_receive(dev, wr_id)))
-               ipoib_warn(priv, "ipoib_cm_post_receive failed "
-                          "for buf %d\n", wr_id);
+       if (has_srq) {
+               if (unlikely(ipoib_cm_post_receive_srq(dev, wr_id)))
+                       ipoib_warn(priv, "ipoib_cm_post_receive_srq failed "
+                                  "for buf %d\n", wr_id);
+       } else {
+               if (unlikely(ipoib_cm_post_receive_nonsrq(dev, p, wr_id))) {
+                       --p->recv_count;
+                       ipoib_warn(priv, "ipoib_cm_post_receive_nonsrq failed "
+                                  "for buf %d\n", wr_id);
+               }
+       }
 }
 
 static inline int post_send(struct ipoib_dev_priv *priv,
@@ -686,6 +809,12 @@ static void ipoib_cm_free_rx_reap_list(struct net_device *dev)
        list_for_each_entry_safe(rx, n, &list, list) {
                ib_destroy_cm_id(rx->id);
                ib_destroy_qp(rx->qp);
+               if (!ipoib_cm_has_srq(dev)) {
+                       ipoib_cm_free_rx_ring(priv->dev, rx->rx_ring);
+                       spin_lock_irq(&priv->lock);
+                       --priv->cm.nonsrq_conn_qp;
+                       spin_unlock_irq(&priv->lock);
+               }
                kfree(rx);
        }
 }
@@ -864,7 +993,7 @@ static int ipoib_cm_send_req(struct net_device *dev,
        req.retry_count                 = 0; /* RFC draft warns against retries */
        req.rnr_retry_count             = 0; /* RFC draft warns against retries */
        req.max_cm_retries              = 15;
-       req.srq                         = 1;
+       req.srq                         = ipoib_cm_has_srq(dev);
        return ib_send_cm_req(id, &req);
 }
 
@@ -1270,7 +1399,7 @@ int ipoib_cm_add_mode_attr(struct net_device *dev)
        return device_create_file(&dev->dev, &dev_attr_mode);
 }
 
-static int ipoib_cm_create_srq(struct net_device *dev)
+static void ipoib_cm_create_srq(struct net_device *dev)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
        struct ib_srq_init_attr srq_init_attr = {
@@ -1279,32 +1408,30 @@ static int ipoib_cm_create_srq(struct net_device *dev)
                        .max_sge = IPOIB_CM_RX_SG
                }
        };
-       int ret;
 
        priv->cm.srq = ib_create_srq(priv->pd, &srq_init_attr);
        if (IS_ERR(priv->cm.srq)) {
-               ret = PTR_ERR(priv->cm.srq);
+               if (PTR_ERR(priv->cm.srq) != -ENOSYS)
+                       printk(KERN_WARNING "%s: failed to allocate SRQ, error %ld\n",
+                              priv->ca->name, PTR_ERR(priv->cm.srq));
                priv->cm.srq = NULL;
-               return ret;
+               return;
        }
 
        priv->cm.srq_ring = kzalloc(ipoib_recvq_size * sizeof *priv->cm.srq_ring,
                                    GFP_KERNEL);
        if (!priv->cm.srq_ring) {
-               printk(KERN_WARNING "%s: failed to allocate CM ring (%d entries)\n",
+               printk(KERN_WARNING "%s: failed to allocate CM SRQ ring (%d entries)\n",
                       priv->ca->name, ipoib_recvq_size);
                ib_destroy_srq(priv->cm.srq);
                priv->cm.srq = NULL;
-               return -ENOMEM;
        }
-
-       return 0;
 }
 
 int ipoib_cm_dev_init(struct net_device *dev)
 {
        struct ipoib_dev_priv *priv = netdev_priv(dev);
-       int ret, i;
+       int i;
 
        INIT_LIST_HEAD(&priv->cm.passive_ids);
        INIT_LIST_HEAD(&priv->cm.reap_list);
@@ -1331,21 +1458,25 @@ int ipoib_cm_dev_init(struct net_device *dev)
        priv->cm.rx_wr.sg_list = priv->cm.rx_sge;
        priv->cm.rx_wr.num_sge = IPOIB_CM_RX_SG;
 
-       ret = ipoib_cm_create_srq(dev);
-       if (ret)
-               return ret;
+       ipoib_cm_create_srq(dev);
+
+       if (ipoib_cm_has_srq(dev)) {
+               for (i = 0; i < ipoib_recvq_size; ++i) {
+                       if (!ipoib_cm_alloc_rx_skb(dev, priv->cm.srq_ring, i,
+                                                  IPOIB_CM_RX_SG - 1,
+                                                  priv->cm.srq_ring[i].mapping)) {
+                               ipoib_warn(priv, "failed to allocate "
+                                          "receive buffer %d\n", i);
+                               ipoib_cm_dev_cleanup(dev);
+                               return -ENOMEM;
+                       }
 
-       for (i = 0; i < ipoib_recvq_size; ++i) {
-               if (!ipoib_cm_alloc_rx_skb(dev, i, IPOIB_CM_RX_SG - 1,
-                                          priv->cm.srq_ring[i].mapping)) {
-                       ipoib_warn(priv, "failed to allocate receive buffer %d\n", i);
-                       ipoib_cm_dev_cleanup(dev);
-                       return -ENOMEM;
-               }
-               if (ipoib_cm_post_receive(dev, i)) {
-                       ipoib_warn(priv, "ipoib_ib_post_receive failed for buf %d\n", i);
-                       ipoib_cm_dev_cleanup(dev);
-                       return -EIO;
+                       if (ipoib_cm_post_receive_srq(dev, i)) {
+                               ipoib_warn(priv, "ipoib_cm_post_receive_srq "
+                                          "failed for buf %d\n", i);
+                               ipoib_cm_dev_cleanup(dev);
+                               return -EIO;
+                       }
                }
        }
 
index 5a9c3b5a39ef1f023be7acba78b7bff581aeb26d..3bfc2ef1303ee7cf8435413bd4cfbd4ccb2b9a1b 100644 (file)
@@ -1268,6 +1268,9 @@ static int __init ipoib_init_module(void)
        ipoib_sendq_size = roundup_pow_of_two(ipoib_sendq_size);
        ipoib_sendq_size = min(ipoib_sendq_size, IPOIB_MAX_QUEUE_SIZE);
        ipoib_sendq_size = max(ipoib_sendq_size, IPOIB_MIN_QUEUE_SIZE);
+#ifdef CONFIG_INFINIBAND_IPOIB_CM
+       ipoib_max_conn_qp = min(ipoib_max_conn_qp, IPOIB_CM_MAX_CONN_QP);
+#endif
 
        ret = ipoib_register_debugfs();
        if (ret)
index b6848a8d35dbb96dbace5116cbd528f35a473b3d..433e99ac227b85a2bd6e8594c69fe47f3dc6e274 100644 (file)
@@ -172,8 +172,12 @@ int ipoib_transport_dev_init(struct net_device *dev, struct ib_device *ca)
 
        size = ipoib_sendq_size + ipoib_recvq_size + 1;
        ret = ipoib_cm_dev_init(dev);
-       if (!ret)
-               size += ipoib_recvq_size + 1 /* 1 extra for rx_drain_qp */;
+       if (!ret) {
+               if (ipoib_cm_has_srq(dev))
+                       size += ipoib_recvq_size + 1; /* 1 extra for rx_drain_qp */
+               else
+                       size += ipoib_recvq_size * ipoib_max_conn_qp;
+       }
 
        priv->cq = ib_create_cq(priv->ca, ipoib_ib_completion, NULL, dev, size, 0);
        if (IS_ERR(priv->cq)) {