git.ipfire.org Git - thirdparty/knot-resolver.git / commitdiff
modules/prefetch: multisampling, expiring prefetch, configurable window
author      Marek Vavruša <marek.vavrusa@nic.cz>
            Fri, 17 Jul 2015 16:19:29 +0000 (18:19 +0200)
committer   Marek Vavruša <marek.vavrusa@nic.cz>
            Fri, 17 Jul 2015 16:19:29 +0000 (18:19 +0200)
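
The sampling window and history period are now plain fields on the module table, so they can be tuned from the kresd configuration. A minimal sketch, assuming the module is loaded as 'prefetch' and that overriding the fields before the first timer tick is the intended way to configure it (this patch adds no config() hook); the values below are examples only:

    -- kresd config snippet (Lua); hypothetical values
    modules.load('prefetch')
    prefetch.window = 30   -- coalesce samples into 30-minute windows instead of 10
    prefetch.period = 48   -- keep 48 windows of history, i.e. one day at 30-minute windows
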
modules/prefetch/prefetch.lua

index ff3b316647d406092f5091bb5b420452ecdeb5f8..4da6e3ec90396e59c9ca119c87fb93bc4bd9c5de 100644
@@ -5,26 +5,22 @@
 -- @field window length of the coalescing window
 local prefetch = {
        queue = {},
+       queue_len = 0,
        batch = 0,
        epoch = 0,
-       period = 4 * 24,
-       window = 15,
+       period = 24,
+       window = 10,
+       sample = 0,
        log = {}
 }
 
--- Calculate current epoch (number of quarter-hours today)
+-- Calculate current epoch (the window that the current time falls into)
 local function current_epoch()
-       return os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window) + 1
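+       -- e.g. window=10, period=24: 14:37 -> (14*6 + 3) % 24 + 1 = epoch 16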
+       return (os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window)) % prefetch.period + 1
 end
 
 -- Resolve queued records and flush the queue
 function prefetch.dispatch(ev)
-       -- Defer prefetching if the server is loaded
-       if worker.stats().concurrent > 10 then
-               event.after(minute, prefetch.dispatch)
-               prefetch.batch = prefetch.batch + prefetch.batch / 2
-               return 0
-       end
        local deleted = 0
        for key, val in pairs(prefetch.queue) do
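+               -- key = 1-byte query type followed by the query name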
                worker.resolve(string.sub(key, 2), string.byte(key))
@@ -34,55 +30,107 @@ function prefetch.dispatch(ev)
                        prefetch.queue[key] = nil
                end
                deleted = deleted + 1
-               if deleted == prefetch.batch then
+               if deleted >= prefetch.batch then
                        break
                end
        end
        if deleted > 0 then
-               event.after(minute, prefetch.dispatch)
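+               -- reschedule every window*6 seconds, i.e. ~10 dispatch rounds per window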
+               event.after((prefetch.window * 6) * sec, prefetch.dispatch)
        end
+       prefetch.queue_len = prefetch.queue_len - deleted
+       stats['predict.queue'] = prefetch.queue_len
+       collectgarbage()
        return 0
 end
 
--- Process current epoch
-function prefetch.process(ev)
-       -- Process current learning epoch
+-- Sample current epoch, return number of sampled queries
+local function sample(epoch_now)
+       local queries = stats.frequent()
+       stats.clear_frequent()
        local start = os.clock()
-       local recent_queries = stats.queries()
-       stats.queries_clear()
-       local current = {}
-       for i = 1, #recent_queries do
-               local entry = recent_queries[i]
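+       -- multisampling: keep adding to this epoch's log until the window rolls over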
+       local current = prefetch.log[prefetch.epoch]
+       if prefetch.epoch ~= epoch_now then
+               current = {}
+       end
+       local nr_samples = #queries
+       for i = 1, nr_samples do
+               local entry = queries[i]
                local key = string.char(entry.type)..entry.name
                current[key] = entry.count
-               -- print('.. learning', entry.name, entry.type)
        end
-       print (string.format('[prob] learned epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
+       print (string.format('[prob] sampling epoch: %d/%d, %.2f sec (%d items)', prefetch.epoch, prefetch.sample, os.clock() - start, #queries))
        prefetch.log[prefetch.epoch] = current
-       prefetch.epoch = prefetch.epoch % prefetch.period + 1
-       -- Predict queries for the next epoch based on the usage patterns
+       prefetch.sample = prefetch.sample + 1
+       return nr_samples
+end
+
+-- Prefetch soon-to-expire records
+local function refresh()
+       local queries = stats.expiring()
+       stats.clear_expiring()
+       local nr_samples = #queries
+       for i = 1, nr_samples do
+               local entry = queries[i]
+               local key = string.char(entry.type)..entry.name
+               prefetch.queue[key] = 1
+       end
+       print (string.format('[prob] prefetching epoch: %d/%d (%d items)', prefetch.epoch, prefetch.sample, nr_samples))
+       return nr_samples
+end
+
+-- Predict queries for the next epoch, return number of queued queries
+local function predict(epoch_now)
+       local start = os.clock()
+       local queued = 0
        for i = 1, prefetch.period / 2 - 1 do
-               current = prefetch.log[prefetch.epoch - i]
-               local past = prefetch.log[prefetch.epoch - 2*i]
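+               -- a query is predicted if it appeared both i and 2*i windows back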
+               local current = prefetch.log[epoch_now - i]
+               local past = prefetch.log[epoch_now - 2*i]
                if current and past then
                        for k, v in pairs(current) do
-                               if past[k] ~= nil then
-                                       prefetch.queue[k] = v
+                               if past[k] ~= nil and not prefetch.queue[k] then
+                                       queued = queued + 1
+                                       prefetch.queue[k] = 1
                                end
                        end
                end
        end
-       print (string.format('[prob] predicted epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
-       -- TODO: batch in soon-expiring queries
-       -- TODO: clusterize records often found together
+       print (string.format('[prob] predicted epoch: %d, %.2f sec (%d items)', prefetch.epoch, os.clock() - start, queued))
+       return queued
+end
+
+-- Process current epoch
+function prefetch.process(ev)
+       -- Start a new epoch, or continue sampling
+       local epoch_now = current_epoch()
+       local nr_learned = sample(epoch_now)
+       local nr_queued = 0
+       -- End of epoch, predict next
+       if prefetch.epoch ~= epoch_now then
+               prefetch.queue = {}
+               prefetch.queue_len = 0
+               prefetch.epoch = epoch_now
+               prefetch.sample = 0
+               nr_queued = nr_queued + predict(epoch_now)
+               prefetch.queue_len = prefetch.queue_len + nr_queued
+       end
+       -- Prefetch expiring records
+       nr_queued = nr_queued + refresh()
        -- Dispatch prefetch requests
-       prefetch.batch = #prefetch.queue / prefetch.window
-       event.after(0, prefetch.dispatch)
+       if nr_queued > 0 then
+               prefetch.queue_len = prefetch.queue_len + nr_queued
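+               -- aim to drain the queue in ~10 batches, matching the dispatch interval above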
+               prefetch.batch = prefetch.queue_len / 10
+               event.after(0, prefetch.dispatch)
+       end
+       event.after(prefetch.window * minute, prefetch.process)
+       stats['predict.epoch'] = epoch_now
+       stats['predict.queue'] = prefetch.queue_len
+       stats['predict.learned'] = nr_learned
+       collectgarbage()
 end
 
 function prefetch.init(module)
        prefetch.epoch = current_epoch()
-       event.recurrent(prefetch.window * minute, prefetch.process)
+       event.after(prefetch.window * minute, prefetch.process)
 end
 
 function prefetch.deinit(module)