#include <haproxy/intops.h>
#include <haproxy/istbuf.h>
#include <haproxy/mux_quic.h>
-#include <haproxy/ncbuf.h>
#include <haproxy/pool.h>
#include <haproxy/qpack-dec.h>
#include <haproxy/qpack-enc.h>
DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
-/* Simple function to duplicate a buffer */
-static inline struct buffer h3_b_dup(const struct ncbuf *b)
-{
- return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
-}
-
-/* Initialize an uni-stream <qcs> by reading its type from <rxbuf>.
+/* Initialize an uni-stream <qcs> by reading its type from <b>.
*
* Returns 0 on success else non-zero.
*/
static int h3_init_uni_stream(struct h3c *h3c, struct qcs *qcs,
- struct ncbuf *rxbuf)
+ struct buffer *b)
{
/* decode unidirectional stream type */
struct h3s *h3s = qcs->ctx;
- struct buffer b;
uint64_t type;
size_t len = 0, ret;
BUG_ON_HOT(!quic_stream_is_uni(qcs->id) ||
h3s->flags & H3_SF_UNI_INIT);
- b = h3_b_dup(rxbuf);
- ret = b_quic_dec_int(&type, &b, &len);
+ ret = b_quic_dec_int(&type, b, &len);
if (!ret) {
ABORT_NOW();
}
};
h3s->flags |= H3_SF_UNI_INIT;
- qcs_consume(qcs, len);
TRACE_LEAVE(H3_EV_H3S_NEW, qcs->qcc->conn, qcs);
return 0;
*
* Returns 0 on success else non-zero.
*/
-static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct ncbuf *rxbuf)
+static int h3_parse_uni_stream_no_h3(struct qcs *qcs, struct buffer *b)
{
struct h3s *h3s = qcs->ctx;
* consumed.
*/
static inline size_t h3_decode_frm_header(uint64_t *ftype, uint64_t *flen,
- struct ncbuf *rxbuf)
+ struct buffer *b)
{
size_t hlen;
- struct buffer b = h3_b_dup(rxbuf);
hlen = 0;
- if (!b_quic_dec_int(ftype, &b, &hlen) ||
- !b_quic_dec_int(flen, &b, &hlen)) {
+ if (!b_quic_dec_int(ftype, b, &hlen) ||
+ !b_quic_dec_int(flen, b, &hlen)) {
return 0;
}
*
* Returns the number of bytes handled or a negative error code.
*/
-static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
- char fin)
+static int h3_headers_to_htx(struct qcs *qcs, const struct buffer *buf,
+ uint64_t len, char fin)
{
struct buffer htx_buf = BUF_NULL;
struct buffer *tmp = get_trash_chunk();
TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_HDR, qcs->qcc->conn, qcs);
/* TODO support buffer wrapping */
- BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf));
- if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0)
+ BUG_ON(b_head(buf) + len >= b_wrap(buf));
+ if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0)
return -1;
qc_get_buf(qcs, &htx_buf);
*
* Returns the number of bytes handled or a negative error code.
*/
-static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
- char fin)
+static int h3_data_to_htx(struct qcs *qcs, const struct buffer *buf,
+ uint64_t len, char fin)
{
struct buffer *appbuf;
struct htx *htx = NULL;
BUG_ON(!appbuf);
htx = htx_from_buf(appbuf);
- if (len > ncb_data(buf, 0)) {
- len = ncb_data(buf, 0);
+ if (len > b_data(buf)) {
+ len = b_data(buf);
fin = 0;
}
- head = ncb_head(buf);
+ head = b_head(buf);
retry:
htx_space = htx_free_data_space(htx);
if (!htx_space) {
fin = 0;
}
- if (head + len > ncb_wrap(buf)) {
- size_t contig = ncb_wrap(buf) - head;
- htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig));
+ if (head + len > b_wrap(buf)) {
+ size_t contig = b_wrap(buf) - head;
+ htx_sent = htx_add_data(htx, ist2(b_head(buf), contig));
if (htx_sent < contig) {
qcs->flags |= QC_SF_DEM_FULL;
goto out;
}
len -= contig;
- head = ncb_orig(buf);
+ head = b_orig(buf);
goto retry;
}
return htx_sent;
}
-/* Parse a SETTINGS frame of length <len> of payload <rxbuf>.
+/* Parse a SETTINGS frame of length <len> of payload <buf>.
*
* Returns the number of bytes handled or a negative error code.
*/
-static size_t h3_parse_settings_frm(struct h3c *h3c, const struct ncbuf *rxbuf,
+static size_t h3_parse_settings_frm(struct h3c *h3c, const struct buffer *buf,
size_t len)
{
struct buffer b;
TRACE_ENTER(H3_EV_RX_FRAME|H3_EV_RX_SETTINGS, h3c->qcc->conn);
- b = h3_b_dup(rxbuf);
- b_set_data(&b, len);
+ /* Work on a copy of <buf>. */
+ b = b_make(b_orig(buf), b_size(buf), b_head_ofs(buf), b_data(buf));
+
+ /* TODO handle incomplete SETTINGS frame */
+ BUG_ON(len < b_data(&b));
while (b_data(&b)) {
if (!b_quic_dec_int(&id, &b, &ret) || !b_quic_dec_int(&value, &b, &ret)) {
*
* Returns 0 on success else non-zero.
*/
-static int h3_decode_qcs(struct qcs *qcs, int fin)
+static int h3_decode_qcs(struct qcs *qcs, struct buffer *b, int fin)
{
- struct ncbuf *rxbuf = &qcs->rx.ncbuf;
struct h3s *h3s = qcs->ctx;
struct h3c *h3c = h3s->h3c;
ssize_t ret;
h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
- if (!ncb_data(rxbuf, 0))
+ if (!b_data(b))
return 0;
if (quic_stream_is_uni(qcs->id) && !(h3s->flags & H3_SF_UNI_INIT)) {
- if (h3_init_uni_stream(h3c, qcs, rxbuf))
+ if (h3_init_uni_stream(h3c, qcs, b))
return 1;
}
if (quic_stream_is_uni(qcs->id) && (h3s->flags & H3_SF_UNI_NO_H3)) {
/* For non-h3 STREAM, parse it and return immediately. */
- if (h3_parse_uni_stream_no_h3(qcs, rxbuf))
+ if (h3_parse_uni_stream_no_h3(qcs, b))
return 1;
return 0;
}
- while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
+ while (b_data(b) && !(qcs->flags & QC_SF_DEM_FULL)) {
uint64_t ftype, flen;
char last_stream_frame = 0;
/* Work on a copy of <rxbuf> */
if (!h3s->demux_frame_len) {
- size_t hlen = h3_decode_frm_header(&ftype, &flen, rxbuf);
+ size_t hlen = h3_decode_frm_header(&ftype, &flen, b);
if (!hlen)
break;
return 1;
}
- qcs_consume(qcs, hlen);
- if (!ncb_data(rxbuf, 0))
+ if (!b_data(b))
break;
}
/* Do not demux incomplete frames except H3 DATA which can be
* fragmented in multiple HTX blocks.
*/
- if (flen > ncb_data(rxbuf, 0) && ftype != H3_FT_DATA) {
+ if (flen > b_data(b) && ftype != H3_FT_DATA) {
/* Reject frames bigger than bufsize.
*
* TODO HEADERS should in complement be limited with H3
* SETTINGS_MAX_FIELD_SECTION_SIZE parameter to prevent
* excessive decompressed size.
*/
- if (flen > ncb_size(rxbuf)) {
+ if (flen > QC_S_RX_BUF_SZ) {
qcc_emit_cc_app(qcs->qcc, H3_EXCESSIVE_LOAD);
return 1;
}
break;
}
- last_stream_frame = (fin && flen == ncb_total_data(rxbuf));
+ last_stream_frame = (fin && flen == b_data(b));
h3_inc_frame_type_cnt(h3c->prx_counters, ftype);
switch (ftype) {
case H3_FT_DATA:
- ret = h3_data_to_htx(qcs, rxbuf, flen, last_stream_frame);
+ ret = h3_data_to_htx(qcs, b, flen, last_stream_frame);
/* TODO handle error reporting. Stream closure required. */
if (ret < 0) { ABORT_NOW(); }
break;
case H3_FT_HEADERS:
- ret = h3_headers_to_htx(qcs, rxbuf, flen, last_stream_frame);
+ ret = h3_headers_to_htx(qcs, b, flen, last_stream_frame);
/* TODO handle error reporting. Stream closure required. */
if (ret < 0) { ABORT_NOW(); }
break;
ret = flen;
break;
case H3_FT_SETTINGS:
- ret = h3_parse_settings_frm(qcs->qcc->ctx, rxbuf, flen);
+ ret = h3_parse_settings_frm(qcs->qcc->ctx, b, flen);
if (ret < 0) {
qcc_emit_cc_app(qcs->qcc, h3c->err);
return 1;
if (ret) {
BUG_ON(h3s->demux_frame_len < ret);
h3s->demux_frame_len -= ret;
- qcs_consume(qcs, ret);
+ b_del(b, ret);
}
}
}
}
-/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
- * after STREAM parsing. Flow-control for received offsets may be allocated for
- * the peer if needed.
- */
-void qcs_consume(struct qcs *qcs, uint64_t bytes)
-{
- struct qcc *qcc = qcs->qcc;
- struct quic_frame *frm;
- struct ncbuf *buf = &qcs->rx.ncbuf;
- enum ncb_ret ret;
-
- ret = ncb_advance(buf, bytes);
- if (ret) {
- ABORT_NOW(); /* should not happens because removal only in data */
- }
-
- if (ncb_is_empty(buf))
- qc_free_ncbuf(qcs, buf);
-
- qcs->rx.offset += bytes;
- if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
- frm = pool_zalloc(pool_head_quic_frame);
- BUG_ON(!frm); /* TODO handle this properly */
-
- qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
-
- LIST_INIT(&frm->reflist);
- frm->type = QUIC_FT_MAX_STREAM_DATA;
- frm->max_stream_data.id = qcs->id;
- frm->max_stream_data.max_stream_data = qcs->rx.msd;
-
- LIST_APPEND(&qcc->lfctl.frms, &frm->list);
- tasklet_wakeup(qcc->wait_event.tasklet);
- }
-
- qcc->lfctl.offsets_consume += bytes;
- if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) {
- frm = pool_zalloc(pool_head_quic_frame);
- BUG_ON(!frm); /* TODO handle this properly */
-
- qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init;
-
- LIST_INIT(&frm->reflist);
- frm->type = QUIC_FT_MAX_DATA;
- frm->max_data.max_data = qcc->lfctl.md;
-
- LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
- tasklet_wakeup(qcs->qcc->wait_event.tasklet);
- }
-}
-
/* Retrieve as an ebtree node the stream with <id> as ID, possibly allocates
* several streams, depending on the already open ones.
* Return this node if succeeded, NULL if not.
return NULL;
}
+/* Simple function to duplicate a buffer */
+static inline struct buffer qcs_b_dup(const struct ncbuf *b)
+{
+ return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
+}
+
+/* Remove <bytes> from <qcs> Rx buffer. This must be called by transcoders
+ * after STREAM parsing. Flow-control for received offsets may be allocated for
+ * the peer if needed.
+ */
+static void qcs_consume(struct qcs *qcs, uint64_t bytes)
+{
+ struct qcc *qcc = qcs->qcc;
+ struct quic_frame *frm;
+ struct ncbuf *buf = &qcs->rx.ncbuf;
+ enum ncb_ret ret;
+
+ ret = ncb_advance(buf, bytes);
+ if (ret) {
+ ABORT_NOW(); /* should not happens because removal only in data */
+ }
+
+ if (ncb_is_empty(buf))
+ qc_free_ncbuf(qcs, buf);
+
+ qcs->rx.offset += bytes;
+ if (qcs->rx.msd - qcs->rx.offset < qcs->rx.msd_init / 2) {
+ frm = pool_zalloc(pool_head_quic_frame);
+ BUG_ON(!frm); /* TODO handle this properly */
+
+ qcs->rx.msd = qcs->rx.offset + qcs->rx.msd_init;
+
+ LIST_INIT(&frm->reflist);
+ frm->type = QUIC_FT_MAX_STREAM_DATA;
+ frm->max_stream_data.id = qcs->id;
+ frm->max_stream_data.max_stream_data = qcs->rx.msd;
+
+ LIST_APPEND(&qcc->lfctl.frms, &frm->list);
+ tasklet_wakeup(qcc->wait_event.tasklet);
+ }
+
+ qcc->lfctl.offsets_consume += bytes;
+ if (qcc->lfctl.md - qcc->lfctl.offsets_consume < qcc->lfctl.md_init / 2) {
+ frm = pool_zalloc(pool_head_quic_frame);
+ BUG_ON(!frm); /* TODO handle this properly */
+
+ qcc->lfctl.md = qcc->lfctl.offsets_consume + qcc->lfctl.md_init;
+
+ LIST_INIT(&frm->reflist);
+ frm->type = QUIC_FT_MAX_DATA;
+ frm->max_data.max_data = qcc->lfctl.md;
+
+ LIST_APPEND(&qcs->qcc->lfctl.frms, &frm->list);
+ tasklet_wakeup(qcs->qcc->wait_event.tasklet);
+ }
+}
+
/* Decode the content of STREAM frames already received on the stream instance
* <qcs>.
*
*/
static int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
{
+ struct buffer b;
+ size_t data, done;
+ int ret;
+
TRACE_ENTER(QMUX_EV_QCS_RECV, qcc->conn, qcs);
- if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV)) {
+ b = qcs_b_dup(&qcs->rx.ncbuf);
+ data = b_data(&b);
+
+ ret = qcc->app_ops->decode_qcs(qcs, &b, qcs->flags & QC_SF_FIN_RECV);
+ if (ret) {
TRACE_DEVEL("leaving on decoding error", QMUX_EV_QCS_RECV, qcc->conn, qcs);
return 1;
}
- qcs_notify_recv(qcs);
+ BUG_ON_HOT(data < b_data(&b));
+ done = data - b_data(&b);
+ if (done) {
+ qcs_consume(qcs, done);
+ qcs_notify_recv(qcs);
+ }
TRACE_LEAVE(QMUX_EV_QCS_RECV, qcc->conn, qcs);