From: Samir Aguiar Date: Wed, 28 May 2025 01:21:42 +0000 (+0000) Subject: dnsdist: Add a test for SetEDNSOptionResponseAction X-Git-Tag: dnsdist-2.0.0-beta1~47^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7038248b311eeda73a6f746cf5d532028dd02484;p=thirdparty%2Fpdns.git dnsdist: Add a test for SetEDNSOptionResponseAction --- diff --git a/regression-tests.dnsdist/test_Responses.py b/regression-tests.dnsdist/test_Responses.py index 26c1042c0a..e6d209b900 100644 --- a/regression-tests.dnsdist/test_Responses.py +++ b/regression-tests.dnsdist/test_Responses.py @@ -2,6 +2,7 @@ from datetime import datetime, timedelta import time import dns +import cookiesoption from dnsdisttests import DNSDistTest class TestResponseRuleNXDelayed(DNSDistTest): @@ -566,3 +567,91 @@ class TestResponseRewriteServFail(DNSDistTest): receivedQuery.id = query.id self.assertEqual(query, receivedQuery) self.assertEqual(response, receivedResponse) + +class TestAdvancedSetEDNSOptionResponseAction(DNSDistTest): + _config_template = """ + addResponseAction(AllRule(), SetEDNSOptionResponseAction(10, "deadbeefdeadc0de")) + newServer{address="127.0.0.1:%s"} + """ + + def testAdvancedSetEDNSOptionResponse(self): + """ + Responses: Set EDNS Option in response + """ + name = 'setednsoptionresponse.responses.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '127.0.0.1') + response.answer.append(rrset) + + eco = cookiesoption.CookiesOption(b'deadbeef', b'deadc0de') + expectedResponse = dns.message.make_response(query, use_edns=True, payload=512, options=[eco]) + expectedResponse.answer.append(rrset) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEqual(query, receivedQuery) + self.checkResponseEDNS(expectedResponse, receivedResponse) + + def testAdvancedSetEDNSOptionResponseOverwrite(self): + """ + Responses: Set EDNS Option in response overwrites existing option + """ + name = 'setednsoptionresponse-overwrite.responses.tests.powerdns.com.' + initialECO = cookiesoption.CookiesOption(b'aaaaaaaa', b'bbbbbbbb') + query = dns.message.make_query(name, 'A', 'IN') + response = dns.message.make_response(query, use_edns=True, payload=512, options=[initialECO]) + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '127.0.0.1') + response.answer.append(rrset) + + overWrittenECO = cookiesoption.CookiesOption(b'deadbeef', b'deadc0de') + expectedResponse = dns.message.make_response(query, use_edns=True, payload=512, options=[overWrittenECO]) + expectedResponse.answer.append(rrset) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEqual(query, receivedQuery) + self.checkResponseEDNS(expectedResponse, receivedResponse) + + def testAdvancedSetEDNSOptionResponseWithDOSet(self): + """ + Responses: Set EDNS Option in response (DO bit set) + """ + name = 'setednsoptionresponse-do.responses.tests.powerdns.com.' + query = dns.message.make_query(name, 'A', 'IN', use_edns=True, want_dnssec=True, payload=4096) + response = dns.message.make_response(query) + rrset = dns.rrset.from_text(name, + 3600, + dns.rdataclass.IN, + dns.rdatatype.A, + '127.0.0.1') + response.answer.append(rrset) + + eco = cookiesoption.CookiesOption(b'deadbeef', b'deadc0de') + expectedResponse = dns.message.make_response(query, use_edns=True, payload=4096, options=[eco], want_dnssec=True) + expectedResponse.answer.append(rrset) + + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEqual(query, receivedQuery) + self.checkResponseEDNS(expectedResponse, receivedResponse)