+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'A'* 20
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+
+ # Verify that queries with no trailing data make it through.
+ (receivedQuery, receivedResponse) = sender(query, response)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(query, receivedQuery)
+ self.assertEquals(response, receivedResponse)
+
+ # Verify that queries with trailing data don't make it through.
+ (_, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertEquals(receivedResponse, None)
+
+ def testTrailingRemoved(self):
+ """
+ Trailing data: Remove
+
+ """
+ name = 'removed.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 3600,
+ dns.rdataclass.IN,
+ dns.rdatatype.A,
+ '127.0.0.1')
+ response.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'A'* 20
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (receivedQuery, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertTrue(receivedQuery)
+ self.assertTrue(receivedResponse)
+ receivedQuery.id = query.id
+ self.assertEquals(receivedQuery, query)
+ self.assertEquals(receivedResponse, response)
+
+ def testTrailingRead(self):
+ """
+ Trailing data: Echo
+
+ """
+ name = 'echoed.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.SERVFAIL)
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ '-TrailingData.echoed.trailing.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'TrailingData'
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertTrue(receivedResponse)
+ expectedResponse.flags = receivedResponse.flags
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testTrailingReplaced(self):
+ """
+ Trailing data: Replace
+
+ """
+ name = 'replaced.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.SERVFAIL)
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ '-ABC.echoed.trailing.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'TrailingData'
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertTrue(receivedResponse)
+ expectedResponse.flags = receivedResponse.flags
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testTrailingReadUnsafe(self):
+ """
+ Trailing data: Echo as hex
+
+ """
+ name = 'echoed-hex.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.SERVFAIL)
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ '-0x0000DEAD.echoed-hex.trailing.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)
+
+ raw = query.to_wire()
+ raw = raw + b'\x00\x00\xDE\xAD'
+
+ for method in ("sendUDPQuery", "sendTCPQuery"):
+ sender = getattr(self, method)
+ (_, receivedResponse) = sender(raw, response, rawQuery=True)
+ self.assertTrue(receivedResponse)
+ expectedResponse.flags = receivedResponse.flags
+ self.assertEquals(receivedResponse, expectedResponse)
+
+ def testTrailingReplacedUnsafe(self):
+ """
+ Trailing data: Replace with null and/or non-ASCII bytes
+
+ """
+ name = 'replaced-unsafe.trailing.tests.powerdns.com.'
+ query = dns.message.make_query(name, 'A', 'IN')
+ response = dns.message.make_response(query)
+ response.set_rcode(dns.rcode.SERVFAIL)
+ expectedResponse = dns.message.make_response(query)
+ rrset = dns.rrset.from_text(name,
+ 60,
+ dns.rdataclass.IN,
+ dns.rdatatype.CNAME,
+ '-0xB000DEAD42F09F91BBC3BE.echoed-hex.trailing.tests.powerdns.com.')
+ expectedResponse.answer.append(rrset)