RDS: Implement atomic operations
authorAndy Grover <andy.grover@oracle.com>
Tue, 12 Jan 2010 22:33:38 +0000 (14:33 -0800)
committerAndy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:11:41 +0000 (18:11 -0700)
Implement a CMSG-based interface to do FADD and CSWP ops.

Alter send routines to handle atomic ops.

Add atomic counters to stats.

Add xmit_atomic() to struct rds_transport

Inline rds_ib_send_unmap_rdma into unmap_rm

Signed-off-by: Andy Grover <andy.grover@oracle.com>
include/linux/rds.h
net/rds/ib.c
net/rds/ib.h
net/rds/ib_rdma.c
net/rds/ib_send.c
net/rds/rdma.c
net/rds/rds.h
net/rds/send.c
net/rds/stats.c

index 7f3971d9fc5cf32ab63ad665201c9fee1d692fdf..9239152abf7a825312a55c8877bc559a548e0bbc 100644 (file)
@@ -73,6 +73,8 @@
 #define RDS_CMSG_RDMA_MAP              3
 #define RDS_CMSG_RDMA_STATUS           4
 #define RDS_CMSG_CONG_UPDATE           5
+#define RDS_CMSG_ATOMIC_FADD           6
+#define RDS_CMSG_ATOMIC_CSWP           7
 
 #define RDS_INFO_FIRST                 10000
 #define RDS_INFO_COUNTERS              10000
@@ -237,6 +239,23 @@ struct rds_rdma_args {
        u_int64_t       user_token;
 };
 
+struct rds_atomic_args {
+       rds_rdma_cookie_t cookie;
+       uint64_t        local_addr;
+       uint64_t        remote_addr;
+       union {
+               struct {
+                       uint64_t        compare;
+                       uint64_t        swap;
+               } cswp;
+               struct {
+                       uint64_t        add;
+               } fadd;
+       };
+       uint64_t        flags;
+       uint64_t        user_token;
+};
+
 struct rds_rdma_notify {
        u_int64_t       user_token;
        int32_t         status;
index 8f2d6dd7700a8a0e20192b521c30dc7bc2df553b..f0d29656baff725ede7f240f5087d61ad625ae4b 100644 (file)
@@ -264,6 +264,7 @@ struct rds_transport rds_ib_transport = {
        .xmit                   = rds_ib_xmit,
        .xmit_cong_map          = NULL,
        .xmit_rdma              = rds_ib_xmit_rdma,
+       .xmit_atomic            = rds_ib_xmit_atomic,
        .recv                   = rds_ib_recv,
        .conn_alloc             = rds_ib_conn_alloc,
        .conn_free              = rds_ib_conn_free,
index 64df4e79b29f27ffcb8e8242df4b344284734e74..d2fd0aa4fde7da2b8725d372f55d540592f07c79 100644 (file)
@@ -336,6 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
                             u32 *adv_credits, int need_posted, int max_posted);
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
 
 /* ib_stats.c */
 DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
index 0f3b5a2f3fe0293adb313ed40dff1d321c502701..242231f09464166fd3d9590952a2555c3fa654fc 100644 (file)
@@ -298,7 +298,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
        ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
                        (IB_ACCESS_LOCAL_WRITE |
                         IB_ACCESS_REMOTE_READ |
-                        IB_ACCESS_REMOTE_WRITE),
+                        IB_ACCESS_REMOTE_WRITE|
+                        IB_ACCESS_REMOTE_ATOMIC),
+
                        &pool->fmr_attr);
        if (IS_ERR(ibmr->fmr)) {
                err = PTR_ERR(ibmr->fmr);
index f0edfdb2866cf265902df8f02374bdd5176b9a35..b2bd164434adddcb5ddcc463fb1870906df0c2da 100644 (file)
@@ -62,15 +62,17 @@ static void rds_ib_send_rdma_complete(struct rds_message *rm,
        rds_rdma_send_complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
-                                  struct rds_rdma_op *op)
+static void rds_ib_send_atomic_complete(struct rds_message *rm,
+                                     int wc_status)
 {
-       if (op->r_mapped) {
-               ib_dma_unmap_sg(ic->i_cm_id->device,
-                       op->r_sg, op->r_nents,
-                       op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-               op->r_mapped = 0;
-       }
+       int notify_status;
+
+       if (wc_status != IB_WC_SUCCESS)
+               notify_status = RDS_RDMA_OTHER_ERROR;
+       else
+               notify_status = RDS_RDMA_SUCCESS;
+
+       rds_atomic_send_complete(rm, notify_status);
 }
 
 static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
@@ -86,7 +88,14 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
                        DMA_TO_DEVICE);
 
        if (rm->rdma.m_rdma_op.r_active) {
-               rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op);
+               struct rds_rdma_op *op = &rm->rdma.m_rdma_op;
+
+               if (op->r_mapped) {
+                       ib_dma_unmap_sg(ic->i_cm_id->device,
+                                       op->r_sg, op->r_nents,
+                                       op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+                       op->r_mapped = 0;
+               }
 
                /* If the user asked for a completion notification on this
                 * message, we can implement three different semantics:
@@ -116,6 +125,24 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
                        rds_stats_add(s_recv_rdma_bytes, rm->rdma.m_rdma_op.r_bytes);
        }
 
+       if (rm->atomic.op_active) {
+               struct rm_atomic_op *op = &rm->atomic;
+
+               /* unmap atomic recvbuf */
+               if (op->op_mapped) {
+                       ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+                                       DMA_FROM_DEVICE);
+                       op->op_mapped = 0;
+               }
+
+               rds_ib_send_atomic_complete(rm, wc_status);
+
+               if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
+                       rds_stats_inc(s_atomic_cswp);
+               else
+                       rds_stats_inc(s_atomic_fadd);
+       }
+
        /* If anyone waited for this message to get flushed out, wake
         * them up now */
        rds_message_unmapped(rm);
