# define RPCDBG_FACILITY RPCDBG_TRANS
#endif
-enum rpcrdma_chunktype {
- rpcrdma_noch = 0,
- rpcrdma_readch,
- rpcrdma_areadch,
- rpcrdma_writech,
- rpcrdma_replych
-};
-
static const char transfertypes[][12] = {
"inline", /* no chunks */
"read list", /* some argument via rdma read */
return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}
-static int
-rpcrdma_tail_pullup(struct xdr_buf *buf)
-{
- size_t tlen = buf->tail[0].iov_len;
- size_t skip = tlen & 3;
-
- /* Do not include the tail if it is only an XDR pad */
- if (tlen < 4)
- return 0;
-
- /* xdr_write_pages() adds a pad at the beginning of the tail
- * if the content in "buf->pages" is unaligned. Force the
- * tail's actual content to land at the next XDR position
- * after the head instead.
- */
- if (skip) {
- unsigned char *src, *dst;
- unsigned int count;
-
- src = buf->tail[0].iov_base;
- dst = buf->head[0].iov_base;
- dst += buf->head[0].iov_len;
-
- src += skip;
- tlen -= skip;
-
- dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
- __func__, skip, dst, src, tlen);
-
- for (count = tlen; count; count--)
- *dst++ = *src++;
- }
-
- return tlen;
-}
-
/* Split "vec" on page boundaries into segments. FMR registers pages,
* not a byte range. Other modes coalesce these segments into a single
* MR when they can.
return iptr;
}
-/*
- * Copy write data inline.
- * This function is used for "small" requests. Data which is passed
- * to RPC via iovecs (or page list) is copied directly into the
- * pre-registered memory buffer for this request. For small amounts
- * of data, this is efficient. The cutoff value is tunable.
+/* Prepare the RPC-over-RDMA header SGE.
*/
-static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
+static bool
+rpcrdma_prepare_hdr_sge(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+ u32 len)
{
- int i, npages, curlen;
- int copy_len;
- unsigned char *srcp, *destp;
- struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
- int page_base;
- struct page **ppages;
+ struct rpcrdma_regbuf *rb = req->rl_rdmabuf;
+ struct ib_sge *sge = &req->rl_send_sge[0];
+
+ if (unlikely(!rpcrdma_regbuf_is_mapped(rb))) {
+ if (!__rpcrdma_dma_map_regbuf(ia, rb))
+ return false;
+ sge->addr = rdmab_addr(rb);
+ sge->lkey = rdmab_lkey(rb);
+ }
+ sge->length = len;
- destp = rqst->rq_svec[0].iov_base;
- curlen = rqst->rq_svec[0].iov_len;
- destp += curlen;
+ ib_dma_sync_single_for_device(ia->ri_device, sge->addr,
+ sge->length, DMA_TO_DEVICE);
+ req->rl_send_wr.num_sge++;
+ return true;
+}
- dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
- __func__, destp, rqst->rq_slen, curlen);
+/* Prepare the Send SGEs. The head and tail iovec, and each entry
+ * in the page list, gets its own SGE.
+ */
+static bool
+rpcrdma_prepare_msg_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+ struct xdr_buf *xdr, enum rpcrdma_chunktype rtype)
+{
+ unsigned int sge_no, page_base, len, remaining;
+ struct rpcrdma_regbuf *rb = req->rl_sendbuf;
+ struct ib_device *device = ia->ri_device;
+ struct ib_sge *sge = req->rl_send_sge;
+ u32 lkey = ia->ri_pd->local_dma_lkey;
+ struct page *page, **ppages;
+
+ /* The head iovec is straightforward, as it is already
+ * DMA-mapped. Sync the content that has changed.
+ */
+ if (!rpcrdma_dma_map_regbuf(ia, rb))
+ return false;
+ sge_no = 1;
+ sge[sge_no].addr = rdmab_addr(rb);
+ sge[sge_no].length = xdr->head[0].iov_len;
+ sge[sge_no].lkey = rdmab_lkey(rb);
+ ib_dma_sync_single_for_device(device, sge[sge_no].addr,
+ sge[sge_no].length, DMA_TO_DEVICE);
+
+ /* If there is a Read chunk, the page list is being handled
+ * via explicit RDMA, and thus is skipped here. However, the
+ * tail iovec may include an XDR pad for the page list, as
+ * well as additional content, and may not reside in the
+ * same page as the head iovec.
+ */
+ if (rtype == rpcrdma_readch) {
+ len = xdr->tail[0].iov_len;
- copy_len = rqst->rq_snd_buf.page_len;
+ /* Do not include the tail if it is only an XDR pad */
+ if (len < 4)
+ goto out;
- if (rqst->rq_snd_buf.tail[0].iov_len) {
- curlen = rqst->rq_snd_buf.tail[0].iov_len;
- if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
- memmove(destp + copy_len,
- rqst->rq_snd_buf.tail[0].iov_base, curlen);
- r_xprt->rx_stats.pullup_copy_count += curlen;
+ page = virt_to_page(xdr->tail[0].iov_base);
+ page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+
+ /* If the content in the page list is an odd length,
+ * xdr_write_pages() has added a pad at the beginning
+ * of the tail iovec. Force the tail's non-pad content
+ * to land at the next XDR position in the Send message.
+ */
+ page_base += len & 3;
+ len -= len & 3;
+ goto map_tail;
+ }
+
+ /* If there is a page list present, temporarily DMA map
+ * and prepare an SGE for each page to be sent.
+ */
+ if (xdr->page_len) {
+ ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+ page_base = xdr->page_base & ~PAGE_MASK;
+ remaining = xdr->page_len;
+ while (remaining) {
+ sge_no++;
+ if (sge_no > RPCRDMA_MAX_SEND_SGES - 2)
+ goto out_mapping_overflow;
+
+ len = min_t(u32, PAGE_SIZE - page_base, remaining);
+ sge[sge_no].addr = ib_dma_map_page(device, *ppages,
+ page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ goto out_mapping_err;
+ sge[sge_no].length = len;
+ sge[sge_no].lkey = lkey;
+
+ req->rl_mapped_sges++;
+ ppages++;
+ remaining -= len;
+ page_base = 0;
}
- dprintk("RPC: %s: tail destp 0x%p len %d\n",
- __func__, destp + copy_len, curlen);
- rqst->rq_svec[0].iov_len += curlen;
}
- r_xprt->rx_stats.pullup_copy_count += copy_len;
- page_base = rqst->rq_snd_buf.page_base;
- ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
- page_base &= ~PAGE_MASK;
- npages = PAGE_ALIGN(page_base+copy_len) >> PAGE_SHIFT;
- for (i = 0; copy_len && i < npages; i++) {
- curlen = PAGE_SIZE - page_base;
- if (curlen > copy_len)
- curlen = copy_len;
- dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
- __func__, i, destp, copy_len, curlen);
- srcp = kmap_atomic(ppages[i]);
- memcpy(destp, srcp+page_base, curlen);
- kunmap_atomic(srcp);
- rqst->rq_svec[0].iov_len += curlen;
- destp += curlen;
- copy_len -= curlen;
- page_base = 0;
+ /* The tail iovec is not always constructed in the same
+ * page where the head iovec resides (see, for example,
+ * gss_wrap_req_priv). To neatly accommodate that case,
+ * DMA map it separately.
+ */
+ if (xdr->tail[0].iov_len) {
+ page = virt_to_page(xdr->tail[0].iov_base);
+ page_base = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
+ len = xdr->tail[0].iov_len;
+
+map_tail:
+ sge_no++;
+ sge[sge_no].addr = ib_dma_map_page(device, page,
+ page_base, len,
+ DMA_TO_DEVICE);
+ if (ib_dma_mapping_error(device, sge[sge_no].addr))
+ goto out_mapping_err;
+ sge[sge_no].length = len;
+ sge[sge_no].lkey = lkey;
+ req->rl_mapped_sges++;
}
- /* header now contains entire send message */
+
+out:
+ req->rl_send_wr.num_sge = sge_no + 1;
+ return true;
+
+out_mapping_overflow:
+ pr_err("rpcrdma: too many Send SGEs (%u)\n", sge_no);
+ return false;
+
+out_mapping_err:
+ pr_err("rpcrdma: Send mapping error\n");
+ return false;
+}
+
+bool
+rpcrdma_prepare_send_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req,
+ u32 hdrlen, struct xdr_buf *xdr,
+ enum rpcrdma_chunktype rtype)
+{
+ req->rl_send_wr.num_sge = 0;
+ req->rl_mapped_sges = 0;
+
+ if (!rpcrdma_prepare_hdr_sge(ia, req, hdrlen))
+ goto out_map;
+
+ if (rtype != rpcrdma_areadch)
+ if (!rpcrdma_prepare_msg_sges(ia, req, xdr, rtype))
+ goto out_map;
+
+ return true;
+
+out_map:
+ pr_err("rpcrdma: failed to DMA map a Send buffer\n");
+ return false;
+}
+
+void
+rpcrdma_unmap_sges(struct rpcrdma_ia *ia, struct rpcrdma_req *req)
+{
+ struct ib_device *device = ia->ri_device;
+ struct ib_sge *sge;
+ int count;
+
+ sge = &req->rl_send_sge[2];
+ for (count = req->rl_mapped_sges; count--; sge++)
+ ib_dma_unmap_page(device, sge->addr, sge->length,
+ DMA_TO_DEVICE);
+ req->rl_mapped_sges = 0;
}
/*
* Marshal a request: the primary job of this routine is to choose
* the transfer modes. See comments below.
*
- * Prepares up to two IOVs per Call message:
- *
- * [0] -- RPC RDMA header
- * [1] -- the RPC header/data
- *
* Returns zero on success, otherwise a negative errno.
*/
*/
if (rpcrdma_args_inline(r_xprt, rqst)) {
rtype = rpcrdma_noch;
- rpcrdma_inline_pullup(rqst);
- rpclen = rqst->rq_svec[0].iov_len;
+ rpclen = rqst->rq_snd_buf.len;
} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
rtype = rpcrdma_readch;
- rpclen = rqst->rq_svec[0].iov_len;
- rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
+ rpclen = rqst->rq_snd_buf.head[0].iov_len +
+ rqst->rq_snd_buf.tail[0].iov_len;
} else {
r_xprt->rx_stats.nomsg_call_count++;
headerp->rm_type = htonl(RDMA_NOMSG);
goto out_unmap;
hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;
- if (hdrlen + rpclen > r_xprt->rx_data.inline_wsize)
- goto out_overflow;
-
dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
rqst->rq_task->tk_pid, __func__,
transfertypes[rtype], transfertypes[wtype],
hdrlen, rpclen);
- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_rdmabuf))
- goto out_map;
- req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
- req->rl_send_iov[0].length = hdrlen;
- req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);
-
- req->rl_send_wr.num_sge = 1;
- if (rtype == rpcrdma_areadch)
- return 0;
-
- if (!rpcrdma_dma_map_regbuf(&r_xprt->rx_ia, req->rl_sendbuf))
- goto out_map;
- req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
- req->rl_send_iov[1].length = rpclen;
- req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);
-
- req->rl_send_wr.num_sge = 2;
-
+ if (!rpcrdma_prepare_send_sges(&r_xprt->rx_ia, req, hdrlen,
+ &rqst->rq_snd_buf, rtype)) {
+ iptr = ERR_PTR(-EIO);
+ goto out_unmap;
+ }
return 0;
-out_overflow:
- pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
- hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
- iptr = ERR_PTR(-EIO);
-
out_unmap:
r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
return PTR_ERR(iptr);
-
-out_map:
- pr_err("rpcrdma: failed to DMA map a Send buffer\n");
- iptr = ERR_PTR(-EIO);
- goto out_unmap;
}
/*