]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add a few more regression tests for DNS over TLS
authorRemi Gacogne <remi.gacogne@powerdns.com>
Wed, 3 Apr 2019 15:29:39 +0000 (17:29 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Thu, 4 Apr 2019 09:54:05 +0000 (11:54 +0200)
regression-tests.dnsdist/test_TLS.py

index 68aaa83e6d18562556964e00ff2047f7f63e407d..b31c6c2f9bfcf4933bdc99103b5eb2febbf2a035 100644 (file)
@@ -38,3 +38,55 @@ class TestTLS(DNSDistTest):
         receivedQuery.id = query.id
         self.assertEquals(query, receivedQuery)
         self.assertEquals(response, receivedResponse)
+
+    def testTLKA(self):
+        """
+        TLS: Several queries over the same connection
+        """
+        name = 'ka.tls.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+
+        for idx in range(5):
+            self.sendTCPQueryOverConnection(conn, query, response=response)
+            (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)
+
+    def testTLSPipelining(self):
+        """
+        TLS: Several queries over the same connection without waiting for the responses
+        """
+        name = 'pipelining.tls.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        conn = self.openTLSConnection(self._tlsServerPort, self._serverName, self._caCert)
+
+        for idx in range(100):
+            self.sendTCPQueryOverConnection(conn, query, response=response)
+
+        for idx in range(100):
+            (receivedQuery, receivedResponse) = self.recvTCPResponseOverConnection(conn, useQueue=True)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(query, receivedQuery)
+            self.assertEquals(response, receivedResponse)