#include "internal/quic_record_tx.h"
#include "internal/quic_record_rx.h"
#include "internal/quic_record_rx_wrap.h"
+#include "internal/quic_fc.h"
+#include "internal/quic_statm.h"
/*
* QUIC Send Stream
* QUIC Receive Stream Manager
* ===========================
*
- * The QUIC Receive Stream Manager (QUIC_RSTREAM) is responsible for:
- *
+ * The QUIC Receive Stream Manager (QUIC_RSTREAM) is responsible for
+ * storing the received stream data frames until the application
+ * is able to read the data.
*
* The QUIC_RSTREAM is instantiated once for every stream that can receive data.
* (i.e., for a unidirectional receiving stream or for the receiving component
typedef struct quic_rstream_st QUIC_RSTREAM;
/*
- * Create a new instance of QUIC_RSTREAM.
+ * Create a new instance of QUIC_RSTREAM with pointers to the flow
+ * controller and statistics module. They can be NULL for unit testing.
+ * If they are non-NULL, the `rxfc` is called when receive stream data
+ * is queued or read by application. `statm` is queried for current rtt.
*/
-QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx);
+QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx, QUIC_RXFC *rxfc,
+ OSSL_STATM *statm);
/*
* Frees a QUIC_RSTREAM and any associated storage.
void ossl_quic_rstream_free(QUIC_RSTREAM *qrs);
/*
- *
+ * Adds received stream frame data to `qrs`. The `pkt_wrap` refcount is
+ * incremented if the `data` is queued directly without copying.
+ * It can be NULL for unit-testing purposes, i.e. if `data` is static or
+ * never released before calling ossl_quic_rstream_free().
+ * The `offset` is the absolute offset of the data in the stream.
+ * `data_len` can be 0 - can be useful for indicating `fin` for empty stream.
+ * Or to indicate `fin` without any further data added to the stream.
*/
int ossl_quic_rstream_queue_data(QUIC_RSTREAM *qrs, OSSL_QRX_PKT_WRAP *pkt_wrap,
* https://www.openssl.org/source/license.html
*/
#include "internal/common.h"
+#include "internal/time.h"
#include "internal/quic_stream.h"
#include "internal/quic_sf_list.h"
+#include "internal/quic_fc.h"
+#include "internal/quic_error.h"
struct quic_rstream_st {
SFRAME_LIST fl;
+ QUIC_RXFC *rxfc;
+ OSSL_STATM *statm;
};
-QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx)
+QUIC_RSTREAM *ossl_quic_rstream_new(OSSL_QRX *qrx, QUIC_RXFC *rxfc,
+ OSSL_STATM *statm)
{
QUIC_RSTREAM *ret = OPENSSL_malloc(sizeof(*ret));
return NULL;
ossl_sframe_list_init(&ret->fl, qrx);
+ ret->rxfc = rxfc;
+ ret->statm = statm;
return ret;
}
range.start = offset;
range.end = offset + data_len;
+
+ if (qrs->rxfc != NULL
+ && (!ossl_quic_rxfc_on_rx_stream_frame(qrs->rxfc, range.end, fin)
+ || ossl_quic_rxfc_get_error(qrs->rxfc, 0) != QUIC_ERR_NO_ERROR))
+ /* QUIC_ERR_FLOW_CONTROL_ERROR or QUIC_ERR_FINAL_SIZE detected */
+ return 0;
+
return ossl_sframe_list_insert(&qrs->fl, &range, pkt_wrap, data, fin);
}
break;
}
- if (drop && offset != 0) {
+ if (drop && offset != 0)
ret = ossl_sframe_list_drop_frames(&qrs->fl, offset);
- }
if (ret) {
*readbytes = readbytes_;
int ossl_quic_rstream_read(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
size_t *readbytes, int *fin)
{
- return read_internal(qrs, buf, size, readbytes, fin, 1);
+ OSSL_TIME rtt;
+
+ if (qrs->statm != NULL) {
+ OSSL_RTT_INFO rtt_info;
+
+ ossl_statm_get_rtt_info(qrs->statm, &rtt_info);
+ rtt = rtt_info.smoothed_rtt;
+ } else {
+ rtt = ossl_time_zero();
+ }
+
+ if (!read_internal(qrs, buf, size, readbytes, fin, 1))
+ return 0;
+
+ if (qrs->rxfc != NULL
+ && !ossl_quic_rxfc_on_retire(qrs->rxfc, *readbytes, rtt))
+ return 0;
+
+ return 1;
}
int ossl_quic_rstream_peek(QUIC_RSTREAM *qrs, unsigned char *buf, size_t size,
const unsigned char *data, int fin)
{
STREAM_FRAME *sf, *new_frame, *prev_frame, *next_frame;
+#ifndef NDEBUG
+ uint64_t curr_end = fl->tail != NULL ? fl->tail->range.end
+ : fl->offset;
- /* nothing there yet */
- if (fl->tail == NULL) {
- if ((fin && fl->offset > range->end)
- || (fl->fin && fl->offset < range->end)) {
- /* TODO(QUIC): protocol violation: FINAL_SIZE_ERROR */
- return 0;
- }
- fl->fin = fin || fl->fin;
+ /* This check for FINAL_SIZE_ERROR is handled by QUIC FC already */
+ assert((!fin || curr_end <= range->end)
+ && (!fl->fin || curr_end >= range->end));
+#endif
- if (fl->offset >= range->end)
- return 1;
+ if (fl->offset >= range->end)
+ goto end;
+ /* nothing there yet */
+ if (fl->tail == NULL) {
fl->tail = fl->head = stream_frame_new(range, pkt, data);
if (fl->tail == NULL)
return 0;
- ++fl->num_frames;
- return 1;
- }
- if ((fin && fl->tail->range.end > range->end)
- || (fl->fin && fl->tail->range.end < range->end)) {
- /* TODO(QUIC): protocol violation: FINAL_SIZE_ERROR */
- return 0;
+ ++fl->num_frames;
+ goto end;
}
- fl->fin = fin || fl->fin;
-
- if (fl->offset >= range->end)
- return 1;
/* TODO(QUIC): Check for fl->num_frames and start copying if too many */
/* optimize insertion at the end */
if (fl->tail->range.start < range->start) {
- if (fl->tail->range.end >= range->end) {
- return 1;
- }
+ if (fl->tail->range.end >= range->end)
+ goto end;
+
return append_frame(fl, range, pkt, data);
}
sf = sf->next)
prev_frame = sf;
- if (prev_frame != NULL && prev_frame->range.end >= range->end) {
- return 1;
- }
+ if (!ossl_assert(sf != NULL))
+ /* frame list invariant broken */
+ return 0;
- if (sf == NULL)
- return append_frame(fl, range, pkt, data);
+ if (prev_frame != NULL && prev_frame->range.end >= range->end)
+ goto end;
/*
* Now we must create a new frame although in the end we might drop it,
--fl->num_frames;
stream_frame_free(fl, drop_frame);
}
+
if (next_frame != NULL) {
/* check whether the new_frame is redundant because there is no gap */
if (prev_frame != NULL
&& next_frame->range.start <= prev_frame->range.end) {
stream_frame_free(fl, new_frame);
- return 1;
+ goto end;
}
next_frame->prev = new_frame;
} else {
fl->tail = new_frame;
}
+
new_frame->next = next_frame;
new_frame->prev = prev_frame;
+
if (prev_frame != NULL)
prev_frame->next = new_frame;
else
fl->head = new_frame;
+
++fl->num_frames;
+
+ end:
+ fl->fin = fin || fl->fin;
+
return 1;
}
stream_frame_free(fl, drop_frame);
}
fl->head = sf;
+
if (sf != NULL)
sf->prev = NULL;
else
fl->tail = NULL;
+
return 1;
}
size_t readbytes = 0, avail = 0;
int fin = 0;
- if (!TEST_ptr(rstream = ossl_quic_rstream_new(NULL)))
+ if (!TEST_ptr(rstream = ossl_quic_rstream_new(NULL, NULL, NULL)))
goto err;
if (!TEST_true(ossl_quic_rstream_queue_data(rstream, NULL, 5,
if (!TEST_ptr(bulk_data = OPENSSL_malloc(data_size))
|| !TEST_ptr(read_buf = OPENSSL_malloc(data_size))
- || !TEST_ptr(rstream = ossl_quic_rstream_new(NULL)))
+ || !TEST_ptr(rstream = ossl_quic_rstream_new(NULL, NULL, NULL)))
goto err;
for (i = 0; i < data_size; ++i)