./clean.sh
cd ../regression-tests.recursor-dnssec
-./runtests -e SNMP || EXIT=1
+./runtests -e 'SNMP' || EXIT=1
./printlogs.py || true
exit $EXIT
from __future__ import print_function
from __future__ import division
+import math
import socket
import struct
import dns
"""" Determines whether this instance is using the draft option code """
return self.option == DRAFT_OPTION_CODE
- def to_wire(self, file):
+ def to_wire(self, file=None):
"""Create EDNS packet as defined in draft-vandergaast-edns-client-subnet-01."""
ip = self.calculate_ip()
format = "!HBB%ds" % (mask_bits // 8)
data = struct.pack(format, self.family, self.mask, self.scope, test)
- file.write(data)
+ if file:
+ file.write(data)
+ else:
+ return data
def from_wire(cls, otype, wire, current, olen):
"""Read EDNS packet as defined in draft-vandergaast-edns-client-subnet-01.
from_wire = classmethod(from_wire)
+ # needed in 2.0.0..
+ @classmethod
+ def from_wire_parser(cls, otype, parser):
+ family, src, scope = parser.get_struct('!HBB')
+ addrlen = int(math.ceil(src / 8.0))
+ prefix = parser.get_bytes(addrlen)
+ if family == 1:
+ pad = 4 - addrlen
+ addr = dns.ipv4.inet_ntoa(prefix + b'\x00' * pad)
+ elif family == 2:
+ pad = 16 - addrlen
+ addr = dns.ipv6.inet_ntoa(prefix + b'\x00' * pad)
+ else:
+ raise ValueError('unsupported family')
+
+ return cls(addr, src, scope, otype)
+
def __repr__(self):
if self.family == FAMILY_IPV4:
ip = socket.inet_ntop(socket.AF_INET, struct.pack('!L', self.ip))
self.scope
)
+ def to_text(self):
+ return self.__repr__()
+
def __eq__(self, other):
"""Rich comparison method for equality.
self.client = client
self.server = server
- def to_wire(self, file):
+ def to_wire(self, file=None):
"""Create EDNS packet as defined in draft-ietf-dnsop-cookies-09."""
- file.write(self.client)
if self.server and len(self.server) > 0:
- file.write(self.server)
+ data = self.client + self.server
+ else:
+ data = self.client
+
+ if file:
+ file.write(data)
+ else:
+ return data
def from_wire(cls, otype, wire, current, olen):
"""Read EDNS packet as defined in draft-ietf-dnsop-cookies-09.
from_wire = classmethod(from_wire)
+ # needed in 2.0.0
+ @classmethod
+ def from_wire_parser(cls, otype, parser):
+ data = parser.get_remaining()
+
+ if len(data) != 8 and (len(data) < 16 or len(data) > 40):
+ raise Exception('Invalid EDNS Cookies option')
+
+ client = data[:8]
+ if len(data) > 8:
+ server = data[8:]
+ else:
+ server = None
+
+ return cls(client, server)
+
def __repr__(self):
return '%s(%s, %s)' % (
self.__class__.__name__,
self.server
)
+ def to_text(self):
+ return self.__repr__()
+
def __eq__(self, other):
if not isinstance(other, CookiesOption):
return False
dns.edns._type_to_class[0x000A] = CookiesOption
-
raise TypeError("rcode is neither a str nor int")
if msg.rcode() != rcode:
- msgRcode = dns.rcode._by_value[msg.rcode()]
- wantedRcode = dns.rcode._by_value[rcode]
+ msgRcode = dns.rcode.to_text(msg.rcode())
+ wantedRcode = dns.rcode.to_text(rcode)
raise AssertionError("Rcode for %s is %s, expected %s." % (msg.question[0].to_text(), msgRcode, wantedRcode))
response.additional.append(additional)
if ecso:
- response.options = [ecso]
+ response.use_edns(options = [ecso])
self.transport.write(response.to_wire(), address)
Ensure the recursor does not reply with an unknown option when one is
sent in the query
"""
- query = dns.message.make_query('version.bind.', 'TXT', 'CH', use_edns=0,
- payload=4096)
unknownOpt = dns.edns.GenericOption(65005, b'1234567890')
- query.options = [unknownOpt]
+ query = dns.message.make_query('version.bind.', 'TXT', 'CH', use_edns=0,
+ payload=4096, options=[unknownOpt])
response = self.sendUDPQuery(query)
self.assertRcodeEqual(response, dns.rcode.NOERROR)
- self.assertEqual(response.options, [])
+ self.assertEqual(response.options, ())
def testEDNSBadVers(self):
"""
# This is an authoritative answer
response.flags |= dns.flags.AA
# We pretend to do EDNS with a 4096 buffer size
- response.edns = 0
- response.payload = 4096
+ response.use_edns(payload=4096)
# What we use to fill the TXT records
# Test number + 64, so 01 = 'A', 02 = 'B' etc...
if request.edns != -1:
response.set_rcode(dns.rcode.FORMERR)
- response.edns = -1
+ response.use_edns(edns=-1)
+
response.additional = []
else:
if request.question[0].name == dns.name.from_text('host1.insecure-formerr.example.') and request.question[0].rdtype == dns.rdatatype.A:
ans = set()
ret = self.sendUDPQuery(query)
- ans.add(ret.answer[0])
+ ans.add(ret.answer[0][0])
ret = self.sendUDPQuery(query)
- ans.add(ret.answer[0])
+ ans.add(ret.answer[0][0])
self.assertEqual(len(ans), 2)
def checkProtobufIdentity(self, msg, requestorId, deviceId, deviceName):
print(msg)
self.assertTrue((requestorId == '') == (not msg.HasField('requestorId')))
- self.assertTrue((deviceId == '') == (not msg.HasField('deviceId')))
+ self.assertTrue((deviceId == b'') == (not msg.HasField('deviceId')))
self.assertTrue((deviceName == '') == (not msg.HasField('deviceName')))
self.assertEquals(msg.requestorId, requestorId)
self.assertEquals(msg.deviceId, deviceId)
rr = msg.response.rrs[0]
# we don't want to check the TTL for the A record, it has been cached by the previous test
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.CNAME, name, 15)
- self.assertEquals(rr.rdata, 'a.example.')
+ self.assertEquals(rr.rdata, b'a.example.')
rr = msg.response.rrs[1]
# we have max-cache-ttl set to 15
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, 'a.example.', 15, checkTTL=False)
self.assertEquals(socket.inet_ntop(socket.AF_INET6, rr.rdata), '2001:db8::1')
elif rr.type == dns.rdatatype.TXT:
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.TXT, name, 15)
- self.assertEquals(rr.rdata, '"Lorem ipsum dolor sit amet"')
+ self.assertEquals(rr.rdata, b'"Lorem ipsum dolor sit amet"')
elif rr.type == dns.rdatatype.MX:
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.MX, name, 15)
- self.assertEquals(rr.rdata, 'a.example.')
+ self.assertEquals(rr.rdata, b'a.example.')
elif rr.type == dns.rdatatype.SPF:
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.SPF, name, 15)
- self.assertEquals(rr.rdata, '"v=spf1 -all"')
+ self.assertEquals(rr.rdata, b'"v=spf1 -all"')
elif rr.type == dns.rdatatype.SRV:
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.SRV, name, 15)
- self.assertEquals(rr.rdata, 'a.example.')
+ self.assertEquals(rr.rdata, b'a.example.')
self.checkNoRemainingMessage()
# check the protobuf messages corresponding to the UDP query and answer
msg = self.getFirstProtobufMessage()
self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- self.checkProtobufIdentity(msg, '', '', '')
+ self.checkProtobufIdentity(msg, '', b'', '')
# then the response
msg = self.getFirstProtobufMessage()
# we have max-cache-ttl set to 15
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.42')
- self.checkProtobufIdentity(msg, '', '', '')
+ self.checkProtobufIdentity(msg, '', b'', '')
self.checkNoRemainingMessage()
def testTagged(self):
# check the protobuf messages corresponding to the UDP query and answer
msg = self.getFirstProtobufMessage()
self.checkProtobufQuery(msg, dnsmessage_pb2.PBDNSMessage.UDP, query, dns.rdataclass.IN, dns.rdatatype.A, name)
- self.checkProtobufIdentity(msg, self._requestorId, self._deviceId, self._deviceName)
+ self.checkProtobufIdentity(msg, self._requestorId, self._deviceId.encode('ascii'), self._deviceName)
# then the response
msg = self.getFirstProtobufMessage()
# we have max-cache-ttl set to 15
self.checkProtobufResponseRecord(rr, dns.rdataclass.IN, dns.rdatatype.A, name, 15)
self.assertEquals(socket.inet_ntop(socket.AF_INET, rr.rdata), '192.0.2.84')
- self.checkProtobufIdentity(msg, self._requestorId, self._deviceId, self._deviceName)
+ self.checkProtobufIdentity(msg, self._requestorId, self._deviceId.encode('ascii'), self._deviceName)
self.checkNoRemainingMessage()
class ProtobufTaggedExtraFieldsFFITest(ProtobufTaggedExtraFieldsTest):
rec_controlCmd = [os.environ['RECCONTROL'],
'--config-dir=%s' % 'configs/' + self._confdir,
'get-tas']
- expected = """Configured Trust Anchors:
+ expected = b"""Configured Trust Anchors:
.
\t\t36914 13 2 c94ed457ff79afe03804c26ce4fa832687db92bc231aff98617791fc71a65870
\t\t42924 13 2 b49e0aafd6e147742afb9eab0e76af0546357dc6c61bf67d7c745cf6f43f460e
'--config-dir=%s' % 'configs/' + self._confdir,
'dump-cache x']
try:
- expected = 'dumped 7 records\n'
+ expected = b'dumped 7 records\n'
ret = subprocess.check_output(rec_controlCmd, stderr=subprocess.STDOUT)
self.assertEqual(ret, expected)
response.additional.append(additional)
if ecso:
- response.options = [ecso]
+ response.use_edns(options = [ecso])
self.transport.write(response.to_wire(), address)
response = self.sendUDPQuery(query)
self.assertEqual(len(response.options), 1)
- self.assertEqual(response.options[0].data, self._servername)
+ self.assertEqual(response.options[0].data, self._servername.encode('ascii'))
def testNSIDTCP(self):
"""
response = self.sendTCPQuery(query)
self.assertEqual(len(response.options), 1)
- self.assertEqual(response.options[0].data, self._servername)
+ self.assertEqual(response.options[0].data, self._servername.encode('ascii'))