return getattr(dnsp, 'DNS_TYPE_' + rtype)
except AttributeError:
raise DNSParseError('Unknown type of DNS record %s' % rec_type) from e
+
+
+def dns_name_equal(n1, n2):
+ """Match dns name (of type DNS_RPC_NAME)"""
+ return n1.str.rstrip('.').lower() == n2.str.rstrip('.').lower()
+
+
+def dns_record_match(dns_conn, server, zone, name, record_type, data):
+ """Find a dns record that matches the specified data"""
+
+ # The matching is not as precises as that offered by
+ # dsdb_dns.match_record, which, for example, compares IPv6 records
+ # semantically rather than as strings. However that function
+ # compares database DnssrvRpcRecord structures, not wire
+ # DNS_RPC_RECORD structures.
+ #
+ # While it would be possible, perhaps desirable, to wrap that
+ # function for use in samba-tool, there is value in having a
+ # separate implementation for tests, to avoid the circularity of
+ # asserting the function matches itself.
+
+ urec = record_from_string(record_type, data)
+
+ select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
+
+ try:
+ buflen, res = dns_conn.DnssrvEnumRecords2(
+ dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, server, zone, name, None,
+ record_type, select_flags, None, None)
+ except WERRORError as e:
+ if e.args[0] == werror.WERR_DNS_ERROR_NAME_DOES_NOT_EXIST:
+ # Either the zone doesn't exist, or there were no records.
+ # We can't differentiate the two.
+ return None
+ raise e
+
+ if not res or res.count == 0:
+ return None
+
+ for rec in res.rec[0].records:
+ if rec.wType != record_type:
+ continue
+
+ found = False
+ if record_type == dnsp.DNS_TYPE_A:
+ if rec.data == urec.data:
+ found = True
+ elif record_type == dnsp.DNS_TYPE_AAAA:
+ if rec.data == urec.data:
+ found = True
+ elif record_type == dnsp.DNS_TYPE_PTR:
+ if dns_name_equal(rec.data, urec.data):
+ found = True
+ elif record_type == dnsp.DNS_TYPE_CNAME:
+ if dns_name_equal(rec.data, urec.data):
+ found = True
+ elif record_type == dnsp.DNS_TYPE_NS:
+ if dns_name_equal(rec.data, urec.data):
+ found = True
+ elif record_type == dnsp.DNS_TYPE_MX:
+ if dns_name_equal(rec.data.nameExchange, urec.data.nameExchange) and \
+ rec.data.wPreference == urec.data.wPreference:
+ found = True
+ elif record_type == dnsp.DNS_TYPE_SRV:
+ if rec.data.wPriority == urec.data.wPriority and \
+ rec.data.wWeight == urec.data.wWeight and \
+ rec.data.wPort == urec.data.wPort and \
+ dns_name_equal(rec.data.nameTarget, urec.data.nameTarget):
+ found = True
+ elif record_type == dnsp.DNS_TYPE_SOA:
+ if rec.data.dwSerialNo == urec.data.dwSerialNo and \
+ rec.data.dwRefresh == urec.data.dwRefresh and \
+ rec.data.dwRetry == urec.data.dwRetry and \
+ rec.data.dwExpire == urec.data.dwExpire and \
+ rec.data.dwMinimumTtl == urec.data.dwMinimumTtl and \
+ dns_name_equal(rec.data.NamePrimaryServer,
+ urec.data.NamePrimaryServer) and \
+ dns_name_equal(rec.data.ZoneAdministratorEmail,
+ urec.data.ZoneAdministratorEmail):
+ found = True
+ elif record_type == dnsp.DNS_TYPE_TXT:
+ if rec.data.count == urec.data.count:
+ found = True
+ for i in range(rec.data.count):
+ found = found and \
+ (rec.data.str[i].str == urec.data.str[i].str)
+
+ if found:
+ return rec
+
+ return None
from samba.dcerpc import dnsp, dnsserver
from samba.dnsserver import record_from_string, DNSParseError, flag_from_string
+from samba.dnsserver import dns_record_match
def dns_connect(server, lp, creds):
return rec
-# Match dns name (of type DNS_RPC_NAME)
-def dns_name_equal(n1, n2):
- return n1.str.rstrip('.').lower() == n2.str.rstrip('.').lower()
-
-
-# Match a dns record with specified data
-def dns_record_match(dns_conn, server, zone, name, record_type, data):
- urec = data_to_dns_record(record_type, data)
-
- select_flags = dnsserver.DNS_RPC_VIEW_AUTHORITY_DATA
-
- try:
- buflen, res = dns_conn.DnssrvEnumRecords2(
- dnsserver.DNS_CLIENT_VERSION_LONGHORN, 0, server, zone, name, None,
- record_type, select_flags, None, None)
- except WERRORError as e:
- if e.args[0] == werror.WERR_DNS_ERROR_NAME_DOES_NOT_EXIST:
- # Either the zone doesn't exist, or there were no records.
- # We can't differentiate the two.
- return None
- raise e
-
- if not res or res.count == 0:
- return None
-
- for rec in res.rec[0].records:
- if rec.wType != record_type:
- continue
-
- found = False
- if record_type == dnsp.DNS_TYPE_A:
- if rec.data == urec.data:
- found = True
- elif record_type == dnsp.DNS_TYPE_AAAA:
- if rec.data == urec.data:
- found = True
- elif record_type == dnsp.DNS_TYPE_PTR:
- if dns_name_equal(rec.data, urec.data):
- found = True
- elif record_type == dnsp.DNS_TYPE_CNAME:
- if dns_name_equal(rec.data, urec.data):
- found = True
- elif record_type == dnsp.DNS_TYPE_NS:
- if dns_name_equal(rec.data, urec.data):
- found = True
- elif record_type == dnsp.DNS_TYPE_MX:
- if dns_name_equal(rec.data.nameExchange, urec.data.nameExchange) and \
- rec.data.wPreference == urec.data.wPreference:
- found = True
- elif record_type == dnsp.DNS_TYPE_SRV:
- if rec.data.wPriority == urec.data.wPriority and \
- rec.data.wWeight == urec.data.wWeight and \
- rec.data.wPort == urec.data.wPort and \
- dns_name_equal(rec.data.nameTarget, urec.data.nameTarget):
- found = True
- elif record_type == dnsp.DNS_TYPE_SOA:
- if rec.data.dwSerialNo == urec.data.dwSerialNo and \
- rec.data.dwRefresh == urec.data.dwRefresh and \
- rec.data.dwRetry == urec.data.dwRetry and \
- rec.data.dwExpire == urec.data.dwExpire and \
- rec.data.dwMinimumTtl == urec.data.dwMinimumTtl and \
- dns_name_equal(rec.data.NamePrimaryServer,
- urec.data.NamePrimaryServer) and \
- dns_name_equal(rec.data.ZoneAdministratorEmail,
- urec.data.ZoneAdministratorEmail):
- found = True
- elif record_type == dnsp.DNS_TYPE_TXT:
- if rec.data.count == urec.data.count:
- found = True
- for i in range(rec.data.count):
- found = found and \
- (rec.data.str[i].str == urec.data.str[i].str)
-
- if found:
- return rec
-
- return None
-
-
class cmd_serverinfo(Command):
"""Query for Server information."""
self.creds = credopts.get_credentials(self.lp)
dns_conn = dns_connect(server, self.lp, self.creds)
- rec_match = dns_record_match(dns_conn, server, zone, name, record_type,
- olddata)
+ try:
+ rec_match = dns_record_match(dns_conn, server, zone, name, record_type,
+ olddata)
+ except DNSParseError as e:
+ raise CommandError(*e.args) from None
+
if not rec_match:
raise CommandError('Record or zone does not exist.')
from samba import credentials
from samba.dcerpc import dns, dnsp, dnsserver
from samba.dnsserver import TXTRecord
-from samba.dnsserver import record_from_string
-from samba.netcmd.dns import dns_record_match
+from samba.dnsserver import record_from_string, dns_record_match
from samba.tests.subunitrun import SubunitOptions, TestProgram
from samba import werror, WERRORError
from samba.tests.dns_base import DNSTest