#include "system/network.h"
#include "system/filesys.h"
#include "system/time.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/time_basic.h"
#include "lib/util/util_file.h"
#include "lib/util/tevent_ntstatus.h"
#include "../util/tevent_unix.h"
#include "lib/param/param.h"
#include <gnutls/gnutls.h>
+#include <gnutls/crypto.h>
#include <gnutls/x509.h>
#include "lib/crypto/gnutls_helpers.h"
#include <netinet/quic.h>
#endif
+#ifdef HAVE_LIBNGTCP2
+#include <ngtcp2/ngtcp2.h>
+#include <ngtcp2/ngtcp2_crypto.h>
+#include <ngtcp2/ngtcp2_crypto_gnutls.h>
+#endif
+
#define DH_BITS 2048
const char *tls_verify_peer_string(enum tls_verify_peer_state verify_peer)
TALLOC_FREE(frame);
return status;
}
+
+#ifdef HAVE_LIBNGTCP2
+
+static const struct tstream_context_ops tstream_ngtcp2_ops;
+
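+/*
+ * A single buffer element, used in both directions:
+ * it holds at most one full QUIC datagram payload and
+ * is linked into a doubly linked list via prev/next.
+ */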
+struct tstream_ngtcp2_buffer {
+ struct tstream_ngtcp2_buffer *prev, *next;
+ uint64_t offset;
+ size_t length;
+ uint8_t buffer[NGTCP2_MAX_PMTUD_UDP_PAYLOAD_SIZE];
+};
+
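+/*
+ * The per connection state:
+ * push/pull move raw datagrams between ngtcp2 and the
+ * plain UDP socket, while writev/readv queue the caller's
+ * stream payload until it is acked or consumed.
+ */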
+struct tstream_ngtcp2 {
+ struct tdgram_context *plain_dgram;
+ int error;
+
+ ngtcp2_tstamp last_expire;
+ ngtcp2_crypto_conn_ref conn_ref;
+ ngtcp2_conn *conn;
+ int64_t stream_id;
+ struct samba_sockaddr laddr;
+ struct samba_sockaddr raddr;
+ ngtcp2_path path;
+
+ struct tevent_context *current_ev;
+
+ struct tevent_immediate *retry_im;
+ uint32_t keepalive_usecs;
+ struct tevent_timer *keepalive_timer;
+
+ struct {
+ struct tstream_tls *tlss;
+ struct tevent_req *req;
+ bool done;
+ } handshake;
+
+ struct {
+ struct tstream_ngtcp2_buffer b;
+ struct tevent_req *subreq;
+ uint64_t blocked;
+ } push;
+
+ struct {
+ struct tstream_ngtcp2_buffer b;
+ struct tevent_req *subreq;
+ } pull;
+
+ struct {
+ struct tstream_ngtcp2_buffer *pushed;
+ uint64_t pushed_offset;
+ struct tstream_ngtcp2_buffer *pending;
+ struct tevent_req *req;
+ } writev;
+
+ struct {
+ struct tstream_ngtcp2_buffer *buffers;
+ struct tevent_req *req;
+ } readv;
+
+ struct {
+ struct tevent_req *req;
+ } disconnect;
+
+ struct {
+ struct tevent_req *req;
+ } monitor;
+};
+
+static void tstream_ngtcp2_close_stream(struct tstream_ngtcp2 *si);
+static void tstream_ngtcp2_retry_handshake(struct tstream_context *stream);
+static void tstream_ngtcp2_timer_start(struct tstream_context *stream);
+static void tstream_ngtcp2_timer_handler(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval current_time,
+ void *private_data);
+static void tstream_ngtcp2_sendto_start(struct tstream_context *stream);
+static void tstream_ngtcp2_sendto_done(struct tevent_req *subreq);
+static void tstream_ngtcp2_recvfrom_start(struct tstream_context *stream);
+static void tstream_ngtcp2_recvfrom_done(struct tevent_req *subreq);
+static void tstream_ngtcp2_readv_retry(struct tstream_context *stream);
+static void tstream_ngtcp2_writev_retry(struct tstream_context *stream);
+static void tstream_ngtcp2_monitor_retry(struct tstream_context *stream);
+static void tstream_ngtcp2_deferred_retry(struct tstream_context *stream);
+
+static ngtcp2_conn *qwrap_ngtcp2_conn_ref_get_conn(ngtcp2_crypto_conn_ref *conn_ref)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(conn_ref->user_data,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ return si->conn;
+}
+
+static bool tstream_ngtcp2_verbose;
+
+static void tstream_ngtcp2_log_printf(void *user_data, const char *fmt, ...) PRINTF_ATTRIBUTE(2, 3);
+static void tstream_ngtcp2_log_printf(void *user_data, const char *fmt, ...)
+{
+ if (tstream_ngtcp2_verbose) {
+ char buffer[1024];
+ va_list ap;
+ struct timespec ts = timespec_current();
+ struct timeval_buf tsbuf;
+
+ va_start(ap, fmt);
+ vsnprintf(buffer, sizeof(buffer), fmt, ap);
+ va_end(ap);
+
+ D_ERR("NGTCP2:%s: %s\n",
+ timespec_string_buf(&ts, true, &tsbuf),
+ buffer);
+ }
+}
+
+static void tstream_ngtcp2_qlog_write_cb(void *user_data, uint32_t flags,
+ const void *data, size_t datalen)
+{
+ if (tstream_ngtcp2_verbose) {
+ struct timespec ts = timespec_current();
+ struct timeval_buf tsbuf;
+
+ D_ERR("NGTCP2:%s: flags[%"PRIu32"] len[%zu] %*.*s\n",
+ timespec_string_buf(&ts, true, &tsbuf),
+ flags, datalen,
+ (int)datalen, (int)datalen, (const char *)data);
+ }
+}
+
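+/*
+ * ngtcp2 timestamps and durations are expressed in
+ * nanoseconds, NGTCP2_SECONDS is the conversion
+ * factor from struct timespec.
+ */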
+static inline ngtcp2_tstamp timespec2ngtcp2_tstamp(struct timespec ts)
+{
+ return (uint64_t)ts.tv_sec * NGTCP2_SECONDS + (uint64_t)ts.tv_nsec;
+}
+
+static inline struct timespec ngtcp2_tstamp2timespec(ngtcp2_tstamp _ts,
+ ngtcp2_duration rtc_offset)
+{
+ ngtcp2_tstamp ts = _ts + rtc_offset;
+ return (struct timespec) {
+ .tv_sec = ts / NGTCP2_SECONDS,
+ .tv_nsec = ts % NGTCP2_SECONDS,
+ };
+}
+
+static inline struct timeval timespec2timeval(struct timespec ts)
+{
+ return (struct timeval) {
+ .tv_sec = ts.tv_sec,
+ .tv_usec = ts.tv_nsec / 1000,
+ };
+}
+
+static ngtcp2_tstamp _tstream_ngtcp2_timestamp(ngtcp2_duration *_rtc_offsetp,
+ const char *func,
+ unsigned line)
+{
+ struct timespec ts_rtc = { .tv_sec = 0, };
+ struct timespec ts_mono;
+ ngtcp2_tstamp ret_rtc = 0;
+ ngtcp2_tstamp ret_mono;
+ bool need_rtc_offset = false;
+ ngtcp2_duration rtc_offset = 0;
+
+ if (_rtc_offsetp != NULL || CHECK_DEBUGLVL(DBGLVL_DEBUG)) {
+ need_rtc_offset = true;
+ }
+
+ if (need_rtc_offset) {
+ ts_rtc = timespec_current();
+ ret_rtc = timespec2ngtcp2_tstamp(ts_rtc);
+ }
+
+ clock_gettime_mono(&ts_mono);
+ ret_mono = timespec2ngtcp2_tstamp(ts_mono);
+
+ if (need_rtc_offset) {
+ rtc_offset = ret_rtc - ret_mono;
+ }
+
+ if (CHECK_DEBUGLVL(DBGLVL_DEBUG)) {
+ struct timeval_buf rtc_buf;
+
+ DBG_DEBUG("%s:%s:%u: rtc_offset=%"PRIu64" stamp=%"PRIu64"\n",
+ timespec_string_buf(&ts_rtc, true, &rtc_buf),
+ func, line, rtc_offset, ret_mono);
+ }
+
+ if (_rtc_offsetp != NULL) {
+ *_rtc_offsetp = rtc_offset;
+ }
+
+ return ret_mono;
+}
+#define tstream_ngtcp2_timestamp(__rtc_offsetp) \
+ _tstream_ngtcp2_timestamp(__rtc_offsetp, __func__, __LINE__)
+
+static int tstream_ngtcp2_recv_stream_data_cb(ngtcp2_conn *conn, uint32_t flags,
+ int64_t stream_id, uint64_t offset,
+ const uint8_t *data, size_t datalen,
+ void *user_data, void *stream_user_data)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(user_data,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tstream_ngtcp2_buffer *cbuf = NULL;
+
+	DBG_DEBUG("Called stream_id[%"PRIi64"] "
+		  "offset[%"PRIu64"] datalen[%zu]\n",
+		  stream_id, offset, datalen);
+
+ if (si->stream_id != stream_id) {
+ return NGTCP2_ERR_STREAM_NOT_FOUND;
+ }
+
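+	/*
+	 * Split the incoming stream data into buffer list
+	 * elements, they stay queued on si->readv.buffers
+	 * until a readv request consumes them.
+	 */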
+next_buf:
+ cbuf = talloc(si, struct tstream_ngtcp2_buffer);
+ if (cbuf == NULL) {
+ return NGTCP2_ERR_NOMEM;
+ }
+ cbuf->prev = cbuf->next = NULL;
+ cbuf->offset = 0;
+ cbuf->length = MIN(ARRAY_SIZE(cbuf->buffer), datalen);
+ memcpy(cbuf->buffer, data, cbuf->length);
+ DLIST_ADD_END(si->readv.buffers, cbuf);
+
+ data += cbuf->length;
+ datalen -= cbuf->length;
+ if (datalen > 0) {
+ goto next_buf;
+ }
+
+ return 0;
+}
+
+static int tstream_ngtcp2_acked_stream_data_offset_cb(ngtcp2_conn *conn,
+ int64_t stream_id,
+ uint64_t offset,
+ uint64_t datalen,
+ void *user_data,
+ void *stream_user_data)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(user_data,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tstream_ngtcp2_buffer *cbuf = NULL;
+ struct tstream_ngtcp2_buffer *cnext = NULL;
+
+ DBG_DEBUG("Called stream_id[%"PRIi64"] "
+ "offset[%"PRIu64"] datalen[%"PRIu64"]\n",
+ stream_id, offset, datalen);
+
+ if (si->stream_id != stream_id) {
+ return NGTCP2_ERR_STREAM_NOT_FOUND;
+ }
+
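+	/*
+	 * The peer acknowledged this range, so the matching
+	 * buffer is no longer needed for retransmission and
+	 * can be removed from the pushed list.
+	 */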
+ for (cbuf = si->writev.pushed; cbuf != NULL; cbuf = cnext) {
+ cnext = cbuf->next;
+
+ if (cbuf->offset != offset) {
+ continue;
+ }
+ if (cbuf->length != datalen) {
+ continue;
+ }
+
+		DBG_DEBUG("REMOVE pushed[%"PRIu64"][%zu]\n",
+			  cbuf->offset, cbuf->length);
+ DLIST_REMOVE(si->writev.pushed, cbuf);
+ TALLOC_FREE(cbuf);
+ }
+
+ DBG_DEBUG("SI stream_id[%"PRIi64"] "
+ "offset[%"PRIu64"] pushed[%"PRIu64"][%zd]\n",
+ si->stream_id, si->writev.pushed_offset,
+ si->writev.pushed ? si->writev.pushed->offset : 0,
+ si->writev.pushed ? si->writev.pushed->length : -1);
+
+ return 0;
+}
+
+static int tstream_ngtcp2_stream_close_cb(ngtcp2_conn *conn,
+ uint32_t flags,
+ int64_t stream_id,
+ uint64_t app_error_code,
+ void *user_data,
+ void *stream_user_data)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(user_data,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ DBG_WARNING("Called stream_id[%"PRIi64"] "
+ "flags[0x%x] app_error_code[%"PRIu64"]\n",
+ stream_id, flags, app_error_code);
+
+ if (si->stream_id != stream_id) {
+ return NGTCP2_ERR_STREAM_NOT_FOUND;
+ }
+
+ return 0;
+}
+
+static int tstream_ngtcp2_recv_stateless_reset_cb(ngtcp2_conn *conn,
+ const ngtcp2_pkt_stateless_reset *sr,
+ void *user_data)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(user_data,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ DBG_WARNING("Called stream_id[%"PRIi64"]\n",
+ si->stream_id);
+
+ return 0;
+}
+
+static void tstream_ngtcp2_rand_cb(uint8_t *dest, size_t destlen,
+ const ngtcp2_rand_ctx *rand_ctx)
+{
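+	/*
+	 * This callback has no way to report a failure,
+	 * so the return value of gnutls_rnd() is ignored.
+	 */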
+ gnutls_rnd(GNUTLS_RND_RANDOM, dest, destlen);
+ return;
+}
+
+static int tstream_ngtcp2_get_new_connection_id_cb(ngtcp2_conn *conn,
+ ngtcp2_cid *cid,
+ uint8_t *token,
+ size_t cidlen,
+ void *user_data)
+{
+ int ret;
+
+ ret = gnutls_rnd(GNUTLS_RND_RANDOM, cid->data, cidlen);
+ if (ret != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ cid->datalen = cidlen;
+
+ ret = gnutls_rnd(GNUTLS_RND_RANDOM, token,
+ NGTCP2_STATELESS_RESET_TOKENLEN);
+ if (ret != 0) {
+ return NGTCP2_ERR_CALLBACK_FAILURE;
+ }
+
+ return 0;
+}
+
+static int tstream_ngtcp2_stream_reset_cb(ngtcp2_conn *conn,
+ int64_t stream_id,
+ uint64_t final_size,
+ uint64_t app_error_code,
+ void *user_data,
+ void *stream_user_data)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(user_data,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ DBG_WARNING("Called stream_id[%"PRIi64"] "
+ "final_size[%"PRIu64"] app_error_code[%"PRIu64"]\n",
+ stream_id, final_size, app_error_code);
+
+ if (si->stream_id != stream_id) {
+ return NGTCP2_ERR_STREAM_NOT_FOUND;
+ }
+
+ return 0;
+}
+
+static int tstream_ngtcp2_destructor(struct tstream_ngtcp2 *si)
+{
+ /*
+ * We want tevent_req_poll()
+ * to return without any blocking.
+ *
+ * So we use tevent_context_set_wait_timeout(0)
+	 * which will cause tevent_loop_once() in
+ * tevent_req_poll() to break with ENODATA.
+ */
+ si->current_ev = samba_tevent_context_init(NULL);
+ if (si->current_ev != NULL) {
+ tevent_context_set_wait_timeout(si->current_ev, 0);
+ }
+ tevent_reset_immediate(si->retry_im);
+ tstream_ngtcp2_close_stream(si);
+ if (si->push.subreq != NULL) {
+ /*
+ * We don't care about any success,
+ * we just want to send out the disconnect
+ * message if possible without blocking,
+ * using tevent_context_set_wait_timeout(0).
+ */
+ tevent_req_poll(si->push.subreq, si->current_ev);
+ TALLOC_FREE(si->push.subreq);
+ }
+ TALLOC_FREE(si->current_ev);
+
+ return 0;
+}
+
+static void tstream_ngtcp2_close_stream(struct tstream_ngtcp2 *si)
+{
+ struct tstream_ngtcp2_buffer *cbuf = NULL;
+ struct tstream_ngtcp2_buffer *cnext = NULL;
+ ngtcp2_ccerr ccerr;
+ ngtcp2_ssize ret;
+
+ if (si->conn == NULL) {
+ return;
+ }
+
+ si->error = ECONNABORTED;
+
+ TALLOC_FREE(si->keepalive_timer);
+ TALLOC_FREE(si->pull.subreq);
+ TALLOC_FREE(si->push.subreq);
+
+ for (cbuf = si->writev.pushed; cbuf != NULL; cbuf = cnext) {
+ cnext = cbuf->next;
+ DLIST_REMOVE(si->writev.pushed, cbuf);
+ TALLOC_FREE(cbuf);
+ }
+
+ for (cbuf = si->writev.pending; cbuf != NULL; cbuf = cnext) {
+ cnext = cbuf->next;
+ DLIST_REMOVE(si->writev.pending, cbuf);
+ TALLOC_FREE(cbuf);
+ }
+
+ for (cbuf = si->readv.buffers; cbuf != NULL; cbuf = cnext) {
+ cnext = cbuf->next;
+ DLIST_REMOVE(si->readv.buffers, cbuf);
+ TALLOC_FREE(cbuf);
+ }
+
+ if (si->disconnect.req != NULL) {
+ tevent_req_received(si->disconnect.req);
+ si->disconnect.req = NULL;
+ }
+
+ if (si->writev.req != NULL) {
+ tevent_req_received(si->writev.req);
+ si->writev.req = NULL;
+ }
+
+ if (si->readv.req != NULL) {
+ tevent_req_received(si->readv.req);
+ si->readv.req = NULL;
+ }
+
+ if (si->monitor.req != NULL) {
+ tevent_req_received(si->monitor.req);
+ si->monitor.req = NULL;
+ }
+
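+	/*
+	 * Try to send a CONNECTION_CLOSE frame as a best
+	 * effort, the resulting sendto subreq is waited
+	 * for in tstream_ngtcp2_disconnect_send() or in
+	 * the destructor.
+	 */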
+ ngtcp2_ccerr_default(&ccerr);
+ ret = ngtcp2_conn_write_connection_close(si->conn,
+ &si->path,
+ NULL,
+ si->push.b.buffer,
+ sizeof(si->push.b.buffer),
+ &ccerr,
+ tstream_ngtcp2_timestamp(NULL));
+ ngtcp2_conn_del(si->conn);
+ si->conn = NULL;
+
+ if (ret <= 0) {
+ return;
+ }
+
+ if (si->current_ev == NULL) {
+ return;
+ }
+
+ si->push.b.length = ret;
+ si->push.subreq = tdgram_sendto_send(si,
+ si->current_ev,
+ si->plain_dgram,
+ si->push.b.buffer,
+ si->push.b.length,
+ NULL);
+ if (si->push.subreq == NULL) {
+ return;
+ }
+
+ /*
+ * We don't call tevent_req_set_callback()
+ * here as we don't care about the
+ * result by default.
+ *
+ * We only care in tstream_ngtcp2_disconnect_send()
+ * so it's called there.
+ */
+ return;
+}
+
+static void tstream_ngtcp2_retry_handshake(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req = si->handshake.req;
+ NTSTATUS status;
+ ssize_t ret;
+
+ si->handshake.req = NULL;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ if (si->handshake.done) {
+ si->error = EINVAL;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ si->handshake.done = ngtcp2_conn_get_handshake_completed(si->conn);
+ if (si->handshake.done) {
+ goto verify;
+ }
+
+ si->handshake.req = req;
+ tstream_ngtcp2_sendto_start(stream);
+ tstream_ngtcp2_recvfrom_start(stream);
+ return;
+
+verify:
+ status = tstream_tls_verify_peer(si->handshake.tlss);
+ if (NT_STATUS_EQUAL(status, NT_STATUS_IMAGE_CERT_REVOKED)) {
+ si->error = EINVAL;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+ if (!NT_STATUS_IS_OK(status)) {
+ si->error = EIO;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ ret = ngtcp2_conn_open_bidi_stream(si->conn,
+ &si->stream_id,
+ si);
+ if (ret != 0) {
+ si->error = EIO;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ /*
+ * We don't expect incoming messages for
+ * this handshake anymore.
+ */
+ TALLOC_FREE(si->pull.subreq);
+ if (si->push.subreq != NULL) {
+ /*
+		 * But we need to wait until we have flushed all
+ * pending messages to the kernel socket.
+ */
+ si->handshake.req = req;
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static void tstream_ngtcp2_timer_start(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ ngtcp2_tstamp expire = ngtcp2_conn_get_expiry(si->conn);
+ struct timespec ts_expire = { .tv_sec = 0, };
+ struct timeval tv_expire = { .tv_sec = 0, };
+ struct timeval_buf expire_buf;
+ ngtcp2_duration rtc_offset = 0;
+ ngtcp2_tstamp now = tstream_ngtcp2_timestamp(&rtc_offset);
+ struct timespec ts_now = { .tv_sec = 0, };
+ struct timeval tv_now = { .tv_sec = 0, };
+ struct timeval_buf now_buf;
+ int64_t diff = 0;
+ bool was_last = (si->last_expire == expire);
+
+ ts_expire = ngtcp2_tstamp2timespec(expire, rtc_offset);
+ tv_expire = timespec2timeval(ts_expire);
+ ts_now = ngtcp2_tstamp2timespec(now, rtc_offset);
+ tv_now = timespec2timeval(ts_now);
+
+ DBG_DEBUG("\nNOW: %s\nEXP: %s\n",
+ timespec_string_buf(&ts_now, true, &now_buf),
+ timespec_string_buf(&ts_expire, true, &expire_buf));
+
+ diff = expire - now;
+
+ DBG_DEBUG("si->last_expire[%"PRIu64"] %c= expire[%"PRIu64"] "
+ "now[%"PRIu64"] diff[%"PRId64"]\n",
+ si->last_expire,
+ was_last ? '=' : '!',
+ expire,
+ now,
+ diff);
+
+ if (!was_last) {
+ si->last_expire = expire;
+ }
+
+ if (diff <= 0) {
+ /*
+ * already expired
+ *
+ * If we got the same value from
+ * ngtcp2_conn_get_expiry() as the
+		 * last time, we should avoid CPU spinning,
+ * so we always wait a keepalive cycle.
+ *
+ * Otherwise we want the timer to fire directly.
+ */
+ if (was_last) {
+ tv_expire = timeval_add(&tv_now, 0, si->keepalive_usecs);
+ } else {
+ tv_expire = (struct timeval) { .tv_sec = 0, };
+ }
+ }
+
+ /*
+ * If we need to push out pending data from the caller
+ * and didn't hit a blocking state from
+	 * ngtcp2_conn_writev_stream(), we want to fire the timer
+ * directly.
+ */
+ if (si->writev.pending != NULL && si->push.blocked == 0) {
+ tv_expire = (struct timeval) { .tv_sec = 0, };
+ }
+
+ DBG_DEBUG("NEW-TIMER:\nnow: %s\nexp: %s\n",
+ timeval_str_buf(&tv_now, false, true, &now_buf),
+ timeval_str_buf(&tv_expire, false, true, &expire_buf));
+
+ TALLOC_FREE(si->keepalive_timer);
+ si->keepalive_timer = tevent_add_timer(si->current_ev,
+ si,
+ tv_expire,
+ tstream_ngtcp2_timer_handler,
+ stream);
+ if (si->keepalive_timer == NULL) {
+ si->error = ENOMEM;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+}
+
+static void tstream_ngtcp2_timer_handler(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval current_time,
+ void *private_data)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(private_data,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ TALLOC_FREE(si->keepalive_timer);
+
+ DBG_DEBUG("tstream_ngtcp2_sendto_start...\n");
+ tstream_ngtcp2_sendto_start(stream);
+}
+
+static void tstream_ngtcp2_sendto_start(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req = si->writev.req;
+ struct tstream_ngtcp2_buffer *cbuf = NULL;
+ ngtcp2_ssize nwritten = -1;
+ ngtcp2_vec _datav[1] = {{}};
+ ngtcp2_ssize *pnwritten = NULL;
+ int64_t stream_id = -1;
+ ngtcp2_vec *datav = NULL;
+ size_t datavcnt = 0;
+ uint32_t sflags = NGTCP2_WRITE_STREAM_FLAG_NONE;
+ ssize_t ret;
+ size_t dbytes = 0;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ if (si->push.subreq != NULL) {
+ DBG_DEBUG("ALREADY...\n");
+ return;
+ }
+
+ if (si->push.b.length != 0) {
+ DBG_DEBUG("DIRECTLY(%zu)...\n", si->push.b.length);
+ goto send_directly;
+ }
+
+write_more:
+ cbuf = si->writev.pending;
+ if (cbuf != NULL) {
+ _datav[0].base = cbuf->buffer + cbuf->offset;
+ _datav[0].len = cbuf->length - cbuf->offset;
+ dbytes = _datav[0].len;
+ datav = _datav;
+ datavcnt = ARRAY_SIZE(_datav);
+ pnwritten = &nwritten;
+ stream_id = si->stream_id;
+ if (cbuf->next != NULL) {
+ sflags |= NGTCP2_WRITE_STREAM_FLAG_MORE;
+ } else {
+ sflags &= ~NGTCP2_WRITE_STREAM_FLAG_MORE;
+ }
+ }
+
+ ret = ngtcp2_conn_writev_stream(si->conn,
+ &si->path,
+ NULL,
+ si->push.b.buffer,
+ sizeof(si->push.b.buffer),
+ pnwritten,
+ sflags,
+ stream_id,
+ datav,
+ datavcnt,
+ tstream_ngtcp2_timestamp(NULL));
+
+ DBG_DEBUG("sid[%"PRIi64"] "
+ "ngtcp2_conn_writev_stream ret[%zd] %s "
+ "dbytes[%zu] nwritten[%zd]\n",
+ si->stream_id,
+ ret, ngtcp2_strerror(ret),
+ dbytes, nwritten);
+
+ if (ret == 0 || ret == NGTCP2_ERR_STREAM_DATA_BLOCKED) {
+ if (dbytes != 0) {
+ /*
+			 * The congestion window is full,
+			 * so we need to stop sending and wait
+ * for incoming messages.
+ *
+ * We still call tstream_ngtcp2_timer_start()
+ * but that will see si->push.blocked.
+ */
+ si->push.blocked += 1;
+ tstream_ngtcp2_recvfrom_start(stream);
+ }
+ DBG_DEBUG("tstream_ngtcp2_timer_start...\n");
+ tstream_ngtcp2_timer_start(stream);
+ return;
+ }
+
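+	/*
+	 * NGTCP2_ERR_WRITE_MORE means the packet still has
+	 * room and ngtcp2_conn_writev_stream() wants to be
+	 * called again, nwritten holds the amount of stream
+	 * data that was accepted into the packet.
+	 */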
+ if (ret == NGTCP2_ERR_WRITE_MORE) {
+ if (nwritten < 1) {
+ ngtcp2_conn_set_tls_error(si->conn, ret);
+ si->error = EPIPE;
+			DBG_WARNING("si->error[%d]\n", si->error);
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+ /* handled below */
+ } else if (ret < 0) {
+ ngtcp2_conn_set_tls_error(si->conn, ret);
+ si->error = EPIPE;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+
+ if (nwritten > 0) {
+ cbuf->offset += nwritten;
+ if (cbuf->offset == cbuf->length) {
+ DLIST_REMOVE(si->writev.pending, cbuf);
+ cbuf->offset = si->writev.pushed_offset;
+ si->writev.pushed_offset += cbuf->length;
+ DLIST_ADD_END(si->writev.pushed, cbuf);
+ }
+ }
+ if (ret == NGTCP2_ERR_WRITE_MORE) {
+ DBG_DEBUG("MORE...\n");
+ goto write_more;
+ }
+
+ DBG_DEBUG("tstream_ngtcp2_timer_start...\n");
+ tstream_ngtcp2_timer_start(stream);
+
+ si->push.b.length = ret;
+send_directly:
+ si->push.subreq = tdgram_sendto_send(si,
+ si->current_ev,
+ si->plain_dgram,
+ si->push.b.buffer,
+ si->push.b.length,
+ NULL);
+ if (si->push.subreq == NULL) {
+ si->error = ENOMEM;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+ tevent_req_set_callback(si->push.subreq,
+ tstream_ngtcp2_sendto_done,
+ stream);
+
+ return;
+}
+
+static void tstream_ngtcp2_sendto_done(struct tevent_req *subreq)
+{
+ struct tstream_context *stream =
+ tevent_req_callback_data(subreq,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ ssize_t ret;
+ int error = 0;
+
+ SMB_ASSERT(si->push.subreq == subreq);
+ si->push.subreq = NULL;
+
+ ret = tdgram_sendto_recv(subreq, &error);
+ TALLOC_FREE(subreq);
+ if (ret == -1) {
+ si->error = error;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+
+ if (si->push.b.length != ret) {
+ si->error = EIO;
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+ si->push.b.length = 0;
+
+ DBG_DEBUG("tstream_ngtcp2_deferred_retry...\n");
+ tstream_ngtcp2_deferred_retry(stream);
+}
+
+static void tstream_ngtcp2_recvfrom_start(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct timespec ts = timespec_current();
+ struct timeval_buf tsbuf;
+
+ if (si->pull.subreq != NULL) {
+ DBG_DEBUG("%s: ALREADY... in_progress[%u]\n",
+ timespec_string_buf(&ts, true, &tsbuf),
+ tevent_req_is_in_progress(si->pull.subreq));
+ return;
+ }
+
+ DBG_DEBUG("RECVFROM...\n");
+ si->pull.subreq = tdgram_recvfrom_send(si,
+ si->current_ev,
+ si->plain_dgram);
+ if (si->pull.subreq == NULL) {
+ si->error = ENOMEM;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+ DBG_DEBUG("%s: ...RECVFROM in_progress[%u]\n",
+ timespec_string_buf(&ts, true, &tsbuf),
+ tevent_req_is_in_progress(si->pull.subreq));
+ tevent_req_set_callback(si->pull.subreq,
+ tstream_ngtcp2_recvfrom_done,
+ stream);
+}
+
+static void tstream_ngtcp2_recvfrom_done(struct tevent_req *subreq)
+{
+ struct tstream_context *stream =
+ tevent_req_callback_data(subreq,
+ struct tstream_context);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ ssize_t ret;
+ int error = 0;
+ uint8_t *buf = NULL;
+ struct timespec ts = timespec_current();
+ struct timeval_buf tsbuf;
+
+ SMB_ASSERT(si->pull.subreq == subreq);
+ si->pull.subreq = NULL;
+
+ ret = tdgram_recvfrom_recv(subreq, &error, si, &buf, NULL);
+ TALLOC_FREE(subreq);
+ if (ret == -1) {
+ si->error = error;
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+
+ ret = ngtcp2_conn_read_pkt(si->conn,
+ &si->path,
+ NULL,
+ buf,
+ ret,
+ tstream_ngtcp2_timestamp(NULL));
+
+ DBG_DEBUG("%s: handshake_done[%u] sid[%"PRIi64"] "
+ "ngtcp2_conn_read_pkt ret[%zd] %s\n",
+ timespec_string_buf(&ts, true, &tsbuf),
+ si->handshake.done, si->stream_id,
+ ret, ngtcp2_strerror(ret));
+ if (ret < 0) {
+		/* map the negative ngtcp2 error to an errno style value */
+		si->error = EIO;
+ tstream_ngtcp2_deferred_retry(stream);
+ return;
+ }
+
+ /*
+	 * Once we get a message from the peer,
+	 * ngtcp2_conn_read_pkt() resets the
+ * internal state, so we might be able
+ * to send more data now or need to
+ * send some acks or pings.
+ */
+ si->push.blocked = 0;
+ DBG_DEBUG("tstream_ngtcp2_sendto_start...\n");
+ tstream_ngtcp2_sendto_start(stream);
+
+ /*
+ * We likely also got some incoming stream
+ * data so we need to check if a pending
+ * readv_send can make some progress.
+ */
+ DBG_DEBUG("tstream_ngtcp2_deferred_retry...\n");
+ tstream_ngtcp2_deferred_retry(stream);
+}
+
+static size_t tstream_ngtcp2_common_retry(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ size_t num_requests = 0;
+
+ if (si->handshake.req != NULL && !si->handshake.done) {
+ num_requests += 1;
+ }
+
+ if (si->writev.req != NULL) {
+ num_requests += 1;
+ }
+
+ if (si->readv.req != NULL) {
+ num_requests += 1;
+ }
+
+ if (si->monitor.req != NULL) {
+ num_requests += 1;
+ }
+
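+	/*
+	 * Without any pending high-level request there is
+	 * no reason to keep the engine running, so all
+	 * timers and subreqs are stopped until the next
+	 * request starts it again.
+	 */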
+ if (num_requests == 0) {
+ DBG_DEBUG("%s: DISMANTLE\n", __location__);
+ si->last_expire = 0;
+ TALLOC_FREE(si->keepalive_timer);
+ TALLOC_FREE(si->pull.subreq);
+ TALLOC_FREE(si->push.subreq);
+ tevent_reset_immediate(si->retry_im);
+ si->current_ev = NULL;
+ }
+
+ if (si->push.subreq == NULL && si->pull.subreq == NULL) {
+ if (si->handshake.req != NULL && si->handshake.done) {
+ struct tevent_req *req = si->handshake.req;
+
+ si->handshake.req = NULL;
+
+ /* tevent_req_defer_callback was used */
+ tevent_req_done(req);
+ }
+
+ if (si->disconnect.req != NULL) {
+ struct tevent_req *req = si->disconnect.req;
+
+ si->disconnect.req = NULL;
+
+ /* tevent_req_defer_callback was used */
+ tevent_req_done(req);
+ }
+ }
+
+ return num_requests;
+}
+
+static void tstream_ngtcp2_direct_retry(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ size_t num_requests = tstream_ngtcp2_common_retry(stream);
+
+ /*
+	 * If there is more than one pending high-level
+	 * request we need to retry again.
+ *
+ * But we can't do that if
+ * ngtcp2_conn_writev_stream() indicated
+ * a blocking situation. So we need to
+ * wait for [e]poll to notice an incoming
+ * message or the keepalive timer to
+ * trigger more progress.
+ */
+ if (num_requests > 1 && si->push.blocked == 0) {
+ DBG_DEBUG("tstream_ngtcp2_deferred_retry...\n");
+ tstream_ngtcp2_deferred_retry(stream);
+ }
+
+ if (si->handshake.req != NULL && !si->handshake.done) {
+ tstream_ngtcp2_retry_handshake(stream);
+ return;
+ }
+
+ if (si->writev.req != NULL) {
+ tstream_ngtcp2_writev_retry(stream);
+ return;
+ }
+
+ if (si->readv.req != NULL) {
+ tstream_ngtcp2_readv_retry(stream);
+ return;
+ }
+
+ if (si->monitor.req != NULL) {
+ tstream_ngtcp2_monitor_retry(stream);
+ return;
+ }
+}
+
+static void tstream_ngtcp2_retry_trigger(struct tevent_context *ctx,
+ struct tevent_immediate *im,
+ void *private_data)
+{
+ struct tstream_context *stream =
+ talloc_get_type_abort(private_data,
+ struct tstream_context);
+
+ tstream_ngtcp2_direct_retry(stream);
+}
+
+static void tstream_ngtcp2_deferred_retry(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ size_t num_requests = tstream_ngtcp2_common_retry(stream);
+
+ if (num_requests == 0) {
+ /*
+ * connection is dismantled
+ * and si->current_ev is NULL,
+ * so we need to stop here
+ * and wait for the next
+	 * high-level request to start
+ * the engine again.
+ */
+ return;
+ }
+
+ tevent_schedule_immediate(si->retry_im,
+ si->current_ev,
+ tstream_ngtcp2_retry_trigger,
+ stream);
+}
+
+static ssize_t tstream_ngtcp2_pending_bytes(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tstream_ngtcp2_buffer *cbuf = NULL;
+ size_t ret = 0;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ errno = si->error;
+ return -1;
+ }
+
+ for (cbuf = si->readv.buffers; cbuf != NULL; cbuf = cbuf->next) {
+ ret += cbuf->length - cbuf->offset;
+ }
+
+ DBG_DEBUG("ret[%zu]\n", ret);
+
+ return ret;
+}
+
+struct tstream_ngtcp2_readv_state {
+ struct tstream_context *stream;
+
+ struct iovec *vector;
+ int count;
+
+ int ret;
+};
+
+static void tstream_ngtcp2_readv_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tstream_ngtcp2_readv_state *state =
+ tevent_req_data(req,
+ struct tstream_ngtcp2_readv_state);
+
+ if (state->stream != NULL) {
+ struct tstream_context *stream = state->stream;
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ state->stream = NULL;
+
+ SMB_ASSERT(si->readv.req == req);
+ si->readv.req = NULL;
+
+ tstream_ngtcp2_deferred_retry(stream);
+ }
+}
+
+static void tstream_ngtcp2_readv_next(struct tevent_req *req);
+
+static struct tevent_req *tstream_ngtcp2_readv_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream,
+ struct iovec *vector,
+ size_t count)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req;
+ struct tstream_ngtcp2_readv_state *state;
+
+ SMB_ASSERT(si->readv.req == NULL);
+
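+	/*
+	 * The caller may switch to a different tevent context,
+	 * but only if no internal subreq or timer is still
+	 * attached to the previous one.
+	 */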
+ if (si->current_ev != ev) {
+ SMB_ASSERT(si->push.subreq == NULL);
+ SMB_ASSERT(si->pull.subreq == NULL);
+ SMB_ASSERT(si->keepalive_timer == NULL);
+ }
+
+ si->current_ev = ev;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_ngtcp2_readv_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->stream = stream;
+ state->ret = 0;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return tevent_req_post(req, ev);
+ }
+
+ /*
+ * we make a copy of the vector so we can change the structure
+ */
+ state->vector = talloc_array(state, struct iovec, count);
+ if (tevent_req_nomem(state->vector, req)) {
+ return tevent_req_post(req, ev);
+ }
+ memcpy(state->vector, vector, sizeof(struct iovec) * count);
+ state->count = count;
+
+ DBG_DEBUG("tstream_ngtcp2_readv_next...\n");
+ si->readv.req = req;
+ tevent_req_defer_callback(req, ev);
+ tevent_req_set_cleanup_fn(req, tstream_ngtcp2_readv_cleanup);
+ tstream_ngtcp2_readv_next(req);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
+
+static void tstream_ngtcp2_readv_next(struct tevent_req *req)
+{
+ struct tstream_ngtcp2_readv_state *state =
+ tevent_req_data(req,
+ struct tstream_ngtcp2_readv_state);
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(state->stream,
+ struct tstream_ngtcp2);
+
+	DBG_DEBUG("START si->readv.buffers[%d] state->count[%d] state->ret[%d]\n",
+		  !!si->readv.buffers, state->count, state->ret);
+
+ /*
+ * copy the pending buffer first
+ */
+ while (si->readv.buffers != NULL && state->count > 0) {
+ struct tstream_ngtcp2_buffer *cbuf = si->readv.buffers;
+ uint8_t *base = (uint8_t *)state->vector[0].iov_base;
+ size_t len = MIN(cbuf->length - cbuf->offset, state->vector[0].iov_len);
+
+ memcpy(base, cbuf->buffer + cbuf->offset, len);
+
+ base += len;
+ state->vector[0].iov_base = (char *) base;
+ state->vector[0].iov_len -= len;
+
+ cbuf->offset += len;
+ if (cbuf->offset == cbuf->length) {
+ DLIST_REMOVE(si->readv.buffers, cbuf);
+ ngtcp2_conn_extend_max_offset(si->conn, cbuf->length);
+ ngtcp2_conn_extend_max_stream_offset(si->conn,
+ si->stream_id,
+ cbuf->length);
+ TALLOC_FREE(cbuf);
+ }
+
+ if (state->vector[0].iov_len == 0) {
+ state->vector += 1;
+ state->count -= 1;
+ }
+
+ state->ret += len;
+ }
+
+ if (state->count == 0) {
+		DBG_DEBUG("DONE state->ret[%d]\n", state->ret);
+ tevent_req_done(req);
+ return;
+ }
+
+ DBG_DEBUG("tstream_ngtcp2_recvfrom_start...\n");
+ tstream_ngtcp2_recvfrom_start(state->stream);
+}
+
+static void tstream_ngtcp2_readv_retry(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req = si->readv.req;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ DBG_DEBUG("tstream_ngtcp2_readv_next...\n");
+ tstream_ngtcp2_readv_next(req);
+}
+
+static int tstream_ngtcp2_readv_recv(struct tevent_req *req,
+ int *perrno)
+{
+ struct tstream_ngtcp2_readv_state *state =
+ tevent_req_data(req,
+ struct tstream_ngtcp2_readv_state);
+ int ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ ret = state->ret;
+ }
+ DBG_DEBUG("tsocket_simple_int_recv... %d: %s\n",
+ ret, strerror(ret == -1 ? *perrno : 0));
+
+ tevent_req_received(req);
+ return ret;
+}
+
+struct tstream_ngtcp2_writev_state {
+ struct tstream_context *stream;
+
+ int ret;
+};
+
+static void tstream_ngtcp2_writev_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tstream_ngtcp2_writev_state *state =
+ tevent_req_data(req,
+ struct tstream_ngtcp2_writev_state);
+
+ if (state->stream != NULL) {
+ struct tstream_context *stream = state->stream;
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ state->stream = NULL;
+
+ SMB_ASSERT(si->writev.req == req);
+ si->writev.req = NULL;
+
+ tstream_ngtcp2_deferred_retry(stream);
+ }
+}
+
+static struct tevent_req *tstream_ngtcp2_writev_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream,
+ const struct iovec *vector,
+ size_t count)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req = NULL;
+ struct tstream_ngtcp2_writev_state *state = NULL;
+ struct tstream_ngtcp2_buffer *buffers = NULL;
+ struct tstream_ngtcp2_buffer *cbuf = NULL;
+ size_t vi = 0;
+ size_t vofs = 0;
+ size_t nbytes = 0;
+ size_t nbuffers = 0;
+
+ SMB_ASSERT(si->writev.req == NULL);
+
+ if (si->current_ev != ev) {
+ SMB_ASSERT(si->push.subreq == NULL);
+ SMB_ASSERT(si->pull.subreq == NULL);
+ SMB_ASSERT(si->keepalive_timer == NULL);
+ }
+
+ si->current_ev = ev;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_ngtcp2_writev_state);
+ if (req == NULL) {
+ return NULL;
+ }
+
+ state->stream = stream;
+ state->ret = 0;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return tevent_req_post(req, ev);
+ }
+
+ for (vi = 0; vi < count;) {
+ const uint8_t *b = vector[vi].iov_base;
+ size_t l = vector[vi].iov_len;
+ size_t n;
+
+ b += vofs;
+ l -= vofs;
+
+ if (l == 0) {
+ vofs = 0;
+ vi += 1;
+ continue;
+ }
+
+ if (cbuf == NULL) {
+ cbuf = talloc(si, struct tstream_ngtcp2_buffer);
+ if (cbuf == NULL) {
+ si->error = ENOMEM;
+				DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return tevent_req_post(req, ev);
+ }
+ cbuf->prev = cbuf->next = NULL;
+ cbuf->offset = 0;
+ cbuf->length = 0;
+ talloc_reparent(si, state, cbuf);
+ DLIST_ADD_END(buffers, cbuf);
+ }
+
+ n = ARRAY_SIZE(cbuf->buffer) - cbuf->length;
+ n = MIN(n, l);
+
+ memcpy(cbuf->buffer + cbuf->length, b, n);
+
+ nbytes += n;
+
+ vofs += n;
+ cbuf->length += n;
+ if (ARRAY_SIZE(cbuf->buffer) == cbuf->length) {
+ cbuf = NULL;
+ }
+ }
+
+ while (buffers != NULL) {
+ cbuf = buffers;
+
+ DLIST_REMOVE(buffers, cbuf);
+
+ nbuffers += 1;
+ DLIST_ADD_END(si->writev.pending, cbuf);
+ talloc_reparent(state, si, cbuf);
+ }
+
+	state->ret = nbytes;
+
+	DBG_DEBUG("tstream_ngtcp2_writev_retry... "
+		  "count[%zu] buffers[%zu] bytes[%zu]\n",
+		  count, nbuffers, nbytes);
+ si->writev.req = req;
+ tevent_req_defer_callback(req, ev);
+ tevent_req_set_cleanup_fn(req, tstream_ngtcp2_writev_cleanup);
+ tstream_ngtcp2_writev_retry(state->stream);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
+
+static void tstream_ngtcp2_writev_retry(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req = si->writev.req;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ if (si->writev.pending == NULL) {
+ DBG_DEBUG("sid[%"PRIi64"] done\n", si->stream_id);
+ tevent_req_done(req);
+ return;
+ }
+
+ DBG_DEBUG("tstream_ngtcp2_sendto_start...\n");
+ tstream_ngtcp2_sendto_start(stream);
+}
+
+static int tstream_ngtcp2_writev_recv(struct tevent_req *req,
+ int *perrno)
+{
+ struct tstream_ngtcp2_writev_state *state =
+ tevent_req_data(req,
+ struct tstream_ngtcp2_writev_state);
+ int ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ ret = state->ret;
+ }
+ DBG_DEBUG("tsocket_simple_int_recv... %d: %s\n",
+ ret, strerror(ret == -1 ? *perrno : 0));
+
+ tevent_req_received(req);
+ return ret;
+}
+
+struct tstream_ngtcp2_disconnect_state {
+ struct tstream_context *stream;
+};
+
+static void tstream_ngtcp2_disconnect_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tstream_ngtcp2_disconnect_state *state =
+ tevent_req_data(req,
+ struct tstream_ngtcp2_disconnect_state);
+
+ if (state->stream != NULL) {
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(state->stream,
+ struct tstream_ngtcp2);
+
+ SMB_ASSERT(si->disconnect.req == req);
+ si->disconnect.req = NULL;
+ state->stream = NULL;
+ }
+}
+
+static struct tevent_req *tstream_ngtcp2_disconnect_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req;
+ struct tstream_ngtcp2_disconnect_state *state;
+
+ SMB_ASSERT(si->disconnect.req == NULL);
+
+ if (si->current_ev != ev) {
+ SMB_ASSERT(si->push.subreq == NULL);
+ SMB_ASSERT(si->pull.subreq == NULL);
+ SMB_ASSERT(si->keepalive_timer == NULL);
+ }
+
+ si->current_ev = ev;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_ngtcp2_disconnect_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->stream = stream;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return tevent_req_post(req, ev);
+ }
+
+ tevent_req_defer_callback(req, ev);
+ tevent_req_set_cleanup_fn(req, tstream_ngtcp2_disconnect_cleanup);
+
+ tstream_ngtcp2_close_stream(si);
+
+ si->disconnect.req = req;
+ if (si->push.subreq != NULL) {
+ /*
+		 * We need to wait until we have flushed all
+ * pending messages to the kernel socket.
+ */
+ tevent_req_set_callback(si->push.subreq,
+ tstream_ngtcp2_sendto_done,
+ stream);
+ return req;
+ }
+
+ tevent_req_done(req);
+ return tevent_req_post(req, ev);
+}
+
+static int tstream_ngtcp2_disconnect_recv(struct tevent_req *req,
+ int *perrno)
+{
+ int ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ DBG_DEBUG("tsocket_simple_int_recv... %d: %s\n",
+ ret, strerror(ret == -1 ? *perrno : 0));
+
+ tevent_req_received(req);
+ return ret;
+}
+
+struct tstream_ngtcp2_monitor_state {
+ struct tstream_context *stream;
+};
+
+static void tstream_ngtcp2_monitor_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tstream_ngtcp2_monitor_state *state =
+ tevent_req_data(req,
+ struct tstream_ngtcp2_monitor_state);
+
+ if (state->stream != NULL) {
+ struct tstream_context *stream = state->stream;
+ struct tstream_ngtcp2 *si =
+ tstream_context_data(stream,
+ struct tstream_ngtcp2);
+
+ state->stream = NULL;
+
+ SMB_ASSERT(si->monitor.req == req);
+ si->monitor.req = NULL;
+
+ tstream_ngtcp2_deferred_retry(stream);
+ }
+}
+
+static struct tevent_req *tstream_ngtcp2_monitor_send(
+ TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si = tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req;
+ struct tstream_ngtcp2_monitor_state *state;
+
+ SMB_ASSERT(si->monitor.req == NULL);
+
+ if (si->current_ev != ev) {
+ SMB_ASSERT(si->push.subreq == NULL);
+ SMB_ASSERT(si->pull.subreq == NULL);
+ SMB_ASSERT(si->keepalive_timer == NULL);
+ }
+
+ si->current_ev = ev;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_ngtcp2_monitor_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->stream = stream;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return tevent_req_post(req, ev);
+ }
+
+ DBG_DEBUG("tstream_ngtcp2_monitor_retry...\n");
+ si->monitor.req = req;
+ tevent_req_defer_callback(req, ev);
+ tevent_req_set_cleanup_fn(req, tstream_ngtcp2_monitor_cleanup);
+ tstream_ngtcp2_monitor_retry(stream);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
+
+static void tstream_ngtcp2_monitor_retry(struct tstream_context *stream)
+{
+ struct tstream_ngtcp2 *si = tstream_context_data(stream,
+ struct tstream_ngtcp2);
+ struct tevent_req *req = si->monitor.req;
+
+ if (si->error != 0) {
+		DBG_WARNING("si->error[%d]\n", si->error);
+ tevent_req_error(req, si->error);
+ return;
+ }
+
+ DBG_DEBUG("AGAIN...\n");
+
+ tstream_ngtcp2_timer_start(stream);
+ tstream_ngtcp2_recvfrom_start(stream);
+}
+
+static int tstream_ngtcp2_monitor_recv(struct tevent_req *req, int *perrno)
+{
+ int ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ DBG_DEBUG("tsocket_simple_int_recv... %d: %s\n",
+ ret, strerror(ret == -1 ? *perrno : 0));
+
+ tevent_req_received(req);
+ return ret;
+}
+
+static const struct tstream_context_ops tstream_ngtcp2_ops = {
+ .name = "ngtcp2",
+
+ .pending_bytes = tstream_ngtcp2_pending_bytes,
+
+ .readv_send = tstream_ngtcp2_readv_send,
+ .readv_recv = tstream_ngtcp2_readv_recv,
+
+ .writev_send = tstream_ngtcp2_writev_send,
+ .writev_recv = tstream_ngtcp2_writev_recv,
+
+ .disconnect_send = tstream_ngtcp2_disconnect_send,
+ .disconnect_recv = tstream_ngtcp2_disconnect_recv,
+
+ .monitor_send = tstream_ngtcp2_monitor_send,
+ .monitor_recv = tstream_ngtcp2_monitor_recv,
+};
+
+#endif /* HAVE_LIBNGTCP2 */
+
+struct tstream_tls_ngtcp2_connect_state {
+ struct tstream_context *quic_stream;
+};
+
+static void tstream_tls_ngtcp2_connect_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+
+struct tevent_req *_tstream_tls_ngtcp2_connect_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_tls_params *tlsp,
+ uint32_t timeout_msec,
+ const char *alpn,
+ int *sockfd,
+ const char *location)
+{
+ struct tevent_req *req = NULL;
+ struct tstream_tls_ngtcp2_connect_state *state = NULL;
+#ifdef HAVE_LIBNGTCP2
+ struct tstream_ngtcp2 *si = NULL;
+	/* advertise the ALPN token provided by the caller */
+	gnutls_datum_t alpn_data = {
+		.data = discard_const_p(unsigned char, alpn),
+		.size = strlen(alpn),
+	};
+ ngtcp2_callbacks callbacks = {
+ .client_initial = /* required client */
+ ngtcp2_crypto_client_initial_cb,
+ .recv_crypto_data = /* required */
+ ngtcp2_crypto_recv_crypto_data_cb,
+ .encrypt = ngtcp2_crypto_encrypt_cb, /* required */
+ .decrypt = ngtcp2_crypto_decrypt_cb, /* required */
+ .hp_mask = ngtcp2_crypto_hp_mask_cb, /* required */
+ .recv_stream_data =
+ tstream_ngtcp2_recv_stream_data_cb, /* used */
+ .acked_stream_data_offset =
+ tstream_ngtcp2_acked_stream_data_offset_cb, /* used */
+ .stream_close =
+ tstream_ngtcp2_stream_close_cb, /* used */
+ .recv_stateless_reset =
+ tstream_ngtcp2_recv_stateless_reset_cb, /* used */
+ .recv_retry = ngtcp2_crypto_recv_retry_cb, /* required client */
+ .rand = tstream_ngtcp2_rand_cb, /* required */
+ .get_new_connection_id = /* required */
+ tstream_ngtcp2_get_new_connection_id_cb,
+ .update_key = ngtcp2_crypto_update_key_cb, /* required */
+ .stream_reset =
+ tstream_ngtcp2_stream_reset_cb, /* used */
+ .delete_crypto_aead_ctx = /* required */
+ ngtcp2_crypto_delete_crypto_aead_ctx_cb,
+ .delete_crypto_cipher_ctx = /* required */
+ ngtcp2_crypto_delete_crypto_cipher_ctx_cb,
+ .get_path_challenge_data = /* required */
+ ngtcp2_crypto_get_path_challenge_data_cb,
+ .version_negotiation = /* required */
+ ngtcp2_crypto_version_negotiation_cb,
+ };
+ ngtcp2_cid dcid = {
+ .datalen = NGTCP2_MIN_INITIAL_DCIDLEN,
+ };
+ ngtcp2_cid scid = {
+ .datalen = NGTCP2_MIN_INITIAL_DCIDLEN,
+ };
+ ngtcp2_settings settings = {};
+ ngtcp2_transport_params params = {};
+ uint32_t available_versions32[2];
+ union {
+ uint32_t v32[2];
+ uint8_t v8[8];
+ } available_versions;
+ NTSTATUS status;
+ int ret;
+#endif /* HAVE_LIBNGTCP2 */
+ bool ok;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_tls_ngtcp2_connect_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ tevent_req_defer_callback(req, ev);
+ tevent_req_set_cleanup_fn(req, tstream_tls_ngtcp2_connect_cleanup);
+
+#ifdef HAVE_LIBNGTCP2
+ state->quic_stream = tstream_context_create(state,
+ &tstream_ngtcp2_ops,
+ &si,
+ struct tstream_ngtcp2,
+ location);
+ if (tevent_req_nomem(state->quic_stream, req)) {
+ return tevent_req_post(req, ev);
+ }
+ ZERO_STRUCTP(si);
+ talloc_set_destructor(si, tstream_ngtcp2_destructor);
+
+ si->laddr = (struct samba_sockaddr) {
+ .sa_socklen = sizeof(struct sockaddr_storage),
+ };
+ si->raddr = (struct samba_sockaddr) {
+ .sa_socklen = sizeof(struct sockaddr_storage),
+ };
+
+ ret = getsockname(*sockfd, &si->laddr.u.sa, &si->laddr.sa_socklen);
+ if (ret != 0) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return tevent_req_post(req, ev);
+ }
+
+ ret = getpeername(*sockfd, &si->raddr.u.sa, &si->raddr.sa_socklen);
+ if (ret != 0) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return tevent_req_post(req, ev);
+ }
+
+ ret = tdgram_bsd_existing_socket(si, *sockfd, &si->plain_dgram);
+ if (ret != 0) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return tevent_req_post(req, ev);
+ }
+ *sockfd = -1;
+ tdgram_bsd_optimize_recvfrom(si->plain_dgram, true);
+#endif /* HAVE_LIBNGTCP2 */
+
+ ok = tevent_req_set_endtime(req, ev,
+ timeval_current_ofs_msec(timeout_msec));
+ if (!ok) {
+ tevent_req_oom(req);
+ return tevent_req_post(req, ev);
+ }
+
+ ok = tstream_tls_params_quic_enabled(tlsp);
+ if (!ok) {
+ goto invalid_parameter_mix;
+ }
+
+#ifdef HAVE_LIBNGTCP2
+
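+	/*
+	 * ngtcp2_crypto_gnutls finds the ngtcp2_conn via the
+	 * ngtcp2_crypto_conn_ref that is installed as the
+	 * gnutls session pointer below.
+	 */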
+ si->conn_ref.get_conn = qwrap_ngtcp2_conn_ref_get_conn;
+ si->conn_ref.user_data = state->quic_stream;
+
+ si->handshake.tlss = talloc_zero(state, struct tstream_tls);
+ if (tevent_req_nomem(si->handshake.tlss, req)) {
+ return tevent_req_post(req, ev);
+ }
+ talloc_set_destructor(si->handshake.tlss, tstream_tls_destructor);
+ si->handshake.tlss->is_server = false;
+
+ status = tstream_tls_prepare_gnutls(tlsp, si->handshake.tlss);
+ if (tevent_req_nterror(req, status)) {
+ return tevent_req_post(req, ev);
+ }
+
+ gnutls_session_set_ptr(si->handshake.tlss->tls_session, &si->conn_ref);
+ ngtcp2_crypto_gnutls_configure_client_session(si->handshake.tlss->tls_session);
+
+ ret = gnutls_alpn_set_protocols(si->handshake.tlss->tls_session,
+ &alpn_data, 1,
+ GNUTLS_ALPN_MANDATORY);
+ if (ret != 0) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return tevent_req_post(req, ev);
+ }
+
+ ret = gnutls_rnd(GNUTLS_RND_RANDOM, dcid.data, dcid.datalen);
+ if (ret != 0) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return tevent_req_post(req, ev);
+ }
+
+ ret = gnutls_rnd(GNUTLS_RND_RANDOM, scid.data, scid.datalen);
+ if (ret != 0) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return tevent_req_post(req, ev);
+ }
+
+ si->path = (ngtcp2_path) {
+ .local = {
+ .addr = &si->laddr.u.sa,
+ .addrlen = si->laddr.sa_socklen,
+ },
+ .remote = {
+ .addr = &si->raddr.u.sa,
+ .addrlen = si->raddr.sa_socklen,
+ },
+ };
+
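+	/*
+	 * Offer QUIC v2 and v1 for compatible version
+	 * negotiation, while the connection starts out
+	 * with v1 as the chosen version.
+	 */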
+ available_versions32[0] = NGTCP2_PROTO_VER_V2;
+ available_versions32[1] = NGTCP2_PROTO_VER_V1;
+
+ available_versions.v32[0] = htonl(available_versions32[0]);
+ available_versions.v32[1] = htonl(available_versions32[1]);
+
+ ngtcp2_settings_default(&settings);
+
+ settings.initial_ts = tstream_ngtcp2_timestamp(NULL);
+ settings.handshake_timeout = timeout_msec * NGTCP2_MILLISECONDS;
+ settings.log_printf = tstream_ngtcp2_log_printf;
+ settings.qlog_write = tstream_ngtcp2_qlog_write_cb;
+
+ if (CHECK_DEBUGLVL(11)) {
+ tstream_ngtcp2_verbose = true;
+ }
+
+ settings.available_versions = available_versions32;
+ settings.available_versionslen = ARRAY_SIZE(available_versions32);
+
+ /*
+ * Copied from quic_transport_param_init
+ */
+ params.max_udp_payload_size = 65527 /* QUIC_MAX_UDP_PAYLOAD */;
+ params.ack_delay_exponent = 3 /* QUIC_DEF_ACK_DELAY_EXPONENT */;
+ params.max_ack_delay = 25000 /* QUIC_DEF_ACK_DELAY */;
+ params.active_connection_id_limit = 7 /* QUIC_CONN_ID_DEF */;
+ params.max_idle_timeout = 30000000 /* QUIC_DEF_IDLE_TIMEOUT */;
+ params.initial_max_data = (uint64_t)65536U /* QUIC_PATH_MAX_PMTU */ * 32;
+ params.initial_max_stream_data_bidi_local = (uint64_t)65536U /* QUIC_PATH_MAX_PMTU */ * 16;
+ params.initial_max_stream_data_bidi_remote = (uint64_t)65536U /* QUIC_PATH_MAX_PMTU */ * 16;
+ params.initial_max_stream_data_uni = (uint64_t)65536U /* QUIC_PATH_MAX_PMTU */ * 16;
+ params.initial_max_streams_bidi = 100 /* QUIC_DEF_STREAMS */;
+ params.initial_max_streams_uni = 100 /* QUIC_DEF_STREAMS */;
+
+ params.version_info_present = 1;
+ params.version_info.chosen_version = NGTCP2_PROTO_VER_V1;
+ params.version_info.available_versions = available_versions.v8;
+ params.version_info.available_versionslen = ARRAY_SIZE(available_versions.v8);
+
+ params.max_ack_delay *= NGTCP2_MICROSECONDS;
+ params.max_idle_timeout *= NGTCP2_MICROSECONDS;
+
+ ret = ngtcp2_conn_client_new(&si->conn,
+ &dcid,
+ &scid,
+ &si->path,
+ NGTCP2_PROTO_VER_V1,
+ &callbacks,
+ &settings,
+				     &params,
+ NULL,
+ state->quic_stream);
+ if (ret != 0) {
+ tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+ return tevent_req_post(req, ev);
+ }
+
+ si->keepalive_usecs = 1500 * 1000;
+ ngtcp2_conn_set_keep_alive_timeout(si->conn, si->keepalive_usecs * NGTCP2_MICROSECONDS);
+ ngtcp2_conn_set_tls_native_handle(si->conn,
+ si->handshake.tlss->tls_session);
+
+ si->retry_im = tevent_create_immediate(si);
+ if (tevent_req_nomem(si->retry_im, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ si->current_ev = ev;
+ si->handshake.req = req;
+ tstream_ngtcp2_retry_handshake(state->quic_stream);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+#endif /* HAVE_LIBNGTCP2 */
+invalid_parameter_mix:
+ tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_MIX);
+ return tevent_req_post(req, ev);
+}
+
+static void tstream_tls_ngtcp2_connect_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tstream_tls_ngtcp2_connect_state *state =
+ tevent_req_data(req,
+ struct tstream_tls_ngtcp2_connect_state);
+
+ if (req_state == TEVENT_REQ_DONE) {
+ return;
+ }
+
+ TALLOC_FREE(state->quic_stream);
+}
+
+int tstream_tls_ngtcp2_connect_recv(struct tevent_req *req,
+ int *perrno,
+ TALLOC_CTX *mem_ctx,
+ struct tstream_context **quic_stream)
+{
+ struct tstream_tls_ngtcp2_connect_state *state =
+ tevent_req_data(req,
+ struct tstream_tls_ngtcp2_connect_state);
+
+ if (tevent_req_is_unix_error(req, perrno)) {
+ tevent_req_received(req);
+ return -1;
+ }
+
+ *quic_stream = talloc_move(mem_ctx, &state->quic_stream);
+ tevent_req_received(req);
+ return 0;
+}