]> git.ipfire.org Git - thirdparty/samba.git/blame - python/samba/tests/krb5/gmsa_tests.py
tests/krb5: Test that computers (and, by extension, gMSAs) cannot perform interactive...
[thirdparty/samba.git] / python / samba / tests / krb5 / gmsa_tests.py
CommitLineData
bb5ca9f4
JS
1#!/usr/bin/env python3
2# Unix SMB/CIFS implementation.
3# Copyright (C) Stefan Metzmacher 2020
4# Copyright (C) Catalyst.Net Ltd 2024
5#
6# This program is free software: you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation, either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program. If not, see <https://www.gnu.org/licenses/>.
18#
19
20import sys
21import os
22
23sys.path.insert(0, "bin/python")
24os.environ["PYTHONUNBUFFERED"] = "1"
25
26from typing import Iterable, NewType, Optional, Tuple, TypeVar
27
28import datetime
29from itertools import chain
30
31import ldb
32
aa4347ff 33from samba import auth, dsdb, gensec, ntstatus, NTSTATUSError, werror
bb5ca9f4
JS
34from samba.dcerpc import gkdi, gmsa, misc, netlogon, security
35from samba.ndr import ndr_pack, ndr_unpack
36from samba.nt_time import (
37 nt_time_delta_from_timedelta,
38 nt_time_from_datetime,
39 NtTime,
40 NtTimeDelta,
41 timedelta_from_nt_time_delta,
42)
43from samba.samdb import SamDB
44from samba.credentials import Credentials, DONT_USE_KERBEROS
45from samba.gkdi import (
46 Gkid,
47 GroupKey,
48 KEY_CYCLE_DURATION,
47c519af 49 MAX_CLOCK_SKEW,
bb5ca9f4
JS
50)
51
52from samba.tests import connect_samdb
53from samba.tests.krb5 import kcrypto
1b765edb 54from samba.tests.gkdi import GkdiBaseTest, ROOT_KEY_START_TIME
bb5ca9f4
JS
55from samba.tests.krb5.kdc_base_test import KDCBaseTest
56from samba.tests.krb5.raw_testcase import KerberosCredentials
57from samba.tests.krb5.rfc4120_constants import (
58 KU_PA_ENC_TIMESTAMP,
59 NT_PRINCIPAL,
60 PADATA_ENC_TIMESTAMP,
61)
62import samba.tests.krb5.rfc4120_pyasn1 as krb5_asn1
63
64GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL = 30
65
66Gmsa = NewType("Gmsa", ldb.Message)
67
68
def gkdi_rollover_interval(managed_password_interval: int) -> NtTimeDelta:
    """Convert a managed password interval into a GKDI rollover interval.

    The interval is multiplied by 24 and floor-divided by 10 to obtain a
    whole number of key cycles, each KEY_CYCLE_DURATION long.
    NOTE(review): the factor of 24 suggests the interval is in days — confirm.

    Raises ValueError if the resulting rollover interval is zero.
    """
    key_cycles = managed_password_interval * 24 // 10
    interval = NtTimeDelta(key_cycles * KEY_CYCLE_DURATION)
    if interval == 0:
        raise ValueError("rollover interval must not be zero")
    return interval
76
77
class GmsaSeries:
    """A run of equally long password intervals anchored at a GKID.

    Interval ``n`` begins at ``start_time + n * rollover_interval``; ``n``
    may be negative to address intervals before the anchor.
    """

    # NT time at which interval 0 begins.
    start_time: NtTime
    # Length of each interval.
    rollover_interval: NtTimeDelta

    def __init__(self, start_gkid: Gkid, rollover_interval: NtTimeDelta) -> None:
        self.rollover_interval = rollover_interval
        self.start_time = start_gkid.start_nt_time()

    def interval_gkid(self, n: int) -> Gkid:
        """Return the GKID corresponding to the start of interval *n*."""
        interval_start = self.start_of_interval(n)
        return Gkid.from_nt_time(interval_start)

    def start_of_interval(self, n: int) -> NtTime:
        """Return the NT time at which interval *n* begins.

        Raises ValueError if *n* is not an int.
        """
        if isinstance(n, int):
            offset = n * self.rollover_interval
            return NtTime(int(self.start_time + offset))

        raise ValueError(f"{n} must be an integer")

    def during_interval(self, n: int) -> NtTime:
        """Return the NT time halfway through interval *n*."""
        half_interval = self.rollover_interval // 2
        return NtTime(int(self.start_of_interval(n) + half_interval))

    def during_skew_window(self, n: int) -> NtTime:
        """Return the NT time two minutes before interval *n* ends."""
        two_minutes = nt_time_delta_from_timedelta(datetime.timedelta(minutes=2))
        end_of_interval = self.start_of_interval(n) + self.rollover_interval
        return NtTime(int(end_of_interval - two_minutes))

    def outside_previous_password_valid_window(self, n: int) -> NtTime:
        """Return the start of interval *n* plus MAX_CLOCK_SKEW."""
        interval_start = self.start_of_interval(n)
        return NtTime(interval_start + MAX_CLOCK_SKEW)

    def within_previous_password_valid_window(self, n: int) -> NtTime:
        """Return the NT time one unit before the window above closes."""
        window_end = self.outside_previous_password_valid_window(n)
        return NtTime(window_end - 1)
108
bb5ca9f4
JS
109
110class GmsaTests(GkdiBaseTest, KDCBaseTest):
def _as_req(
    self,
    creds: KerberosCredentials,
    target_creds: KerberosCredentials,
    enctype: kcrypto.Enctype,
) -> dict:
    """Perform an AS exchange for *creds* using encrypted-timestamp preauth.

    The request targets the ``host/<name>`` principal derived from
    *target_creds* (its username minus the final character). The reply is
    checked with check_as_reply() and the exchange dict is returned.
    """
    preauth_key = self.PasswordKey_from_creds(creds, enctype)

    def generate_padata_fn(
        _kdc_exchange_dict: dict, _callback_dict: Optional[dict], req_body: dict
    ) -> Tuple[list, dict]:
        # Build a single PA-ENC-TIMESTAMP element encrypted with the
        # client's preauthentication key.
        patime, pausec = self.get_KerberosTimeWithUsec()
        ts_plain = self.der_encode(
            self.PA_ENC_TS_ENC_create(patime, pausec),
            asn1Spec=krb5_asn1.PA_ENC_TS_ENC(),
        )
        ts_encrypted = self.der_encode(
            self.EncryptedData_create(preauth_key, KU_PA_ENC_TIMESTAMP, ts_plain),
            asn1Spec=krb5_asn1.EncryptedData(),
        )
        pa_enc_ts = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, ts_encrypted)

        return [pa_enc_ts], req_body

    # Client principal: the username, split on "/" into name components.
    cname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=creds.get_username().split("/")
    )

    target_name = target_creds.get_username()
    target_realm = target_creds.get_realm()

    # The requested service name drops the last character of the target's
    # username, whereas the expected (returned) name is the full username.
    sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=["host", target_name[:-1]]
    )
    expected_sname = self.PrincipalName_create(
        name_type=NT_PRINCIPAL, names=[target_name]
    )

    kdc_options = krb5_asn1.KDCOptions(
        "forwardable,renewable,canonicalize,renewable-ok"
    )

    ticket_decryption_key = self.TicketDecryptionKey_from_creds(target_creds)

    kdc_exchange_dict = self.as_exchange_dict(
        creds=creds,
        expected_crealm=creds.get_realm(),
        expected_cname=cname,
        expected_srealm=target_realm,
        expected_sname=expected_sname,
        expected_supported_etypes=target_creds.tgs_supported_enctypes,
        ticket_decryption_key=ticket_decryption_key,
        generate_padata_fn=generate_padata_fn,
        check_error_fn=None,
        check_rep_fn=self.generic_check_kdc_rep,
        check_kdc_private_fn=self.generic_check_kdc_private,
        expected_error_mode=0,
        expected_salt=creds.get_salt(),
        preauth_key=preauth_key,
        kdc_options=str(kdc_options),
    )

    reply = self._generic_kdc_exchange(
        kdc_exchange_dict,
        cname=cname,
        realm=target_realm,
        sname=sname,
        till_time=self.get_KerberosTime(offset=36000),
        etypes=(kcrypto.Enctype.AES256, kcrypto.Enctype.RC4),
    )
    self.check_as_reply(reply)

    return kdc_exchange_dict
