]> git.ipfire.org Git - thirdparty/haproxy.git/commitdiff
MINOR: mux-quic: implement immediate send retry
authorAmaury Denoyelle <adenoyelle@haproxy.com>
Fri, 15 Apr 2022 15:32:04 +0000 (17:32 +0200)
committerAmaury Denoyelle <adenoyelle@haproxy.com>
Thu, 21 Apr 2022 10:04:04 +0000 (12:04 +0200)
Complete qc_send function. After having processed each qcs emission, it
will now retry send on qcs where transfer can continue. This is useful
when qc_stream_desc buffer is full and there is still data present in
qcs buf.

To implement this, each eligible qcs is inserted in a new list
<qcc.send_retry_list>. This is done on send notification from the
transport layer through qcc_streams_sent_done(). Retry emission until
send_retry_list is empty or the transport layer cannot proceed more
data.

Several send operations are now called on two different places. Thus a
new _qc_send_qcs() function is defined to factorize the code.

This change should maximize the throughput during QUIC transfers.

include/haproxy/mux_quic-t.h
include/haproxy/quic_stream.h
src/mux_quic.c
src/quic_stream.c

index 44fb4d62746122824df86e4d3a1cdbe2ec1fc1af..d95e9dc0d22def7ceef7959be93ee9f62549d7c7 100644 (file)
@@ -10,6 +10,7 @@
 
 #include <haproxy/buf-t.h>
 #include <haproxy/connection-t.h>
+#include <haproxy/list-t.h>
 #include <haproxy/quic_stream-t.h>
 #include <haproxy/xprt_quic-t.h>
 #include <haproxy/conn_stream-t.h>
@@ -70,6 +71,8 @@ struct qcc {
 
        struct eb_root streams_by_id; /* all active streams by their ID */
 
+       struct list send_retry_list; /* list of qcs eligible to send retry */
+
        struct wait_event wait_event;  /* To be used if we're waiting for I/Os */
        struct wait_event *subs;
 
@@ -111,6 +114,8 @@ struct qcs {
        uint64_t id;
        struct qc_stream_desc *stream;
 
+       struct list el; /* element of qcc.send_retry_list */
+
        struct wait_event wait_event;
        struct wait_event *subs;
 };
index 6d9359d0fb47d2e63c87840777632fa97d9a6b88..0550f4f0c3181ce9c6370fd84d25383103387259 100644 (file)
@@ -14,7 +14,6 @@ int qc_stream_desc_ack(struct qc_stream_desc **stream, size_t offset, size_t len
 void qc_stream_desc_free(struct qc_stream_desc *stream);
 
 struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
-int qc_stream_buf_avail(struct quic_conn *qc);
 struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
                                    uint64_t offset);
 void qc_stream_buf_release(struct qc_stream_desc *stream);
index b969d3a4bda2099750e493fc7d7eaa45352e4d1c..6a65d6923b4ae94f750d9fc518ec6f6990afdba3 100644 (file)
@@ -7,6 +7,7 @@
 #include <haproxy/conn_stream.h>
 #include <haproxy/dynbuf.h>
 #include <haproxy/htx.h>
+#include <haproxy/list.h>
 #include <haproxy/pool.h>
 #include <haproxy/quic_stream.h>
 #include <haproxy/sink.h>
@@ -722,14 +723,9 @@ void qcc_streams_sent_done(struct qcs *qcs, uint64_t data, uint64_t offset)
 
        if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
                qc_stream_buf_release(qcs->stream);
-
-               /* reschedule send if buffers available */
-               if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
-                       tasklet_wakeup(qcc->wait_event.tasklet);
-               }
-               else {
-                       qcc->flags |= QC_CF_CONN_FULL;
-               }
+               /* prepare qcs for immediate send retry if data to send */
+               if (b_data(&qcs->tx.buf))
+                       LIST_APPEND(&qcc->send_retry_list, &qcs->el);
        }
 }
 
@@ -760,9 +756,11 @@ static int qc_send_frames(struct qcc *qcc, struct list *frms)
 
        if (LIST_ISEMPTY(frms)) {
                TRACE_DEVEL("leaving with no frames to send", QMUX_EV_QCC_SEND, qcc->conn);
-               return 0;
+               return 1;
        }
 
+       LIST_INIT(&qcc->send_retry_list);
+
  retry_send:
        first_frm = LIST_ELEM(frms->n, struct quic_frame *, list);
        if ((first_frm->type & QUIC_FT_STREAM_8) == QUIC_FT_STREAM_8) {
@@ -837,6 +835,65 @@ static int qc_send_max_streams(struct qcc *qcc)
        return 0;
 }
 
