cqe = NULL;
}
- do {
- if (atomic_sub_return(cc->cc_sqecount,
- &rdma->sc_sq_avail) > 0) {
- cc->cc_posttime = ktime_get();
- ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
- if (ret)
- break;
- return 0;
- }
-
- percpu_counter_inc(&svcrdma_stat_sq_starve);
- trace_svcrdma_sq_full(rdma, &cc->cc_cid);
- atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
- wait_event(rdma->sc_send_wait,
- atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
- trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
- } while (1);
-
- trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
- svc_xprt_deferred_close(&rdma->sc_xprt);
-
- /* If even one was posted, there will be a completion. */
- if (bad_wr != first_wr)
- return 0;
+ ret = svc_rdma_sq_wait(rdma, &cc->cc_cid, cc->cc_sqecount);
+ if (ret < 0)
+ return ret;
- atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
- wake_up(&rdma->sc_send_wait);
- return -ENOTCONN;
+ cc->cc_posttime = ktime_get();
+ ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+ if (ret)
+ return svc_rdma_post_send_err(rdma, &cc->cc_cid, bad_wr,
+ first_wr, cc->cc_sqecount,
+ ret);
+ return 0;
}
/* Build a bvec that covers one kvec in an xdr_buf.
wake_up(&rdma->sc_send_wait);
}
+/**
+ * svc_rdma_sq_wait - Wait for SQ slots using fair queuing
+ * @rdma: controlling transport
+ * @cid: completion ID for tracing
+ * @sqecount: number of SQ entries needed
+ *
+ * A ticket-based system ensures fair ordering when multiple threads
+ * wait for Send Queue capacity. Each waiter takes a ticket and is
+ * served in order, preventing starvation.
+ *
+ * Protocol invariant: every ticket holder must increment
+ * sc_sq_ticket_tail exactly once, whether the reservation
+ * succeeds or the connection closes. Failing to advance the
+ * tail stalls all subsequent waiters.
+ *
+ * The ticket counters are signed 32-bit atomics. After
+ * wrapping through INT_MAX, the equality check
+ * (tail == ticket) remains correct because both counters
+ * advance monotonically and the comparison uses exact
+ * equality rather than relational operators.
+ *
+ * Return values:
+ *   %0: SQ slots were reserved successfully
+ *   %-ENOTCONN: The connection was lost
+ */
+int svc_rdma_sq_wait(struct svcxprt_rdma *rdma,
+		     const struct rpc_rdma_cid *cid, int sqecount)
+{
+	int ticket;
+
+	/* Fast path: try to reserve SQ slots without waiting.
+	 *
+	 * Take the fast path only while no tickets are outstanding
+	 * (head == tail). Otherwise a steady stream of uncontended
+	 * senders could reserve slots ahead of the parked ticket
+	 * holders and starve them, defeating the fairness guarantee
+	 * above. The emptiness check races with a new ticket being
+	 * taken, but losing that race merely costs the head-of-line
+	 * waiter one more pass through its reservation loop below.
+	 *
+	 * A failed reservation temporarily understates sc_sq_avail
+	 * until the compensating atomic_add restores it. A Send
+	 * completion arriving in that window sees a lower count
+	 * than reality, but the value self-corrects once the add
+	 * completes. No ordering guarantee is needed here because
+	 * the slow path serializes all contended waiters.
+	 */
+	if (atomic_read(&rdma->sc_sq_ticket_head) ==
+	    atomic_read(&rdma->sc_sq_ticket_tail)) {
+		if (likely(atomic_sub_return(sqecount, &rdma->sc_sq_avail) >= 0))
+			return 0;
+		atomic_add(sqecount, &rdma->sc_sq_avail);
+	}
+
+	/* Slow path: take a ticket and wait in line */
+	ticket = atomic_fetch_inc(&rdma->sc_sq_ticket_head);
+
+	percpu_counter_inc(&svcrdma_stat_sq_starve);
+	trace_svcrdma_sq_full(rdma, cid);
+
+	/* Wait until all earlier tickets have been served */
+	wait_event(rdma->sc_sq_ticket_wait,
+		   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
+		   atomic_read(&rdma->sc_sq_ticket_tail) == ticket);
+	if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+		goto out_close;
+
+	/* It's our turn. Wait for enough SQ slots to be available. */
+	while (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
+		atomic_add(sqecount, &rdma->sc_sq_avail);
+
+		/* When the transport is torn down, ib_drain_sq()
+		 * triggers enough Send completions to wake this
+		 * thread; the XPT_CLOSE test below then exits
+		 * the loop.
+		 */
+		wait_event(rdma->sc_send_wait,
+			   test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags) ||
+			   atomic_read(&rdma->sc_sq_avail) >= sqecount);
+		if (test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags))
+			goto out_close;
+	}
+
+	/* Slots reserved successfully. Let the next waiter proceed. */
+	atomic_inc(&rdma->sc_sq_ticket_tail);
+	wake_up(&rdma->sc_sq_ticket_wait);
+	trace_svcrdma_sq_retry(rdma, cid);
+	return 0;
+
+out_close:
+	/* Advance the tail even on failure (see the invariant above).
+	 * The tail may pass tickets that are still waiting, but every
+	 * remaining waiter observes XPT_CLOSE and exits through this
+	 * same path, keeping head and tail balanced.
+	 */
+	atomic_inc(&rdma->sc_sq_ticket_tail);
+	wake_up(&rdma->sc_sq_ticket_wait);
+	return -ENOTCONN;
+}
+
+/**
+ * svc_rdma_post_send_err - Handle ib_post_send failure
+ * @rdma: controlling transport
+ * @cid: completion ID for tracing
+ * @bad_wr: first WR that was not posted
+ * @first_wr: first WR in the chain
+ * @sqecount: number of SQ entries that were reserved
+ * @ret: error code from ib_post_send
+ *
+ * The caller has already reserved @sqecount SQ slots via
+ * svc_rdma_sq_wait() and (if it waited) advanced the ticket
+ * tail, so no ticket bookkeeping is needed here -- only the
+ * slot reservation must be accounted for.
+ *
+ * Return values:
+ *   %0: At least one WR was posted; a completion handles cleanup
+ *   %-ENOTCONN: No WRs were posted; SQ slots are released
+ */
+int svc_rdma_post_send_err(struct svcxprt_rdma *rdma,
+			   const struct rpc_rdma_cid *cid,
+			   const struct ib_send_wr *bad_wr,
+			   const struct ib_send_wr *first_wr,
+			   int sqecount, int ret)
+{
+	/* The transport is unusable after a post failure; record
+	 * the error and schedule the close outside this context.
+	 */
+	trace_svcrdma_sq_post_err(rdma, cid, ret);
+	svc_xprt_deferred_close(&rdma->sc_xprt);
+
+	/* If even one WR was posted, a Send completion will
+	 * return the reserved SQ slots.
+	 */
+	if (bad_wr != first_wr)
+		return 0;
+
+	/* Nothing was posted: return the reserved slots and rouse
+	 * any sender parked in svc_rdma_sq_wait().
+	 */
+	svc_rdma_wake_send_waiters(rdma, sqecount);
+	return -ENOTCONN;
+}
+
/**
* svc_rdma_wc_send - Invoked by RDMA provider for each polled Send WC
* @cq: Completion Queue context
* that these values remain available after the ib_post_send() call.
* In some error flow cases, svc_rdma_wc_send() releases @ctxt.
*
- * Note there is potential for starvation when the Send Queue is
- * full because there is no order to when waiting threads are
- * awoken. The transport is typically provisioned with a deep
- * enough Send Queue that SQ exhaustion should be a rare event.
- *
* Return values:
* %0: @ctxt's WR chain was posted successfully
* %-ENOTCONN: The connection was lost
send_wr->sg_list[0].length,
DMA_TO_DEVICE);
- /* If the SQ is full, wait until an SQ entry is available */
- while (!test_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags)) {
- if (atomic_sub_return(sqecount, &rdma->sc_sq_avail) < 0) {
- svc_rdma_wake_send_waiters(rdma, sqecount);
-
- /* When the transport is torn down, assume
- * ib_drain_sq() will trigger enough Send
- * completions to wake us. The XPT_CLOSE test
- * above should then cause the while loop to
- * exit.
- */
- percpu_counter_inc(&svcrdma_stat_sq_starve);
- trace_svcrdma_sq_full(rdma, &cid);
- wait_event(rdma->sc_send_wait,
- atomic_read(&rdma->sc_sq_avail) > 0);
- trace_svcrdma_sq_retry(rdma, &cid);
- continue;
- }
-
- trace_svcrdma_post_send(ctxt);
- ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
- if (ret) {
- trace_svcrdma_sq_post_err(rdma, &cid, ret);
- svc_xprt_deferred_close(&rdma->sc_xprt);
-
- /* If even one WR was posted, there will be a
- * Send completion that bumps sc_sq_avail.
- */
- if (bad_wr == first_wr) {
- svc_rdma_wake_send_waiters(rdma, sqecount);
- break;
- }
- }
- return 0;
- }
- return -ENOTCONN;
+ ret = svc_rdma_sq_wait(rdma, &cid, sqecount);
+ if (ret < 0)
+ return ret;
+
+ trace_svcrdma_post_send(ctxt);
+ ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+ if (ret)
+ return svc_rdma_post_send_err(rdma, &cid, bad_wr,
+ first_wr, sqecount, ret);
+ return 0;
}
/**