194
195 # Note: unused
def gkdi_get_key_start_time(self, key_id: gkdi.KeyEnvelope) -> NtTime:
    """Return the start NT time of the GKID described by *key_id*."""
    gkid = Gkid.from_key_envelope(key_id)
    return gkid.start_nt_time()
198
def get_password(
    self,
    samdb: SamDB,
    target_sd: bytes,
    root_key_id: Optional[misc.GUID],
    gkid: Gkid,
    sid: security.dom_sid,
) -> bytes:
    """Derive the post-processed gMSA password for *sid* at *gkid*.

    The group key is looked up with get_key_exact(), turned into a raw
    password buffer, and then passed through
    post_process_password_buffer().
    """
    # NOTE(review): generate_gmsa_password() annotates its sid as str and
    # wraps it in security.dom_sid(); the annotation here may be stale.
    raw_password = self.generate_gmsa_password(
        self.get_key_exact(samdb, target_sd, root_key_id, gkid), sid
    )
    return self.post_process_password_buffer(raw_password)
211
def get_password_based_on_gkid(
    self, samdb: SamDB, gkid: Gkid, sid: security.dom_sid
) -> bytes:
    """Derive the password for *sid* at *gkid* using self.gmsa_sd and no
    explicit root key ID."""
    return self.get_password(
        samdb, target_sd=self.gmsa_sd, root_key_id=None, gkid=gkid, sid=sid
    )
216
def get_password_based_on_timestamp(
    self, samdb: SamDB, timestamp: NtTime, sid: security.dom_sid
) -> bytes:
    """Derive the password for *sid* using the GKID that *timestamp* maps to."""
    gkid = Gkid.from_nt_time(timestamp)
    return self.get_password_based_on_gkid(samdb, gkid, sid)
221
222 # Note: unused
def get_password_based_on_key_id(
    self, samdb: SamDB, managed_password: gkdi.KeyEnvelope, sid: str
) -> bytes:
    """Derive the password for *sid* from the root key ID and GKID carried
    in the *managed_password* key envelope."""
    root_key_id = managed_password.root_key_id
    gkid = Gkid.from_key_envelope(managed_password)
    return self.get_password(samdb, self.gmsa_sd, root_key_id, gkid, sid)
233
def generate_gmsa_password(self, key: GroupKey, sid: str) -> bytes:
    """Run the KDF over *key* to produce a 256-byte raw password buffer.

    The KDF context is the NDR-packed SID and the label is "GMSA PASSWORD";
    the hash algorithm comes from the group key itself.
    """
    kdf_context = ndr_pack(security.dom_sid(sid))
    hash_algorithm = key.hash_algorithm.algorithm()

    return self.kdf(
        hash_algorithm,
        key.key,
        kdf_context,
        label="GMSA PASSWORD",
        len_in_bytes=256,
    )
246
def post_process_password_buffer(self, key: bytes) -> bytes:
    """Replace every aligned pair of zero bytes in *key* with (1, 0).

    The buffer is consumed two bytes at a time; a pair equal to (0, 0) is
    rewritten as (1, 0) and every other pair is passed through unchanged.
    NOTE(review): the pairs are presumably UTF-16LE code units, since the
    result is later handed to set_utf16_password() — confirm.

    Fails the test if *key* does not have an even length.
    """
    self.assertEqual(0, len(key) & 1, f"length of key ({len(key)}) is not even")

    def convert_null(t: Tuple[int, int]) -> Tuple[int, int]:
        return (1, 0) if t == (0, 0) else t

    # zip() over the same iterator twice yields consecutive, non-overlapping
    # pairs. Unlike a hand-written generator calling next() twice, it cannot
    # leak a StopIteration (which PEP 479 would turn into a RuntimeError).
    it = iter(key)
    return bytes(chain.from_iterable(map(convert_null, zip(it, it))))
269
def get_gmsa_object(self, samdb: SamDB, dn: ldb.Dn) -> Gmsa:
    """Base-search *dn* and return the first result, requesting only the
    managed-password bookkeeping attributes and whenCreated."""
    wanted_attrs = [
        "msDS-ManagedPasswordInterval",
        "msDS-ManagedPasswordId",
        "msDS-ManagedPasswordPreviousId",
        "whenCreated",
    ]
    result = samdb.search(dn, scope=ldb.SCOPE_BASE, attrs=wanted_attrs)
    return result[0]
282
def gmsa_rollover_interval(self, gmsa_object: Gmsa) -> NtTimeDelta:
    """Return the rollover interval implied by the account's
    msDS-ManagedPasswordInterval, falling back to
    GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL when the attribute is absent."""
    raw_interval = gmsa_object.get("msDS-ManagedPasswordInterval", idx=0)
    interval = (
        GMSA_DEFAULT_MANAGED_PASSWORD_INTERVAL
        if raw_interval is None
        else int(raw_interval)
    )
    return gkdi_rollover_interval(interval)
293
def gmsa_creation_nt_time(self, gmsa_object: Gmsa) -> NtTime:
    """Return the account's whenCreated attribute as an NT time.

    Fails the test if the attribute is missing.
    """
    creation_time: Optional[bytes] = gmsa_object.get("whenCreated", idx=0)
    self.assertIsNotNone(creation_time)
    assert creation_time is not None  # to help the type checker

    # ldb yields a Unix timestamp, which is interpreted here as UTC.
    unix_seconds = ldb.string_to_time(creation_time.decode())
    created = datetime.datetime.fromtimestamp(
        unix_seconds, tz=datetime.timezone.utc
    )
    return nt_time_from_datetime(created)
303
def gmsa_series(self, managed_password_interval: int) -> GmsaSeries:
    """Return a series anchored at future_gkid() whose intervals match
    *managed_password_interval*."""
    anchor = self.future_gkid()
    interval = gkdi_rollover_interval(managed_password_interval)
    return GmsaSeries(anchor, interval)
308
41e71406
JS
def gmsa_series_for_account(
    self, samdb: SamDB, creds: KerberosCredentials, managed_password_interval: int
) -> GmsaSeries:
    """Return a series anchored at the account's current quantized time.

    The anchor is the database's current time rounded down by
    account_quantized_time(); the interval length comes from
    *managed_password_interval*.
    """
    gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
    now = self.current_nt_time(samdb)
    anchor_time = self.account_quantized_time(gmsa_object, now)
    return GmsaSeries(
        Gkid.from_nt_time(anchor_time),
        gkdi_rollover_interval(managed_password_interval),
    )
318
577aa790
JS
def quantized_time(
    self, key_start_time: NtTime, time: NtTime, gkdi_rollover_interval: NtTimeDelta
) -> NtTime:
    """Round *time* down to a whole number of rollover intervals after
    *key_start_time*.

    Fails the test if *time* precedes *key_start_time*.
    """
    self.assertLessEqual(key_start_time, time)

    elapsed = NtTimeDelta(time - key_start_time)
    whole_intervals = elapsed // gkdi_rollover_interval
    quantized_elapsed = NtTimeDelta(whole_intervals * gkdi_rollover_interval)
    return NtTime(key_start_time + quantized_elapsed)
329
41e71406
JS
def account_quantized_time(self, gmsa_object: Gmsa, time: NtTime) -> NtTime:
    """Quantize *time* relative to the key start time recorded in the
    account's msDS-ManagedPasswordId, using the account's own rollover
    interval.

    Fails the test if the account has no msDS-ManagedPasswordId.
    """
    pwd_id_blob = gmsa_object.get("msDS-ManagedPasswordId", idx=0)
    self.assertIsNotNone(pwd_id_blob, "SAM should have initialized password ID")

    envelope = ndr_unpack(gkdi.KeyEnvelope, pwd_id_blob)
    key_start_time = Gkid.from_key_envelope(envelope).start_nt_time()

    return self.quantized_time(
        key_start_time, time, self.gmsa_rollover_interval(gmsa_object)
    )
