#include <haproxy/buf-t.h>
#include <haproxy/connection-t.h>
#include <haproxy/list-t.h>
+#include <haproxy/ncbuf-t.h>
#include <haproxy/quic_stream-t.h>
#include <haproxy/conn_stream-t.h>
struct {
struct eb_root frms; /* received frames ordered by their offsets */
- uint64_t offset; /* the current offset of received data */
+ uint64_t offset; /* absolute current base offset of ncbuf */
struct buffer buf; /* receive buffer, always valid (buf_empty or real buffer) */
+ struct ncbuf ncbuf; /* receive buffer - can handle out-of-order offset frames */
struct buffer app_buf; /* receive buffer used by conn_stream layer */
uint64_t msd; /* fctl bytes limit to enforce */
} rx;
void qcs_free(struct qcs *qcs);
struct buffer *qc_get_buf(struct qcs *qcs, struct buffer *bptr);
+struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf);
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es);
void qcs_notify_recv(struct qcs *qcs);
void qcs_notify_send(struct qcs *qcs);
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
- char fin, char *data, struct qcs **out_qcs, size_t *done);
+ char fin, char *data, struct qcs **out_qcs);
int qcc_recv_max_data(struct qcc *qcc, uint64_t max);
int qcc_recv_max_stream_data(struct qcc *qcc, uint64_t id, uint64_t max);
int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs);
#include <haproxy/htx.h>
#include <haproxy/istbuf.h>
#include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
#include <haproxy/pool.h>
#include <haproxy/qpack-dec.h>
#include <haproxy/qpack-enc.h>
DECLARE_STATIC_POOL(pool_head_h3s, "h3s", sizeof(struct h3s));
/* Simple function to duplicate a buffer */
-static inline struct buffer h3_b_dup(struct buffer *b)
+static inline struct buffer h3_b_dup(struct ncbuf *b)
{
- return b_make(b->area, b->size, b->head, b->data);
+ return b_make(ncb_orig(b), b->size, b->head, ncb_data(b, 0));
}
/* Decode a h3 frame header made of two QUIC varints from <b> buffer.
*
* Returns the number of bytes handled or a negative error code.
*/
-static int h3_headers_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
+static int h3_headers_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
char fin)
{
struct buffer htx_buf = BUF_NULL;
int hdr_idx;
/* TODO support buffer wrapping */
- BUG_ON(b_contig_data(buf, 0) != b_data(buf));
- if (qpack_decode_fs((const unsigned char *)b_head(buf), len, tmp, list) < 0)
+ BUG_ON(ncb_head(buf) + len >= ncb_wrap(buf));
+ if (qpack_decode_fs((const unsigned char *)ncb_head(buf), len, tmp, list) < 0)
return -1;
qc_get_buf(qcs, &htx_buf);
*
* Returns the number of bytes handled or a negative error code.
*/
-static int h3_data_to_htx(struct qcs *qcs, struct buffer *buf, uint64_t len,
+static int h3_data_to_htx(struct qcs *qcs, struct ncbuf *buf, uint64_t len,
char fin)
{
struct buffer *appbuf;
struct htx *htx = NULL;
- size_t contig = 0, htx_sent = 0;
+ size_t htx_sent = 0;
int htx_space;
char *head;
BUG_ON(!appbuf);
htx = htx_from_buf(appbuf);
- if (len > b_data(buf)) {
- len = b_data(buf);
+ if (len > ncb_data(buf, 0)) {
+ len = ncb_data(buf, 0);
fin = 0;
}
- head = b_head(buf);
+ head = ncb_head(buf);
retry:
htx_space = htx_free_data_space(htx);
if (!htx_space) {
fin = 0;
}
- contig = b_contig_data(buf, contig);
- if (len > contig) {
- htx_sent = htx_add_data(htx, ist2(b_head(buf), contig));
- head = b_orig(buf);
+ if (head + len > ncb_wrap(buf)) {
+ size_t contig = ncb_wrap(buf) - head;
+ htx_sent = htx_add_data(htx, ist2(ncb_head(buf), contig));
+ head = ncb_orig(buf);
len -= contig;
goto retry;
}
*/
static int h3_decode_qcs(struct qcs *qcs, int fin, void *ctx)
{
- struct buffer *rxbuf = &qcs->rx.buf;
+ struct ncbuf *rxbuf = &qcs->rx.ncbuf;
struct h3s *h3s = qcs->ctx;
ssize_t ret;
h3_debug_printf(stderr, "%s: STREAM ID: %lu\n", __func__, qcs->id);
- if (!b_data(rxbuf))
+ if (!ncb_data(rxbuf, 0))
return 0;
- while (b_data(rxbuf) && !(qcs->flags & QC_SF_DEM_FULL)) {
+ while (ncb_data(rxbuf, 0) && !(qcs->flags & QC_SF_DEM_FULL)) {
uint64_t ftype, flen;
struct buffer b;
char last_stream_frame = 0;
h3_debug_printf(stderr, "%s: ftype: %lu, flen: %lu\n",
__func__, ftype, flen);
- b_del(rxbuf, hlen);
+ ncb_advance(rxbuf, hlen);
h3s->demux_frame_type = ftype;
h3s->demux_frame_len = flen;
+ qcs->rx.offset += hlen;
}
flen = h3s->demux_frame_len;
ftype = h3s->demux_frame_type;
- if (flen > b_data(&b) && !b_full(rxbuf))
+ if (flen > b_data(&b) && !ncb_is_full(rxbuf))
break;
- last_stream_frame = (fin && flen == b_data(rxbuf));
+ last_stream_frame = (fin && flen == ncb_total_data(rxbuf));
switch (ftype) {
case H3_FT_DATA:
break;
case H3_FT_PUSH_PROMISE:
/* Not supported */
- ret = MIN(b_data(rxbuf), flen);
+ ret = MIN(ncb_data(rxbuf, 0), flen);
break;
default:
/* draft-ietf-quic-http34 9. Extensions to HTTP/3
* unknown frame types MUST be ignored
*/
h3_debug_printf(stderr, "ignore unknown frame type 0x%lx\n", ftype);
- ret = MIN(b_data(rxbuf), flen);
+ ret = MIN(ncb_data(rxbuf, 0), flen);
}
if (ret) {
- b_del(rxbuf, ret);
+ ncb_advance(rxbuf, ret);
BUG_ON(h3s->demux_frame_len < ret);
h3s->demux_frame_len -= ret;
+ qcs->rx.offset += ret;
}
}
struct buffer b;
/* Work on a copy of <rxbuf> */
- b = h3_b_dup(rxbuf);
+ b = b_make(rxbuf->area, rxbuf->size, rxbuf->head, rxbuf->data);
hlen = h3_decode_frm_header(&ftype, &flen, &b);
if (!hlen)
break;
#include <haproxy/htx.h>
#include <haproxy/http.h>
#include <haproxy/mux_quic.h>
+#include <haproxy/ncbuf.h>
static int hq_interop_decode_qcs(struct qcs *qcs, int fin, void *ctx)
{
- struct buffer *rxbuf = &qcs->rx.buf;
+ struct ncbuf *rxbuf = &qcs->rx.ncbuf;
struct htx *htx;
struct htx_sl *sl;
struct conn_stream *cs;
struct buffer htx_buf = BUF_NULL;
struct ist path;
- char *ptr = b_head(rxbuf);
- char *end = b_wrap(rxbuf);
- size_t size = b_size(rxbuf);
- size_t data = b_data(rxbuf);
+ char *ptr = ncb_head(rxbuf);
+ char *end = ncb_wrap(rxbuf);
+ size_t size = ncb_size(rxbuf);
+ size_t data = ncb_data(rxbuf, 0);
b_alloc(&htx_buf);
htx = htx_from_buf(&htx_buf);
return -1;
- b_del(rxbuf, b_data(rxbuf));
+ qcs->rx.offset += ncb_data(rxbuf, 0);
+ ncb_advance(rxbuf, ncb_data(rxbuf, 0));
b_free(&htx_buf);
if (fin)
#include <haproxy/dynbuf.h>
#include <haproxy/htx.h>
#include <haproxy/list.h>
+#include <haproxy/ncbuf.h>
#include <haproxy/pool.h>
#include <haproxy/quic_stream.h>
#include <haproxy/sink.h>
qcc->rfctl.msd_bidi_l;
qcs->rx.buf = BUF_NULL;
+ qcs->rx.ncbuf = NCBUF_NULL;
qcs->rx.app_buf = BUF_NULL;
qcs->rx.offset = 0;
qcs->rx.frms = EB_ROOT_UNIQUE;
return NULL;
}
+static void qc_free_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+{
+ struct buffer buf;
+
+ buf = b_make(ncbuf->area, ncbuf->size, 0, 0);
+ b_free(&buf);
+
+ *ncbuf = NCBUF_NULL;
+}
+
/* Free a qcs. This function must only be done to remove a stream on allocation
* error or connection shutdown. Else use qcs_destroy which handle all the
* QUIC connection mechanism.
void qcs_free(struct qcs *qcs)
{
b_free(&qcs->rx.buf);
+ qc_free_ncbuf(qcs, &qcs->rx.ncbuf);
b_free(&qcs->tx.buf);
BUG_ON(!qcs->qcc->strms[qcs_id_type(qcs->id)].nb_streams);
return buf;
}
+struct ncbuf *qc_get_ncbuf(struct qcs *qcs, struct ncbuf *ncbuf)
+{
+ struct buffer buf = BUF_NULL;
+
+ if (ncb_is_null(ncbuf)) {
+ b_alloc(&buf);
+ BUG_ON(b_is_null(&buf));
+
+ *ncbuf = ncb_make(buf.area, buf.size, 0);
+ ncb_init(ncbuf, 0);
+ }
+
+ return ncbuf;
+}
+
int qcs_subscribe(struct qcs *qcs, int event_type, struct wait_event *es)
{
struct qcc *qcc = qcs->qcc;
* <out_qcs>. In case of success, the caller can immediatly call qcc_decode_qcs
* to process the frame content.
*
- * Returns a code indicating how the frame was handled.
- * - 0: frame received completely and can be dropped.
- * - 1: frame not received but can be dropped.
- * - 2: frame cannot be handled, either partially or not at all. <done>
- * indicated the number of bytes handled. The rest should be buffered.
+ * Returns 0 on success else non-zero.
*/
int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
- char fin, char *data, struct qcs **out_qcs, size_t *done)
+ char fin, char *data, struct qcs **out_qcs)
{
struct qcs *qcs;
- size_t total, diff;
+ enum ncb_ret ret;
TRACE_ENTER(QMUX_EV_QCC_RECV, qcc->conn);
*out_qcs = NULL;
- *done = 0;
qcs = qcc_get_qcs(qcc, id);
if (!qcs) {
*out_qcs = qcs;
- if (offset > qcs->rx.offset)
- return 2;
-
if (offset + len <= qcs->rx.offset) {
TRACE_DEVEL("leaving on already received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
return 0;
}
- /* Last frame already handled for this stream. */
- BUG_ON(qcs->flags & QC_SF_FIN_RECV);
+ /* TODO if last frame already received, stream size must not change.
+ * Else send FINAL_SIZE_ERROR.
+ */
+
/* TODO initial max-stream-data overflow. Implement FLOW_CONTROL_ERROR emission. */
BUG_ON(offset + len > qcs->rx.msd);
- if (!qc_get_buf(qcs, &qcs->rx.buf) || b_full(&qcs->rx.buf)) {
+ if (!qc_get_ncbuf(qcs, &qcs->rx.ncbuf) || ncb_is_null(&qcs->rx.ncbuf)) {
/* TODO should mark qcs as full */
- return 2;
+ ABORT_NOW();
+ return 1;
}
TRACE_DEVEL("newly received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
- diff = qcs->rx.offset - offset;
-
- len -= diff;
- data += diff;
-
- /* TODO handle STREAM frames larger than RX buffer. */
- BUG_ON(len > b_size(&qcs->rx.buf));
+ if (offset < qcs->rx.offset) {
+ len -= qcs->rx.offset - offset;
+ offset = qcs->rx.offset;
+ }
- total = b_putblk(&qcs->rx.buf, data, len);
- qcs->rx.offset += total;
- *done = total;
+ ret = ncb_add(&qcs->rx.ncbuf, offset - qcs->rx.offset, data, len, NCB_ADD_COMPARE);
+ if (ret != NCB_RET_OK) {
+ if (ret == NCB_RET_DATA_REJ) {
+ /* TODO generate PROTOCOL_VIOLATION error */
+ TRACE_DEVEL("leaving on data rejected", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
+ qcc->conn, qcs);
+ }
+ else if (ret == NCB_RET_GAP_SIZE) {
+ TRACE_DEVEL("cannot bufferize frame due to gap size limit", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV,
+ qcc->conn, qcs);
+ }
+ return 1;
+ }
/* TODO initial max-stream-data reached. Implement MAX_STREAM_DATA emission. */
- BUG_ON(qcs->rx.offset == qcs->rx.msd);
-
- if (total < len) {
- TRACE_DEVEL("leaving on partially received offset", QMUX_EV_QCC_RECV|QMUX_EV_QCS_RECV, qcc->conn, qcs);
- return 2;
- }
+ BUG_ON(offset + len == qcs->rx.msd);
if (fin)
qcs->flags |= QC_SF_FIN_RECV;
struct quic_stream *strm_frm,
struct quic_conn *qc)
{
- struct quic_rx_strm_frm *frm;
- struct eb64_node *frm_node;
struct qcs *qcs = NULL;
- size_t done, buf_was_full;
int ret;
ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
strm_frm->offset.key, strm_frm->fin,
- (char *)strm_frm->data, &qcs, &done);
+ (char *)strm_frm->data, &qcs);
- /* invalid frame */
- if (ret == 1)
+ /* frame rejected - packet must not be acknowledeged */
+ if (ret)
return 0;
- /* already fully received offset */
- if (ret == 0 && done == 0)
- return 1;
-
- /* frame not handled (partially or completely) must be buffered */
- if (ret == 2) {
- frm = new_quic_rx_strm_frm(strm_frm, pkt);
- if (!frm) {
- TRACE_PROTO("Could not alloc RX STREAM frame",
- QUIC_EV_CONN_PSTRM, qc);
- return 0;
- }
-
- /* frame partially handled by the MUX */
- if (done) {
- BUG_ON(done >= frm->len); /* must never happen */
- frm->len -= done;
- frm->data += done;
- frm->offset_node.key += done;
- }
-
- eb64_insert(&qcs->rx.frms, &frm->offset_node);
- quic_rx_packet_refinc(pkt);
-
- /* interrupt only if frame was not received at all. */
- if (!done)
- return 1;
- }
-
- /* Decode the data if buffer is already full as it's not possible to
- * dequeue a frame in this condition.
- */
- if (b_full(&qcs->rx.buf))
+ if (qcs)
qcc_decode_qcs(qc->qcc, qcs);
-
- retry:
- /* Frame received (partially or not) by the mux.
- * If there is buffered frame for next offset, it may be possible to
- * receive them now.
- */
- frm_node = eb64_first(&qcs->rx.frms);
- while (frm_node) {
- frm = eb64_entry(frm_node,
- struct quic_rx_strm_frm, offset_node);
-
- ret = qcc_recv(qc->qcc, qcs->id, frm->len,
- frm->offset_node.key, frm->fin,
- (char *)frm->data, &qcs, &done);
-
- BUG_ON(ret == 1); /* must never happen for buffered frames */
-
- /* interrupt the parsing if the frame cannot be handled
- * entirely for the moment only.
- */
- if (ret == 2) {
- if (done) {
- BUG_ON(done >= frm->len); /* must never happen */
- frm->len -= done;
- frm->data += done;
-
- eb64_delete(&frm->offset_node);
- frm->offset_node.key += done;
- eb64_insert(&qcs->rx.frms, &frm->offset_node);
- }
- break;
- }
-
- /* Remove a newly received frame or an invalid one. */
- frm_node = eb64_next(frm_node);
- eb64_delete(&frm->offset_node);
- quic_rx_packet_refdec(frm->pkt);
- pool_free(pool_head_quic_rx_strm_frm, frm);
- }
-
- buf_was_full = b_full(&qcs->rx.buf);
- /* Decode the received data. */
- qcc_decode_qcs(qc->qcc, qcs);
-
- /* Buffer was full so the reception was stopped. Now the buffer has
- * space available thanks to qcc_decode_qcs(). We can now retry to
- * handle more data.
- */
- if (buf_was_full && !b_full(&qcs->rx.buf))
- goto retry;
-
return 1;
}