]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: define basic stream states
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 1 Jul 2022 14:48:42 +0000 (16:48 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 11 Jul 2022 14:37:21 +0000 (16:37 +0200)
Implement a basic state machine to represent stream lifecycle. By
default a stream is idle. It is marked as open when sending or receiving
the first data on a stream.

Bidirectional streams has two states to represent the closing on both
receive and send channels. This distinction does not exists for
unidirectional streams which passed automatically from open to close
state.

This patch is mostly internal and has a limited visible impact. Some
behaviors are slightly updated :
* closed streams are garbage collected at the start of io handler
* send operation is interrupted if a stream is close locally

Outside of this, there is no functional change. However, some additional
BUG_ON guards are implemented to ensure that we do not conduct invalid
operation on a stream. This should strengthen the code safety. Also,
stream states are displayed on trace which should help debugging.

include/haproxy/mux_quic-t.h
src/mux_quic.c

index a2004a49c7506034479dd763476551df1f72d248..ecee4840708a1fe3f27fea9ff7f0acba2c65185f 100644 (file)
@@ -111,10 +111,33 @@ struct qcc {
 /* Maximum size of stream Rx buffer. */
 #define QC_S_RX_BUF_SZ   (global.tune.bufsize - NCB_RESERVED_SZ)
 
+/* QUIC stream states
+ *
+ * On initialization a stream is put on idle state. It is opened as soon as
+ * data has been successfully sent or received on it.
+ *
+ * A bidirectional stream has two channels which can be closed separately. The
+ * local channel is closed when the STREAM frame with FIN or a RESET_STREAM has
+ * been emitted. The remote channel is closed as soon as all data from the peer
+ * has been received. The stream goes instantely to the close state once both
+ * channels are closed.
+ *
+ * A unidirectional stream has only one channel of communication. Thus, it does
+ * not use half closed states and transition directly from open to close state.
+ */
+enum qcs_state {
+       QC_SS_IDLE = 0, /* initial state */
+       QC_SS_OPEN,     /* opened */
+       QC_SS_HLOC,     /* half-closed local */
+       QC_SS_HREM,     /* half-closed remote */
+       QC_SS_CLO,      /* closed */
+} __attribute__((packed));
+
 struct qcs {
        struct qcc *qcc;
        struct sedesc *sd;
        uint32_t flags;      /* QC_SF_* */
+       enum qcs_state st;   /* QC_SS_* state */
        void *ctx;           /* app-ops context */
 
        struct {
index f03a58170e6105a8b324187214c9ba5cf35f7244..53d0aab662852bfcf870034f8914bed5b28d1a0b 100644 (file)
@@ -127,6 +127,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        qcs->qcc = qcc;
        qcs->sd = NULL;
        qcs->flags = QC_SF_NONE;
+       qcs->st = QC_SS_IDLE;
        qcs->ctx = NULL;
 
        /* Allocate transport layer stream descriptor. Only needed for TX. */
@@ -226,6 +227,72 @@ static forceinline struct stconn *qcs_sc(const struct qcs *qcs)
        return qcs->sd ? qcs->sd->sc : NULL;
 }
 
+/* Mark a stream as open if it was idle. This can be used on every
+ * successful emission/reception operation to update the stream state.
+ */
+static void qcs_idle_open(struct qcs *qcs)
+{
+       /* This operation must not be used if the stream is already closed. */
+       BUG_ON_HOT(qcs->st == QC_SS_CLO);
+
+       if (qcs->st == QC_SS_IDLE) {
+               qcs->st = QC_SS_OPEN;
+               TRACE_DEVEL("opening stream", QMUX_EV_QCS_NEW, qcs->qcc->conn, qcs);
+       }
+}
+
+/* Close the local channel of <qcs> instance. */
+static void qcs_close_local(struct qcs *qcs)
+{
+       /* The stream must have already been opened. */
+       BUG_ON_HOT(qcs->st == QC_SS_IDLE);
+
+       /* This operation cannot be used multiple times. */
+       BUG_ON_HOT(qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO);
+
+       if (quic_stream_is_bidi(qcs->id)) {
+               qcs->st = (qcs->st == QC_SS_HREM) ? QC_SS_CLO : QC_SS_HLOC;
+       }
+       else {
+               /* Only local uni streams are valid for this operation. */
+               BUG_ON_HOT(quic_stream_is_remote(qcs->qcc, qcs->id));
+               qcs->st = QC_SS_CLO;
+       }
+
+       TRACE_DEVEL("closing stream locally", QMUX_EV_QCS_END, qcs->qcc->conn, qcs);
+}
+
+/* Close the remote channel of <qcs> instance. */
+static void qcs_close_remote(struct qcs *qcs)
+{
+       /* The stream must have already been opened. */
+       BUG_ON_HOT(qcs->st == QC_SS_IDLE);
+
+       /* This operation cannot be used multiple times. */
+       BUG_ON_HOT(qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO);
+
+       if (quic_stream_is_bidi(qcs->id)) {
+               qcs->st = (qcs->st == QC_SS_HLOC) ? QC_SS_CLO : QC_SS_HREM;
+       }
+       else {
+               /* Only remote uni streams are valid for this operation. */
+               BUG_ON_HOT(quic_stream_is_local(qcs->qcc, qcs->id));
+               qcs->st = QC_SS_CLO;
+       }
+
+       TRACE_DEVEL("closing stream remotely", QMUX_EV_QCS_END, qcs->qcc->conn, qcs);
+}
+
+static int qcs_is_close_local(struct qcs *qcs)
+{
+       return qcs->st == QC_SS_HLOC || qcs->st == QC_SS_CLO;
+}
+
+static __maybe_unused int qcs_is_close_remote(struct qcs *qcs)
+{
+       return qcs->st == QC_SS_HREM || qcs->st == QC_SS_CLO;
+}
+
 struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr)
 {
        struct buffer *buf = b_alloc(bptr);
@@ -665,6 +732,8 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
                return 0;
        }
 
+       qcs_idle_open(qcs);
+
        if (offset + len > qcs->rx.offset_max) {
                uint64_t diff = offset + len - qcs->rx.offset_max;
                qcs->rx.offset_max = offset + len;
@@ -724,6 +793,9 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
        if (fin)
                qcs->flags |= QC_SF_SIZE_KNOWN;
 
+       if (qcs->flags & QC_SF_SIZE_KNOWN && !ncb_is_fragmented(&qcs->rx.ncbuf))
+               qcs_close_remote(qcs);
+
        if (ncb_data(&qcs->rx.ncbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL))
                qcc_decode_qcs(qcc, qcs);
 
@@ -1093,6 +1165,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
        if (offset + data < qcs->tx.sent_offset)
                return;
 
+       qcs_idle_open(qcs);
+
        diff = offset + data - qcs->tx.sent_offset;
        if (diff) {
                /* increase offset sum on connection */
@@ -1118,6 +1192,8 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
        }
 
        if (qcs->tx.offset == qcs->tx.sent_offset && qcs_stream_fin(qcs)) {
+               /* Close stream locally. */
+               qcs_close_local(qcs);
                /* Reset flag to not emit multiple FIN STREAM frames. */
                qcs->flags &= ~QC_SF_FIN_STREAM;
        }
@@ -1302,6 +1378,11 @@ static int qc_send(struct qcc *qcc)
                        continue;
                }
 
+               if (qcs_is_close_local(qcs)) {
+                       node = eb64_next(node);
+                       continue;
+               }
+
                if (qcs->flags & QC_SF_BLK_SFCTL) {
                        node = eb64_next(node);
                        continue;
@@ -1417,6 +1498,14 @@ static int qc_purge_streams(struct qcc *qcc)
                struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
                node = eb64_next(node);
 
+               /* Release not attached closed streams. */
+               if (qcs->st == QC_SS_CLO && !qcs_sc(qcs)) {
+                       TRACE_DEVEL("purging closed stream", QMUX_EV_QCC_WAKE, qcs->qcc->conn, qcs);
+                       qcs_destroy(qcs);
+                       release = 1;
+                       continue;
+               }
+
                /* Release detached streams with empty buffer. */
                if (qcs->flags & QC_SF_DETACH) {
                        if (!b_data(&qcs->tx.buf) &&
@@ -1635,6 +1724,14 @@ static void qc_detach(struct sedesc *sd)
 
        TRACE_ENTER(QMUX_EV_STRM_END, qcc->conn, qcs);
 
+       /* TODO this BUG_ON_HOT() is not correct as the stconn layer may detach
+        * from the stream even if it is not closed remotely at the QUIC layer.
+        * This happens for example when a stream must be closed due to a
+        * rejected request. To better handle these cases, it will be required
+        * to implement shutr/shutw MUX operations. Once this is done, this
+        * BUG_ON_HOT() statement can be adjusted.
+        */
+       //BUG_ON_HOT(!qcs_is_close_remote(qcs));
        --qcc->nb_sc;
 
        if ((b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) &&
@@ -1739,8 +1836,15 @@ static size_t qc_snd_buf(struct stconn *sc, struct buffer *buf,
 
        TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
 
+       if (qcs_is_close_local(qcs)) {
+               ret = count;
+               count = 0;
+               goto end;
+       }
+
        ret = qcs->qcc->app_ops->snd_buf(sc, buf, count, flags);
 
+ end:
        TRACE_LEAVE(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
 
        return ret;
@@ -1842,6 +1946,18 @@ static int qc_wake(struct connection *conn)
 }
 
 
+static char *qcs_st_to_str(enum qcs_state st)
+{
+       switch (st) {
+       case QC_SS_IDLE: return "IDL";
+       case QC_SS_OPEN: return "OPN";
+       case QC_SS_HLOC: return "HCL";
+       case QC_SS_HREM: return "HCR";
+       case QC_SS_CLO:  return "CLO";
+       default:         return "???";
+       }
+}
+
 static void qmux_trace_frm(const struct quic_frame *frm)
 {
        switch (frm->type) {
@@ -1877,7 +1993,9 @@ static void qmux_trace(enum trace_level level, uint64_t mask,
                chunk_appendf(&trace_buf, " : qcc=%p(F)", qcc);
 
                if (qcs)
-                       chunk_appendf(&trace_buf, " qcs=%p(%llu)", qcs, (ull)qcs->id);
+                       chunk_appendf(&trace_buf, " qcs=%p .id=%llu .st=%s",
+                                     qcs, (ull)qcs->id,
+                                     qcs_st_to_str(qcs->st));
 
                if (mask & QMUX_EV_QCC_NQCS) {
                        const uint64_t *id = a3;