339
bb5ca9f4
JS
def expected_gmsa_password_blob(
    self,
    samdb: SamDB,
    creds: KerberosCredentials,
    gkid: Gkid,
    *,
    query_expiration_gkid: Gkid,
    previous_gkid: Optional[Gkid] = None,
    return_future_key: bool = False,
) -> gmsa.MANAGEDPASSWORD_BLOB:
    """Build the managed password blob expected for explicitly given GKIDs.

    *gkid* selects the current password and *previous_gkid* (if given) the
    previous one. The query interval runs from the database's current time
    to the start of *query_expiration_gkid*. The unchanged interval is the
    same span, extended by one rollover interval when *return_future_key*
    is set, less MAX_CLOCK_SKEW, and clamped at zero.
    """
    current_password = self.get_password_based_on_gkid(
        samdb, gkid, creds.get_sid()
    )
    if previous_gkid is None:
        previous_password = None
    else:
        previous_password = self.get_password_based_on_gkid(
            samdb, previous_gkid, creds.get_sid()
        )

    now = self.current_nt_time(samdb)

    gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())
    rollover_interval = self.gmsa_rollover_interval(gmsa_object)

    query_expiration_time = query_expiration_gkid.start_nt_time()
    query_password_interval = NtTimeDelta(query_expiration_time - now)

    future_key_extension = rollover_interval if return_future_key else 0
    unchanged_span = (
        query_expiration_time + future_key_extension - now - MAX_CLOCK_SKEW
    )
    unchanged_password_interval = NtTimeDelta(max(0, unchanged_span))

    return self.marshal_password(
        current_password,
        previous_password,
        query_password_interval,
        unchanged_password_interval,
    )
380
def expected_current_gmsa_password_blob(
    self,
    samdb: SamDB,
    creds: KerberosCredentials,
    *,
    future_key_is_acceptable: bool,
) -> gmsa.MANAGEDPASSWORD_BLOB:
    """Build the managed password blob expected at the database's current
    time, working everything out from the account's stored attributes.

    When *future_key_is_acceptable* is set and the current key expires
    within MAX_CLOCK_SKEW, the next key's password is reported as current
    and the present one as previous. Otherwise the present key's password
    is current, and a previous password is reported only if the account is
    at least one rollover interval old.
    """
    gmsa_object = self.get_gmsa_object(samdb, creds.get_dn())

    rollover_interval = self.gmsa_rollover_interval(gmsa_object)

    pwd_id_blob = gmsa_object.get("msDS-ManagedPasswordId", idx=0)
    self.assertIsNotNone(pwd_id_blob, "SAM should have initialized password ID")

    envelope = ndr_unpack(gkdi.KeyEnvelope, pwd_id_blob)
    key_start_time = Gkid.from_key_envelope(envelope).start_nt_time()

    now = self.current_nt_time(samdb)

    # Start and end of the interval that contains the current time.
    new_key_start_time = self.quantized_time(key_start_time, now, rollover_interval)
    new_key_expiration_time = NtTime(new_key_start_time + rollover_interval)

    account_sid = creds.get_sid()

    within_clock_skew_window = new_key_expiration_time - now <= MAX_CLOCK_SKEW
    return_future_key = future_key_is_acceptable and within_clock_skew_window

    if return_future_key:
        new_password = self.get_password_based_on_timestamp(
            samdb, new_key_expiration_time, account_sid
        )
        old_password = self.get_password_based_on_timestamp(
            samdb, new_key_start_time, account_sid
        )
    else:
        new_password = self.get_password_based_on_timestamp(
            samdb, new_key_start_time, account_sid
        )

        # An account younger than one rollover interval has no previous
        # password yet.
        old_password = None
        account_age = NtTimeDelta(now - self.gmsa_creation_nt_time(gmsa_object))
        if account_age >= rollover_interval:
            old_password = self.get_password_based_on_timestamp(
                samdb,
                NtTime(new_key_start_time - rollover_interval),
                account_sid,
            )

    key_expiration_time = NtTime(key_start_time + rollover_interval)
    if key_expiration_time <= now:
        # The stored key has expired; measure against the new key instead.
        query_expiration_time = NtTime(new_key_expiration_time)
    else:
        query_expiration_time = NtTime(key_expiration_time)

    query_password_interval = NtTimeDelta(query_expiration_time - now)

    future_key_extension = rollover_interval if return_future_key else 0
    unchanged_span = (
        query_expiration_time + future_key_extension - now - MAX_CLOCK_SKEW
    )
    unchanged_password_interval = NtTimeDelta(max(0, unchanged_span))

    return self.marshal_password(
        new_password,
        old_password,
        query_password_interval,
        unchanged_password_interval,
    )
459
def marshal_password(
    self,
    current_password: bytes,
    previous_password: Optional[bytes],
    query_password_interval: NtTimeDelta,
    unchanged_password_interval: NtTimeDelta,
) -> gmsa.MANAGEDPASSWORD_BLOB:
    """Return a fresh MANAGEDPASSWORD_BLOB whose ``passwords`` member holds
    the given passwords and intervals."""
    blob = gmsa.MANAGEDPASSWORD_BLOB()

    field_values = (
        ("current", current_password),
        ("previous", previous_password),
        ("query_interval", query_password_interval),
        ("unchanged_interval", unchanged_password_interval),
    )
    for field_name, value in field_values:
        setattr(blob.passwords, field_name, value)

    return blob
475
def gmsa_account(
    self,
    *,
    samdb: Optional[SamDB] = None,
    interval: int = 1,
    msa_membership: Optional[str] = None,
    **kwargs,
) -> KerberosCredentials:
    """Create a brand-new gMSA and return its credentials with the
    expected current password already set.

    *msa_membership* is an SDDL string for msDS-GroupMSAMembership; when
    omitted, a descriptor granting RP to World is used. *interval* becomes
    msDS-ManagedPasswordInterval. Extra keyword arguments are merged into
    the account options.
    """
    if msa_membership is None:
        msa_membership = "O:SYD:(A;;RP;;;WD)"

    membership_descriptor = security.descriptor.from_sddl(
        msa_membership, security.dom_sid()
    )
    msa_membership_sd = ndr_pack(membership_descriptor)

    try:
        creds = self.get_cached_creds(
            samdb=samdb,
            account_type=self.AccountType.GROUP_MANAGED_SERVICE,
            opts={
                "additional_details": self.freeze(
                    {
                        "msDS-GroupMSAMembership": msa_membership_sd,
                        "msDS-ManagedPasswordInterval": str(interval),
                    }
                ),
                **kwargs,
            },
            # Bypass the cache so that the account is always newly created.
            use_cache=False,
        )
    except ldb.LdbError as err:
        if err.args[0] == ldb.ERR_UNWILLING_TO_PERFORM:
            self.fail(
                "If you’re running these tests against Windows, try “warming up”"
                " the GKDI service by running `samba.tests.krb5.gkdi_tests` first."
            )

        raise

    # Work out the password the account should currently have. A future
    # key is not accepted here.
    db = samdb if samdb is not None else self.get_samdb()
    managed_pwd = self.expected_current_gmsa_password_blob(
        db,
        creds,
        future_key_is_acceptable=False,
    )

    current_password = managed_pwd.passwords.current
    self.assertIsNotNone(current_password, "current password must be present")
    creds.set_utf16_password(managed_pwd.passwords.current)

    return creds
531
def get_local_samdb(self) -> SamDB:
    """Open the local database with admin credentials and return the
    connection, after asserting that it really is local."""
    lp = self.get_lp()
    local_db = connect_samdb(
        samdb_url=lp.samdb_url(),
        lp=lp,
        credentials=self.get_admin_creds(),
    )
    self.assertLocalSamDB(local_db)
    return local_db
