]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MAJOR: mux-quic: rework stream sending priorization
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 3 Jan 2023 13:39:24 +0000 (14:39 +0100)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Tue, 10 Jan 2023 16:49:50 +0000 (17:49 +0100)
Implement a mechanism to register streams ready to send data in new
STREAM frames. Internally, this is implemented with a new list
<qcc.send_list> which contains qcs instances.

A qcs can be registered safely using the new function qcc_send_stream().
This is done automatically in qc_send_buf() which covers most cases.
Also, application layer is free to use it for internal usage streams.
This is currently the case for H3 control stream with SETTINGS sending.

The main point of this patch is to handle stream sending fairly. This is
in stark contrast with previous code where streams with lower ID were
always prioritized. This could cause other streams to be indefinitely
blocked behind a stream which has a lot of data to transfer. Now,
streams are handled in an order scheduled by se_desc layer.

This commit is the first one of a serie which will bring other
improvments which also relied on the send_list implementation.

This must be backported up to 2.7 when deemed sufficiently stable.

include/haproxy/mux_quic-t.h
include/haproxy/mux_quic.h
src/h3.c
src/mux_quic.c

index f1c96f077630f508d01c99e51d3a81c9d916f73a..256cdd93a1cbe0b8de25cc9b6b129108a73f0719 100644 (file)
@@ -95,6 +95,7 @@ struct qcc {
        struct eb_root streams_by_id; /* all active streams by their ID */
 
        struct list send_retry_list; /* list of qcs eligible to send retry */
+       struct list send_list; /* list of qcs ready to send */
 
        struct wait_event wait_event;  /* To be used if we're waiting for I/Os */
 
@@ -174,6 +175,7 @@ struct qcs {
        struct qc_stream_desc *stream;
 
        struct list el; /* element of qcc.send_retry_list */
+       struct list el_send; /* element of qcc.send_list */
        struct list el_opening; /* element of qcc.opening_list */
 
        struct wait_event wait_event;
index a165779fc209344e6368a41680b380fd9b71493a..1d374b0b0cbc8eb0dbb95cf11712044b19de8e86 100644 (file)
@@ -21,6 +21,7 @@ void qcs_notify_send(struct qcs *qcs);
 
 void qcc_emit_cc_app(struct qcc *qcc, int err, int immediate);
 void qcc_reset_stream(struct qcs *qcs, int err);
+void qcc_send_stream(struct qcs *qcs);
 void qcc_abort_stream_read(struct qcs *qcs);
 int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
              char fin, char *data);
index 80357647b740c6a871aab8e630980315a66ad39c..e8404485716fd1465a4da2f2e7dc56fcdbde7d14 100644 (file)
--- a/src/h3.c
+++ b/src/h3.c
@@ -1032,8 +1032,11 @@ static int h3_control_send(struct qcs *qcs, void *ctx)
        }
 
        ret = b_force_xfer(res, &pos, b_data(&pos));
-       if (ret > 0)
+       if (ret > 0) {
+               /* Register qcs for sending before other streams. */
+               qcc_send_stream(qcs);
                h3c->flags |= H3_CF_SETTINGS_SENT;
+       }
 
        TRACE_LEAVE(H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
        return ret;
index 120eeb1f1a9c73271b53d9aa45c09abb42645baa..6a324acf52819bc93fe86aa81c7baf38fbb7e879 100644 (file)
@@ -61,6 +61,7 @@ static void qcs_free(struct qcs *qcs)
 
        /* Safe to use even if already removed from the list. */
        LIST_DEL_INIT(&qcs->el_opening);
+       LIST_DEL_INIT(&qcs->el_send);
 
        /* Release stream endpoint descriptor. */
        BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
@@ -112,6 +113,7 @@ static struct qcs *qcs_new(struct qcc *qcc, uint64_t id, enum qcs_type type)
         * These fields must be initialed before.
         */
        LIST_INIT(&qcs->el_opening);
+       LIST_INIT(&qcs->el_send);
        qcs->start = TICK_ETERNITY;
 
        /* store transport layer stream descriptor in qcc tree */
@@ -819,6 +821,22 @@ void qcc_reset_stream(struct qcs *qcs, int err)
        tasklet_wakeup(qcc->wait_event.tasklet);
 }
 
+/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM. */
+void qcc_send_stream(struct qcs *qcs)
+{
+       struct qcc *qcc = qcs->qcc;
+
+       TRACE_ENTER(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+       /* Cannot send if already closed. */
+       BUG_ON(qcs_is_close_local(qcs));
+
+       if (!LIST_INLIST(&qcs->el_send))
+               LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
+
+       TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
+}
+
 /* Prepare for the emission of STOP_SENDING on <qcs>. */
 void qcc_abort_stream_read(struct qcs *qcs)
 {
@@ -1067,6 +1085,7 @@ int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max)
 
                        if (qcs->flags & QC_SF_BLK_SFCTL) {
                                qcs->flags &= ~QC_SF_BLK_SFCTL;
+                               /* TODO optim: only wakeup IO-CB if stream has data to sent. */
                                tasklet_wakeup(qcc->wait_event.tasklet);
                        }
                }
