# Samba common functions
#
# Copyright (C) Matthieu Patou <mat@matws.net>
+# Copyright (C) Lumir Balhar <lbalhar@redhat.com> 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
-from samba.compat import PY3
+def cmp(x, y):
+ """
+ Replacement for built-in function cmp that was removed in Python 3
-if PY3:
- # cmp() exists only in Python 2
- def cmp(a, b):
- return (a > b) - (a < b)
+ Compare the two objects x and y and return an integer according to
+ the outcome. The return value is negative if x < y, zero if x == y
+ and strictly positive if x > y.
+ """
- raw_input = input
+ return (x > y) - (x < y)
def confirm(msg, forced=False, allow_all=False):
prompt = '[y/N/all/none]'
while True:
- v = raw_input(msg + ' %s ' % prompt)
+ v = input(msg + ' %s ' % prompt)
v = v.upper()
if v in mapping:
return mapping[v]
return str(ivalue)
+# Sometimes in PY3 we have variables whose content can be 'bytes' or
+# 'str' and we can't be sure which. Generally this is because the
+# code variable can be initialised (or reassigned) a value from different
+# api(s) or functions depending on complex conditions or logic. Or another
+# common case is in PY2 the variable is 'type <str>' and in PY3 it is
+# 'class <str>' and the function to use e.g. b64encode requires 'bytes'
+# in PY3. In such cases it would be nice to avoid excessive testing in
+# the client code. Calling such a helper function should be avoided
+# if possible but sometimes this just isn't possible.
+# If a 'str' object is passed in it is encoded using 'utf8' or if 'bytes'
+# is passed in it is returned unchanged.
+# Using this function is PY2/PY3 code should ensure in most cases
+# the PY2 code runs unchanged in PY2 whereas the code in PY3 possibly
+# encodes the variable (see PY2 implementation of this function below)
+def get_bytes(bytesorstring):
+ tmp = bytesorstring
+ if isinstance(bytesorstring, str):
+ tmp = bytesorstring.encode('utf8')
+ elif not isinstance(bytesorstring, bytes):
+ raise ValueError('Expected byte or string for %s:%s' % (type(bytesorstring), bytesorstring))
+ return tmp
+
+# helper function to get a string from a variable that maybe 'str' or
+# 'bytes' if 'bytes' then it is decoded using 'utf8'. If 'str' is passed
+# it is returned unchanged
+# Using this function is PY2/PY3 code should ensure in most cases
+# the PY2 code runs unchanged in PY2 whereas the code in PY3 possibly
+# decodes the variable (see PY2 implementation of this function below)
+def get_string(bytesorstring):
+ tmp = bytesorstring
+ if isinstance(bytesorstring, bytes):
+ tmp = bytesorstring.decode('utf8')
+ elif not isinstance(bytesorstring, str):
+ raise ValueError('Expected byte of string for %s:%s' % (type(bytesorstring), bytesorstring))
+ return tmp
+++ /dev/null
-# module which helps with porting to Python 3
-#
-# Copyright (C) Lumir Balhar <lbalhar@redhat.com> 2017
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-"""module which helps with porting to Python 3"""
-
-import sys
-
-PY3 = sys.version_info[0] == 3
-
-if PY3:
- # Sometimes in PY3 we have variables whose content can be 'bytes' or
- # 'str' and we can't be sure which. Generally this is because the
- # code variable can be initialised (or reassigned) a value from different
- # api(s) or functions depending on complex conditions or logic. Or another
- # common case is in PY2 the variable is 'type <str>' and in PY3 it is
- # 'class <str>' and the function to use e.g. b64encode requires 'bytes'
- # in PY3. In such cases it would be nice to avoid excessive testing in
- # the client code. Calling such a helper function should be avoided
- # if possible but sometimes this just isn't possible.
- # If a 'str' object is passed in it is encoded using 'utf8' or if 'bytes'
- # is passed in it is returned unchanged.
- # Using this function is PY2/PY3 code should ensure in most cases
- # the PY2 code runs unchanged in PY2 whereas the code in PY3 possibly
- # encodes the variable (see PY2 implementation of this function below)
- def get_bytes(bytesorstring):
- tmp = bytesorstring
- if isinstance(bytesorstring, str):
- tmp = bytesorstring.encode('utf8')
- elif not isinstance(bytesorstring, bytes):
- raise ValueError('Expected byte or string for %s:%s' % (type(bytesorstring), bytesorstring))
- return tmp
-
- # helper function to get a string from a variable that maybe 'str' or
- # 'bytes' if 'bytes' then it is decoded using 'utf8'. If 'str' is passed
- # it is returned unchanged
- # Using this function is PY2/PY3 code should ensure in most cases
- # the PY2 code runs unchanged in PY2 whereas the code in PY3 possibly
- # decodes the variable (see PY2 implementation of this function below)
- def get_string(bytesorstring):
- tmp = bytesorstring
- if isinstance(bytesorstring, bytes):
- tmp = bytesorstring.decode('utf8')
- elif not isinstance(bytesorstring, str):
- raise ValueError('Expected byte of string for %s:%s' % (type(bytesorstring), bytesorstring))
- return tmp
-
- def cmp_fn(x, y):
- """
- Replacement for built-in function cmp that was removed in Python 3
-
- Compare the two objects x and y and return an integer according to
- the outcome. The return value is negative if x < y, zero if x == y
- and strictly positive if x > y.
- """
-
- return (x > y) - (x < y)
- # compat functions
- from functools import cmp_to_key as cmp_to_key_fn
-
-
-else:
- raise NotImplementedError("Samba versions >= 4.11 do not support Python 2.x")
from samba.dcerpc.misc import SEC_CHAN_BDC
from samba import gensec
from samba import sd_utils
-from samba.compat import get_string
+from samba.common import get_string
from samba.logger import get_samba_logger
import bisect
from io import BytesIO
from xml.etree.ElementTree import ElementTree, fromstring, tostring
from hashlib import md5
-from samba.compat import get_bytes
+from samba.common import get_bytes
ENTITY_USER_ID = 0
import os.path
from samba.gpclass import gp_inf_ext
from samba.auth import system_session
-from samba.compat import get_string
+from samba.common import get_string
try:
from ldb import LdbError
from samba.samdb import SamDB
from samba import NTSTATUSError
from configparser import ConfigParser
from io import StringIO
-from samba.compat import get_bytes
+from samba.common import get_bytes
from abc import ABCMeta, abstractmethod
import xml.etree.ElementTree as etree
import re
import os
import tempfile
from collections import OrderedDict
-from samba.compat import get_string
+from samba.common import get_string
from samba.netcmd import CommandError
from samba.kcc.debug import DEBUG, DEBUG_FN, logger
from samba.kcc import debug
-from samba.compat import cmp_fn
+from samba.common import cmp
def sort_dsa_by_gc_and_guid(dsa1, dsa2):
return -1
if not dsa1.is_gc() and dsa2.is_gc():
return +1
- return cmp_fn(ndr_pack(dsa1.dsa_guid), ndr_pack(dsa2.dsa_guid))
+ return cmp(ndr_pack(dsa1.dsa_guid), ndr_pack(dsa2.dsa_guid))
def is_smtp_replication_available():
import os
import markdown
import xml.etree.ElementTree as ET
-from samba.compat import get_string
+from samba.common import get_string
# Display specifier updates or otherwise (ignored in forest_update.py)
from samba.remove_dc import remove_dns_references
from samba.auth import system_session
from samba.samdb import SamDB
-from samba.compat import get_bytes
+from samba.common import get_bytes
from subprocess import check_call, CalledProcessError
from . import common
SuperCommand,
Option,
)
-from samba.compat import get_bytes
+from samba.common import get_bytes
from . import common
from samba.netcmd.pso import cmd_domain_passwordsettings_pso
from samba.netcmd.domain_backup import cmd_domain_backup
-from samba.compat import get_string
+from samba.common import get_string
string_version_to_constant = {
"2008_R2": DS_DOMAIN_FUNCTION_2008_R2,
get_utdv_summary,
get_kcc_and_dsas,
)
-from samba.compat import get_string
+from samba.common import get_string
from samba.samdb import get_default_backend_store
def drsuapi_connect(ctx):
)
from collections import defaultdict
from subprocess import check_call, CalledProcessError
-from samba.compat import get_bytes
+from samba.common import get_bytes
import os
import tempfile
from . import common
SuperCommand,
Option,
)
-from samba.compat import get_bytes
-from samba.compat import get_string
+from samba.common import get_bytes
+from samba.common import get_string
from . import common
# python[3]-gpgme is abandoned since ubuntu 1804 and debian 9
)
from samba.samdb import get_default_backend_store
-from samba.compat import get_string
+from samba.common import get_string
def get_domainguid(samdb, domaindn):
res = samdb.search(base=domaindn, scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
from samba.samba3 import passdb
from samba.samba3 import param as s3param
-from samba.compat import get_bytes
+from samba.common import get_bytes
def fetch_uint32(db, key):
try:
from samba.ndr import ndr_unpack, ndr_pack
from samba.dcerpc import drsblobs, misc
from samba.common import normalise_int32
-from samba.compat import get_bytes, cmp
+from samba.common import get_bytes, cmp
from samba.dcerpc import security
import binascii
from samba.ms_schema import read_ms_schema
from samba.ndr import ndr_pack
from samba.samdb import SamDB
-from samba.compat import get_string
+from samba.common import get_string
from samba import dsdb
from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL
from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
from samba.dcerpc.misc import SEC_CHAN_WKSTA
from samba.dcerpc.netlogon import NETLOGON_NEG_STRONG_KEYS
-from samba.compat import get_string
+from samba.common import get_string
from samba.dcerpc.windows_event_ids import (
EVT_ID_UNSUCCESSFUL_LOGON,
EVT_LOGON_NETWORK
from samba.auth import system_session
from samba.credentials import Credentials
-from samba.compat import get_string, get_bytes
+from samba.common import get_string, get_bytes
from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
from samba.dsdb import UF_NORMAL_ACCOUNT
from samba.messaging import Messaging
import re
import samba.tests
-from samba.compat import get_string
+from samba.common import get_string
COMMAND = "bin/net ads"
# extract keys from non-json version
import samba.tests
from io import StringIO
-from samba.compat import get_string
+from samba.common import get_string
from samba.netcmd.main import cmd_sambatool
from samba.credentials import Credentials
from samba.auth import system_session
from samba.dcerpc import misc
import samba.tests
-from samba.compat import PY3
+from samba.common import cmp
text1 = "76f53846-a7c2-476a-ae2c-20e2b80d7b34"
text2 = "344edffa-330a-4b39-b96e-2c34da52e8b1"
text3 = "00112233-4455-6677-8899-aabbccddeeff"
-if PY3:
- # cmp() exists only in Python 2
- def cmp(a, b):
- return (a > b) - (a < b)
-
-
class GUIDTests(samba.tests.TestCase):
def test_str(self):
import logging
from samba.credentials import Credentials
from samba.gp_msgs_ext import gp_msgs_ext
-from samba.compat import get_bytes
+from samba.common import get_bytes
from samba.dcerpc import preg
from samba.ndr import ndr_pack
import codecs
from samba.tests import TestCase
from samba.credentials import Credentials
from samba import generate_random_bytes as get_random_bytes
-from samba.compat import get_string, get_bytes
+from samba.common import get_string, get_bytes
class Enctype(object):
DES_CRC = 1
import os
from subprocess import Popen, PIPE
from samba.tests.ntlm_auth_base import NTLMAuthTestCase
-from samba.compat import get_string
+from samba.common import get_string
class NTLMAuthHelpersTests(NTLMAuthTestCase):
from samba.messaging import Messaging
from samba.samdb import SamDB
from samba.credentials import Credentials, DONT_USE_KERBEROS
-from samba.compat import get_string
+from samba.common import get_string
from samba.dsdb import (
UF_WORKSTATION_TRUST_ACCOUNT,
UF_PASSWD_NOTREQD)
from samba.ndr import ndr_pack
from samba.samdb import SamDB
from samba import NTSTATUSError, ntstatus
-from samba.compat import get_string
+from samba.common import get_string
import ctypes
from samba.tests.samba_tool.base import SambaToolCmdTest
from samba.tests import BlackboxProcessError
from samba.tests import check_help_consistency
-from samba.compat import get_string
+from samba.common import get_string
class HelpTestCase(SambaToolCmdTest):
)
from samba.ndr import ndr_unpack
from samba.dcerpc import drsblobs
-from samba.compat import get_bytes
-from samba.compat import get_string
+from samba.common import get_bytes
+from samba.common import get_string
from samba.tests import env_loadparm
import shutil
import samba
-from samba.compat import cmp_fn
+from samba.common import cmp
from samba import Ldb, version, ntacls
from ldb import SCOPE_SUBTREE, SCOPE_ONELEVEL, SCOPE_BASE
import ldb
len2 = len(tab2) - 1
# Note: python range go up to upper limit but do not include it
for i in range(0, minimum):
- ret = cmp_fn(tab1[len1 - i], tab2[len2 - i])
+ ret = cmp(tab1[len1 - i], tab2[len2 - i])
if ret != 0:
return ret
else:
flapping=flapping)
try:
- from samba.compat import PY3
from io import TextIOWrapper as TextIOWrapper
- if PY3:
- forgiving_stdin = TextIOWrapper(sys.stdin.buffer, errors='ignore', encoding='utf-8')
- else:
- forgiving_stdin = sys.stdin
+ forgiving_stdin = TextIOWrapper(sys.stdin.buffer, errors='ignore', encoding='utf-8')
ret = subunithelper.parse_results(msg_ops, statistics, forgiving_stdin)
except subunithelper.ImmediateFail:
sys.stdout.flush()
import sys
import os
import subprocess
-from samba.compat import get_string
+from samba.common import get_string
if len(sys.argv) != 3:
import samba
from samba.tests.subunitrun import SubunitOptions, TestProgram
-from samba.compat import get_string
+from samba.common import get_string
import samba.getopt as options
from samba.join import DCJoinContext
from samba.samdb import SamDB
from samba.tests import delete_force
from samba import dsdb
-from samba.compat import get_string
+from samba.common import get_string
parser = optparse.OptionParser("deletetest.py [options] <host|file>")
sambaopts = options.SambaOptions(parser)
from samba.ndr import ndr_pack, ndr_unpack
from samba.dcerpc import security, lsa
from samba.tests import delete_force
-from samba.compat import get_string
+from samba.common import get_string
parser = optparse.OptionParser("ldap.py [options] <host>")
sambaopts = options.SambaOptions(parser)
from samba.credentials import Credentials, DONT_USE_KERBEROS
from samba.auth import system_session
-from samba.compat import get_string
+from samba.common import get_string
from ldb import SCOPE_BASE, LdbError
from ldb import ERR_NO_SUCH_OBJECT, ERR_ATTRIBUTE_OR_VALUE_EXISTS
sys.path.insert(0, "bin/python")
import samba
from samba.tests.subunitrun import SubunitOptions, TestProgram
-from samba.compat import cmp_fn
-from samba.compat import cmp_to_key_fn
+from samba.common import cmp
+from functools import cmp_to_key
import samba.getopt as options
from samba.auth import system_session
return locale.strcoll(a[0], b[0])
def cmp_binary(a, b):
- return cmp_fn(a[0], b[0])
+ return cmp(a[0], b[0])
def cmp_numeric(a, b):
- return cmp_fn(int(a[0]), int(b[0]))
+ return cmp(int(a[0]), int(b[0]))
# For testing simplicity, the attributes in here need to be
# unique for each user. Otherwise there are multiple possible
for sort_attr, result_attr in attr_pairs:
forward = sorted(((norm(x[sort_attr]), norm(x[result_attr]))
for x in self.users),
- key=cmp_to_key_fn(sort_functions[sort_attr]))
+ key=cmp_to_key(sort_functions[sort_attr]))
reverse = list(reversed(forward))
for rev in (0, 1):
from samba.dcerpc import drsblobs
from samba.dcerpc.drsuapi import *
from samba.tests.password_test import PasswordCommon
-from samba.compat import get_string
+from samba.common import get_string
import samba.tests
from ldb import (SCOPE_BASE, FLAG_MOD_ADD, FLAG_MOD_DELETE, FLAG_MOD_REPLACE, Dn, Message,
from samba.auth import system_session
import ldb
from samba.samdb import SamDB
-from samba.compat import get_bytes
-from samba.compat import get_string
+from samba.common import get_bytes
+from samba.common import get_string
import time
from samba.netcmd.dns import cmd_dns
from samba import gensec
from samba.kcc import kcc_utils
-from samba.compat import get_string
+from samba.common import get_string
import ldb
import dns.resolver
from samba.auth import system_session
from samba.samdb import SamDB
from samba.credentials import Credentials, DONT_USE_KERBEROS
-from samba.compat import get_string
+from samba.common import get_string
parser = optparse.OptionParser("samba_spnupdate")
sambaopts = options.SambaOptions(parser)
increment_calculated_keyversion_number,
print_provision_ranges)
from samba.xattr import copytree_with_xattrs
-from samba.compat import cmp_to_key_fn
+from functools import cmp_to_key
# make sure the script dies immediately when hitting control-C,
# rather than raising KeyboardInterrupt. As we do all database
# Sort the missing object in order to have object of the lowest level
# first (which can be containers for higher level objects)
- listMissing.sort(key=cmp_to_key_fn(dn_sort))
- listPresent.sort(key=cmp_to_key_fn(dn_sort))
+ listMissing.sort(key=cmp_to_key(dn_sort))
+ listPresent.sort(key=cmp_to_key(dn_sort))
# The following lines is to load the up to
# date schema into our current LDB
Message,
FLAG_MOD_REPLACE,
)
-from samba.compat import cmp_fn
-from samba.compat import get_string
+from samba.common import cmp
+from samba.common import get_string
class DrsBaseTestCase(SambaToolCmdTest):
print("AbstractLink.__internal_cmp__(%r, %r) => wrong type" % (self, other))
return NotImplemented
- c = cmp_fn(self.selfGUID_blob, other.selfGUID_blob)
+ c = cmp(self.selfGUID_blob, other.selfGUID_blob)
if c != 0:
if verbose:
print("AbstractLink.__internal_cmp__(%r, %r) => %d different identifier" % (self, other, c))
print("AbstractLink.__internal_cmp__(%r, %r) => %d different FLAG_ACTIVE" % (self, other, c))
return c
- c = cmp_fn(self.targetGUID_blob, other.targetGUID_blob)
+ c = cmp(self.targetGUID_blob, other.targetGUID_blob)
if c != 0:
if verbose:
print("AbstractLink.__internal_cmp__(%r, %r) => %d different target" % (self, other, c))
from samba.dcerpc import drsuapi, misc, drsblobs
from samba.drs_utils import drs_DsBind
from samba.ndr import ndr_unpack, ndr_pack
-from samba.compat import cmp_to_key_fn
-from samba.compat import cmp_fn
+from functools import cmp_to_key
+from samba.common import cmp
def _linked_attribute_compare(la1, la2):
la2, la2_target = la2
# Ascending host object GUID
- c = cmp_fn(ndr_pack(la1.identifier.guid), ndr_pack(la2.identifier.guid))
+ c = cmp(ndr_pack(la1.identifier.guid), ndr_pack(la2.identifier.guid))
if c != 0:
return c
return 1 if la1_active else -1
# Ascending target object GUID
- return cmp_fn(ndr_pack(la1_target), ndr_pack(la2_target))
+ return cmp(ndr_pack(la1_target), ndr_pack(la2_target))
class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
link.identifier.guid,
target_guid) in expected_links)
- no_inactive.sort(key=cmp_to_key_fn(_linked_attribute_compare))
+ no_inactive.sort(key=cmp_to_key(_linked_attribute_compare))
# assert the two arrays are the same
self.assertEqual(len(expected_links), ctr.linked_attributes_count)
link.identifier.guid,
target_guid) in expected_links)
- has_inactive.sort(key=cmp_to_key_fn(_linked_attribute_compare))
+ has_inactive.sort(key=cmp_to_key(_linked_attribute_compare))
# assert the two arrays are the same
self.assertEqual(len(expected_links), ctr.linked_attributes_count)
link.value.blob).guid
no_inactive.append((link, target_guid))
- no_inactive.sort(key=cmp_to_key_fn(_linked_attribute_compare))
+ no_inactive.sort(key=cmp_to_key(_linked_attribute_compare))
# assert the two arrays are the same
self.assertEqual([x[0] for x in no_inactive], ctr.linked_attributes)
import samba.tests
import time
import ldb
-from samba.compat import get_string
+from samba.common import get_string
from ldb import (
SCOPE_BASE, LdbError, ERR_NO_SUCH_OBJECT)
import drs_base
from samba.tests import BlackboxProcessError
-from samba.compat import get_string
+from samba.common import get_string
class SambaToolDrsNoDnsTests(drs_base.DrsBaseTestCase):
import json
import ldb
import random
-from samba.compat import get_string
+from samba.common import get_string
GUID_RE = r'[\da-f]{8}-[\da-f]{4}-[\da-f]{4}-[\da-f]{4}-[\da-f]{12}'
HEX8_RE = r'0x[\da-f]{8}'