From: Amaury Denoyelle Date: Mon, 4 Jul 2022 13:50:33 +0000 (+0200) Subject: MEDIUM: mux-quic: refactor streams opening X-Git-Tag: v2.7-dev2~101 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=a509ffb505bf2f81c2963312c6bcd0b6f2cf28a6;p=thirdparty%2Fhaproxy.git MEDIUM: mux-quic: refactor streams opening Review the whole API used to access/instantiate qcs. A public function qcc_open_stream_local() is available to the application protocol layer. It allows to easily opening a local stream. The ID is automatically attributed to the next one available. For remote streams, qcc_open_stream_remote() has been implemented. It will automatically take care of allocating streams in a linear way according to the ID. This function is called via qcc_get_qcs() which can be used for each qcc_recv*() operations. For the moment, it is only used for STREAM frames via qcc_recv(), but soon it will be implemented for other frames types which can also be used to open a new stream. qcs_new() and qcs_free() has been restricted to the MUX QUIC only as they are now reserved for internal usage. This change is a pure refactoring and should not have any noticeable impact. It clarifies the developer intent and help to ensure that a stream is not automatically opened when not desired. --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index 1772a3847b..168f074443 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -37,7 +37,6 @@ struct qcc { struct { uint64_t max_streams; /* maximum number of concurrent streams */ - uint64_t largest_id; /* Largest ID of the open streams */ uint64_t nb_streams; /* Number of open streams */ struct { uint64_t max_data; /* Maximum number of bytes which may be received */ @@ -80,6 +79,11 @@ struct qcc { uint64_t sent_offsets; /* sum of all offset sent */ } tx; + uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */ + uint64_t largest_uni_r; /* largest remote uni stream ID opened. */ + uint64_t next_bidi_l; /* next stream ID to use for local bidi stream */ + uint64_t next_uni_l; /* next stream ID to use for local uni stream */ + struct eb_root streams_by_id; /* all active streams by their ID */ struct list send_retry_list; /* list of qcs eligible to send retry */ diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index 3da5736e75..75e39a769e 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -14,9 +14,7 @@ #include #include -struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type); -void qcs_free(struct qcs *qcs); - +struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi); struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr); int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es); @@ -71,8 +69,6 @@ static inline int quic_stream_is_bidi(uint64_t id) return !quic_stream_is_uni(id); } -struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id); - /* Install the applicative layer of a QUIC connection on mux . * Returns 0 on success else non-zero. */ diff --git a/src/h3.c b/src/h3.c index 60c003bfe8..eb930af141 100644 --- a/src/h3.c +++ b/src/h3.c @@ -1081,7 +1081,7 @@ static int h3_finalize(void *ctx) struct h3c *h3c = ctx; struct qcs *qcs; - qcs = qcs_new(h3c->qcc, 0x3, QCS_SRV_UNI); + qcs = qcc_open_stream_local(h3c->qcc, 0); if (!qcs) return 0; diff --git a/src/mux_quic.c b/src/mux_quic.c index 8abf1640b3..24c7373ba6 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -113,7 +113,7 @@ static void qcc_emit_cc(struct qcc *qcc, int err) } /* Allocate a new QUIC streams with id and type . */ -struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) +static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) { struct qcs *qcs; @@ -201,7 +201,7 @@ static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf) * error or connection shutdown. Else use qcs_destroy which handle all the * QUIC connection mechanism. */ -void qcs_free(struct qcs *qcs) +static void qcs_free(struct qcs *qcs) { qc_free_ncbuf(qcs, &qcs->rx.ncbuf); b_free(&qcs->tx.buf); @@ -291,92 +291,180 @@ void qcs_notify_send(struct qcs *qcs) } } -/* Retrieve as an ebtree node the stream with as ID, possibly allocates - * several streams, depending on the already open ones. - * Return this node if succeeded, NULL if not. +/* Open a locally initiated stream for the connection . Set for a + * bidirectional stream, else an unidirectional stream is opened. The next + * available ID on the connection will be used according to the stream type. + * + * Returns the allocated stream instance or NULL on error. */ -struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) +struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi) +{ + struct qcs *qcs; + enum qcs_type type; + uint64_t *next; + + TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn); + + if (bidi) { + next = &qcc->next_bidi_l; + type = conn_is_back(qcc->conn) ? QCS_CLT_BIDI : QCS_SRV_BIDI; + } + else { + next = &qcc->next_uni_l; + type = conn_is_back(qcc->conn) ? QCS_CLT_UNI : QCS_SRV_UNI; + } + + /* TODO ensure that we won't overflow remote peer flow control limit on + * streams. Else, we should emit a STREAMS_BLOCKED frame. + */ + + qcs = qcs_new(qcc, *next, type); + if (!qcs) + return NULL; + + TRACE_DEVEL("opening local stream", QMUX_EV_QCS_NEW, qcc->conn, qcs); + *next += 4; + + TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn); + return qcs; +} + +/* Open a remote initiated stream for the connection with ID . The + * caller is responsible to ensure that a stream with the same ID was not + * already opened. This function will also create all intermediaries streams + * with ID smaller than not already opened before. + * + * Returns the allocated stream instance or NULL on error. + */ +static struct qcs *qcc_open_stream_remote(struct qcc *qcc, uint64_t id) { - unsigned int strm_type; - int64_t sub_id; - struct eb64_node *node; struct qcs *qcs = NULL; + enum qcs_type type; + uint64_t *largest; - strm_type = id & QCS_ID_TYPE_MASK; - sub_id = id >> QCS_ID_TYPE_SHIFT; - node = NULL; - if (quic_stream_is_local(qcc, id)) { - /* Local streams: this stream must be already opened. */ - node = eb64_lookup(&qcc->streams_by_id, id); - if (!node) { - /* unknown stream id */ - goto out; - } - qcs = eb64_entry(node, struct qcs, by_id); + TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn); + + BUG_ON_HOT(quic_stream_is_local(qcc, id)); + + if (quic_stream_is_bidi(id)) { + largest = &qcc->largest_bidi_r; + type = conn_is_back(qcc->conn) ? QCS_SRV_BIDI : QCS_CLT_BIDI; } else { - /* Remote streams. */ - struct eb_root *strms; - uint64_t largest_id; - enum qcs_type qcs_type; - - strms = &qcc->streams_by_id; - qcs_type = qcs_id_type(id); - - /* TODO also checks max-streams for uni streams */ - if (quic_stream_is_bidi(id)) { - if (sub_id + 1 > qcc->lfctl.ms_bidi) { - /* RFC 9000 4.6. Controlling Concurrency - * - * An endpoint that receives a frame with a - * stream ID exceeding the limit it has sent - * MUST treat this as a connection error of - * type STREAM_LIMIT_ERROR - */ - qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR); - goto out; - } - } + largest = &qcc->largest_uni_r; + type = conn_is_back(qcc->conn) ? QCS_SRV_UNI : QCS_CLT_UNI; + } - /* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a - * correct value. - */ - largest_id = qcc->strms[qcs_type].largest_id; - if (sub_id > (int64_t)largest_id) { - /* RFC: "A stream ID that is used out of order results in all streams - * of that type with lower-numbered stream IDs also being opened". - * So, let's "open" these streams. + /* TODO also checks max-streams for uni streams */ + if (quic_stream_is_bidi(id)) { + if (id >= qcc->lfctl.ms_bidi * 4) { + /* RFC 9000 4.6. Controlling Concurrency + * + * An endpoint that receives a frame with a + * stream ID exceeding the limit it has sent + * MUST treat this as a connection error of + * type STREAM_LIMIT_ERROR */ - int64_t i; - struct qcs *tmp_qcs; + TRACE_DEVEL("leaving on flow control error", QMUX_EV_QCS_NEW, qcc->conn); + qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR); + return NULL; + } + } - tmp_qcs = NULL; - for (i = largest_id + 1; i <= sub_id; i++) { - uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type; - enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI; + /* Only stream ID not already opened can be used. */ + BUG_ON(id < *largest); - tmp_qcs = qcs_new(qcc, id, type); - if (!tmp_qcs) { - /* allocation failure */ - goto out; - } + while (id >= *largest) { + const char *str = *largest < id ? "opening intermediary stream" : + "opening remote stream"; - qcc->strms[qcs_type].largest_id = i; - } - if (tmp_qcs) - qcs = tmp_qcs; - } - else { - node = eb64_lookup(strms, id); - if (node) - qcs = eb64_entry(node, struct qcs, by_id); + qcs = qcs_new(qcc, *largest, type); + if (!qcs) { + TRACE_DEVEL("leaving on stream fallocation failure", QMUX_EV_QCS_NEW, qcc->conn); + return NULL; } + + TRACE_DEVEL(str, QMUX_EV_QCS_NEW, qcc->conn, qcs); + *largest += 4; } + TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn); return qcs; +} + +/* Use this function for a stream which is not in stream tree. It + * returns true if the associated stream is closed. + */ +static int qcc_stream_id_is_closed(struct qcc *qcc, uint64_t id) +{ + uint64_t *largest; + + /* This function must only be used for stream not present in the stream tree. */ + BUG_ON_HOT(eb64_lookup(&qcc->streams_by_id, id)); + + if (quic_stream_is_local(qcc, id)) { + largest = quic_stream_is_uni(id) ? &qcc->next_uni_l : + &qcc->next_bidi_l; + } + else { + largest = quic_stream_is_uni(id) ? &qcc->largest_uni_r : + &qcc->largest_bidi_r; + } + + return id < *largest; +} + +/* Retrieve the stream instance from ID. This can be used when receiving + * STREAM, STREAM_DATA_BLOCKED, RESET_STREAM, MAX_STREAM_DATA or STOP_SENDING + * frames. + * + * Return the stream instance or NULL if not found. + */ +static struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id) +{ + struct eb64_node *node; + struct qcs *qcs = NULL; + + TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn); + + /* Search the stream in the connection tree. */ + node = eb64_lookup(&qcc->streams_by_id, id); + if (node) { + qcs = eb64_entry(node, struct qcs, by_id); + TRACE_DEVEL("using stream from connection tree", QMUX_EV_QCC_RECV, qcc->conn, qcs); + return qcs; + } + + /* Check if stream is already closed. */ + if (qcc_stream_id_is_closed(qcc, id)) { + TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id); + return NULL; + } + + /* Create the stream. This is valid only for remote initiated one. A + * local stream must have already been explicitely created by the + * application protocol layer. + */ + if (quic_stream_is_local(qcc, id)) { + /* RFC 9000 19.8. STREAM Frames + * + * An endpoint MUST terminate the connection with error + * STREAM_STATE_ERROR if it receives a STREAM frame for a locally + * initiated stream that has not yet been created, or for a send-only + * stream. + */ + TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id); + qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR); + return NULL; + } + else { + /* Remote stream not found - try to open it. */ + qcs = qcc_open_stream_remote(qcc, id); + } out: - return NULL; + TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn); + return qcs; } /* Simple function to duplicate a buffer */ @@ -515,30 +603,8 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, } qcs = qcc_get_qcs(qcc, id); - if (!qcs) { - if ((id >> QCS_ID_TYPE_SHIFT) <= qcc->strms[qcs_id_type(id)].largest_id) { - TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id); - return 0; - } - else { - /* RFC 9000 19.8. STREAM Frames - * - * An endpoint MUST terminate the connection with error - * STREAM_STATE_ERROR if it receives a STREAM frame for a locally - * initiated stream that has not yet been created, or for a send-only - * stream. - */ - if (quic_stream_is_local(qcc, id)) { - TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id); - qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR); - return 1; - } - else { - TRACE_DEVEL("leaving on stream not found", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id); - return 1; - } - } - } + if (!qcs) + return 0; if (offset + len <= qcs->rx.offset) { TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs); @@ -1370,26 +1436,22 @@ static int qc_init(struct connection *conn, struct proxy *prx, /* Client initiated streams must respect the server flow control. */ qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi; qcc->strms[QCS_CLT_BIDI].nb_streams = 0; - qcc->strms[QCS_CLT_BIDI].largest_id = -1; qcc->strms[QCS_CLT_BIDI].rx.max_data = 0; qcc->strms[QCS_CLT_BIDI].tx.max_data = lparams->initial_max_stream_data_bidi_remote; qcc->strms[QCS_CLT_UNI].max_streams = lparams->initial_max_streams_uni; qcc->strms[QCS_CLT_UNI].nb_streams = 0; - qcc->strms[QCS_CLT_UNI].largest_id = -1; qcc->strms[QCS_CLT_UNI].rx.max_data = 0; qcc->strms[QCS_CLT_UNI].tx.max_data = lparams->initial_max_stream_data_uni; /* Server initiated streams must respect the server flow control. */ qcc->strms[QCS_SRV_BIDI].max_streams = 0; qcc->strms[QCS_SRV_BIDI].nb_streams = 0; - qcc->strms[QCS_SRV_BIDI].largest_id = -1; qcc->strms[QCS_SRV_BIDI].rx.max_data = lparams->initial_max_stream_data_bidi_local; qcc->strms[QCS_SRV_BIDI].tx.max_data = 0; qcc->strms[QCS_SRV_UNI].max_streams = 0; qcc->strms[QCS_SRV_UNI].nb_streams = 0; - qcc->strms[QCS_SRV_UNI].largest_id = -1; qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni; qcc->strms[QCS_SRV_UNI].tx.max_data = 0; @@ -1407,6 +1469,19 @@ static int qc_init(struct connection *conn, struct proxy *prx, qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local; qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote; + if (conn_is_back(conn)) { + qcc->next_bidi_l = 0x00; + qcc->largest_bidi_r = 0x01; + qcc->next_uni_l = 0x02; + qcc->largest_uni_r = 0x03; + } + else { + qcc->largest_bidi_r = 0x00; + qcc->next_bidi_l = 0x01; + qcc->largest_uni_r = 0x02; + qcc->next_uni_l = 0x03; + } + qcc->wait_event.tasklet = tasklet_new(); if (!qcc->wait_event.tasklet) goto fail_no_tasklet;