git.ipfire.org Git - thirdparty/linux.git/commitdiff
svcrdma: Support multiple write chunks when pulling up
author Chuck Lever <chuck.lever@oracle.com>
Fri, 13 Mar 2020 14:42:11 +0000 (10:42 -0400)
committer Chuck Lever <chuck.lever@oracle.com>
Mon, 30 Nov 2020 18:00:22 +0000 (13:00 -0500)
When counting the number of SGEs needed to construct a Send request,
do not count result payloads. And, when copying the Reply message
into the pull-up buffer, result payloads are not to be copied to the
Send buffer.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
include/linux/sunrpc/svc_rdma.h
include/trace/events/rpcrdma.h
net/sunrpc/xprtrdma/svc_rdma_backchannel.c
net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
net/sunrpc/xprtrdma/svc_rdma_sendto.c

index d9148787efff7bc62547c50d62afd81b99547cf2..7090af1a9791db4f7f4de0e23f7b59c37b7acebf 100644 (file)
@@ -182,6 +182,8 @@ extern void svc_rdma_handle_bc_reply(struct svc_rqst *rqstp,
 /* svc_rdma_recvfrom.c */
 extern void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma);
 extern bool svc_rdma_post_recvs(struct svcxprt_rdma *rdma);
+extern struct svc_rdma_recv_ctxt *
+               svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma);
 extern void svc_rdma_recv_ctxt_put(struct svcxprt_rdma *rdma,
                                   struct svc_rdma_recv_ctxt *ctxt);
 extern void svc_rdma_flush_recv_queues(struct svcxprt_rdma *rdma);
index 5218e0f9596acd8ee114e42ab7e8cb2e319f4be8..afc58accb9cf08e0b48fb6c10d273a1b27277ab4 100644 (file)
@@ -1805,20 +1805,30 @@ TRACE_EVENT(svcrdma_small_wrch_err,
 
 TRACE_EVENT(svcrdma_send_pullup,
        TP_PROTO(
-               unsigned int len
+               const struct svc_rdma_send_ctxt *ctxt,
+               unsigned int msglen
        ),
 
-       TP_ARGS(len),
+       TP_ARGS(ctxt, msglen),
 
        TP_STRUCT__entry(
-               __field(unsigned int, len)
+               __field(u32, cq_id)
+               __field(int, completion_id)
+               __field(unsigned int, hdrlen)
+               __field(unsigned int, msglen)
        ),
 
        TP_fast_assign(
-               __entry->len = len;
+               __entry->cq_id = ctxt->sc_cid.ci_queue_id;
+               __entry->completion_id = ctxt->sc_cid.ci_completion_id;
+               __entry->hdrlen = ctxt->sc_hdrbuf.len,
+               __entry->msglen = msglen;
        ),
 
-       TP_printk("len=%u", __entry->len)
+       TP_printk("cq_id=%u cid=%d hdr=%u msg=%u (total %u)",
+               __entry->cq_id, __entry->completion_id,
+               __entry->hdrlen, __entry->msglen,
+               __entry->hdrlen + __entry->msglen)
 );
 
 TRACE_EVENT(svcrdma_send_err,
index 5e7c4ba9e1476c3b12e843da96bffa1867f01cf6..63f8be974df2033d92a4008e9472b3a5269c87c9 100644 (file)
@@ -74,11 +74,17 @@ out_unlock:
  */
 static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
                              struct rpc_rqst *rqst,
-                             struct svc_rdma_send_ctxt *ctxt)
+                             struct svc_rdma_send_ctxt *sctxt)
 {
+       struct svc_rdma_recv_ctxt *rctxt;
        int ret;
 
-       ret = svc_rdma_map_reply_msg(rdma, ctxt, NULL, &rqst->rq_snd_buf);
+       rctxt = svc_rdma_recv_ctxt_get(rdma);
+       if (!rctxt)
+               return -EIO;
+
+       ret = svc_rdma_map_reply_msg(rdma, sctxt, rctxt, &rqst->rq_snd_buf);
+       svc_rdma_recv_ctxt_put(rdma, rctxt);
        if (ret < 0)
                return -EIO;
 
@@ -86,8 +92,8 @@ static int svc_rdma_bc_sendto(struct svcxprt_rdma *rdma,
         * the rq_buffer before all retransmits are complete.
         */
        get_page(virt_to_page(rqst->rq_buffer));
-       ctxt->sc_send_wr.opcode = IB_WR_SEND;
-       return svc_rdma_send(rdma, ctxt);
+       sctxt->sc_send_wr.opcode = IB_WR_SEND;
+       return svc_rdma_send(rdma, sctxt);
 }
 
 /* Server-side transport endpoint wants a whole page for its send
index 7d44e9d2e7a3c956f903555332988e5f2b2ac607..af32c3ad45a6eb0f4aab4497983a52ff9c0b4371 100644 (file)
@@ -194,8 +194,13 @@ void svc_rdma_recv_ctxts_destroy(struct svcxprt_rdma *rdma)
        }
 }
 
-static struct svc_rdma_recv_ctxt *
-svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
+/**
+ * svc_rdma_recv_ctxt_get - Allocate a recv_ctxt
+ * @rdma: controlling svcxprt_rdma
+ *
+ * Returns a recv_ctxt or (rarely) NULL if none are available.
+ */
+struct svc_rdma_recv_ctxt *svc_rdma_recv_ctxt_get(struct svcxprt_rdma *rdma)
 {
        struct svc_rdma_recv_ctxt *ctxt;
        struct llist_node *node;
index 1fdbbad3f7dc436e0561409af3e524710ad29218..50f0216fb08cc32d8117919193a5c5fdd8bdea36 100644 (file)
@@ -531,6 +531,45 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
                                     offset_in_page(base), len);
 }
 
