}
}
+/* Allocate a fixed-size buffer in which to construct and send the
+ * RPC-over-RDMA header for this request.
+ */
+static bool
+rpcrdma_get_rdmabuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ gfp_t flags)
+{
+ size_t size = r_xprt->rx_data.inline_wsize;
+ struct rpcrdma_regbuf *rb;
+
+ if (req->rl_rdmabuf)
+ return true;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ r_xprt->rx_stats.hardway_register_count += size;
+ req->rl_rdmabuf = rb;
+ return true;
+}
+
+/* RPC/RDMA marshaling may choose to send payload bearing ops inline,
+ * if the resulting Call message is smaller than the inline threshold.
+ * The value of the "rq_callsize" argument accounts for RPC header
+ * requirements, but not for the data payload in these cases.
+ *
+ * See rpcrdma_inline_pullup.
+ */
+static bool
+rpcrdma_get_sendbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+ size_t min_size;
+
+ if (req->rl_sendbuf && rdmab_length(req->rl_sendbuf) >= size)
+ return true;
+
+ min_size = max_t(size_t, size, r_xprt->rx_data.inline_wsize);
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
+ r_xprt->rx_stats.hardway_register_count += min_size;
+ req->rl_sendbuf = rb;
+ return true;
+}
+
+/* The rq_rcv_buf is used only if a Reply chunk is necessary.
+ * The decision to use a Reply chunk is made later in
+ * rpcrdma_marshal_req. This buffer is registered at that time.
+ *
+ * Otherwise, the associated RPC Reply arrives in a separate
+ * Receive buffer, arbitrarily chosen by the HCA. The buffer
+ * allocated here for the RPC Reply is not utilized in that
+ * case. See rpcrdma_inline_fixup.
+ *
+ * A regbuf is used here to remember the buffer size.
+ */
+static bool
+rpcrdma_get_recvbuf(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
+ size_t size, gfp_t flags)
+{
+ struct rpcrdma_regbuf *rb;
+
+ if (req->rl_recvbuf && rdmab_length(req->rl_recvbuf) >= size)
+ return true;
+
+ rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
+ if (IS_ERR(rb))
+ return false;
+
+ rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_recvbuf);
+ r_xprt->rx_stats.hardway_register_count += size;
+ req->rl_recvbuf = rb;
+ return true;
+}
+
/**
* xprt_rdma_allocate - allocate transport resources for an RPC
* @task: RPC task
* EIO: A permanent error occurred, do not retry
*
* The RDMA allocate/free functions need the task structure as a place
- * to hide the struct rpcrdma_req, which is necessary for the actual send/recv
- * sequence.
+ * to hide the struct rpcrdma_req, which is necessary for the actual
+ * send/recv sequence.
*
- * The RPC layer allocates both send and receive buffers in the same call
- * (rq_send_buf and rq_rcv_buf are both part of a single contiguous buffer).
- * We may register rq_rcv_buf when using reply chunks.
+ * xprt_rdma_allocate provides buffers that are already mapped for
+ * DMA, and a local DMA lkey is provided for each.
*/
static int
xprt_rdma_allocate(struct rpc_task *task)
{
struct rpc_rqst *rqst = task->tk_rqstp;
- size_t size = rqst->rq_callsize + rqst->rq_rcvsize;
struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- struct rpcrdma_regbuf *rb;
struct rpcrdma_req *req;
- size_t min_size;
gfp_t flags;
req = rpcrdma_buffer_get(&r_xprt->rx_buf);
if (RPC_IS_SWAPPER(task))
flags = __GFP_MEMALLOC | GFP_NOWAIT | __GFP_NOWARN;
- if (req->rl_rdmabuf == NULL)
- goto out_rdmabuf;
- if (req->rl_sendbuf == NULL)
- goto out_sendbuf;
- if (size > req->rl_sendbuf->rg_size)
- goto out_sendbuf;
+ if (!rpcrdma_get_rdmabuf(r_xprt, req, flags))
+ goto out_fail;
+ if (!rpcrdma_get_sendbuf(r_xprt, req, rqst->rq_callsize, flags))
+ goto out_fail;
+ if (!rpcrdma_get_recvbuf(r_xprt, req, rqst->rq_rcvsize, flags))
+ goto out_fail;
+
+ dprintk("RPC: %5u %s: send size = %zd, recv size = %zd, req = %p\n",
+ task->tk_pid, __func__, rqst->rq_callsize,
+ rqst->rq_rcvsize, req);
-out:
- dprintk("RPC: %s: size %zd, request 0x%p\n", __func__, size, req);
req->rl_connect_cookie = 0; /* our reserved value */
rpcrdma_set_xprtdata(rqst, req);
rqst->rq_buffer = req->rl_sendbuf->rg_base;
- rqst->rq_rbuffer = (char *)rqst->rq_buffer + rqst->rq_rcvsize;
+ rqst->rq_rbuffer = req->rl_recvbuf->rg_base;
return 0;
-out_rdmabuf:
- min_size = r_xprt->rx_data.inline_wsize;
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, min_size, flags);
- if (IS_ERR(rb))
- goto out_fail;
- req->rl_rdmabuf = rb;
-
-out_sendbuf:
- /* XDR encoding and RPC/RDMA marshaling of this request has not
- * yet occurred. Thus a lower bound is needed to prevent buffer
- * overrun during marshaling.
- *
- * RPC/RDMA marshaling may choose to send payload bearing ops
- * inline, if the result is smaller than the inline threshold.
- * The value of the "size" argument accounts for header
- * requirements but not for the payload in these cases.
- *
- * Likewise, allocate enough space to receive a reply up to the
- * size of the inline threshold.
- *
- * It's unlikely that both the send header and the received
- * reply will be large, but slush is provided here to allow
- * flexibility when marshaling.
- */
- min_size = r_xprt->rx_data.inline_rsize;
- min_size += r_xprt->rx_data.inline_wsize;
- if (size < min_size)
- size = min_size;
-
- rb = rpcrdma_alloc_regbuf(&r_xprt->rx_ia, size, flags);
- if (IS_ERR(rb))
- goto out_fail;
-
- r_xprt->rx_stats.hardway_register_count += size;
- rpcrdma_free_regbuf(&r_xprt->rx_ia, req->rl_sendbuf);
- req->rl_sendbuf = rb;
- goto out;
-
out_fail:
rpcrdma_buffer_put(req);
return -ENOMEM;