end
-- Resolve queued records and flush the queue
-function prefetch.dispatch(ev)
+function prefetch.drain(ev)
local deleted = 0
for key, val in pairs(prefetch.queue) do
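+		-- The first byte of the key is the query type, the rest is the owner name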
worker.resolve(string.sub(key, 2), string.byte(key))
- if val > 1 then
- prefetch.queue[key] = val - 1
- else
- prefetch.queue[key] = nil
- end
+ prefetch.queue[key] = nil
deleted = deleted + 1
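+		-- Process at most one batch per pass; the remainder is rescheduled below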
if deleted >= prefetch.batch then
break
end
end
if deleted > 0 then
- event.after((prefetch.window * 6) * sec, prefetch.dispatch)
+ event.after((prefetch.window * 6) * sec, prefetch.drain)
end
prefetch.queue_len = prefetch.queue_len - deleted
stats['predict.queue'] = prefetch.queue_len
return 0
end
+-- Enqueue queries from a sampled set, skipping those already queued
+local function enqueue(queries)
+ local queued = 0
+ local nr_queries = #queries
+ for i = 1, nr_queries do
+ local entry = queries[i]
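+		-- Build the key as {type byte}..{owner name}, matching what drain() unpacks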
+ local key = string.char(entry.type)..entry.name
+ if not prefetch.queue[key] then
+ prefetch.queue[key] = 1
+ queued = queued + 1
+ end
+ end
+ return queued
+end
+
+-- Prefetch soon-to-expire records
+local function refresh()
+ local queries = stats.expiring()
+ stats.clear_expiring()
+ return enqueue(queries)
+end
+
--- Sample current epoch, return number of sampled queries
+-- Sample current epoch, return numbers of sampled and newly queued queries
local function sample(epoch_now)
local queries = stats.frequent()
stats.clear_frequent()
- local start = os.clock()
+ local queued = 0
local current = prefetch.log[epoch_now]
if prefetch.epoch ~= epoch_now or current == nil then
+		if current ~= nil then
+			-- Carry over queries logged for the unfinished epoch; the log is keyed by query
+			for key in pairs(current) do
+				if not prefetch.queue[key] then
+					prefetch.queue[key] = 1
+					queued = queued + 1
+				end
+			end
+		end
current = {}
end
	local nr_samples = #queries
	for i = 1, nr_samples do
		local entry = queries[i]
		local key = string.char(entry.type)..entry.name
		current[key] = entry.count
	end
- print (string.format('[prob] .. sampling epoch: %d/%d, %.2f sec (%d items)', epoch_now, prefetch.sample, os.clock() - start, nr_samples))
prefetch.log[epoch_now] = current
prefetch.sample = prefetch.sample + 1
- return nr_samples
-end
-
--- Prefetch soon-to-expire records
-local function refresh()
- local queries = stats.expiring()
- stats.clear_expiring()
- local nr_samples = #queries
- for i = 1, nr_samples do
- local entry = queries[i]
- local key = string.char(entry.type)..entry.name
- prefetch.queue[key] = 1
- end
- print (string.format('[prob] .. prefetching %d items', nr_samples))
- return nr_samples
+ return nr_samples, queued
end
--- Sample current epoch, return number of sampled queries
+-- Predict queries for the next epoch, return the number of queued queries
local function predict(epoch_now)
- local start = os.clock()
local queued = 0
local period = prefetch.period + 1
	for i = 1, prefetch.period / 2 - 1 do
		local current = prefetch.log[(epoch_now - i) % period]
		local past = prefetch.log[(epoch_now - 2*i) % period]
		if current and past then
			-- Queue queries sampled in both the recent and the older epoch
			for key in pairs(current) do
				if past[key] ~= nil then
					queued = queued + 1
					prefetch.queue[key] = 1
				end
			end
		end
	end
- print (string.format('[prob] predicted epoch: %d, %.2f sec (%d items)', epoch_now, os.clock() - start, queued))
return queued
end
function prefetch.process(ev)
-- Start a new epoch, or continue sampling
local epoch_now = current_epoch()
- local nr_learned = sample(epoch_now)
- local nr_queued = 0
+ local nr_learned, nr_queued = sample(epoch_now)
-- End of epoch, predict next
if prefetch.epoch ~= epoch_now then
- prefetch.queue = {}
- prefetch.queue_len = 0
prefetch.epoch = epoch_now
prefetch.sample = 0
nr_queued = nr_queued + predict(epoch_now)
- prefetch.queue_len = prefetch.queue_len + nr_queued
end
-- Prefetch expiring records
nr_queued = nr_queued + refresh()
if nr_queued > 0 then
prefetch.queue_len = prefetch.queue_len + nr_queued
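+		-- Split the queue into batches so a single drain pass stays short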
prefetch.batch = prefetch.queue_len / 5
- event.after(0, prefetch.dispatch)
+ event.after(0, prefetch.drain)
end
event.after(next_event(), prefetch.process)
stats['predict.epoch'] = epoch_now
/* Defaults */
#define DEBUG_MSG(qry, fmt...) QRDEBUG(qry, "stat", fmt)
#define FREQUENT_COUNT 5000 /* Size of frequent tables */
-#define FREQUENT_PSAMPLE 50 /* Sampling rate, 1 in N */
+#define FREQUENT_PSAMPLE 10 /* Sampling rate, 1 in N */
/** @cond internal Fixed-size map of predefined metrics. */
#define CONST_METRICS(X) \
static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_pkt_t *pkt)
{
+ /* Probabilistic sampling of all queries (consider half) */
+ unsigned roll = kr_rand_uint(FREQUENT_PSAMPLE);
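+	/* The same roll is reused further down to gate the frequent table. */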
+ if (roll > FREQUENT_PSAMPLE / 2) {
+ return;
+ }
+
/* Sample key = {[2] type, [1-255] owner} */
char key[sizeof(uint16_t) + KNOT_DNAME_MAXLEN];
struct kr_query *qry = NULL;
unsigned *count = lru_set(data->queries.expiring, key, key_len);
if (count)
*count += 1;
- } else {
+ /* Consider 1 in N for frequent sampling. */
+ } else if (roll <= 1) {
unsigned *count = lru_set(data->queries.frequent, key, key_len);
if (count)
*count += 1;
/* Collect data on final answer */
collect_answer(data, param->answer);
- /* Probabilistic sampling of queries */
- if (kr_rand_uint(FREQUENT_PSAMPLE) <= 1) {
- collect_sample(data, rplan, param->answer);
- }
+ collect_sample(data, rplan, param->answer);
/* Count cached and unresolved */
if (!EMPTY_LIST(rplan->resolved)) {
/* Histogram of answer latency. */