]> git.ipfire.org Git - thirdparty/pdns.git/commitdiff
dnsdist: Add a few tests using POST queries for DoH
authorRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 13 May 2019 12:51:06 +0000 (14:51 +0200)
committerRemi Gacogne <remi.gacogne@powerdns.com>
Mon, 13 May 2019 12:51:06 +0000 (14:51 +0200)
regression-tests.dnsdist/test_DOH.py

index 69c4ea53f922a9bef1966fe50e67899989c9945c..4f2012d247520fb8e8108b40b7f97a551f7e379e 100644 (file)
@@ -56,6 +56,40 @@ class DNSDistDOHTest(DNSDistTest):
 
         return (receivedQuery, message)
 
+    @classmethod
+    def sendDOHPostQuery(cls, port, servername, baseurl, query, response=None, timeout=2.0, caFile=None, useQueue=True, rawQuery=False):
+        url = baseurl
+        conn = cls.openDOHConnection(port, caFile=caFile, timeout=timeout)
+        #conn.setopt(pycurl.VERBOSE, True)
+        conn.setopt(pycurl.URL, url)
+        conn.setopt(pycurl.RESOLVE, ["%s:%d:127.0.0.1" % (servername, port)])
+        conn.setopt(pycurl.SSL_VERIFYPEER, 1)
+        conn.setopt(pycurl.SSL_VERIFYHOST, 2)
+        conn.setopt(pycurl.POST, True)
+        data = query
+        if not rawQuery:
+            data = data.to_wire()
+
+        conn.setopt(pycurl.POSTFIELDS, data)
+
+        if caFile:
+            conn.setopt(pycurl.CAINFO, caFile)
+
+        if response:
+            cls._toResponderQueue.put(response, True, timeout)
+
+        receivedQuery = None
+        message = None
+        data = conn.perform_rb()
+        rcode = conn.getinfo(pycurl.RESPONSE_CODE)
+        if rcode == 200:
+            message = dns.message.from_wire(data)
+
+        if useQueue and not cls._fromResponderQueue.empty():
+            receivedQuery = cls._fromResponderQueue.get(True, timeout)
+
+        return (receivedQuery, message)
+
 #     @classmethod
 #     def openDOHConnection(cls, port, caFile, timeout=2.0):
 #         sslctx = SSLContext(PROTOCOL_TLSv1_2)
@@ -130,6 +164,31 @@ class TestDOH(DNSDistDOHTest):
         self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
         self.assertEquals(response, receivedResponse)
 
+    def testDOHSimplePOST(self):
+        """
+        DOH: Simple POST query
+        """
+        name = 'simple-post.doh.tests.powerdns.com.'
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+
+        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
     def testDOHExistingEDNS(self):
         """
         DOH: Existing EDNS
@@ -256,6 +315,35 @@ class TestDOH(DNSDistDOHTest):
         self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
         self.assertEquals(response, receivedResponse)
 
+    def testDOHEmptyPOST(self):
+        """
+        DOH: Empty POST query
+        """
+        name = 'empty-post.doh.tests.powerdns.com.'
+
+        (_, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query="", rawQuery=True, response=None, caFile=self._caCert)
+        self.assertEquals(receivedResponse, None)
+
+        # and now a valid one
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=False)
+        query.id = 0
+        expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096)
+        expectedQuery.id = 0
+        response = dns.message.make_response(query)
+        rrset = dns.rrset.from_text(name,
+                                    3600,
+                                    dns.rdataclass.IN,
+                                    dns.rdatatype.A,
+                                    '127.0.0.1')
+        response.answer.append(rrset)
+        (receivedQuery, receivedResponse) = self.sendDOHPostQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, response=response, caFile=self._caCert)
+        self.assertTrue(receivedQuery)
+        self.assertTrue(receivedResponse)
+        receivedQuery.id = expectedQuery.id
+        self.assertEquals(expectedQuery, receivedQuery)
+        self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery)
+        self.assertEquals(response, receivedResponse)
+
 class TestDOHAddingECS(DNSDistDOHTest):
 
     _serverKey = 'server.key'