account_type=account_type, opts=opts, use_cache=use_cache
)
- def _as_req(
- self,
- creds,
- target_creds,
- *,
- expect_error=0,
- expect_status=False,
- expected_status=None,
- expect_edata=False,
- etypes=None,
- freshness=None,
- send_enc_ts=False,
- ):
- if send_enc_ts:
- if creds.get_password() is None:
- # Try the NT hash if there isn't a password
- preauth_key = self.PasswordKey_from_creds(creds, kcrypto.Enctype.RC4)
- else:
- preauth_key = self.PasswordKey_from_creds(creds, kcrypto.Enctype.AES256)
- else:
- preauth_key = None
-
- if freshness is not None or send_enc_ts:
-
- def generate_padata_fn(_kdc_exchange_dict, _callback_dict, req_body):
- padata = []
-
- if freshness is not None:
- freshness_padata = self.PA_DATA_create(
- PADATA_AS_FRESHNESS, freshness
- )
- padata.append(freshness_padata)
-
- if send_enc_ts:
- patime, pausec = self.get_KerberosTimeWithUsec()
- enc_ts = self.PA_ENC_TS_ENC_create(patime, pausec)
- enc_ts = self.der_encode(enc_ts, asn1Spec=krb5_asn1.PA_ENC_TS_ENC())
-
- enc_ts = self.EncryptedData_create(
- preauth_key, KU_PA_ENC_TIMESTAMP, enc_ts
- )
- enc_ts = self.der_encode(enc_ts, asn1Spec=krb5_asn1.EncryptedData())
-
- enc_ts = self.PA_DATA_create(PADATA_ENC_TIMESTAMP, enc_ts)
-
- padata.append(enc_ts)
-
- return padata, req_body
- else:
- generate_padata_fn = None
-
- user_name = creds.get_username()
- cname = self.PrincipalName_create(
- name_type=NT_PRINCIPAL, names=user_name.split("/")
- )
-
- target_name = target_creds.get_username()
- target_realm = target_creds.get_realm()
-
- if target_name == "krbtgt":
- sname = self.PrincipalName_create(
- name_type=NT_SRV_INST, names=["krbtgt", target_realm]
- )
- else:
- sname = self.PrincipalName_create(
- name_type=NT_PRINCIPAL, names=["host", target_name[:-1]]
- )
-
- if expect_error:
- check_error_fn = self.generic_check_kdc_error
- check_rep_fn = None
-
- expected_sname = sname
- else:
- check_error_fn = None
- check_rep_fn = self.generic_check_kdc_rep
-
- if target_name == "krbtgt":
- expected_sname = sname
- else:
- expected_sname = self.PrincipalName_create(
- name_type=NT_PRINCIPAL, names=[target_name]
- )
-
- kdc_options = "forwardable,renewable,canonicalize,renewable-ok"
- kdc_options = krb5_asn1.KDCOptions(kdc_options)
-
- ticket_decryption_key = self.TicketDecryptionKey_from_creds(target_creds)
-
- kdc_exchange_dict = self.as_exchange_dict(
- creds=creds,
- expected_crealm=creds.get_realm(),
- expected_cname=cname,
- expected_srealm=target_realm,
- expected_sname=expected_sname,
- expected_supported_etypes=target_creds.tgs_supported_enctypes,
- ticket_decryption_key=ticket_decryption_key,
- generate_padata_fn=generate_padata_fn,
- check_error_fn=check_error_fn,
- check_rep_fn=check_rep_fn,
- check_kdc_private_fn=self.generic_check_kdc_private,
- expected_error_mode=expect_error,
- expected_salt=creds.get_salt(),
- preauth_key=preauth_key,
- kdc_options=str(kdc_options),
- expect_edata=expect_edata,
- expect_status=expect_status,
- expected_status=expected_status,
- )
-
- till = self.get_KerberosTime(offset=36000)
-
- if etypes is None:
- etypes = (
- kcrypto.Enctype.AES256,
- kcrypto.Enctype.RC4,
- )
-
- rep = self._generic_kdc_exchange(
- kdc_exchange_dict,
- cname=cname,
- realm=target_realm,
- sname=sname,
- till_time=till,
- etypes=etypes,
- )
- if expect_error:
- self.check_error_rep(rep, expect_error)
- else:
- self.check_as_reply(rep)
-
- return kdc_exchange_dict
-
def get_ca_cert_and_private_key(self):
# The password with which to try to encrypt the certificate or private
# key specified on the command line.