#include "daemon/engine.h"
#include "daemon/io.h"
+/* @internal IO request entry. */
+struct ioreq
+{
+ union {
+ uv_udp_send_t send;
+ uv_write_t write;
+ uv_connect_t connect;
+ } as;
+};
+
+static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
+{
+ struct ioreq *req = NULL;
+ if (worker->ioreqs.len > 0) {
+ req = array_tail(worker->ioreqs);
+ array_pop(worker->ioreqs);
+ } else {
+ req = malloc(sizeof(*req));
+ }
+ return req;
+}
+
+static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
+{
+ if (!req || worker->ioreqs.len < MP_FREELIST_SIZE) {
+ array_push(worker->ioreqs, req);
+ } else {
+ free(req);
+ }
+}
+
/** @internal Query resolution task. */
struct qr_task
{
return status;
}
+static void on_send(uv_udp_send_t *req, int status)
+{
+ struct qr_task *task = req->data;
+ qr_task_on_send(task, status);
+ ioreq_release(task->worker, (struct ioreq *)req);
+}
+
+static void on_write(uv_write_t *req, int status)
+{
+ struct qr_task *task = req->data;
+ qr_task_on_send(task, status);
+ ioreq_release(task->worker, (struct ioreq *)req);
+}
+
static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
{
int ret = 0;
- if (!handle) {
+ struct ioreq *req = ioreq_take(task->worker);
+ if (!handle || !req) {
return qr_task_on_send(task, kr_error(EIO));
}
if (handle->type == UV_UDP) {
uv_buf_t buf = { (char *)pkt->wire, pkt->size };
- ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
+ req->as.send.data = task;
+ ret = uv_udp_send(&req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
if (handle != task->source.handle)
task->worker->stats.udp += 1;
} else {
{ (char *)&pkt_size, sizeof(pkt_size) },
{ (char *)pkt->wire, pkt->size }
};
- ret = uv_try_write((uv_stream_t *)handle, buf, 2);
+ req->as.write.data = task;
+ ret = uv_write(&req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
if (handle != task->source.handle)
task->worker->stats.tcp += 1;
}
- return qr_task_on_send(task, (ret >= 0) ? 0 : -1);
+ return ret;
}
-static void qr_task_on_connect(uv_connect_t *connect, int status)
+static void on_connect(uv_connect_t *req, int status)
{
+ struct qr_task *task = req->data;
if (status == 0) {
- struct qr_task *task = connect->data;
- qr_task_send(task, (uv_handle_t *)connect->handle, NULL, task->next_query);
+ qr_task_send(task, (uv_handle_t *)req->handle, NULL, task->next_query);
}
- free(connect);
+ ioreq_release(task->worker, (struct ioreq *)req);
}
static int qr_task_finalize(struct qr_task *task, int state)
/* Connect or issue query datagram */
task->next_handle->data = task;
if (sock_type == SOCK_STREAM) {
- /* connect handle must be persistent even if the task mempool drops,
- * as it is referenced internally in the libuv event loop */
- uv_connect_t *connect = malloc(sizeof(*connect));
- if (!connect || uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
- free(connect);
+ struct ioreq *req = ioreq_take(task->worker);
+ if (!req || uv_tcp_connect(&req->as.connect, (uv_tcp_t *)task->next_handle, addr, on_connect) != 0) {
+ ioreq_release(task->worker, req);
return qr_task_step(task, NULL);
}
- connect->data = task;
+ req->as.connect.data = task;
} else {
if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
return qr_task_step(task, NULL);
return array_reserve(worker->pools, ring_maxlen);
}
+#define reclaim_freelist(list, cb) \
+ for (unsigned i = 0; i < list.len; ++i) { \
+ cb(list.at[i]); \
+ } \
+ array_clear(list)
+
void worker_reclaim(struct worker_ctx *worker)
{
- mp_freelist_t *pools = &worker->pools;
- for (unsigned i = 0; i < pools->len; ++i) {
- mp_delete(pools->at[i]);
- }
- array_clear(*pools);
+ reclaim_freelist(worker->pools, mp_delete);
+ reclaim_freelist(worker->ioreqs, free);
}