@@ -158,12 +185,9 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
        u32 i;
 
        for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-               if (send->s_wr.opcode == 0xdead)
+               if (!send->s_rm || send->s_wr.opcode == 0xdead)
                        continue;
-               if (send->s_rm)
-                       rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
-               if (send->s_op)
-                       rds_ib_send_unmap_rdma(ic, send->s_op);
+               rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
        }
 }
 
@@ -218,6 +242,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
                                break;
                        case IB_WR_RDMA_WRITE:
                        case IB_WR_RDMA_READ:
+                       case IB_WR_ATOMIC_FETCH_AND_ADD:
+                       case IB_WR_ATOMIC_CMP_AND_SWP:
                                /* Nothing to be done - the SG list will be unmapped
                                 * when the SEND completes. */
                                break;
@@ -243,8 +269,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 
                                rm = rds_send_get_message(conn, send->s_op);
                                if (rm) {
-                                       if (rm->rdma.m_rdma_op.r_active)
-                                               rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op);
+                                       rds_ib_send_unmap_rm(ic, send, wc.status);
                                        rds_ib_send_rdma_complete(rm, wc.status);
                                        rds_message_put(rm);
                                }
@@ -736,6 +761,89 @@ out:
        return ret;
 }
 
+/*
+ * Issue atomic operation.
+ * A simplified version of the rdma case, we always map 1 SG, and
+ * only 8 bytes, for the return value from the atomic operation.
+ */
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
+{
+       struct rds_ib_connection *ic = conn->c_transport_data;
+       struct rds_ib_send_work *send = NULL;
+       struct ib_send_wr *failed_wr;
+       struct rds_ib_device *rds_ibdev;
+       u32 pos;
+       u32 work_alloc;
+       int ret;
+
+       rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
+
+       work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
+       if (work_alloc != 1) {
+               rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_stats_inc(s_ib_tx_ring_full);
+               ret = -ENOMEM;
+               goto out;
+       }
+
+       /* address of send request in ring */
+       send = &ic->i_sends[pos];
+       send->s_queued = jiffies;
+
+       if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
+               send->s_wr.opcode = IB_WR_ATOMIC_CMP_AND_SWP;
+               send->s_wr.wr.atomic.compare_add = op->op_compare;
+               send->s_wr.wr.atomic.swap = op->op_swap_add;
+       } else { /* FADD */
+               send->s_wr.opcode = IB_WR_ATOMIC_FETCH_AND_ADD;
+               send->s_wr.wr.atomic.compare_add = op->op_swap_add;
+               send->s_wr.wr.atomic.swap = 0;
+       }
+       send->s_wr.send_flags = IB_SEND_SIGNALED;
+       send->s_wr.num_sge = 1;
+       send->s_wr.next = NULL;
+       send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
+       send->s_wr.wr.atomic.rkey = op->op_rkey;
+
+       /* map 8 byte retval buffer to the device */
+       ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
+       rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
+       if (ret != 1) {
+               rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
+               ret = -ENOMEM; /* XXX ? */
+               goto out;
+       }
+
+       /* Convert our struct scatterlist to struct ib_sge */
+       send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg);
+       send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg);
+       send->s_sge[0].lkey = ic->i_mr->lkey;
+
+       rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
+                send->s_sge[0].addr, send->s_sge[0].length);
+
+       failed_wr = &send->s_wr;
+       ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+       rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
+                send, &send->s_wr, ret, failed_wr);
+       BUG_ON(failed_wr != &send->s_wr);
+       if (ret) {
+               printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
+                      "returned %d\n", &conn->c_faddr, ret);
+               rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+               goto out;
+       }
+
+       if (unlikely(failed_wr != &send->s_wr)) {
+               printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
+               BUG_ON(failed_wr != &send->s_wr);
+       }
+
+out:
+       return ret;
+}
+
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 {
        struct rds_ib_connection *ic = conn->c_transport_data;
index 4fda33045598510268d1b238b392580a6fe2c334..a7019df38c7031dbfe4fa349d25383443e84a607 100644 (file)
@@ -719,3 +719,76 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 
        return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.m_rdma_mr);
 }
