DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8,
DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V10)
import re
+from abc import ABCMeta, abstractmethod
class drsException(Exception):
hwm.highest_usn = new_hwm.highest_usn
-class drs_Replicate(object):
- """DRS replication calls"""
+class drs_ReplicatorImplBase(metaclass=ABCMeta):
+ @abstractmethod
+ def process_chunk(
+ self, samdb, level, ctr, schema, req_level, req, first_chunk,
+ ) -> None: ...
+
+ @abstractmethod
+ def supports_ext(self, ext) -> bool: ...
+
+ @abstractmethod
+ def get_nc_changes(self, req_level, req) -> bool: ...
+
+class drs_ReplicatorImpl(drs_ReplicatorImplBase):
def __init__(self, binding_string, lp, creds, samdb, invocation_id):
self.drs = drsuapi.drsuapi(binding_string, lp, creds)
- (self.drs_handle, self.supports_ext) = drs_DsBind(self.drs)
+ (self.drs_handle, self._supports_ext) = drs_DsBind(self.drs)
self.net = Net(creds=creds, lp=lp)
- self.samdb = samdb
if not isinstance(invocation_id, misc.GUID):
raise RuntimeError("Must supply GUID for invocation_id")
if invocation_id == misc.GUID("00000000-0000-0000-0000-000000000000"):
raise RuntimeError("Must not set GUID 00000000-0000-0000-0000-000000000000 as invocation_id")
- self.replication_state = self.net.replicate_init(self.samdb, lp, self.drs, invocation_id)
+ self.replication_state = self.net.replicate_init(samdb, lp, self.drs, invocation_id)
+
+ def process_chunk(self, samdb, level, ctr, schema, req_level, req, first_chunk):
+ """Processes a single chunk of received replication data"""
+ # pass the replication into the py_net.c python bindings for processing
+ self.net.replicate_chunk(self.replication_state, level, ctr,
+ schema=schema, req_level=req_level, req=req)
+
+ def supports_ext(self, ext) -> bool:
+ return self._supports_ext & ext
+
+ def get_nc_changes(self, req_level, req) -> bool:
+ return self.drs.DsGetNCChanges(self.drs_handle, req_level, req)
+
+
+class drs_Replicator:
+ """DRS replication implementation"""
+
+ def __init__(self, repl, samdb):
+ self.samdb = samdb
+ self.repl = repl
self.more_flags = 0
@staticmethod
object_to_check = object_to_check.next_object
- def process_chunk(self, level, ctr, schema, req_level, req, first_chunk):
- """Processes a single chunk of received replication data"""
- # pass the replication into the py_net.c python bindings for processing
- self.net.replicate_chunk(self.replication_state, level, ctr,
- schema=schema, req_level=req_level, req=req)
-
def replicate(self, dn, source_dsa_invocation_id, destination_dsa_guid,
schema=False, exop=drsuapi.DRSUAPI_EXOP_NONE, rodc=False,
replica_flags=None, full_sync=True, sync_forced=False):
"""replicate a single DN"""
# setup for a GetNCChanges call
- if self.supports_ext & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V10:
+ if self.repl.supports_ext(DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V10):
req_level = 10
req = drsuapi.DsGetNCChangesRequest10()
req.more_flags = self.more_flags
- elif self.supports_ext & DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8:
+ elif self.repl.supports_ext(DRSUAPI_SUPPORTED_EXTENSION_GETCHGREQ_V8):
req_level = 8
req = drsuapi.DsGetNCChangesRequest8()
else:
first_chunk = True
while True:
- (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, req_level, req)
+ (level, ctr) = self.repl.get_nc_changes(req_level, req)
if ctr.first_object is None and ctr.object_count != 0:
raise RuntimeError("DsGetNCChanges: NULL first_object with object_count=%u" % (ctr.object_count))
try:
- self.process_chunk(level, ctr, schema, req_level, req, first_chunk)
+ self.repl.process_chunk(
+ self.samdb, level, ctr, schema, req_level, req, first_chunk
+ )
except WERRORError as e:
# Check if retrying with the GET_TGT flag set might resolve this error
if self._should_retry_with_get_tgt(e.args[0], req):
return (num_objects, num_links)
+class drs_Replicate(drs_Replicator):
+ """DRS replication calls"""
+
+ def __init__(self, binding_string, lp, creds, samdb, invocation_id):
+ repl = drs_ReplicatorImpl(binding_string, lp, creds, samdb, invocation_id)
+ super().__init__(repl, samdb)
+
# Handles the special case of creating a new clone of a DB, while also renaming
# the entire DB's objects on the way through
-class drs_ReplicateRenamer(drs_Replicate):
+class drs_ReplicateRenamer(drs_ReplicatorImplBase):
"""Uses DRS replication to rename the entire DB"""
- def __init__(self, binding_string, lp, creds, samdb, invocation_id,
- old_base_dn, new_base_dn):
- super().__init__(binding_string, lp, creds, samdb, invocation_id)
+ def __init__(self, repl, old_base_dn, new_base_dn):
+ self.repl = repl
self.old_base_dn = old_base_dn
self.new_base_dn = new_base_dn
"""Uses string substitution to replace the base DN"""
return re.sub('%s$' % self.old_base_dn, self.new_base_dn, dn_str)
- def update_name_attr(self, base_obj):
+ @staticmethod
+ def update_name_attr(base_obj, samdb):
"""Updates the 'name' attribute for the base DN object"""
for attr in base_obj.attribute_ctr.attributes:
if attr.attid == DRSUAPI_ATTID_name:
- base_dn = ldb.Dn(self.samdb, base_obj.identifier.dn)
+ base_dn = ldb.Dn(samdb, base_obj.identifier.dn)
new_name = base_dn.get_rdn_value()
attr.value_ctr.values[0].blob = new_name.encode("utf-16-le")
- def rename_top_level_object(self, first_obj):
+ def rename_top_level_object(self, first_obj, samdb):
"""Renames the first/top-level object in a partition"""
old_dn = first_obj.identifier.dn
first_obj.identifier.dn = self.rename_dn(first_obj.identifier.dn)
# we also need to fix up the 'name' attribute for the base DN,
# otherwise the RDNs won't match
if first_obj.identifier.dn == self.new_base_dn:
- self.update_name_attr(first_obj)
+ self.update_name_attr(first_obj, samdb)
- def process_chunk(self, level, ctr, schema, req_level, req, first_chunk):
+ def process_chunk(self, samdb, level, ctr, schema, req_level, req, first_chunk):
"""Processes a single chunk of received replication data"""
# we need to rename the NC in every chunk - this gets used in searches
# rename the first object in each partition. This will cause every
# subsequent object in the partition to be renamed as a side-effect
if first_chunk and ctr.object_count != 0:
- self.rename_top_level_object(ctr.first_object.object)
+ self.rename_top_level_object(ctr.first_object.object, samdb)
# then do the normal repl processing to apply this chunk to our DB
- super().process_chunk(level, ctr, schema, req_level, req, first_chunk)
+ self.repl.process_chunk(samdb, level, ctr, schema, req_level, req, first_chunk)
+
+ def supports_ext(self, ext) -> bool:
+ return self.repl.supports_ext(ext)
+
+ def get_nc_changes(self, req_level, req) -> bool:
+ return self.repl.get_nc_changes(req_level, req)