]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: implement STOP_SENDING emission
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 9 Dec 2022 13:58:28 +0000 (14:58 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 22 Dec 2022 15:38:16 +0000 (16:38 +0100)
Implement STOP_SENDING. This is divided in two main functions :
* qcc_abort_stream_read() which can be used by application protocol to
  request for a STOP_SENDING. This set the flag QC_SF_READ_ABORTED.
* qcs_send_reset() is a static function called after the preceding one.
  It will send a STOP_SENDING via qcc_send().

QC_SF_READ_ABORTED flag is now properly used : if activated on a stream
during qcc_recv(), <qcc.app_ops.decode_qcs> callback is skipped. Also,
abort reading on unknown unidirection remote stream is now fully
supported with the emission of a STOP_SENDING as specified by RFC 9000.

This commit is part of implementing H3 errors at the stream level. This
will allows the H3 layer to request the peer to close its endpoint for
an error on a stream.

This should be backported up to 2.7.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/mux_quic.c

index d716e08d52c1852ffb14b54bfe249ea897926e66..f1c96f077630f508d01c99e51d3a81c9d916f73a 100644 (file)
@@ -117,9 +117,10 @@ struct qcc {
 #define QC_SF_DETACH            0x00000008  /* sc is detached but there is remaining data to send */
 #define QC_SF_BLK_SFCTL         0x00000010  /* stream blocked due to stream flow control limit */
 #define QC_SF_DEM_FULL          0x00000020  /* demux blocked on request channel buffer full */
-#define QC_SF_READ_ABORTED      0x00000040  /* stream rejected by app layer */
+#define QC_SF_READ_ABORTED      0x00000040  /* Rx closed using STOP_SENDING*/
 #define QC_SF_TO_RESET          0x00000080  /* a RESET_STREAM must be sent */
 #define QC_SF_HREQ_RECV         0x00000100  /* a full HTTP request has been received */
+#define QC_SF_TO_STOP_SENDING   0x00000200  /* a STOP_SENDING must be sent */
 
 /* Maximum size of stream Rx buffer. */
 #define QC_S_RX_BUF_SZ   (global.tune.bufsize - NCB_RESERVED_SZ)
index 0a88f6b4be00da401009f1c396e3ac2d14c65841..a165779fc209344e6368a41680b380fd9b71493a 100644 (file)
@@ -21,6 +21,7 @@ void qcs_notify_send(struct qcs *qcs);
 
 void qcc_emit_cc_app(struct qcc *qcc, int err, int immediate);
 void qcc_reset_stream(struct qcs *qcs, int err);
+void qcc_abort_stream_read(struct qcs *qcs);
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
              char fin, char *data);
 int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
index 8912fa5d9d51a5f65221fc7f08c4f13820f8ddb1..518f413e5fcf9eec287a313e83715f474c264a03 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -220,7 +220,7 @@ static ssize_t h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
                 * Implementations MUST [...] abort reading on unidirectional
                 * streams that have unknown or unsupported types.
                 */
-               qcs->flags |= QC_SF_READ_ABORTED;
+               qcc_abort_stream_read(qcs);
                return -1;
        };
 
index c780ebcad4c00c0c6e42d62c9fad6f719b7b0393..fbc4a935a0a46d355526aa2686308496ea3d3525 100644 (file)
@@ -755,10 +755,16 @@ static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
        if (qcs_is_close_remote(qcs))
                fin = 1;
 
