]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MEDIUM: mux-quic: implement RESET_STREAM emission
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 4 Jul 2022 09:44:38 +0000 (11:44 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Mon, 11 Jul 2022 14:45:04 +0000 (16:45 +0200)
Implement functions to be able to reset a stream via RESET_STREAM.

If needed, a qcs instance is flagged with QC_SF_TO_RESET to schedule a
stream reset. This will interrupt all future send operations.

On stream emission, if a stream is flagged with QC_SF_TO_RESET, a
RESET_STREAM frame is generated and emitted to the transport layer. If
this operation succeeds, the stream is locally closed. If upper layer is
instantiated, error flag is set on it.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/mux_quic.c

index ecee4840708a1fe3f27fea9ff7f0acba2c65185f..a78609edb10f53c5cebaacab51c2052151430b21 100644 (file)
@@ -107,6 +107,7 @@ struct qcc {
 #define QC_SF_BLK_SFCTL         0x00000010  /* stream blocked due to stream flow control limit */
 #define QC_SF_DEM_FULL          0x00000020  /* demux blocked on request channel buffer full */
 #define QC_SF_READ_ABORTED      0x00000040  /* stream rejected by app layer */
+#define QC_SF_TO_RESET          0x00000080  /* a RESET_STREAM must be sent */
 
 /* Maximum size of stream Rx buffer. */
 #define QC_S_RX_BUF_SZ   (global.tune.bufsize - NCB_RESERVED_SZ)
@@ -163,6 +164,8 @@ struct qcs {
 
        struct wait_event wait_event;
        struct wait_event *subs;
+
+       uint64_t err; /* error code to transmit via RESET_STREAM */
 };
 
 /* QUIC application layer operations */
index fbfa58c242a768a00c0e816f82355d758d62155f..23a17cb2515f1dfafff4e2d1196c49f0d466f818 100644 (file)
@@ -22,6 +22,7 @@ void qcs_notify_recv(struct qcs *qcs);
 void qcs_notify_send(struct qcs *qcs);
 
 void qcc_emit_cc_app(struct qcc *qcc, int err);
+void qcc_reset_stream(struct qcs *qcs, int err);
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
              char fin, char *data);
 int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
index 073fc977f82c3d72947a063aa9c13d93ced256aa..44fdf6a7f6fc6b4e29d40e17817d2dc75d595c67 100644 (file)
@@ -171,6 +171,8 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
        qcs->wait_event.events = 0;
        qcs->subs = NULL;
 
+       qcs->err = 0;
+
  out:
        TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn, qcs);
        return qcs;
@@ -674,6 +676,20 @@ void qcc_emit_cc_app(struct qcc *qcc, int err)
        tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
+/* Prepare for the emission of RESET_STREAM on <qcs> with error code <err>. */
+void qcc_reset_stream(struct qcs *qcs, int err)
+{
+       struct qcc *qcc = qcs->qcc;
+
+       if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
+               return;
+
+       qcs->flags |= QC_SF_TO_RESET;
+       qcs->err = err;
+       tasklet_wakeup(qcc->wait_event.tasklet);
+       TRACE_DEVEL("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs);
+}
+
 /* Handle a new STREAM frame for stream with id <id>. Payload is pointed by
  * <data> with length <len> and represents the offset <offset>. <fin> is set if
  * the QUIC frame FIN bit is set.
@@ -1275,6 +1291,46 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms)
        return 0;
 }
 
+/* Emit a RESET_STREAM on <qcs>.
+ *
+ * Returns 0 if the frame has been successfully sent else non-zero.
+ */
+static int qcs_send_reset(struct qcs *qcs)
+{
+       struct list frms = LIST_HEAD_INIT(frms);
+       struct quic_frame *frm;
+
+       TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
+
+       frm = pool_zalloc(pool_head_quic_frame);
+       if (!frm)
+               return 1;
+
+       LIST_INIT(&frm->reflist);
+       frm->type = QUIC_FT_RESET_STREAM;
+       frm->reset_stream.id = qcs->id;
+       frm->reset_stream.app_error_code = qcs->err;
+       frm->reset_stream.final_size = qcs->tx.sent_offset;
+
+       LIST_APPEND(&frms, &frm->list);
+       if (qc_send_frames(qcs->qcc, &frms)) {
+               pool_free(pool_head_quic_frame, frm);
+               TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
+               return 1;
+       }
+
+       if (qcs_sc(qcs)) {
+               se_fl_set_error(qcs->sd);
+               qcs_alert(qcs);
+       }
+
+       qcs_close_local(qcs);
+       qcs->flags &= ~QC_SF_TO_RESET;
+
+       TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
+       return 0;
+}
+
 /* Used internally by qc_send function. Proceed to send for <qcs>. This will
  * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
  * is then generated and inserted in <frms> list.
@@ -1378,6 +1434,12 @@ static int qc_send(struct qcc *qcc)
                        continue;
                }
 
+               if (qcs->flags & QC_SF_TO_RESET) {
+                       qcs_send_reset(qcs);
+                       node = eb64_next(node);
+                       continue;
+               }
+
                if (qcs_is_close_local(qcs)) {
                        node = eb64_next(node);
                        continue;
@@ -1834,7 +1896,7 @@ static size_t qc_snd_buf(struct stconn *sc, struct buffer *buf,
 
        TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
 
-       if (qcs_is_close_local(qcs)) {
+       if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) {
                ret = count;
                goto end;
        }