from samba.dcerpc import dns, dnsp
from samba import gensec, tests
from samba import credentials
+from samba import NTSTATUSError
import struct
import samba.ndr as ndr
import random
if creds is None:
creds = self.creds
- self.key_name = "%s.%s" % (uuid.uuid4(), self.get_dns_domain())
+ mech = 'spnego'
+
+ tkey = {}
+ tkey['name'] = "%s.%s" % (uuid.uuid4(), self.get_dns_domain())
+ tkey['creds'] = creds
+ tkey['mech'] = mech
+ tkey['algorithm'] = algorithm_name
p = self.make_name_packet(dns.DNS_OPCODE_QUERY)
- q = self.make_name_question(self.key_name,
+ q = self.make_name_question(tkey['name'],
dns.DNS_QTYPE_TKEY,
dns.DNS_QCLASS_IN)
questions = []
self.finish_name_packet(p, questions)
r = dns.res_rec()
- r.name = self.key_name
+ r.name = tkey['name']
r.rr_type = dns.DNS_QTYPE_TKEY
r.rr_class = dns.DNS_QCLASS_IN
r.ttl = 0
rdata.error = 0
rdata.other_size = 0
- self.g = gensec.Security.start_client(self.settings)
- self.g.set_credentials(creds)
- self.g.set_target_service("dns")
- self.g.set_target_hostname(self.server)
- self.g.want_feature(gensec.FEATURE_SIGN)
- self.g.start_mech_by_name("spnego")
+ tkey['gensec'] = gensec.Security.start_client(self.settings)
+ tkey['gensec'].set_credentials(creds)
+ tkey['gensec'].set_target_service("dns")
+ tkey['gensec'].set_target_hostname(self.server)
+ tkey['gensec'].want_feature(gensec.FEATURE_SIGN)
+ tkey['gensec'].start_mech_by_name(tkey['mech'])
finished = False
client_to_server = b""
- (finished, server_to_client) = self.g.update(client_to_server)
+ (finished, server_to_client) = tkey['gensec'].update(client_to_server)
self.assertFalse(finished)
data = [x if isinstance(x, int) else ord(x) for x in list(server_to_client)]
tkey_record = response.answers[0].rdata
server_to_client = bytes(tkey_record.key_data)
- (finished, client_to_server) = self.g.update(server_to_client)
+ (finished, client_to_server) = tkey['gensec'].update(server_to_client)
self.assertTrue(finished)
+ self.tkey = tkey
+
self.verify_packet(response, response_packet)
def verify_packet(self, response, response_packet, request_mac=b""):
response_packet_wo_tsig = ndr.ndr_pack(response_copy)
fake_tsig = dns.fake_tsig_rec()
- fake_tsig.name = self.key_name
+ fake_tsig.name = self.tkey['name']
fake_tsig.rr_class = dns.DNS_QCLASS_ANY
fake_tsig.ttl = 0
fake_tsig.time_prefix = tsig_record.time_prefix
fake_tsig_packet = ndr.ndr_pack(fake_tsig)
data = request_mac + response_packet_wo_tsig + fake_tsig_packet
- self.g.check_packet(data, data, mac)
+ try:
+ self.tkey['gensec'].check_packet(data, data, mac)
+ except NTSTATUSError as nt:
+ raise AssertionError(nt)
def sign_packet(self, packet, key_name,
algorithm_name="gss-tsig",
fake_tsig_packet = ndr.ndr_pack(fake_tsig)
data = packet_data + fake_tsig_packet
- mac = self.g.sign_packet(data, data)
+ mac = self.tkey['gensec'].sign_packet(data, data)
mac_list = [x if isinstance(x, int) else ord(x) for x in list(mac)]
if bad_sig:
if len(mac) > 8:
self.tkey_trans()
p = self.make_update_request()
- self.bad_sign_packet(p, self.key_name)
+ self.bad_sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
self.tkey_trans()
p = self.make_update_request()
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
# Now delete the record
p = self.make_update_request(delete=True)
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
self.assert_echoed_dns_error(p, response, response_p, dns.DNS_RCODE_REFUSED)
self.tkey_trans()
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)
# Now delete the record
p = self.make_update_request(delete=True)
- mac = self.sign_packet(p, self.key_name)
+ mac = self.sign_packet(p, self.tkey['name'])
(response, response_p) = self.dns_transaction_udp(p, self.server_ip)
self.assert_dns_rcode_equals(response, dns.DNS_RCODE_OK)
self.verify_packet(response, response_p, mac)