return NULL;
}
+/* Handle a new STREAM frame <strm_frm>. The frame content will be copied in
+ * the buffer of the stream instance. The stream instance will be stored in
+ * <out_qcs>. In case of success, the caller can immediatly call qcc_decode_qcs
+ * to process the frame content.
+ *
+ * Returns 0 on success. On errors, two codes are present.
+ * - 1 is returned if the frame cannot be decoded and must be discarded.
+ * - 2 is returned if the stream cannot decode at the moment the frame. The
+ * frame should be buffered to be handled later.
+ */
+int qcc_recv(struct qcc *qcc, uint64_t id, uint64_t len, uint64_t offset,
+ char fin, char *data, struct qcs **out_qcs)
+{
+ struct qcs *qcs;
+ struct eb64_node *strm_node;
+ size_t total, diff;
+
+ strm_node = qcc_get_qcs(qcc, id);
+ if (!strm_node) {
+ fprintf(stderr, "%s: stream not found\n", __func__);
+ return 1;
+ }
+
+ qcs = eb64_entry(&strm_node->node, struct qcs, by_id);
+ *out_qcs = qcs;
+
+ if (offset > qcs->rx.offset)
+ return 2;
+
+ if (offset + len <= qcs->rx.offset) {
+ fprintf(stderr, "%s: already received STREAM data\n", __func__);
+ return 1;
+ }
+
+ /* Last frame already handled for this stream. */
+ BUG_ON(qcs->flags & QC_SF_FIN_RECV);
+
+ if (!qc_get_buf(qcs, &qcs->rx.buf)) {
+ /* TODO should mark qcs as full */
+ return 2;
+ }
+
+ fprintf(stderr, "%s: new STREAM data\n", __func__);
+ diff = qcs->rx.offset - offset;
+
+ /* TODO do not partially copy a frame if not enough size left. Maybe
+ * this can be optimized.
+ */
+ if (len > b_room(&qcs->rx.buf)) {
+ /* TODO handle STREAM frames larger than RX buffer. */
+ BUG_ON(len > b_size(&qcs->rx.buf));
+ return 2;
+ }
+
+ len -= diff;
+ data += diff;
+
+ total = b_putblk(&qcs->rx.buf, data, len);
+ /* TODO handle partial copy of a STREAM frame. */
+ BUG_ON(len != total);
+
+ qcs->rx.offset += total;
+
+ if (fin)
+ qcs->flags |= QC_SF_FIN_RECV;
+
+ out:
+ return 0;
+}
+
+/* Decode the content of STREAM frames already received on the stream instance
+ * <qcs>.
+ *
+ * Returns 0 on success else non-zero.
+ */
+int qcc_decode_qcs(struct qcc *qcc, struct qcs *qcs)
+{
+ if (qcc->app_ops->decode_qcs(qcs, qcs->flags & QC_SF_FIN_RECV, qcc->ctx) < 0) {
+ fprintf(stderr, "%s: decoding error\n", __func__);
+ return 1;
+ }
+
+ return 0;
+}
+
/* detaches the QUIC stream from its QCC and releases it to the QCS pool. */
static void qcs_destroy(struct qcs *qcs)
{
return ret;
}
-/* Copy as most as possible STREAM data from <strm_frm> into <buf> buffer.
- *
- * Note that <strm_frm> is not updated as it is implied that the frame may be
- * present in a tree and offset node is used as the key. The caller should
- * update offset/lenght of the frame after the function call.
- *
- * Return the total count of copied bytes.
- */
-static size_t qc_rx_strm_frm_cpy(struct buffer *buf,
- struct quic_rx_strm_frm *strm_frm)
-{
- size_t flen = strm_frm->len;
- size_t ret = 0;
- size_t try;
-
- ret = 0;
- while (flen && (try = b_contig_space(buf))) {
- if (try > flen)
- try = flen;
-
- memcpy(b_tail(buf), strm_frm->data + ret, try);
- b_add(buf, try);
- ret += try;
- flen -= try;
- }
-
- return ret;
-}
-
-/* Process as much as possible RX STREAM frames received for <qcs> */
-static size_t qc_treat_rx_strm_frms(struct qcs *qcs)
-{
- int total;
- struct eb64_node *frm_node;
-
- total = 0;
- frm_node = eb64_first(&qcs->rx.frms);
- while (frm_node) {
- int ret;
- struct quic_rx_strm_frm *frm;
- size_t diff;
-
- frm = eb64_entry(&frm_node->node, struct quic_rx_strm_frm, offset_node);
- if (frm->offset_node.key + frm->len < qcs->rx.offset) {
- /* fully already received STREAM offset */
- goto next;
- }
-
- BUG_ON(qcs->flags & QC_SF_FIN_RECV);
- if (frm->offset_node.key > qcs->rx.offset)
- break;
-
- diff = qcs->rx.offset - frm->offset_node.key;
- frm->data += diff;
- frm->len -= diff;
-
- ret = qc_rx_strm_frm_cpy(&qcs->rx.buf, frm);
- qcs->rx.offset += ret;
- total += ret;
-
- BUG_ON(frm->len < ret);
- if (frm->len - ret > 0) {
- /* Remove the frame from the tree before updating the
- * offset field.
- */
- eb64_delete(&frm->offset_node);
- frm->offset_node.key += (diff + ret);
- frm->data += ret;
- frm->len -= ret;
- eb64_insert(&qcs->rx.frms, &frm->offset_node);
- break;
- }
-
- if (frm->fin)
- qcs->flags |= QC_SF_FIN_RECV;
-
- next:
- frm_node = eb64_next(frm_node);
- quic_rx_packet_refdec(frm->pkt);
- eb64_delete(&frm->offset_node);
- pool_free(pool_head_quic_rx_strm_frm, frm);
- }
-
- return total;
-}
-
/* Handle <strm_frm> bidirectional STREAM frame. Depending on its ID, several
* streams may be open. The data are copied to the stream RX buffer if possible.
* If not, the STREAM frame is stored to be treated again later.
struct quic_stream *strm_frm,
struct quic_conn *qc)
{
- int total;
- struct qcs *strm;
- struct eb64_node *strm_node;
struct quic_rx_strm_frm *frm;
+ struct eb64_node *frm_node;
+ struct qcs *qcs = NULL;
+ int ret;
- strm_node = qcc_get_qcs(qc->qcc, strm_frm->id);
- if (!strm_node) {
- TRACE_PROTO("Stream not found", QUIC_EV_CONN_PSTRM, qc);
- return 0;
- }
+ ret = qcc_recv(qc->qcc, strm_frm->id, strm_frm->len,
+ strm_frm->offset.key, strm_frm->fin,
+ (char *)strm_frm->data, &qcs);
- strm = eb64_entry(&strm_node->node, struct qcs, by_id);
- if (strm_frm->offset.key < strm->rx.offset) {
- size_t diff;
+ /* invalid or already received frame */
+ if (ret == 1)
+ return 0;
- if (strm_frm->offset.key + strm_frm->len <= strm->rx.offset) {
- TRACE_PROTO("Already received STREAM data",
+ if (ret == 2) {
+ /* frame cannot be parsed at the moment and should be
+ * buffered.
+ */
+ frm = new_quic_rx_strm_frm(strm_frm, pkt);
+ if (!frm) {
+ TRACE_PROTO("Could not alloc RX STREAM frame",
QUIC_EV_CONN_PSTRM, qc);
- goto out;
+ return 0;
}
- TRACE_PROTO("Partially already received STREAM data", QUIC_EV_CONN_PSTRM, qc);
- diff = strm->rx.offset - strm_frm->offset.key;
- strm_frm->offset.key = strm->rx.offset;
- strm_frm->len -= diff;
- strm_frm->data += diff;
- }
-
- BUG_ON(strm->flags & QC_SF_FIN_RECV);
-
- total = 0;
- if (strm_frm->offset.key == strm->rx.offset) {
- int ret;
-
- if (!qc_get_buf(strm, &strm->rx.buf))
- goto store_frm;
+ eb64_insert(&qcs->rx.frms, &frm->offset_node);
+ quic_rx_packet_refinc(pkt);
- ret = qc_strm_cpy(&strm->rx.buf, strm_frm);
- total += ret;
- strm->rx.offset += ret;
+ return 1;
}
- /* FIN is set only if all data were copied. */
- if (strm_frm->fin && !strm_frm->len)
- strm->flags |= QC_SF_FIN_RECV;
-
- total += qc_treat_rx_strm_frms(strm);
+ /* Frame correctly received by the mux.
+ * If there is buffered frame for next offset, it may be possible to
+ * receive them now.
+ */
+ frm_node = eb64_first(&qcs->rx.frms);
+ while (frm_node) {
+ frm = eb64_entry(&frm_node->node,
+ struct quic_rx_strm_frm, offset_node);
- if (total && qc->qcc->app_ops->decode_qcs(strm, strm->flags & QC_SF_FIN_RECV, qc->qcc->ctx) < 0) {
- TRACE_PROTO("Decoding error", QUIC_EV_CONN_PSTRM, qc);
- return 0;
- }
+ ret = qcc_recv(qc->qcc, qcs->by_id.key, frm->len,
+ frm->offset_node.key, frm->fin,
+ (char *)frm->data, &qcs);
- if (!strm_frm->len)
- goto out;
+ /* interrupt the parsing if the frame cannot be handled for the
+ * moment only by the MUX.
+ */
+ if (ret == 2)
+ break;
- store_frm:
- frm = new_quic_rx_strm_frm(strm_frm, pkt);
- if (!frm) {
- TRACE_PROTO("Could not alloc RX STREAM frame",
- QUIC_EV_CONN_PSTRM, qc);
- return 0;
+ /* Remove a newly received frame or an invalid one. */
+ frm_node = eb64_next(frm_node);
+ eb64_delete(&frm->offset_node);
+ quic_rx_packet_refdec(frm->pkt);
+ pool_free(pool_head_quic_rx_strm_frm, frm);
}
- eb64_insert(&strm->rx.frms, &frm->offset_node);
- quic_rx_packet_refinc(pkt);
+ /* Decode the received data. */
+ if (qcc_decode_qcs(qc->qcc, qcs))
+ return 0;
- out:
return 1;
}