#include <haproxy/qpack-dec.h>
#include <haproxy/qpack-enc.h>
#include <haproxy/quic_enc.h>
+#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h>
#include <haproxy/stats-t.h>
#include <haproxy/tools.h>
b_quic_enc_int(&pos, h3_settings_max_field_section_size, 0);
}
+ if (qfctl_sblocked(&qcs->tx.fc)) {
+ TRACE_ERROR("not enough initial credit for control stream", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
+ goto err;
+ }
+
if (!(res = qcc_get_stream_txbuf(qcs))) {
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
goto err;
b_quic_enc_int(&pos, h3c->id_goaway, 0);
res = qcc_get_stream_txbuf(qcs);
- if (!res || b_room(res) < b_data(&pos)) {
+ if (!res || b_room(res) < b_data(&pos) ||
+ qfctl_sblocked(&qcs->tx.fc)) {
/* Do not try forcefully to emit GOAWAY if no space left. */
TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs);
goto err;
#include <haproxy/qmux_http.h>
#include <haproxy/qmux_trace.h>
#include <haproxy/quic_conn.h>
+#include <haproxy/quic_fctl.h>
#include <haproxy/quic_frame.h>
#include <haproxy/quic_sock.h>
#include <haproxy/quic_stream.h>
qcs->id = qcs->by_id.key = id;
eb64_insert(&qcc->streams_by_id, &qcs->by_id);
- /* If stream is local, use peer remote-limit, or else the opposite. */
+ /* Different limits can be set by the peer for local and remote bidi streams. */
if (quic_stream_is_bidi(id)) {
- qcs->tx.msd = quic_stream_is_local(qcc, id) ? qcc->rfctl.msd_bidi_r :
- qcc->rfctl.msd_bidi_l;
+ qfctl_init(&qcs->tx.fc, quic_stream_is_local(qcc, id) ?
+ qcc->rfctl.msd_bidi_r : qcc->rfctl.msd_bidi_l);
}
else if (quic_stream_is_local(qcc, id)) {
- qcs->tx.msd = qcc->rfctl.msd_uni_l;
+ qfctl_init(&qcs->tx.fc, qcc->rfctl.msd_uni_l);
}
- /* Properly set flow-control blocking if initial MSD is nul. */
- if (!qcs->tx.msd)
- qcs->flags |= QC_SF_BLK_SFCTL;
-
qcs->rx.ncbuf = NCBUF_NULL;
qcs->rx.app_buf = BUF_NULL;
qcs->rx.offset = qcs->rx.offset_max = 0;
/* Register <qcs> stream for emission of STREAM, STOP_SENDING or RESET_STREAM.
* Set <urg> to 1 if stream content should be treated in priority compared to
* other streams. For STREAM emission, <count> must contains the size of the
- * frame payload.
+ * frame payload. This is used for flow control accounting.
*/
void qcc_send_stream(struct qcs *qcs, int urg, int count)
{
LIST_APPEND(&qcs->qcc->send_list, &qcs->el_send);
}
+ if (count)
+ qfctl_sinc(&qcs->tx.fc, count);
+
TRACE_LEAVE(QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
goto err;
if (qcs) {
+ int unblock_soft = 0, unblock_real = 0;
+
TRACE_PROTO("receiving MAX_STREAM_DATA", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
- if (max > qcs->tx.msd) {
- qcs->tx.msd = max;
+ if (qfctl_set_max(&qcs->tx.fc, max, &unblock_soft, &unblock_real)) {
TRACE_DATA("increase remote max-stream-data", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
-
- if (qcs->flags & QC_SF_BLK_SFCTL) {
- qcs->flags &= ~QC_SF_BLK_SFCTL;
+ if (unblock_real) {
/* TODO optim: only wakeup IO-CB if stream has data to sent. */
tasklet_wakeup(qcc->wait_event.tasklet);
}
+
+ if (unblock_soft)
+ qcs_notify_send(qcs);
}
}
left = qcs->tx.offset - qcs->tx.sent_offset;
to_xfer = QUIC_MIN(b_data(in), b_room(out));
- BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
+ BUG_ON_HOT(qcs->tx.offset > qcs->tx.fc.limit);
/* do not exceed flow control limit */
- if (qcs->tx.offset + to_xfer > qcs->tx.msd) {
+ if (qcs->tx.offset + to_xfer > qcs->tx.fc.limit) {
TRACE_DATA("do not exceed stream flow control", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- to_xfer = qcs->tx.msd - qcs->tx.offset;
+ to_xfer = qcs->tx.fc.limit - qcs->tx.offset;
}
BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
diff = offset + data - qcs->tx.sent_offset;
if (diff) {
+ struct quic_fctl *fc_strm = &qcs->tx.fc;
+
+ /* Ensure real offset never exceeds soft value. */
+ BUG_ON(fc_strm->off_real + diff > fc_strm->off_soft);
+
/* increase offset sum on connection */
qcc->tx.sent_offsets += diff;
BUG_ON_HOT(qcc->tx.sent_offsets > qcc->rfctl.md);
/* increase offset on stream */
qcs->tx.sent_offset += diff;
- BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.msd);
BUG_ON_HOT(qcs->tx.sent_offset > qcs->tx.offset);
- if (qcs->tx.sent_offset == qcs->tx.msd) {
- qcs->flags |= QC_SF_BLK_SFCTL;
- TRACE_STATE("stream flow-control reached", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ if (qfctl_rinc(fc_strm, diff)) {
+ TRACE_STATE("stream flow-control reached",
+ QMUX_EV_QCS_SEND, qcc->conn, qcs);
}
/* If qcs.stream.buf is full, release it to the lower layer. */
}
qcs->tx.offset += xfer;
- BUG_ON_HOT(qcs->tx.offset > qcs->tx.msd);
qcc->tx.offsets += xfer;
- BUG_ON_HOT(qcc->tx.offsets > qcc->rfctl.md);
/* out buffer cannot be emptied if qcs offsets differ. */
BUG_ON(!b_data(out) && qcs->tx.sent_offset != qcs->tx.offset);
}
if (!(qcc->flags & QC_CF_BLK_MFCTL) &&
- !(qcs->flags & QC_SF_BLK_SFCTL)) {
+ !qfctl_rblocked(&qcs->tx.fc)) {
if ((ret = qcs_send(qcs, &frms)) < 0) {
/* Temporarily remove QCS from send-list. */
LIST_DEL_INIT(&qcs->el_send);
* new qc_stream_desc should be present in send_list as
* long as transport layer can handle all data.
*/
- BUG_ON(qcs->stream->buf && !(qcs->flags & QC_SF_BLK_SFCTL));
+ BUG_ON(qcs->stream->buf && !qfctl_rblocked(&qcs->tx.fc));
- if (!(qcs->flags & QC_SF_BLK_SFCTL)) {
+ if (!qfctl_rblocked(&qcs->tx.fc)) {
if ((ret = qcs_send(qcs, &frms)) < 0) {
LIST_DEL_INIT(&qcs->el_send);
LIST_APPEND(&qcs_failed, &qcs->el_send);
goto end;
}
+ if (qfctl_sblocked(&qcs->tx.fc)) {
+ TRACE_DEVEL("leaving on flow-control reached",
+ QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ goto end;
+ }
+
ret = qcs_http_snd_buf(qcs, buf, count, &fin);
if (fin) {
TRACE_STATE("reached stream fin", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
goto end;
}
+ if (qfctl_sblocked(&qcs->tx.fc)) {
+ TRACE_DEVEL("leaving on flow-control reached", QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
+ goto end;
+ }
+
/* Alawys disable splicing */
qcs->sd->iobuf.flags |= IOBUF_FL_NO_SPLICING;