sys.path.insert(0, 'bin/python')
os.environ['PYTHONUNBUFFERED'] = '1'
-from samba.dcerpc import krb5pac
-from samba.ndr import ndr_unpack, ndr_print
+from samba.dcerpc import krb5pac, claims
+from samba.ndr import ndr_pack, ndr_unpack, ndr_print
from samba.tests import TestCase
class PacClaimsTests(TestCase):
self.assertEqual(pac_unpacked_raw.num_buffers, 8)
self.assertEqual(pac_unpacked_raw.version, 0)
+ def confirm_uncompressed_claims(self, claim_metadata):
+ self.assertEqual(claim_metadata.uncompressed_claims_set_size,
+ 344)
+ claims_set = claim_metadata.claims_set.claims.claims
+ self.assertEqual(claims_set.claims_array_count,
+ 1)
+ claim_arrays = claims_set.claims_arrays
+ self.assertEqual(claim_arrays[0].claims_source_type,
+ claims.CLAIMS_SOURCE_TYPE_AD)
+ self.assertEqual(claim_arrays[0].claims_count,
+ 3)
+ claim_entries = claim_arrays[0].claim_entries
+ self.assertEqual(claim_entries[0].id,
+ '720fd3c3_9')
+ self.assertEqual(claim_entries[0].type,
+ claims.CLAIM_TYPE_BOOLEAN)
+ self.assertEqual(claim_entries[0].values.value_count,
+ 1)
+ self.assertEqual(claim_entries[0].values.values[0],
+ 1)
+
+ self.assertEqual(claim_entries[1].id,
+ '720fd3c3_7')
+ self.assertEqual(claim_entries[1].type,
+ claims.CLAIM_TYPE_STRING)
+ self.assertEqual(claim_entries[1].values.value_count,
+ 3)
+ self.assertEqual(claim_entries[1].values.values[0],
+ "foo")
+ self.assertEqual(claim_entries[1].values.values[1],
+ "bar")
+ self.assertEqual(claim_entries[1].values.values[2],
+ "baz")
+
+ self.assertEqual(claim_entries[2].id,
+ '720fd3c3_8')
+ self.assertEqual(claim_entries[2].type,
+ claims.CLAIM_TYPE_UINT64)
+ self.assertEqual(claim_entries[2].values.value_count,
+ 4)
+ self.assertEqual(claim_entries[2].values.values[0],
+ 655369)
+ self.assertEqual(claim_entries[2].values.values[1],
+ 65543)
+ self.assertEqual(claim_entries[2].values.values[2],
+ 65542)
+ self.assertEqual(claim_entries[2].values.values[3],
+ 65536)
+
def test_unpack_claims_pac_uncompressed(self):
pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_uncompressed)
self.assertEqual(pac.buffers[0].type, krb5pac.PAC_TYPE_LOGON_INFO)
self.assertEqual(pac.buffers[0].info.info.info3.base.account_name.string, "720fd3c3_10")
+ self.assertEqual(pac.buffers[5].type, krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
+ self.assertIsNotNone(pac.buffers[5].info.remaining)
+
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ claim_metadata = client_claims.claims.metadata
+
+ self.assertEqual(pac.buffers[6].type, krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
+ self.assertEqual(pac.buffers[7].type, krb5pac.PAC_TYPE_REQUESTER_SID)
+
+ self.assertEqual(claim_metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_NONE)
+ self.confirm_uncompressed_claims(claim_metadata)
+
+ def confirm_compressed_claims(self, claim_metadata):
+ self.assertEqual(claim_metadata.uncompressed_claims_set_size,
+ 8232)
+ claims_set = claim_metadata.claims_set.claims.claims
+ self.assertEqual(claims_set.claims_array_count,
+ 1)
+ claim_arrays = claims_set.claims_arrays
+ self.assertEqual(claim_arrays[0].claims_source_type,
+ claims.CLAIMS_SOURCE_TYPE_AD)
+ self.assertEqual(claim_arrays[0].claims_count,
+ 5)
+ claim_entries = claim_arrays[0].claim_entries
+ self.assertEqual(claim_entries[0].id,
+ '720fd3c3_4')
+ self.assertEqual(claim_entries[0].type,
+ claims.CLAIM_TYPE_BOOLEAN)
+ self.assertEqual(claim_entries[0].values.value_count,
+ 1)
+ self.assertEqual(claim_entries[0].values.values[0],
+ 1)
+
+ self.assertEqual(claim_entries[1].id,
+ '720fd3c3_0')
+ self.assertEqual(claim_entries[1].type,
+ claims.CLAIM_TYPE_STRING)
+ self.assertEqual(claim_entries[1].values.value_count,
+ 4)
+ self.assertEqual(claim_entries[1].values.values[0],
+ "A first value.")
+ self.assertEqual(claim_entries[1].values.values[1],
+ "A second value.")
+ self.assertEqual(claim_entries[1].values.values[2],
+ "A third value.")
+
+ self.assertEqual(claim_entries[2].id,
+ '720fd3c3_1')
+ self.assertEqual(claim_entries[2].type,
+ claims.CLAIM_TYPE_STRING)
+ self.assertEqual(claim_entries[2].values.value_count,
+ 3)
+ self.assertEqual(claim_entries[2].values.values[0],
+ "DC=win22,DC=example,DC=com")
+ self.assertEqual(claim_entries[2].values.values[1],
+ "CN=Users,DC=win22,DC=example,DC=com")
+ self.assertEqual(claim_entries[2].values.values[2],
+ "CN=Computers,DC=win22,DC=example,DC=com")
+
+ self.assertEqual(claim_entries[3].id,
+ '720fd3c3_2')
+ self.assertEqual(claim_entries[3].type,
+ claims.CLAIM_TYPE_UINT64)
+ self.assertEqual(claim_entries[3].values.value_count,
+ 4)
+ self.assertEqual(claim_entries[3].values.values[0],
+ 655369)
+ self.assertEqual(claim_entries[3].values.values[1],
+ 65543)
+ self.assertEqual(claim_entries[3].values.values[2],
+ 65542)
+ self.assertEqual(claim_entries[3].values.values[3],
+ 65536)
+
def test_unpack_claims_pac_compressed(self):
pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_compressed)
self.assertEqual(pac.buffers[0].type, krb5pac.PAC_TYPE_LOGON_INFO)
self.assertEqual(pac.buffers[0].info.info.info3.base.account_name.string, "720fd3c3_6")
+ self.assertEqual(pac.buffers[5].type, krb5pac.PAC_TYPE_CLIENT_CLAIMS_INFO)
+ self.assertIsNotNone(pac.buffers[5].info.remaining)
+
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ claim_metadata = client_claims.claims.metadata
+
+ self.assertEqual(pac.buffers[6].type, krb5pac.PAC_TYPE_ATTRIBUTES_INFO)
+ self.assertEqual(pac.buffers[7].type, krb5pac.PAC_TYPE_REQUESTER_SID)
+
+ self.assertEqual(claim_metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF)
+ self.assertEqual(claim_metadata.claims_set_size,
+ 553)
+ self.confirm_compressed_claims(claim_metadata)
+
+ def test_repack_claims_pac_uncompressed(self):
+ pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_uncompressed)
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ client_claims_bytes1 = ndr_pack(client_claims)
+ client_claims2 = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, client_claims_bytes1)
+ client_claims_bytes2 = ndr_pack(client_claims2)
+ self.assertEqual(client_claims_bytes1, client_claims_bytes2)
+
+ claim_metadata = client_claims2.claims.metadata
+ self.assertEqual(claim_metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_NONE)
+ self.confirm_uncompressed_claims(claim_metadata)
+
+ def test_repack_claims_pac_compressed(self):
+ pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_compressed)
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ client_claims_bytes1 = ndr_pack(client_claims)
+ client_claims2 = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, client_claims_bytes1)
+ client_claims_bytes2 = ndr_pack(client_claims2)
+ self.assertEqual(client_claims_bytes1, client_claims_bytes2)
+
+ # This confirms that after compression and decompression, we
+ # still get the values we expect
+ claim_metadata = client_claims2.claims.metadata
+ self.assertEqual(claim_metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF)
+ self.assertEqual(claim_metadata.claims_set_size,
+ 585)
+ self.confirm_compressed_claims(claim_metadata)
+
+ def test_repack_claims_pac_uncompressed_set_compressed(self):
+ pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_uncompressed)
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ client_claims.claims.metadata.compression_format = claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF
+ client_claims_bytes1 = ndr_pack(client_claims)
+ client_claims2 = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, client_claims_bytes1)
+
+ # Confirm that despite setting FORMAT_XPRESS_HUFF compression is never attempted
+ self.assertEqual(client_claims2.claims.metadata.uncompressed_claims_set_size,
+ 344)
+ self.assertEqual(client_claims2.claims.metadata.claims_set_size,
+ 344)
+ self.assertEqual(client_claims2.claims.metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_NONE)
+
+ # Confirm we match the originally uncompressed sample
+ claim_metadata = client_claims2.claims.metadata
+ self.confirm_uncompressed_claims(claim_metadata)
+
+ # Finally confirm a re-pack gets identical bytes
+ client_claims_bytes2 = ndr_pack(client_claims2)
+ self.assertEqual(client_claims_bytes1, client_claims_bytes2)
+
+
+ def test_repack_claims_pac_compressed_set_uncompressed(self):
+ pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_compressed)
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ client_claims.claims.metadata.compression_format = claims.CLAIMS_COMPRESSION_FORMAT_NONE
+ client_claims_bytes1 = ndr_pack(client_claims)
+ client_claims2 = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, client_claims_bytes1)
+
+ # Confirm that by setting FORMAT_NONE compression is never attempted
+ self.assertEqual(client_claims2.claims.metadata.uncompressed_claims_set_size,
+ 8232)
+ self.assertEqual(client_claims2.claims.metadata.claims_set_size,
+ 8232)
+ self.assertEqual(client_claims2.claims.metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_NONE)
+
+ # This confirms that after pack and unpack, despite being
+ # larger than the compression minimum we get add the data and
+ # the values we expect for the originally-compressed data
+ claim_metadata = client_claims2.claims.metadata
+ self.confirm_compressed_claims(claim_metadata)
+
+ # Finally confirm a re-pack gets identical bytes
+ client_claims_bytes2 = ndr_pack(client_claims2)
+ self.assertEqual(client_claims_bytes1, client_claims_bytes2)
+
+ def test_repack_claims_pac_uncompressed_uninit_lengths(self):
+ pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_uncompressed)
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ # This matches what we expect the KDC to do, which is to ask for compression always
+ client_claims.claims.metadata.compression_format = claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF
+ client_claims.claims.metadata.uncompressed_claims_set_size = 0
+ client_claims.claims.metadata.claims_set_size = 0
+
+ client_claims_bytes1 = ndr_pack(client_claims)
+ client_claims2 = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, client_claims_bytes1)
+
+ # Confirm that the NDR code did not compress and sent FORMAT_NONE on the wire
+ self.assertEqual(client_claims2.claims.metadata.uncompressed_claims_set_size,
+ 344)
+ self.assertEqual(client_claims2.claims.metadata.claims_set_size,
+ 344)
+ self.assertEqual(client_claims2.claims.metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_NONE)
+
+ claim_metadata = client_claims2.claims.metadata
+ self.confirm_uncompressed_claims(claim_metadata)
+
+ # Finally confirm a re-pack gets identical bytes
+ client_claims_bytes2 = ndr_pack(client_claims2)
+ self.assertEqual(client_claims_bytes1, client_claims_bytes2)
+
+ def test_repack_claims_pac_compressed_uninit_lengths(self):
+ pac = ndr_unpack(krb5pac.PAC_DATA, self.pac_data_compressed)
+ client_claims = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, pac.buffers[5].info.remaining)
+ client_claims.claims.metadata.compression_format = claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF
+ client_claims.claims.metadata.uncompressed_claims_set_size = 0
+ client_claims.claims.metadata.claims_set_size = 0
+
+ client_claims_bytes1 = ndr_pack(client_claims)
+ client_claims2 = ndr_unpack(claims.CLAIMS_SET_METADATA_NDR, client_claims_bytes1)
+
+ # Confirm that despite no lenghts being set, the data is compressed correctly
+ self.assertEqual(client_claims2.claims.metadata.uncompressed_claims_set_size,
+ 8232)
+ self.assertEqual(client_claims2.claims.metadata.claims_set_size,
+ 585)
+ self.assertEqual(client_claims2.claims.metadata.compression_format,
+ claims.CLAIMS_COMPRESSION_FORMAT_XPRESS_HUFF)
+
+ claim_metadata = client_claims2.claims.metadata
+ self.confirm_compressed_claims(claim_metadata)
+
+ # Finally confirm a re-pack gets identical bytes
+ client_claims_bytes2 = ndr_pack(client_claims2)
+ self.assertEqual(client_claims_bytes1, client_claims_bytes2)
if __name__ == '__main__':
import unittest