--- Batch soon-expiring records in a queue and fetch them periodically.
--- This helps to reduce a latency for records that are often accessed.
+-- Speculative prefetching for repetitive and soon-expiring records to reduce latency.
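+--
+-- The module samples which queries were asked in each fixed window
+-- (15 minutes by default), keeps one day of history, and prefetches
+-- records that the history suggests will be asked again soon.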
-- @module prefetch
-- @field queue table of scheduled records
--- @field queue_max maximum length of the queue
--- @field queue_len current length of the queue
--- @field window length of the coalescing window
+-- @field batch number of queued records resolved per dispatch round
+-- @field epoch index of the current sampling window within the period
+-- @field period number of sampling windows kept in the log
+-- @field window length of the sampling window, in minutes
+-- @field log per-epoch history of observed queries
local prefetch = {
queue = {},
- queue_max = 1000,
- queue_len = 0,
- window = 30,
- layer = {
- -- Schedule cached entries that are expiring soon
- finish = function(state, req, answer)
- local qry = kres.query_resolved(req)
- if not kres.query.has_flag(qry, kres.query.EXPIRING) then
- return state
- end
- -- Refresh entries that probably expire in this time window
- local qlen = prefetch.queue_len
- if qlen > prefetch.queue_max then
- return state
- end
- -- Key: {qtype [1], qname [1-255]}
- local key = string.char(answer:qtype())..answer:qname()
- local val = prefetch.queue[key]
- if not val then
- prefetch.queue[key] = 1
- prefetch.queue_len = qlen + 1
- else
- prefetch.queue[key] = val + 1
- end
- return state
- end
- }
+ batch = 0,
+ epoch = 0,
+ period = 4 * 24,
+ window = 15,
+ log = {}
}
+-- Calculate the current epoch, i.e. which sampling window of the day this is
+local function current_epoch()
+ return os.date('%H')*(60/prefetch.window) + math.floor(os.date('%M')/prefetch.window) + 1
+end
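+-- e.g. at 10:37 with a 15-minute window: 10*4 + floor(37/15) + 1 = epoch 43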
+
--- Resolve queued records and flush the queue
+-- Resolve a batch of queued records and reschedule until the queue drains
-function prefetch.batch(module)
+function prefetch.dispatch(ev)
-- Defer prefetching if the server is loaded
if worker.stats().concurrent > 10 then
+ event.after(minute, prefetch.dispatch)
+ -- Grow the per-round quota so the deferred work catches up later
+ prefetch.batch = prefetch.batch + prefetch.batch / 2
return 0
end
- local to_delete = prefetch.queue_max / 5
local deleted = 0
for key, val in pairs(prefetch.queue) do
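+ -- Key is {qtype [1], qname}; split it back into name and type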
worker.resolve(string.sub(key, 2), string.byte(key))
- prefetch.queue[key] = nil
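+ -- Records predicted multiple times stay queued for later batches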
+ if val > 1 then
+ prefetch.queue[key] = val - 1
+ else
+ prefetch.queue[key] = nil
+ end
deleted = deleted + 1
- if deleted == to_delete then
+ if deleted >= prefetch.batch then
break
end
end
- prefetch.queue_len = prefetch.queue_len - deleted
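+ -- Keep draining the queue, one batch per minute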
+ if deleted > 0 then
+ event.after(minute, prefetch.dispatch)
+ end
return 0
end
+-- Learn from the finished epoch and predict queries for the upcoming one
+function prefetch.process(ev)
+ local start = os.clock()
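+ -- Sample queries observed since the last epoch and reset the counters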
+ local recent_queries = stats.queries()
+ stats.queries_clear()
+ local current = {}
+ for i = 1, #recent_queries do
+ local entry = recent_queries[i]
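+ -- Key: {qtype [1], qname [1-255]}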
+ local key = string.char(entry.type)..entry.name
+ current[key] = entry.count
+ end
+ print(string.format('[prefetch] learned epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
+ prefetch.log[prefetch.epoch] = current
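+ -- Advance to the next epoch, wrapping after a full period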
+ prefetch.epoch = prefetch.epoch % prefetch.period + 1
+ -- Predict queries for the next epoch based on the usage patterns
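+ -- (a record queried both i and 2*i epochs ago is assumed periodic
+ -- with period i, so it is expected again in the upcoming epoch)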
+ for i = 1, prefetch.period / 2 - 1 do
+ -- Wrap the indexes so lookups work across midnight
+ current = prefetch.log[(prefetch.epoch - i - 1) % prefetch.period + 1]
+ local past = prefetch.log[(prefetch.epoch - 2*i - 1) % prefetch.period + 1]
+ if current and past then
+ for k, v in pairs(current) do
+ if past[k] ~= nil then
+ prefetch.queue[k] = v
+ end
+ end
+ end
+ end
+ print(string.format('[prefetch] predicted epoch: %d, %.2f sec', prefetch.epoch, os.clock() - start))
+ -- TODO: batch in soon-expiring queries
+ -- TODO: clusterize records often found together
+ -- Spread the dispatch over the whole window, one batch per minute
+ -- ('#' does not count string keys, so tally the queue explicitly)
+ local queued = 0
+ for _ in pairs(prefetch.queue) do queued = queued + 1 end
+ prefetch.batch = math.ceil(queued / prefetch.window)
+ event.after(0, prefetch.dispatch)
+end
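+
+-- A minimal usage sketch, assuming the module is installed as 'prefetch'
+-- (the stats module must be loaded too, since it provides the samples):
+--   modules = { 'stats', 'prefetch' }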
+
function prefetch.init(module)
- event.recurrent(prefetch.window * sec, prefetch.batch)
+ prefetch.epoch = current_epoch()
+ event.recurrent(prefetch.window * minute, prefetch.process)
end
function prefetch.deinit(module)