]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: report errors on conn-streams
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Wed, 6 Apr 2022 13:46:30 +0000 (15:46 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 7 Apr 2022 08:37:45 +0000 (10:37 +0200)
Complete the error reporting. For each attached streams, if CO_FL_ERROR
is set, mark them with CS_FL_ERR_PENDING|CS_FL_ERROR. This will notify
the upper layer to trigger streams detach and release of the MUX.

This reporting is implemented in a new function qc_wake_some_streams(),
called by qc_wake(). This ensures that a lower-layer error is quickly
reported to the individual streams.

include/haproxy/mux_quic.h
src/mux_quic.c

index c01d1560e13434a26c95adc1d1631bc970333eed..44dfe1efdd2cff61b58d3885cd517c188a8eb5bd 100644 (file)
@@ -112,6 +112,7 @@ static inline struct conn_stream *qc_attach_cs(struct qcs *qcs, struct buffer *b
                return NULL;
        cs_attach_endp(cs, &qcs->qcc->conn->obj_type, qcs);
 
+       qcs->cs = cs;
        cs->ctx = qcs;
        stream_new(qcs->qcc->conn->owner, cs, buf);
 
index 243fb09b2d744ba2f886a16a877560fbb8b687dd..cdac9a6f552e5ac6c0e3a3c1ae894a40aaf53e29 100644 (file)
@@ -1068,7 +1068,8 @@ static void qc_detach(struct conn_stream *cs)
 
        --qcc->nb_cs;
 
-       if (b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) {
+       if ((b_data(&qcs->tx.buf) || qcs->tx.offset > qcs->tx.sent_offset) &&
+           !(qcc->conn->flags & CO_FL_ERROR)) {
                TRACE_DEVEL("leaving with remaining data, detaching qcs", QMUX_EV_STRM_END, qcc->conn, qcs);
                qcs->flags |= QC_SF_DETACH;
                return;
@@ -1203,6 +1204,42 @@ static int qc_unsubscribe(struct conn_stream *cs, int event_type, struct wait_ev
        return 0;
 }
 
+/* Loop through all qcs from <qcc>. If CO_FL_ERROR is set on the connection,
+ * report CS_FL_ERR_PENDING|CS_FL_ERROR on the attached conn-streams and wake
+ * them.
+ */
+static int qc_wake_some_streams(struct qcc *qcc)
+{
+       struct qc_stream_desc *stream;
+       struct qcs *qcs;
+       struct eb64_node *node;
+
+       for (node = eb64_first(&qcc->streams_by_id); node;
+            node = eb64_next(node)) {
+               stream = eb64_entry(node, struct qc_stream_desc, by_id);
+               qcs = stream->ctx;
+
+               if (!qcs->cs)
+                       continue;
+
+               if (qcc->conn->flags & CO_FL_ERROR) {
+                       qcs->cs->flags |= CS_FL_ERR_PENDING;
+                       if (qcs->cs->flags & CS_FL_EOS)
+                               qcs->cs->flags |= CS_FL_ERROR;
+
+                       if (qcs->subs) {
+                               qcs_notify_recv(qcs);
+                               qcs_notify_send(qcs);
+                       }
+                       else if (qcs->cs->data_cb->wake) {
+                               qcs->cs->data_cb->wake(qcs->cs);
+                       }
+               }
+       }
+
+       return 0;
+}
+
 static int qc_wake(struct connection *conn)
 {
        struct qcc *qcc = conn->ctx;
@@ -1220,6 +1257,8 @@ static int qc_wake(struct connection *conn)
 
        qc_send(qcc);
 
+       qc_wake_some_streams(qcc);
+
        if (qcc_is_dead(qcc))
                goto release;