+struct svc_rdma_pullup_data {
+       u8              *pd_dest;
+       unsigned int    pd_length;
+       unsigned int    pd_num_sges;
+};
+
+/**
+ * svc_rdma_xb_count_sges - Count how many SGEs will be needed
+ * @xdr: xdr_buf containing portion of an RPC message to transmit
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Number of SGEs needed to Send the contents of @xdr inline
+ */
+static int svc_rdma_xb_count_sges(const struct xdr_buf *xdr,
+                                 void *data)
+{
+       struct svc_rdma_pullup_data *args = data;
+       unsigned int remaining;
+       unsigned long offset;
+
+       if (xdr->head[0].iov_len)
+               ++args->pd_num_sges;
+
+       offset = offset_in_page(xdr->page_base);
+       remaining = xdr->page_len;
+       while (remaining) {
+               ++args->pd_num_sges;
+               remaining -= min_t(u32, PAGE_SIZE - offset, remaining);
+               offset = 0;
+       }
+
+       if (xdr->tail[0].iov_len)
+               ++args->pd_num_sges;
+
+       args->pd_length += xdr->len;
+       return 0;
+}
+
 /**
  * svc_rdma_pull_up_needed - Determine whether to use pull-up
  * @rdma: controlling transport
@@ -539,50 +578,71 @@ static int svc_rdma_dma_map_buf(struct svcxprt_rdma *rdma,
  * @xdr: xdr_buf containing RPC message to transmit
  *
  * Returns:
- *     %true if pull-up must be used
- *     %false otherwise
+ *   %true if pull-up must be used
+ *   %false otherwise
  */
-static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
-                                   struct svc_rdma_send_ctxt *sctxt,
+static bool svc_rdma_pull_up_needed(const struct svcxprt_rdma *rdma,
+                                   const struct svc_rdma_send_ctxt *sctxt,
                                    const struct svc_rdma_recv_ctxt *rctxt,
-                                   struct xdr_buf *xdr)
+                                   const struct xdr_buf *xdr)
 {
-       bool write_chunk_present = rctxt && rctxt->rc_write_list;
-       int elements;
+       /* Resources needed for the transport header */
+       struct svc_rdma_pullup_data args = {
+               .pd_length      = sctxt->sc_hdrbuf.len,
+               .pd_num_sges    = 1,
+       };
+       int ret;
 
-       /* For small messages, copying bytes is cheaper than DMA mapping.
-        */
-       if (!write_chunk_present &&
-           sctxt->sc_hdrbuf.len + xdr->len < RPCRDMA_PULLUP_THRESH)
+       ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+                                     svc_rdma_xb_count_sges, &args);
+       if (ret < 0)
+               return false;
+
+       if (args.pd_length < RPCRDMA_PULLUP_THRESH)
                return true;
+       return args.pd_num_sges >= rdma->sc_max_send_sges;
+}
 
