static int qr_task_on_send_internal(struct qr_task *task, const uv_handle_t *handle, int status)
{
qr_task_assert_weakptr(task);
+ qr_task_weakptr_t taskptr = task->weakptr;
if (task->finished) {
kr_require(task->leading == false);
qr_task_complete(task);
}
cleanup:;
- if (task->finished) {
+ if (worker_task_exists(taskptr) && task->finished) {
/* Answer has been sent or an error has occurred,
* the task is complete, we can free it. */
qr_task_free(task);
return qr_task_on_send_internal(task, handle, status);
}
+struct async_write_data {
+ uv_req_t *req;
+ qr_task_weakptr_t taskptr;
+ char buf[];
+};
+
+static struct async_write_data *make_req_async(uv_req_t *req, const uv_buf_t bufs[], unsigned int nbufs)
+{
+ size_t offs[nbufs + 1];
+ offs[0] = 0;
+ for (unsigned int i = 0; i < nbufs; i++)
+ offs[i + 1] = offs[i] + bufs[i].len;
+ struct async_write_data *adata = malloc(sizeof(*adata) + offs[nbufs]);
+ kr_require(adata);
+
+ adata->taskptr = (qr_task_weakptr_t)req->data;
+ req->data = adata;
+ adata->req = req;
+
+ for (unsigned int i = 0; i < nbufs; i++)
+ memcpy(adata->buf + offs[i], bufs[i].base, bufs[i].len);
+
+ return adata;
+}
+
static void on_send(uv_udp_send_t *req, int status)
{
qr_task_weakptr_t taskptr = (qr_task_weakptr_t) req->data;
free(req);
}
+static void on_async_send(uv_udp_send_t *req, int status)
+{
+ struct async_write_data *adata = req->data;
+ req->data = (void *)adata->taskptr;
+ on_send(req, status);
+ free(adata);
+}
+
+static int kr_udp_send(uv_udp_send_t *req, uv_udp_t *handle, const uv_buf_t bufs[],
+ unsigned int nbufs, const struct sockaddr *addr)
+{
+ size_t total_len = 0;
+ for (unsigned int i = 0; i < nbufs; i++)
+ total_len += bufs[i].len;
+ req->handle = handle;
+ int ret = uv_udp_try_send(handle, bufs, nbufs, addr);
+ if (ret == total_len) {
+ on_send(req, 0);
+ return 0;
+ }
+
+ if (ret >= 0)
+ return UV_EIO;
+ if (ret != UV_EAGAIN)
+ return ret;
+
+ struct async_write_data *adata = make_req_async((uv_req_t *)req, bufs, nbufs);
+ uv_buf_t buf = { .base = adata->buf, .len = total_len };
+ return uv_udp_send(req, handle, &buf, 1, addr, &on_async_send);
+}
+
static void on_write(uv_write_t *req, int status)
{
qr_task_weakptr_t taskptr = (qr_task_weakptr_t) req->data;
free(req);
}
+static void on_async_write(uv_write_t *req, int status)
+{
+ struct async_write_data *adata = req->data;
+ req->data = (void *)adata->taskptr;
+ on_write(req, status);
+ free(adata);
+}
+
+static int kr_write(uv_write_t *req, uv_stream_t *handle, const uv_buf_t bufs[],
+ unsigned int nbufs)
+{
+ size_t total_len = 0;
+ for (unsigned int i = 0; i < nbufs; i++)
+ total_len += bufs[i].len;
+ req->handle = handle;
+ int ret = uv_try_write(handle, bufs, nbufs);
+ if (ret == total_len) {
+ on_write(req, 0);
+ return 0;
+ }
+
+ if (ret >= 0)
+ return UV_EIO;
+ if (ret != UV_EAGAIN)
+ return ret;
+
+ struct async_write_data *adata = make_req_async((uv_req_t *)req, bufs, nbufs);
+ uv_buf_t buf = { .base = adata->buf, .len = total_len };
+ return uv_write(req, handle, &buf, 1, &on_async_write);
+}
+
static int qr_task_send(struct qr_task *task, struct session *session,
const struct sockaddr *addr, knot_pkt_t *pkt)
{
uv_udp_send_t *send_req = (uv_udp_send_t *)ioreq;
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
send_req->data = (void *)task->weakptr;
- ret = uv_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
+ ret = kr_udp_send(send_req, (uv_udp_t *)handle, &buf, 1, addr);
} else if (handle->type == UV_TCP) {
uv_write_t *write_req = (uv_write_t *)ioreq;
/* We need to write message length in native byte order,
{ (char *)pkt->wire, pkt->size },
};
write_req->data = (void *)task->weakptr;
- ret = uv_write(write_req, (uv_stream_t *)handle, buf, 3, &on_write);
+ ret = kr_write(write_req, (uv_stream_t *)handle, buf, 3);
} else {
kr_assert(false);
}
the_worker->stats.err_udp += 1;
else
the_worker->stats.err_tcp += 1;
+
+ qr_task_on_send_internal(task, handle, ret);
}
/* Update outgoing query statistics */