#define QC_CF_ERRL 0x00000001 /* fatal error detected locally, connection should be closed soon */
#define QC_CF_ERRL_DONE 0x00000002 /* local error properly handled, connection can be released */
/* unused 0x00000004 */
-/* unused 0x00000008 */
+#define QC_CF_CONN_FULL 0x00000008 /* no stream buffers available on connection */
#define QC_CF_APP_SHUT 0x00000010 /* Application layer shutdown done. */
#define QC_CF_ERR_CONN 0x00000020 /* fatal error reported by transport layer */
struct list send_retry_list; /* list of qcs eligible to send retry */
struct list send_list; /* list of qcs ready to send (STREAM, STOP_SENDING or RESET_STREAM emission) */
struct list fctl_list; /* list of sending qcs blocked on conn flow control */
+ struct list buf_wait_list; /* list of qcs blocked on stream desc buf */
struct wait_event wait_event; /* To be used if we're waiting for I/Os */
struct list el_send; /* element of qcc.send_list */
struct list el_opening; /* element of qcc.opening_list */
struct list el_fctl; /* element of qcc.fctl_list */
+ struct list el_buf; /* element of qcc.buf_wait_list */
struct wait_event wait_event;
struct wait_event *subs;
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
+int qcc_notify_buf(struct qcc *qcc);
struct buffer *qcc_get_stream_rxbuf(struct qcs *qcs);
-struct buffer *qcc_get_stream_txbuf(struct qcs *qcs);
+struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err);
int qcc_release_stream_txbuf(struct qcs *qcs);
int qcc_stream_can_send(const struct qcs *qcs);
void qcc_reset_stream(struct qcs *qcs, int err);
*/
static int h3_control_send(struct qcs *qcs, void *ctx)
{
+ int err;
int ret;
struct h3c *h3c = ctx;
unsigned char data[(2 + 3) * 2 * QUIC_VARINT_MAX_SIZE]; /* enough for 3 settings */
goto err;
}
- if (!(res = qcc_get_stream_txbuf(qcs))) {
+ if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+ /* Consider alloc failure fatal for control stream even on conn buf limit. */
TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_SETTINGS, qcs->qcc->conn, qcs);
goto err;
}
static int h3_resp_headers_send(struct qcs *qcs, struct htx *htx)
{
+ int err;
struct h3s *h3s = qcs->ctx;
struct h3c *h3c = h3s->h3c;
struct buffer outbuf;
list[hdr].n = ist("");
- if (!(res = qcc_get_stream_txbuf(qcs))) {
- TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
- h3c->err = H3_INTERNAL_ERROR;
- goto err;
+ if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+ if (err) {
+ TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+ h3c->err = H3_INTERNAL_ERROR;
+ goto err;
+ }
+
+ TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+ goto end;
}
/* At least 5 bytes to store frame type + length as a varint max size */
break;
}
+ end:
TRACE_LEAVE(H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
return ret;
*/
static int h3_resp_trailers_send(struct qcs *qcs, struct htx *htx)
{
+ int err;
struct h3s *h3s = qcs->ctx;
struct h3c *h3c = h3s->h3c;
struct buffer headers_buf = BUF_NULL;
list[hdr].n = ist("");
start:
- if (!(res = qcc_get_stream_txbuf(qcs))) {
- TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
- h3c->err = H3_INTERNAL_ERROR;
- goto err;
+ if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+ if (err) {
+ TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+ h3c->err = H3_INTERNAL_ERROR;
+ goto err;
+ }
+
+ TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+ goto end;
}
/* At least 9 bytes to store frame type + length as a varint max size */
static int h3_resp_data_send(struct qcs *qcs, struct htx *htx,
struct buffer *buf, size_t count)
{
+ int err;
struct h3s *h3s = qcs->ctx;
struct h3c *h3c = h3s->h3c;
struct buffer outbuf;
if (type != HTX_BLK_DATA)
goto end;
- if (!(res = qcc_get_stream_txbuf(qcs))) {
- TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
- h3c->err = H3_INTERNAL_ERROR;
- goto err;
+ if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+ if (err) {
+ TRACE_ERROR("cannot allocate Tx buffer", H3_EV_TX_FRAME|H3_EV_TX_DATA, qcs->qcc->conn, qcs);
+ h3c->err = H3_INTERNAL_ERROR;
+ goto err;
+ }
+
+ /* Connection buf limit reached, stconn will subscribe on SEND. */
+ TRACE_STATE("conn buf limit reached", H3_EV_TX_FRAME|H3_EV_TX_HDR, qcs->qcc->conn, qcs);
+ goto end;
}
/* If HTX contains only one DATA block, try to exchange it with MUX
static size_t h3_nego_ff(struct qcs *qcs, size_t count)
{
+ int err;
struct buffer *res;
int hsize;
size_t sz, ret = 0;
TRACE_ENTER(H3_EV_STRM_SEND, qcs->qcc->conn, qcs);
start:
- if (!(res = qcc_get_stream_txbuf(qcs))) {
- qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+ if (!(res = qcc_get_stream_txbuf(qcs, &err))) {
+ if (err) {
+ qcs->sd->iobuf.flags |= IOBUF_FL_NO_FF;
+ goto end;
+ }
+
+ qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
}
*/
static int h3_send_goaway(struct h3c *h3c)
{
+ int err;
struct qcs *qcs = h3c->ctrl_strm;
struct buffer pos, *res;
unsigned char data[3 * QUIC_VARINT_MAX_SIZE];
b_quic_enc_int(&pos, frm_len, 0);
b_quic_enc_int(&pos, h3c->id_goaway, 0);
- res = qcc_get_stream_txbuf(qcs);
+ res = qcc_get_stream_txbuf(qcs, &err);
if (!res || b_room(res) < b_data(&pos) ||
qfctl_sblocked(&qcs->tx.fc) || qfctl_sblocked(&h3c->qcc->tx.fc)) {
- /* Do not try forcefully to emit GOAWAY if no space left. */
+ /* Do not try forcefully to emit GOAWAY if no buffer available or not enough space left. */
TRACE_ERROR("cannot send GOAWAY", H3_EV_H3C_END, h3c->qcc->conn, qcs);
goto err;
}
uint32_t bsize, fsize;
struct buffer *res = NULL;
size_t total = 0;
+ int err;
htx = htx_from_buf(buf);
switch (btype) {
case HTX_BLK_DATA:
- res = qcc_get_stream_txbuf(qcs);
+ res = qcc_get_stream_txbuf(qcs, &err);
if (!res) {
- /* TODO */
- ABORT_NOW();
+ if (err)
+ ABORT_NOW();
+ goto end;
}
if (unlikely(fsize == count &&
static size_t hq_interop_nego_ff(struct qcs *qcs, size_t count)
{
- int ret = 0;
+ int err, ret = 0;
struct buffer *res;
start:
- res = qcc_get_stream_txbuf(qcs);
+ res = qcc_get_stream_txbuf(qcs, &err);
if (!res) {
+ if (err)
+ ABORT_NOW();
qcs->sd->iobuf.flags |= IOBUF_FL_FF_BLOCKED;
goto end;
- /* TODO */
- ABORT_NOW();
}
if (!b_room(res)) {
LIST_DEL_INIT(&qcs->el_opening);
LIST_DEL_INIT(&qcs->el_send);
LIST_DEL_INIT(&qcs->el_fctl);
+ LIST_DEL_INIT(&qcs->el_buf);
/* Release stream endpoint descriptor. */
BUG_ON(qcs->sd && !se_fl_test(qcs->sd, SE_FL_ORPHAN));
LIST_INIT(&qcs->el_opening);
LIST_INIT(&qcs->el_send);
LIST_INIT(&qcs->el_fctl);
+ LIST_INIT(&qcs->el_buf);
qcs->start = TICK_ETERNITY;
/* store transport layer stream descriptor in qcc tree */
}
}
+/* Notify on a new stream-desc buffer available for <qcc> connection.
+ *
+ * Returns true if a stream was woken up. If false is returned, this indicates
+ * to the caller that it's currently unnecessary to notify for the rest of the
+ * available buffers.
+ */
+int qcc_notify_buf(struct qcc *qcc)
+{
+ struct qcs *qcs;
+ int ret = 0;
+
+ TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
+
+ if (qcc->flags & QC_CF_CONN_FULL) {
+ TRACE_STATE("new stream desc buffer available", QMUX_EV_QCC_WAKE, qcc->conn);
+ qcc->flags &= ~QC_CF_CONN_FULL;
+ }
+
+ if (!LIST_ISEMPTY(&qcc->buf_wait_list)) {
+ qcs = LIST_ELEM(qcc->buf_wait_list.n, struct qcs *, el_buf);
+ LIST_DEL_INIT(&qcs->el_buf);
+ qcs_notify_send(qcs);
+ ret = 1;
+ }
+
+ TRACE_LEAVE(QMUX_EV_QCC_WAKE, qcc->conn);
+ return ret;
+}
+
/* A fatal error is detected locally for <qcc> connection. It should be closed
* with a CONNECTION_CLOSE using <err> code. Set <app> to true to indicate that
* the code must be considered as an application level error. This function
/* Allocate if needed and retrieve <qcs> stream buffer for data emission.
*
- * Returns buffer pointer. May be NULL on allocation failure.
+ * <err> is an output argument which is useful to differentiate the failure
+ * cause when the buffer cannot be allocated. It is set to 0 if the connection
+ * buffer limit is reached. For fatal errors, its value is non-zero.
+ *
+ * Returns buffer pointer. May be NULL on allocation failure, in which case
+ * <err> will refer to the cause.
*/
-struct buffer *qcc_get_stream_txbuf(struct qcs *qcs)
+struct buffer *qcc_get_stream_txbuf(struct qcs *qcs, int *err)
{
struct qcc *qcc = qcs->qcc;
int buf_avail;
struct buffer *out = qc_stream_buf_get(qcs->stream);
+ /* Stream must not try to reallocate a buffer if currently waiting for one. */
+ BUG_ON(LIST_INLIST(&qcs->el_buf));
+
+ *err = 0;
+
if (!out) {
+ if (qcc->flags & QC_CF_CONN_FULL) {
+ LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
+ goto out;
+ }
+
out = qc_stream_buf_alloc(qcs->stream, qcs->tx.fc.off_real,
&buf_avail);
- if (!out)
+ if (!out) {
+ if (buf_avail) {
+ TRACE_ERROR("stream desc alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ *err = 1;
+ goto out;
+ }
+
+ TRACE_STATE("hitting stream desc buffer limit", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ LIST_APPEND(&qcc->buf_wait_list, &qcs->el_buf);
+ qcc->flags |= QC_CF_CONN_FULL;
goto out;
+ }
if (!b_alloc(out)) {
TRACE_ERROR("buffer alloc failure", QMUX_EV_QCS_SEND, qcc->conn, qcs);
+ *err = 1;
goto out;
}
}
/* Returns true if stream layer can proceed to emission via <qcs>. */
int qcc_stream_can_send(const struct qcs *qcs)
{
- return !(qcs->flags & QC_SF_BLK_MROOM);
+ return !(qcs->flags & QC_SF_BLK_MROOM) && !LIST_INLIST(&qcs->el_buf);
}
/* Wakes up every streams of <qcc> which are currently waiting for sending but
if ((qcs->flags & QC_SF_TO_RESET) || qcs_is_close_local(qcs))
return;
+ /* TODO if QCS waiting for buffer, it could be removed from
+ * <qcc.buf_wait_list> if sending is closed now.
+ */
+
TRACE_STATE("reset stream", QMUX_EV_QCS_END, qcc->conn, qcs);
qcs->flags |= QC_SF_TO_RESET;
qcs->err = err;
LIST_INIT(&qcc->send_list);
LIST_INIT(&qcc->fctl_list);
+ LIST_INIT(&qcc->buf_wait_list);
qcc->wait_event.tasklet->process = qcc_io_cb;
qcc->wait_event.tasklet->context = qcc;
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ /* Stream must not be woken up if already waiting for conn buffer. */
+ BUG_ON(LIST_INLIST(&qcs->el_buf));
+
/* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));
TRACE_ENTER(QMUX_EV_STRM_SEND, qcs->qcc->conn, qcs);
+ /* Stream must not be woken up if already waiting for conn buffer. */
+ BUG_ON(LIST_INLIST(&qcs->el_buf));
+
/* Sending forbidden if QCS is locally closed (FIN or RESET_STREAM sent). */
BUG_ON(qcs_is_close_local(qcs) || (qcs->flags & QC_SF_TO_RESET));
#include <haproxy/buf.h>
#include <haproxy/dynbuf.h>
#include <haproxy/list.h>
-#include <haproxy/mux_quic-t.h>
+#include <haproxy/mux_quic.h>
#include <haproxy/pool.h>
#include <haproxy/quic_conn.h>
#include <haproxy/task.h>
/* notify MUX about available buffers. */
--qc->stream_buf_count;
if (qc->mux_state == QC_MUX_READY) {
- /* TODO notify MUX for available buffer. */
+ /* notify MUX about available buffers.
+ *
+ * TODO several streams may be woken up even if a single buffer
+ * is available for now.
+ */
+ while (qcc_notify_buf(qc->qcc))
+ ;
}
}
qc->stream_buf_count -= free_count;
if (qc->mux_state == QC_MUX_READY) {
- /* TODO notify MUX for available buffer. */
+ /* notify MUX about available buffers.
+ *
+ * TODO several streams may be woken up even if a single buffer
+ * is available for now.
+ */
+ while (qcc_notify_buf(qc->qcc))
+ ;
}
}