+
+/*
+ * Fill in rds_message for an atomic request.
+ */
+int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
+                   struct cmsghdr *cmsg)
+{
+       struct page *page = NULL;
+       struct rds_atomic_args *args;
+       int ret = 0;
+
+       if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_atomic_args))
+        || rm->atomic.op_active)
+               return -EINVAL;
+
+       args = CMSG_DATA(cmsg);
+
+       if (cmsg->cmsg_type == RDS_CMSG_ATOMIC_CSWP) {
+               rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
+               rm->atomic.op_swap_add = args->cswp.swap;
+               rm->atomic.op_compare = args->cswp.compare;
+       } else {
+               rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
+               rm->atomic.op_swap_add = args->fadd.add;
+       }
+
+       rm->m_rdma_cookie = args->cookie;
+       rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+       rm->atomic.op_recverr = rs->rs_recverr;
+       rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
+
+       /* verify 8 byte-aligned */
+       if (args->local_addr & 0x7) {
+               ret = -EFAULT;
+               goto err;
+       }
+
+       ret = rds_pin_pages(args->local_addr, 1, &page, 1);
+       if (ret != 1)
+               goto err;
+       ret = 0;
+
+       sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr));
+
+       if (rm->atomic.op_notify || rm->atomic.op_recverr) {
+               /* We allocate an uninitialized notifier here, because
+                * we don't want to do that in the completion handler. We
+                * would have to use GFP_ATOMIC there, and don't want to deal
+                * with failed allocations.
+                */
+               rm->atomic.op_notifier = kmalloc(sizeof(*rm->atomic.op_notifier), GFP_KERNEL);
+               if (!rm->atomic.op_notifier) {
+                       ret = -ENOMEM;
+                       goto err;
+               }
+
+               rm->atomic.op_notifier->n_user_token = args->user_token;
+               rm->atomic.op_notifier->n_status = RDS_RDMA_SUCCESS;
+       }
+
+       rm->atomic.op_rkey = rds_rdma_cookie_key(rm->m_rdma_cookie);
+       rm->atomic.op_remote_addr = args->remote_addr + rds_rdma_cookie_offset(args->cookie);
+
+       rm->atomic.op_active = 1;
+
+       return ret;
+err:
+       if (page)
+               put_page(page);
+       kfree(rm->atomic.op_notifier);
+
+       return ret;
+}
index 0bb4957e0cfcb55102186e94dc75da2aafe14b3b..830e2bbb33322f1c617a35128663620f88a00c6c 100644 (file)
@@ -97,6 +97,7 @@ struct rds_connection {
        unsigned int            c_xmit_hdr_off;
        unsigned int            c_xmit_data_off;
        unsigned int            c_xmit_rdma_sent;
+       unsigned int            c_xmit_atomic_sent;
 
        spinlock_t              c_lock;         /* protect msg queues */
        u64                     c_next_tx_seq;
@@ -260,6 +261,10 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
        return cookie >> 32;
 }
 
