-- Load dependent modules
if not stats then modules.load('stats') end
--- Calculate current epoch (which window fits current time)
-local function current_epoch()
- if not predict.period or predict.period <= 1 then return nil end
- return (os.date('%H')*(60/predict.window) +
- math.floor(os.date('%M')/predict.window)) % predict.period + 1
-end
-
-- Calculate the next sampling interval with jitter in [1/5, 2/5] of the window
local function next_event()
 local jitter = (predict.window * minute) / 5
return math.random(jitter, 2 * jitter)
end
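-- e.g. with a 15-minute window the jitter base is 3 minutes, so the next
-- sample fires 3 to 6 minutes from now (event.after timeouts are in ms)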
+-- Calculate the current epoch, i.e. which sampling window the current time falls into
+function predict.epoch()
+ if not predict.period or predict.period <= 1 then return nil end
+ return (os.date('%H')*(60/predict.window) +
+ math.floor(os.date('%M')/predict.window)) % predict.period + 1
+end
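+-- e.g. window=15 (4 windows per hour) and period=24: at 02:35 the epoch is
+-- (2*4 + math.floor(35/15)) % 24 + 1 = 11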
+
-- Resolve queued records and flush the queue
function predict.drain()
 local deleted = 0
 -- walk the queued '<qtype> <qname>' keys and resolve each record
 for key, _ in pairs(predict.queue) do
  local qtype, qname = key:match('(%S*)%s+(%S*)')
  worker.resolve(qname, kres.type[qtype], kres.class.IN, 'NO_CACHE')
  predict.queue[key] = nil
  deleted = deleted + 1
- if deleted >= predict.batch then
+ -- Resolve smaller batches at a time (a batch of 0 drains the whole queue at once)
+ if predict.batch > 0 and deleted >= predict.batch then
break
end
end
+ -- Schedule prefetch of another batch if not complete
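+ -- cancel any previous drain event so two batches never run in parallel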
+ if predict.ev_drain then event.cancel(predict.ev_drain) end
predict.ev_drain = nil
if deleted > 0 then
  predict.ev_drain = event.after((predict.window * 3) * sec, predict.drain)
 end
 predict.queue_len = predict.queue_len - deleted
end

function predict.process()
-- Start a new epoch, or continue sampling
- predict.ev_sample = nil
- local epoch_now = current_epoch()
+ local epoch_now = predict.epoch()
local nr_queued = 0
-- End of epoch
- if predict.epoch ~= epoch_now then
+ if predict.current_epoch ~= epoch_now then
stats['predict.epoch'] = epoch_now
- predict.epoch = epoch_now
+ predict.current_epoch = epoch_now
-- enqueue records from upcoming epoch
nr_queued = enqueue_from_log(predict.log[epoch_now])
 end
 -- dispatch predicted queries; batch size scales with the queue length
 if nr_queued > 0 then
  predict.queue_len = predict.queue_len + nr_queued
  predict.batch = predict.queue_len / 5
  predict.ev_drain = event.after(0, predict.drain)
 end
+
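+ -- cancel any pending sampler first so only a single timer is ever active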
+ if predict.ev_sample then event.cancel(predict.ev_sample) end
predict.ev_sample = event.after(next_event(), predict.process)
if stats then
  stats['predict.queue'] = predict.queue_len
 end
end

function predict.init()
if predict.window > 0 then
- predict.epoch = current_epoch()
+ predict.current_epoch = predict.epoch()
predict.ev_sample = event.after(next_event(), predict.process)
end
end

-- setup resolver for the test
modules = { 'predict' }
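-- no predict.config() call: the module defaults apply (assumed window 15, period 24)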
--- test if prediction of non-standard types works
-function test_predict_drain_typex()
- predict.queue_len = 1
+-- mock global functions
+local resolve_count = 0
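+-- stub out resolution: the tests only count invocations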
+worker.resolve = function ()
+ resolve_count = resolve_count + 1
+end
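+-- frequent-query snapshot; TYPE65535 keeps coverage for non-standard RR types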
+stats.frequent = function ()
+ return {
+ {name = 'example.com', type = 'TYPE65535'},
+ {name = 'example.com', type = 'SOA'},
+ }
+end
+local current_epoch = 0
+predict.epoch = function ()
+ return current_epoch % predict.period + 1
+end
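+-- with predict.epoch overridden, the tests advance epochs deterministically by bumping current_epoch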
+
+-- test that draining the prefetch queue works
+function test_predict_drain()
+ predict.queue_len = 2
predict.queue['TYPE65535 example.com'] = 1
+ predict.queue['SOA example.com'] = 1
+ predict.drain()
+ -- test that it attempted to prefetch
+ assert.same(2, resolve_count)
+ assert.same(0, predict.queue_len)
+end
+
+-- test that the prediction process works
+function test_predict_process()
+ -- start new epoch
+ predict.process()
+ assert.same(0, predict.queue_len)
+ -- next epoch, the frequent queries have not yet shown a period
+ current_epoch = current_epoch + 1
+ predict.process()
+ assert.same(0, predict.queue_len)
+ -- next epoch, found period
+ current_epoch = current_epoch + 1
+ predict.process()
+ assert.same(2, predict.queue_len)
+ -- drain works with scheduled prefetches (two batches)
+ resolve_count = 0
+ predict.drain()
predict.drain()
+ assert.same(2, resolve_count)
+ assert.same(0, predict.queue_len)
end
-- run the tests only after the config file has been processed,
-- so the default configuration is in effect and can be tested.
event.after(0, function (ev)
- test(test_predict_drain_typex)
+ test(test_predict_drain)
+ test(test_predict_process)
quit()
end)