-- Speculative prefetching for repetitive and soon-expiring records to reduce latency.
-- @module predict
--- @field queue table of scheduled records
--- @field queue_max maximum length of the queue
--- @field window length of the coalescing window
+-- @field queue queue of scheduled queries
+-- @field queue_len number of scheduled queries
+-- @field period length of prediction history (number of windows)
+-- @field window length of the prediction window
local predict = {
queue = {},
queue_len = 0,
batch = 0,
- epoch = 0,
period = 24,
window = 15,
log = {},
-- Calculate current epoch (which window fits current time)
local function current_epoch()
+ if not predict.period or predict.period <= 1 then return nil end
return (os.date('%H')*(60/predict.window) +
math.floor(os.date('%M')/predict.window)) % predict.period + 1
end
-- Sample current epoch, return number of sampled queries
function predict.sample(epoch_now)
- if not epoch_now then return end
+ if not epoch_now then return 0, 0 end
local queries = stats.frequent()
stats.clear_frequent()
local queued = 0
-- Predict queries for the upcoming epoch
local function generate(epoch_now)
+ if not epoch_now then return 0 end
local queued = 0
local period = predict.period + 1
for i = 1, predict.period / 2 - 1 do
return queued
end
--- Process current epoch
function predict.process(ev)
-- Start a new epoch, or continue sampling
predict.ev_sample = nil
local nr_learned, nr_queued = predict.sample(epoch_now)
-- End of epoch, predict next
if predict.epoch ~= epoch_now then
+ stats['predict.epoch'] = epoch_now
predict.epoch = epoch_now
nr_queued = nr_queued + generate(epoch_now)
end
end
end
predict.ev_sample = event.after(next_event(), predict.process)
- stats['predict.epoch'] = epoch_now
stats['predict.queue'] = predict.queue_len
stats['predict.learned'] = nr_learned
collectgarbage()
end
function predict.init(module)
- if predict.window > 0 and predict.period > 0 then
+ if predict.window > 0 then
predict.epoch = current_epoch()
predict.ev_sample = event.after(next_event(), predict.process)
end