It's better; the majority of these changes look more readable to me.
Also found some docstrings that had formatting / quoting issues.
Cross-implemented with master
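
Most of the churn below comes from 20.8b1's new "magic trailing comma"
behavior: any call or literal that already ends in a trailing comma is
exploded onto one element per line. A minimal sketch (hypothetical
helper name, not from this diff):

    def frobnicate(x, y):  # hypothetical, for illustration only
        return x + y

    # 19.10b0 accepted frobnicate(1, 2,) on one line; because of the
    # trailing comma, 20.8b1 now formats the call as:
    result = frobnicate(
        1,
        2,
    )

The docstring hunks appear to come from 20.8b1's new docstring
normalization, which strips the space after the opening quotes, trims
trailing blank lines before the closing quotes, and re-indents the
body uniformly.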
Change-Id: I582a45fde3a5648b2f36bab96bad56881321899b
import sys
+
from packaging import tags
to_check = "--"
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/python/black
- rev: 19.10b0
+ rev: 20.8b1
hooks:
- id: black
def __iter__(self):
"""override __iter__ to pull results from dogpile
- if particular attributes have been configured.
+ if particular attributes have been configured.
- Note that this approach does *not* detach the loaded objects from
- the current session. If the cache backend is an in-process cache
- (like "memory") and lives beyond the scope of the current session's
- transaction, those objects may be expired. The method here can be
- modified to first expunge() each loaded item from the current
- session before returning the list of items, so that the items
- in the cache are not the same ones in the current Session.
+ Note that this approach does *not* detach the loaded objects from
+ the current session. If the cache backend is an in-process cache
+ (like "memory") and lives beyond the scope of the current session's
+ transaction, those objects may be expired. The method here can be
+ modified to first expunge() each loaded item from the current
+ session before returning the list of items, so that the items
+ in the cache are not the same ones in the current Session.
"""
super_ = super(CachingQuery, self)
def _execute_and_instances(self, context):
"""override _execute_and_instances to pull results from dogpile
- if the query is invoked directly from an external context.
+ if the query is invoked directly from an external context.
- This method is necessary in order to maintain compatibility
- with the "baked query" system now used by default in some
- relationship loader scenarios. Note also the
- RelationshipCache._generate_cache_key method which enables
- the baked query to be used within lazy loads.
+ This method is necessary in order to maintain compatibility
+ with the "baked query" system now used by default in some
+ relationship loader scenarios. Note also the
+ RelationshipCache._generate_cache_key method which enables
+ the baked query to be used within lazy loads.
- .. versionadded:: 1.2.7
+ .. versionadded:: 1.2.7
"""
super_ = super(CachingQuery, self)
class RelationshipCache(MapperOption):
"""Specifies that a Query as called within a "lazy load"
- should load results from a cache."""
+ should load results from a cache."""
propagate_to_loaders = True
e1.data = "e2"
session.commit()
-assert session.query(
- Example.id,
- Example.version_id,
- Example.is_current_version,
- Example.calc_is_current_version,
- Example.data,
-).order_by(Example.id, Example.version_id).all() == (
- [(1, 1, False, False, "e1"), (1, 2, True, True, "e2")]
+assert (
+ session.query(
+ Example.id,
+ Example.version_id,
+ Example.is_current_version,
+ Example.calc_is_current_version,
+ Example.data,
+ )
+ .order_by(Example.id, Example.version_id)
+ .all()
+ == ([(1, 1, False, False, "e1"), (1, 2, True, True, "e2")])
)
# example 2, versioning with a parent
assert p1.child_id == 1
assert p1.child.version_id == 2
-assert session.query(
- Child.id,
- Child.version_id,
- Child.is_current_version,
- Child.calc_is_current_version,
- Child.data,
-).order_by(Child.id, Child.version_id).all() == (
- [(1, 1, False, False, "c1"), (1, 2, True, True, "c2")]
+assert (
+ session.query(
+ Child.id,
+ Child.version_id,
+ Child.is_current_version,
+ Child.calc_is_current_version,
+ Child.data,
+ )
+ .order_by(Child.id, Child.version_id)
+ .all()
+ == ([(1, 1, False, False, "c1"), (1, 2, True, True, "c2")])
)
@value.comparator
class value(PropComparator):
- """A comparator for .value, builds a polymorphic comparison via CASE.
-
- """
+ """A comparator for .value, builds a polymorphic comparison via
+ CASE."""
def __init__(self, cls):
self.cls = cls
from .engine import engine_from_config # noqa nosort
-__version__ = '1.3.20'
+__version__ = "1.3.20"
def __go(lcls):
@classmethod
def _load_mx_exceptions(cls):
- """ Import mxODBC exception classes into the module namespace,
+ """Import mxODBC exception classes into the module namespace,
as if they had been imported normally. This is done here
to avoid requiring all SQLAlchemy users to install mxODBC.
"""
return connect
def _error_handler(self):
- """ Return a handler that adjusts mxODBC's raised Warnings to
+ """Return a handler that adjusts mxODBC's raised Warnings to
emit Python standard warnings.
"""
from mx.ODBC.Error import Warning as MxOdbcWarning
self.__zero_date, value.time()
)
elif isinstance(value, datetime.time):
- """ issue #5339
+ """issue #5339
per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
pass TIME value as string
""" # noqa
class TryCast(sql.elements.Cast):
- """Represent a SQL Server TRY_CAST expression.
-
- """
+ """Represent a SQL Server TRY_CAST expression."""
__visit_name__ = "try_cast"
class _MSNumeric_mxodbc(_MSNumeric_pyodbc):
- """Include pyodbc's numeric processor.
- """
+ """Include pyodbc's numeric processor."""
class _MSDate_mxodbc(_MSDate):
class _cymysqlBIT(BIT):
def result_processor(self, dialect, coltype):
- """Convert a MySQL's 64 bit, variable length binary string to a long.
- """
+ """Convert a MySQL's 64 bit, variable length binary string to a
+ long."""
def process(value):
if value is not None:
class TIMESTAMP(sqltypes.TIMESTAMP):
- """MySQL TIMESTAMP type.
-
- """
+ """MySQL TIMESTAMP type."""
__visit_name__ = "TIMESTAMP"
class DATETIME(sqltypes.DATETIME):
- """MySQL DATETIME type.
-
- """
+ """MySQL DATETIME type."""
__visit_name__ = "DATETIME"
# allow all strings to come back natively as Unicode
elif (
dialect.coerce_to_unicode
- and default_type in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR,)
+ and default_type
+ in (
+ cx_Oracle.STRING,
+ cx_Oracle.FIXED_CHAR,
+ )
and default_type is not cx_Oracle.CLOB
and default_type is not cx_Oracle.NCLOB
):
cx_Oracle.BLOB,
):
return cursor.var(
- cx_Oracle.LONG_BINARY, size, cursor.arraysize,
+ cx_Oracle.LONG_BINARY,
+ size,
+ cursor.arraysize,
)
return output_type_handler
as Python uuid objects, converting to/from string via the
DBAPI.
- """
+ """
if as_uuid and _python_UUID is None:
raise NotImplementedError(
"This version of Python does not support "
def initialize(self, connection):
super(PGDialect, self).initialize(connection)
- self.implicit_returning = self.server_version_info > (
- 8,
- 2,
- ) and self.__dict__.get("implicit_returning", True)
+ self.implicit_returning = (
+ self.server_version_info
+ > (
+ 8,
+ 2,
+ )
+ and self.__dict__.get("implicit_returning", True)
+ )
self.supports_native_enum = self.server_version_info >= (8, 3)
if not self.supports_native_enum:
self.colspecs = self.colspecs.copy()
return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean)
def has_all(self, other):
- """Boolean expression. Test for presence of all keys in jsonb
- """
+ """Boolean expression. Test for presence of all keys in jsonb"""
return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean)
def has_any(self, other):
- """Boolean expression. Test for presence of any key in jsonb
- """
+ """Boolean expression. Test for presence of any key in jsonb"""
return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean)
def contains(self, other, **kwargs):
.. versionadded:: 1.1
- """
+ """
super(JSON, self).__init__(none_as_null=none_as_null)
if astext_type is not None:
self.astext_type = astext_type
return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean)
def has_all(self, other):
- """Boolean expression. Test for presence of all keys in jsonb
- """
+ """Boolean expression. Test for presence of all keys in jsonb"""
return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean)
def has_any(self, other):
- """Boolean expression. Test for presence of any key in jsonb
- """
+ """Boolean expression. Test for presence of any key in jsonb"""
return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean)
def contains(self, other, **kwargs):
class INT4RANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL INT4RANGE type.
-
- """
+ """Represent the PostgreSQL INT4RANGE type."""
__visit_name__ = "INT4RANGE"
class INT8RANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL INT8RANGE type.
-
- """
+ """Represent the PostgreSQL INT8RANGE type."""
__visit_name__ = "INT8RANGE"
class NUMRANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL NUMRANGE type.
-
- """
+ """Represent the PostgreSQL NUMRANGE type."""
__visit_name__ = "NUMRANGE"
class DATERANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL DATERANGE type.
-
- """
+ """Represent the PostgreSQL DATERANGE type."""
__visit_name__ = "DATERANGE"
class TSRANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL TSRANGE type.
-
- """
+ """Represent the PostgreSQL TSRANGE type."""
__visit_name__ = "TSRANGE"
class TSTZRANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL TSTZRANGE type.
-
- """
+ """Represent the PostgreSQL TSTZRANGE type."""
__visit_name__ = "TSTZRANGE"
return c
def get_execution_options(self):
- """ Get the non-SQL options which will take effect during execution.
+ """Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
self.connection._discard_transaction(self)
def rollback(self):
- """Roll back this :class:`.Transaction`.
-
- """
+ """Roll back this :class:`.Transaction`."""
if self._parent.is_active:
self._do_rollback()
self.is_active = False
return OptionEngine(self, opt)
def get_execution_options(self):
- """ Get the non-SQL options which will take effect during execution.
+ """Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
:param dbapi_connection: a DBAPI connection, typically
proxied within a :class:`.ConnectionFairy`.
- """
+ """
raise NotImplementedError()
:param dbapi_connection: a DBAPI connection, typically
proxied within a :class:`.ConnectionFairy`.
- """
+ """
raise NotImplementedError()
":meth:`_schema.MetaData.create_all`.",
)
def create(self, entity, **kwargs):
- """Emit CREATE statements for the given schema entity.
- """
+ """Emit CREATE statements for the given schema entity."""
raise NotImplementedError()
":meth:`_schema.MetaData.drop_all`.",
)
def drop(self, entity, **kwargs):
- """Emit DROP statements for the given schema entity.
- """
+ """Emit DROP statements for the given schema entity."""
raise NotImplementedError()
return self.dialect.default_schema_name
def get_schema_names(self):
- """Return all schema names.
- """
+ """Return all schema names."""
if hasattr(self.dialect, "get_schema_names"):
return self.dialect.get_schema_names(
def _update(self, other, only_propagate=True):
"""Populate from the listeners in another :class:`_Dispatch`
- object."""
+ object."""
existing_listeners = self.listeners
existing_listener_set = set(existing_listeners)
def _update(self, other, only_propagate=True):
"""Populate from the listeners in another :class:`_Dispatch`
- object."""
+ object."""
for ls in other._event_descriptors:
if isinstance(ls, _EmptyListener):
continue
class _EventKey(object):
- """Represent :func:`.listen` arguments.
- """
+ """Represent :func:`.listen` arguments."""
__slots__ = (
"target",
collection.remove(self.with_wrapper(listener_fn))
def contains(self):
- """Return True if this event key is registered to listen.
- """
+ """Return True if this event key is registered to listen."""
return self._key in _key_to_collection
def base_listen(
else:
return (
"(Background on this error at: "
- "http://sqlalche.me/e/%s/%s)" % (_version_token, self.code,)
+ "http://sqlalche.me/e/%s/%s)"
+ % (
+ _version_token,
+ self.code,
+ )
)
def _message(self, as_unicode=compat.py3k):
class ObjectAssociationProxyInstance(AssociationProxyInstance):
- """an :class:`.AssociationProxyInstance` that has an object as a target.
- """
+ """an :class:`.AssociationProxyInstance` that has an object as a target."""
_target_is_object = True
_is_canonical = True
return self._cache_key + (session._query_cls,)
def _with_lazyload_options(self, options, effective_path, cache_path=None):
- """Cloning version of _add_lazyload_options.
- """
+ """Cloning version of _add_lazyload_options."""
q = self._clone()
q._add_lazyload_options(options, effective_path, cache_path=cache_path)
return q
:ref:`hybrid_reuse_subclass`
- """
+ """
return self
def getter(self, fget):
def _reconstitute(cls, dict_, items):
- """ Reconstitute an :class:`.OrderingList`.
+ """Reconstitute an :class:`.OrderingList`.
This is the adjoint to :meth:`.OrderingList.__reduce__`. It is used for
unpickling :class:`.OrderingList` objects.
:class:`sqlalchemy.exc.NoInspectionAvailable`
is raised. If ``False``, ``None`` is returned.
- """
+ """
type_ = type(subject)
for cls in type_.__mro__:
if cls in _registrars:
class ScalarObjectAttributeImpl(ScalarAttributeImpl):
"""represents a scalar-holding InstrumentedAttribute,
- where the target object is also instrumented.
+ where the target object is also instrumented.
- Adds events to delete/set operations.
+ Adds events to delete/set operations.
"""
check_old=None,
pop=False,
):
- """Set a value on the given InstanceState.
-
- """
+ """Set a value on the given InstanceState."""
if self.dispatch._active_history:
old = self.get(
state,
"""Execute before flush process has started.
`instances` is an optional list of objects which were passed to
- the ``flush()`` method. """
+ the ``flush()`` method."""
def after_flush(self, session, flush_context):
"""Execute after flush has completed, but before commit has been
This will be when the 'new', 'dirty', and 'deleted' lists are in
their final state. An actual commit() may or may not have
occurred, depending on whether or not the flush started its own
- transaction or participated in a larger transaction. """
+ transaction or participated in a larger transaction."""
def after_begin(self, session, transaction, connection):
"""Execute after a transaction is begun on a connection
`transaction` is the SessionTransaction. This method is called
- after an engine level transaction is begun on a connection. """
+ after an engine level transaction is begun on a connection."""
def after_attach(self, session, instance):
"""Execute after an instance is attached to a session.
- This is called after an add, delete or merge. """
+ This is called after an add, delete or merge."""
def after_bulk_update(self, session, query, query_context, result):
"""Execute after a bulk update operation to the session.
class DescriptorProperty(MapperProperty):
""":class:`.MapperProperty` which proxies access to a
- user-defined descriptor."""
+ user-defined descriptor."""
doc = None
_polymorphic_from=None,
):
"""Produce a mapper level row processor callable
- which processes rows into mapped instances."""
+ which processes rows into mapped instances."""
# note that this method, most of which exists in a closure
# called _instance(), resists being broken out, as
class PostLoad(object):
- """Track loaders and states for "post load" operations.
-
- """
+ """Track loaders and states for "post load" operations."""
__slots__ = "loaders", "states", "load_keys"
@classmethod
def _configure_all(cls):
- """Class-level path to the :func:`.configure_mappers` call.
- """
+ """Class-level path to the :func:`.configure_mappers` call."""
configure_mappers()
def dispose(self):
def _property_from_column(self, key, prop):
"""generate/update a :class:`.ColumnProprerty` given a
- :class:`_schema.Column` object. """
+ :class:`_schema.Column` object."""
# we were passed a Column or a list of Columns;
# generate a properties.ColumnProperty
return key in self._props
def get_property(self, key, _configure_mappers=True):
- """return a MapperProperty associated with the given key.
- """
+ """return a MapperProperty associated with the given key."""
if _configure_mappers and Mapper._new_mappers:
configure_mappers()
return self.with_hint(None, text, dialect_name)
def get_execution_options(self):
- """ Get the non-SQL options which will take effect during execution.
+ """Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
@_generative()
def execution_options(self, **kwargs):
- """ Set non-SQL options which take effect during execution.
+ """Set non-SQL options which take effect during execution.
The options are the same as those accepted by
:meth:`_engine.Connection.execution_options`.
class _ColInAnnotations(object):
"""Seralizable equivalent to:
- lambda c: "name" in c._annotations
+ lambda c: "name" in c._annotations
"""
def __init__(self, name):
except sa_exc.NoInspectionAvailable as err:
if isinstance(mapper, type):
util.raise_(
- exc.UnmappedClassError(mapper), replace_context=err,
+ exc.UnmappedClassError(mapper),
+ replace_context=err,
)
else:
raise
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._expire_state(state, attribute_names)
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._expire_state(state, attribute_names)
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
if state.session_id is not self.hash_key:
raise sa_exc.InvalidRequestError(
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._save_or_update_state(state)
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._delete_impl(state, instance, head=True)
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
return self._contains_state(state)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(o), replace_context=err,
+ exc.UnmappedInstanceError(o),
+ replace_context=err,
)
objset.add(state)
else:
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
else:
return _state_session(state)
:ref:`session_object_states`
- """
+ """
return self.key is not None and self._attached and not self._deleted
@property
def _reset(self, dict_, key):
"""Remove the given attribute and any
- callables associated with it."""
+ callables associated with it."""
old = dict_.pop(key, None)
if old is not None and self.manager[key].impl.collection:
from .sql.schema import FetchedValue # noqa
from .sql.schema import ForeignKey # noqa
from .sql.schema import ForeignKeyConstraint # noqa
-from .sql.schema import Index # noqa
from .sql.schema import IdentityOptions # noqa
+from .sql.schema import Index # noqa
from .sql.schema import MetaData # noqa
from .sql.schema import PassiveDefault # noqa
from .sql.schema import PrimaryKeyConstraint # noqa
return self._execution_options
def execute(self, *multiparams, **params):
- """Compile and execute this :class:`.Executable`.
-
- """
+ """Compile and execute this :class:`.Executable`."""
e = self.bind
if e is None:
label = getattr(self, "description", self.__class__.__name__)
def replace(self, column):
"""Add the given column to this collection, removing unaliased
- versions of this column as well as existing columns with the
- same key.
+ versions of this column as well as existing columns with the
+ same key.
- E.g.::
+ E.g.::
- t = Table('sometable', metadata, Column('col1', Integer))
- t.columns.replace(Column('col1', Integer, key='columnone'))
+ t = Table('sometable', metadata, Column('col1', Integer))
+ t.columns.replace(Column('col1', Integer, key='columnone'))
- will remove the original 'col1' from the collection, and add
- the new column under the name 'columnname'.
+ will remove the original 'col1' from the collection, and add
+ the new column under the name 'columnname'.
- Used by schema.Column to override columns during table reflection.
+ Used by schema.Column to override columns during table reflection.
"""
remove_col = None
"does nothing.",
)
def compile(self):
- """Produce the internal string representation of this element.
- """
+ """Produce the internal string representation of this element."""
pass
def _execute_on_connection(self, connection, multiparams, params):
def sort_tables(
- tables, skip_fn=None, extra_dependencies=None,
+ tables,
+ skip_fn=None,
+ extra_dependencies=None,
):
"""Sort a collection of :class:`_schema.Table` objects based on
dependency.
class UpdateBase(
HasCTE, DialectKWArgs, HasPrefixes, Executable, ClauseElement
):
- """Form the base for ``INSERT``, ``UPDATE``, and ``DELETE`` statements.
-
- """
+ """Form the base for ``INSERT``, ``UPDATE``, and ``DELETE`` statements."""
__visit_name__ = "update_base"
class IndexExpression(BinaryExpression):
- """Represent the class of expressions that are like an "index" operation.
- """
+ """Represent the class of expressions that are like an "index"
+ operation."""
pass
":class:`.DDLEvents`.",
)
def append_ddl_listener(self, event_name, listener):
- """Append a DDL event listener to this ``Table``.
-
- """
+ """Append a DDL event listener to this ``Table``."""
def adapt_listener(target, connection, **kw):
listener(event_name, target, connection)
"future release. Please refer to :class:`.DefaultClause`.",
)
class PassiveDefault(DefaultClause):
- """A DDL-specified DEFAULT column value.
- """
+ """A DDL-specified DEFAULT column value."""
def __init__(self, *arg, **kw):
DefaultClause.__init__(self, *arg, **kw)
if col.autoincrement is True:
_validate_autoinc(col, True)
return col
- elif col.autoincrement in (
- "auto",
- "ignore_fk",
- ) and _validate_autoinc(col, False):
+ elif (
+ col.autoincrement
+ in (
+ "auto",
+ "ignore_fk",
+ )
+ and _validate_autoinc(col, False)
+ ):
return col
else:
":class:`.DDLEvents`.",
)
def append_ddl_listener(self, event_name, listener):
- """Append a DDL event listener to this ``MetaData``.
-
-
- """
+ """Append a DDL event listener to this ``MetaData``."""
def adapt_listener(target, connection, **kw):
tables = kw["tables"]
@property
def for_update(self):
- """Provide legacy dialect support for the ``for_update`` attribute.
- """
+ """Provide legacy dialect support for the ``for_update`` attribute."""
if self._for_update_arg is not None:
return self._for_update_arg.legacy_for_update_value
else:
class Exists(UnaryExpression):
- """Represent an ``EXISTS`` clause.
-
- """
+ """Represent an ``EXISTS`` clause."""
__visit_name__ = UnaryExpression.__visit_name__
_from_objects = []
:attr:`.types.JSON.NULL`
- """
+ """
self.none_as_null = none_as_null
class JSONElementType(TypeEngine):
@util.memoized_property
def _has_literal_processor(self):
- """memoized boolean, check if process_literal_param is implemented.
-
-
- """
+ """memoized boolean, check if process_literal_param is implemented."""
return (
self.__class__.process_literal_param.__code__
def assert_raises_context_ok(except_cls, callable_, *args, **kw):
_assert_raises(
- except_cls, callable_, args, kw,
+ except_cls,
+ callable_,
+ args,
+ kw,
)
if msg is not None:
assert re.search(
msg, util.text_type(err), re.UNICODE
- ), "%r !~ %s" % (msg, err,)
+ ), "%r !~ %s" % (
+ msg,
+ err,
+ )
if check_context and not are_we_already_in_a_traceback:
_assert_proper_exception_context(err)
print(util.text_type(err).encode("utf-8"))
assert reflected_table.primary_key.columns[c.name] is not None
def assert_types_base(self, c1, c2):
- assert c1.type._compare_type_affinity(c2.type), (
- "On column %r, type '%s' doesn't correspond to type '%s'"
- % (c1.name, c1.type, c2.type)
+ assert c1.type._compare_type_affinity(
+ c2.type
+ ), "On column %r, type '%s' doesn't correspond to type '%s'" % (
+ c1.name,
+ c1.type,
+ c2.type,
)
@register.init
def update_db_opts(db_url, db_opts):
- """Set database options (db_opts) for a test database that we created.
- """
+ """Set database options (db_opts) for a test database that we created."""
pass
@property
def on_update_cascade(self):
- """"target database must support ON UPDATE..CASCADE behavior in
+ """target database must support ON UPDATE..CASCADE behavior in
foreign keys."""
return exclusions.open()
@property
def implements_get_lastrowid(self):
- """"target dialect implements the executioncontext.get_lastrowid()
+ """target dialect implements the executioncontext.get_lastrowid()
method without reliance on RETURNING.
"""
@property
def emulated_lastrowid(self):
- """"target dialect retrieves cursor.lastrowid, or fetches
+ """target dialect retrieves cursor.lastrowid, or fetches
from a database-side function after an insert() construct executes,
within the get_lastrowid() method.
@property
def dbapi_lastrowid(self):
- """"target platform includes a 'lastrowid' accessor on the DBAPI
+ """target platform includes a 'lastrowid' accessor on the DBAPI
cursor object.
"""
@property
def cross_schema_fk_reflection(self):
- """target system must support reflection of inter-schema foreign keys
-
- """
+ """target system must support reflection of inter-schema foreign
+ keys"""
return exclusions.closed()
@property
def implicit_default_schema(self):
"""target system has a strong concept of 'default' schema that can
- be referred to implicitly.
+ be referred to implicitly.
- basically, PostgreSQL.
+ basically, PostgreSQL.
"""
return exclusions.closed()
@property
def view_reflection(self):
- """target database must support inspection of the full CREATE VIEW definition.
- """
+ """target database must support inspection of the full CREATE VIEW
+ definition."""
return self.views
@property
@property
def symbol_names_w_double_quote(self):
- """Target driver can create tables with a name like 'some " table'
-
- """
+ """Target driver can create tables with a name like 'some " table'"""
return exclusions.open()
@property
@property
def json_array_indexes(self):
- """"target platform supports numeric array indexes
+ """target platform supports numeric array indexes
within a JSON structure"""
return self.json_type
Column("related_id", Integer),
sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
sa.Index("ix quote ' one", "name"),
- sa.UniqueConstraint("data", name="uq quote' one",),
+ sa.UniqueConstraint(
+ "data",
+ name="uq quote' one",
+ ),
sa.ForeignKeyConstraint(
["id"], ["related.id"], name="fk quote ' one"
),
Column("related_id", Integer),
sa.PrimaryKeyConstraint("id", name='pk quote " two'),
sa.Index('ix quote " two', "name"),
- sa.UniqueConstraint("data", name='uq quote" two',),
+ sa.UniqueConstraint(
+ "data",
+ name='uq quote" two',
+ ),
sa.ForeignKeyConstraint(
["id"], ["related.id"], name='fk quote " two'
),
)
if testing.requires.computed_columns_virtual.enabled:
self.check_column(
- data, "computed_virtual", "normal+2", False,
+ data,
+ "computed_virtual",
+ "normal+2",
+ False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_column(
- data, "computed_stored", "normal-42", True,
+ data,
+ "computed_stored",
+ "normal-42",
+ True,
)
@testing.requires.schemas
)
if testing.requires.computed_columns_virtual.enabled:
self.check_column(
- data, "computed_virtual", "normal/2", False,
+ data,
+ "computed_virtual",
+ "normal/2",
+ False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_column(
- data, "computed_stored", "normal*42", True,
+ data,
+ "computed_stored",
+ "normal*42",
+ True,
)
tbl.select(tbl.c.col_a.is_distinct_from(tbl.c.col_b))
).fetchall()
eq_(
- len(result), expected_row_count_for_is,
+ len(result),
+ expected_row_count_for_is,
)
expected_row_count_for_isnot = (
tbl.select(tbl.c.col_a.isnot_distinct_from(tbl.c.col_b))
).fetchall()
eq_(
- len(result), expected_row_count_for_isnot,
+ len(result),
+ expected_row_count_for_isnot,
)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal_large(self):
- """test exceedingly large decimals.
-
- """
+ """test exceedingly large decimals."""
numbers = set(
[
conn = connection
conn.execute(
- self.tables.data_table.insert(), {"name": "r1", "data": JSON.NULL},
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": JSON.NULL},
)
eq_(
if py35:
def _formatannotation(annotation, base_module=None):
- """vendored from python 3.7
- """
+ """vendored from python 3.7"""
if getattr(annotation, "__module__", None) == "typing":
return repr(annotation).replace("typing.", "")
self._exc_info = None # remove potential circular references
if not self.warn_only:
compat.raise_(
- exc_value, with_traceback=exc_tb,
+ exc_value,
+ with_traceback=exc_tb,
)
else:
if not compat.py3k and self._exc_info and self._exc_info[1]:
def iterate_attributes(cls):
"""iterate all the keys and attributes associated
- with a class, without using getattr().
+ with a class, without using getattr().
- Does not use getattr() so that class-sensitive
- descriptors (i.e. property.__get__()) are not called.
+ Does not use getattr() so that class-sensitive
+ descriptors (i.e. property.__get__()) are not called.
"""
keys = dir(cls)
def repr_tuple_names(names):
- """ Trims a list of strings from the middle and return a string of up to
- four elements. Strings greater than 11 characters will be truncated"""
+ """Trims a list of strings from the middle and return a string of up to
+ four elements. Strings greater than 11 characters will be truncated"""
if len(names) == 0:
return None
flag = len(names) <= 4
identity = C4DAFEE1
[flake8]
-show-source = true
+show-source = false
enable-extensions = G
# E203 is due to https://github.com/PyCQA/pycodestyle/issues/373
ignore =
class ZooMarkTest(replay_fixture.ReplayFixtureTest):
- """Runs the ZooMark and squawks if method counts vary from the norm.
-
-
- """
+ """Runs the ZooMark and squawks if method counts vary from the norm."""
__requires__ = ("cpython",)
__only_on__ = "postgresql+psycopg2"
)
def test_warn_deprecated_limited_cap(self):
- """ warn_deprecated_limited() and warn_limited() use
+ """warn_deprecated_limited() and warn_limited() use
_hash_limit_string
actually just verifying that _hash_limit_string works as expected
result = []
- def fail_on_exec(stmt,):
+ def fail_on_exec(
+ stmt,
+ ):
if view is not None and view in stmt:
result.append(("SERIALIZABLE",))
else:
@classmethod
def insert_data(cls, connection):
connection.execute(
- cls.tables.error_t.insert(), [{"error_code": "01002"}],
+ cls.tables.error_t.insert(),
+ [{"error_code": "01002"}],
)
def test_invalid_transaction_detection(self, connection):
).encode("UTF-8")
)
r = con.execute(t1.select()).first()
- assert isinstance(r[1], util.text_type), (
- "%s is %s instead of unicode, working on %s"
- % (r[1], type(r[1]), meta.bind)
+ assert isinstance(
+ r[1], util.text_type
+ ), "%s is %s instead of unicode, working on %s" % (
+ r[1],
+ type(r[1]),
+ meta.bind,
)
eq_(r[1], util.ue("abc \xc3\xa9 def"))
for i in range(self.col_num)
]
)
- self.view_str = view_str = (
- "CREATE VIEW huge_named_view AS SELECT %s FROM base_table"
- % (
- ",".join(
- "long_named_column_number_%d" % i
- for i in range(self.col_num)
- )
+ self.view_str = (
+ view_str
+ ) = "CREATE VIEW huge_named_view AS SELECT %s FROM base_table" % (
+ ",".join(
+ "long_named_column_number_%d" % i for i in range(self.col_num)
)
)
assert len(view_str) > 4000
eq_(schema, expected_schema)
mock_connection = mock.Mock(
- dialect=dialect, scalar=mock.Mock(return_value="Some Database"),
+ dialect=dialect,
+ scalar=mock.Mock(return_value="Some Database"),
)
mock_lambda = mock.Mock()
base._switch_db(schema, mock_connection, mock_lambda, "x", y="bar")
@testing.metadata_fixture()
def datetimeoffset_fixture(self, metadata):
t = Table(
- "test_dates", metadata, Column("adatetimeoffset", DATETIMEOFFSET),
+ "test_dates",
+ metadata,
+ Column("adatetimeoffset", DATETIMEOFFSET),
)
return t
return
conn.execute(
- t.insert(), adatetimeoffset=dto_param_value,
+ t.insert(),
+ adatetimeoffset=dto_param_value,
)
row = conn.execute(t.select()).first()
)
_oracle_char_combinations = testing.combinations(
- ("STRING", cx_Oracle_STRING,),
- ("FIXED_CHAR", cx_Oracle_FIXED_CHAR,),
- ("CLOB", cx_Oracle_CLOB,),
- ("NCLOB", cx_Oracle_NCLOB,),
+ (
+ "STRING",
+ cx_Oracle_STRING,
+ ),
+ (
+ "FIXED_CHAR",
+ cx_Oracle_FIXED_CHAR,
+ ),
+ (
+ "CLOB",
+ cx_Oracle_CLOB,
+ ),
+ (
+ "NCLOB",
+ cx_Oracle_NCLOB,
+ ),
argnames="cx_oracle_type",
id_="ia",
)
@_oracle_char_combinations
@testing.requires.python2
def test_encoding_errors_sqla_py2k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
ignore_dialect = cx_oracle.dialect(
dbapi=cx_Oracle, encoding_errors="ignore"
@_oracle_char_combinations
@testing.requires.python2
def test_no_encoding_errors_sqla_py2k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
plain_dialect = cx_oracle.dialect(dbapi=cx_Oracle)
@_oracle_char_combinations
@testing.requires.python3
def test_encoding_errors_cx_oracle_py3k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
ignore_dialect = cx_oracle.dialect(
dbapi=cx_Oracle, encoding_errors="ignore"
cursor.mock_calls,
[
mock.call.var(
- mock.ANY, None, cursor.arraysize, encodingErrors="ignore",
+ mock.ANY,
+ None,
+ cursor.arraysize,
+ encodingErrors="ignore",
)
],
)
@_oracle_char_combinations
@testing.requires.python3
def test_no_encoding_errors_cx_oracle_py3k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
plain_dialect = cx_oracle.dialect(dbapi=cx_Oracle)
),
{"text": "my table comment"},
)
- eq_(insp.get_table_comment("parent",), {"text": "my local comment"})
+ eq_(
+ insp.get_table_comment(
+ "parent",
+ ),
+ {"text": "my local comment"},
+ )
eq_(
insp.get_table_comment(
"parent", schema=testing.db.dialect.default_schema_name
)
self.assert_compile(
- c.any(5), "%(param_1)s = ANY (x)", checkparams={"param_1": 5},
+ c.any(5),
+ "%(param_1)s = ANY (x)",
+ checkparams={"param_1": 5},
)
self.assert_compile(
)
self.assert_compile(
- c.all(5), "%(param_1)s = ALL (x)", checkparams={"param_1": 5},
+ c.all(5),
+ "%(param_1)s = ALL (x)",
+ checkparams={"param_1": 5},
)
self.assert_compile(
class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL):
- """Tests for full text searching
- """
+ """Tests for full text searching"""
__dialect__ = postgresql.dialect()
def _raise_query(self, q):
"""
- useful for debugging. just do...
- self._raise_query(q)
+ useful for debugging. just do...
+ self._raise_query(q)
"""
c = q.compile(dialect=postgresql.dialect())
raise ValueError(c)
@testing.provide_metadata
def test_index_reflection(self):
- """ Reflecting partial & expression-based indexes should warn
- """
+ """Reflecting partial & expression-based indexes should warn"""
metadata = self.metadata
c = "ccc"
tbl.append_column(
- Column("pyenum_col", array_cls(enum_cls(MyEnum)),),
+ Column(
+ "pyenum_col",
+ array_cls(enum_cls(MyEnum)),
+ ),
)
self.metadata.create_all(connection)
"json_table",
self.metadata,
Column("id", Integer, primary_key=True),
- Column("json_col", array_cls(json_cls),),
+ Column(
+ "json_col",
+ array_cls(json_cls),
+ ),
)
self.metadata.create_all(connection)
__only_on__ = "sqlite"
def test_boolean(self):
- """Test that the boolean only treats 1 as True
-
- """
+ """Test that the boolean only treats 1 as True"""
meta = MetaData(testing.db)
t = Table(
'true', 'false', and 'column' are undocumented reserved words
when used as column identifiers (as of 3.5.1). Covering them
here to ensure they remain in place if the dialect's
- reserved_words set is updated in the future. """
+ reserved_words set is updated in the future."""
meta = MetaData(testing.db)
t = Table(
@testing.provide_metadata
def test_quoted_identifiers_functional_two(self):
- """"test the edgiest of edge cases, quoted table/col names
+ """ "test the edgiest of edge cases, quoted table/col names
that start and end with quotes.
SQLite claims to have fixed this in
"""test the 'autocommit' flag on select() and text() objects.
Requires PostgreSQL so that we may define a custom function which
- modifies the database. """
+ modifies the database."""
__only_on__ = "postgresql"
self.assert_tables_equal(addresses, reflected_addresses)
@testing.provide_metadata
- def test_autoload_with_imply_autoload(self,):
+ def test_autoload_with_imply_autoload(
+ self,
+ ):
meta = self.metadata
t = Table(
"t",
def test_override_existing_fk(self):
"""test that you can override columns and specify new foreign
keys to other reflected tables, on columns which *do* already
- have that foreign key, and that the FK is not duped. """
+ have that foreign key, and that the FK is not duped."""
meta = self.metadata
Table(
)
if testing.requires.computed_columns_virtual.enabled:
self.check_table_column(
- table, "computed_virtual", "normal+2", False,
+ table,
+ "computed_virtual",
+ "normal+2",
+ False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_table_column(
- table, "computed_stored", "normal-42", True,
+ table,
+ "computed_stored",
+ "normal-42",
+ True,
)
"""test the 'autocommit' flag on select() and text() objects.
Requires PostgreSQL so that we may define a custom function which
- modifies the database. """
+ modifies the database."""
__only_on__ = "postgresql"
)
def test_columns_single_inheritance_cascading_resolution_pk(self):
- """An additional test for #4352 in terms of the requested use case.
-
- """
+ """An additional test for #4352 in terms of the requested use case."""
class TestBase(Base):
__abstract__ = True
self._assert_raises_ambiguous(lambda: D.c_data == 5)
def test_rel_expressions_not_available(self):
- B, D, = self.classes("B", "D")
+ (
+ B,
+ D,
+ ) = self.classes("B", "D")
self._assert_raises_ambiguous(lambda: D.c_data.any(B.id == 5))
class SelectinloadRegressionTest(fixtures.DeclarativeMappedTest):
- """test #4175
- """
+ """test #4175"""
@classmethod
def setup_classes(cls):
class FixtureTest(fixtures.MappedTest):
- """A MappedTest pre-configured with a common set of fixtures.
-
- """
+ """A MappedTest pre-configured with a common set of fixtures."""
run_define_tables = "once"
run_setup_classes = "once"
)
def test_entirely_oob_assignment(self):
- """test warn on an unknown polymorphic identity.
- """
+ """test warn on an unknown polymorphic identity."""
B = self.classes.B
sess = Session()
)
def test_optimized_passes(self):
- """"test that the 'optimized load' routine doesn't crash when
+ """ "test that the 'optimized load' routine doesn't crash when
a column in the join condition is not available."""
base, sub = self.tables.base, self.tables.sub
("vlad", "Elbonia, Inc."),
]
eq_(
- q(self, sess).all(), expected,
+ q(self, sess).all(),
+ expected,
)
def test_mixed_entities_two(self):
"""this tests the RasterDocument being attached to the Assembly, but
*not* the Document. this means only a "sub-class" task, i.e.
corresponding to an inheriting mapper but not the base mapper,
- is created. """
+ is created."""
product_mapper = mapper(
Product,
class SubClassToSubClassFromParentTest(fixtures.MappedTest):
- """test #2617
-
- """
+ """test #2617"""
run_setup_classes = "once"
run_setup_mappers = "once"
def test_lazytrackparent(self):
"""test that the "hasparent" flag works properly
- when lazy loaders and backrefs are used
+ when lazy loaders and backrefs are used
"""
"""changeset: 1633 broke ability to use ORM to map classes with
unusual descriptor attributes (for example, classes that inherit
from ones implementing zope.interface.Interface). This is a
- simple regression test to prevent that defect. """
+ simple regression test to prevent that defect."""
class des(object):
def __get__(self, instance, owner):
def test_set_commited_value_none_uselist(self):
"""test that set_committed_value->None to a uselist generates an
- empty list """
+ empty list"""
class Foo(object):
pass
class PartialFlushTest(fixtures.MappedTest):
- """test cascade behavior as it relates to object lists passed to flush().
- """
+ """test cascade behavior as it relates to object lists passed to
+ flush()."""
@classmethod
def define_tables(cls, metadata):
class SelfReferentialPostUpdateTest(fixtures.MappedTest):
- """Post_update on a single self-referential mapper.
-
-
- """
+ """Post_update on a single self-referential mapper."""
@classmethod
def define_tables(cls, metadata):
class PostUpdateBatchingTest(fixtures.MappedTest):
- """test that lots of post update cols batch together into a single UPDATE.
- """
+ """test that lots of post update cols batch together into a single
+ UPDATE."""
@classmethod
def define_tables(cls, metadata):
True,
testing.requires.computed_columns_on_update_returning,
),
- ("noneagerload", False,),
+ (
+ "noneagerload",
+ False,
+ ),
id_="ia",
)
def test_update_computed(self, eager):
self.sql_count_(0, go)
def test_preserve_changes(self):
- """A deferred load operation doesn't revert modifications on attributes
- """
+ """A deferred load operation doesn't revert modifications on
+ attributes"""
orders, Order = self.tables.orders, self.classes.Order
def test_locates_col(self):
"""changed in 1.0 - we don't search for deferred cols in the result
- now. """
+ now."""
orders, Order = self.tables.orders, self.classes.Order
def test_selectload(self):
"""tests lazy loading with two relationships simultaneously,
- from the same table, using aliases. """
+ from the same table, using aliases."""
users, orders, User, Address, Order, addresses = (
self.tables.users,
def test_joinedload(self):
"""Eager loading with two relationships simultaneously,
- from the same table, using aliases."""
+ from the same table, using aliases."""
users, orders, User, Address, Order, addresses = (
self.tables.users,
def test_orderby_related(self):
"""A regular mapper select on a single table can
- order by a relationship to a second table"""
+ order by a relationship to a second table"""
Address, addresses, users, User = (
self.classes.Address,
def test_double_w_ac(self):
"""Eager loading with two relationships simultaneously,
- from the same table, using aliases."""
+ from the same table, using aliases."""
(
users,
def test_double_w_ac_against_subquery(self):
"""Eager loading with two relationships simultaneously,
- from the same table, using aliases."""
+ from the same table, using aliases."""
(
users,
class DeferredMapperEventsTest(_RemoveListeners, _fixtures.FixtureTest):
- """"test event listeners against unmapped classes.
+ """ "test event listeners against unmapped classes.
This incurs special logic. Note if we ever do the "remove" case,
it has to get all of these, too.
eq_(len(list(sess)), 9)
def test_state_change_col_to_deferred(self):
- """Behavioral test to verify the current activity of loader callables
- """
+ """Behavioral test to verify the current activity of loader
+ callables"""
users, User = self.tables.users, self.classes.User
assert "name" not in attributes.instance_state(u1).callables
def test_state_deferred_to_col(self):
- """Behavioral test to verify the current activity of loader callables
- """
+ """Behavioral test to verify the current activity of loader
+ callables"""
users, User = self.tables.users, self.classes.User
assert "name" not in attributes.instance_state(u1).callables
def test_state_noload_to_lazy(self):
- """Behavioral test to verify the current activity of loader callables
- """
+ """Behavioral test to verify the current activity of loader
+ callables"""
users, Address, addresses, User = (
self.tables.users,
)
def test_single_prop_4(self):
- Order, User, = (self.classes.Order, self.classes.User)
+ (
+ Order,
+ User,
+ ) = (self.classes.Order, self.classes.User)
sess = create_session()
oalias1 = aliased(Order)
)
def test_single_prop_5(self):
- Order, User, = (self.classes.Order, self.classes.User)
+ (
+ Order,
+ User,
+ ) = (self.classes.Order, self.classes.User)
sess = create_session()
self.assert_compile(
)
def test_single_prop_8(self):
- Order, User, = (self.classes.Order, self.classes.User)
+ (
+ Order,
+ User,
+ ) = (self.classes.Order, self.classes.User)
sess = create_session()
# same as before using an aliased() for User as well
def test_no_relationship_cascade(self):
"""test that merge doesn't interfere with a relationship()
- target that specifically doesn't include 'merge' cascade.
+ target that specifically doesn't include 'merge' cascade.
"""
Address, addresses, users, User = (
# test passive_updates=True; update user
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
- assert User(
- username="jack",
- addresses=[Address(username="jack"), Address(username="jack")],
- ) == sess.query(User).get("jack")
+ assert (
+ User(
+ username="jack",
+ addresses=[Address(username="jack"), Address(username="jack")],
+ )
+ == sess.query(User).get("jack")
+ )
u1 = sess.query(User).get("jack")
u1.addresses = []
# test passive_updates=True; update user
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
- assert User(
- username="jack",
- addresses=[Address(username="jack"), Address(username="jack")],
- ) == sess.query(User).get(u1.id)
+ assert (
+ User(
+ username="jack",
+ addresses=[Address(username="jack"), Address(username="jack")],
+ )
+ == sess.query(User).get(u1.id)
+ )
sess.expunge_all()
u1 = sess.query(User).get(u1.id)
)
def test_any_walias(self):
- DataContainer, Job, = (self.classes.DataContainer, self.classes.Job)
+ (
+ DataContainer,
+ Job,
+ ) = (self.classes.DataContainer, self.classes.Job)
Job_A = aliased(Job)
)
def test_join_walias(self):
- DataContainer, Job, = (self.classes.DataContainer, self.classes.Job)
+ (
+ DataContainer,
+ Job,
+ ) = (self.classes.DataContainer, self.classes.Job)
Job_A = aliased(Job)
).all()
# test that the contents are not adapted by the aliased join
- assert (
- [User(id=7), User(id=8)]
- == sess.query(User)
- .join("addresses", aliased=True)
- .filter(
- ~User.addresses.any(Address.email_address == "fred@fred.com")
- )
- .all()
- )
+ assert [User(id=7), User(id=8)] == sess.query(User).join(
+ "addresses", aliased=True
+ ).filter(
+ ~User.addresses.any(Address.email_address == "fred@fred.com")
+ ).all()
assert [User(id=10)] == sess.query(User).outerjoin(
"addresses", aliased=True
sess = create_session()
# test that any() doesn't overcorrelate
- assert (
- [User(id=7), User(id=8)]
- == sess.query(User)
- .join("addresses")
- .filter(
- ~User.addresses.any(Address.email_address == "fred@fred.com")
- )
- .all()
- )
+ assert [User(id=7), User(id=8)] == sess.query(User).join(
+ "addresses"
+ ).filter(
+ ~User.addresses.any(Address.email_address == "fred@fred.com")
+ ).all()
def test_has(self):
# see also HasAnyTest, a newer suite which tests these at the level of
Address.user.has(name="fred")
).all()
- assert (
- [Address(id=2), Address(id=3), Address(id=4), Address(id=5)]
- == sess.query(Address)
- .filter(Address.user.has(User.name.like("%ed%")))
- .order_by(Address.id)
- .all()
- )
+ assert [
+ Address(id=2),
+ Address(id=3),
+ Address(id=4),
+ Address(id=5),
+ ] == sess.query(Address).filter(
+ Address.user.has(User.name.like("%ed%"))
+ ).order_by(
+ Address.id
+ ).all()
- assert (
- [Address(id=2), Address(id=3), Address(id=4)]
- == sess.query(Address)
- .filter(Address.user.has(User.name.like("%ed%"), id=8))
- .order_by(Address.id)
- .all()
- )
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
+ Address
+ ).filter(Address.user.has(User.name.like("%ed%"), id=8)).order_by(
+ Address.id
+ ).all()
# test has() doesn't overcorrelate
- assert (
- [Address(id=2), Address(id=3), Address(id=4)]
- == sess.query(Address)
- .join("user")
- .filter(Address.user.has(User.name.like("%ed%"), id=8))
- .order_by(Address.id)
- .all()
- )
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
+ Address
+ ).join("user").filter(
+ Address.user.has(User.name.like("%ed%"), id=8)
+ ).order_by(
+ Address.id
+ ).all()
# test has() doesn't get subquery contents adapted by aliased join
- assert (
- [Address(id=2), Address(id=3), Address(id=4)]
- == sess.query(Address)
- .join("user", aliased=True)
- .filter(Address.user.has(User.name.like("%ed%"), id=8))
- .order_by(Address.id)
- .all()
- )
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
+ Address
+ ).join("user", aliased=True).filter(
+ Address.user.has(User.name.like("%ed%"), id=8)
+ ).order_by(
+ Address.id
+ ).all()
dingaling = sess.query(Dingaling).get(2)
assert [User(id=9)] == sess.query(User).filter(
def test_union_literal_expressions_compile(self):
"""test that column expressions translate during
- the _from_statement() portion of union(), others"""
+ the _from_statement() portion of union(), others"""
User = self.classes.User
User, Address = self.classes.User, self.classes.Address
sess = create_session()
- assert (
- [User(name="ed", id=8)]
- == sess.query(User)
- .order_by(User.id)
- .group_by(User)
- .join("addresses")
- .having(func.count(Address.id) > 2)
- .all()
- )
+ assert [User(name="ed", id=8)] == sess.query(User).order_by(
+ User.id
+ ).group_by(User).join("addresses").having(
+ func.count(Address.id) > 2
+ ).all()
- assert (
- [User(name="jack", id=7), User(name="fred", id=9)]
- == sess.query(User)
- .order_by(User.id)
- .group_by(User)
- .join("addresses")
- .having(func.count(Address.id) < 2)
- .all()
- )
+ assert [
+ User(name="jack", id=7),
+ User(name="fred", id=9),
+ ] == sess.query(User).order_by(User.id).group_by(User).join(
+ "addresses"
+ ).having(
+ func.count(Address.id) < 2
+ ).all()
class ExistsTest(QueryTest, AssertsCompiledSQL):
class TypeMatchTest(fixtures.MappedTest):
"""test errors raised when trying to add items
- whose type is not handled by a relationship"""
+ whose type is not handled by a relationship"""
@classmethod
def define_tables(cls, metadata):
return
mapper(
- A, self.tables.t1, properties={"bs": rel()},
+ A,
+ self.tables.t1,
+ properties={"bs": rel()},
)
mapper(B, self.tables.t2)
class FunctionAsPrimaryJoinTest(fixtures.DeclarativeMappedTest):
- """test :ticket:`3831`
-
- """
+ """test :ticket:`3831`"""
__only_on__ = "sqlite"
def test_orderby_related(self):
"""A regular mapper select on a single table can
- order by a relationship to a second table"""
+ order by a relationship to a second table"""
Address, addresses, users, User = (
self.classes.Address,
def test_orderby_related(self):
"""A regular mapper select on a single table can
- order by a relationship to a second table"""
+ order by a relationship to a second table"""
Address, addresses, users, User = (
self.classes.Address,
)
assert_raises_message(
- sa.exc.InvalidRequestError, message, s.flush,
+ sa.exc.InvalidRequestError,
+ message,
+ s.flush,
)
else:
s.flush()
@property
def cross_schema_fk_reflection(self):
- """target system must support reflection of inter-schema foreign keys
- """
+ """target system must support reflection of inter-schema foreign
+ keys"""
return only_on(["postgresql", "mysql", "mssql"])
@property
def implicit_default_schema(self):
"""target system has a strong concept of 'default' schema that can
- be referred to implicitly.
+ be referred to implicitly.
- basically, PostgreSQL.
+ basically, PostgreSQL.
"""
return only_on(["postgresql"])
@property
def symbol_names_w_double_quote(self):
- """Target driver can create tables with a name like 'some " table'
-
- """
+ """Target driver can create tables with a name like 'some " table'"""
return skip_if(
[no_support("oracle", "ORA-03001: unimplemented feature")]
@property
def emulated_lastrowid(self):
- """"target dialect retrieves cursor.lastrowid or an equivalent
+ """ "target dialect retrieves cursor.lastrowid or an equivalent
after an insert() construct executes.
"""
return fails_on_everything_except(
@property
def dbapi_lastrowid(self):
- """"target backend includes a 'lastrowid' accessor on the DBAPI
+ """ "target backend includes a 'lastrowid' accessor on the DBAPI
cursor object.
"""
eq_(s.positiontup, ["a", "b", "c"])
def test_nested_label_targeting(self):
- """test nested anonymous label generation.
-
- """
+ """test nested anonymous label generation."""
s1 = table1.select()
s2 = s1.alias()
s3 = select([s2], use_labels=True)
def _test_binds_no_hash_collision(self):
"""test that construct_params doesn't corrupt dict
- due to hash collisions"""
+ due to hash collisions"""
total_params = 100000
{
"anotherid": (
"anotherid",
- (t1.c.anotherid, "anotherid", "anotherid",),
+ (
+ t1.c.anotherid,
+ "anotherid",
+ "anotherid",
+ ),
t1.c.anotherid.type,
)
},
)
def test_recursive_union_alias_two(self):
- """
-
- """
+ """"""
# I know, this is the PG VALUES keyword,
# we're cheating here. also yes we need the SELECT,
Column("boolcol1", sa.Boolean, default=True),
Column("boolcol2", sa.Boolean, default=False),
# python function which uses ExecutionContext
- Column("col7", Integer, default=lambda: 5, onupdate=lambda: 10,),
+ Column(
+ "col7",
+ Integer,
+ default=lambda: 5,
+ onupdate=lambda: 10,
+ ),
# python builtin
Column(
"col8",
"Column('foo', Integer(), table=None, primary_key=True, "
"nullable=False, onupdate=%s, default=%s, server_default=%s, "
"comment='foo')"
- % (ColumnDefault(1), ColumnDefault(42), DefaultClause("42"),),
+ % (
+ ColumnDefault(1),
+ ColumnDefault(42),
+ DefaultClause("42"),
+ ),
),
(
Table("bar", MetaData(), Column("x", String)),
@classmethod
def check_dialect_options_(cls, t):
eq_(
- t.dialect_kwargs["copydialectoptionstest_some_table_arg"], "a1",
+ t.dialect_kwargs["copydialectoptionstest_some_table_arg"],
+ "a1",
)
eq_(
t.c.foo.dialect_kwargs["copydialectoptionstest_some_column_arg"],
copydialectoptionstest_some_table_arg="a1",
)
Index(
- "idx", t1.c.foo, copydialectoptionstest_some_index_arg="a4",
+ "idx",
+ t1.c.foo,
+ copydialectoptionstest_some_index_arg="a4",
)
self.check_dialect_options_(t1)
class ConjunctionTest(fixtures.TestBase, testing.AssertsCompiledSQL):
- """test interaction of and_()/or_() with boolean , null constants
- """
+ """test interaction of and_()/or_() with boolean , null constants"""
__dialect__ = default.DefaultDialect(supports_native_boolean=True)
@classmethod
def insert_data(cls, connection):
connection.execute(
- cls.tables.text1.insert(), [dict(a="a1", b="b1", c="c1", d="d1")],
+ cls.tables.text1.insert(),
+ [dict(a="a1", b="b1", c="c1", d="d1")],
)
def test_via_column(self):
def test_explicit_optional(self):
"""test dialect executes a Sequence, returns nextval, whether
- or not "optional" is set """
+ or not "optional" is set"""
s = Sequence("my_sequence", optional=True)
self._assert_seq_result(s.execute(testing.db))
def test_func_implicit_connectionless_execute(self):
"""test func.next_value().execute()/.scalar() works
- with connectionless execution. """
+ with connectionless execution."""
s = Sequence("my_sequence", metadata=MetaData(testing.db))
self._assert_seq_result(s.next_value().execute().scalar())
def test_execute_optional(self, connection):
"""test dialect executes a Sequence, returns nextval, whether
- or not "optional" is set """
+ or not "optional" is set"""
s = Sequence("my_sequence", optional=True)
self._assert_seq_result(connection.execute(s))
def test_execute_next_value(self, connection):
"""test func.next_value().execute()/.scalar() works
- with connectionless execution. """
+ with connectionless execution."""
s = Sequence("my_sequence")
self._assert_seq_result(connection.scalar(s.next_value()))
def test_execute_optional_next_value(self, connection):
"""test func.next_value().execute()/.scalar() works
- with connectionless execution. """
+ with connectionless execution."""
s = Sequence("my_sequence", optional=True)
self._assert_seq_result(connection.scalar(s.next_value()))
)
eq_(
- connection.execute("select id from t_seq_test_2").scalar(), 1,
+ connection.execute("select id from t_seq_test_2").scalar(),
+ 1,
)
def test_default_core_server_only(self, connection):