]> git.ipfire.org Git - thirdparty/samba.git/commitdiff
pytests: add dns_aging, embracing and extending ageing tests
authorDouglas Bagnall <douglas.bagnall@catalyst.net.nz>
Wed, 28 Apr 2021 05:40:08 +0000 (17:40 +1200)
committerAndrew Bartlett <abartlet@samba.org>
Fri, 11 Jun 2021 09:29:23 +0000 (09:29 +0000)
This incorporates tests from various dns*.py files, but makes them
correct.

All but one of these tests pass against Windows 2012r2.

Further patches will remove the broken tests in other files, and fix
Samba so it passes these.

Signed-off-by: Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet@samba.org>
Autobuild-User(master): Andrew Bartlett <abartlet@samba.org>
Autobuild-Date(master): Fri Jun 11 09:29:23 UTC 2021 on sn-devel-184

python/samba/tests/dns_aging.py [new file with mode: 0644]
selftest/knownfail.d/dns-aging [new file with mode: 0644]
source4/selftest/tests.py

diff --git a/python/samba/tests/dns_aging.py b/python/samba/tests/dns_aging.py
new file mode 100644 (file)
index 0000000..f9bcdb2
--- /dev/null
@@ -0,0 +1,1895 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Kai Blin  <kai@samba.org> 2011
+# Copyright (C) Catalyst.NET 2021
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+from samba import dsdb
+from samba import dsdb_dns
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.samdb import SamDB
+from samba.auth import system_session
+import ldb
+from samba import credentials
+from samba.dcerpc import dns, dnsp, dnsserver
+from samba.dnsserver import TXTRecord
+from samba.dnsserver import recbuf_from_string
+from samba.tests.subunitrun import SubunitOptions, TestProgram
+from samba import werror, WERRORError
+from samba.tests.dns_base import DNSTest
+import samba.getopt as options
+import optparse
+import time
+
+
+parser = optparse.OptionParser(
+    "dns_aging.py <server name> <server ip> [options]")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+
+
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
+
+opts, args = parser.parse_args()
+if len(args) < 2:
+    parser.print_usage()
+    sys.exit(1)
+
+LP = sambaopts.get_loadparm()
+CREDS = credopts.get_credentials(LP)
+SERVER_NAME = args[0]
+SERVER_IP = args[1]
+CREDS.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+
+DOMAIN = CREDS.get_realm().lower()
+
+# Unix time start, in DNS timestamp (24 * 365.25 * 369)
+# These are ballpark extremes for the timestamp.
+DNS_TIMESTAMP_1970 = 3234654
+DNS_TIMESTAMP_2101 = 4383000
+DNS_TIMESTAMP_1981 = 3333333  # a middling timestamp
+
+def get_samdb():
+    return SamDB(url=f"ldap://{SERVER_IP}",
+                 lp=LP,
+                 session_info=system_session(),
+                 credentials=CREDS)
+
+
+def get_file_samdb():
+    # For Samba only direct file access, needed for the tombstoning functions.
+    # (For Windows, we instruct it to tombstone over RPC).
+    return SamDB(url=LP.samdb_url(),
+                 lp=LP,
+                 session_info=system_session(),
+                 credentials=CREDS)
+
+
+def get_rpc():
+    return dnsserver.dnsserver(f"ncacn_ip_tcp:{SERVER_IP}[sign]", LP, CREDS)
+
+
+def create_zone(name, rpc=None, aging=True):
+    if rpc is None:
+        rpc = get_rpc()
+    z = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
+    z.pszZoneName = name
+    z.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
+    z.fAging = int(bool(aging))
+    z.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+    z.fDsIntegrated = 1
+    z.fLoadExisting = 1
+    z.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
+    rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                         0,
+                         SERVER_IP,
+                         None,
+                         0,
+                         'ZoneCreate',
+                         dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+                         z)
+
+
+def delete_zone(name, rpc=None):
+    if rpc is None:
+        rpc = get_rpc()
+    rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                         0,
+                         SERVER_IP,
+                         name,
+                         0,
+                         'DeleteZoneFromDs',
+                         dnsserver.DNSSRV_TYPEID_NULL,
+                         None)
+
+
+def txt_s_list(txt):
+    """Construct a txt record string list, which is a fiddly matter."""
+    if isinstance(txt, str):
+        txt = [txt]
+    s_list = dnsp.string_list()
+    s_list.count = len(txt)
+    s_list.str = txt
+    return s_list
+
+
+def copy_rec(rec):
+    copy = dnsserver.DNS_RPC_RECORD()
+    copy.wType = rec.wType
+    copy.dwFlags = rec.dwFlags
+    copy.dwSerial = rec.dwSerial
+    copy.dwTtlSeconds = rec.dwTtlSeconds
+    copy.data = rec.data
+    copy.dwTimeStamp = rec.dwTimeStamp
+    return copy
+
+
+class TestDNSAging(DNSTest):
+    """Probe DNS aging and scavenging, using LDAP and RPC to set and test
+    the timestamps behind DNS's back."""
+    server = SERVER_NAME
+    server_ip = SERVER_IP
+    creds = CREDS
+
+    def setUp(self):
+        super().setUp()
+        self.rpc_conn = get_rpc()
+        self.samdb = get_samdb()
+
+        # We always have a zone of our own named after the test function.
+        self.zone = self.id().rsplit('.', 1)[1]
+        self.addCleanup(delete_zone, self.zone, self.rpc_conn)
+        try:
+            create_zone(self.zone, self.rpc_conn)
+        except WERRORError as e:
+            if e.args[0] != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
+                raise
+            print(f"zone {self.zone} already exists")
+
+        # Though we set this in create_zone(), that doesn't work on
+        # Windows, so we repeat again here.
+        self.set_zone_int_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
+
+        self.zone_dn = (f"DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
+                        f"{self.samdb.get_default_basedn()}")
+
+    def set_zone_int_params(self, zone=None, **kwargs):
+        """Keyword arguments set parameters on the zone. e.g.:
+
+            self.set_zone_int_params(Aging=1,
+                                     RefreshInterval=222)
+
+        See [MS-DNSP] 3.1.1.2.1 "DNS Zone Integer Properties" for names.
+        """
+        if zone is None:
+            zone = self.zone
+        for key, val in kwargs.items():
+            name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
+            name_param.dwParam = val
+            name_param.pszNodeName = key
+            try:
+                self.rpc_conn.DnssrvOperation2(
+                    dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                    0,
+                    SERVER_IP,
+                    zone,
+                    0,
+                    'ResetDwordProperty',
+                    dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
+                    name_param)
+            except WERRORError as e:
+                self.fail(str(e))
+
+    def rpc_replace(self, name, old=None, new=None):
+        """Replace a DNS_RPC_RECORD or DNS_RPC_RECORD_BUF"""
+        # wrap our recs, if necessary
+        if isinstance(new, dnsserver.DNS_RPC_RECORD):
+            rec = new
+            new = dnsserver.DNS_RPC_RECORD_BUF()
+            new.rec = rec
+
+        if isinstance(old, dnsserver.DNS_RPC_RECORD):
+            rec = old
+            old = dnsserver.DNS_RPC_RECORD_BUF()
+            old.rec = rec
+
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(
+                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                0,
+                SERVER_IP,
+                self.zone,
+                name,
+                new,
+                old)
+        except WERRORError as e:
+            self.fail(f"could not replace record ({e})")
+
+    def rpc_add(self, name, data, wtype):
+        rec_buf = recbuf_from_string(wtype, data)
+        self.rpc_replace(name, None, rec_buf)
+
+    def rpc_delete(self, name, data, wtype):
+        rec_buf = recbuf_from_string(wtype, data)
+        self.rpc_replace(name, rec_buf, None)
+
+    def get_unique_txt_record(self, name, txt):
+        """Get the TXT record on Name with value txt, asserting that there is
+        only one."""
+        if isinstance(txt, str):
+            txt = [txt]
+        recs = self.ldap_get_records(name)
+
+        match = None
+        for r in recs:
+            if r.wType != dnsp.DNS_TYPE_TXT:
+                continue
+            txt2 = [x for x in r.data.str]
+            if txt2 == txt:
+                self.assertIsNone(match)
+                match = r
+        return match
+
+    def dns_update_record(self, name, txt, ttl=900):
+        if isinstance(txt, str):
+            txt = [txt]
+        p = self.make_txt_update(name, txt, self.zone, ttl=ttl)
+        (code, response) = self.dns_transaction_udp(p, host=SERVER_IP)
+        self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
+        return self.get_unique_txt_record(name, txt)
+
+    def rpc_update_record(self, name, txt, **kwargs):
+        """Add the record that self.dns_update_record() would add, via the
+        dnsserver RPC pipe.
+
+        As with DNS update, if the record already exists, we replace it.
+        """
+        if isinstance(txt, str):
+            txt = [txt]
+
+        old = TXTRecord(txt)
+        rec = TXTRecord(txt)
+        for k, v in kwargs.items():
+            setattr(rec, k, v)
+
+        try:
+            self.rpc_replace(name, old, rec)
+        except AssertionError as e:
+            # we have caught and wrapped the WERRor inside
+            if 'WERR_DNS_ERROR_RECORD_DOES_NOT_EXIST' not in str(e):
+                raise
+            self.rpc_replace(name, None, rec)
+
+        return self.get_unique_txt_record(name, txt)
+
+    def get_one_node(self, name):
+        expr = f"(&(objectClass=dnsNode)(name={name}))"
+        nodes = self.samdb.search(base=self.zone_dn,
+                                  scope=ldb.SCOPE_SUBTREE,
+                                  expression=expr,
+                                  attrs=["dnsRecord", "dNSTombstoned", "name"])
+
+        if len(nodes) > 1:
+            self.fail(
+                f"expected 0 or 1 dnsNodes for {name}, found {len(nodes)}")
+
+        if len(nodes) == 0:
+            return None
+        return nodes[0]
+
+    def ldap_get_records(self, name):
+        node = self.get_one_node(name)
+        if node is None:
+            return []
+
+        records = node.get('dnsRecord')
+        return [ndr_unpack(dnsp.DnssrvRpcRecord, r) for r in records]
+
+    def assert_tombstoned(self, name, tombstoned=True, timestamp=None):
+        # If run with tombstoned=False, assert it isn't tombstoned
+        # (and has no traces of tombstone). Otherwise assert it has
+        # all the necessary bits.
+        node = self.get_one_node(name)
+        if node is None:
+            self.fail(f"no node named {name}")
+
+        dnsts = node.get("dNSTombstoned")
+        if dnsts is None:
+            is_tombstoned = False
+        else:
+            self.assertEqual(len(dnsts), 1)
+            if dnsts[0] == b'TRUE':
+                is_tombstoned = True
+            else:
+                is_tombstoned = False
+
+        if tombstoned != is_tombstoned:
+            if is_tombstoned:
+                self.fail(f"{name} is tombstoned")
+            else:
+                self.fail(f"{name} is not tombstoned")
+
+        recs = self.ldap_get_records(name)
+        if is_tombstoned:
+            self.assertEqual(len(recs), 1)
+            self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
+            if timestamp is None:
+                self.assert_nttime_in_hour_range(recs[0].data)
+            else:
+                self.assert_nttime_in_hour_range(recs[0].data,
+                                                 timestamp - 3,
+                                                 timestamp + 3)
+
+        else:
+            for r in recs:
+                self.assertNotEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
+
+    def ldap_replace_records(self, name, records):
+        # We use raw ldap to avoid the "helpfulness" of dsdb_dns.replace()
+
+        dn = f'DC={name},{self.zone_dn}'
+
+        msg = ldb.Message.from_dict(self.samdb,
+                                    {'dn': dn,
+                                     'dnsRecord': [ndr_pack(r) for r in records]
+                                    },
+                                    ldb.FLAG_MOD_REPLACE)
+
+        try:
+            self.samdb.modify(msg)
+        except ldb.LdbError as e:
+            if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]:
+                raise
+            # We need to do an add
+            msg["objectClass"] = ["top", "dnsNode"]
+            msg["dnsRecord"].set_flags(ldb.FLAG_MOD_ADD)
+            self.samdb.add(msg)
+
+    def ldap_update_record(self, name, txt, **kwargs):
+        """Add the record that self.dns_update_record() would add, via ldap,
+        thus allowing us to set additional dnsRecord features like
+        dwTimestamp.
+        """
+        records = self.ldap_get_records(name)
+
+        # default values
+        rec = dnsp.DnssrvRpcRecord()
+        rec.wType = dnsp.DNS_TYPE_TXT
+        rec.rank = dnsp.DNS_RANK_ZONE
+        rec.dwTtlSeconds = 900
+        rec.dwSerial = 110
+        rec.dwTimeStamp = 0
+        rec.data = txt_s_list(txt)
+
+        # override defaults, as required
+        for k, v in kwargs.items():
+            setattr(rec, k, v)
+
+        for i, r in enumerate(records[:]):
+            if dsdb_dns.records_match(r, rec):
+                records[i] = rec
+                break
+        else:  # record not found
+            records.append(rec)
+
+        self.ldap_replace_records(name, records)
+
+        recs = self.ldap_get_records(name)
+        match = None
+        for r in recs:
+            if r.wType != rec.wType:
+                continue
+            if r.data.str == rec.data.str:
+                self.assertIsNone(match, f"duplicate records for {name}")
+                match = r
+        self.assertEqual(match.rank, rec.rank & 255)
+        self.assertEqual(match.dwTtlSeconds, rec.dwTtlSeconds)
+        self.assertEqual(match.dwTimeStamp, rec.dwTimeStamp)
+        return match
+
+    def ldap_delete_record(self, name, txt):
+        rec = dnsp.DnssrvRpcRecord()
+        rec.wType = dnsp.DNS_TYPE_TXT
+        rec.data = txt_s_list(txt)
+        records = self.ldap_get_records(name)
+        for i, r in enumerate(records[:]):
+            if dsdb_dns.records_match(r, rec):
+                del records[i]
+                break
+        else:
+            self.fail(f"record {txt} not found")
+
+        self.ldap_replace_records(name, records)
+
+    def ldap_modify_timestamps(self, name, delta):
+        records = self.ldap_get_records(name)
+        for rec in records:
+            rec.dwTimeStamp += delta
+        self.ldap_replace_records(name, records)
+
+    def get_rpc_records(self, name, dns_type=None):
+        if dns_type is None:
+            dns_type = dnsp.DNS_TYPE_ALL
+        select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
+        buflen, res = self.rpc_conn.DnssrvEnumRecords2(
+            dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+            0,
+            SERVER_IP,
+            self.zone,
+            name,
+            None,
+            dns_type,
+            select_flags,
+            None,
+            None)
+        recs = []
+        if not res or res.count == 0:
+            return []
+        for rec in res.rec:
+            recs.extend(rec.records)
+        return recs
+
+    def dns_tombstone(self, name,
+                      epoch_hours=DNS_TIMESTAMP_1981,
+                      epoch_nttime=None):
+        dn = f'DC={name},{self.zone_dn}'
+        r = dnsp.DnssrvRpcRecord()
+        r.wType = dnsp.DNS_TYPE_TOMBSTONE
+        # r.dwTimeStamp is a 32 bit value in hours, and r.data is an
+        # NTTIME (100 nanosecond intervals), both in the 1601 epoch. A
+        # tombstome will have both, but expiration calculations use
+        # the r.data NTTIME EntombedTime timestamp (see [MS-DNSP]).
+        r.dwTimeStamp = epoch_hours
+        if epoch_nttime is None:
+            r.data = epoch_hours * 3600 * 10 * 1000 * 1000
+        else:
+            r.data = epoch_nttime
+
+        msg = ldb.Message.from_dict(self.samdb,
+                                    {'dn': dn,
+                                     'dnsRecord': [ndr_pack(r)],
+                                     'dnsTombstoned': 'TRUE'
+                                    },
+                                    ldb.FLAG_MOD_REPLACE)
+        try:
+            self.samdb.modify(msg)
+        except ldb.LdbError as e:
+            if 'LDAP_NO_SUCH_OBJECT' not in e.args[1]:
+                raise
+            # We need to do an add
+            msg["objectClass"] = ["top", "dnsNode"]
+            self.samdb.add(msg)
+
+    def set_aging(self, enable=False):
+        self.set_zone_int_params(Aging=int(bool(enable)))
+
+    def assert_timestamp_in_ballpark(self, rec):
+        self.assertGreater(rec.dwTimeStamp, DNS_TIMESTAMP_1970)
+        self.assertLess(rec.dwTimeStamp, DNS_TIMESTAMP_2101)
+
+    def assert_nttime_in_hour_range(self, t,
+                                    hour_min=DNS_TIMESTAMP_1970,
+                                    hour_max=DNS_TIMESTAMP_2101):
+        t //= int(3600 * 1e7)
+        self.assertGreater(t, hour_min)
+        self.assertLess(t, hour_max)
+
+    def assert_soon_after(self, timestamp, reference):
+        """Assert that a timestamp is the same or very slightly higher than a
+        reference timestamp.
+
+        Typically we expect the timestamps to be identical, unless an
+        hour has clicked over since the reference was taken. However
+        we allow one more hour in case it happens during a daylight
+        savings transition or something.
+        """
+        self.assertGreaterEqual(timestamp, reference)
+        self.assertLess(timestamp, reference + 3)
+
+    def test_update_timestamps_aging_off_then_on(self):
+        # we will add a record with aging off
+        # it will have the current timestamp
+        self.set_aging(False)
+        name = 'timestamp-now'
+        name2 = 'timestamp-eightdays'
+
+        rec = self.dns_update_record(name, [name])
+        start_time = rec.dwTimeStamp
+        self.assert_timestamp_in_ballpark(rec)
+        # alter the timestamp -8 days using RPC
+        # with aging turned off, we expect no change
+        # when aging is on, we expect change
+        eight_days_ago = start_time - 8 * 24
+        rec = self.ldap_update_record(name2, [name2],
+                                      dwTimeStamp=eight_days_ago)
+
+        self.assertEqual(rec.dwTimeStamp, eight_days_ago)
+
+        # if aging was on, this would change
+        rec = self.dns_update_record(name2, [name2])
+        self.assertEqual(rec.dwTimeStamp, eight_days_ago)
+
+        self.set_aging(True)
+        rec = self.dns_update_record(name2, [name2])
+        self.assertGreaterEqual(rec.dwTimeStamp, start_time)
+
+    def test_rpc_update_timestamps(self):
+        # RPC always sets timestamps to zero on Windows.
+        self.set_aging(False)
+        name = 'timestamp-now'
+
+        rec = self.dns_update_record(name, [name])
+        start_time = rec.dwTimeStamp
+        self.assert_timestamp_in_ballpark(rec)
+        # attempt to alter the timestamp to something close by.
+        eight_days_ago = start_time - 8 * 24
+        rec = self.rpc_update_record(name, [name],
+                                     dwTimeStamp=eight_days_ago)
+        self.assertEqual(rec.dwTimeStamp, 0)
+
+        # try again, with aging on
+        self.set_aging(True)
+        rec = self.rpc_update_record(name, [name],
+                                     dwTimeStamp=eight_days_ago)
+        self.assertEqual(rec.dwTimeStamp, 0)
+
+        # now that the record is static, a dns update won't change it
+        rec = self.dns_update_record(name, [name])
+        self.assertEqual(rec.dwTimeStamp, 0)
+
+        # but another record on the same node will behave normally
+        # i.e. the node is not static, the record is.
+        name2 = 'timestamp-eightdays'
+        rec = self.dns_update_record(name2, [name2])
+        self.assert_soon_after(rec.dwTimeStamp,
+                               start_time)
+
+    def get_txt_timestamps(self, name, *txts):
+        records = self.ldap_get_records(name)
+
+        ret = []
+        for t in txts:
+            for r in records:
+                t2 = [x for x in r.data.str]
+                if t == t2:
+                    ret.append(r.dwTimeStamp)
+        return ret
+
+    def test_update_aging_disabled_2(self):
+        # With aging disabled, Windows updates the timestamps of all
+        # records when one is updated.
+        name = 'test'
+        txt1 = ['test txt']
+        txt2 = ['test', 'txt2']
+        txt3 = ['test', 'txt3']
+
+        self.set_aging(False)
+
+        current_time = self.dns_update_record(name, txt1).dwTimeStamp
+
+        six_days_ago = current_time - 6 * 24
+        eight_days_ago = current_time - 8 * 24
+        fifteen_days_ago = current_time - 15 * 24
+        hundred_days_ago = current_time - 100 * 24
+        thousand_days_ago = current_time - 1000 * 24
+
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago):
+            # wind back
+            self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
+            self.assertEqual(self.get_txt_timestamps(name, txt1), [timestamp])
+
+            # no change here
+            update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
+            self.assertEqual(update_timestamp, timestamp)
+
+        # adding a fresh record
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago,
+                          100000,
+                          100):
+            # wind back
+            timestamp1 = self.ldap_update_record(
+                name,
+                txt1,
+                dwTimeStamp=timestamp).dwTimeStamp
+            self.assertEqual(timestamp1, timestamp)
+
+            self.dns_update_record(name, txt2)
+            timestamps = self.get_txt_timestamps(name, txt1, txt2)
+            self.assertEqual(timestamps, [timestamp, current_time])
+
+            self.ldap_delete_record(name, txt2)
+            timestamps = self.get_txt_timestamps(name, txt1)
+            self.assertEqual(timestamps, [timestamp])
+
+        # add record 2.
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago,
+                          100000,
+                          100):
+            # wind back
+            self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
+            timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+            self.assertEqual(timestamp1, timestamp)
+
+            timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+            # txt1 timestamp is now current time
+            timestamps = self.get_txt_timestamps(name, txt1, txt2)
+            self.assertEqual(timestamps, [timestamp, current_time])
+
+        # with 3 records, no change
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago,
+                          100000,
+                          10):
+            # wind back
+            self.ldap_update_record(name, txt1, dwTimeStamp=timestamp)
+            self.ldap_update_record(name, txt2, dwTimeStamp=timestamp)
+            self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
+            timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+            self.assertEqual(timestamp3, timestamp + 30)
+
+            self.dns_update_record(name, txt2).dwTimeStamp
+            timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
+            self.assertEqual(timestamps, [timestamp,
+                                          timestamp,
+                                          timestamp + 30])
+
+        # with 3 records, one of which is static
+        # first we set the updatee's timestamp to a recognisable number
+        self.ldap_update_record(name, txt2, dwTimeStamp=999999)
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago,
+                          100000,
+                          10):
+            # wind back
+            self.ldap_update_record(name, txt1, dwTimeStamp=0)
+            self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp - 9))
+            timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+            self.assertEqual(timestamp3, timestamp - 9)
+
+            self.dns_update_record(name, txt2)
+            timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
+            self.assertEqual(timestamps, [0,
+                                          999999,
+                                          timestamp - 9])
+
+        # with 3 records, updating one which is static
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago,
+                          100000,
+                          10):
+            # wind back
+            self.ldap_update_record(name, txt1, dwTimeStamp=0)
+            self.ldap_update_record(name, txt2, dwTimeStamp=0)
+            self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp + 30))
+            timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+            self.assertEqual(timestamp3, timestamp + 30)
+
+            self.dns_update_record(name, txt2).dwTimeStamp
+            timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
+            self.assertEqual(timestamps, [0,
+                                          0,
+                                          timestamp + 30])
+
+        # with 3 records, after the static nodes have been replaced
+        self.ldap_update_record(name, txt1, dwTimeStamp=777777)
+        self.ldap_update_record(name, txt2, dwTimeStamp=888888)
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago,
+                          100000,
+                          10):
+            # wind back
+            self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp))
+            timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+            self.assertEqual(timestamp3, timestamp)
+
+            self.dns_update_record(name, txt2)
+            timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
+            self.assertEqual(timestamps, [777777,
+                                          888888,
+                                          timestamp])
+
+    def broken_test_update_aging_disabled_rpc(self):
+        # This one doesn't work reliably on Windows because there is a
+        # race between RPC and ldap.
+        name = 'test'
+        txt1 = ['test txt']
+        txt2 = ['test', 'txt2']
+        txt3 = ['test', 'txt3']
+
+        self.set_aging(False)
+
+        current_time = self.dns_update_record(name, txt1).dwTimeStamp
+
+        six_days_ago = current_time - 6 * 24
+        eight_days_ago = current_time - 8 * 24
+        fifteen_days_ago = current_time - 15 * 24
+        hundred_days_ago = current_time - 100 * 24
+        thousand_days_ago = current_time - 1000 * 24
+
+        # with 3 records, rpc updates
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        for timestamp in (current_time,
+                          six_days_ago,
+                          eight_days_ago,
+                          fifteen_days_ago,
+                          hundred_days_ago,
+                          thousand_days_ago,
+                          100000,
+                          10):
+            # wind back
+            self.ldap_update_record(name, txt1, dwTimeStamp=777777)
+            self.ldap_update_record(name, txt2, dwTimeStamp=888888)
+            self.ldap_update_record(name, txt3, dwTimeStamp=(timestamp))
+            timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+            self.assertEqual(timestamp3, timestamp)
+
+            self.rpc_update_record(name, txt2)
+            time.sleep(2)
+            timestamps = self.get_txt_timestamps(name, txt1, txt2, txt3)
+            self.assertEqual(timestamps, [777777,
+                                          0,
+                                          timestamp])
+
+    def _test_update_aging_disabled_n_days_ago(self, n_days):
+        name = 'test'
+        txt1 = ['1']
+        txt2 = ['2']
+
+        self.set_aging(False)
+        current_time = self.dns_update_record(name, txt1).dwTimeStamp
+
+        # rewind timestamp using ldap
+        self.ldap_modify_timestamps(name, n_days * -24)
+        n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertGreater(current_time, n_days_ago)
+
+        # no change when updating this record
+        update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(update_timestamp, n_days_ago)
+
+        # add another record, which should have the current timestamp
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        # get the original record timestamp. NOW it matches current_time
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp2)
+
+        # let's repeat that, this time with txt2 existing
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, n_days_ago)
+
+        # this update is not an add
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        # now timestamp1 is not changed
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, n_days_ago)
+
+        # delete record2, try again
+        self.ldap_delete_record(name, txt2)
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, n_days_ago)
+
+        # here we are re-adding the deleted record
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+
+        # It gets weird HERE.
+        # note how the SIBLING of the deleted, re-added record differs
+        # from the sibling of freshly added record, depending on the
+        # time difference.
+        if n_days <= 7:
+            self.assertEqual(timestamp1, n_days_ago)
+        else:
+            self.assertEqual(timestamp1, timestamp2)
+
+        # re-timestamp record2, try again
+        self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, n_days_ago)
+
+        # no change
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp2, n_days_ago)
+        # also no change
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp2)
+
+        # let's introduce another record
+        txt3 = ['3']
+        self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp3, current_time)
+
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+
+        if n_days <= 7:
+            self.assertEqual(timestamp1, n_days_ago)
+        else:
+            self.assertEqual(timestamp1, timestamp3)
+
+        self.assertEqual(timestamp2, timestamp3)
+
+        self.ldap_delete_record(name, txt3)
+        timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp3, current_time)
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+
+        if n_days <= 7:
+            self.assertEqual(timestamp1, n_days_ago)
+        else:
+            self.assertEqual(timestamp1, timestamp3)
+
+        self.assertEqual(timestamp2, timestamp3)
+
+        # and here we'll make txt3 static
+        txt4 = ['4']
+
+        # and here we'll make txt1 static
+        self.ldap_update_record(name, txt1, dwTimeStamp=0)
+        self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
+        self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
+
+        self.assertEqual(timestamp1, 0)
+        self.assertEqual(timestamp2, n_days_ago)
+        self.assertEqual(timestamp3, n_days_ago)
+        self.assert_soon_after(timestamp4, current_time)
+
+    def test_update_aging_disabled_in_no_refresh_window(self):
+        self._test_update_aging_disabled_n_days_ago(4)
+
+    def test_update_aging_disabled_on_no_refresh_boundary(self):
+        self._test_update_aging_disabled_n_days_ago(7)
+
+    def test_update_aging_disabled_in_refresh_window(self):
+        self._test_update_aging_disabled_n_days_ago(9)
+
+    def test_update_aging_disabled_beyond_refresh_window(self):
+        self._test_update_aging_disabled_n_days_ago(16)
+
+    def test_update_aging_disabled_in_eighteenth_century(self):
+        self._test_update_aging_disabled_n_days_ago(100000)
+
+    def test_update_aging_disabled_static(self):
+        name = 'test'
+        txt1 = ['1']
+        txt2 = ['2']
+
+        self.set_aging(False)
+
+        current_time = self.dns_update_record(name, txt1).dwTimeStamp
+        self.ldap_update_record(name, txt1, dwTimeStamp=0)
+
+        # no change when updating this record
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+
+        # add another record, which should have the current timestamp
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assert_soon_after(timestamp1, current_time)
+
+        # let's repeat that, this time with txt2 existing
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        # delete record2, try again
+        self.ldap_delete_record(name, txt2)
+        self.ldap_update_record(name, txt1, dwTimeStamp=0)
+        # no change when updating this record
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp2, 0)
+
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+        # re-timestamp record2, try again
+        self.ldap_update_record(name, txt2, dwTimeStamp=1)
+        self.ldap_update_record(name, txt1, dwTimeStamp=0)
+        # no change when updating this record
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp2, 1)
+
+    def test_update_aging_disabled(self):
+        # With aging disabled, Windows updates the timestamps of all
+        # records when one is updated.
+        name = 'test'
+        txt1 = ['test txt']
+        txt2 = ['test', 'txt2']
+        txt3 = ['test', 'txt3']
+        minus_6 = -6 * 24
+        minus_8 = -8 * 24
+
+        self.set_aging(False)
+
+        current_time = self.dns_update_record(name, txt1).dwTimeStamp
+
+        # rewind timestamp using ldap
+        self.ldap_modify_timestamps(name, minus_6)
+        after_mod = self.get_unique_txt_record(name, txt1)
+        six_days_ago = after_mod.dwTimeStamp
+        self.assertEqual(six_days_ago, current_time + minus_6)
+
+        # no change
+        update_timestamp = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(update_timestamp, six_days_ago)
+
+        self.check_query_txt(name, txt1, zone=self.zone)
+
+        # another record
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        # without aging, timestamp1 is changed!!
+        self.assertEqual(timestamp1, timestamp2)
+
+        # Set both records back to 8 days ago.
+        self.ldap_modify_timestamps(name, minus_8)
+
+        eight_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(eight_days_ago, current_time + minus_8)
+
+        update2 = self.dns_update_record(name, txt2)
+
+        # Without aging on, an update should not change the timestamps.
+        self.assertEqual(update2.dwTimeStamp, eight_days_ago)
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, eight_days_ago)
+
+        # Add another txt record. The new record should have the now
+        # timestamp, and drag the others up with it.
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp3, current_time)
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp3)
+        self.assertEqual(timestamp2, timestamp3)
+
+        hundred_days_ago = current_time - 100 * 24
+        thousand_days_ago = current_time - 1000 * 24
+        record = self.ldap_update_record(name, txt1,
+                                         dwTimeStamp=hundred_days_ago)
+        self.assertEqual(record.dwTimeStamp, hundred_days_ago)
+        record = self.ldap_update_record(name, txt2,
+                                         dwTimeStamp=thousand_days_ago)
+        self.assertEqual(record.dwTimeStamp, thousand_days_ago)
+
+        # update 3, will others change (because beyond RefreshInterval)? yes.
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp3, current_time)
+        self.assertEqual(timestamp1, hundred_days_ago)
+        self.assertEqual(timestamp2, thousand_days_ago)
+
+        fifteen_days_ago = current_time - 15 * 24
+        self.ldap_update_record(name, txt3, dwTimeStamp=fifteen_days_ago)
+
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        # DNS update has no effect because all records are old
+        self.assertEqual(timestamp2, thousand_days_ago)
+        self.assertEqual(timestamp1, hundred_days_ago)
+        self.assertEqual(timestamp3, fifteen_days_ago)
+
+        # Does update of old record affect timestamp of refreshable record? No.
+        self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        # DNS update has no effect because all records are old
+        self.assertEqual(timestamp2, thousand_days_ago)
+        self.assertEqual(timestamp1, hundred_days_ago)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # RPC zeros timestamp, after which updates won't change it.
+        # BUT it refreshes all others!
+        self.rpc_update_record(name, txt2)
+
+        timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp2, 0)
+        self.assert_soon_after(timestamp1, current_time)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+    def test_update_aging_enabled(self):
+        name = 'test'
+        txt1 = ['test txt']
+        txt2 = ['test', 'txt2']
+        txt3 = ['test', 'txt3']
+        txt4 = ['4']
+
+        self.set_aging(True)
+
+        current_time = self.dns_update_record(name, txt2).dwTimeStamp
+
+        six_days_ago = current_time - 6 * 24
+        eight_days_ago = current_time - 8 * 24
+        fifteen_days_ago = current_time - 15 * 24
+        hundred_days_ago = current_time - 100 * 24
+
+        self.ldap_update_record(name, txt1, dwTimeStamp=six_days_ago)
+
+        # with or without aging, a delta of -6 days does not affect
+        # timestamps, because dwNoRefreshInterval is 7 days.
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+
+        self.assertEqual(timestamp1, six_days_ago)
+        self.assert_soon_after(timestamp2, current_time)
+
+        self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # update 1, what happens to 2 and 3? Nothing?
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assertEqual(timestamp1, six_days_ago)
+        self.assert_soon_after(timestamp2, current_time)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # now set 1 to 8 days, and we should see changes
+        self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago)
+
+        # update 1, what happens to 2 and 3? Nothing?
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp1, current_time)
+        self.assert_soon_after(timestamp2, current_time)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # next few ones use these numbers
+        self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago)
+        self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago)
+        self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
+
+        # change even though 1 is outside the window
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp1, current_time)
+        self.assertEqual(timestamp2, six_days_ago)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # reset 1
+        self.ldap_update_record(name, txt1, dwTimeStamp=fifteen_days_ago)
+
+        # no change, because 2 is outside the window
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assertEqual(timestamp1, fifteen_days_ago)
+        self.assertEqual(timestamp2, six_days_ago)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # 3 changes, others do not
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp1, fifteen_days_ago)
+        self.assertEqual(timestamp2, six_days_ago)
+        self.assert_soon_after(timestamp3, current_time)
+
+        # reset 3 to 100 days
+        self.ldap_update_record(name, txt3, dwTimeStamp=hundred_days_ago)
+
+        # 3 changes, others do not
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp1, fifteen_days_ago)
+        self.assertEqual(timestamp2, six_days_ago)
+        self.assert_soon_after(timestamp3, current_time)
+
+        # reset 1 and 3 to 8 days. does update of 1 affect 3?
+        self.ldap_update_record(name, txt1, dwTimeStamp=eight_days_ago)
+        self.ldap_update_record(name, txt3, dwTimeStamp=eight_days_ago)
+
+        # 1 changes, others do not
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp1, current_time)
+        self.assertEqual(timestamp2, six_days_ago)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # Try an RPC update, zeroing 1 --> what happens to 3?
+        timestamp1 = self.rpc_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+        self.assertEqual(timestamp2, six_days_ago)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        # with 2 and 3 at 8 days, does static record change things?
+        self.ldap_update_record(name, txt2, dwTimeStamp=eight_days_ago)
+        # 2 changes, but to zero!
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+        self.assertEqual(timestamp2, 0)
+        self.assertEqual(timestamp3, eight_days_ago)
+
+        self.ldap_update_record(name, txt2, dwTimeStamp=six_days_ago)
+        self.ldap_update_record(name, txt1, dwTimeStamp=3000000)
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, 3000000)
+
+        # dns update remembers that node is static, even with no
+        # static records.
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+
+        # Add another txt record. The new record should have the now
+        # timestamp, and the others should remain unchanged.
+        # BUT somehow record 1 is static!?
+        timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+        self.assertEqual(timestamp2, six_days_ago)
+        self.assertEqual(timestamp3, eight_days_ago)
+        self.assertEqual(timestamp4, 0)
+
+    def _test_update_aging_enabled_n_days_ago(self, n_days):
+        name = 'test'
+        txt1 = ['1']
+        txt2 = ['2']
+        delta = n_days * -24
+
+        self.set_aging(True)
+        current_time = self.dns_update_record(name, txt1).dwTimeStamp
+
+        # rewind timestamp using ldap
+        self.ldap_modify_timestamps(name, delta)
+        n_days_ago = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertGreater(current_time, n_days_ago)
+
+        # update changes timestamp depending on time.
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        if n_days <= 7:
+            self.assertEqual(timestamp1, n_days_ago)
+        else:
+            self.assert_soon_after(timestamp1, current_time)
+
+        # add another record, which should have the current timestamp
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        # first record should not have changed
+        timestamp1_b = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp1_b)
+
+        # let's repeat that, this time with txt2 existing
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp1_b)
+
+        # this update is not an add. record 2 is already up-to-date
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        # now timestamp1 is not changed
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp1_b)
+
+        # delete record2, try again
+        self.ldap_delete_record(name, txt2)
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        if n_days <= 7:
+            self.assertEqual(timestamp1, n_days_ago)
+        else:
+            self.assert_soon_after(timestamp1, current_time)
+
+        # here we are re-adding the deleted record
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assert_soon_after(timestamp2, current_time)
+
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+
+        # It gets weird HERE.
+        # note how the SIBLING of the deleted, re-added record differs
+        # from the sibling of freshly added record, depending on the
+        # time difference.
+        if n_days <= 7:
+            self.assertEqual(timestamp1, n_days_ago)
+        else:
+            self.assertEqual(timestamp1, timestamp2)
+
+        # re-timestamp record2, try again
+        self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        # this should make no difference
+        timestamp1_b = self.dns_update_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp1_b)
+
+        # no change
+        timestamp2 = self.dns_update_record(name, txt2).dwTimeStamp
+        self.assertEqual(timestamp2, timestamp1)
+        # also no change
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, timestamp2)
+
+        # let's introduce another record
+        txt3 = ['3']
+        self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
+        self.ldap_update_record(name, txt1, dwTimeStamp=n_days_ago)
+
+        timestamp3 = self.dns_update_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp3, current_time)
+
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+
+        self.assertEqual(timestamp1, n_days_ago)
+        self.assertEqual(timestamp2, n_days_ago)
+
+        self.ldap_delete_record(name, txt3)
+        timestamp2 = self.dns_update_record(name, txt3).dwTimeStamp
+        self.assert_soon_after(timestamp3, current_time)
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+
+        self.assertEqual(timestamp1, n_days_ago)
+        self.assertEqual(timestamp2, n_days_ago)
+
+        txt4 = ['4']
+
+        # Because txt1 is static, txt4 is static
+        self.ldap_update_record(name, txt1, dwTimeStamp=0)
+        self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
+        self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
+
+        self.assertEqual(timestamp1, 0)
+        self.assertEqual(timestamp2, n_days_ago)
+        self.assertEqual(timestamp3, n_days_ago)
+        self.assertEqual(timestamp4, 0)
+
+        longer_ago = n_days_ago // 2
+
+        # remove all static records.
+        self.ldap_delete_record(name, txt4)
+        self.ldap_update_record(name, txt1, dwTimeStamp=longer_ago)
+        self.ldap_update_record(name, txt2, dwTimeStamp=n_days_ago)
+        self.ldap_update_record(name, txt3, dwTimeStamp=n_days_ago)
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+        self.assertEqual(timestamp1, longer_ago)
+
+        timestamp4 = self.dns_update_record(name, txt4).dwTimeStamp
+        timestamp2 = self.get_unique_txt_record(name, txt2).dwTimeStamp
+        timestamp3 = self.get_unique_txt_record(name, txt3).dwTimeStamp
+        timestamp1 = self.get_unique_txt_record(name, txt1).dwTimeStamp
+
+        # Here, although there is no record frm which to get the zero
+        # timestamp, record 4 does it anyway.
+        self.assertEqual(timestamp1, longer_ago)
+        self.assertEqual(timestamp2, n_days_ago)
+        self.assertEqual(timestamp3, n_days_ago)
+        self.assertEqual(timestamp4, 0)
+
+        # and now record 1 wants to be static.
+        self.ldap_update_record(name, txt4, dwTimeStamp=longer_ago)
+        timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp
+        self.assertEqual(timestamp4, longer_ago)
+        timestamp1 = self.dns_update_record(name, txt1).dwTimeStamp
+        timestamp4 = self.get_unique_txt_record(name, txt4).dwTimeStamp
+        self.assertEqual(timestamp1, 0)
+        self.assertEqual(timestamp4, longer_ago)
+
+    def test_update_aging_enabled_in_no_refresh_window(self):
+        self._test_update_aging_enabled_n_days_ago(4)
+
+    def test_update_aging_enabled_on_no_refresh_boundary(self):
+        self._test_update_aging_enabled_n_days_ago(7)
+
+    def test_update_aging_enabled_in_refresh_window(self):
+        self._test_update_aging_enabled_n_days_ago(9)
+
+    def test_update_aging_enabled_beyond_refresh_window(self):
+        self._test_update_aging_enabled_n_days_ago(16)
+
+    def test_update_aging_enabled_in_eighteenth_century(self):
+        self._test_update_aging_enabled_n_days_ago(100000)
+
+    def test_update_static_stickiness(self):
+        name = 'test'
+        A = ['A']
+        B = ['B']
+        C = ['C']
+        D = ['D']
+
+        self.set_aging(False)
+        self.dns_update_record(name, A).dwTimeStamp
+        self.ldap_update_record(name, B, dwTimeStamp=0)
+        self.dns_update_record(name, B)
+        self.dns_update_record(name, C)
+        ctime = self.get_unique_txt_record(name, C).dwTimeStamp
+        self.assertEqual(ctime, 0)
+        btime = self.get_unique_txt_record(name, B).dwTimeStamp
+        self.assertEqual(btime, 0)
+
+        self.ldap_replace_records(name, [])
+
+        self.dns_update_record(name, D)
+        dtime = self.get_unique_txt_record(name, D).dwTimeStamp
+        self.assertEqual(dtime, 0)
+
+    def _test_update_timestamp_weirdness(self, n_days, aging=True):
+        name = 'test'
+        A = ['A']
+        B = ['B']
+
+        self.set_aging(aging)
+
+        current_time = self.dns_update_record(name, A).dwTimeStamp
+
+        # rewind timestamp using ldap
+        self.ldap_modify_timestamps(name, n_days * -24)
+        n_days_ago = self.get_unique_txt_record(name, A).dwTimeStamp
+        time_A = self.dns_update_record(name, A).dwTimeStamp
+        # that dns_update should have reset the timestamp ONLY if
+        # aging is on and the old timestamp is > noRefresh period (7
+        # days)
+        if n_days > 7 and aging:
+            self.assert_soon_after(time_A, current_time)
+        else:
+            self.assertEqual(time_A, n_days_ago)
+
+        # add another record, which should have the current timestamp
+        time_B = self.dns_update_record(name, B).dwTimeStamp
+        self.assert_soon_after(time_B, current_time)
+
+        time_A = self.get_unique_txt_record(name, A).dwTimeStamp
+        if aging and n_days <= 7:
+            self.assertEqual(time_A, n_days_ago)
+        else:
+            self.assert_soon_after(time_A, current_time)
+
+        # delete B, try again
+        self.ldap_delete_record(name, B)
+        self.ldap_update_record(name, A, dwTimeStamp=n_days_ago)
+
+        time_A = self.dns_update_record(name, A).dwTimeStamp
+
+        # here we are re-adding the deleted record
+        time_B = self.dns_update_record(name, B).dwTimeStamp
+        self.assert_soon_after(time_B, current_time)
+
+        time_A = self.get_unique_txt_record(name, A).dwTimeStamp
+        return n_days_ago, time_A, time_B
+
+    def test_update_timestamp_weirdness_no_refresh_no_aging(self):
+        n_days_ago, time_A, time_B = \
+            self._test_update_timestamp_weirdness(5, False)
+        # the timestamp of the SIBLING of the deleted, re-added record
+        # differs from the sibling of freshly added record.
+        self.assertEqual(time_A, n_days_ago)
+
+    def test_update_timestamp_weirdness_no_refresh_aging(self):
+        n_days_ago, time_A, time_B = \
+            self._test_update_timestamp_weirdness(5, True)
+        # the timestamp of the SIBLING of the deleted, re-added record
+        # differs from the sibling of freshly added record.
+        self.assertEqual(time_A, n_days_ago)
+
+    def test_update_timestamp_weirdness_refresh_no_aging(self):
+        n_days_ago, time_A, time_B = \
+            self._test_update_timestamp_weirdness(9, False)
+        self.assertEqual(time_A, time_B)
+
+    def test_update_timestamp_weirdness_refresh_aging(self):
+        n_days_ago, time_A, time_B = \
+            self._test_update_timestamp_weirdness(9, True)
+        self.assertEqual(time_A, time_B)
+
+    def test_aging_refresh(self):
+        name, txt = 'agingtest', ['test txt']
+        no_refresh = 100
+        refresh = 80
+        self.set_zone_int_params(NoRefreshInterval=no_refresh,
+                                 RefreshInterval=refresh,
+                                 Aging=1)
+        before_mod = self.dns_update_record(name, txt)
+        start_time = before_mod.dwTimeStamp
+
+        # go back 86 hours, which is in the no-refresh time (but
+        # wouldn't be if we had stuck to the default of 84).
+        self.ldap_modify_timestamps(name, -86)
+        rec = self.dns_update_record(name, txt)
+        self.assertEqual(rec.dwTimeStamp,
+                         start_time - 86)
+
+        # back to -102 hours, into the refresh zone
+        # the update should reset the timestamp to now.
+        self.ldap_modify_timestamps(name, -16)
+        rec = self.dns_update_record(name, txt)
+        self.assert_soon_after(rec.dwTimeStamp, start_time)
+
+        # back to -182 hours, beyond the end of the refresh period.
+        # Actually nothing changes at this time -- we can still
+        # refresh, but the record is liable for scavenging.
+        self.ldap_modify_timestamps(name, -182)
+        rec = self.dns_update_record(name, txt)
+        self.assert_soon_after(rec.dwTimeStamp, start_time)
+
+    def test_add_no_timestamp(self):
+        # check zero timestamp is implicit
+        self.set_aging(True)
+        rec = self.ldap_update_record('ldap', 'test')
+        self.assertEqual(rec.dwTimeStamp, 0)
+        rec = self.rpc_update_record('rpc', 'test')
+        self.assertEqual(rec.dwTimeStamp, 0)
+
+    def test_add_zero_timestamp(self):
+        rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=0)
+        self.assertEqual(rec.dwTimeStamp, 0)
+        rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=0)
+        self.assertEqual(rec.dwTimeStamp, 0)
+
+    def test_add_update_timestamp(self):
+        # LDAP can change timestamp, RPC can't
+        rec = self.ldap_update_record('ldap', 'test', dwTimeStamp=123456)
+        self.assertEqual(rec.dwTimeStamp, 123456)
+        rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456)
+        self.assertEqual(rec.dwTimeStamp, 0)
+        # second time is a different code path (add vs update)
+        rec = self.rpc_update_record('rpc', 'test', dwTimeStamp=123456)
+        self.assertEqual(rec.dwTimeStamp, 0)
+        # RPC update the one with timestamp, zeroing it.
+        rec = self.rpc_update_record('ldap', 'test', dwTimeStamp=123456)
+        self.assertEqual(rec.dwTimeStamp, 0)
+
+    def test_add_update_ttl(self):
+        # RPC *can* set dwTtlSeconds.
+        rec = self.ldap_update_record('ldap', 'test',
+                                      dwTtlSeconds=1234)
+        self.assertEqual(rec.dwTtlSeconds, 1234)
+        rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234)
+        self.assertEqual(rec.dwTtlSeconds, 1234)
+        # does update work like add?
+        rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321)
+        self.assertEqual(rec.dwTtlSeconds, 4321)
+        rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678)
+        self.assertEqual(rec.dwTtlSeconds, 5678)
+
+    def test_add_update_ttl_serial(self):
+        # when setting dwTtlSeconds, what happens to serial number?
+        rec = self.ldap_update_record('ldap', 'test',
+                                      dwTtlSeconds=1234,
+                                      dwSerial=123)
+        self.assertEqual(rec.dwTtlSeconds, 1234)
+        self.assertEqual(rec.dwSerial, 123)
+        rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=1234)
+        self.assertEqual(rec.dwTtlSeconds, 1234)
+        serial = rec.dwSerial
+        self.assertLess(serial, 4)
+        rec = self.rpc_update_record('rpc', 'test', dwTtlSeconds=4321)
+        self.assertEqual(rec.dwTtlSeconds, 4321)
+        self.assertEqual(rec.dwSerial, serial + 1)
+        rec = self.rpc_update_record('ldap', 'test', dwTtlSeconds=5678)
+        self.assertEqual(rec.dwTtlSeconds, 5678)
+        self.assertEqual(rec.dwSerial, 124)
+
+    def test_add_update_dwFlags(self):
+        # dwFlags splits into rank and flags.
+        # according to [MS-DNSP] 2.3.2.2, flags MUST be zero
+        rec = self.ldap_update_record('ldap', 'test', flags=22222, rank=222)
+        self.assertEqual(rec.flags, 22222)
+        self.assertEqual(rec.rank, 222)
+
+        rec = self.rpc_update_record('ldap', 'test', dwFlags=3333333)
+        # rank != 3333333 & 0xff == 213
+        self.assertEqual(rec.rank, 240)    # RPC fixes rank
+        self.assertEqual(rec.flags, 0)
+
+        self.assertRaises(OverflowError,
+                          self.ldap_update_record,
+                          'ldap', 'test', flags=777777777, rank=777)
+
+        # reset to no default (rank overflows)
+        rec = self.ldap_update_record('ldap', 'test', flags=7777, rank=777)
+        self.assertEqual(rec.flags, 7777)
+        self.assertEqual(rec.rank, 9)
+
+        # DNS update zeros flags, sets rank to 240 (RANK_ZONE)
+        rec = self.dns_update_record('ldap', 'test', ttl=999)
+        self.assertEqual(rec.flags, 0)
+        self.assertEqual(rec.rank, 240)
+
+        rec = self.rpc_update_record('ldap', 'test', dwFlags=321)
+        self.assertEqual(rec.flags, 0)
+        self.assertEqual(rec.rank, 240)
+
+        # RPC adding a new record: fixed rank, zero flags
+        rec = self.rpc_update_record('ldap', 'test 2', dwFlags=12345)
+        self.assertEqual(rec.rank, 240)
+        self.assertEqual(rec.flags, 0)
+
+    def test_add_update_dwReserved(self):
+        # RPC does not change dwReserved.
+        rec = self.ldap_update_record('ldap', 'test', dwReserved=54321)
+        self.assertEqual(rec.dwReserved, 54321)
+        rec = self.rpc_update_record('rpc', 'test', dwReserved=54321)
+        self.assertEqual(rec.dwReserved, 0)
+        rec = self.rpc_update_record('rpc', 'test', dwReserved=54321)
+        self.assertEqual(rec.dwReserved, 0)
+        rec = self.rpc_update_record('ldap', 'test', dwReserved=12345)
+        self.assertEqual(rec.dwReserved, 54321)
+
+    def test_add_update_dwSerial(self):
+        # On Windows the RPC record ends up with serial 2, on Samba
+        # serial 3. Rather than knownfail this, we accept anything
+        # below 4 (for now).
+        rec = self.ldap_update_record('ldap', 'test', dwSerial=123)
+        self.assertEqual(rec.dwSerial, 123)
+        rec = self.rpc_update_record('rpc', 'test', dwSerial=123)
+        self.assertLess(rec.dwSerial, 4)
+        rec = self.rpc_update_record('rpc', 'test', dwSerial=123)
+        self.assertLess(rec.dwSerial, 4)
+        rec = self.dns_update_record('rpc', 'test')
+        self.assertLess(rec.dwSerial, 4)
+        rec = self.dns_update_record('dns-0', 'test')
+        self.assertLess(rec.dwSerial, 5)
+
+        rec = self.dns_update_record('ldap', 'test')
+        self.assertEqual(rec.dwSerial, 123)
+        rec = self.rpc_update_record('ldap', 'test', dwSerial=123)
+        self.assertEqual(rec.dwSerial, 123)
+        rec = self.ldap_update_record('ldap', 'test', dwSerial=12)
+        self.assertEqual(rec.dwSerial, 12)
+        # when we dns-updated ldap/test, we alerted Windows to 123 as
+        # a high water mark for the zone. (even though we have since
+        # dropped the serial to 12, 123 is the base serial for new
+        # records).
+        rec = self.dns_update_record('dns', 'test')
+        self.assertEqual(rec.dwSerial, 124)
+        rec = self.dns_update_record('dns2', 'test')
+        self.assertEqual(rec.dwSerial, 125)
+        rec = self.rpc_update_record('rpc2', 'test')
+        self.assertEqual(rec.dwSerial, 126)
+        rec = self.dns_update_record('dns', 'test 2')
+        self.assertEqual(rec.dwSerial, 127)
+
+    def test_add_update_dwSerial_2(self):
+        # On Samba the RPC update resets the serial to a low number,
+        # while Windows leaves it high.
+        rec = self.ldap_update_record('ldap', 'test', dwSerial=123)
+        self.assertEqual(rec.dwSerial, 123)
+        rec = self.rpc_update_record('ldap', 'test', dwSerial=321)
+        self.assertEqual(rec.dwSerial, 123)
+        rec = self.dns_update_record('ldap', 'test')
+        self.assertEqual(rec.dwSerial, 123)
+
+    def test_add_update_many(self):
+        # Samba fails often in this set, but we want to see how it
+        # goes further down, so we print the problems and defer the
+        # failure.
+        failures = 0
+        total = 0
+
+        def _defer_wrap(f):
+            def _defer(*args):
+                nonlocal failures, total
+                total += 1
+                try:
+                    f(*args)
+                except self.failureException as e:
+                    from traceback import format_stack
+                    print(f"{format_stack()[-2]} {e}\n")
+                    failures += 1
+            return _defer
+
+        defer_assertEqual = _defer_wrap(self.assertEqual)
+        defer_assert_timestamp_in_ballpark = \
+            _defer_wrap(self.assert_timestamp_in_ballpark)
+
+        self.set_aging(False)
+        rec = self.ldap_update_record('ldap', 'test',
+                                      version=11,
+                                      rank=22,
+                                      flags=33,
+                                      dwSerial=44,
+                                      dwTtlSeconds=55,
+                                      dwReserved=66,
+                                      dwTimeStamp=77)
+
+        self.assertEqual(rec.version, 5)  # disobeys request
+        self.assertEqual(rec.rank, 22)
+        self.assertEqual(rec.flags, 33)
+        self.assertEqual(rec.dwSerial, 44)
+        self.assertEqual(rec.dwTtlSeconds, 55)
+        self.assertEqual(rec.dwReserved, 66)
+        self.assertEqual(rec.dwTimeStamp, 77)
+        # DNS updates first
+        rec = self.dns_update_record('ldap', 'test', ttl=999)
+        self.assertEqual(rec.version, 5)
+        self.assertEqual(rec.rank, 240)          # rank gets fixed by DNS update
+        defer_assertEqual(rec.flags, 0)          # flags gets fixed
+        defer_assertEqual(rec.dwSerial, 45)      # serial increments
+        self.assertEqual(rec.dwTtlSeconds, 999)  # TTL set
+        defer_assertEqual(rec.dwReserved, 0)     # reserved fixed
+        defer_assert_timestamp_in_ballpark(rec)  # changed on Windows ?!
+
+        self.set_aging(True)
+        rec = self.dns_update_record('ldap', 'test', ttl=1111)
+        self.assertEqual(rec.version, 5)
+        self.assertEqual(rec.rank, 240)
+        defer_assertEqual(rec.flags, 0)
+        defer_assertEqual(rec.dwSerial, 46)
+        self.assertEqual(rec.dwTtlSeconds, 1111)  # TTL set
+        defer_assertEqual(rec.dwReserved, 0)
+        self.assert_timestamp_in_ballpark(rec)
+
+        # RPC update
+        rec = self.rpc_update_record('ldap', 'test',
+                                     version=111,
+                                     dwFlags=333,
+                                     dwSerial=444,
+                                     dwTtlSeconds=555,
+                                     dwReserved=666,
+                                     dwTimeStamp=777)
+
+        self.assertEqual(rec.version, 5)         # no change
+        self.assertEqual(rec.rank, 240)          # no change
+        defer_assertEqual(rec.flags, 0)           # no change
+        defer_assertEqual(rec.dwSerial, 47)       # Serial increments
+        self.assertEqual(rec.dwTtlSeconds, 555)  # TTL set
+        defer_assertEqual(rec.dwReserved, 0)      # no change
+        self.assertEqual(rec.dwTimeStamp, 0)     # timestamp zeroed
+
+        # RPC update, using default values
+        rec = self.rpc_update_record('ldap', 'test')
+        self.assertEqual(rec.version, 5)
+        self.assertEqual(rec.rank, 240)
+        defer_assertEqual(rec.flags, 0)
+        defer_assertEqual(rec.dwSerial, 48)       # serial increments
+        self.assertEqual(rec.dwTtlSeconds, 900)  # TTL changed
+        defer_assertEqual(rec.dwReserved, 0)
+        self.assertEqual(rec.dwTimeStamp, 0)
+
+        self.set_aging(False)
+        rec = self.dns_update_record('ldap', 'test', ttl=888)
+        self.assertEqual(rec.version, 5)
+        self.assertEqual(rec.rank, 240)
+        defer_assertEqual(rec.flags, 0)
+        defer_assertEqual(rec.dwSerial, 49)       # serial increments
+        self.assertEqual(rec.dwTtlSeconds, 888)  # TTL set
+        defer_assertEqual(rec.dwReserved, 0)
+        self.assertEqual(rec.dwTimeStamp, 0)     # timestamp stays zero
+
+        if failures:
+            self.fail(f"failed {failures}/{total} defered assertions")
+
+    def test_static_record_dynamic_update(self):
+        """Add a static record, then a dynamic record.
+        The dynamic record should have a timestamp set."""
+        name = 'test'
+        txt = ['static txt']
+        txt2 = ['dynamic txt']
+        self.set_aging(True)
+        rec = self.ldap_update_record(name, txt, dwTimeStamp=0)
+        rec2 = self.dns_update_record(name, txt2)
+        self.assert_timestamp_in_ballpark(rec2)
+        ts2 = rec2.dwTimeStamp
+        # update the first record. It should stay static (timestamp 0)
+        rec = self.dns_update_record(name, txt)
+        self.assertEqual(rec.dwTimeStamp, 0)
+        # and rec2 should be unchanged.
+        self.assertEqual(rec2.dwTimeStamp, ts2)
+
+    def test_dynamic_record_static_update(self):
+        name = 'agingtest'
+        txt1 = ['dns update before']
+        txt2 = ['ldap update']
+        txt3 = ['dns update after']
+        self.set_aging(True)
+
+        self.dns_update_record(name, txt1)
+        self.ldap_update_record(name, txt2)
+        self.dns_update_record(name, txt3)
+
+        recs = self.get_rpc_records(name)
+        for r in recs:
+            d = [x.str for x in r.data.str]
+            if d == txt1:
+                self.assertNotEqual(r.dwTimeStamp, 0)
+            elif d == txt2:
+                self.assertEqual(r.dwTimeStamp, 0)
+            elif d == txt3:
+                self.assertNotEqual(r.dwTimeStamp, 0)
+
+    def test_basic_scavenging(self):
+        # NOTE: This one fails on Windows, because the RPC call to
+        # prompt scavenging is not immediate. On Samba, in the
+        # testenv, we don't have the RPC call but we can connect to
+        # the database directly.
+
+        # just to be sure we have the right limits.
+        self.set_zone_int_params(NoRefreshInterval=84,
+                                 RefreshInterval=84,
+                                 Aging=1)
+
+        ts1, ts2, ts3, ts4, ts5, ts6 = ('1', '2', '3', '4', '5', '6')
+        self.dns_update_record(ts1, ts1)
+        self.dns_update_record(ts2, ts2)
+        # ts2 is tombstoned and timestamped in 1981
+        self.dns_tombstone(ts2)
+        # ts3 is tombstoned and timestamped in the future
+        self.dns_tombstone(ts3, epoch_hours=(DNS_TIMESTAMP_2101 - 1))
+        # ts4 is tombstoned and timestamped in the past
+        self.dns_tombstone(ts4, epoch_hours=1111111)
+        # ts5 is tombstoned in the past and timestamped in the future
+        self.dns_tombstone(ts5, epoch_hours=5555555, epoch_nttime=int(1e10))
+
+        # ts2 and ts3 should now be tombstoned.
+        self.assert_tombstoned(ts2)
+        self.assert_tombstoned(ts3)
+
+        # let's un-tombstone ts2
+        # ending up with dnsTombstoned: FALSE in Samba
+        # and no dNSTombstoned in Windows.
+        self.dns_update_record(ts2, "ts2 untombstoned")
+        ts2_node = self.get_one_node(ts2)
+        ts2_tombstone = ts2_node.get("dNSTombstoned")
+        if ts2_tombstone is not None:
+            self.assertEqual(ts2_tombstone[0], b"FALSE")
+
+        self.assert_tombstoned(ts2, tombstoned=False)
+
+        r = self.dns_update_record(ts6, ts6)
+
+        # put some records into the death zone.
+        self.ldap_modify_timestamps(ts1, -15 * 24)
+        self.ldap_modify_timestamps(ts2, -14 * 24 - 2)
+        self.ldap_modify_timestamps(ts6, -14 * 24 + 2)
+
+        # ts1 will be saved by this record
+        self.dns_update_record(ts1, "another record")
+
+        try:
+            # Tell the server to clean-up records.
+            # This is how it *should* work on Windows:
+            self.rpc_conn.DnssrvOperation2(
+                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                0,
+                SERVER_IP,
+                None,
+                0,
+                "StartScavenging",
+                dnsserver.DNSSRV_TYPEID_NULL,
+                None)
+            # Samba won't get here (NOT_IMPLEMENTED error)
+            # wait for Windows to do its cleanup.
+            time.sleep(2)
+        except WERRORError as e:
+            if e.args[0] == werror.WERR_CALL_NOT_IMPLEMENTED:
+                # This is the Samba way, talking to the file directly,
+                # as if we were the server process. The direct
+                # connection is needed because the tombstoning search
+                # involves a magic system only filter.
+                file_samdb = get_file_samdb()
+                dsdb._scavenge_dns_records(file_samdb)
+                dsdb._dns_delete_tombstones(file_samdb)
+            else:
+                raise
+
+        # Now what we should have:
+        # ts1: alive: the old record is deleted, the new one not.
+        # ts2: tombstoned
+        # ts3: tombstoned
+        # ts4: deleted. gone.
+        # ts5: deleted. timestamp affects tombstoning, but not deletion.
+        # ts6: alive
+        #
+        # We order our assertions to make the windows test
+        # fail as late as possible (on ts4, ts5, ts2).
+        r = self.get_unique_txt_record(ts1, ["another record"])
+        self.assertIsNotNone(r)
+        r = self.get_unique_txt_record(ts6, [ts6])
+        self.assertIsNotNone(r)
+
+        self.assert_tombstoned(ts3)
+
+        n = self.get_one_node(ts4)
+        self.assertIsNone(n)
+        n = self.get_one_node(ts5)
+        self.assertIsNone(n)
+
+        self.assert_tombstoned(ts2)
+
+
+TestProgram(module=__name__, opts=subunitopts)
diff --git a/selftest/knownfail.d/dns-aging b/selftest/knownfail.d/dns-aging
new file mode 100644 (file)
index 0000000..0260708
--- /dev/null
@@ -0,0 +1,33 @@
+# known failures for python/samba/tests/dns_aging.py
+#
+# These all pass on Windows, apart from test_basic_scavenging, which
+# fails due to technical issues.
+
+samba.tests.dns_aging.+test_add_update_dwFlags
+samba.tests.dns_aging.+test_add_update_dwReserved
+samba.tests.dns_aging.+test_add_update_dwSerial
+samba.tests.dns_aging.+test_add_update_dwSerial_2
+samba.tests.dns_aging.+test_add_update_many
+samba.tests.dns_aging.+test_add_update_timestamp
+samba.tests.dns_aging.+test_add_update_ttl
+samba.tests.dns_aging.+test_add_update_ttl_serial
+samba.tests.dns_aging.+test_basic_scavenging
+samba.tests.dns_aging.+test_dynamic_record_static_update
+samba.tests.dns_aging.+test_rpc_update_timestamps
+samba.tests.dns_aging.+test_static_record_dynamic_update
+samba.tests.dns_aging.+test_update_aging_disabled\b
+samba.tests.dns_aging.+test_update_aging_disabled_beyond_refresh_window
+samba.tests.dns_aging.+test_update_aging_disabled_in_eighteenth_century
+samba.tests.dns_aging.+test_update_aging_disabled_in_no_refresh_window
+samba.tests.dns_aging.+test_update_aging_disabled_in_refresh_window
+samba.tests.dns_aging.+test_update_aging_disabled_on_no_refresh_boundary
+samba.tests.dns_aging.+test_update_aging_disabled_static
+samba.tests.dns_aging.+test_update_aging_enabled
+samba.tests.dns_aging.+test_update_aging_enabled_beyond_refresh_window
+samba.tests.dns_aging.+test_update_aging_enabled_in_eighteenth_century
+samba.tests.dns_aging.+test_update_aging_enabled_in_no_refresh_window
+samba.tests.dns_aging.+test_update_aging_enabled_in_refresh_window
+samba.tests.dns_aging.+test_update_aging_enabled_on_no_refresh_boundary
+samba.tests.dns_aging.+test_update_static_stickiness
+samba.tests.dns_aging.+test_update_timestamp_weirdness_no_refresh_no_aging
+samba.tests.dns_aging.+test_update_timestamp_weirdness_refresh_no_aging
index 0a5fe13f2f9c5f5d84c9b1a83ad2239376f9b3c2..dc466e72f638d08892992bbace53568fc112cd69 100755 (executable)
@@ -453,6 +453,16 @@ plantestsuite_loadlist("samba.tests.dns", "fl2003dc:local", [python, os.path.joi
 plantestsuite_loadlist("samba.tests.dns", "rodc:local", [python, os.path.join(srcdir(), "python/samba/tests/dns.py"), '$SERVER', '$SERVER_IP', '--machine-pass', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
 plantestsuite_loadlist("samba.tests.dns", "vampire_dc:local", [python, os.path.join(srcdir(), "python/samba/tests/dns.py"), '$SERVER', '$SERVER_IP', '--machine-pass', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
 
+plantestsuite_loadlist("samba.tests.dns_aging", "fl2003dc:local",
+                       [python,
+                        f"{srcdir()}/python/samba/tests/dns_aging.py",
+                        '$SERVER',
+                        '$SERVER_IP',
+                        '--machine-pass',
+                        '-U"$USERNAME%$PASSWORD"',
+                        '--workgroup=$DOMAIN',
+                        '$LOADLIST', '$LISTOPT'])
+
 plantestsuite_loadlist("samba.tests.dns_forwarder", "fl2003dc:local", [python, os.path.join(srcdir(), "python/samba/tests/dns_forwarder.py"), '$SERVER', '$SERVER_IP', '$DNS_FORWARDER1', '$DNS_FORWARDER2', '--machine-pass', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])
 
 plantestsuite_loadlist("samba.tests.dns_tkey", "fl2008r2dc", [python, os.path.join(srcdir(), "python/samba/tests/dns_tkey.py"), '$SERVER', '$SERVER_IP', '--machine-pass', '-U"$USERNAME%$PASSWORD"', '--workgroup=$DOMAIN', '$LOADLIST', '$LISTOPT'])