From: Amaury Denoyelle Date: Tue, 29 Mar 2022 13:15:54 +0000 (+0200) Subject: MEDIUM: quic: move transport fields from qcs to qc_conn_stream X-Git-Tag: v2.6-dev5~82 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=7272cd76fc2db0d8778d13c3092e5f8303a49c07;p=thirdparty%2Fhaproxy.git MEDIUM: quic: move transport fields from qcs to qc_conn_stream Move the xprt-buf and ack related fields from qcs to the qc_stream_desc structure. In exchange, qcs has a pointer to the low-level stream. For each new qcs, a qc_stream_desc is automatically allocated. This simplify the transport layer by removing qcs/mux manipulation during ACK frame parsing. An additional check is done to not notify the MUX on sending if the stream is already released : this case may now happen on retransmission. To complete this change, the quic_stream frame now references the quic_stream instance instead of a qcs. --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 20d633b5eb..acc855bf73 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -10,6 +10,7 @@ #include #include +#include /* Stream types */ enum qcs_type { @@ -98,14 +99,12 @@ struct qcs { struct { uint64_t offset; /* last offset of data ready to be sent */ uint64_t sent_offset; /* last offset sent by transport layer */ - struct eb_root acked_frms; /* acked frames ordered by their offsets */ - uint64_t ack_offset; /* last acked ordered byte offset */ struct buffer buf; /* transmit buffer before sending via xprt */ - struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */ uint64_t msd; /* fctl bytes limit to respect on emission */ } tx; struct eb64_node by_id; /* place in qcc's streams_by_id */ + struct qc_stream_desc *stream; struct wait_event wait_event; struct wait_event *subs; diff --git a/include/haproxy/quic_frame-t.h b/include/haproxy/quic_frame-t.h index 2e94e057f9..12e3cd04c3 100644 --- a/include/haproxy/quic_frame-t.h +++ b/include/haproxy/quic_frame-t.h @@ -34,6 +34,7 @@ #include #include +#include /* QUIC frame types. */ enum quic_frame_type { @@ -147,7 +148,7 @@ struct quic_new_token { struct quic_stream { uint64_t id; - struct qcs *qcs; + struct qc_stream_desc *stream; /* used only on TX when constructing frames. * Data cleared when processing ACK related to this STREAM frame. diff --git a/src/mux_quic.c b/src/mux_quic.c index 40e26d65d7..f7ef248ba4 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -96,6 +96,7 @@ INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE); struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) { struct qcs *qcs; + struct qc_stream_desc *stream; TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn); @@ -103,6 +104,15 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) if (!qcs) goto out; + /* allocate transport layer stream descriptor */ + stream = qc_stream_desc_new(qcc->conn->qc, id, qcs); + if (!stream) { + pool_free(pool_head_qcs, qcs); + qcs = NULL; + goto out; + } + + qcs->stream = stream; qcs->qcc = qcc; qcs->cs = NULL; qcs->flags = QC_SF_NONE; @@ -122,11 +132,8 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->rx.frms = EB_ROOT_UNIQUE; qcs->tx.buf = BUF_NULL; - qcs->tx.xprt_buf = BUF_NULL; qcs->tx.offset = 0; qcs->tx.sent_offset = 0; - qcs->tx.ack_offset = 0; - qcs->tx.acked_frms = EB_ROOT; qcs->wait_event.tasklet = NULL; qcs->wait_event.events = 0; @@ -145,11 +152,12 @@ void qcs_free(struct qcs *qcs) { b_free(&qcs->rx.buf); b_free(&qcs->tx.buf); - b_free(&qcs->tx.xprt_buf); BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams); --qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams; + qc_stream_desc_release(qcs->stream); + eb64_delete(&qcs->by_id); pool_free(pool_head_qcs, qcs); } @@ -260,6 +268,7 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) for (i = largest_id + 1; i <= sub_id; i++) { uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type; enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI; + tmp_qcs = qcs_new(qcc, id, type); if (!tmp_qcs) { /* allocation failure */ @@ -558,10 +567,10 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out, * |xxxxxxxxxxxxxxxxx| */ - BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset); + BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset); BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset); - head = qcs->tx.sent_offset - qcs->tx.ack_offset; + head = qcs->tx.sent_offset - qcs->stream->ack_offset; left = qcs->tx.offset - qcs->tx.sent_offset; to_xfer = QUIC_MIN(b_data(payload), b_room(out)); @@ -585,7 +594,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out, total = b_force_xfer(out, payload, to_xfer); frm->type = QUIC_FT_STREAM_8; - frm->stream.qcs = (struct qcs *)qcs; + frm->stream.stream = qcs->stream; frm->stream.id = qcs->by_id.key; frm->stream.buf = out; frm->stream.data = (unsigned char *)b_peek(out, head); @@ -753,7 +762,7 @@ static int qc_send(struct qcc *qcc) while (node) { struct qcs *qcs = container_of(node, struct qcs, by_id); struct buffer *buf = &qcs->tx.buf; - struct buffer *out = &qcs->tx.xprt_buf; + struct buffer *out = &qcs->stream->buf; /* TODO * for the moment, unidirectional streams have their own @@ -819,7 +828,8 @@ static int qc_release_detached_streams(struct qcc *qcc) node = eb64_next(node); if (qcs->flags & QC_SF_DETACH) { - if ((!b_data(&qcs->tx.buf) && !b_data(&qcs->tx.xprt_buf))) { + if (!b_data(&qcs->tx.buf) && + qcs->tx.offset == qcs->tx.sent_offset) { qcs_destroy(qcs); release = 1; } @@ -1033,7 +1043,7 @@ static void qc_detach(struct conn_stream *cs) * managment between xprt and mux is reorganized. */ - if ((b_data(&qcs->tx.buf) || b_data(&qcs->tx.xprt_buf))) { + if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) { TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs); qcs->flags |= QC_SF_DETACH; return; diff --git a/src/xprt_quic.c b/src/xprt_quic.c index 36b50fd5b3..5f31d0edd5 100644 --- a/src/xprt_quic.c +++ b/src/xprt_quic.c @@ -447,12 +447,12 @@ static void quic_trace(enum trace_level level, uint64_t mask, const struct trace if (mask & QUIC_EV_CONN_ACKSTRM) { const struct quic_stream *s = a2; - const struct qcs *qcs = a3; + const struct qc_stream_desc *stream = a3; if (s) chunk_appendf(&trace_buf, " off=%llu len=%llu", (ull)s->offset.key, (ull)s->len); - if (qcs) - chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)qcs->tx.ack_offset); + if (stream) + chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)stream->ack_offset); } if (mask & QUIC_EV_CONN_RTTUPDT) { @@ -1421,38 +1421,35 @@ static int qc_stream_desc_free(struct qc_stream_desc *stream) return 0; } -/* Remove from stream the acknowledged frames. +/* Remove from the acknowledged frames. * * Returns 1 if at least one frame was removed else 0. */ -static int qcs_try_to_consume(struct qcs *qcs) +static int quic_stream_try_to_consume(struct quic_conn *qc, + struct qc_stream_desc *stream) { int ret; struct eb64_node *frm_node; ret = 0; - frm_node = eb64_first(&qcs->tx.acked_frms); + frm_node = eb64_first(&stream->acked_frms); while (frm_node) { struct quic_stream *strm; struct quic_frame *frm; strm = eb64_entry(&frm_node->node, struct quic_stream, offset); - if (strm->offset.key > qcs->tx.ack_offset) + if (strm->offset.key > stream->ack_offset) break; TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM, - qcs->qcc->conn->qc, strm, qcs); - if (strm->offset.key + strm->len > qcs->tx.ack_offset) { + qc, strm, stream); + + if (strm->offset.key + strm->len > stream->ack_offset) { const size_t diff = strm->offset.key + strm->len - - qcs->tx.ack_offset; - qcs->tx.ack_offset += diff; + stream->ack_offset; + stream->ack_offset += diff; b_del(strm->buf, diff); ret = 1; - - if (!b_data(strm->buf)) { - b_free(strm->buf); - offer_buffers(NULL, 1); - } } frm_node = eb64_next(frm_node); @@ -1464,6 +1461,9 @@ static int qcs_try_to_consume(struct qcs *qcs) pool_free(pool_head_quic_frame, frm); } + if (!b_data(&stream->buf)) + qc_stream_desc_free(stream); + return ret; } @@ -1478,23 +1478,22 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc, switch (frm->type) { case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F: { - struct quic_stream *strm = &frm->stream; + struct quic_stream *strm_frm = &frm->stream; struct eb64_node *node = NULL; - struct qcs *qcs = NULL; + struct qc_stream_desc *stream = NULL; - /* do not use strm->qcs as the qcs instance might be freed at - * this stage. Use the id to do a proper lookup. + /* do not use strm_frm->stream as the qc_stream_desc instance + * might be freed at this stage. Use the id to do a proper + * lookup. * * TODO if lookup operation impact on the perf is noticeable, - * implement a refcount on qcs instances. + * implement a refcount on qc_stream_desc instances. */ - if (qc->mux_state == QC_MUX_READY) { - node = eb64_lookup(&qc->qcc->streams_by_id, strm->id); - qcs = eb64_entry(node, struct qcs, by_id); - } + node = eb64_lookup(&qc->streams_by_id, strm_frm->id); + stream = eb64_entry(node, struct qc_stream_desc, by_id); - if (!qcs) { - TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm); + if (!stream) { + TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm); LIST_DELETE(&frm->list); quic_tx_packet_refdec(frm->pkt); pool_free(pool_head_quic_frame, frm); @@ -1503,32 +1502,34 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc, return; } - TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm, qcs); - if (strm->offset.key <= qcs->tx.ack_offset) { - if (strm->offset.key + strm->len > qcs->tx.ack_offset) { - const size_t diff = strm->offset.key + strm->len - - qcs->tx.ack_offset; - qcs->tx.ack_offset += diff; - b_del(strm->buf, diff); + TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream); + if (strm_frm->offset.key <= stream->ack_offset) { + if (strm_frm->offset.key + strm_frm->len > stream->ack_offset) { + const size_t diff = strm_frm->offset.key + strm_frm->len - + stream->ack_offset; + stream->ack_offset += diff; + b_del(strm_frm->buf, diff); stream_acked = 1; - if (!b_data(strm->buf)) { - b_free(strm->buf); - offer_buffers(NULL, 1); + if (!b_data(strm_frm->buf)) { + if (qc_stream_desc_free(stream)) { + /* early return */ + return; + } } } TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM, - qcs->qcc->conn->qc, strm, qcs); + qc, strm_frm, stream); LIST_DELETE(&frm->list); quic_tx_packet_refdec(frm->pkt); pool_free(pool_head_quic_frame, frm); } else { - eb64_insert(&qcs->tx.acked_frms, &strm->offset); + eb64_insert(&stream->acked_frms, &strm_frm->offset); } - stream_acked |= qcs_try_to_consume(qcs); + stream_acked |= quic_stream_try_to_consume(qc, stream); } break; default: @@ -5089,9 +5090,16 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist, LIST_DELETE(&cf->list); LIST_APPEND(outlist, &cf->list); - qcc_streams_sent_done(cf->stream.qcs, - cf->stream.len, - cf->stream.offset.key); + /* The MUX stream might be released at this + * stage. This can most notably happen on + * retransmission. + */ + if (qc->mux_state == QC_MUX_READY && + !cf->stream.stream->release) { + qcc_streams_sent_done(cf->stream.stream->ctx, + cf->stream.len, + cf->stream.offset.key); + } } else { struct quic_frame *new_cf; @@ -5104,7 +5112,7 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist, } new_cf->type = cf->type; - new_cf->stream.qcs = cf->stream.qcs; + new_cf->stream.stream = cf->stream.stream; new_cf->stream.buf = cf->stream.buf; new_cf->stream.id = cf->stream.id; if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT) @@ -5124,9 +5132,16 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist, cf->stream.offset.key += dlen; cf->stream.data = (unsigned char *)b_peek(&cf_buf, dlen); - qcc_streams_sent_done(new_cf->stream.qcs, - new_cf->stream.len, - new_cf->stream.offset.key); + /* The MUX stream might be released at this + * stage. This can most notably happen on + * retransmission. + */ + if (qc->mux_state == QC_MUX_READY && + !cf->stream.stream->release) { + qcc_streams_sent_done(new_cf->stream.stream->ctx, + new_cf->stream.len, + new_cf->stream.offset.key); + } } /* TODO the MUX is notified about the frame sending via