DNS_TIMESTAMP_2101 = 4383000
DNS_TIMESTAMP_1981 = 3333333 # a middling timestamp
+IPv4_ADDR = "127.0.0.33"
+IPv6_ADDR = "::1"
+IPv4_ADDR_2 = "127.0.0.66"
+IPv6_ADDR_2 = "1::1"
+
+
def get_samdb():
return SamDB(url=f"ldap://{SERVER_IP}",
lp=LP,
def get_unique_ip_record(self, name, addr, wtype=None):
"""Get an A or AAAA record on name with the matching data."""
if wtype is None:
- wtype = guess_wtype(addr)
+ addr, wtype = guess_wtype(addr)
recs = self.ldap_get_records(name)
self.assert_tombstoned(ts2)
+ def _test_A_and_AAAA_records(self, A, B, a_days, b_days, aging):
+ self.set_aging(aging)
+
+ name = 'aargh'
+ now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
+ a_initial = now - 24 * a_days
+ b_initial = now - 24 * b_days
+
+ self.dns_update_non_text(name, A)
+ self.ldap_modify_timestamps(name, a_days * -24)
+
+ rec_a = self.get_unique_ip_record(name, A)
+ rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)
+
+ self.assert_timestamps_equal(rec_a, a_initial)
+ self.assert_timestamps_equal(rec_b, b_initial)
+
+ # touch the A record.
+ self.dns_update_non_text(name, A)
+
+ # check the A timestamp, depending on norefresh
+ rec_a = self.get_unique_ip_record(name, A)
+ if aging and a_days > 7:
+ time_a = now
+ self.assert_soon_after(rec_a, now)
+ elif a_days > 7:
+ # when we have NO aging and are in the refresh window, the
+ # timestamp now reads as a_initial, but will become now
+ # after we manipulate B for a bit.
+ time_a = now
+ self.assert_timestamps_equal(rec_a, a_initial)
+ else:
+ time_a = a_initial
+ self.assert_timestamps_equal(rec_a, a_initial)
+
+ # B timestamp should be unchanged?
+ rec_b = self.get_unique_ip_record(name, B)
+ self.assert_timestamps_equal(rec_b, b_initial)
+
+ # touch the B record.
+ self.dns_update_non_text(name, B)
+
+ # check the B timestamp
+ rec_b = self.get_unique_ip_record(name, B)
+
+ self.assert_soon_after(rec_b, now)
+
+ # rewind B
+ rec_b = self.add_ip_record(name, B, dwTimeStamp=b_initial)
+
+ # NOW rec A might have changed! with no aging, and out of refresh.
+ rec_a = self.get_unique_ip_record(name, A)
+ self.assert_timestamps_equal(rec_a, time_a)
+
+ self.dns_update_non_text(name, A)
+
+ rec_a = self.get_unique_ip_record(name, B)
+ self.assert_timestamps_equal(rec_b, b_initial)
+
+ # now delete A
+ _, wtype = guess_wtype(A)
+ self.ldap_delete_record(name, A, wtype=wtype)
+
+ # re-add it
+ self.dns_update_non_text(name, A)
+
+ rec_a = self.get_unique_ip_record(name, A)
+ self.assert_soon_after(rec_a, now)
+
+ rec_b = self.get_unique_ip_record(name, B)
+ self.assert_timestamps_equal(rec_b, b_initial)
+
+ def test_A_5_days_AAAA_5_days_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=True)
+
+ def test_A_5_days_AAAA_5_days_no_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 5, aging=False)
+
+ def test_A_5_days_AAAA_10_days_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=True)
+
+ def test_A_5_days_AAAA_10_days_no_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 5, 10, aging=False)
+
+ def test_A_10_days_AAAA_5_days_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=True)
+
+ def test_A_10_days_AAAA_5_days_no_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 5, aging=False)
+
+ def test_A_10_days_AAAA_9_days_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 10, 9, aging=True)
+
+ def test_A_9_days_AAAA_10_days_no_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 9, 10, aging=False)
+
+ def test_A_20_days_AAAA_2_days_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 20, 2, aging=True)
+
+ def test_A_6_days_AAAA_40_days_no_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv6_ADDR, 6, 40, aging=False)
+
+ def test_A_5_days_A_5_days_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 5, aging=True)
+
+ def test_A_5_days_A_10_days_no_aging(self):
+ self._test_A_and_AAAA_records(IPv4_ADDR, IPv4_ADDR_2, 5, 10, aging=False)
+
+ def test_AAAA_5_days_AAAA_6_days_aging(self):
+ self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=True)
+
+ def test_AAAA_5_days_AAAA_6_days_no_aging(self):
+ self._test_A_and_AAAA_records(IPv6_ADDR, IPv6_ADDR_2, 5, 6, aging=False)
+
TestProgram(module=__name__, opts=subunitopts)