]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
dns: record aging tests
authorAaron Haslett <aaronhaslett@catalyst.net.nz>
Wed, 9 May 2018 06:02:28 +0000 (18:02 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Thu, 12 Jul 2018 02:31:52 +0000 (04:31 +0200)
First basic DNS record aging tests.  These check that we can
turn aging on and off, and that timestamps are written on DNS
add and update calls, but not RPC calls.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=10812

Signed-off-by: Aaron Haslett <aaronhaslett@catalyst.net.nz>
Reviewed-by: Gary Lockyer <gary@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
python/samba/tests/dns.py
python/samba/tests/dns_base.py
selftest/knownfail.d/dns
selftest/knownfail.d/dns-scavenging [new file with mode: 0644]

index 4082b5375d1a1348af49f3e846a60d2fecea5741..083dc460afd7e3cafb2f9fc8410632c33fe79f47 100644 (file)
 #
 
 from __future__ import print_function
+
+from samba import dsdb
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.samdb import SamDB
+from samba.auth import system_session
+import ldb
 import os
 import sys
 import struct
@@ -652,11 +658,9 @@ class TestComplexQueries(DNSTest):
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
         r.length = 0xffff
-        rdata = value
-        r.rdata = rdata
-        updates = [r]
+        r.rdata = value
         p.nscount = 1
-        p.nsrecs = updates
+        p.nsrecs = [r]
         (response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
         self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
 
@@ -884,11 +888,22 @@ class TestZones(DNSTest):
         self.timeout = timeout
 
         self.zone = "test.lan"
-        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
+        self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\
+                                            (self.server_ip),
                                             self.lp, self.creds)
 
+        self.samdb = SamDB(url="ldap://" + self.server_ip,
+                           lp = self.get_loadparm(),
+                           session_info=system_session(),
+                           credentials=self.creds)
+
+        self.zone_dn = "DC=" + self.zone +\
+                       ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
+                       str(self.samdb.get_default_basedn())
+
     def tearDown(self):
         super(TestZones, self).tearDown()
+
         try:
             self.delete_zone(self.zone)
         except RuntimeError as e:
@@ -896,15 +911,18 @@ class TestZones(DNSTest):
             if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
                 raise
 
-    def create_zone(self, zone):
+    def create_zone(self, zone, aging_enabled=False):
         zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
         zone_create.pszZoneName = zone
         zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
-        zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
-        zone_create.fAging = 0
+        zone_create.fAging = int(aging_enabled)
         zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+        zone_create.fDsIntegrated = 1
+        zone_create.fLoadExisting = 1
+        zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
         try:
-            self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+            client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+            self.rpc_conn.DnssrvOperation2(client_version,
                                            0,
                                            self.server_ip,
                                            None,
@@ -915,6 +933,182 @@ class TestZones(DNSTest):
         except WERRORError as e:
             self.fail(str(e))
 
+    def set_params(self, **kwargs):
+        zone = kwargs.pop('zone', None)
+        for key,val in kwargs.items():
+            name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
+            name_param.dwParam = val
+            name_param.pszNodeName = key
+
+            client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+            nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
+            try:
+                self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone,
+                                               0, 'ResetDwordProperty', nap_type,
+                                               name_param)
+            except WERRORError as e:
+                self.fail(str(e))
+
+    def ldap_modify_dnsrecs(self, name, func):
+        dn = 'DC={},{}'.format(name, self.zone_dn)
+        dns_recs = self.ldap_get_dns_records(name)
+        for rec in dns_recs:
+            func(rec)
+        update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]}
+        self.samdb.modify(ldb.Message.from_dict(self.samdb,
+                                                update_dict,
+                                                ldb.FLAG_MOD_REPLACE))
+
+    def dns_update_record(self, prefix, txt):
+        p = self.make_txt_update(prefix, txt, self.zone)
+        (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
+        self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
+        recs = self.ldap_get_dns_records(prefix)
+        recs = [r for r in recs if r.data.str == txt]
+        self.assertEqual(len(recs), 1)
+        return recs[0]
+
+    def ldap_get_records(self, name):
+        dn = 'DC={},{}'.format(name, self.zone_dn)
+        expr = "(&(objectClass=dnsNode)(name={}))".format(name)
+        return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
+                                 expression=expr, attrs=["*"])
+
+    def ldap_get_dns_records(self, name):
+        records = self.ldap_get_records(name)
+        return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
+                for r in records[0].get('dnsRecord')]
+
+    def ldap_get_zone_settings(self):
+        records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
+                   expression="(&(objectClass=dnsZone)"+\
+                                "(name={}))".format(self.zone),
+                                    attrs=["dNSProperty"])
+        self.assertEqual(len(records), 1)
+        props = [ndr_unpack(dnsp.DnsProperty, r)
+                 for r in records[0].get('dNSProperty')]
+
+        #We have no choice but to repeat these here.
+        zone_prop_ids = {0x00: "EMPTY",
+                         0x01: "TYPE",
+                         0x02: "ALLOW_UPDATE",
+                         0x08: "SECURE_TIME",
+                         0x10: "NOREFRESH_INTERVAL",
+                         0x11: "SCAVENGING_SERVERS",
+                         0x12: "AGING_ENABLED_TIME",
+                         0x20: "REFRESH_INTERVAL",
+                         0x40: "AGING_STATE",
+                         0x80: "DELETED_FROM_HOSTNAME",
+                         0x81: "MASTER_SERVERS",
+                         0x82: "AUTO_NS_SERVERS",
+                         0x83: "DCPROMO_CONVERT",
+                         0x90: "SCAVENGING_SERVERS_DA",
+                         0x91: "MASTER_SERVERS_DA",
+                         0x92: "NS_SERVERS_DA",
+                         0x100: "NODE_DBFLAGS"}
+        return {zone_prop_ids[p.id].lower(): p.data for p in props}
+
+    def set_aging(self, enable=False):
+        self.create_zone(self.zone, aging_enabled=enable)
+        self.set_params(NoRefreshInterval=1, RefreshInterval=1,
+                        Aging=int(bool(enable)), zone=self.zone,
+                        AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+
+    def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
+        self.set_aging(enable=True)
+        settings = self.ldap_get_zone_settings()
+        self.assertTrue(settings['aging_state'] is not None)
+        self.assertTrue(settings['aging_state'])
+
+        rec = self.dns_update_record('agingtest', ['test txt'])
+        self.assertNotEqual(rec.dwTimeStamp, 0)
+
+    def test_set_aging_disabled(self):
+        self.set_aging(enable=False)
+        settings = self.ldap_get_zone_settings()
+        self.assertTrue(settings['aging_state'] is not None)
+        self.assertFalse(settings['aging_state'])
+
+        rec = self.dns_update_record('agingtest', ['test txt'])
+        self.assertNotEqual(rec.dwTimeStamp, 0)
+
+    def test_aging_update(self, enable=True):
+        name, txt = 'agingtest', ['test txt']
+        self.set_aging(enable=True)
+        before_mod = self.dns_update_record(name, txt)
+        if not enable:
+            self.set_params(zone=self.zone, Aging=0)
+        dec = 2
+        def mod_ts(rec):
+            rec.dwTimeStamp -= dec
+        self.ldap_modify_dnsrecs(name, mod_ts)
+        after_mod = self.ldap_get_dns_records(name)
+        self.assertEqual(len(after_mod), 1)
+        after_mod = after_mod[0]
+        self.assertEqual(after_mod.dwTimeStamp,
+                         before_mod.dwTimeStamp - dec)
+        after_update = self.dns_update_record(name, txt)
+        after_should_equal = before_mod if enable else after_mod
+        self.assertEqual(after_should_equal.dwTimeStamp,
+                         after_update.dwTimeStamp)
+
+    def test_aging_update_disabled(self):
+        self.test_aging_update(enable=False)
+
+    def test_aging_refresh(self):
+        name,txt = 'agingtest', ['test txt']
+        self.create_zone(self.zone, aging_enabled=True)
+        interval = 10
+        self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
+                        Aging=1, zone=self.zone,
+                        AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+        before_mod = self.dns_update_record(name, txt)
+        def mod_ts(rec):
+            rec.dwTimeStamp -= interval/2
+        self.ldap_modify_dnsrecs(name, mod_ts)
+        update_during_norefresh = self.dns_update_record(name, txt)
+        def mod_ts(rec):
+            rec.dwTimeStamp -= interval + interval/2
+        self.ldap_modify_dnsrecs(name, mod_ts)
+        update_during_refresh = self.dns_update_record(name, txt)
+        self.assertEqual(update_during_norefresh.dwTimeStamp,
+                         before_mod.dwTimeStamp - interval/2)
+        self.assertEqual(update_during_refresh.dwTimeStamp,
+                         before_mod.dwTimeStamp)
+
+    def test_rpc_add_no_timestamp(self):
+        name,txt = 'agingtest', ['test txt']
+        self.set_aging(enable=True)
+        rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+        rec_buf.rec = TXTRecord(txt)
+        self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                                          0, self.server_ip,
+                                          self.zone, name, rec_buf, None)
+        recs = self.ldap_get_dns_records(name)
+        self.assertEqual(len(recs), 1)
+        self.assertEqual(recs[0].dwTimeStamp, 0)
+
+    def test_basic_scavenging(self):
+        self.create_zone(self.zone, aging_enabled=True)
+        interval = 1
+        self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
+                        zone=self.zone, Aging=1,
+                        AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+        name, txt = 'agingtest', ['test txt']
+        rec = self.dns_update_record(name,txt)
+        rec = self.dns_update_record(name+'2',txt)
+        def mod_ts(rec):
+            rec.dwTimeStamp -= interval*5
+        self.ldap_modify_dnsrecs(name, mod_ts)
+        dsdb._scavenge_dns_records(self.samdb)
+
+        recs = self.ldap_get_dns_records(name)
+        self.assertEqual(len(recs), 1)
+        self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
+        recs = self.ldap_get_dns_records(name+'2')
+        self.assertEqual(len(recs), 1)
+        self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
+
     def delete_zone(self, zone):
         self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
                                        0,