542
543 # Perform a gensec logon using NTLMSSP. As samdb is passed in as a
544 # parameter, it can have a time set on it with set_db_time().
def gensec_ntlmssp_logon(
    self, client_creds: Credentials, samdb: SamDB, expect_success: bool = True
) -> "Optional[auth.session_info]":
    """Drive an NTLMSSP exchange, acting as both client and server.

    Returns the server's session info on success. If the server raises
    NTSTATUSError, the test fails unless *expect_success* is false and the
    status is NT_STATUS_WRONG_PASSWORD, in which case None is returned.
    """
    lp = self.get_lp()
    lp.set("server role", "active directory domain controller")

    settings = {"lp_ctx": lp, "target_hostname": lp.get("netbios name")}

    # Client side.
    gensec_client = gensec.Security.start_client(settings)
    # Kerberos must already be disabled on the client credentials.
    self.assertEqual(DONT_USE_KERBEROS, client_creds.get_kerberos_state())
    gensec_client.set_credentials(client_creds)
    gensec_client.want_feature(gensec.FEATURE_SEAL)
    gensec_client.start_mech_by_name("ntlmssp")

    # Server side, authenticating against the supplied database.
    auth_context = auth.AuthContext(lp_ctx=lp, ldb=samdb)

    gensec_server = gensec.Security.start_server(settings, auth_context)
    machine_creds = Credentials()
    machine_creds.guess(lp)
    machine_creds.set_machine_account(lp)
    gensec_server.set_credentials(machine_creds)

    gensec_server.start_mech_by_name("ntlmssp")

    client_done = False
    server_done = False
    to_server = b""
    to_client = b""

    # Pass tokens back and forth until both ends report completion.
    while not (client_done and server_done):
        if not client_done:
            client_done, to_server = gensec_client.update(to_client)

        if server_done:
            continue

        try:
            server_done, to_client = gensec_server.update(to_server)
        except NTSTATUSError as err:
            self.assertFalse(expect_success, "got an unexpected error")

            self.assertEqual(ntstatus.NT_STATUS_WRONG_PASSWORD, err.args[0])
            return None

    self.assertTrue(expect_success, "expected to get an error")

    # The session info holds the security token of the authenticated user.
    return gensec_server.session_info()
596
def check_nt_interval(
    self,
    expected_nt_interval: NtTimeDelta,
    nt_interval: NtTimeDelta,
    interval_name: str,
) -> None:
    """Assert that *nt_interval* is less than thirty seconds away from
    *expected_nt_interval*; *interval_name* labels the failure message."""
    tolerance = datetime.timedelta(seconds=30)

    interval = timedelta_from_nt_time_delta(nt_interval)
    expected_interval = timedelta_from_nt_time_delta(expected_nt_interval)
    interval_difference = abs(interval - expected_interval)

    failure_message = (
        f"{interval_name} ({interval}) is out by {interval_difference} from"
        f" expected ({expected_interval})"
    )
    self.assertLess(interval_difference, tolerance, failure_message)
616
def check_managed_pwd_intervals(
    self,
    expected_managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
    managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
) -> None:
    """Check the query and unchanged intervals of *managed_pwd* against
    those of *expected_managed_pwd* using check_nt_interval()."""
    expected_passwords = expected_managed_pwd.passwords
    passwords = managed_pwd.passwords

    checked_fields = (
        ("query_interval", "query interval"),
        ("unchanged_interval", "unchanged interval"),
    )
    for attr, label in checked_fields:
        self.check_nt_interval(
            getattr(expected_passwords, attr),
            getattr(passwords, attr),
            label,
        )
635
def check_managed_pwd(
    self,
    samdb: SamDB,
    creds: KerberosCredentials,
    *,
    expected_managed_pwd: gmsa.MANAGEDPASSWORD_BLOB,
) -> None:
    """Fetch msDS-ManagedPassword for the account and compare it with
    *expected_managed_pwd*.

    Checks the blob header (version, reserved, length), the current and
    previous passwords, and finally the two intervals.
    """
    result = samdb.search(
        creds.get_dn(), scope=ldb.SCOPE_BASE, attrs=["msDS-ManagedPassword"]
    )
    self.assertEqual(1, len(result), "gMSA not found")
    raw_blob = result[0].get("msDS-ManagedPassword", idx=0)

    self.assertIsNotNone(raw_blob)
    managed_pwd = ndr_unpack(gmsa.MANAGEDPASSWORD_BLOB, raw_blob)

    # Header fields.
    self.assertEqual(1, managed_pwd.version)
    self.assertEqual(0, managed_pwd.reserved)
    self.assertEqual(len(raw_blob), managed_pwd.length)

    expected_passwords = expected_managed_pwd.passwords
    self.assertIsNotNone(expected_passwords.current)

    self.assertEqual(managed_pwd.passwords.current, expected_passwords.current)
    self.assertEqual(managed_pwd.passwords.previous, expected_passwords.previous)

    self.check_managed_pwd_intervals(expected_managed_pwd, managed_pwd)
666
667 # When creating a gMSA, Windows seems to pick the root key with the
668 # greatest msKds-CreateTime having msKds-UseStartTime ≤ ten hours ago.
669 # Bear in mind that it seems also to cache the key, so it won’t always
670 # use the latest one.
671
def get_managed_service_accounts_dn(self) -> ldb.Dn:
    """Return the DN of the well-known Managed Service Accounts container
    under the default base DN."""
    samdb = self.get_samdb()
    base_dn = samdb.get_default_basedn()
    return samdb.get_wellknown_dn(
        base_dn, dsdb.DS_GUID_MANAGED_SERVICE_ACCOUNTS_CONTAINER
    )
677
def check_managed_password_access(
    self,
    creds: Credentials,
    *,
    samdb: Optional[SamDB] = None,
    expect_access: bool = False,
    expected_werror: int = werror.WERR_SUCCESS,
) -> None:
    """Check whether msDS-ManagedPassword is visible over *samdb*.

    The account is searched for with base, subtree and one-level scopes.
    If *expected_werror* is set, the first search is expected to fail with
    an operations error whose message contains that code, and the method
    then returns. Otherwise each search must find the account, and the
    password must be present or absent according to *expect_access*.
    """
    if samdb is None:
        samdb = self.get_samdb()
    if expected_werror:
        # Expecting an error and expecting access are mutually exclusive.
        self.assertFalse(expect_access)

    container_dn = self.get_managed_service_accounts_dn()
    username = creds.get_username()

    search_targets = (
        (creds.get_dn(), ldb.SCOPE_BASE),
        (container_dn, ldb.SCOPE_SUBTREE),
        (container_dn, ldb.SCOPE_ONELEVEL),
    )

    if expect_access:
        check_password = self.assertIsNotNone
        access_message = "should be allowed to view the password"
    else:
        check_password = self.assertIsNone
        access_message = "should not be allowed to view the password"

    for dn, scope in search_targets:
        try:
            result = samdb.search(
                dn,
                scope=scope,
                expression=f"sAMAccountName={username}",
                attrs=["msDS-ManagedPassword"],
            )
        except ldb.LdbError as err:
            self.assertTrue(expected_werror, "got an unexpected error")

            num, estr = err.args
            if num != ldb.ERR_OPERATIONS_ERROR:
                raise

            self.assertIn(f"{expected_werror:08X}", estr)
            return

        self.assertFalse(expected_werror, "expected to get an error")
        self.assertEqual(1, len(result), "should always find the gMSA")

        managed_password = result[0].get("msDS-ManagedPassword", idx=0)
        check_password(managed_password, access_message)
732
def test_retrieved_password_allowed(self):
    """A gMSA with the default membership exposes its managed password."""
    account = self.gmsa_account()
    self.check_managed_password_access(account, expect_access=True)
736
def test_retrieved_password_denied(self):
    """A gMSA whose membership denies RP to World hides its managed
    password."""
    account = self.gmsa_account(msa_membership="O:SYD:(D;;RP;;;WD)")
    self.check_managed_password_access(account, expect_access=False)
