import sys
import os
+import ldb
+
+
+from samba import dsdb
+
+from samba.dcerpc import krb5pac
+
sys.path.insert(0, "bin/python")
os.environ["PYTHONUNBUFFERED"] = "1"
KRB_TGS_REP,
KDC_ERR_BADMATCH,
KDC_ERR_BADOPTION,
+ KDC_ERR_CLIENT_NAME_MISMATCH,
+ KDC_ERR_POLICY,
+ KDC_ERR_S_PRINCIPAL_UNKNOWN,
+ KDC_ERR_TGT_REVOKED,
NT_PRINCIPAL,
NT_SRV_INST,
)
self._make_tgs_request(client_creds, service_creds, tgt,
expect_pac=False, expect_error=True)
+ # Test making a TGS request.
+ def test_tgs_req(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+ self._run_tgs(tgt, expected_error=0)
+
+ def test_renew_req(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds, renewable=True)
+ self._renew_tgt(tgt, expected_error=0)
+
+ def test_validate_req(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds, invalid=True)
+ self._validate_tgt(tgt, expected_error=0)
+
+ def test_s4u2self_req(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+ self._s4u2self(tgt, creds, expected_error=0)
+
+ def test_user2user_req(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+ self._user2user(tgt, creds, expected_error=0)
+
+ # Test making a request without a PAC.
+ def test_tgs_no_pac(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds, remove_pac=True)
+ self._run_tgs(tgt, expected_error=KDC_ERR_BADOPTION)
+
+ def test_renew_no_pac(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds, renewable=True, remove_pac=True)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
+
+ def test_validate_no_pac(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds, invalid=True, remove_pac=True)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_BADOPTION)
+
+ def test_s4u2self_no_pac(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds, remove_pac=True)
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_BADOPTION)
+
+ def test_user2user_no_pac(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds, remove_pac=True)
+ self._user2user(tgt, creds, expected_error=KDC_ERR_BADOPTION)
+
+ # Test changing the SID in the PAC to that of another account.
+ def test_tgs_sid_mismatch_existing(self):
+ creds = self._get_creds()
+ existing_rid = self._get_existing_rid()
+ tgt = self._get_tgt(creds, new_rid=existing_rid)
+ self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_renew_sid_mismatch_existing(self):
+ creds = self._get_creds()
+ existing_rid = self._get_existing_rid()
+ tgt = self._get_tgt(creds, renewable=True, new_rid=existing_rid)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_validate_sid_mismatch_existing(self):
+ creds = self._get_creds()
+ existing_rid = self._get_existing_rid()
+ tgt = self._get_tgt(creds, invalid=True, new_rid=existing_rid)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_s4u2self_sid_mismatch_existing(self):
+ creds = self._get_creds()
+ existing_rid = self._get_existing_rid()
+ tgt = self._get_tgt(creds, new_rid=existing_rid)
+ self._s4u2self(tgt, creds,
+ expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_user2user_sid_mismatch_existing(self):
+ creds = self._get_creds()
+ existing_rid = self._get_existing_rid()
+ tgt = self._get_tgt(creds, new_rid=existing_rid)
+ self._user2user(tgt, creds,
+ expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ # Test changing the SID in the PAC to a non-existent one.
+ def test_tgs_sid_mismatch_nonexisting(self):
+ creds = self._get_creds()
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
+ self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_renew_sid_mismatch_nonexisting(self):
+ creds = self._get_creds()
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, renewable=True,
+ new_rid=nonexistent_rid)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_validate_sid_mismatch_nonexisting(self):
+ creds = self._get_creds()
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, invalid=True,
+ new_rid=nonexistent_rid)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_s4u2self_sid_mismatch_nonexisting(self):
+ creds = self._get_creds()
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
+ self._s4u2self(tgt, creds,
+ expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_user2user_sid_mismatch_nonexisting(self):
+ creds = self._get_creds()
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, new_rid=nonexistent_rid)
+ self._user2user(tgt, creds,
+ expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ # Test with an RODC-issued ticket where the client is revealed to the RODC.
+ def test_tgs_rodc_revealed(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._run_tgs(tgt, expected_error=0)
+
+ def test_renew_rodc_revealed(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
+ self._renew_tgt(tgt, expected_error=0)
+
+ def test_validate_rodc_revealed(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
+ self._validate_tgt(tgt, expected_error=0)
+
+ def test_s4u2self_rodc_revealed(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._s4u2self(tgt, creds, expected_error=0)
+
+ def test_user2user_rodc_revealed(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._user2user(tgt, creds, expected_error=0)
+
+ # Test with an RODC-issued ticket where the SID in the PAC is changed to
+ # that of another account.
+ def test_tgs_rodc_sid_mismatch_existing(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ existing_rid = self._get_existing_rid(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
+ self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_renew_rodc_sid_mismatch_existing(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ existing_rid = self._get_existing_rid(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
+ new_rid=existing_rid)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_validate_rodc_sid_mismatch_existing(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ existing_rid = self._get_existing_rid(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
+ new_rid=existing_rid)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_s4u2self_rodc_sid_mismatch_existing(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ existing_rid = self._get_existing_rid(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_user2user_rodc_sid_mismatch_existing(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ existing_rid = self._get_existing_rid(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True, new_rid=existing_rid)
+ self._user2user(tgt, creds,
+ expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ # Test with an RODC-issued ticket where the SID in the PAC is changed to a
+ # non-existent one.
+ def test_tgs_rodc_sid_mismatch_nonexisting(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
+ self._run_tgs(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_renew_rodc_sid_mismatch_nonexisting(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True,
+ new_rid=nonexistent_rid)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_validate_rodc_sid_mismatch_nonexisting(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True,
+ new_rid=nonexistent_rid)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_s4u2self_rodc_sid_mismatch_nonexisting(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ def test_user2user_rodc_sid_mismatch_nonexisting(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ nonexistent_rid = self._get_non_existent_rid()
+ tgt = self._get_tgt(creds, from_rodc=True, new_rid=nonexistent_rid)
+ self._user2user(tgt, creds,
+ expected_error=KDC_ERR_CLIENT_NAME_MISMATCH)
+
+ # Test with an RODC-issued ticket where the client is not revealed to the
+ # RODC.
+ def test_tgs_rodc_not_revealed(self):
+ creds = self._get_creds(replication_allowed=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ # TODO: error code
+ self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_renew_rodc_not_revealed(self):
+ creds = self._get_creds(replication_allowed=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_validate_rodc_not_revealed(self):
+ creds = self._get_creds(replication_allowed=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_s4u2self_rodc_not_revealed(self):
+ creds = self._get_creds(replication_allowed=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_user2user_rodc_not_revealed(self):
+ creds = self._get_creds(replication_allowed=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ # Test with an RODC-issued ticket where the RODC account does not have the
+ # PARTIAL_SECRETS bit set.
+ def test_tgs_rodc_no_partial_secrets(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._remove_rodc_partial_secrets()
+ self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
+
+ def test_renew_rodc_no_partial_secrets(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
+ self._remove_rodc_partial_secrets()
+ self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
+
+ def test_validate_rodc_no_partial_secrets(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
+ self._remove_rodc_partial_secrets()
+ self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
+
+ def test_s4u2self_rodc_no_partial_secrets(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._remove_rodc_partial_secrets()
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
+
+ def test_user2user_rodc_no_partial_secrets(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._remove_rodc_partial_secrets()
+ self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
+
+ # Test with an RODC-issued ticket where the RODC account does not have an
+ # msDS-KrbTgtLink.
+ def test_tgs_rodc_no_krbtgt_link(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._remove_rodc_krbtgt_link()
+ self._run_tgs(tgt, expected_error=KDC_ERR_POLICY)
+
+ def test_renew_rodc_no_krbtgt_link(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
+ self._remove_rodc_krbtgt_link()
+ self._renew_tgt(tgt, expected_error=KDC_ERR_POLICY)
+
+ def test_validate_rodc_no_krbtgt_link(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
+ self._remove_rodc_krbtgt_link()
+ self._validate_tgt(tgt, expected_error=KDC_ERR_POLICY)
+
+ def test_s4u2self_rodc_no_krbtgt_link(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._remove_rodc_krbtgt_link()
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_POLICY)
+
+ def test_user2user_rodc_no_krbtgt_link(self):
+ creds = self._get_creds(replication_allowed=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._remove_rodc_krbtgt_link()
+ self._user2user(tgt, creds, expected_error=KDC_ERR_POLICY)
+
+ # Test with an RODC-issued ticket where the client is not allowed to
+ # replicate to the RODC.
+ def test_tgs_rodc_not_allowed(self):
+ creds = self._get_creds(revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_renew_rodc_not_allowed(self):
+ creds = self._get_creds(revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_validate_rodc_not_allowed(self):
+ creds = self._get_creds(revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_s4u2self_rodc_not_allowed(self):
+ creds = self._get_creds(revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_user2user_rodc_not_allowed(self):
+ creds = self._get_creds(revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ # Test with an RODC-issued ticket where the client is denied from
+ # replicating to the RODC.
+ def test_tgs_rodc_denied(self):
+ creds = self._get_creds(replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_renew_rodc_denied(self):
+ creds = self._get_creds(replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_validate_rodc_denied(self):
+ creds = self._get_creds(replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_s4u2self_rodc_denied(self):
+ creds = self._get_creds(replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_user2user_rodc_denied(self):
+ creds = self._get_creds(replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ # Test with an RODC-issued ticket where the client is both allowed and
+ # denied replicating to the RODC.
+ def test_tgs_rodc_allowed_denied(self):
+ creds = self._get_creds(replication_allowed=True,
+ replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._run_tgs(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_renew_rodc_allowed_denied(self):
+ creds = self._get_creds(replication_allowed=True,
+ replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, renewable=True, from_rodc=True)
+ self._renew_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_validate_rodc_allowed_denied(self):
+ creds = self._get_creds(replication_allowed=True,
+ replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, invalid=True, from_rodc=True)
+ self._validate_tgt(tgt, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_s4u2self_rodc_allowed_denied(self):
+ creds = self._get_creds(replication_allowed=True,
+ replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._s4u2self(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ def test_user2user_rodc_allowed_denied(self):
+ creds = self._get_creds(replication_allowed=True,
+ replication_denied=True,
+ revealed_to_rodc=True)
+ tgt = self._get_tgt(creds, from_rodc=True)
+ self._user2user(tgt, creds, expected_error=KDC_ERR_TGT_REVOKED)
+
+ # Test user-to-user with incorrect service principal names.
+ def test_user2user_matching_sname_host(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+
+ user_name = self._get_mach_creds().get_username()
+ sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=['host', user_name])
+
+ self._user2user(tgt, creds, sname=sname,
+ expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
+
+ def test_user2user_matching_sname_no_host(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+
+ user_name = self._get_mach_creds().get_username()
+ sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[user_name])
+
+ self._user2user(tgt, creds, sname=sname,
+ expected_error=KDC_ERR_BADMATCH)
+
+ def test_user2user_wrong_sname(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+
+ other_creds = self.get_service_creds()
+ user_name = other_creds.get_username()
+ sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[user_name])
+
+ self._user2user(tgt, creds, sname=sname,
+ expected_error=(KDC_ERR_BADMATCH,
+ KDC_ERR_BADOPTION))
+
+ def test_user2user_non_existent_sname(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+
+ sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=['host', 'non_existent_user'])
+
+ self._user2user(tgt, creds, sname=sname,
+ expected_error=KDC_ERR_S_PRINCIPAL_UNKNOWN)
+
+ def test_user2user_service_ticket(self):
+ creds = self._get_creds()
+ tgt = self._get_tgt(creds)
+
+ service_creds = self.get_service_creds()
+ service_ticket = self.get_service_ticket(tgt, service_creds)
+
+ self._user2user(service_ticket, creds, expected_error=KDC_ERR_POLICY)
+
+ def _get_tgt(self,
+ client_creds,
+ renewable=False,
+ invalid=False,
+ from_rodc=False,
+ new_rid=None,
+ remove_pac=False):
+ self.assertFalse(renewable and invalid)
+
+ if remove_pac:
+ self.assertIsNone(new_rid)
+
+ tgt = self.get_tgt(client_creds)
+
+ if from_rodc:
+ krbtgt_creds = self.get_mock_rodc_krbtgt_creds()
+ else:
+ krbtgt_creds = self.get_krbtgt_creds()
+
+ if new_rid is not None:
+ def change_sid_fn(pac):
+ for pac_buffer in pac.buffers:
+ if pac_buffer.type == krb5pac.PAC_TYPE_LOGON_INFO:
+ logon_info = pac_buffer.info.info
+
+ logon_info.info3.base.rid = new_rid
+
+ return pac
+
+ modify_pac_fn = change_sid_fn
+ else:
+ modify_pac_fn = None
+
+ krbtgt_key = self.TicketDecryptionKey_from_creds(krbtgt_creds)
+
+ if remove_pac:
+ checksum_keys = None
+ else:
+ checksum_keys = {
+ krb5pac.PAC_TYPE_KDC_CHECKSUM: krbtgt_key
+ }
+
+ if renewable:
+ def set_renewable(enc_part):
+ # Set the renewable flag.
+ renewable_flag = krb5_asn1.TicketFlags('renewable')
+ pos = len(tuple(renewable_flag)) - 1
+
+ flags = enc_part['flags']
+ self.assertLessEqual(pos, len(flags))
+
+ new_flags = flags[:pos] + '1' + flags[pos + 1:]
+ enc_part['flags'] = new_flags
+
+ # Set the renew-till time to be in the future.
+ renew_till = self.get_KerberosTime(offset=100 * 60 * 60)
+ enc_part['renew-till'] = renew_till
+
+ return enc_part
+
+ modify_fn = set_renewable
+ elif invalid:
+ def set_invalid(enc_part):
+ # Set the invalid flag.
+ invalid_flag = krb5_asn1.TicketFlags('invalid')
+ pos = len(tuple(invalid_flag)) - 1
+
+ flags = enc_part['flags']
+ self.assertLessEqual(pos, len(flags))
+
+ new_flags = flags[:pos] + '1' + flags[pos + 1:]
+ enc_part['flags'] = new_flags
+
+ # Set the ticket start time to be in the past.
+ past_time = self.get_KerberosTime(offset=-100 * 60 * 60)
+ enc_part['starttime'] = past_time
+
+ return enc_part
+
+ modify_fn = set_invalid
+ else:
+ modify_fn = None
+
+ return self.modified_ticket(
+ tgt,
+ new_ticket_key=krbtgt_key,
+ modify_fn=modify_fn,
+ modify_pac_fn=modify_pac_fn,
+ exclude_pac=remove_pac,
+ update_pac_checksums=not remove_pac,
+ checksum_keys=checksum_keys)
+
+ def _remove_rodc_partial_secrets(self):
+ samdb = self.get_samdb()
+
+ rodc_ctx = self.get_mock_rodc_ctx()
+ rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
+
+ def add_rodc_partial_secrets():
+ msg = ldb.Message()
+ msg.dn = rodc_dn
+ msg['userAccountControl'] = ldb.MessageElement(
+ str(rodc_ctx.userAccountControl),
+ ldb.FLAG_MOD_REPLACE,
+ 'userAccountControl')
+ samdb.modify(msg)
+
+ self.addCleanup(add_rodc_partial_secrets)
+
+ uac = rodc_ctx.userAccountControl & ~dsdb.UF_PARTIAL_SECRETS_ACCOUNT
+
+ msg = ldb.Message()
+ msg.dn = rodc_dn
+ msg['userAccountControl'] = ldb.MessageElement(
+ str(uac),
+ ldb.FLAG_MOD_REPLACE,
+ 'userAccountControl')
+ samdb.modify(msg)
+
+ def _remove_rodc_krbtgt_link(self):
+ samdb = self.get_samdb()
+
+ rodc_ctx = self.get_mock_rodc_ctx()
+ rodc_dn = ldb.Dn(samdb, rodc_ctx.acct_dn)
+
+ def add_rodc_krbtgt_link():
+ msg = ldb.Message()
+ msg.dn = rodc_dn
+ msg['msDS-KrbTgtLink'] = ldb.MessageElement(
+ rodc_ctx.new_krbtgt_dn,
+ ldb.FLAG_MOD_ADD,
+ 'msDS-KrbTgtLink')
+ samdb.modify(msg)
+
+ self.addCleanup(add_rodc_krbtgt_link)
+
+ msg = ldb.Message()
+ msg.dn = rodc_dn
+ msg['msDS-KrbTgtLink'] = ldb.MessageElement(
+ [],
+ ldb.FLAG_MOD_DELETE,
+ 'msDS-KrbTgtLink')
+ samdb.modify(msg)
+
+ def _get_creds(self,
+ replication_allowed=False,
+ replication_denied=False,
+ revealed_to_rodc=False):
+ return self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'allowed_replication_mock': replication_allowed,
+ 'denied_replication_mock': replication_denied,
+ 'revealed_to_mock_rodc': revealed_to_rodc,
+ 'id': 0
+ })
+
+ def _get_existing_rid(self,
+ replication_allowed=False,
+ replication_denied=False,
+ revealed_to_rodc=False):
+ other_creds = self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'allowed_replication_mock': replication_allowed,
+ 'denied_replication_mock': replication_denied,
+ 'revealed_to_mock_rodc': revealed_to_rodc,
+ 'id': 1
+ })
+
+ samdb = self.get_samdb()
+
+ other_dn = other_creds.get_dn()
+ other_sid = self.get_objectSid(samdb, other_dn)
+
+ other_rid = int(other_sid.rsplit('-', 1)[1])
+
+ return other_rid
+
+ def _get_mach_creds(self):
+ return self.get_cached_creds(
+ account_type=self.AccountType.COMPUTER,
+ opts={
+ 'allowed_replication_mock': True,
+ 'denied_replication_mock': False,
+ 'revealed_to_mock_rodc': True,
+ 'id': 2
+ })
+
+ def _get_non_existent_rid(self):
+ return (1 << 30) - 1
+
+ def _run_tgs(self, tgt, expected_error):
+ target_creds = self.get_service_creds()
+ self._tgs_req(tgt, expected_error, target_creds)
+
+ def _renew_tgt(self, tgt, expected_error):
+ krbtgt_creds = self.get_krbtgt_creds()
+ kdc_options = str(krb5_asn1.KDCOptions('renew'))
+ self._tgs_req(tgt, expected_error, krbtgt_creds,
+ kdc_options=kdc_options)
+
+ def _validate_tgt(self, tgt, expected_error):
+ krbtgt_creds = self.get_krbtgt_creds()
+ kdc_options = str(krb5_asn1.KDCOptions('validate'))
+ self._tgs_req(tgt, expected_error, krbtgt_creds,
+ kdc_options=kdc_options)
+
+ def _s4u2self(self, tgt, tgt_creds, expected_error):
+ user_creds = self._get_mach_creds()
+
+ user_name = user_creds.get_username()
+ user_cname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=[user_name])
+ user_realm = user_creds.get_realm()
+
+ def generate_s4u2self_padata(_kdc_exchange_dict,
+ _callback_dict,
+ req_body):
+ padata = self.PA_S4U2Self_create(
+ name=user_cname,
+ realm=user_realm,
+ tgt_session_key=tgt.session_key,
+ ctype=None)
+
+ return [padata], req_body
+
+ self._tgs_req(tgt, expected_error, tgt_creds,
+ expected_cname=user_cname,
+ generate_padata_fn=generate_s4u2self_padata,
+ expect_claims=False)
+
+ def _user2user(self, tgt, tgt_creds, expected_error, sname=None):
+ user_creds = self._get_mach_creds()
+ user_tgt = self.get_tgt(user_creds)
+
+ kdc_options = str(krb5_asn1.KDCOptions('enc-tkt-in-skey'))
+ self._tgs_req(user_tgt, expected_error, tgt_creds,
+ kdc_options=kdc_options,
+ additional_ticket=tgt,
+ sname=sname)
+
+ def _tgs_req(self, tgt, expected_error, target_creds,
+ kdc_options='0',
+ expected_cname=None,
+ additional_ticket=None,
+ generate_padata_fn=None,
+ sname=None,
+ expect_claims=True):
+ srealm = target_creds.get_realm()
+
+ if sname is None:
+ target_name = target_creds.get_username()
+ if target_name == 'krbtgt':
+ sname = self.PrincipalName_create(name_type=NT_SRV_INST,
+ names=[target_name, srealm])
+ else:
+ if target_name[-1] == '$':
+ target_name = target_name[:-1]
+ sname = self.PrincipalName_create(name_type=NT_PRINCIPAL,
+ names=['host', target_name])
+
+ if additional_ticket is not None:
+ additional_tickets = [additional_ticket.ticket]
+ decryption_key = additional_ticket.session_key
+ else:
+ additional_tickets = None
+ decryption_key = self.TicketDecryptionKey_from_creds(
+ target_creds)
+
+ subkey = self.RandomKey(tgt.session_key.etype)
+
+ etypes = (AES256_CTS_HMAC_SHA1_96, ARCFOUR_HMAC_MD5)
+
+ if expected_error:
+ check_error_fn = self.generic_check_kdc_error
+ check_rep_fn = None
+ else:
+ check_error_fn = None
+ check_rep_fn = self.generic_check_kdc_rep
+
+ if expected_cname is None:
+ expected_cname = tgt.cname
+
+ kdc_exchange_dict = self.tgs_exchange_dict(
+ expected_crealm=tgt.crealm,
+ expected_cname=expected_cname,
+ expected_srealm=srealm,
+ expected_sname=sname,
+ ticket_decryption_key=decryption_key,
+ generate_padata_fn=generate_padata_fn,
+ check_error_fn=check_error_fn,
+ check_rep_fn=check_rep_fn,
+ check_kdc_private_fn=self.generic_check_kdc_private,
+ expected_error_mode=expected_error,
+ tgt=tgt,
+ authenticator_subkey=subkey,
+ kdc_options=kdc_options,
+ expect_edata=False,
+ expect_claims=expect_claims)
+
+ self._generic_kdc_exchange(kdc_exchange_dict,
+ cname=None,
+ realm=srealm,
+ sname=sname,
+ etypes=etypes,
+ additional_tickets=additional_tickets)
+
if __name__ == "__main__":
global_asn1_print = False