+/* atomic operation types */
+#define RDS_ATOMIC_TYPE_CSWP           0
+#define RDS_ATOMIC_TYPE_FADD           1
+
 /*
  * m_sock_item and m_conn_item are on lists that are serialized under
  * conn->c_lock.  m_sock_item has additional meaning in that once it is empty
@@ -315,11 +320,27 @@ struct rds_message {
        struct rds_sock         *m_rs;
        rds_rdma_cookie_t       m_rdma_cookie;
        struct {
-               struct {
+               struct rm_atomic_op {
+                       int                     op_type;
+                       uint64_t                op_swap_add;
+                       uint64_t                op_compare;
+
+                       u32                     op_rkey;
+                       u64                     op_remote_addr;
+                       unsigned int            op_notify:1;
+                       unsigned int            op_recverr:1;
+                       unsigned int            op_mapped:1;
+                       unsigned int            op_active:1;
+                       struct rds_notifier     *op_notifier;
+                       struct scatterlist      *op_sg;
+
+                       struct rds_mr           *op_rdma_mr;
+               } atomic;
+               struct rm_rdma_op {
                        struct rds_rdma_op      m_rdma_op;
                        struct rds_mr           *m_rdma_mr;
                } rdma;
-               struct {
+               struct rm_data_op {
                        unsigned int            m_nents;
                        unsigned int            m_count;
                        struct scatterlist      *m_sg;
@@ -397,6 +418,7 @@ struct rds_transport {
        int (*xmit_cong_map)(struct rds_connection *conn,
                             struct rds_cong_map *map, unsigned long offset);
        int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op);
+       int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
        int (*recv)(struct rds_connection *conn);
        int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
                                size_t size);
@@ -546,6 +568,8 @@ struct rds_statistics {
        uint64_t        s_cong_update_received;
        uint64_t        s_cong_send_error;
        uint64_t        s_cong_send_blocked;
+       uint64_t        s_atomic_cswp;
+       uint64_t        s_atomic_fadd;
 };
 
 /* af_rds.c */
@@ -722,7 +746,10 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
                          struct cmsghdr *cmsg);
 void rds_rdma_free_op(struct rds_rdma_op *ro);
-void rds_rdma_send_complete(struct rds_message *rm, int);
+void rds_rdma_send_complete(struct rds_message *rm, int wc_status);
+void rds_atomic_send_complete(struct rds_message *rm, int wc_status);
+int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
+                   struct cmsghdr *cmsg);
 
 extern void __rds_put_mr_final(struct rds_mr *mr);
 static inline void rds_mr_put(struct rds_mr *mr)
index b751a8e77c411505dab265f6d031c9a10bdfa5de..f3f4e79274bf7e155c4588e5429addd85e2139ad 100644 (file)
@@ -73,6 +73,7 @@ void rds_send_reset(struct rds_connection *conn)
        conn->c_xmit_hdr_off = 0;
        conn->c_xmit_data_off = 0;
        conn->c_xmit_rdma_sent = 0;
+       conn->c_xmit_atomic_sent = 0;
 
        conn->c_map_queued = 0;
 
@@ -171,6 +172,7 @@ int rds_send_xmit(struct rds_connection *conn)
                        conn->c_xmit_hdr_off = 0;
                        conn->c_xmit_data_off = 0;
                        conn->c_xmit_rdma_sent = 0;
