From: Marek VavruĊĦa Date: Sat, 10 Oct 2015 17:40:53 +0000 (+0200) Subject: daemon/io: reassemble DNS/TCP message fragments X-Git-Tag: v1.0.0-beta2~99 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9719c8c22d5da2ee1d43f571c11ffc3ca3cc2715;p=thirdparty%2Fknot-resolver.git daemon/io: reassemble DNS/TCP message fragments --- diff --git a/daemon/io.c b/daemon/io.c index 272b589b7..5f4232cec 100644 --- a/daemon/io.c +++ b/daemon/io.c @@ -86,25 +86,15 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf) uv_loop_t *loop = handle->loop; struct worker_ctx *worker = loop->data; - /* Check for originator connection close / not enough bytes */ - if (nread < 2) { - if (!handle->data) { - /* @todo Notify the endpoint if master socket */ + /* Check for originator connection close. */ + if (nread <= 0) { + if (handle->data) { + worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); } - worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); return; } - - /** @todo This is not going to work if the packet is fragmented in the stream ! */ - uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base); - if (nbytes + 2 < nread) { - worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); - return; - } - - knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, &worker->pkt_pool); - query->max_size = sizeof(worker->wire_buf); - int ret = worker_exec(worker, (uv_handle_t *)handle, query, NULL); + + int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread); if (ret == 0) { /* Push - pull, stop reading from this handle until * the task is finished. Since the handle has no track of the diff --git a/daemon/worker.c b/daemon/worker.c index e95d749da..fcdb8d59d 100644 --- a/daemon/worker.c +++ b/daemon/worker.c @@ -79,6 +79,7 @@ struct qr_task } source; uint16_t iter_count; uint16_t refs; + uint16_t bytes_remaining; }; /* Convenience macros */ @@ -143,6 +144,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha task->req.answer = answer; task->pktbuf = pktbuf; task->iohandle = NULL; + task->bytes_remaining = 0; task->iter_count = 0; task->refs = 1; task->worker = worker; @@ -435,6 +437,67 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer return qr_task_step(task, addr, query); } +/* Return DNS/TCP message size. */ +static int msg_size(const uint8_t *msg, size_t len) +{ + if (len < 2) { + return kr_error(EMSGSIZE); + } + uint16_t nbytes = wire_read_u16(msg); + if (nbytes > len - 2) { + return kr_error(EMSGSIZE); + } + return nbytes; +} + +int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len) +{ + if (!worker || !handle || !msg) { + return kr_error(EINVAL); + } + + int nbytes = msg_size(msg, len); + struct qr_task *task = handle->data; + const bool start_assembly = (task && task->bytes_remaining == 0); + + /* Message is a query (we have no context to buffer it) or complete. */ + if (!task || (start_assembly && nbytes == len - 2)) { + if (nbytes <= 0) { + return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); + } + knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool); + return worker_exec(worker, handle, pkt_nocopy, NULL); + } + /* Starting a new message assembly */ + knot_pkt_t *pkt_buf = task->pktbuf; + if (start_assembly) { + if (nbytes <= 0) { + return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL); + } + knot_pkt_clear(pkt_buf); + pkt_buf->size = 0; + /* Cut off message length */ + task->bytes_remaining = nbytes; + len -= 2; + msg += 2; + } + /* Message is too long, can't process it. */ + if (len > pkt_buf->max_size - pkt_buf->size) { + task->bytes_remaining = 0; + return worker_exec(worker, handle, NULL, NULL); + } + /* Buffer message and check if it's complete */ + memcpy(pkt_buf->wire + pkt_buf->size, msg, len); + pkt_buf->size += len; + if (len >= task->bytes_remaining) { + task->bytes_remaining = 0; + return worker_exec(worker, handle, pkt_buf, NULL); + } + /* Return number of bytes remaining to receive. */ + task->bytes_remaining -= len; + return task->bytes_remaining; +} + int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton) { if (!worker || !query) { diff --git a/daemon/worker.h b/daemon/worker.h index 537281b61..691680a44 100644 --- a/daemon/worker.h +++ b/daemon/worker.h @@ -57,6 +57,14 @@ typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, v */ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr); +/** + * Process incoming DNS/TCP message fragment. + * If the fragment contains only a partial message, it is buffered. + * If the fragment contains a complete query or completes current fragment, execute it. + * @return 0, number of bytes remaining to assemble, or an error code + */ +int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len); + /** * Schedule query for resolution. * @return 0 or an error code