]> git.ipfire.org Git - thirdparty/pdns.git/blobdiff - regression-tests.dnsdist/test_EDNSOptions.py
Merge pull request #8722 from rgacogne/ddist-lua-raw-content-rebased
[thirdparty/pdns.git] / regression-tests.dnsdist / test_EDNSOptions.py
index f5838bf40ce1efc50a58ff6352dcd06f7c76b372..0315ed589dbc4ac32e60ea32ec2ae8bf1e28e8a0 100644 (file)
@@ -23,17 +23,17 @@ class EDNSOptionsBase(DNSDistTest):
         if options[EDNSOptionCode.COOKIE]:count() ~= 2 then
           return DNSAction.Spoof, "192.0.2.2"
         end
-        if options[EDNSOptionCode.COOKIE]:getValues()[0]:len() ~= 16 then
+        if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
           return DNSAction.Spoof, "192.0.2.3"
         end
-        if options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
+        if options[EDNSOptionCode.COOKIE]:getValues()[2]:len() ~= 16 then
           return DNSAction.Spoof, "192.0.2.4"
         end
       elseif string.match(qname, 'cookie') then
         if options[EDNSOptionCode.COOKIE] == nil then
           return DNSAction.Spoof, "192.0.2.1"
         end
-        if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[0]:len() ~= 16 then
+        if options[EDNSOptionCode.COOKIE]:count() ~= 1 or options[EDNSOptionCode.COOKIE]:getValues()[1]:len() ~= 16 then
           return DNSAction.Spoof, "192.0.2.2"
         end
       end
@@ -42,7 +42,7 @@ class EDNSOptionsBase(DNSDistTest):
         if options[EDNSOptionCode.ECS] == nil then
           return DNSAction.Spoof, "192.0.2.51"
         end
-        if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[0]:len() ~= 8 then
+        if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 8 then
           return DNSAction.Spoof, "192.0.2.52"
         end
       end
@@ -51,7 +51,7 @@ class EDNSOptionsBase(DNSDistTest):
         if options[EDNSOptionCode.ECS] == nil then
           return DNSAction.Spoof, "192.0.2.101"
         end
-        if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[0]:len() ~= 20 then
+        if options[EDNSOptionCode.ECS]:count() ~= 1 or options[EDNSOptionCode.ECS]:getValues()[1]:len() ~= 20 then
           return DNSAction.Spoof, "192.0.2.102"
         end
       end
@@ -66,7 +66,7 @@ class TestEDNSOptions(EDNSOptionsBase):
     _config_template = """
     %s
 
-    addLuaAction(AllRule(), testEDNSOptions)
+    addAction(AllRule(), LuaAction(testEDNSOptions))
 
     newServer{address="127.0.0.1:%s"}
     """
@@ -86,26 +86,21 @@ class TestEDNSOptions(EDNSOptionsBase):
                                     '192.0.2.255')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(receivedQuery, query)
+            self.assertEquals(receivedResponse, response)
 
     def testCookie(self):
         """
         EDNS Options: Cookie
         """
         name = 'cookie.ednsoptions.tests.powerdns.com.'
