From: Marek Vavruša
Date: Sat, 18 Jul 2015 11:57:15 +0000 (+0200)
Subject: modules/prefetch: frequent sampling, fetch iteration
X-Git-Tag: v1.0.0-beta1~72^2~10
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=22e0092d1134caa8bb4ba997090bd594d2aaa931;p=thirdparty%2Fknot-resolver.git

modules/prefetch: frequent sampling, fetch iteration
---

diff --git a/modules/prefetch/prefetch.lua b/modules/prefetch/prefetch.lua
index 32149c70d..bc187158f 100644
--- a/modules/prefetch/prefetch.lua
+++ b/modules/prefetch/prefetch.lua
@@ -26,22 +26,18 @@ local function next_event()
 end
 
 -- Resolve queued records and flush the queue
-function prefetch.dispatch(ev)
+function prefetch.drain(ev)
 	local deleted = 0
 	for key, val in pairs(prefetch.queue) do
 		worker.resolve(string.sub(key, 2), string.byte(key))
-		if val > 1 then
-			prefetch.queue[key] = val - 1
-		else
-			prefetch.queue[key] = nil
-		end
+		prefetch.queue[key] = nil
 		deleted = deleted + 1
 		if deleted >= prefetch.batch then
 			break
 		end
 	end
 	if deleted > 0 then
-		event.after((prefetch.window * 6) * sec, prefetch.dispatch)
+		event.after((prefetch.window * 6) * sec, prefetch.drain)
 	end
 	prefetch.queue_len = prefetch.queue_len - deleted
 	stats['predict.queue'] = prefetch.queue_len
@@ -49,13 +45,38 @@ function prefetch.dispatch(ev)
 	return 0
 end
 
+-- Enqueue queries from set
+local function enqueue(queries)
+	local queued = 0
+	local nr_queries = #queries
+	for i = 1, nr_queries do
+		local entry = queries[i]
+		local key = string.char(entry.type)..entry.name
+		if not prefetch.queue[key] then
+			prefetch.queue[key] = 1
+			queued = queued + 1
+		end
+	end
+	return queued
+end
+
+-- Prefetch soon-to-expire records
+local function refresh()
+	local queries = stats.expiring()
+	stats.clear_expiring()
+	return enqueue(queries)
+end
+
 -- Sample current epoch, return number of sampled queries
 local function sample(epoch_now)
 	local queries = stats.frequent()
 	stats.clear_frequent()
-	local start = os.clock()
+	local queued = 0
 	local current = prefetch.log[epoch_now]
 	if prefetch.epoch ~= epoch_now or current == nil then
+		if current ~= nil then
+			queued = enqueue(current)
+		end
 		current = {}
 	end
 	local nr_samples = #queries
@@ -64,29 +85,13 @@ local function sample(epoch_now)
 		local key = string.char(entry.type)..entry.name
 		current[key] = entry.count
 	end
-	print (string.format('[prob] .. sampling epoch: %d/%d, %.2f sec (%d items)', epoch_now, prefetch.sample, os.clock() - start, nr_samples))
 	prefetch.log[epoch_now] = current
 	prefetch.sample = prefetch.sample + 1
-	return nr_samples
-end
-
--- Prefetch soon-to-expire records
-local function refresh()
-	local queries = stats.expiring()
-	stats.clear_expiring()
-	local nr_samples = #queries
-	for i = 1, nr_samples do
-		local entry = queries[i]
-		local key = string.char(entry.type)..entry.name
-		prefetch.queue[key] = 1
-	end
-	print (string.format('[prob] .. prefetching %d items', nr_samples))
-	return nr_samples
+	return nr_samples, queued
 end
 
 -- Sample current epoch, return number of sampled queries
 local function predict(epoch_now)
-	local start = os.clock()
 	local queued = 0
 	local period = prefetch.period + 1
 	for i = 1, prefetch.period / 2 - 1 do
@@ -101,7 +106,6 @@ local function predict(epoch_now)
 			end
 		end
 	end
-	print (string.format('[prob] predicted epoch: %d, %.2f sec (%d items)', epoch_now, os.clock() - start, queued))
 	return queued
 end
 
@@ -109,16 +113,12 @@ end
 function prefetch.process(ev)
 	-- Start a new epoch, or continue sampling
 	local epoch_now = current_epoch()
-	local nr_learned = sample(epoch_now)
-	local nr_queued = 0
+	local nr_learned, nr_queued = sample(epoch_now)
 	-- End of epoch, predict next
 	if prefetch.epoch ~= epoch_now then
-		prefetch.queue = {}
-		prefetch.queue_len = 0
 		prefetch.epoch = epoch_now
 		prefetch.sample = 0
 		nr_queued = nr_queued + predict(epoch_now)
-		prefetch.queue_len = prefetch.queue_len + nr_queued
 	end
 	-- Prefetch expiring records
 	nr_queued = nr_queued + refresh()
@@ -126,7 +126,7 @@ function prefetch.process(ev)
 	if nr_queued > 0 then
 		prefetch.queue_len = prefetch.queue_len + nr_queued
 		prefetch.batch = prefetch.queue_len / 5
-		event.after(0, prefetch.dispatch)
+		event.after(0, prefetch.drain)
 	end
 	event.after(next_event(), prefetch.process)
 	stats['predict.epoch'] = epoch_now
diff --git a/modules/stats/stats.c b/modules/stats/stats.c
index fe9a9fd30..6c1fdc40e 100644
--- a/modules/stats/stats.c
+++ b/modules/stats/stats.c
@@ -38,7 +38,7 @@
 /* Defaults */
 #define DEBUG_MSG(qry, fmt...) QRDEBUG(qry, "stat", fmt)
 #define FREQUENT_COUNT 5000 /* Size of frequent tables */
-#define FREQUENT_PSAMPLE 50 /* Sampling rate, 1 in N */
+#define FREQUENT_PSAMPLE 10 /* Sampling rate, 1 in N */
 
 /** @cond internal Fixed-size map of predefined metrics. */
 #define CONST_METRICS(X) \
@@ -121,6 +121,12 @@ static inline int collect_key(char *key, const knot_dname_t *name, uint16_t type
 
 static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_pkt_t *pkt)
 {
+	/* Probabilistic sampling of all queries (consider half) */
+	unsigned roll = kr_rand_uint(FREQUENT_PSAMPLE);
+	if (roll > FREQUENT_PSAMPLE / 2) {
+		return;
+	}
+
 	/* Sample key = {[2] type, [1-255] owner} */
 	char key[sizeof(uint16_t) + KNOT_DNAME_MAXLEN];
 	struct kr_query *qry = NULL;
@@ -134,7 +140,8 @@ static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_
 			unsigned *count = lru_set(data->queries.expiring, key, key_len);
 			if (count)
 				*count += 1;
-		} else {
+		/* Consider 1 in N for frequent sampling. */
+		} else if (roll <= 1) {
 			unsigned *count = lru_set(data->queries.frequent, key, key_len);
 			if (count)
 				*count += 1;
@@ -151,10 +158,7 @@ static int collect(knot_layer_t *ctx)
 
 	/* Collect data on final answer */
 	collect_answer(data, param->answer);
-	/* Probabilistic sampling of queries */
-	if (kr_rand_uint(FREQUENT_PSAMPLE) <= 1) {
-		collect_sample(data, rplan, param->answer);
-	}
+	collect_sample(data, rplan, param->answer);
 	/* Count cached and unresolved */
 	if (!EMPTY_LIST(rplan->resolved)) {
 		/* Histogram of answer latency. */
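
A minimal standalone Lua sketch (not part of the patch) of the queue keying and drain cycle that the prefetch.lua changes above introduce: keys pack the query type into the first byte followed by the owner name, enqueue() deduplicates on that key, and drain() resolves each queued record once and removes it, up to a batch limit. The entry shape ({name = ..., type = ...}) and the (name, type) resolve order mirror the patched module; the local queue table and the callback passed to drain() are illustrative assumptions only.

-- Illustrative sketch of the enqueue/drain scheme used by the patched module.
local queue, queue_len = {}, 0

-- Deduplicate queries on a key built as string.char(entry.type)..entry.name,
-- as in enqueue() above.
local function enqueue(queries)
	local queued = 0
	for i = 1, #queries do
		local entry = queries[i]
		local key = string.char(entry.type)..entry.name
		if not queue[key] then
			queue[key] = 1
			queued = queued + 1
		end
	end
	queue_len = queue_len + queued
	return queued
end

-- Resolve each queued record once and drop it, stopping at the batch limit,
-- mirroring the new prefetch.drain() behaviour (no per-key iteration counts).
local function drain(batch, resolve)
	local deleted = 0
	for key in pairs(queue) do
		resolve(string.sub(key, 2), string.byte(key))
		queue[key] = nil
		deleted = deleted + 1
		if deleted >= batch then break end
	end
	queue_len = queue_len - deleted
	return deleted
end

-- Example: queue two A records (type 1) and drain them with a stub resolver.
enqueue({ {name = 'example.com.', type = 1},
          {name = 'example.net.', type = 1} })
drain(5, function (name, qtype) print('resolving', name, qtype) end)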