+                       conn->c_xmit_atomic_sent = 0;
 
                        /* Release the reference to the previous message. */
                        rds_message_put(rm);
@@ -262,6 +264,17 @@ int rds_send_xmit(struct rds_connection *conn)
                        conn->c_xmit_rm = rm;
                }
 
+
+               if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+                       ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
+                       if (ret)
+                               break;
+                       conn->c_xmit_atomic_sent = 1;
+                       /* The transport owns the mapped memory for now.
+                        * You can't unmap it while it's on the send queue */
+                       set_bit(RDS_MSG_MAPPED, &rm->m_flags);
+               }
+
                /*
                 * Try and send an rdma message.  Let's see if we can
                 * keep this simple and require that the transport either
@@ -442,6 +455,41 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
 }
 EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 
+/*
+ * Just like above, except looks at atomic op
+ */
+void rds_atomic_send_complete(struct rds_message *rm, int status)
+{
+       struct rds_sock *rs = NULL;
+       struct rm_atomic_op *ao;
+       struct rds_notifier *notifier;
+
+       spin_lock(&rm->m_rs_lock);
+
+       ao = &rm->atomic;
+       if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
+           && ao->op_active && ao->op_notify && ao->op_notifier) {
+               notifier = ao->op_notifier;
+               rs = rm->m_rs;
+               sock_hold(rds_rs_to_sk(rs));
+
+               notifier->n_status = status;
+               spin_lock(&rs->rs_lock);
+               list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+               spin_unlock(&rs->rs_lock);
+
+               ao->op_notifier = NULL;
+       }
+
+       spin_unlock(&rm->m_rs_lock);
+
+       if (rs) {
+               rds_wake_sk_sleep(rs);
+               sock_put(rds_rs_to_sk(rs));
+       }
+}
+EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
+
 /*
  * This is the same as rds_rdma_send_complete except we
  * don't do any locking - we have all the ingredients (message,
@@ -788,6 +836,11 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
                        /* these are valid but do no add any size */
                        break;
 
+               case RDS_CMSG_ATOMIC_CSWP:
+               case RDS_CMSG_ATOMIC_FADD:
+                       size += sizeof(struct scatterlist);
+                       break;
+
                default:
                        return -EINVAL;
                }
@@ -813,7 +866,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
                        continue;
 
                /* As a side effect, RDMA_DEST and RDMA_MAP will set
-                * rm->m_rdma_cookie and rm->m_rdma_mr.
+                * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
                 */
                switch (cmsg->cmsg_type) {
                case RDS_CMSG_RDMA_ARGS:
@@ -829,6 +882,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
                        if (!ret)
                                *allocated_mr = 1;
                        break;
+               case RDS_CMSG_ATOMIC_CSWP:
+               case RDS_CMSG_ATOMIC_FADD:
+                       ret = rds_cmsg_atomic(rs, rm, cmsg);
+                       break;
 
                default:
                        return -EINVAL;
@@ -926,10 +983,18 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
                goto out;
 
        if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op.r_active) &&
-           !conn->c_trans->xmit_rdma) {
+              !conn->c_trans->xmit_rdma) {
                if (printk_ratelimit())
                        printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
-                               &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma);
+                              &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma);
+               ret = -EOPNOTSUPP;
+               goto out;
+       }
+
+       if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
+               if (printk_ratelimit())
+                       printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
+                              &rm->atomic, conn->c_trans->xmit_atomic);
                ret = -EOPNOTSUPP;
                goto out;
        }
index 7598eb07cfb149264355695d4582f28e7b9b0962..c66d95d9c262682de2a381a66688a02e9439adb8 100644 (file)
@@ -75,6 +75,8 @@ static const char *const rds_stat_names[] = {
        "cong_update_received",
        "cong_send_error",
        "cong_send_blocked",
+       "s_atomic_cswp",
+       "s_atomic_fadd",
 };
 
 void rds_stats_info_copy(struct rds_info_iterator *iter,