]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: quic: refactor buffered STREAM ACK consuming
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 30 Sep 2024 07:48:29 +0000 (09:48 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 1 Oct 2024 14:22:23 +0000 (16:22 +0200)
For the moment, streamdesc layer can only deal with in-order ACK at the
stream level. Received out-of-order ACKs are buffered in a tree attached
to a streambuf instance.

Previously, caller of qc_stream_desc_ack() was responsible to implement
consumption of these buffered ACKs. Refactor this by implementing it
directly at the streamdesc layer within qc_stream_desc_ack(). This
simplifies quic_rx ACK handling and ensure buffered ACKs are consumed as
soon as possible.

src/quic_rx.c
src/quic_stream.c

index 5537d63f385abf5354709b8eb29920efe3d50fb8..08e73b4ec06c91d5a8e8e0091656650b1454d167 100644 (file)
@@ -201,71 +201,6 @@ static int qc_pkt_decrypt(struct quic_conn *qc, struct quic_enc_level *qel,
        return ret;
 }
 
-/* Remove from <stream> the acknowledged frames.
- *
- * Returns 1 if at least one frame was removed else 0.
- */
-static int quic_stream_try_to_consume(struct quic_conn *qc,
-                                      struct qc_stream_desc *stream)
-{
-       int ret;
-       struct eb64_node *frm_node;
-       struct qc_stream_buf *stream_buf;
-       struct eb64_node *buf_node;
-
-       TRACE_ENTER(QUIC_EV_CONN_ACKSTRM, qc);
-
-       ret = 0;
-       buf_node = eb64_first(&stream->buf_tree);
-       if (buf_node) {
-               stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
-                                       offset_node);
-
-               frm_node = eb64_first(&stream_buf->acked_frms);
-               while (frm_node) {
-                       struct qf_stream *strm_frm;
-                       struct quic_frame *frm;
-                       size_t offset;
-
-                       strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
-                       frm = container_of(strm_frm, struct quic_frame, stream);
-                       offset = strm_frm->offset.key;
-
-                       if (offset > stream->ack_offset)
-                               break;
-
-                       /* First delete frm from tree. This prevents BUG_ON() if
-                        * stream_buf instance is removed via qc_stream_desc_ack().
-                        */
-                       eb64_delete(frm_node);
-
-                       if (qc_stream_desc_ack(stream, frm)) {
-                               TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
-                                           qc, strm_frm, stream);
-                               ret = 1;
-                       }
-                       qc_release_frm(qc, frm);
-
-                       /* Always retrieve first buffer as the previously used
-                        * instance could have been removed during qc_stream_desc_ack().
-                        */
-                       buf_node = eb64_first(&stream->buf_tree);
-                       if (buf_node) {
-                               stream_buf = eb64_entry(buf_node, struct qc_stream_buf,
-                                                       offset_node);
-                               frm_node = eb64_first(&stream_buf->acked_frms);
-                               BUG_ON(!frm_node && !b_data(&stream_buf->buf));
-                       }
-                       else {
-                               frm_node = NULL;
-                       }
-               }
-       }
-
-       TRACE_LEAVE(QUIC_EV_CONN_ACKSTRM, qc);
-       return ret;
-}
-
 /* Handle <frm> frame whose packet it is attached to has just been acknowledged. The memory allocated
  * for this frame will be at least released in every cases.
  * Never fail.
@@ -298,13 +233,10 @@ static void qc_handle_newly_acked_frm(struct quic_conn *qc, struct quic_frame *f
                }
                stream = eb64_entry(node, struct qc_stream_desc, by_id);
 
-               TRACE_DEVEL("acked stream", QUIC_EV_CONN_ACKSTRM, qc, strm_frm, stream);
                if (!qc_stream_desc_ack(stream, frm)) {
-                       TRACE_DEVEL("stream consumed", QUIC_EV_CONN_ACKSTRM,
+                       TRACE_DEVEL("stream consumed on ACK received", QUIC_EV_CONN_ACKSTRM,
                                    qc, strm_frm, stream);
 
-                       quic_stream_try_to_consume(qc, stream);
-
                        if (qc_stream_desc_done(stream)) {
                                /* no need to continue if stream freed. */
                                TRACE_DEVEL("stream released and freed", QUIC_EV_CONN_ACKSTRM, qc);
index 47546b579f67b6d2c81c404e545c86820e748395..9d0b3ed8f11e063c611d9efeb563c1bdaf6f0d5f 100644 (file)
@@ -134,6 +134,79 @@ void qc_stream_desc_release(struct qc_stream_desc *stream,
        }
 }
 
