finalize = function (_ --[[metrics]]) end,
}
-local snapshots, snapshots_count = {}, 120
-
-- Gauge metrics
local gauges = {
['worker.concurrent'] = true,
return t
end
-local function snapshot_end()
- snapshots_count = false
-end
-
--- Function to sort frequency list
-local function snapshot_start()
- local prev = getstats()
- while snapshots_count do
- local is_empty = true
- -- Get current snapshot
- local cur, stats_dt = getstats(), {}
- for k,v in pairs(cur) do
- if gauges[k] then
- stats_dt[k] = v
- else
- stats_dt[k] = v - (prev[k] or 0)
- end
- is_empty = is_empty and stats_dt[k] == 0
+-- @returns current stats + difference against previous data set passed in @param prev
+local function snapshot_start(prev)
+ assert(type(prev) == 'table', 'table with previous values expected')
+ local is_empty = true
+ -- Get current snapshot
+ local cur, stats_dt = getstats(), {}
+ for k,v in pairs(cur) do
+ if gauges[k] then
+ stats_dt[k] = v
+ else
+ stats_dt[k] = v - (prev[k] or 0)
end
- prev = cur
- -- Calculate upstreams and geotag them if possible
- local upstreams
- if http.geoip then
- upstreams = stats.upstreams()
- for k,v in pairs(upstreams) do
- local gi
- if string.find(k, '.', 1, true) then
- gi = http.geoip:search_ipv4(k)
- else
- gi = http.geoip:search_ipv6(k)
- end
- if gi then
- upstreams[k] = {data=v, location=gi.location, country=gi.country and gi.country.iso_code}
- end
+ is_empty = is_empty and stats_dt[k] == 0
+ end
+ -- Calculate upstreams and geotag them if possible
+ local upstreams
+ if http.geoip then
+ upstreams = stats.upstreams()
+ for k,v in pairs(upstreams) do
+ local gi
+ if string.find(k, '.', 1, true) then
+ gi = http.geoip:search_ipv4(k)
+ else
+ gi = http.geoip:search_ipv6(k)
end
- end
- -- Aggregate per-worker metrics
- local wdata = {}
- for _, info in pairs(map 'worker.info()') do
- if type(info) == 'table' then
- wdata[tostring(info.pid)] = {
- rss = info.rss,
- usertime = info.usertime,
- systime = info.systime,
- pagefaults = info.pagefaults,
- queries = info.queries
- }
+ if gi then
+ upstreams[k] = {data=v, location=gi.location, country=gi.country and gi.country.iso_code}
end
end
- -- Publish stats updates periodically
- if not is_empty then
- local update = {time=os.time(), stats=stats_dt, upstreams=upstreams, workers=wdata}
- table.insert(snapshots, update)
- if #snapshots > snapshots_count then
- table.remove(snapshots, 1)
- end
+ end
+ -- Aggregate per-worker metrics
+ local wdata = {}
+ for _, info in pairs(map 'worker.info()') do
+ if type(info) == 'table' then
+ wdata[tostring(info.pid)] = {
+ rss = info.rss,
+ usertime = info.usertime,
+ systime = info.systime,
+ pagefaults = info.pagefaults,
+ queries = info.queries
+ }
end
- worker.sleep(1)
end
+ -- Publish stats updates periodically
+ if not is_empty then
+ local update = {time=os.time(), stats=stats_dt, upstreams=upstreams, workers=wdata}
+ return cur, update
+ end
+ return cur, nil
end
-- Function to sort frequency list
local function stream_stats(_, ws)
- -- Initially, stream history
- local ok, last = true, nil
- local batch = {}
- for i, s in ipairs(snapshots) do
- table.insert(batch, s)
- if #batch == 20 or i + 1 == #snapshots then
- ok = ws:send(tojson(batch))
- batch = {}
- end
- end
+ local ok = true
-- Publish stats updates periodically
+ local prev = getstats()
while ok do
- -- Get last snapshot
- local id = #snapshots - 1
- if id > 0 and snapshots[id].time ~= last then
- local push = tojson(snapshots[id])
- last = snapshots[id].time
- ok = ws:send(push)
- end
worker.sleep(1)
+ local update
+ prev, update = snapshot_start(prev)
+ local push = tojson(update)
+ ok = ws:send(push)
end
end
end
-- Export module interface
-M.init = snapshot_start
-M.deinit = snapshot_end
M.endpoints = {
['/stats'] = {'application/json', getstats, stream_stats},
['/frequent'] = {'application/json', function () return stats.frequent() end},