}
/* Rule builder submit */
$('#daf-add').click(function () {
- const form = $('#daf-builder-form');
+ const form = $('#daf-builder-form').parent();
if (dafBuilder.items.length == 0 || form.hasClass('has-error')) {
return;
}
/* Clear previous errors and resubmit. */
- form.find('.alert').remove();
+ form.parent().find('.alert').remove();
$.post('daf', dafBuilder.items.join(' '))
.done(function (data) {
dafBuilder.clear();
})
.fail(function (data) {
const reason = data.responseText.length > 0 ? data.responseText : 'internal error';
- form.append(
+ form.after(
'<div class="alert alert-danger" role="alert">'+
'Couldn\'t add rule (code: '+data.status+', reason: '+reason+').'+
'</div>'
-local cqueues = require('cqueues')
-
-- Load dependent modules
if not view then modules.load('view') end
if not policy then modules.load('policy') end
end,
}
-local function parse_filter(tok, g)
+local function parse_filter(tok, g, prev)
+ if not tok then error(string.format('expected filter after "%s"', prev)) end
local filter = filters[tok:lower()]
if not filter then error(string.format('invalid filter "%s"', tok)) end
return filter(g)
-- or terminate filter chain and return
tok = g()
while tok do
- if tok == 'AND' then
- local fa, fb = f, parse_filter(g(), g)
+ if tok:lower() == 'and' then
+ local fa, fb = f, parse_filter(g(), g, tok)
f = function (req, qry) return fa(req, qry) and fb(req, qry) end
- elseif tok == 'OR' then
- local fa, fb = f, parse_filter(g(), g)
+ elseif tok:lower() == 'or' then
+ local fa, fb = f, parse_filter(g(), g, tok)
f = function (req, qry) return fa(req, qry) or fb(req, qry) end
else
break
-- @function Cleanup module
function M.deinit()
- if http then
+ if http and http.endpoints then
http.endpoints['/daf'] = nil
http.endpoints['/daf.js'] = nil
http.snippets['/daf'] = nil
-- @function Add rule
function M.add(rule)
+ -- Ignore duplicates
+ for _, r in ipairs(M.rules) do
+ if r.info == rule then return r end
+ end
local id, action, filter = compile(rule)
if not id then error(action) end
-- Combine filter and action into policy
return M.toggle(id, true)
end
+-- Format a command, hand it to map() and fold the values it returns.
+-- @param op   string.format template of the command
+-- @param ...  values substituted into the template
+-- @return true when map() yields no results; otherwise the first falsy
+--         result, or the last result when every result is truthy
+local function consensus(op, ...)
+	local command = string.format(op, ...)
+	local verdict = true
+	for _, answer in ipairs(map(command)) do
+		-- A falsy answer sticks: later answers no longer replace it
+		if verdict then verdict = answer end
+	end
+	return verdict
+end
+
-- @function Public-facing API
local function api(h, stream)
local m = h:get(':method')
local path = h:get(':path')
local id = tonumber(path:match '/([^/]*)$')
if id then
- if M.del(id) then
+ if consensus('daf.del "%s"', id) then
return tojson(true)
end
return 404, '"No such rule"' -- Not found
elseif m == 'POST' then
local query = stream:get_body_as_string()
if query then
- local ok, r, err = pcall(M.add, query)
- if not ok then return 500, string.format('"%s"', r) end
+ local ok, r = pcall(M.add, query)
+ if not ok then return 500, string.format('"%s"', r:match('/([^/]+)$')) end
+ -- Dispatch to all other workers
+ consensus('daf.add "%s"', query)
return rule_info(r)
end
return 400
end
-- We do not support more actions
if action == 'active' then
- if M.toggle(id, val == 'true') then
+ if consensus('daf.toggle(%d, %s)', id, val == 'true' or 'false') then
return tojson(true)
else
return 404, '"No such rule"'
end
end
+-- Sum rule match counters over every rule list returned by map 'daf.rules'.
+-- @return table keyed by stringified rule id, value is the summed count
+local function getmatches()
+	local totals = {}
+	local rule_lists = map('daf.rules')
+	for _, rule_list in ipairs(rule_lists) do
+		for _, entry in ipairs(rule_list) do
+			-- Must have string keys for JSON object and not an array
+			local key = tostring(entry.rule.id)
+			local seen = totals[key] or 0
+			totals[key] = seen + entry.rule.count
+		end
+	end
+	return totals
+end
+
-- @function Publish DAF statistics
local function publish(h, ws)
- local ok, counters = true, {}
+ local cqueues = require('cqueues')
+ local ok, last = true, nil
while ok do
-- Check if we have new rule matches
- local update = {}
- for _, r in ipairs(M.rules) do
- local id = r.rule.id
- if counters[id] ~= r.rule.count then
- -- Must have string keys for JSON object and not an array
- update[tostring(id)] = r.rule.count
- counters[id] = r.rule.count
+ local diff = {}
+ local has_update, update = pcall(getmatches)
+ if has_update then
+ if last then
+ for id, count in pairs(update) do
+ if not last[id] or last[id] < count then
+ diff[id] = count
+ end
+ end
end
+ last = update
end
-- Update counters when there is a new data
- if next(update) ~= nil then
- ok = ws:send(tojson(update))
+ if next(diff) ~= nil then
+ ok = ws:send(tojson(diff))
else
ok = ws:send_ping()
end
-- @function Configure module
function M.config(conf)
- if not http then error('"http" module is not loaded, cannot load DAF') end
+ if not http or not http.endpoints then return end
-- Export API and data publisher
http.endpoints['/daf.js'] = http.page('daf.js', 'daf')
http.endpoints['/daf'] = {'application/json', api, publish}
<div class="col-md-11">
<input type="text" id="daf-builder" class="form-control" aria-label="..." />
</div>
- <button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
+ <div class="col-md-1">
+ <button type="button" id="daf-add" class="btn btn-default btn-sm">Add</button>
+ </div>
</form>
</div>
<div class="row">
- <table id="daf-rules" class="table table-striped table-responsive">
- <th><td>No rules here yet.</td></th>
- </table>
+ <div class="col-md-12">
+ <table id="daf-rules" class="table table-striped table-responsive">
+ <th><td>No rules here yet.</td></th>
+ </table>
+ </div>
</div>
]]}
end
---- @module graphite
-local graphite = {}
+-- Load dependent modules
+if not stats then modules.load('stats') end
+
+-- This is a leader-only module
+if worker.id > 0 then return {} end
+local M = {}
local socket = require('socket')
-- Create connected UDP socket
return s
end
+-- Sum metrics from a list of result tables into one table.
+-- Values found under the same key are added together.
+-- @param results  sequence of result tables; entries that are not tables
+--                 are skipped instead of raising an error in pairs()
+-- @return new table mapping each key to the sum of its values
+local function merge(results)
+	local t = {}
+	for _, result in ipairs(results) do
+		-- Only tables can be iterated; ignore anything else in the list
+		if type(result) == 'table' then
+			for k, v in pairs(result) do
+				t[k] = (t[k] or 0) + v
+			end
+		end
+	end
+	return t
+end
+
-- Send the metrics in a table to multiple Graphite consumers
local function publish_table(metrics, prefix, now)
for key,val in pairs(metrics) do
if prefix then
msg = prefix..'.'..msg
end
- for i in ipairs(graphite.cli) do
- local ok, err = graphite.cli[i]:send(msg)
+ for i in ipairs(M.cli) do
+ local ok, err = M.cli[i]:send(msg)
if not ok then
-- Best-effort reconnect once per two tries
- local tcp = graphite.cli[i]['connect'] ~= nil
- local host = graphite.info[i]
- if tcp and host.seen + 2 * graphite.interval / 1000 <= now then
+ local tcp = M.cli[i]['connect'] ~= nil
+ local host = M.info[i]
+ if tcp and host.seen + 2 * M.interval / 1000 <= now then
print(string.format('[graphite] reconnecting: %s#%d reason: %s',
host.addr, host.port, err))
- graphite.cli[i] = make_tcp(host.addr, host.port)
+ M.cli[i] = make_tcp(host.addr, host.port)
host.seen = now
end
end
end
end
-function graphite.init(module)
- graphite.ev = nil
- graphite.cli = {}
- graphite.info = {}
- graphite.interval = 5 * sec
- graphite.prefix = 'kresd.' .. hostname()
+function M.init(module)
+ M.ev = nil
+ M.cli = {}
+ M.info = {}
+ M.interval = 5 * sec
+ M.prefix = 'kresd.' .. hostname()
return 0
end
-function graphite.deinit(module)
- if graphite.ev then event.cancel(graphite.ev) end
+function M.deinit(module)
+ if M.ev then event.cancel(M.ev) end
return 0
end
-- @function Publish results to the Graphite server(s)
-function graphite.publish()
+function M.publish()
local now = os.time()
-- Publish built-in statistics
- if not graphite.cli then error("no graphite server configured") end
- publish_table(cache.stats(), graphite.prefix..'.cache', now)
- publish_table(worker.stats(), graphite.prefix..'.worker', now)
+ if not M.cli then error("no graphite server configured") end
+ publish_table(merge(map 'cache.stats()'), M.prefix..'.cache', now)
+ publish_table(merge(map 'worker.stats()'), M.prefix..'.worker', now)
-- Publish extended statistics if available
- if not stats then
- return 0
- end
- local now_metrics = stats.list()
- if type(now_metrics) ~= 'table' then
- return 0 -- No metrics to watch
- end
- publish_table(now_metrics, graphite.prefix, now)
+ publish_table(merge(map 'stats.list()'), M.prefix, now)
return 0
end
-- @function Make connection to Graphite server.
-function graphite.add_server(graphite, host, port, tcp)
+function M.add_server(graphite, host, port, tcp)
local s, err = tcp and make_tcp(host, port) or make_udp(host, port)
if not s then
error(err)
end
- table.insert(graphite.cli, s)
- table.insert(graphite.info, {addr = host, port = port, seen = 0})
+ table.insert(M.cli, s)
+ table.insert(M.info, {addr = host, port = port, seen = 0})
return 0
end
-function graphite.config(conf)
+function M.config(conf)
-- config defaults
if not conf then return 0 end
if not conf.port then conf.port = 2003 end
- if conf.interval then graphite.interval = conf.interval end
- if conf.prefix then graphite.prefix = conf.prefix end
+ if conf.interval then M.interval = conf.interval end
+ if conf.prefix then M.prefix = conf.prefix end
-- connect to host(s)
if type(conf.host) == 'table' then
for key, val in pairs(conf.host) do
graphite:add_server(conf.host, conf.port, conf.tcp)
end
-- start publishing stats
- if graphite.ev then event.cancel(graphite.ev) end
- graphite.ev = event.recurrent(graphite.interval, graphite.publish)
+ if M.ev then event.cancel(M.ev) end
+ M.ev = event.recurrent(M.interval, M.publish)
return 0
end
-return graphite
+return M
+-- Load dependent modules
+if not stats then modules.load('stats') end
+
+-- This is a leader-only module
+if worker.id > 0 then return {} end
+
-- This is a module that does the heavy lifting to provide an HTTP/2 enabled
-- server that supports TLS by default and provides endpoint for other modules
-- in order to enable them to export restful APIs and websocket streams.
-- @function Configure module
function M.config(conf)
- conf = conf or {}
- assert(type(conf) == 'table', 'config { host = "...", port = 443, cert = "...", key = "..." }')
- -- Configure web interface for resolver
- if not conf.port then conf.port = 8053 end
- if not conf.host then conf.host = 'localhost' end
- if conf.geoip and has_mmdb then M.geoip = mmdb.open(conf.geoip) end
- M.interface(conf.host, conf.port, M.endpoints, conf.cert, conf.key)
- -- TODO: configure DNS/HTTP(s) interface
- if M.ev then return end
- -- Schedule both I/O activity notification and timeouts
- local poll_step
- poll_step = function ()
- local ok, err, _, co = cq:step(0)
- if not ok then warn('[http] error: %s %s', err, debug.traceback(co)) end
- -- Reschedule timeout or create new one
- local timeout = cq:timeout()
- if timeout then
- -- Throttle web requests
- if timeout == 0 then timeout = 0.001 end
- -- Convert from seconds to duration
- timeout = timeout * sec
- if not M.timeout then
- M.timeout = event.after(timeout, poll_step)
- else
- event.reschedule(M.timeout, timeout)
- end
- else -- Cancel running timeout when there is no next deadline
- if M.timeout then
- event.cancel(M.timeout)
- M.timeout = nil
- end
+ if conf == true then conf = {} end
+ assert(type(conf) == 'table', 'config { host = "...", port = 443, cert = "...", key = "..." }')
+ -- Configure web interface for resolver
+ if not conf.port then conf.port = 8053 end
+ if not conf.host then conf.host = 'localhost' end
+ if conf.geoip and has_mmdb then M.geoip = mmdb.open(conf.geoip) end
+ M.interface(conf.host, conf.port, M.endpoints, conf.cert, conf.key)
+ -- TODO: configure DNS/HTTP(s) interface
+ if M.ev then return end
+ -- Schedule both I/O activity notification and timeouts
+ local poll_step
+ poll_step = function ()
+ local ok, err, _, co = cq:step(0)
+ if not ok then warn('[http] error: %s %s', err, debug.traceback(co)) end
+ -- Reschedule timeout or create new one
+ local timeout = cq:timeout()
+ if timeout then
+ -- Throttle web requests
+ if timeout == 0 then timeout = 0.001 end
+ -- Convert from seconds to duration
+ timeout = timeout * sec
+ if not M.timeout then
+ M.timeout = event.after(timeout, poll_step)
+ else
+ event.reschedule(M.timeout, timeout)
+ end
+ else -- Cancel running timeout when there is no next deadline
+ if M.timeout then
+ event.cancel(M.timeout)
+ M.timeout = nil
end
end
- M.ev = event.socket(cq:pollfd(), poll_step)
+ end
+ M.ev = event.socket(cq:pollfd(), poll_step)
end
return M
-- Gauge metrics
local gauges = {
['worker.concurrent'] = true,
+ ['worker.rss'] = true,
}
--- Load dependent modules
-if not stats then modules.load('stats') end
+-- Accumulate values from a collection of result tables into `t`.
+-- @param t        accumulator table, modified in place
+-- @param results  collection of results; entries that are not tables are ignored
+-- @param prefix   string prepended to every key before it is stored in `t`
+local function merge(t, results, prefix)
+	-- Add every value of one result table to the accumulator
+	local function absorb(result)
+		for name, value in pairs(result) do
+			local key = prefix..name
+			t[key] = (t[key] or 0) + value
+		end
+	end
+	for _, result in pairs(results) do
+		if type(result) == 'table' then absorb(result) end
+	end
+end
local function getstats()
- local t = stats.list()
- for k,v in pairs(cache.stats()) do t['cache.'..k] = v end
- for k,v in pairs(worker.stats()) do t['worker.'..k] = v end
+ local t = {}
+ merge(t, map 'stats.list()', '')
+ merge(t, map 'cache.stats()', 'cache.')
+ merge(t, map 'worker.stats()', 'worker.')
return t
end
end
end
end
+ -- Aggregate per-worker metrics
+ local wdata = {}
+ for i, info in pairs(map 'worker.info()') do
+ if type(info) == 'table' then
+ wdata[tostring(info.pid)] = {rss=info.rss, usertime=info.usertime, systime=info.systime, pagefaults=info.pagefaults, queries=info.queries}
+ end
+ end
-- Publish stats updates periodically
if not is_empty then
- local update = {time=os.time(), stats=stats_dt, upstreams=upstreams or {}}
+ local update = {time=os.time(), stats=stats_dt, upstreams=upstreams, workers=wdata}
table.insert(snapshots, update)
if #snapshots > snapshots_count then
table.remove(snapshots, 1)
border-color: #4cae4c !important;
color: #fff !important;
}
+
+.spark {
+ display: inline-block;
+}
+
+.spark-legend {
+ display: inline-block;
+}
\ No newline at end of file
/* Render other interesting metrics as lines (hidden by default) */
var data = [];
- var last_metric = 15;
+ var last_metric = 17;
var metrics = {
'answer.noerror': [0, 'NOERROR', null, 'By RCODE'],
'answer.nodata': [1, 'NODATA', null, 'By RCODE'],
'worker.concurrent': [13, 'Queries outstanding'],
'worker.queries': [14, 'Queries received/s'],
'worker.dropped': [15, 'Queries dropped'],
+ 'worker.usertime': [16, 'CPU (user)', null, 'Workers'],
+ 'worker.systime': [17, 'CPU (sys)', null, 'Workers'],
};
/* Render latency metrics as sort of a heatmap */
var bubblemap = {};
function pushUpstreams(resp) {
if (resp == null) {
+ $('#map-container').hide();
return;
+ } else {
+ $('#map-container').show();
}
/* Get current maximum number of queries for bubble diameter adjustment */
var maxQueries = 1;
age = age + 1;
}
+ /* Per-worker information */
+ function updateRate(x, y, dt) {
+ return (100.0 * ((x - y) / dt)).toFixed(1);
+ }
+ function updateWorker(row, next, data, timestamp, buffer) {
+ const dt = timestamp - data.timestamp;
+ const cell = row.find('td');
+ /* Update spark lines and CPU times first */
+ if (dt > 0.0) {
+ const utimeRate = updateRate(next.usertime, data.last.usertime, dt);
+ const stimeRate = updateRate(next.systime, data.last.systime, dt);
+ cell.eq(1).find('span').text(utimeRate + '% / ' + stimeRate + '%');
+ /* Update sparkline graph */
+ data.data.push([new Date(timestamp * 1000), utimeRate, stimeRate]);
+ if (data.data.length > 60) {
+ data.data.shift();
+ }
+ if (!buffer) {
+ data.graph.updateOptions( { 'file': data.data } );
+ }
+ }
+ /* Update other fields */
+ if (!buffer) {
+ cell.eq(2).text(formatNumber(next.rss) + 'B');
+ cell.eq(3).text(next.pagefaults);
+ cell.eq(4).text('Healthy').addClass('text-success');
+ }
+ }
+
+ var workerData = {};
+ function pushWorkers(resp, timestamp, buffer) {
+ if (resp == null) {
+ return;
+ }
+ const workerTable = $('#workers');
+ for (var pid in resp) {
+ var row = workerTable.find('tr[data-pid='+pid+']');
+ if (row.length == 0) {
+ row = workerTable.append(
+ '<tr data-pid='+pid+'><td>'+pid+'</td>'+
+ '<td><div class="spark" id="spark-'+pid+'" /><span /></td><td></td><td></td><td></td>'+
+ '</tr>');
+ /* Create sparkline visualisation */
+ const spark = row.find('#spark-'+pid);
+ spark.css({'margin-right': '1em', width: '80px', height: '1.4em'});
+ workerData[pid] = {timestamp: timestamp, data: [[new Date(timestamp * 1000),0,0]], last: resp[pid]};
+ const workerGraph = new Dygraph(spark[0],
+ workerData[pid].data, {
+ valueRange: [0, 100],
+ legend: 'never',
+ axes : {
+ x : {
+ drawGrid: false,
+ drawAxis : false,
+ },
+ y : {
+ drawGrid: false,
+ drawAxis : false,
+ }
+ },
+ labels: ['x', '%user', '%sys'],
+ labelsDiv: '',
+ }
+ );
+ workerData[pid].graph = workerGraph;
+ }
+ updateWorker(row, resp[pid], workerData[pid], timestamp, buffer);
+ /* Track last datapoint */
+ workerData[pid].last = resp[pid];
+ workerData[pid].timestamp = timestamp;
+ }
+ /* Prune unhealthy PIDs */
+ if (!buffer) {
+ workerTable.find('tr').each(function () {
+ const e = $(this);
+ if (!(e.data('pid') in resp)) {
+ const healthCell = e.find('td').last();
+ healthCell.removeClass('text-success')
+ healthCell.text('Dead').addClass('text-danger');
+ }
+ });
+ }
+ }
+
/* WebSocket endpoints */
var wsStats = (secure ? 'wss://' : 'ws://') + location.host + '/stats';
var ws = new Socket(wsStats);
if (data.length > 0) {
pushUpstreams(data[data.length - 1].upstreams);
}
+ /* Buffer datapoints and redraw last */
for (var i in data) {
- pushMetrics(data[i].stats, data[i].time, true);
+ const is_last = (i == data.length - 1);
+ pushWorkers(data[i].workers, data[i].time, !is_last);
+ pushMetrics(data[i].stats, data[i].time, !is_last);
}
- graph.updateOptions( { 'file': data } );
} else {
- pushMetrics(data.stats, data.time);
pushUpstreams(data.upstreams);
+ pushWorkers(data.workers, data.time);
+ pushMetrics(data.stats, data.time);
}
-
};
});
\ No newline at end of file
</div>
</div>
</div>
+ <div class="row">
+ <h3>Running workers</h3>
+ <div class="col-md-12">
+ <table id="workers" class="table table-responsive">
+ <tr>
+ <th>PID</th><th>CPU per-worker (user/sys)</th>
+ <th>RSS</th><th>Page faults</th><th>Status</th>
+ </tr>
+ </table>
+ </div>
+ </div>
</div>
- <a name="worldmap"></a>
- <h2 class="sub-header">Where do the queries go?</h2>
- <div class="col-md-12">
- <div id="map" style="position: relative;"></div>
+ <div class="row" id="map-container">
+ <a name="worldmap"></a>
+ <h2 class="sub-header">Where do the queries go?</h2>
+ <div class="col-md-12">
+ <div id="map" style="position: relative;"></div>
+ </div>
</div>
<div class="col-md-12">
{{ snippets }}
end
-- RPZ policy set
-function policy.rpz(action, path, format)
- if format == 'lmdb' then
- error('lmdb zone format is NYI')
- else
- return rpz_zonefile(action, path)
- end
+function policy.rpz(action, path)
+ return rpz_zonefile(action, path)
end
-- Evaluate packet in given rules to determine policy action