* The ownership of all of the Reply's pages are transferred into that
* ctxt, the Send WR is posted, and sendto returns.
*
- * The svc_rdma_send_ctxt is presented when the Send WR completes. The
- * Send completion handler finally releases the Reply's pages.
- *
- * This mechanism also assumes that completions on the transport's Send
- * Completion Queue do not run in parallel. Otherwise a Write completion
- * and Send completion running at the same time could release pages that
- * are still DMA-mapped.
+ * The svc_rdma_send_ctxt is presented when the Send WR completes.
+ * The Send completion handler queues the send_ctxt onto the
+ * per-transport sc_send_release_list (a lock-free llist). The
+ * nfsd thread drains sc_send_release_list in xpo_release_ctxt
+ * between RPCs, DMA-unmapping SGEs, releasing chunk I/O
+ * resources and pages, and returning send_ctxts to the free
+ * list in a batch.
*
* Error Handling
*
* - If the Send WR is posted successfully, it will either complete
* successfully, or get flushed. Either way, the Send completion
- * handler releases the Reply's pages.
- * - If the Send WR cannot be not posted, the forward path releases
- * the Reply's pages.
+ * handler queues the send_ctxt for deferred release.
+ * - If the Send WR cannot be posted, the forward path releases the
+ * Reply's pages.
*
* This handles the case, without the use of page reference counting,
* where two different Write segments send portions of the same page.
return ctxt;
out_empty:
+ svc_rdma_send_ctxts_drain(rdma);
+
+ spin_lock(&rdma->sc_send_lock);
+ node = llist_del_first(&rdma->sc_send_ctxts);
+ spin_unlock(&rdma->sc_send_lock);
+ if (node) {
+ ctxt = llist_entry(node, struct svc_rdma_send_ctxt, sc_node);
+ goto out;
+ }
+
ctxt = svc_rdma_send_ctxt_alloc(rdma);
if (!ctxt)
return NULL;
goto out;
}
-static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
- struct svc_rdma_send_ctxt *ctxt)
+/* Release chunk I/O resources and DMA-unmap SGEs. */
+static void svc_rdma_send_ctxt_unmap(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
{
struct ib_device *device = rdma->sc_cm_id->device;
unsigned int i;
svc_rdma_write_chunk_release(rdma, ctxt);
svc_rdma_reply_chunk_release(rdma, ctxt);
- if (ctxt->sc_page_count)
- release_pages(ctxt->sc_pages, ctxt->sc_page_count);
-
/* The first SGE contains the transport header, which
* remains mapped until @ctxt is destroyed.
*/
ctxt->sc_sges[i].length,
DMA_TO_DEVICE);
}
+}
+
+/* Unmap, release pages, and return send_ctxt to the free list. */
+static void svc_rdma_send_ctxt_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ svc_rdma_send_ctxt_unmap(rdma, ctxt);
+
+ if (ctxt->sc_page_count)
+ release_pages(ctxt->sc_pages, ctxt->sc_page_count);
llist_add(&ctxt->sc_node, &rdma->sc_send_ctxts);
}
-static void svc_rdma_send_ctxt_put_async(struct work_struct *work)
+/**
+ * svc_rdma_send_ctxts_drain - Release completed send_ctxts
+ * @rdma: controlling svcxprt_rdma
+ */
+void svc_rdma_send_ctxts_drain(struct svcxprt_rdma *rdma)
{
- struct svc_rdma_send_ctxt *ctxt;
+ struct svc_rdma_send_ctxt *ctxt, *next;
+ struct llist_node *node;
- ctxt = container_of(work, struct svc_rdma_send_ctxt, sc_work);
- svc_rdma_send_ctxt_release(ctxt->sc_rdma, ctxt);
+ node = llist_del_all(&rdma->sc_send_release_list);
+ llist_for_each_entry_safe(ctxt, next, node, sc_node)
+ svc_rdma_send_ctxt_release(rdma, ctxt);
}
/**
- * svc_rdma_send_ctxt_put - Return send_ctxt to free list
+ * svc_rdma_send_ctxt_put - Queue send_ctxt for deferred release
* @rdma: controlling svcxprt_rdma
- * @ctxt: object to return to the free list
+ * @ctxt: send_ctxt to queue for deferred release
+ *
+ * Queues @ctxt onto sc_send_release_list. DMA unmap and
+ * page release run later in svc_rdma_send_ctxts_drain(),
+ * typically from xpo_release_ctxt.
*
- * Pages left in sc_pages are DMA unmapped and released.
+ * On the empty-to-non-empty transition, set XPT_DATA and
+ * enqueue the transport. Without this self-trigger, a Send
+ * completion arriving after the last xpo_release_ctxt on an
+ * idle connection would leave the send_ctxt's DMA mappings
+ * and reply pages pinned until another drain occurred.
*/
void svc_rdma_send_ctxt_put(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt)
{
- INIT_WORK(&ctxt->sc_work, svc_rdma_send_ctxt_put_async);
- queue_work(svcrdma_wq, &ctxt->sc_work);
+ if (llist_add(&ctxt->sc_node, &rdma->sc_send_release_list)) {
+ set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
+ svc_xprt_enqueue(&rdma->sc_xprt);
+ }
}
/**
atomic_inc(&rdma->sc_sq_ticket_tail);
wake_up(&rdma->sc_sq_ticket_wait);
trace_svcrdma_sq_retry(rdma, cid);
+
+ /*
+ * While this thread sat on sc_send_wait or sc_sq_ticket_wait,
+ * Send completions that tried to enqueue this transport for a
+ * release-list drain were rejected: svc_rdma_has_wspace returns
+ * 0 while either waitqueue is active, and svc_xprt_ready
+ * rejects the enqueue. Drain the release list now.
+ */
+ svc_rdma_send_ctxts_drain(rdma);
return 0;
out_close: