"""
from hashlib import md5
import os
-import sys
from dogpile.cache.region import make_region
from . import caching_query
-py2k = sys.version_info < (3, 0)
-
-if py2k:
- input = raw_input # noqa
-
# dogpile cache regions. A home base for cache configurations.
regions = {}
from ... import Column
from ... import MetaData
from ... import Table
-from ... import util
from ...ext.compiler import compiles
from ...sql import expression
from ...types import Boolean
impl = Unicode
cache_ok = True
- def process_bind_param(self, value, dialect):
- if util.py2k and isinstance(value, util.binary_type):
- value = value.decode(dialect.encoding)
- return value
-
def bind_expression(self, bindvalue):
return _cast_on_2005(bindvalue)
rp = connection.execute(sql.text(text), params).scalar()
if rp:
- if util.py2k:
- rp = rp.decode(self.encoding)
return rp
else:
return None
from ... import types as sqltypes
from ... import util
from ...engine import cursor as _cursor
-from ...util import compat
class _OracleInteger(sqltypes.Integer):
" cx_oracle" % (bindparam.key, bindparam.type)
)
- if compat.py2k and dbtype in (
- cx_Oracle.CLOB,
- cx_Oracle.NCLOB,
- ):
- outconverter = (
- processors.to_unicode_processor_factory(
- self.dialect.encoding,
- errors=self.dialect.encoding_errors,
- )
- )
- self.out_parameters[name] = self.cursor.var(
- dbtype,
- outconverter=lambda value: outconverter(
- value.read()
- ),
- )
-
- elif dbtype in (
+ if dbtype in (
cx_Oracle.BLOB,
cx_Oracle.CLOB,
cx_Oracle.NCLOB,
self.out_parameters[name] = self.cursor.var(
dbtype, outconverter=lambda value: value.read()
)
- elif compat.py2k and isinstance(
- type_impl, sqltypes.Unicode
- ):
- outconverter = (
- processors.to_unicode_processor_factory(
- self.dialect.encoding,
- errors=self.dialect.encoding_errors,
- )
- )
- self.out_parameters[name] = self.cursor.var(
- dbtype, outconverter=outconverter
- )
else:
self.out_parameters[name] = self.cursor.var(dbtype)
self.parameters[0][
and default_type is not cx_Oracle.CLOB
and default_type is not cx_Oracle.NCLOB
):
- if compat.py2k:
- outconverter = processors.to_unicode_processor_factory(
- dialect.encoding, errors=dialect.encoding_errors
- )
- return cursor.var(
- cx_Oracle.STRING,
- size,
- cursor.arraysize,
- outconverter=outconverter,
- )
- else:
- return cursor.var(
- util.text_type,
- size,
- cursor.arraysize,
- **dialect._cursor_var_unicode_kwargs
- )
+ return cursor.var(
+ util.text_type,
+ size,
+ cursor.arraysize,
+ **dialect._cursor_var_unicode_kwargs
+ )
elif dialect.auto_convert_lobs and default_type in (
cx_Oracle.CLOB,
cx_Oracle.NCLOB,
):
- if compat.py2k:
- outconverter = processors.to_unicode_processor_factory(
- dialect.encoding, errors=dialect.encoding_errors
- )
- return cursor.var(
- cx_Oracle.LONG_STRING,
- size,
- cursor.arraysize,
- outconverter=outconverter,
- )
- else:
- return cursor.var(
- cx_Oracle.LONG_STRING,
- size,
- cursor.arraysize,
- **dialect._cursor_var_unicode_kwargs
- )
+ return cursor.var(
+ cx_Oracle.LONG_STRING,
+ size,
+ cursor.arraysize,
+ **dialect._cursor_var_unicode_kwargs
+ )
elif dialect.auto_convert_lobs and default_type in (
cx_Oracle.BLOB,
comparator_factory = Comparator
def bind_processor(self, dialect):
- if util.py2k:
- encoding = dialect.encoding
-
- def process(value):
- if isinstance(value, dict):
- return _serialize_hstore(value).encode(encoding)
- else:
- return value
-
- else:
-
- def process(value):
- if isinstance(value, dict):
- return _serialize_hstore(value)
- else:
- return value
+ def process(value):
+ if isinstance(value, dict):
+ return _serialize_hstore(value)
+ else:
+ return value
return process
def result_processor(self, dialect, coltype):
- if util.py2k:
- encoding = dialect.encoding
-
- def process(value):
- if value is not None:
- return _parse_hstore(value.decode(encoding))
- else:
- return value
-
- else:
-
- def process(value):
- if value is not None:
- return _parse_hstore(value)
- else:
- return value
+ def process(value):
+ if value is not None:
+ return _parse_hstore(value)
+ else:
+ return value
return process
from .base import _DECIMAL_TYPES
from .base import _FLOAT_TYPES
from .base import _INT_TYPES
-from .base import ENUM
from .base import PGCompiler
from .base import PGDialect
from .base import PGExecutionContext
)
-class _PGEnum(ENUM):
- def result_processor(self, dialect, coltype):
- if util.py2k and self._expect_unicode is True:
- # for py2k, if the enum type needs unicode data (which is set up as
- # part of the Enum() constructor based on values passed as py2k
- # unicode objects) we have to use our own converters since
- # psycopg2's don't work, a rare exception to the "modern DBAPIs
- # support unicode everywhere" theme of deprecating
- # convert_unicode=True. Use the special "force_nocheck" directive
- # which forces unicode conversion to happen on the Python side
- # without an isinstance() check. in py3k psycopg2 does the right
- # thing automatically.
- self._expect_unicode = "force_nocheck"
- return super(_PGEnum, self).result_processor(dialect, coltype)
-
-
class _PGHStore(HSTORE):
def bind_processor(self, dialect):
if dialect._has_native_hstore:
driver = "psycopg2"
supports_statement_cache = True
-
- if util.py2k:
- # turn off supports_unicode_statements for Python 2. psycopg2 supports
- # unicode statements in Py2K. But! it does not support unicode *bound
- # parameter names* because it uses the Python "%" operator to
- # interpolate these into the string, and this fails. So for Py2K, we
- # have to use full-on encoding for statements and parameters before
- # passing to cursor.execute().
- supports_unicode_statements = False
-
supports_server_side_cursors = True
default_paramstyle = "pyformat"
PGDialect.colspecs,
{
sqltypes.Numeric: _PGNumeric,
- ENUM: _PGEnum, # needs force_unicode
- sqltypes.Enum: _PGEnum, # needs force_unicode
HSTORE: _PGHStore,
JSON: _PGJSON,
sqltypes.JSON: _PGJSON,
):
PGDialect.__init__(self, **kwargs)
self.use_native_unicode = use_native_unicode
- if not use_native_unicode and not util.py2k:
+ if not use_native_unicode:
raise exc.ArgumentError(
"psycopg2 native_unicode mode is required under Python 3"
)
def on_connect(self):
extras = self._psycopg2_extras
- extensions = self._psycopg2_extensions
fns = []
if self.client_encoding is not None:
fns.append(on_connect)
- if util.py2k and self.dbapi and self.use_native_unicode:
-
- def on_connect(conn):
- extensions.register_type(extensions.UNICODE, conn)
- extensions.register_type(extensions.UNICODEARRAY, conn)
-
- fns.append(on_connect)
-
if self.dbapi and self.use_native_hstore:
def on_connect(conn):
if hstore_oids is not None:
oid, array_oid = hstore_oids
kw = {"oid": oid}
- if util.py2k:
- kw["unicode"] = True
kw["array_oid"] = array_oid
extras.register_hstore(conn, **kw)
},
)
- if not util.py2k:
- description_encoding = None
+ description_encoding = None
driver = "pysqlite"
@classmethod
def dbapi(cls):
- if util.py2k:
- try:
- from pysqlite2 import dbapi2 as sqlite
- except ImportError:
- try:
- from sqlite3 import dbapi2 as sqlite
- except ImportError as e:
- raise e
- else:
- from sqlite3 import dbapi2 as sqlite
+ from sqlite3 import dbapi2 as sqlite
+
return sqlite
@classmethod
return self.get_isolation_level(dbapi_conn)
def _check_unicode_returns(self, connection, additional_tests=None):
- # this now runs in py2k only and will be removed in 2.0; disabled for
- # Python 3 in all cases under #5315
- if util.py2k and not self.supports_unicode_statements:
- cast_to = util.binary_type
- else:
- cast_to = util.text_type
+ cast_to = util.text_type
if self.positional:
parameters = self.execute_sequence_format()
)
def _check_unicode_description(self, connection):
- # all DBAPIs on Py2K return cursor.description as encoded
-
- if util.py2k and not self.supports_unicode_statements:
- cast_to = util.binary_type
- else:
- cast_to = util.text_type
+ cast_to = util.text_type
cursor = connection.connection.cursor()
try:
def normalize_name(self, name):
if name is None:
return None
- if util.py2k:
- if isinstance(name, str):
- name = name.decode(self.encoding)
name_lower = name.lower()
name_upper = name.upper()
self.identifier_preparer._requires_quotes
)(name_lower):
name = name_upper
- if util.py2k:
- if not self.supports_unicode_binds:
- name = name.encode(self.encoding)
- else:
- name = unicode(name) # noqa
return name
def get_driver_connection(self, connection):
self.executemany = len(parameters) > 1
- # this must occur before create_cursor() since the statement
- # has to be regexed in some cases for server side cursor
- if util.py2k:
- self.unicode_statement = util.text_type(compiled.string)
- else:
- self.unicode_statement = compiled.string
+ self.unicode_statement = compiled.string
self.cursor = self.create_cursor()
# returned them
table._validate_dialect_kwargs(tbl_opts)
- if util.py2k:
- if isinstance(schema, str):
- schema = schema.decode(dialect.encoding)
- if isinstance(table_name, str):
- table_name = table_name.decode(dialect.encoding)
-
found_table = False
cols_by_orig_name = {}
from ..sql.base import HasMemoized
from ..sql.base import InPlaceGenerative
from ..util import collections_abc
-from ..util import py2k
if _baserow_usecext:
class _WithKeys(object):
# used mainly to share documentation on the keys method.
- # py2k does not allow overriding the __doc__ attribute.
def keys(self):
"""Return an iterable view which yields the string keys that would
be represented by each :class:`.Row`.
def __next__(self):
return self._next_impl()
- if py2k:
-
- def next(self): # noqa
- return self._next_impl()
-
def partitions(self, size=None):
"""Iterate through sub-lists of rows of the size given.
def __next__(self):
return self._next_impl()
- if py2k:
-
- def next(self): # noqa
- return self._next_impl()
-
def first(self):
"""Fetch the first object or None if no object is present.
def __next__(self):
return self._next_impl()
- if py2k:
-
- def next(self): # noqa
- return self._next_impl()
-
def first(self):
"""Fetch the first object or None if no object is present.
query = {}
for key, value in util.parse_qsl(components["query"]):
- if util.py2k:
- key = key.encode("ascii")
if key in query:
query[key] = util.to_list(query[key])
query[key].append(value)
def keys(self):
return self.col.keys()
- if util.py2k:
+ def items(self):
+ return ((key, self._get(self.col[key])) for key in self.col)
- def iteritems(self):
- return ((key, self._get(self.col[key])) for key in self.col)
-
- def itervalues(self):
- return (self._get(self.col[key]) for key in self.col)
-
- def iterkeys(self):
- return self.col.iterkeys()
-
- def values(self):
- return [self._get(member) for member in self.col.values()]
-
- def items(self):
- return [(k, self._get(self.col[k])) for k in self]
-
- else:
-
- def items(self):
- return ((key, self._get(self.col[key])) for key in self.col)
-
- def values(self):
- return (self._get(self.col[key]) for key in self.col)
+ def values(self):
+ return (self._get(self.col[key]) for key in self.col)
def pop(self, key, default=_NotProvided):
if default is _NotProvided:
_tidy(__delitem__)
return __delitem__
- if util.py2k:
-
- def __setslice__(fn):
- def __setslice__(self, start, end, values):
- for value in self[start:end]:
- __del(self, value)
- values = [__set(self, value) for value in values]
- fn(self, start, end, values)
-
- _tidy(__setslice__)
- return __setslice__
-
- def __delslice__(fn):
- def __delslice__(self, start, end):
- for value in self[start:end]:
- __del(self, value)
- fn(self, start, end)
-
- _tidy(__delslice__)
- return __delslice__
-
def extend(fn):
def extend(self, iterable):
for value in iterable:
_tidy(pop)
return pop
- if not util.py2k:
-
- def clear(fn):
- def clear(self, index=-1):
- for item in self:
- __del(self, item)
- fn(self)
+ def clear(fn):
+ def clear(self, index=-1):
+ for item in self:
+ __del(self, item)
+ fn(self)
- _tidy(clear)
- return clear
+ _tidy(clear)
+ return clear
# __imul__ : not wrapping this. all members of the collection are already
# present, so no need to fire appends... wrapping it with an explicit
"add",
"mul",
"sub",
- "div",
"mod",
"truediv",
"lt",
from . import util as orm_util
from .. import exc as sa_exc
-from .. import util
class IdentityMap(object):
def __iter__(self):
return iter(self.keys())
- if util.py2k:
-
- def iteritems(self):
- return iter(self.items())
-
- def itervalues(self):
- return iter(self.values())
-
def all_states(self):
- if util.py2k:
- return self._dict.values()
- else:
- return list(self._dict.values())
+ return list(self._dict.values())
def _fast_discard(self, state):
# used by InstanceState for state being
func_vars = util.format_argspec_init(original_init, grouped=False)
func_text = func_body % func_vars
- if util.py2k:
- func = getattr(original_init, "im_func", original_init)
- func_defaults = getattr(func, "func_defaults", None)
- else:
- func_defaults = getattr(original_init, "__defaults__", None)
- func_kw_defaults = getattr(original_init, "__kwdefaults__", None)
+ func_defaults = getattr(original_init, "__defaults__", None)
+ func_kw_defaults = getattr(original_init, "__kwdefaults__", None)
env = locals().copy()
env["__name__"] = __name__
if func_defaults:
__init__.__defaults__ = func_defaults
- if not util.py2k and func_kw_defaults:
+ if func_kw_defaults:
__init__.__kwdefaults__ = func_kw_defaults
return __init__
operators.add: " + ",
operators.mul: " * ",
operators.sub: " - ",
- operators.div: " / ",
operators.mod: " % ",
operators.truediv: " / ",
operators.neg: "-",
else:
return util.text_type(self).upper()
- def __repr__(self):
- if util.py2k:
- backslashed = self.encode("ascii", "backslashreplace")
- if not util.py2k:
- backslashed = backslashed.decode("ascii")
- return "'%s'" % backslashed
- else:
- return str.__repr__(self)
-
def _find_columns(clause):
"""locate Column objects within the given expression."""
from .. import util
-if util.py2k:
- from operator import div
-else:
- div = truediv
-
-
class Operators(object):
"""Base of comparison and logical operators.
"""
return self.reverse_operate(mul, other)
- def __rdiv__(self, other):
- """Implement the ``/`` operator in reverse.
-
- See :meth:`.ColumnOperators.__div__`.
-
- """
- return self.reverse_operate(div, other)
-
def __rmod__(self, other):
"""Implement the ``%`` operator in reverse.
"""
return self.operate(mul, other)
- def __div__(self, other):
- """Implement the ``/`` operator.
-
- In a column context, produces the clause ``a / b``.
-
- """
- return self.operate(div, other)
-
def __mod__(self, other):
"""Implement the ``%`` operator.
return self.operate(mod, other)
def __truediv__(self, other):
- """Implement the ``//`` operator.
+ """Implement the ``/`` operator.
In a column context, produces the clause ``a / b``.
return self.operate(truediv, other)
def __rtruediv__(self, other):
- """Implement the ``//`` operator in reverse.
+ """Implement the ``/`` operator in reverse.
See :meth:`.ColumnOperators.__truediv__`.
json_path_getitem_op: 15,
mul: 8,
truediv: 8,
- div: 8,
mod: 8,
neg: 8,
add: 7,
Integer: self.__class__,
Numeric: Numeric,
},
- operators.div: {Integer: self.__class__, Numeric: Numeric},
operators.truediv: {Integer: self.__class__, Numeric: Numeric},
operators.sub: {Integer: self.__class__, Numeric: Numeric},
}
Numeric: self.__class__,
Integer: self.__class__,
},
- operators.div: {Numeric: self.__class__, Integer: self.__class__},
operators.truediv: {
Numeric: self.__class__,
Integer: self.__class__,
# Python 3 has native bytes() type
# both sqlite3 and pg8000 seem to return it,
# psycopg2 as of 2.5 returns 'memoryview'
- if util.py2k:
-
- def result_processor(self, dialect, coltype):
- return processors.to_str
-
- else:
-
- def result_processor(self, dialect, coltype):
- def process(value):
- if value is not None:
- value = bytes(value)
- return value
+ def result_processor(self, dialect, coltype):
+ def process(value):
+ if value is not None:
+ value = bytes(value)
+ return value
- return process
+ return process
def coerce_compared_value(self, op, value):
"""See :meth:`.TypeEngine.coerce_compared_value` for a description."""
self.validate_strings = kw.pop("validate_strings", False)
if convert_unicode is None:
- for e in self.enums:
- # this is all py2k logic that can go away for py3k only,
- # "expect unicode" will always be implicitly true
- if isinstance(e, util.text_type):
- _expect_unicode = True
- break
- else:
- _expect_unicode = False
+ _expect_unicode = True
else:
_expect_unicode = convert_unicode
operators.sub: {Interval: self.__class__},
operators.mul: {Numeric: self.__class__},
operators.truediv: {Numeric: self.__class__},
- operators.div: {Numeric: self.__class__},
}
@property
return default.StrCompileDialect()
def __str__(self):
- if util.py2k:
- return unicode(self.compile()).encode( # noqa
- "ascii", "backslashreplace"
- ) # noqa
- else:
- return str(self.compile())
+ return str(self.compile())
def __repr__(self):
return util.generic_repr(self)
def _expect_failure(self, config, ex, name="block"):
for fail in self.fails:
if fail(config):
- if util.py2k:
- str_ex = unicode(ex).encode( # noqa: F821
- "utf-8", errors="ignore"
- )
- else:
- str_ex = str(ex)
print(
(
"%s failed as expected (%s): %s "
- % (name, fail._as_string(config), str_ex)
+ % (name, fail._as_string(config), ex)
)
)
break
if to_bootstrap == "pytest":
sys.modules["sqla_plugin_base"] = load_file_as_module("plugin_base")
sys.modules["sqla_plugin_base"].bootstrapped_as_sqlalchemy = True
- if sys.version_info < (3, 0):
- sys.modules["sqla_reinvent_fixtures"] = load_file_as_module(
- "reinvent_fixtures_py2k"
- )
sys.modules["sqla_pytestplugin"] = load_file_as_module("pytestplugin")
else:
raise Exception("unknown bootstrap: %s" % to_bootstrap) # noqa
has_xdist = False
-py2k = sys.version_info < (3, 0)
-if py2k:
- try:
- import sqla_reinvent_fixtures as reinvent_fixtures_py2k
- except ImportError:
- from . import reinvent_fixtures_py2k
-
-
def pytest_addoption(parser):
group = parser.getgroup("sqlalchemy")
else:
newitems.append(item)
- if py2k:
- for item in newitems:
- reinvent_fixtures_py2k.scan_for_fixtures_to_use_for_class(item)
-
# seems like the functions attached to a test class aren't sorted already?
# is that true and why's that? (when using unittest, they're sorted)
items[:] = sorted(
for arg, val in zip(argname_split, param.values):
cls_variables[arg] = val
parametrized_name = "_".join(
- # token is a string, but in py2k pytest is giving us a unicode,
- # so call str() on it.
- str(re.sub(r"\W", "", token))
+ re.sub(r"\W", "", token)
for param in full_param_set
for token in param.id.split("-")
)
if hasattr(cls, "setup_test_class"):
asyncio._maybe_async(cls.setup_test_class)
- if py2k:
- reinvent_fixtures_py2k.run_class_fixture_setup(request)
-
yield
- if py2k:
- reinvent_fixtures_py2k.run_class_fixture_teardown(request)
-
if hasattr(cls, "teardown_test_class"):
asyncio._maybe_async(cls.teardown_test_class)
# 1. function level "autouse" fixtures under py3k (examples: TablesTest
# define tables / data, MappedTest define tables / mappers / data)
- # 2. run homegrown function level "autouse" fixtures under py2k
- if py2k:
- reinvent_fixtures_py2k.run_fn_fixture_setup(request)
+ # 2. was for py2k-only fixtures; no longer applies
# 3. run outer xdist-style setup
if hasattr(self, "setup_test"):
if hasattr(self, "teardown_test"):
asyncio._maybe_async(self.teardown_test)
- # 11. run homegrown function-level "autouse" fixtures under py2k
- if py2k:
- reinvent_fixtures_py2k.run_fn_fixture_teardown(request)
+ # 11. was for py2k-only fixtures; no longer applies
# 12. function level "autouse" fixtures under py3k (examples: TablesTest /
# MappedTest delete table data, possibly drop tables and clear mappers
fn = asyncio._maybe_async_wrapper(fn)
# other wrappers may be added here
- if py2k and "autouse" in kw:
- # py2k workaround for too-slow collection of autouse fixtures
- # in pytest 4.6.11. See notes in reinvent_fixtures_py2k for
- # rationale.
-
- # comment this condition out in order to disable the
- # py2k workaround entirely.
- reinvent_fixtures_py2k.add_fixture(fn, fixture)
- else:
- # now apply FixtureFunctionMarker
- fn = fixture(fn)
+ # now apply FixtureFunctionMarker
+ fn = fixture(fn)
return fn
+++ /dev/null
-"""
-invent a quick version of pytest autouse fixtures as pytest's unacceptably slow
-collection/high memory use in pytest 4.6.11, which is the highest version that
-works in py2k.
-
-by "too-slow" we mean the test suite can't even manage to be collected for a
-single process in less than 70 seconds or so and memory use seems to be very
-high as well. for two or four workers the job just times out after ten
-minutes.
-
-so instead we have invented a very limited form of these fixtures, as our
-current use of "autouse" fixtures are limited to those in fixtures.py.
-
-assumptions for these fixtures:
-
-1. we are only using "function" or "class" scope
-
-2. the functions must be associated with a test class
-
-3. the fixture functions cannot themselves use pytest fixtures
-
-4. the fixture functions must use yield, not return
-
-When py2k support is removed and we can stay on a modern pytest version, this
-can all be removed.
-
-
-"""
-import collections
-
-
-_py2k_fixture_fn_names = collections.defaultdict(set)
-_py2k_class_fixtures = collections.defaultdict(
- lambda: collections.defaultdict(set)
-)
-_py2k_function_fixtures = collections.defaultdict(
- lambda: collections.defaultdict(set)
-)
-
-_py2k_cls_fixture_stack = []
-_py2k_fn_fixture_stack = []
-
-
-def add_fixture(fn, fixture):
- assert fixture.scope in ("class", "function")
- _py2k_fixture_fn_names[fn.__name__].add((fn, fixture.scope))
-
-
-def scan_for_fixtures_to_use_for_class(item):
- test_class = item.parent.parent.obj
-
- for name in _py2k_fixture_fn_names:
- for fixture_fn, scope in _py2k_fixture_fn_names[name]:
- meth = getattr(test_class, name, None)
- if meth and meth.im_func is fixture_fn:
- for sup in test_class.__mro__:
- if name in sup.__dict__:
- if scope == "class":
- _py2k_class_fixtures[test_class][sup].add(meth)
- elif scope == "function":
- _py2k_function_fixtures[test_class][sup].add(meth)
- break
- break
-
-
-def run_class_fixture_setup(request):
-
- cls = request.cls
- self = cls.__new__(cls)
-
- fixtures_for_this_class = _py2k_class_fixtures.get(cls)
-
- if fixtures_for_this_class:
- for sup_ in cls.__mro__:
- for fn in fixtures_for_this_class.get(sup_, ()):
- iter_ = fn(self)
- next(iter_)
-
- _py2k_cls_fixture_stack.append(iter_)
-
-
-def run_class_fixture_teardown(request):
- while _py2k_cls_fixture_stack:
- iter_ = _py2k_cls_fixture_stack.pop(-1)
- try:
- next(iter_)
- except StopIteration:
- pass
-
-
-def run_fn_fixture_setup(request):
- cls = request.cls
- self = request.instance
-
- fixtures_for_this_class = _py2k_function_fixtures.get(cls)
-
- if fixtures_for_this_class:
- for sup_ in reversed(cls.__mro__):
- for fn in fixtures_for_this_class.get(sup_, ()):
- iter_ = fn(self)
- next(iter_)
-
- _py2k_fn_fixture_stack.append(iter_)
-
-
-def run_fn_fixture_teardown(request):
- while _py2k_fn_fixture_stack:
- iter_ = _py2k_fn_fixture_stack.pop(-1)
- try:
- next(iter_)
- except StopIteration:
- pass
return exclusions.only_if(check)
- @property
- def python2(self):
- return exclusions.skip_if(
- lambda: sys.version_info >= (3,),
- "Python version 2.xx is required.",
- )
-
- @property
- def python3(self):
- return exclusions.skip_if(
- lambda: sys.version_info < (3,), "Python version 3.xx is required."
- )
-
- @property
- def pep520(self):
- return self.python36
-
- @property
- def insert_order_dicts(self):
- return self.python37
-
- @property
- def python36(self):
- return exclusions.skip_if(
- lambda: sys.version_info < (3, 6),
- "Python version 3.6 or greater is required.",
- )
-
- @property
- def python37(self):
- return exclusions.skip_if(
- lambda: sys.version_info < (3, 7),
- "Python version 3.7 or greater is required.",
- )
-
- @property
- def dataclasses(self):
- return self.python37
-
@property
def cpython(self):
return exclusions.only_if(
from ... import literal_column
from ... import select
from ... import String
-from ...util import compat
class ExceptionTest(fixtures.TablesTest):
assert str(err.orig) in str(err)
- # test that we are actually getting string on Py2k, unicode
- # on Py3k.
- if compat.py2k:
- assert isinstance(err_str, str)
- else:
- assert isinstance(err_str, str)
+ assert isinstance(err_str, str)
class IsolationLevelTest(fixtures.TestBase):
("numeric", 1234567.89),
# this one "works" because the float value you see here is
# lost immediately to floating point stuff
- ("numeric", 99998969694839.983485848, requirements.python3),
- ("numeric", 99939.983485848, requirements.python3),
+ ("numeric", 99998969694839.983485848),
+ ("numeric", 99939.983485848),
("_decimal", decimal.Decimal("1234567.89")),
(
"_decimal",
roundtrip = conn.scalar(select(expr))
eq_(roundtrip, compare_value)
- if util.py3k: # skip py2k to avoid comparing unicode to str etc.
- is_(type(roundtrip), type(compare_value))
+ is_(type(roundtrip), type(compare_value))
@_index_fixtures(True)
@testing.emits_warning(r".*does \*not\* support Decimal objects natively")
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import testing
-from sqlalchemy import util
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.schema import Column
t = Table(
ue("\u6e2c\u8a66"), meta, Column(ue("\u6e2c\u8a66_id"), Integer)
)
-
- if util.py2k:
- eq_(
- repr(t),
- (
- "Table('\\u6e2c\\u8a66', MetaData(), "
- "Column('\\u6e2c\\u8a66_id', Integer(), "
- "table=<\u6e2c\u8a66>), "
- "schema=None)"
- ),
- )
- else:
- eq_(
- repr(t),
- (
- "Table('測試', MetaData(), "
- "Column('測試_id', Integer(), "
- "table=<測試>), "
- "schema=None)"
- ),
- )
+ eq_(
+ repr(t),
+ (
+ "Table('測試', MetaData(), "
+ "Column('測試_id', Integer(), "
+ "table=<測試>), "
+ "schema=None)"
+ ),
+ )
from ..util import defaultdict
from ..util import has_refcount_gc
from ..util import inspect_getfullargspec
-from ..util import py2k
if not has_refcount_gc:
def picklers():
picklers = set()
- if py2k:
- try:
- import cPickle
-
- picklers.add(cPickle)
- except ImportError:
- pass
-
import pickle
picklers.add(pickle)
yield pickle_.loads, lambda d: pickle_.dumps(d, protocol)
-if py2k:
-
- def random_choices(population, k=1):
- pop = list(population)
- # lame but works :)
- random.shuffle(pop)
- return pop[0:k]
-
-
-else:
-
- def random_choices(population, k=1):
- return random.choices(population, k=k)
+def random_choices(population, k=1):
+ return random.choices(population, k=k)
def round_decimal(value, prec):
from .compat import perf_counter
from .compat import pickle
from .compat import print_
-from .compat import py2k
from .compat import py37
from .compat import py38
from .compat import py39
from .compat import binary_types
from .compat import collections_abc
from .compat import itertools_filterfalse
-from .compat import py2k
from .compat import py37
from .compat import string_types
from .compat import threading
def items(self):
return [(key, self[key]) for key in self._list]
- if py2k:
-
- def itervalues(self):
- return iter(self.values())
-
- def iterkeys(self):
- return iter(self)
-
- def iteritems(self):
- return iter(self.items())
-
def __setitem__(self, key, obj):
if key not in self:
try:
py38 = sys.version_info >= (3, 8)
py37 = sys.version_info >= (3, 7)
py3k = sys.version_info >= (3, 0)
-py2k = sys.version_info < (3, 0)
pypy = platform.python_implementation() == "PyPy"
% (decorated.__module__,)
)
- if compat.py2k or hasattr(fn, "__func__"):
+ if hasattr(fn, "__func__"):
fn.__func__.__doc__ = doc
if not hasattr(fn.__func__, "_linked_to"):
fn.__func__._linked_to = (decorated, location)
will not be descended.
"""
- if compat.py2k:
- if isinstance(cls, types.ClassType):
- return list()
hier = {cls}
process = list(cls.__mro__)
while process:
c = process.pop()
- if compat.py2k:
- if isinstance(c, types.ClassType):
- continue
- bases = (
- _
- for _ in c.__bases__
- if _ not in hier and not isinstance(_, types.ClassType)
- )
- else:
- bases = (_ for _ in c.__bases__ if _ not in hier)
+ bases = (_ for _ in c.__bases__ if _ not in hier)
for b in bases:
process.append(b)
class CacheKeyTest(fixtures.TestBase):
- # python3 is just to have less variability in test counts
- __requires__ = ("cpython", "python_profiling_backend", "python3")
+ __requires__ = ("cpython", "python_profiling_backend")
@testing.fixture(scope="class")
def mapping_fixture(self):
import threading
from sqlalchemy import exc
-from sqlalchemy import testing
from sqlalchemy.testing import async_test
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_raises
await to_await
@async_test
- @testing.requires.python37
async def test_contextvars(self):
import asyncio
import contextvars
class TestAsyncAdaptedQueue(fixtures.TestBase):
- # uses asyncio.run() in alternate threads which is not available
- # in Python 3.6
- __requires__ = ("python37", "greenlet")
+ __requires__ = ("greenlet",)
def test_lazy_init(self):
run = [False]
class SlotsEventsTest(fixtures.TestBase):
- @testing.requires.python3
def test_no_slots_dispatch(self):
class Target(object):
__slots__ = ()
class DocTest(fixtures.TestBase):
- __requires__ = ("python3",)
-
def _setup_logger(self):
rootlogger = logging.getLogger("sqlalchemy.engine.Engine")
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
-from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
def test_no_sort_legacy_dictionary(self):
d1 = {"c": 1, "b": 2, "a": 3}
-
- if testing.requires.python37.enabled:
- util.sort_dictionary(d1)
- eq_(list(d1), ["a", "b", "c"])
- else:
- assert_raises(AttributeError, util.sort_dictionary, d1)
+ util.sort_dictionary(d1)
+ eq_(list(d1), ["a", "b", "c"])
def test_sort_dictionary(self):
o = util.OrderedDict()
d = subdict(a=1, b=2, c=3)
self._ok(d)
- if util.py2k:
-
- def test_UserDict(self):
- import UserDict
-
- d = UserDict.UserDict(a=1, b=2, c=3)
- self._ok(d)
-
def test_object(self):
self._notok(object())
- if util.py2k:
-
- def test_duck_1(self):
- class duck1(object):
- def iteritems(duck):
- return iter(self.baseline)
-
- self._ok(duck1())
-
def test_duck_2(self):
class duck2(object):
def items(duck):
self._ok(duck2())
- if util.py2k:
-
- def test_duck_3(self):
- class duck3(object):
- def iterkeys(duck):
- return iter(["a", "b", "c"])
-
- def __getitem__(duck, key):
- return dict(a=1, b=2, c=3).get(key)
-
- self._ok(duck3())
-
def test_duck_4(self):
class duck4(object):
def iterkeys(duck):
"apply_kw_proxied": "a, b=b, c=c",
},
False,
- testing.requires.python3,
),
(
py3k_fixtures._kw_plus_posn_fixture,
"apply_kw_proxied": "a, b=b, c=c, *args",
},
False,
- testing.requires.python3,
),
(
py3k_fixtures._kw_opt_fixture,
"apply_kw_proxied": "a, b=b, c=c",
},
False,
- testing.requires.python3,
),
argnames="fn,wanted,grouped",
)
eq_(set(util.class_hierarchy(A)), set((A, B, C, object)))
eq_(set(util.class_hierarchy(B)), set((A, B, C, object)))
- if util.py2k:
-
- def test_oldstyle_mixin(self):
- class A(object):
- pass
-
- class Mixin:
- pass
-
- class B(A, Mixin):
- pass
-
- eq_(set(util.class_hierarchy(B)), set((A, B, object)))
- eq_(set(util.class_hierarchy(Mixin)), set())
- eq_(set(util.class_hierarchy(A)), set((A, B, object)))
-
class ReraiseTest(fixtures.TestBase):
- @testing.requires.python3
def test_raise_from_cause_same_cause(self):
class MyException(Exception):
pass
go()
assert False
except MyOtherException as moe:
- if testing.requires.python3.enabled:
- is_(moe.__cause__, me)
+ is_(moe.__cause__, me)
def test_raise_from(self):
class MyException(Exception):
go()
assert False
except MyOtherException as moe:
- if testing.requires.python3.enabled:
- is_(moe.__cause__, me)
-
- @testing.requires.python2
- def test_safe_reraise_py2k_warning(self):
- class MyException(Exception):
- pass
-
- class MyOtherException(Exception):
- pass
-
- m1 = MyException("exc one")
- m2 = MyOtherException("exc two")
-
- def go2():
- raise m2
-
- def go():
- try:
- raise m1
- except Exception:
- with util.safe_reraise():
- go2()
-
- with expect_warnings(
- "An exception has occurred during handling of a previous "
- "exception. The previous exception "
- "is:.*MyException.*exc one"
- ):
- try:
- go()
- assert False
- except MyOtherException:
- pass
+ is_(moe.__cause__, me)
class TestClassProperty(fixtures.TestBase):
datetime.datetime(2017, 10, 5, 4, 55, 38, tzinfo=timezone(del_)),
)
- @testing.requires.python3
- def test_repr_py3k(self):
+ def test_repr(self):
eq_(
repr(timezone(datetime.timedelta(hours=5))),
"datetime.timezone(%r)" % (datetime.timedelta(hours=5)),
)
- @testing.requires.python2
- def test_repr_py2k(self):
- eq_(
- repr(timezone(datetime.timedelta(hours=5))),
- "sqlalchemy.util.timezone(%r)" % (datetime.timedelta(hours=5)),
- )
-
class TestModuleRegistry(fixtures.TestBase):
def test_modules_are_loaded(self):
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import testing
-from sqlalchemy import util
from sqlalchemy.dialects.mssql import base as mssql
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import config
from sqlalchemy.testing import fixtures
from sqlalchemy.testing.assertsql import CursorSQL
from sqlalchemy.testing.assertsql import DialectSQL
-from sqlalchemy.util import ue
class IdentityInsertTest(fixtures.TablesTest, AssertsCompiledSQL):
eq_(conn.execute(select(t)).first(), (1, "descrip"))
-class QueryUnicodeTest(fixtures.TestBase):
-
- __only_on__ = "mssql"
- __backend__ = True
-
- @testing.requires.mssql_freetds
- @testing.requires.python2
- @testing.provide_metadata
- def test_convert_unicode(self, connection):
- meta = self.metadata
- t1 = Table(
- "unitest_table",
- meta,
- Column("id", Integer, primary_key=True),
- Column("descr", mssql.MSText()),
- )
- meta.create_all(connection)
- connection.execute(
- ue("insert into unitest_table values ('abc \xc3\xa9 def')").encode(
- "UTF-8"
- )
- )
- r = connection.execute(t1.select()).first()
- assert isinstance(
- r[1], util.text_type
- ), "%s is %s instead of unicode, working on %s" % (
- r[1],
- type(r[1]),
- meta.bind,
- )
- eq_(r[1], util.ue("abc \xc3\xa9 def"))
-
-
class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
__only_on__ = "mssql"
__backend__ = True
),
None,
True,
- testing.requires.python37,
),
(
"dto_param_datetime_naive",
assert_raises(UnicodeDecodeError, outconverter, utf8_w_errors)
@_oracle_char_combinations
- @testing.requires.python3
def test_older_cx_oracle_warning(self, cx_Oracle, cx_oracle_type):
cx_Oracle.version = "6.3"
)
@_oracle_char_combinations
- @testing.requires.python2
- def test_encoding_errors_sqla_py2k(
- self,
- cx_Oracle,
- cx_oracle_type,
- ):
- ignore_dialect = cx_oracle.dialect(
- dbapi=cx_Oracle, encoding_errors="ignore"
- )
-
- ignore_outputhandler = (
- ignore_dialect._generate_connection_outputtype_handler()
- )
-
- cursor = mock.Mock()
- ignore_outputhandler(cursor, "foo", cx_oracle_type, None, None, None)
- outconverter = cursor.mock_calls[0][2]["outconverter"]
- self._assert_errorhandler(outconverter, True)
-
- @_oracle_char_combinations
- @testing.requires.python2
- def test_no_encoding_errors_sqla_py2k(
- self,
- cx_Oracle,
- cx_oracle_type,
- ):
- plain_dialect = cx_oracle.dialect(dbapi=cx_Oracle)
-
- plain_outputhandler = (
- plain_dialect._generate_connection_outputtype_handler()
- )
-
- cursor = mock.Mock()
- plain_outputhandler(cursor, "foo", cx_oracle_type, None, None, None)
- outconverter = cursor.mock_calls[0][2]["outconverter"]
- self._assert_errorhandler(outconverter, False)
-
- @_oracle_char_combinations
- @testing.requires.python3
- def test_encoding_errors_cx_oracle_py3k(
+ def test_encoding_errors_cx_oracle(
self,
cx_Oracle,
cx_oracle_type,
)
@_oracle_char_combinations
- @testing.requires.python3
- def test_no_encoding_errors_cx_oracle_py3k(
+ def test_no_encoding_errors_cx_oracle(
self,
cx_Oracle,
cx_oracle_type,
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.util import b
-from sqlalchemy.util import py2k
from sqlalchemy.util import u
Column("id", Integer, primary_key=True),
Column("data", char_type(30), nullable=False),
)
-
- if py2k and char_type is NCHAR:
- v1, v2, v3 = u"value 1", u"value 2", u"value 3"
- else:
- v1, v2, v3 = "value 1", "value 2", "value 3"
+ v1, v2, v3 = "value 1", "value 2", "value 3"
t.create(connection)
connection.execute(
eq_(sqla_result, cx_oracle_result)
- @testing.only_on("oracle+cx_oracle", "cx_oracle-specific feature")
- @testing.fails_if(
- testing.requires.python3, "cx_oracle always returns unicode on py3k"
- )
def test_coerce_to_unicode(self, connection):
engine = testing_engine(options=dict(coerce_to_unicode=False))
with engine.connect() as conn_no_coerce:
value = exec_sql(
conn_no_coerce, "SELECT 'hello' FROM DUAL"
).scalar()
- assert isinstance(value, util.binary_type)
+ assert not isinstance(value, util.binary_type)
+ assert isinstance(value, util.text_type)
value = exec_sql(connection, "SELECT 'hello' FROM DUAL").scalar()
assert isinstance(value, util.text_type)
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_PLAIN
from sqlalchemy.dialects.postgresql.psycopg2 import EXECUTEMANY_VALUES
from sqlalchemy.engine import cursor as _cursor
-from sqlalchemy.engine import engine_from_config
from sqlalchemy.engine import url
from sqlalchemy.sql.selectable import LABEL_STYLE_TABLENAME_PLUS_COL
from sqlalchemy.testing import config
future_connection.dialect.server_version_info,
)
- @testing.requires.python3
@testing.requires.psycopg2_compatibility
- def test_pg_dialect_no_native_unicode_in_python3(self, testing_engine):
+ def test_pg_dialect_no_native_unicode(self, testing_engine):
with testing.expect_raises_message(
exc.ArgumentError,
"psycopg2 native_unicode mode is required under Python 3",
):
testing_engine(options=dict(use_native_unicode=False))
- @testing.requires.python2
- @testing.requires.psycopg2_compatibility
- def test_pg_dialect_no_native_unicode_in_python2(self, testing_engine):
- e = testing_engine(options=dict(use_native_unicode=False))
- with e.connect() as conn:
- eq_(
- conn.exec_driver_sql(u"SELECT '🐍 voix m’a réveillé'").scalar(),
- u"🐍 voix m’a réveillé".encode("utf-8"),
- )
-
- @testing.requires.python2
- @testing.requires.psycopg2_compatibility
- def test_pg_dialect_use_native_unicode_from_config(self):
- config = {
- "sqlalchemy.url": testing.db.url,
- "sqlalchemy.use_native_unicode": "false",
- }
-
- e = engine_from_config(config, _initialize=False)
- eq_(e.dialect.use_native_unicode, False)
-
- config = {
- "sqlalchemy.url": testing.db.url,
- "sqlalchemy.use_native_unicode": "true",
- }
-
- e = engine_from_config(config, _initialize=False)
- eq_(e.dialect.use_native_unicode, True)
-
def test_psycopg2_empty_connection_string(self):
dialect = psycopg2_dialect.dialect()
u = url.make_url("postgresql://")
eq_(connection.execute(sel).fetchall(), [(["foo", "bar"],)])
@_enum_combinations
- @testing.requires.python3
def test_array_of_enums_native_roundtrip(
self, array_of_enum_fixture, connection, array_cls, enum_cls
):
)
)
- @testing.requires.insert_order_dicts
@testing.only_on("sqlite+pysqlite")
def test_isolation_level_message(self):
# needs to test that all three words are present and we also
@testing.provide_metadata
def test_description_encoding(self, connection):
- # amazingly, pysqlite seems to still deliver cursor.description
- # as encoded bytes in py2k
-
t = Table(
"x",
self.metadata,
with testing.db.connect() as conn:
assert_raises_message(
tsa.exc.StatementError,
- util.u(
- "A value is required for bind parameter 'uname'\n"
- r".*SELECT users.user_name AS .m\xe9il."
- )
- if util.py2k
- else util.u(
- "A value is required for bind parameter 'uname'\n"
- ".*SELECT users.user_name AS .méil."
- ),
+ "A value is required for bind parameter 'uname'\n"
+ ".*SELECT users.user_name AS .méil.",
conn.execute,
select(users.c.user_name.label(name)).where(
users.c.user_name == bindparam("uname")
message = util.u("some message méil").encode("utf-8")
err = tsa.exc.SQLAlchemyError(message)
- if util.py2k:
- # string passes it through
- eq_(str(err), message)
-
- # unicode accessor decodes to utf-8
- eq_(unicode(err), util.u("some message méil")) # noqa F821
- else:
- eq_(str(err), util.u("some message méil"))
+ eq_(str(err), util.u("some message méil"))
def test_stmt_exception_bytestring_latin1(self):
# uncommon case for Py3K, bytestring object passed
message = util.u("some message méil").encode("latin-1")
err = tsa.exc.SQLAlchemyError(message)
- if util.py2k:
- # string passes it through
- eq_(str(err), message)
-
- # unicode accessor decodes to utf-8
- eq_(unicode(err), util.u("some message m\\xe9il")) # noqa F821
- else:
- eq_(str(err), util.u("some message m\\xe9il"))
+ eq_(str(err), util.u("some message m\\xe9il"))
def test_stmt_exception_unicode_hook_unicode(self):
# uncommon case for Py2K, Unicode object passed
message = util.u("some message méil")
err = tsa.exc.SQLAlchemyError(message)
- if util.py2k:
- eq_(unicode(err), util.u("some message méil")) # noqa F821
- else:
- eq_(str(err), util.u("some message méil"))
+ eq_(str(err), util.u("some message méil"))
def test_stmt_exception_object_arg(self):
err = tsa.exc.SQLAlchemyError(Foo())
eq_(str(err), "foo")
- if util.py2k:
- eq_(unicode(err), util.u("fóó")) # noqa F821
-
def test_stmt_exception_str_multi_args(self):
err = tsa.exc.SQLAlchemyError("some message", 206)
eq_(str(err), "('some message', 206)")
class UnicodeReturnsTest(fixtures.TestBase):
- @testing.requires.python3
- def test_unicode_test_not_in_python3(self):
+ def test_unicode_test_not_run(self):
eng = engines.testing_engine()
eng.dialect.returns_unicode_strings = String.RETURNS_UNKNOWN
eng.connect,
)
- @testing.requires.python2
- def test_unicode_test_fails_warning(self):
- class MockCursor(engines.DBAPIProxyCursor):
- def execute(self, stmt, params=None, **kw):
- if "test unicode returns" in stmt:
- raise self.engine.dialect.dbapi.DatabaseError("boom")
- else:
- return super(MockCursor, self).execute(stmt, params, **kw)
-
- eng = engines.proxying_engine(cursor_cls=MockCursor)
- with testing.expect_warnings(
- "Exception attempting to detect unicode returns"
- ):
- eng.connect()
-
- # because plain varchar passed, we don't know the correct answer
- eq_(eng.dialect.returns_unicode_strings, String.RETURNS_CONDITIONAL)
- eng.dispose()
-
class ConvenienceExecuteTest(fixtures.TablesTest):
__backend__ = True
),
)
- @testing.requires.python3
def test_arg_validation_all_seven_posn(self):
"""test #7130"""
with testing.expect_deprecated(
assert canary.call_args_list[0][0][0] is dbapi_con
assert canary.call_args_list[0][0][2] is exc
- @testing.combinations((True, testing.requires.python3), (False,))
+ @testing.combinations((True,), (False,))
def test_checkin_event_gc(self, detach_gced):
p, canary = self._checkin_event_fixture(_is_asyncio=detach_gced)
self._assert_cleanup_on_pooled_reconnect(dbapi, p)
- @testing.combinations((True, testing.requires.python3), (False,))
+ @testing.combinations((True,), (False,))
def test_userspace_disconnectionerror_weakref_finalizer(self, detach_gced):
dbapi, pool = self._queuepool_dbapi_fixture(
pool_size=1, max_overflow=2, _is_asyncio=detach_gced
is_false(async_engine == None)
@async_test
- @testing.requires.python37
async def test_no_attach_to_event_loop(self, testing_engine):
"""test #6409"""
import sqlalchemy as sa
from sqlalchemy import func
from sqlalchemy import select
-from sqlalchemy import testing
from sqlalchemy.ext.asyncio import async_scoped_session
from sqlalchemy.ext.asyncio import AsyncSession as _AsyncSession
from sqlalchemy.orm import sessionmaker
class AsyncScopedSessionTest(AsyncFixture):
- @testing.requires.python37
@async_test
async def test_basic(self, async_engine):
from asyncio import current_task
await AsyncSession.flush()
eq_(await conn.scalar(stmt), 0)
- @testing.requires.python37
def test_attributes(self, async_engine):
from asyncio import current_task
][0]
is_(ck.columns.cprop, Bar.__table__.c.cprop)
- if testing.requires.python3.enabled:
- # test the existing failure case in case something changes
- def go():
- class Bat(Base):
- __tablename__ = "bat"
+ # test the existing failure case in case something changes
+ def go():
+ class Bat(Base):
+ __tablename__ = "bat"
- id = Column(Integer, primary_key=True)
- cprop = deferred(Column(Integer))
+ id = Column(Integer, primary_key=True)
+ cprop = deferred(Column(Integer))
- # we still can't do an expression like
- # "cprop > 5" because the column property isn't
- # a full blown column
+ # we still can't do an expression like
+ # "cprop > 5" because the column property isn't
+ # a full blown column
- __table_args__ = (CheckConstraint(cprop > 5),)
+ __table_args__ = (CheckConstraint(cprop > 5),)
- assert_raises(TypeError, go)
+ assert_raises(TypeError, go)
def test_relationship_level_msg_for_invalid_callable(self):
class A(Base):
mt = MyTable(id=5)
eq_(mt.id, 5)
- @testing.requires.python36
def test_kw_support_in_declarative_meta_init(self):
# This will not fail if DeclarativeMeta __init__ supports **kw
class OrderedDictFixture(object):
@testing.fixture
def ordered_dict_mro(self):
- if testing.requires.python37.enabled:
- return type("ordered", (collections.MappedCollection,), {})
- else:
- return type(
- "ordered", (util.OrderedDict, collections.MappedCollection), {}
- )
+ return type("ordered", (collections.MappedCollection,), {})
class CollectionsTest(OrderedDictFixture, fixtures.ORMTest):
class DataclassesTest(fixtures.MappedTest, testing.AssertsCompiledSQL):
- __requires__ = ("dataclasses",)
-
@classmethod
def define_tables(cls, metadata):
Table(
class PlainDeclarativeDataclassesTest(DataclassesTest):
- __requires__ = ("dataclasses",)
-
run_setup_classes = "each"
run_setup_mappers = "each"
class FieldEmbeddedDeclarativeDataclassesTest(
fixtures.DeclarativeMappedTest, DataclassesTest
):
- __requires__ = ("dataclasses",)
-
@classmethod
def setup_classes(cls):
declarative = cls.DeclarativeBasic.registry.mapped
class FieldEmbeddedWMixinTest(FieldEmbeddedDeclarativeDataclassesTest):
- __requires__ = ("dataclasses",)
-
@classmethod
def setup_classes(cls):
declarative = cls.DeclarativeBasic.registry.mapped
class FieldEmbeddedMixinWLambdaTest(fixtures.DeclarativeMappedTest):
- __requires__ = ("dataclasses",)
-
@classmethod
def setup_classes(cls):
declarative = cls.DeclarativeBasic.registry.mapped
class FieldEmbeddedMixinWDeclaredAttrTest(FieldEmbeddedMixinWLambdaTest):
- __requires__ = ("dataclasses",)
-
@classmethod
def setup_classes(cls):
declarative = cls.DeclarativeBasic.registry.mapped
class PropagationFromMixinTest(fixtures.TestBase):
- __requires__ = ("dataclasses",)
-
def test_propagate_w_plain_mixin_col(self, run_test):
@dataclasses.dataclass
class CommonMixin:
class PropagationFromAbstractTest(fixtures.TestBase):
- __requires__ = ("dataclasses",)
-
def test_propagate_w_plain_mixin_col(self, run_test):
@dataclasses.dataclass
class BaseType:
Column("value", String(10)),
)
- if util.py2k:
-
- def test_baseclass_map_imperatively(self):
- ht1 = self.tables.ht1
-
- class OldStyle:
- pass
-
- assert_raises(
- sa.exc.ArgumentError,
- self.mapper_registry.map_imperatively,
- OldStyle,
- ht1,
- )
-
- assert_raises(
- sa.exc.ArgumentError,
- self.mapper_registry.map_imperatively,
- 123,
- )
-
- def test_baseclass_legacy_mapper(self):
- ht1 = self.tables.ht1
-
- class OldStyle:
- pass
-
- assert_raises(
- sa.exc.ArgumentError,
- mapper,
- OldStyle,
- ht1,
- )
-
- assert_raises(
- sa.exc.ArgumentError,
- mapper,
- 123,
- )
-
- class NoWeakrefSupport(str):
- pass
-
- # TODO: is weakref support detectable without an instance?
- # self.assertRaises(
- # sa.exc.ArgumentError, mapper, NoWeakrefSupport, t2)
-
class DeferredOptionsTest(AssertsCompiledSQL, _fixtures.FixtureTest):
__dialect__ = "default"
exec(code, glbls)
return names, glbls[clsname]
- @testing.requires.pep520
def test_all_orm_descriptors_pep520_noinh(self):
from sqlalchemy.orm import declarative_base
eq_(MyClass.__mapper__.all_orm_descriptors.keys(), names)
- @testing.requires.pep520
def test_all_orm_descriptors_pep520_onelevel_inh(self):
from sqlalchemy.orm import declarative_base
sub_names + base_names,
)
- @testing.requires.pep520
def test_all_orm_descriptors_pep520_classical(self):
class MyClass(object):
pass
class Py3KFunctionInstTest(fixtures.ORMTest):
- __requires__ = ("python3",)
-
def _instrument(self, cls):
manager = instrumentation.register_class(cls)
canary = []
m = self.mapper(Foo, foo_t)
class DontCompareMeToString(int):
- if util.py2k:
-
- def __lt__(self, other):
- assert not isinstance(other, basestring) # noqa
- return int(self) < other
+ pass
foos = [Foo(id_="f%d" % i) for i in range(5)]
states = [attributes.instance_state(f) for f in foos]
(operators.mul, "*"),
(operators.sub, "-"),
(operators.truediv, "/"),
- (operators.div, "/"),
argnames="py_op, sql_op",
id_="ar",
)
eq_(mock_object_session.mock_calls, [mock.call("foo")])
@testing.combinations(
- ("style1", testing.requires.python3),
- ("style2", testing.requires.python3),
+ "style1",
+ "style2",
"style3",
"style4",
)
# new in 1.4
is_(session._legacy_transaction(), None)
- @testing.requires.python2
- @testing.requires.savepoints_w_release
- def test_report_primary_error_when_rollback_fails(self):
- User, users = self.classes.User, self.tables.users
-
- self.mapper_registry.map_imperatively(User, users)
-
- with fixture_session() as session:
-
- with expect_warnings(
- ".*during handling of a previous exception.*"
- ):
- session.begin_nested()
- savepoint = session.connection()._nested_transaction._savepoint
-
- # force the savepoint to disappear
- session.connection().dialect.do_release_savepoint(
- session.connection(), savepoint
- )
-
- # now do a broken flush
- session.add_all([User(id=1), User(id=1)])
-
- assert_raises_message(
- sa_exc.DBAPIError, "ROLLBACK TO SAVEPOINT ", session.flush
- )
-
class _LocalFixture(FixtureTest):
run_setup_mappers = "once"
)
l2 = ClauseList(
- table_c.c.x, table_c.c.y, table_d.c.y, operator=operators.div
+ table_c.c.x, table_c.c.y, table_d.c.y, operator=operators.truediv
)
is_false(l1.compare(l2))
("add", operator.add, "+"),
("mul", operator.mul, "*"),
("sub", operator.sub, "-"),
- ("div", operator.truediv if util.py3k else operator.div, "/"),
+ ("div", operator.truediv, "/"),
("mod", operator.mod, "%"),
id_="iaa",
)
self._assert_types(expr.right.type.types)
# since we want to infer "binary"
- @testing.requires.python3
def test_tuple_type_expanding_inference(self):
a, b, c = column("a"), column("b"), column("c")
self._assert_types(expr.right.type.types)
- @testing.requires.python3
def test_tuple_type_plain_inference(self):
a, b, c = column("a"), column("b"), column("c")
from sqlalchemy import sql
from sqlalchemy import Table
from sqlalchemy import testing
-from sqlalchemy import util
from sqlalchemy.engine import default
from sqlalchemy.sql import compiler
from sqlalchemy.sql import LABEL_STYLE_TABLENAME_PLUS_COL
def test_repr_unicode(self):
name = quoted_name(u"姓名", None)
- if util.py2k:
- eq_(repr(name), "'\u59d3\u540d'")
- else:
- eq_(repr(name), repr(u"姓名"))
+ eq_(repr(name), repr(u"姓名"))
def test_lower_case_names(self):
# Create table with quote defaults
eq_(row._mapping[s.c.user_id], 7)
eq_(row._mapping[s.c.user_name], "ed")
- @testing.requires.python3
def test_ro_mapping_py3k(self, connection):
users = self.tables.users
eq_(odict_row.values(), mapping_row.values())
eq_(odict_row.items(), mapping_row.items())
- @testing.requires.python2
- def test_ro_mapping_py2k(self, connection):
- users = self.tables.users
-
- connection.execute(users.insert(), dict(user_id=1, user_name="foo"))
- result = connection.execute(users.select())
-
- row = result.first()
- dict_row = row._asdict()
-
- odict_row = collections.OrderedDict(
- [("user_id", 1), ("user_name", "foo")]
- )
- eq_(dict_row, odict_row)
- mapping_row = row._mapping
-
- eq_(list(mapping_row), list(mapping_row.keys()))
- eq_(odict_row.keys(), list(mapping_row.keys()))
- eq_(odict_row.values(), list(mapping_row.values()))
- eq_(odict_row.items(), list(mapping_row.items()))
-
@testing.combinations(
(lambda result: result),
(lambda result: result.first(),),