struct kr_request req;
struct worker_ctx *worker;
knot_pkt_t *pktbuf;
+ array_t(struct qr_task *) waiting; /* Tasks waiting for this (leading) subrequest */
uv_handle_t *pending[MAX_PENDING];
uint16_t pending_count;
uint16_t addrlist_count;
uint16_t iter_count;
uint16_t refs;
uint16_t bytes_remaining;
- uint16_t finished;
+ bool finished;
+ bool leading; /* True when this task leads an outstanding subrequest */
};
/* Convenience macros */
}
task->req.answer = answer;
task->pktbuf = pktbuf;
+ array_init(task->waiting);
task->addrlist = NULL;
task->pending_count = 0;
task->bytes_remaining = 0;
task->iter_count = 0;
task->refs = 1;
task->finished = false;
+ task->leading = false;
task->worker = worker;
task->source.handle = handle;
uv_timer_init(worker->loop, &task->retry);
struct worker_ctx *worker = task->worker;
/* Kill pending I/O requests */
ioreq_killall(task);
+ assert(task->waiting.len == 0);
+ assert(task->leading == false);
/* Run the completion callback. */
if (task->on_complete) {
task->on_complete(worker, &task->req, task->baton);
}
}
-static int qr_task_finalize(struct qr_task *task, int state)
+/** @internal Get key from current outstanding subrequest. */
+static int subreq_key(char *dst, struct qr_task *task)
{
- kr_resolve_finish(&task->req, state);
- task->finished = true;
- /* Send back answer */
- (void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
- return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
+ assert(task);
+ knot_pkt_t *pkt = task->pktbuf;
+ assert(knot_wire_get_qr(pkt->wire) == false);
+ return kr_rrmap_key(dst, knot_pkt_qname(pkt), knot_pkt_qtype(pkt), knot_pkt_qclass(pkt));
}
-static void cancel_subrequests(struct qr_task *task)
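+/** @internal Close pending timers and I/O; if this task was leading, drop it from the outstanding table and resume all waiting followers with the received packet. */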
+static void subreq_finalize(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *pkt)
{
/* Close pending I/O requests */
if (uv_is_active((uv_handle_t *)&task->retry))
uv_timer_stop(&task->retry);
if (uv_is_active((uv_handle_t *)&task->timeout))
uv_timer_stop(&task->timeout);
ioreq_killall(task);
+ /* Clear from outstanding table. */
+ if (!task->leading)
+ return;
+ char key[RRMAP_KEYSIZE];
+ int ret = subreq_key(key, task);
+ if (ret > 0) {
+ assert(map_get(&task->worker->outstanding, key) == task);
+ map_del(&task->worker->outstanding, key);
+ }
+ /* Notify waiting tasks. */
+ struct kr_query *leader_qry = TAIL(task->req.rplan.pending);
+ for (size_t i = task->waiting.len; i-- > 0;) {
+ struct qr_task *follower = task->waiting.at[i];
+ struct kr_query *qry = TAIL(follower->req.rplan.pending);
+ /* Reuse MSGID and 0x20 secret */
+ if (qry) {
+ qry->id = leader_qry->id;
+ qry->secret = leader_qry->secret;
+ leader_qry->secret = 0; /* Next will be already decoded */
+ }
+ qr_task_step(follower, packet_source, pkt);
+ qr_task_unref(follower);
+ }
+ task->waiting.len = 0;
+ task->leading = false;
+}
+
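+/** @internal Register the task in the outstanding table as the leader for its current subrequest key. */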
+static void subreq_lead(struct qr_task *task)
+{
+ assert(task);
+ char key[RRMAP_KEYSIZE];
+ if (subreq_key(key, task) > 0) {
+ assert(map_contains(&task->worker->outstanding, key) == false);
+ map_set(&task->worker->outstanding, key, task);
+ task->leading = true;
+ }
+}
+
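+/** @internal If an identical subrequest already has a leader, append this task to the leader's waiting list; returns true if enqueued. */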
+static bool subreq_enqueue(struct qr_task *task)
+{
+ assert(task);
+ char key[RRMAP_KEYSIZE];
+ if (subreq_key(key, task) > 0) {
+ struct qr_task *leader = map_get(&task->worker->outstanding, key);
+ if (leader) {
+ /* Enqueue itself to leader for this subrequest. */
+ int ret = array_reserve_mm(leader->waiting, leader->waiting.len + 1, mm_reserve, &leader->req.pool);
+ if (ret == 0) {
+ array_push(leader->waiting, task);
+ qr_task_ref(task);
+ return true;
+ }
+ }
+ }
+ return false;
+}
+
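+/** @internal Finish resolution and send the answer back to the query source. */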
+static int qr_task_finalize(struct qr_task *task, int state)
+{
+ assert(task && task->leading == false);
+ kr_resolve_finish(&task->req, state);
+ task->finished = true;
+ /* Send back answer */
+ (void) qr_task_send(task, task->source.handle, (struct sockaddr *)&task->source.addr, task->req.answer);
+ return state == KNOT_STATE_DONE ? 0 : kr_error(EIO);
}
static int qr_task_step(struct qr_task *task, const struct sockaddr *packet_source, knot_pkt_t *packet)
return kr_error(ESTALE);
}
/* Close pending I/O requests */
- cancel_subrequests(task);
+ subreq_finalize(task, packet_source, packet);
/* Consume input and produce next query */
int sock_type = -1;
task->addrlist = NULL;
/* Start fast retransmit with UDP, otherwise connect. */
if (sock_type == SOCK_DGRAM) {
+ /* If an identical subrequest is already outstanding, enqueue behind its leader. */
+ if (subreq_enqueue(task)) {
+ return kr_ok(); /* Will be notified when the outstanding subrequest finishes. */
+ }
+ /* Start transmitting */
if (retransmit(task)) {
uv_timer_start(&task->retry, on_retransmit, KR_CONN_RETRY, KR_CONN_RETRY);
} else {
return qr_task_step(task, NULL, NULL);
}
+ /* Announce and start subrequest.
+ * @note Only UDP can lead I/O as it doesn't touch 'task->pktbuf' for reassembly.
+ */
+ subreq_lead(task);
} else {
struct ioreq *conn = ioreq_take(task->worker);
if (!conn) {
}
conn->as.connect.data = task;
if (uv_tcp_connect(&conn->as.connect, (uv_tcp_t *)client, task->addrlist, on_connect) != 0) {
- DEBUG_MSG("task conn_start %p => failed\n", task);
ioreq_release(task->worker, conn);
return qr_task_step(task, NULL, NULL);
}
/* Start next step with timeout, fatal if can't start a timer. */
int ret = uv_timer_start(&task->timeout, on_timeout, KR_CONN_RTT_MAX, 0);
- if (ret != 0)
+ if (ret != 0) {
+ subreq_finalize(task, packet_source, packet);
return qr_task_finalize(task, KNOT_STATE_FAIL);
- return kr_ok();
+ }
+
+ return ret;
}
static int parse_packet(knot_pkt_t *query)
memset(&worker->pkt_pool, 0, sizeof(worker->pkt_pool));
worker->pkt_pool.ctx = mp_new (4 * sizeof(knot_pkt_t));
worker->pkt_pool.alloc = (mm_alloc_t) mp_alloc;
+ worker->outstanding = map_make(); /* Table of leading (outstanding) subrequests */
return kr_ok();
}
reclaim_freelist(worker->ioreqs, struct ioreq, free);
mp_delete(worker->pkt_pool.ctx);
worker->pkt_pool.ctx = NULL;
+ map_clear(&worker->outstanding);
}
#undef DEBUG_MSG