]> git.ipfire.org Git - thirdparty/openssl.git/commitdiff
QUIC Receive Stream Management
authorTomas Mraz <tomas@openssl.org>
Tue, 27 Sep 2022 12:08:43 +0000 (14:08 +0200)
committerHugo Landau <hlandau@openssl.org>
Mon, 14 Nov 2022 08:01:57 +0000 (08:01 +0000)
Added SFRAME_LIST structure and QUIC_RSTREAM object to
manage received stream data.

Reviewed-by: Matt Caswell <matt@openssl.org>
Reviewed-by: Hugo Landau <hlandau@openssl.org>
(Merged from https://github.com/openssl/openssl/pull/19351)

include/internal/quic_record_rx_wrap.h [moved from ssl/quic/quic_record_rx_wrap.h with 96% similarity]
include/internal/quic_sf_list.h [new file with mode: 0644]
include/internal/quic_stream.h
ssl/quic/build.info
ssl/quic/quic_record_rx.c
ssl/quic/quic_record_rx_wrap.c
ssl/quic/quic_rstream.c [new file with mode: 0644]
ssl/quic/quic_rx_depack.c
ssl/quic/quic_sf_list.c [new file with mode: 0644]
ssl/quic/quic_sstream.c [moved from ssl/quic/quic_stream.c with 100% similarity]
test/quic_stream_test.c

similarity index 96%
rename from ssl/quic/quic_record_rx_wrap.h
rename to include/internal/quic_record_rx_wrap.h
index 842038e2e75205f2e88b608672c7ab4beff36e73..c19f058db365c9373a8d2114e67e9a23f8c496a3 100644 (file)
@@ -10,8 +10,9 @@
 #ifndef OSSL_QUIC_RECORD_RX_WRAP_H
 # define OSSL_QUIC_RECORD_RX_WRAP_H
 
+# include <openssl/crypto.h>
+# include "internal/refcount.h"
 # include "internal/quic_record_rx.h"
-# include "quic_local.h"
 
 /*
  * OSSL_QRX_PKT handle wrapper for counted references
diff --git a/include/internal/quic_sf_list.h b/include/internal/quic_sf_list.h
new file mode 100644 (file)
index 0000000..95c7950
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2022 The OpenSSL Project Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License 2.0 (the "License").  You may not use
+ * this file except in compliance with the License.  You can obtain a copy
+ * in the file LICENSE in the source distribution or at
+ * https://www.openssl.org/source/license.html
+ */
+
+#ifndef OSSL_QUIC_SF_LIST_H
+# define OSSL_QUIC_SF_LIST_H
+
+#include "internal/common.h"
+#include "internal/uint_set.h"
+
+/*
+ * Stream frame list
+ * =================
+ *
+ * This data structure uses supports similar operations as uint64 set but
+ * it has slightly different invariants and also carries data associated with
+ * the ranges in the list.
+ *
+ * Operations:
+ *   Insert frame (optimized insertion at the beginning and at the end).
+ *   Iterated peek into the frame(s) from the beginning.
+ *   Dropping frames from the beginning up to an offset (exclusive).
+ *
+ * Invariant: The frames in the list are sorted by the start and end bounds.
+ * Invariant: There are no fully overlapping frames or frames that would
+ *            be fully encompassed by another frame in the list.
+ * Invariant: No frame has start > end.
+ * Invariant: The range start is inclusive the end is exclusive to be
+ *            able to mark an empty frame.
+ * Invariant: The offset never points further than into the first frame.
+ */
+
+typedef struct stream_frame_st STREAM_FRAME;
+
+typedef struct sframe_list_st {
+    OSSL_QRX *qrx;
+    STREAM_FRAME  *head, *tail;
+    /* Is the tail frame final. */
+    unsigned int fin;
+    /* Number of stream frames in the list. */
+    size_t num_frames;
+    /* Offset of data not yet dropped */
+    uint64_t offset;
+} SFRAME_LIST;
+
+void ossl_sframe_list_init(SFRAME_LIST *fl, OSSL_QRX *qrx);
+void ossl_sframe_list_destroy(SFRAME_LIST *fl);
+int ossl_sframe_list_insert(SFRAME_LIST *fl, UINT_RANGE *range,
+                            OSSL_QRX_PKT_WRAP *pkt,
+                            const unsigned char *data, int fin);
+int ossl_sframe_list_peek(const SFRAME_LIST *fl, void **iter,
+                          UINT_RANGE *range, const unsigned char **data,
+                          int *fin);
+int ossl_sframe_list_drop_frames(SFRAME_LIST *fl, uint64_t limit);
+
+#endif
index 8fa161aed958c392984549ca694d5d6029b2206a..d3b94023cded7f86d0ee9f7d8b474103b1aa9732 100644 (file)
@@ -16,6 +16,8 @@
 #include "internal/quic_types.h"
 #include "internal/quic_wire.h"
 #include "internal/quic_record_tx.h"
+#include "internal/quic_record_rx.h"
+#include "internal/quic_record_rx_wrap.h"
 
 /*
  * QUIC Send Stream
@@ -266,5 +268,61 @@ void ossl_quic_sstream_adjust_iov(size_t len,
                                   OSSL_QTX_IOVEC *iov,
                                   size_t num_iov);
 
+/*
+ * QUIC Receive Stream Manager
+ * ===========================
+ *
+ * The QUIC Receive Stream Manager (QUIC_RSTREAM) is responsible for:
+ *
+ *
+ * The QUIC_RSTREAM is instantiated once for every stream that can receive data.
+ * (i.e., for a unidirectional receiving stream or for the receiving component
+ * of a bidirectional stream).
+ */
+typedef struct quic_rstream_st QUIC_RSTREAM;
+
+/*
+ * Create a new instance of QUIC_RSTREAM.
+ */
+QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx);
+
+/*
+ * Frees a QUIC_RSTREAM and any associated storage.
+ */
+void ossl_quic_rstream_free(QUIC_RSTREAM *qrs);
+
+/*
+ *
+ */
+
+int ossl_quic_rstream_queue_data(QUIC_RSTREAM *qrs, OSSL_QRX_PKT_WRAP *pkt_wrap,
+                                 uint64_t offset,
+                                 const unsigned char *data, uint64_t data_len,
+                                 int fin);
+
+/*
+ * Copies the data from the stream storage to buffer `buf` of size `size`.
+ * `readbytes` is set to the number of bytes actually copied.
+ * `fin` is set to 1 if all the data from the stream were read so the
+ * stream is finished. It is set to 0 otherwise.
+ */
+int ossl_quic_rstream_read(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
+                           size_t *readbytes, int *fin);
+
+/*
+ * Peeks at the data in the stream storage. It copies them to buffer `buf`
+ * of size `size` and sets `readbytes` to the number of bytes actually copied.
+ * `fin` is set to 1 if the copied data reach end of the stream.
+ * It is set to 0 otherwise.
+ */
+int ossl_quic_rstream_peek(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
+                           size_t *readbytes, int *fin);
+
+/*
+ * Returns the size of the data available for reading. `fin` is set to 1 if
+ * after reading all the available data the stream will be finished,
+ * set to 0 otherwise.
+ */
+int ossl_quic_rstream_available(QUIC_RSTREAM *qrs, size_t *avail, int *fin);
 
 #endif
