#
from __future__ import print_function
+
+from samba import dsdb
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.samdb import SamDB
+from samba.auth import system_session
+import ldb
import os
import sys
import struct
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 900
r.length = 0xffff
- rdata = value
- r.rdata = rdata
- updates = [r]
+ r.rdata = value
p.nscount = 1
- p.nsrecs = updates
+ p.nsrecs = [r]
(response, response_packet) = self.dns_transaction_udp(p, host=server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.timeout = timeout
self.zone = "test.lan"
- self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" % (self.server_ip),
+ self.rpc_conn = dnsserver.dnsserver("ncacn_ip_tcp:%s[sign]" %\
+ (self.server_ip),
self.lp, self.creds)
+ self.samdb = SamDB(url="ldap://" + self.server_ip,
+ lp = self.get_loadparm(),
+ session_info=system_session(),
+ credentials=self.creds)
+
+ self.zone_dn = "DC=" + self.zone +\
+ ",CN=MicrosoftDNS,DC=DomainDNSZones," +\
+ str(self.samdb.get_default_basedn())
+
def tearDown(self):
super(TestZones, self).tearDown()
+
try:
self.delete_zone(self.zone)
except RuntimeError as e:
if num != werror.WERR_DNS_ERROR_ZONE_DOES_NOT_EXIST:
raise
- def create_zone(self, zone):
+ def create_zone(self, zone, aging_enabled=False):
zone_create = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
zone_create.pszZoneName = zone
zone_create.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
- zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_SECURE
- zone_create.fAging = 0
+ zone_create.fAging = int(aging_enabled)
zone_create.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+ zone_create.fDsIntegrated = 1
+ zone_create.fLoadExisting = 1
+ zone_create.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
try:
- self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ self.rpc_conn.DnssrvOperation2(client_version,
0,
self.server_ip,
None,
except WERRORError as e:
self.fail(str(e))
+ def set_params(self, **kwargs):
+ zone = kwargs.pop('zone', None)
+ for key,val in kwargs.items():
+ name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
+ name_param.dwParam = val
+ name_param.pszNodeName = key
+
+ client_version = dnsserver.DNS_CLIENT_VERSION_LONGHORN
+ nap_type = dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM
+ try:
+ self.rpc_conn.DnssrvOperation2(client_version, 0, self.server, zone,
+ 0, 'ResetDwordProperty', nap_type,
+ name_param)
+ except WERRORError as e:
+ self.fail(str(e))
+
+ def ldap_modify_dnsrecs(self, name, func):
+ dn = 'DC={},{}'.format(name, self.zone_dn)
+ dns_recs = self.ldap_get_dns_records(name)
+ for rec in dns_recs:
+ func(rec)
+ update_dict = {'dn':dn, 'dnsRecord':[ndr_pack(r) for r in dns_recs]}
+ self.samdb.modify(ldb.Message.from_dict(self.samdb,
+ update_dict,
+ ldb.FLAG_MOD_REPLACE))
+
+ def dns_update_record(self, prefix, txt):
+ p = self.make_txt_update(prefix, txt, self.zone)
+ (code, response) = self.dns_transaction_udp(p, host=self.server_ip)
+ self.assert_dns_rcode_equals(code, dns.DNS_RCODE_OK)
+ recs = self.ldap_get_dns_records(prefix)
+ recs = [r for r in recs if r.data.str == txt]
+ self.assertEqual(len(recs), 1)
+ return recs[0]
+
+ def ldap_get_records(self, name):
+ dn = 'DC={},{}'.format(name, self.zone_dn)
+ expr = "(&(objectClass=dnsNode)(name={}))".format(name)
+ return self.samdb.search(base=dn, scope=ldb.SCOPE_SUBTREE,
+ expression=expr, attrs=["*"])
+
+ def ldap_get_dns_records(self, name):
+ records = self.ldap_get_records(name)
+ return [ndr_unpack(dnsp.DnssrvRpcRecord, r)
+ for r in records[0].get('dnsRecord')]
+
+ def ldap_get_zone_settings(self):
+ records = self.samdb.search(base=self.zone_dn, scope=ldb.SCOPE_BASE,
+ expression="(&(objectClass=dnsZone)"+\
+ "(name={}))".format(self.zone),
+ attrs=["dNSProperty"])
+ self.assertEqual(len(records), 1)
+ props = [ndr_unpack(dnsp.DnsProperty, r)
+ for r in records[0].get('dNSProperty')]
+
+ #We have no choice but to repeat these here.
+ zone_prop_ids = {0x00: "EMPTY",
+ 0x01: "TYPE",
+ 0x02: "ALLOW_UPDATE",
+ 0x08: "SECURE_TIME",
+ 0x10: "NOREFRESH_INTERVAL",
+ 0x11: "SCAVENGING_SERVERS",
+ 0x12: "AGING_ENABLED_TIME",
+ 0x20: "REFRESH_INTERVAL",
+ 0x40: "AGING_STATE",
+ 0x80: "DELETED_FROM_HOSTNAME",
+ 0x81: "MASTER_SERVERS",
+ 0x82: "AUTO_NS_SERVERS",
+ 0x83: "DCPROMO_CONVERT",
+ 0x90: "SCAVENGING_SERVERS_DA",
+ 0x91: "MASTER_SERVERS_DA",
+ 0x92: "NS_SERVERS_DA",
+ 0x100: "NODE_DBFLAGS"}
+ return {zone_prop_ids[p.id].lower(): p.data for p in props}
+
+ def set_aging(self, enable=False):
+ self.create_zone(self.zone, aging_enabled=enable)
+ self.set_params(NoRefreshInterval=1, RefreshInterval=1,
+ Aging=int(bool(enable)), zone=self.zone,
+ AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+
+ def test_set_aging(self, enable=True, name='agingtest', txt=['test txt']):
+ self.set_aging(enable=True)
+ settings = self.ldap_get_zone_settings()
+ self.assertTrue(settings['aging_state'] is not None)
+ self.assertTrue(settings['aging_state'])
+
+ rec = self.dns_update_record('agingtest', ['test txt'])
+ self.assertNotEqual(rec.dwTimeStamp, 0)
+
+ def test_set_aging_disabled(self):
+ self.set_aging(enable=False)
+ settings = self.ldap_get_zone_settings()
+ self.assertTrue(settings['aging_state'] is not None)
+ self.assertFalse(settings['aging_state'])
+
+ rec = self.dns_update_record('agingtest', ['test txt'])
+ self.assertNotEqual(rec.dwTimeStamp, 0)
+
+ def test_aging_update(self, enable=True):
+ name, txt = 'agingtest', ['test txt']
+ self.set_aging(enable=True)
+ before_mod = self.dns_update_record(name, txt)
+ if not enable:
+ self.set_params(zone=self.zone, Aging=0)
+ dec = 2
+ def mod_ts(rec):
+ rec.dwTimeStamp -= dec
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ after_mod = self.ldap_get_dns_records(name)
+ self.assertEqual(len(after_mod), 1)
+ after_mod = after_mod[0]
+ self.assertEqual(after_mod.dwTimeStamp,
+ before_mod.dwTimeStamp - dec)
+ after_update = self.dns_update_record(name, txt)
+ after_should_equal = before_mod if enable else after_mod
+ self.assertEqual(after_should_equal.dwTimeStamp,
+ after_update.dwTimeStamp)
+
+ def test_aging_update_disabled(self):
+ self.test_aging_update(enable=False)
+
+ def test_aging_refresh(self):
+ name,txt = 'agingtest', ['test txt']
+ self.create_zone(self.zone, aging_enabled=True)
+ interval = 10
+ self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
+ Aging=1, zone=self.zone,
+ AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+ before_mod = self.dns_update_record(name, txt)
+ def mod_ts(rec):
+ rec.dwTimeStamp -= interval/2
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ update_during_norefresh = self.dns_update_record(name, txt)
+ def mod_ts(rec):
+ rec.dwTimeStamp -= interval + interval/2
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ update_during_refresh = self.dns_update_record(name, txt)
+ self.assertEqual(update_during_norefresh.dwTimeStamp,
+ before_mod.dwTimeStamp - interval/2)
+ self.assertEqual(update_during_refresh.dwTimeStamp,
+ before_mod.dwTimeStamp)
+
+ def test_rpc_add_no_timestamp(self):
+ name,txt = 'agingtest', ['test txt']
+ self.set_aging(enable=True)
+ rec_buf = dnsserver.DNS_RPC_RECORD_BUF()
+ rec_buf.rec = TXTRecord(txt)
+ self.rpc_conn.DnssrvUpdateRecord2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+ 0, self.server_ip,
+ self.zone, name, rec_buf, None)
+ recs = self.ldap_get_dns_records(name)
+ self.assertEqual(len(recs), 1)
+ self.assertEqual(recs[0].dwTimeStamp, 0)
+
+ def test_basic_scavenging(self):
+ self.create_zone(self.zone, aging_enabled=True)
+ interval = 1
+ self.set_params(NoRefreshInterval=interval, RefreshInterval=interval,
+ zone=self.zone, Aging=1,
+ AllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE)
+ name, txt = 'agingtest', ['test txt']
+ rec = self.dns_update_record(name,txt)
+ rec = self.dns_update_record(name+'2',txt)
+ def mod_ts(rec):
+ rec.dwTimeStamp -= interval*5
+ self.ldap_modify_dnsrecs(name, mod_ts)
+ dsdb._scavenge_dns_records(self.samdb)
+
+ recs = self.ldap_get_dns_records(name)
+ self.assertEqual(len(recs), 1)
+ self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TOMBSTONE)
+ recs = self.ldap_get_dns_records(name+'2')
+ self.assertEqual(len(recs), 1)
+ self.assertEqual(recs[0].wType, dnsp.DNS_TYPE_TXT)
+
def delete_zone(self, zone):
self.rpc_conn.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
0,