index 10c7a0a6af5b6631d8a9f7454ddbd5628b7175a0..0709bbe1c407b480f60051d357f661dd9ea82de5 100644 (file)
@@ -166,18 +166,18 @@ class DNSTest(TestCaseInTempDir):
 
         return (response, recv_packet[2:])
 
-    def make_txt_update(self, prefix, txt_array):
+    def make_txt_update(self, prefix, txt_array, domain=None):
         p = self.make_name_packet(dns.DNS_OPCODE_UPDATE)
         updates = []
 
-        name = self.get_dns_domain()
+        name = domain or self.get_dns_domain()
         u = self.make_name_question(name, dns.DNS_QTYPE_SOA, dns.DNS_QCLASS_IN)
         updates.append(u)
         self.finish_name_packet(p, updates)
 
         updates = []
         r = dns.res_rec()
-        r.name = "%s.%s" % (prefix, self.get_dns_domain())
+        r.name = "%s.%s" % (prefix, name)
         r.rr_type = dns.DNS_QTYPE_TXT
         r.rr_class = dns.DNS_QCLASS_IN
         r.ttl = 900
@@ -190,8 +190,8 @@ class DNSTest(TestCaseInTempDir):
 
         return p
 
-    def check_query_txt(self, prefix, txt_array):
-        name = "%s.%s" % (prefix, self.get_dns_domain())
+    def check_query_txt(self, prefix, txt_array, zone=None):
+        name = "%s.%s" % (prefix, zone or self.get_dns_domain())
         p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
         questions = []
 
