import unittest
from dnsdisttests import DNSDistTest, pickAvailablePort
-@unittest.skipIf('SKIP_PROMETHEUS_TESTS' in os.environ, 'Prometheus tests are disabled')
+
+@unittest.skipIf("SKIP_PROMETHEUS_TESTS" in os.environ, "Prometheus tests are disabled")
class TestPrometheus(DNSDistTest):
_webTimeout = 2.0
_webServerPort = pickAvailablePort()
- _webServerBasicAuthPassword = 'secret'
- _webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
- _webServerAPIKey = 'apisecret'
- _webServerAPIKeyHashed = '$scrypt$ln=10,p=1,r=8$9v8JxDfzQVyTpBkTbkUqYg==$bDQzAOHeK1G9UvTPypNhrX48w974ZXbFPtRKS34+aso='
- _config_params = ['_testServerPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
+ _webServerBasicAuthPassword = "secret"
+ _webServerBasicAuthPasswordHashed = "$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM="
+ _webServerAPIKey = "apisecret"
+ _webServerAPIKeyHashed = "$scrypt$ln=10,p=1,r=8$9v8JxDfzQVyTpBkTbkUqYg==$bDQzAOHeK1G9UvTPypNhrX48w974ZXbFPtRKS34+aso="
+ _config_params = [
+ "_testServerPort",
+ "_webServerPort",
+ "_webServerBasicAuthPasswordHashed",
+ "_webServerAPIKeyHashed",
+ ]
_config_template = """
newServer{address="127.0.0.1:%d"}
webserver("127.0.0.1:%d")
if line in linesSeen:
raise AssertionError(f"Duplicate line in prometheus output: '{line}'")
linesSeen[line] = True
- if line.startswith('# HELP'):
- tokens = line.split(' ')
+ if line.startswith("# HELP"):
+ tokens = line.split(" ")
self.assertGreaterEqual(len(tokens), 4)
- elif line.startswith('# TYPE'):
- tokens = line.split(' ')
+ elif line.startswith("# TYPE"):
+ tokens = line.split(" ")
self.assertEqual(len(tokens), 4)
- self.assertIn(tokens[3], ['counter', 'gauge', 'histogram'])
- elif not line.startswith('#'):
- tokens = line.split(' ')
+ self.assertIn(tokens[3], ["counter", "gauge", "histogram"])
+ elif not line.startswith("#"):
+ tokens = line.split(" ")
self.assertEqual(len(tokens), 2)
- if not line.startswith('dnsdist_') and not line.startswith('custom_'):
+ if not line.startswith("dnsdist_") and not line.startswith("custom_"):
raise AssertionError(
- 'Expecting prometheus metric to be prefixed by \'dnsdist_\', got: "%s"' % (line))
+ "Expecting prometheus metric to be prefixed by 'dnsdist_', got: \"%s\""
+ % (line)
+ )
key = tokens[0]
if key in keysSeen:
raise AssertionError(f"Duplicate prometheus key: '{key}'")
keysSeen[key] = True
- def checkMetric(self, content, name, expectedType, expectedValue, expectedLabels=""):
+ def checkMetric(
+ self, content, name, expectedType, expectedValue, expectedLabels=""
+ ):
typeFound = False
helpFound = False
valueFound = False
labelsFound = True
for line in content.splitlines():
if name in str(line):
- tokens = line.split(' ')
- if line.startswith('# HELP'):
+ tokens = line.split(" ")
+ if line.startswith("# HELP"):
self.assertGreaterEqual(len(tokens), 4)
if tokens[2] == name:
helpFound = True
- elif line.startswith('# TYPE'):
+ elif line.startswith("# TYPE"):
self.assertEqual(len(tokens), 4)
if tokens[2] == name:
typeFound = True
self.assertEqual(tokens[3], expectedType)
- elif not line.startswith('#'):
+ elif not line.startswith("#"):
self.assertEqual(len(tokens), 2)
if tokens[0] == name:
valueFound = True
def checkPrometheusContentPromtool(self, content):
output = None
try:
- testcmd = ['promtool', 'check', 'metrics']
- process = subprocess.Popen(testcmd, stdout=subprocess.PIPE, stdin=subprocess.PIPE, stderr=subprocess.STDOUT, close_fds=True)
+ testcmd = ["promtool", "check", "metrics"]
+ process = subprocess.Popen(
+ testcmd,
+ stdout=subprocess.PIPE,
+ stdin=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ close_fds=True,
+ )
output = process.communicate(input=content)
except subprocess.CalledProcessError as exc:
- raise AssertionError('%s failed (%d): %s' % (testcmd, process.returncode, process.output))
+ raise AssertionError(
+ "%s failed (%d): %s" % (testcmd, process.returncode, process.output)
+ )
# promtool may return 3 because of the "_total" suffix warnings
if not process.returncode in [0, 3]:
- raise AssertionError('%s failed (%d): %s' % (testcmd, process.returncode, output))
+ raise AssertionError(
+ "%s failed (%d): %s" % (testcmd, process.returncode, output)
+ )
for line in output[0].splitlines():
- if line.endswith(b"should have \"_total\" suffix"):
+ if line.endswith(b'should have "_total" suffix'):
continue
- raise AssertionError('%s returned an unexpected output. Faulty line is "%s", complete content is "%s"' % (testcmd, line, output))
+ raise AssertionError(
+ '%s returned an unexpected output. Faulty line is "%s", complete content is "%s"'
+ % (testcmd, line, output)
+ )
def testMetrics(self):
"""
Prometheus: Retrieve metrics
"""
- url = 'http://127.0.0.1:' + str(self._webServerPort) + '/metrics'
- r = requests.get(url, auth=('whatever', self._webServerBasicAuthPassword), timeout=self._webTimeout)
+ url = "http://127.0.0.1:" + str(self._webServerPort) + "/metrics"
+ r = requests.get(
+ url,
+ auth=("whatever", self._webServerBasicAuthPassword),
+ timeout=self._webTimeout,
+ )
self.assertTrue(r)
self.assertEqual(r.status_code, 200)
self.checkPrometheusContentBasic(r.text)
self.checkPrometheusContentPromtool(r.content)
- self.checkMetric(r.text, 'dnsdist_custom_metric1', 'counter', 1)
- self.checkMetric(r.text, 'dnsdist_custom_metric2', 'gauge', 0)
- self.checkMetric(r.text, 'custom_prometheus_name', 'counter', 0)
- self.checkMetric(r.text, 'dnsdist_custom_metric_foo', 'counter', 1, '{x="bar",y="xyz"}')
- self.checkMetric(r.text, 'dnsdist_custom_metric_foo', 'counter', 1, '{x="baz",y="abc"}')
+ self.checkMetric(r.text, "dnsdist_custom_metric1", "counter", 1)
+ self.checkMetric(r.text, "dnsdist_custom_metric2", "gauge", 0)
+ self.checkMetric(r.text, "custom_prometheus_name", "counter", 0)
+ self.checkMetric(
+ r.text, "dnsdist_custom_metric_foo", "counter", 1, '{x="bar",y="xyz"}'
+ )
+ self.checkMetric(
+ r.text, "dnsdist_custom_metric_foo", "counter", 1, '{x="baz",y="abc"}'
+ )