]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
daemon/io: reassemble DNS/TCP message fragments
authorMarek Vavruša <marek.vavrusa@nic.cz>
Sat, 10 Oct 2015 17:40:53 +0000 (19:40 +0200)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Sat, 10 Oct 2015 17:40:53 +0000 (19:40 +0200)
daemon/io.c
daemon/worker.c
daemon/worker.h

index 272b589b75033ff0d378aa3d8f94b9d26c75397d..5f4232cec1f43ed24a6dc57ddc199d9b513160b7 100644 (file)
@@ -86,25 +86,15 @@ static void tcp_recv(uv_stream_t *handle, ssize_t nread, const uv_buf_t *buf)
        uv_loop_t *loop = handle->loop;
        struct worker_ctx *worker = loop->data;
 
-       /* Check for originator connection close / not enough bytes */
-       if (nread < 2) {
-               if (!handle->data) {
-                       /* @todo Notify the endpoint if master socket */
+       /* Check for originator connection close. */
+       if (nread <= 0) {
+               if (handle->data) {
+                       worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
                }
-               worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
                return;
        }
-
-       /** @todo This is not going to work if the packet is fragmented in the stream ! */
-       uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
-       if (nbytes + 2 < nread) {
-               worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
-               return;
-       }
-
-       knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, &worker->pkt_pool);
-       query->max_size = sizeof(worker->wire_buf);
-       int ret = worker_exec(worker, (uv_handle_t *)handle, query, NULL);
+       
+       int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
        if (ret == 0) {
                /* Push - pull, stop reading from this handle until
                 * the task is finished. Since the handle has no track of the
index e95d749dae9a676e44ee87e001fb4bf8be4c6214..fcdb8d59da919207d42817b5a226db00c498e7fc 100644 (file)
@@ -79,6 +79,7 @@ struct qr_task
        } source;
        uint16_t iter_count;
        uint16_t refs;
+       uint16_t bytes_remaining;
 };
 
 /* Convenience macros */
@@ -143,6 +144,7 @@ static struct qr_task *qr_task_create(struct worker_ctx *worker, uv_handle_t *ha
        task->req.answer = answer;
        task->pktbuf = pktbuf;
        task->iohandle = NULL;
+       task->bytes_remaining = 0;
        task->iter_count = 0;
        task->refs = 1;
        task->worker = worker;
@@ -435,6 +437,67 @@ int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *quer
        return qr_task_step(task, addr, query);
 }
 
+/* Return DNS/TCP message size. */
+static int msg_size(const uint8_t *msg, size_t len)
+{
+               if (len < 2) {
+                       return kr_error(EMSGSIZE);
+               }
+               uint16_t nbytes = wire_read_u16(msg);
+               if (nbytes > len - 2) {
+                       return kr_error(EMSGSIZE);
+               }
+               return nbytes;
+}
+
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len)
+{
+       if (!worker || !handle || !msg) {
+               return kr_error(EINVAL);
+       }
+
+       int nbytes = msg_size(msg, len);
+       struct qr_task *task = handle->data;
+       const bool start_assembly = (task && task->bytes_remaining == 0);
+
+       /* Message is a query (we have no context to buffer it) or complete. */
+       if (!task || (start_assembly && nbytes == len - 2)) {
+               if (nbytes <= 0) {
+                       return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);  
+               }
+               knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool);
+               return worker_exec(worker, handle, pkt_nocopy, NULL);
+       }
+       /* Starting a new message assembly */
+       knot_pkt_t *pkt_buf = task->pktbuf;
+       if (start_assembly) {
+               if (nbytes <= 0) {
+                       return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);  
+               }       
+               knot_pkt_clear(pkt_buf);
+               pkt_buf->size = 0;
+               /* Cut off message length */
+               task->bytes_remaining = nbytes;
+               len -= 2;
+               msg += 2;
+       }
+       /* Message is too long, can't process it. */
+       if (len > pkt_buf->max_size - pkt_buf->size) {
+               task->bytes_remaining = 0;
+               return worker_exec(worker, handle, NULL, NULL);
+       }
+       /* Buffer message and check if it's complete */
+       memcpy(pkt_buf->wire + pkt_buf->size, msg, len);
+       pkt_buf->size += len;
+       if (len >= task->bytes_remaining) {
+               task->bytes_remaining = 0;
+               return worker_exec(worker, handle, pkt_buf, NULL);
+       }
+       /* Return number of bytes remaining to receive. */
+       task->bytes_remaining -= len;
+       return task->bytes_remaining;
+}
+
 int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
 {
        if (!worker || !query) {
index 537281b61002a6a781e9e14cd3fdb3c66b37cc3a..691680a44669af6e7c1efa33c20fd00e2867423f 100644 (file)
@@ -57,6 +57,14 @@ typedef void (*worker_cb_t)(struct worker_ctx *worker, struct kr_request *req, v
  */
 int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
 
+/**
+ * Process incoming DNS/TCP message fragment.
+ * If the fragment contains only a partial message, it is buffered.
+ * If the fragment contains a complete query or completes current fragment, execute it.
+ * @return 0, number of bytes remaining to assemble, or an error code
+ */
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len);
+
 /**
  * Schedule query for resolution.
  * @return 0 or an error code