/*
 * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the BSD-type
 * license below:
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 *      Redistributions of source code must retain the above copyright
 *      notice, this list of conditions and the following disclaimer.
 *
 *      Redistributions in binary form must reproduce the above
 *      copyright notice, this list of conditions and the following
 *      disclaimer in the documentation and/or other materials provided
 *      with the distribution.
 *
 *      Neither the name of the Network Appliance, Inc. nor the names of
 *      its contributors may be used to endorse or promote products
 *      derived from this software without specific prior written
 *      permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
/*
 * This file contains the guts of the RPC RDMA protocol, and
 * does marshaling/unmarshaling, etc. It is also where interfacing
 * to the Linux RPC framework lives.
 */
#include "xprt_rdma.h"

#include <linux/highmem.h>

#if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
# define RPCDBG_FACILITY	RPCDBG_TRANS
#endif
enum rpcrdma_chunktype {
	rpcrdma_noch = 0,
	rpcrdma_readch,
	rpcrdma_areadch,
	rpcrdma_writech,
	rpcrdma_replych
};
static const char transfertypes[][12] = {
	"inline",	/* no chunks */
	"read list",	/* some argument via rdma read */
	"*read list",	/* entire request via rdma read */
	"write list",	/* some result via rdma write */
	"reply chunk"	/* entire reply via rdma write */
};
/* Returns size of largest RPC-over-RDMA header in a Call message
 *
 * The largest Call header contains a full-size Read list and a
 * minimal Reply chunk.
 */
static unsigned int rpcrdma_max_call_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Read list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += maxsegs * sizeof(struct rpcrdma_read_chunk);

	/* Minimal Read chunk size */
	size += sizeof(__be32);	/* segment count */
	size += sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max call header size = %u\n",
		__func__, size);
	return size;
}
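/* A worked example, a sketch rather than anything normative: assuming
 * RPCRDMA_HDRLEN_MIN is 28 bytes (xid, vers, credits, proc, plus three
 * empty chunk-list discriminators), sizeof(struct rpcrdma_read_chunk)
 * is 24, and sizeof(struct rpcrdma_segment) is 16, then maxsegs = 8
 * gives
 *
 *	28 + (8 + 2) * 24 + 4 + 16 + 4 = 292 bytes
 *
 * reserved for the Call header out of the inline send buffer.
 */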
/* Returns size of largest RPC-over-RDMA header in a Reply message
 *
 * There is only one Write list or one Reply chunk per Reply
 * message. The larger list is the Write list.
 */
static unsigned int rpcrdma_max_reply_header_size(unsigned int maxsegs)
{
	unsigned int size;

	/* Fixed header fields and list discriminators */
	size = RPCRDMA_HDRLEN_MIN;

	/* Maximum Write list size */
	maxsegs += 2;	/* segment for head and tail buffers */
	size += sizeof(__be32);	/* segment count */
	size += maxsegs * sizeof(struct rpcrdma_segment);
	size += sizeof(__be32);	/* list discriminator */

	dprintk("RPC: %s: max reply header size = %u\n",
		__func__, size);
	return size;
}
void rpcrdma_set_max_header_sizes(struct rpcrdma_ia *ia,
				  struct rpcrdma_create_data_internal *cdata,
				  unsigned int maxsegs)
{
	ia->ri_max_inline_write = cdata->inline_wsize -
				  rpcrdma_max_call_header_size(maxsegs);
	ia->ri_max_inline_read = cdata->inline_rsize -
				 rpcrdma_max_reply_header_size(maxsegs);
}
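/* Illustrative note: with, say, 1024-byte inline buffers and the
 * example arithmetic above, ri_max_inline_write would come out to 1024
 * minus the worst-case Call header, and ri_max_inline_read to 1024
 * minus the worst-case Reply header. The two helpers below then
 * compare xdr_buf lengths directly against these precomputed ceilings,
 * so no per-request header-size math is needed on the hot path.
 */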
/* The client can send a request inline as long as the RPCRDMA header
 * plus the RPC call fit under the transport's inline limit. If the
 * combined call message size exceeds that limit, the client must use
 * the read chunk list for this operation.
 */
static bool rpcrdma_args_inline(struct rpcrdma_xprt *r_xprt,
				struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_snd_buf.len <= ia->ri_max_inline_write;
}
/* The client can't know how large the actual reply will be. Thus it
 * plans for the largest possible reply for that particular ULP
 * operation. If the maximum combined reply message size exceeds that
 * limit, the client must provide a write list or a reply chunk for
 * this request.
 */
static bool rpcrdma_results_inline(struct rpcrdma_xprt *r_xprt,
				   struct rpc_rqst *rqst)
{
	struct rpcrdma_ia *ia = &r_xprt->rx_ia;

	return rqst->rq_rcv_buf.buflen <= ia->ri_max_inline_read;
}
static int
rpcrdma_tail_pullup(struct xdr_buf *buf)
{
	size_t tlen = buf->tail[0].iov_len;
	size_t skip = tlen & 3;

	/* Do not include the tail if it is only an XDR pad */
	if (tlen < 4)
		return 0;

	/* xdr_write_pages() adds a pad at the beginning of the tail
	 * if the content in "buf->pages" is unaligned. Force the
	 * tail's actual content to land at the next XDR position
	 * after the head instead.
	 */
	if (skip) {
		unsigned char *src, *dst;
		unsigned int count;

		src = buf->tail[0].iov_base;
		dst = buf->head[0].iov_base;
		dst += buf->head[0].iov_len;

		src += skip;
		tlen -= skip;

		dprintk("RPC: %s: skip=%zu, memmove(%p, %p, %zu)\n",
			__func__, skip, dst, src, tlen);

		for (count = tlen; count; count--)
			*dst++ = *src++;
	}

	return tlen;
}
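/* A concrete sketch of the case handled above: if the page list ends
 * two bytes short of XDR alignment, xdr_write_pages() puts two pad
 * bytes at the front of the tail, so tlen & 3 == 2. The copy loop then
 * moves the tail's real content to the byte just past head[0], and the
 * caller adds the returned (pad-free) length to its inline payload, so
 * the pad itself is never transmitted.
 */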
/* Split "vec" on page boundaries into segments. FMR registers pages,
 * not a byte range. Other modes coalesce these segments into a single
 * MR when they can.
 */
static int
rpcrdma_convert_kvec(struct kvec *vec, struct rpcrdma_mr_seg *seg, int n)
{
	size_t page_offset;
	u32 remaining;
	char *base;

	base = vec->iov_base;
	page_offset = offset_in_page(base);
	remaining = vec->iov_len;
	while (remaining && n < RPCRDMA_MAX_SEGS) {
		seg[n].mr_page = NULL;
		seg[n].mr_offset = base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_offset, remaining);
		remaining -= seg[n].mr_len;
		base += seg[n].mr_len;
		++n;
		page_offset = 0;
	}
	return n;
}
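/* Split sketch, assuming 4KB pages: a kvec with iov_len 6000 whose
 * base sits at offset 3840 into its first page becomes three segments:
 *
 *	seg[n+0]: mr_offset = base,        mr_len =  256
 *	seg[n+1]: mr_offset = base + 256,  mr_len = 4096
 *	seg[n+2]: mr_offset = base + 4352, mr_len = 1648
 *
 * FMR can register each piece by page, while other modes (FRWR, for
 * instance) may coalesce them back into one MR.
 */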
/*
 * Chunk assembly from upper layer xdr_buf.
 *
 * Prepare the passed-in xdr_buf into representation as RPC/RDMA chunk
 * elements. Segments are then coalesced when registered, if possible
 * within the selected memreg mode.
 *
 * Returns positive number of segments converted, or a negative errno.
 */
static int
rpcrdma_convert_iovs(struct xdr_buf *xdrbuf, unsigned int pos,
	enum rpcrdma_chunktype type, struct rpcrdma_mr_seg *seg)
{
	int len, n, p, page_base;
	struct page **ppages;

	n = 0;
	if (pos == 0) {
		n = rpcrdma_convert_kvec(&xdrbuf->head[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	len = xdrbuf->page_len;
	ppages = xdrbuf->pages + (xdrbuf->page_base >> PAGE_SHIFT);
	page_base = xdrbuf->page_base & ~PAGE_MASK;
	p = 0;
	while (len && n < RPCRDMA_MAX_SEGS) {
		if (!ppages[p]) {
			/* alloc the pagelist for receiving buffer */
			ppages[p] = alloc_page(GFP_ATOMIC);
			if (!ppages[p])
				return -EAGAIN;
		}
		seg[n].mr_page = ppages[p];
		seg[n].mr_offset = (void *)(unsigned long) page_base;
		seg[n].mr_len = min_t(u32, PAGE_SIZE - page_base, len);
		if (seg[n].mr_len > PAGE_SIZE)
			goto out_overflow;
		len -= seg[n].mr_len;
		++n;
		++p;
		page_base = 0;	/* page offset only applies to first page */
	}

	/* Message overflows the seg array */
	if (len && n == RPCRDMA_MAX_SEGS)
		goto out_overflow;

	/* When encoding the read list, the tail is always sent inline */
	if (type == rpcrdma_readch)
		return n;

	if (xdrbuf->tail[0].iov_len) {
		/* the rpcrdma protocol allows us to omit any trailing
		 * xdr pad bytes, saving the server an RDMA operation. */
		if (xdrbuf->tail[0].iov_len < 4 && xprt_rdma_pad_optimize)
			return n;
		n = rpcrdma_convert_kvec(&xdrbuf->tail[0], seg, n);
		if (n == RPCRDMA_MAX_SEGS)
			goto out_overflow;
	}

	return n;

out_overflow:
	pr_err("rpcrdma: segment array overflow\n");
	return -EIO;
}
static inline __be32 *
xdr_encode_rdma_segment(__be32 *iptr, struct rpcrdma_mw *mw)
{
	*iptr++ = cpu_to_be32(mw->mw_handle);
	*iptr++ = cpu_to_be32(mw->mw_length);
	return xdr_encode_hyper(iptr, mw->mw_offset);
}
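/* For illustration only: an MR with handle 0xdeadbeef, length 4096,
 * and offset 0x1000 is emitted as four XDR words:
 *
 *	0xdeadbeef			(handle, 32 bits)
 *	0x00001000			(length, 32 bits)
 *	0x00000000 0x00001000		(offset, 64-bit hyper)
 *
 * This is the "HLOO" unit referenced in the encoding keys below.
 */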
/* XDR-encode the Read list. Supports encoding a list of read
 * segments that belong to a single read chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Read chunklist (a linked list):
 *   N elements, position P (same P for all chunks of same arg!):
 *    1 - PHLOO - 1 - PHLOO - ... - 1 - PHLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Read list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_read_list(struct rpcrdma_xprt *r_xprt,
			 struct rpcrdma_req *req, struct rpc_rqst *rqst,
			 __be32 *iptr, enum rpcrdma_chunktype rtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	unsigned int pos;
	int n, nsegs;

	if (rtype == rpcrdma_noch) {
		*iptr++ = xdr_zero;	/* item not present */
		return iptr;
	}

	pos = rqst->rq_snd_buf.head[0].iov_len;
	if (rtype == rpcrdma_areadch)
		pos = 0;
	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_snd_buf, pos, rtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 false, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		*iptr++ = xdr_one;	/* item present */

		/* All read segments in this chunk
		 * have the same "position".
		 */
		*iptr++ = cpu_to_be32(pos);
		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: pos %u %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__, pos,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.read_chunk_count++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Finish Read list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}
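/* Wire-image sketch (hypothetical values): a Read chunk registered in
 * two segments at XDR position 36 would encode as
 *
 *	1, 36, H0 L0 O0		(first segment)
 *	1, 36, H1 L1 O1		(second segment)
 *	0			(list terminator)
 *
 * Both entries carry position 36 because they belong to the same
 * argument, per the encoding key above.
 */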
/* XDR-encode the Write list. Supports encoding a list containing
 * one array of plain segments that belong to a single write chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Write chunklist (a list of (one) counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO - 0
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Write list, or an error pointer.
 */
static __be32 *
rpcrdma_encode_write_list(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req,
			  struct rpc_rqst *rqst, __be32 *iptr,
			  enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_writech) {
		*iptr++ = xdr_zero;	/* no Write list present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf,
				     rqst->rq_rcv_buf.head[0].iov_len,
				     wtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Write list present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.write_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in this Write chunk */
	*segcount = cpu_to_be32(nchunks);

	/* Finish Write list */
	*iptr++ = xdr_zero;	/* Next item not present */
	return iptr;
}
/* XDR-encode the Reply chunk. Supports encoding an array of plain
 * segments that belong to a single write (reply) chunk.
 *
 * Encoding key for single-list chunks (HLOO = Handle32 Length32 Offset64):
 *
 *  Reply chunk (a counted array):
 *   N elements:
 *    1 - N - HLOO - HLOO - ... - HLOO
 *
 * Returns a pointer to the XDR word in the RDMA header following
 * the end of the Reply chunk, or an error pointer.
 */
static __be32 *
rpcrdma_encode_reply_chunk(struct rpcrdma_xprt *r_xprt,
			   struct rpcrdma_req *req, struct rpc_rqst *rqst,
			   __be32 *iptr, enum rpcrdma_chunktype wtype)
{
	struct rpcrdma_mr_seg *seg;
	struct rpcrdma_mw *mw;
	int n, nsegs, nchunks;
	__be32 *segcount;

	if (wtype != rpcrdma_replych) {
		*iptr++ = xdr_zero;	/* no Reply chunk present */
		return iptr;
	}

	seg = req->rl_segments;
	nsegs = rpcrdma_convert_iovs(&rqst->rq_rcv_buf, 0, wtype, seg);
	if (nsegs < 0)
		return ERR_PTR(nsegs);

	*iptr++ = xdr_one;	/* Reply chunk present */
	segcount = iptr++;	/* save location of segment count */

	nchunks = 0;
	do {
		n = r_xprt->rx_ia.ri_ops->ro_map(r_xprt, seg, nsegs,
						 true, &mw);
		if (n < 0)
			return ERR_PTR(n);
		list_add(&mw->mw_list, &req->rl_registered);

		iptr = xdr_encode_rdma_segment(iptr, mw);

		dprintk("RPC: %5u %s: %u@0x%016llx:0x%08x (%s)\n",
			rqst->rq_task->tk_pid, __func__,
			mw->mw_length, (unsigned long long)mw->mw_offset,
			mw->mw_handle, n < nsegs ? "more" : "last");

		r_xprt->rx_stats.reply_chunk_count++;
		r_xprt->rx_stats.total_rdma_request += seg->mr_len;
		nchunks++;
		seg += n;
		nsegs -= n;
	} while (nsegs);

	/* Update count of segments in the Reply chunk */
	*segcount = cpu_to_be32(nchunks);

	return iptr;
}
/*
 * Copy write data inline.
 * This function is used for "small" requests. Data which is passed
 * to RPC via iovecs (or page list) is copied directly into the
 * pre-registered memory buffer for this request. For small amounts
 * of data, this is efficient. The cutoff value is tunable.
 */
static void rpcrdma_inline_pullup(struct rpc_rqst *rqst)
{
	int i, npages, curlen;
	int copy_len;
	unsigned char *srcp, *destp;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(rqst->rq_xprt);
	int page_base;
	struct page **ppages;

	destp = rqst->rq_svec[0].iov_base;
	curlen = rqst->rq_svec[0].iov_len;
	destp += curlen;

	dprintk("RPC: %s: destp 0x%p len %d hdrlen %d\n",
		__func__, destp, rqst->rq_slen, curlen);

	copy_len = rqst->rq_snd_buf.page_len;

	if (rqst->rq_snd_buf.tail[0].iov_len) {
		curlen = rqst->rq_snd_buf.tail[0].iov_len;
		if (destp + copy_len != rqst->rq_snd_buf.tail[0].iov_base) {
			memmove(destp + copy_len,
				rqst->rq_snd_buf.tail[0].iov_base, curlen);
			r_xprt->rx_stats.pullup_copy_count += curlen;
		}
		dprintk("RPC: %s: tail destp 0x%p len %d\n",
			__func__, destp + copy_len, curlen);
		rqst->rq_svec[0].iov_len += curlen;
	}
	r_xprt->rx_stats.pullup_copy_count += copy_len;

	page_base = rqst->rq_snd_buf.page_base;
	ppages = rqst->rq_snd_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	npages = PAGE_ALIGN(page_base + copy_len) >> PAGE_SHIFT;
	for (i = 0; copy_len && i < npages; i++) {
		curlen = PAGE_SIZE - page_base;
		if (curlen > copy_len)
			curlen = copy_len;
		dprintk("RPC: %s: page %d destp 0x%p len %d curlen %d\n",
			__func__, i, destp, copy_len, curlen);
		srcp = kmap_atomic(ppages[i]);
		memcpy(destp, srcp + page_base, curlen);
		kunmap_atomic(srcp);
		rqst->rq_svec[0].iov_len += curlen;
		destp += curlen;
		copy_len -= curlen;
		page_base = 0;
	}
	/* header now contains entire send message */
}
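/* Design note: by flattening the page list and tail into rq_svec[0],
 * an inline Call needs only the two send iovecs that
 * rpcrdma_marshal_req() sets up below (header plus RPC message),
 * trading one CPU copy for a Send that needs no per-request memory
 * registration.
 */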
/*
 * Marshal a request: the primary job of this routine is to choose
 * the transfer modes. See comments below.
 *
 * Prepares up to two IOVs per Call message:
 *
 *  [0] -- RPC RDMA header
 *  [1] -- the RPC header/data
 *
 * Returns zero on success, otherwise a negative errno.
 */
int
rpcrdma_marshal_req(struct rpc_rqst *rqst)
{
	struct rpc_xprt *xprt = rqst->rq_xprt;
	struct rpcrdma_xprt *r_xprt = rpcx_to_rdmax(xprt);
	struct rpcrdma_req *req = rpcr_to_rdmar(rqst);
	enum rpcrdma_chunktype rtype, wtype;
	struct rpcrdma_msg *headerp;
	bool ddp_allowed;
	ssize_t hdrlen;
	size_t rpclen;
	__be32 *iptr;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (test_bit(RPC_BC_PA_IN_USE, &rqst->rq_bc_pa_state))
		return rpcrdma_bc_marshal_reply(rqst);
#endif

	headerp = rdmab_to_msg(req->rl_rdmabuf);
	/* don't byte-swap XID, it's already done in request */
	headerp->rm_xid = rqst->rq_xid;
	headerp->rm_vers = rpcrdma_version;
	headerp->rm_credit = cpu_to_be32(r_xprt->rx_buf.rb_max_requests);
	headerp->rm_type = rdma_msg;

	/* When the ULP employs a GSS flavor that guarantees integrity
	 * or privacy, direct data placement of individual data items
	 * is not allowed.
	 */
	ddp_allowed = !(rqst->rq_cred->cr_auth->au_flags &
			RPCAUTH_AUTH_DATATOUCH);

	/*
	 * Chunks needed for results?
	 *
	 * o If the expected result is under the inline threshold, all ops
	 *   return as inline.
	 * o Large read ops return data as write chunk(s), header as
	 *   inline.
	 * o Large non-read ops return as a single reply chunk.
	 */
	if (rpcrdma_results_inline(r_xprt, rqst))
		wtype = rpcrdma_noch;
	else if (ddp_allowed && rqst->rq_rcv_buf.flags & XDRBUF_READ)
		wtype = rpcrdma_writech;
	else
		wtype = rpcrdma_replych;

	/*
	 * Chunks needed for arguments?
	 *
	 * o If the total request is under the inline threshold, all ops
	 *   are sent as inline.
	 * o Large write ops transmit data as read chunk(s), header as
	 *   inline.
	 * o Large non-write ops are sent with the entire message as a
	 *   single read chunk (protocol 0-position special case).
	 *
	 * This assumes that the upper layer does not present a request
	 * that both has a data payload, and whose non-data arguments
	 * by themselves are larger than the inline threshold.
	 */
	if (rpcrdma_args_inline(r_xprt, rqst)) {
		rtype = rpcrdma_noch;
		rpcrdma_inline_pullup(rqst);
		rpclen = rqst->rq_svec[0].iov_len;
	} else if (ddp_allowed && rqst->rq_snd_buf.flags & XDRBUF_WRITE) {
		rtype = rpcrdma_readch;
		rpclen = rqst->rq_svec[0].iov_len;
		rpclen += rpcrdma_tail_pullup(&rqst->rq_snd_buf);
	} else {
		r_xprt->rx_stats.nomsg_call_count++;
		headerp->rm_type = htonl(RDMA_NOMSG);
		rtype = rpcrdma_areadch;
		rpclen = 0;
	}

	/* This implementation supports the following combinations
	 * of chunk lists in one RPC-over-RDMA Call message:
	 *
	 *   - Read list
	 *   - Write list
	 *   - Reply chunk
	 *   - Read list + Reply chunk
	 *
	 * It might not yet support the following combinations:
	 *
	 *   - Read list + Write list
	 *
	 * It does not support the following combinations:
	 *
	 *   - Write list + Reply chunk
	 *   - Read list + Write list + Reply chunk
	 *
	 * This implementation supports only a single chunk in each
	 * Read or Write list. Thus for example the client cannot
	 * send a Call message with a Position Zero Read chunk and a
	 * regular Read chunk at the same time.
	 */
	iptr = headerp->rm_body.rm_chunks;
	iptr = rpcrdma_encode_read_list(r_xprt, req, rqst, iptr, rtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_write_list(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	iptr = rpcrdma_encode_reply_chunk(r_xprt, req, rqst, iptr, wtype);
	if (IS_ERR(iptr))
		goto out_unmap;
	hdrlen = (unsigned char *)iptr - (unsigned char *)headerp;

	if (hdrlen + rpclen > RPCRDMA_INLINE_WRITE_THRESHOLD(rqst))
		goto out_overflow;

	dprintk("RPC: %5u %s: %s/%s: hdrlen %zd rpclen %zd\n",
		rqst->rq_task->tk_pid, __func__,
		transfertypes[rtype], transfertypes[wtype],
		hdrlen, rpclen);

	req->rl_send_iov[0].addr = rdmab_addr(req->rl_rdmabuf);
	req->rl_send_iov[0].length = hdrlen;
	req->rl_send_iov[0].lkey = rdmab_lkey(req->rl_rdmabuf);

	req->rl_niovs = 1;
	if (rtype == rpcrdma_areadch)
		return 0;

	req->rl_send_iov[1].addr = rdmab_addr(req->rl_sendbuf);
	req->rl_send_iov[1].length = rpclen;
	req->rl_send_iov[1].lkey = rdmab_lkey(req->rl_sendbuf);

	req->rl_niovs = 2;
	return 0;

out_overflow:
	pr_err("rpcrdma: send overflow: hdrlen %zd rpclen %zu %s/%s\n",
		hdrlen, rpclen, transfertypes[rtype], transfertypes[wtype]);
	iptr = ERR_PTR(-EIO);

out_unmap:
	r_xprt->rx_ia.ri_ops->ro_unmap_safe(r_xprt, req, false);
	return PTR_ERR(iptr);
}
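/* Worked example, purely illustrative: a hypothetical NFS READ of
 * 128KB with small arguments passes rpcrdma_args_inline(), but its
 * reply buffer far exceeds ri_max_inline_read and is marked
 * XDRBUF_READ. Assuming the GSS flavor permits direct placement, the
 * Call is marshaled as rtype rpcrdma_noch / wtype rpcrdma_writech:
 * inline header and arguments, plus a Write list telling the server
 * where to RDMA Write the READ payload.
 */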
/*
 * Chase down a received write or reply chunklist to get length
 * RDMA'd by server. See map at rpcrdma_create_chunks()! :-)
 */
static int
rpcrdma_count_chunks(struct rpcrdma_rep *rep, int wrchunk, __be32 **iptrp)
{
	unsigned int i, total_len;
	struct rpcrdma_write_chunk *cur_wchunk;
	char *base = (char *)rdmab_to_msg(rep->rr_rdmabuf);

	i = be32_to_cpu(**iptrp);
	cur_wchunk = (struct rpcrdma_write_chunk *) (*iptrp + 1);
	total_len = 0;
	while (i--) {
		struct rpcrdma_segment *seg = &cur_wchunk->wc_target;
		ifdebug(FACILITY) {
			u64 off;

			xdr_decode_hyper((__be32 *)&seg->rs_offset, &off);
			dprintk("RPC: %s: chunk %d@0x%llx:0x%x\n",
				__func__,
				be32_to_cpu(seg->rs_length),
				(unsigned long long)off,
				be32_to_cpu(seg->rs_handle));
		}
		total_len += be32_to_cpu(seg->rs_length);
		++cur_wchunk;
	}
	/* check and adjust for properly terminated write chunk */
	if (wrchunk) {
		__be32 *w = (__be32 *) cur_wchunk;

		if (*w++ != xdr_zero)
			return -1;
		cur_wchunk = (struct rpcrdma_write_chunk *) w;
	}
	if ((char *)cur_wchunk > base + rep->rr_len)
		return -1;

	*iptrp = (__be32 *) cur_wchunk;
	return total_len;
}
/**
 * rpcrdma_inline_fixup - Scatter inline received data into rqst's iovecs
 * @rqst: controlling RPC request
 * @srcp: points to RPC message payload in receive buffer
 * @copy_len: remaining length of receive buffer content
 * @pad: Write chunk pad bytes needed (zero for pure inline)
 *
 * The upper layer has set the maximum number of bytes it can
 * receive in each component of rq_rcv_buf. These values are set in
 * the head.iov_len, page_len, tail.iov_len, and buflen fields.
 *
 * Unlike the TCP equivalent (xdr_partial_copy_from_skb), in
 * many cases this function simply updates iov_base pointers in
 * rq_rcv_buf to point directly to the received reply data, to
 * avoid copying reply data.
 *
 * Returns the count of bytes which had to be memcopied.
 */
static unsigned long
rpcrdma_inline_fixup(struct rpc_rqst *rqst, char *srcp, int copy_len, int pad)
{
	unsigned long fixup_copy_count;
	int i, npages, curlen;
	char *destp;
	struct page **ppages;
	int page_base;

	/* The head iovec is redirected to the RPC reply message
	 * in the receive buffer, to avoid a memcopy.
	 */
	rqst->rq_rcv_buf.head[0].iov_base = srcp;
	rqst->rq_private_buf.head[0].iov_base = srcp;

	/* The contents of the receive buffer that follow
	 * head.iov_len bytes are copied into the page list.
	 */
	curlen = rqst->rq_rcv_buf.head[0].iov_len;
	if (curlen > copy_len)
		curlen = copy_len;
	dprintk("RPC: %s: srcp 0x%p len %d hdrlen %d\n",
		__func__, srcp, copy_len, curlen);
	srcp += curlen;
	copy_len -= curlen;

	page_base = rqst->rq_rcv_buf.page_base;
	ppages = rqst->rq_rcv_buf.pages + (page_base >> PAGE_SHIFT);
	page_base &= ~PAGE_MASK;
	fixup_copy_count = 0;
	if (copy_len && rqst->rq_rcv_buf.page_len) {
		int pagelist_len;

		pagelist_len = rqst->rq_rcv_buf.page_len;
		if (pagelist_len > copy_len)
			pagelist_len = copy_len;
		npages = PAGE_ALIGN(page_base + pagelist_len) >> PAGE_SHIFT;
		for (i = 0; i < npages; i++) {
			curlen = PAGE_SIZE - page_base;
			if (curlen > pagelist_len)
				curlen = pagelist_len;

			dprintk("RPC: %s: page %d"
				" srcp 0x%p len %d curlen %d\n",
				__func__, i, srcp, copy_len, curlen);
			destp = kmap_atomic(ppages[i]);
			memcpy(destp + page_base, srcp, curlen);
			flush_dcache_page(ppages[i]);
			kunmap_atomic(destp);
			srcp += curlen;
			copy_len -= curlen;
			fixup_copy_count += curlen;
			pagelist_len -= curlen;
			if (!pagelist_len)
				break;
			page_base = 0;
		}

		/* Implicit padding for the last segment in a Write
		 * chunk is inserted inline at the front of the tail
		 * iovec. The upper layer ignores the content of
		 * the pad. Simply ensure inline content in the tail
		 * that follows the Write chunk is properly aligned.
		 */
		if (pad)
			srcp -= pad;
	}

	/* The tail iovec is redirected to the remaining data
	 * in the receive buffer, to avoid a memcopy.
	 */
	if (copy_len || pad) {
		rqst->rq_rcv_buf.tail[0].iov_base = srcp;
		rqst->rq_private_buf.tail[0].iov_base = srcp;
	}

	return fixup_copy_count;
}
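/* Example walk-through (illustrative numbers): for a 270-byte inline
 * reply with head.iov_len == 148 and page_len == 512, head[0] is
 * redirected to the receive buffer with no copy, the next 122 bytes
 * are copied into the first page of the page list, and the function
 * returns 122 with copy_len exhausted, leaving the tail untouched.
 */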
void
rpcrdma_connect_worker(struct work_struct *work)
{
	struct rpcrdma_ep *ep =
		container_of(work, struct rpcrdma_ep, rep_connect_worker.work);
	struct rpcrdma_xprt *r_xprt =
		container_of(ep, struct rpcrdma_xprt, rx_ep);
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;

	spin_lock_bh(&xprt->transport_lock);
	if (++xprt->connect_cookie == 0)	/* maintain a reserved value */
		++xprt->connect_cookie;
	if (ep->rep_connected > 0) {
		if (!xprt_test_and_set_connected(xprt))
			xprt_wake_pending_tasks(xprt, 0);
	} else {
		if (xprt_test_and_clear_connected(xprt))
			xprt_wake_pending_tasks(xprt, -ENOTCONN);
	}
	spin_unlock_bh(&xprt->transport_lock);
}
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
/* By convention, backchannel calls arrive via rdma_msg type
 * messages, and never populate the chunk lists. This makes
 * the RPC/RDMA header small and fixed in size, so it is
 * straightforward to check the RPC header's direction field.
 */
static bool
rpcrdma_is_bcall(struct rpcrdma_msg *headerp)
{
	__be32 *p = (__be32 *)headerp;

	if (headerp->rm_type != rdma_msg)
		return false;
	if (headerp->rm_body.rm_chunks[0] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[1] != xdr_zero)
		return false;
	if (headerp->rm_body.rm_chunks[2] != xdr_zero)
		return false;

	/* sanity */
	if (p[7] != headerp->rm_xid)
		return false;
	/* call direction */
	if (p[8] != cpu_to_be32(RPC_CALL))
		return false;

	return true;
}
#endif	/* CONFIG_SUNRPC_BACKCHANNEL */
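/* Note on the fixed offsets used in rpcrdma_is_bcall(): with all three
 * chunk lists empty, the RPC-over-RDMA header is exactly seven XDR
 * words (RPCRDMA_HDRLEN_MIN), so p[7] is the first word of the
 * embedded RPC header (its XID, which must match rm_xid) and p[8] is
 * the RPC direction word, which must be CALL for a backchannel
 * request.
 */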
/*
 * This function is called when an async event is posted to
 * the connection which changes the connection state. All it
 * does at this point is mark the connection up/down, the rpc
 * timers do the rest.
 */
void
rpcrdma_conn_func(struct rpcrdma_ep *ep)
{
	schedule_delayed_work(&ep->rep_connect_worker, 0);
}
/* Process received RPC/RDMA messages.
 *
 * Errors must result in the RPC task either being awakened, or
 * allowed to timeout, to discover the errors at that time.
 */
void
rpcrdma_reply_handler(struct rpcrdma_rep *rep)
{
	struct rpcrdma_msg *headerp;
	struct rpcrdma_req *req;
	struct rpc_rqst *rqst;
	struct rpcrdma_xprt *r_xprt = rep->rr_rxprt;
	struct rpc_xprt *xprt = &r_xprt->rx_xprt;
	__be32 *iptr;
	int rdmalen, status, rmerr;
	unsigned long cwnd;

	dprintk("RPC: %s: incoming rep %p\n", __func__, rep);

	if (rep->rr_len == RPCRDMA_BAD_LEN)
		goto out_badstatus;
	if (rep->rr_len < RPCRDMA_HDRLEN_ERR)
		goto out_shortreply;

	headerp = rdmab_to_msg(rep->rr_rdmabuf);
#if defined(CONFIG_SUNRPC_BACKCHANNEL)
	if (rpcrdma_is_bcall(headerp))
		goto out_bcall;
#endif

	/* Match incoming rpcrdma_rep to an rpcrdma_req to
	 * get context for handling any incoming chunks.
	 */
	spin_lock_bh(&xprt->transport_lock);
	rqst = xprt_lookup_rqst(xprt, headerp->rm_xid);
	if (!rqst)
		goto out_nomatch;

	req = rpcr_to_rdmar(rqst);
	if (req->rl_reply)
		goto out_duplicate;

	/* Sanity checking has passed. We are now committed
	 * to complete this transaction.
	 */
	list_del_init(&rqst->rq_list);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: reply %p completes request %p (xid 0x%08x)\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

	/* from here on, the reply is no longer an orphan */
	req->rl_reply = rep;
	xprt->reestablish_timeout = 0;

	if (headerp->rm_vers != rpcrdma_version)
		goto out_badversion;

	/* check for expected message types */
	/* The order of some of these tests is important. */
	switch (headerp->rm_type) {
	case rdma_msg:
		/* never expect read chunks */
		/* never expect reply chunks (two ways to check) */
		/* never expect write chunks without having offered RDMA */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    (headerp->rm_body.rm_chunks[1] == xdr_zero &&
		     headerp->rm_body.rm_chunks[2] != xdr_zero) ||
		    (headerp->rm_body.rm_chunks[1] != xdr_zero &&
		     list_empty(&req->rl_registered)))
			goto badheader;
		if (headerp->rm_body.rm_chunks[1] != xdr_zero) {
			/* count any expected write chunks in read reply */
			/* start at write chunk array count */
			iptr = &headerp->rm_body.rm_chunks[2];
			rdmalen = rpcrdma_count_chunks(rep, 1, &iptr);
			/* check for validity, and no reply chunk after */
			if (rdmalen < 0 || *iptr++ != xdr_zero)
				goto badheader;
			rep->rr_len -=
			    ((unsigned char *)iptr - (unsigned char *)headerp);
			status = rep->rr_len + rdmalen;
			r_xprt->rx_stats.total_rdma_reply += rdmalen;
			/* special case - last chunk may omit padding */
			if (rdmalen &= 3) {
				rdmalen = 4 - rdmalen;
				status += rdmalen;
			}
		} else {
			/* else ordinary inline */
			rdmalen = 0;
			iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
			rep->rr_len -= RPCRDMA_HDRLEN_MIN;
			status = rep->rr_len;
		}

		r_xprt->rx_stats.fixup_copy_count +=
			rpcrdma_inline_fixup(rqst, (char *)iptr, rep->rr_len,
					     rdmalen);
		break;

	case rdma_nomsg:
		/* never expect read or write chunks, always reply chunks */
		if (headerp->rm_body.rm_chunks[0] != xdr_zero ||
		    headerp->rm_body.rm_chunks[1] != xdr_zero ||
		    headerp->rm_body.rm_chunks[2] != xdr_one ||
		    list_empty(&req->rl_registered))
			goto badheader;
		iptr = (__be32 *)((unsigned char *)headerp +
							RPCRDMA_HDRLEN_MIN);
		rdmalen = rpcrdma_count_chunks(rep, 0, &iptr);
		if (rdmalen < 0)
			goto badheader;
		r_xprt->rx_stats.total_rdma_reply += rdmalen;
		/* Reply chunk buffer already is the reply vector - no fixup. */
		status = rdmalen;
		break;

	case rdma_error:
		goto out_rdmaerr;

badheader:
	default:
		dprintk("RPC: %5u %s: invalid rpcrdma reply (type %u)\n",
			rqst->rq_task->tk_pid, __func__,
			be32_to_cpu(headerp->rm_type));
		status = -EIO;
		r_xprt->rx_stats.bad_reply_count++;
		break;
	}

out:
	/* Invalidate and flush the data payloads before waking the
	 * waiting application. This guarantees the memory region is
	 * properly fenced from the server before the application
	 * accesses the data. It also ensures proper send flow
	 * control: waking the next RPC waits until this RPC has
	 * relinquished all its Send Queue entries.
	 */
	if (!list_empty(&req->rl_registered))
		r_xprt->rx_ia.ri_ops->ro_unmap_sync(r_xprt, req);

	spin_lock_bh(&xprt->transport_lock);
	cwnd = xprt->cwnd;
	xprt->cwnd = atomic_read(&r_xprt->rx_buf.rb_credits) << RPC_CWNDSHIFT;
	if (xprt->cwnd > cwnd)
		xprt_release_rqst_cong(rqst->rq_task);

	xprt_complete_rqst(rqst->rq_task, status);
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: xprt_complete_rqst(0x%p, 0x%p, %d)\n",
		__func__, xprt, rqst, status);
	return;

out_badstatus:
	rpcrdma_recv_buffer_put(rep);
	if (r_xprt->rx_ep.rep_connected == 1) {
		r_xprt->rx_ep.rep_connected = -EIO;
		rpcrdma_conn_func(&r_xprt->rx_ep);
	}
	return;

#if defined(CONFIG_SUNRPC_BACKCHANNEL)
out_bcall:
	rpcrdma_bc_receive_call(r_xprt, rep);
	return;
#endif

/* If the incoming reply terminated a pending RPC, the next
 * RPC call will post a replacement receive buffer as it is
 * being marshaled.
 */
out_badversion:
	dprintk("RPC: %s: invalid version %d\n",
		__func__, be32_to_cpu(headerp->rm_vers));
	status = -EIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

out_rdmaerr:
	rmerr = be32_to_cpu(headerp->rm_body.rm_error.rm_err);
	switch (rmerr) {
	case ERR_VERS:
		pr_err("%s: server reports header version error (%u-%u)\n",
		       __func__,
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_low),
		       be32_to_cpu(headerp->rm_body.rm_error.rm_vers_high));
		break;
	case ERR_CHUNK:
		pr_err("%s: server reports header decoding error\n",
		       __func__);
		break;
	default:
		pr_err("%s: server reports unknown error %d\n",
		       __func__, rmerr);
	}
	status = -EREMOTEIO;
	r_xprt->rx_stats.bad_reply_count++;
	goto out;

/* If no pending RPC transaction was matched, post a replacement
 * receive buffer before returning.
 */
out_shortreply:
	dprintk("RPC: %s: short/invalid reply\n", __func__);
	goto repost;

out_nomatch:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: no match for incoming xid 0x%08x len %d\n",
		__func__, be32_to_cpu(headerp->rm_xid),
		rep->rr_len);
	goto repost;

out_duplicate:
	spin_unlock_bh(&xprt->transport_lock);
	dprintk("RPC: %s: "
		"duplicate reply %p to RPC request %p: xid 0x%08x\n",
		__func__, rep, req, be32_to_cpu(headerp->rm_xid));

repost:
	r_xprt->rx_stats.bad_reply_count++;
	if (rpcrdma_ep_post_recv(&r_xprt->rx_ia, &r_xprt->rx_ep, rep))
		rpcrdma_recv_buffer_put(rep);
}