index 45440384e11a45b34f60fe16085346ace9b7d589..cc8c1fb5f5a1d767cd0132eb6d3200772978cf9a 100644 (file)
@@ -4,5 +4,5 @@ SOURCE[$LIBSSL]=quic_method.c quic_impl.c quic_wire.c quic_ackm.c quic_statm.c
 SOURCE[$LIBSSL]=cc_dummy.c quic_demux.c quic_record_rx.c
 SOURCE[$LIBSSL]=quic_record_tx.c quic_record_util.c quic_record_shared.c quic_wire_pkt.c
 SOURCE[$LIBSSL]=quic_record_rx_wrap.c quic_rx_depack.c
-SOURCE[$LIBSSL]=quic_fc.c uint_set.c quic_stream.c
+SOURCE[$LIBSSL]=quic_fc.c uint_set.c quic_sf_list.c quic_rstream.c quic_sstream.c
 SOURCE[$LIBSSL]=quic_cfq.c quic_txpim.c quic_fifd.c
index ee552e83c501daad0651639549ae137e4cff8876..636f6f8af4c8e00babb313200e5a00f2edd36759 100644 (file)
@@ -1109,9 +1109,11 @@ int ossl_qrx_read_pkt(OSSL_QRX *qrx, OSSL_QRX_PKT *pkt)
 
 void ossl_qrx_release_pkt(OSSL_QRX *qrx, void *handle)
 {
-    RXE *rxe = handle;
+    if (handle != NULL) {
+        RXE *rxe = handle;
 
-    qrx_recycle_rxe(qrx, rxe);
+        qrx_recycle_rxe(qrx, rxe);
+    }
 }
 
 uint64_t ossl_qrx_get_bytes_received(OSSL_QRX *qrx, int clear)
