From: Remi Gacogne Date: Tue, 13 Apr 2021 14:44:48 +0000 (+0200) Subject: dnsdist: Add a regression test for DoH TC=1 fallback to TCP X-Git-Tag: dnsdist-1.7.0-alpha1~45^2~44 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a0c1431dfb24f0c297ce4ad2031d7ea9065ef725;p=thirdparty%2Fpdns.git dnsdist: Add a regression test for DoH TC=1 fallback to TCP --- diff --git a/regression-tests.dnsdist/test_DOH.py b/regression-tests.dnsdist/test_DOH.py index 527090ac2e..20fe38fd1d 100644 --- a/regression-tests.dnsdist/test_DOH.py +++ b/regression-tests.dnsdist/test_DOH.py @@ -342,6 +342,49 @@ class TestDOH(DNSDistDOHTest): (_, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, caFile=self._caCert, query=query, response=None, useQueue=False) self.assertEqual(receivedResponse, expectedResponse) + def testTruncation(self): + """ + DOH: Truncation over UDP + """ + # the query is first forwarded over UDP, leading to a TC=1 answer from the + # backend, then over TCP + name = 'truncated-udp.doh.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + query.id = 0 + expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096) + expectedQuery.id = 0 + response = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '127.0.0.1') + response.answer.append(rrset) + + # first response is a TC=1 + tcResponse = dns.message.make_response(query) + tcResponse.flags |= dns.flags.TC + self._toResponderQueue.put(tcResponse, True, 2.0) + + (receivedQuery, receivedResponse) = self.sendDOHQuery(self._dohServerPort, self._serverName, self._dohBaseURL, query, caFile=self._caCert, response=response) + # first query, over UDP + self.assertTrue(receivedQuery) + receivedQuery.id = expectedQuery.id + self.assertEqual(expectedQuery, receivedQuery) + self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery) + + # check the response + self.assertTrue(receivedResponse) + self.assertEqual(response, receivedResponse) + + # second query, over TCP + receivedQuery = self._fromResponderQueue.get(True, 2.0) + self.assertTrue(receivedQuery) + receivedQuery.id = expectedQuery.id + self.assertEqual(expectedQuery, receivedQuery) + self.checkQueryEDNSWithoutECS(expectedQuery, receivedQuery) + + def testSpoof(self): """ DOH: Spoofed