struct kr_query *qry = array_tail(follower->ctx->req.rplan.pending);
qry->id = leader_qry->id;
qry->secret = leader_qry->secret;
+
+ // Note that this transport may not be present in `leader_qry`'s server selection
follower->transport = task->transport;
+ if(follower->transport) {
+ follower->transport->deduplicated = true;
+ }
leader_qry->secret = 0; /* Next will be already decoded */
}
qr_task_step(follower, packet_source, pkt);
}
void update_rtt(struct kr_query *qry, struct address_state *addr_state, const struct kr_transport *transport, unsigned rtt) {
- if (!transport) {
+ if (!transport || !addr_state) {
return;
}
}
void cache_timeout(const struct kr_transport *transport, struct address_state *addr_state, struct kr_cache *cache) {
+ if (transport->deduplicated) {
+ // Transport was chosen by a different query, that one will cache the result.
+ return;
+ }
+
uint8_t *address = ip_to_bytes(&transport->address, transport->address_len);
struct rtt_state old_state = addr_state->rtt_state;
struct rtt_state cur_state = get_rtt_state(address, transport->address_len, cache);
void error(struct kr_query *qry, struct address_state *addr_state, const struct kr_transport *transport, enum kr_selection_error sel_error) {
- if (!transport) {
+ if (!transport || !addr_state) {
return;
}
if (sel_error == KR_SELECTION_TIMEOUT) {
qry->server_selection.timeouts++;
- cache_timeout(transport, addr_state, &qry->request->ctx->cache);
+ if (!transport->deduplicated) {
+ // Make sure the query was chosen by this query
+ cache_timeout(transport, addr_state, &qry->request->ctx->cache);
+ }
}
addr_state->errors[sel_error]++;
size_t address_len;
enum kr_transport_protocol protocol;
unsigned timeout;
+ bool deduplicated; // True iff transport was set in worker.c:subreq_finalize,
+ // that means it may be different from the one originally chosen one.
};
struct kr_server_selection
}
void forward_error(struct kr_query *qry, const struct kr_transport *transport, enum kr_selection_error sel_error) {
+ if (!qry->server_selection.initialized) {
+ return;
+ }
struct forward_local_state *local_state = qry->server_selection.local_state;
struct address_state *addr_state = &local_state->addr_states[local_state->last_choice_index];
error(qry, addr_state, transport, sel_error);
}
void forward_update_rtt(struct kr_query *qry, const struct kr_transport *transport, unsigned rtt) {
+ if (!qry->server_selection.initialized) {
+ return;
+ }
+
if (!transport) {
return;
}
unsigned int generation;
};
-void iter_local_state_init(struct knot_mm *mm, void **local_state) {
+void iter_local_state_alloc(struct knot_mm *mm, void **local_state) {
*local_state = mm_alloc(mm, sizeof(struct iter_local_state));
memset(*local_state, 0, sizeof(struct iter_local_state));
}
struct address_state *get_address_state(struct iter_local_state *local_state, const struct kr_transport *transport) {
+ if (!transport) {
+ return NULL;
+ }
+
trie_t *addresses = local_state->addresses;
uint8_t *address = ip_to_bytes(&transport->address, transport->address_len);
trie_val_t *address_state = trie_get_try(addresses, (char *)address, transport->address_len);
if (!address_state) {
+ if (transport->deduplicated) {
+ // Transport was chosen by a different query
+ return NULL;
+ }
+
assert(0);
}
return (struct address_state *)*address_state;
}
void iter_error(struct kr_query *qry, const struct kr_transport *transport, enum kr_selection_error sel_error) {
+ if (!qry->server_selection.initialized) {
+ return;
+ }
struct iter_local_state *local_state = qry->server_selection.local_state;
struct address_state *addr_state = get_address_state(local_state, transport);
error(qry, addr_state, transport, sel_error);
}
void iter_update_rtt(struct kr_query *qry, const struct kr_transport *transport, unsigned rtt) {
+ if (!qry->server_selection.initialized) {
+ return;
+ }
struct iter_local_state *local_state = qry->server_selection.local_state;
struct address_state *addr_state = get_address_state(local_state, transport);
update_rtt(qry, addr_state, transport, rtt);