*/
struct svc_rdma_write_info {
struct svcxprt_rdma *wi_rdma;
+ struct list_head wi_list;
const struct svc_rdma_chunk *wi_chunk;
struct ib_cqe sc_cqe;
struct xdr_buf sc_hdrbuf;
struct xdr_stream sc_stream;
+
+ struct list_head sc_write_info_list;
struct svc_rdma_write_info sc_reply_info;
+
void *sc_xprt_buf;
int sc_page_count;
int sc_cur_sge_no;
extern void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
struct svc_rdma_chunk_ctxt *cc,
enum dma_data_direction dir);
+extern void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt);
extern void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
struct svc_rdma_send_ctxt *ctxt);
-extern int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
- const struct svc_rdma_recv_ctxt *rctxt,
- const struct xdr_buf *xdr);
+extern int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct xdr_buf *xdr);
extern int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
const struct svc_rdma_pcl *write_pcl,
const struct svc_rdma_pcl *reply_pcl,
queue_work(svcrdma_wq, &info->wi_work);
}
+/**
+ * svc_rdma_write_chunk_release - Release Write chunk I/O resources
+ * @rdma: controlling transport
+ * @ctxt: Send context that is being released
+ *
+ * Write chunk resources remain live until Send completion because
+ * Write WRs are chained to the Send WR. This function releases all
+ * write_info structures accumulated on @ctxt->sc_write_info_list.
+ */
+void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *ctxt)
+{
+ struct svc_rdma_write_info *info;
+
+ while (!list_empty(&ctxt->sc_write_info_list)) {
+ info = list_first_entry(&ctxt->sc_write_info_list,
+ struct svc_rdma_write_info, wi_list);
+ list_del(&info->wi_list);
+ svc_rdma_write_info_free(info);
+ }
+}
+
/**
* svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
* @rdma: controlling transport
struct ib_cqe *cqe = wc->wr_cqe;
struct svc_rdma_chunk_ctxt *cc =
container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
- struct svc_rdma_write_info *info =
- container_of(cc, struct svc_rdma_write_info, wi_cc);
switch (wc->status) {
case IB_WC_SUCCESS:
trace_svcrdma_wc_write(&cc->cc_cid);
- break;
+ return;
case IB_WC_WR_FLUSH_ERR:
trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
break;
trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
}
- svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
-
- if (unlikely(wc->status != IB_WC_SUCCESS))
- svc_xprt_deferred_close(&rdma->sc_xprt);
-
- svc_rdma_write_info_free(info);
+ /* The RDMA Write has flushed, so the client won't get
+ * some of the outgoing RPC message. Signal the loss
+ * to the client by closing the connection.
+ */
+ svc_xprt_deferred_close(&rdma->sc_xprt);
}
/**
return xdr->len;
}
-static int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
- const struct svc_rdma_chunk *chunk,
- const struct xdr_buf *xdr)
+/*
+ * svc_rdma_prepare_write_chunk - Link Write WRs for @chunk onto @sctxt's chain
+ *
+ * Write WRs are prepended to the Send WR chain so that a single
+ * ib_post_send() posts both RDMA Writes and the final Send. Only
+ * the first WR in each chunk gets a CQE for error detection;
+ * subsequent WRs complete without individual completion events.
+ * The Send WR's signaled completion indicates all chained
+ * operations have finished.
+ */
+static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct svc_rdma_chunk *chunk,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_write_info *info;
struct svc_rdma_chunk_ctxt *cc;
+ struct ib_send_wr *first_wr;
struct xdr_buf payload;
+ struct list_head *pos;
+ struct ib_cqe *cqe;
int ret;
if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
if (ret != payload.len)
goto out_err;
- trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
- ret = svc_rdma_post_chunk_ctxt(rdma, cc);
- if (ret < 0)
+ ret = -EINVAL;
+ if (unlikely(sctxt->sc_sqecount + cc->cc_sqecount > rdma->sc_sq_depth))
goto out_err;
+
+ first_wr = sctxt->sc_wr_chain;
+ cqe = &cc->cc_cqe;
+ list_for_each(pos, &cc->cc_rwctxts) {
+ struct svc_rdma_rw_ctxt *rwc;
+
+ rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
+ first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
+ rdma->sc_port_num, cqe, first_wr);
+ cqe = NULL;
+ }
+ sctxt->sc_wr_chain = first_wr;
+ sctxt->sc_sqecount += cc->cc_sqecount;
+ list_add(&info->wi_list, &sctxt->sc_write_info_list);
+
+ trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
return 0;
out_err:
}
/**
- * svc_rdma_send_write_list - Send all chunks on the Write list
+ * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
* @rdma: controlling RDMA transport
* @rctxt: Write list provisioned by the client
+ * @sctxt: Send WR resources
* @xdr: xdr_buf containing an RPC Reply message
*
- * Returns zero on success, or a negative errno if one or more
- * Write chunks could not be sent.
+ * Returns zero on success, or a negative errno if WR chain
+ * construction fails for one or more Write chunks.
*/
-int svc_rdma_send_write_list(struct svcxprt_rdma *rdma,
- const struct svc_rdma_recv_ctxt *rctxt,
- const struct xdr_buf *xdr)
+int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
+ const struct svc_rdma_recv_ctxt *rctxt,
+ struct svc_rdma_send_ctxt *sctxt,
+ const struct xdr_buf *xdr)
{
struct svc_rdma_chunk *chunk;
int ret;
pcl_for_each_chunk(chunk, &rctxt->rc_write_pcl) {
if (!chunk->ch_payload_length)
break;
- ret = svc_rdma_send_write_chunk(rdma, chunk, xdr);
+ ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
if (ret < 0)
return ret;
}
ctxt->sc_send_wr.sg_list = ctxt->sc_sges;
ctxt->sc_send_wr.send_flags = IB_SEND_SIGNALED;
ctxt->sc_cqe.done = svc_rdma_wc_send;
+ INIT_LIST_HEAD(&ctxt->sc_write_info_list);
ctxt->sc_xprt_buf = buffer;
xdr_buf_init(&ctxt->sc_hdrbuf, ctxt->sc_xprt_buf,
rdma->sc_max_req_size);
struct ib_device *device = rdma->sc_cm_id->device;
unsigned int i;
+ svc_rdma_write_chunk_release(rdma, ctxt);
svc_rdma_reply_chunk_release(rdma, ctxt);
if (ctxt->sc_page_count)
sctxt->sc_send_wr.num_sge = 1;
sctxt->sc_send_wr.opcode = IB_WR_SEND;
sctxt->sc_sges[0].length = sctxt->sc_hdrbuf.len;
+
+ /* Ensure only the error message is posted, not any previously
+ * prepared Write chunk WRs.
+ */
+ sctxt->sc_wr_chain = &sctxt->sc_send_wr;
+ sctxt->sc_sqecount = 1;
if (svc_rdma_post_send(rdma, sctxt))
goto put_ctxt;
return;
if (!p)
goto put_ctxt;
- ret = svc_rdma_send_write_list(rdma, rctxt, &rqstp->rq_res);
+ ret = svc_rdma_prepare_write_list(rdma, rctxt, sctxt, &rqstp->rq_res);
if (ret < 0)
goto put_ctxt;