]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/worker: asynchronous I/O requests
authorMarek Vavruša <marek.vavrusa@nic.cz>
Wed, 8 Jul 2015 01:36:48 +0000 (03:36 +0200)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Wed, 8 Jul 2015 01:36:48 +0000 (03:36 +0200)
this can coalesce sends/writes in future versions of libuv

daemon/worker.c

index d5dc65ee1c80731bb96a11af929410fd73fe5cd4..3df6c805d187a2946270cd36fc865d484df70b37 100644 (file)
 #include "daemon/engine.h"
 #include "daemon/io.h"
 
+/* @internal IO request entry. */
+struct ioreq
+{
+        union {
+                uv_udp_send_t send;
+                uv_write_t    write;
+                uv_connect_t  connect;
+        } as;
+};
+
+static inline struct ioreq *ioreq_take(struct worker_ctx *worker)
+{
+        struct ioreq *req = NULL;
+       if (worker->ioreqs.len > 0) {
+               req = array_tail(worker->ioreqs);
+               array_pop(worker->ioreqs);
+       } else {
+               req = malloc(sizeof(*req));
+       }
+       return req;
+}
+
+static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req)
+{
+       if (!req || worker->ioreqs.len < MP_FREELIST_SIZE) {
+               array_push(worker->ioreqs, req);
+       } else {
+               free(req);
+       }
+}
+
 /** @internal Query resolution task. */
 struct qr_task
 {
@@ -178,15 +209,31 @@ static int qr_task_on_send(struct qr_task *task, int status)
        return status;
 }
 
+static void on_send(uv_udp_send_t *req, int status)
+{
+       struct qr_task *task = req->data;
+        qr_task_on_send(task, status);
+        ioreq_release(task->worker, (struct ioreq *)req);
+}
+
+static void on_write(uv_write_t *req, int status)
+{
+       struct qr_task *task = req->data;
+        qr_task_on_send(task, status);
+        ioreq_release(task->worker, (struct ioreq *)req);
+}
+
 static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt)
 {
        int ret = 0;
-       if (!handle) {
+       struct ioreq *req = ioreq_take(task->worker);
+       if (!handle || !req) {
                return qr_task_on_send(task, kr_error(EIO));
        }
        if (handle->type == UV_UDP) {
                uv_buf_t buf = { (char *)pkt->wire, pkt->size };
-               ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr);
+               req->as.send.data = task;
+               ret = uv_udp_send(&req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send);
                if (handle != task->source.handle)
                        task->worker->stats.udp += 1;
        } else {
@@ -195,20 +242,21 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad
                        { (char *)&pkt_size, sizeof(pkt_size) },
                        { (char *)pkt->wire, pkt->size }
                };
-               ret = uv_try_write((uv_stream_t *)handle, buf, 2);
+               req->as.write.data = task;
+               ret = uv_write(&req->as.write, (uv_stream_t *)handle, buf, 2, &on_write);
                if (handle != task->source.handle)
                        task->worker->stats.tcp += 1;
        }
-       return qr_task_on_send(task, (ret >= 0) ? 0 : -1);
+       return ret;
 }
 
-static void qr_task_on_connect(uv_connect_t *connect, int status)
+static void on_connect(uv_connect_t *req, int status)
 {
+       struct qr_task *task = req->data;
        if (status == 0) {
-               struct qr_task *task = connect->data;
-               qr_task_send(task, (uv_handle_t *)connect->handle, NULL, task->next_query);
+               qr_task_send(task, (uv_handle_t *)req->handle, NULL, task->next_query);
        }
-       free(connect);
+       ioreq_release(task->worker, (struct ioreq *)req);
 }
 
 static int qr_task_finalize(struct qr_task *task, int state)
@@ -257,14 +305,12 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet)
        /* Connect or issue query datagram */
        task->next_handle->data = task;
        if (sock_type == SOCK_STREAM) {
-               /* connect handle must be persistent even if the task mempool drops,
-                * as it is referenced internally in the libuv event loop */
-               uv_connect_t *connect = malloc(sizeof(*connect));
-               if (!connect || uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) {
-                       free(connect);
+               struct ioreq *req = ioreq_take(task->worker);
+               if (!req || uv_tcp_connect(&req->as.connect, (uv_tcp_t *)task->next_handle, addr, on_connect) != 0) {
+                       ioreq_release(task->worker, req);
                        return qr_task_step(task, NULL);
                }
-               connect->data = task;
+               req->as.connect.data = task;
        } else {
                if (qr_task_send(task, task->next_handle, addr, next_query) != 0) {
                        return qr_task_step(task, NULL);
@@ -323,11 +369,14 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen)
        return array_reserve(worker->pools, ring_maxlen);
 }
 
+#define reclaim_freelist(list, cb) \
+       for (unsigned i = 0; i < list.len; ++i) { \
+               cb(list.at[i]); \
+       } \
+       array_clear(list)
+
 void worker_reclaim(struct worker_ctx *worker)
 {
-       mp_freelist_t *pools = &worker->pools;
-       for (unsigned i = 0; i < pools->len; ++i) {
-               mp_delete(pools->at[i]);
-       }
-       array_clear(*pools);
+       reclaim_freelist(worker->pools, mp_delete);
+       reclaim_freelist(worker->ioreqs, free);
 }