-        eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
         query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
         response = dns.message.make_response(query)
         rrset = dns.rrset.from_text(name,
@@ -115,19 +110,14 @@ class TestEDNSOptions(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(receivedQuery, query)
+            self.assertEquals(receivedResponse, response)
 
     def testECS4(self):
         """
@@ -144,19 +134,14 @@ class TestEDNSOptions(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(receivedQuery, query)
+            self.assertEquals(receivedResponse, response)
 
     def testECS6(self):
         """
@@ -173,26 +158,21 @@ class TestEDNSOptions(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(receivedQuery, query)
+            self.assertEquals(receivedResponse, response)
 
     def testECS6Cookie(self):
         """
         EDNS Options: Cookie + ECS6
         """
         name = 'cookie-ecs6.ednsoptions.tests.powerdns.com.'
-        eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
         ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
         query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
         response = dns.message.make_response(query)
@@ -203,28 +183,23 @@ class TestEDNSOptions(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(receivedQuery, query)
+            self.assertEquals(receivedResponse, response)
 
     def testMultiCookiesECS6(self):
         """
         EDNS Options: Two Cookies + ECS6
         """
         name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
-        eco1 = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
+        eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
         ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
-        eco2 = cookiesoption.CookiesOption('deadc0de', 'deadc0de')
+        eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
         query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
         response = dns.message.make_response(query)
         rrset = dns.rrset.from_text(name,
@@ -234,26 +209,21 @@ class TestEDNSOptions(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(receivedQuery, query)
+            self.assertEquals(receivedResponse, response)
 
 class TestEDNSOptionsAddingECS(EDNSOptionsBase):
 
     _config_template = """
     %s
 
-    addLuaAction(AllRule(), testEDNSOptions)
+    addAction(AllRule(), LuaAction(testEDNSOptions))
 
     newServer{address="127.0.0.1:%s", useClientSubnet=true}
     """
@@ -275,27 +245,22 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = expectedQuery.id
-        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
-        self.checkResponseNoEDNS(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = expectedQuery.id
-        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
-        self.checkResponseNoEDNS(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery)
+            self.checkResponseNoEDNS(response, receivedResponse)
 
     def testCookie(self):
         """
         EDNS Options: Cookie (adding ECS)
         """
         name = 'cookie.ednsoptions-ecs.tests.powerdns.com.'
-        eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
-        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco])
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
+        query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=512, options=[eco])
         ecso = clientsubnetoption.ClientSubnetOption('127.0.0.1', 24)
         expectedQuery = dns.message.make_query(name, 'A', 'IN', use_edns=True, options=[eco,ecso], payload=512)
         response = dns.message.make_response(query)
@@ -306,19 +271,14 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = expectedQuery.id
-        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
-        self.checkResponseEDNSWithoutECS(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = expectedQuery.id
-        self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
-        self.checkResponseEDNSWithoutECS(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = expectedQuery.id
+            self.checkQueryEDNSWithECS(expectedQuery, receivedQuery, 1)
+            self.checkResponseEDNSWithoutECS(response, receivedResponse)
 
     def testECS4(self):
         """
@@ -337,19 +297,14 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.checkQueryEDNSWithECS(query, receivedQuery)
-        self.checkResponseEDNSWithECS(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.checkQueryEDNSWithECS(query, receivedQuery)
-        self.checkResponseEDNSWithECS(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.checkQueryEDNSWithECS(query, receivedQuery)
+            self.checkResponseEDNSWithECS(response, receivedResponse)
 
     def testECS6(self):
         """
@@ -368,26 +323,21 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.checkQueryEDNSWithECS(query, receivedQuery)
-        self.checkResponseEDNSWithECS(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.checkQueryEDNSWithECS(query, receivedQuery)
-        self.checkResponseEDNSWithECS(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.checkQueryEDNSWithECS(query, receivedQuery)
+            self.checkResponseEDNSWithECS(response, receivedResponse)
 
     def testECS6Cookie(self):
         """
         EDNS Options: Cookie + ECS6 (adding ECS)
         """
         name = 'cookie-ecs6.ednsoptions-ecs.tests.powerdns.com.'
-        eco = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
+        eco = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
         ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
         query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[ecso,eco])
         ecsoResponse = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128, scope=56)
@@ -400,28 +350,23 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.checkQueryEDNSWithECS(query, receivedQuery, 1)
-        self.checkResponseEDNSWithECS(response, receivedResponse)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.checkQueryEDNSWithECS(query, receivedQuery, 1)
-        self.checkResponseEDNSWithECS(response, receivedResponse)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.checkQueryEDNSWithECS(query, receivedQuery, 1)
+            self.checkResponseEDNSWithECS(response, receivedResponse)
 
     def testMultiCookiesECS6(self):
         """
         EDNS Options: Two Cookies + ECS6
         """
         name = 'multiplecookies-ecs6.ednsoptions.tests.powerdns.com.'
-        eco1 = cookiesoption.CookiesOption('deadbeef', 'deadbeef')
+        eco1 = cookiesoption.CookiesOption(b'deadbeef', b'deadbeef')
         ecso = clientsubnetoption.ClientSubnetOption('2001:DB8::1', 128)
-        eco2 = cookiesoption.CookiesOption('deadc0de', 'deadc0de')
+        eco2 = cookiesoption.CookiesOption(b'deadc0de', b'deadc0de')
         query = dns.message.make_query(name, 'A', 'IN', use_edns=True, payload=4096, options=[eco1, ecso, eco2])
         response = dns.message.make_response(query)
         rrset = dns.rrset.from_text(name,
@@ -431,16 +376,11 @@ class TestEDNSOptionsAddingECS(EDNSOptionsBase):
                                     '127.0.0.1')
         response.answer.append(rrset)
 
-        (receivedQuery, receivedResponse) = self.sendUDPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
-
-        (receivedQuery, receivedResponse) = self.sendTCPQuery(query, response)
-        self.assertTrue(receivedQuery)
-        self.assertTrue(receivedResponse)
-        receivedQuery.id = query.id
-        self.assertEquals(receivedQuery, query)
-        self.assertEquals(receivedResponse, response)
+        for method in ("sendUDPQuery", "sendTCPQuery"):
+            sender = getattr(self, method)
+            (receivedQuery, receivedResponse) = sender(query, response)
+            self.assertTrue(receivedQuery)
+            self.assertTrue(receivedResponse)
+            receivedQuery.id = query.id
+            self.assertEquals(receivedQuery, query)
+            self.assertEquals(receivedResponse, response)