@@ -1469,12 +1488,17 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
                }
        }
 
-       if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf) &&
-            qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
-               /* Close stream locally. */
-               qcs_close_local(qcs);
-               /* Reset flag to not emit multiple FIN STREAM frames. */
-               qcs->flags &= ~QC_SF_FIN_STREAM;
+       if (qcs->tx.offset == qcs->tx.sent_offset && !b_data(&qcs->tx.buf)) {
+               /* Remove stream from send_list if all was sent. */
+               LIST_DEL_INIT(&qcs->el_send);
+               TRACE_STATE("stream sent done", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+
+               if (qcs->flags & (QC_SF_FIN_STREAM|QC_SF_DETACH)) {
+                       /* Close stream locally. */
+                       qcs_close_local(qcs);
+                       /* Reset flag to not emit multiple FIN STREAM frames. */
+                       qcs->flags &= ~QC_SF_FIN_STREAM;
+               }
        }
 
  out:
@@ -1627,6 +1651,9 @@ static int _qc_send_qcs(struct qcs *qcs, struct list *frms)
        int xfer = 0;
        char fin = 0;
 
+       /* Cannot send STREAM on remote unidirectional streams. */
+       BUG_ON(quic_stream_is_uni(qcs->id) && quic_stream_is_remote(qcc, qcs->id));
+
        /* Allocate <out> buffer if necessary. */
        if (!out) {
                if (qcc->flags & QC_CF_CONN_FULL)
@@ -1708,12 +1735,13 @@ static int qc_send(struct qcc *qcc)
                qcc->flags |= QC_CF_APP_FINAL;
        }
 
-       /* loop through all streams, construct STREAM frames if data available.
-        * TODO optimize the loop to favor streams which are not too heavy.
+       /* Loop through all streams for STOP_SENDING/RESET_STREAM sending. Each
+        * frame is send individually to guarantee emission.
+        *
+        * TODO Optimize sending by multiplexing several frames in one datagram.
         */
        node = eb64_first(&qcc->streams_by_id);
        while (node) {
-               int ret;
                uint64_t id;
 
                qcs = eb64_entry(node, struct qcs, by_id);
@@ -1733,26 +1761,23 @@ static int qc_send(struct qcc *qcc)
                        continue;
                }
 
-               if (qcs_is_close_local(qcs)) {
-                       node = eb64_next(node);
-                       continue;
-               }
+               node = eb64_next(node);
+       }
 
-               if (qcs->flags & QC_SF_BLK_SFCTL) {
-                       node = eb64_next(node);
-                       continue;
-               }
+       /* Send STREAM data for registered streams. */
+       list_for_each_entry(qcs, &qcc->send_list, el_send) {
+               /* Stream must not be present in send_list if it has nothing to send. */
+               BUG_ON(!b_data(&qcs->tx.buf) &&
+                      qcs->tx.sent_offset == qcs->tx.offset &&
+                      !qcs_stream_fin(qcs));
 
-               /* Check if there is something to send. */
-               if (!b_data(&qcs->tx.buf) && !qcs_stream_fin(qcs) &&
-                   !qc_stream_buf_get(qcs->stream)) {
-                       node = eb64_next(node);
+               if (qcs_is_close_local(qcs)) {
+                       LIST_DEL_INIT(&qcs->el_send);
                        continue;
                }
 
-               ret = _qc_send_qcs(qcs, &frms);
-               total += ret;
-               node = eb64_next(node);
+               if (!(qcs->flags & QC_SF_BLK_SFCTL))
+                       total += _qc_send_qcs(qcs, &frms);
        }
 
        if (qc_send_frames(qcc, &frms)) {
@@ -1780,6 +1805,7 @@ static int qc_send(struct qcc *qcc)
        /* Deallocate frames that the transport layer has rejected. */
        if (!LIST_ISEMPTY(&frms)) {
                struct quic_frame *frm, *frm2;
+
                list_for_each_entry_safe(frm, frm2, &frms, list) {
                        LIST_DELETE(&frm->list);
                        pool_free(pool_head_quic_frame, frm);
@@ -2130,6 +2156,7 @@ static int qc_init(struct connection *conn, struct proxy *prx,
                goto fail_no_tasklet;
        }
 
+       LIST_INIT(&qcc->send_list);
        LIST_INIT(&qcc->send_retry_list);
 
        qcc->wait_event.tasklet->process = qc_io_cb;
@@ -2300,6 +2327,7 @@ static size_t qc_send_buf(struct stconn *sc, struct buffer *buf,
                qcs->flags |= QC_SF_FIN_STREAM;
 
        if (ret || fin) {
+               qcc_send_stream(qcs);
                if (!(qcs->qcc->wait_event.events & SUB_RETRY_SEND))
                        tasklet_wakeup(qcs->qcc->wait_event.tasklet);
        }