]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: quic: move transport fields from qcs to qc_conn_stream
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 29 Mar 2022 13:15:54 +0000 (15:15 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 30 Mar 2022 14:19:48 +0000 (16:19 +0200)
Move the xprt-buf and ack related fields from qcs to the qc_stream_desc
structure. In exchange, qcs has a pointer to the low-level stream. For
each new qcs, a qc_stream_desc is automatically allocated.

This simplify the transport layer by removing qcs/mux manipulation
during ACK frame parsing. An additional check is done to not notify the
MUX on sending if the stream is already released : this case may now
happen on retransmission.

To complete this change, the quic_stream frame now references the
quic_stream instance instead of a qcs.

include/haproxy/mux_quic-t.h
include/haproxy/quic_frame-t.h
src/mux_quic.c
src/xprt_quic.c

index 20d633b5eb614b3499def4b5b1ce5e471b86a863..acc855bf7337242c78fd38e03910a544a1a83bf8 100644 (file)
@@ -10,6 +10,7 @@
 
 #include <haproxy/buf-t.h>
 #include <haproxy/connection-t.h>
+#include <haproxy/xprt_quic-t.h>
 
 /* Stream types */
 enum qcs_type {
@@ -98,14 +99,12 @@ struct qcs {
        struct {
                uint64_t offset; /* last offset of data ready to be sent */
                uint64_t sent_offset; /* last offset sent by transport layer */
-               struct eb_root acked_frms; /* acked frames ordered by their offsets */
-               uint64_t ack_offset; /* last acked ordered byte offset */
                struct buffer buf; /* transmit buffer before sending via xprt */
-               struct buffer xprt_buf; /* buffer for xprt sending, cleared on ACK. */
                uint64_t msd; /* fctl bytes limit to respect on emission */
        } tx;
 
        struct eb64_node by_id; /* place in qcc's streams_by_id */
+       struct qc_stream_desc *stream;
 
        struct wait_event wait_event;
        struct wait_event *subs;
index 2e94e057f90407bfb611583f2eb24d5d20dd07c3..12e3cd04c37924aa40b2a085113281908068afb9 100644 (file)
@@ -34,6 +34,7 @@
 #include <import/ebtree-t.h>
 
 #include <haproxy/mux_quic-t.h>
+#include <haproxy/xprt_quic-t.h>
 
 /* QUIC frame types. */
 enum quic_frame_type {
@@ -147,7 +148,7 @@ struct quic_new_token {
 
 struct quic_stream {
        uint64_t id;
-       struct qcs *qcs;
+       struct qc_stream_desc *stream;
 
        /* used only on TX when constructing frames.
         * Data cleared when processing ACK related to this STREAM frame.
index 40e26d65d7b75224ea2e9d1e3c2395732e2d2471..f7ef248ba4413d34b9d60ead3390b683460d346a 100644 (file)
@@ -96,6 +96,7 @@ INITCALL1(STG_REGISTER, trace_register_source, TRACE_SOURCE);
 struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
 {
        struct qcs *qcs;
+       struct qc_stream_desc *stream;
 
        TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
 
@@ -103,6 +104,15 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        if (!qcs)
                goto out;
 
+       /* allocate transport layer stream descriptor */
+       stream = qc_stream_desc_new(qcc->conn->qc, id, qcs);
+       if (!stream) {
+               pool_free(pool_head_qcs, qcs);
+               qcs = NULL;
+               goto out;
+       }
+
+       qcs->stream = stream;
        qcs->qcc = qcc;
        qcs->cs = NULL;
        qcs->flags = QC_SF_NONE;
@@ -122,11 +132,8 @@ struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        qcs->rx.frms = EB_ROOT_UNIQUE;
 
        qcs->tx.buf = BUF_NULL;
-       qcs->tx.xprt_buf = BUF_NULL;
        qcs->tx.offset = 0;
        qcs->tx.sent_offset = 0;
-       qcs->tx.ack_offset = 0;
-       qcs->tx.acked_frms = EB_ROOT;
 
        qcs->wait_event.tasklet = NULL;
        qcs->wait_event.events = 0;
@@ -145,11 +152,12 @@ void qcs_free(struct qcs *qcs)
 {
        b_free(&qcs->rx.buf);
        b_free(&qcs->tx.buf);
-       b_free(&qcs->tx.xprt_buf);
 
        BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams);
        --qcs->qcc->strms[qcs_id_type(qcs->by_id.key)].nb_streams;
 
+       qc_stream_desc_release(qcs->stream);
+
        eb64_delete(&qcs->by_id);
        pool_free(pool_head_qcs, qcs);
 }
@@ -260,6 +268,7 @@ struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
                        for (i = largest_id + 1; i <= sub_id; i++) {
                                uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
                                enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
+
                                tmp_qcs = qcs_new(qcc, id, type);
                                if (!tmp_qcs) {
                                        /* allocation failure */
@@ -558,10 +567,10 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
         *                       |xxxxxxxxxxxxxxxxx|
         */
 
-       BUG_ON_HOT(qcs->tx.sent_offset < qcs->tx.ack_offset);
+       BUG_ON_HOT(qcs->tx.sent_offset < qcs->stream->ack_offset);
        BUG_ON_HOT(qcs->tx.offset < qcs->tx.sent_offset);
 
-       head = qcs->tx.sent_offset - qcs->tx.ack_offset;
+       head = qcs->tx.sent_offset - qcs->stream->ack_offset;
        left = qcs->tx.offset - qcs->tx.sent_offset;
        to_xfer = QUIC_MIN(b_data(payload), b_room(out));
 
@@ -585,7 +594,7 @@ static int qcs_push_frame(struct qcs *qcs, struct buffer *out,
        total = b_force_xfer(out, payload, to_xfer);
 
        frm->type = QUIC_FT_STREAM_8;
-       frm->stream.qcs = (struct qcs *)qcs;
+       frm->stream.stream = qcs->stream;
        frm->stream.id = qcs->by_id.key;
        frm->stream.buf = out;
        frm->stream.data = (unsigned char *)b_peek(out, head);
@@ -753,7 +762,7 @@ static int qc_send(struct qcc *qcc)
        while (node) {
                struct qcs *qcs = container_of(node, struct qcs, by_id);
                struct buffer *buf = &qcs->tx.buf;
-               struct buffer *out = &qcs->tx.xprt_buf;
+               struct buffer *out = &qcs->stream->buf;
 
                /* TODO
                 * for the moment, unidirectional streams have their own
@@ -819,7 +828,8 @@ static int qc_release_detached_streams(struct qcc *qcc)
                node = eb64_next(node);
 
                if (qcs->flags & QC_SF_DETACH) {
-                       if ((!b_data(&qcs->tx.buf) && !b_data(&qcs->tx.xprt_buf))) {
+                       if (!b_data(&qcs->tx.buf) &&
+                           qcs->tx.offset == qcs->tx.sent_offset) {
                                qcs_destroy(qcs);
                                release = 1;
                        }
@@ -1033,7 +1043,7 @@ static void qc_detach(struct conn_stream *cs)
         * managment between xprt and mux is reorganized.
         */
 
-       if ((b_data(&qcs->tx.buf) || b_data(&qcs->tx.xprt_buf))) {
+       if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) {
                TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs);
                qcs->flags |= QC_SF_DETACH;
                return;
index 36b50fd5b345950cfff19c0f63a660f755e6df1d..5f31d0edd5ed9eba12a600da05b30de653e989b8 100644 (file)
@@ -447,12 +447,12 @@ static void quic_trace(enum trace_level level, uint64_t mask, const struct trace
 
                if (mask & QUIC_EV_CONN_ACKSTRM) {
                        const struct quic_stream *s = a2;
-                       const struct qcs *qcs = a3;
+                       const struct qc_stream_desc *stream = a3;
 
                        if (s)
                                chunk_appendf(&trace_buf, " off=%llu len=%llu", (ull)s->offset.key, (ull)s->len);
-                       if (qcs)
-                               chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)qcs->tx.ack_offset);
+                       if (stream)
+                               chunk_appendf(&trace_buf, " ack_offset=%llu", (ull)stream->ack_offset);
                }
 
                if (mask & QUIC_EV_CONN_RTTUPDT) {
@@ -1421,38 +1421,35 @@ static int qc_stream_desc_free(struct qc_stream_desc *stream)
        return 0;
 }
 
-/* Remove from <qcs> stream the acknowledged frames.
+/* Remove from <stream> the acknowledged frames.
  *
  * Returns 1 if at least one frame was removed else 0.
  */
-static int qcs_try_to_consume(struct qcs *qcs)
+static int quic_stream_try_to_consume(struct quic_conn *qc,
+                                      struct qc_stream_desc *stream)
 {
        int ret;
        struct eb64_node *frm_node;
 
        ret = 0;
-       frm_node = eb64_first(&qcs->tx.acked_frms);
+       frm_node = eb64_first(&stream->acked_frms);
        while (frm_node) {
                struct quic_stream *strm;
                struct quic_frame *frm;
 
                strm = eb64_entry(&frm_node->node, struct quic_stream, offset);
-               if (strm->offset.key > qcs->tx.ack_offset)
+               if (strm->offset.key > stream->ack_offset)
                        break;
 
                TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
-                           qcs->qcc->conn->qc, strm, qcs);
-               if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
+                           qc, strm, stream);
+
+               if (strm->offset.key + strm->len > stream->ack_offset) {
                        const size_t diff = strm->offset.key + strm->len -
-                                           qcs->tx.ack_offset;
-                       qcs->tx.ack_offset += diff;
+                                           stream->ack_offset;
+                       stream->ack_offset += diff;
                        b_del(strm->buf, diff);
                        ret = 1;
-
-                       if (!b_data(strm->buf)) {
-                               b_free(strm->buf);
-                               offer_buffers(NULL, 1);
-                       }
                }
 
                frm_node = eb64_next(frm_node);
@@ -1464,6 +1461,9 @@ static int qcs_try_to_consume(struct qcs *qcs)
                pool_free(pool_head_quic_frame, frm);
        }
 
+       if (!b_data(&stream->buf))
+               qc_stream_desc_free(stream);
+
        return ret;
 }
 
@@ -1478,23 +1478,22 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
        switch (frm->type) {
        case QUIC_FT_STREAM_8 ... QUIC_FT_STREAM_F:
        {
-               struct quic_stream *strm = &frm->stream;
+               struct quic_stream *strm_frm = &frm->stream;
                struct eb64_node *node = NULL;
-               struct qcs *qcs = NULL;
+               struct qc_stream_desc *stream = NULL;
 
-               /* do not use strm->qcs as the qcs instance might be freed at
-                * this stage. Use the id to do a proper lookup.
+               /* do not use strm_frm->stream as the qc_stream_desc instance
+                * might be freed at this stage. Use the id to do a proper
+                * lookup.
                 *
                 * TODO if lookup operation impact on the perf is noticeable,
-                * implement a refcount on qcs instances.
+                * implement a refcount on qc_stream_desc instances.
                 */
-               if (qc->mux_state == QC_MUX_READY) {
-                       node = eb64_lookup(&qc->qcc->streams_by_id, strm->id);
-                       qcs = eb64_entry(node, struct qcs, by_id);
-               }
+               node = eb64_lookup(&qc->streams_by_id, strm_frm->id);
+               stream = eb64_entry(node, struct qc_stream_desc, by_id);
 
-               if (!qcs) {
-                       TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm);
+               if (!stream) {
+                       TRACE_PROTO("acked stream for released stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm);
                        LIST_DELETE(&frm->list);
                        quic_tx_packet_refdec(frm->pkt);
                        pool_free(pool_head_quic_frame, frm);
@@ -1503,32 +1502,34 @@ static inline void qc_treat_acked_tx_frm(struct quic_conn *qc,
                        return;
                }
 
-               TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm, qcs);
-               if (strm->offset.key <= qcs->tx.ack_offset) {
-                       if (strm->offset.key + strm->len > qcs->tx.ack_offset) {
-                               const size_t diff = strm->offset.key + strm->len -
-                                                   qcs->tx.ack_offset;
-                               qcs->tx.ack_offset += diff;
-                               b_del(strm->buf, diff);
+               TRACE_PROTO("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
+               if (strm_frm->offset.key <= stream->ack_offset) {
+                       if (strm_frm->offset.key + strm_frm->len > stream->ack_offset) {
+                               const size_t diff = strm_frm->offset.key + strm_frm->len -
+                                                   stream->ack_offset;
+                               stream->ack_offset += diff;
+                               b_del(strm_frm->buf, diff);
                                stream_acked = 1;
 
-                               if (!b_data(strm->buf)) {
-                                       b_free(strm->buf);
-                                       offer_buffers(NULL, 1);
+                               if (!b_data(strm_frm->buf)) {
+                                       if (qc_stream_desc_free(stream)) {
+                                               /* early return */
+                                               return;
+                                       }
                                }
                        }
 
                        TRACE_PROTO("stream consumed", QUIC_EV_CONN_ACKSTRM,
-                                   qcs->qcc->conn->qc, strm, qcs);
+                                   qc, strm_frm, stream);
                        LIST_DELETE(&frm->list);
                        quic_tx_packet_refdec(frm->pkt);
                        pool_free(pool_head_quic_frame, frm);
                }
                else {
-                       eb64_insert(&qcs->tx.acked_frms, &strm->offset);
+                       eb64_insert(&stream->acked_frms, &strm_frm->offset);
                }
 
-               stream_acked |= qcs_try_to_consume(qcs);
+               stream_acked |= quic_stream_try_to_consume(qc, stream);
        }
        break;
        default:
@@ -5089,9 +5090,16 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist,
                                LIST_DELETE(&cf->list);
                                LIST_APPEND(outlist, &cf->list);
 
-                               qcc_streams_sent_done(cf->stream.qcs,
-                                                     cf->stream.len,
-                                                     cf->stream.offset.key);
+                               /* The MUX stream might be released at this
+                                * stage. This can most notably happen on
+                                * retransmission.
+                                */
+                               if (qc->mux_state == QC_MUX_READY &&
+                                   !cf->stream.stream->release) {
+                                       qcc_streams_sent_done(cf->stream.stream->ctx,
+                                                             cf->stream.len,
+                                                             cf->stream.offset.key);
+                               }
                        }
                        else {
                                struct quic_frame *new_cf;
@@ -5104,7 +5112,7 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist,
                                }
 
                                new_cf->type = cf->type;
-                               new_cf->stream.qcs = cf->stream.qcs;
+                               new_cf->stream.stream = cf->stream.stream;
                                new_cf->stream.buf = cf->stream.buf;
                                new_cf->stream.id = cf->stream.id;
                                if (cf->type & QUIC_STREAM_FRAME_TYPE_OFF_BIT)
@@ -5124,9 +5132,16 @@ static inline int qc_build_frms(struct list *outlist, struct list *inlist,
                                cf->stream.offset.key += dlen;
                                cf->stream.data = (unsigned char *)b_peek(&cf_buf, dlen);
 
-                               qcc_streams_sent_done(new_cf->stream.qcs,
-                                                     new_cf->stream.len,
-                                                     new_cf->stream.offset.key);
+                               /* The MUX stream might be released at this
+                                * stage. This can most notably happen on
+                                * retransmission.
+                                */
+                               if (qc->mux_state == QC_MUX_READY &&
+                                   !cf->stream.stream->release) {
+                                       qcc_streams_sent_done(new_cf->stream.stream->ctx,
+                                                             new_cf->stream.len,
+                                                             new_cf->stream.offset.key);
+                               }
                        }
 
                        /* TODO the MUX is notified about the frame sending via