X-Git-Url: http://git.ipfire.org/?a=blobdiff_plain;f=regression-tests.dnsdist%2Ftest_EDNSOptions.py;h=0315ed589dbc4ac32e60ea32ec2ae8bf1e28e8a0;hb=26c434d8b0fd218f87f034171eb87e2242199579;hp=f5838bf40ce1efc50a58ff6352dcd06f7c76b372;hpb=bd0331e8e7350ece8ca969e8f62d8d992d005ed7;p=thirdparty%2Fpdns.git diff --git a/regression-tests.dnsdist/test_EDNSOptions.py b/regression-tests.dnsdist/test_EDNSOptions.py index f5838bf40c..0315ed589d 100644 --- a/regression-tests.dnsdist/test_EDNSOptions.py +++ b/regression-tests.dnsdist/test_EDNSOptions.py @@ -23,17 +23,17 @@ class EDNSOptionsBase(DNSDistTest): if options[EDNSOptionCode.COOKIE]:count() ~= 2 then return DNSAction.Spoof, "192.0.2.2" end - if options[EDNSOptionCode.COOKIE]:getValues()[0]:len() ~= 16 then + if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then return DNSAction.Spoof, "192.0.2.3" end - if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then + if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then return DNSAction.Spoof, "192.0.2.4" end elseif string.match(qname, 'cookie') then if options[EDNSOptionCode.COOKIE] == nil then return DNSAction.Spoof, "192.0.2.1" end - if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[0]:len() ~= 16 then + if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then return DNSAction.Spoof, "192.0.2.2" end end @@ -42,7 +42,7 @@ class EDNSOptionsBase(DNSDistTest): if options[EDNSOptionCode.ECS] == nil then return DNSAction.Spoof, "192.0.2.51" end - if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[0]:len() ~= 8 then + if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then return DNSAction.Spoof, "192.0.2.52" end end @@ -51,7 +51,7 @@ class EDNSOptionsBase(DNSDistTest): if options[EDNSOptionCode.ECS] == nil then return DNSAction.Spoof, "192.0.2.101" end - if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[0]:len() ~= 20 then + if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then return DNSAction.Spoof, "192.0.2.102" end end @@ -66,7 +66,7 @@ class TestEDNSOptions(EDNSOptionsBase): _config_template = """ %s - addLuaAction(AllRule(), testEDNSOptions) + addAction(AllRule(), LuaAction(testEDNSOptions)) newServer{address="127.0.0.1:%s"} """ @@ -86,26 +86,21 @@ class TestEDNSOptions(EDNSOptionsBase): '192.0.2.255') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(receivedQuery, query) + self.assertEquals(receivedResponse, response) def testCookie(self): """ EDNS Options: Cookie """ name = 'cookie.ednsoptions.tests.powerdns.com.' - eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef') + eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef') query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco]) response = dns.message.make_response(query) rrset = dns.rrset.from_text(name, @@ -115,19 +110,14 @@ class TestEDNSOptions(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(receivedQuery, query) + self.assertEquals(receivedResponse, response) def testECS4(self): """ @@ -144,19 +134,14 @@ class TestEDNSOptions(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(receivedQuery, query) + self.assertEquals(receivedResponse, response) def testECS6(self): """ @@ -173,26 +158,21 @@ class TestEDNSOptions(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(receivedQuery, query) + self.assertEquals(receivedResponse, response) def testECS6Cookie(self): """ EDNS Options: Cookie + ECS6 """ name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.' - eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef') + eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef') ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128) query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco]) response = dns.message.make_response(query) @@ -203,28 +183,23 @@ class TestEDNSOptions(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(receivedQuery, query) + self.assertEquals(receivedResponse, response) def testMultiCookiesECS6(self): """ EDNS Options: Two Cookies + ECS6 """ name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.' - eco1 = cookiesoption.CookiesOption('deadbeef', 'deadbeef') + eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef') ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128) - eco2 = cookiesoption.CookiesOption('deadc0de', 'deadc0de') + eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de') query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2]) response = dns.message.make_response(query) rrset = dns.rrset.from_text(name, @@ -234,26 +209,21 @@ class TestEDNSOptions(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(receivedQuery, query) + self.assertEquals(receivedResponse, response) class TestEDNSOptionsAddingECS(EDNSOptionsBase): _config_template = """ %s - addLuaAction(AllRule(), testEDNSOptions) + addAction(AllRule(), LuaAction(testEDNSOptions)) newServer{address="127.0.0.1:%s", useClientSubnet=true} """ @@ -275,27 +245,22 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = expectedQuery.id - self.checkQueryEDNSWithECS(expectedQuery, receivedQuery) - self.checkResponseNoEDNS(response, receivedResponse) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = expectedQuery.id - self.checkQueryEDNSWithECS(expectedQuery, receivedQuery) - self.checkResponseNoEDNS(response, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = expectedQuery.id + self.checkQueryEDNSWithECS(expectedQuery, receivedQuery) + self.checkResponseNoEDNS(response, receivedResponse) def testCookie(self): """ EDNS Options: Cookie (adding ECS) """ name = 'cookie.ednsoptions-ecs.tests.powerdns.com.' - eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef') - query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco]) + eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef') + query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[eco]) ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24) expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[eco,ecso], payload=512) response = dns.message.make_response(query) @@ -306,19 +271,14 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = expectedQuery.id - self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1) - self.checkResponseEDNSWithoutECS(response, receivedResponse) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = expectedQuery.id - self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1) - self.checkResponseEDNSWithoutECS(response, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = expectedQuery.id + self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1) + self.checkResponseEDNSWithoutECS(response, receivedResponse) def testECS4(self): """ @@ -337,19 +297,14 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.checkQueryEDNSWithECS(query, receivedQuery) - self.checkResponseEDNSWithECS(response, receivedResponse) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.checkQueryEDNSWithECS(query, receivedQuery) - self.checkResponseEDNSWithECS(response, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.checkQueryEDNSWithECS(query, receivedQuery) + self.checkResponseEDNSWithECS(response, receivedResponse) def testECS6(self): """ @@ -368,26 +323,21 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.checkQueryEDNSWithECS(query, receivedQuery) - self.checkResponseEDNSWithECS(response, receivedResponse) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.checkQueryEDNSWithECS(query, receivedQuery) - self.checkResponseEDNSWithECS(response, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.checkQueryEDNSWithECS(query, receivedQuery) + self.checkResponseEDNSWithECS(response, receivedResponse) def testECS6Cookie(self): """ EDNS Options: Cookie + ECS6 (adding ECS) """ name = 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.' - eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef') + eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef') ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128) query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco]) ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56) @@ -400,28 +350,23 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.checkQueryEDNSWithECS(query, receivedQuery, 1) - self.checkResponseEDNSWithECS(response, receivedResponse) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.checkQueryEDNSWithECS(query, receivedQuery, 1) - self.checkResponseEDNSWithECS(response, receivedResponse) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.checkQueryEDNSWithECS(query, receivedQuery, 1) + self.checkResponseEDNSWithECS(response, receivedResponse) def testMultiCookiesECS6(self): """ EDNS Options: Two Cookies + ECS6 """ name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.' - eco1 = cookiesoption.CookiesOption('deadbeef', 'deadbeef') + eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef') ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128) - eco2 = cookiesoption.CookiesOption('deadc0de', 'deadc0de') + eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de') query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2]) response = dns.message.make_response(query) rrset = dns.rrset.from_text(name, @@ -431,16 +376,11 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase): '127.0.0.1') response.answer.append(rrset) - (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) - - (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response) - self.assertTrue(receivedQuery) - self.assertTrue(receivedResponse) - receivedQuery.id = query.id - self.assertEquals(receivedQuery, query) - self.assertEquals(receivedResponse, response) + for method in ("sendUDPQuery", "sendTCPQuery"): + sender = getattr(self, method) + (receivedQuery, receivedResponse) = sender(query, response) + self.assertTrue(receivedQuery) + self.assertTrue(receivedResponse) + receivedQuery.id = query.id + self.assertEquals(receivedQuery, query) + self.assertEquals(receivedResponse, response)