extern u32 svc_max_payload(const struct svc_rqst *rqstp);
/*
- * RPC Requests and replies are stored in one or more pages.
- * We maintain an array of pages for each server thread.
- * Requests are copied into these pages as they arrive. Remaining
- * pages are available to write the reply into.
+ * RPC Call and Reply messages each have their own page array.
+ * rq_pages holds the incoming Call message; rq_respages holds
+ * the outgoing Reply message. Both arrays are sized to
+ * svc_serv_maxpages() entries and are allocated dynamically.
*
- * Pages are sent using ->sendmsg with MSG_SPLICE_PAGES so each server thread
- * needs to allocate more to replace those used in sending. To help keep track
- * of these pages we have a receive list where all pages initialy live, and a
- * send list where pages are moved to when there are to be part of a reply.
+ * Pages are sent using ->sendmsg with MSG_SPLICE_PAGES so each
+ * server thread needs to allocate more to replace those used in
+ * sending.
*
- * We use xdr_buf for holding responses as it fits well with NFS
- * read responses (that have a header, and some data pages, and possibly
- * a tail) and means we can share some client side routines.
+ * xdr_buf holds responses; the structure fits NFS read responses
+ * (header, data pages, optional tail) and enables sharing of
+ * client-side routines.
*
- * The xdr_buf.head kvec always points to the first page in the rq_*pages
- * list. The xdr_buf.pages pointer points to the second page on that
- * list. xdr_buf.tail points to the end of the first page.
- * This assumes that the non-page part of an rpc reply will fit
- * in a page - NFSd ensures this. lockd also has no trouble.
+ * The xdr_buf.head kvec always points to the first page in the
+ * rq_*pages list. The xdr_buf.pages pointer points to the second
+ * page on that list. xdr_buf.tail points to the end of the first
+ * page. This assumes that the non-page part of an rpc reply will
+ * fit in a page - NFSd ensures this. lockd also has no trouble.
*/
/**
* Returns a count of pages or vectors that can hold the maximum
* size RPC message for @serv.
*
- * Each request/reply pair can have at most one "payload", plus two
- * pages, one for the request, and one for the reply.
- * nfsd_splice_actor() might need an extra page when a READ payload
- * is not page-aligned.
+ * Each page array can hold at most one payload plus two
+ * overhead pages (one for the RPC header, one for tail data).
+ * nfsd_splice_actor() might need an extra page when a READ
+ * payload is not page-aligned.
*/
static inline unsigned long svc_serv_maxpages(const struct svc_serv *serv)
{
struct xdr_stream rq_res_stream;
struct folio *rq_scratch_folio;
struct xdr_buf rq_res;
- unsigned long rq_maxpages; /* num of entries in rq_pages */
- struct page * *rq_pages;
- struct page * *rq_respages; /* points into rq_pages */
+ unsigned long rq_maxpages; /* entries per page array */
+ struct page * *rq_pages; /* Call buffer pages */
+ struct page * *rq_respages; /* Reply buffer pages */
struct page * *rq_next_page; /* next reply page to use */
- struct page * *rq_page_end; /* one past the last page */
+ struct page * *rq_page_end; /* one past the last reply page */
struct folio_batch rq_fbatch;
struct bio_vec *rq_bvec;
{
rqstp->rq_maxpages = svc_serv_maxpages(serv);
- /* rq_pages' last entry is NULL for historical reasons. */
+ /* +1 for a NULL sentinel readable by nfsd_splice_actor() */
rqstp->rq_pages = kcalloc_node(rqstp->rq_maxpages + 1,
sizeof(struct page *),
GFP_KERNEL, node);
if (!rqstp->rq_pages)
return false;
+ /* +1 for a NULL sentinel at rq_page_end (see svc_rqst_replace_page) */
+ rqstp->rq_respages = kcalloc_node(rqstp->rq_maxpages + 1,
+ sizeof(struct page *),
+ GFP_KERNEL, node);
+ if (!rqstp->rq_respages) {
+ kfree(rqstp->rq_pages);
+ rqstp->rq_pages = NULL;
+ return false;
+ }
+
return true;
}
{
unsigned long i;
- for (i = 0; i < rqstp->rq_maxpages; i++)
- if (rqstp->rq_pages[i])
- put_page(rqstp->rq_pages[i]);
- kfree(rqstp->rq_pages);
+ if (rqstp->rq_pages) {
+ for (i = 0; i < rqstp->rq_maxpages; i++)
+ if (rqstp->rq_pages[i])
+ put_page(rqstp->rq_pages[i]);
+ kfree(rqstp->rq_pages);
+ }
+
+ if (rqstp->rq_respages) {
+ for (i = 0; i < rqstp->rq_maxpages; i++)
+ if (rqstp->rq_respages[i])
+ put_page(rqstp->rq_respages[i]);
+ kfree(rqstp->rq_respages);
+ }
}
static void
}
}
-static bool svc_alloc_arg(struct svc_rqst *rqstp)
+static bool svc_fill_pages(struct svc_rqst *rqstp, struct page **pages,
+ unsigned long npages)
{
- struct xdr_buf *arg = &rqstp->rq_arg;
- unsigned long pages, filled, ret;
+ unsigned long filled, ret;
- pages = rqstp->rq_maxpages;
- for (filled = 0; filled < pages; filled = ret) {
- ret = alloc_pages_bulk(GFP_KERNEL, pages, rqstp->rq_pages);
+ for (filled = 0; filled < npages; filled = ret) {
+ ret = alloc_pages_bulk(GFP_KERNEL, npages, pages);
if (ret > filled)
/* Made progress, don't sleep yet */
continue;
set_current_state(TASK_RUNNING);
return false;
}
- trace_svc_alloc_arg_err(pages, ret);
+ trace_svc_alloc_arg_err(npages, ret);
memalloc_retry_wait(GFP_KERNEL);
}
- rqstp->rq_page_end = &rqstp->rq_pages[pages];
- rqstp->rq_pages[pages] = NULL; /* this might be seen in nfsd_splice_actor() */
+ return true;
+}
+
+static bool svc_alloc_arg(struct svc_rqst *rqstp)
+{
+ struct xdr_buf *arg = &rqstp->rq_arg;
+ unsigned long pages;
+
+ pages = rqstp->rq_maxpages;
+
+ if (!svc_fill_pages(rqstp, rqstp->rq_pages, pages))
+ return false;
+ if (!svc_fill_pages(rqstp, rqstp->rq_respages, pages))
+ return false;
+ rqstp->rq_next_page = rqstp->rq_respages;
+ rqstp->rq_page_end = &rqstp->rq_respages[pages];
+ /* svc_rqst_replace_page() dereferences *rq_next_page even
+ * at rq_page_end; NULL prevents releasing a garbage page.
+ */
+ rqstp->rq_page_end[0] = NULL;
/* Make arg->head point to first page and arg->pages point to rest */
arg->head[0].iov_base = page_address(rqstp->rq_pages[0]);
rqstp->rq_addrlen = dr->addrlen;
/* Save off transport header len in case we get deferred again */
rqstp->rq_daddr = dr->daddr;
- rqstp->rq_respages = rqstp->rq_pages;
rqstp->rq_xprt_ctxt = dr->xprt_ctxt;
dr->xprt_ctxt = NULL;
for (i = 0, t = 0; t < buflen; i++, t += PAGE_SIZE)
bvec_set_page(&bvec[i], rqstp->rq_pages[i], PAGE_SIZE, 0);
- rqstp->rq_respages = &rqstp->rq_pages[i];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
iov_iter_bvec(&msg.msg_iter, ITER_DEST, bvec, i, buflen);
if (seek) {
if (len <= rqstp->rq_arg.head[0].iov_len) {
rqstp->rq_arg.head[0].iov_len = len;
rqstp->rq_arg.page_len = 0;
- rqstp->rq_respages = rqstp->rq_pages+1;
} else {
rqstp->rq_arg.page_len = len - rqstp->rq_arg.head[0].iov_len;
- rqstp->rq_respages = rqstp->rq_pages + 1 +
- DIV_ROUND_UP(rqstp->rq_arg.page_len, PAGE_SIZE);
}
- rqstp->rq_next_page = rqstp->rq_respages+1;
if (serv->sv_stats)
serv->sv_stats->netudpcnt++;
unsigned int i;
/* Transfer the Read chunk pages into @rqstp.rq_pages, replacing
- * the rq_pages that were already allocated for this rqstp.
+ * the receive buffer pages already allocated for this rqstp.
*/
- release_pages(rqstp->rq_respages, ctxt->rc_page_count);
+ release_pages(rqstp->rq_pages, ctxt->rc_page_count);
for (i = 0; i < ctxt->rc_page_count; i++)
rqstp->rq_pages[i] = ctxt->rc_pages[i];
- /* Update @rqstp's result send buffer to start after the
- * last page in the RDMA Read payload.
- */
- rqstp->rq_respages = &rqstp->rq_pages[ctxt->rc_page_count];
- rqstp->rq_next_page = rqstp->rq_respages + 1;
-
/* Prevent svc_rdma_recv_ctxt_put() from releasing the
* pages in ctxt::rc_pages a second time.
*/
struct svc_rdma_recv_ctxt *ctxt;
int ret;
- /* Prevent svc_xprt_release() from releasing pages in rq_pages
- * when returning 0 or an error.
+ /* Precaution: a zero page count on error return causes
+ * svc_rqst_release_pages() to release nothing.
*/
- rqstp->rq_respages = rqstp->rq_pages;
rqstp->rq_next_page = rqstp->rq_respages;
rqstp->rq_xprt_ctxt = NULL;