}
/* Allocate a new QUIC streams with id <id> and type <type>. */
-struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
+static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
{
struct qcs *qcs;
* error or connection shutdown. Else use qcs_destroy which handle all the
* QUIC connection mechanism.
*/
-void qcs_free(struct qcs *qcs)
+static void qcs_free(struct qcs *qcs)
{
qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
b_free(&qcs->tx.buf);
}
}
-/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
- * several streams, depending on the already open ones.
- * Return this node if succeeded, NULL if not.
+/* Open a locally initiated stream for the connection <qcc>. Set <bidi> for a
+ * bidirectional stream, else an unidirectional stream is opened. The next
+ * available ID on the connection will be used according to the stream type.
+ *
+ * Returns the allocated stream instance or NULL on error.
*/
-struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+struct qcs *qcc_open_stream_local(struct qcc *qcc, int bidi)
+{
+ struct qcs *qcs;
+ enum qcs_type type;
+ uint64_t *next;
+
+ TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
+
+ if (bidi) {
+ next = &qcc->next_bidi_l;
+ type = conn_is_back(qcc->conn) ? QCS_CLT_BIDI : QCS_SRV_BIDI;
+ }
+ else {
+ next = &qcc->next_uni_l;
+ type = conn_is_back(qcc->conn) ? QCS_CLT_UNI : QCS_SRV_UNI;
+ }
+
+ /* TODO ensure that we won't overflow remote peer flow control limit on
+ * streams. Else, we should emit a STREAMS_BLOCKED frame.
+ */
+
+ qcs = qcs_new(qcc, *next, type);
+ if (!qcs)
+ return NULL;
+
+ TRACE_DEVEL("opening local stream", QMUX_EV_QCS_NEW, qcc->conn, qcs);
+ *next += 4;
+
+ TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
+ return qcs;
+}
+
+/* Open a remote initiated stream for the connection <qcc> with ID <id>. The
+ * caller is responsible to ensure that a stream with the same ID was not
+ * already opened. This function will also create all intermediaries streams
+ * with ID smaller than <id> not already opened before.
+ *
+ * Returns the allocated stream instance or NULL on error.
+ */
+static struct qcs *qcc_open_stream_remote(struct qcc *qcc, uint64_t id)
{
- unsigned int strm_type;
- int64_t sub_id;
- struct eb64_node *node;
struct qcs *qcs = NULL;
+ enum qcs_type type;
+ uint64_t *largest;
- strm_type = id & QCS_ID_TYPE_MASK;
- sub_id = id >> QCS_ID_TYPE_SHIFT;
- node = NULL;
- if (quic_stream_is_local(qcc, id)) {
- /* Local streams: this stream must be already opened. */
- node = eb64_lookup(&qcc->streams_by_id, id);
- if (!node) {
- /* unknown stream id */
- goto out;
- }
- qcs = eb64_entry(node, struct qcs, by_id);
+ TRACE_ENTER(QMUX_EV_QCS_NEW, qcc->conn);
+
+ BUG_ON_HOT(quic_stream_is_local(qcc, id));
+
+ if (quic_stream_is_bidi(id)) {
+ largest = &qcc->largest_bidi_r;
+ type = conn_is_back(qcc->conn) ? QCS_SRV_BIDI : QCS_CLT_BIDI;
}
else {
- /* Remote streams. */
- struct eb_root *strms;
- uint64_t largest_id;
- enum qcs_type qcs_type;
-
- strms = &qcc->streams_by_id;
- qcs_type = qcs_id_type(id);
-
- /* TODO also checks max-streams for uni streams */
- if (quic_stream_is_bidi(id)) {
- if (sub_id + 1 > qcc->lfctl.ms_bidi) {
- /* RFC 9000 4.6. Controlling Concurrency
- *
- * An endpoint that receives a frame with a
- * stream ID exceeding the limit it has sent
- * MUST treat this as a connection error of
- * type STREAM_LIMIT_ERROR
- */
- qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
- goto out;
- }
- }
+ largest = &qcc->largest_uni_r;
+ type = conn_is_back(qcc->conn) ? QCS_SRV_UNI : QCS_CLT_UNI;
+ }
- /* Note: ->largest_id was initialized with (uint64_t)-1 as value, 0 being a
- * correct value.
- */
- largest_id = qcc->strms[qcs_type].largest_id;
- if (sub_id > (int64_t)largest_id) {
- /* RFC: "A stream ID that is used out of order results in all streams
- * of that type with lower-numbered stream IDs also being opened".
- * So, let's "open" these streams.
+ /* TODO also checks max-streams for uni streams */
+ if (quic_stream_is_bidi(id)) {
+ if (id >= qcc->lfctl.ms_bidi * 4) {
+ /* RFC 9000 4.6. Controlling Concurrency
+ *
+ * An endpoint that receives a frame with a
+ * stream ID exceeding the limit it has sent
+ * MUST treat this as a connection error of
+ * type STREAM_LIMIT_ERROR
*/
- int64_t i;
- struct qcs *tmp_qcs;
+ TRACE_DEVEL("leaving on flow control error", QMUX_EV_QCS_NEW, qcc->conn);
+ qcc_emit_cc(qcc, QC_ERR_STREAM_LIMIT_ERROR);
+ return NULL;
+ }
+ }
- tmp_qcs = NULL;
- for (i = largest_id + 1; i <= sub_id; i++) {
- uint64_t id = (i << QCS_ID_TYPE_SHIFT) | strm_type;
- enum qcs_type type = id & QCS_ID_DIR_BIT ? QCS_CLT_UNI : QCS_CLT_BIDI;
+ /* Only stream ID not already opened can be used. */
+ BUG_ON(id < *largest);
- tmp_qcs = qcs_new(qcc, id, type);
- if (!tmp_qcs) {
- /* allocation failure */
- goto out;
- }
+ while (id >= *largest) {
+ const char *str = *largest < id ? "opening intermediary stream" :
+ "opening remote stream";
- qcc->strms[qcs_type].largest_id = i;
- }
- if (tmp_qcs)
- qcs = tmp_qcs;
- }
- else {
- node = eb64_lookup(strms, id);
- if (node)
- qcs = eb64_entry(node, struct qcs, by_id);
+ qcs = qcs_new(qcc, *largest, type);
+ if (!qcs) {
+ TRACE_DEVEL("leaving on stream fallocation failure", QMUX_EV_QCS_NEW, qcc->conn);
+ return NULL;
}
+
+ TRACE_DEVEL(str, QMUX_EV_QCS_NEW, qcc->conn, qcs);
+ *largest += 4;
}
+ TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn);
return qcs;
+}
+
+/* Use this function for a stream <id> which is not in <qcc> stream tree. It
+ * returns true if the associated stream is closed.
+ */
+static int qcc_stream_id_is_closed(struct qcc *qcc, uint64_t id)
+{
+ uint64_t *largest;
+
+ /* This function must only be used for stream not present in the stream tree. */
+ BUG_ON_HOT(eb64_lookup(&qcc->streams_by_id, id));
+
+ if (quic_stream_is_local(qcc, id)) {
+ largest = quic_stream_is_uni(id) ? &qcc->next_uni_l :
+ &qcc->next_bidi_l;
+ }
+ else {
+ largest = quic_stream_is_uni(id) ? &qcc->largest_uni_r :
+ &qcc->largest_bidi_r;
+ }
+
+ return id < *largest;
+}
+
+/* Retrieve the stream instance from <id> ID. This can be used when receiving
+ * STREAM, STREAM_DATA_BLOCKED, RESET_STREAM, MAX_STREAM_DATA or STOP_SENDING
+ * frames.
+ *
+ * Return the stream instance or NULL if not found.
+ */
+static struct qcs *qcc_get_qcs(struct qcc *qcc, uint64_t id)
+{
+ struct eb64_node *node;
+ struct qcs *qcs = NULL;
+
+ TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
+
+ /* Search the stream in the connection tree. */
+ node = eb64_lookup(&qcc->streams_by_id, id);
+ if (node) {
+ qcs = eb64_entry(node, struct qcs, by_id);
+ TRACE_DEVEL("using stream from connection tree", QMUX_EV_QCC_RECV, qcc->conn, qcs);
+ return qcs;
+ }
+
+ /* Check if stream is already closed. */
+ if (qcc_stream_id_is_closed(qcc, id)) {
+ TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+ return NULL;
+ }
+
+ /* Create the stream. This is valid only for remote initiated one. A
+ * local stream must have already been explicitely created by the
+ * application protocol layer.
+ */
+ if (quic_stream_is_local(qcc, id)) {
+ /* RFC 9000 19.8. STREAM Frames
+ *
+ * An endpoint MUST terminate the connection with error
+ * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
+ * initiated stream that has not yet been created, or for a send-only
+ * stream.
+ */
+ TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
+ qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
+ return NULL;
+ }
+ else {
+ /* Remote stream not found - try to open it. */
+ qcs = qcc_open_stream_remote(qcc, id);
+ }
out:
- return NULL;
+ TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
+ return qcs;
}
/* Simple function to duplicate a buffer */
}
qcs = qcc_get_qcs(qcc, id);
- if (!qcs) {
- if ((id >> QCS_ID_TYPE_SHIFT) <= qcc->strms[qcs_id_type(id)].largest_id) {
- TRACE_DEVEL("already released stream", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
- return 0;
- }
- else {
- /* RFC 9000 19.8. STREAM Frames
- *
- * An endpoint MUST terminate the connection with error
- * STREAM_STATE_ERROR if it receives a STREAM frame for a locally
- * initiated stream that has not yet been created, or for a send-only
- * stream.
- */
- if (quic_stream_is_local(qcc, id)) {
- TRACE_DEVEL("leaving on locally initiated stream not yet created", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
- qcc_emit_cc(qcc, QC_ERR_STREAM_STATE_ERROR);
- return 1;
- }
- else {
- TRACE_DEVEL("leaving on stream not found", QMUX_EV_QCC_RECV|QMUX_EV_QCC_NQCS, qcc->conn, NULL, &id);
- return 1;
- }
- }
- }
+ if (!qcs)
+ return 0;
if (offset + len <= qcs->rx.offset) {
TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
/* Client initiated streams must respect the server flow control. */
qcc->strms[QCS_CLT_BIDI].max_streams = lparams->initial_max_streams_bidi;
qcc->strms[QCS_CLT_BIDI].nb_streams = 0;
- qcc->strms[QCS_CLT_BIDI].largest_id = -1;
qcc->strms[QCS_CLT_BIDI].rx.max_data = 0;
qcc->strms[QCS_CLT_BIDI].tx.max_data = lparams->initial_max_stream_data_bidi_remote;
qcc->strms[QCS_CLT_UNI].max_streams = lparams->initial_max_streams_uni;
qcc->strms[QCS_CLT_UNI].nb_streams = 0;
- qcc->strms[QCS_CLT_UNI].largest_id = -1;
qcc->strms[QCS_CLT_UNI].rx.max_data = 0;
qcc->strms[QCS_CLT_UNI].tx.max_data = lparams->initial_max_stream_data_uni;
/* Server initiated streams must respect the server flow control. */
qcc->strms[QCS_SRV_BIDI].max_streams = 0;
qcc->strms[QCS_SRV_BIDI].nb_streams = 0;
- qcc->strms[QCS_SRV_BIDI].largest_id = -1;
qcc->strms[QCS_SRV_BIDI].rx.max_data = lparams->initial_max_stream_data_bidi_local;
qcc->strms[QCS_SRV_BIDI].tx.max_data = 0;
qcc->strms[QCS_SRV_UNI].max_streams = 0;
qcc->strms[QCS_SRV_UNI].nb_streams = 0;
- qcc->strms[QCS_SRV_UNI].largest_id = -1;
qcc->strms[QCS_SRV_UNI].rx.max_data = lparams->initial_max_stream_data_uni;
qcc->strms[QCS_SRV_UNI].tx.max_data = 0;
qcc->rfctl.msd_bidi_l = rparams->initial_max_stream_data_bidi_local;
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
+ if (conn_is_back(conn)) {
+ qcc->next_bidi_l = 0x00;
+ qcc->largest_bidi_r = 0x01;
+ qcc->next_uni_l = 0x02;
+ qcc->largest_uni_r = 0x03;
+ }
+ else {
+ qcc->largest_bidi_r = 0x00;
+ qcc->next_bidi_l = 0x01;
+ qcc->largest_uni_r = 0x02;
+ qcc->next_uni_l = 0x03;
+ }
+
qcc->wait_event.tasklet = tasklet_new();
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;