]> git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
modules/prefetch: frequent sampling, fetch iteration
authorMarek Vavruša <marek.vavrusa@nic.cz>
Sat, 18 Jul 2015 11:57:15 +0000 (13:57 +0200)
committerMarek Vavruša <marek.vavrusa@nic.cz>
Sat, 18 Jul 2015 11:58:53 +0000 (13:58 +0200)
modules/prefetch/prefetch.lua
modules/stats/stats.c

index 32149c70ddd1418c11899469eb69d4d54cccc914..bc187158f72c8e386189ae123f59c6e7a8a09eca 100644 (file)
@@ -26,22 +26,18 @@ local function next_event()
 end
 
 -- Resolve queued records and flush the queue
-function prefetch.dispatch(ev)
+function prefetch.drain(ev)
        local deleted = 0
        for key, val in pairs(prefetch.queue) do
                worker.resolve(string.sub(key, 2), string.byte(key))
-               if val > 1 then
-                       prefetch.queue[key] = val - 1
-               else
-                       prefetch.queue[key] = nil
-               end
+               prefetch.queue[key] = nil
                deleted = deleted + 1
                if deleted >= prefetch.batch then
                        break
                end
        end
        if deleted > 0 then
-               event.after((prefetch.window * 6) * sec, prefetch.dispatch)
+               event.after((prefetch.window * 6) * sec, prefetch.drain)
        end
        prefetch.queue_len = prefetch.queue_len - deleted
        stats['predict.queue'] = prefetch.queue_len
@@ -49,13 +45,38 @@ function prefetch.dispatch(ev)
        return 0
 end
 
+-- Enqueue queries from set
+local function enqueue(queries)
+       local queued = 0
+       local nr_queries = #queries
+       for i = 1, nr_queries do
+               local entry = queries[i]
+               local key = string.char(entry.type)..entry.name
+               if not prefetch.queue[key] then
+                       prefetch.queue[key] = 1
+                       queued = queued + 1
+               end
+       end
+       return queued   
+end
+
+-- Prefetch soon-to-expire records
+local function refresh()
+       local queries = stats.expiring()
+       stats.clear_expiring()
+       return enqueue(queries)
+end
+
 -- Sample current epoch, return number of sampled queries
 local function sample(epoch_now)
        local queries = stats.frequent()
        stats.clear_frequent()
-       local start = os.clock()
+       local queued = 0
        local current = prefetch.log[epoch_now]
        if prefetch.epoch ~= epoch_now or current == nil then
+               if current ~= nil then
+                       queued = enqueue(current)
+               end
                current = {}
        end
        local nr_samples = #queries
@@ -64,29 +85,13 @@ local function sample(epoch_now)
                local key = string.char(entry.type)..entry.name
                current[key] = entry.count
        end
-       print (string.format('[prob] .. sampling epoch: %d/%d, %.2f sec (%d items)', epoch_now, prefetch.sample, os.clock() - start, nr_samples))
        prefetch.log[epoch_now] = current
        prefetch.sample = prefetch.sample + 1
-       return nr_samples
-end
-
--- Prefetch soon-to-expire records
-local function refresh()
-       local queries = stats.expiring()
-       stats.clear_expiring()
-       local nr_samples = #queries
-       for i = 1, nr_samples do
-               local entry = queries[i]
-               local key = string.char(entry.type)..entry.name
-               prefetch.queue[key] = 1
-       end
-       print (string.format('[prob]    .. prefetching %d items', nr_samples))
-       return nr_samples
+       return nr_samples, queued
 end
 
 -- Sample current epoch, return number of sampled queries
 local function predict(epoch_now)
-       local start = os.clock()
        local queued = 0
        local period = prefetch.period + 1
        for i = 1, prefetch.period / 2 - 1 do
@@ -101,7 +106,6 @@ local function predict(epoch_now)
                        end
                end
        end
-       print (string.format('[prob] predicted epoch: %d, %.2f sec (%d items)', epoch_now, os.clock() - start, queued))
        return queued
 end
 
@@ -109,16 +113,12 @@ end
 function prefetch.process(ev)
        -- Start a new epoch, or continue sampling
        local epoch_now = current_epoch()
-       local nr_learned = sample(epoch_now)
-       local nr_queued = 0
+       local nr_learned, nr_queued = sample(epoch_now)
        -- End of epoch, predict next
        if prefetch.epoch ~= epoch_now then
-               prefetch.queue = {}
-               prefetch.queue_len = 0
                prefetch.epoch = epoch_now
                prefetch.sample = 0
                nr_queued = nr_queued + predict(epoch_now)
-               prefetch.queue_len = prefetch.queue_len + nr_queued
        end
        -- Prefetch expiring records
        nr_queued = nr_queued + refresh()
@@ -126,7 +126,7 @@ function prefetch.process(ev)
        if nr_queued > 0 then
                prefetch.queue_len = prefetch.queue_len + nr_queued
                prefetch.batch = prefetch.queue_len / 5
-               event.after(0, prefetch.dispatch)
+               event.after(0, prefetch.drain)
        end
        event.after(next_event(), prefetch.process)
        stats['predict.epoch'] = epoch_now
index fe9a9fd308898fbf36ad7a4a1a2736cd1413f7ba..6c1fdc40e86834e07a296e3726705ea22dcf3b9f 100644 (file)
@@ -38,7 +38,7 @@
 /* Defaults */
 #define DEBUG_MSG(qry, fmt...) QRDEBUG(qry, "stat",  fmt)
 #define FREQUENT_COUNT 5000 /* Size of frequent tables */
-#define FREQUENT_PSAMPLE 50 /* Sampling rate, 1 in N */
+#define FREQUENT_PSAMPLE 10 /* Sampling rate, 1 in N */
 
 /** @cond internal Fixed-size map of predefined metrics. */
 #define CONST_METRICS(X) \
@@ -121,6 +121,12 @@ static inline int collect_key(char *key, const knot_dname_t *name, uint16_t type
 
 static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_pkt_t *pkt)
 {
+       /* Probabilistic sampling of all queries (consider half) */
+       unsigned roll = kr_rand_uint(FREQUENT_PSAMPLE);
+       if (roll > FREQUENT_PSAMPLE / 2) {
+               return;
+       }
+
        /* Sample key = {[2] type, [1-255] owner} */
        char key[sizeof(uint16_t) + KNOT_DNAME_MAXLEN];
        struct kr_query *qry = NULL;
@@ -134,7 +140,8 @@ static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_
                        unsigned *count = lru_set(data->queries.expiring, key, key_len);
                        if (count)
                                *count += 1;
-               } else {
+               /* Consider 1 in N for frequent sampling. */
+               } else if (roll <= 1) {
                        unsigned *count = lru_set(data->queries.frequent, key, key_len);
                        if (count)
                                *count += 1;
@@ -151,10 +158,7 @@ static int collect(knot_layer_t *ctx)
 
        /* Collect data on final answer */
        collect_answer(data, param->answer);
-       /* Probabilistic sampling of queries */
-       if (kr_rand_uint(FREQUENT_PSAMPLE) <= 1) {
-               collect_sample(data, rplan, param->answer);
-       }
+       collect_sample(data, rplan, param->answer);
        /* Count cached and unresolved */
        if (!EMPTY_LIST(rplan->resolved)) {
                /* Histogram of answer latency. */