743
9fac9b77
JS
def test_retrieving_password_over_sealed_connection(self):
    """The managed password can be read over an LDAP connection opened
    with the unmodified admin credentials."""
    lp = self.get_lp()
    ldap_db = SamDB(
        f"ldap://{self.dc_host}",
        credentials=self.get_admin_creds(),
        session_info=auth.system_session(lp),
        lp=lp,
    )

    account = self.gmsa_account()
    self.check_managed_password_access(account, samdb=ldap_db, expect_access=True)
756
def test_retrieving_password_over_unsealed_connection(self):
    """Reading the managed password over a sign-only connection fails with
    WERR_DS_CONFIDENTIALITY_REQUIRED."""
    # Requires --use-kerberos=required; otherwise the connection is
    # automatically upgraded to an encrypted one.

    # insta_creds adds FEATURE_SEAL, so strip it again.
    unsealed_creds = self.insta_creds(template=self.get_admin_creds())
    features = unsealed_creds.get_gensec_features()
    unsealed_creds.set_gensec_features(features & ~gensec.FEATURE_SEAL)

    lp = self.get_lp()

    # Temporarily switch SASL wrapping to "sign", restoring it on cleanup.
    original_wrapping = lp.get("client ldap sasl wrapping")
    self.addCleanup(lp.set, "client ldap sasl wrapping", original_wrapping)
    lp.set("client ldap sasl wrapping", "sign")

    # Open a separate, unsealed ldb connection.
    unsealed_db = SamDB(
        f"ldap://{self.dc_host}",
        credentials=unsealed_creds,
        session_info=auth.system_session(lp),
        lp=lp,
    )

    account = self.gmsa_account()
    self.check_managed_password_access(
        account,
        samdb=unsealed_db,
        expected_werror=werror.WERR_DS_CONFIDENTIALITY_REQUIRED,
    )
784
20ce68f1
JS
def test_retrieving_denied_password_over_unsealed_connection(self):
    """Over a sign-only connection, WERR_DS_CONFIDENTIALITY_REQUIRED is
    returned even when the membership denies access to everyone."""
    # Requires --use-kerberos=required; otherwise the connection is
    # automatically upgraded to an encrypted one.

    # insta_creds adds FEATURE_SEAL, so strip it again.
    unsealed_creds = self.insta_creds(template=self.get_admin_creds())
    features = unsealed_creds.get_gensec_features()
    unsealed_creds.set_gensec_features(features & ~gensec.FEATURE_SEAL)

    lp = self.get_lp()

    # Temporarily switch SASL wrapping to "sign", restoring it on cleanup.
    original_wrapping = lp.get("client ldap sasl wrapping")
    self.addCleanup(lp.set, "client ldap sasl wrapping", original_wrapping)
    lp.set("client ldap sasl wrapping", "sign")

    # Open a separate, unsealed ldb connection.
    unsealed_db = SamDB(
        f"ldap://{self.dc_host}",
        credentials=unsealed_creds,
        session_info=auth.system_session(lp),
        lp=lp,
    )

    # Nobody is permitted to view this account's password.
    account = self.gmsa_account(msa_membership="O:SYD:(D;;RP;;;WD)")
    self.check_managed_password_access(
        account,
        samdb=unsealed_db,
        expected_werror=werror.WERR_DS_CONFIDENTIALITY_REQUIRED,
    )
814
bb5ca9f4
JS
def future_gkid(self) -> Gkid:
    """Return the GKID for 1 January 9000 UTC, i.e. (6333, 26, 5).

    The date is arbitrary: far enough ahead to lie beyond any reasonable
    rollover period, yet early enough that Python's datetime library does
    not raise OverflowError.
    """
    far_future = datetime.datetime(9000, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)
    far_future_nt_time = nt_time_from_datetime(far_future)
    return Gkid.from_nt_time(far_future_nt_time)
821
def future_time(self) -> NtTime:
    """Return the start time of future_gkid(): an arbitrary NT time far
    beyond any reasonable rollover period, but not so far that Python's
    datetime library overflows."""
    gkid = self.future_gkid()
    return gkid.start_nt_time()
827
def test_retrieved_password(self):
    """The managed password retrieved for a new gMSA matches the expected
    current blob, with a future key being acceptable."""
    samdb = self.get_samdb()
    account = self.gmsa_account()

    expected_blob = self.expected_current_gmsa_password_blob(
        samdb, account, future_key_is_acceptable=True
    )
    self.check_managed_pwd(samdb, account, expected_managed_pwd=expected_blob)
840
def test_retrieved_password_when_current_key_is_valid(self):
    """The correct password is retrieved at a time when the gMSA's key is
    known to be valid."""
    password_interval = 37

    local_db = self.get_local_samdb()
    series = self.gmsa_series(password_interval)
    self.set_db_time(local_db, series.start_of_interval(0))

    account = self.gmsa_account(samdb=local_db, interval=password_interval)

    # Inspect the managed password at the very moment of account creation.
    expected_blob = self.expected_gmsa_password_blob(
        local_db,
        account,
        series.interval_gkid(0),
        previous_gkid=series.interval_gkid(-1),
        query_expiration_gkid=series.interval_gkid(1),
    )
    self.check_managed_pwd(local_db, account, expected_managed_pwd=expected_blob)
862
def test_retrieved_password_when_current_key_is_expired(self):
    """The correct password is retrieved once the gMSA's original password
    has expired."""
    password_interval = 14

    local_db = self.get_local_samdb()
    series = self.gmsa_series(password_interval)
    self.set_db_time(local_db, series.start_of_interval(0))

    account = self.gmsa_account(samdb=local_db, interval=password_interval)

    # Jump to the instant the original password expires.
    self.set_db_time(local_db, series.start_of_interval(1))

    expected_blob = self.expected_gmsa_password_blob(
        local_db,
        account,
        series.interval_gkid(1),
        previous_gkid=series.interval_gkid(0),
        query_expiration_gkid=series.interval_gkid(2),
    )
    self.check_managed_pwd(local_db, account, expected_managed_pwd=expected_blob)
886
def test_retrieved_password_when_next_key_is_expired(self):
    """The correct password is retrieved two whole intervals after the
    account was created."""
    password_interval = 1

    local_db = self.get_local_samdb()
    series = self.gmsa_series(password_interval)
    self.set_db_time(local_db, series.start_of_interval(0))

    account = self.gmsa_account(samdb=local_db, interval=password_interval)

    # Jump to the start of the third interval.
    self.set_db_time(local_db, series.start_of_interval(2))

    expected_blob = self.expected_gmsa_password_blob(
        local_db,
        account,
        series.interval_gkid(2),
        previous_gkid=series.interval_gkid(1),
        query_expiration_gkid=series.interval_gkid(3),
    )
    self.check_managed_pwd(local_db, account, expected_managed_pwd=expected_blob)
907
def test_retrieved_password_during_clock_skew_window_when_current_key_is_valid(
    self,
):
    """Two minutes before interval 0 ends, the next interval's key is
    returned as current and interval 0's as previous."""
    password_interval = 60

    local_db = self.get_local_samdb()
    series = self.gmsa_series(password_interval)
    self.set_db_time(local_db, series.start_of_interval(0))

    account = self.gmsa_account(samdb=local_db, interval=password_interval)

    self.set_db_time(local_db, series.during_skew_window(0))

    expected_blob = self.expected_gmsa_password_blob(
        local_db,
        account,
        series.interval_gkid(1),
        previous_gkid=series.interval_gkid(0),
        query_expiration_gkid=series.interval_gkid(1),
        return_future_key=True,
    )
    self.check_managed_pwd(local_db, account, expected_managed_pwd=expected_blob)
930
def test_retrieved_password_during_clock_skew_window_when_current_key_is_expired(
    self,
):
    """Two minutes before interval 1 ends, interval 2's key is returned as
    current and interval 1's as previous."""
    password_interval = 100

    local_db = self.get_local_samdb()
    series = self.gmsa_series(password_interval)
    self.set_db_time(local_db, series.start_of_interval(0))

    account = self.gmsa_account(samdb=local_db, interval=password_interval)

    self.set_db_time(local_db, series.during_skew_window(1))

    expected_blob = self.expected_gmsa_password_blob(
        local_db,
        account,
        series.interval_gkid(2),
        previous_gkid=series.interval_gkid(1),
        query_expiration_gkid=series.interval_gkid(2),
        return_future_key=True,
    )
    self.check_managed_pwd(local_db, account, expected_managed_pwd=expected_blob)
