import threading
import unittest
+import clientsubnetoption
import dns
from twisted.internet.protocol import DatagramProtocol
resolver=%s.1:5301
any-to-tcp=no
launch=bind
+edns-subnet-processing=yes
"""
_config_params = ['_PREFIX']
noerror.example.org. 3600 IN ALIAS noerror.example.com.
nxd.example.org. 3600 IN ALIAS nxd.example.com.
-servfail.example.org. 3600 IN ALIAS servfail.example.com
+servfail.example.org. 3600 IN ALIAS servfail.example.com.
+subnet.example.org. 3600 IN ALIAS subnet.example.com.
""",
}
res = self.sendTCPQuery(query)
self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ def testECS(self):
+ expected_a = [dns.rrset.from_text('subnet.example.org.',
+ 0, dns.rdataclass.IN, 'A',
+ '192.0.2.1')]
+ expected_aaaa = [dns.rrset.from_text('subnet.example.org.',
+ 0, dns.rdataclass.IN, 'AAAA',
+ '2001:DB8::1')]
+
+ ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24)
+ ecso2 = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
+ query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertAnyRRsetInAnswer(res, expected_a)
+ self.assertEqual(res.options[0], ecso2)
+
+ ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6:db5::', 64)
+ ecso2 = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
+ query = dns.message.make_query('subnet.example.org', 'A', use_edns=True, options=[ecso])
+ res = self.sendUDPQuery(query)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertAnyRRsetInAnswer(res, expected_a)
+ self.assertEqual(res.options[0], ecso2)
+
class AliasUDPResponder(DatagramProtocol):
def datagramReceived(self, datagram, address):
name = question.name
name_text = name.to_text()
- if name_text == 'noerror.example.com.':
+ if name_text in ('noerror.example.com.', 'subnet.example.com.'):
+
+ do_ecs = False
+ if name_text == 'subnet.example.com.':
+ do_ecs=True
+
response.set_rcode(dns.rcode.NOERROR)
if question.rdtype in [dns.rdatatype.A,
dns.rdatatype.ANY]:
dns.rrset.from_text(name,
0, dns.rdataclass.IN, 'AAAA',
'2001:DB8::1'))
+
+ if do_ecs:
+ if request.options[0].family == clientsubnetoption.FAMILY_IPV4:
+ ecso = clientsubnetoption.ClientSubnetOption('1.2.3.0', 24, 22)
+ else:
+ ecso = clientsubnetoption.ClientSubnetOption('2001:db8:db6::', 64, 48)
+ response.use_edns(edns=True, options=[ecso])
+
if name_text == 'nxd.example.com.':
response.set_rcode(dns.rcode.NXDOMAIN)
response.authority.append(