]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: properly report end-of-stream on recv
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 15 May 2023 09:31:20 +0000 (11:31 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 16 May 2023 15:53:45 +0000 (17:53 +0200)
MUX is responsible to put EOS on stream when read channel is closed.
This happens if underlying connection is closed or a RESET_STREAM is
received. FIN STREAM is ignored in this case.

For connection closure, simply check for CO_FL_SOCK_RD_SH.

For RESET_STREAM reception, a new flag QC_CF_RECV_RESET has been
introduced. It is set when RESET_STREAM is received, unless we already
received all data. This is conform to QUIC RFC which allows to ignore a
RESET_STREAM in this case. During RESET_STREAM processing, input buffer
is emptied so EOS can be reported right away on recv_buf operation.

This should be backported up to 2.7.

include/haproxy/mux_quic-t.h
src/mux_quic.c

index b501c90b16aa2ed2a417dcef16b7f92a3fa74196..204258a5e955cceffdd0e9ed51b29818c388b944 100644 (file)
@@ -128,6 +128,7 @@ struct qcc {
 #define QC_SF_HREQ_RECV         0x00000100  /* a full HTTP request has been received */
 #define QC_SF_TO_STOP_SENDING   0x00000200  /* a STOP_SENDING must be sent */
 #define QC_SF_UNKNOWN_PL_LENGTH 0x00000400  /* HTX EOM may be missing from the stream layer */
+#define QC_SF_RECV_RESET        0x00000800  /* a RESET_STREAM was received */
 
 /* Maximum size of stream Rx buffer. */
 #define QC_S_RX_BUF_SZ   (global.tune.bufsize - NCB_RESERVED_SZ)
index 7db03210ae91785247aa4b97020f62f1ce537eea..42308d8f5f409b453e0b6febee6a6c600aa454bc 100644 (file)
@@ -1217,12 +1217,20 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f
                goto err;
        }
 
+       /* RFC 9000 3.2. Receiving Stream States
+        *
+        * A RESET_STREAM signal might be suppressed or withheld
+        * if stream data is completely received and is buffered to be read by
+        * the application. If the RESET_STREAM is suppressed, the receiving
+        * part of the stream remains in "Data Recvd".
+        */
        if (!qcs || qcs_is_close_remote(qcs))
                goto out;
 
        TRACE_PROTO("receiving RESET_STREAM", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
        qcs_idle_open(qcs);
 
+       /* Ensure stream closure is not forbidden by application protocol. */
        if (qcc->app_ops->close) {
                if (qcc->app_ops->close(qcs, QCC_APP_OPS_CLOSE_SIDE_RD)) {
                        TRACE_ERROR("closure rejected by app layer", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
@@ -1237,7 +1245,14 @@ int qcc_recv_reset_stream(struct qcc *qcc, uint64_t id, uint64_t err, uint64_t f
                goto err;
        }
 
-       qcs->flags |= QC_SF_SIZE_KNOWN;
+       /* RFC 9000 3.2. Receiving Stream States
+        *
+        * An
+        * implementation MAY interrupt delivery of stream data, discard any
+        * data that was not consumed, and signal the receipt of the
+        * RESET_STREAM.
+        */
+       qcs->flags |= QC_SF_SIZE_KNOWN|QC_SF_RECV_RESET;
        qcs_close_remote(qcs);
        qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
 
@@ -2669,6 +2684,21 @@ static size_t qc_recv_buf(struct stconn *sc, struct buffer *buf,
                        se_expect_data(qcs->sd);
                }
 
+               /* Set end-of-stream on read closed. */
+               if (qcs->flags & QC_SF_RECV_RESET ||
+                   qcc->conn->flags & CO_FL_SOCK_RD_SH) {
+                       TRACE_STATE("report end-of-stream", QMUX_EV_STRM_RECV, qcc->conn, qcs);
+                       se_fl_set(qcs->sd, SE_FL_EOS);
+
+                       /* Set error if EOI not reached. This may happen on
+                        * RESET_STREAM reception or connection error.
+                        */
+                       if (!se_fl_test(qcs->sd, SE_FL_EOI)) {
+                               TRACE_STATE("report error on stream aborted", QMUX_EV_STRM_RECV, qcc->conn, qcs);
+                               se_fl_set(qcs->sd, SE_FL_EOS | SE_FL_ERROR);
+                       }
+               }
+
                if (se_fl_test(qcs->sd, SE_FL_ERR_PENDING)) {
                        TRACE_STATE("report error", QMUX_EV_STRM_RECV, qcc->conn, qcs);
                        se_fl_set(qcs->sd, SE_FL_ERROR);