def get_nc_changes(self, req_level, req) -> bool:
return self.repl.get_nc_changes(req_level, req)
+
+
+class drs_SecretFilter(drs_ReplicatorImplBase):
+ # Objects of these types contain sensitive attributes that cannot simply be
+ # filtered out, because they are required attributes for their class
+ # (mustContain and systemMustContain). Instead of filtering out those
+ # sensitive attributes, we filter out the entire object.
+ object_classes_to_filter_out = {
+ "msKds-ProvRootKey",
+ "msFVE-RecoveryInformation",
+ "msTPM-InformationObject",
+ }
+
+ def __init__(self, repl):
+ self.repl = repl
+
+ def process_chunk(self, samdb, level, ctr, schema, req_level, req, first_chunk):
+ """Processes a single chunk of received replication data"""
+
+ def get_searchFlags(attr: drsuapi.DsReplicaAttribute) -> int:
+ attr_name = samdb.get_lDAPDisplayName_by_attid(attr.attid)
+ return samdb.get_searchFlags_from_lDAPDisplayName(attr_name)
+
+ def is_confidential(attr: drsuapi.DsReplicaAttribute) -> bool:
+ return get_searchFlags(attr) & dsdb.SEARCH_FLAG_CONFIDENTIAL
+
+ prev_obj = ctr
+ obj = ctr.first_object
+ first = True
+ while obj is not None:
+ object_class = None
+ must_contain = set()
+ for attr in obj.object.attribute_ctr.attributes:
+ if (
+ attr.attid == drsuapi.DRSUAPI_ATTID_objectClass
+ and attr.value_ctr.num_values
+ ):
+ object_class = samdb.get_lDAPDisplayName_by_governsID_id(
+ int.from_bytes(
+ attr.value_ctr.values[0].blob, byteorder="little"
+ )
+ )
+ must_contain = samdb.get_must_contain_from_lDAPDisplayName(
+ object_class
+ )
+ break
+
+ if object_class in self.object_classes_to_filter_out:
+ obj = obj.next_object
+ if first:
+ prev_obj.first_object = obj
+ else:
+ prev_obj.next_object = obj
+
+ ctr.object_count -= 1
+ continue
+
+ for attr in filter(is_confidential, obj.object.attribute_ctr.attributes):
+ attr_name = samdb.get_lDAPDisplayName_by_attid(attr.attid)
+ if attr_name in must_contain:
+ print(
+ f"Warning: {attr_name} is a required attribute of {object_class} "
+ f"— not filtering with --no-secrets"
+ )
+ else:
+ attr.value_ctr.num_values = 0
+
+ prev_obj = obj
+ obj = obj.next_object
+ first = False
+
+ # then do the normal repl processing to apply this chunk to our DB
+ self.repl.process_chunk(samdb, level, ctr, schema, req_level, req, first_chunk)
+
+ def supports_ext(self, ext) -> bool:
+ return self.repl.supports_ext(ext)
+
+ def get_nc_changes(self, req_level, req) -> bool:
+ return self.repl.get_nc_changes(req_level, req)
promote_existing=False, plaintext_secrets=False,
backend_store=None,
backend_store_size=None,
- forced_local_samdb=None):
+ forced_local_samdb=None,
+ filter_secrets=False):
ctx.logger = logger
ctx.creds = creds
ctx.plaintext_secrets = plaintext_secrets
ctx.backend_store = backend_store
ctx.backend_store_size = backend_store_size
+ ctx.filter_secrets = filter_secrets
ctx.promote_existing = promote_existing
ctx.promote_from_dn = None
ctx.local_samdb,
ctx.invocation_id,
)
+ if ctx.filter_secrets:
+ repl = drs_utils.drs_SecretFilter(repl)
return repl
def join_replicate(ctx):
targetdir=targetdir, domain=domain,
dns_backend=dns_backend,
backend_store=backend_store,
- backend_store_size=backend_store_size)
+ backend_store_size=backend_store_size,
+ filter_secrets=not include_secrets)
# As we don't want to create or delete these DNs, we set them to None
ctx.server_dn = None