]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: quic: refactor STREAM room notification
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 25 Sep 2024 16:25:08 +0000 (18:25 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 1 Oct 2024 14:19:25 +0000 (16:19 +0200)
qc_stream_desc is an intermediary layer between QUIC MUX and quic_conn.
It is a facility which permits to store data to emit and keep them for
retransmission until acknowledgment. This layer is responsible to notify
QUIC MUX each time a buffer is freed. This is necessary as MUX buffer
allocation is limited by the underlying congestion window size.

Refactor this to use a mechanism similar to send notification. A new
callback notify_room can now be registered to qc_stream_desc instance.
This is set by QUIC MUX to qmux_ctrl_room(). On MUX QUIC free, special
care is now taken to reset notify_room callback to NULL.

Thanks to this refactoring, further adjustment have been made to refine
the architecture. One of them is the removal of qc_stream_desc
QC_SD_FL_OOB_BUF, which is now converted to a MUX layer flag
QC_SF_TXBUF_OOB.

include/haproxy/mux_quic-t.h
include/haproxy/quic_stream-t.h
include/haproxy/quic_stream.h
src/mux_quic.c
src/quic_stream.c
src/quic_tls.c

index ad2ebd9045c5f9e7c159429f1cac06376bda5679..cb9e27befcc1fdd800e78ea5e227e60d8b1fdcf6 100644 (file)
@@ -243,7 +243,7 @@ static forceinline char *qcc_show_flags(char *buf, size_t len, const char *delim
 #define QC_SF_FIN_STREAM        0x00000002  /* FIN bit must be set for last frame of the stream */
 #define QC_SF_BLK_MROOM         0x00000004  /* app layer is blocked waiting for room in the qcs.tx.buf */
 #define QC_SF_DETACH            0x00000008  /* sc is detached but there is remaining data to send */
-/* unused 0x00000010 */
+#define QC_SF_TXBUB_OOB         0x00000010  /* stream reserved for metadata out-of-band transmission; txbuf allocation is unrestricted */
 #define QC_SF_DEM_FULL          0x00000020  /* demux blocked on request channel buffer full */
 #define QC_SF_READ_ABORTED      0x00000040  /* Rx closed using STOP_SENDING*/
 #define QC_SF_TO_RESET          0x00000080  /* a RESET_STREAM must be sent */
index 8e0dcfd48437b6c3b6218ea0596eb59dfbe0667b..78ec8b562a8f1956e48dc654693498d16b448470 100644 (file)
@@ -22,7 +22,6 @@ struct qc_stream_buf {
 
 #define QC_SD_FL_RELEASE       0x00000001 /* set when MUX has finished to use this stream */
 #define QC_SD_FL_WAIT_FOR_FIN  0x00000002 /* set if sent FIN is waiting for acknowledgement */
-#define QC_SD_FL_OOB_BUF       0x00000004 /* buffers not accounted against conn limit */
 
 /* QUIC STREAM descriptor.
  *
@@ -47,6 +46,7 @@ struct qc_stream_desc {
        int flags; /* QC_SD_FL_* values */
 
        void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len);
+       void (*notify_room)(struct qc_stream_desc *, uint64_t room);
        void *ctx; /* notify context */
 };
 
index fcd843010804eb49ecbc986ffc3ba52e872e42aa..fccf1f8dba1a6cf437be5ccbec861d708f98b308 100644 (file)
@@ -10,7 +10,8 @@ struct quic_conn;
 
 struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx,
                                           struct quic_conn *qc);
-void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size);
+void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size,
+                            void *new_ctx);
 int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len, int fin);
 void qc_stream_desc_free(struct qc_stream_desc *stream, int closing);
 
@@ -37,5 +38,12 @@ static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream,
        stream->notify_send = cb;
 }
 
