It's better; the majority of these changes look more readable to me.
Also found some docstrings that had formatting / quoting issues.
Change-Id: I582a45fde3a5648b2f36bab96bad56881321899b
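Most of the churn below comes from black 20.8b1's "magic trailing comma":
where 19.10b0 would pack a wrapped call's arguments onto a single
continuation line ending in a trailing comma, 20.8b1 treats that
pre-existing trailing comma as a request to put each argument on its own
line. A minimal sketch of the behavior (connect() is a hypothetical
stand-in, not part of this change):

    def connect(url, echo=False):
        # stand-in callable; any call with a trailing comma behaves the same
        return url, echo

    # what 19.10b0 produced when a call had to wrap:
    connect(
        "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
    )

    # what 20.8b1 produces from the same input, one argument per line:
    connect(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )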
# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/python/black
- rev: 19.10b0
+ rev: 20.8b1
hooks:
- id: black
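The docstring "quoting issues" mentioned above are docstrings opened with
four quotes ("""") instead of three: Python parses the fourth quote as part
of the string body, so a literal quote character leaks into __doc__. A quick
illustration of why those were worth fixing (bad()/good() are hypothetical):

    def bad():
        """"target platform supports numeric array indexes"""

    def good():
        """target platform supports numeric array indexes"""

    assert bad.__doc__.startswith('"')  # stray quote in the docstring
    assert good.__doc__.startswith("t")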
"""Main program function."""
engine = create_async_engine(
- "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
+ "postgresql+asyncpg://scott:tiger@localhost/test",
+ echo=True,
)
async with engine.begin() as conn:
async def async_main():
# engine is an instance of AsyncEngine
engine = create_async_engine(
- "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
+ "postgresql+asyncpg://scott:tiger@localhost/test",
+ echo=True,
)
# conn is an instance of AsyncConnection
"""Main program function."""
engine = create_async_engine(
- "postgresql+asyncpg://scott:tiger@localhost/test", echo=True,
+ "postgresql+asyncpg://scott:tiger@localhost/test",
+ echo=True,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
class RelationshipCache(FromCache):
"""Specifies that a Query as called within a "lazy load"
- should load results from a cache."""
+ should load results from a cache."""
propagate_to_loaders = True
e1.data = "e2"
session.commit()
-assert session.query(
- Example.id,
- Example.version_id,
- Example.is_current_version,
- Example.calc_is_current_version,
- Example.data,
-).order_by(Example.id, Example.version_id).all() == (
- [(1, 1, False, False, "e1"), (1, 2, True, True, "e2")]
+assert (
+ session.query(
+ Example.id,
+ Example.version_id,
+ Example.is_current_version,
+ Example.calc_is_current_version,
+ Example.data,
+ )
+ .order_by(Example.id, Example.version_id)
+ .all()
+ == ([(1, 1, False, False, "e1"), (1, 2, True, True, "e2")])
)
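black 20.8b1 also wraps a comparison whose left-hand side is a method chain
in outer parentheses, one chained call per line, instead of splitting inside
the first call the way 19.10b0 did above. Roughly, on a toy chain rather
than the real query:

    assert (
        sorted(
            [2, 1],
        )
        == [1, 2]
    )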
# example 2, versioning with a parent
assert p1.child_id == 1
assert p1.child.version_id == 2
-assert session.query(
- Child.id,
- Child.version_id,
- Child.is_current_version,
- Child.calc_is_current_version,
- Child.data,
-).order_by(Child.id, Child.version_id).all() == (
- [(1, 1, False, False, "c1"), (1, 2, True, True, "c2")]
+assert (
+ session.query(
+ Child.id,
+ Child.version_id,
+ Child.is_current_version,
+ Child.calc_is_current_version,
+ Child.data,
+ )
+ .order_by(Child.id, Child.version_id)
+ .all()
+ == ([(1, 1, False, False, "c1"), (1, 2, True, True, "c2")])
)
@value.comparator
class value(PropComparator):
- """A comparator for .value, builds a polymorphic comparison via CASE.
-
- """
+ """A comparator for .value, builds a polymorphic comparison
+ via CASE."""
def __init__(self, cls):
self.cls = cls
@classmethod
def _load_mx_exceptions(cls):
- """ Import mxODBC exception classes into the module namespace,
+ """Import mxODBC exception classes into the module namespace,
as if they had been imported normally. This is done here
to avoid requiring all SQLAlchemy users to install mxODBC.
"""
return connect
def _error_handler(self):
- """ Return a handler that adjusts mxODBC's raised Warnings to
+ """Return a handler that adjusts mxODBC's raised Warnings to
emit Python standard warnings.
"""
from mx.ODBC.Error import Warning as MxOdbcWarning
self.__zero_date, value.time()
)
elif isinstance(value, datetime.time):
- """ issue #5339
+ """issue #5339
per: https://github.com/mkleehammer/pyodbc/wiki/Tips-and-Tricks-by-Database-Platform#time-columns
pass TIME value as string
""" # noqa
class TryCast(sql.elements.Cast):
- """Represent a SQL Server TRY_CAST expression.
-
- """
+ """Represent a SQL Server TRY_CAST expression."""
__visit_name__ = "try_cast"
elif (
self.isinsert or self.isupdate or self.isdelete
) and self.compiled.returning:
- self.cursor_fetch_strategy = _cursor.FullyBufferedCursorFetchStrategy( # noqa
- self.cursor, self.cursor.description, self.cursor.fetchall()
+ self.cursor_fetch_strategy = (
+ _cursor.FullyBufferedCursorFetchStrategy(
+ self.cursor,
+ self.cursor.description,
+ self.cursor.fetchall(),
+ )
)
if self._enable_identity_insert:
return text
def limit_clause(self, select, **kw):
- """ MSSQL 2012 supports OFFSET/FETCH operators
- Use it instead subquery with row_number
+ """MSSQL 2012 supports OFFSET/FETCH operators
+ Use them instead of a subquery with row_number
"""
class _MSNumeric_mxodbc(_MSNumeric_pyodbc):
- """Include pyodbc's numeric processor.
- """
+ """Include pyodbc's numeric processor."""
class _MSDate_mxodbc(_MSDate):
class _cymysqlBIT(BIT):
def result_processor(self, dialect, coltype):
- """Convert a MySQL's 64 bit, variable length binary string to a long.
- """
+ """Convert MySQL's 64 bit, variable length binary string to a long."""
def process(value):
if value is not None:
driver_cls = getattr(driver_mod, driver).dialect
return type(
- "MariaDBDialect_%s" % driver, (MariaDBDialect, driver_cls,), {}
+ "MariaDBDialect_%s" % driver,
+ (
+ MariaDBDialect,
+ driver_cls,
+ ),
+ {},
)
class TIMESTAMP(sqltypes.TIMESTAMP):
- """MySQL TIMESTAMP type.
-
- """
+ """MySQL TIMESTAMP type."""
__visit_name__ = "TIMESTAMP"
class DATETIME(sqltypes.DATETIME):
- """MySQL DATETIME type.
-
- """
+ """MySQL DATETIME type."""
__visit_name__ = "DATETIME"
# allow all strings to come back natively as Unicode
elif (
dialect.coerce_to_unicode
- and default_type in (cx_Oracle.STRING, cx_Oracle.FIXED_CHAR,)
+ and default_type
+ in (
+ cx_Oracle.STRING,
+ cx_Oracle.FIXED_CHAR,
+ )
and default_type is not cx_Oracle.CLOB
and default_type is not cx_Oracle.NCLOB
):
cx_Oracle.BLOB,
):
return cursor.var(
- cx_Oracle.LONG_BINARY, size, cursor.arraysize,
+ cx_Oracle.LONG_BINARY,
+ size,
+ cursor.arraysize,
)
return output_type_handler
if async_fallback:
return AsyncAdaptFallback_asyncpg_connection(
- self, await_fallback(self.asyncpg.connect(*arg, **kw)),
+ self,
+ await_fallback(self.asyncpg.connect(*arg, **kw)),
)
else:
return AsyncAdapt_asyncpg_connection(
- self, await_only(self.asyncpg.connect(*arg, **kw)),
+ self,
+ await_only(self.asyncpg.connect(*arg, **kw)),
)
class Error(Exception):
class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):
- """PostgreSQL INTERVAL type.
-
- """
+ """PostgreSQL INTERVAL type."""
__visit_name__ = "INTERVAL"
native = True
as Python uuid objects, converting to/from string via the
DBAPI.
- """
+ """
self.as_uuid = as_uuid
def coerce_compared_value(self, op, value):
type_=sqltypes.Unicode,
),
sql.bindparam(
- "schema", util.text_type(schema), type_=sqltypes.Unicode,
+ "schema",
+ util.text_type(schema),
+ type_=sqltypes.Unicode,
),
)
)
"n.nspname=:schema"
).bindparams(
sql.bindparam(
- "schema", util.text_type(schema), type_=sqltypes.Unicode,
+ "schema",
+ util.text_type(schema),
+ type_=sqltypes.Unicode,
),
)
)
return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean)
def has_all(self, other):
- """Boolean expression. Test for presence of all keys in jsonb
- """
+ """Boolean expression. Test for presence of all keys in jsonb"""
return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean)
def has_any(self, other):
- """Boolean expression. Test for presence of any key in jsonb
- """
+ """Boolean expression. Test for presence of any key in jsonb"""
return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean)
def contains(self, other, **kwargs):
.. versionadded:: 1.1
- """
+ """
super(JSON, self).__init__(none_as_null=none_as_null)
if astext_type is not None:
self.astext_type = astext_type
return self.operate(HAS_KEY, other, result_type=sqltypes.Boolean)
def has_all(self, other):
- """Boolean expression. Test for presence of all keys in jsonb
- """
+ """Boolean expression. Test for presence of all keys in jsonb"""
return self.operate(HAS_ALL, other, result_type=sqltypes.Boolean)
def has_any(self, other):
- """Boolean expression. Test for presence of any key in jsonb
- """
+ """Boolean expression. Test for presence of any key in jsonb"""
return self.operate(HAS_ANY, other, result_type=sqltypes.Boolean)
def contains(self, other, **kwargs):
class INT4RANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL INT4RANGE type.
-
- """
+ """Represent the PostgreSQL INT4RANGE type."""
__visit_name__ = "INT4RANGE"
class INT8RANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL INT8RANGE type.
-
- """
+ """Represent the PostgreSQL INT8RANGE type."""
__visit_name__ = "INT8RANGE"
class NUMRANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL NUMRANGE type.
-
- """
+ """Represent the PostgreSQL NUMRANGE type."""
__visit_name__ = "NUMRANGE"
class DATERANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL DATERANGE type.
-
- """
+ """Represent the PostgreSQL DATERANGE type."""
__visit_name__ = "DATERANGE"
class TSRANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL TSRANGE type.
-
- """
+ """Represent the PostgreSQL TSRANGE type."""
__visit_name__ = "TSRANGE"
class TSTZRANGE(RangeOperators, sqltypes.TypeEngine):
- """Represent the PostgreSQL TSTZRANGE type.
-
- """
+ """Represent the PostgreSQL TSTZRANGE type."""
__visit_name__ = "TSTZRANGE"
dbapi_connection = connection
dbapi_connection.create_function(
- "regexp", 2, regexp,
+ "regexp",
+ 2,
+ regexp,
)
fns = [set_regexp]
_dispatch=None,
_has_events=None,
):
- """Construct a new Connection.
-
- """
+ """Construct a new Connection."""
self.engine = engine
self.dialect = engine.dialect
self.__branch_from = _branch_from
return c
def get_execution_options(self):
- """ Get the non-SQL options which will take effect during execution.
+ """Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
for fn in self.dispatch.before_execute:
elem, event_multiparams, event_params = fn(
- self, elem, event_multiparams, event_params, execution_options,
+ self,
+ elem,
+ event_multiparams,
+ event_params,
+ execution_options,
)
if event_multiparams:
assert not self.is_active
def rollback(self):
- """Roll back this :class:`.Transaction`.
-
- """
+ """Roll back this :class:`.Transaction`."""
try:
self._do_rollback()
finally:
return self._option_cls(self, opt)
def get_execution_options(self):
- """ Get the non-SQL options which will take effect during execution.
+ """Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
@classmethod
def create(cls, result):
return BufferedRowCursorFetchStrategy(
- result.cursor, result.context.execution_options,
+ result.cursor,
+ result.context.execution_options,
)
def _buffer_rows(self, result, dbapi_cursor):
class BaseCursorResult(object):
- """Base class for database result objects.
-
- """
+ """Base class for database result objects."""
out_parameters = None
_metadata = None
:param dbapi_connection: a DBAPI connection, typically
proxied within a :class:`.ConnectionFairy`.
- """
+ """
raise NotImplementedError()
:param dbapi_connection: a DBAPI connection, typically
proxied within a :class:`.ConnectionFairy`.
- """
+ """
raise NotImplementedError()
return self.dialect.default_schema_name
def get_schema_names(self):
- """Return all schema names.
- """
+ """Return all schema names."""
if hasattr(self.dialect, "get_schema_names"):
with self._operation_context() as conn:
if extra:
recs_names = [
- ((name,) + extras, (index, name, extras),)
+ (
+ (name,) + extras,
+ (index, name, extras),
+ )
for index, (name, extras) in enumerate(zip(self._keys, extra))
]
else:
rows = [
made_row
for made_row, sig_row in [
- (made_row, strategy(made_row) if strategy else made_row,)
+ (
+ made_row,
+ strategy(made_row) if strategy else made_row,
+ )
for made_row in made_rows
]
if sig_row not in uniques and not uniques.add(sig_row)
return manyrows
def _only_one_row(
- self, raise_for_second_row, raise_for_none, scalar,
+ self,
+ raise_for_second_row,
+ raise_for_none,
+ scalar,
):
onerow = self._fetchone_impl
def columns(self, *col_expressions):
# type: (*object) -> MappingResult
- r"""Establish the columns that should be returned in each row.
-
-
- """
+ r"""Establish the columns that should be returned in each row."""
return self._column_slices(col_expressions)
def partitions(self, size=None):
:class:`_engine.URL`, use the :meth:`_engine.URL.set` and
:meth:`_engine.URL.update_query` methods.
- """
+ """
return cls(
cls._assert_str(drivername, "drivername"),
return util.immutabledict(
{
- _assert_str(key): _assert_value(value,)
+ _assert_str(key): _assert_value(
+ value,
+ )
for key, value in dict_items
}
)
def contains(target, identifier, fn):
- """Return True if the given target/ident/fn is set up to listen.
-
- """
+ """Return True if the given target/ident/fn is set up to listen."""
return _event_key(target, identifier, fn).contains()
def _update(self, other, only_propagate=True):
"""Populate from the listeners in another :class:`_Dispatch`
- object."""
+ object."""
existing_listeners = self.listeners
existing_listener_set = set(existing_listeners)
def _update(self, other, only_propagate=True):
"""Populate from the listeners in another :class:`_Dispatch`
- object."""
+ object."""
for ls in other._event_descriptors:
if isinstance(ls, _EmptyListener):
continue
class _EventKey(object):
- """Represent :func:`.listen` arguments.
- """
+ """Represent :func:`.listen` arguments."""
__slots__ = (
"target",
collection.remove(self.with_wrapper(listener_fn))
def contains(self):
- """Return True if this event key is registered to listen.
- """
+ """Return True if this event key is registered to listen."""
return self._key in _key_to_collection
def base_listen(
else:
return (
"(Background on this error at: "
- "http://sqlalche.me/e/%s/%s)" % (_version_token, self.code,)
+ "http://sqlalche.me/e/%s/%s)"
+ % (
+ _version_token,
+ self.code,
+ )
)
def _message(self, as_unicode=compat.py3k):
class ObjectAssociationProxyInstance(AssociationProxyInstance):
- """an :class:`.AssociationProxyInstance` that has an object as a target.
- """
+ """an :class:`.AssociationProxyInstance` that has an object as a target."""
_target_is_object = True
_is_canonical = True
return self.sync_connection
def begin(self) -> "AsyncTransaction":
- """Begin a transaction prior to autobegin occurring.
-
- """
+ """Begin a transaction prior to autobegin occurring."""
self._sync_connection()
return AsyncTransaction(self)
def begin_nested(self) -> "AsyncTransaction":
- """Begin a nested transaction and return a transaction handle.
-
- """
+ """Begin a nested transaction and return a transaction handle."""
self._sync_connection()
return AsyncTransaction(self, nested=True)
conn = self._sync_connection()
result = await greenlet_spawn(
- conn.exec_driver_sql, statement, parameters, execution_options,
+ conn.exec_driver_sql,
+ statement,
+ parameters,
+ execution_options,
)
if result.context._is_server_side:
raise async_exc.AsyncMethodRequired(
conn = self._sync_connection()
result = await greenlet_spawn(
- conn._execute_20, statement, parameters, execution_options,
+ conn._execute_20,
+ statement,
+ parameters,
+ execution_options,
)
if result.context._is_server_side:
raise async_exc.AsyncMethodRequired(
return result.scalar()
async def run_sync(self, fn: Callable, *arg, **kw) -> Any:
- """"Invoke the given sync callable passing self as the first argument.
+ """Invoke the given sync callable passing self as the first argument.
This method maintains the asyncio event loop all the way through
to the database connection by running the given callable in a
await greenlet_spawn(self._sync_transaction().close)
async def rollback(self):
- """Roll back this :class:`.Transaction`.
-
- """
+ """Roll back this :class:`.Transaction`."""
await greenlet_spawn(self._sync_transaction().rollback)
async def commit(self):
def columns(self, *col_expressions):
# type: (*object) -> AsyncMappingResult
- r"""Establish the columns that should be returned in each row.
-
-
- """
+ r"""Establish the columns that should be returned in each row."""
return self._column_slices(col_expressions)
async def partitions(self, size=None):
return self.sync_transaction
async def rollback(self):
- """Roll back this :class:`_asyncio.AsyncTransaction`.
-
- """
+ """Roll back this :class:`_asyncio.AsyncTransaction`."""
await greenlet_spawn(self._sync_transaction().rollback)
async def commit(self):
return self._cache_key + (session._query_cls,)
def _with_lazyload_options(self, options, effective_path, cache_path=None):
- """Cloning version of _add_lazyload_options.
- """
+ """Cloning version of _add_lazyload_options."""
q = self._clone()
q._add_lazyload_options(options, effective_path, cache_path=cache_path)
return q
:ref:`hybrid_reuse_subclass`
- """
+ """
return self
def getter(self, fget):
def _reconstitute(cls, dict_, items):
- """ Reconstitute an :class:`.OrderingList`.
+ """Reconstitute an :class:`.OrderingList`.
This is the adjoint to :meth:`.OrderingList.__reduce__`. It is used for
unpickling :class:`.OrderingList` objects.
:class:`sqlalchemy.exc.NoInspectionAvailable`
is raised. If ``False``, ``None`` is returned.
- """
+ """
type_ = type(subject)
for cls in type_.__mro__:
if cls in _registrars:
@_sa_util.deprecated_20("relation", "Please use :func:`.relationship`.")
def relation(*arg, **kw):
- """A synonym for :func:`relationship`.
-
- """
+ """A synonym for :func:`relationship`."""
return relationship(*arg, **kw)
class ScalarObjectAttributeImpl(ScalarAttributeImpl):
"""represents a scalar-holding InstrumentedAttribute,
- where the target object is also instrumented.
+ where the target object is also instrumented.
- Adds events to delete/set operations.
+ Adds events to delete/set operations.
"""
check_old=None,
pop=False,
):
- """Set a value on the given InstanceState.
-
- """
+ """Set a value on the given InstanceState."""
if self.dispatch._active_history:
old = self.get(
state,
# figure out the final "left" and "right" sides and create an
# ORMJoin to add to our _from_obj tuple
self._join_left_to_right(
- left, right, onclause, prop, False, False, isouter, full,
+ left,
+ right,
+ onclause,
+ prop,
+ False,
+ False,
+ isouter,
+ full,
)
def _legacy_join(self, args):
self._mapper_loads_polymorphically_with(
right_mapper,
sql_util.ColumnAdapter(
- right_mapper.selectable, right_mapper._equivalent_columns,
+ right_mapper.selectable,
+ right_mapper._equivalent_columns,
),
)
# if the onclause is a ClauseElement, adapt it with any
)
def __init__(
- self, compile_state, column, parententity, parent_bundle=None,
+ self,
+ compile_state,
+ column,
+ parententity,
+ parent_bundle=None,
):
annotations = column._annotations
metadata=metadata,
class_registry=class_registry,
constructor=constructor,
- ).generate_base(mapper=mapper, cls=cls, name=name, metaclass=metaclass,)
+ ).generate_base(
+ mapper=mapper,
+ cls=cls,
+ name=name,
+ metaclass=metaclass,
+ )
class registry(object):
clsregistry.remove_class(cls.__name__, cls, self._class_registry)
def generate_base(
- self, mapper=None, cls=object, name="Base", metaclass=DeclarativeMeta,
+ self,
+ mapper=None,
+ cls=object,
+ name="Base",
+ metaclass=DeclarativeMeta,
):
"""Generate a declarative base class.
__slots__ = ("dict_", "local_table", "inherits")
def __init__(
- self, registry, cls_, table, mapper_kw,
+ self,
+ registry,
+ cls_,
+ table,
+ mapper_kw,
):
super(_ImperativeMapperConfig, self).__init__(registry, cls_)
mapper_cls = mapper
return self.set_cls_attribute(
- "__mapper__", mapper_cls(self.cls, self.local_table, **mapper_kw),
+ "__mapper__",
+ mapper_cls(self.cls, self.local_table, **mapper_kw),
)
def _setup_inheritance(self, mapper_kw):
)
def __init__(
- self, registry, cls_, dict_, table, mapper_kw,
+ self,
+ registry,
+ cls_,
+ dict_,
+ table,
+ mapper_kw,
):
super(_ClassScanMapperConfig, self).__init__(registry, cls_)
class DescriptorProperty(MapperProperty):
""":class:`.MapperProperty` which proxies access to a
- user-defined descriptor."""
+ user-defined descriptor."""
doc = None
iterator = (
(item,)
for item in self.attr._get_collection_history(
- state, attributes.PASSIVE_NO_INITIALIZE,
+ state,
+ attributes.PASSIVE_NO_INITIALIZE,
).added_items
)
row_metadata = _result.SimpleResultMetaData(
- (self.mapper.class_.__name__,), [], _unique_filters=[id],
+ (self.mapper.class_.__name__,),
+ [],
+ _unique_filters=[id],
)
return _result.IteratorResult(row_metadata, iterator).scalars()
_polymorphic_from=None,
):
"""Produce a mapper level row processor callable
- which processes rows into mapped instances."""
+ which processes rows into mapped instances."""
# note that this method, most of which exists in a closure
# called _instance(), resists being broken out, as
class PostLoad(object):
- """Track loaders and states for "post load" operations.
-
- """
+ """Track loaders and states for "post load" operations."""
__slots__ = "loaders", "states", "load_keys"
@classmethod
def _configure_all(cls):
- """Class-level path to the :func:`.configure_mappers` call.
- """
+ """Class-level path to the :func:`.configure_mappers` call."""
configure_mappers()
def dispose(self):
@util.preload_module("sqlalchemy.orm.descriptor_props")
def _property_from_column(self, key, prop):
"""generate/update a :class:`.ColumnProperty` given a
- :class:`_schema.Column` object. """
+ :class:`_schema.Column` object."""
descriptor_props = util.preloaded.orm_descriptor_props
# we were passed a Column or a list of Columns;
# generate a properties.ColumnProperty
return key in self._props
def get_property(self, key, _configure_mappers=True):
- """return a MapperProperty associated with the given key.
- """
+ """return a MapperProperty associated with the given key."""
if _configure_mappers and Mapper._new_mappers:
configure_mappers()
c.returned_defaults_rows or (),
):
for pk, col in zip(
- inserted_primary_key, mapper._pks_by_table[table],
+ inserted_primary_key,
+ mapper._pks_by_table[table],
):
prop = mapper_rec._columntoproperty[col]
if state_dict.get(prop.key) is None:
session.identity_map[identity_key]
for identity_key in [
target_mapper.identity_key_from_primary_key(
- list(primary_key), identity_token=identity_token,
+ list(primary_key),
+ identity_token=identity_token,
)
for primary_key, identity_token in [
(row[0:-1], row[-1]) for row in matched_rows
# TODO: inline this and call remove_newly_deleted
# once
identity_key = target_mapper.identity_key_from_primary_key(
- list(primary_key), identity_token=identity_token,
+ list(primary_key),
+ identity_token=identity_token,
)
if identity_key in session.identity_map:
session._remove_newly_deleted(
return stmt
def subquery(
- self, name=None, with_labels=False, reduce_columns=False,
+ self,
+ name=None,
+ with_labels=False,
+ reduce_columns=False,
):
"""Return the full SELECT statement represented by
this :class:`_query.Query`, embedded within an
return fn(self)
def get_execution_options(self):
- """ Get the non-SQL options which will take effect during execution.
+ """Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
@_generative
def execution_options(self, **kwargs):
- """ Set non-SQL options which take effect during execution.
+ """Set non-SQL options which take effect during execution.
Options allowed here include all of those accepted by
:meth:`_engine.Connection.execution_options`, as well as a series
"""
- bulk_del = BulkDelete(self,)
+ bulk_del = BulkDelete(
+ self,
+ )
if self.dispatch.before_compile_delete:
for fn in self.dispatch.before_compile_delete:
new_query = fn(bulk_del.query, bulk_del)
class _ColInAnnotations(object):
- """Seralizable object that tests for a name in c._annotations.
-
- """
+ """Seralizable object that tests for a name in c._annotations."""
__slots__ = ("name",)
def _state_session(state):
"""Given an :class:`.InstanceState`, return the :class:`.Session`
- associated, if any.
+ associated, if any.
"""
if state.session_id:
try:
_rollback_exception = None
def __init__(
- self, session, parent=None, nested=False, autobegin=False,
+ self,
+ session,
+ parent=None,
+ nested=False,
+ autobegin=False,
):
self.session = session
self._connections = {}
except sa_exc.NoInspectionAvailable as err:
if isinstance(mapper, type):
util.raise_(
- exc.UnmappedClassError(mapper), replace_context=err,
+ exc.UnmappedClassError(mapper),
+ replace_context=err,
)
else:
raise
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._expire_state(state, attribute_names)
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._expire_state(state, attribute_names)
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
if state.session_id is not self.hash_key:
raise sa_exc.InvalidRequestError(
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._save_or_update_state(state)
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
self._delete_impl(state, instance, head=True)
if execution_options:
statement = statement.execution_options(**execution_options)
return db_load_fn(
- self, statement, primary_key_identity, load_options=load_options,
+ self,
+ statement,
+ primary_key_identity,
+ load_options=load_options,
)
def merge(self, instance, load=True):
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
return self._contains_state(state)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(o), replace_context=err,
+ exc.UnmappedInstanceError(o),
+ replace_context=err,
)
objset.add(state)
else:
state = attributes.instance_state(instance)
except exc.NO_STATE as err:
util.raise_(
- exc.UnmappedInstanceError(instance), replace_context=err,
+ exc.UnmappedInstanceError(instance),
+ replace_context=err,
)
else:
return _state_session(state)
:ref:`session_object_states`
- """
+ """
return self.key is not None and self._attached and not self._deleted
@property
def _reset(self, dict_, key):
"""Remove the given attribute and any
- callables associated with it."""
+ callables associated with it."""
old = dict_.pop(key, None)
if old is not None and self.manager[key].impl.collection:
self._load()
def _setup_query_from_rowproc(
- self, context, path, entity, loadopt, adapter,
+ self,
+ context,
+ path,
+ entity,
+ loadopt,
+ adapter,
):
compile_state = context.compile_state
if (
return
subq = self._setup_query_from_rowproc(
- context, path, path[-1], loadopt, adapter,
+ context,
+ path,
+ path[-1],
+ loadopt,
+ adapter,
)
if subq is None:
prop.mapper, None
)
path.set(
- target_attributes, "user_defined_eager_row_processor", adapter,
+ target_attributes,
+ "user_defined_eager_row_processor",
+ adapter,
)
return adapter
if alias is None:
alias = mapper._with_polymorphic_selectable._anonymous_fromclause(
- name=name, flat=flat,
+ name=name,
+ flat=flat,
)
self._aliased_insp = AliasedInsp(
from .sql.schema import FetchedValue # noqa
from .sql.schema import ForeignKey # noqa
from .sql.schema import ForeignKeyConstraint # noqa
-from .sql.schema import Index # noqa
from .sql.schema import Identity # noqa
+from .sql.schema import Index # noqa
from .sql.schema import MetaData # noqa
from .sql.schema import PrimaryKeyConstraint # noqa
from .sql.schema import SchemaItem # noqa
class Options(util.with_metaclass(_MetaOptions)):
- """A cacheable option dictionary with defaults.
-
-
- """
+ """A cacheable option dictionary with defaults."""
def __init__(self, **kw):
self.__dict__.update(kw)
def from_execution_options(
cls, key, attrs, exec_options, statement_exec_options
):
- """"process Options argument in terms of execution options.
+ """process Options argument in terms of execution options.
e.g.::
__visit_name__ = "executable_option"
def _clone(self):
- """Create a shallow copy of this ExecutableOption.
-
- """
+ """Create a shallow copy of this ExecutableOption."""
c = self.__class__.__new__(self.__class__)
c.__dict__ = dict(self.__dict__)
return c
@_generative
def execution_options(self, **kw):
- """ Set non-SQL options for the statement which take effect during
+ """Set non-SQL options for the statement which take effect during
execution.
Execution options can be set on a per-statement or
self._execution_options = self._execution_options.union(kw)
def get_execution_options(self):
- """ Get the non-SQL options which will take effect during execution.
+ """Get the non-SQL options which will take effect during execution.
.. versionadded:: 1.3
":class:`.Session`.",
)
def execute(self, *multiparams, **params):
- """Compile and execute this :class:`.Executable`.
-
- """
+ """Compile and execute this :class:`.Executable`."""
e = self.bind
if e is None:
label = getattr(self, "description", self.__class__.__name__)
def replace(self, column):
"""add the given column to this collection, removing unaliased
- versions of this column as well as existing columns with the
- same key.
+ versions of this column as well as existing columns with the
+ same key.
- e.g.::
+ e.g.::
- t = Table('sometable', metadata, Column('col1', Integer))
- t.columns.replace(Column('col1', Integer, key='columnone'))
+ t = Table('sometable', metadata, Column('col1', Integer))
+ t.columns.replace(Column('col1', Integer, key='columnone'))
- will remove the original 'col1' from the collection, and add
- the new column under the name 'columnname'.
+ will remove the original 'col1' from the collection, and add
+ the new column under the name 'columnname'.
- Used by schema.Column to override columns during table reflection.
+ Used by schema.Column to override columns during table reflection.
"""
"""
- return not isinstance(
- element, (Visitable, schema.SchemaEventTarget),
- ) and not hasattr(element, "__clause_element__")
+ return (
+ not isinstance(
+ element,
+ (Visitable, schema.SchemaEventTarget),
+ )
+ and not hasattr(element, "__clause_element__")
+ )
def _deep_is_literal(element):
(
c,
compiler.preparer.format_column(
- c, use_table=include_table,
+ c,
+ use_table=include_table,
),
compiler.process(c.onupdate.arg.self_group(), **kw),
)
(
c,
compiler.preparer.format_column(
- c, use_table=include_table,
+ c,
+ use_table=include_table,
),
_create_update_prefetch_bind_param(compiler, c, **kw),
)
def sort_tables(
- tables, skip_fn=None, extra_dependencies=None,
+ tables,
+ skip_fn=None,
+ extra_dependencies=None,
):
"""Sort a collection of :class:`_schema.Table` objects based on
dependency.
Executable,
ClauseElement,
):
- """Form the base for ``INSERT``, ``UPDATE``, and ``DELETE`` statements.
-
- """
+ """Form the base for ``INSERT``, ``UPDATE``, and ``DELETE`` statements."""
__visit_name__ = "update_base"
:param dialect_name: defaults to ``*``, if specified as the name
of a particular dialect, will apply these hints only when
that dialect is in use.
- """
+ """
if selectable is None:
selectable = self.table
("operator", InternalTraversal.dp_operator),
("negate", InternalTraversal.dp_operator),
("modifiers", InternalTraversal.dp_plain_dict),
- ("type", InternalTraversal.dp_type,), # affects JSON CAST operators
+ (
+ "type",
+ InternalTraversal.dp_type,
+ ), # affects JSON CAST operators
]
_is_implicitly_boolean = True
class IndexExpression(BinaryExpression):
- """Represent the class of expressions that are like an "index" operation.
- """
+ """Represent the class of expressions that are like an "index"
+ operation."""
pass
# create trackers to catch those.
analyzed_function = AnalyzedFunction(
- self, lambda_element, None, lambda_kw, fn,
+ self,
+ lambda_element,
+ None,
+ lambda_kw,
+ fn,
)
closure_trackers = self.closure_trackers
)
def __init__(
- self, analyzed_code, lambda_element, apply_propagate_attrs, kw, fn,
+ self,
+ analyzed_code,
+ lambda_element,
+ apply_propagate_attrs,
+ kw,
+ fn,
):
self.analyzed_code = analyzed_code
self.fn = fn
":meth:`_reflection.Inspector.has_table`.",
)
def exists(self, bind=None):
- """Return True if this table exists.
-
- """
+ """Return True if this table exists."""
if bind is None:
bind = _bind_or_error(self)
if col.autoincrement is True:
_validate_autoinc(col, True)
return col
- elif col.autoincrement in (
- "auto",
- "ignore_fk",
- ) and _validate_autoinc(col, False):
+ elif (
+ col.autoincrement
+ in (
+ "auto",
+ "ignore_fk",
+ )
+ and _validate_autoinc(col, False)
+ ):
return col
else:
needs_isinstance = (
needs_convert
and dialect.returns_unicode_strings
- in (String.RETURNS_CONDITIONAL, String.RETURNS_UNICODE,)
+ in (
+ String.RETURNS_CONDITIONAL,
+ String.RETURNS_UNICODE,
+ )
and self._expect_unicode != "force_nocheck"
)
if needs_convert:
:attr:`.types.JSON.NULL`
- """
+ """
self.none_as_null = none_as_null
class JSONElementType(TypeEngine):
@util.memoized_property
def _has_literal_processor(self):
- """memoized boolean, check if process_literal_param is implemented.
-
-
- """
+ """memoized boolean, check if process_literal_param is implemented."""
return (
self.__class__.process_literal_param.__code__
def _make_slice(limit_clause, offset_clause, start, stop):
- """Compute LIMIT/OFFSET in terms of slice start/end
- """
+ """Compute LIMIT/OFFSET in terms of slice start/end"""
# for calculated limit/offset, try to do the addition of
# values to offset in Python, however if a SQL clause is present
assert reflected_table.primary_key.columns[c.name] is not None
def assert_types_base(self, c1, c2):
- assert c1.type._compare_type_affinity(c2.type), (
- "On column %r, type '%s' doesn't correspond to type '%s'"
- % (c1.name, c1.type, c2.type)
+ assert c1.type._compare_type_affinity(
+ c2.type
+ ), "On column %r, type '%s' doesn't correspond to type '%s'" % (
+ c1.name,
+ c1.type,
+ c2.type,
)
# type: (URL, str, str) -> URL
backend = url.get_backend_name()
- new_url = url.set(drivername="%s+%s" % (backend, driver),)
+ new_url = url.set(
+ drivername="%s+%s" % (backend, driver),
+ )
new_url = new_url.update_query_string(query_str)
try:
@register.init
def update_db_opts(db_url, db_opts):
- """Set database options (db_opts) for a test database that we created.
- """
+ """Set database options (db_opts) for a test database that we created."""
pass
@property
def on_update_cascade(self):
- """"target database must support ON UPDATE..CASCADE behavior in
+ """target database must support ON UPDATE..CASCADE behavior in
foreign keys."""
return exclusions.open()
@property
def implements_get_lastrowid(self):
- """"target dialect implements the executioncontext.get_lastrowid()
+ """target dialect implements the executioncontext.get_lastrowid()
method without reliance on RETURNING.
"""
@property
def emulated_lastrowid(self):
- """"target dialect retrieves cursor.lastrowid, or fetches
+ """target dialect retrieves cursor.lastrowid, or fetches
from a database-side function after an insert() construct executes,
within the get_lastrowid() method.
@property
def emulated_lastrowid_even_with_sequences(self):
- """"target dialect retrieves cursor.lastrowid or an equivalent
+ """target dialect retrieves cursor.lastrowid or an equivalent
after an insert() construct executes, even if the table has a
Sequence on it.
@property
def dbapi_lastrowid(self):
- """"target platform includes a 'lastrowid' accessor on the DBAPI
+ """target platform includes a 'lastrowid' accessor on the DBAPI
cursor object.
"""
@property
def cross_schema_fk_reflection(self):
- """target system must support reflection of inter-schema foreign keys
-
- """
+ """target system must support reflection of inter-schema
+ foreign keys"""
return exclusions.closed()
@property
def implicit_default_schema(self):
"""target system has a strong concept of 'default' schema that can
- be referred to implicitly.
+ be referred to implicitly.
- basically, PostgreSQL.
+ basically, PostgreSQL.
"""
return exclusions.closed()
@property
def view_reflection(self):
- """target database must support inspection of the full CREATE VIEW definition.
- """
+ """target database must support inspection of the full CREATE VIEW
+ definition."""
return self.views
@property
@property
def symbol_names_w_double_quote(self):
- """Target driver can create tables with a name like 'some " table'
-
- """
+ """Target driver can create tables with a name like 'some " table'"""
return exclusions.open()
@property
@property
def json_array_indexes(self):
- """"target platform supports numeric array indexes
+ """target platform supports numeric array indexes
within a JSON structure"""
return self.json_type
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(
- row, (conn.dialect.default_sequence_base, "some data",),
+ row,
+ (
+ conn.dialect.default_sequence_base,
+ "some data",
+ ),
)
def test_autoincrement_on_insert(self, connection):
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
eq_(
- row, (conn.dialect.default_sequence_base, "some data",),
+ row,
+ (
+ conn.dialect.default_sequence_base,
+ "some data",
+ ),
)
@classmethod
Column("related_id", Integer),
sa.PrimaryKeyConstraint("id", name="pk quote ' one"),
sa.Index("ix quote ' one", "name"),
- sa.UniqueConstraint("data", name="uq quote' one",),
+ sa.UniqueConstraint(
+ "data",
+ name="uq quote' one",
+ ),
sa.ForeignKeyConstraint(
["id"], ["related.id"], name="fk quote ' one"
),
Column("related_id", Integer),
sa.PrimaryKeyConstraint("id", name='pk quote " two'),
sa.Index('ix quote " two', "name"),
- sa.UniqueConstraint("data", name='uq quote" two',),
+ sa.UniqueConstraint(
+ "data",
+ name='uq quote" two',
+ ),
sa.ForeignKeyConstraint(
["id"], ["related.id"], name='fk quote " two'
),
"Skipped unsupported reflection of expression-based index t_idx"
):
eq_(
- insp.get_indexes("t"), expected,
+ insp.get_indexes("t"),
+ expected,
)
@testing.requires.index_reflects_included_columns
if testing.requires.index_reflects_included_columns.enabled:
expected[0]["include_columns"] = []
eq_(
- [idx for idx in indexes if idx["name"] == "user_tmp_ix"], expected,
+ [idx for idx in indexes if idx["name"] == "user_tmp_ix"],
+ expected,
)
@testing.requires.unique_constraint_reflection
)
if testing.requires.computed_columns_virtual.enabled:
self.check_column(
- data, "computed_virtual", "normal+2", False,
+ data,
+ "computed_virtual",
+ "normal+2",
+ False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_column(
- data, "computed_stored", "normal-42", True,
+ data,
+ "computed_stored",
+ "normal-42",
+ True,
)
@testing.requires.schemas
)
if testing.requires.computed_columns_virtual.enabled:
self.check_column(
- data, "computed_virtual", "normal/2", False,
+ data,
+ "computed_virtual",
+ "normal/2",
+ False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_column(
- data, "computed_stored", "normal*42", True,
+ data,
+ "computed_stored",
+ "normal*42",
+ True,
)
)
eq_(
- result.fetchmany(5), [(i, "data%d" % i) for i in range(1, 6)],
+ result.fetchmany(5),
+ [(i, "data%d" % i) for i in range(1, 6)],
)
eq_(
result.fetchmany(10),
Column(
"id",
Integer,
- Identity(increment=-5, start=0, minvalue=-1000, maxvalue=0,),
+ Identity(
+ increment=-5,
+ start=0,
+ minvalue=-1000,
+ maxvalue=0,
+ ),
primary_key=True,
),
Column("desc", String(100)),
@classmethod
def insert_data(cls, connection):
connection.execute(
- cls.tables.tbl_a.insert(), [{"desc": "a"}, {"desc": "b"}],
+ cls.tables.tbl_a.insert(),
+ [{"desc": "a"}, {"desc": "b"}],
)
connection.execute(
- cls.tables.tbl_b.insert(), [{"desc": "a"}, {"desc": "b"}],
+ cls.tables.tbl_b.insert(),
+ [{"desc": "a"}, {"desc": "b"}],
)
connection.execute(
- cls.tables.tbl_b.insert(), [{"id": 42, "desc": "c"}],
+ cls.tables.tbl_b.insert(),
+ [{"id": 42, "desc": "c"}],
)
def test_select_all(self, connection):
def test_insert_always_error(self, connection):
def fn():
connection.execute(
- self.tables.tbl_a.insert(), [{"id": 200, "desc": "a"}],
+ self.tables.tbl_a.insert(),
+ [{"id": 200, "desc": "a"}],
)
assert_raises((DatabaseError, ProgrammingError), fn)
tbl.select(tbl.c.col_a.is_distinct_from(tbl.c.col_b))
).fetchall()
eq_(
- len(result), expected_row_count_for_is,
+ len(result),
+ expected_row_count_for_is,
)
expected_row_count_for_isnot = (
tbl.select(tbl.c.col_a.isnot_distinct_from(tbl.c.col_b))
).fetchall()
eq_(
- len(result), expected_row_count_for_isnot,
+ len(result),
+ expected_row_count_for_isnot,
)
Table(
"seq_pk",
metadata,
- Column("id", Integer, Sequence("tab_id_seq"), primary_key=True,),
+ Column(
+ "id",
+ Integer,
+ Sequence("tab_id_seq"),
+ primary_key=True,
+ ),
Column("data", String(50)),
)
"schema_seq", schema=config.test_schema, metadata=metadata
)
Table(
- "user_id_table", metadata, Column("id", Integer, primary_key=True),
+ "user_id_table",
+ metadata,
+ Column("id", Integer, primary_key=True),
)
def test_has_sequence(self, connection):
eq_(
- inspect(connection).has_sequence("user_id_seq"), True,
+ inspect(connection).has_sequence("user_id_seq"),
+ True,
)
def test_has_sequence_other_object(self, connection):
eq_(
- inspect(connection).has_sequence("user_id_table"), False,
+ inspect(connection).has_sequence("user_id_table"),
+ False,
)
@testing.requires.schemas
def test_has_sequence_neg(self, connection):
eq_(
- inspect(connection).has_sequence("some_sequence"), False,
+ inspect(connection).has_sequence("some_sequence"),
+ False,
)
@testing.requires.schemas
@testing.requires.schemas
def test_has_sequence_remote_not_in_default(self, connection):
eq_(
- inspect(connection).has_sequence("schema_seq"), False,
+ inspect(connection).has_sequence("schema_seq"),
+ False,
)
def test_get_sequence_names(self, connection):
def test_get_sequence_names_no_sequence(self, connection):
eq_(
- inspect(connection).get_sequence_names(), [],
+ inspect(connection).get_sequence_names(),
+ [],
)
@testing.requires.precision_numerics_enotation_large
def test_enotation_decimal_large(self):
- """test exceedingly large decimals.
-
- """
+ """test exceedingly large decimals."""
numbers = set(
[
conn = connection
conn.execute(
- self.tables.data_table.insert(), {"name": "r1", "data": JSON.NULL},
+ self.tables.data_table.insert(),
+ {"name": "r1", "data": JSON.NULL},
)
eq_(
# "cannot extract array element from a non-array", which is
# fixed in 9.4 but may exist in 9.3
self._test_index_criteria(
- and_(name == "r4", cast(col[1], String) == '"two"',), "r4",
+ and_(
+ name == "r4",
+ cast(col[1], String) == '"two"',
+ ),
+ "r4",
)
def test_string_cast_crit_mixed_path(self):
col = self.tables.data_table.c["data"]
self._test_index_criteria(
- cast(col[("key3", 1, "six")], String) == '"seven"', "r3",
+ cast(col[("key3", 1, "six")], String) == '"seven"',
+ "r3",
)
def test_string_cast_crit_string_path(self):
col = self.tables.data_table.c["data"]
self._test_index_criteria(
- and_(name == "r6", cast(col["b"], String) == '"some value"',),
+ and_(
+ name == "r6",
+ cast(col["b"], String) == '"some value"',
+ ),
"r6",
)
r"The Session.begin.subtransactions flag is deprecated",
]:
warnings.filterwarnings(
- "ignore", message=msg, category=sa_exc.RemovedIn20Warning,
+ "ignore",
+ message=msg,
+ category=sa_exc.RemovedIn20Warning,
)
try:
return FacadeDict, (dict(self),)
def _insert_item(self, key, value):
- """insert an item into the dictionary directly.
-
-
- """
+ """insert an item into the dictionary directly."""
dict.__setitem__(self, key, value)
def __repr__(self):
if py3k:
def _formatannotation(annotation, base_module=None):
- """vendored from python 3.7
- """
+ """vendored from python 3.7"""
if getattr(annotation, "__module__", None) == "typing":
return repr(annotation).replace("typing.", "")
warning = exc.RemovedIn20Warning
version = "1.4"
if add_deprecation_to_docstring:
- header = ".. deprecated:: %s %s" % (version, (message or ""),)
+ header = ".. deprecated:: %s %s" % (
+ version,
+ (message or ""),
+ )
else:
header = None
self._exc_info = None # remove potential circular references
if not self.warn_only:
compat.raise_(
- exc_value, with_traceback=exc_tb,
+ exc_value,
+ with_traceback=exc_tb,
)
else:
if not compat.py3k and self._exc_info and self._exc_info[1]:
def iterate_attributes(cls):
"""iterate all the keys and attributes associated
- with a class, without using getattr().
+ with a class, without using getattr().
- Does not use getattr() so that class-sensitive
- descriptors (i.e. property.__get__()) are not called.
+ Does not use getattr() so that class-sensitive
+ descriptors (i.e. property.__get__()) are not called.
"""
keys = dir(cls)
@classmethod
def memoized_instancemethod(cls, fn):
- """Decorate a method memoize its return value.
-
- """
+ """Decorate a method memoize its return value."""
def oneshot(self, *args, **kw):
result = fn(self, *args, **kw)
def repr_tuple_names(names):
- """ Trims a list of strings from the middle and return a string of up to
- four elements. Strings greater than 11 characters will be truncated"""
+ """Trims a list of strings from the middle and return a string of up to
+ four elements. Strings greater than 11 characters will be truncated"""
if len(names) == 0:
return None
flag = len(names) <= 4
return self._queue.put_nowait(item)
except asyncio.queues.QueueFull as err:
compat.raise_(
- Full(), replace_context=err,
+ Full(),
+ replace_context=err,
)
def put(self, item, block=True, timeout=None):
return self.await_(self._queue.put(item))
except asyncio.queues.QueueFull as err:
compat.raise_(
- Full(), replace_context=err,
+ Full(),
+ replace_context=err,
)
def get_nowait(self):
return self._queue.get_nowait()
except asyncio.queues.QueueEmpty as err:
compat.raise_(
- Empty(), replace_context=err,
+ Empty(),
+ replace_context=err,
)
def get(self, block=True, timeout=None):
return self.await_(self._queue.get())
except asyncio.queues.QueueEmpty as err:
compat.raise_(
- Empty(), replace_context=err,
+ Empty(),
+ replace_context=err,
)
r.context.compiled.compile_state = compile_state
obj = ORMCompileState.orm_setup_cursor_result(
- sess, compile_state.statement, {}, exec_opts, {}, r,
+ sess,
+ compile_state.statement,
+ {},
+ exec_opts,
+ {},
+ r,
)
list(obj.unique())
sess.close()
[row["field%d" % fnum] for fnum in range(NUM_FIELDS)]
@testing.combinations(
- (False, 0), (True, 1), (False, 1), (False, 2),
+ (False, 0),
+ (True, 1),
+ (False, 1),
+ (False, 2),
)
def test_one_or_none(self, one_or_first, rows_present):
# TODO: this is not testing the ORM level "scalar_mapping"
object(),
)
- result = self._fixture(extras=[(ex1a, ex1b), (ex2,), (ex3a, ex3b,)])
+ result = self._fixture(
+ extras=[
+ (ex1a, ex1b),
+ (ex2,),
+ (
+ ex3a,
+ ex3b,
+ ),
+ ]
+ )
eq_(
result.columns(ex2, ex3b).columns(ex3a).all(),
[(1,), (2,), (2,), (2,)],
)
- result = self._fixture(extras=[(ex1a, ex1b), (ex2,), (ex3a, ex3b,)])
+ result = self._fixture(
+ extras=[
+ (ex1a, ex1b),
+ (ex2,),
+ (
+ ex3a,
+ ex3b,
+ ),
+ ]
+ )
eq_([row._mapping[ex1b] for row in result], [1, 2, 1, 4])
- result = self._fixture(extras=[(ex1a, ex1b), (ex2,), (ex3a, ex3b,)])
+ result = self._fixture(
+ extras=[
+ (ex1a, ex1b),
+ (ex2,),
+ (
+ ex3a,
+ ex3b,
+ ),
+ ]
+ )
eq_(
[
dict(r)
result = r1.merge(r2, r3, r4)
eq_(
- result.first(), (7, "u1"),
+ result.first(),
+ (7, "u1"),
)
def test_columns(self, merge_fixture):
r = r.columns(0).mappings()
eq_(
- list(r), [{"a": 1}, {"a": 2}, {"a": 1}, {"a": 1}, {"a": 4}],
+ list(r),
+ [{"a": 1}, {"a": 2}, {"a": 1}, {"a": 1}, {"a": 4}],
)
def test_scalar_mode_but_accessed_nonscalar_result(self, no_tuple_fixture):
)
r = result.ChunkedIteratorResult(
- metadata, no_tuple_fixture, source_supports_scalars=True,
+ metadata,
+ no_tuple_fixture,
+ source_supports_scalars=True,
)
r = r.unique()
)
r = result.ChunkedIteratorResult(
- metadata, no_tuple_fixture, source_supports_scalars=True,
+ metadata,
+ no_tuple_fixture,
+ source_supports_scalars=True,
)
r = r.unique()
)
r = result.ChunkedIteratorResult(
- metadata, no_tuple_fixture, source_supports_scalars=True,
+ metadata,
+ no_tuple_fixture,
+ source_supports_scalars=True,
)
r = r.scalars().unique()
)
def test_warn_deprecated_limited_cap(self):
- """ warn_deprecated_limited() and warn_limited() use
+ """warn_deprecated_limited() and warn_limited() use
_hash_limit_string
actually just verifying that _hash_limit_string works as expected
def test_identity_object_no_primary_key(self):
metadata = MetaData()
tbl = Table(
- "test", metadata, Column("id", Integer, Identity(increment=42)),
+ "test",
+ metadata,
+ Column("id", Integer, Identity(increment=42)),
)
self.assert_compile(
schema.CreateTable(tbl),
tbl = Table(
"test",
metadata,
- Column("id", Integer, Identity(start=3), nullable=False,),
+ Column(
+ "id",
+ Integer,
+ Identity(start=3),
+ nullable=False,
+ ),
)
self.assert_compile(
schema.CreateTable(tbl),
"test",
metadata,
Column("id", Integer, autoincrement=False, primary_key=True),
- Column("x", Integer, Identity(start=3, increment=42),),
+ Column(
+ "x",
+ Integer,
+ Identity(start=3, increment=42),
+ ),
)
self.assert_compile(
schema.CreateTable(tbl),
Identity(start=3, increment=42),
autoincrement=True,
),
- Column("id2", Integer, Identity(start=7, increment=2),),
+ Column(
+ "id2",
+ Integer,
+ Identity(start=7, increment=2),
+ ),
)
# this will be rejected by the database, just asserting this is what
# the two autoincrements will do right now
def test_identity_object_no_options(self):
metadata = MetaData()
- tbl = Table("test", metadata, Column("id", Integer, Identity()),)
+ tbl = Table(
+ "test",
+ metadata,
+ Column("id", Integer, Identity()),
+ )
self.assert_compile(
schema.CreateTable(tbl),
"CREATE TABLE test (id INTEGER NOT NULL IDENTITY)",
result = []
- def fail_on_exec(stmt,):
+ def fail_on_exec(
+ stmt,
+ ):
if view is not None and view in stmt:
result.append(("SERIALIZABLE",))
else:
@classmethod
def insert_data(cls, connection):
connection.execute(
- cls.tables.error_t.insert(), [{"error_code": "01002"}],
+ cls.tables.error_t.insert(),
+ [{"error_code": "01002"}],
)
def test_invalid_transaction_detection(self, connection):
)
)
r = connection.execute(t1.select()).first()
- assert isinstance(r[1], util.text_type), (
- "%s is %s instead of unicode, working on %s"
- % (r[1], type(r[1]), meta.bind)
+ assert isinstance(
+ r[1], util.text_type
+ ), "%s is %s instead of unicode, working on %s" % (
+ r[1],
+ type(r[1]),
+ meta.bind,
)
eq_(r[1], util.ue("abc \xc3\xa9 def"))
if not exists:
with expect_raises(exc.NoSuchTableError):
Table(
- table_name, metadata, autoload_with=connection,
+ table_name,
+ metadata,
+ autoload_with=connection,
)
else:
tmp_t = Table(table_name, metadata, autoload_with=connection)
tmp_t.select().where(tmp_t.c.id == 2)
).fetchall()
eq_(
- result, [(2, "bar", datetime.datetime(2020, 2, 2, 2, 2, 2))],
+ result,
+ [(2, "bar", datetime.datetime(2020, 2, 2, 2, 2, 2))],
)
@testing.provide_metadata
)
def test_has_table_temporary(self, connection, table_name, exists):
if exists:
- tt = Table(table_name, self.metadata, Column("id", Integer),)
+ tt = Table(
+ table_name,
+ self.metadata,
+ Column("id", Integer),
+ )
tt.create(connection)
found_it = testing.db.dialect.has_table(connection, table_name)
for i in range(self.col_num)
]
)
- self.view_str = view_str = (
- "CREATE VIEW huge_named_view AS SELECT %s FROM base_table"
- % (
- ",".join(
- "long_named_column_number_%d" % i
- for i in range(self.col_num)
- )
+ self.view_str = (
+ view_str
+ ) = "CREATE VIEW huge_named_view AS SELECT %s FROM base_table" % (
+ ",".join(
+ "long_named_column_number_%d" % i for i in range(self.col_num)
)
)
assert len(view_str) > 4000
"bigint_seq_t",
metadata,
Column(
- "id", BIGINT, default=Sequence("bigint_seq", start=3000000000),
+ "id",
+ BIGINT,
+ default=Sequence("bigint_seq", start=3000000000),
),
Column("txt", String(50)),
)
"id",
DECIMAL(10, 0),
default=Sequence(
- "decimal_seq", data_type=DECIMAL(10, 0), start=3000000000,
+ "decimal_seq",
+ data_type=DECIMAL(10, 0),
+ start=3000000000,
),
),
Column("txt", String(50)),
@testing.metadata_fixture()
def datetimeoffset_fixture(self, metadata):
t = Table(
- "test_dates", metadata, Column("adatetimeoffset", DATETIMEOFFSET),
+ "test_dates",
+ metadata,
+ Column("adatetimeoffset", DATETIMEOFFSET),
)
return t
return
conn.execute(
- t.insert(), adatetimeoffset=dto_param_value,
+ t.insert(),
+ adatetimeoffset=dto_param_value,
)
row = conn.execute(t.select()).first()
@testing.combinations(
((10, 2, 7), "10.2.7-MariaDB", (10, 2, 7), True),
- ((10, 2, 7), "5.6.15.10.2.7-MariaDB", (5, 6, 15, 10, 2, 7), True,),
+ (
+ (10, 2, 7),
+ "5.6.15.10.2.7-MariaDB",
+ (5, 6, 15, 10, 2, 7),
+ True,
+ ),
((5, 0, 51, 24), "5.0.51a.24+lenny5", (5, 0, 51, 24), False),
((10, 2, 10), "10.2.10-MariaDB", (10, 2, 10), True),
((5, 7, 20), "5.7.20", (5, 7, 20), False),
def test_is_boolean_symbols_despite_no_native(self, connection):
is_(
- connection.scalar(select(cast(true().is_(true()), Boolean))), True,
+ connection.scalar(select(cast(true().is_(true()), Boolean))),
+ True,
)
is_(
t.create(connection)
connection.execute(
t.insert(),
- dict(scale_value=45.768392065789, unscale_value=45.768392065789,),
+ dict(
+ scale_value=45.768392065789,
+ unscale_value=45.768392065789,
+ ),
)
result = connection.scalar(select(t.c.scale_value))
eq_(result, decimal.Decimal("45.768392065789"))
dict(e1="a", e2="a", e3="a", e4="'a'", e5="a,b"),
)
connection.execute(
- set_table.insert(), dict(e1="b", e2="b", e3="b", e4="b", e5="a,b"),
+ set_table.insert(),
+ dict(e1="b", e2="b", e3="b", e4="b", e5="a,b"),
)
expected = [
set(["'a'"]),
set(["a", "b"]),
),
- (set(["b"]), set(["b"]), set(["b"]), set(["b"]), set(["a", "b"]),),
+ (
+ set(["b"]),
+ set(["b"]),
+ set(["b"]),
+ set(["b"]),
+ set(["a", "b"]),
+ ),
]
res = connection.execute(set_table.select()).fetchall()
Column("e6", mysql.SET("", "a", retrieve_as_bitwise=True)),
Column(
"e7",
- mysql.SET("", "'a'", "b'b", "'", retrieve_as_bitwise=True,),
+ mysql.SET(
+ "",
+ "'a'",
+ "b'b",
+ "'",
+ retrieve_as_bitwise=True,
+ ),
),
)
)
_oracle_char_combinations = testing.combinations(
- ("STRING", cx_Oracle_STRING,),
- ("FIXED_CHAR", cx_Oracle_FIXED_CHAR,),
- ("CLOB", cx_Oracle_CLOB,),
- ("NCLOB", cx_Oracle_NCLOB,),
+ (
+ "STRING",
+ cx_Oracle_STRING,
+ ),
+ (
+ "FIXED_CHAR",
+ cx_Oracle_FIXED_CHAR,
+ ),
+ (
+ "CLOB",
+ cx_Oracle_CLOB,
+ ),
+ (
+ "NCLOB",
+ cx_Oracle_NCLOB,
+ ),
argnames="cx_oracle_type",
id_="ia",
)
@_oracle_char_combinations
@testing.requires.python2
def test_encoding_errors_sqla_py2k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
ignore_dialect = cx_oracle.dialect(
dbapi=cx_Oracle, encoding_errors="ignore"
@_oracle_char_combinations
@testing.requires.python2
def test_no_encoding_errors_sqla_py2k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
plain_dialect = cx_oracle.dialect(dbapi=cx_Oracle)
@_oracle_char_combinations
@testing.requires.python3
def test_encoding_errors_cx_oracle_py3k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
ignore_dialect = cx_oracle.dialect(
dbapi=cx_Oracle, encoding_errors="ignore"
cursor.mock_calls,
[
mock.call.var(
- mock.ANY, None, cursor.arraysize, encodingErrors="ignore",
+ mock.ANY,
+ None,
+ cursor.arraysize,
+ encodingErrors="ignore",
)
],
)
@_oracle_char_combinations
@testing.requires.python3
def test_no_encoding_errors_cx_oracle_py3k(
- self, cx_Oracle, cx_oracle_type,
+ self,
+ cx_Oracle,
+ cx_oracle_type,
):
plain_dialect = cx_oracle.dialect(dbapi=cx_Oracle)
),
{"text": "my table comment"},
)
- eq_(insp.get_table_comment("parent",), {"text": "my local comment"})
+ eq_(
+ insp.get_table_comment(
+ "parent",
+ ),
+ {"text": "my local comment"},
+ )
eq_(
insp.get_table_comment(
"parent", schema=testing.db.dialect.default_schema_name
)
with engine.connect() as conn:
result = exec_sql(
- conn, "select id, data, bindata from z_test order by id",
+ conn,
+ "select id, data, bindata from z_test order by id",
)
results = result.fetchall()
)
self.assert_compile(
- c.any(5), "%(param_1)s = ANY (x)", checkparams={"param_1": 5},
+ c.any(5),
+ "%(param_1)s = ANY (x)",
+ checkparams={"param_1": 5},
)
self.assert_compile(
)
self.assert_compile(
- c.all(5), "%(param_1)s = ALL (x)", checkparams={"param_1": 5},
+ c.all(5),
+ "%(param_1)s = ALL (x)",
+ checkparams={"param_1": 5},
)
self.assert_compile(
class FullTextSearchTest(fixtures.TestBase, AssertsCompiledSQL):
- """Tests for full text searching
- """
+ """Tests for full text searching"""
__dialect__ = postgresql.dialect()
def _raise_query(self, q):
"""
- useful for debugging. just do...
- self._raise_query(q)
+ useful for debugging. just do...
+ self._raise_query(q)
"""
c = q.compile(dialect=postgresql.dialect())
raise ValueError(c)
assert t.c.id not in result.keys()
assert not result._soft_closed
assert isinstance(
- result.cursor_strategy, _cursor.FullyBufferedCursorFetchStrategy,
+ result.cursor_strategy,
+ _cursor.FullyBufferedCursorFetchStrategy,
)
assert not result.cursor.closed
assert not result.closed
eq_(
conn.scalar(
select(
- cast(literal(quoted_name("some_name", False)), String,)
+ cast(
+ literal(quoted_name("some_name", False)),
+ String,
+ )
)
),
"some_name",
@testing.provide_metadata
def test_index_reflection(self):
- """ Reflecting expression-based indexes should warn
- """
+ """Reflecting expression-based indexes should warn"""
metadata = self.metadata
@testing.provide_metadata
def test_index_reflection_partial(self, connection):
- """Reflect the filter defintion on partial indexes
- """
+ """Reflect the filter defintion on partial indexes"""
metadata = self.metadata
stmt = select(
func.array_cat(
- array([1, 2, 3]), array([4, 5, 6]), type_=self.ARRAY(Integer),
+ array([1, 2, 3]),
+ array([4, 5, 6]),
+ type_=self.ARRAY(Integer),
)[2:5]
)
eq_(connection.execute(stmt).scalar(), [2, 3, 4, 5])
c = "ccc"
tbl.append_column(
- Column("pyenum_col", array_cls(enum_cls(MyEnum)),),
+ Column(
+ "pyenum_col",
+ array_cls(enum_cls(MyEnum)),
+ ),
)
self.metadata.create_all(connection)
"json_table",
self.metadata,
Column("id", Integer, primary_key=True),
- Column("json_col", array_cls(json_cls),),
+ Column(
+ "json_col",
+ array_cls(json_cls),
+ ),
)
self.metadata.create_all(connection)
connection.execute(t.update(), data="'a' 'cat' 'fat' 'mat' 'sat'")
eq_(
- connection.scalar(select(t.c.data)), "'a' 'cat' 'fat' 'mat' 'sat'",
+ connection.scalar(select(t.c.data)),
+ "'a' 'cat' 'fat' 'mat' 'sat'",
)
@testing.provide_metadata
def _test_insert_none_as_null(self, conn):
conn.execute(
- self.tables.data_table.insert(), {"name": "r1", "nulldata": None},
+ self.tables.data_table.insert(),
+ {"name": "r1", "nulldata": None},
)
self._assert_column_is_NULL(conn, column="nulldata")
__only_on__ = "sqlite"
def test_boolean(self):
- """Test that the boolean only treats 1 as True
-
- """
+ """Test that the boolean only treats 1 as True"""
meta = MetaData(testing.db)
t = Table(
'true', 'false', and 'column' are undocumented reserved words
when used as column identifiers (as of 3.5.1). Covering them
here to ensure they remain in place if the dialect's
- reserved_words set is updated in the future. """
+ reserved_words set is updated in the future."""
meta = MetaData(testing.db)
t = Table(
@testing.provide_metadata
def test_quoted_identifiers_functional_two(self):
- """"test the edgiest of edge cases, quoted table/col names
+ """ "test the edgiest of edge cases, quoted table/col names
that start and end with quotes.
SQLite claims to have fixed this in
def test_empty_insert_pk1(self, connection):
self._test_empty_insert(
connection,
- Table("a", MetaData(), Column("id", Integer, primary_key=True),),
+ Table(
+ "a",
+ MetaData(),
+ Column("id", Integer, primary_key=True),
+ ),
)
def test_empty_insert_pk2(self, connection):
self._test_empty_insert(
connection,
Table(
- "f", MetaData(), Column("x", Integer), Column("y", Integer),
+ "f",
+ MetaData(),
+ Column("x", Integer),
+ Column("y", Integer),
),
)
eq_(conn.scalar(stmt), 2)
with config.db.connect().execution_options(
- compiled_cache=cache, schema_translate_map={None: None},
+ compiled_cache=cache,
+ schema_translate_map={None: None},
) as conn:
# should use default schema again even though statement
# was compiled with test_schema in the map
"testtable",
metadata,
Column(
- "pk", Integer, Sequence("testtable_pk_seq"), primary_key=True,
+ "pk",
+ Integer,
+ Sequence("testtable_pk_seq"),
+ primary_key=True,
),
)
compiled = [
("CREATE TABLE t1", {}, None),
- ("INSERT INTO t1 (c1, c2)", {"c2": "some data", "c1": 5}, (),),
+ (
+ "INSERT INTO t1 (c1, c2)",
+ {"c2": "some data", "c1": 5},
+ (),
+ ),
("INSERT INTO t1 (c1, c2)", {"c1": 6}, ()),
("select * from t1", {}, None),
("DROP TABLE t1", {}, None),
t = Table(
"t",
self.metadata,
- Column("x", Integer, Sequence("t_id_seq"), primary_key=True,),
+ Column(
+ "x",
+ Integer,
+ Sequence("t_id_seq"),
+ primary_key=True,
+ ),
implicit_returning=False,
)
self.metadata.create_all(engine)
Mock(side_effect=tsa.exc.InvalidRequestError("duplicate col")),
):
assert_raises(
- tsa.exc.InvalidRequestError, conn.execute, text("select 1"),
+ tsa.exc.InvalidRequestError,
+ conn.execute,
+ text("select 1"),
)
# cursor is closed
stmt = "insert into table foo"
params = {"foo": "bar"}
ctx = dialect.execution_ctx_cls._init_statement(
- dialect, conn, conn.connection, {}, stmt, [params],
+ dialect,
+ conn,
+ conn.connection,
+ {},
+ stmt,
+ [params],
)
conn._cursor_execute(ctx.cursor, stmt, params, ctx)
is_false(url1 == url3)
@testing.combinations(
- "drivername", "username", "password", "host", "database",
+ "drivername",
+ "username",
+ "password",
+ "host",
+ "database",
)
def test_component_set(self, component):
common_url = (
)
@testing.combinations(
- "username", "host", "database",
+ "username",
+ "host",
+ "database",
)
def test_only_str_constructor(self, argname):
assert_raises_message(
)
@testing.combinations(
- "username", "host", "database",
+ "username",
+ "host",
+ "database",
)
def test_only_str_set(self, argname):
u1 = url.URL.create("somedriver")
"sqlite:///?plugin=engineplugin1&foo=bar&myplugin1_arg=bat"
"&plugin=engineplugin2&myplugin2_arg=hoho"
)
- e = create_engine(url_str, logging_name="foob",)
+ e = create_engine(
+ url_str,
+ logging_name="foob",
+ )
eq_(e.dialect.name, "sqlite")
eq_(e.logging_name, "bar")
self.assert_tables_equal(addresses, reflected_addresses)
@testing.provide_metadata
- def test_autoload_with_imply_autoload(self,):
+ def test_autoload_with_imply_autoload(
+ self,
+ ):
meta = self.metadata
t = Table(
"t",
def test_override_existing_fk(self):
"""test that you can override columns and specify new foreign
keys to other reflected tables, on columns which *do* already
- have that foreign key, and that the FK is not duped. """
+ have that foreign key, and that the FK is not duped."""
meta = self.metadata
Table(
)
if testing.requires.computed_columns_virtual.enabled:
self.check_table_column(
- table, "computed_virtual", "normal+2", False,
+ table,
+ "computed_virtual",
+ "normal+2",
+ False,
)
if testing.requires.computed_columns_stored.enabled:
self.check_table_column(
- table, "computed_stored", "normal-42", True,
+ table,
+ "computed_stored",
+ "normal-42",
+ True,
)
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 0,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
)
def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
"""test the 'autocommit' flag on select() and text() objects.
Requires PostgreSQL so that we may define a custom function which
- modifies the database. """
+ modifies the database."""
__only_on__ = "postgresql"
c1 = c1.execution_options(foo="new_foo")
eq_(
- engine.dialect.get_foo(c1.connection), "new_foo",
+ engine.dialect.get_foo(c1.connection),
+ "new_foo",
)
# stays outside of transaction
eq_(engine.dialect.get_foo(c1.connection), "new_foo")
conn = eng.connect()
eq_(
- eng.dialect.get_foo(conn.connection), "new_value",
+ eng.dialect.get_foo(conn.connection),
+ "new_value",
)
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
@testing.requires.autocommit
assert not conn.in_transaction()
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
conn.execute(users.insert(), {"user_id": 2, "user_name": "name 2"})
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 2,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
)
assert conn.in_transaction()
assert not conn.in_transaction()
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
def test_rollback_on_close(self):
conn.rollback()
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
def test_rollback_no_begin(self):
conn.commit()
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
def test_no_double_begin(self):
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 0,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
)
def test_begin_block(self):
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
@testing.requires.savepoints
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 2,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
)
savepoint.rollback()
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
@testing.requires.savepoints
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 2,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
)
savepoint.commit()
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 2,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
)
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 2,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
)
@testing.requires.savepoints
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 0,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
)
@testing.requires.savepoints
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 2,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 2,
)
@testing.requires.savepoints
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 3,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 3,
)
@testing.requires.savepoints
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 1,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 1,
)
@testing.requires.savepoints
with testing.db.connect() as conn:
eq_(
- conn.scalar(select(func.count(1)).select_from(users)), 0,
+ conn.scalar(select(func.count(1)).select_from(users)),
+ 0,
)
@async_test
async def test_pool_exhausted(self, async_engine):
engine = create_async_engine(
- testing.db.url, pool_size=1, max_overflow=0, pool_timeout=0.1,
+ testing.db.url,
+ pool_size=1,
+ max_overflow=0,
+ pool_timeout=0.1,
)
async with engine.connect():
await assert_raises_message_async(
- asyncio.TimeoutError, "", engine.connect(),
+ asyncio.TimeoutError,
+ "",
+ engine.connect(),
)
@async_test
)
elif filter_ == "scalars":
eq_(
- all_, ["name%d" % i for i in range(1, 20)],
+ all_,
+ ["name%d" % i for i in range(1, 20)],
)
else:
eq_(all_, [(i, "name%d" % i) for i in range(1, 20)])
)
elif filter_ == "scalars":
eq_(
- rows, ["name%d" % i for i in range(1, 20)],
+ rows,
+ ["name%d" % i for i in range(1, 20)],
)
else:
eq_(rows, [(i, "name%d" % i) for i in range(1, 20)])
self._assert_raises_ambiguous(lambda: D.c_data == 5)
def test_rel_expressions_not_available(self):
- B, D, = self.classes("B", "D")
+ (
+ B,
+ D,
+ ) = self.classes("B", "D")
self._assert_raises_ambiguous(lambda: D.c_data.any(B.id == 5))
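The tuple-target hunk above is the trailing-comma rule firing on an assignment target: "B, D, = ..." turns into a parenthesized multi-line tuple. Since that comma is almost certainly accidental, deleting it avoids the blowup entirely (a sketch, assuming nothing relies on the comma):

B, D = self.classes("B", "D")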
if ckey is not None:
return get_value(
- ckey, CachingQuery.cache, orm_context.invoke_statement,
+ ckey,
+ CachingQuery.cache,
+ orm_context.invoke_statement,
)
return s1
q = sess.query(User).filter(User.id == 7).set_cache_key("user7")
eq_(
- sess.execute(q).all(), [(User(id=7, addresses=[Address(id=1)]),)],
+ sess.execute(q).all(),
+ [(User(id=7, addresses=[Address(id=1)]),)],
)
eq_(list(q.cache), ["user7"])
eq_(
- sess.execute(q).all(), [(User(id=7, addresses=[Address(id=1)]),)],
+ sess.execute(q).all(),
+ [(User(id=7, addresses=[Address(id=1)]),)],
)
def test_use_w_baked(self):
)
).scalars()
eq_(
- {c.city for c in asia_and_europe}, {"Tokyo", "London", "Dublin"},
+ {c.city for c in asia_and_europe},
+ {"Tokyo", "London", "Dublin"},
)
def test_roundtrip(self):
WeatherLocation.continent == "North America"
)
eq_(
- {c.city for c in north_american_cities}, {"New York", "Toronto"},
+ {c.city for c in north_american_cities},
+ {"New York", "Toronto"},
)
asia_and_europe = sess.query(WeatherLocation).filter(
WeatherLocation.continent.in_(["Europe", "Asia"])
)
eq_(
- {c.city for c in asia_and_europe}, {"Tokyo", "London", "Dublin"},
+ {c.city for c in asia_and_europe},
+ {"Tokyo", "London", "Dublin"},
)
# inspect the shard token stored with each instance
eq_(
- {inspect(c).key[2] for c in asia_and_europe}, {"europe", "asia"},
+ {inspect(c).key[2] for c in asia_and_europe},
+ {"europe", "asia"},
)
eq_(
sess.execute(
update(Report)
.filter(Report.temperature >= 80)
- .values({"temperature": Report.temperature + 6},)
+ .values(
+ {"temperature": Report.temperature + 6},
+ )
.execution_options(synchronize_session="evaluate")
)
# four shards
sess.execute(
update(Report)
- .values({"temperature": Report.temperature + 6},)
+ .values(
+ {"temperature": Report.temperature + 6},
+ )
.execution_options(synchronize_session="fetch")
)
class SelectinloadRegressionTest(fixtures.DeclarativeMappedTest):
- """test #4175
- """
+ """test #4175"""
@classmethod
def setup_classes(cls):
class FixtureTest(fixtures.MappedTest):
- """A MappedTest pre-configured with a common set of fixtures.
-
- """
+ """A MappedTest pre-configured with a common set of fixtures."""
run_define_tables = "once"
run_setup_classes = "once"
)
def test_columns_single_inheritance_cascading_resolution_pk(self):
- """An additional test for #4352 in terms of the requested use case.
-
- """
+ """An additional test for #4352 in terms of the requested use case."""
class TestBase(Base):
__abstract__ = True
ta = ["a", metadata]
ta.append(
Column(
- "id", Integer, primary_key=True, test_needs_autoincrement=True,
+ "id",
+ Integer,
+ primary_key=True,
+ test_needs_autoincrement=True,
)
),
ta.append(Column("a_data", String(30)))
def go():
testcar = session.get(
- Car, car1.car_id, options=[joinedload("employee")],
+ Car,
+ car1.car_id,
+ options=[joinedload("employee")],
)
assert str(testcar.employee) == "Engineer E4, status X"
)
def test_entirely_oob_assignment(self):
- """test warn on an unknown polymorphic identity.
- """
+ """test warn on an unknown polymorphic identity."""
B = self.classes.B
sess = Session()
eq_(s1.sub, "s1sub")
def test_optimized_passes(self):
- """"test that the 'optimized load' routine doesn't crash when
+ """ "test that the 'optimized load' routine doesn't crash when
a column in the join condition is not available."""
base, sub = self.tables.base, self.tables.sub
def test_filter_on_subclass_one_future(self):
sess = create_session(future=True)
eq_(
- sess.execute(select(Engineer)).scalar(), Engineer(name="dilbert"),
+ sess.execute(select(Engineer)).scalar(),
+ Engineer(name="dilbert"),
)
def test_filter_on_subclass_two(self):
)
def test_self_referential_two_point_five(self):
- """Using two aliases, the above case works.
- """
+ """Using two aliases, the above case works."""
sess = create_session()
palias = aliased(Person)
palias2 = aliased(Person)
stmt2 = select(pa1, pa2).order_by(pa1.person_id, pa2.person_id)
eq_(
- sess.execute(stmt2).unique().all(), expected,
+ sess.execute(stmt2).unique().all(),
+ expected,
)
def test_self_referential_two_point_five_future(self):
stmt2 = select(pa1, pa2).order_by(pa1.person_id, pa2.person_id)
eq_(
- sess.execute(stmt2).unique().all(), expected,
+ sess.execute(stmt2).unique().all(),
+ expected,
)
def test_nesting_queries(self):
("vlad", "Elbonia, Inc."),
]
eq_(
- q(self, sess).all(), expected,
+ q(self, sess).all(),
+ expected,
)
def test_mixed_entities_two(self):
"""this tests the RasterDocument being attached to the Assembly, but
*not* the Document. this means only a "sub-class" task, i.e.
corresponding to an inheriting mapper but not the base mapper,
- is created. """
+ is created."""
product_mapper = mapper(
Product,
class SubClassToSubClassFromParentTest(fixtures.MappedTest):
- """test #2617
-
- """
+ """test #2617"""
run_setup_classes = "once"
run_setup_mappers = "once"
return b.name
assert_raises(
- orm_exc.UnmappedInstanceError, go,
+ orm_exc.UnmappedInstanceError,
+ go,
)
def test_del_scalar_nonobject(self):
def test_lazytrackparent(self):
"""test that the "hasparent" flag works properly
- when lazy loaders and backrefs are used
+ when lazy loaders and backrefs are used
"""
"""changeset: 1633 broke ability to use ORM to map classes with
unusual descriptor attributes (for example, classes that inherit
from ones implementing zope.interface.Interface). This is a
- simple regression test to prevent that defect. """
+ simple regression test to prevent that defect."""
class des(object):
def __get__(self, instance, owner):
def test_set_commited_value_none_uselist(self):
"""test that set_committed_value->None to a uselist generates an
- empty list """
+ empty list"""
class Foo(object):
pass
},
"e2",
),
- (lambda User: {"clause": select(1).where(User.name == "ed")}, "e1",),
+ (
+ lambda User: {"clause": select(1).where(User.name == "ed")},
+ "e1",
+ ),
(lambda: {"clause": select(1)}, "e3"),
(lambda User: {"clause": Query([User])._statement_20()}, "e1"),
(lambda: {"clause": Query([1])._statement_20()}, "e3"),
)
self._run_cache_key_fixture(
- lambda: stmt_20(one(), two(), three()), compare_values=True,
+ lambda: stmt_20(one(), two(), three()),
+ compare_values=True,
)
class PartialFlushTest(fixtures.MappedTest):
- """test cascade behavior as it relates to object lists passed to flush().
+ """test cascade behavior as it relates to object lists passed
+ to flush().
+
"""
@classmethod
desc_values = (
select(values, descriptions.c.d1, descriptions.c.d2)
- .where(descriptions.c.id == values.c.description_id,)
+ .where(
+ descriptions.c.id == values.c.description_id,
+ )
.alias("descriptions_values")
)
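Single-argument calls are hit by the same rule: a stray comma after the lone argument, as in ".where(x == y,)", forces that argument onto its own line, which is what produced the hunk above. Removing the comma is usually the nicer fix (sketch):

desc_values = (
    select(values, descriptions.c.d1, descriptions.c.d2)
    .where(descriptions.c.id == values.c.description_id)
    .alias("descriptions_values")
)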
"FROM users) AS anon_1"
)
self.assert_compile(
- stmt1._final_statement(legacy_query_style=False), expected,
+ stmt1._final_statement(legacy_query_style=False),
+ expected,
)
self.assert_compile(stmt2, expected)
)
self.assert_compile(
- stmt1._final_statement(legacy_query_style=False), expected,
+ stmt1._final_statement(legacy_query_style=False),
+ expected,
)
self.assert_compile(stmt2, expected)
"count",
column_property(
select(func.count(addresses.c.id))
- .where(users.c.id == addresses.c.user_id,)
+ .where(
+ users.c.id == addresses.c.user_id,
+ )
.correlate(users)
.scalar_subquery()
),
),
)
- mapper(Address, addresses, properties={"user": relationship(User,)})
+ mapper(
+ Address,
+ addresses,
+ properties={
+ "user": relationship(
+ User,
+ )
+ },
+ )
return User, Address
)
mapper(
- User, users,
+ User,
+ users,
)
- mapper(Address, addresses, properties={"user": relationship(User,)})
+ mapper(
+ Address,
+ addresses,
+ properties={
+ "user": relationship(
+ User,
+ )
+ },
+ )
return User, Address
def test_column_properties_can_we_use(self, column_property_fixture):
"""test querying mappings that reference external columns or
- selectables. """
+ selectables."""
# User, Address = column_property_fixture
# col properties will retain anonymous labels, however will
# adopt the .key within the subquery collection so they can
# be addressed.
- stmt = select(User.id, User.name, User.concat, User.count,)
+ stmt = select(
+ User.id,
+ User.name,
+ User.concat,
+ User.count,
+ )
subq = stmt.subquery()
# here, the subquery needs to export the columns that include
self.assert_compile(stmt, expected)
self.assert_compile(
- q._final_statement(legacy_query_style=False), expected,
+ q._final_statement(legacy_query_style=False),
+ expected,
)
def test_select_where_baseclass(self):
self.assert_compile(stmt, expected)
self.assert_compile(
- q._final_statement(legacy_query_style=False), expected,
+ q._final_statement(legacy_query_style=False),
+ expected,
)
def test_select_where_subclass(self):
self.assert_compile(stmt, expected)
self.assert_compile(
- q._final_statement(legacy_query_style=False), expected,
+ q._final_statement(legacy_query_style=False),
+ expected,
)
class SelfReferentialPostUpdateTest(fixtures.MappedTest):
- """Post_update on a single self-referential mapper.
-
-
- """
+ """Post_update on a single self-referential mapper."""
@classmethod
def define_tables(cls, metadata):
class PostUpdateBatchingTest(fixtures.MappedTest):
- """test that lots of post update cols batch together into a single UPDATE.
- """
+ """test that lots of post update cols batch together into a single
+ UPDATE."""
@classmethod
def define_tables(cls, metadata):
name: Optional[str] = None
__mapper_args__ = dict(
- polymorphic_on=widgets.c.type, polymorphic_identity="normal",
+ polymorphic_on=widgets.c.type,
+ polymorphic_identity="normal",
)
@declarative
magic: bool = False
- __mapper_args__ = dict(polymorphic_identity="special",)
+ __mapper_args__ = dict(
+ polymorphic_identity="special",
+ )
@declarative
@dataclasses.dataclass
True,
testing.requires.computed_columns_on_update_returning,
),
- ("noneagerload", False,),
+ (
+ "noneagerload",
+ False,
+ ),
id_="ia",
)
def test_update_computed(self, eager):
],
[
CompiledSQL(
- "INSERT INTO test (foo) VALUES (:foo)", [{"foo": 5}],
+ "INSERT INTO test (foo) VALUES (:foo)",
+ [{"foo": 5}],
),
CompiledSQL(
- "INSERT INTO test (foo) VALUES (:foo)", [{"foo": 10}],
+ "INSERT INTO test (foo) VALUES (:foo)",
+ [{"foo": 10}],
),
],
)
self.sql_count_(0, go)
def test_preserve_changes(self):
- """A deferred load operation doesn't revert modifications on attributes
- """
+ """A deferred load operation doesn't revert modifications on
+ attributes"""
orders, Order = self.tables.orders, self.classes.Order
def test_locates_col(self):
"""changed in 1.0 - we don't search for deferred cols in the result
- now. """
+ now."""
orders, Order = self.tables.orders, self.classes.Order
s = Session(connection)
- as_ = [A(id=i, cs=[C(), C()],) for i in range(1, 5)]
+ as_ = [
+ A(
+ id=i,
+ cs=[C(), C()],
+ )
+ for i in range(1, 5)
+ ]
s.add_all(
[
def test_selectload(self):
"""tests lazy loading with two relationships simultaneously,
- from the same table, using aliases. """
+ from the same table, using aliases."""
users, orders, User, Address, Order, addresses = (
self.tables.users,
def test_joinedload(self):
"""Eager loading with two relationships simultaneously,
- from the same table, using aliases."""
+ from the same table, using aliases."""
users, orders, User, Address, Order, addresses = (
self.tables.users,
def test_orderby_related(self):
"""A regular mapper select on a single table can
- order by a relationship to a second table"""
+ order by a relationship to a second table"""
Address, addresses, users, User = (
self.classes.Address,
def test_double_w_ac(self):
"""Eager loading with two relationships simultaneously,
- from the same table, using aliases."""
+ from the same table, using aliases."""
(
users,
def test_double_w_ac_against_subquery(self):
"""Eager loading with two relationships simultaneously,
- from the same table, using aliases."""
+ from the same table, using aliases."""
(
users,
# against a select. original issue from ticket #904
sel = (
sa.select(users, addresses.c.email_address)
- .where(users.c.id == addresses.c.user_id,)
+ .where(
+ users.c.id == addresses.c.user_id,
+ )
.alias("useralias")
)
mapper(
tag_score = tags_table.c.score1 * tags_table.c.score2
user_score = sa.select(
sa.func.sum(tags_table.c.score1 * tags_table.c.score2)
- ).where(tags_table.c.user_id == users_table.c.id,)
+ ).where(
+ tags_table.c.user_id == users_table.c.id,
+ )
if labeled:
tag_score = tag_score.label(labelname)
ckey = orm_context.execution_options["cache_key"]
if ckey is not None:
- return get_value(ckey, cache, orm_context.invoke_statement,)
+ return get_value(
+ ckey,
+ cache,
+ orm_context.invoke_statement,
+ )
return maker()
class DeferredMapperEventsTest(_RemoveListeners, _fixtures.FixtureTest):
- """"test event listeners against unmapped classes.
+ """ "test event listeners against unmapped classes.
This incurs special logic. Note if we ever do the "remove" case,
it has to get all of these, too.
eq_(len(list(sess)), 9)
def test_state_change_col_to_deferred(self):
- """Behavioral test to verify the current activity of loader callables
+ """Behavioral test to verify the current activity of loader
+ callables
+
"""
users, User = self.tables.users, self.classes.User
assert "name" not in attributes.instance_state(u1).callables
def test_state_deferred_to_col(self):
- """Behavioral test to verify the current activity of loader callables
+ """Behavioral test to verify the current activity of
+ loader callables
+
"""
users, User = self.tables.users, self.classes.User
assert "name" not in attributes.instance_state(u1).callables
def test_state_noload_to_lazy(self):
- """Behavioral test to verify the current activity of loader callables
+ """Behavioral test to verify the current activity of
+ loader callables
+
"""
users, Address, addresses, User = (
subq = select(User).filter(User.id.in_([8, 9])).subquery()
q = create_session().query(aliased(User, subq))
eq_(
- [User(id=8), User(id=9)], q.all(),
+ [User(id=8), User(id=9)],
+ q.all(),
)
subq = select(User).order_by(User.id).slice(1, 3).subquery()
u = aliased(User, subq)
q = create_session().query(u).order_by(u.id)
eq_(
- [User(id=8)], list(q[0:1]),
+ [User(id=8)],
+ list(q[0:1]),
)
def test_join(self):
aq = aliased(Address, subq)
q = create_session().query(aq.user_id, subq.c.count)
eq_(
- q.all(), [(7, 1), (8, 3), (9, 1)],
+ q.all(),
+ [(7, 1), (8, 3), (9, 1)],
)
subq = select(Address.user_id, Address.id)
)
eq_(
- q.all(), [(7, 1), (8, 3), (9, 1)],
+ q.all(),
+ [(7, 1), (8, 3), (9, 1)],
)
def test_error_w_aliased_against_select(self):
)
eq_(
- q.all(), [("chuck", "ed"), ("fred", "ed")],
+ q.all(),
+ [("chuck", "ed"), ("fred", "ed")],
)
q = (
q3 = sess.query(q2)
eq_(
- q3.all(), [(7, 1), (8, 1), (9, 1), (10, 1)],
+ q3.all(),
+ [(7, 1), (8, 1), (9, 1), (10, 1)],
)
q3 = select(q2)
select(User, Address).from_statement(selectquery)
)
eq_(
- list(result), expected,
+ list(result),
+ expected,
)
sess.expunge_all()
"concat": column_property((users.c.id * 2)),
"count": column_property(
select(func.count(addresses.c.id))
- .where(users.c.id == addresses.c.user_id,)
+ .where(
+ users.c.id == addresses.c.user_id,
+ )
.correlate(users)
.scalar_subquery()
),
},
)
- mapper(Address, addresses, properties={"user": relationship(User,)})
+ mapper(
+ Address,
+ addresses,
+ properties={
+ "user": relationship(
+ User,
+ )
+ },
+ )
sess = create_session()
"concat": column_property((users.c.id * 2)),
"count": column_property(
select(func.count(addresses.c.id))
- .where(users.c.id == addresses.c.user_id,)
+ .where(
+ users.c.id == addresses.c.user_id,
+ )
.correlate(users)
.scalar_subquery()
),
)
def test_single_prop_4(self):
- Order, User, = (self.classes.Order, self.classes.User)
+ (
+ Order,
+ User,
+ ) = (self.classes.Order, self.classes.User)
sess = create_session()
oalias1 = aliased(Order)
)
def test_single_prop_5(self):
- Order, User, = (self.classes.Order, self.classes.User)
+ (
+ Order,
+ User,
+ ) = (self.classes.Order, self.classes.User)
sess = create_session()
self.assert_compile(
)
def test_single_prop_8(self):
- Order, User, = (self.classes.Order, self.classes.User)
+ (
+ Order,
+ User,
+ ) = (self.classes.Order, self.classes.User)
sess = create_session()
# same as before using an aliased() for User as well
def test_no_relationship_cascade(self):
"""test that merge doesn't interfere with a relationship()
- target that specifically doesn't include 'merge' cascade.
+ target that specifically doesn't include 'merge' cascade.
"""
Address, addresses, users, User = (
# test passive_updates=True; update user
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
- assert User(
- username="jack",
- addresses=[Address(username="jack"), Address(username="jack")],
- ) == sess.query(User).get("jack")
+ assert (
+ User(
+ username="jack",
+ addresses=[Address(username="jack"), Address(username="jack")],
+ )
+ == sess.query(User).get("jack")
+ )
u1 = sess.query(User).get("jack")
u1.addresses = []
# test passive_updates=True; update user
self.assert_sql_count(testing.db, go, 1)
sess.expunge_all()
- assert User(
- username="jack",
- addresses=[Address(username="jack"), Address(username="jack")],
- ) == sess.query(User).get(u1.id)
+ assert (
+ User(
+ username="jack",
+ addresses=[Address(username="jack"), Address(username="jack")],
+ )
+ == sess.query(User).get(u1.id)
+ )
sess.expunge_all()
u1 = sess.query(User).get(u1.id)
)
def test_any_walias(self):
- DataContainer, Job, = (self.classes.DataContainer, self.classes.Job)
+ (
+ DataContainer,
+ Job,
+ ) = (self.classes.DataContainer, self.classes.Job)
Job_A = aliased(Job)
)
def test_join_walias(self):
- DataContainer, Job, = (self.classes.DataContainer, self.classes.Job)
+ (
+ DataContainer,
+ Job,
+ ) = (self.classes.DataContainer, self.classes.Job)
Job_A = aliased(Job)
stmt = select(User).execution_options(populate_existing=True)
- s.execute(stmt,).scalars().all()
+ s.execute(
+ stmt,
+ ).scalars().all()
self.assert_(u not in s.dirty)
# test that the contents are not adapted by the aliased join
ua = aliased(Address)
- assert (
- [User(id=7), User(id=8)]
- == sess.query(User)
- .join(ua, "addresses")
- .filter(
- ~User.addresses.any(Address.email_address == "fred@fred.com")
- )
- .all()
- )
+ assert [User(id=7), User(id=8)] == sess.query(User).join(
+ ua, "addresses"
+ ).filter(
+ ~User.addresses.any(Address.email_address == "fred@fred.com")
+ ).all()
assert [User(id=10)] == sess.query(User).outerjoin(
ua, "addresses"
sess = create_session()
# test that any() doesn't overcorrelate
- assert (
- [User(id=7), User(id=8)]
- == sess.query(User)
- .join("addresses")
- .filter(
- ~User.addresses.any(Address.email_address == "fred@fred.com")
- )
- .all()
- )
+ assert [User(id=7), User(id=8)] == sess.query(User).join(
+ "addresses"
+ ).filter(
+ ~User.addresses.any(Address.email_address == "fred@fred.com")
+ ).all()
def test_has(self):
# see also HasAnyTest, a newer suite which tests these at the level of
Address.user.has(name="fred")
).all()
- assert (
- [Address(id=2), Address(id=3), Address(id=4), Address(id=5)]
- == sess.query(Address)
- .filter(Address.user.has(User.name.like("%ed%")))
- .order_by(Address.id)
- .all()
- )
+ assert [
+ Address(id=2),
+ Address(id=3),
+ Address(id=4),
+ Address(id=5),
+ ] == sess.query(Address).filter(
+ Address.user.has(User.name.like("%ed%"))
+ ).order_by(
+ Address.id
+ ).all()
- assert (
- [Address(id=2), Address(id=3), Address(id=4)]
- == sess.query(Address)
- .filter(Address.user.has(User.name.like("%ed%"), id=8))
- .order_by(Address.id)
- .all()
- )
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
+ Address
+ ).filter(Address.user.has(User.name.like("%ed%"), id=8)).order_by(
+ Address.id
+ ).all()
# test has() doesn't overcorrelate
- assert (
- [Address(id=2), Address(id=3), Address(id=4)]
- == sess.query(Address)
- .join("user")
- .filter(Address.user.has(User.name.like("%ed%"), id=8))
- .order_by(Address.id)
- .all()
- )
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
+ Address
+ ).join("user").filter(
+ Address.user.has(User.name.like("%ed%"), id=8)
+ ).order_by(
+ Address.id
+ ).all()
# test has() doesn't get subquery contents adapted by aliased join
ua = aliased(User)
- assert (
- [Address(id=2), Address(id=3), Address(id=4)]
- == sess.query(Address)
- .join(ua, "user")
- .filter(Address.user.has(User.name.like("%ed%"), id=8))
- .order_by(Address.id)
- .all()
- )
+ assert [Address(id=2), Address(id=3), Address(id=4)] == sess.query(
+ Address
+ ).join(ua, "user").filter(
+ Address.user.has(User.name.like("%ed%"), id=8)
+ ).order_by(
+ Address.id
+ ).all()
dingaling = sess.query(Dingaling).get(2)
assert [User(id=9)] == sess.query(User).filter(
def test_union_literal_expressions_compile(self):
"""test that column expressions translate during
- the _from_statement() portion of union(), others"""
+ the _from_statement() portion of union() and others"""
User = self.classes.User
User, Address = self.classes.User, self.classes.Address
sess = create_session()
- assert (
- [User(name="ed", id=8)]
- == sess.query(User)
- .order_by(User.id)
- .group_by(User)
- .join("addresses")
- .having(func.count(Address.id) > 2)
- .all()
- )
+ assert [User(name="ed", id=8)] == sess.query(User).order_by(
+ User.id
+ ).group_by(User).join("addresses").having(
+ func.count(Address.id) > 2
+ ).all()
- assert (
- [User(name="jack", id=7), User(name="fred", id=9)]
- == sess.query(User)
- .order_by(User.id)
- .group_by(User)
- .join("addresses")
- .having(func.count(Address.id) < 2)
- .all()
- )
+ assert [
+ User(name="jack", id=7),
+ User(name="fred", id=9),
+ ] == sess.query(User).order_by(User.id).group_by(User).join(
+ "addresses"
+ ).having(
+ func.count(Address.id) < 2
+ ).all()
class ExistsTest(QueryTest, AssertsCompiledSQL):
s = create_session()
eq_(
- s.execute(select(func.count()).select_from(User)).scalar(), 4,
+ s.execute(select(func.count()).select_from(User)).scalar(),
+ 4,
)
eq_(
stmt = select(User, Address).join(Address, true()).limit(2)
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 2,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 2,
)
stmt = select(User, Address).join(Address, true()).limit(100)
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 20,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 20,
)
stmt = select(User, Address).join(Address).limit(100)
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 5,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 5,
)
def test_cols(self):
stmt = select(func.count(distinct(User.name)))
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 1,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 1,
)
stmt = select(func.count(distinct(User.name))).distinct()
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 1,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 1,
)
stmt = select(User.name)
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 4,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 4,
)
stmt = select(User.name, Address).join(Address, true())
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 20,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 20,
)
stmt = select(Address.user_id)
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 5,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 5,
)
stmt = stmt.distinct()
eq_(
- s.scalar(select(func.count()).select_from(stmt.subquery())), 3,
+ s.scalar(select(func.count()).select_from(stmt.subquery())),
+ 3,
)
.order_by(User.id, User.name, Address.email_address)
)
q2 = sess.query(
- User.id, User.name.label("foo"), Address.id, Address.email_address,
+ User.id,
+ User.name.label("foo"),
+ Address.id,
+ Address.email_address,
)
self.assert_compile(
sess = create_session()
q = (
- sess.query(User.id, User.name.label("foo"), Address.id,)
+ sess.query(
+ User.id,
+ User.name.label("foo"),
+ Address.id,
+ )
.distinct(Address.email_address)
.order_by(User.id, User.name)
)
pass
mapper(
- UserWFoob, users,
+ UserWFoob,
+ users,
)
return HasFoob, UserWFoob
s.execute(stmt).all()
asserter.assert_(
- CompiledSQL("SELECT users.id, users.name FROM users", [],),
+ CompiledSQL(
+ "SELECT users.id, users.name FROM users",
+ [],
+ ),
CompiledSQL(
"SELECT addresses.user_id AS addresses_user_id, addresses.id "
"AS addresses_id, addresses.email_address "
asserter.assert_(
CompiledSQL(
- "SELECT users.id, users.name FROM users ORDER BY users.id", [],
+ "SELECT users.id, users.name FROM users ORDER BY users.id",
+ [],
),
CompiledSQL(
"SELECT addresses.id AS addresses_id, "
.outerjoin(User.addresses)
.options(
with_loader_criteria(
- Address, ~Address.email_address.like("ed@%"),
+ Address,
+ ~Address.email_address.like("ed@%"),
)
)
.order_by(User.id)
class TypeMatchTest(fixtures.MappedTest):
"""test errors raised when trying to add items
- whose type is not handled by a relationship"""
+ whose type is not handled by a relationship"""
@classmethod
def define_tables(cls, metadata):
return
mapper(
- A, self.tables.t1, properties={"bs": rel()},
+ A,
+ self.tables.t1,
+ properties={"bs": rel()},
)
mapper(B, self.tables.t2)
class FunctionAsPrimaryJoinTest(fixtures.DeclarativeMappedTest):
- """test :ticket:`3831`
-
- """
+ """test :ticket:`3831`"""
__only_on__ = "sqlite"
def test_orderby_related(self):
"""A regular mapper select on a single table can
- order by a relationship to a second table"""
+ order by a relationship to a second table"""
Address, addresses, users, User = (
self.classes.Address,
def test_orderby_related(self):
"""A regular mapper select on a single table can
- order by a relationship to a second table"""
+ order by a relationship to a second table"""
Address, addresses, users, User = (
self.classes.Address,
s = Session(connection)
- as_ = [A(id=i, cs=[C(), C()],) for i in range(1, 5)]
+ as_ = [
+ A(
+ id=i,
+ cs=[C(), C()],
+ )
+ for i in range(1, 5)
+ ]
s.add_all(
[
class NonFutureJoinIntoAnExternalTransactionTest(
- NewStyleJoinIntoAnExternalTransactionTest, fixtures.TestBase,
+ NewStyleJoinIntoAnExternalTransactionTest,
+ fixtures.TestBase,
):
pass
class LegacyJoinIntoAnExternalTransactionTest(
- JoinIntoAnExternalTransactionFixture, fixtures.TestBase,
+ JoinIntoAnExternalTransactionFixture,
+ fixtures.TestBase,
):
def setup_session(self):
# begin a non-ORM transaction
)
assert_raises_message(
- sa.exc.InvalidRequestError, message, s.flush,
+ sa.exc.InvalidRequestError,
+ message,
+ s.flush,
)
else:
s.flush()
stmt = (
update(User)
.filter(User.id == 15)
- .ordered_values(("name", "foob"), ("age", 123),)
+ .ordered_values(
+ ("name", "foob"),
+ ("age", 123),
+ )
)
result = session.execute(stmt)
cols = [
@property
def cross_schema_fk_reflection(self):
- """target system must support reflection of inter-schema foreign keys
- """
+ """target system must support reflection of inter-schema foreign
+ keys"""
return only_on(["postgresql", "mysql", "mariadb", "mssql"])
@property
def implicit_default_schema(self):
"""target system has a strong concept of 'default' schema that can
- be referred to implicitly.
+ be referred to implicitly.
- basically, PostgreSQL.
+ basically, PostgreSQL.
"""
return only_on(["postgresql"])
@property
def symbol_names_w_double_quote(self):
- """Target driver can create tables with a name like 'some " table'
-
- """
+ """Target driver can create tables with a name like 'some " table'"""
return skip_if(
[no_support("oracle", "ORA-03001: unimplemented feature")]
@property
def emulated_lastrowid(self):
- """"target dialect retrieves cursor.lastrowid or an equivalent
+ """ "target dialect retrieves cursor.lastrowid or an equivalent
after an insert() construct executes.
"""
return fails_on_everything_except(
@property
def emulated_lastrowid_even_with_sequences(self):
- """"target dialect retrieves cursor.lastrowid or an equivalent
+ """ "target dialect retrieves cursor.lastrowid or an equivalent
after an insert() construct executes, even if the table has a
Sequence on it.
"""
@property
def dbapi_lastrowid(self):
- """"target backend includes a 'lastrowid' accessor on the DBAPI
+ """ "target backend includes a 'lastrowid' accessor on the DBAPI
cursor object.
"""
(table_a.c.b == 10, 20),
(table_a.c.a == 9, 12),
),
- case((table_a.c.a == 5, 10), (table_a.c.a == 10, 20), else_=30,),
+ case(
+ (table_a.c.a == 5, 10),
+ (table_a.c.a == 10, 20),
+ else_=30,
+ ),
case({"wendy": "W", "jack": "J"}, value=table_a.c.a, else_="E"),
case({"wendy": "W", "jack": "J"}, value=table_a.c.b, else_="E"),
case({"wendy_w": "W", "jack": "J"}, value=table_a.c.a, else_="E"),
)
self._run_cache_key_fixture(
- fixture, True,
+ fixture,
+ True,
)
def test_bindparam_subclass_nocache(self):
# this is native_boolean=False for default dialect
self.assert_compile(
- select(not_(True)).apply_labels(), "SELECT :param_1 = 0 AS anon_1",
+ select(not_(True)).apply_labels(),
+ "SELECT :param_1 = 0 AS anon_1",
)
self.assert_compile(
foo_bar__id = foo_bar.c.id._annotate({"some_orm_thing": True})
stmt = select(
- foo.c.bar_id, foo_bar.c.id, foo_bar.c.id, foo_bar__id, foo_bar__id,
+ foo.c.bar_id,
+ foo_bar.c.id,
+ foo_bar.c.id,
+ foo_bar__id,
+ foo_bar__id,
).apply_labels()
self.assert_compile(
)
def test_nested_label_targeting(self):
- """test nested anonymous label generation.
-
- """
+ """test nested anonymous label generation."""
s1 = table1.select()
s2 = s1.alias()
s3 = select(s2).apply_labels()
def test_order_by_nulls(self):
self.assert_compile(
table2.select().order_by(
- table2.c.otherid, table2.c.othername.desc().nullsfirst(),
+ table2.c.otherid,
+ table2.c.othername.desc().nullsfirst(),
),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid, "
self.assert_compile(
table2.select().order_by(
- table2.c.otherid, table2.c.othername.desc().nullslast(),
+ table2.c.otherid,
+ table2.c.othername.desc().nullslast(),
),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid, "
self.assert_compile(
table2.select().order_by(
- table2.c.otherid.nullsfirst(), table2.c.othername.desc(),
+ table2.c.otherid.nullsfirst(),
+ table2.c.othername.desc(),
),
"SELECT myothertable.otherid, myothertable.othername FROM "
"myothertable ORDER BY myothertable.otherid NULLS FIRST, "
"Can't resolve label reference for ORDER BY / GROUP BY / "
"DISTINCT etc. Textual "
"SQL expression 'noname'",
- union(select(table1.c.myid, table1.c.name), select(table2),)
+ union(
+ select(table1.c.myid, table1.c.name),
+ select(table2),
+ )
.order_by("noname")
.compile,
)
def _test_binds_no_hash_collision(self):
"""test that construct_params doesn't corrupt dict
- due to hash collisions"""
+ due to hash collisions"""
total_params = 100000
compiled = stmt_adapted.compile(cache_key=cache_key)
# params set up as 5
- eq_(compiled.construct_params(params={},), {"myid_1": 5})
+ eq_(
+ compiled.construct_params(
+ params={},
+ ),
+ {"myid_1": 5},
+ )
# also works w the original cache key
eq_(
compiled = modified_stmt.compile(cache_key=cache_key)
eq_(
- compiled.construct_params(params={}), {"myid_1": 10, "myid_2": 12},
+ compiled.construct_params(params={}),
+ {"myid_1": 10, "myid_2": 12},
)
# make a new statement doing the same thing and make sure
)
def test_recursive_union_alias_two(self):
- """
-
- """
+ """"""
# I know, this is the PG VALUES keyword,
# we're cheating here. also yes we need the SELECT,
s2 = (
select(
- orders.c.order == "y", s1a.c.order, orders.c.order, s1.c.order,
+ orders.c.order == "y",
+ s1a.c.order,
+ orders.c.order,
+ s1.c.order,
)
.where(orders.c.order == "z")
.cte("regional_sales_2")
s2 = (
select(
- orders.c.order == "y", s1a.c.order, orders.c.order, s1.c.order,
+ orders.c.order == "y",
+ s1a.c.order,
+ orders.c.order,
+ s1.c.order,
)
.where(orders.c.order == "z")
.cte("regional_sales_2")
Column("boolcol1", sa.Boolean, default=True),
Column("boolcol2", sa.Boolean, default=False),
# python function which uses ExecutionContext
- Column("col7", Integer, default=lambda: 5, onupdate=lambda: 10,),
+ Column(
+ "col7",
+ Integer,
+ default=lambda: 5,
+ onupdate=lambda: 10,
+ ),
# python builtin
Column(
"col8",
eq_(r.inserted_primary_key, (None,))
else:
eq_(
- r.inserted_primary_key, (expected_result,),
+ r.inserted_primary_key,
+ (expected_result,),
)
eq_(
- conn.execute(t.select()).first(), (expected_result, 5),
+ conn.execute(t.select()).first(),
+ (expected_result, 5),
)
def test_plain(self):
r"The \"whens\" argument to case\(\) is now passed"
):
stmt = select(t1).where(
- case(whens={t1.c.q == 5: "foo"}, else_="bat",) != "bat"
+ case(
+ whens={t1.c.q == 5: "foo"},
+ else_="bat",
+ )
+ != "bat"
)
self.assert_compile(
@classmethod
def insert_data(cls, connection):
connection.execute(
- cls.tables.text1.insert(), [dict(a="a1", b="b1", c="c1", d="d1")],
+ cls.tables.text1.insert(),
+ [dict(a="a1", b="b1", c="c1", d="d1")],
)
def test_anon_aliased_overlapping(self, connection):
stmt = table.insert(values={}, inline=True)
self.assert_compile(
- stmt, "INSERT INTO sometable (foo) VALUES (foobar())",
+ stmt,
+ "INSERT INTO sometable (foo) VALUES (foobar())",
)
with testing.expect_deprecated_20(
stmt = table.insert(inline=True)
self.assert_compile(
- stmt, "INSERT INTO sometable (foo) VALUES (foobar())", params={},
+ stmt,
+ "INSERT INTO sometable (foo) VALUES (foobar())",
+ params={},
)
def test_update_inline_kw_defaults(self):
def test_update_whereclause(self):
table1 = table(
- "mytable", Column("myid", Integer), Column("name", String(30)),
+ "mytable",
+ Column("myid", Integer),
+ Column("name", String(30)),
)
with testing.expect_deprecated_20(
def test_update_values(self):
table1 = table(
- "mytable", Column("myid", Integer), Column("name", String(30)),
+ "mytable",
+ Column("myid", Integer),
+ Column("name", String(30)),
)
with testing.expect_deprecated_20(
)
def test_delete_whereclause(self):
- table1 = table("mytable", Column("myid", Integer),)
+ table1 = table(
+ "mytable",
+ Column("myid", Integer),
+ )
with testing.expect_deprecated_20(
"The delete.whereclause parameter will be "
subq = subq.alias("subq")
s = select(t1.c.col1, subq.c.col1).select_from(
- t1, subq, t1.join(subq, t1.c.col1 == subq.c.col2),
+ t1,
+ subq,
+ t1.join(subq, t1.c.col1 == subq.c.col2),
)
s5 = CloningVisitor().traverse(s)
eq_(str(s), str(s5))
compile_state = i._compile_state_factory(i, None)
self._compare_param_dict(
- compile_state._dict_parameters, {"col1": 5, "col2": 6, "col3": 7},
+ compile_state._dict_parameters,
+ {"col1": 5, "col2": 6, "col3": 7},
)
def test_kw_and_dict_simultaneously_single(self):
i = i.values([(5, 6, 7), (8, 9, 10)])
compile_state = i._compile_state_factory(i, None)
eq_(
- compile_state._dict_parameters, {"col1": 5, "col2": 6, "col3": 7},
+ compile_state._dict_parameters,
+ {"col1": 5, "col2": 6, "col3": 7},
)
eq_(compile_state._has_multi_parameters, True)
eq_(
connection.execute(t2.insert())
connection.execute(t2.insert().values(value=func.length("one")))
connection.execute(
- t2.insert().values(value=func.length("asfda") + -19), stuff="hi",
+ t2.insert().values(value=func.length("asfda") + -19),
+ stuff="hi",
)
res = sorted(connection.execute(select(t2.c.value, t2.c.stuff)))
dict(always=False, cache=1000, order=True),
"BY DEFAULT AS IDENTITY (CACHE 1000 ORDER)",
),
- (dict(order=True), "BY DEFAULT AS IDENTITY (ORDER)",),
+ (
+ dict(order=True),
+ "BY DEFAULT AS IDENTITY (ORDER)",
+ ),
)
def test_create_ddl(self, identity_args, text):
MetaData(),
Column("foo", Integer(), Identity("always", start=3)),
)
- t2 = Table("foo_table", MetaData(), Column("foo", Integer()),)
+ t2 = Table(
+ "foo_table",
+ MetaData(),
+ Column("foo", Integer()),
+ )
exp = CreateTable(t2).compile(dialect=testing.db.dialect)
self.assert_compile(
- CreateTable(t), re.sub(r"[\n\t]", "", str(exp)),
+ CreateTable(t),
+ re.sub(r"[\n\t]", "", str(exp)),
)
def fn(**kwargs):
Table(
- "t", MetaData(), Column("y", Integer, Identity(), **kwargs),
+ "t",
+ MetaData(),
+ Column("y", Integer, Identity(), **kwargs),
)
assert_raises_message(ArgumentError, text, fn, server_default="42")
"t4",
metadata,
Column(
- "id", Integer, Sequence("t4_id_seq"), primary_key=True,
+ "id",
+ Integer,
+ Sequence("t4_id_seq"),
+ primary_key=True,
),
Column("foo", String(30)),
),
Table(
"foo",
metadata,
- Column("id", Integer, Sequence("t_id_seq"), primary_key=True,),
+ Column(
+ "id",
+ Integer,
+ Sequence("t_id_seq"),
+ primary_key=True,
+ ),
Column("data", String(50)),
Column("x", Integer),
)
metadata,
# note this will have full AUTO INCREMENT on MariaDB
# whereas "foo" will not due to sequence support
- Column("id", Integer, primary_key=True,),
+ Column(
+ "id",
+ Integer,
+ primary_key=True,
+ ),
Column("data", String(50)),
Column("x", Integer),
)
"Column('foo', Integer(), table=None, primary_key=True, "
"nullable=False, onupdate=%s, default=%s, server_default=%s, "
"comment='foo')"
- % (ColumnDefault(1), ColumnDefault(42), DefaultClause("42"),),
+ % (
+ ColumnDefault(1),
+ ColumnDefault(42),
+ DefaultClause("42"),
+ ),
),
(
Table("bar", MetaData(), Column("x", String)),
@classmethod
def check_dialect_options_(cls, t):
eq_(
- t.dialect_kwargs["copydialectoptionstest_some_table_arg"], "a1",
+ t.dialect_kwargs["copydialectoptionstest_some_table_arg"],
+ "a1",
)
eq_(
t.c.foo.dialect_kwargs["copydialectoptionstest_some_column_arg"],
copydialectoptionstest_some_table_arg="a1",
)
Index(
- "idx", t1.c.foo, copydialectoptionstest_some_index_arg="a4",
+ "idx",
+ t1.c.foo,
+ copydialectoptionstest_some_index_arg="a4",
)
self.check_dialect_options_(t1)
class ConjunctionTest(fixtures.TestBase, testing.AssertsCompiledSQL):
- """test interaction of and_()/or_() with boolean , null constants
- """
+ """test interaction of and_()/or_() with boolean , null constants"""
__dialect__ = default.DefaultDialect(supports_native_boolean=True)
)
.select_from(
self.table1.join(
- self.table2, self.table1.c.myid == self.table2.c.otherid,
+ self.table2,
+ self.table1.c.myid == self.table2.c.otherid,
)
)
.order_by(self.table1.c.myid),
eq_(connection.execute(select(or_(true, false))).scalar(), True)
eq_(connection.execute(select(or_(false, false))).scalar(), False)
eq_(
- connection.execute(select(not_(or_(false, false)))).scalar(), True,
+ connection.execute(select(not_(or_(false, false)))).scalar(),
+ True,
)
row = connection.execute(
def test_select_tuple(self, connection):
connection.execute(
- users.insert(), {"user_id": 1, "user_name": "apples"},
+ users.insert(),
+ {"user_id": 1, "user_name": "apples"},
)
assert_raises_message(
return "INT_%d" % value
eq_(
- connection.scalar(select(cast("INT_5", type_=MyInteger))), "INT_5",
+ connection.scalar(select(cast("INT_5", type_=MyInteger))),
+ "INT_5",
)
eq_(
connection.scalar(
@testing.fails_on("sqlite", "FIXME: unknown")
def test_union_all(self, connection):
e = union_all(
- select(t1.c.col3), union(select(t1.c.col3), select(t1.c.col3)),
+ select(t1.c.col3),
+ union(select(t1.c.col3), select(t1.c.col3)),
)
wanted = [("aaa",), ("aaa",), ("bbb",), ("bbb",), ("ccc",), ("ccc",)]
for criteria in (t2.c.t2_id == t3.c.t2_id, t3.c.t2_id == t2.c.t2_id):
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
- .where(t1.c.name == "t1 #10",)
+ .where(
+ t1.c.name == "t1 #10",
+ )
.select_from((t1.join(t2).outerjoin(t3, criteria)))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
- .where(t2.c.name == "t2 #20",)
+ .where(
+ t2.c.name == "t2 #20",
+ )
.select_from((t1.join(t2).outerjoin(t3, criteria)))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
- .where(t3.c.name == "t3 #30",)
+ .where(
+ t3.c.name == "t3 #30",
+ )
.select_from((t1.join(t2).outerjoin(t3, criteria)))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
- .where(and_(t1.c.name == "t1 #10", t2.c.name == "t2 #20"),)
+ .where(
+ and_(t1.c.name == "t1 #10", t2.c.name == "t2 #20"),
+ )
.select_from((t1.join(t2).outerjoin(t3, criteria)))
)
self.assertRows(expr, [(10, 20, 30)])
expr = (
select(t1.c.t1_id, t2.c.t2_id, t3.c.t3_id)
- .where(and_(t2.c.name == "t2 #20", t3.c.name == "t3 #30"),)
+ .where(
+ and_(t2.c.name == "t2 #20", t3.c.name == "t3 #30"),
+ )
.select_from((t1.join(t2).outerjoin(t3, criteria)))
)
self.assertRows(expr, [(10, 20, 30)])
t2 = Table("t2", m, Column("x", Integer), quote=True)
self.assert_compile(
- select(t2.c.x).apply_labels(), 'SELECT "t2".x AS "t2_x" FROM "t2"',
+ select(t2.c.x).apply_labels(),
+ 'SELECT "t2".x AS "t2_x" FROM "t2"',
)
Column("team_id", metadata, ForeignKey("teams.id")),
)
Table(
- "teams", metadata, Column("id", Integer, primary_key=True),
+ "teams",
+ metadata,
+ Column("id", Integer, primary_key=True),
)
@classmethod
# this has _result_columns structure that is not ordered
# the same as the cursor.description.
return text("select a AS keyed2_a, b AS keyed2_b from keyed2").columns(
- keyed2_b=CHAR, keyed2_a=CHAR,
+ keyed2_b=CHAR,
+ keyed2_a=CHAR,
)
def _adapt_result_columns_fixture_seven(self):
@classmethod
def insert_data(cls, connection):
connection.execute(
- cls.tables.text1.insert(), [dict(a="a1", b="b1", c="c1", d="d1")],
+ cls.tables.text1.insert(),
+ [dict(a="a1", b="b1", c="c1", d="d1")],
)
def test_via_column(self, connection):
result = r1.merge(r2, r3, r4)
eq_(
- result.first(), (7, "u1"),
+ result.first(),
+ (7, "u1"),
)
for r in [r1, r2, r3, r4]:
assert r.closed
table = Table(
"tables",
meta,
- Column("id", Integer, seq, primary_key=True,),
+ Column(
+ "id",
+ Integer,
+ seq,
+ primary_key=True,
+ ),
Column("data", String(50)),
)
with testing.db.connect() as conn:
"implicitly coercing SELECT object to scalar subquery"
):
expect(
- roles.LabeledColumnExprRole, select(column("q")),
+ roles.LabeledColumnExprRole,
+ select(column("q")),
)
with testing.expect_warnings(
"implicitly coercing SELECT object to scalar subquery"
):
expect(
- roles.LabeledColumnExprRole, select(column("q")).alias(),
+ roles.LabeledColumnExprRole,
+ select(column("q")).alias(),
)
def test_statement_no_text_coercion(self):
def test_labels_overlap_label(self):
sel = self._labels_overlap().apply_labels()
eq_(
- list(sel.selected_columns.keys()), ["t_x_id", "t_x_id_1"],
+ list(sel.selected_columns.keys()),
+ ["t_x_id", "t_x_id_1"],
)
eq_(
list(sel.subquery().c.keys()),
def test_keylabels_overlap_labels_dont_label(self):
sel = self._keylabels_overlap_labels_dont().apply_labels()
eq_(
- list(sel.selected_columns.keys()), ["t_x_id", "t_x_b_1"],
+ list(sel.selected_columns.keys()),
+ ["t_x_id", "t_x_b_1"],
)
eq_(
- list(sel.subquery().c.keys()), ["t_x_id", "t_x_b_1"],
+ list(sel.subquery().c.keys()),
+ ["t_x_id", "t_x_b_1"],
)
self._assert_result_keys(sel, ["t_a", "t_x_b"])
self._assert_subq_result_keys(sel, ["t_a", "t_x_b"])
def test_keylabels_overlap_labels_overlap_label(self):
sel = self._keylabels_overlap_labels_overlap().apply_labels()
eq_(
- list(sel.selected_columns.keys()), ["t_x_a", "t_x_id_1"],
+ list(sel.selected_columns.keys()),
+ ["t_x_a", "t_x_id_1"],
)
# deduping for different cols but same label
def test_explicit_optional(self):
"""test dialect executes a Sequence, returns nextval, whether
- or not "optional" is set """
+ or not "optional" is set"""
s = Sequence("my_sequence", optional=True)
self._assert_seq_result(s.execute(testing.db))
def test_func_implicit_connectionless_execute(self):
"""test func.next_value().execute()/.scalar() works
- with connectionless execution. """
+ with connectionless execution."""
s = Sequence("my_sequence", metadata=MetaData(testing.db))
self._assert_seq_result(s.next_value().execute().scalar())
def test_execute_optional(self, connection):
"""test dialect executes a Sequence, returns nextval, whether
- or not "optional" is set """
+ or not "optional" is set"""
s = Sequence("my_sequence", optional=True)
self._assert_seq_result(connection.execute(s))
def test_execute_next_value(self, connection):
"""test func.next_value().execute()/.scalar() works
- with connectionless execution. """
+ with connectionless execution."""
s = Sequence("my_sequence")
self._assert_seq_result(connection.scalar(s.next_value()))
def test_execute_optional_next_value(self, connection):
"""test func.next_value().execute()/.scalar() works
- with connectionless execution. """
+ with connectionless execution."""
s = Sequence("my_sequence", optional=True)
self._assert_seq_result(connection.scalar(s.next_value()))
"""test can use next_value() in values() of _ValuesBase"""
metadata = self.metadata
- t1 = Table("t", metadata, Column("x", Integer),)
+ t1 = Table(
+ "t",
+ metadata,
+ Column("x", Integer),
+ )
t1.create(testing.db)
s = Sequence("my_sequence")
connection.execute(t1.insert().values(x=s.next_value()))
metadata = self.metadata
s = Sequence("my_sequence")
- t1 = Table("t", metadata, Column("x", Integer, primary_key=True,),)
+ t1 = Table(
+ "t",
+ metadata,
+ Column(
+ "x",
+ Integer,
+ primary_key=True,
+ ),
+ )
t1.create(testing.db)
e = engines.testing_engine(options={"implicit_returning": True})
Table(
"Manager",
metadata,
- Column("obj_id", Integer, Sequence("obj_id_seq"),),
+ Column(
+ "obj_id",
+ Integer,
+ Sequence("obj_id_seq"),
+ ),
Column("name", String(128)),
Column(
"id",
conn.execute(sometable.select().order_by(sometable.c.id))
),
[
- (dsb, "somename", dsb,),
- (dsb + 1, "someother", dsb + 1,),
- (dsb + 2, "name3", dsb + 2,),
- (dsb + 3, "name4", dsb + 3,),
+ (
+ dsb,
+ "somename",
+ dsb,
+ ),
+ (
+ dsb + 1,
+ "someother",
+ dsb + 1,
+ ),
+ (
+ dsb + 2,
+ "name3",
+ dsb + 2,
+ ),
+ (
+ dsb + 3,
+ "name4",
+ dsb + 3,
+ ),
],
)
)
eq_(
- conn.scalar(select(boolean_table.c.unconstrained_value)), True,
+ conn.scalar(select(boolean_table.c.unconstrained_value)),
+ True,
)
def test_bind_processor_coercion_native_true(self):
self.assert_compile(
update(table1)
.where(table1.c.name == bindparam("crit"))
- .values({table1.c.name: "hi"},),
+ .values(
+ {table1.c.name: "hi"},
+ ),
"UPDATE mytable SET name=:name WHERE mytable.name = :crit",
params={"crit": "notthere"},
checkparams={"crit": "notthere", "name": "hi"},
self.assert_compile(
update(table1)
.where(table1.c.myid == 12)
- .values({table1.c.name: table1.c.myid},),
+ .values(
+ {table1.c.name: table1.c.myid},
+ ),
"UPDATE mytable "
"SET name=mytable.myid, description=:description "
"WHERE mytable.myid = :myid_1",
def test_with_join_unnamed(self):
people = self.tables.people
values = Values(
- column("column1", Integer), column("column2", Integer),
+ column("column1", Integer),
+ column("column2", Integer),
).data([(1, 1), (2, 1), (3, 2), (3, 3)])
stmt = select(people, values).select_from(
people.join(values, values.c.column2 == people.c.people_id)
# in case it requires a version pin
pydocstyle
pygments
- black==19.10b0
+ black==20.8b1
commands =
flake8 ./lib/ ./test/ ./examples/ setup.py doc/build/conf.py {posargs}
black --check .
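One operational note: this tox pin has to stay in lockstep with the version the pre-commit hook installs, or the two will fight over every file. A quick local check after bumping (assuming an ordinary checkout):

pip install black==20.8b1
black --check .  # exits 0 once the tree matches 20.8b1's output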