From: Remi Gacogne Date: Thu, 10 Sep 2020 10:20:57 +0000 (+0200) Subject: dnsdist: Add some tests for out of order processing X-Git-Tag: auth-4.5.0-alpha0~14^2~18 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=db7acdaf5dd498528dd151a746f225b3ffea8b1b;p=thirdparty%2Fpdns.git dnsdist: Add some tests for out of order processing --- diff --git a/regression-tests.dnsdist/dnsdisttests.py b/regression-tests.dnsdist/dnsdisttests.py index 3145830121..4bea3f9a11 100644 --- a/regression-tests.dnsdist/dnsdisttests.py +++ b/regression-tests.dnsdist/dnsdisttests.py @@ -59,6 +59,7 @@ class DNSDistTest(AssertEqualDNSMessageMixin, unittest.TestCase): _answerUnexpected = True _checkConfigExpectedOutput = None _verboseMode = False + _skipListeningOnCL = False @classmethod def startResponders(cls): @@ -81,8 +82,12 @@ class DNSDistTest(AssertEqualDNSMessageMixin, unittest.TestCase): conf.write("-- Autogenerated by dnsdisttests.py\n") conf.write(cls._config_template % params) - dnsdistcmd = [os.environ['DNSDISTBIN'], '--supervised', '-C', confFile, - '-l', '%s:%d' % (cls._dnsDistListeningAddr, cls._dnsDistPort) ] + if cls._skipListeningOnCL: + dnsdistcmd = [os.environ['DNSDISTBIN'], '--supervised', '-C', confFile ] + else: + dnsdistcmd = [os.environ['DNSDISTBIN'], '--supervised', '-C', confFile, + '-l', '%s:%d' % (cls._dnsDistListeningAddr, cls._dnsDistPort) ] + if cls._verboseMode: dnsdistcmd.append('-v') @@ -328,13 +333,16 @@ class DNSDistTest(AssertEqualDNSMessageMixin, unittest.TestCase): return (receivedQuery, message) @classmethod - def openTCPConnection(cls, timeout=None): + def openTCPConnection(cls, timeout=None, port=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if timeout: sock.settimeout(timeout) - sock.connect(("127.0.0.1", cls._dnsDistPort)) + if not port: + port = cls._dnsDistPort + + sock.connect(("127.0.0.1", port)) return sock @classmethod diff --git a/regression-tests.dnsdist/test_OOOR.py b/regression-tests.dnsdist/test_OOOR.py new file mode 100644 index 0000000000..d6ac134382 --- /dev/null +++ b/regression-tests.dnsdist/test_OOOR.py @@ -0,0 +1,311 @@ +#!/usr/bin/env python +import dns +import socket +import struct +import time +import threading +from dnsdisttests import DNSDistTest + +class OOORTCPResponder(object): + + def handleConnection(self, conn): + try: + + while True: + try: + data = conn.recv(2) + except socket.timeout: + data = None + + if not data: + conn.close() + break + + (datalen,) = struct.unpack("!H", data) + data = conn.recv(datalen) + + # computing the correct ID for the response + request = dns.message.from_wire(data) + #print("got a query for %s" % (request.question[0].name)) + if request.question[0].name == "0.simple.ooor.tests.powerdns.com": + time.sleep(1) + + response = dns.message.make_response(request) + + wire = response.to_wire() + conn.send(struct.pack("!H", len(wire))) + conn.send(wire) + + finally: + conn.close() + + def __init__(self, port): + OOORTCPResponder.numberOfConnections = 0 + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + try: + sock.bind(("127.0.0.1", port)) + except socket.error as e: + print("Error binding in the TCP responder: %s" % str(e)) + sys.exit(1) + + sock.listen(100) + while True: + (conn, _) = sock.accept() + conn.settimeout(5.0) + + OOORTCPResponder.numberOfConnections = OOORTCPResponder.numberOfConnections + 1 + thread = threading.Thread(name='Connection Handler', + target=self.handleConnection, + args=[conn]) + thread.setDaemon(True) + thread.start() + + sock.close() + +class ReverseOOORTCPResponder(OOORTCPResponder): + + def handleConnection(self, conn): + try: + # short timeout since we want to answer only after receiving 5 requests + # or a timeout + conn.settimeout(0.2) + + queuedResponses = [] + while True: + timedout = False + try: + data = conn.recv(2) + except socket.timeout: + data = None + timedout = True + + if timedout or len(queuedResponses) >= 5: + queuedResponses.reverse() + for response in queuedResponses: + wire = response.to_wire() + conn.send(struct.pack("!H", len(wire))) + conn.send(wire) + queuedResponses = [] + if timedout: + continue + elif not data: + conn.close() + break + + (datalen,) = struct.unpack("!H", data) + data = conn.recv(datalen) + + # computing the correct ID for the response + request = dns.message.from_wire(data) + #print("got a query for %s" % (request.question[0].name)) + + response = dns.message.make_response(request) + queuedResponses.append(response) + + finally: + conn.close() + + def __init__(self, port): + ReverseOOORTCPResponder.numberOfConnections = 0 + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + try: + sock.bind(("127.0.0.1", port)) + except socket.error as e: + print("Error binding in the TCP responder: %s" % str(e)) + sys.exit(1) + + sock.listen(100) + while True: + (conn, _) = sock.accept() + + ReverseOOORTCPResponder.numberOfConnections = ReverseOOORTCPResponder.numberOfConnections + 1 + thread = threading.Thread(name='Connection Handler', + target=self.handleConnection, + args=[conn]) + thread.setDaemon(True) + thread.start() + + sock.close() + + +OOORResponderPort = 5371 +ooorTCPResponder = threading.Thread(name='TCP Responder', target=OOORTCPResponder, args=[OOORResponderPort]) +ooorTCPResponder.setDaemon(True) +ooorTCPResponder.start() + +ReverseOOORResponderPort = 5372 +ReverseOoorTCPResponder = threading.Thread(name='TCP Responder', target=ReverseOOORTCPResponder, args=[ReverseOOORResponderPort]) +ReverseOoorTCPResponder.setDaemon(True) +ReverseOoorTCPResponder.start() + +class TestOOORWithClientNotBackend(DNSDistTest): + # this test suite uses a different responder port + _testServerPort = OOORResponderPort + + _concurrentQueriesFromClient = 10 + _config_template = """ + newServer{address="127.0.0.1:%d", maxInFlight=0, pool={""}}:setUp() + newServer{address="127.0.0.1:%d", maxInFlight=0, pool={"more-queries"}}:setUp() + -- route these queries to a different backend so we don't reuse the connection from a previous test + addAction("more-queries.ooor.tests.powerdns.com.", PoolAction("more-queries")) + setLocal("%s:%d", {maxInFlight=%d}) + """ + _config_params = ['_testServerPort', '_testServerPort', '_dnsDistListeningAddr', '_dnsDistPort', '_concurrentQueriesFromClient'] + _verboseMode = True + _skipListeningOnCL = True + + @classmethod + def startResponders(cls): + return + + def testSimple(self): + """ + OOOR: 5 queries + """ + names = [] + OOORTCPResponder.numberOfConnections = 0 + + for idx in range(5): + names.append('%d.simple.ooor.tests.powerdns.com.' % (idx)) + + conn = self.openTCPConnection() + + for name in names: + query = dns.message.make_query(name, 'A', 'IN', use_edns=False) + + self.sendTCPQueryOverConnection(conn, query) + + receivedResponses = {} + + for name in names: + receivedResponse = self.recvTCPResponseOverConnection(conn) + self.assertTrue(receivedResponse) + receivedResponses[str(receivedResponse.question[0].name)] = (receivedResponse) + + self.assertEquals(len(receivedResponses), 5) + for idx in range(5): + self.assertIn('%d.simple.ooor.tests.powerdns.com.' % (idx), receivedResponses) + + # we can get a response to one of the first query before they all have + # been read, reusing a backend connection + self.assertLessEqual(OOORTCPResponder.numberOfConnections, 5) + + def testMoreQueriesThanAllowedInFlight(self): + """ + OOOR: 100 queries, 10 in flight + """ + names = [] + OOORTCPResponder.numberOfConnections = 0 + + for idx in range(100): + names.append('%d.more-queries.ooor.tests.powerdns.com.' % (idx)) + + conn = self.openTCPConnection() + + for name in names: + query = dns.message.make_query(name, 'A', 'IN', use_edns=False) + + self.sendTCPQueryOverConnection(conn, query) + + receivedResponses = {} + + for name in names: + receivedResponse = self.recvTCPResponseOverConnection(conn) + self.assertTrue(receivedResponse) + receivedResponses[str(receivedResponse.question[0].name)] = (receivedResponse) + + self.assertEquals(len(receivedResponses), 100) + for idx in range(5): + self.assertIn('%d.more-queries.ooor.tests.powerdns.com.' % (idx), receivedResponses) + + self.assertLessEqual(OOORTCPResponder.numberOfConnections, 10) + +class TestOOORWithClientAndBackend(DNSDistTest): + # this test suite uses a different responder port + _testServerPort = ReverseOOORResponderPort + + _concurrentQueriesFromClient = 10 + _concurrentQueriesToServer = 5 + _config_template = """ + newServer{address="127.0.0.1:%d", maxInFlight=%d, pool={""}}:setUp() + newServer{address="127.0.0.1:%d", maxInFlight=%d, pool={"more-queries"}}:setUp() + -- route these queries to a different backend so we don't reuse the connection from a previous test + addAction("more-queries.reverse-ooor.tests.powerdns.com.", PoolAction("more-queries")) + setLocal("%s:%d", {maxInFlight=%d}) + """ + _config_params = ['_testServerPort', '_concurrentQueriesToServer', '_testServerPort', '_concurrentQueriesToServer', '_dnsDistListeningAddr', '_dnsDistPort', '_concurrentQueriesFromClient'] + _verboseMode = True + _skipListeningOnCL = True + + @classmethod + def startResponders(cls): + return + + def testSimple(self): + """ + OOOR Reverse: 5 queries + """ + names = [] + ReverseOOORTCPResponder.numberOfConnections = 0 + + for idx in range(5): + names.append('%d.simple.reverse-ooor.tests.powerdns.com.' % (idx)) + + conn = self.openTCPConnection() + + for name in names: + query = dns.message.make_query(name, 'A', 'IN', use_edns=False) + + self.sendTCPQueryOverConnection(conn, query) + + receivedResponses = {} + + for name in names: + receivedResponse = self.recvTCPResponseOverConnection(conn) + self.assertTrue(receivedResponse) + receivedResponses[str(receivedResponse.question[0].name)] = (receivedResponse) + + self.assertEquals(len(receivedResponses), 5) + for idx in range(5): + self.assertIn('%d.simple.reverse-ooor.tests.powerdns.com.' % (idx), receivedResponses) + + self.assertEquals(ReverseOOORTCPResponder.numberOfConnections, 1) + + def testMoreQueriesThanAllowedInFlight(self): + """ + OOOR Reverse: 100 queries, 10 in flight, 5 per backend + """ + names = [] + ReverseOOORTCPResponder.numberOfConnections = 0 + + for idx in range(100): + names.append('%d.more-queries.reverse-ooor.tests.powerdns.com.' % (idx)) + + conn = self.openTCPConnection() + + for name in names: + query = dns.message.make_query(name, 'A', 'IN', use_edns=False) + + self.sendTCPQueryOverConnection(conn, query) + + receivedResponses = {} + + for name in names: + receivedResponse = self.recvTCPResponseOverConnection(conn) + self.assertTrue(receivedResponse) + receivedResponses[str(receivedResponse.question[0].name)] = (receivedResponse) + #print("Received a response for %s" % (receivedResponse.question[0].name)) + + self.assertEquals(len(receivedResponses), 100) + for idx in range(5): + self.assertIn('%d.more-queries.reverse-ooor.tests.powerdns.com.' % (idx), receivedResponses) + + # in theory they could all be handled by the same backend if we get the responses + # fast enough, but over 100 queries that's very, very unlikely + self.assertEquals(ReverseOOORTCPResponder.numberOfConnections, 2) diff --git a/regression-tests.dnsdist/test_ProxyProtocol.py b/regression-tests.dnsdist/test_ProxyProtocol.py index 9c3313d40f..9dd9a3fac0 100644 --- a/regression-tests.dnsdist/test_ProxyProtocol.py +++ b/regression-tests.dnsdist/test_ProxyProtocol.py @@ -64,7 +64,7 @@ def ProxyProtocolTCPResponder(port, fromQueue, toQueue): # be aware that this responder will not accept a new connection # until the last one has been closed. This is done on purpose to # to check for connection reuse, making sure that a lot of connections - # are not open in parallel. + # are not opened in parallel. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)