+/* Subscribe for room notification on <stream>. */
+static inline void qc_stream_desc_sub_room(struct qc_stream_desc *stream,
+                                           void (*cb)(struct qc_stream_desc *s, uint64_t offset))
+{
+       stream->notify_room = cb;
+}
+
 #endif /* USE_QUIC */
 #endif /* _HAPROXY_QUIC_STREAM_H_ */
index a7299fbc733dc9d8311c13178c8d77624d70d0ce..ed951c3d3b376e1bfc2c4966c6150c9a4c58bf4f 100644 (file)
@@ -33,6 +33,7 @@ DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc));
 DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
 
 static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
+static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
 
 static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
 {
@@ -82,7 +83,7 @@ static void qcs_free(struct qcs *qcs)
        /* Release qc_stream_desc buffer from quic-conn layer. */
        if (qcs->stream) {
                qc_stream_desc_sub_send(qcs->stream, NULL);
-               qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real);
+               qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real, qcc);
        }
 
        /* Free Rx buffer. */
@@ -186,6 +187,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
                }
 
                qc_stream_desc_sub_send(qcs->stream, qmux_ctrl_send);
+               qc_stream_desc_sub_room(qcs->stream, qmux_ctrl_room);
        }
 
        if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) {
@@ -634,6 +636,14 @@ static inline int qcc_bufwnd_full(const struct qcc *qcc)
        return qcc->tx.buf_in_flight >= qc->path->cwnd;
 }
 
+static void qmux_ctrl_room(struct qc_stream_desc *stream, uint64_t room)
+{
+       /* Context is different for active and released streams. */
+       struct qcc *qcc = !(stream->flags & QC_SD_FL_RELEASE) ?
+         ((struct qcs *)stream->ctx)->qcc : stream->ctx;
+       qcc_notify_buf(qcc, room);
+}
+
 /* Report that one or several stream-desc buffers have been released for <qcc>
  * connection. <free_size> represent the sum of freed buffers sizes. May also
  * be used to notify about congestion window increase, in which case
@@ -841,7 +851,8 @@ void qcs_send_metadata(struct qcs *qcs)
        /* Cannot use if some data already transferred for this stream. */
        BUG_ON(qcs->stream->ack_offset || !LIST_ISEMPTY(&qcs->stream->buf_list));
 
-       qcs->stream->flags |= QC_SD_FL_OOB_BUF;
+       qcs->flags |= QC_SF_TXBUB_OOB;
+       qc_stream_desc_sub_room(qcs->stream, NULL);
 }
 
 struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin)
@@ -1149,7 +1160,6 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small)
 {
        struct qcc *qcc = qcs->qcc;
        struct buffer *out = qc_stream_buf_get(qcs->stream);
-       const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF;
 
        /* Stream must not try to reallocate a buffer if currently waiting for one. */
        BUG_ON(LIST_INLIST(&qcs->el_buf));
@@ -1157,7 +1167,7 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small)
        *err = 0;
 
        if (!out) {
-               if (likely(!unlimited)) {
+               if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) {
                        if ((qcc->flags & QC_CF_CONN_FULL)) {
                                LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
                                tot_time_start(&qcs->timer.buf);
@@ -1180,7 +1190,7 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small)
                        goto out;
                }
 
-               if (likely(!unlimited))
+               if (likely(!(qcs->flags & QC_SF_TXBUB_OOB)))
                        qcc->tx.buf_in_flight += b_size(out);
        }
 
@@ -1199,12 +1209,11 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs)
 {
        struct qcc *qcc = qcs->qcc;
        struct buffer *out = qc_stream_buf_get(qcs->stream);
-       const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF;
 
        /* Stream must not try to reallocate a buffer if currently waiting for one. */
        BUG_ON(LIST_INLIST(&qcs->el_buf));
 
-       if (likely(!unlimited)) {
+       if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) {
                /* Reduce buffer window. As such there is always some space
                 * left for a new buffer allocation.
                 */
@@ -1218,7 +1227,7 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs)
                goto out;
        }
 