index cb3003240ea30e3a89c1e8aab7c21e730d4ea13e..9fa35a2b6ea8b9be63037b40975fc5bcf1c4999b 100644 (file)
@@ -33,7 +33,23 @@ samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_padding_rpc_to_dns\(ro
 samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_slash_rpc_to_dns\(rodc:local\)
 samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_two_rpc_to_dns\(rodc:local\)
 samba.tests.dns.__main__.TestRPCRoundtrip.test_update_add_txt_rpc_to_dns\(rodc:local\)
+
+samba.tests.dns.__main__.TestZones.test_set_aging_disabled
+
 samba.tests.dns.__main__.TestZones.test_soa_query\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_set_aging\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_update\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_update_disabled\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_refresh\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(rodc:local\)
+samba.tests.dns.__main__.TestZones.test_basic_scavenging\(rodc:local\)
+
+samba.tests.dns.__main__.TestZones.test_set_aging\(vampire_dc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_update\(vampire_dc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_update_disabled\(vampire_dc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_refresh\(vampire_dc:local\)
+samba.tests.dns.__main__.TestZones.test_basic_scavenging\(vampire_dc:local\)
+
 samba.tests.dns.__main__.TestComplexQueries.test_cname_two_chain\(vampire_dc:local\)
 samba.tests.dns.__main__.TestComplexQueries.test_one_a_query\(vampire_dc:local\)
 samba.tests.dns.__main__.TestSimpleQueries.test_one_a_query\(vampire_dc:local\)
diff --git a/selftest/knownfail.d/dns-scavenging b/selftest/knownfail.d/dns-scavenging
new file mode 100644 (file)
index 0000000..3c4cdc9
--- /dev/null
@@ -0,0 +1,12 @@
+#
+# Tests added for the dns scavenging changes
+#
+# Will be removed once the tests are implemented.
+#
+samba.tests.dns.__main__.TestZones.test_aging_refresh\(fl2003dc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_update\(fl2003dc:local\)
+samba.tests.dns.__main__.TestZones.test_aging_update_disabled\(fl2003dc:local\)
+samba.tests.dns.__main__.TestZones.test_basic_scavenging\(fl2003dc:local\)
+samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(fl2003dc:local\)
+samba.tests.dns.__main__.TestZones.test_set_aging\(fl2003dc:local\)
+samba.tests.dns.__main__.TestZones.test_rpc_add_no_timestamp\(vampire_dc:local\)