index 08641596395c44fb1788b4eec2098db58fac97df..1c98682377b1adfef3f1fb7b76371c62eaf19c48 100644 (file)
@@ -9,7 +9,7 @@
 
 #include "internal/cryptlib.h"
 #include "internal/refcount.h"
-#include "quic_record_rx_wrap.h"
+#include "internal/quic_record_rx_wrap.h"
 
 OSSL_QRX_PKT_WRAP *ossl_qrx_pkt_wrap_new(OSSL_QRX_PKT *pkt)
 {
diff --git a/ssl/quic/quic_rstream.c b/ssl/quic/quic_rstream.c
new file mode 100644 (file)
index 0000000..960f6ef
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+* Copyright 2022 The OpenSSL Project Authors. All Rights Reserved.
+*
+* Licensed under the Apache License 2.0 (the "License").  You may not use
+* this file except in compliance with the License.  You can obtain a copy
+* in the file LICENSE in the source distribution or at
+* https://www.openssl.org/source/license.html
+*/
+#include "internal/common.h"
+#include "internal/quic_stream.h"
+#include "internal/quic_sf_list.h"
+
+struct quic_rstream_st {
+    SFRAME_LIST fl;
+};
+
+QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx)
+{
+    QUIC_RSTREAM *ret = OPENSSL_malloc(sizeof(*ret));
+
+    if (ret == NULL)
+        return NULL;
+
+    ossl_sframe_list_init(&ret->fl, qrx);
+    return ret;
+}
+
+void ossl_quic_rstream_free(QUIC_RSTREAM *qrs)
+{
+    ossl_sframe_list_destroy(&qrs->fl);
+    OPENSSL_free(qrs);
+}
+
+int ossl_quic_rstream_queue_data(QUIC_RSTREAM *qrs, OSSL_QRX_PKT_WRAP *pkt_wrap,
+                                 uint64_t offset,
+                                 const unsigned char *data, uint64_t data_len,
+                                 int fin)
+{
+    UINT_RANGE range;
+
+    range.start = offset;
+    range.end = offset + data_len;
+    return ossl_sframe_list_insert(&qrs->fl, &range, pkt_wrap, data, fin);
+}
+
+static int read_internal(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
+                         size_t *readbytes, int *fin, int drop)
+{
+    void *iter = NULL;
+    UINT_RANGE range;
+    const unsigned char *data;
+    uint64_t offset = 0;
+    size_t readbytes_ = 0;
+    int fin_ = 0, ret = 1;
+
+    while (ossl_sframe_list_peek(&qrs->fl, &iter, &range, &data, &fin_)) {
+        size_t l = (size_t)(range.end - range.start);
+
+        if (l > size)
+            l = size;
+        memcpy(buf, data, l);
+        offset = range.start + l;
+        size -= l;
+        buf += l;
+        readbytes_ += l;
+        if (size == 0)
+            break;
+    }
+
+    if (drop && offset != 0) {
+        ret = ossl_sframe_list_drop_frames(&qrs->fl, offset);
+    }
+
+    if (ret) {
+        *readbytes = readbytes_;
+        *fin = fin_;
+    }
+
+    return ret;
+}
+
+int ossl_quic_rstream_read(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
+                           size_t *readbytes, int *fin)
+{
+    return read_internal(qrs, buf, size, readbytes, fin, 1);
+}
+
+int ossl_quic_rstream_peek(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
+                           size_t *readbytes, int *fin)
+{
+    return read_internal(qrs, buf, size, readbytes, fin, 0);
+}
+
+int ossl_quic_rstream_available(QUIC_RSTREAM *qrs, size_t *avail, int *fin)
+{
+    void *iter = NULL;
+    UINT_RANGE range;
+    const unsigned char *data;
+    uint64_t avail_ = 0;
+
+    while (ossl_sframe_list_peek(&qrs->fl, &iter, &range, &data, fin))
+        avail_ += range.end - range.start;
+
+#if SIZE_MAX < UINT64_MAX
+    *avail = avail_ > SIZE_MAX ? SIZE_MAX : (size_t)avail_;
+#else
+    *avail = (size_t)avail_;
+#endif
+    return 1;
+}
index 42fb8ff34caf92b3e1edb200dfc1d7459287ee65..12a7b9c39e91b90a56ea20590d28ba701081e789 100644 (file)
@@ -13,9 +13,9 @@
 #include "internal/quic_record_rx.h"
 #include "internal/quic_ackm.h"
 #include "internal/quic_rx_depack.h"
