From: Marek VavruĊĦa Date: Wed, 8 Jul 2015 01:36:48 +0000 (+0200) Subject: daemon/worker: asynchronous I/O requests X-Git-Tag: v1.0.0-beta1~83^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=8b4bb4432dd579d6d76c1c91d75f7c9f09835649;p=thirdparty%2Fknot-resolver.git daemon/worker: asynchronous I/O requests this can coalesce sends/writes in future versions of libuv --- diff --git a/daemon/worker.c b/daemon/worker.c index d5dc65ee1..3df6c805d 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -26,6 +26,37 @@ #include "daemon/engine.h" #include "daemon/io.h" +/* @internal IO request entry. */ +struct ioreq +{ + union { + uv_udp_send_t send; + uv_write_t write; + uv_connect_t connect; + } as; +}; + +static inline struct ioreq *ioreq_take(struct worker_ctx *worker) +{ + struct ioreq *req = NULL; + if (worker->ioreqs.len > 0) { + req = array_tail(worker->ioreqs); + array_pop(worker->ioreqs); + } else { + req = malloc(sizeof(*req)); + } + return req; +} + +static inline void ioreq_release(struct worker_ctx *worker, struct ioreq *req) +{ + if (!req || worker->ioreqs.len < MP_FREELIST_SIZE) { + array_push(worker->ioreqs, req); + } else { + free(req); + } +} + /** @internal Query resolution task. */ struct qr_task { @@ -178,15 +209,31 @@ static int qr_task_on_send(struct qr_task *task, int status) return status; } +static void on_send(uv_udp_send_t *req, int status) +{ + struct qr_task *task = req->data; + qr_task_on_send(task, status); + ioreq_release(task->worker, (struct ioreq *)req); +} + +static void on_write(uv_write_t *req, int status) +{ + struct qr_task *task = req->data; + qr_task_on_send(task, status); + ioreq_release(task->worker, (struct ioreq *)req); +} + static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockaddr *addr, knot_pkt_t *pkt) { int ret = 0; - if (!handle) { + struct ioreq *req = ioreq_take(task->worker); + if (!handle || !req) { return qr_task_on_send(task, kr_error(EIO)); } if (handle->type == UV_UDP) { uv_buf_t buf = { (char *)pkt->wire, pkt->size }; - ret = uv_udp_try_send((uv_udp_t *)handle, &buf, 1, addr); + req->as.send.data = task; + ret = uv_udp_send(&req->as.send, (uv_udp_t *)handle, &buf, 1, addr, &on_send); if (handle != task->source.handle) task->worker->stats.udp += 1; } else { @@ -195,20 +242,21 @@ static int qr_task_send(struct qr_task *task, uv_handle_t *handle, struct sockad { (char *)&pkt_size, sizeof(pkt_size) }, { (char *)pkt->wire, pkt->size } }; - ret = uv_try_write((uv_stream_t *)handle, buf, 2); + req->as.write.data = task; + ret = uv_write(&req->as.write, (uv_stream_t *)handle, buf, 2, &on_write); if (handle != task->source.handle) task->worker->stats.tcp += 1; } - return qr_task_on_send(task, (ret >= 0) ? 0 : -1); + return ret; } -static void qr_task_on_connect(uv_connect_t *connect, int status) +static void on_connect(uv_connect_t *req, int status) { + struct qr_task *task = req->data; if (status == 0) { - struct qr_task *task = connect->data; - qr_task_send(task, (uv_handle_t *)connect->handle, NULL, task->next_query); + qr_task_send(task, (uv_handle_t *)req->handle, NULL, task->next_query); } - free(connect); + ioreq_release(task->worker, (struct ioreq *)req); } static int qr_task_finalize(struct qr_task *task, int state) @@ -257,14 +305,12 @@ static int qr_task_step(struct qr_task *task, knot_pkt_t *packet) /* Connect or issue query datagram */ task->next_handle->data = task; if (sock_type == SOCK_STREAM) { - /* connect handle must be persistent even if the task mempool drops, - * as it is referenced internally in the libuv event loop */ - uv_connect_t *connect = malloc(sizeof(*connect)); - if (!connect || uv_tcp_connect(connect, (uv_tcp_t *)task->next_handle, addr, qr_task_on_connect) != 0) { - free(connect); + struct ioreq *req = ioreq_take(task->worker); + if (!req || uv_tcp_connect(&req->as.connect, (uv_tcp_t *)task->next_handle, addr, on_connect) != 0) { + ioreq_release(task->worker, req); return qr_task_step(task, NULL); } - connect->data = task; + req->as.connect.data = task; } else { if (qr_task_send(task, task->next_handle, addr, next_query) != 0) { return qr_task_step(task, NULL); @@ -323,11 +369,14 @@ int worker_reserve(struct worker_ctx *worker, size_t ring_maxlen) return array_reserve(worker->pools, ring_maxlen); } +#define reclaim_freelist(list, cb) \ + for (unsigned i = 0; i < list.len; ++i) { \ + cb(list.at[i]); \ + } \ + array_clear(list) + void worker_reclaim(struct worker_ctx *worker) { - mp_freelist_t *pools = &worker->pools; - for (unsigned i = 0; i < pools->len; ++i) { - mp_delete(pools->at[i]); - } - array_clear(*pools); + reclaim_freelist(worker->pools, mp_delete); + reclaim_freelist(worker->ioreqs, free); }