#include <haproxy/conn_stream.h>
#include <haproxy/dynbuf.h>
#include <haproxy/htx.h>
+#include <haproxy/list.h>
#include <haproxy/pool.h>
#include <haproxy/quic_stream.h>
#include <haproxy/sink.h>
if (qcs->tx.offset == qcs->tx.sent_offset && b_full(&qcs->stream->buf->buf)) {
qc_stream_buf_release(qcs->stream);
-
- /* reschedule send if buffers available */
- if (qc_stream_buf_avail(qcc->conn->handle.qc)) {
- tasklet_wakeup(qcc->wait_event.tasklet);
- }
- else {
- qcc->flags |= QC_CF_CONN_FULL;
- }
+ /* prepare qcs for immediate send retry if data to send */
+ if (b_data(&qcs->tx.buf))
+ LIST_APPEND(&qcc->send_retry_list, &qcs->el);
}
}
if (LIST_ISEMPTY(frms)) {
TRACE_DEVEL("leaving with no frames to send", QMUX_EV_QCC_SEND, qcc->conn);
- return 0;
+ return 1;
}
+ LIST_INIT(&qcc->send_retry_list);
+
retry_send:
first_frm = LIST_ELEM(frms->n, struct quic_frame *, list);
if ((first_frm->type & QUIC_FT_STREAM_8) == QUIC_FT_STREAM_8) {
return 0;
}
+/* Used internally by qc_send function. Proceed to send for <qcs>. This will
+ * transfer data from qcs buffer to its quic_stream counterpart. A STREAM frame
+ * is then generated and inserted in <frms> list. <qcc_max_data> is the current
+ * flow-control max-data at the connection level which must not be surpassed.
+ *
+ * Returns the total bytes transferred between qcs and quic_stream buffers. Can
+ * be null if out buffer cannot be allocated.
+ */
+static int _qc_send_qcs(struct qcs *qcs, struct list *frms,
+ uint64_t qcc_max_data)
+{
+ struct qcc *qcc = qcs->qcc;
+ struct buffer *buf = &qcs->tx.buf;
+ struct buffer *out = qc_stream_buf_get(qcs->stream);
+ int xfer = 0;
+
+ /* Allocate <out> buffer if necessary. */
+ if (!out) {
+ if (qcc->flags & QC_CF_CONN_FULL)
+ return 0;
+
+ out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
+ if (!out) {
+ qcc->flags |= QC_CF_CONN_FULL;
+ return 0;
+ }
+ }
+
+ /* Transfer data from <buf> to <out>. */
+ if (b_data(buf)) {
+ xfer = qcs_xfer_data(qcs, out, buf, qcc_max_data);
+ BUG_ON(xfer < 0); /* TODO handle this properly */
+
+ if (xfer > 0) {
+ qcs_notify_send(qcs);
+ qcs->flags &= ~QC_SF_BLK_MROOM;
+ }
+
+ qcs->tx.offset += xfer;
+ }
+
+ /* out buffer cannot be emptied if qcs offsets differ. */
+ BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
+
+ /* Build a new STREAM frame with <out> buffer. */
+ if (qcs->tx.sent_offset != qcs->tx.offset) {
+ int ret;
+ char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+
+ /* FIN is set if all incoming data were transfered. */
+ fin = !!(fin && !b_data(buf));
+
+ ret = qcs_build_stream_frm(qcs, out, fin, frms);
+ BUG_ON(ret < 0); /* TODO handle this properly */
+ }
+
+ return xfer;
+}
+
/* Proceed to sending. Loop through all available streams for the <qcc>
* instance and try to send as much as possible.
*
{
struct list frms = LIST_HEAD_INIT(frms);
struct eb64_node *node;
- int total = 0;
+ struct qcs *qcs, *qcs_tmp;
+ int total = 0, tmp_total = 0;
TRACE_ENTER(QMUX_EV_QCC_SEND);
*/
node = eb64_first(&qcc->streams_by_id);
while (node) {
- struct qcs *qcs = eb64_entry(node, struct qcs, by_id);
- struct buffer *buf = &qcs->tx.buf;
- struct buffer *out = qc_stream_buf_get(qcs->stream);
+ int ret;
+ qcs = eb64_entry(node, struct qcs, by_id);
/* TODO
* for the moment, unidirectional streams have their own
continue;
}
- if (!b_data(buf) && !out) {
+ if (!b_data(&qcs->tx.buf) && !qc_stream_buf_get(qcs->stream)) {
node = eb64_next(node);
continue;
}
- if (!out && (qcc->flags & QC_CF_CONN_FULL)) {
- node = eb64_next(node);
- continue;
- }
-
- if (!out) {
- out = qc_stream_buf_alloc(qcs->stream, qcs->tx.offset);
- if (!out) {
- qcc->flags |= QC_CF_CONN_FULL;
- node = eb64_next(node);
- continue;
- }
- }
-
- /* Prepare <out> buffer with data from <buf>. */
- if (b_data(buf)) {
- int ret = qcs_xfer_data(qcs, out, buf,
- qcc->tx.sent_offsets + total);
- BUG_ON(ret < 0); /* TODO handle this properly */
-
- if (ret > 0) {
- qcs_notify_send(qcs);
- if (qcs->flags & QC_SF_BLK_MROOM)
- qcs->flags &= ~QC_SF_BLK_MROOM;
- }
-
- qcs->tx.offset += ret;
- total += ret;
- }
-
- /* Subscribe if not all data can be transfered. */
- if (b_data(buf)) {
- qcc->conn->xprt->subscribe(qcc->conn, qcc->conn->xprt_ctx,
- SUB_RETRY_SEND, &qcc->wait_event);
- }
+ ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + total);
+ total += ret;
+ node = eb64_next(node);
+ }
- /* Build a new STREAM frame with <out> buffer. */
- if (b_data(out) && qcs->tx.sent_offset != qcs->tx.offset) {
- int ret;
- char fin = !!(qcs->flags & QC_SF_FIN_STREAM);
+ if (qc_send_frames(qcc, &frms)) {
+ /* data rejected by transport layer, do not retry. */
+ goto out;
+ }
- /* FIN is set if all incoming data were transfered. */
- fin = !!(fin && !b_data(buf));
- ret = qcs_build_stream_frm(qcs, out, fin, &frms);
- BUG_ON(ret < 0); /* TODO handle this properly */
- }
+ retry:
+ tmp_total = 0;
+ list_for_each_entry_safe(qcs, qcs_tmp, &qcc->send_retry_list, el) {
+ int ret;
+ BUG_ON(!b_data(&qcs->tx.buf));
+ BUG_ON(qc_stream_buf_get(qcs->stream));
- node = eb64_next(node);
+ ret = _qc_send_qcs(qcs, &frms, qcc->tx.sent_offsets + tmp_total);
+ tmp_total += ret;
+ LIST_DELETE(&qcs->el);
}
- qc_send_frames(qcc, &frms);
+ total += tmp_total;
+ if (!qc_send_frames(qcc, &frms) && !LIST_ISEMPTY(&qcc->send_retry_list))
+ goto retry;
+ out:
TRACE_LEAVE(QMUX_EV_QCC_SEND);
return total;
if (!qcc->wait_event.tasklet)
goto fail_no_tasklet;
+ LIST_INIT(&qcc->send_retry_list);
+
qcc->subs = NULL;
qcc->wait_event.tasklet->process = qc_io_cb;
qcc->wait_event.tasklet->context = qcc;