+#include "internal/quic_record_rx_wrap.h"
 #include "internal/sockets.h"
 
-#include "quic_record_rx_wrap.h"
 #include "quic_local.h"
 #include "../ssl_local.h"
 
diff --git a/ssl/quic/quic_sf_list.c b/ssl/quic/quic_sf_list.c
new file mode 100644 (file)
index 0000000..90d0c28
--- /dev/null
@@ -0,0 +1,262 @@
+/*
+ * Copyright 2022 The OpenSSL Project Authors. All Rights Reserved.
+ *
+ * Licensed under the Apache License 2.0 (the "License").  You may not use
+ * this file except in compliance with the License.  You can obtain a copy
+ * in the file LICENSE in the source distribution or at
+ * https://www.openssl.org/source/license.html
+ */
+
+#include "internal/uint_set.h"
+#include "internal/common.h"
+#include "internal/quic_record_rx_wrap.h"
+#include "internal/quic_sf_list.h"
+
+/*
+ * Stream frame list
+ * =================
+ *
+ * This data structure uses supports similar operations as uint64 set but
+ * it has slightly different invariants and also carries data associated with
+ * the ranges in the list.
+ *
+ * Operations:
+ *   Insert frame (optimized insertion at the beginning and at the end).
+ *   Iterated peek into the frame(s) from the beginning.
+ *   Dropping frames from the beginning up to an offset (exclusive).
+ *
+ * Invariant: The frames in the list are sorted by the start and end bounds.
+ * Invariant: There are no fully overlapping frames or frames that would
+ *            be fully encompassed by another frame in the list.
+ * Invariant: No frame has start > end.
+ * Invariant: The range start is inclusive the end is exclusive to be
+ *            able to mark an empty frame.
+ * Invariant: The offset never points further than into the first frame.
+ */
+
+struct stream_frame_st {
+    struct stream_frame_st *prev, *next;
+    UINT_RANGE range;
+    OSSL_QRX_PKT_WRAP *pkt;
+    const unsigned char *data;
+};
+
+static void stream_frame_free(SFRAME_LIST *fl, STREAM_FRAME *sf)
+{
+    ossl_qrx_pkt_wrap_free(fl->qrx, sf->pkt);
+    OPENSSL_free(sf);
+}
+
+static STREAM_FRAME *stream_frame_new(UINT_RANGE *range, OSSL_QRX_PKT_WRAP *pkt,
+                                      const unsigned char *data)
+{
+    STREAM_FRAME *sf = OPENSSL_zalloc(sizeof(*sf));
+
+    if (pkt != NULL && !ossl_qrx_pkt_wrap_up_ref(pkt)) {
+        OPENSSL_free(sf);
+        return NULL;
+    }
+
+    sf->range = *range;
+    sf->pkt = pkt;
+    sf->data = data;
+
+    return sf;
+}
+
+void ossl_sframe_list_init(SFRAME_LIST *fl, OSSL_QRX *qrx)
+{
+    memset(fl, 0, sizeof(*fl));
+    fl->qrx = qrx;
+}
+
+void ossl_sframe_list_destroy(SFRAME_LIST *fl)
+{
+    STREAM_FRAME *sf, *next_frame;
+
+    for (sf = fl->head; sf != NULL; sf = next_frame) {
+        next_frame = sf->next;
+        stream_frame_free(fl, sf);
+    }
+}
+
+static int append_frame(SFRAME_LIST *fl, UINT_RANGE *range,
+                        OSSL_QRX_PKT_WRAP *pkt,
+                        const unsigned char *data)
+{
+    STREAM_FRAME *new_frame;
+
+    if ((new_frame = stream_frame_new(range, pkt, data)) == NULL)
+        return 0;
+    new_frame->prev = fl->tail;
+    if (fl->tail != NULL)
+        fl->tail->next = new_frame;
+    fl->tail = new_frame;
+    ++fl->num_frames;
+    return 1;
+}
+
+int ossl_sframe_list_insert(SFRAME_LIST *fl, UINT_RANGE *range,
+                            OSSL_QRX_PKT_WRAP *pkt,
+                            const unsigned char *data, int fin)
+{
+    STREAM_FRAME *sf, *new_frame, *prev_frame, *next_frame;
+
+    /* nothing there yet */
+    if (fl->tail == NULL) {
+        if ((fin && fl->offset > range->end)
+            || (fl->fin && fl->offset < range->end)) {
+            /* TODO(QUIC): protocol violation: FINAL_SIZE_ERROR */
+            return 0;
+        }
+        fl->fin = fin || fl->fin;
+
+        if (fl->offset >= range->end)
+            return 1;
+
+        fl->tail = fl->head = stream_frame_new(range, pkt, data);
+        if (fl->tail == NULL)
+            return 0;
+        ++fl->num_frames;
+        return 1;
+    }
+
+    if ((fin && fl->tail->range.end > range->end)
+        || (fl->fin && fl->tail->range.end < range->end)) {
+        /* TODO(QUIC): protocol violation: FINAL_SIZE_ERROR */
+        return 0;
+    }
+    fl->fin = fin || fl->fin;
+
+    if (fl->offset >= range->end)
+        return 1;
+
+    /* TODO(QUIC): Check for fl->num_frames and start copying if too many */
+
+    /* optimize insertion at the end */
+    if (fl->tail->range.start < range->start) {
+        if (fl->tail->range.end >= range->end) {
+            return 1;
+        }
+        return append_frame(fl, range, pkt, data);
+    }
+
+    prev_frame = NULL;
+    for (sf = fl->head; sf != NULL && sf->range.start < range->start;
+         sf = sf->next)
+        prev_frame = sf;
+
+    if (prev_frame != NULL && prev_frame->range.end >= range->end) {
+        return 1;
+    }
+
+    if (sf == NULL)
+        return append_frame(fl, range, pkt, data);
+
+    /*
+     * Now we must create a new frame although in the end we might drop it,
+     * because we will be potentially dropping existing overlapping frames.
+     */
+    new_frame = stream_frame_new(range, pkt, data);
+    if (new_frame == NULL)
+        return 0;
+
+    for (next_frame = sf;
+         next_frame != NULL && next_frame->range.end <= range->end;) {
+        STREAM_FRAME *drop_frame = next_frame;
+
+        next_frame = next_frame->next;
+        if (next_frame != NULL)
+            next_frame->prev = drop_frame->prev;
+        if (prev_frame != NULL)
+            prev_frame->next = drop_frame->next;
+        if (fl->head == drop_frame)
+            fl->head = next_frame;
+        if (fl->tail == drop_frame)
+            fl->tail = prev_frame;
+        --fl->num_frames;
+        stream_frame_free(fl, drop_frame);
+    }
+    if (next_frame != NULL) {
+        /* check whether the new_frame is redundant because there is no gap */
+        if (prev_frame != NULL
+            && next_frame->range.start <= prev_frame->range.end) {
+            stream_frame_free(fl, new_frame);
+            return 1;
+        }
+        next_frame->prev = new_frame;
+    } else {
+        fl->tail = new_frame;
+    }
+    new_frame->next = next_frame;
+    new_frame->prev = prev_frame;
+    if (prev_frame != NULL)
+        prev_frame->next = new_frame;
+    else
+        fl->head = new_frame;
+    ++fl->num_frames;
+    return 1;
+}
+
+int ossl_sframe_list_peek(const SFRAME_LIST *fl, void **iter,
+                          UINT_RANGE *range, const unsigned char **data,
+                          int *fin)
+{
+    STREAM_FRAME *sf = *iter;
+    uint64_t start;
+
+    if (sf == NULL) {
+        start = fl->offset;
+        sf = fl->head;
+    } else {
+        start = sf->range.end;
+        sf = sf->next;
+    }
+
+    range->start = start;
+
+    if (sf == NULL || sf->range.start > start
+        || !ossl_assert(start < sf->range.end)) {
+        range->end = start;
+        *data = NULL;
+        *iter = NULL;
+        /* set fin only if we are at the end */
+        *fin = sf == NULL ? fl->fin : 0;
+        return 0;
+    }
+
+    range->end = sf->range.end;
+    *data = sf->data + (start - sf->range.start);
+    *fin = sf->next == NULL ? fl->fin : 0;
+    *iter = sf;
+    return 1;
+}
+
+int ossl_sframe_list_drop_frames(SFRAME_LIST *fl, uint64_t limit)
+{
+    STREAM_FRAME *sf;
+
+    /* offset cannot move back or past the data received */
+    if (!ossl_assert(limit >= fl->offset)
+        || !ossl_assert(fl->tail == NULL
+                        || limit <= fl->tail->range.end)
+        || !ossl_assert(fl->tail != NULL
+                        || limit == fl->offset))
+        return 0;
+
+    fl->offset = limit;
+
+    for (sf = fl->head; sf != NULL && sf->range.end <= limit;) {
+        STREAM_FRAME *drop_frame = sf;
+
+        sf = sf->next;
+        --fl->num_frames;
+        stream_frame_free(fl, drop_frame);
+    }
+    fl->head = sf;
+    if (sf != NULL)
+        sf->prev = NULL;
+    else
+        fl->tail = NULL;
+    return 1;
+}
index 3f0532b1ea9999a6498410ddff2228c97aeb5c9d..8f863966286b2e7cbda0cacfa8ba20513db5657f 100644 (file)
@@ -39,7 +39,7 @@ static const unsigned char data_1[] = {
     0x5a, 0x5b, 0x5c, 0x5d, 0x5e, 0x5f
 };
 
