From: Amaury Denoyelle Date: Mon, 4 Jul 2022 09:44:38 +0000 (+0200) Subject: MEDIUM: mux-quic: implement RESET_STREAM emission X-Git-Tag: v2.7-dev2~66 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=843a1196b3fc42b99ec1880142e11f3ca5e8866b;p=thirdparty%2Fhaproxy.git MEDIUM: mux-quic: implement RESET_STREAM emission Implement functions to be able to reset a stream via RESET_STREAM. If needed, a qcs instance is flagged with QC_SF_TO_RESET to schedule a stream reset. This will interrupt all future send operations. On stream emission, if a stream is flagged with QC_SF_TO_RESET, a RESET_STREAM frame is generated and emitted to the transport layer. If this operation succeeds, the stream is locally closed. If upper layer is instantiated, error flag is set on it. --- diff --git a/include/haproxy/mux_quic-t.h b/include/haproxy/mux_quic-t.h index ecee484070..a78609edb1 100644 --- a/include/haproxy/mux_quic-t.h +++ b/include/haproxy/mux_quic-t.h @@ -107,6 +107,7 @@ struct qcc { #define QC_SF_BLK_SFCTL 0x00000010 /* stream blocked due to stream flow control limit */ #define QC_SF_DEM_FULL 0x00000020 /* demux blocked on request channel buffer full */ #define QC_SF_READ_ABORTED 0x00000040 /* stream rejected by app layer */ +#define QC_SF_TO_RESET 0x00000080 /* a RESET_STREAM must be sent */ /* Maximum size of stream Rx buffer. */ #define QC_S_RX_BUF_SZ (global.tune.bufsize - NCB_RESERVED_SZ) @@ -163,6 +164,8 @@ struct qcs { struct wait_event wait_event; struct wait_event *subs; + + uint64_t err; /* error code to transmit via RESET_STREAM */ }; /* QUIC application layer operations */ diff --git a/include/haproxy/mux_quic.h b/include/haproxy/mux_quic.h index fbfa58c242..23a17cb251 100644 --- a/include/haproxy/mux_quic.h +++ b/include/haproxy/mux_quic.h @@ -22,6 +22,7 @@ void qcs_notify_recv(struct qcs *qcs); void qcs_notify_send(struct qcs *qcs); void qcc_emit_cc_app(struct qcc *qcc, int err); +void qcc_reset_stream(struct qcs *qcs, int err); int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset, char fin, char *data); int qcc_recv_max_data(struct qcc *qcc, uint64_t max); diff --git a/src/mux_quic.c b/src/mux_quic.c index 073fc977f8..44fdf6a7f6 100644 --- a/src/mux_quic.c +++ b/src/mux_quic.c @@ -171,6 +171,8 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type) qcs->wait_event.events = 0; qcs->subs = NULL; + qcs->err = 0; + out: TRACE_LEAVE(QMUX_EV_QCS_NEW, qcc->conn, qcs); return qcs; @@ -674,6 +676,20 @@ void qcc_emit_cc_app(struct qcc *qcc, int err) tasklet_wakeup(qcc->wait_event.tasklet); } +/* Prepare for the emission of RESET_STREAM on with error code . */ +void qcc_reset_stream(struct qcs *qcs, int err) +{ + struct qcc *qcc = qcs->qcc; + + if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs)) + return; + + qcs->flags |= QC_SF_TO_RESET; + qcs->err = err; + tasklet_wakeup(qcc->wait_event.tasklet); + TRACE_DEVEL("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs); +} + /* Handle a new STREAM frame for stream with id . Payload is pointed by * with length and represents the offset . is set if * the QUIC frame FIN bit is set. @@ -1275,6 +1291,46 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms) return 0; } +/* Emit a RESET_STREAM on . + * + * Returns 0 if the frame has been successfully sent else non-zero. + */ +static int qcs_send_reset(struct qcs *qcs) +{ + struct list frms = LIST_HEAD_INIT(frms); + struct quic_frame *frm; + + TRACE_ENTER(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); + + frm = pool_zalloc(pool_head_quic_frame); + if (!frm) + return 1; + + LIST_INIT(&frm->reflist); + frm->type = QUIC_FT_RESET_STREAM; + frm->reset_stream.id = qcs->id; + frm->reset_stream.app_error_code = qcs->err; + frm->reset_stream.final_size = qcs->tx.sent_offset; + + LIST_APPEND(&frms, &frm->list); + if (qc_send_frames(qcs->qcc, &frms)) { + pool_free(pool_head_quic_frame, frm); + TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); + return 1; + } + + if (qcs_sc(qcs)) { + se_fl_set_error(qcs->sd); + qcs_alert(qcs); + } + + qcs_close_local(qcs); + qcs->flags &= ~QC_SF_TO_RESET; + + TRACE_LEAVE(QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs); + return 0; +} + /* Used internally by qc_send function. Proceed to send for . This will * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame * is then generated and inserted in list. @@ -1378,6 +1434,12 @@ static int qc_send(struct qcc *qcc) continue; } + if (qcs->flags & QC_SF_TO_RESET) { + qcs_send_reset(qcs); + node = eb64_next(node); + continue; + } + if (qcs_is_close_local(qcs)) { node = eb64_next(node); continue; @@ -1834,7 +1896,7 @@ static size_t qc_snd_buf(struct stconn *sc, struct buffer *buf, TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs); - if (qcs_is_close_local(qcs)) { + if (qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET)) { ret = count; goto end; }