-       if (likely(!unlimited))
+       if (likely(!(qcs->flags & QC_SF_TXBUB_OOB)))
                qcc->tx.buf_in_flight += b_size(out);
 
  out:
@@ -2617,6 +2626,7 @@ static void qcc_release(struct qcc *qcc)
 {
        struct connection *conn = qcc->conn;
        struct eb64_node *node;
+       struct quic_conn *qc = conn->handle.qc;
 
        TRACE_ENTER(QMUX_EV_QCC_END, conn);
 
@@ -2633,6 +2643,14 @@ static void qcc_release(struct qcc *qcc)
                qcs_free(qcs);
        }
 
+       /* unsubscribe from all remaining qc_stream_desc */
+       node = eb64_first(&qc->streams_by_id);
+       while (node) {
+               struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id);
+               qc_stream_desc_sub_room(stream, NULL);
+               node = eb64_next(node);
+       }
+
        tasklet_free(qcc->wait_event.tasklet);
        if (conn && qcc->wait_event.events) {
                conn->xprt->unsubscribe(conn, conn->xprt_ctx,
index 80d56ba38328a7bfd91c7e2c60f83237aa0e6812..4aead31ff18f833f1685bf374bdc5e96d707b938 100644 (file)
@@ -28,7 +28,6 @@ static inline int qc_stream_desc_done(const struct qc_stream_desc *s)
 static void qc_stream_buf_free(struct qc_stream_desc *stream,
                                struct qc_stream_buf **stream_buf)
 {
-       struct quic_conn *qc = stream->qc;
        struct buffer *buf = &(*stream_buf)->buf;
        uint64_t free_size;
 
@@ -50,12 +49,8 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream,
        *stream_buf = NULL;
 
        /* notify MUX about available buffers. */
-       if (qc->mux_state == QC_MUX_READY) {
-               if (!(stream->flags & QC_SD_FL_OOB_BUF)) {
-                       /* notify MUX about available buffers. */
-                       qcc_notify_buf(qc->qcc, free_size);
-               }
-       }
+       if (stream->notify_room)
+               stream->notify_room(stream, free_size);
 }
 
 /* Allocate a new stream descriptor with id <id>. The caller is responsible to
@@ -92,6 +87,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
        stream->flags = 0;
        stream->ctx = ctx;
        stream->notify_send = NULL;
+       stream->notify_room = NULL;
 
        return stream;
 }
@@ -102,15 +98,19 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void
  * <final_size> corresponds to the last offset sent for this stream. If there
  * is unsent data present, they will be remove first to guarantee that buffer
  * is freed after receiving all acknowledges.
+ *
+ * It is expected that upper layer instance related to <stream> may disappear
+ * after this operation. As such, <new_ctx> must be set to reassociate <stream>
+ * for notifications.
  */
 void qc_stream_desc_release(struct qc_stream_desc *stream,
-                            uint64_t final_size)
+                            uint64_t final_size, void *new_ctx)
 {
        /* A stream can be released only one time. */
        BUG_ON(stream->flags & QC_SD_FL_RELEASE);
 
        stream->flags |= QC_SD_FL_RELEASE;
-       stream->ctx = NULL;
+       stream->ctx = new_ctx;
 
        if (stream->buf) {
                struct qc_stream_buf *stream_buf = stream->buf;
index 5a16ebdb101b0230a09c70a9ed9bed52a54a814b..87265a887b9b189b31e89bec76e959aae14dbbf2 100644 (file)
@@ -124,7 +124,7 @@ void quic_cstream_free(struct quic_cstream *cs)
 
        quic_free_ncbuf(&cs->rx.ncbuf);
 
-       qc_stream_desc_release(cs->desc, 0);
+       qc_stream_desc_release(cs->desc, 0, NULL);
        pool_free(pool_head_quic_cstream, cs);
 }