From: Amaury Denoyelle Date: Wed, 25 Sep 2024 16:25:08 +0000 (+0200) Subject: MINOR: quic: refactor STREAM room notification X-Git-Tag: v3.1-dev9~15 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=db68f8ed8604c5865cc044f577629b1d75df4153;p=thirdparty%2Fhaproxy.git MINOR: quic: refactor STREAM room notification qc_stream_desc is an intermediary layer between QUIC MUX and quic_conn. It is a facility which permits to store data to emit and keep them for retransmission until acknowledgment. This layer is responsible to notify QUIC MUX each time a buffer is freed. This is necessary as MUX buffer allocation is limited by the underlying congestion window size. Refactor this to use a mechanism similar to send notification. A new callback notify_room can now be registered to qc_stream_desc instance. This is set by QUIC MUX to qmux_ctrl_room(). On MUX QUIC free, special care is now taken to reset notify_room callback to NULL. Thanks to this refactoring, further adjustment have been made to refine the architecture. One of them is the removal of qc_stream_desc QC_SD_FL_OOB_BUF, which is now converted to a MUX layer flag QC_SF_TXBUF_OOB. --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index ad2ebd9045..cb9e27befc 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -243,7 +243,7 @@ static forceinline char *qcc_show_flags(char *buf, size_t len, const char *delim #define QC_SF_FIN_STREAM 0x00000002 /* FIN bit must be set for last frame of the stream */ #define QC_SF_BLK_MROOM 0x00000004 /* app layer is blocked waiting for room in the qcs.tx.buf */ #define QC_SF_DETACH 0x00000008 /* sc is detached but there is remaining data to send */ -/* unused 0x00000010 */ +#define QC_SF_TXBUB_OOB 0x00000010 /* stream reserved for metadata out-of-band transmission; txbuf allocation is unrestricted */ #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ #define QC_SF_READ_ABORTED 0x00000040 /* Rx closed using STOP_SENDING*/ #define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */ diff --git a/include/haproxy/quic_stream-t.h b/include/haproxy/quic_stream-t.h index 8e0dcfd484..78ec8b562a 100644 --- a/include/haproxy/quic_stream-t.h +++ b/include/haproxy/quic_stream-t.h @@ -22,7 +22,6 @@ struct qc_stream_buf { #define QC_SD_FL_RELEASE 0x00000001 /* set when MUX has finished to use this stream */ #define QC_SD_FL_WAIT_FOR_FIN 0x00000002 /* set if sent FIN is waiting for acknowledgement */ -#define QC_SD_FL_OOB_BUF 0x00000004 /* buffers not accounted against conn limit */ /* QUIC STREAM descriptor. * @@ -47,6 +46,7 @@ struct qc_stream_desc { int flags; /* QC_SD_FL_* values */ void (*notify_send)(struct qc_stream_desc *, uint64_t offset, uint64_t len); + void (*notify_room)(struct qc_stream_desc *, uint64_t room); void *ctx; /* notify context */ }; diff --git a/include/haproxy/quic_stream.h b/include/haproxy/quic_stream.h index fcd8430108..fccf1f8dba 100644 --- a/include/haproxy/quic_stream.h +++ b/include/haproxy/quic_stream.h @@ -10,7 +10,8 @@ struct quic_conn; struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type, void *ctx, struct quic_conn *qc); -void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size); +void qc_stream_desc_release(struct qc_stream_desc *stream, uint64_t final_size, + void *new_ctx); int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len, int fin); void qc_stream_desc_free(struct qc_stream_desc *stream, int closing); @@ -37,5 +38,12 @@ static inline void qc_stream_desc_sub_send(struct qc_stream_desc *stream, stream->notify_send = cb; } +/* Subscribe for room notification on . */ +static inline void qc_stream_desc_sub_room(struct qc_stream_desc *stream, + void (*cb)(struct qc_stream_desc *s, uint64_t offset)) +{ + stream->notify_room = cb; +} + #endif /* USE_QUIC */ #endif /* _HAPROXY_QUIC_STREAM_H_ */ diff --git a/src/mux_quic.c b/src/mux_quic.c index a7299fbc73..ed951c3d3b 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -33,6 +33,7 @@ DECLARE_POOL(pool_head_qcc, "qcc", sizeof(struct qcc)); DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs)); static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset); +static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room); static void qcs_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) { @@ -82,7 +83,7 @@ static void qcs_free(struct qcs *qcs) /* Release qc_stream_desc buffer from quic-conn layer. */ if (qcs->stream) { qc_stream_desc_sub_send(qcs->stream, NULL); - qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real); + qc_stream_desc_release(qcs->stream, qcs->tx.fc.off_real, qcc); } /* Free Rx buffer. */ @@ -186,6 +187,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) } qc_stream_desc_sub_send(qcs->stream, qmux_ctrl_send); + qc_stream_desc_sub_room(qcs->stream, qmux_ctrl_room); } if (qcc->app_ops->attach && qcc->app_ops->attach(qcs, qcc->ctx)) { @@ -634,6 +636,14 @@ static inline int qcc_bufwnd_full(const struct qcc *qcc) return qcc->tx.buf_in_flight >= qc->path->cwnd; } +static void qmux_ctrl_room(struct qc_stream_desc *stream, uint64_t room) +{ + /* Context is different for active and released streams. */ + struct qcc *qcc = !(stream->flags & QC_SD_FL_RELEASE) ? + ((struct qcs *)stream->ctx)->qcc : stream->ctx; + qcc_notify_buf(qcc, room); +} + /* Report that one or several stream-desc buffers have been released for * connection. represent the sum of freed buffers sizes. May also * be used to notify about congestion window increase, in which case @@ -841,7 +851,8 @@ void qcs_send_metadata(struct qcs *qcs) /* Cannot use if some data already transferred for this stream. */ BUG_ON(qcs->stream->ack_offset || !LIST_ISEMPTY(&qcs->stream->buf_list)); - qcs->stream->flags |= QC_SD_FL_OOB_BUF; + qcs->flags |= QC_SF_TXBUB_OOB; + qc_stream_desc_sub_room(qcs->stream, NULL); } struct stconn *qcs_attach_sc(struct qcs *qcs, struct buffer *buf, char fin) @@ -1149,7 +1160,6 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small) { struct qcc *qcc = qcs->qcc; struct buffer *out = qc_stream_buf_get(qcs->stream); - const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF; /* Stream must not try to reallocate a buffer if currently waiting for one. */ BUG_ON(LIST_INLIST(&qcs->el_buf)); @@ -1157,7 +1167,7 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small) *err = 0; if (!out) { - if (likely(!unlimited)) { + if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) { if ((qcc->flags & QC_CF_CONN_FULL)) { LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf); tot_time_start(&qcs->timer.buf); @@ -1180,7 +1190,7 @@ struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err, int small) goto out; } - if (likely(!unlimited)) + if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) qcc->tx.buf_in_flight += b_size(out); } @@ -1199,12 +1209,11 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs) { struct qcc *qcc = qcs->qcc; struct buffer *out = qc_stream_buf_get(qcs->stream); - const int unlimited = qcs->stream->flags & QC_SD_FL_OOB_BUF; /* Stream must not try to reallocate a buffer if currently waiting for one. */ BUG_ON(LIST_INLIST(&qcs->el_buf)); - if (likely(!unlimited)) { + if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) { /* Reduce buffer window. As such there is always some space * left for a new buffer allocation. */ @@ -1218,7 +1227,7 @@ struct buffer *qcc_realloc_stream_txbuf(struct qcs *qcs) goto out; } - if (likely(!unlimited)) + if (likely(!(qcs->flags & QC_SF_TXBUB_OOB))) qcc->tx.buf_in_flight += b_size(out); out: @@ -2617,6 +2626,7 @@ static void qcc_release(struct qcc *qcc) { struct connection *conn = qcc->conn; struct eb64_node *node; + struct quic_conn *qc = conn->handle.qc; TRACE_ENTER(QMUX_EV_QCC_END, conn); @@ -2633,6 +2643,14 @@ static void qcc_release(struct qcc *qcc) qcs_free(qcs); } + /* unsubscribe from all remaining qc_stream_desc */ + node = eb64_first(&qc->streams_by_id); + while (node) { + struct qc_stream_desc *stream = eb64_entry(node, struct qc_stream_desc, by_id); + qc_stream_desc_sub_room(stream, NULL); + node = eb64_next(node); + } + tasklet_free(qcc->wait_event.tasklet); if (conn && qcc->wait_event.events) { conn->xprt->unsubscribe(conn, conn->xprt_ctx, diff --git a/src/quic_stream.c b/src/quic_stream.c index 80d56ba383..4aead31ff1 100644 --- a/src/quic_stream.c +++ b/src/quic_stream.c @@ -28,7 +28,6 @@ static inline int qc_stream_desc_done(const struct qc_stream_desc *s) static void qc_stream_buf_free(struct qc_stream_desc *stream, struct qc_stream_buf **stream_buf) { - struct quic_conn *qc = stream->qc; struct buffer *buf = &(*stream_buf)->buf; uint64_t free_size; @@ -50,12 +49,8 @@ static void qc_stream_buf_free(struct qc_stream_desc *stream, *stream_buf = NULL; /* notify MUX about available buffers. */ - if (qc->mux_state == QC_MUX_READY) { - if (!(stream->flags & QC_SD_FL_OOB_BUF)) { - /* notify MUX about available buffers. */ - qcc_notify_buf(qc->qcc, free_size); - } - } + if (stream->notify_room) + stream->notify_room(stream, free_size); } /* Allocate a new stream descriptor with id . The caller is responsible to @@ -92,6 +87,7 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void stream->flags = 0; stream->ctx = ctx; stream->notify_send = NULL; + stream->notify_room = NULL; return stream; } @@ -102,15 +98,19 @@ struct qc_stream_desc *qc_stream_desc_new(uint64_t id, enum qcs_type type, void * corresponds to the last offset sent for this stream. If there * is unsent data present, they will be remove first to guarantee that buffer * is freed after receiving all acknowledges. + * + * It is expected that upper layer instance related to may disappear + * after this operation. As such, must be set to reassociate + * for notifications. */ void qc_stream_desc_release(struct qc_stream_desc *stream, - uint64_t final_size) + uint64_t final_size, void *new_ctx) { /* A stream can be released only one time. */ BUG_ON(stream->flags & QC_SD_FL_RELEASE); stream->flags |= QC_SD_FL_RELEASE; - stream->ctx = NULL; + stream->ctx = new_ctx; if (stream->buf) { struct qc_stream_buf *stream_buf = stream->buf; diff --git a/src/quic_tls.c b/src/quic_tls.c index 5a16ebdb10..87265a887b 100644 --- a/src/quic_tls.c +++ b/src/quic_tls.c @@ -124,7 +124,7 @@ void quic_cstream_free(struct quic_cstream *cs) quic_free_ncbuf(&cs->rx.ncbuf); - qc_stream_desc_release(cs->desc, 0); + qc_stream_desc_release(cs->desc, 0, NULL); pool_free(pool_head_quic_cstream, cs); }