except NameError:
pass
+def getWorkerID():
+ if not 'PYTEST_XDIST_WORKER' in os.environ:
+ return 0
+ workerName = os.environ['PYTEST_XDIST_WORKER']
+ return int(workerName[2:])
+
+workerPorts = {}
+
+def pickAvailablePort():
+ global workerPorts
+ workerID = getWorkerID()
+ if workerID in workerPorts:
+ port = workerPorts[workerID] + 1
+ else:
+ port = 11000 + (workerID * 1000)
+ workerPorts[workerID] = port
+ return port
class DNSDistTest(AssertEqualDNSMessageMixin, unittest.TestCase):
"""
from dnsdist on a separate queue, allowing the tests to check
that the queries sent from dnsdist were as expected.
"""
- _dnsDistPort = 5340
_dnsDistListeningAddr = "127.0.0.1"
- _testServerPort = 5350
_toResponderQueue = Queue()
_fromResponderQueue = Queue()
_queueTimeout = 1
"""
_config_params = ['_testServerPort']
_acl = ['127.0.0.1/32']
- _consolePort = 5199
_consoleKey = None
_healthCheckName = 'a.root-servers.net.'
_healthCheckCounter = 0
_UDPResponder = None
_TCPResponder = None
_extraStartupSleep = 0
+ _dnsDistPort = pickAvailablePort()
+ _consolePort = pickAvailablePort()
+ _testServerPort = pickAvailablePort()
@classmethod
def waitForTCPSocket(cls, ipaddress, port):
@classmethod
def startResponders(cls):
print("Launching responders..")
+ cls._testServerPort = pickAvailablePort()
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls.waitForTCPSocket("127.0.0.1", cls._testServerPort);
@classmethod
def startDNSDist(cls):
+ cls._dnsDistPort = pickAvailablePort()
+ cls._consolePort = pickAvailablePort()
+
print("Launching dnsdist..")
confFile = os.path.join('configs', 'dnsdist_%s.conf' % (cls.__name__))
params = tuple([getattr(cls, param) for param in cls._config_params])
print(params)
with open(confFile, 'w') as conf:
conf.write("-- Autogenerated by dnsdisttests.py\n")
+ conf.write(f"-- dnsdist will listen on {cls._dnsDistPort}")
conf.write(cls._config_template % params)
conf.write("setSecurityPollSuffix('')")
else:
expectedOutput = ('Configuration \'%s\' OK!\n' % (confFile)).encode()
if not cls._verboseMode and output != expectedOutput:
- raise AssertionError('dnsdist --check-config failed: %s' % output)
+ raise AssertionError('dnsdist --check-config failed: %s (expected %s)' % (output, expectedOutput))
logFile = os.path.join('configs', 'dnsdist_%s.log' % (cls.__name__))
with open(logFile, 'w') as fdLog:
@classmethod
def _ResponderIncrementCounter(cls):
- if threading.currentThread().name in cls._responsesCounter:
- cls._responsesCounter[threading.currentThread().name] += 1
+ if threading.current_thread().name in cls._responsesCounter:
+ cls._responsesCounter[threading.current_thread().name] += 1
else:
- cls._responsesCounter[threading.currentThread().name] = 1
+ cls._responsesCounter[threading.current_thread().name] = 1
@classmethod
def _getResponse(cls, request, fromQueue, toQueue, synthesize=None):
thread = threading.Thread(name='TCP Connection Handler',
target=cls.handleTCPConnection,
args=[conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite])
- thread.setDaemon(True)
+ thread.daemon = True
thread.start()
else:
cls.handleTCPConnection(conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, partialWrite)
thread = threading.Thread(name='DoH Connection Handler',
target=cls.handleDoHConnection,
args=[config, conn, fromQueue, toQueue, trailingDataResponse, multipleResponses, callback, tlsContext, useProxyProtocol])
- thread.setDaemon(True)
+ thread.daemon = True
thread.start()
sock.close()
dnspython>=2.2.0
-nose>=1.3.7
+pytest
+pytest-xdist
libnacl>=1.4.3,<1.7
requests>=2.1.0
protobuf>=3.0
out=$(mktemp)
set -o pipefail
-if ! nosetests --with-xunit $@ 2>&1 | tee "${out}" ; then
+if ! pytest --junitxml=pytest.xml --dist=loadfile -n auto $@ 2>&1 | tee "${out}" ; then
for log in configs/*.log; do
echo "=== ${log} ==="
cat "${log}"
echo
done
- echo "=== nosetests log ==="
+ echo "=== pytest log ==="
cat "${out}"
- echo "=== end of nosetests log ==="
+ echo "=== end of pytest log ==="
false
fi
rm -f "${out}"
import requests
import socket
import time
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class APITestsBase(DNSDistTest):
__test__ = False
_webTimeout = 5.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
_webServerAPIKey = 'apisecret'
import threading
import time
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class TestAXFR(DNSDistTest):
# because, contrary to the other ones, its
# TCP responder allows multiple responses and we don't want
# to mix things up.
- _testServerPort = 5370
+ _testServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
"""
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, True, None, None, True])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
def setUp(self):
query = query.to_wire()
sock.send(query)
(data, remote) = sock.recvfrom(4096)
- self.assertEquals(remote[0], '127.0.0.2')
+ self.assertEqual(remote[0], '127.0.0.2')
except socket.timeout:
data = None
data = query.to_wire()
sock.send(data)
(data, remote) = sock.recvfrom(4096)
- self.assertEquals(remote[0], '127.0.0.2')
+ self.assertEqual(remote[0], '127.0.0.2')
except socket.timeout:
data = None
data = query.to_wire()
sock.send(data)
(data, remote) = sock.recvfrom(4096)
- self.assertEquals(remote[0], '::1')
+ self.assertEqual(remote[0], '::1')
except socket.timeout:
data = None
data = query.to_wire()
sock.send(data)
(data, remote) = sock.recvfrom(4096)
- self.assertEquals(remote[0], '127.0.1.19')
+ self.assertEqual(remote[0], '127.0.1.19')
except socket.timeout:
data = None
import threading
import unittest
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
def AsyncResponder(listenPath, responsePath):
# Make sure the socket does not already exist
asyncResponderSocketPath = '/tmp/async-responder.sock'
dnsdistSocketPath = '/tmp/dnsdist.sock'
asyncResponder = threading.Thread(name='Asynchronous Responder', target=AsyncResponder, args=[asyncResponderSocketPath, dnsdistSocketPath])
-asyncResponder.setDaemon(True)
+asyncResponder.daemon = True
asyncResponder.start()
class AsyncTests(object):
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
- _dohServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
- _dohServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
from dnsdisttests import DNSDistTest
class TestBackendDiscovery(DNSDistTest):
+ # these ports are hardcoded for now, sorry about that!
_noSVCBackendPort = 10600
_svcNoUpgradeBackendPort = 10601
_svcUpgradeDoTBackendPort = 10602
tlsContext.load_cert_chain('server.chain', 'server.key')
TCPNoSVCResponder = threading.Thread(name='TCP no SVC Responder', target=cls.TCPResponder, args=[cls._noSVCBackendPort, cls._toResponderQueue, cls._fromResponderQueue, True, False, cls.NoSVCCallback])
- TCPNoSVCResponder.setDaemon(True)
+ TCPNoSVCResponder.daemon = True
TCPNoSVCResponder.start()
TCPNoUpgradeResponder = threading.Thread(name='TCP no upgrade Responder', target=cls.TCPResponder, args=[cls._svcNoUpgradeBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.NoUpgradePathCallback])
- TCPNoUpgradeResponder.setDaemon(True)
+ TCPNoUpgradeResponder.daemon = True
TCPNoUpgradeResponder.start()
# this one is special, does partial writes!
TCPUpgradeToDoTResponder = threading.Thread(name='TCP upgrade to DoT Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoTBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoTCallback, None, False, '127.0.0.1', True])
- TCPUpgradeToDoTResponder.setDaemon(True)
+ TCPUpgradeToDoTResponder.daemon = True
TCPUpgradeToDoTResponder.start()
# and the corresponding DoT responder
UpgradedDoTResponder = threading.Thread(name='DoT upgraded Responder', target=cls.TCPResponder, args=[10652, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- UpgradedDoTResponder.setDaemon(True)
+ UpgradedDoTResponder.daemon = True
UpgradedDoTResponder.start()
TCPUpgradeToDoHResponder = threading.Thread(name='TCP upgrade to DoH Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoHBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoHCallback])
- TCPUpgradeToDoHResponder.setDaemon(True)
+ TCPUpgradeToDoHResponder.daemon = True
TCPUpgradeToDoHResponder.start()
# and the corresponding DoH responder
UpgradedDOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[10653, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- UpgradedDOHResponder.setDaemon(True)
+ UpgradedDOHResponder.daemon = True
UpgradedDOHResponder.start()
TCPUpgradeToDoTDifferentAddrResponder = threading.Thread(name='TCP upgrade to DoT different addr 1 Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoTBackendDifferentAddrPort1, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoTDifferentAddr1Callback])
- TCPUpgradeToDoTDifferentAddrResponder.setDaemon(True)
+ TCPUpgradeToDoTDifferentAddrResponder.daemon = True
TCPUpgradeToDoTDifferentAddrResponder.start()
# and the corresponding DoT responder
UpgradedDoTResponder = threading.Thread(name='DoT upgraded different addr 1 Responder', target=cls.TCPResponder, args=[10654, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext, False, '127.0.0.2'])
- UpgradedDoTResponder.setDaemon(True)
+ UpgradedDoTResponder.daemon = True
UpgradedDoTResponder.start()
TCPUpgradeToDoTDifferentAddrResponder = threading.Thread(name='TCP upgrade to DoT different addr 2 Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoTBackendDifferentAddrPort2, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoTDifferentAddr2Callback, None, False, '127.0.0.2'])
- TCPUpgradeToDoTDifferentAddrResponder.setDaemon(True)
+ TCPUpgradeToDoTDifferentAddrResponder.daemon = True
TCPUpgradeToDoTDifferentAddrResponder.start()
# and the corresponding DoT responder
UpgradedDoTResponder = threading.Thread(name='DoT upgraded different addr 2 Responder', target=cls.TCPResponder, args=[10655, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext, False])
- UpgradedDoTResponder.setDaemon(True)
+ UpgradedDoTResponder.daemon = True
UpgradedDoTResponder.start()
TCPUpgradeToUnreachableDoTResponder = threading.Thread(name='TCP upgrade to unreachable DoT Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoTUnreachableBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoTUnreachableCallback])
- TCPUpgradeToUnreachableDoTResponder.setDaemon(True)
+ TCPUpgradeToUnreachableDoTResponder.daemon = True
TCPUpgradeToUnreachableDoTResponder.start()
# and NO corresponding DoT responder
# this is not a mistake!
BrokenResponseResponder = threading.Thread(name='Broken response Responder', target=cls.TCPResponder, args=[cls._svcBrokenDNSResponseBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.BrokenResponseCallback])
- BrokenResponseResponder.setDaemon(True)
+ BrokenResponseResponder.daemon = True
BrokenResponseResponder.start()
DOHMissingPathResponder = threading.Thread(name='DoH missing path Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoHBackendWithoutPathPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoHMissingPathCallback])
- DOHMissingPathResponder.setDaemon(True)
+ DOHMissingPathResponder.daemon = True
DOHMissingPathResponder.start()
EOFResponder = threading.Thread(name='EOF Responder', target=cls.TCPResponder, args=[cls._eofBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.EOFCallback])
- EOFResponder.setDaemon(True)
+ EOFResponder.daemon = True
EOFResponder.start()
ServFailResponder = threading.Thread(name='ServFail Responder', target=cls.TCPResponder, args=[cls._servfailBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.ServFailCallback])
- ServFailResponder.setDaemon(True)
+ ServFailResponder.daemon = True
ServFailResponder.start()
WrongNameResponder = threading.Thread(name='Wrong Name Responder', target=cls.TCPResponder, args=[cls._wrongNameBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.WrongNameCallback])
- WrongNameResponder.setDaemon(True)
+ WrongNameResponder.daemon = True
WrongNameResponder.start()
WrongIDResponder = threading.Thread(name='Wrong ID Responder', target=cls.TCPResponder, args=[cls._wrongIDBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.WrongIDCallback])
- WrongIDResponder.setDaemon(True)
+ WrongIDResponder.daemon = True
WrongIDResponder.start()
TooManyQuestionsResponder = threading.Thread(name='Too many questions Responder', target=cls.TCPResponder, args=[cls._tooManyQuestionsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.TooManyQuestionsCallback])
- TooManyQuestionsResponder.setDaemon(True)
+ TooManyQuestionsResponder.daemon = True
TooManyQuestionsResponder.start()
badQNameResponder = threading.Thread(name='Bad QName Responder', target=cls.TCPResponder, args=[cls._badQNameBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.BadQNameCallback])
- badQNameResponder.setDaemon(True)
+ badQNameResponder.daemon = True
badQNameResponder.start()
TCPUpgradeToDoTNoPortResponder = threading.Thread(name='TCP upgrade to DoT (no port) Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoTNoPortBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoTNoPortCallback])
- TCPUpgradeToDoTNoPortResponder.setDaemon(True)
+ TCPUpgradeToDoTNoPortResponder.daemon = True
TCPUpgradeToDoTNoPortResponder.start()
TCPUpgradeToDoHNoPortResponder = threading.Thread(name='TCP upgrade to DoH (no port) Responder', target=cls.TCPResponder, args=[cls._svcUpgradeDoHNoPortBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.UpgradeDoHNoPortCallback])
- TCPUpgradeToDoHNoPortResponder.setDaemon(True)
+ TCPUpgradeToDoHNoPortResponder.daemon = True
TCPUpgradeToDoHNoPortResponder.start()
# in this particular case, the upgraded backend
# does not replace the existing one and thus
# the health-check is forced to auto (or lazy auto)
- self.assertEquals(tokens[2], 'up')
+ self.assertEqual(tokens[2], 'up')
else:
- self.assertEquals(tokens[2], 'UP')
+ self.assertEqual(tokens[2], 'UP')
pool = ''
if len(tokens) == 14:
pool = tokens[13]
import threading
import clientsubnetoption
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
def responseCallback(request):
if len(request.question) != 1:
# this test suite uses a different responder port
# because, contrary to the other ones, its
# responders send raw, broken data
- _testServerPort = 5400
+ _testServerPort = pickAvailablePort()
_config_template = """
setECSSourcePrefixV4(32)
newServer{address="127.0.0.1:%s", useClientSubnet=true}
# Returns broken data for non-healthcheck queries
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, responseCallback])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
# Returns broken data for non-healthcheck queries
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, responseCallback])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
def testUDPWithInvalidAnswer(self):
import clientsubnetoption
import cookiesoption
import requests
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class TestCaching(DNSDistTest):
class TestAPICache(DNSDistTest):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
_webServerAPIKey = 'apisecret'
import socket
import sys
import time
-from dnsdisttests import DNSDistTest, Queue
+from dnsdisttests import DNSDistTest, Queue, pickAvailablePort
class TestCarbon(DNSDistTest):
- _carbonServer1Port = 8000
+ _carbonServer1Port = pickAvailablePort()
_carbonServer1Name = "carbonname1"
- _carbonServer2Port = 8001
+ _carbonServer2Port = pickAvailablePort()
_carbonServer2Name = "carbonname2"
_carbonQueue1 = Queue()
_carbonQueue2 = Queue()
cls._carbonQueue1.put(lines, True, timeout=2.0)
else:
cls._carbonQueue2.put(lines, True, timeout=2.0)
- if threading.currentThread().name in cls._carbonCounters:
- cls._carbonCounters[threading.currentThread().name] += 1
+ if threading.current_thread().name in cls._carbonCounters:
+ cls._carbonCounters[threading.current_thread().name] += 1
else:
- cls._carbonCounters[threading.currentThread().name] = 1
+ cls._carbonCounters[threading.current_thread().name] = 1
conn.close()
sock.close()
@classmethod
def startResponders(cls):
cls._CarbonResponder1 = threading.Thread(name='Carbon Responder 1', target=cls.CarbonResponder, args=[cls._carbonServer1Port])
- cls._CarbonResponder1.setDaemon(True)
+ cls._CarbonResponder1.daemon = True
cls._CarbonResponder1.start()
cls._CarbonResponder2 = threading.Thread(name='Carbon Responder 2', target=cls.CarbonResponder, args=[cls._carbonServer2Port])
- cls._CarbonResponder2.setDaemon(True)
+ cls._CarbonResponder2.daemon = True
cls._CarbonResponder2.start()
def isfloat(self, num):
import time
import dns
import dns.message
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
import dnscrypt
class DNSCryptTest(DNSDistTest):
Be careful to change the _providerFingerprint below if you want to regenerate the keys.
"""
- _dnsDistPort = 5340
- _dnsDistPortDNSCrypt = 8443
+ _dnsDistPortDNSCrypt = pickAvailablePort()
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
"""
DNSCrypt: encrypted A query
"""
- client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
name = 'a.dnscrypt.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
the padding into account) and check that the response
is truncated.
"""
- client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
name = 'smallquerylargeresponse.dnscrypt.tests.powerdns.com.'
query = dns.message.make_query(name, 'TXT', 'IN', use_edns=True, payload=4096)
response = dns.message.make_response(query)
"""
DNSCrypt: certificate rotation
"""
- client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
client.refreshResolverCertificates()
cert = client.getResolverCertificate()
"""
DNSCrypt: Test DNSQuestion.Protocol over UDP
"""
- client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
name = 'udp.protocols.dnscrypt.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
"""
DNSCrypt: Test DNSQuestion.Protocol over TCP
"""
- client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
name = 'tcp.protocols.dnscrypt.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
DNSCrypt: encrypted A query served from cache
"""
misses = 0
- client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
name = 'cacheda.dnscrypt.tests.powerdns.com.'
query = dns.message.make_query(name, 'A', 'IN')
response = dns.message.make_response(query)
"""
DNSCrypt: automatic certificate rotation
"""
- client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", 8443)
+ client = dnscrypt.DNSCryptClient(self._providerName, self._providerFingerprint, "127.0.0.1", self._dnsDistPortDNSCrypt)
client.refreshResolverCertificates()
cert = client.getResolverCertificate()
import clientsubnetoption
from dnsdistdohtests import DNSDistDOHTest
+from dnsdisttests import pickAvailablePort
import pycurl
from io import BytesIO
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_customResponseHeader1 = 'access-control-allow-origin: *'
_customResponseHeader2 = 'user-agent: derp'
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s"}
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s", useClientSubnet=true}
class TestDOHOverHTTP(DNSDistDOHTest):
- _dohServerPort = 8480
+ _dohServerPort = pickAvailablePort()
_serverName = 'tls.tests.dnsdist.org'
_dohBaseURL = ("http://%s:%d/dns-query" % (_serverName, _dohServerPort))
_config_template = """
addDOHLocal("127.0.0.1:%s")
"""
_config_params = ['_testServerPort', '_dohServerPort']
- _checkConfigExpectedOutput = b"""No certificate provided for DoH endpoint 127.0.0.1:8480, running in DNS over HTTP mode instead of DNS over HTTPS
+ _checkConfigExpectedOutput = b"""No certificate provided for DoH endpoint 127.0.0.1:%d, running in DNS over HTTP mode instead of DNS over HTTPS
Configuration 'configs/dnsdist_TestDOHOverHTTP.conf' OK!
-"""
+""" % (_dohServerPort)
def testDOHSimple(self):
"""
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/dns-query" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s"}
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s"}
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_customResponseHeader1 = 'access-control-allow-origin: *'
_customResponseHeader2 = 'user-agent: derp'
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s"}
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s"}
# this test suite uses a different responder port
# because it uses a different health check configuration
- _testServerPort = 5395
+ _testServerPort = pickAvailablePort()
_answerUnexpected = True
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_skipListeningOnCL = True
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_customResponseHeader1 = 'access-control-allow-origin: *'
_customResponseHeader2 = 'user-agent: derp'
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_pkcs12Password = 'passw0rd'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s"}
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_template = """
newServer{address="127.0.0.1:%s", tcpOnly=true}
class TestDOHLimits(DNSDistDOHTest):
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_serverKey = 'server.key'
_serverCert = 'server.chain'
import struct
import sys
import time
-from dnsdisttests import DNSDistTest, Queue
+from dnsdisttests import DNSDistTest, Queue, pickAvailablePort
import dns
import dnstap_pb2
class TestDnstapOverRemoteLogger(DNSDistTest):
- _remoteLoggerServerPort = 4243
+ _remoteLoggerServerPort = pickAvailablePort()
_remoteLoggerQueue = Queue()
_remoteLoggerCounter = 0
_config_params = ['_testServerPort', '_remoteLoggerServerPort']
DNSDistTest.startResponders()
cls._remoteLoggerListener = threading.Thread(name='RemoteLogger Listener', target=cls.RemoteLoggerListener, args=[cls._remoteLoggerServerPort])
- cls._remoteLoggerListener.setDaemon(True)
+ cls._remoteLoggerListener.daemon = True
cls._remoteLoggerListener.start()
def getFirstDnstap(self):
DNSDistTest.startResponders()
cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerAddress])
- cls._fstrmLoggerListener.setDaemon(True)
+ cls._fstrmLoggerListener.daemon = True
cls._fstrmLoggerListener.start()
def getFirstDnstap(self):
class TestDnstapOverFrameStreamTcpLogger(DNSDistTest):
- _fstrmLoggerPort = 4000
+ _fstrmLoggerPort = pickAvailablePort()
_fstrmLoggerQueue = Queue()
_fstrmLoggerCounter = 0
_config_params = ['_testServerPort', '_fstrmLoggerPort']
DNSDistTest.startResponders()
cls._fstrmLoggerListener = threading.Thread(name='FrameStreamUnixListener', target=cls.FrameStreamUnixListener, args=[cls._fstrmLoggerPort])
- cls._fstrmLoggerListener.setDaemon(True)
+ cls._fstrmLoggerListener.daemon = True
cls._fstrmLoggerListener.start()
def getFirstDnstap(self):
import socket
import time
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
try:
range = xrange
except NameError:
class DynBlocksTest(DNSDistTest):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
_webServerAPIKey = 'apisecret'
import threading
import time
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class HealthCheckTest(DNSDistTest):
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerAPIKey = 'apisecret'
_webServerAPIKeyHashed = '$scrypt$ln=10,p=1,r=8$9v8JxDfzQVyTpBkTbkUqYg==$bDQzAOHeK1G9UvTPypNhrX48w974ZXbFPtRKS34+aso='
_config_params = ['_consoleKeyB64', '_consolePort', '_webServerPort', '_webServerAPIKeyHashed', '_testServerPort']
class TestDefaultHealthCheck(HealthCheckTest):
# this test suite uses a different responder port
# because we need fresh counters
- _testServerPort = 5380
+ _testServerPort = pickAvailablePort()
def testDefault(self):
"""
class TestHealthCheckForcedUP(HealthCheckTest):
# this test suite uses a different responder port
# because we need fresh counters
- _testServerPort = 5381
+ _testServerPort = pickAvailablePort()
_config_template = """
setKey("%s")
class TestHealthCheckForcedDown(HealthCheckTest):
# this test suite uses a different responder port
# because we need fresh counters
- _testServerPort = 5382
+ _testServerPort = pickAvailablePort()
_config_template = """
setKey("%s")
class TestHealthCheckCustomName(HealthCheckTest):
# this test suite uses a different responder port
# because it uses a different health check name
- _testServerPort = 5383
+ _testServerPort = pickAvailablePort()
_healthCheckName = 'powerdns.com.'
_config_params = ['_consoleKeyB64', '_consolePort', '_webServerPort', '_webServerAPIKeyHashed', '_testServerPort', '_healthCheckName']
class TestHealthCheckCustomNameNoAnswer(HealthCheckTest):
# this test suite uses a different responder port
# because it uses a different health check configuration
- _testServerPort = 5384
+ _testServerPort = pickAvailablePort()
_answerUnexpected = False
_config_template = """
class TestHealthCheckCustomFunction(HealthCheckTest):
# this test suite uses a different responder port
# because it uses a different health check configuration
- _testServerPort = 5385
+ _testServerPort = pickAvailablePort()
_answerUnexpected = False
_healthCheckName = 'powerdns.com.'
class TestLazyHealthChecks(HealthCheckTest):
_extraStartupSleep = 1
- _do53Port = 10700
- _dotPort = 10701
- _dohPort = 10702
+ _do53Port = pickAvailablePort()
+ _dotPort = pickAvailablePort()
+ _dohPort = pickAvailablePort()
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
tlsContext.load_cert_chain('server.chain', 'server.key')
Do53Responder = threading.Thread(name='Do53 Lazy Responder', target=cls.UDPResponder, args=[cls._do53Port, cls._toResponderQueue, cls._fromResponderQueue, False, cls.Do53Callback])
- Do53Responder.setDaemon(True)
+ Do53Responder.daemon = True
Do53Responder.start()
Do53TCPResponder = threading.Thread(name='Do53 TCP Lazy Responder', target=cls.TCPResponder, args=[cls._do53Port, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.Do53Callback])
- Do53TCPResponder.setDaemon(True)
+ Do53TCPResponder.daemon = True
Do53TCPResponder.start()
DoTResponder = threading.Thread(name='DoT Lazy Responder', target=cls.TCPResponder, args=[cls._dotPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.DoTCallback, tlsContext])
- DoTResponder.setDaemon(True)
+ DoTResponder.daemon = True
DoTResponder.start()
DoHResponder = threading.Thread(name='DoH Lazy Responder', target=cls.DOHResponder, args=[cls._dohPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.DoHCallback, tlsContext])
- DoHResponder.setDaemon(True)
+ DoHResponder.daemon = True
DoHResponder.start()
def testDo53Lazy(self):
import threading
import unittest
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class TestRuleMetrics(DNSDistTest):
addAction('cache.metrics.tests.powerdns.com', PoolAction('cache'))
"""
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerAPIKey = 'apisecret'
_webServerAPIKeyHashed = '$scrypt$ln=10,p=1,r=8$9v8JxDfzQVyTpBkTbkUqYg==$bDQzAOHeK1G9UvTPypNhrX48w974ZXbFPtRKS34+aso='
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
- _dohServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/" % (_serverName, _dohServerPort))
_config_params = ['_tlsServerPort', '_serverCert', '_serverKey', '_dohServerPort', '_serverCert', '_serverKey', '_testServerPort', '_webServerPort', '_webServerAPIKeyHashed']
import os
import subprocess
import unittest
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class DNSDistOCSPStaplingTest(DNSDistTest):
_ocspFile = 'server.ocsp'
_caCert = 'ca.pem'
_caKey = 'ca.key'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
setKey("%s")
_caCert = 'ca.pem'
# invalid OCSP file!
_ocspFile = '/dev/null'
- _tlsServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
setKey("%s")
_ocspFile = 'server.ocsp'
_caCert = 'ca.pem'
_caKey = 'ca.key'
- _tlsServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
setKey("%s")
"""
output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
self.assertIn('OCSP Response Status: successful (0x0)', output)
- self.assertEquals(self.getTLSProvider(), "gnutls")
+ self.assertEqual(self.getTLSProvider(), "gnutls")
serialNumber = self.getOCSPSerial(output)
self.assertTrue(serialNumber)
_caCert = 'ca.pem'
# invalid OCSP file!
_ocspFile = '/dev/null'
- _tlsServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
setKey("%s")
"""
output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
self.assertNotIn('OCSP Response Status: successful (0x0)', output)
- self.assertEquals(self.getTLSProvider(), "gnutls")
+ self.assertEqual(self.getTLSProvider(), "gnutls")
class TestOCSPStaplingTLSOpenSSL(DNSDistOCSPStaplingTest):
_ocspFile = 'server.ocsp'
_caCert = 'ca.pem'
_caKey = 'ca.key'
- _tlsServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
setKey("%s")
"""
output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
self.assertIn('OCSP Response Status: successful (0x0)', output)
- self.assertEquals(self.getTLSProvider(), "openssl")
+ self.assertEqual(self.getTLSProvider(), "openssl")
serialNumber = self.getOCSPSerial(output)
self.assertTrue(serialNumber)
_caCert = 'ca.pem'
# invalid OCSP file!
_ocspFile = '/dev/null'
- _tlsServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
setKey("%s")
"""
output = self.checkOCSPStaplingStatus('127.0.0.1', self._tlsServerPort, self._serverName, self._caCert)
self.assertNotIn('OCSP Response Status: successful (0x0)', output)
- self.assertEquals(self.getTLSProvider(), "openssl")
+ self.assertEqual(self.getTLSProvider(), "openssl")
import struct
import time
import threading
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class OOORTCPResponder(object):
thread = threading.Thread(name='Connection Handler',
target=self.handleConnection,
args=[conn])
- thread.setDaemon(True)
+ thread.daemon = True
thread.start()
sock.close()
thread = threading.Thread(name='Connection Handler',
target=self.handleConnection,
args=[conn])
- thread.setDaemon(True)
+ thread.daemon = True
thread.start()
sock.close()
-OOORResponderPort = 5371
+OOORResponderPort = pickAvailablePort()
ooorTCPResponder = threading.Thread(name='TCP Responder', target=OOORTCPResponder, args=[OOORResponderPort])
-ooorTCPResponder.setDaemon(True)
+ooorTCPResponder.daemon = True
ooorTCPResponder.start()
-ReverseOOORResponderPort = 5372
+ReverseOOORResponderPort = pickAvailablePort()
ReverseOoorTCPResponder = threading.Thread(name='TCP Responder', target=ReverseOOORTCPResponder, args=[ReverseOOORResponderPort])
-ReverseOoorTCPResponder.setDaemon(True)
+ReverseOoorTCPResponder.daemon = True
ReverseOoorTCPResponder.start()
class TestOOORWithClientNotBackend(DNSDistTest):
import threading
import time
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class OutgoingDOHTests(object):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerAPIKey = 'apisecret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
class BrokenOutgoingDOHTests(object):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerAPIKey = 'apisecret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
class OutgoingDOHBrokenResponsesTests(object):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerAPIKey = 'apisecret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
self.assertEqual(response, receivedResponse)
class TestOutgoingDOHOpenSSL(DNSDistTest, OutgoingDOHTests):
- _tlsBackendPort = 10543
+ _tlsBackendPort = pickAvailablePort()
_tlsProvider = 'openssl'
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHGnuTLS(DNSDistTest, OutgoingDOHTests):
- _tlsBackendPort = 10544
+ _tlsBackendPort = pickAvailablePort()
_tlsProvider = 'gnutls'
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHOpenSSLWrongCertName(DNSDistTest, BrokenOutgoingDOHTests):
- _tlsBackendPort = 10545
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHGnuTLSWrongCertName(DNSDistTest, BrokenOutgoingDOHTests):
- _tlsBackendPort = 10546
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHOpenSSLWrongCertNameButNoCheck(DNSDistTest, OutgoingDOHTests):
- _tlsBackendPort = 10547
+ _tlsBackendPort = pickAvailablePort()
_tlsProvider = 'openssl'
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHGnuTLSWrongCertNameButNoCheck(DNSDistTest, OutgoingDOHTests):
- _tlsBackendPort = 10548
+ _tlsBackendPort = pickAvailablePort()
_tlsProvider = 'gnutls'
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHBrokenResponsesOpenSSL(DNSDistTest, OutgoingDOHBrokenResponsesTests):
- _tlsBackendPort = 10549
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.callback, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHBrokenResponsesGnuTLS(DNSDistTest, OutgoingDOHBrokenResponsesTests):
- _tlsBackendPort = 10550
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.callback, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
class TestOutgoingDOHProxyProtocol(DNSDistTest):
- _tlsBackendPort = 10551
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching DOH woth Proxy Protocol responder..")
cls._DOHResponder = threading.Thread(name='DOH with Proxy Protocol Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext, True])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
def testPP(self):
self.checkMessageProxyProtocol(receivedProxyPayload, '127.0.0.1', '127.0.0.1', True)
class TestOutgoingDOHXForwarded(DNSDistTest):
- _tlsBackendPort = 10560
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching DOH responder..")
cls._DOHResponder = threading.Thread(name='DOH Responder', target=cls.DOHResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, cls.callback, tlsContext])
- cls._DOHResponder.setDaemon(True)
+ cls._DOHResponder.daemon = True
cls._DOHResponder.start()
def testXForwarded(self):
import threading
import time
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class OutgoingTLSTests(object):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerAPIKey = 'apisecret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
class BrokenOutgoingTLSTests(object):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerAPIKey = 'apisecret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
self.checkNoResponderHit()
class TestOutgoingTLSOpenSSL(DNSDistTest, OutgoingTLSTests):
- _tlsBackendPort = 10443
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching TLS responder..")
cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._TLSResponder.setDaemon(True)
+ cls._TLSResponder.daemon = True
cls._TLSResponder.start()
class TestOutgoingTLSGnuTLS(DNSDistTest, OutgoingTLSTests):
- _tlsBackendPort = 10444
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching TLS responder..")
cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._TLSResponder.setDaemon(True)
+ cls._TLSResponder.daemon = True
cls._TLSResponder.start()
class TestOutgoingTLSOpenSSLWrongCertName(DNSDistTest, BrokenOutgoingTLSTests):
- _tlsBackendPort = 10445
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching TLS responder..")
cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._TLSResponder.setDaemon(True)
+ cls._TLSResponder.daemon = True
cls._TLSResponder.start()
class TestOutgoingTLSGnuTLSWrongCertName(DNSDistTest, BrokenOutgoingTLSTests):
- _tlsBackendPort = 10446
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching TLS responder..")
cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._TLSResponder.setDaemon(True)
+ cls._TLSResponder.daemon = True
cls._TLSResponder.start()
class TestOutgoingTLSOpenSSLWrongCertNameButNoCheck(DNSDistTest, OutgoingTLSTests):
- _tlsBackendPort = 10447
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching TLS responder..")
cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._TLSResponder.setDaemon(True)
+ cls._TLSResponder.daemon = True
cls._TLSResponder.start()
class TestOutgoingTLSGnuTLSWrongCertNameButNoCheck(DNSDistTest, OutgoingTLSTests):
- _tlsBackendPort = 10448
+ _tlsBackendPort = pickAvailablePort()
_config_params = ['_tlsBackendPort', '_webServerPort', '_webServerBasicAuthPasswordHashed', '_webServerAPIKeyHashed']
_config_template = """
setMaxTCPClientThreads(1)
print("Launching TLS responder..")
cls._TLSResponder = threading.Thread(name='TLS Responder', target=cls.TCPResponder, args=[cls._tlsBackendPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, None, tlsContext])
- cls._TLSResponder.setDaemon(True)
+ cls._TLSResponder.daemon = True
cls._TLSResponder.start()
import requests
import subprocess
import unittest
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
@unittest.skipIf('SKIP_PROMETHEUS_TESTS' in os.environ, 'Prometheus tests are disabled')
class TestPrometheus(DNSDistTest):
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
_webServerAPIKey = 'apisecret'
import struct
import sys
import time
-from dnsdisttests import DNSDistTest, Queue
+from dnsdisttests import DNSDistTest, pickAvailablePort, Queue
from proxyprotocol import ProxyProtocol
import dns
import dnsmessage_pb2
class DNSDistProtobufTest(DNSDistTest):
- _protobufServerPort = 4242
+ _protobufServerPort = pickAvailablePort()
_protobufQueue = Queue()
_protobufServerID = 'dnsdist-server-1'
_protobufCounter = 0
@classmethod
def startResponders(cls):
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._protobufListener = threading.Thread(name='Protobuf Listener', target=cls.ProtobufListener, args=[cls._protobufServerPort])
- cls._protobufListener.setDaemon(True)
+ cls._protobufListener.daemon = True
cls._protobufListener.start()
def getFirstProtobufMessage(self):
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
- _dohServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/dns-query" % (_serverName, _dohServerPort))
_config_params = ['_testServerPort', '_protobufServerPort', '_tlsServerPort', '_serverCert', '_serverKey', '_dohServerPort', '_serverCert', '_serverKey']
_config_template = """
import threading
import time
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
from proxyprotocol import ProxyProtocol
from dnsdistdohtests import DNSDistDOHTest
toProxyQueue = Queue()
fromProxyQueue = Queue()
-proxyResponderPort = 5470
+proxyResponderPort = pickAvailablePort()
udpResponder = threading.Thread(name='UDP Proxy Protocol Responder', target=ProxyProtocolUDPResponder, args=[proxyResponderPort, toProxyQueue, fromProxyQueue])
-udpResponder.setDaemon(True)
+udpResponder.daemon = True
udpResponder.start()
tcpResponder = threading.Thread(name='TCP Proxy Protocol Responder', target=ProxyProtocolTCPResponder, args=[proxyResponderPort, toProxyQueue, fromProxyQueue])
-tcpResponder.setDaemon(True)
+tcpResponder.daemon = True
tcpResponder.start()
class ProxyProtocolTest(DNSDistTest):
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_dohBaseURL = ("https://%s:%d/dns-query" % (_serverName, _dohServerPort))
_proxyResponderPort = proxyResponderPort
_config_template = """
import threading
import clientsubnetoption
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
def servFailResponseCallback(request):
response = dns.message.make_response(request)
class TestRestartQuery(DNSDistTest):
# this test suite uses different responder ports
- _testNormalServerPort = 5420
- _testServfailServerPort = 5421
+ _testNormalServerPort = pickAvailablePort()
+ _testServfailServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%d", pool='restarted'}:setUp()
newServer{address="127.0.0.1:%d", pool=''}:setUp()
# servfail
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServfailServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, servFailResponseCallback])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServfailServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, servFailResponseCallback])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._UDPResponderNormal = threading.Thread(name='UDP ResponderNormal', target=cls.UDPResponder, args=[cls._testNormalServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, normalResponseCallback])
- cls._UDPResponderNormal.setDaemon(True)
+ cls._UDPResponderNormal.daemon = True
cls._UDPResponderNormal.start()
cls._TCPResponderNormal = threading.Thread(name='TCP ResponderNormal', target=cls.TCPResponder, args=[cls._testNormalServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, False, normalResponseCallback])
- cls._TCPResponderNormal.setDaemon(True)
+ cls._TCPResponderNormal.daemon = True
cls._TCPResponderNormal.start()
def testRestartingQuery(self):
sender = getattr(self, method)
(_, receivedResponse) = sender(query, response=None, useQueue=False)
self.assertTrue(receivedResponse)
- self.assertEquals(receivedResponse, expectedResponse)
+ self.assertEqual(receivedResponse, expectedResponse)
import threading
import time
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class TestRoutingPoolRouting(DNSDistTest):
class TestRoutingRoundRobinLB(DNSDistTest):
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_config_params = ['_testServerPort', '_testServer2Port']
_config_template = """
setServerPolicy(roundrobin)
def startResponders(cls):
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder2.setDaemon(True)
+ cls._UDPResponder2.daemon = True
cls._UDPResponder2.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder2.setDaemon(True)
+ cls._TCPResponder2.daemon = True
cls._TCPResponder2.start()
def testRR(self):
class TestRoutingRoundRobinLBOneDown(DNSDistTest):
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_config_params = ['_testServerPort', '_testServer2Port']
_config_template = """
setServerPolicy(roundrobin)
class TestRoutingRoundRobinLBAllDown(DNSDistTest):
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_config_params = ['_testServerPort', '_testServer2Port']
_config_template = """
setServerPolicy(roundrobin)
class TestRoutingLuaFFIPerThreadRoundRobinLB(DNSDistTest):
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_config_params = ['_testServerPort', '_testServer2Port']
_config_template = """
-- otherwise we start too many TCP workers, and as each thread
def startResponders(cls):
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder2.setDaemon(True)
+ cls._UDPResponder2.daemon = True
cls._UDPResponder2.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder2.setDaemon(True)
+ cls._TCPResponder2.daemon = True
cls._TCPResponder2.start()
def testRR(self):
class TestRoutingOrder(DNSDistTest):
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_config_params = ['_testServerPort', '_testServer2Port']
_config_template = """
setServerPolicy(firstAvailable)
def startResponders(cls):
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder2.setDaemon(True)
+ cls._UDPResponder2.daemon = True
cls._UDPResponder2.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder2.setDaemon(True)
+ cls._TCPResponder2.daemon = True
cls._TCPResponder2.start()
def testOrder(self):
class TestFirstAvailableQPSPacketCacheHits(DNSDistTest):
_verboseMode = True
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_config_params = ['_testServerPort', '_testServer2Port']
_config_template = """
setServerPolicy(firstAvailable)
def startResponders(cls):
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder2.setDaemon(True)
+ cls._UDPResponder2.daemon = True
cls._UDPResponder2.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder2.setDaemon(True)
+ cls._TCPResponder2.daemon = True
cls._TCPResponder2.start()
def testOrderQPSCacheHits(self):
class TestRoutingWRandom(DNSDistTest):
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_config_params = ['_testServerPort', '_testServer2Port']
_config_template = """
setServerPolicy(wrandom)
def startResponders(cls):
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder2.setDaemon(True)
+ cls._UDPResponder2.daemon = True
cls._UDPResponder2.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder2.setDaemon(True)
+ cls._TCPResponder2.daemon = True
cls._TCPResponder2.start()
def testWRandom(self):
class TestRoutingHighValueWRandom(DNSDistTest):
- _testServer2Port = 5351
+ _testServer2Port = pickAvailablePort()
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
_config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_testServer2Port']
def startResponders(cls):
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._UDPResponder2 = threading.Thread(name='UDP Responder 2', target=cls.UDPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder2.setDaemon(True)
+ cls._UDPResponder2.daemon = True
cls._UDPResponder2.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._TCPResponder2 = threading.Thread(name='TCP Responder 2', target=cls.TCPResponder, args=[cls._testServer2Port, cls._toResponderQueue, cls._fromResponderQueue])
- cls._TCPResponder2.setDaemon(True)
+ cls._TCPResponder2.daemon = True
cls._TCPResponder2.start()
def testHighValueWRandom(self):
import requests
import socket
import struct
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class TestBrokenTCPFastOpen(DNSDistTest):
# because, contrary to the other ones, its
# TCP responder will accept a connection, read the
# query then just close the connection right away
- _testServerPort = 5410
+ _testServerPort = pickAvailablePort()
_testServerRetries = 5
_webTimeout = 2.0
- _webServerPort = 8083
+ _webServerPort = pickAvailablePort()
_webServerBasicAuthPassword = 'secret'
_webServerBasicAuthPasswordHashed = '$scrypt$ln=10,p=1,r=8$6DKLnvUYEeXWh3JNOd3iwg==$kSrhdHaRbZ7R74q3lGBqO1xetgxRxhmWzYJ2Qvfm7JM='
_webServerAPIKey = 'apisecret'
# Normal responder
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
# Close the connection right after reading the query
cls._TCPResponder = threading.Thread(name='Broken TCP Responder', target=cls.BrokenTCPResponder, args=[cls._testServerPort])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
def testTCOFastOpenOnCloseAfterRead(self):
import struct
import time
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
try:
range = xrange
# this test suite uses a different responder port
# because it uses a different health check configuration
- _testServerPort = 5395
+ _testServerPort = pickAvailablePort()
_answerUnexpected = True
_tcpIdleTimeout = 2
# this test suite uses a different responder port
# because it uses a different health check configuration
- _testServerPort = 5395
+ _testServerPort = pickAvailablePort()
_answerUnexpected = True
_skipListeningOnCL = True
import threading
import time
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
try:
range = xrange
# because, contrary to the other ones, its
# responders allow trailing data and multiple responses,
# and we don't want to mix things up.
- _testServerPort = 5361
+ _testServerPort = pickAvailablePort()
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
+ _tlsServerPort = pickAvailablePort()
_tcpSendTimeout = 60
_config_template = """
newServer{address="127.0.0.1:%s"}
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, True])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, True, True])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
def testTCPShortRead(self):
import subprocess
import time
import unittest
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class TLSTests(object):
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
+ _tlsServerPort = pickAvailablePort()
_config_template = """
setKey("%s")
controlSocket("127.0.0.1:%s")
_config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
def testProvider(self):
- self.assertEquals(self.getTLSProvider(), "openssl")
+ self.assertEqual(self.getTLSProvider(), "openssl")
class TestGnuTLS(DNSDistTest, TLSTests):
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
+ _tlsServerPort = pickAvailablePort()
_config_template = """
setKey("%s")
controlSocket("127.0.0.1:%s")
_config_params = ['_consoleKeyB64', '_consolePort', '_testServerPort', '_tlsServerPort', '_serverCert', '_serverKey']
def testProvider(self):
- self.assertEquals(self.getTLSProvider(), "gnutls")
+ self.assertEqual(self.getTLSProvider(), "gnutls")
class TestDOTWithCache(DNSDistTest):
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
+ _tlsServerPort = pickAvailablePort()
_config_template = """
newServer{address="127.0.0.1:%s"}
# this test suite uses a different responder port
# because it uses a different health check configuration
- _testServerPort = 5395
+ _testServerPort = pickAvailablePort()
_answerUnexpected = True
_serverKey = 'server.key'
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
+ _tlsServerPort = pickAvailablePort()
_skipListeningOnCL = True
_tcpIdleTimeout = 2
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
+ _tlsServerPort = pickAvailablePort()
_config_template = """
function checkDOT(dq)
_pkcsPassphrase = 'passw0rd'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8453
+ _tlsServerPort = pickAvailablePort()
_config_template = """
setKey("%s")
controlSocket("127.0.0.1:%s")
import tempfile
import time
import unittest
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
try:
range = xrange
except NameError:
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_numberOfKeys = 0
_config_template = """
newServer{address="127.0.0.1:%s"}
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _dohServerPort = 8443
+ _dohServerPort = pickAvailablePort()
_numberOfKeys = 5
_config_template = """
setKey("%s")
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
_numberOfKeys = 0
_config_template = """
newServer{address="127.0.0.1:%s"}
_serverCert = 'server.chain'
_serverName = 'tls.tests.dnsdist.org'
_caCert = 'ca.pem'
- _tlsServerPort = 8443
+ _tlsServerPort = pickAvailablePort()
_numberOfKeys = 5
_config_template = """
setKey("%s")
import threading
import clientsubnetoption
import dns
-from dnsdisttests import DNSDistTest, Queue
+from dnsdisttests import DNSDistTest, Queue, pickAvailablePort
class TestTeeAction(DNSDistTest):
_consoleKey = DNSDistTest.generateConsoleKey()
_consoleKeyB64 = base64.b64encode(_consoleKey).decode('ascii')
- _teeServerPort = 5390
+ _teeServerPort = pickAvailablePort()
_toTeeQueue = Queue()
_fromTeeQueue = Queue()
_config_template = """
print("Launching responders..")
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, False, True])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
cls._TeeResponder = threading.Thread(name='Tee Responder', target=cls.UDPResponder, args=[cls._teeServerPort, cls._toTeeQueue, cls._fromTeeQueue])
- cls._TeeResponder.setDaemon(True)
+ cls._TeeResponder.daemon = True
cls._TeeResponder.start()
def testTeeWithECS(self):
#!/usr/bin/env python
import threading
import dns
-from dnsdisttests import DNSDistTest
+from dnsdisttests import DNSDistTest, pickAvailablePort
class TestTrailingDataToBackend(DNSDistTest):
# because, contrary to the other ones, its
# responders allow trailing data and we don't want
# to mix things up.
- _testServerPort = 5360
+ _testServerPort = pickAvailablePort()
_verboseMode = True
_config_template = """
newServer{address="127.0.0.1:%s"}
# Respond REFUSED to queries with trailing data.
cls._UDPResponder = threading.Thread(name='UDP Responder', target=cls.UDPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, dns.rcode.REFUSED])
- cls._UDPResponder.setDaemon(True)
+ cls._UDPResponder.daemon = True
cls._UDPResponder.start()
# Respond REFUSED to queries with trailing data.
cls._TCPResponder = threading.Thread(name='TCP Responder', target=cls.TCPResponder, args=[cls._testServerPort, cls._toResponderQueue, cls._fromResponderQueue, dns.rcode.REFUSED])
- cls._TCPResponder.setDaemon(True)
+ cls._TCPResponder.daemon = True
cls._TCPResponder.start()
def testTrailingPassthrough(self):