* Process any unprocessed RX'd datagrams, by calling registered callbacks by
* connection ID, reading more datagrams from the BIO if necessary.
*
- * Returns 1 on success or 0 on failure.
+ * Returns one of the following values:
+ *
+ * QUIC_DEMUX_PUMP_RES_OK
+ * At least one incoming datagram was processed.
+ *
+ * QUIC_DEMUX_PUMP_RES_TRANSIENT_FAIL
+ * No more incoming datagrams are currently available.
+ * Call again later.
+ *
+ * QUIC_DEMUX_PUMP_RES_PERMANENT_FAIL
+ * Either the network read BIO has failed in a non-transient fashion, or
+ * the QUIC implementation has encountered an internal state, assertion
+ * or allocation error. The caller should tear down the connection
+ * similarly to in the case of a protocol violation.
+ *
*/
+#define QUIC_DEMUX_PUMP_RES_OK 1
+#define QUIC_DEMUX_PUMP_RES_TRANSIENT_FAIL (-1)
+#define QUIC_DEMUX_PUMP_RES_PERMANENT_FAIL (-2)
+
int ossl_quic_demux_pump(QUIC_DEMUX *demux);
/*
#include "internal/quic_wire_pkt.h"
#include "internal/common.h"
#include <openssl/lhash.h>
+#include <openssl/err.h>
#define URXE_DEMUX_STATE_FREE 0 /* on urx_free list */
#define URXE_DEMUX_STATE_PENDING 1 /* on urx_pending list */
assert(urxe->demux_state == URXE_DEMUX_STATE_FREE);
if (demux->net_bio == NULL)
- return 0;
+ /*
+ * If no BIO is plugged in, treat this as no datagram being available.
+ */
+ return QUIC_DEMUX_PUMP_RES_TRANSIENT_FAIL;
/*
* Opportunistically receive as many messages as possible in a single
if (urxe == NULL) {
/* We need at least one URXE to receive into. */
if (!ossl_assert(i > 0))
- return 0;
+ return QUIC_DEMUX_PUMP_RES_PERMANENT_FAIL;
break;
}
urxe = demux_reserve_urxe(demux, urxe, demux->mtu);
if (urxe == NULL)
/* Allocation error, fail. */
- return 0;
+ return QUIC_DEMUX_PUMP_RES_PERMANENT_FAIL;
/* Ensure we zero any fields added to BIO_MSG at a later date. */
memset(&msg[i], 0, sizeof(BIO_MSG));
BIO_ADDR_clear(&urxe->local);
}
- if (!BIO_recvmmsg(demux->net_bio, msg, sizeof(BIO_MSG), i, 0, &rd))
- return 0;
+ ERR_set_mark();
+ if (!BIO_recvmmsg(demux->net_bio, msg, sizeof(BIO_MSG), i, 0, &rd)) {
+ if (BIO_err_is_non_fatal(ERR_peek_last_error())) {
+ /* Transient error, clear the error and stop. */
+ ERR_pop_to_mark();
+ return QUIC_DEMUX_PUMP_RES_TRANSIENT_FAIL;
+ } else {
+ /* Non-transient error, do not clear the error. */
+ ERR_clear_last_mark();
+ return QUIC_DEMUX_PUMP_RES_PERMANENT_FAIL;
+ }
+ }
+ ERR_clear_last_mark();
now = demux->now != NULL ? demux->now(demux->now_arg) : ossl_time_zero();
urxe = ossl_list_urxe_head(&demux->urx_free);
urxe->demux_state = URXE_DEMUX_STATE_PENDING;
}
- return 1;
+ return QUIC_DEMUX_PUMP_RES_OK;
}
/* Extract destination connection ID from the first packet in a datagram. */
if (ossl_list_urxe_head(&demux->urx_pending) == NULL) {
ret = demux_ensure_free_urxe(demux, DEMUX_MAX_MSGS_PER_CALL);
if (ret != 1)
- return 0;
+ return QUIC_DEMUX_PUMP_RES_PERMANENT_FAIL;
ret = demux_recv(demux);
- if (ret != 1)
- return 0;
+ if (ret != QUIC_DEMUX_PUMP_RES_OK)
+ return ret;
/*
* If demux_recv returned successfully, we should always have something.
assert(ossl_list_urxe_head(&demux->urx_pending) != NULL);
}
- return demux_process_pending_urxl(demux);
+ if (!demux_process_pending_urxl(demux))
+ return QUIC_DEMUX_PUMP_RES_PERMANENT_FAIL;
+
+ return QUIC_DEMUX_PUMP_RES_OK;
}
/* Artificially inject a packet into the demuxer for testing purposes. */