.. changelog::
:version: 1.0.17
+ .. change::
+ :tags: bug, py3k
+ :tickets: 3886
+ :versions: 1.1.5
+
+ Fixed Python 3.6 DeprecationWarnings caused by invalid escape sequences in
+ string literals lacking the 'r' modifier, and added test coverage for Python 3.6.
+
.. change::
:tags: bug, orm
:tickets: 3884
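For illustration of the py3k change above, a minimal sketch of the warning being
silenced (the version string ``5.1.49-MariaDB`` is an invented sample, not taken
from the patch)::

    import re

    # '\-' is not a recognized escape sequence, so compiling the non-raw
    # literal emits "DeprecationWarning: invalid escape sequence '\-'" on
    # Python 3.6 (visible e.g. when run with python -Wd); the raw form does not.
    plain = re.compile('[.\-]')
    raw = re.compile(r'[.\-]')

    # Unrecognized escapes are currently passed through unchanged, so both
    # objects hold the same pattern; the raw spelling just makes it explicit.
    assert plain.pattern == raw.pattern

    # splitting a made-up server version string on '.' and '-'
    assert raw.split("5.1.49-MariaDB") == ['5', '1', '49', 'MariaDB']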
# of what we're doing here
dbapi_con = connection.connection
version = []
- r = re.compile('[.\-]')
+ r = re.compile(r'[.\-]')
# 18 == pyodbc.SQL_DBMS_VER
for n in r.split(dbapi_con.getinfo(18)[1]):
try:
# queries.
dbapi_con = connection.connection
version = []
- r = re.compile('[.\-]')
+ r = re.compile(r'[.\-]')
for n in r.split(dbapi_con.getinfo(self.dbapi.SQL_DBMS_VER)):
try:
version.append(int(n))
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""
+r"""
.. dialect:: firebird
:name: Firebird
the SQLAlchemy ``returning()`` method, such as::
# INSERT..RETURNING
- result = table.insert().returning(table.c.col1, table.c.col2).\\
+ result = table.insert().returning(table.c.col1, table.c.col2).\
values(name='foo')
print result.fetchall()
# UPDATE..RETURNING
- raises = empl.update().returning(empl.c.id, empl.c.salary).\\
- where(empl.c.sales>100).\\
+ raises = empl.update().returning(empl.c.id, empl.c.salary).\
+ where(empl.c.sales>100).\
values(dict(salary=empl.c.salary * 1.1))
print raises.fetchall()
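The docstring conversions follow the same rule; a minimal sketch (using a
made-up query fragment) shows why each documented line continuation loses one
backslash when the enclosing docstring gains the ``r`` prefix::

    plain_doc = """query(User).\\
        filter(User.id == 5)"""

    raw_doc = r"""query(User).\
        filter(User.id == 5)"""

    # Both literals store a single backslash before the newline, so the
    # rendered documentation is unchanged; only the source spelling differs,
    # and the raw form avoids warnings for sequences such as '\d' or '\*'.
    assert plain_doc == raw_doc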
def _parse_version_info(self, version):
m = match(
- '\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+)( \w+ (\d+)\.(\d+))?', version)
+ r'\w+-V(\d+)\.(\d+)\.(\d+)\.(\d+)( \w+ (\d+)\.(\d+))?', version)
if not m:
raise AssertionError(
"Could not determine version from string '%s'" % version)
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""
+r"""
.. dialect:: mssql+pyodbc
:name: PyODBC
:dbapi: pyodbc
_get_server_version_info(connection)
else:
version = []
- r = re.compile('[.\-]')
+ r = re.compile(r'[.\-]')
for n in r.split(raw):
try:
version.append(int(n))
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""
+r"""
.. dialect:: mysql
:name: MySQL
def _get_server_version_info(self, connection):
dbapi_con = connection.connection
version = []
- r = re.compile('[.\-]')
+ r = re.compile(r'[.\-]')
for n in r.split(dbapi_con.server_version):
try:
version.append(int(n))
def _get_server_version_info(self, connection):
dbapi_con = connection.connection
version = []
- r = re.compile('[.\-]')
+ r = re.compile(r'[.\-]')
for n in r.split(dbapi_con.get_server_info()):
try:
version.append(int(n))
def _get_server_version_info(self, connection):
dbapi_con = connection.connection
version = []
- r = re.compile('[.\-]')
+ r = re.compile(r'[.\-]')
for n in r.split(dbapi_con.server_info):
try:
version.append(int(n))
def _get_server_version_info(self, connection):
dbapi_con = connection.connection
version = []
- r = re.compile('[.\-]')
+ r = re.compile(r'[.\-]')
for n in r.split(dbapi_con.dbversion):
try:
version.append(int(n))
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""
+r"""
.. dialect:: postgresql
:name: PostgreSQL
use the :meth:`._UpdateBase.returning` method on a per-statement basis::
# INSERT..RETURNING
- result = table.insert().returning(table.c.col1, table.c.col2).\\
+ result = table.insert().returning(table.c.col1, table.c.col2).\
values(name='foo')
print result.fetchall()
# UPDATE..RETURNING
- result = table.update().returning(table.c.col1, table.c.col2).\\
+ result = table.update().returning(table.c.col1, table.c.col2).\
where(table.c.name=='foo').values(name='bar')
print result.fetchall()
# DELETE..RETURNING
- result = table.delete().returning(table.c.col1, table.c.col2).\\
+ result = table.delete().returning(table.c.col1, table.c.col2).\
where(table.c.name=='foo')
print result.fetchall()
def _get_server_version_info(self, connection):
v = connection.execute("select version()").scalar()
m = re.match(
- '.*(?:PostgreSQL|EnterpriseDB) '
- '(\d+)\.(\d+)(?:\.(\d+))?(?:\.\d+)?(?:devel)?',
+ r'.*(?:PostgreSQL|EnterpriseDB) '
+ r'(\d+)\.(\d+)(?:\.(\d+))?(?:\.\d+)?(?:devel)?',
v)
if not m:
raise AssertionError(
nullable = not notnull
is_array = format_type.endswith('[]')
- charlen = re.search('\(([\d,]+)\)', format_type)
+ charlen = re.search(r'\(([\d,]+)\)', format_type)
if charlen:
charlen = charlen.group(1)
- args = re.search('\((.*)\)', format_type)
+ args = re.search(r'\((.*)\)', format_type)
if args and args.group(1):
- args = tuple(re.split('\s*,\s*', args.group(1)))
+ args = tuple(re.split(r'\s*,\s*', args.group(1)))
else:
args = ()
kwargs = {}
domains = {}
for domain in c.fetchall():
# strip (30) from character varying(30)
- attype = re.search('([^\(]+)', domain['attype']).group(1)
+ attype = re.search(r'([^\(]+)', domain['attype']).group(1)
if domain['visible']:
# 'visible' just means whether or not the domain is in a
# schema that's on the search path -- or not overridden by
where = None
def __init__(self, *elements, **kw):
- """
+ r"""
:param \*elements:
A sequence of two tuples of the form ``(column, operator)`` where
column must be a column name or Column object and operator must
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""
+r"""
.. dialect:: sqlite
:name: SQLite
union
select x.a, x.b from x where a=2
''')
- assert [c[0] for c in cursor.description] == ['a', 'b'], \\
+ assert [c[0] for c in cursor.description] == ['a', 'b'], \
[c[0] for c in cursor.description]
The second assertion fails::
class DATETIME(_DateTimeMixin, sqltypes.DateTime):
- """Represent a Python datetime object in SQLite using a string.
+ r"""Represent a Python datetime object in SQLite using a string.
The default string storage format is::
class DATE(_DateTimeMixin, sqltypes.Date):
- """Represent a Python date object in SQLite using a string.
+ r"""Represent a Python date object in SQLite using a string.
The default string storage format is::
class TIME(_DateTimeMixin, sqltypes.Time):
- """Represent a Python time object in SQLite using a string.
+ r"""Represent a Python time object in SQLite using a string.
The default string storage format is::
constraint_name = None
table_data = self._get_table_sql(connection, table_name, schema=schema)
if table_data:
- PK_PATTERN = 'CONSTRAINT (\w+) PRIMARY KEY'
+ PK_PATTERN = r'CONSTRAINT (\w+) PRIMARY KEY'
result = re.search(PK_PATTERN, table_data, re.I)
constraint_name = result.group(1) if result else None
def parse_fks():
FK_PATTERN = (
- '(?:CONSTRAINT (\w+) +)?'
- 'FOREIGN KEY *\( *(.+?) *\) +'
- 'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\) *'
- '((?:ON (?:DELETE|UPDATE) '
- '(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)'
+ r'(?:CONSTRAINT (\w+) +)?'
+ r'FOREIGN KEY *\( *(.+?) *\) +'
+ r'REFERENCES +(?:(?:"(.+?)")|([a-z0-9_]+)) *\((.+?)\) *'
+ r'((?:ON (?:DELETE|UPDATE) '
+ r'(?:SET NULL|SET DEFAULT|CASCADE|RESTRICT|NO ACTION) *)*)'
)
for match in re.finditer(FK_PATTERN, table_data, re.I):
(
unique_constraints = []
def parse_uqs():
- UNIQUE_PATTERN = '(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
+ UNIQUE_PATTERN = r'(?:CONSTRAINT "?(.+?)"? +)?UNIQUE *\((.+?)\)'
INLINE_UNIQUE_PATTERN = (
- '(?:(".+?")|([a-z0-9]+)) '
- '+[a-z0-9_ ]+? +UNIQUE')
+ r'(?:(".+?")|([a-z0-9]+)) '
+ r'+[a-z0-9_ ]+? +UNIQUE')
for match in re.finditer(UNIQUE_PATTERN, table_data, re.I):
name, cols = match.group(1, 2)
return []
CHECK_PATTERN = (
- '(?:CONSTRAINT (\w+) +)?'
- 'CHECK *\( *(.+) *\),? *'
+ r'(?:CONSTRAINT (\w+) +)?'
+ r'CHECK *\( *(.+) *\),? *'
)
check_constraints = []
# NOTE: we aren't using re.S here because we actually are
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""
+r"""
.. dialect:: sqlite+pysqlite
:name: pysqlite
:dbapi: sqlite3
used. Double backslashes are probably needed::
# absolute path on Windows
- e = create_engine('sqlite:///C:\\\\path\\\\to\\\\database.db')
+ e = create_engine('sqlite:///C:\\path\\to\\database.db')
The sqlite ``:memory:`` identifier is the default if no filepath is
present. Specify ``sqlite://`` and nothing else::
self.close()
def execution_options(self, **opt):
- """ Set non-SQL options for the connection which take effect
+ r""" Set non-SQL options for the connection which take effect
during execution.
The method returns a copy of this :class:`.Connection` which references
underlying resource, it's usually a good idea to ensure that the copies
will be discarded immediately, which is implicit if used as in::
- result = connection.execution_options(stream_results=True).\\
+ result = connection.execution_options(stream_results=True).\
execute(stmt)
Note that any key/value can be passed to
return self.execute(object, *multiparams, **params).scalar()
def execute(self, object, *multiparams, **params):
- """Executes a SQL statement construct and returns a
+ r"""Executes a SQL statement construct and returns a
:class:`.ResultProxy`.
:param object: The statement to be executed. May be
util.reraise(*exc_info)
def transaction(self, callable_, *args, **kwargs):
- """Execute the given function within a transaction boundary.
+ r"""Execute the given function within a transaction boundary.
The function is passed this :class:`.Connection`
as the first argument, followed by the given \*args and \**kwargs,
trans.rollback()
def run_callable(self, callable_, *args, **kwargs):
- """Given a callable object or function, execute it, passing
+ r"""Given a callable object or function, execute it, passing
a :class:`.Connection` as the first argument.
The given \*args and \**kwargs are passed subsequent
self.update_execution_options(**execution_options)
def update_execution_options(self, **opt):
- """Update the default execution_options dictionary
+ r"""Update the default execution_options dictionary
of this :class:`.Engine`.
The given keys/values in \**opt are added to the
return Engine._trans_ctx(conn, trans, close_with_result)
def transaction(self, callable_, *args, **kwargs):
- """Execute the given function within a transaction boundary.
+ r"""Execute the given function within a transaction boundary.
The function is passed a :class:`.Connection` newly procured
from :meth:`.Engine.contextual_connect` as the first argument,
return conn.transaction(callable_, *args, **kwargs)
def run_callable(self, callable_, *args, **kwargs):
- """Given a callable object or function, execute it, passing
+ r"""Given a callable object or function, execute it, passing
a :class:`.Connection` as the first argument.
The given \*args and \**kwargs are passed subsequent
def get_unique_constraints(
self, connection, table_name, schema=None, **kw):
- """Return information about unique constraints in `table_name`.
+ r"""Return information about unique constraints in `table_name`.
Given a string `table_name` and an optional string `schema`, return
unique constraint information as a list of dicts with these keys:
def get_check_constraints(
self, connection, table_name, schema=None, **kw):
- """Return information about check constraints in `table_name`.
+ r"""Return information about check constraints in `table_name`.
Given a string `table_name` and an optional string `schema`, return
check constraint information as a list of dicts with these keys:
return dialect_cls
def translate_connect_args(self, names=[], **kw):
- """Translate url attributes into a dictionary of connection arguments.
+ r"""Translate url attributes into a dictionary of connection arguments.
Returns attributes of this url (`host`, `database`, `username`,
`password`, `port`) as a plain dictionary. The attribute names are
_dispatch_target = SchemaEventTarget
def before_create(self, target, connection, **kw):
- """Called before CREATE statements are emitted.
+ r"""Called before CREATE statements are emitted.
:param target: the :class:`.MetaData` or :class:`.Table`
object which is the target of the event.
"""
def after_create(self, target, connection, **kw):
- """Called after CREATE statements are emitted.
+ r"""Called after CREATE statements are emitted.
:param target: the :class:`.MetaData` or :class:`.Table`
object which is the target of the event.
"""
def before_drop(self, target, connection, **kw):
- """Called before DROP statements are emitted.
+ r"""Called before DROP statements are emitted.
:param target: the :class:`.MetaData` or :class:`.Table`
object which is the target of the event.
"""
def after_drop(self, target, connection, **kw):
- """Called after DROP statements are emitted.
+ r"""Called after DROP statements are emitted.
:param target: the :class:`.MetaData` or :class:`.Table`
object which is the target of the event.
"""
def handle_error(self, exception_context):
- """Intercept all exceptions processed by the :class:`.Connection`.
+ r"""Intercept all exceptions processed by the :class:`.Connection`.
This includes all exceptions emitted by the DBAPI as well as
within SQLAlchemy's statement invocation process, including
@event.listens_for(Engine, "handle_error")
def handle_exception(context):
if isinstance(context.original_exception,
- psycopg2.OperationalError) and \\
+ psycopg2.OperationalError) and \
"failed" in str(context.original_exception):
raise MySpecialException("failed operation")
@event.listens_for(Engine, "handle_error", retval=True)
def handle_exception(context):
- if context.chained_exception is not None and \\
+ if context.chained_exception is not None and \
"special" in context.chained_exception.message:
return MySpecialException("failed",
cause=context.chained_exception)
def association_proxy(target_collection, attr, **kw):
- """Return a Python property implementing a view of a target
+ r"""Return a Python property implementing a view of a target
attribute which references an attribute on members of the
target.
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Define an extension to the :mod:`sqlalchemy.ext.declarative` system
+r"""Define an extension to the :mod:`sqlalchemy.ext.declarative` system
which automatically generates mapped classes and relationships from a database
schema, typically though not necessarily one which is reflected.
"Produce a 'camelized' class name, e.g. "
"'words_and_underscores' -> 'WordsAndUnderscores'"
- return str(tablename[0].upper() + \\
+ return str(tablename[0].upper() + \
re.sub(r'_([a-z])', lambda m: m.group(1).upper(), tablename[1:]))
_pluralizer = inflect.engine()
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Provides an API for creation of custom ClauseElements and compilers.
+r"""Provides an API for creation of custom ClauseElements and compilers.
Synopsis
========
from sqlalchemy.sql.expression import Executable, ClauseElement
class MyInsertThing(Executable, ClauseElement):
- _execution_options = \\
+ _execution_options = \
Executable._execution_options.union({'autocommit': True})
More succinctly, if the construct is truly similar to an INSERT, UPDATE, or
Example usage::
- Session.query(Account).\\
+ Session.query(Account).\
filter(
greatest(
Account.checking_balance,
name='Base', constructor=_declarative_constructor,
class_registry=None,
metaclass=DeclarativeMeta):
- """Construct a base class for declarative class definitions.
+ r"""Construct a base class for declarative class definitions.
The new base class will be given a metaclass that produces
appropriate :class:`~sqlalchemy.schema.Table` objects and makes
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Define attributes on ORM-mapped classes that have "hybrid" behavior.
+r"""Define attributes on ORM-mapped classes that have "hybrid" behavior.
"hybrid" means the attribute has distinct behaviors defined at the
class level and at the instance level.
be used in an appropriate context such that an appropriate join to
``SavingsAccount`` will be present::
- >>> print Session().query(User, User.balance).\\
+ >>> print Session().query(User, User.balance).\
... join(User.accounts).filter(User.balance > 5000)
SELECT "user".id AS user_id, "user".name AS user_name,
account.balance AS account_balance
@balance.expression
def balance(cls):
- return select([func.sum(SavingsAccount.balance)]).\\
- where(SavingsAccount.user_id==cls.id).\\
+ return select([func.sum(SavingsAccount.balance)]).\
+ where(SavingsAccount.user_id==cls.id).\
label('total_balance')
The above recipe will give us the ``balance`` column which renders
>>> sw2 = aliased(SearchWord)
>>> print Session().query(
... sw1.word_insensitive,
- ... sw2.word_insensitive).\\
+ ... sw2.word_insensitive).\
... filter(
... sw1.word_insensitive > sw2.word_insensitive
... )
def transform(q):
cls = self.__clause_element__()
parent_alias = aliased(cls)
- return q.join(parent_alias, cls.parent).\\
+ return q.join(parent_alias, cls.parent).\
filter(op(parent_alias.parent, other))
return transform
>>> from sqlalchemy.orm import Session
>>> session = Session()
- {sql}>>> session.query(Node).\\
- ... with_transformation(Node.grandparent==Node(id=5)).\\
+ {sql}>>> session.query(Node).\
+ ... with_transformation(Node.grandparent==Node(id=5)).\
... all()
SELECT node.id AS node_id, node.parent_id AS node_parent_id
FROM node JOIN node AS node_1 ON node_1.id = node.parent_id
.. sourcecode:: pycon+sql
- {sql}>>> session.query(Node).\\
- ... with_transformation(Node.grandparent.join).\\
+ {sql}>>> session.query(Node).\
+ ... with_transformation(Node.grandparent.join).\
... filter(Node.grandparent==Node(id=5))
SELECT node.id AS node_id, node.parent_id AS node_parent_id
FROM node JOIN node AS node_1 ON node_1.id = node.parent_id
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Provide support for tracking of in-place changes to scalar values,
+r"""Provide support for tracking of in-place changes to scalar values,
which are propagated into ORM change events on owning parent objects.
.. versionadded:: 0.7 :mod:`sqlalchemy.ext.mutable` replaces SQLAlchemy's
return self.x, self.y
def __eq__(self, other):
- return isinstance(other, Point) and \\
- other.x == self.x and \\
+ return isinstance(other, Point) and \
+ other.x == self.x and \
other.y == self.y
def __ne__(self, other):
def create_session(bind=None, **kwargs):
- """Create a new :class:`.Session`
+ r"""Create a new :class:`.Session`
with no automation enabled by default.
This function is used primarily for testing. The usual
def deferred(*columns, **kw):
- """Indicate a column-based mapped attribute that by default will
+ r"""Indicate a column-based mapped attribute that by default will
not load unless accessed.
:param \*columns: columns to be mapped. This is typically a single
parent_token=None, expire_missing=True,
send_modified_events=True,
**kwargs):
- """Construct an AttributeImpl.
+ r"""Construct an AttributeImpl.
\class_
associated class
"""
def __init__(self, class_, *attrs, **kwargs):
- """Return a composite column-based property for use with a Mapper.
+ r"""Return a composite column-based property for use with a Mapper.
See the mapping documentation section :ref:`mapper_composite` for a
full usage example.
_MapperEventsHold._clear()
def instrument_class(self, mapper, class_):
- """Receive a class when the mapper is first constructed,
+ r"""Receive a class when the mapper is first constructed,
before instrumentation is applied to the mapped class.
This event is the earliest phase of mapper construction.
"""
def mapper_configured(self, mapper, class_):
- """Called when a specific mapper has completed its own configuration
+ r"""Called when a specific mapper has completed its own configuration
within the scope of the :func:`.configure_mappers` call.
The :meth:`.MapperEvents.mapper_configured` event is invoked
class PropComparator(operators.ColumnOperators):
- """Defines SQL operators for :class:`.MapperProperty` objects.
+ r"""Defines SQL operators for :class:`.MapperProperty` objects.
SQLAlchemy allows for operators to
be redefined at both the Core and ORM level. :class:`.PropComparator`
# definition of custom PropComparator subclasses
- from sqlalchemy.orm.properties import \\
- ColumnProperty,\\
- CompositeProperty,\\
+ from sqlalchemy.orm.properties import \
+ ColumnProperty,\
+ CompositeProperty,\
RelationshipProperty
class MyColumnComparator(ColumnProperty.Comparator):
return a.of_type(class_)
def of_type(self, class_):
- """Redefine this object in terms of a polymorphic subclass.
+ r"""Redefine this object in terms of a polymorphic subclass.
Returns a new PropComparator from which further criterion can be
evaluated.
e.g.::
- query.join(Company.employees.of_type(Engineer)).\\
+ query.join(Company.employees.of_type(Engineer)).\
filter(Engineer.name=='foo')
:param \class_: a class or mapper indicating that criterion will be
return self.operate(PropComparator.of_type_op, class_)
def any(self, criterion=None, **kwargs):
- """Return true if this collection contains any member that meets the
+ r"""Return true if this collection contains any member that meets the
given criterion.
The usual implementation of ``any()`` is
return self.operate(PropComparator.any_op, criterion, **kwargs)
def has(self, criterion=None, **kwargs):
- """Return true if this element references a member which meets the
+ r"""Return true if this element references a member which meets the
given criterion.
The usual implementation of ``has()`` is
legacy_is_orphan=False,
_compiled_cache_size=100,
):
- """Return a new :class:`~.Mapper` object.
+ r"""Return a new :class:`~.Mapper` object.
This function is typically used behind the scenes
via the Declarative extension. When using Declarative,
@property
def entity(self):
- """Part of the inspection API.
+ r"""Part of the inspection API.
Returns self.class\_.
def validates(*names, **kw):
- """Decorate a method as a 'validator' for one or more named properties.
+ r"""Decorate a method as a 'validator' for one or more named properties.
Designates a method as a validator, a method which receives the
name of the attribute as well as a value to be assigned, or in the
'_mapped_by_synonym', '_deferred_column_loader')
def __init__(self, *columns, **kwargs):
- """Provide a column-level property for use with a Mapper.
+ r"""Provide a column-level property for use with a Mapper.
Column-based properties can normally be applied to the mapper's
``properties`` dictionary using the :class:`.Column` element directly.
return q.alias(name=name)
def cte(self, name=None, recursive=False):
- """Return the full SELECT statement represented by this
+ r"""Return the full SELECT statement represented by this
:class:`.Query` represented as a common table expression (CTE).
Parameters and usage are the same as those of the
included_parts = session.query(
Part.sub_part,
Part.part,
- Part.quantity).\\
- filter(Part.part=="our part").\\
+ Part.quantity).\
+ filter(Part.part=="our part").\
cte(name="included_parts", recursive=True)
incl_alias = aliased(included_parts, name="pr")
session.query(
parts_alias.sub_part,
parts_alias.part,
- parts_alias.quantity).\\
+ parts_alias.quantity).\
filter(parts_alias.part==incl_alias.c.sub_part)
)
included_parts.c.sub_part,
func.sum(included_parts.c.quantity).
label('total_quantity')
- ).\\
+ ).\
group_by(included_parts.c.sub_part)
.. seealso::
@_generative()
def yield_per(self, count):
- """Yield only ``count`` rows at a time.
+ r"""Yield only ``count`` rows at a time.
The purpose of this method is when fetching very large result sets
(> 10K rows), to batch results in sub-collections and yield them
Or more selectively using :func:`.lazyload`; such as with
an asterisk to specify the default loader scheme::
- q = sess.query(Object).yield_per(100).\\
+ q = sess.query(Object).yield_per(100).\
options(lazyload('*'), joinedload(Object.some_related))
.. warning::
self.session = session
def from_self(self, *entities):
- """return a Query that selects from this Query's
+ r"""return a Query that selects from this Query's
SELECT statement.
:meth:`.Query.from_self` essentially turns the SELECT statement
the set of user objects we query against, and then apply additional
joins against that row-limited set::
- q = session.query(User).filter(User.name.like('e%')).\\
- limit(5).from_self().\\
+ q = session.query(User).filter(User.name.like('e%')).\
+ limit(5).from_self().\
join(User.addresses).filter(Address.email.like('q%'))
The above query joins to the ``Address`` entity but only against the
refer to the ``User`` entity without any additional aliasing applied
to it, those references will be in terms of the subquery::
- q = session.query(User).filter(User.name.like('e%')).\\
- limit(5).from_self().\\
- join(User.addresses).filter(Address.email.like('q%')).\\
+ q = session.query(User).filter(User.name.like('e%')).\
+ limit(5).from_self().\
+ join(User.addresses).filter(Address.email.like('q%')).\
order_by(User.name)
The ORDER BY against ``User.name`` is aliased to be in terms of the
``Address`` entity on the outside, but we only wanted the outer
query to return the ``Address.email`` column::
- q = session.query(User).filter(User.name.like('e%')).\\
- limit(5).from_self(Address.email).\\
+ q = session.query(User).filter(User.name.like('e%')).\
+ limit(5).from_self(Address.email).\
join(User.addresses).filter(Address.email.like('q%'))
yielding:
then a subquery, and then we'd like :func:`.contains_eager` to access
the ``User`` columns::
- q = session.query(Address).join(Address.user).\\
+ q = session.query(Address).join(Address.user).\
filter(User.name.like('e%'))
- q = q.add_entity(User).from_self().\\
+ q = q.add_entity(User).from_self().\
options(contains_eager(Address.user))
We use :meth:`.Query.add_entity` above **before** we call
# Users, filtered on some arbitrary criterion
# and then ordered by related email address
- q = session.query(User).\\
- join(User.address).\\
- filter(User.name.like('%ed%')).\\
+ q = session.query(User).\
+ join(User.address).\
+ filter(User.name.like('%ed%')).\
order_by(Address.email)
# given *only* User.id==5, Address.email, and 'q', what
# would the *next* User in the result be ?
- subq = q.with_entities(Address.email).\\
- order_by(None).\\
- filter(User.id==5).\\
+ subq = q.with_entities(Address.email).\
+ order_by(None).\
+ filter(User.id==5).\
subquery()
- q = q.join((subq, subq.c.email < Address.email)).\\
+ q = q.join((subq, subq.c.email < Address.email)).\
limit(1)
.. versionadded:: 0.6.5
@_generative()
def params(self, *args, **kwargs):
- """add values for bind parameters which may have been
+ r"""add values for bind parameters which may have been
specified in filter().
parameters may be specified using \**kwargs, or optionally a single
@_generative(_no_statement_condition, _no_limit_offset)
def filter(self, *criterion):
- """apply the given filtering criterion to a copy
+ r"""apply the given filtering criterion to a copy
of this :class:`.Query`, using SQL expressions.
e.g.::
is that they will be joined together using the :func:`.and_`
function::
- session.query(MyClass).\\
+ session.query(MyClass).\
filter(MyClass.name == 'some name', MyClass.id > 5)
The criterion is any SQL expression object applicable to the
self._criterion = criterion
def filter_by(self, **kwargs):
- """apply the given filtering criterion to a copy
+ r"""apply the given filtering criterion to a copy
of this :class:`.Query`, using keyword expressions.
e.g.::
is that they will be joined together using the :func:`.and_`
function::
- session.query(MyClass).\\
+ session.query(MyClass).\
filter_by(name = 'some name', id = 5)
The keyword expressions are extracted from the primary
@_generative(_no_statement_condition, _no_limit_offset)
def having(self, criterion):
- """apply a HAVING criterion to the query and return the
+ r"""apply a HAVING criterion to the query and return the
newly resulting :class:`.Query`.
:meth:`~.Query.having` is used in conjunction with
HAVING criterion makes it possible to use filters on aggregate
functions like COUNT, SUM, AVG, MAX, and MIN, eg.::
- q = session.query(User.id).\\
- join(User.addresses).\\
- group_by(User.id).\\
+ q = session.query(User.id).\
+ join(User.addresses).\
+ group_by(User.id).\
having(func.count(Address.id) > 2)
"""
return self._set_op(expression.except_all, *q)
def join(self, *props, **kwargs):
- """Create a SQL JOIN against this :class:`.Query` object's criterion
+ r"""Create a SQL JOIN against this :class:`.Query` object's criterion
and apply generatively, returning the newly resulting :class:`.Query`.
**Simple Relationship Joins**
:meth:`~.Query.join`, each using an explicit attribute to indicate
the source entity::
- q = session.query(User).\\
- join(User.orders).\\
- join(Order.items).\\
+ q = session.query(User).\
+ join(User.orders).\
+ join(Order.items).\
join(Item.keywords)
**Joins to a Target Entity or Selectable**
a_alias = aliased(Address)
- q = session.query(User).\\
- join(User.addresses).\\
- join(a_alias, User.addresses).\\
- filter(Address.email_address=='ed@foo.com').\\
+ q = session.query(User).\
+ join(User.addresses).\
+ join(a_alias, User.addresses).\
+ filter(Address.email_address=='ed@foo.com').\
filter(a_alias.email_address=='ed@bar.com')
Where above, the generated SQL would be similar to::
:func:`.alias` and :func:`.select` constructs, with either the one
or two-argument forms::
- addresses_q = select([Address.user_id]).\\
- where(Address.email_address.endswith("@bar.com")).\\
+ addresses_q = select([Address.user_id]).\
+ where(Address.email_address.endswith("@bar.com")).\
alias()
- q = session.query(User).\\
+ q = session.query(User).\
join(addresses_q, addresses_q.c.user_id==User.id)
:meth:`~.Query.join` also features the ability to *adapt* a
against ``Address``, allowing the relationship denoted by
``User.addresses`` to *adapt* itself to the altered target::
- address_subq = session.query(Address).\\
- filter(Address.email_address == 'ed@foo.com').\\
+ address_subq = session.query(Address).\
+ filter(Address.email_address == 'ed@foo.com').\
subquery()
q = session.query(User).join(address_subq, User.addresses)
The above form allows one to fall back onto an explicit ON
clause at any time::
- q = session.query(User).\\
+ q = session.query(User).\
join(address_subq, User.id==address_subq.c.user_id)
**Controlling what to Join From**
the :class:`.Query` to select first from the ``User``
entity::
- q = session.query(Address).select_from(User).\\
- join(User.addresses).\\
+ q = session.query(Address).select_from(User).\
+ join(User.addresses).\
filter(User.name == 'ed')
Which will produce SQL similar to::
when a query is being joined algorithmically, such as
when querying self-referentially to an arbitrary depth::
- q = session.query(Node).\\
+ q = session.query(Node).\
join("children", "children", aliased=True)
When ``aliased=True`` is used, the actual "alias" construct
:meth:`.Query.filter` will adapt the incoming entity to
the last join point::
- q = session.query(Node).\\
- join("children", "children", aliased=True).\\
+ q = session.query(Node).\
+ join("children", "children", aliased=True).\
filter(Node.name == 'grandchild 1')
When using automatic aliasing, the ``from_joinpoint=True``
multiple calls to :meth:`~.Query.join`, so that
each path along the way can be further filtered::
- q = session.query(Node).\\
- join("children", aliased=True).\\
- filter(Node.name == 'child 1').\\
- join("children", aliased=True, from_joinpoint=True).\\
+ q = session.query(Node).\
+ join("children", aliased=True).\
+ filter(Node.name == 'child 1').\
+ join("children", aliased=True, from_joinpoint=True).\
filter(Node.name == 'grandchild 1')
The filtering aliases above can then be reset back to the
original ``Node`` entity using :meth:`~.Query.reset_joinpoint`::
- q = session.query(Node).\\
- join("children", "children", aliased=True).\\
- filter(Node.name == 'grandchild 1').\\
- reset_joinpoint().\\
+ q = session.query(Node).\
+ join("children", "children", aliased=True).\
+ filter(Node.name == 'grandchild 1').\
+ reset_joinpoint().\
filter(Node.name == 'parent 1')
For an example of ``aliased=True``, see the distribution
@_generative(_no_clauseelement_condition)
def select_from(self, *from_obj):
- """Set the FROM clause of this :class:`.Query` explicitly.
+ r"""Set the FROM clause of this :class:`.Query` explicitly.
:meth:`.Query.select_from` is often used in conjunction with
:meth:`.Query.join` in order to control which entity is selected
A typical example::
- q = session.query(Address).select_from(User).\\
- join(User.addresses).\\
+ q = session.query(Address).select_from(User).\
+ join(User.addresses).\
filter(User.name == 'ed')
Which produces SQL equivalent to::
@_generative(_no_clauseelement_condition)
def select_entity_from(self, from_obj):
- """Set the FROM clause of this :class:`.Query` to a
+ r"""Set the FROM clause of this :class:`.Query` to a
core selectable, applying it as a replacement FROM clause
for corresponding mapped entities.
select_stmt = select([User]).where(User.id == 7)
- q = session.query(User).\\
- select_entity_from(select_stmt).\\
+ q = session.query(User).\
+ select_entity_from(select_stmt).\
filter(User.name == 'ed')
The query generated will select ``User`` entities directly
version 0.9, does not affect existing entities. The
statement below::
- q = session.query(User).\\
- select_from(select_stmt).\\
+ q = session.query(User).\
+ select_from(select_stmt).\
filter(User.name == 'ed')
Produces SQL where both the ``user`` table as well as the
@_generative(_no_statement_condition)
def distinct(self, *criterion):
- """Apply a ``DISTINCT`` to the query and return the newly resulting
+ r"""Apply a ``DISTINCT`` to the query and return the newly resulting
``Query``.
@_generative()
def prefix_with(self, *prefixes):
- """Apply the prefixes to the query and return the newly resulting
+ r"""Apply the prefixes to the query and return the newly resulting
``Query``.
:param \*prefixes: optional prefixes, typically strings,
e.g.::
- query = sess.query(User.name).\\
- prefix_with('HIGH_PRIORITY').\\
+ query = sess.query(User.name).\
+ prefix_with('HIGH_PRIORITY').\
prefix_with('SQL_SMALL_RESULT', 'ALL')
Would render::
@_generative()
def suffix_with(self, *suffixes):
- """Apply the suffix to the query and return the newly resulting
+ r"""Apply the suffix to the query and return the newly resulting
``Query``.
:param \*suffixes: optional suffixes, typically strings,
statement.with_only_columns([1]))
def count(self):
- """Return a count of rows this Query would return.
+ r"""Return a count of rows this Query would return.
This generates the SQL for this Query as follows::
# return count of user "id" grouped
# by "name"
- session.query(func.count(User.id)).\\
+ session.query(func.count(User.id)).\
group_by(User.name)
from sqlalchemy import distinct
return self.from_self(col).scalar()
def delete(self, synchronize_session='evaluate'):
- """Perform a bulk delete query.
+ r"""Perform a bulk delete query.
Deletes rows matched by this query from the database.
E.g.::
- sess.query(User).filter(User.age == 25).\\
+ sess.query(User).filter(User.age == 25).\
delete(synchronize_session=False)
- sess.query(User).filter(User.age == 25).\\
+ sess.query(User).filter(User.age == 25).\
delete(synchronize_session='evaluate')
.. warning:: The :meth:`.Query.delete` method is a "bulk" operation,
subclasses ``Employee``, a DELETE against the ``Employee``
table would look like::
- session.query(Engineer).\\
- filter(Engineer.id == Employee.id).\\
- filter(Employee.name == 'dilbert').\\
+ session.query(Engineer).\
+ filter(Engineer.id == Employee.id).\
+ filter(Employee.name == 'dilbert').\
delete()
However the above SQL will not delete from the Engineer table,
return delete_op.rowcount
def update(self, values, synchronize_session='evaluate', update_args=None):
- """Perform a bulk update query.
+ r"""Perform a bulk update query.
Updates rows matched by this query in the database.
E.g.::
- sess.query(User).filter(User.age == 25).\\
+ sess.query(User).filter(User.age == 25).\
update({User.age: User.age - 10}, synchronize_session=False)
- sess.query(User).filter(User.age == 25).\\
+ sess.query(User).filter(User.age == 25).\
update({"age": User.age - 10}, synchronize_session='evaluate')
local table using criteria against the ``Employee``
local table might look like::
- session.query(Engineer).\\
- filter(Engineer.id == Employee.id).\\
- filter(Employee.name == 'dilbert').\\
+ session.query(Engineer).\
+ filter(Engineer.id == Employee.id).\
+ filter(Employee.name == 'dilbert').\
update({"engineer_type": "programmer"})
* The polymorphic identity WHERE criteria is **not** included
is_aliased_class = False
def __init__(self, name, *exprs, **kw):
- """Construct a new :class:`.Bundle`.
+ r"""Construct a new :class:`.Bundle`.
e.g.::
class AliasOption(interfaces.MapperOption):
def __init__(self, alias):
- """Return a :class:`.MapperOption` that will indicate to the :class:`.Query`
+ r"""Return a :class:`.MapperOption` that will indicate to the :class:`.Query`
that the main table has been aliased.
This is a seldom-used option to suit the
statement that aliases the parent table. E.g.::
# define an aliased UNION called 'ulist'
- ulist = users.select(users.c.user_id==7).\\
- union(users.select(users.c.user_id>7)).\\
+ ulist = users.select(users.c.user_id==7).\
+ union(users.select(users.c.user_id>7)).\
alias('ulist')
# add on an eager load of "addresses"
- statement = ulist.outerjoin(addresses).\\
+ statement = ulist.outerjoin(addresses).\
select().apply_labels()
# create query, indicating "ulist" will be an
self.registry = ThreadLocalRegistry(session_factory)
def __call__(self, **kw):
- """Return the current :class:`.Session`, creating it
+ r"""Return the current :class:`.Session`, creating it
using the :attr:`.scoped_session.session_factory` if not present.
:param \**kw: Keyword arguments will be passed to the
weak_identity_map=True, binds=None, extension=None,
info=None,
query_cls=query.Query):
- """Construct a new Session.
+ r"""Construct a new Session.
See also the :class:`.sessionmaker` function which is used to
generate a :class:`.Session`-producing callable with a given
close_with_result=False,
execution_options=None,
**kw):
- """Return a :class:`.Connection` object corresponding to this
+ r"""Return a :class:`.Connection` object corresponding to this
:class:`.Session` object's transactional state.
If this :class:`.Session` is configured with ``autocommit=False``,
return conn
def execute(self, clause, params=None, mapper=None, bind=None, **kw):
- """Execute a SQL expression construct or string statement within
+ r"""Execute a SQL expression construct or string statement within
the current transaction.
Returns a :class:`.ResultProxy` representing
def is_modified(self, instance, include_collections=True,
passive=True):
- """Return ``True`` if the given instance has locally
+ r"""Return ``True`` if the given instance has locally
modified attributes.
This method retrieves the history for each instrumented
or many-to-one foreign keys) that would result in an UPDATE for this
instance upon flush.
:param passive:
+
.. versionchanged:: 0.8
Ignored for backwards compatibility.
When using SQLAlchemy 0.7 and earlier, this flag should always
autocommit=False,
expire_on_commit=True,
info=None, **kw):
- """Construct a new :class:`.sessionmaker`.
+ r"""Construct a new :class:`.sessionmaker`.
All arguments here except for ``class_`` correspond to arguments
accepted by :class:`.Session` directly. See the
@loader_option()
def contains_eager(loadopt, attr, alias=None):
- """Indicate that the given attribute should be eagerly loaded from
+ r"""Indicate that the given attribute should be eagerly loaded from
columns stated manually in the query.
This function is part of the :class:`.Load` interface and supports
The option is used in conjunction with an explicit join that loads
the desired rows, i.e.::
- sess.query(Order).\\
- join(Order.user).\\
+ sess.query(Order).\
+ join(Order.user).\
options(contains_eager(Order.user))
The above query would join from the ``Order`` entity to its related
the eagerly-loaded rows are to come from an aliased table::
user_alias = aliased(User)
- sess.query(Order).\\
- join((user_alias, Order.user)).\\
+ sess.query(Order).\
+ join((user_alias, Order.user)).\
options(contains_eager(Order.user, alias=user_alias))
.. seealso::
@loader_option()
def defer(loadopt, key):
- """Indicate that the given column-oriented attribute should be deferred, e.g.
+ r"""Indicate that the given column-oriented attribute should be deferred, e.g.
not loaded until accessed.
This function is part of the :class:`.Load` interface and supports
@loader_option()
def undefer(loadopt, key):
- """Indicate that the given column-oriented attribute should be undeferred,
+ r"""Indicate that the given column-oriented attribute should be undeferred,
e.g. specified within the SELECT statement of the entity as a whole.
The column being undeferred is typically set up on the mapping as a
def from_string(cls, arg):
values = [
c for c
- in re.split('\s*,\s*', arg or "")
+ in re.split(r'\s*,\s*', arg or "")
if c
]
return cls(values)
class AliasedClass(object):
- """Represents an "aliased" form of a mapped class for usage with Query.
+ r"""Represents an "aliased" form of a mapped class for usage with Query.
The ORM equivalent of a :func:`sqlalchemy.sql.expression.alias`
construct, this object mimics the mapped class using a
# find all pairs of users with the same name
user_alias = aliased(User)
- session.query(User, user_alias).\\
- join((user_alias, User.id > user_alias.id)).\\
+ session.query(User, user_alias).\
+ join((user_alias, User.id > user_alias.id)).\
filter(User.name==user_alias.name)
The resulting object is an instance of :class:`.AliasedClass`.
def join(
left, right, onclause=None, isouter=False,
full=False, join_to_left=None):
- """Produce an inner join between left and right clauses.
+ r"""Produce an inner join between left and right clauses.
:func:`.orm.join` is an extension to the core join interface
provided by :func:`.sql.expression.join()`, where the
:meth:`.Query.select_from` method, as in::
from sqlalchemy.orm import join
- session.query(User).\\
- select_from(join(User, Address, User.addresses)).\\
+ session.query(User).\
+ select_from(join(User, Address, User.addresses)).\
filter(Address.email_address=='foo@bar.com')
In modern SQLAlchemy the above join can be written more
succinctly as::
- session.query(User).\\
- join(User.addresses).\\
+ session.query(User).\
+ join(User.addresses).\
filter(Address.email_address=='foo@bar.com')
See :meth:`.Query.join` for information on modern usage
def manage(module, **params):
- """Return a proxy for a DB-API module that automatically
+ r"""Return a proxy for a DB-API module that automatically
pools connections.
Given a DB-API 2.0 module and pool management parameters, returns
:param poolclass: the class used by the pool module to provide
pooling. Defaults to :class:`.QueuePool`.
- :param \*\*params: will be passed through to *poolclass*
+ :param \**params: will be passed through to *poolclass*
"""
try:
def __init__(self, creator, pool_size=5, max_overflow=10, timeout=30,
**kw):
- """
+ r"""
Construct a QueuePool.
:param creator: a callable function that returns a DB-API
return bool(value)
DATETIME_RE = re.compile(
- "(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)(?:\.(\d+))?")
- TIME_RE = re.compile("(\d+):(\d+):(\d+)(?:\.(\d+))?")
- DATE_RE = re.compile("(\d+)-(\d+)-(\d+)")
+ r"(\d+)-(\d+)-(\d+) (\d+):(\d+):(\d+)(?:\.(\d+))?")
+ TIME_RE = re.compile(r"(\d+):(\d+):(\d+)(?:\.(\d+))?")
+ DATE_RE = re.compile(r"(\d+)-(\d+)-(\d+)")
str_to_datetime = str_to_datetime_processor_factory(DATETIME_RE,
datetime.datetime)
class TypeCompiler(util.with_metaclass(util.EnsureKWArgType, object)):
"""Produces DDL specification for TypeEngine objects."""
- ensure_kwarg = 'visit_\w+'
+ ensure_kwarg = r'visit_\w+'
def __init__(self, dialect):
self.dialect = dialect
@_generative
def execute_if(self, dialect=None, callable_=None, state=None):
- """Return a callable that will execute this
+ r"""Return a callable that will execute this
DDLElement conditionally.
Used to provide a wrapper for event listening::
@_generative
def returning(self, *cols):
- """Add a :term:`RETURNING` or equivalent clause to this statement.
+ r"""Add a :term:`RETURNING` or equivalent clause to this statement.
e.g.::
- stmt = table.update().\\
- where(table.c.data == 'value').\\
- values(status='X').\\
+ stmt = table.update().\
+ where(table.c.data == 'value').\
+ values(status='X').\
returning(table.c.server_flag,
table.c.updated_timestamp)
@_generative
def values(self, *args, **kwargs):
- """specify a fixed VALUES clause for an INSERT statement, or the SET
+ r"""specify a fixed VALUES clause for an INSERT statement, or the SET
clause for an UPDATE.
Note that the :class:`.Insert` and :class:`.Update` constructs support
return_defaults=False,
preserve_parameter_order=False,
**dialect_kw):
- """Construct an :class:`.Update` object.
+ r"""Construct an :class:`.Update` object.
E.g.::
from sqlalchemy import update
- stmt = update(users).where(users.c.id==5).\\
+ stmt = update(users).where(users.c.id==5).\
values(name='user #5')
Similar functionality is available via the
:meth:`~.TableClause.update` method on
:class:`.Table`::
- stmt = users.update().\\
- where(users.c.id==5).\\
+ stmt = users.update().\
+ where(users.c.id==5).\
values(name='user #5')
:param table: A :class:`.Table` object representing the database
subquery::
users.update().values(name='ed').where(
- users.c.name==select([addresses.c.email_address]).\\
- where(addresses.c.user_id==users.c.id).\\
+ users.c.name==select([addresses.c.email_address]).\
+ where(addresses.c.user_id==users.c.id).\
as_scalar()
)
being updated::
users.update().values(
- name=select([addresses.c.email_address]).\\
- where(addresses.c.user_id==users.c.id).\\
+ name=select([addresses.c.email_address]).\
+ where(addresses.c.user_id==users.c.id).\
as_scalar()
)
def literal(value, type_=None):
- """Return a literal clause, bound to a bind parameter.
+ r"""Return a literal clause, bound to a bind parameter.
Literal clauses are created automatically when non-
:class:`.ClauseElement` objects (such as strings, ints, dates, etc.) are
return cloned_traverse(self, {}, {'bindparam': visit_bindparam})
def compare(self, other, **kw):
- """Compare this ClauseElement to the given ClauseElement.
+ r"""Compare this ClauseElement to the given ClauseElement.
Subclasses should override the default behavior, which is a
straight identity comparison.
pass
def get_children(self, **kwargs):
- """Return immediate child elements of this :class:`.ClauseElement`.
+ r"""Return immediate child elements of this :class:`.ClauseElement`.
This is used for visit traversal.
class BindParameter(ColumnElement):
- """Represent a "bound expression".
+ r"""Represent a "bound expression".
:class:`.BindParameter` is invoked explicitly using the
:func:`.bindparam` function, as in::
from sqlalchemy import bindparam
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
where(users_table.c.name == bindparam('username'))
Detailed discussion of how :class:`.BindParameter` is used is
isoutparam=False,
_compared_to_operator=None,
_compared_to_type=None):
- """Produce a "bound expression".
+ r"""Produce a "bound expression".
The return value is an instance of :class:`.BindParameter`; this
is a :class:`.ColumnElement` subclass which represents a so-called
from sqlalchemy import bindparam
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
where(users_table.c.name == bindparam('username'))
The above statement, when rendered, will produce SQL similar to::
@classmethod
def _create_text(self, text, bind=None, bindparams=None,
typemap=None, autocommit=None):
- """Construct a new :class:`.TextClause` clause, representing
+ r"""Construct a new :class:`.TextClause` clause, representing
a textual SQL string directly.
E.g.::
For SQL statements where a colon is required verbatim, as within
an inline string, use a backslash to escape::
- t = text("SELECT * FROM users WHERE name='\\:username'")
+ t = text("SELECT * FROM users WHERE name='\:username'")
The :class:`.TextClause` construct includes methods which can
provide information about the bound parameters as well as the column
parameter detail, and :meth:`.TextClause.columns` method allows
specification of return columns including names and types::
- t = text("SELECT * FROM users WHERE id=:user_id").\\
- bindparams(user_id=7).\\
+ t = text("SELECT * FROM users WHERE id=:user_id").\
+ bindparams(user_id=7).\
columns(id=Integer, name=String)
for id, name in connection.execute(t):
can be set explicitly so using the
:paramref:`.Connection.execution_options.autocommit` option::
- t = text("EXEC my_procedural_thing()").\\
+ t = text("EXEC my_procedural_thing()").\
execution_options(autocommit=True)
Note that SQLAlchemy's usual "autocommit" behavior applies to
Is equivalent to::
- stmt = text("SELECT * FROM table WHERE id=:id").\\
+ stmt = text("SELECT * FROM table WHERE id=:id").\
bindparams(bindparam('id', value=5, type_=Integer))
.. deprecated:: 0.9.0 the :meth:`.TextClause.bindparams` method
stmt = text("SELECT id, name FROM some_table")
stmt = stmt.columns(column('id'), column('name')).alias('st')
- stmt = select([mytable]).\\
+ stmt = select([mytable]).\
select_from(
mytable.join(stmt, mytable.c.name == stmt.c.name)
).where(stmt.c.id > 5)
times against a statement, which will have the effect of each
clause being combined using :func:`.and_`::
- stmt = select([users_table]).\\
- where(users_table.c.name == 'wendy').\\
+ stmt = select([users_table]).\
+ where(users_table.c.name == 'wendy').\
where(users_table.c.enrolled == True)
.. seealso::
from sqlalchemy import case
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
where(
case(
[
__visit_name__ = 'case'
def __init__(self, whens, value=None, else_=None):
- """Produce a ``CASE`` expression.
+ r"""Produce a ``CASE`` expression.
The ``CASE`` construct in SQL is a conditional object that
acts somewhat analogously to an "if/then" construct in other
from sqlalchemy import case
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
where(
case(
[
compared against keyed to result expressions. The statement below is
equivalent to the preceding statement::
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
where(
case(
{"wendy": "W", "jack": "J"},
def literal_column(text, type_=None):
- """Produce a :class:`.ColumnClause` object that has the
+ r"""Produce a :class:`.ColumnClause` object that has the
:paramref:`.column.is_literal` flag set to True.
:func:`.literal_column` is similar to :func:`.column`, except that
from sqlalchemy import desc, nullsfirst
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
order_by(nullsfirst(desc(users_table.c.name)))
The SQL expression from the above would resemble::
from sqlalchemy import desc, nullslast
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
order_by(nullslast(desc(users_table.c.name)))
The SQL expression from the above would resemble::
:meth:`.ColumnElement.nullslast`, rather than as its standalone
function version, as in::
- stmt = select([users_table]).\\
+ stmt = select([users_table]).\
order_by(users_table.c.name.desc().nullslast())
.. seealso::
order_by = None
def __init__(self, element, *order_by):
- """Produce a :class:`.WithinGroup` object against a function.
+ r"""Produce a :class:`.WithinGroup` object against a function.
Used against so-called "ordered set aggregate" and "hypothetical
set aggregate" functions, including :class:`.percentile_cont`,
return None
def alias(self, name=None, flat=False):
- """Produce a :class:`.Alias` construct against this
+ r"""Produce a :class:`.Alias` construct against this
:class:`.FunctionElement`.
This construct wraps the function in a named alias which
from sqlalchemy.sql import column
- stmt = select([column('data_view')]).\\
- select_from(SomeTable).\\
+ stmt = select([column('data_view')]).\
+ select_from(SomeTable).\
select_from(func.unnest(SomeTable.data).alias('data_view')
)
class count(GenericFunction):
- """The ANSI COUNT aggregate function. With no arguments,
+ r"""The ANSI COUNT aggregate function. With no arguments,
emits COUNT \*.
"""
return against
def operate(self, op, *other, **kwargs):
- """Operate on an argument.
+ r"""Operate on an argument.
This is the lowest level of operation, raises
:class:`NotImplementedError` by default.
class Table(DialectKWArgs, SchemaItem, TableClause):
- """Represent a table in a database.
+ r"""Represent a table in a database.
e.g.::
__visit_name__ = 'column'
def __init__(self, *args, **kwargs):
- """
+ r"""
Construct a new ``Column`` object.
:param name: The name of this column as represented in the database.
initially=None, link_to_name=False, match=None,
info=None,
**dialect_kw):
- """
+ r"""
Construct a column-level FOREIGN KEY.
The :class:`.ForeignKey` object when constructed generates a
def __init__(self, name=None, deferrable=None, initially=None,
_create_rule=None, info=None, _type_bound=False,
**dialect_kw):
- """Create a SQL constraint.
+ r"""Create a SQL constraint.
:param name:
Optional, the in-database name of this ``Constraint``.
"""A constraint that proxies a ColumnCollection."""
def __init__(self, *columns, **kw):
- """
+ r"""
:param \*columns:
A sequence of column names or Column objects.
def __init__(self, sqltext, name=None, deferrable=None,
initially=None, table=None, info=None, _create_rule=None,
_autoattach=True, _type_bound=False):
- """Construct a CHECK constraint.
+ r"""Construct a CHECK constraint.
:param sqltext:
A string containing the constraint definition, which will be used
ondelete=None, deferrable=None, initially=None,
use_alter=False, link_to_name=False, match=None,
table=None, info=None, **dialect_kw):
- """Construct a composite-capable FOREIGN KEY.
+ r"""Construct a composite-capable FOREIGN KEY.
:param columns: A sequence of local column names. The named columns
must be defined and present in the parent Table. The names should
__visit_name__ = 'index'
def __init__(self, name, *expressions, **kw):
- """Construct an index object.
+ r"""Construct an index object.
:param name:
The name of the index
extend_existing=False,
autoload_replace=True,
**dialect_kwargs):
- """Load all available table definitions from the database.
+ r"""Load all available table definitions from the database.
Automatically creates ``Table`` entries in this ``MetaData`` for any
table available in the database but not yet present in the
def subquery(alias, *args, **kwargs):
- """Return an :class:`.Alias` object derived
+ r"""Return an :class:`.Alias` object derived
from a :class:`.Select`.
name
@_generative
def prefix_with(self, *expr, **kw):
- """Add one or more expressions following the statement keyword, i.e.
+ r"""Add one or more expressions following the statement keyword, i.e.
SELECT, INSERT, UPDATE, or DELETE. Generative.
This is used to support backend-specific prefix keywords such as those
@_generative
def suffix_with(self, *expr, **kw):
- """Add one or more expressions following the statement as a whole.
+ r"""Add one or more expressions following the statement as a whole.
This is used to support backend-specific suffix keywords on
certain constructs.
"join explicitly." % (a.description, b.description))
def select(self, whereclause=None, **kwargs):
- """Create a :class:`.Select` from this :class:`.Join`.
+ r"""Create a :class:`.Select` from this :class:`.Join`.
The equivalent long-hand form, given a :class:`.Join` object
``j``, is::
from sqlalchemy import select
- j = select([j.left, j.right], **kw).\\
- where(whereclause).\\
+ j = select([j.left, j.right], **kw).\
+ where(whereclause).\
select_from(j)
:param whereclause: the WHERE criterion that will be sent to
@util.dependencies("sqlalchemy.sql.util")
def alias(self, sqlutil, name=None, flat=False):
- """return an alias of this :class:`.Join`.
+ r"""return an alias of this :class:`.Join`.
The default behavior here is to first produce a SELECT
construct from this :class:`.Join`, then to produce an
from sqlalchemy import select, alias
j = alias(
- select([j.left, j.right]).\\
- select_from(j).\\
- with_labels(True).\\
+ select([j.left, j.right]).\
+ select_from(j).\
+ with_labels(True).\
correlate(False),
name=name
)
"""
def cte(self, name=None, recursive=False):
- """Return a new :class:`.CTE`, or Common Table Expression instance.
+ r"""Return a new :class:`.CTE`, or Common Table Expression instance.
Common table expressions are a SQL standard whereby SELECT
statements can draw upon secondary statements specified along
]).group_by(orders.c.region).cte("regional_sales")
- top_regions = select([regional_sales.c.region]).\\
+ top_regions = select([regional_sales.c.region]).\
where(
regional_sales.c.total_sales >
select([
included_parts = select([
parts.c.sub_part,
parts.c.part,
- parts.c.quantity]).\\
- where(parts.c.part=='our part').\\
+ parts.c.quantity]).\
+ where(parts.c.part=='our part').\
cte(recursive=True)
included_parts.c.sub_part,
func.sum(included_parts.c.quantity).
label('total_quantity')
- ]).\\
+ ]).\
group_by(included_parts.c.sub_part)
result = conn.execute(statement).fetchall()
@classmethod
def _create_union(cls, *selects, **kwargs):
- """Return a ``UNION`` of multiple selectables.
+ r"""Return a ``UNION`` of multiple selectables.
The returned object is an instance of
:class:`.CompoundSelect`.
@classmethod
def _create_union_all(cls, *selects, **kwargs):
- """Return a ``UNION ALL`` of multiple selectables.
+ r"""Return a ``UNION ALL`` of multiple selectables.
The returned object is an instance of
:class:`.CompoundSelect`.
@classmethod
def _create_except(cls, *selects, **kwargs):
- """Return an ``EXCEPT`` of multiple selectables.
+ r"""Return an ``EXCEPT`` of multiple selectables.
The returned object is an instance of
:class:`.CompoundSelect`.
@classmethod
def _create_except_all(cls, *selects, **kwargs):
- """Return an ``EXCEPT ALL`` of multiple selectables.
+ r"""Return an ``EXCEPT ALL`` of multiple selectables.
The returned object is an instance of
:class:`.CompoundSelect`.
@classmethod
def _create_intersect(cls, *selects, **kwargs):
- """Return an ``INTERSECT`` of multiple selectables.
+ r"""Return an ``INTERSECT`` of multiple selectables.
The returned object is an instance of
:class:`.CompoundSelect`.
@classmethod
def _create_intersect_all(cls, *selects, **kwargs):
- """Return an ``INTERSECT ALL`` of multiple selectables.
+ r"""Return an ``INTERSECT ALL`` of multiple selectables.
The returned object is an instance of
:class:`.CompoundSelect`.
@_generative
def with_hint(self, selectable, text, dialect_name='*'):
- """Add an indexing or other executional context hint for the given
+ r"""Add an indexing or other executional context hint for the given
selectable to this :class:`.Select`.
The text of the hint is rendered in the appropriate
the table or alias. E.g. when using Oracle, the
following::
- select([mytable]).\\
+ select([mytable]).\
with_hint(mytable, "index(%(name)s ix_mytable)")
Would render SQL as::
hint to a particular backend. Such as, to add hints for both Oracle
and Sybase simultaneously::
- select([mytable]).\\
- with_hint(mytable, "index(%(name)s ix_mytable)", 'oracle').\\
+ select([mytable]).\
+ with_hint(mytable, "index(%(name)s ix_mytable)", 'oracle').\
with_hint(mytable, "WITH INDEX ix_mytable", 'sybase')
.. seealso::
@_generative
def with_only_columns(self, columns):
- """Return a new :func:`.select` construct with its columns
+ r"""Return a new :func:`.select` construct with its columns
clause replaced with the given columns.
.. versionchanged:: 0.7.3
else (i.e. not in the WHERE clause, etc.) is to set it using
:meth:`.Select.select_from`::
- >>> s1 = select([table1.c.a, table2.c.b]).\\
+ >>> s1 = select([table1.c.a, table2.c.b]).\
... select_from(table1.join(table2,
... table1.c.a==table2.c.a))
>>> s2 = s1.with_only_columns([table2.c.b])
@_generative
def distinct(self, *expr):
- """Return a new select() construct which will apply DISTINCT to its
+ r"""Return a new select() construct which will apply DISTINCT to its
columns clause.
:param \*expr: optional column expressions. When present,
@_generative
def select_from(self, fromclause):
- """return a new :func:`.select` construct with the
+ r"""return a new :func:`.select` construct with the
given FROM expression
merged into its list of FROM objects.
table1 = table('t1', column('a'))
table2 = table('t2', column('b'))
- s = select([table1.c.a]).\\
+ s = select([table1.c.a]).\
select_from(
table1.join(table2, table1.c.a==table2.c.b)
)
@_generative
def correlate(self, *fromclauses):
- """return a new :class:`.Select` which will correlate the given FROM
+ r"""return a new :class:`.Select` which will correlate the given FROM
clauses to that of an enclosing :class:`.Select`.
Calling this method turns off the :class:`.Select` object's
@_generative
def correlate_except(self, *fromclauses):
- """return a new :class:`.Select` which will omit the given FROM
+ r"""return a new :class:`.Select` which will omit the given FROM
clauses from the auto-correlation process.
Calling :meth:`.Select.correlate_except` turns off the
def __init__(self, precision=None, asdecimal=False,
decimal_return_scale=None, **kwargs):
- """
+ r"""
Construct a Float.
:param precision: the numeric precision for use in DDL ``CREATE
__visit_name__ = 'enum'
def __init__(self, *enums, **kw):
- """Construct an enum.
+ r"""Construct an enum.
Keyword arguments which don't apply to a specific backend are ignored
by that backend.
def reduce_columns(columns, *clauses, **kw):
- """given a list of columns, return a 'reduced' set based on natural
+ r"""given a list of columns, return a 'reduced' set based on natural
equivalents.
the set is reduced to the smallest list of columns which have no natural
warnings.filterwarnings('error', category=sa_exc.SADeprecationWarning)
warnings.filterwarnings('error', category=sa_exc.SAWarning)
+ # some selected deprecations...
+ warnings.filterwarnings('error', category=DeprecationWarning)
+ warnings.filterwarnings(
+ "ignore", category=DeprecationWarning, message=".*StopIteration")
+ warnings.filterwarnings(
+ "ignore", category=DeprecationWarning, message=".*inspect.getargspec")
+
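A standalone sketch (not part of the patch) of how the filters registered above interact: the "ignore" entries are added last and therefore take precedence, so DeprecationWarnings whose message matches one of the ignored patterns pass silently, while any other DeprecationWarning is raised as an error::

    import warnings

    warnings.filterwarnings('error', category=DeprecationWarning)
    warnings.filterwarnings(
        "ignore", category=DeprecationWarning, message=".*StopIteration")

    # matches the ignored pattern -> suppressed
    warnings.warn("generator raised StopIteration", DeprecationWarning)

    # any other deprecation now surfaces as an exception
    try:
        warnings.warn("invalid escape sequence '\\d'", DeprecationWarning)
    except DeprecationWarning:
        print("unignored DeprecationWarning raised as an error")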
def assert_warnings(fn, warning_msgs, regex=False):
"""Assert that each of the given warnings are emitted by fn.
"""Handle Python version/platform incompatibilities."""
import sys
+from contextlib import contextmanager
try:
import threading
return metaclass('temporary_class', None, {})
-from contextlib import contextmanager
-try:
- from contextlib import nested
-except ImportError:
- # removed in py3k, credit to mitsuhiko for
- # workaround
-
- @contextmanager
- def nested(*managers):
- exits = []
- vars = []
- exc = (None, None, None)
- try:
- for mgr in managers:
- exit = mgr.__exit__
- enter = mgr.__enter__
- vars.append(enter())
- exits.append(exit)
- yield vars
- except:
- exc = sys.exc_info()
- finally:
- while exits:
- exit = exits.pop()
- try:
- if exit(*exc):
- exc = (None, None, None)
- except:
- exc = sys.exc_info()
- if exc != (None, None, None):
- reraise(exc[0], exc[1], exc[2])
+
+@contextmanager
+def nested(*managers):
+ """Implement contextlib.nested, mostly for unit tests.
+
+ As tests still need to run on py2.6 we can't use multiple-with yet.
+
+ Function is removed in py3k but also emits deprecation warning in 2.7
+ so just roll it here for everyone.
+
+ """
+
+ exits = []
+ vars = []
+ exc = (None, None, None)
+ try:
+ for mgr in managers:
+ exit = mgr.__exit__
+ enter = mgr.__enter__
+ vars.append(enter())
+ exits.append(exit)
+ yield vars
+ except:
+ exc = sys.exc_info()
+ finally:
+ while exits:
+ exit = exits.pop()
+ try:
+ if exit(*exc):
+ exc = (None, None, None)
+ except:
+ exc = sys.exc_info()
+ if exc != (None, None, None):
+ reraise(exc[0], exc[1], exc[2])
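A quick usage sketch (not part of the patch) for the hoisted helper: ``nested()`` enters the given context managers left to right, yields their ``__enter__`` results as a list, and unwinds them in reverse order, so tests that still target py2.6 can avoid the multi-argument ``with`` statement::

    from contextlib import contextmanager

    from sqlalchemy.util.compat import nested


    @contextmanager
    def tag(name):
        # trivial context manager used only for demonstration
        print("enter", name)
        try:
            yield name
        finally:
            print("exit", name)


    # equivalent to ``with tag("a") as a, tag("b") as b:`` on newer Pythons
    with nested(tag("a"), tag("b")) as (a, b):
        print("inside with", a, b)
    # prints: enter a / enter b / inside with a b / exit b / exit a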
def get_cls_kwargs(cls, _set=None):
- """Return the full set of inherited kwargs for the given `cls`.
+ r"""Return the full set of inherited kwargs for the given `cls`.
Probes a class's __init__ method, collecting all named arguments. If the
__init__ defines a \**kwargs catch-all, then the constructor is presumed
def coerce_kw_type(kw, key, type_, flexi_bool=True):
- """If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
+ r"""If 'key' is present in dict 'kw', coerce its value to type 'type\_' if
necessary. If 'flexi_bool' is True, the string '0' is considered false
when coercing to boolean.
"""
assert_raises_message(
ValueError,
r'''After u?'\[\.\.\.\], "key1"=>"value1", ', could not parse '''
- '''residual at position 36: u?'crapcrapcrap, "key3"\[\.\.\.\]''',
+ r'''residual at position 36: u?'crapcrapcrap, "key3"\[\.\.\.\]''',
proc,
'"key2"=>"value2", "key1"=>"value1", '
'crapcrapcrap, "key3"=>"value3"'
"%(year)04d%(month)02d%(day)02d"
"%(hour)02d%(minute)02d%(second)02d%(microsecond)06d"
),
- regexp="(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})(\d{6})",
+ regexp=r"(\d{4})(\d{2})(\d{2})(\d{2})(\d{2})(\d{2})(\d{6})",
)
bp = sldt.bind_processor(None)
eq_(bp(dt), '20080627120000000125')
eq_(str(dt), '2008-06-27')
sldt = sqlite.DATE(
storage_format="%(month)02d/%(day)02d/%(year)04d",
- regexp="(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)",
+ regexp=r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)",
)
bp = sldt.bind_processor(None)
eq_(bp(dt), '06/27/2008')
eq_(str(dt), '2008-06-27')
sldt = sqlite.DATE(
storage_format="%(year)04d%(month)02d%(day)02d",
- regexp="(\d{4})(\d{2})(\d{2})",
+ regexp=r"(\d{4})(\d{2})(\d{2})",
)
bp = sldt.bind_processor(None)
eq_(bp(dt), '20080627')
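For context, a standalone sketch (not part of the patch) of the round trip these tests exercise: the SQLite date types accept a ``storage_format`` used when binding values and a ``regexp`` (now written as a raw string) used when reading them back; the result-processor call below is the standard two-argument TypeEngine hook and is assumed here rather than shown in the tests::

    import datetime

    from sqlalchemy.dialects import sqlite

    date_type = sqlite.DATE(
        storage_format="%(month)02d/%(day)02d/%(year)04d",
        regexp=r"(?P<month>\d+)/(?P<day>\d+)/(?P<year>\d+)",
    )

    bind = date_type.bind_processor(None)            # date -> stored string
    result = date_type.result_processor(None, None)  # stored string -> date

    stored = bind(datetime.date(2008, 6, 27))
    assert stored == "06/27/2008"
    assert result(stored) == datetime.date(2008, 6, 27)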
assert_raises_message(
tsa.exc.StatementError,
r"\(test.engine.test_execute.SomeException\) "
- "nope \[SQL\: u?'SELECT 1 ",
+ r"nope \[SQL\: u?'SELECT 1 ",
conn.execute,
select([1]).
where(
assert_raises_message(
tsa.exc.StatementError,
r"\(test.engine.test_execute.SomeException\) "
- "nope \[SQL\: u?'SELECT 1 ",
+ r"nope \[SQL\: u?'SELECT 1 ",
conn.execute,
select([1]).where(
column('foo') == literal('bar', MyType()))
assert_raises_message(
tsa.exc.StatementError,
r"\(test.engine.test_execute.SomeException\) "
- "nope \[SQL\: u?'SELECT 1 ",
+ r"nope \[SQL\: u?'SELECT 1 ",
conn.execute,
select([1]).where(
column('foo') == literal('bar', MyType()))
assert_raises_message(
tsa.exc.DBAPIError,
r".*'INSERT INTO nonexistent \(data\) values \(:data\)'\] "
- "\[parameters: "
- "\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
- "{'data': '3'}, {'data': '4'}, {'data': '5'}, "
- "{'data': '6'}, {'data': '7'} ... displaying 10 of "
- "100 total bound parameter sets ... {'data': '98'}, {'data': '99'}\]",
+ r"\[parameters: "
+ r"\[{'data': '0'}, {'data': '1'}, {'data': '2'}, "
+ r"{'data': '3'}, {'data': '4'}, {'data': '5'}, "
+ r"{'data': '6'}, {'data': '7'} ... displaying 10 of "
+ r"100 total bound parameter sets ... {'data': '98'}, {'data': '99'}\]",
lambda: self.eng.execute(
"INSERT INTO nonexistent (data) values (:data)",
[{"data": str(i)} for i in range(100)]
assert_raises_message(
tsa.exc.DBAPIError,
r".*INSERT INTO nonexistent \(data\) values "
- "\(\?\)'\] \[parameters: \[\('0',\), \('1',\), \('2',\), \('3',\), "
- "\('4',\), \('5',\), \('6',\), \('7',\) "
- "... displaying "
- "10 of 100 total bound parameter sets ... "
- "\('98',\), \('99',\)\]",
+ r"\(\?\)'\] \[parameters: \[\('0',\), \('1',\), \('2',\), \('3',\), "
+ r"\('4',\), \('5',\), \('6',\), \('7',\) "
+ r"... displaying "
+ r"10 of 100 total bound parameter sets ... "
+ r"\('98',\), \('99',\)\]",
lambda: self.eng.execute(
"INSERT INTO nonexistent (data) values (?)",
[(str(i), ) for i in range(100)]
with expect_warnings(
"An exception has occurred during handling of a previous "
"exception. The previous exception "
- "is:.*..SQL\:.*RELEASE SAVEPOINT"
+ r"is:.*..SQL\:.*RELEASE SAVEPOINT"
):
def go():
with connection.begin_nested() as savepoint:
connection, savepoint._savepoint)
assert_raises_message(
exc.DBAPIError,
- ".*SQL\:.*ROLLBACK TO SAVEPOINT",
+ r".*SQL\:.*ROLLBACK TO SAVEPOINT",
go
)
resolver = resolver("Foo")
assert_raises_message(
exc.InvalidRequestError,
- "When initializing mapper some_parent, expression "
- "'Foo' failed to locate a name \('Foo'\).",
+ r"When initializing mapper some_parent, expression "
+ r"'Foo' failed to locate a name \('Foo'\).",
resolver
)
assert_raises_message(
exc.ArgumentError,
- "Non-empty has\(\) not allowed",
+ r"Non-empty has\(\) not allowed",
User.singular_value.has,
User.singular_value == "singular4"
)
assert_raises_message(
exc.ArgumentError,
- "Non-empty has\(\) not allowed",
+ r"Non-empty has\(\) not allowed",
User.singular_value.has, singular_value="singular4"
)
# renders here, so the "%" operator in the string needs to
# apply the tuple also
r"Composite expects Column objects or mapped "
- "attributes/attribute names as "
- "arguments, got: \(Column",
+ r"attributes/attribute names as "
+ r"arguments, got: \(Column",
configure_mappers
)
mapper(User, users)
assert_raises_message(
sa.exc.SAWarning,
- "before_configured' and 'after_configured' ORM events only "
- "invoke with the mapper\(\) function or Mapper class as "
- "the target.",
+ r"before_configured' and 'after_configured' ORM events only "
+ r"invoke with the mapper\(\) function or Mapper class as "
+ r"the target.",
event.listen, User, 'before_configured', m1
)
assert_raises_message(
sa.exc.SAWarning,
- "before_configured' and 'after_configured' ORM events only "
- "invoke with the mapper\(\) function or Mapper class as "
- "the target.",
+ r"before_configured' and 'after_configured' ORM events only "
+ r"invoke with the mapper\(\) function or Mapper class as "
+ r"the target.",
event.listen, User, 'after_configured', m1
)
assert_raises_message(
sa.exc.SAWarning,
r"__del__\(\) method on class "
- "<class '.*\.A'> will cause "
- "unreachable cycles and memory leaks, as SQLAlchemy "
- "instrumentation often creates reference cycles. "
- "Please remove this method.",
+ r"<class '.*\.A'> will cause "
+ r"unreachable cycles and memory leaks, as SQLAlchemy "
+ r"instrumentation often creates reference cycles. "
+ r"Please remove this method.",
mapper, A, self.fixture()
)
assert_raises_message(
sa_exc.InvalidRequestError,
- "Don't know how to join from x; please use select_from\(\) to "
- "establish the left entity/selectable of this join",
+ r"Don't know how to join from x; please use select_from\(\) to "
+ r"establish the left entity/selectable of this join",
sess.query(literal_column('x'), User).join, Address
)
assert_raises_message(
sa_exc.InvalidRequestError,
- "No entities to join from; please use select_from\(\) to "
- "establish the left entity/selectable of this join",
+ r"No entities to join from; please use select_from\(\) to "
+ r"establish the left entity/selectable of this join",
sess.query().join, Address
)
assert_raises_message(
sa.orm.exc.NoResultFound,
- "No row was found for one\(\)",
+ r"No row was found for one\(\)",
sess.query(User).filter(User.id == 99).one)
eq_(sess.query(User).filter(User.id == 7).one().id, 7)
assert_raises_message(
sa.orm.exc.MultipleResultsFound,
- "Multiple rows were found for one\(\)",
+ r"Multiple rows were found for one\(\)",
sess.query(User).one)
assert_raises(
assert_raises_message(
sa.orm.exc.MultipleResultsFound,
- "Multiple rows were found for one_or_none\(\)",
+ r"Multiple rows were found for one_or_none\(\)",
sess.query(User).one_or_none)
eq_(sess.query(User.id, User.name).filter(User.id == 99).one_or_none(), None)
assert_raises_message(
exc.SAWarning,
r"relationship .* will copy column .* to column "
- "employee_t.company_id, which conflicts with relationship\(s\)",
+ r"employee_t.company_id, which conflicts with relationship\(s\)",
configure_mappers
)
assert_raises_message(sa.exc.ArgumentError,
r"reverse_property 'dingaling' on relationship "
- "User.addresses references "
- "relationship Address.dingaling, which does not "
- "reference mapper Mapper\|User\|users",
+ r"User.addresses references "
+ r"relationship Address.dingaling, which does not "
+ r"reference mapper Mapper\|User\|users",
configure_mappers)
Address = self.classes.Address
def evt(mapper, conn, instance):
object_session(instance).add(Address(email='x1'))
- self._test(evt, "Session.add\(\)")
+ self._test(evt, r"Session.add\(\)")
def test_plain_merge(self):
Address = self.classes.Address
def evt(mapper, conn, instance):
object_session(instance).merge(Address(email='x1'))
- self._test(evt, "Session.merge\(\)")
+ self._test(evt, r"Session.merge\(\)")
def test_plain_delete(self):
Address = self.classes.Address
def evt(mapper, conn, instance):
object_session(instance).delete(Address(email='x1'))
- self._test(evt, "Session.delete\(\)")
+ self._test(evt, r"Session.delete\(\)")
def _test(self, fn, method):
User = self.classes.User
trans2.rollback()
assert_raises_message(
sa_exc.InvalidRequestError,
- "This Session's transaction has been rolled back by a nested "
- "rollback\(\) call. To begin a new transaction, issue "
- "Session.rollback\(\) first.",
+ r"This Session's transaction has been rolled back by a nested "
+ r"rollback\(\) call. To begin a new transaction, issue "
+ r"Session.rollback\(\) first.",
trans.commit
)
trans2.rollback(_capture_exception=True)
assert_raises_message(
sa_exc.InvalidRequestError,
- "This Session's transaction has been rolled back due to a "
- "previous exception during flush. To begin a new transaction "
- "with this Session, first issue Session.rollback\(\). "
- "Original exception was: test",
+ r"This Session's transaction has been rolled back due to a "
+ r"previous exception during flush. To begin a new transaction "
+ r"with this Session, first issue Session.rollback\(\). "
+ r"Original exception was: test",
trans.commit
)
p1.data = 3
assert_raises_message(
orm_exc.StaleDataError,
- "UPDATE statement on table 'parent' expected to "
- "update 1 row\(s\); 0 were matched.",
+ r"UPDATE statement on table 'parent' expected to "
+ r"update 1 row\(s\); 0 were matched.",
sess.flush
)
p1.data = 3
assert_raises_message(
orm_exc.StaleDataError,
- "UPDATE statement on table 'parent' expected to "
- "update 1 row\(s\); 0 were matched.",
+ r"UPDATE statement on table 'parent' expected to "
+ r"update 1 row\(s\); 0 were matched.",
sess.flush
)
p1.data = literal(1)
assert_raises_message(
orm_exc.StaleDataError,
- "UPDATE statement on table 'parent' expected to "
- "update 1 row\(s\); 0 were matched.",
+ r"UPDATE statement on table 'parent' expected to "
+ r"update 1 row\(s\); 0 were matched.",
sess.flush
)
assert_raises_message(
exc.SAWarning,
- "DELETE statement on table 'parent' expected to "
- "delete 1 row\(s\); 0 were matched.",
+ r"DELETE statement on table 'parent' expected to "
+ r"delete 1 row\(s\); 0 were matched.",
sess.commit
)
assert_raises_message(
exc.SAWarning,
- "DELETE statement on table 'parent' expected to "
- "delete 2 row\(s\); 0 were matched.",
+ r"DELETE statement on table 'parent' expected to "
+ r"delete 2 row\(s\); 0 were matched.",
sess.flush
)
assert_raises_message(
exc.InvalidRequestError,
r"Select objects don't have a type\. Call as_scalar\(\) "
- "on this Select object to return a 'scalar' "
- "version of this Select\.",
+ r"on this Select object to return a 'scalar' "
+ r"version of this Select\.",
func.coalesce, select([table1.c.myid])
)
return t2
assert_raises_message(
exc.ArgumentError,
- "Element Table\('t2', .* is not a string name or column element",
+ r"Element Table\('t2', .* is not a string name or column element",
Index, "foo", SomeClass()
)
assert_raises_message(
exc.InvalidRequestError,
- "Naming convention including \%\(constraint_name\)s token "
- "requires that constraint is explicitly named.",
+ r"Naming convention including \%\(constraint_name\)s token "
+ r"requires that constraint is explicitly named.",
schema.CreateTable(u1).compile, dialect=default.DefaultDialect()
)
a_eq(prep(r'select \foo'), r'select \foo')
a_eq(prep(r"time='12\:30:00'"), r"time='12\:30:00'")
- a_eq(prep(":this \:that"), "? :that")
+ a_eq(prep(r":this \:that"), "? :that")
a_eq(prep(r"(\:that$other)"), "(:that$other)")
a_eq(prep(r".\:that$ :other."), ".:that$ ?.")
with assertions.expect_warnings(
r"Number of columns in textual SQL \(4\) is "
- "smaller than number of columns requested \(2\)"):
+ r"smaller than number of columns requested \(2\)"):
result = testing.db.execute(stmt)
row = result.first()
)
assert_raises_message(
sa_exc.InvalidRequestError,
- "Can't call inserted_primary_key when returning\(\) is used.",
+ r"Can't call inserted_primary_key when returning\(\) is used.",
getattr, result, "inserted_primary_key"
)
def test_missing_bind_kw(self):
assert_raises_message(
exc.ArgumentError,
- "This text\(\) construct doesn't define a bound parameter named 'bar'",
+ r"This text\(\) construct doesn't define "
+ r"a bound parameter named 'bar'",
text(":foo").bindparams,
foo=5,
bar=7)
def test_missing_bind_posn(self):
assert_raises_message(
exc.ArgumentError,
- "This text\(\) construct doesn't define a bound parameter named 'bar'",
+ r"This text\(\) construct doesn't define "
+ r"a bound parameter named 'bar'",
text(":foo").bindparams,
bindparam(
'foo',
def test_escaping_colons(self):
# test escaping out text() params with a backslash
self.assert_compile(
- text("select * from foo where clock='05:06:07' "
- "and mork='\:mindy'"),
+ text(r"select * from foo where clock='05:06:07' "
+ r"and mork='\:mindy'"),
"select * from foo where clock='05:06:07' and mork=':mindy'",
checkparams={},
params={},
def test_escaping_double_colons(self):
self.assert_compile(
text(
- "SELECT * FROM pg_attribute WHERE "
- "attrelid = :tab\:\:regclass"),
+ r"SELECT * FROM pg_attribute WHERE "
+ r"attrelid = :tab\:\:regclass"),
"SELECT * FROM pg_attribute WHERE "
"attrelid = %(tab)s::regclass",
params={'tab': None},
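A standalone sketch (not part of the patch) of the escaping rule these tests cover: a backslash before a colon keeps ``text()`` from treating it as a bound parameter, and the backslash itself is dropped when the statement is compiled against the default dialect::

    from sqlalchemy import text

    stmt = text(
        r"select * from foo where clock='05:06:07' and mork='\:mindy'")

    # the escaped colon comes back as a literal colon, not a bind param
    print(str(stmt))
    # select * from foo where clock='05:06:07' and mork=':mindy'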
table1 = self.tables.mytable
testing.assert_raises_message(
ValueError,
- "When preserve_parameter_order is True, values\(\) "
- "only accepts a list of 2-tuples",
+ r"When preserve_parameter_order is True, values\(\) "
+ r"only accepts a list of 2-tuples",
table1.update(preserve_parameter_order=True).values,
{"description": "foo", "name": "bar"}
)
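A standalone sketch (not part of the patch; table and column names invented) of the accepted form: with ``preserve_parameter_order=True`` the SET clause keeps the given ordering, so ``values()`` takes a list of 2-tuples rather than a dict::

    from sqlalchemy import Column, MetaData, String, Table

    metadata = MetaData()
    mytable = Table(
        "mytable", metadata,
        Column("name", String(30)),
        Column("description", String(30)),
    )

    stmt = mytable.update(preserve_parameter_order=True).values(
        [("name", "bar"), ("description", "foo")])

    # renders SET name first, then description:
    # UPDATE mytable SET name=:name, description=:description
    print(str(stmt))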
[tox]
-envlist = py{26,27,34,35}-{cext,nocext}
+envlist = py{26,27,34,35,36}-{cext,nocext}
[testenv]
# note that we have a .coveragerc file that points coverage specifically
show-source = True
ignore = E711,E712,E721,N806,D
exclude=.venv,.git,.tox,dist,doc,*egg,build
-