]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: refactor streams opening
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 4 Jul 2022 13:50:33 +0000 (15:50 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 5 Jul 2022 14:18:27 +0000 (16:18 +0200)
Review the whole API used to access/instantiate qcs.

A public function qcc_open_stream_local() is available to the
application protocol layer. It allows to easily opening a local stream.
The ID is automatically attributed to the next one available.

For remote streams, qcc_open_stream_remote() has been implemented. It
will automatically take care of allocating streams in a linear way
according to the ID. This function is called via qcc_get_qcs() which can
be used for each qcc_recv*() operations. For the moment, it is only used
for STREAM frames via qcc_recv(), but soon it will be implemented for
other frames types which can also be used to open a new stream.

qcs_new() and qcs_free() has been restricted to the MUX QUIC only as
they are now reserved for internal usage.

This change is a pure refactoring and should not have any noticeable
impact. It clarifies the developer intent and help to ensure that a
stream is not automatically opened when not desired.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/mux_quic.c

index 1772a3847bd1de19bc40eb8f2283ee6d0a77c21e..168f07444395ad15b6d39f7a9520faa8128636c1 100644 (file)
@@ -37,7 +37,6 @@ struct qcc {
 
        struct {
                uint64_t max_streams; /* maximum number of concurrent streams */
-               uint64_t largest_id;  /* Largest ID of the open streams */
                uint64_t nb_streams;  /* Number of open streams */
                struct {
                        uint64_t max_data; /* Maximum number of bytes which may be received */
@@ -80,6 +79,11 @@ struct qcc {
                uint64_t sent_offsets; /* sum of all offset sent */
        } tx;
 
+       uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
+       uint64_t largest_uni_r;  /* largest remote uni stream ID opened. */
+       uint64_t next_bidi_l; /* next stream ID to use for local bidi stream */
+       uint64_t next_uni_l;  /* next stream ID to use for local uni stream */
+
        struct eb_root streams_by_id; /* all active streams by their ID */
 
        struct list send_retry_list; /* list of qcs eligible to send retry */
index 3da5736e75c054b7f281a9ee15e300d7779679b3..75e39a769e12174eab41a6edf6223aaf65647bb6 100644 (file)
@@ -14,9 +14,7 @@
 #include <haproxy/stream.h>
 #include <haproxy/xprt_quic-t.h>
 
-struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type);
-void qcs_free(struct qcs *qcs);
-
+struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi);
 struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
 
 int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
@@ -71,8 +69,6 @@ static inline int quic_stream_is_bidi(uint64_t id)
        return !quic_stream_is_uni(id);
 }
 
-struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id);
-
 /* Install the <app_ops> applicative layer of a QUIC connection on mux <qcc>.
  * Returns 0 on success else non-zero.
  */
index 60c003bfe829ef01a7734b4958335819445bdae8..eb930af141a3d40f7af51d3f3d621d5cff5b1d44 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -1081,7 +1081,7 @@ static int h3_finalize(void *ctx)
        struct h3c *h3c = ctx;
        struct qcs *qcs;
 
-       qcs = qcs_new(h3c->qcc, 0x3, QCS_SRV_UNI);
+       qcs = qcc_open_stream_local(h3c->qcc, 0);
        if (!qcs)
                return 0;
 
index 8abf1640b343fdb22d48f76bd2c3a4c0cbe333bc..24c7373ba6b761b71493a2c989f676d3d88859df 100644 (file)
@@ -113,7 +113,7 @@ static void qcc_emit_cc(struct qcc *qcc, int err)
 }
 
 /* Allocate a new QUIC streams with id <id> and type <type>. */
-struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
+static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
 {
        struct qcs *qcs;
 
@@ -201,7 +201,7 @@ static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
  * error or connection shutdown. Else use qcs_destroy which handle all the
  * QUIC connection mechanism.
  */
-void qcs_free(struct qcs *qcs)
+static void qcs_free(struct qcs *qcs)
 {
        qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
        b_free(&qcs->tx.buf);
@@ -291,92 +291,180 @@ void qcs_notify_send(struct qcs *qcs)
        }
 }
 
