]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_DOH.py
Merge pull request #8113 from rgacogne/ddist-tcp-outstanding
[thirdparty/pdns.git] / regression-tests.dnsdist / test_DOH.py
index f730821abaa00fe6bf967811a13728a1354a43fe..4943c96ec11443eb2e2b6befc8ea0843af571c4f 100644 (file)
@@ -12,8 +12,11 @@ import pycurl
 class DNSDistDOHTest(DNSDistTest):
 
     @classmethod
-    def getDOHGetURL(cls, baseurl, query):
-        wire = query.to_wire()
+    def getDOHGetURL(cls, baseurl, query, rawQuery=False):
+        if rawQuery:
+            wire = query
+        else:
+            wire = query.to_wire()
         param = base64.urlsafe_b64encode(wire).decode('UTF8').rstrip('=')
         return baseurl + "?dns=" + param
 
@@ -27,14 +30,49 @@ class DNSDistDOHTest(DNSDistTest):
         return conn
 
     @classmethod
-    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True):
-        url = cls.getDOHGetURL(baseurl, query)
+    def sendDOHQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False, customHeaders=[]):
+        url = cls.getDOHGetURL(baseurl, query, rawQuery)
         conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
         #conn.setopt(pycurl.VERBOSE, True)
         conn.setopt(pycurl.URL, url)
         conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
         conn.setopt(pycurl.SSL_VERIFYPEER, 1)
         conn.setopt(pycurl.SSL_VERIFYHOST, 2)
+        conn.setopt(pycurl.HTTPHEADER, customHeaders)
+        if caFile:
+            conn.setopt(pycurl.CAINFO, caFile)
+
+        if response:
+            cls._toResponderQueue.put(response, True, timeout)
+
+        receivedQuery = None
+        message = None
+        data = conn.perform_rb()
+        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
+        if rcode == 200:
+            message = dns.message.from_wire(data)
+
+        if useQueue and not cls._fromResponderQueue.empty():
+            receivedQuery = cls._fromResponderQueue.get(True, timeout)
+
+        return (receivedQuery, message)
+
+    @classmethod
+    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
+        url = baseurl
+        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
+        #conn.setopt(pycurl.VERBOSE, True)
+        conn.setopt(pycurl.URL, url)
+        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
+        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
+        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
+        conn.setopt(pycurl.POST, True)
+        data = query
+        if not rawQuery:
+            data = data.to_wire()
+
+        conn.setopt(pycurl.POSTFIELDS, data)
+
         if caFile:
             conn.setopt(pycurl.CAINFO, caFile)
 
@@ -99,6 +137,8 @@ class TestDOH(DNSDistDOHTest):
     addAction("drop.doh.tests.powerdns.com.", DropAction())
     addAction("refused.doh.tests.powerdns.com.", RCodeAction(DNSRCode.REFUSED))
     addAction("spoof.doh.tests.powerdns.com.", SpoofAction("1.2.3.4"))
+    addAction(HTTPHeaderRule("X-PowerDNS", "^[a]{5}$"), SpoofAction("2.3.4.5"))
+    addAction(HTTPPathRule("/PowerDNS"), SpoofAction("3.4.5.6"))
     """
     _config_params = ['_testServerPort', '_dohServerPort', '_serverCert', '_serverKey']
 
@@ -127,6 +167,31 @@ class TestDOH(DNSDistDOHTest):
         self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
         self.assertEquals(response, receivedResponse)
 
+    def testDOHSimplePOST(self):
+        """
+        DOH: Simple POST query
+        """
+        name = 'simple-post.doh.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
     def testDOHExistingEDNS(self):
         """
         DOH: Existing EDNS
@@ -185,7 +250,6 @@ class TestDOH(DNSDistDOHTest):
         name = 'drop.doh.tests.powerdns.com.'
         query = dns.message.make_query(name, 'A', 'IN')
         (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
-        print(receivedResponse)
         self.assertEquals(receivedResponse, None)
 
     def testRefused(self):
@@ -220,6 +284,163 @@ class TestDOH(DNSDistDOHTest):
         (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False)
         self.assertEquals(receivedResponse, expectedResponse)
 
+    def testDOHInvalid(self):
+        """
+        DOH: Invalid query
+        """
+        name = 'invalid.doh.tests.powerdns.com.'
+        invalidQuery = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        invalidQuery.id = 0
+        # first an invalid query
+        invalidQuery = invalidQuery.to_wire()
+        invalidQuery = invalidQuery[:-5]
+        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=invalidQuery, response=None, useQueue=False, rawQuery=True)
+        self.assertEquals(receivedResponse, None)
+
+        # and now a valid one
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+    def testDOHWithoutQuery(self):
+        """
+        DOH: Empty GET query
+        """
+        name = 'empty-get.doh.tests.powerdns.com.'
+        url = self._dohBaseURL
+        conn = self.openDOHConnection(self._dohServerPort, self._caCert, timeout=2.0)
+        conn.setopt(pycurl.URL, url)
+        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (self._serverName, self._dohServerPort)])
+        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
+        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
+        conn.setopt(pycurl.CAINFO, self._caCert)
+        data = conn.perform_rb()
+        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
+        self.assertEquals(rcode, 400)
+
+    def testDOHEmptyPOST(self):
+        """
+        DOH: Empty POST query
+        """
+        name = 'empty-post.doh.tests.powerdns.com.'
+
+        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
+        self.assertEquals(receivedResponse, None)
+
+        # and now a valid one
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+    def testHeaderRule(self):
+        """
+        DOH: HeaderRule
+        """
+        name = 'header-rule.doh.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.id = 0
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '2.3.4.5')
+        expectedResponse.answer.append(rrset)
+
+        # this header should match
+        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False, customHeaders=['x-powerdnS: aaaaa'])
+        self.assertEquals(receivedResponse, expectedResponse)
+
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.flags &= ~dns.flags.RD
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        # this content of the header should NOT match
+        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert, customHeaders=['x-powerdnS: bbbbb'])
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
+    def testHTTPPath(self):
+        """
+        DOH: HTTPPath
+        """
+        name = 'http-path.doh.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN')
+        query.id = 0
+        query.flags &= ~dns.flags.RD
+        expectedResponse = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '3.4.5.6')
+        expectedResponse.answer.append(rrset)
+
+        # this path should match
+        (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + 'PowerDNS', caFile=self._caCert, query=query, response=None, useQueue=False)
+        self.assertEquals(receivedResponse, expectedResponse)
+
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        expectedQuery.flags &= ~dns.flags.RD
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        # this path should NOT match
+        (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL + "PowerDNS2", query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
 
 class TestDOHAddingECS(DNSDistDOHTest):