} buffer;
bool new_with_sport_idx = false;
u32 new_peer_gen_num = 0;
+ int new_npaths;
+
+ new_npaths = conn->c_npaths;
while (1) {
len = sizeof(buffer);
/* Process extension header here */
switch (type) {
case RDS_EXTHDR_NPATHS:
- conn->c_npaths = min_t(int, RDS_MPATH_WORKERS,
- be16_to_cpu(buffer.rds_npaths));
+ new_npaths = min_t(int, RDS_MPATH_WORKERS,
+ be16_to_cpu(buffer.rds_npaths));
break;
case RDS_EXTHDR_GEN_NUM:
new_peer_gen_num = be32_to_cpu(buffer.rds_gen_num);
}
conn->c_with_sport_idx = new_with_sport_idx;
+
+ if (new_npaths > 1 && new_npaths != conn->c_npaths) {
+ /* We're about to fan-out.
+ * Make sure that messages from cp_index#0
+ * are sent prior to handling other lanes.
+ */
+ struct rds_conn_path *cp0 = conn->c_path;
+ unsigned long flags;
+
+ spin_lock_irqsave(&cp0->cp_lock, flags);
+ conn->c_cp0_mprds_catchup_tx_seq = cp0->cp_next_tx_seq;
+ spin_unlock_irqrestore(&cp0->cp_lock, flags);
+ }
/* if RDS_EXTHDR_NPATHS was not found, default to a single-path */
- conn->c_npaths = max_t(int, conn->c_npaths, 1);
+ conn->c_npaths = max_t(int, new_npaths, 1);
+
conn->c_ping_triggered = 0;
rds_conn_peer_gen_update(conn, new_peer_gen_num);
wake_up_all(&cp->cp_waitq);
}
+/*
+ * rds_mprds_cp0_catchup - check whether lane 0 must drain before fan-out
+ * @conn: RDS connection that is transitioning to multipath operation
+ *
+ * Helper function for multipath fanout to ensure lane 0 (cp_index#0)
+ * transmits queued messages before other lanes to prevent out-of-order
+ * delivery.  Lane 0 is considered "caught up" once both its retransmit
+ * queue and its send queue have advanced to or beyond the sequence
+ * number snapshotted into conn->c_cp0_mprds_catchup_tx_seq when the
+ * fan-out was detected.
+ *
+ * Returns true if lane 0 still has pre-fan-out messages pending, or
+ * false once it has caught up and the other lanes may transmit.
+ */
+static bool rds_mprds_cp0_catchup(struct rds_connection *conn)
+{
+	struct rds_conn_path *cp0 = conn->c_path;
+	struct rds_message *rm0;
+	unsigned long flags;
+	bool ret = false;
+
+	/* cp_lock serializes access to cp0's queues and cp_xmit_rm */
+	spin_lock_irqsave(&cp0->cp_lock, flags);
+
+	/* the oldest / first message in the retransmit queue
+	 * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+	 */
+	if (!list_empty(&cp0->cp_retrans)) {
+		rm0 = list_entry(cp0->cp_retrans.next, struct rds_message,
+				 m_conn_item);
+		if (be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+		    conn->c_cp0_mprds_catchup_tx_seq) {
+			/* the retransmit queue of cp_index#0 has not
+			 * quite caught up yet
+			 */
+			ret = true;
+			goto unlock;
+		}
+	}
+
+	/* the oldest / first message of the send queue
+	 * has to be at or beyond c_cp0_mprds_catchup_tx_seq
+	 */
+	rm0 = cp0->cp_xmit_rm;
+	if (!rm0 && !list_empty(&cp0->cp_send_queue))
+		rm0 = list_entry(cp0->cp_send_queue.next, struct rds_message,
+				 m_conn_item);
+	if (rm0 && be64_to_cpu(rm0->m_inc.i_hdr.h_sequence) <
+	    conn->c_cp0_mprds_catchup_tx_seq) {
+		/* the send queue of cp_index#0 has not quite
+		 * caught up yet
+		 */
+		ret = true;
+	}
+
+unlock:
+	spin_unlock_irqrestore(&cp0->cp_lock, flags);
+	return ret;
+}
+
/*
* We're making the conscious trade-off here to only send one message
* down the connection at a time.
if (batch_count >= send_batch_count)
goto over_batch;
+ /* make sure cp_index#0 caught up during fan-out in
+ * order to avoid lane races
+ */
+ if (cp->cp_index > 0 && rds_mprds_cp0_catchup(conn)) {
+ rds_stats_inc(s_mprds_catchup_tx0_retries);
+ goto over_batch;
+ }
+
spin_lock_irqsave(&cp->cp_lock, flags);
if (!list_empty(&cp->cp_send_queue)) {
return ret;
}
-static int rds_send_mprds_hash(struct rds_sock *rs,
- struct rds_connection *conn, int nonblock)
-{
- int hash;
-
- if (conn->c_npaths == 0)
- hash = RDS_MPATH_HASH(rs, RDS_MPATH_WORKERS);
- else
- hash = RDS_MPATH_HASH(rs, conn->c_npaths);
- if (conn->c_npaths == 0 && hash != 0) {
- rds_send_ping(conn, 0);
-
- /* The underlying connection is not up yet. Need to wait
- * until it is up to be sure that the non-zero c_path can be
- * used. But if we are interrupted, we have to use the zero
- * c_path in case the connection ends up being non-MP capable.
- */
- if (conn->c_npaths == 0) {
- /* Cannot wait for the connection be made, so just use
- * the base c_path.
- */
- if (nonblock)
- return 0;
- if (wait_event_interruptible(conn->c_hs_waitq,
- conn->c_npaths != 0))
- hash = 0;
- }
- if (conn->c_npaths == 1)
- hash = 0;
- }
- return hash;
-}
-
static int rds_rdma_bytes(struct msghdr *msg, size_t *rdma_bytes)
{
struct rds_rdma_args *args;
rs->rs_conn = conn;
}
- if (conn->c_trans->t_mp_capable)
- cpath = &conn->c_path[rds_send_mprds_hash(rs, conn, nonblock)];
- else
+ if (conn->c_trans->t_mp_capable) {
+ /* Use c_path[0] until we learn that
+ * the peer supports more (c_npaths > 1)
+ */
+ cpath = &conn->c_path[RDS_MPATH_HASH(rs, conn->c_npaths ? : 1)];
+ } else {
cpath = &conn->c_path[0];
+ }
rm->m_conn_path = cpath;