uv_loop_t *loop = handle->loop;
struct worker_ctx *worker = loop->data;
- /* Check for originator connection close / not enough bytes */
- if (nread < 2) {
- if (!handle->data) {
- /* @todo Notify the endpoint if master socket */
+ /* Check for originator connection close. */
+ if (nread <= 0) {
+ if (handle->data) {
+ worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
}
- worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
return;
}
-
- /** @todo This is not going to work if the packet is fragmented in the stream ! */
- uint16_t nbytes = wire_read_u16((const uint8_t *)buf->base);
- if (nbytes + 2 < nread) {
- worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
- return;
- }
-
- knot_pkt_t *query = knot_pkt_new(buf->base + 2, nbytes, &worker->pkt_pool);
- query->max_size = sizeof(worker->wire_buf);
- int ret = worker_exec(worker, (uv_handle_t *)handle, query, NULL);
+
+ int ret = worker_process_tcp(worker, (uv_handle_t *)handle, (const uint8_t *)buf->base, nread);
if (ret == 0) {
/* Push - pull, stop reading from this handle until
* the task is finished. Since the handle has no track of the
} source;
uint16_t iter_count;
uint16_t refs;
+ uint16_t bytes_remaining;
};
/* Convenience macros */
task->req.answer = answer;
task->pktbuf = pktbuf;
task->iohandle = NULL;
+ task->bytes_remaining = 0;
task->iter_count = 0;
task->refs = 1;
task->worker = worker;
return qr_task_step(task, addr, query);
}
+/* Return DNS/TCP message size. */
+static int msg_size(const uint8_t *msg, size_t len)
+{
+ if (len < 2) {
+ return kr_error(EMSGSIZE);
+ }
+ uint16_t nbytes = wire_read_u16(msg);
+ if (nbytes > len - 2) {
+ return kr_error(EMSGSIZE);
+ }
+ return nbytes;
+}
+
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len)
+{
+ if (!worker || !handle || !msg) {
+ return kr_error(EINVAL);
+ }
+
+ int nbytes = msg_size(msg, len);
+ struct qr_task *task = handle->data;
+ const bool start_assembly = (task && task->bytes_remaining == 0);
+
+ /* Message is a query (we have no context to buffer it) or complete. */
+ if (!task || (start_assembly && nbytes == len - 2)) {
+ if (nbytes <= 0) {
+ return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
+ }
+ knot_pkt_t *pkt_nocopy = knot_pkt_new((void *)(msg + 2), nbytes, &worker->pkt_pool);
+ return worker_exec(worker, handle, pkt_nocopy, NULL);
+ }
+ /* Starting a new message assembly */
+ knot_pkt_t *pkt_buf = task->pktbuf;
+ if (start_assembly) {
+ if (nbytes <= 0) {
+ return worker_exec(worker, (uv_handle_t *)handle, NULL, NULL);
+ }
+ knot_pkt_clear(pkt_buf);
+ pkt_buf->size = 0;
+ /* Cut off message length */
+ task->bytes_remaining = nbytes;
+ len -= 2;
+ msg += 2;
+ }
+ /* Message is too long, can't process it. */
+ if (len > pkt_buf->max_size - pkt_buf->size) {
+ task->bytes_remaining = 0;
+ return worker_exec(worker, handle, NULL, NULL);
+ }
+ /* Buffer message and check if it's complete */
+ memcpy(pkt_buf->wire + pkt_buf->size, msg, len);
+ pkt_buf->size += len;
+ if (len >= task->bytes_remaining) {
+ task->bytes_remaining = 0;
+ return worker_exec(worker, handle, pkt_buf, NULL);
+ }
+ /* Return number of bytes remaining to receive. */
+ task->bytes_remaining -= len;
+ return task->bytes_remaining;
+}
+
int worker_resolve(struct worker_ctx *worker, knot_pkt_t *query, unsigned options, worker_cb_t on_complete, void *baton)
{
if (!worker || !query) {
*/
int worker_exec(struct worker_ctx *worker, uv_handle_t *handle, knot_pkt_t *query, const struct sockaddr* addr);
+/**
+ * Process incoming DNS/TCP message fragment.
+ * If the fragment contains only a partial message, it is buffered.
+ * If the fragment contains a complete query or completes current fragment, execute it.
+ * @return 0, number of bytes remaining to assemble, or an error code
+ */
+int worker_process_tcp(struct worker_ctx *worker, uv_handle_t *handle, const uint8_t *msg, size_t len);
+
/**
* Schedule query for resolution.
* @return 0 or an error code