self.assert_tombstoned(ts2)
+ def test_samba_scavenging(self):
+ # We expect this one to fail on Windows, because scavenging
+ # and tombstoning cannot be performed on demand.
+
+ try:
+ file_samdb = get_file_samdb()
+ except ldb.LdbError as e:
+ raise AssertionError(
+ f"failing because '{e}': this is Windows?") from None
+
+ # let's try different limits.
+ self.set_zone_int_params(NoRefreshInterval=30,
+ RefreshInterval=20,
+ Aging=1)
+
+ now = dsdb_dns.unix_to_dns_timestamp(int(time.time()))
+
+ A, B, C, D = 'ABCD'
+ # A has current time
+ # B has safe, non-updateable time
+ # C has safe time
+ # D is scavengeable
+ atime = self.dns_update_record(A, A).dwTimeStamp
+ btime = self.ldap_update_record(B, B, dwTimeStamp=now-20).dwTimeStamp
+ btime = self.ldap_update_record(C, C, dwTimeStamp=now-40).dwTimeStamp
+ dtime = self.ldap_update_record(D, D, dwTimeStamp=now-60).dwTimeStamp
+ self.assert_soon_after(atime, now)
+ self.assert_timestamps_equal(btime, now-20)
+ self.assert_timestamps_equal(ctime, now-40)
+ self.assert_timestamps_equal(dtime, now-60)
+
+ dsdb._scavenge_dns_records(file_samdb)
+
+ # D should be gone (tombstoned)
+ r = self.get_unique_txt_record(D, D)
+ self.assertIsNone(r)
+ r = dns_query(self, D, qtype=dns.DNS_QTYPE_TXT)
+ self.assertEqual(r.ancount, 0)
+ recs = self.ldap_get_records(D)
+ self.assertEqual(len(recs), 1)
+ self.assert_tombstoned(recs[0])
+
+ # others unchanged.
+ atime = self.get_unique_txt_record(A, A).dwTimeStamp
+ btime = self.get_unique_txt_record(B, B).dwTimeStamp
+ ctime = self.get_unique_txt_record(C, C).dwTimeStamp
+ self.assert_soon_after(atime, now)
+ self.assert_timestamps_equal(btime, now-20)
+ self.assert_timestamps_equal(ctime, now-40)
+
+ btime = self.dns_update_record(B, B).dwTimeStamp
+ ctime = self.dns_update_record(C, C).dwTimeStamp
+ self.assert_timestamps_equal(btime, now-40)
+ self.assert_soon_after(ctime, now)
+
+ # after this, D *should* still be a tombstone, because its
+ # tombstone timestamp is not very old.
+ dsdb._dns_delete_tombstones(file_samdb)
+ recs = self.ldap_get_records(D)
+ self.assertEqual(len(recs), 1)
+ self.assert_tombstoned(recs[0])
+
+ # Let's delete C using rpc, and ensure it survives dns_delete_tombstones
+ self.rpc_delete_txt(C, C)
+ recs = self.ldap_get_records(C)
+ self.assertEqual(len(recs), 1)
+ self.assert_tombstoned(recs[0])
+ dsdb._dns_delete_tombstones(file_samdb)
+ recs = self.ldap_get_records(C)
+ self.assertEqual(len(recs), 1)
+ self.assert_tombstoned(recs[0])
+
+ # now let's wind A and B back to either side of the two week
+ # threshold. A should survive, B should not.
+ self.dns_tombstone(A, (now - 166))
+ self.dns_tombstone(B, (now - 170))
+ dsdb._dns_delete_tombstones(file_samdb)
+
+ recs = self.ldap_get_records(A)
+ self.assertEqual(len(recs), 1)
+ self.assert_tombstoned(recs[0])
+
+ recs = self.ldap_get_records(B)
+ self.assertEqual(len(recs), 0)
+
def _test_A_and_AAAA_records(self, A, B, a_days, b_days, aging):
self.set_aging(aging)