git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
modules/prefetch: wip
author    Marek Vavruša <marek.vavrusa@nic.cz>
          Wed, 15 Jul 2015 08:21:44 +0000 (10:21 +0200)
committer Marek Vavruša <marek.vavrusa@nic.cz>
          Fri, 17 Jul 2015 14:57:32 +0000 (16:57 +0200)
modules/prefetch/prefetch.lua
modules/stats/stats.c

modules/prefetch/prefetch.lua
index b0c84055740501dbaec2966474c6918b9ab0b4b6..ff3b316647d406092f5091bb5b420452ecdeb5f8 100644
@@ -1,63 +1,88 @@
--- Batch soon-expiring records in a queue and fetch them periodically.
--- This helps to reduce a latency for records that are often accessed.
+-- Speculative prefetching for repetitive and soon-expiring records to reduce latency.
 -- @module prefetch
 -- @field queue table of scheduled records
--- @field queue_max maximum length of the queue
+-- @field batch number of records to prefetch per dispatch round
+-- @field epoch current epoch (1-based slot within the day)
+-- @field period number of epochs in a day
--- @field queue_len current length of the queue
--- @field window length of the coalescing window
+-- @field window length of one epoch, in minutes
+-- @field log per-epoch history of observed query counts
 local prefetch = {
        queue = {},
-       queue_max = 1000,
-       queue_len = 0,
-       window = 30,
-       layer = {
-               -- Schedule cached entries that are expiring soon
-               finish = function(state, req, answer)
-                       local qry = kres.query_resolved(req)
-                       if not kres.query.has_flag(qry, kres.query.EXPIRING) then
-                               return state
-                       end
-                       -- Refresh entries that probably expire in this time window
-                       local qlen = prefetch.queue_len
-                       if qlen > prefetch.queue_max then
-                               return state
-                       end
-                       -- Key: {qtype [1], qname [1-255]}
-                       local key = string.char(answer:qtype())..answer:qname()
-                       local val = prefetch.queue[key]
-                       if not val then
-                               prefetch.queue[key] = 1
-                               prefetch.queue_len = qlen + 1
-                       else
-                               prefetch.queue[key] = val + 1
-                       end
-                       return state
-               end
-       }
+       batch = 0,
+       epoch = 0,
+       period = 4 * 24,
+       window = 15,
+       log = {}
 }
 
+-- Calculate the current epoch (1-based index of the window-minute slot within the day)
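+-- e.g. with window = 15, at 10:37 this yields 10*4 + floor(37/15) + 1 = 43,
+-- and a day spans 4 * 24 = 96 epochs, matching prefetch.period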
+local function current_epoch()
+       return os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window) + 1
+end
+
--- Resolve queued records and flush the queue
+-- Resolve a batch of queued records, decrementing their hit counters
-function prefetch.batch(module)
+function prefetch.dispatch(ev)
        -- Defer prefetching if the server is loaded
        if worker.stats().concurrent > 10 then
+               event.after(minute, prefetch.dispatch)
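+               -- grow the batch so the backlog drains faster once load drops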
+               prefetch.batch = prefetch.batch + math.ceil(prefetch.batch / 2)
                return 0
        end
-       local to_delete = prefetch.queue_max / 5
        local deleted = 0
        for key, val in pairs(prefetch.queue) do
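+               -- the key packs {qtype [1 byte], qname}, as built in prefetch.process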
                worker.resolve(string.sub(key, 2), string.byte(key))
-               prefetch.queue[key] = nil
+               if val > 1 then
+                       prefetch.queue[key] = val - 1
+               else
+                       prefetch.queue[key] = nil
+               end
                deleted = deleted + 1
-               if deleted == to_delete then
+               if deleted >= prefetch.batch then
                        break
                end
        end
-       prefetch.queue_len = prefetch.queue_len - deleted
+       if deleted > 0 then
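+               -- reschedule another batch in a minute until the queue drains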
+               event.after(minute, prefetch.dispatch)
+       end
        return 0
 end
 
+-- Process the finished learning epoch and predict the next one
+function prefetch.process(ev)
+       -- Snapshot and reset the query counters gathered by the stats module
+       local start = os.clock()
+       local recent_queries = stats.queries()
+       stats.queries_clear()
+       local current = {}
+       for i = 1, #recent_queries do
+               local entry = recent_queries[i]
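+               -- Key: {qtype [1], qname [1-255]}; string.char limits qtype to < 256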
+               local key = string.char(entry.type)..entry.name
+               current[key] = entry.count
+               -- print('.. learning', entry.name, entry.type)
+       end
+       print(string.format('[prefetch] learned epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
+       prefetch.log[prefetch.epoch] = current
+       prefetch.epoch = prefetch.epoch % prefetch.period + 1
+       -- Predict queries for the next epoch based on the usage patterns
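+       -- A record is predicted to recur if it was queried both i and 2*i epochs
+       -- ago, i.e. with apparent period i; epochs not yet logged are skipped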
+       for i = 1, prefetch.period / 2 - 1 do
+               local recent = prefetch.log[prefetch.epoch - i]
+               local past = prefetch.log[prefetch.epoch - 2*i]
+               if recent and past then
+                       for k, v in pairs(recent) do
+                               if past[k] ~= nil then
+                                       prefetch.queue[k] = v
+                               end
+                       end
+               end
+       end
+       print(string.format('[prefetch] predicted epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
+       -- TODO: batch in soon-expiring queries
+       -- TODO: clusterize records often found together
+       -- Dispatch prefetch requests
+       -- the queue is keyed by strings, so '#' would be 0; count entries instead
+       local queued = 0
+       for _ in pairs(prefetch.queue) do queued = queued + 1 end
+       prefetch.batch = math.ceil(queued / prefetch.window)
+       event.after(0, prefetch.dispatch)
+end
+
 function prefetch.init(module)
-       event.recurrent(prefetch.window * sec, prefetch.batch)
+       prefetch.epoch = current_epoch()
+       event.recurrent(prefetch.window * minute, prefetch.process)
 end
 
 function prefetch.deinit(module)
modules/stats/stats.c
index db2c942ac1c75ac67e3dc732c8bc6d2bc3c5c8f1..0ab33518aeed7295ad461e094bbc933a7c5d0b22 100644
@@ -122,10 +122,10 @@ static void collect_sample(struct stat_data *data, struct kr_rplan *rplan, knot_
 {
        /* Sample key = {[2] type, [1-255] owner} */
        char key[sizeof(uint16_t) + KNOT_DNAME_MAXLEN];
-       /* Sample queries leading to iteration */
+       /* Sample queries leading to iteration or expiring */
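+       /* QUERY_EXPIRING is set for answers served from cache close to their
+        * TTL expiry, so they are sampled as prefetch candidates as well */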
        struct kr_query *qry = NULL;
        WALK_LIST(qry, rplan->resolved) {
-               if (!(qry->flags & QUERY_CACHED)) {
+               if (!(qry->flags & QUERY_CACHED) || (qry->flags & QUERY_EXPIRING)) {
                        int key_len = collect_key(key, qry->sname, qry->stype);
                        unsigned *count = lru_set(data->frequent.names, key, key_len);
                        if (count) {
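
For reference, a module like this is enabled from the kresd Lua configuration. A minimal sketch (the explicit 'stats' dependency follows from the stats.queries() calls above; the window override is illustrative and must divide 60 for the epoch arithmetic to hold):

-- kresd.conf
modules = { 'stats', 'prefetch' }
prefetch.window = 15 -- minutes per learning epoch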