From: Remi Gacogne Date: Thu, 21 Feb 2019 16:33:08 +0000 (+0100) Subject: Merge branch 'master' into empty_response X-Git-Tag: auth-4.2.0-beta1~4^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=refs%2Fpull%2F7431%2Fhead;p=thirdparty%2Fpdns.git Merge branch 'master' into empty_response --- 02b3e91978968e2812d8678786c815c2de5c5226 diff --cc regression-tests.dnsdist/test_Advanced.py index 072a7e0115,cf93eff2b9..04adcc0e06 --- a/regression-tests.dnsdist/test_Advanced.py +++ b/regression-tests.dnsdist/test_Advanced.py @@@ -1714,63 -1714,60 +1714,120 @@@ class TestAdvancedEDNSOptionRule(DNSDis receivedQuery.id = query.id self.assertEquals(query, receivedQuery) self.assertEquals(receivedResponse, response) + +class TestAdvancedAllowHeaderOnly(DNSDistTest): + + _config_template = """ + newServer{address="127.0.0.1:%s"} + setAllowEmptyResponse(true) + """ + + def testHeaderOnlyRefused(self): + """ + Advanced: Header-only refused response + """ + name = 'header-only-refused-response.advanced.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + response.set_rcode(dns.rcode.REFUSED) + response.question = [] + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + + def testHeaderOnlyNoErrorResponse(self): + """ + Advanced: Header-only NoError response should be allowed + """ + name = 'header-only-noerror-response.advanced.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + response.question = [] + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + + def testHeaderOnlyNXDResponse(self): + """ + Advanced: Header-only NXD response should be allowed + """ + name = 'header-only-nxd-response.advanced.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + response.set_rcode(dns.rcode.NXDOMAIN) + response.question = [] + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + + class TestAdvancedEDNSVersionnRule(DNSDistTest): + + _config_template = """ + newServer{address="127.0.0.1:%s"} + addAction(EDNSVersionRule(0), ERCodeAction(dnsdist.BADVERS)) + """ + + def testDropped(self): + """ + Advanced: A question with ECS version larger than 0 is dropped + """ + + name = 'ednsversionrule.advanced.tests.powerdns.com.' + + query = dns.message.make_query(name, 'A', 'IN', use_edns=1) + expectedResponse = dns.message.make_response(query) + expectedResponse.set_rcode(dns.rcode.BADVERS) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (_, receivedResponse) = sender(query, response=None) + self.assertEquals(receivedResponse, expectedResponse) + + def testNoEDNS0Pass(self): + """ + Advanced: A question with ECS version 0 goes through + """ + + name = 'ednsversionrule.advanced.tests.powerdns.com.' + + query = dns.message.make_query(name, 'A', 'IN', use_edns=True) + response = dns.message.make_response(query) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response) + + def testReplied(self): + """ + Advanced: A question without ECS goes through + """ + + name = 'ednsoptionrule.advanced.tests.powerdns.com.' + + query = dns.message.make_query(name, 'A', 'IN', use_edns=False) + response = dns.message.make_response(query) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + receivedQuery.id = query.id + self.assertEquals(query, receivedQuery) + self.assertEquals(receivedResponse, response)