]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: release Tx buf on too small room
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 22 Jan 2024 16:03:41 +0000 (17:03 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 31 Jan 2024 15:28:54 +0000 (16:28 +0100)
This commit is a direct follow-up on the major rearchitecture of send
buffering. It allows application protocol to react if current QCS
sending buffer space is too small. In this case, the buffer can be
released to the quic-conn layer. This allows to allocate a new QCS
buffer and retry HTX parsing, unless connection buffer pool is already
depleted.

A new function qcc_release_stream_txbuf() serves as API for app protocol
to release the QCS sending buffer. This operation fails if there is
unsent data in it. In this case, MUX has to keep it to finalize transfer
of unsent data to quic-conn layer. QCS is thus flagged with
QC_SF_BLK_MROOM to interrupt snd_buf operation.

When all data are sent to the quic-conn layer, QC_SF_BLK_MROOM is
cleared via qcc_streams_sent_done() and stream layer is woken up to
restart snd_buf.

Note that a new function qcc_stream_can_send() has been defined. It
allows app proto to check if sending is currently blocked for the
current QCS. For now, it checks QC_SF_BLK_MROOM flag. However, it will
be extended to other conditions with the following patches.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/hq_interop.c
src/mux_quic.c

index f4300de570dab43bec796ce54f123c4855ff2b13..a98234253dfff9ccbc70946159cddad297fb5e86 100644 (file)
@@ -104,7 +104,7 @@ struct qcc {
 #define QC_SF_NONE              0x00000000
 #define QC_SF_SIZE_KNOWN        0x00000001  /* last frame received for this stream */
 #define QC_SF_FIN_STREAM        0x00000002  /* FIN bit must be set for last frame of the stream */
-/* unused 0x00000004 */
+#define QC_SF_BLK_MROOM         0x00000004  /* app layer is blocked waiting for room in the qcs.tx.buf */
 #define QC_SF_DETACH            0x00000008  /* sc is detached but there is remaining data to send */
 /* unused 0x00000010 */
 #define QC_SF_DEM_FULL          0x00000020  /* demux blocked on request channel buffer full */
index 425924212041c29cf6652258053f8644bab2ccda..a35dddc9bfe55b5c3bac035a1bf5c52470aa4edb 100644 (file)
@@ -24,6 +24,8 @@ void qcs_notify_send(struct qcs *qcs);
 
 struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
 struct buffer *qcc_get_stream_txbuf(struct qcs *qcs);
+int qcc_release_stream_txbuf(struct qcs *qcs);
+int qcc_stream_can_send(const struct qcs *qcs);
 void qcc_reset_stream(struct qcs *qcs, int err);
 void qcc_send_stream(struct qcs *qcs, int urg, int count);
 void qcc_abort_stream_read(struct qcs *qcs);
index 778f5bb69d3495d267c8c46fc25efad682945075..431922f01194a118e75bf2a0c133e235eaaef176 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -1707,6 +1707,7 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
 
        list[hdr].n = ist("");
 
+ start:
        if (!(res = qcc_get_stream_txbuf(qcs))) {
                TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
                h3c->err = H3_INTERNAL_ERROR;
@@ -1715,9 +1716,12 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
 
        /* At least 9 bytes to store frame type + length as a varint max size */
        if (b_room(res) < 9) {
-               /* TODO */
                TRACE_STATE("not enough room for trailers frame", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
-               ABORT_NOW();
+               if (qcc_release_stream_txbuf(qcs))
+                       goto end;
+
+               /* Buffer released, restart processing. */
+               goto start;
        }
 
        /* Force buffer realignment as size required to encode headers is unknown. */
@@ -1727,9 +1731,12 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
        headers_buf = b_make(b_peek(res, b_data(res) + 9), b_contig_space(res) - 9, 0, 0);
 
        if (qpack_encode_field_section_line(&headers_buf)) {
-               /* TODO */
                TRACE_STATE("not enough room for trailers section line", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
-               ABORT_NOW();
+               if (qcc_release_stream_txbuf(qcs))
+                       goto end;
+
+               /* Buffer released, restart processing. */
+               goto start;
        }
 
        tail = b_tail(&headers_buf);
@@ -1749,9 +1756,12 @@ static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
                }
 
                if (qpack_encode_header(&headers_buf, list[hdr].n, list[hdr].v)) {
-                       /* TODO */
                        TRACE_STATE("not enough room for all trailers", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
-                       ABORT_NOW();
+                       if (qcc_release_stream_txbuf(qcs))
+                               goto end;
+
+                       /* Buffer released, restart processing. */
+                       goto start;
                }
        }
 
@@ -1871,10 +1881,17 @@ static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
        /* TODO buffer can be realign only if no data waiting for ACK. */
        outbuf = b_make(b_tail(res), b_contig_space(res), 0, 0);
 
+       /* Not enough room for headers and at least one data byte, try to
+        * release the current buffer and allocate a new one. If not possible,
+        * stconn layer will subscribe on SEND.
+        */
        if (b_size(&outbuf) <= hsize) {
-               /* TODO */
                TRACE_STATE("not enough room for data frame", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
-               ABORT_NOW();
+               if (qcc_release_stream_txbuf(qcs))
+                       goto end;
+
+               /* Buffer released, restart processing. */
+               goto new_frame;
        }
 
        if (b_size(&outbuf) < hsize + fsize)
@@ -1925,7 +1942,8 @@ static size_t h3_snd_buf(struct qcs *qcs, struct buffer *buf, size_t count)
 
        htx = htx_from_buf(buf);
 
-       while (count && !htx_is_empty(htx) && !h3c->err) {
+       while (count && !htx_is_empty(htx) && qcc_stream_can_send(qcs) &&
+              !h3c->err) {
 
                idx = htx_get_head(htx);
                blk = htx_get_blk(htx, idx);
@@ -2028,6 +2046,7 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count)
 
        TRACE_ENTER(H3_EV_STRM_SEND, qcs->qcc->conn, qcs);
 
+ start:
        if (!(res = qcc_get_stream_txbuf(qcs))) {
                qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
                goto end;
@@ -2042,9 +2061,13 @@ static size_t h3_nego_ff(struct qcs *qcs, size_t count)
         * on SEND.
         */
        if (b_contig_space(res) <= hsize) {
-               /* TODO */
-               qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
-               goto end;
+               if (qcc_release_stream_txbuf(qcs)) {
+                       qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+                       goto end;
+               }
+
+               /* Buffer released, restart processing. */
+               goto start;
        }
 
        /* Cannot forward more than available room in output buffer */
index 261db276bb94c2cebba15cbfebd3c90932b1bcb6..0d0e47f59c94b85d52ed30903b29b970126ba7b5 100644 (file)
@@ -98,7 +98,7 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
 
        htx = htx_from_buf(buf);
 
-       while (count && !htx_is_empty(htx)) {
+       while (count && !htx_is_empty(htx) && qcc_stream_can_send(qcs)) {
                /* Not implemented : QUIC on backend side */
                idx = htx_get_head(htx);
                blk = htx_get_blk(htx, idx);
@@ -144,8 +144,9 @@ static size_t hq_interop_snd_buf(struct qcs *qcs, struct buffer *buf,
                                fsize = b_contig_space(res);
 
                        if (!fsize) {
-                               /* TODO */
-                               ABORT_NOW();
+                               /* Release buf and restart parsing if sending still possible. */
+                               qcc_release_stream_txbuf(qcs);
+                               continue;
                        }
 
                        b_putblk(res, htx_get_blk_ptr(htx, blk), fsize);
@@ -181,6 +182,7 @@ static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
        int ret = 0;
        struct buffer *res;
 
+ start:
        res = qcc_get_stream_txbuf(qcs);
        if (!res) {
                qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
@@ -190,8 +192,12 @@ static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
        }
 
        if (!b_room(res)) {
-               /* TODO */
-               ABORT_NOW();
+               if (qcc_release_stream_txbuf(qcs)) {
+                       qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+                       goto end;
+               }
+
+               goto start;
        }
 
        /* No header required for HTTP/0.9, no need to reserve an offset. */
index 0024b5d45ec92e01b77931a3b42a89c153efd6b3..408e20c01aa8a020ec72b18ec2bad58bd2ebcd9f 100644 (file)
@@ -962,6 +962,35 @@ static uint64_t qcs_prep_bytes(const struct qcs *qcs)
        return b_data(out) - diff;
 }
 
+/* Release the current <qcs> Tx buffer. This is useful if space left is not
+ * enough anymore. A new instance can then be allocated to continue sending.
+ *
+ * This operation fails if there is not yet sent bytes in the buffer. In this
+ * case, stream layer should interrupt sending until further notification.
+ *
+ * Returns 0 if buffer is released and a new one can be allocated or non-zero
+ * if there is still remaining data.
+ */
+int qcc_release_stream_txbuf(struct qcs *qcs)
+{
+       const uint64_t bytes = qcs_prep_bytes(qcs);
+
+       /* Cannot release buffer if prepared data is not fully sent. */
+       if (bytes) {
+               qcs->flags |= QC_SF_BLK_MROOM;
+               return 1;
+       }
+
+       qc_stream_buf_release(qcs->stream);
+       return 0;
+}
+
+/* Returns true if stream layer can proceed to emission via <qcs>. */
+int qcc_stream_can_send(const struct qcs *qcs)
+{
+       return !(qcs->flags & QC_SF_BLK_MROOM);
+}
+
 /* Wakes up every streams of <qcc> which are currently waiting for sending but
  * are blocked on connection flow control.
  */
@@ -1726,8 +1755,11 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
                                    QMUX_EV_QCS_SEND, qcc->conn, qcs);
                }
                /* Release buffer if everything sent and buf is full or stream is waiting for room. */
-               if (!qcs_prep_bytes(qcs) && (b_full(&qcs->stream->buf->buf))) {
+               if (!qcs_prep_bytes(qcs) &&
+                   (b_full(&qcs->stream->buf->buf) || qcs->flags & QC_SF_BLK_MROOM)) {
                        qc_stream_buf_release(qcs->stream);
+                       qcs->flags &= ~QC_SF_BLK_MROOM;
+                       qcs_notify_send(qcs);
                }
 
                /* Add measurement for send rate. This is done at the MUX layer