log = {},
}
+-- Load dependent modules
+if not stats then modules.load('stats') end
+
-- Calculate current epoch (which window fits current time)
local function current_epoch()
if not predict.period or predict.period <= 1 then return nil end
end
-- Resolve queued records and flush the queue
-function predict.drain(ev)
+function predict.drain()
local deleted = 0
- for key, val in pairs(predict.queue) do
+ for key, _ in pairs(predict.queue) do
local qtype, qname = key:match('(%S*)%s(.*)')
worker.resolve(qname, kres.type[qtype], kres.class.IN, 'NO_CACHE')
predict.queue[key] = nil
return 0
end
--- Enqueue queries from set
-local function enqueue(queries)
- local queued = 0
- local nr_queries = #queries
- for i = 1, nr_queries do
- local entry = queries[i]
- local key = string.format('%s %s', entry.type, entry.name)
- if not predict.queue[key] then
- predict.queue[key] = 1
- queued = queued + 1
- end
- end
- return queued
-end
-
--- Enqueue queries from same format as predict.queue or predict.log
+-- Enqueue queries from same format as predict.queue or predict.log
local function enqueue_from_log(current)
if not current then return 0 end
- queued = 0
- for key, val in pairs(current) do
+ local queued = 0
+ for key, val in pairs(current) do
if val and not predict.queue[key] then
predict.queue[key] = val
queued = queued + 1
-- Sample current epoch, return number of sampled queries
function predict.sample(epoch_now)
if not epoch_now then return 0, 0 end
- local current = predict.log[epoch_now] or {}
+ local current = predict.log[epoch_now] or {}
local queries = stats.frequent()
stats.clear_frequent()
local nr_samples = #queries
local current = predict.log[(epoch_now - i - 1) % predict.period + 1]
local past = predict.log[(epoch_now - 2*i - 1) % predict.period + 1]
if current and past then
- for k, v in pairs(current) do
+ for k, _ in pairs(current) do
if past[k] ~= nil and not predict.queue[k] then
queued = queued + 1
predict.queue[k] = 1
return queued
end
-function predict.process(ev)
- if (predict.period or 0) ~= 0 and not stats then
- error("'stats' module required")
- end
+function predict.process()
-- Start a new epoch, or continue sampling
predict.ev_sample = nil
local epoch_now = current_epoch()
local nr_queued = 0
- -- End of epoch
+ -- End of epoch
if predict.epoch ~= epoch_now then
stats['predict.epoch'] = epoch_now
predict.epoch = epoch_now
- -- enqueue records from upcoming epoch
+ -- enqueue records from upcoming epoch
nr_queued = enqueue_from_log(predict.log[epoch_now])
-- predict next epoch
nr_queued = nr_queued + generate(epoch_now)
-- clear log for new epoch
predict.log[epoch_now] = {}
end
-
+
-- Sample current epoch
local nr_learned = predict.sample(epoch_now)
-
+
-- Dispatch predicted queries
if nr_queued > 0 then
predict.queue_len = predict.queue_len + nr_queued
collectgarbage()
end
-function predict.init(module)
+function predict.init()
if predict.window > 0 then
predict.epoch = current_epoch()
predict.ev_sample = event.after(next_event(), predict.process)
end
end
-function predict.deinit(module)
+function predict.deinit()
if predict.ev_sample then event.cancel(predict.ev_sample) end
if predict.ev_drain then event.cancel(predict.ev_drain) end
predict.ev_sample = nil
predict.layer = {
-- Prefetch all expiring (sub-)queries immediately after the request finishes.
-- Doing that immediately is simplest and avoids creating (new) large bursts of activity.
- finish = function (state, req)
+ finish = function (_, req)
req = kres.request_t(req)
local qrys = req.rplan.resolved
for i = 0, (tonumber(qrys.len) - 1) do -- size_t doesn't work for some reason