-       ret = qcc->app_ops->decode_qcs(qcs, &b, fin);
-       if (ret < 0) {
-               TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
-               goto err;
+       if (!(qcs->flags & QC_SF_READ_ABORTED)) {
+               ret = qcc->app_ops->decode_qcs(qcs, &b, fin);
+               if (ret < 0) {
+                       TRACE_ERROR("decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
+                       goto err;
+               }
+       }
+       else {
+               TRACE_DATA("ignore read on stream", QMUX_EV_QCS_RECV, qcc->conn, qcs);
+               ret = b_data(&b);
        }
 
        if (ret) {
@@ -813,6 +819,24 @@ void qcc_reset_stream(struct qcs *qcs, int err)
        tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
+/* Prepare for the emission of STOP_SENDING on <qcs>. */
+void qcc_abort_stream_read(struct qcs *qcs)
+{
+       struct qcc *qcc = qcs->qcc;
+
+       TRACE_ENTER(QMUX_EV_QCC_NEW, qcc->conn, qcs);
+
+       if ((qcs->flags & QC_SF_TO_STOP_SENDING) || qcs_is_close_remote(qcs))
+               goto end;
+
+       TRACE_STATE("abort stream read", QMUX_EV_QCS_END, qcc->conn, qcs);
+       qcs->flags |= (QC_SF_TO_STOP_SENDING|QC_SF_READ_ABORTED);
+       tasklet_wakeup(qcc->wait_event.tasklet);
+
+ end:
+       TRACE_LEAVE(QMUX_EV_QCC_NEW, qcc->conn, qcs);
+}
+
 /* Install the <app_ops> applicative layer of a QUIC connection on mux <qcc>.
  * Returns 0 on success else non-zero.
  */
@@ -977,11 +1001,6 @@ int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
                qcc_refresh_timeout(qcc);
        }
 
-       if (qcs->flags & QC_SF_READ_ABORTED) {
-               /* TODO should send a STOP_SENDING */
-               qcs_free(qcs);
-       }
-
  out:
        TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);
        return 0;
@@ -1538,6 +1557,58 @@ static int qcs_send_reset(struct qcs *qcs)
        return 0;
 }
 
+/* Emit a STOP_SENDING on <qcs>.
+ *
+ * Returns 0 if the frame has been successfully sent else non-zero.
+ */
+static int qcs_send_stop_sending(struct qcs *qcs)
+{
+       struct list frms = LIST_HEAD_INIT(frms);
+       struct quic_frame *frm;
+       struct qcc *qcc = qcs->qcc;
+
+       TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
+
+       /* RFC 9000 3.3. Permitted Frame Types
+        *
+        * A
+        * receiver MAY send a STOP_SENDING frame in any state where it has not
+        * received a RESET_STREAM frame -- that is, states other than "Reset
+        * Recvd" or "Reset Read". However, there is little value in sending a
+        * STOP_SENDING frame in the "Data Recvd" state, as all stream data has
+        * been received. A sender could receive either of these two types of
+        * frames in any state as a result of delayed delivery of packets.ΒΆ
+        */
+       if (qcs_is_close_remote(qcs)) {
+               TRACE_STATE("skip STOP_SENDING on remote already closed", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+               goto done;
+       }
+
+       frm = pool_zalloc(pool_head_quic_frame);
+       if (!frm) {
+               TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
+               return 1;
+       }
+
+       LIST_INIT(&frm->reflist);
+       frm->type = QUIC_FT_STOP_SENDING;
+       frm->stop_sending.id = qcs->id;
+       frm->stop_sending.app_error_code = qcs->err;
+
+       LIST_APPEND(&frms, &frm->list);
+       if (qc_send_frames(qcs->qcc, &frms)) {
+               pool_free(pool_head_quic_frame, frm);
+               TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
+               return 1;
+       }
+
+ done:
+       qcs->flags &= ~QC_SF_TO_STOP_SENDING;
+
+       TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
+       return 0;
+}
+
 /* Used internally by qc_send function. Proceed to send for <qcs>. This will
  * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
  * is then generated and inserted in <frms> list.
@@ -1650,6 +1721,9 @@ static int qc_send(struct qcc *qcc)
                        continue;
                }
 
+               if (qcs->flags & QC_SF_TO_STOP_SENDING)
+                       qcs_send_stop_sending(qcs);
+
                if (qcs->flags & QC_SF_TO_RESET) {
                        qcs_send_reset(qcs);
                        node = eb64_next(node);
@@ -1751,11 +1825,6 @@ static int qc_recv(struct qcc *qcc)
 
                qcc_decode_qcs(qcc, qcs);
                node = eb64_next(node);
-
-               if (qcs->flags & QC_SF_READ_ABORTED) {
-                       /* TODO should send a STOP_SENDING */
-                       qcs_free(qcs);
-               }
        }
 
        TRACE_LEAVE(QMUX_EV_QCC_RECV, qcc->conn);