struct {
struct quic_fctl fc; /* stream flow control applied on sending */
+ int avail_bufs; /* count of available buffers for this connection */
} tx;
uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
-int qcc_notify_buf(struct qcc *qcc);
+void qcc_notify_buf(struct qcc *qcc, int free_count);
struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err);
struct mt_list accept_list; /* chaining element used for accept, only valid for frontend connections */
struct eb_root streams_by_id; /* qc_stream_desc tree */
- int stream_buf_count; /* total count of allocated stream buffers for this connection */
/* MUX */
struct qcc *qcc;
struct buffer *qc_stream_buf_get(struct qc_stream_desc *stream);
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
- uint64_t offset, int *avail);
+ uint64_t offset);
void qc_stream_buf_release(struct qc_stream_desc *stream);
#endif /* USE_QUIC */
#include <haproxy/chunk.h>
#include <haproxy/connection.h>
#include <haproxy/dynbuf.h>
+#include <haproxy/global-t.h>
#include <haproxy/h3.h>
#include <haproxy/list.h>
#include <haproxy/ncbuf.h>
}
}
-/* Notify on a new stream-desc buffer available for <qcc> connection.
- *
- * Returns true if a stream was woken up. If false is returned, this indicates
- * to the caller that it's currently unnecessary to notify for the rest of the
- * available buffers.
+/* Report that <free_count> stream-desc buffer have been released for <qcc>
+ * connection.
*/
-int qcc_notify_buf(struct qcc *qcc)
+void qcc_notify_buf(struct qcc *qcc, int free_count)
{
struct qcs *qcs;
- int ret = 0;
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+ BUG_ON(qcc->tx.avail_bufs + free_count > global.tune.quic_streams_buf);
+ qcc->tx.avail_bufs += free_count;
+
if (qcc->flags & QC_CF_CONN_FULL) {
TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
qcc->flags &= ~QC_CF_CONN_FULL;
}
- if (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
+ /* TODO a simple optimization would be to only wake up <free_count> QCS
+ * instances. But it may not work if a woken QCS is in error and does
+ * not try to allocate a buffer, leaving the unwoken QCS indefinitely
+ * in the buflist.
+ */
+ while (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
LIST_DEL_INIT(&qcs->el_buf);
tot_time_stop(&qcs->timer.buf);
qcs_notify_send(qcs);
- ret = 1;
}
TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
- return ret;
}
/* A fatal error is detected locally for <qcc> connection. It should be closed
struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
{
struct qcc *qcc = qcs->qcc;
- int buf_avail;
struct buffer *out = qc_stream_buf_get(qcs->stream);
/* Stream must not try to reallocate a buffer if currently waiting for one. */
goto out;
}
- out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
- &buf_avail);
- if (!out) {
- if (buf_avail) {
- TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
- *err = 1;
- goto out;
- }
-
+ if (!qcc->tx.avail_bufs) {
TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
tot_time_start(&qcs->timer.buf);
qcc->flags |= QC_CF_CONN_FULL;
goto out;
}
+
+ out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real);
+ if (!out) {
+ TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ *err = 1;
+ goto out;
+ }
+
+ --qcc->tx.avail_bufs;
}
out:
qcc->rfctl.msd_bidi_r = rparams->initial_max_stream_data_bidi_remote;
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
+ qcc->tx.avail_bufs = global.tune.quic_streams_buf;
+
if (conn_is_back(conn)) {
qcc->next_bidi_l = 0x00;
qcc->largest_bidi_r = 0x01;
quic_cc_path_init(qc->path, ipv4, server ? l->bind_conf->max_cwnd : 0,
cc_algo ? cc_algo : default_quic_cc_algo, qc);
- qc->stream_buf_count = 0;
memcpy(&qc->local_addr, local_addr, sizeof(qc->local_addr));
memcpy(&qc->peer_addr, peer_addr, sizeof qc->peer_addr);
*stream_buf = NULL;
/* notify MUX about available buffers. */
- --qc->stream_buf_count;
if (qc->mux_state == QC_MUX_READY) {
- /* notify MUX about available buffers.
- *
- * TODO several streams may be woken up even if a single buffer
- * is available for now.
- */
- while (qcc_notify_buf(qc->qcc))
- ;
+ /* notify MUX about available buffers. */
+ qcc_notify_buf(qc->qcc, 1);
}
}
if (free_count) {
offer_buffers(NULL, free_count);
- qc->stream_buf_count -= free_count;
if (qc->mux_state == QC_MUX_READY) {
- /* notify MUX about available buffers.
- *
- * TODO several streams may be woken up even if a single buffer
- * is available for now.
- */
- while (qcc_notify_buf(qc->qcc))
- ;
+ /* notify MUX about available buffers. */
+ qcc_notify_buf(qc->qcc, free_count);
}
}
return &stream->buf->buf;
}
-/* Returns the count of available buffer left for <qc>. */
-static int qc_stream_buf_avail(struct quic_conn *qc)
-{
- BUG_ON(qc->stream_buf_count > global.tune.quic_streams_buf);
- return global.tune.quic_streams_buf - qc->stream_buf_count;
-}
-
-/* Allocate a new current buffer for <stream>. The buffer limit count for the
- * connection is checked first. This function is not allowed if current buffer
- * is not NULL prior to this call. The new buffer represents stream payload at
- * offset <offset>.
+/* Allocate a new current buffer for <stream>. This function is not allowed if
+ * current buffer is not NULL prior to this call. The new buffer represents
+ * stream payload at offset <offset>.
*
- * Returns the buffer or NULL on error. Caller may check <avail> to ensure if
- * the connection buffer limit was reached or a fatal error was encountered.
+ * Returns the buffer or NULL on error.
*/
struct buffer *qc_stream_buf_alloc(struct qc_stream_desc *stream,
- uint64_t offset, int *avail)
+ uint64_t offset)
{
- struct quic_conn *qc = stream->qc;
-
/* current buffer must be released first before allocate a new one. */
BUG_ON(stream->buf);
- *avail = qc_stream_buf_avail(qc);
- if (!*avail)
- return NULL;
-
stream->buf_offset = offset;
stream->buf = pool_alloc(pool_head_quic_stream_buf);
if (!stream->buf)
return NULL;
+ stream->buf->buf = BUF_NULL;
if (!b_alloc(&stream->buf->buf, DB_MUX_TX)) {
pool_free(pool_head_quic_stream_buf, stream->buf);
stream->buf = NULL;
return NULL;
}
- ++qc->stream_buf_count;
LIST_APPEND(&stream->buf_list, &stream->buf->list);
return &stream->buf->buf;