-       /* Check whether the xdr_buf has more elements than can
-        * fit in a single RDMA Send.
-        */
-       /* xdr->head */
-       elements = 1;
-
-       /* xdr->pages */
-       if (!rctxt || !rctxt->rc_write_list) {
-               unsigned int remaining;
-               unsigned long pageoff;
-
-               pageoff = xdr->page_base & ~PAGE_MASK;
-               remaining = xdr->page_len;
-               while (remaining) {
-                       ++elements;
-                       remaining -= min_t(u32, PAGE_SIZE - pageoff,
-                                          remaining);
-                       pageoff = 0;
-               }
+/**
+ * svc_rdma_xb_linearize - Copy region of xdr_buf to flat buffer
+ * @xdr: xdr_buf containing portion of an RPC message to copy
+ * @data: pointer to arguments
+ *
+ * Returns:
+ *   Always zero.
+ */
+static int svc_rdma_xb_linearize(const struct xdr_buf *xdr,
+                                void *data)
+{
+       struct svc_rdma_pullup_data *args = data;
+       unsigned int len, remaining;
+       unsigned long pageoff;
+       struct page **ppages;
+
+       if (xdr->head[0].iov_len) {
+               memcpy(args->pd_dest, xdr->head[0].iov_base, xdr->head[0].iov_len);
+               args->pd_dest += xdr->head[0].iov_len;
        }
 
-       /* xdr->tail */
-       if (xdr->tail[0].iov_len)
-               ++elements;
+       ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
+       pageoff = offset_in_page(xdr->page_base);
+       remaining = xdr->page_len;
+       while (remaining) {
+               len = min_t(u32, PAGE_SIZE - pageoff, remaining);
+               memcpy(args->pd_dest, page_address(*ppages) + pageoff, len);
+               remaining -= len;
+               args->pd_dest += len;
+               pageoff = 0;
+               ppages++;
+       }
 
-       /* assume 1 SGE is needed for the transport header */
-       return elements >= rdma->sc_max_send_sges;
+       if (xdr->tail[0].iov_len) {
+               memcpy(args->pd_dest, xdr->tail[0].iov_base, xdr->tail[0].iov_len);
+               args->pd_dest += xdr->tail[0].iov_len;
+       }
+
+       args->pd_length += xdr->len;
+       return 0;
 }
 
 /**
@@ -595,54 +655,30 @@ static bool svc_rdma_pull_up_needed(struct svcxprt_rdma *rdma,
  * The device is not capable of sending the reply directly.
  * Assemble the elements of @xdr into the transport header buffer.
  *
- * Returns zero on success, or a negative errno on failure.
+ * Assumptions:
+ *  pull_up_needed has determined that @xdr will fit in the buffer.
+ *
+ * Returns:
+ *   %0 if pull-up was successful
+ *   %-EMSGSIZE if a buffer manipulation problem occurred
  */
-static int svc_rdma_pull_up_reply_msg(struct svcxprt_rdma *rdma,
+static int svc_rdma_pull_up_reply_msg(const struct svcxprt_rdma *rdma,
                                      struct svc_rdma_send_ctxt *sctxt,
                                      const struct svc_rdma_recv_ctxt *rctxt,
                                      const struct xdr_buf *xdr)
 {
-       unsigned char *dst, *tailbase;
-       unsigned int taillen;
-
-       dst = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len;
-       memcpy(dst, xdr->head[0].iov_base, xdr->head[0].iov_len);
-       dst += xdr->head[0].iov_len;
-
-       tailbase = xdr->tail[0].iov_base;
-       taillen = xdr->tail[0].iov_len;
-       if (rctxt && rctxt->rc_write_list) {
-               u32 xdrpad;
-
-               xdrpad = xdr_pad_size(xdr->page_len);
-               if (taillen && xdrpad) {
-                       tailbase += xdrpad;
-                       taillen -= xdrpad;
-               }
-       } else {
-               unsigned int len, remaining;
-               unsigned long pageoff;
-               struct page **ppages;
-
-               ppages = xdr->pages + (xdr->page_base >> PAGE_SHIFT);
-               pageoff = xdr->page_base & ~PAGE_MASK;
-               remaining = xdr->page_len;
-               while (remaining) {
-                       len = min_t(u32, PAGE_SIZE - pageoff, remaining);
-
-                       memcpy(dst, page_address(*ppages) + pageoff, len);
-                       remaining -= len;
-                       dst += len;
-                       pageoff = 0;
-                       ppages++;
-               }
-       }
+       struct svc_rdma_pullup_data args = {
+               .pd_dest        = sctxt->sc_xprt_buf + sctxt->sc_hdrbuf.len,
+       };
+       int ret;
 
-       if (taillen)
-               memcpy(dst, tailbase, taillen);
+       ret = pcl_process_nonpayloads(&rctxt->rc_write_pcl, xdr,
+                                     svc_rdma_xb_linearize, &args);
+       if (ret < 0)
+               return ret;
 
-       sctxt->sc_sges[0].length += xdr->len;
-       trace_svcrdma_send_pullup(sctxt->sc_sges[0].length);
+       sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len + args.pd_length;
+       trace_svcrdma_send_pullup(sctxt, args.pd_length);
        return 0;
 }