import os
import socket
-import unittest
import dns
from recursortests import RecursorTest
+
class TestFlags(RecursorTest):
_confdir = 'Flags'
_config_template = """dnssec=%s"""
_dnssec_setting = None
_recursors = {}
- _dnssec_setting_ports = {'off': 5300, 'process': 5301, 'validate': 5302}
+ _dnssec_setting_ports = {'off': 5300,
+ 'process-no-validate': 5301,
+ 'process': 5302,
+ 'validate': 5303}
@classmethod
def setUp(cls):
cls._sock = {}
for dnssec_setting, port in cls._dnssec_setting_ports.items():
print("Setting up UDP socket..")
- cls._sock[dnssec_setting] = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ cls._sock[dnssec_setting] = socket.socket(socket.AF_INET,
+ socket.SOCK_DGRAM)
cls._sock[dnssec_setting].settimeout(2.0)
cls._sock[dnssec_setting].connect(("127.0.0.1", port))
cls._recursor = recursor
cls.tearDownRecursor()
- def createQuery(self, name, rdtype, flags, ednsflags):
- """Helper function that creates the query with the specified flags.
- The flags need to be strings (no checking is performed atm)"""
- msg = dns.message.make_query(name, rdtype)
- msg.flags = dns.flags.from_text(flags)
- msg.flags += dns.flags.from_text('RD')
- msg.use_edns(edns=0, ednsflags=dns.flags.edns_from_text(ednsflags))
- return msg
-
def getQueryForSecure(self, flags='', ednsflags=''):
return self.createQuery('ns1.example.', 'A', flags, ednsflags)
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Secure_None(self):
+ msg = self.getQueryForSecure()
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
def testProcess_Secure_None(self):
msg = self.getQueryForSecure()
res = self.sendUDPQuery(msg, 'process')
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Secure_AD(self):
+ msg = self.getQueryForSecure('AD')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
def testProcess_Secure_AD(self):
msg = self.getQueryForSecure('AD')
res = self.sendUDPQuery(msg, 'process')
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Secure_ADDO(self):
+ msg = self.getQueryForSecure('AD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
def testProcess_Secure_ADDO(self):
msg = self.getQueryForSecure('AD', 'DO')
expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ def testProcessNoValidate_Secure_ADDOCD(self):
+ msg = self.getQueryForSecure('AD CD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['CD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
def testProcess_Secure_ADDOCD(self):
msg = self.getQueryForSecure('AD CD', 'DO')
expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Secure_DO(self):
+ msg = self.getQueryForSecure('', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
def testProcess_Secure_DO(self):
msg = self.getQueryForSecure('', 'DO')
expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
res = self.sendUDPQuery(msg, 'process')
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD'], ['DO'])
self.assertMatchingRRSIGInAnswer(res, expected)
def testValidate_Secure_DO(self):
expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
res = self.sendUDPQuery(msg, 'validate')
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD'], ['DO'])
self.assertMatchingRRSIGInAnswer(res, expected)
##
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Secure_DOCD(self):
+ msg = self.getQueryForSecure('CD', 'DO')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
def testProcess_Secure_DOCD(self):
msg = self.getQueryForSecure('CD', 'DO')
expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
res = self.sendUDPQuery(msg, 'process')
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD', 'CD'], ['DO'])
self.assertMatchingRRSIGInAnswer(res, expected)
def testValidate_Secure_DOCD(self):
expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
res = self.sendUDPQuery(msg, 'validate')
- self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMessageHasFlags(res, ['AD', 'QR', 'RA', 'RD', 'CD'], ['DO'])
self.assertMatchingRRSIGInAnswer(res, expected)
##
self.assertRRsetInAnswer(res, expected)
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Secure_CD(self):
+ msg = self.getQueryForSecure('CD')
+ expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
def testProcess_Secure_CD(self):
msg = self.getQueryForSecure('CD')
expected = dns.rrset.from_text('ns1.example.', 0, dns.rdataclass.IN, 'A', '{prefix}.10'.format(prefix=self._PREFIX))
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Bogus_None(self):
+ msg = self.getQueryForBogus()
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Bogus_None(self):
msg = self.getQueryForBogus()
res = self.sendUDPQuery(msg, 'process')
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Bogus_AD(self):
+ msg = self.getQueryForBogus('AD')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Bogus_AD(self):
msg = self.getQueryForBogus('AD')
res = self.sendUDPQuery(msg, 'process')
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Bogus_ADDO(self):
+ msg = self.getQueryForBogus('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Bogus_ADDO(self):
msg = self.getQueryForBogus('AD', 'DO')
res = self.sendUDPQuery(msg, 'process')
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Bogus_ADDOCD(self):
+ msg = self.getQueryForBogus('AD CD', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['CD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
def testProcess_Bogus_ADDOCD(self):
msg = self.getQueryForBogus('AD CD', 'DO')
expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertNoRRSIGsInAnswer(res)
- def testProcess_Bogus_DO(self):
+ def testProcessNoValidate_Bogus_DO(self):
msg = self.getQueryForBogus('', 'DO')
expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
- res = self.sendUDPQuery(msg, 'process')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
self.assertRcodeEqual(res, dns.rcode.NOERROR)
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
self.assertMatchingRRSIGInAnswer(res, expected)
+ def testProcess_Bogus_DO(self):
+ msg = self.getQueryForBogus('', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertRcodeEqual(res, dns.rcode.SERVFAIL)
+ self.assertAnswerEmpty(res)
+
def testValidate_Bogus_DO(self):
msg = self.getQueryForBogus('', 'DO')
res = self.sendUDPQuery(msg, 'validate')
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Bogus_DOCD(self):
+ msg = self.getQueryForBogus('CD', 'DO')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertMatchingRRSIGInAnswer(res, expected)
+
def testProcess_Bogus_DOCD(self):
msg = self.getQueryForBogus('CD', 'DO')
expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
self.assertRRsetInAnswer(res, expected)
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Bogus_CD(self):
+ msg = self.getQueryForBogus('CD')
+ expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertRRsetInAnswer(res, expected)
+ self.assertNoRRSIGsInAnswer(res)
+
def testProcess_Bogus_CD(self):
msg = self.getQueryForBogus('CD')
expected = dns.rrset.from_text('ted.bogus.example.', 0, dns.rdataclass.IN, 'A', '192.0.2.1')
self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
self.assertNoRRSIGsInAnswer(res)
+ def testProcessNoValidate_Insecure_None(self):
+ msg = self.getQueryForInsecure()
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+
def testProcess_Insecure_None(self):
msg = self.getQueryForInsecure()
res = self.sendUDPQuery(msg, 'process')
self.assertNoRRSIGsInAnswer(res)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Insecure_AD(self):
+ msg = self.getQueryForInsecure('AD')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Insecure_AD(self):
msg = self.getQueryForInsecure('AD')
res = self.sendUDPQuery(msg, 'process')
self.assertNoRRSIGsInAnswer(res)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Insecure_ADDO(self):
+ msg = self.getQueryForInsecure('AD', 'DO')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Insecure_ADDO(self):
msg = self.getQueryForInsecure('AD', 'DO')
res = self.sendUDPQuery(msg, 'process')
self.assertNoRRSIGsInAnswer(res)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Insecure_ADDOCD(self):
+ msg = self.getQueryForInsecure('AD CD', 'DO')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['CD', 'QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Insecure_ADDOCD(self):
msg = self.getQueryForInsecure('AD CD', 'DO')
res = self.sendUDPQuery(msg, 'process')
self.assertNoRRSIGsInAnswer(res)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Insecure_DO(self):
+ msg = self.getQueryForInsecure('', 'DO')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Insecure_DO(self):
msg = self.getQueryForInsecure('', 'DO')
res = self.sendUDPQuery(msg, 'process')
self.assertNoRRSIGsInAnswer(res)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Insecure_DOCD(self):
+ msg = self.getQueryForInsecure('CD', 'DO')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'], ['DO'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Insecure_DOCD(self):
msg = self.getQueryForInsecure('CD', 'DO')
res = self.sendUDPQuery(msg, 'process')
self.assertNoRRSIGsInAnswer(res)
self.assertRcodeEqual(res, dns.rcode.NOERROR)
+ def testProcessNoValidate_Insecure_CD(self):
+ msg = self.getQueryForInsecure('CD')
+ res = self.sendUDPQuery(msg, 'process-no-validate')
+
+ self.assertMessageHasFlags(res, ['QR', 'RA', 'RD', 'CD'])
+ self.assertNoRRSIGsInAnswer(res)
+ self.assertRcodeEqual(res, dns.rcode.NOERROR)
+
def testProcess_Insecure_CD(self):
msg = self.getQueryForInsecure('CD')
res = self.sendUDPQuery(msg, 'process')