-static int test_simple(void)
+static int test_sstream_simple(void)
 {
     int testresult = 0;
     QUIC_SSTREAM *sstream = NULL;
@@ -199,12 +199,12 @@ static int test_simple(void)
         goto err;
 
     testresult = 1;
-err:
+ err:
     ossl_quic_sstream_free(sstream);
     return testresult;
 }
 
-static int test_bulk(int idx)
+static int test_sstream_bulk(int idx)
 {
     int testresult = 0;
     QUIC_SSTREAM *sstream = NULL;
@@ -312,7 +312,7 @@ static int test_bulk(int idx)
         goto err;
 
     testresult = 1;
-err:
+ err:
     OPENSSL_free(src_buf);
     OPENSSL_free(dst_buf);
     OPENSSL_free(ref_src_buf);
@@ -321,9 +321,163 @@ err:
     return testresult;
 }
 
+static const unsigned char simple_data[] =
+    "Hello world! And thank you for all the fish!";
+
+static int test_rstream_simple(void)
+{
+    QUIC_RSTREAM *rstream = NULL;
+    int ret = 0;
+    unsigned char buf[sizeof(simple_data)];
+    size_t readbytes = 0, avail = 0;
+    int fin = 0;
+
+    if (!TEST_ptr(rstream = ossl_quic_rstream_new(NULL)))
+        goto err;
+
+    if (!TEST_true(ossl_quic_rstream_queue_data(rstream, NULL, 5,
+                                                simple_data + 5, 10, 0))
+        || !TEST_true(ossl_quic_rstream_queue_data(rstream, NULL,
+                                                   sizeof(simple_data) - 1,
+                                                   simple_data + sizeof(simple_data) - 1,
+                                                   1, 1))
+        || !TEST_true(ossl_quic_rstream_peek(rstream, buf, sizeof(buf),
+                                             &readbytes, &fin))
+        || !TEST_false(fin)
+        || !TEST_size_t_eq(readbytes, 0)
+        || !TEST_true(ossl_quic_rstream_queue_data(rstream, NULL,
+                                                   sizeof(simple_data) - 10,
+                                                   simple_data + sizeof(simple_data) - 10,
+                                                   10, 1))
+        || !TEST_true(ossl_quic_rstream_queue_data(rstream, NULL, 0,
+                                                   simple_data, 1, 0))
+        || !TEST_true(ossl_quic_rstream_peek(rstream, buf, sizeof(buf),
+                                             &readbytes, &fin))
+        || !TEST_false(fin)
+        || !TEST_size_t_eq(readbytes, 1)
+        || !TEST_mem_eq(buf, 1, simple_data, 1)
+        || !TEST_true(ossl_quic_rstream_queue_data(rstream, NULL,
+                                                   0, simple_data,
+                                                   10, 0))
+        || !TEST_true(ossl_quic_rstream_peek(rstream, buf, sizeof(buf),
+                                             &readbytes, &fin))
+        || !TEST_false(fin)
+        || !TEST_size_t_eq(readbytes, 15)
+        || !TEST_mem_eq(buf, 15, simple_data, 15)
+        || !TEST_true(ossl_quic_rstream_queue_data(rstream, NULL,
+                                                   15,
+                                                   simple_data + 15,
+                                                   sizeof(simple_data) - 15, 1))
+        || !TEST_true(ossl_quic_rstream_available(rstream, &avail, &fin))
+        || !TEST_true(fin)
+        || !TEST_size_t_eq(avail, sizeof(simple_data))
+        || !TEST_true(ossl_quic_rstream_read(rstream, buf, 12,
+                                             &readbytes, &fin))
+        || !TEST_false(fin)
+        || !TEST_size_t_eq(readbytes, 12)
+        || !TEST_mem_eq(buf, 12, simple_data, 12)
+        || !TEST_true(ossl_quic_rstream_read(rstream, buf + 12, sizeof(buf) - 12,
+                                             &readbytes, &fin))
+        || !TEST_true(fin)
+        || !TEST_size_t_eq(readbytes, sizeof(buf) - 12)
+        || !TEST_mem_eq(buf, sizeof(buf), simple_data, sizeof(simple_data))
+        || !TEST_true(ossl_quic_rstream_read(rstream, buf, sizeof(buf),
+                                             &readbytes, &fin))
+        || !TEST_true(fin)
+        || !TEST_size_t_eq(readbytes, 0))
+        goto err;
+
+    ret = 1;
+
+ err:
+    ossl_quic_rstream_free(rstream);
+    return ret;
+}
+
+static int test_rstream_random(int idx)
+{
+    unsigned char *bulk_data = NULL;
+    unsigned char *read_buf = NULL;
+    QUIC_RSTREAM *rstream = NULL;
+    size_t i, read_off, queued_min;
+    const size_t data_size = 10000;
+    int r, s, fin;
+    int ret = 0;
+    size_t readbytes = 0;
+
+    if (!TEST_ptr(bulk_data = OPENSSL_malloc(data_size))
+        || !TEST_ptr(read_buf = OPENSSL_malloc(data_size))
+        || !TEST_ptr(rstream = ossl_quic_rstream_new(NULL)))
+        goto err;
+
+    for (i = 0; i < data_size; ++i)
+        bulk_data[i] = (unsigned char)(test_random() & 0xFF);
+
+    read_off = queued_min = 0;
+    for (r = 0; r < 100; ++r) {
+        for (s = 0; s < 10; ++s) {
+            size_t off = (r * 10 + s) * 10, size = 10;
+
+            if (test_random() % 5 == 0)
+                /* drop packet */
+                continue;
+
+            if (off <= queued_min && off + size > queued_min)
+                queued_min = off + size;
+
+            if (!TEST_true(ossl_quic_rstream_queue_data(rstream, NULL, off,
+                                                        bulk_data + off,
+                                                        size, 0)))
+                goto err;
+
+            if (test_random() % 5 != 0)
+               continue;
+
+            /* random overlapping retransmit */
+            off = read_off + test_random() % 50;
+            if (off > 50)
+                off -= 50;
+            size = test_random() % 100;
+            if (off + size > data_size)
+                off = data_size - size;
+            if (off <= queued_min && off + size > queued_min)
+                queued_min = off + size;
+
+            if (!TEST_true(ossl_quic_rstream_queue_data(rstream, NULL, off,
+                                                        bulk_data + off,
+                                                        size, 0)))
+                goto err;
+        }
+        if (!TEST_true(ossl_quic_rstream_read(rstream, read_buf,
+                                              data_size,
+                                              &readbytes, &fin)))
+            goto err;
+        if (!TEST_size_t_ge(readbytes, queued_min - read_off)
+            || !TEST_size_t_le(readbytes + read_off, data_size)
+            || !TEST_mem_eq(read_buf, readbytes, bulk_data + read_off,
+                            readbytes))
+            goto err;
+        read_off += readbytes;
+        queued_min = read_off;
+    }
+
+    TEST_info("Total read bytes: %zu", read_off);
+
+    ret = 1;
+
+ err:
+    OPENSSL_free(bulk_data);
+    OPENSSL_free(read_buf);
+    ossl_quic_rstream_free(rstream);
+    return ret;
+
+}
+
 int setup_tests(void)
 {
-    ADD_TEST(test_simple);
-    ADD_ALL_TESTS(test_bulk, 100);
+    ADD_TEST(test_sstream_simple);
+    ADD_ALL_TESTS(test_sstream_bulk, 100);
+    ADD_TEST(test_rstream_simple);
+    ADD_ALL_TESTS(test_rstream_random, 100);
     return 1;
 }