953
def test_retrieved_password_during_clock_skew_window_when_next_key_is_expired(
    self,
):
    """Two minutes before interval 2 ends, interval 3's key is returned as
    current and interval 2's as previous."""
    password_interval = 16

    local_db = self.get_local_samdb()
    series = self.gmsa_series(password_interval)
    self.set_db_time(local_db, series.start_of_interval(0))

    account = self.gmsa_account(samdb=local_db, interval=password_interval)

    self.set_db_time(local_db, series.during_skew_window(2))

    expected_blob = self.expected_gmsa_password_blob(
        local_db,
        account,
        series.interval_gkid(3),
        previous_gkid=series.interval_gkid(2),
        query_expiration_gkid=series.interval_gkid(3),
        return_future_key=True,
    )
    self.check_managed_pwd(local_db, account, expected_managed_pwd=expected_blob)
976
1b765edb
JS
977 def test_retrieving_managed_password_triggers_keys_update(self):
978 # Create a root key with a start time early enough to be usable at the
979 # time the gMSA is purported to be created.
980 samdb = self.get_samdb()
981 domain_dn = self.get_server_dn(samdb)
982 self.create_root_key(samdb, domain_dn, use_start_time=ROOT_KEY_START_TIME)
983
984 password_interval = 16
985
986 local_samdb = self.get_local_samdb()
987 series = GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval))
988 self.set_db_time(local_samdb, series.start_of_interval(0))
989
990 creds = self.gmsa_account(samdb=local_samdb, interval=password_interval)
991 dn = creds.get_dn()
992
65fe0900 993 current_nt_time = self.current_nt_time(samdb)
1b765edb
JS
994 self.set_db_time(local_samdb, current_nt_time)
995
996 # Search the local database for the account’s keys.
997 res = local_samdb.search(
998 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
999 )
1000 self.assertEqual(1, len(res))
1001
1002 previous_nt_hash = res[0].get("unicodePwd", idx=0)
1003 previous_supplemental_creds = self.unpack_supplemental_credentials(
1004 res[0].get("supplementalCredentials", idx=0)
1005 )
1006
103ca027
JS
1007 # Check that the NT hash is the value we expect.
1008 self.assertEqual(creds.get_nt_hash(), previous_nt_hash)
1009
1b765edb
JS
1010 # Search for the managed password over LDAP, triggering an update of the
1011 # keys in the database.
1012 res = samdb.search(dn, scope=ldb.SCOPE_BASE, attrs=["msDS-ManagedPassword"])
1013 self.assertEqual(1, len(res))
1014
1015 # Verify that the password is present in the result.
1016 managed_password = res[0].get("msDS-ManagedPassword", idx=0)
1017 self.assertIsNotNone(managed_password, "should be allowed to view the password")
1018
1019 # Search the local database again for the account’s keys, which should
1020 # have been updated.
1021 res = local_samdb.search(
1022 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1023 )
1024 self.assertEqual(1, len(res))
1025
1026 nt_hash = res[0].get("unicodePwd", idx=0)
1027 supplemental_creds = self.unpack_supplemental_credentials(
1028 res[0].get("supplementalCredentials", idx=0)
1029 )
1030
1031 self.assertNotEqual(
1032 previous_nt_hash, nt_hash, "NT hash has not been updated (yet)"
1033 )
1034 self.assertNotEqual(
1035 previous_supplemental_creds,
1036 supplemental_creds,
1037 "supplementalCredentials has not been updated (yet)",
1038 )
1039
103ca027
JS
1040 # Set the new password.
1041 managed_pwd = ndr_unpack(gmsa.MANAGEDPASSWORD_BLOB, managed_password)
1042 self.assertIsNotNone(
1043 managed_pwd.passwords.current, "current password must be present"
1044 )
1045 creds.set_utf16_password(managed_pwd.passwords.current)
1046
1047 # Check that the new NT hash is the value we expect.
1048 self.assertEqual(creds.get_nt_hash(), nt_hash)
1049
1b765edb
JS
1050 def test_authentication_triggers_keys_update(self):
1051 # Create a root key with a start time early enough to be usable at the
1052 # time the gMSA is purported to be created. But don’t create it on a
1053 # local samdb with a specifically set time, because (if the key isn’t
1054 # deleted later) we could end up with multiple keys with identical
1055 # creation and start times, and tests failing when the test and the
1056 # server don’t agree on which root key to use at a specific time.
1057 samdb = self.get_samdb()
1058 domain_dn = self.get_server_dn(samdb)
1059 self.create_root_key(samdb, domain_dn, use_start_time=ROOT_KEY_START_TIME)
1060
1061 password_interval = 16
1062
1063 local_samdb = self.get_local_samdb()
1064 series = GmsaSeries(Gkid(100, 0, 0), gkdi_rollover_interval(password_interval))
1065 self.set_db_time(local_samdb, series.start_of_interval(0))
1066
1067 creds = self.gmsa_account(samdb=local_samdb, interval=password_interval)
1068 dn = creds.get_dn()
1069
65fe0900 1070 current_nt_time = self.current_nt_time(samdb)
1b765edb
JS
1071 self.set_db_time(local_samdb, current_nt_time)
1072
1073 # Search the local database for the account’s keys.
1074 res = local_samdb.search(
1075 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1076 )
1077 self.assertEqual(1, len(res))
1078
1079 previous_nt_hash = res[0].get("unicodePwd", idx=0)
1080 previous_supplemental_creds = self.unpack_supplemental_credentials(
1081 res[0].get("supplementalCredentials", idx=0)
1082 )
1083
103ca027
JS
1084 # Check that the NT hash is the value we expect.
1085 self.assertEqual(creds.get_nt_hash(), previous_nt_hash)
1086
1b765edb 1087 # Calculate the password with which to authenticate.
41e71406
JS
1088 current_series = self.gmsa_series_for_account(
1089 local_samdb, creds, password_interval
1090 )
1091 managed_pwd = self.expected_gmsa_password_blob(
1092 local_samdb,
1093 creds,
1094 current_series.interval_gkid(0),
1095 query_expiration_gkid=current_series.interval_gkid(1),
1b765edb
JS
1096 )
1097
1098 # Set the new password.
1099 self.assertIsNotNone(
1100 managed_pwd.passwords.current, "current password must be present"
1101 )
1102 creds.set_utf16_password(managed_pwd.passwords.current)
1103
1104 # Perform an authentication using the new password. The KDC should
1105 # recognize that the keys in the database are out of date and update
1106 # them.
1107 self._as_req(creds, self.get_service_creds(), kcrypto.Enctype.AES256)
1108
1109 # Search the local database again for the account’s keys, which should
1110 # have been updated.
1111 res = local_samdb.search(
1112 dn, scope=ldb.SCOPE_BASE, attrs=["unicodePwd", "supplementalCredentials"]
1113 )
1114 self.assertEqual(1, len(res))
1115
1116 nt_hash = res[0].get("unicodePwd", idx=0)
1117 supplemental_creds = self.unpack_supplemental_credentials(
1118 res[0].get("supplementalCredentials", idx=0)
1119 )
1120
1121 self.assertNotEqual(
1122 previous_nt_hash, nt_hash, "NT hash has not been updated (yet)"
1123 )
1124 self.assertNotEqual(
1125 previous_supplemental_creds,
1126 supplemental_creds,
1127 "supplementalCredentials has not been updated (yet)",
1128 )
1129
103ca027
JS
1130 # Check that the new NT hash is the value we expect.
1131 self.assertEqual(creds.get_nt_hash(), nt_hash)
1132
bb5ca9f4
JS
1133 def test_gmsa_can_perform_gensec_ntlmssp_logon(self):
1134 creds = self.gmsa_account(kerberos_enabled=False)
1135
1136 # Perform a gensec logon.
1137 session = self.gensec_ntlmssp_logon(creds, self.get_local_samdb())
1138
1139 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1140 token = session.security_token
1141 token_sids = token.sids
1142 self.assertGreater(len(token_sids), 0)
1143
1144 # Ensure that they match.
1145 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1146
ad074075
JS
1147 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_valid(self):
1148 """Test that we can perform a gensec logon at a time when we are sure
1149 the current gMSA password is valid."""
1150
1151 password_interval = 18
1152
1153 samdb = self.get_local_samdb()
1154 series = self.gmsa_series(password_interval)
1155 self.set_db_time(samdb, series.start_of_interval(0))
1156
1157 creds = self.gmsa_account(
1158 samdb=samdb, interval=password_interval, kerberos_enabled=False
1159 )
1160
1161 # Perform a gensec logon.
1162 session = self.gensec_ntlmssp_logon(creds, samdb)
1163
1164 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1165 token = session.security_token
1166 token_sids = token.sids
1167 self.assertGreater(len(token_sids), 0)
1168
1169 # Ensure that they match.
1170 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1171
1172 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_current_key_is_expired(self):
1173 """Test that we can perform a gensec logon using NTLMSSP at a time when
1174 the current gMSA password has expired."""
1175
1176 password_interval = 40
1177
1178 samdb = self.get_local_samdb()
1179 series = self.gmsa_series(password_interval)
1180 self.set_db_time(samdb, series.start_of_interval(0))
1181
1182 creds = self.gmsa_account(
1183 samdb=samdb, interval=password_interval, kerberos_enabled=False
1184 )
1185
1186 # Set the time to the moment the original password has expired, and
1187 # perform a gensec logon.
1188 expired_time = series.start_of_interval(1)
1189 self.set_db_time(samdb, expired_time)
1190
1191 # Calculate the password with which to authenticate.
1192 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1193 managed_pwd = self.expected_gmsa_password_blob(
1194 samdb,
1195 creds,
1196 current_series.interval_gkid(0),
1197 previous_gkid=current_series.interval_gkid(-1),
1198 query_expiration_gkid=current_series.interval_gkid(1),
1199 )
1200
1201 # Set the new password.
1202 self.assertIsNotNone(
1203 managed_pwd.passwords.current, "current password must be present"
1204 )
1205 creds.set_utf16_password(managed_pwd.passwords.current)
1206
1207 # Perform a gensec logon.
1208 session = self.gensec_ntlmssp_logon(creds, samdb)
1209
1210 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1211 token = session.security_token
1212 token_sids = token.sids
1213 self.assertGreater(len(token_sids), 0)
1214
1215 # Ensure that they match.
1216 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1217
1218 def test_gmsa_can_perform_gensec_ntlmssp_logon_when_next_key_is_expired(self):
1219 password_interval = 42
1220
1221 samdb = self.get_local_samdb()
1222 series = self.gmsa_series(password_interval)
1223 self.set_db_time(samdb, series.start_of_interval(0))
1224
1225 creds = self.gmsa_account(
1226 samdb=samdb, interval=password_interval, kerberos_enabled=False
1227 )
1228
1229 expired_time = series.start_of_interval(2)
1230 self.set_db_time(samdb, expired_time)
1231
1232 # Calculate the password with which to authenticate.
1233 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1234 managed_pwd = self.expected_gmsa_password_blob(
1235 samdb,
1236 creds,
1237 current_series.interval_gkid(0),
1238 previous_gkid=current_series.interval_gkid(-1),
1239 query_expiration_gkid=current_series.interval_gkid(1),
1240 )
1241
1242 # Set the new password.
1243 self.assertIsNotNone(
1244 managed_pwd.passwords.current, "current password must be present"
1245 )
1246 creds.set_utf16_password(managed_pwd.passwords.current)
1247
1248 # Perform a gensec logon.
1249 session = self.gensec_ntlmssp_logon(creds, samdb)
1250
1251 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1252 token = session.security_token
1253 token_sids = token.sids
1254 self.assertGreater(len(token_sids), 0)
1255
1256 # Ensure that they match.
1257 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1258
1259 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_valid(
1260 self,
1261 ):
1262 password_interval = 43
1263
1264 samdb = self.get_local_samdb()
1265 series = self.gmsa_series(password_interval)
1266 self.set_db_time(samdb, series.start_of_interval(0))
1267
1268 creds = self.gmsa_account(
1269 samdb=samdb, interval=password_interval, kerberos_enabled=False
1270 )
1271
1272 self.set_db_time(samdb, series.during_skew_window(0))
1273
1274 # Calculate the password with which to authenticate.
1275 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1276 managed_pwd = self.expected_gmsa_password_blob(
1277 samdb,
1278 creds,
1279 current_series.interval_gkid(0),
1280 previous_gkid=current_series.interval_gkid(-1),
1281 query_expiration_gkid=current_series.interval_gkid(1),
1282 )
1283
1284 # Set the new password.
1285 self.assertIsNotNone(
1286 managed_pwd.passwords.current, "current password must be present"
1287 )
1288 creds.set_utf16_password(managed_pwd.passwords.current)
1289
1290 # Perform a gensec logon.
1291 session = self.gensec_ntlmssp_logon(creds, samdb)
1292
1293 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1294 token = session.security_token
1295 token_sids = token.sids
1296 self.assertGreater(len(token_sids), 0)
1297
1298 # Ensure that they match.
1299 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1300
1301 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_current_key_is_expired(
1302 self,
1303 ):
1304 password_interval = 44
1305
1306 samdb = self.get_local_samdb()
1307 series = self.gmsa_series(password_interval)
1308 self.set_db_time(samdb, series.start_of_interval(0))
1309
1310 creds = self.gmsa_account(
1311 samdb=samdb, interval=password_interval, kerberos_enabled=False
1312 )
1313
1314 self.set_db_time(samdb, series.during_skew_window(1))
1315
1316 # Calculate the password with which to authenticate.
1317 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1318 managed_pwd = self.expected_gmsa_password_blob(
1319 samdb,
1320 creds,
1321 current_series.interval_gkid(0),
1322 previous_gkid=current_series.interval_gkid(-1),
1323 query_expiration_gkid=current_series.interval_gkid(1),
1324 )
1325
1326 # Set the new password.
1327 self.assertIsNotNone(
1328 managed_pwd.passwords.current, "current password must be present"
1329 )
1330 creds.set_utf16_password(managed_pwd.passwords.current)
1331
1332 # Perform a gensec logon.
1333 session = self.gensec_ntlmssp_logon(creds, samdb)
1334
1335 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1336 token = session.security_token
1337 token_sids = token.sids
1338 self.assertGreater(len(token_sids), 0)
1339
1340 # Ensure that they match.
1341 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1342
1343 def test_gmsa_can_perform_gensec_ntlmssp_logon_during_clock_skew_window_when_next_key_is_expired(
1344 self,
1345 ):
1346 password_interval = 47
1347
1348 samdb = self.get_local_samdb()
1349 series = self.gmsa_series(password_interval)
1350 self.set_db_time(samdb, series.start_of_interval(0))
1351
1352 creds = self.gmsa_account(
1353 samdb=samdb, interval=password_interval, kerberos_enabled=False
1354 )
1355
1356 self.set_db_time(samdb, series.during_skew_window(2))
1357
1358 # Calculate the password with which to authenticate.
1359 current_series = self.gmsa_series_for_account(samdb, creds, password_interval)
1360 managed_pwd = self.expected_gmsa_password_blob(
1361 samdb,
1362 creds,
1363 current_series.interval_gkid(0),
1364 previous_gkid=current_series.interval_gkid(-1),
1365 query_expiration_gkid=current_series.interval_gkid(1),
1366 )
1367
1368 # Set the new password.
1369 self.assertIsNotNone(
1370 managed_pwd.passwords.current, "current password must be present"
1371 )
1372 creds.set_utf16_password(managed_pwd.passwords.current)
1373
1374 # Perform a gensec logon.
1375 session = self.gensec_ntlmssp_logon(creds, samdb)
1376
1377 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1378 token = session.security_token
1379 token_sids = token.sids
1380 self.assertGreater(len(token_sids), 0)
1381
1382 # Ensure that they match.
1383 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1384
1385 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_within_five_minutes(
1386 self,
1387 ):
1388 password_interval = 123
1389
1390 samdb = self.get_local_samdb()
1391 series = self.gmsa_series(password_interval)
1392 self.set_db_time(samdb, series.start_of_interval(0))
1393
1394 creds = self.gmsa_account(
1395 samdb=samdb, interval=password_interval, kerberos_enabled=False
1396 )
1397
1398 # Set the time to within five minutes of the original password’s expiry,
1399 # and perform a gensec logon with the original password.
1400 expired_time = series.within_previous_password_valid_window(1)
1401 self.set_db_time(samdb, expired_time)
1402
1403 # Perform a gensec logon.
1404 session = self.gensec_ntlmssp_logon(creds, samdb)
1405
1406 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1407 token = session.security_token
1408 token_sids = token.sids
1409 self.assertGreater(len(token_sids), 0)
1410
1411 # Ensure that they match.
1412 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1413
1414 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_but_one_password_within_five_minutes(
1415 self,
1416 ):
1417 password_interval = 123
1418
1419 samdb = self.get_local_samdb()
1420 series = self.gmsa_series(password_interval)
1421 self.set_db_time(samdb, series.start_of_interval(0))
1422
1423 creds = self.gmsa_account(
1424 samdb=samdb, interval=password_interval, kerberos_enabled=False
1425 )
1426
1427 # Set the time to within five minutes of the *following* password’s expiry,
1428 # and perform a gensec logon with the original password.
1429 expired_time = series.within_previous_password_valid_window(2)
1430 self.set_db_time(samdb, expired_time)
1431
1432 # Expect the gensec logon to fail.
1433 self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
1434
1435 def test_gmsa_can_perform_gensec_ntlmssp_logon_with_previous_password_beyond_five_minutes(
1436 self,
1437 ):
1438 password_interval = 456
1439
1440 samdb = self.get_local_samdb()
1441 series = self.gmsa_series(password_interval)
1442 self.set_db_time(samdb, series.start_of_interval(0))
1443
1444 creds = self.gmsa_account(
1445 samdb=samdb, interval=password_interval, kerberos_enabled=False
1446 )
1447
1448 # Set the time to five minutes beyond the original password’s expiry,
1449 # and try to perform a gensec logon with the original password.
1450 expired_time = series.outside_previous_password_valid_window(1)
1451 self.set_db_time(samdb, expired_time)
1452
1453 # Perform a gensec logon.
1454 session = self.gensec_ntlmssp_logon(creds, samdb)
1455
1456 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1457 token = session.security_token
1458 token_sids = token.sids
1459 self.assertGreater(len(token_sids), 0)
1460
1461 # Ensure that they match.
1462 self.assertEqual(security.dom_sid(creds.get_sid()), token_sids[0])
1463
1464 def test_gmsa_cannot_perform_gensec_ntlmssp_logon_with_previous_password_five_minutes_apart(
1465 self,
1466 ):
1467 password_interval = 789
1468
1469 samdb = self.get_local_samdb()
1470 series = self.gmsa_series(password_interval)
1471 self.set_db_time(samdb, series.start_of_interval(0))
1472
1473 creds = self.gmsa_account(
1474 samdb=samdb, interval=password_interval, kerberos_enabled=False
1475 )
1476 gmsa_sid = creds.get_sid()
1477
1478 # Set the time to after the original password’s expiry, and perform a
1479 # gensec logon with the original password.
1480 db_time = series.during_interval(1)
1481 self.set_db_time(samdb, db_time)
1482
1483 # Perform a gensec logon.
1484 session = self.gensec_ntlmssp_logon(creds, samdb)
1485
1486 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1487 token = session.security_token
1488 token_sids = token.sids
1489 self.assertGreater(len(token_sids), 0)
1490
1491 # Ensure that they match.
1492 self.assertEqual(security.dom_sid(gmsa_sid), token_sids[0])
1493
1494 # Set the time to not quite five minutes later, and perform a gensec
1495 # logon with the original password.
1496 self.set_db_time(samdb, NtTime(db_time + MAX_CLOCK_SKEW - 1))
1497
1498 # Perform a gensec logon.
1499 session = self.gensec_ntlmssp_logon(creds, samdb)
1500
1501 # Ensure that the first SID contained within the security token is the gMSA’s SID.
1502 token = session.security_token
1503 token_sids = token.sids
1504 self.assertGreater(len(token_sids), 0)
1505
1506 # Ensure that they match.
1507 self.assertEqual(security.dom_sid(gmsa_sid), token_sids[0])
1508
1509 # Now set the time to exactly five minutes later, and try to perform a
1510 # gensec logon with the original password.
1511 self.set_db_time(samdb, NtTime(db_time + MAX_CLOCK_SKEW))
1512
1513 # Expect the gensec logon to fail.
1514 self.gensec_ntlmssp_logon(creds, samdb, expect_success=False)
1515
bb5ca9f4 1516 def test_gmsa_can_perform_netlogon(self):
bb5ca9f4 1517 self._test_samlogon(
336a5847 1518 self.gmsa_account(kerberos_enabled=False),
bb5ca9f4
JS
1519 netlogon.NetlogonNetworkInformation,
1520 validation_level=netlogon.NetlogonValidationSamInfo4,
bb5ca9f4
JS
1521 )
1522
f9cbda9c
JS
1523 def test_computer_cannot_perform_interactive_logon(self):
1524 self._test_samlogon(
1525 self.get_mach_creds(),
1526 netlogon.NetlogonInteractiveInformation,
1527 expect_error=ntstatus.NT_STATUS_NO_SUCH_USER,
1528 validation_level=netlogon.NetlogonValidationSamInfo4,
1529 )
1530
1531 def test_gmsa_cannot_perform_interactive_logon(self):
1532 self._test_samlogon(
1533 self.gmsa_account(kerberos_enabled=False),
1534 netlogon.NetlogonInteractiveInformation,
1535 expect_error=ntstatus.NT_STATUS_NO_SUCH_USER,
1536 validation_level=netlogon.NetlogonValidationSamInfo4,
1537 )
1538
bb5ca9f4
JS
1539 def _gmsa_can_perform_as_req(self, *, enctype: kcrypto.Enctype) -> None:
1540 self._as_req(self.gmsa_account(), self.get_service_creds(), enctype)
1541
1542 def test_gmsa_can_perform_as_req_with_aes256(self):
1543 self._gmsa_can_perform_as_req(enctype=kcrypto.Enctype.AES256)
1544
1545 def test_gmsa_can_perform_as_req_with_rc4(self):
1546 self._gmsa_can_perform_as_req(enctype=kcrypto.Enctype.RC4)
1547
1548 def _gmsa_can_authenticate_to_ldap(self, *, with_kerberos: bool) -> None:
1549 creds = self.gmsa_account(kerberos_enabled=with_kerberos)
1550
1551 protocol = "ldap"
1552
1553 # Authenticate to LDAP.
1554 samdb_user = SamDB(
1555 url=f"{protocol}://{self.dc_host}", credentials=creds, lp=self.get_lp()
1556 )
1557
1558 # Search for the user’s token groups.
1559 res = samdb_user.search("", scope=ldb.SCOPE_BASE, attrs=["tokenGroups"])
1560 self.assertEqual(1, len(res))
1561
1562 token_groups = res[0].get("tokenGroups", idx=0)
1563 self.assertIsNotNone(token_groups)
1564
1565 # Ensure that the token SID matches.
1566 token_sid = ndr_unpack(security.dom_sid, token_groups)
1567 self.assertEqual(security.dom_sid(creds.get_sid()), token_sid)
1568
1569 def test_gmsa_can_authenticate_to_ldap_with_kerberos(self):
1570 self._gmsa_can_authenticate_to_ldap(with_kerberos=True)
1571
1572 def test_gmsa_can_authenticate_to_ldap_without_kerberos(self):
1573 self._gmsa_can_authenticate_to_ldap(with_kerberos=False)
1574
1575
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    from unittest import main as run_tests

    run_tests()