+/* Used internally by qc_send function. Proceed to send for <qcs>. This will
+ * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
+ * is then generated and inserted in <frms> list. <qcc_max_data> is the current
+ * flow-control max-data at the connection level which must not be surpassed.
+ *
+ * Returns the total bytes transferred between qcs and quic_stream buffers. Can
+ * be null if out buffer cannot be allocated.
+ */
+static int _qc_send_qcs(struct qcs *qcs, struct list *frms,
+                        uint64_t qcc_max_data)
+{
+       struct qcc *qcc = qcs->qcc;
+       struct buffer *buf = &qcs->tx.buf;
+       struct buffer *out = qc_stream_buf_get(qcs->stream);
+       int xfer = 0;
+
+       /* Allocate <out> buffer if necessary. */
+       if (!out) {
+               if (qcc->flags & QC_CF_CONN_FULL)
+                       return 0;
+
+               out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
+               if (!out) {
+                       qcc->flags |= QC_CF_CONN_FULL;
+                       return 0;
+               }
+       }
+
+       /* Transfer data from <buf> to <out>. */
+       if (b_data(buf)) {
+               xfer = qcs_xfer_data(qcs, out, buf, qcc_max_data);
+               BUG_ON(xfer < 0); /* TODO handle this properly */
+
+               if (xfer > 0) {
+                       qcs_notify_send(qcs);
+                       qcs->flags &= ~QC_SF_BLK_MROOM;
+               }
+
+               qcs->tx.offset += xfer;
+       }
+
+       /* out buffer cannot be emptied if qcs offsets differ. */
+       BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
+
+       /* Build a new STREAM frame with <out> buffer. */
+       if (qcs->tx.sent_offset != qcs->tx.offset) {
+               int ret;
+               char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+
+               /* FIN is set if all incoming data were transfered. */
+               fin = !!(fin && !b_data(buf));
+
+               ret = qcs_build_stream_frm(qcs, out, fin, frms);
+               BUG_ON(ret < 0); /* TODO handle this properly */
+       }
+
+       return xfer;
+}
+
 /* Proceed to sending. Loop through all available streams for the <qcc>
  * instance and try to send as much as possible.
  *
@@ -846,7 +903,8 @@ static int qc_send(struct qcc *qcc)
 {
        struct list frms = LIST_HEAD_INIT(frms);
        struct eb64_node *node;
-       int total = 0;
+       struct qcs *qcs, *qcs_tmp;
+       int total = 0, tmp_total = 0;
 
        TRACE_ENTER(QMUX_EV_QCC_SEND);
 
@@ -867,9 +925,8 @@ static int qc_send(struct qcc *qcc)
         */
        node = eb64_first(&qcc->streams_by_id);
        while (node) {
-               struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
-               struct buffer *buf = &qcs->tx.buf;
-               struct buffer *out = qc_stream_buf_get(qcs->stream);
+               int ret;
+               qcs = eb64_entry(node, struct qcs, by_id);
 
                /* TODO
                 * for the moment, unidirectional streams have their own
@@ -886,63 +943,38 @@ static int qc_send(struct qcc *qcc)
                        continue;
                }
 
-               if (!b_data(buf) && !out) {
+               if (!b_data(&qcs->tx.buf) && !qc_stream_buf_get(qcs->stream)) {
                        node = eb64_next(node);
                        continue;
                }
 
-               if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
-                       node = eb64_next(node);
-                       continue;
-               }
-
-               if (!out) {
-                       out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
-                       if (!out) {
-                               qcc->flags |= QC_CF_CONN_FULL;
-                               node = eb64_next(node);
-                               continue;
-                       }
-               }
-
-               /* Prepare <out> buffer with data from <buf>. */
-               if (b_data(buf)) {
-                       int ret = qcs_xfer_data(qcs, out, buf,
-                                               qcc->tx.sent_offsets + total);
-                       BUG_ON(ret < 0); /* TODO handle this properly */
-
-                       if (ret > 0) {
-                               qcs_notify_send(qcs);
-                               if (qcs->flags & QC_SF_BLK_MROOM)
-                                       qcs->flags &= ~QC_SF_BLK_MROOM;
-                       }
-
-                       qcs->tx.offset += ret;
-                       total += ret;
-               }
-
-               /* Subscribe if not all data can be transfered. */
-               if (b_data(buf)) {
-                       qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
-                                                  SUB_RETRY_SEND, &qcc->wait_event);
-               }
+               ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + total);
+               total += ret;
+               node = eb64_next(node);
+       }
 
-               /* Build a new STREAM frame with <out> buffer. */
-               if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
-                       int ret;
-                       char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+       if (qc_send_frames(qcc, &frms)) {
+               /* data rejected by transport layer, do not retry. */
+               goto out;
+       }
 
-                       /* FIN is set if all incoming data were transfered. */
-                       fin = !!(fin && !b_data(buf));
-                       ret = qcs_build_stream_frm(qcs, out, fin, &frms);
-                       BUG_ON(ret < 0); /* TODO handle this properly */
-               }
+ retry:
+       tmp_total = 0;
+       list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_retry_list, el) {
+               int ret;
+               BUG_ON(!b_data(&qcs->tx.buf));
+               BUG_ON(qc_stream_buf_get(qcs->stream));
 
-               node = eb64_next(node);
+               ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + tmp_total);
+               tmp_total += ret;
+               LIST_DELETE(&qcs->el);
        }
 
-       qc_send_frames(qcc, &frms);
+       total += tmp_total;
+       if (!qc_send_frames(qcc, &frms) && !LIST_ISEMPTY(&qcc->send_retry_list))
+               goto retry;
 
+ out:
        TRACE_LEAVE(QMUX_EV_QCC_SEND);
 
        return total;
@@ -1105,6 +1137,8 @@ static int qc_init(struct connection *conn, struct proxy *prx,
        if (!qcc->wait_event.tasklet)
                goto fail_no_tasklet;
 
+       LIST_INIT(&qcc->send_retry_list);
+
        qcc->subs = NULL;
        qcc->wait_event.tasklet->process = qc_io_cb;
        qcc->wait_event.tasklet->context = qcc;
index 2dd9b1c4ce7cd9415e9679e113feef43e977d67e..839efd0868956ba49b97e211c93b2259a28e741f 100644 (file)
@@ -206,7 +206,7 @@ struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream)
 /* Check if a new stream buffer can be allocated for the connection <qc>.
  * Returns a boolean.
  */
-int qc_stream_buf_avail(struct quic_conn *qc)
+static int qc_stream_buf_avail(struct quic_conn *qc)
 {
        /* TODO use a global tune settings for max */
        return qc->stream_buf_count < 30;