+/* Acknowledges data for buffer <buf> attached to <stream> instance. This covers
+ * the range strating at <offset> and of length <len>, with <fin> sets for the
+ * last stream frame.
+ *
+ * Returns <buf> if there is still data to acknowledge or buffered ACK to
+ * consume after completing the operation. Else, the next buffer instance of
+ * stream is returned if it exists or NULL in the contrary case.
+ */
+static struct qc_stream_buf *qc_stream_buf_ack(struct qc_stream_buf *buf,
+                                               struct qc_stream_desc *stream,
+                                               uint64_t offset, uint64_t len, int fin)
+{
+       /* This function does not deal with out-of-order ACK. */
+       BUG_ON(offset > stream->ack_offset);
+
+       if (offset + len > stream->ack_offset) {
+               const uint64_t diff = offset + len - stream->ack_offset;
+               b_del(&buf->buf, diff);
+               stream->ack_offset += diff;
+       }
+
+       if (fin) {
+               /* Mark FIN as acknowledged. */
+               stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
+       }
+
+       if (!b_data(&buf->buf) && eb_is_empty(&buf->acked_frms)) {
+               qc_stream_buf_free(stream, &buf);
+               /* Retrieve next buffer instance. */
+               buf = !eb_is_empty(&stream->buf_tree) ?
+                 eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node) :
+                 NULL;
+       }
+
+       return buf;
+}
+
+/* Consume buffered ACK starting at <stream_buf>. If all buffer data is
+ * removed, <stream_buf> is freed and consume will be conducted for following
+ * streambufs from <stream> if present.
+ */
+static void qc_stream_buf_consume(struct qc_stream_buf *stream_buf,
+                                  struct qc_stream_desc *stream)
+{
+       struct eb64_node *frm_node;
+       struct qf_stream *strm_frm;
+       struct quic_frame *frm;
+       uint64_t offset, len;
+       int fin;
+
+       frm_node = eb64_first(&stream_buf->acked_frms);
+       while (frm_node) {
+               strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
+               frm = container_of(strm_frm, struct quic_frame, stream);
+
+               offset = strm_frm->offset.key;
+               len = strm_frm->len;
+               fin = frm->type & QUIC_STREAM_FRAME_TYPE_FIN_BIT;
+
+               if (offset > stream->ack_offset)
+                       break;
+
+               /* Delete frame before acknowledged it. This prevents BUG_ON()
+                * on non-empty acked_frms tree when stream_buf is empty and removed.
+                */
+               eb64_delete(frm_node);
+               stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
+               qc_release_frm(NULL, frm);
+
+               frm_node = stream_buf ? eb64_first(&stream_buf->acked_frms) : NULL;
+       }
+}
+
 /* Acknowledge <frm> STREAM frame whose content is managed by <stream>
  * descriptor.
  *
@@ -150,63 +223,42 @@ int qc_stream_desc_ack(struct qc_stream_desc *stream, struct quic_frame *frm)
 
        struct qc_stream_buf *stream_buf = NULL;
        struct eb64_node *buf_node;
-       struct buffer *buf = NULL;
-       size_t diff;
+       int ret = 0;
 
        /* Cannot advertise FIN for an inferior data range. */
        BUG_ON(fin && offset + len < stream->ack_offset);
 
-       if (offset + len < stream->ack_offset) {
-               return 0;
+       /* Do nothing for offset + len < stream->ack_offset as data were
+        * already acknowledged and removed.
+        */
+
+       if (!len) {
+               BUG_ON(!fin); /* An empty STREAM frame is only needed for a late FIN reporting. */
+
+               /* Empty STREAM frame with FIN can be acknowledged out-of-order. */
+               stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
        }
        else if (offset > stream->ack_offset) {
                buf_node = eb64_lookup_le(&stream->buf_tree, offset);
                BUG_ON(!buf_node); /* Cannot acknowledged a STREAM frame for a non existing buffer. */
-
                stream_buf = eb64_entry(buf_node, struct qc_stream_buf, offset_node);
                eb64_insert(&stream_buf->acked_frms, &strm_frm->offset);
-               return 1;
+               ret = 1;
        }
-
-       diff = offset + len - stream->ack_offset;
-       if (diff) {
+       else if (offset + len > stream->ack_offset) {
                /* Buf list cannot be empty if there is still unacked data. */
                BUG_ON(eb_is_empty(&stream->buf_tree));
 
                /* get oldest buffer from buf tree */
                stream_buf = eb64_entry(eb64_first(&stream->buf_tree), struct qc_stream_buf, offset_node);
-               buf = &stream_buf->buf;
+               stream_buf = qc_stream_buf_ack(stream_buf, stream, offset, len, fin);
 
-               stream->ack_offset += diff;
-               b_del(buf, diff);
-
-               /* Free oldest buffer if all data acknowledged. */
-               if (!b_data(buf)) {
-                       /* Remove buffered ACK before deleting buffer instance. */
-                       while (!eb_is_empty(&stream_buf->acked_frms)) {
-                               struct quic_conn *qc = stream->qc;
-                               struct eb64_node *frm_node;
-                               struct qf_stream *strm_frm;
-                               struct quic_frame *frm;
-
-                               frm_node = eb64_first(&stream_buf->acked_frms);
-                               eb64_delete(frm_node);
-
-                               strm_frm = eb64_entry(frm_node, struct qf_stream, offset);
-                               frm = container_of(strm_frm, struct quic_frame, stream);
-                               qc_release_frm(qc, frm);
-                       }
-                       qc_stream_buf_free(stream, &stream_buf);
-                       buf = NULL;
-               }
-       }
-
-       if (fin) {
-               /* Mark FIN as acknowledged. */
-               stream->flags &= ~QC_SD_FL_WAIT_FOR_FIN;
+               /* some data were acknowledged, try to consume buffered ACKs */
+               if (stream_buf)
+                       qc_stream_buf_consume(stream_buf, stream);
        }
 
-       return 0;
+       return ret;
 }
 
 /* Free the stream descriptor <stream> content. This function should be used