* something went wrong, normally should not happen. */
break;
}
+ if (session->sflags.closing)
+ break; /* Submitting into a closing session could cause trouble. */
}
/* worker_submit() may cause the session to close (e.g. due to IO
return transfer;
}
-static void on_write_complete(uv_write_t *req, int status)
-{
- if (kr_fails_assert(req->data))
- return;
- struct async_write_ctx *async_ctx = (struct async_write_ctx *)req->data;
- struct tls_common_ctx *t = async_ctx->t;
- if (t->write_queue_size)
- t->write_queue_size -= 1;
- else
- kr_assert(false);
- free(req->data);
-}
-
-static bool stream_queue_is_empty(struct tls_common_ctx *t)
-{
- return (t->write_queue_size == 0);
-}
-
static ssize_t kres_gnutls_vec_push(gnutls_transport_ptr_t h, const giovec_t * iov, int iovcnt)
{
struct tls_common_ctx *t = (struct tls_common_ctx *)h;
return -1;
}
- /*
- * This is a little bit complicated. There are two different writes:
- * 1. Immediate, these don't need to own the buffered data and return immediately
- * 2. Asynchronous, these need to own the buffers until the write completes
- * In order to avoid copying the buffer, an immediate write is tried first if possible.
- * If it isn't possible to write the data without queueing, an asynchronous write
- * is created (with copied buffered data).
- */
-
size_t total_len = 0;
uv_buf_t uv_buf[iovcnt];
for (int i = 0; i < iovcnt; ++i) {
total_len += iov[i].iov_len;
}
- /* Try to perform the immediate write first to avoid copy */
- int ret = 0;
- if (stream_queue_is_empty(t)) {
+ int ret;
+ { // indentation kept to reduce diff:
ret = uv_try_write(handle, uv_buf, iovcnt);
DEBUG_MSG("[%s] push %zu <%p> = %d\n",
t->client_side ? "tls_client" : "tls", total_len, h, ret);
errno = EIO;
return ret;
}
- /* Since we are here expression below is true
- * (ret != total_len) && (ret >= 0 || ret == UV_EAGAIN)
- * or the same
- * (ret != total_len && ret >= 0) || (ret != total_len && ret == UV_EAGAIN)
- * i.e. either occurs partial write or UV_EAGAIN.
- * Proceed and copy data amount to owned memory and perform async write.
- */
- if (ret == UV_EAGAIN) {
- /* No data were buffered, so we must buffer all the data. */
- ret = 0;
- }
- }
-
- /* Fallback when the queue is full, and it's not possible to do an immediate write */
- char *p = malloc(sizeof(struct async_write_ctx) + total_len - ret);
- if (p != NULL) {
- struct async_write_ctx *async_ctx = (struct async_write_ctx *)p;
- /* Save pointer to session tls context */
- async_ctx->t = t;
- char *buf = async_ctx->buf;
- /* Skip data written in the partial write */
- size_t to_skip = ret;
- /* Copy the buffer into owned memory */
- size_t off = 0;
- for (int i = 0; i < iovcnt; ++i) {
- if (to_skip > 0) {
- /* Ignore current buffer if it's all skipped */
- if (to_skip >= uv_buf[i].len) {
- to_skip -= uv_buf[i].len;
- continue;
- }
- /* Skip only part of the buffer */
- uv_buf[i].base += to_skip;
- uv_buf[i].len -= to_skip;
- to_skip = 0;
- }
- memcpy(buf + off, uv_buf[i].base, uv_buf[i].len);
- off += uv_buf[i].len;
- }
- uv_buf[0].base = buf;
- uv_buf[0].len = off;
-
- /* Create an asynchronous write request */
- uv_write_t *write_req = &async_ctx->write_req;
- memset(write_req, 0, sizeof(uv_write_t));
- write_req->data = p;
-
- /* Perform an asynchronous write with a callback */
- if (uv_write(write_req, handle, uv_buf, 1, on_write_complete) == 0) {
- ret = total_len;
- t->write_queue_size += 1;
- } else {
- free(p);
- VERBOSE_MSG(t->client_side, "uv_write error: %s\n",
- uv_strerror(ret));
- errno = EIO;
- ret = -1;
- }
- } else {
- errno = ENOMEM;
- ret = -1;
+ /* Nothing (UV_EAGAIN) or only a part of the data could be written right away.
+ * Report the failure the same way as the other error paths of this push
+ * callback do: return -1 with errno set, not a kr_error() code. */
+ errno = ENOBUFS;
+ return -1;
}
-
- DEBUG_MSG("[%s] queued %zu <%p> = %d\n",
- t->client_side ? "tls_client" : "tls", total_len, h, ret);
-
- return ret;
}
/** Perform TLS handshake and handle error codes according to the documentation.
free(tls);
}
-int tls_write(uv_write_t *req, uv_handle_t *handle, knot_pkt_t *pkt, uv_write_cb cb)
+int tls_write(uv_handle_t *handle, knot_pkt_t *pkt)
{
if (!pkt || !handle || !handle->data) {
return kr_error(EINVAL);
}
/* The data is now accepted in gnutls internal buffers, the message can be treated as sent */
- req->handle = (uv_stream_t *)handle;
- cb(req, 0);
-
return kr_ok();
}
uint8_t recv_buf[16384];
tls_handshake_cb handshake_cb;
struct worker_ctx *worker;
- size_t write_queue_size;
};
struct tls_ctx {
void tls_free(struct tls_ctx* tls);
/*! Push new data to TLS context for sending */
-int tls_write(uv_write_t *req, uv_handle_t* handle, knot_pkt_t * pkt, uv_write_cb cb);
+int tls_write(uv_handle_t* handle, knot_pkt_t * pkt);
/*! Unwrap incoming data from a TLS stream and pass them to TCP session.
* @return the number of newly-completed requests (>=0) or an error code
return status;
}
-static void on_send(uv_udp_send_t *req, int status)
-{
- struct qr_task *task = req->data;
- uv_handle_t *h = (uv_handle_t *)req->handle;
- qr_task_on_send(task, h, status);
- qr_task_unref(task);
- free(req);
-}
-
static void on_write(uv_write_t *req, int status)
{
struct qr_task *task = req->data;
if (kr_fails_assert(!session_flags(session)->closing))
return qr_task_on_send(task, NULL, kr_error(EIO));
- uv_handle_t *ioreq = malloc(is_stream ? sizeof(uv_write_t) : sizeof(uv_udp_send_t));
- if (!ioreq)
- return qr_task_on_send(task, handle, kr_error(ENOMEM));
-
/* Pending ioreq on current task */
qr_task_ref(task);
if (session_flags(session)->has_http) {
#if ENABLE_DOH2
- uv_write_t *write_req = (uv_write_t *)ioreq;
+ uv_write_t *write_req = malloc(sizeof(*write_req));
+ if (!write_req) {
+ /* qr_task_ref() was already called above; balance it on this early exit,
+ * otherwise the task reference would leak on allocation failure. */
+ ret = qr_task_on_send(task, handle, kr_error(ENOMEM));
+ qr_task_unref(task);
+ return ret;
+ }
write_req->data = task;
ret = http_write(write_req, handle, pkt, ctx->req.qsource.stream_id, &on_write);
+ if (ret != 0)
+ free(write_req);
#else
ret = kr_error(ENOPROTOOPT);
#endif
} else if (session_flags(session)->has_tls) {
- uv_write_t *write_req = (uv_write_t *)ioreq;
- write_req->data = task;
- ret = tls_write(write_req, handle, pkt, &on_write);
+ ret = tls_write(handle, pkt);
} else if (handle->type == UV_UDP) {
- uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
- send_req->data = task;
- ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
- } else if (handle->type == UV_TCP) {
- uv_write_t *write_req = (uv_write_t *)ioreq;
- /* We need to write message length in native byte order,
- * but we don't have a convenient place to store those bytes.
- * The problem is that all memory referenced from buf[] MUST retain
- * its contents at least until on_write() is called, and I currently
- * can't see any convenient place outside the `pkt` structure.
- * So we use directly the *individual* bytes in pkt->size.
- * The call to htonl() and the condition will probably be inlinable. */
- int lsbi, slsbi; /* (second) least significant byte index */
- if (htonl(1) == 1) { /* big endian */
- lsbi = sizeof(pkt->size) - 1;
- slsbi = sizeof(pkt->size) - 2;
- } else {
- lsbi = 0;
- slsbi = 1;
+ /* Synchronous send attempt: no request object and no completion callback. */
+ ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
+ if (ret >= 0 && ret == pkt->size) {
+ ret = 0; // this is the success
+ } else if (ret == UV_EAGAIN || ret >= 0) {
+ /* UV_EAGAIN or a byte count different from pkt->size is reported as ENOBUFS;
+ * any other negative code is passed on unchanged. */
+ ret = kr_error(ENOBUFS);
}
- uv_buf_t buf[3] = {
- { (char *)&pkt->size + slsbi, 1 },
- { (char *)&pkt->size + lsbi, 1 },
+ } else if (handle->type == UV_TCP) {
+ /* Two-byte length prefix, most significant byte first, kept in a local array;
+ * it is only handed to the synchronous uv_try_write() call below.
+ * NOTE(review): assumes pkt->size fits into 16 bits - confirm. */
+ uint8_t size_buf[2] = { pkt->size / 256, pkt->size % 256 };
+ uv_buf_t buf[2] = {
+ { (char *)&size_buf, 2 },
{ (char *)pkt->wire, pkt->size },
};
- write_req->data = task;
- ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write);
+ /* Synchronous write attempt of the length prefix plus the message. */
+ ret = uv_try_write((uv_stream_t *)handle, buf, 2);
+ if (ret >= 0 && ret == pkt->size + 2) {
+ ret = 0; // this is the success
+ } else if (ret == UV_EAGAIN || ret >= 0) {
+ /* UV_EAGAIN or a short write is reported as ENOBUFS; any other negative
+ * code is passed on unchanged.
+ * NOTE(review): after a short write a part of the message is already on the
+ * stream - confirm that the error path closes the connection. */
+ ret = kr_error(ENOBUFS);
+ }
} else {
kr_assert(false);
+ ret = kr_error(EINVAL);
}
if (ret == 0) {
+ /* Only the HTTP path above still hands on_write to http_write(); the TLS, UDP
+ * and TCP sends are synchronous, so the task is notified right here and the
+ * reference taken by qr_task_ref() before sending is dropped. */
+ if (!session_flags(session)->has_http) { // instead of completion callback
+ qr_task_on_send(task, handle, ret);
+ qr_task_unref(task);
+ }
session_touch(session);
if (session_flags(session)->outgoing) {
session_tasklist_add(session, task);
worker->too_many_open = false;
}
} else {
- free(ioreq);
qr_task_unref(task);
if (ret == UV_EMFILE) {
worker->too_many_open = true;