-/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
- * several streams, depending on the already open ones.
- * Return this node if succeeded, NULL if not.
+/* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a
+ * bidirectional stream, else an unidirectional stream is opened. The next
+ * available ID on the connection will be used according to the stream type.
+ *
+ * Returns the allocated stream instance or NULL on error.
  */
-struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi)
+{
+       struct qcs *qcs;
+       enum qcs_type type;
+       uint64_t *next;
+
+       TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
+
+       if (bidi) {
+               next = &qcc->next_bidi_l;
+               type = conn_is_back(qcc->conn) ? QCS_CLT_BIDI : QCS_SRV_BIDI;
+       }
+       else {
+               next = &qcc->next_uni_l;
+               type = conn_is_back(qcc->conn) ? QCS_CLT_UNI : QCS_SRV_UNI;
+       }
+
+       /* TODO ensure that we won't overflow remote peer flow control limit on
+        * streams. Else, we should emit a STREAMS_BLOCKED frame.
+        */
+
+       qcs = qcs_new(qcc, *next, type);
+       if (!qcs)
+               return NULL;
+
+       TRACE_DEVEL("opening local stream",  QMUX_EV_QCS_NEW, qcc->conn, qcs);
+       *next += 4;
+
+       TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
+       return qcs;
+}
+
+/* Open a remote initiated stream for the connection <qcc> with ID <id>. The
+ * caller is responsible to ensure that a stream with the same ID was not
+ * already opened. This function will also create all intermediaries streams
+ * with ID smaller than <id> not already opened before.
+ *
+ * Returns the allocated stream instance or NULL on error.
+ */
+static struct qcs *qcc_open_stream_remote(struct qcc *qcc, uint64_t id)
 {
-       unsigned int strm_type;
-       int64_t sub_id;
-       struct eb64_node *node;
        struct qcs *qcs = NULL;
+       enum qcs_type type;
+       uint64_t *largest;
 
-       strm_type = id & QCS_ID_TYPE_MASK;
-       sub_id = id >> QCS_ID_TYPE_SHIFT;
-       node = NULL;
-       if (quic_stream_is_local(qcc, id)) {
-               /* Local streams: this stream must be already opened. */
-               node = eb64_lookup(&qcc->streams_by_id, id);
-               if (!node) {
-                       /* unknown stream id */
-                       goto out;
-               }
-               qcs = eb64_entry(node, struct qcs, by_id);
+       TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
+
+       BUG_ON_HOT(quic_stream_is_local(qcc, id));
+
+       if (quic_stream_is_bidi(id)) {
+               largest = &qcc->largest_bidi_r;
+               type = conn_is_back(qcc->conn) ? QCS_SRV_BIDI : QCS_CLT_BIDI;
        }
        else {
-               /* Remote streams. */
-               struct eb_root *strms;
-               uint64_t largest_id;
-               enum qcs_type qcs_type;
-
-               strms = &qcc->streams_by_id;
-               qcs_type = qcs_id_type(id);
-
-               /* TODO also checks max-streams for uni streams */
-               if (quic_stream_is_bidi(id)) {
-                       if (sub_id + 1 > qcc->lfctl.ms_bidi) {
-                               /* RFC 9000 4.6. Controlling Concurrency
-                                *
-                                * An endpoint that receives a frame with a
-                                * stream ID exceeding the limit it has sent
-                                * MUST treat this as a connection error of
-                                * type STREAM_LIMIT_ERROR
-                                */
-                               qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
-                               goto out;
-                       }
-               }
+               largest = &qcc->largest_uni_r;
+               type = conn_is_back(qcc->conn) ? QCS_SRV_UNI : QCS_CLT_UNI;
+       }
 
-               /* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a
-                * correct value.
-                */
-               largest_id = qcc->strms[qcs_type].largest_id;
-               if (sub_id > (int64_t)largest_id) {
-                       /* RFC: "A stream ID that is used out of order results in all streams
-                        * of that type with lower-numbered stream IDs also being opened".
-                        * So, let's "open" these streams.
+       /* TODO also checks max-streams for uni streams */
+       if (quic_stream_is_bidi(id)) {
+               if (id >= qcc->lfctl.ms_bidi * 4) {
+                       /* RFC 9000 4.6. Controlling Concurrency
+                        *
+                        * An endpoint that receives a frame with a
+                        * stream ID exceeding the limit it has sent
+                        * MUST treat this as a connection error of
+                        * type STREAM_LIMIT_ERROR
                         */
-                       int64_t i;
-                       struct qcs *tmp_qcs;
+                       TRACE_DEVEL("leaving on flow control error", QMUX_EV_QCS_NEW, qcc->conn);
+                       qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
+                       return NULL;
+               }
+       }
 
-                       tmp_qcs = NULL;
-                       for (i = largest_id + 1; i <= sub_id; i++) {
-                               uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
-                               enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
+       /* Only stream ID not already opened can be used. */
+       BUG_ON(id < *largest);
 
-                               tmp_qcs = qcs_new(qcc, id, type);
-                               if (!tmp_qcs) {
-                                       /* allocation failure */
-                                       goto out;
-                               }
+       while (id >= *largest) {
+               const char *str = *largest < id ? "opening intermediary stream" :
+                                                 "opening remote stream";
 
-                               qcc->strms[qcs_type].largest_id = i;
-                       }
-                       if (tmp_qcs)
-                               qcs = tmp_qcs;
-               }
-               else {
-                       node = eb64_lookup(strms, id);
-                       if (node)
-                               qcs = eb64_entry(node, struct qcs, by_id);
+               qcs = qcs_new(qcc, *largest, type);
+               if (!qcs) {
+                       TRACE_DEVEL("leaving on stream fallocation failure", QMUX_EV_QCS_NEW, qcc->conn);
+                       return NULL;
                }
+
+               TRACE_DEVEL(str, QMUX_EV_QCS_NEW, qcc->conn, qcs);
+               *largest += 4;
        }
 
+       TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
        return qcs;
+}
+
+/* Use this function for a stream <id> which is not in <qcc> stream tree. It
+ * returns true if the associated stream is closed.
+ */
+static int qcc_stream_id_is_closed(struct qcc *qcc, uint64_t id)
+{
+       uint64_t *largest;
+
+       /* This function must only be used for stream not present in the stream tree. */
+       BUG_ON_HOT(eb64_lookup(&qcc->streams_by_id, id));
+
+       if (quic_stream_is_local(qcc, id)) {
+               largest = quic_stream_is_uni(id) ? &qcc->next_uni_l :
+                                                  &qcc->next_bidi_l;
+       }
+       else {
+               largest = quic_stream_is_uni(id) ? &qcc->largest_uni_r :
+                                                  &qcc->largest_bidi_r;
+       }
+
+       return id < *largest;
+}
+
+/* Retrieve the stream instance from <id> ID. This can be used when receiving
+ * STREAM, STREAM_DATA_BLOCKED, RESET_STREAM, MAX_STREAM_DATA or STOP_SENDING
+ * frames.
+ *
+ * Return the stream instance or NULL if not found.
+ */
+static struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+{
+       struct eb64_node *node;
+       struct qcs *qcs = NULL;
+
+       TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
+
+       /* Search the stream in the connection tree. */
+       node = eb64_lookup(&qcc->streams_by_id, id);
+       if (node) {
+               qcs = eb64_entry(node, struct qcs, by_id);
+               TRACE_DEVEL("using stream from connection tree", QMUX_EV_QCC_RECV, qcc->conn, qcs);
+               return qcs;
+       }
+
+       /* Check if stream is already closed. */
+       if (qcc_stream_id_is_closed(qcc, id)) {
+               TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+               return NULL;
+       }
+
+       /* Create the stream. This is valid only for remote initiated one. A
+        * local stream must have already been explicitely created by the
+        * application protocol layer.
+        */
+       if (quic_stream_is_local(qcc, id)) {
+               /* RFC 9000 19.8. STREAM Frames
+                *
+                * An endpoint MUST terminate the connection with error
+                * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
+                * initiated stream that has not yet been created, or for a send-only
+                * stream.
+                */
+               TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+               qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
+               return NULL;
+       }
+       else {
+               /* Remote stream not found - try to open it. */
+               qcs = qcc_open_stream_remote(qcc, id);
+       }
 
  out:
-       return NULL;
+       TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
+       return qcs;
 }
 
 /* Simple function to duplicate a buffer */
@@ -515,30 +603,8 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
        }
 
        qcs = qcc_get_qcs(qcc, id);
-       if (!qcs) {
-               if ((id >> QCS_ID_TYPE_SHIFT) <= qcc->strms[qcs_id_type(id)].largest_id) {
-                       TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
-                       return 0;
-               }
-               else {
-                       /* RFC 9000 19.8. STREAM Frames
-                        *
-                        * An endpoint MUST terminate the connection with error
-                        * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
-                        * initiated stream that has not yet been created, or for a send-only
-                        * stream.
-                        */
-                       if (quic_stream_is_local(qcc, id)) {
-                               TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
-                               qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
-                               return 1;
-                       }
-                       else {
-                               TRACE_DEVEL("leaving on stream not found", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
-                               return 1;
-                       }
-               }
-       }
+       if (!qcs)
+               return 0;
 
        if (offset + len <= qcs->rx.offset) {
                TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
@@ -1370,26 +1436,22 @@ static int qc_init(struct connection *conn, struct proxy *prx,
        /* Client initiated streams must respect the server flow control. */
        qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
        qcc->strms[QCS_CLT_BIDI].nb_streams = 0;
-       qcc->strms[QCS_CLT_BIDI].largest_id = -1;
        qcc->strms[QCS_CLT_BIDI].rx.max_data = 0;
        qcc->strms[QCS_CLT_BIDI].tx.max_data = lparams->initial_max_stream_data_bidi_remote;
 
        qcc->strms[QCS_CLT_UNI].max_streams = lparams->initial_max_streams_uni;
        qcc->strms[QCS_CLT_UNI].nb_streams = 0;
-       qcc->strms[QCS_CLT_UNI].largest_id = -1;
        qcc->strms[QCS_CLT_UNI].rx.max_data = 0;
        qcc->strms[QCS_CLT_UNI].tx.max_data = lparams->initial_max_stream_data_uni;
 
        /* Server initiated streams must respect the server flow control. */
        qcc->strms[QCS_SRV_BIDI].max_streams = 0;
        qcc->strms[QCS_SRV_BIDI].nb_streams = 0;
-       qcc->strms[QCS_SRV_BIDI].largest_id = -1;
        qcc->strms[QCS_SRV_BIDI].rx.max_data = lparams->initial_max_stream_data_bidi_local;
        qcc->strms[QCS_SRV_BIDI].tx.max_data = 0;
 
        qcc->strms[QCS_SRV_UNI].max_streams = 0;
        qcc->strms[QCS_SRV_UNI].nb_streams = 0;
-       qcc->strms[QCS_SRV_UNI].largest_id = -1;
        qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni;
        qcc->strms[QCS_SRV_UNI].tx.max_data = 0;
 
@@ -1407,6 +1469,19 @@ static int qc_init(struct connection *conn, struct proxy *prx,
        qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
        qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
 
+       if (conn_is_back(conn)) {
+               qcc->next_bidi_l    = 0x00;
+               qcc->largest_bidi_r = 0x01;
+               qcc->next_uni_l     = 0x02;
+               qcc->largest_uni_r  = 0x03;
+       }
+       else {
+               qcc->largest_bidi_r = 0x00;
+               qcc->next_bidi_l    = 0x01;
+               qcc->largest_uni_r  = 0x02;
+               qcc->next_uni_l     = 0x03;
+       }
+
        qcc->wait_event.tasklet = tasklet_new();
        if (!qcc->wait_event.tasklet)
                goto fail_no_tasklet;