print(f"*** End startAuth log for {logFile} ***")
raise AssertionError('%s failed (%d)' % (authcmd, cls._auths[ipaddress].returncode))
+ @classmethod
+ def checkConfdir(cls, confdir):
+ if cls.__name__ != 'FlagsTest' and os.path.basename(confdir) + 'Test' != cls.__name__:
+ raise AssertionError('conf dir ' + confdir + ' and ' + cls.__name__ + ' inconsistent with convention')
+
@classmethod
def generateRecursorConfig(cls, confdir):
+ cls.checkConfdir(confdir)
params = tuple([getattr(cls, param) for param in cls._config_params])
if len(params):
print(params)
@classmethod
def generateRecursorYamlConfig(cls, confdir, luaConfig=True):
+ cls.checkConfdir(confdir)
params = tuple([getattr(cls, param) for param in cls._config_params])
if len(params):
print(params)
cls.tearDownRecursor()
class APIAllowedRecursorTest(APIRecursorTest):
- _confdir = 'API'
+ _confdir = 'APIAllowedRecursor'
_wsPort = 8042
_wsTimeout = 2
_wsPassword = 'secretpassword'
self.assertTrue(r.json())
class APIDeniedRecursorTest(APIRecursorTest):
- _confdir = 'API'
+ _confdir = 'APIDeniedRecursor'
_wsPort = 8042
_wsTimeout = 2
_wsPassword = 'secretpassword'
self.assertEqual(res.options[0].otype, 15)
self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(29, b'Result synthesized from aggressive NSEC cache (RFC8198)'))
-class AggressiveNSECCacheNSEC(AggressiveNSECCacheBase):
+class AggressiveNSECCacheNSECTest(AggressiveNSECCacheBase):
_confdir = 'AggressiveNSECCacheNSEC'
__test__ = True
self.assertEqual(res.options[0].otype, 15)
self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(9, b''))
-class AggressiveNSECCacheNSEC3(AggressiveNSECCacheBase):
+class AggressiveNSECCacheNSEC3Test(AggressiveNSECCacheBase):
_confdir = 'AggressiveNSECCacheNSEC3'
__test__ = True
import socket
from recursortests import RecursorTest
-class testAnyBind(RecursorTest):
+class AnyBindTest(RecursorTest):
_confdir = 'AnyBind'
_config_template = """dnssec=validate
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
- super(testAnyBind, cls).generateRecursorConfig(confdir)
+ super(AnyBindTest, cls).generateRecursorConfig(confdir)
@classmethod
def setUpSockets(cls):
from recursortests import RecursorTest
-class TestCarbon(RecursorTest):
+class CarbonTest(RecursorTest):
_confdir = 'Carbon'
_carbonNamespace = 'NS'
_carbonInstance = 'Instance'
from recursortests import RecursorTest
-class DNS64RecursorTest(RecursorTest):
+class DNS64Test(RecursorTest):
_confdir = 'DNS64'
_config_template = """
1.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.0.8.b.d.0.1.0.0.2 IN PTR aaaa.example.dns64.
""".format(soa=cls._SOA))
- super(DNS64RecursorTest, cls).generateRecursorConfig(confdir)
+ super(DNS64Test, cls).generateRecursorConfig(confdir)
# this type (A) exists for this name
def testExistingA(self):
self.assertEqual(message.edns, -1)
-class EDNSBufferTest16801680(EDNSBufferTest):
+class EDNSBuffer16801680Test(EDNSBufferTest):
"""
Runs test cases 1, 2, 5, 6, 7, 8
"""
+ _confdir = 'EDNSBuffer16801680'
def testEdnsBufferTestCase01(self):
query = self.getMessage('01', 4096)
message = dns.message.from_wire(raw)
self.checkEDNS(message, 512)
-class EDNSBufferTest16801681(EDNSBufferTest):
+class EDNSBuffer16801681Test(EDNSBufferTest):
"""
Runs test case 3
"""
self.checkEDNS(message, 512)
-class EDNSBufferTest16801679(EDNSBufferTest):
+class EDNSBuffer16801679Test(EDNSBufferTest):
"""
Runs test case 4
"""
from recursortests import RecursorTest
-class testExpired(RecursorTest):
+class ExpiredTest(RecursorTest):
"""This regression test starts the authoritative servers with a clock that is
set 15 days into the past. Hence, the recursor must reject the signatures
because they are expired.
self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
-class testExpiredWithEDE(RecursorTest):
+class ExpiredWithEDETest(RecursorTest):
"""This regression test starts the authoritative servers with a clock that is
set 15 days into the past. Hence, the recursor must reject the signatures
because they are expired.
from recursortests import RecursorTest
-class ExtendedErrorsRecursorTest(RecursorTest):
+class ExtendedErrorsTest(RecursorTest):
_confdir = 'ExtendedErrors'
_config_template = """
*.rpz.extended.zone.rpz. 60 IN CNAME .
""".format(soa=cls._SOA))
- super(ExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
+ super(ExtendedErrorsTest, cls).generateRecursorConfig(confdir)
def testNotIncepted(self):
qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
self.assertEqual(res.options[0].otype, 15)
self.assertEqual(res.options[0], extendederrors.ExtendedErrorOption(10, b'Extra text from Lua!'))
-class NoExtendedErrorsRecursorTest(RecursorTest):
+class NoExtendedErrorsTest(RecursorTest):
- _confdir = 'ExtendedErrorsDisabled'
+ _confdir = 'NoExtendedErrors'
_config_template = """
dnssec=validate
extended-resolution-errors=no
@classmethod
def generateRecursorConfig(cls, confdir):
- super(NoExtendedErrorsRecursorTest, cls).generateRecursorConfig(confdir)
+ super(NoExtendedErrorsTest, cls).generateRecursorConfig(confdir)
def testNotIncepted(self):
qname = 'signotincepted.bad-dnssec.wb.sidnlabs.nl.'
from twisted.internet import reactor
import threading
-class testInterop(RecursorTest):
+class InteropTest(RecursorTest):
_confdir = 'Interop'
_config_template = """dnssec=validate
cls._UDPResponder.setDaemon(True)
cls._UDPResponder.start()
-class testInteropProcess(RecursorTest):
+class InteropProcessTest(RecursorTest):
_confdir = 'InteropProcess'
_config_template = """dnssec=process
from recursortests import RecursorTest
-class testKeepOpenTCP(RecursorTest):
+class KeepOpenTCPTest(RecursorTest):
_confdir = 'KeepOpenTCP'
_config_template = """dnssec=validate
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
- super(testKeepOpenTCP, cls).generateRecursorConfig(confdir)
+ super(KeepOpenTCPTest, cls).generateRecursorConfig(confdir)
def sendTCPQueryKeepOpen(cls, sock, query, timeout=2.0):
try:
from recursortests import RecursorTest
-class testLockedCache(RecursorTest):
+class LockedCacheTest(RecursorTest):
"""
Test that a locked cached entry is *not* updated by the same additional encountered in a second query
"""
ttl2 = self.getCacheTTL()
self.assertGreater(ttl1, ttl2)
-class testNotLockedCache(RecursorTest):
+class NotLockedCacheTest(RecursorTest):
"""
Test that a not locked cached entry *is* updated by the same additional encountered in a second query
"""
import dns
from recursortests import RecursorTest
-class testNTA(RecursorTest):
+class NTATest(RecursorTest):
_confdir = 'NTA'
_config_template = """dnssec=validate"""
import time
from recursortests import RecursorTest
-class testNamedForward(RecursorTest):
+class NamedForwardTest(RecursorTest):
"""
This is forwarding test using a name as target
"""
self.assertMatchingRRSIGInAnswer(res, expected)
@unittest.skipUnless('ENABLE_SUDO_TESTS' in os.environ, "sudo is not available")
-class testNamedForwardWithChange(RecursorTest):
+class NamedForwardWithChangeTest(RecursorTest):
"""
This is forwarding test using a name as target and a changing resolve
"""
from recursortests import RecursorTest
-class testNoDS(RecursorTest):
+class NoDSTest(RecursorTest):
_confdir = 'NoDS'
_config_template = """dnssec=validate"""
import dns
from recursortests import RecursorTest
-class testNoDSYAML(RecursorTest):
+class NoDSYAMLTest(RecursorTest):
_confdir = 'NoDSYAML'
_config_template = """
"""
@classmethod
def generateRecursorConfig(cls, confdir):
- super(testNoDSYAML, cls).generateRecursorYamlConfig(confdir, False)
+ super(NoDSYAMLTest, cls).generateRecursorYamlConfig(confdir, False)
def testNoDSInsecure(self):
"""#4430 When the root DS is removed, the result must be Insecure"""
from recursortests import RecursorTest
-class testNotYetValid(RecursorTest):
+class NotYetValidTest(RecursorTest):
"""This regression test starts the authoritative servers with a clock that is
set 15 days into the future. Hence, the recursor must reject the signatures
because they are not yet valid.
from recursortests import RecursorTest
-class NotifyRecursorTest(RecursorTest):
+class NotifyTest(RecursorTest):
_auth_zones = {
'8': {'threads': 1,
e 3600 IN A 192.0.2.42
f 3600 IN CNAME f ; CNAME loop: dirty trick to get a ServFail in an authzone
""".format(soa=cls._SOA))
- super(NotifyRecursorTest, cls).generateRecursorConfig(confdir)
+ super(NotifyTest, cls).generateRecursorConfig(confdir)
def checkRecordCacheMetrics(self, expectedHits, expectedMisses):
headers = {'x-api-key': self._apiKey}
import time
from recursortests import RecursorTest
-class testOOOTCP(RecursorTest):
+class OOOTCPTest(RecursorTest):
_confdir = 'OOOTCP'
_config_template = """dnssec=validate
@classmethod
def generateRecursorConfig(cls, confdir):
- super(testOOOTCP, cls).generateRecursorConfig(confdir)
+ super(OOOTCPTest, cls).generateRecursorConfig(confdir)
def testOOOVeryBasic(self):
expected = {}
from recursortests import RecursorTest
-class PacketCacheRecursorTest(RecursorTest):
+class PacketCacheTest(RecursorTest):
_auth_zones = {
'8': {'threads': 1,
e 3600 IN A 192.0.2.42
f 3600 IN CNAME f ; CNAME loop: dirty trick to get a ServFail in an authzone
""".format(soa=cls._SOA))
- super(PacketCacheRecursorTest, cls).generateRecursorConfig(confdir)
+ super(PacketCacheTest, cls).generateRecursorConfig(confdir)
def checkPacketCacheMetrics(self, expectedHits, expectedMisses):
self.waitForTCPSocket("127.0.0.1", self._wsPort)
raise AssertionError('%s returned an unexpected output. Faulty line is "%s", complete content is "%s"' % (testcmd, line, output))
class BasicPrometheusTest(RecPrometheusTest):
- _confdir = 'Prometheus'
+ _confdir = 'BasicPrometheus'
_wsPort = 8042
_wsTimeout = 2
_wsPassword = 'secretpassword'
import os
from recursortests import RecursorTest
-class testProxyByTable(RecursorTest):
+class ProxyByTableTest(RecursorTest):
"""
This test makes sure that we correctly use the proxy-mapped address during the ACL check
"""
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
- super(testProxyByTable, cls).generateRecursorConfig(confdir)
+ super(ProxyByTableTest, cls).generateRecursorConfig(confdir)
def testA(self):
from recursortests import RecursorTest
from proxyprotocol import ProxyProtocol
-class ProxyProtocolRecursorTest(RecursorTest):
+class ProxyProtocolTest(RecursorTest):
@classmethod
def setUpClass(cls):
def tearDownClass(cls):
cls.tearDownRecursor()
-class ProxyProtocolAllowedRecursorTest(ProxyProtocolRecursorTest):
- _confdir = 'ProxyProtocol'
+class ProxyProtocolAllowedTest(ProxyProtocolTest):
+ _confdir = 'ProxyProtocolAllowed'
_wsPort = 8042
_wsTimeout = 2
_wsPassword = 'secretpassword'
dq:addAnswer(pdns.A, '192.0.2.1', 60)
return true
end
- """ % (ProxyProtocolRecursorTest._recursorPort, ProxyProtocolRecursorTest._recursorPort)
+ """ % (ProxyProtocolTest._recursorPort, ProxyProtocolTest._recursorPort)
_config_template = """
proxy-protocol-from=127.0.0.1
self.assertEqual(count, 5)
sock.close()
-class ProxyProtocolAllowedFFIRecursorTest(ProxyProtocolAllowedRecursorTest):
- # same tests than ProxyProtocolAllowedRecursorTest but with the Lua FFI interface instead of the regular one
- _confdir = 'ProxyProtocolFFI'
+class ProxyProtocolAllowedFFITest(ProxyProtocolAllowedTest):
+ # same tests than ProxyProtocolAllowedTest but with the Lua FFI interface instead of the regular one
+ _confdir = 'ProxyProtocolAllowedFFI'
_lua_dns_script_file = """
local ffi = require("ffi")
dq:addAnswer(pdns.A, '192.0.2.1', 60)
return true
end
- """ % (ProxyProtocolAllowedRecursorTest._recursorPort)
+ """ % (ProxyProtocolAllowedTest._recursorPort)
-class ProxyProtocolNotAllowedRecursorTest(ProxyProtocolRecursorTest):
+class ProxyProtocolNotAllowedTest(ProxyProtocolTest):
_confdir = 'ProxyProtocolNotAllowed'
_lua_dns_script_file = """
res = sender(query, False, '127.0.0.42', '255.255.255.255', 0, 65535, [ [0, b'foo' ], [ 255, b'bar'] ])
self.assertEqual(res, None)
-class ProxyProtocolExceptionRecursorTest(ProxyProtocolRecursorTest):
+class ProxyProtocolExceptionTest(ProxyProtocolTest):
_confdir = 'ProxyProtocolException'
_lua_dns_script_file = """
proxy-protocol-from=127.0.0.1/32
proxy-protocol-exceptions=127.0.0.1:%d
allow-from=127.0.0.0/24, ::1/128
-""" % (ProxyProtocolRecursorTest._recursorPort)
+""" % (ProxyProtocolTest._recursorPort)
def testNoHeaderProxyProtocol(self):
qname = 'no-header.proxy-protocol-not-allowed.recursor-tests.powerdns.com.'
cls.tearDownRecursor()
os.unlink('tagfile')
-class testRoutingTag(RoutingTagTest):
+class RoutingTagTest(RoutingTagTest):
_confdir = 'RoutingTag'
_config_template = """
print(e.output)
raise
-class testRoutingTagFFI(RoutingTagTest):
+class RoutingTagFFITest(RoutingTagTest):
_confdir = 'RoutingTagFFI'
_config_template = """
from recursortests import RecursorTest
-class TestSNMP(RecursorTest):
+class SNMPTest(RecursorTest):
_snmpTimeout = 2.0
_snmpServer = '127.0.0.1'
import os
from recursortests import RecursorTest
-class testServerNames(RecursorTest):
+class ServerNamesTest(RecursorTest):
"""
This tests all kinds naming things
"""
import os
from recursortests import RecursorTest
-class testSimple(RecursorTest):
+class SimpleTest(RecursorTest):
_confdir = 'Simple'
_config_template = """dnssec=validate
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
- super(testSimple, cls).generateRecursorConfig(confdir)
+ super(SimpleTest, cls).generateRecursorConfig(confdir)
def testSOAs(self):
for zone in ['.', 'example.', 'secure.example.']:
import subprocess
from recursortests import RecursorTest
-class testSimpleDoT(RecursorTest):
+class SimpleDoTTest(RecursorTest):
"""
This tests DoT to auth server in a very basic way and is dependent on powerdns.com nameservers having DoT enabled.
"""
import subprocess
from recursortests import RecursorTest
-class testSimpleForwardOverDoT(RecursorTest):
+class SimpleForwardOverDoTTest(RecursorTest):
"""
This is forwarding to a DoT server in a very basic way and is dependent on Quad9 working
"""
import os
from recursortests import RecursorTest
-class testSimpleTCP(RecursorTest):
+class SimpleTCPTest(RecursorTest):
_confdir = 'SimpleTCP'
_config_template = """dnssec=validate
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
- super(testSimpleTCP, cls).generateRecursorConfig(confdir)
+ super(SimpleTCPTest, cls).generateRecursorConfig(confdir)
def testSOAs(self):
for zone in ['.', 'example.', 'secure.example.']:
import os
from recursortests import RecursorTest
-class testSimpleYAML(RecursorTest):
+class SimpleYAMLTest(RecursorTest):
_confdir = 'SimpleYAML'
_config_template = """
@ 3600 IN SOA {soa}
@ 3600 IN A 192.0.2.88
""".format(soa=cls._SOA))
- super(testSimpleYAML, cls).generateRecursorYamlConfig(confdir)
+ super(SimpleYAMLTest, cls).generateRecursorYamlConfig(confdir)
def testSOAs(self):
for zone in ['.', 'example.', 'secure.example.']:
from recursortests import RecursorTest
-class testSortlist(RecursorTest):
+class SortlistTest(RecursorTest):
_confdir = 'Sortlist'
_config_template = """dnssec=off"""
self.assertEqual(indexCNAME, 0)
self.assertGreater(indexMX, 0)
- self.assertEqual(recordsA, ['17.238.240.5', '17.38.42.80', '192.168.0.1'])
\ No newline at end of file
+ self.assertEqual(recordsA, ['17.238.240.5', '17.38.42.80', '192.168.0.1'])
import subprocess
from recursortests import RecursorTest
-class testTraceFail(RecursorTest):
+class TraceFailTest(RecursorTest):
_confdir = 'TraceFail'
_config_template = """
from recursortests import RecursorTest
-class testZTC(RecursorTest):
+class ZTCTest(RecursorTest):
_confdir = 'ZTC'
_config_template = """
from basicDNSSEC import BasicDNSSEC
import unittest
-class basicNSEC(BasicDNSSEC):
+class basicNSECTest(BasicDNSSEC):
__test__ = True
_confdir = 'basicNSEC'
import os
import subprocess
-class basicNSEC3(BasicDNSSEC):
+class basicNSEC3Test(BasicDNSSEC):
__test__ = True
_confdir = 'basicNSEC3'