--- /dev/null
+.. change::
+ :tags: change, sql
+ :tickets: 4632
+
+ The "threadlocal" execution strategy, deprecated in 1.3, has been
+ removed for 1.4, as well as the concept of "engine strategies" and the
+ ``Engine.contextual_connect`` method. The "strategy='mock'" keyword
+ argument is still accepted for now with a deprecation warning; use
+ :func:`.create_mock_engine` instead for this use case.
+
+ .. seealso::
+
+ :ref:`change_4393_threadlocal` - from the 1.3 migration notes which
+ discusses the rationale for deprecation.
\ No newline at end of file
.. autofunction:: sqlalchemy.engine_from_config
+.. autofunction:: sqlalchemy.create_mock_engine
+
.. autofunction:: sqlalchemy.engine.url.make_url
for t in ti:
print(t)
+.. _faq_ddl_as_string:
+
How can I get the CREATE TABLE/ DROP TABLE output as a string?
==============================================================
print(CreateTable(mytable).compile(engine))
-There's also a special form of :class:`.Engine` that can let you dump an entire
-metadata creation sequence, using this recipe::
+There's also a special form of :class:`.Engine` available via
+:func:`.create_mock_engine` that allows one to dump an entire
+metadata creation sequence as a string, using this recipe::
+
+ from sqlalchemy import create_mock_engine
def dump(sql, *multiparams, **params):
print(sql.compile(dialect=engine.dialect))
- engine = create_engine('postgresql://', strategy='mock', executor=dump)
+ engine = create_mock_engine('postgresql://', dump)
metadata.create_all(engine, checkfirst=False)
The `Alembic <https://alembic.sqlalchemy.org>`_ tool also supports
from .types import VARBINARY # noqa
from .types import VARCHAR # noqa
+# these are placed last because there are
+# cross dependencies between sqlalchemy.sql and
+# sqlalchemy.engine that cause import cycles
from .engine import create_engine # noqa nosort
from .engine import engine_from_config # noqa nosort
+from .engine import create_mock_engine # noqa nosort
__version__ = "1.4.0b1"
):
return MySQLDialect._show_create_table(
self,
- connection._contextual_connect(
+ connection.connect(
close_with_result=True
).execution_options(_oursql_plain_query=True),
table,
"entry point" class into this package is the Engine and its public
constructor ``create_engine()``.
-This package includes:
-
-base.py
- Defines interface classes and some implementation classes which
- comprise the basic components used to interface between a DB-API,
- constructed and plain-text statements, connections, transactions,
- and results.
-
-default.py
- Contains default implementations of some of the components defined
- in base.py. All current database dialects use the classes in
- default.py as base classes for their own database-specific
- implementations.
-
-strategies.py
- The mechanics of constructing ``Engine`` objects are represented
- here. Defines the ``EngineStrategy`` class which represents how
- to go from arguments specified to the ``create_engine()``
- function, to a fully constructed ``Engine``, including
- initialization of connection pooling, dialects, and specific
- subclasses of ``Engine``.
-
-threadlocal.py
- The ``TLEngine`` class is defined here, which is a subclass of
- the generic ``Engine`` and tracks ``Connection`` and
- ``Transaction`` objects against the identity of the current
- thread. This allows certain programming patterns based around
- the concept of a "thread-local connection" to be possible.
- The ``TLEngine`` is created by using the "threadlocal" engine
- strategy in conjunction with the ``create_engine()`` function.
-
-url.py
- Defines the ``URL`` class which represents the individual
- components of a string URL passed to ``create_engine()``. Also
- defines a basic module-loading strategy for the dialect specifier
- within a URL.
"""
-from . import strategies
from . import util # noqa
from .base import Connection # noqa
from .base import Engine # noqa
from .base import RootTransaction # noqa
from .base import Transaction # noqa
from .base import TwoPhaseTransaction # noqa
+from .create import create_engine
+from .create import engine_from_config
from .interfaces import Compiled # noqa
from .interfaces import Connectable # noqa
from .interfaces import CreateEnginePlugin # noqa
from .interfaces import ExceptionContext # noqa
from .interfaces import ExecutionContext # noqa
from .interfaces import TypeCompiler # noqa
+from .mock import create_mock_engine
from .result import BaseRowProxy # noqa
from .result import BufferedColumnResultProxy # noqa
from .result import BufferedColumnRow # noqa
from ..sql import ddl # noqa
-# backwards compat
-
-default_strategy = "plain"
-
-
-def create_engine(*args, **kwargs):
- """Create a new :class:`.Engine` instance.
-
- The standard calling form is to send the URL as the
- first positional argument, usually a string
- that indicates database dialect and connection arguments::
-
-
- engine = create_engine("postgresql://scott:tiger@localhost/test")
-
- Additional keyword arguments may then follow it which
- establish various options on the resulting :class:`.Engine`
- and its underlying :class:`.Dialect` and :class:`.Pool`
- constructs::
-
- engine = create_engine("mysql://scott:tiger@hostname/dbname",
- encoding='latin1', echo=True)
-
- The string form of the URL is
- ``dialect[+driver]://user:password@host/dbname[?key=value..]``, where
- ``dialect`` is a database name such as ``mysql``, ``oracle``,
- ``postgresql``, etc., and ``driver`` the name of a DBAPI, such as
- ``psycopg2``, ``pyodbc``, ``cx_oracle``, etc. Alternatively,
- the URL can be an instance of :class:`~sqlalchemy.engine.url.URL`.
-
- ``**kwargs`` takes a wide variety of options which are routed
- towards their appropriate components. Arguments may be specific to
- the :class:`.Engine`, the underlying :class:`.Dialect`, as well as the
- :class:`.Pool`. Specific dialects also accept keyword arguments that
- are unique to that dialect. Here, we describe the parameters
- that are common to most :func:`.create_engine()` usage.
-
- Once established, the newly resulting :class:`.Engine` will
- request a connection from the underlying :class:`.Pool` once
- :meth:`.Engine.connect` is called, or a method which depends on it
- such as :meth:`.Engine.execute` is invoked. The :class:`.Pool` in turn
- will establish the first actual DBAPI connection when this request
- is received. The :func:`.create_engine` call itself does **not**
- establish any actual DBAPI connections directly.
-
- .. seealso::
-
- :doc:`/core/engines`
-
- :doc:`/dialects/index`
-
- :ref:`connections_toplevel`
-
- :param case_sensitive=True: if False, result column names
- will match in a case-insensitive fashion, that is,
- ``row['SomeColumn']``.
-
- :param connect_args: a dictionary of options which will be
- passed directly to the DBAPI's ``connect()`` method as
- additional keyword arguments. See the example
- at :ref:`custom_dbapi_args`.
-
- :param convert_unicode=False: if set to True, causes
- all :class:`.String` datatypes to act as though the
- :paramref:`.String.convert_unicode` flag has been set to ``True``,
- regardless of a setting of ``False`` on an individual :class:`.String`
- type. This has the effect of causing all :class:`.String` -based
- columns to accommodate Python Unicode objects directly as though the
- datatype were the :class:`.Unicode` type.
-
- .. deprecated:: 1.3
-
- The :paramref:`.create_engine.convert_unicode` parameter
- is deprecated and will be removed in a future release.
- All modern DBAPIs now support Python Unicode directly and this
- parameter is unnecessary.
-
- :param creator: a callable which returns a DBAPI connection.
- This creation function will be passed to the underlying
- connection pool and will be used to create all new database
- connections. Usage of this function causes connection
- parameters specified in the URL argument to be bypassed.
-
- :param echo=False: if True, the Engine will log all statements
- as well as a ``repr()`` of their parameter lists to the default log
- handler, which defaults to ``sys.stdout`` for output. If set to the
- string ``"debug"``, result rows will be printed to the standard output
- as well. The ``echo`` attribute of ``Engine`` can be modified at any
- time to turn logging on and off; direct control of logging is also
- available using the standard Python ``logging`` module.
-
- .. seealso::
-
- :ref:`dbengine_logging` - further detail on how to configure
- logging.
-
- :param echo_pool=False: if True, the connection pool will log
- informational output such as when connections are invalidated
- as well as when connections are recycled to the default log handler,
- which defaults to ``sys.stdout`` for output. If set to the string
- ``"debug"``, the logging will include pool checkouts and checkins.
- Direct control of logging is also available using the standard Python
- ``logging`` module.
-
- .. seealso::
-
- :ref:`dbengine_logging` - further detail on how to configure
- logging.
-
-
- :param empty_in_strategy: The SQL compilation strategy to use when
- rendering an IN or NOT IN expression for :meth:`.ColumnOperators.in_`
- where the right-hand side
- is an empty set. This is a string value that may be one of
- ``static``, ``dynamic``, or ``dynamic_warn``. The ``static``
- strategy is the default, and an IN comparison to an empty set
- will generate a simple false expression "1 != 1". The ``dynamic``
- strategy behaves like that of SQLAlchemy 1.1 and earlier, emitting
- a false expression of the form "expr != expr", which has the effect
- of evaluting to NULL in the case of a null expression.
- ``dynamic_warn`` is the same as ``dynamic``, however also emits a
- warning when an empty set is encountered; this because the "dynamic"
- comparison is typically poorly performing on most databases.
-
- .. versionadded:: 1.2 Added the ``empty_in_strategy`` setting and
- additionally defaulted the behavior for empty-set IN comparisons
- to a static boolean expression.
-
- :param encoding: Defaults to ``utf-8``. This is the string
- encoding used by SQLAlchemy for string encode/decode
- operations which occur within SQLAlchemy, **outside of
- the DBAPI.** Most modern DBAPIs feature some degree of
- direct support for Python ``unicode`` objects,
- what you see in Python 2 as a string of the form
- ``u'some string'``. For those scenarios where the
- DBAPI is detected as not supporting a Python ``unicode``
- object, this encoding is used to determine the
- source/destination encoding. It is **not used**
- for those cases where the DBAPI handles unicode
- directly.
-
- To properly configure a system to accommodate Python
- ``unicode`` objects, the DBAPI should be
- configured to handle unicode to the greatest
- degree as is appropriate - see
- the notes on unicode pertaining to the specific
- target database in use at :ref:`dialect_toplevel`.
-
- Areas where string encoding may need to be accommodated
- outside of the DBAPI include zero or more of:
-
- * the values passed to bound parameters, corresponding to
- the :class:`.Unicode` type or the :class:`.String` type
- when ``convert_unicode`` is ``True``;
- * the values returned in result set columns corresponding
- to the :class:`.Unicode` type or the :class:`.String`
- type when ``convert_unicode`` is ``True``;
- * the string SQL statement passed to the DBAPI's
- ``cursor.execute()`` method;
- * the string names of the keys in the bound parameter
- dictionary passed to the DBAPI's ``cursor.execute()``
- as well as ``cursor.setinputsizes()`` methods;
- * the string column names retrieved from the DBAPI's
- ``cursor.description`` attribute.
-
- When using Python 3, the DBAPI is required to support
- *all* of the above values as Python ``unicode`` objects,
- which in Python 3 are just known as ``str``. In Python 2,
- the DBAPI does not specify unicode behavior at all,
- so SQLAlchemy must make decisions for each of the above
- values on a per-DBAPI basis - implementations are
- completely inconsistent in their behavior.
-
- :param execution_options: Dictionary execution options which will
- be applied to all connections. See
- :meth:`~sqlalchemy.engine.Connection.execution_options`
-
- :param implicit_returning=True: When ``True``, a RETURNING-
- compatible construct, if available, will be used to
- fetch newly generated primary key values when a single row
- INSERT statement is emitted with no existing returning()
- clause. This applies to those backends which support RETURNING
- or a compatible construct, including PostgreSQL, Firebird, Oracle,
- Microsoft SQL Server. Set this to ``False`` to disable
- the automatic usage of RETURNING.
-
- :param isolation_level: this string parameter is interpreted by various
- dialects in order to affect the transaction isolation level of the
- database connection. The parameter essentially accepts some subset of
- these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE_READ"``,
- ``"READ_COMMITTED"``, ``"READ_UNCOMMITTED"`` and ``"AUTOCOMMIT"``.
- Behavior here varies per backend, and
- individual dialects should be consulted directly.
-
- Note that the isolation level can also be set on a
- per-:class:`.Connection` basis as well, using the
- :paramref:`.Connection.execution_options.isolation_level`
- feature.
-
- .. seealso::
-
- :attr:`.Connection.default_isolation_level` - view default level
-
- :paramref:`.Connection.execution_options.isolation_level`
- - set per :class:`.Connection` isolation level
-
- :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`
-
- :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`
-
- :ref:`MySQL Transaction Isolation <mysql_isolation_level>`
-
- :ref:`session_transaction_isolation` - for the ORM
-
- :param label_length=None: optional integer value which limits
- the size of dynamically generated column labels to that many
- characters. If less than 6, labels are generated as
- "_(counter)". If ``None``, the value of
- ``dialect.max_identifier_length`` is used instead.
-
- :param listeners: A list of one or more
- :class:`~sqlalchemy.interfaces.PoolListener` objects which will
- receive connection pool events.
-
- :param logging_name: String identifier which will be used within
- the "name" field of logging records generated within the
- "sqlalchemy.engine" logger. Defaults to a hexstring of the
- object's id.
-
- :param max_overflow=10: the number of connections to allow in
- connection pool "overflow", that is connections that can be
- opened above and beyond the pool_size setting, which defaults
- to five. this is only used with :class:`~sqlalchemy.pool.QueuePool`.
-
- :param module=None: reference to a Python module object (the module
- itself, not its string name). Specifies an alternate DBAPI module to
- be used by the engine's dialect. Each sub-dialect references a
- specific DBAPI which will be imported before first connect. This
- parameter causes the import to be bypassed, and the given module to
- be used instead. Can be used for testing of DBAPIs as well as to
- inject "mock" DBAPI implementations into the :class:`.Engine`.
-
- :param paramstyle=None: The `paramstyle <http://legacy.python.org/dev/peps/pep-0249/#paramstyle>`_
- to use when rendering bound parameters. This style defaults to the
- one recommended by the DBAPI itself, which is retrieved from the
- ``.paramstyle`` attribute of the DBAPI. However, most DBAPIs accept
- more than one paramstyle, and in particular it may be desirable
- to change a "named" paramstyle into a "positional" one, or vice versa.
- When this attribute is passed, it should be one of the values
- ``"qmark"``, ``"numeric"``, ``"named"``, ``"format"`` or
- ``"pyformat"``, and should correspond to a parameter style known
- to be supported by the DBAPI in use.
-
- :param pool=None: an already-constructed instance of
- :class:`~sqlalchemy.pool.Pool`, such as a
- :class:`~sqlalchemy.pool.QueuePool` instance. If non-None, this
- pool will be used directly as the underlying connection pool
- for the engine, bypassing whatever connection parameters are
- present in the URL argument. For information on constructing
- connection pools manually, see :ref:`pooling_toplevel`.
-
- :param poolclass=None: a :class:`~sqlalchemy.pool.Pool`
- subclass, which will be used to create a connection pool
- instance using the connection parameters given in the URL. Note
- this differs from ``pool`` in that you don't actually
- instantiate the pool in this case, you just indicate what type
- of pool to be used.
-
- :param pool_logging_name: String identifier which will be used within
- the "name" field of logging records generated within the
- "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
- id.
-
- :param pool_pre_ping: boolean, if True will enable the connection pool
- "pre-ping" feature that tests connections for liveness upon
- each checkout.
-
- .. versionadded:: 1.2
-
- .. seealso::
-
- :ref:`pool_disconnects_pessimistic`
-
- :param pool_size=5: the number of connections to keep open
- inside the connection pool. This used with
- :class:`~sqlalchemy.pool.QueuePool` as
- well as :class:`~sqlalchemy.pool.SingletonThreadPool`. With
- :class:`~sqlalchemy.pool.QueuePool`, a ``pool_size`` setting
- of 0 indicates no limit; to disable pooling, set ``poolclass`` to
- :class:`~sqlalchemy.pool.NullPool` instead.
-
- :param pool_recycle=-1: this setting causes the pool to recycle
- connections after the given number of seconds has passed. It
- defaults to -1, or no timeout. For example, setting to 3600
- means connections will be recycled after one hour. Note that
- MySQL in particular will disconnect automatically if no
- activity is detected on a connection for eight hours (although
- this is configurable with the MySQLDB connection itself and the
- server configuration as well).
-
- .. seealso::
-
- :ref:`pool_setting_recycle`
-
- :param pool_reset_on_return='rollback': set the
- :paramref:`.Pool.reset_on_return` parameter of the underlying
- :class:`.Pool` object, which can be set to the values
- ``"rollback"``, ``"commit"``, or ``None``.
-
- .. seealso::
-
- :paramref:`.Pool.reset_on_return`
-
- :param pool_timeout=30: number of seconds to wait before giving
- up on getting a connection from the pool. This is only used
- with :class:`~sqlalchemy.pool.QueuePool`.
-
- :param pool_use_lifo=False: use LIFO (last-in-first-out) when retrieving
- connections from :class:`.QueuePool` instead of FIFO
- (first-in-first-out). Using LIFO, a server-side timeout scheme can
- reduce the number of connections used during non- peak periods of
- use. When planning for server-side timeouts, ensure that a recycle or
- pre-ping strategy is in use to gracefully handle stale connections.
-
- .. versionadded:: 1.3
-
- .. seealso::
-
- :ref:`pool_use_lifo`
-
- :ref:`pool_disconnects`
-
- :param plugins: string list of plugin names to load. See
- :class:`.CreateEnginePlugin` for background.
-
- .. versionadded:: 1.2.3
-
- :param strategy='plain': selects alternate engine implementations.
- Currently available are:
-
- * the ``threadlocal`` strategy, which is described in
- :ref:`threadlocal_strategy`;
- * the ``mock`` strategy, which dispatches all statement
- execution to a function passed as the argument ``executor``.
- See `example in the FAQ
- <http://docs.sqlalchemy.org/en/latest/faq/metadata_schema.html#how-can-i-get-the-create-table-drop-table-output-as-a-string>`_.
-
- :param executor=None: a function taking arguments
- ``(sql, *multiparams, **params)``, to which the ``mock`` strategy will
- dispatch all statement execution. Used only by ``strategy='mock'``.
-
- """ # noqa
-
- strategy = kwargs.pop("strategy", default_strategy)
- strategy = strategies.strategies[strategy]
- return strategy.create(*args, **kwargs)
-
-
-def engine_from_config(configuration, prefix="sqlalchemy.", **kwargs):
- """Create a new Engine instance using a configuration dictionary.
-
- The dictionary is typically produced from a config file.
-
- The keys of interest to ``engine_from_config()`` should be prefixed, e.g.
- ``sqlalchemy.url``, ``sqlalchemy.echo``, etc. The 'prefix' argument
- indicates the prefix to be searched for. Each matching key (after the
- prefix is stripped) is treated as though it were the corresponding keyword
- argument to a :func:`.create_engine` call.
-
- The only required key is (assuming the default prefix) ``sqlalchemy.url``,
- which provides the :ref:`database URL <database_urls>`.
-
- A select set of keyword arguments will be "coerced" to their
- expected type based on string values. The set of arguments
- is extensible per-dialect using the ``engine_config_types`` accessor.
-
- :param configuration: A dictionary (typically produced from a config file,
- but this is not a requirement). Items whose keys start with the value
- of 'prefix' will have that prefix stripped, and will then be passed to
- :ref:`create_engine`.
-
- :param prefix: Prefix to match and then strip from keys
- in 'configuration'.
-
- :param kwargs: Each keyword argument to ``engine_from_config()`` itself
- overrides the corresponding item taken from the 'configuration'
- dictionary. Keyword arguments should *not* be prefixed.
-
- """
-
- options = dict(
- (key[len(prefix) :], configuration[key])
- for key in configuration
- if key.startswith(prefix)
- )
- options["_coerce_config"] = True
- options.update(kwargs)
- url = options.pop("url")
- return create_engine(url, **options)
-
-
-__all__ = ("create_engine", "engine_from_config")
+__all__ = ("create_engine", "engine_from_config", "create_mock_engine")
):
"""Construct a new Connection.
- The constructor here is not public and is only called only by an
- :class:`.Engine`. See :meth:`.Engine.connect` and
- :meth:`.Engine.contextual_connect` methods.
-
"""
self.engine = engine
self.dialect = engine.dialect
primary key of a row where we need to get the value back, so we have
to invoke it distinctly - this is a very uncommon case.
- Userland code accesses _branch() when the connect() or
- contextual_connect() methods are called. The branched connection
+ Userland code accesses _branch() when the connect()
+ method is called. The branched connection
acts as much as possible like the parent, except that it stays
connected when a close() event occurs.
return self.connection.info
- def connect(self):
+ def connect(self, close_with_result=False):
"""Returns a branched version of this :class:`.Connection`.
The :meth:`.Connection.close` method on the returned
return self._branch()
- def _contextual_connect(self, **kwargs):
- return self._branch()
-
def invalidate(self, exception=None):
"""Invalidate the underlying DBAPI connection associated with
this :class:`.Connection`.
self.dispatch.engine_disposed(self)
def _execute_default(self, default):
- with self._contextual_connect() as conn:
+ with self.connect() as conn:
return conn._execute_default(default, (), {})
@contextlib.contextmanager
def _optional_conn_ctx_manager(self, connection=None):
if connection is None:
- with self._contextual_connect() as conn:
+ with self.connect() as conn:
yield conn
else:
yield connection
for a particular :class:`.Connection`.
"""
- conn = self._contextual_connect(close_with_result=close_with_result)
+ conn = self.connect(close_with_result=close_with_result)
try:
trans = conn.begin()
except:
r"""Execute the given function within a transaction boundary.
The function is passed a :class:`.Connection` newly procured
- from :meth:`.Engine.contextual_connect` as the first argument,
+ from :meth:`.Engine.connect` as the first argument,
followed by the given \*args and \**kwargs.
e.g.::
"""
- with self._contextual_connect() as conn:
+ with self.connect() as conn:
return conn.transaction(callable_, *args, **kwargs)
def run_callable(self, callable_, *args, **kwargs):
which one is being dealt with.
"""
- with self._contextual_connect() as conn:
+ with self.connect() as conn:
return conn.run_callable(callable_, *args, **kwargs)
def execute(self, statement, *multiparams, **params):
:meth:`.Connection.execute`.
Here, a :class:`.Connection` is acquired using the
- :meth:`~.Engine.contextual_connect` method, and the statement executed
+ :meth:`~.Engine.connect` method, and the statement executed
with that connection. The returned :class:`.ResultProxy` is flagged
such that when the :class:`.ResultProxy` is exhausted and its
underlying cursor is closed, the :class:`.Connection` created here
"""
- connection = self._contextual_connect(close_with_result=True)
+ connection = self.connect(close_with_result=True)
return connection.execute(statement, *multiparams, **params)
def scalar(self, statement, *multiparams, **params):
return self.execute(statement, *multiparams, **params).scalar()
def _execute_clauseelement(self, elem, multiparams=None, params=None):
- connection = self._contextual_connect(close_with_result=True)
+ connection = self.connect(close_with_result=True)
return connection._execute_clauseelement(elem, multiparams, params)
def _execute_compiled(self, compiled, multiparams, params):
- connection = self._contextual_connect(close_with_result=True)
+ connection = self.connect(close_with_result=True)
return connection._execute_compiled(compiled, multiparams, params)
- def connect(self, **kwargs):
+ def connect(self, close_with_result=False):
"""Return a new :class:`.Connection` object.
The :class:`.Connection` object is a facade that uses a DBAPI
"""
- return self._connection_cls(self, **kwargs)
-
- @util.deprecated(
- "1.3",
- "The :meth:`.Engine.contextual_connect` method is deprecated. This "
- "method is an artifact of the threadlocal engine strategy which is "
- "also to be deprecated. For explicit connections from an "
- ":class:`.Engine`, use the :meth:`.Engine.connect` method.",
- )
- def contextual_connect(self, close_with_result=False, **kwargs):
- """Return a :class:`.Connection` object which may be part of some
- ongoing context.
-
- By default, this method does the same thing as :meth:`.Engine.connect`.
- Subclasses of :class:`.Engine` may override this method
- to provide contextual behavior.
-
- :param close_with_result: When True, the first :class:`.ResultProxy`
- created by the :class:`.Connection` will call the
- :meth:`.Connection.close` method of that connection as soon as any
- pending result rows are exhausted. This is used to supply the
- "connectionless execution" behavior provided by the
- :meth:`.Engine.execute` method.
-
- """
-
- return self._contextual_connect(
- close_with_result=close_with_result, **kwargs
- )
-
- def _contextual_connect(self, close_with_result=False, **kwargs):
- return self._connection_cls(
- self,
- self._wrap_pool_connect(self.pool.connect, None),
- close_with_result=close_with_result,
- **kwargs
- )
+ return self._connection_cls(self, close_with_result=close_with_result)
def table_names(self, schema=None, connection=None):
"""Return a list of all table names available in the database.
:param schema: Optional, retrieve names from a non-default schema.
- :param connection: Optional, use a specified connection. Default is
- the ``contextual_connect`` for this ``Engine``.
+ :param connection: Optional, use a specified connection.
"""
with self._optional_conn_ctx_manager(connection) as conn:
:ref:`dbapi_connections`
"""
- return self._wrap_pool_connect(
- self.pool.unique_connection, _connection
- )
+ return self._wrap_pool_connect(self.pool.connect, _connection)
class OptionEngine(Engine):
--- /dev/null
+# engine/create.py
+# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+
+from . import base
+from . import url as _url
+from .mock import create_mock_engine
+from .. import event
+from .. import exc
+from .. import pool as poollib
+from .. import util
+
+
+@util.deprecated_params(
+ strategy=(
+ "1.4",
+ "The :paramref:`.create_engine.strategy` keyword is deprecated, "
+ "and the only argument accepted is 'mock'; please use "
+ ":func:`.create_mock_engine` going forward. For general "
+ "customization of create_engine which may have been accomplished "
+ "using strategies, see :class:`.CreateEnginePlugin`.",
+ )
+)
+def create_engine(url, **kwargs):
+ """Create a new :class:`.Engine` instance.
+
+ The standard calling form is to send the URL as the
+ first positional argument, usually a string
+ that indicates database dialect and connection arguments::
+
+
+ engine = create_engine("postgresql://scott:tiger@localhost/test")
+
+ Additional keyword arguments may then follow it which
+ establish various options on the resulting :class:`.Engine`
+ and its underlying :class:`.Dialect` and :class:`.Pool`
+ constructs::
+
+ engine = create_engine("mysql://scott:tiger@hostname/dbname",
+ encoding='latin1', echo=True)
+
+ The string form of the URL is
+ ``dialect[+driver]://user:password@host/dbname[?key=value..]``, where
+ ``dialect`` is a database name such as ``mysql``, ``oracle``,
+ ``postgresql``, etc., and ``driver`` the name of a DBAPI, such as
+ ``psycopg2``, ``pyodbc``, ``cx_oracle``, etc. Alternatively,
+ the URL can be an instance of :class:`~sqlalchemy.engine.url.URL`.
+
+ ``**kwargs`` takes a wide variety of options which are routed
+ towards their appropriate components. Arguments may be specific to
+ the :class:`.Engine`, the underlying :class:`.Dialect`, as well as the
+ :class:`.Pool`. Specific dialects also accept keyword arguments that
+ are unique to that dialect. Here, we describe the parameters
+ that are common to most :func:`.create_engine()` usage.
+
+ Once established, the newly resulting :class:`.Engine` will
+ request a connection from the underlying :class:`.Pool` once
+ :meth:`.Engine.connect` is called, or a method which depends on it
+ such as :meth:`.Engine.execute` is invoked. The :class:`.Pool` in turn
+ will establish the first actual DBAPI connection when this request
+ is received. The :func:`.create_engine` call itself does **not**
+ establish any actual DBAPI connections directly.
+
+ .. seealso::
+
+ :doc:`/core/engines`
+
+ :doc:`/dialects/index`
+
+ :ref:`connections_toplevel`
+
+ :param case_sensitive=True: if False, result column names
+ will match in a case-insensitive fashion, that is,
+ ``row['SomeColumn']``.
+
+ :param connect_args: a dictionary of options which will be
+ passed directly to the DBAPI's ``connect()`` method as
+ additional keyword arguments. See the example
+ at :ref:`custom_dbapi_args`.
+
+ :param convert_unicode=False: if set to True, causes
+ all :class:`.String` datatypes to act as though the
+ :paramref:`.String.convert_unicode` flag has been set to ``True``,
+ regardless of a setting of ``False`` on an individual :class:`.String`
+ type. This has the effect of causing all :class:`.String` -based
+ columns to accommodate Python Unicode objects directly as though the
+ datatype were the :class:`.Unicode` type.
+
+ .. deprecated:: 1.3
+
+ The :paramref:`.create_engine.convert_unicode` parameter
+ is deprecated and will be removed in a future release.
+ All modern DBAPIs now support Python Unicode directly and this
+ parameter is unnecessary.
+
+ :param creator: a callable which returns a DBAPI connection.
+ This creation function will be passed to the underlying
+ connection pool and will be used to create all new database
+ connections. Usage of this function causes connection
+ parameters specified in the URL argument to be bypassed.
+
+ :param echo=False: if True, the Engine will log all statements
+ as well as a ``repr()`` of their parameter lists to the default log
+ handler, which defaults to ``sys.stdout`` for output. If set to the
+ string ``"debug"``, result rows will be printed to the standard output
+ as well. The ``echo`` attribute of ``Engine`` can be modified at any
+ time to turn logging on and off; direct control of logging is also
+ available using the standard Python ``logging`` module.
+
+ .. seealso::
+
+ :ref:`dbengine_logging` - further detail on how to configure
+ logging.
+
+ :param echo_pool=False: if True, the connection pool will log
+ informational output such as when connections are invalidated
+ as well as when connections are recycled to the default log handler,
+ which defaults to ``sys.stdout`` for output. If set to the string
+ ``"debug"``, the logging will include pool checkouts and checkins.
+ Direct control of logging is also available using the standard Python
+ ``logging`` module.
+
+ .. seealso::
+
+ :ref:`dbengine_logging` - further detail on how to configure
+ logging.
+
+
+ :param empty_in_strategy: The SQL compilation strategy to use when
+ rendering an IN or NOT IN expression for :meth:`.ColumnOperators.in_`
+ where the right-hand side
+ is an empty set. This is a string value that may be one of
+ ``static``, ``dynamic``, or ``dynamic_warn``. The ``static``
+ strategy is the default, and an IN comparison to an empty set
+ will generate a simple false expression "1 != 1". The ``dynamic``
+ strategy behaves like that of SQLAlchemy 1.1 and earlier, emitting
+ a false expression of the form "expr != expr", which has the effect
+ of evaluting to NULL in the case of a null expression.
+ ``dynamic_warn`` is the same as ``dynamic``, however also emits a
+ warning when an empty set is encountered; this because the "dynamic"
+ comparison is typically poorly performing on most databases.
+
+ .. versionadded:: 1.2 Added the ``empty_in_strategy`` setting and
+ additionally defaulted the behavior for empty-set IN comparisons
+ to a static boolean expression.
+
+ :param encoding: Defaults to ``utf-8``. This is the string
+ encoding used by SQLAlchemy for string encode/decode
+ operations which occur within SQLAlchemy, **outside of
+ the DBAPI.** Most modern DBAPIs feature some degree of
+ direct support for Python ``unicode`` objects,
+ what you see in Python 2 as a string of the form
+ ``u'some string'``. For those scenarios where the
+ DBAPI is detected as not supporting a Python ``unicode``
+ object, this encoding is used to determine the
+ source/destination encoding. It is **not used**
+ for those cases where the DBAPI handles unicode
+ directly.
+
+ To properly configure a system to accommodate Python
+ ``unicode`` objects, the DBAPI should be
+ configured to handle unicode to the greatest
+ degree as is appropriate - see
+ the notes on unicode pertaining to the specific
+ target database in use at :ref:`dialect_toplevel`.
+
+ Areas where string encoding may need to be accommodated
+ outside of the DBAPI include zero or more of:
+
+ * the values passed to bound parameters, corresponding to
+ the :class:`.Unicode` type or the :class:`.String` type
+ when ``convert_unicode`` is ``True``;
+ * the values returned in result set columns corresponding
+ to the :class:`.Unicode` type or the :class:`.String`
+ type when ``convert_unicode`` is ``True``;
+ * the string SQL statement passed to the DBAPI's
+ ``cursor.execute()`` method;
+ * the string names of the keys in the bound parameter
+ dictionary passed to the DBAPI's ``cursor.execute()``
+ as well as ``cursor.setinputsizes()`` methods;
+ * the string column names retrieved from the DBAPI's
+ ``cursor.description`` attribute.
+
+ When using Python 3, the DBAPI is required to support
+ *all* of the above values as Python ``unicode`` objects,
+ which in Python 3 are just known as ``str``. In Python 2,
+ the DBAPI does not specify unicode behavior at all,
+ so SQLAlchemy must make decisions for each of the above
+ values on a per-DBAPI basis - implementations are
+ completely inconsistent in their behavior.
+
+ :param execution_options: Dictionary execution options which will
+ be applied to all connections. See
+ :meth:`~sqlalchemy.engine.Connection.execution_options`
+
+ :param implicit_returning=True: When ``True``, a RETURNING-
+ compatible construct, if available, will be used to
+ fetch newly generated primary key values when a single row
+ INSERT statement is emitted with no existing returning()
+ clause. This applies to those backends which support RETURNING
+ or a compatible construct, including PostgreSQL, Firebird, Oracle,
+ Microsoft SQL Server. Set this to ``False`` to disable
+ the automatic usage of RETURNING.
+
+ :param isolation_level: this string parameter is interpreted by various
+ dialects in order to affect the transaction isolation level of the
+ database connection. The parameter essentially accepts some subset of
+ these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE_READ"``,
+ ``"READ_COMMITTED"``, ``"READ_UNCOMMITTED"`` and ``"AUTOCOMMIT"``.
+ Behavior here varies per backend, and
+ individual dialects should be consulted directly.
+
+ Note that the isolation level can also be set on a
+ per-:class:`.Connection` basis as well, using the
+ :paramref:`.Connection.execution_options.isolation_level`
+ feature.
+
+ .. seealso::
+
+ :attr:`.Connection.default_isolation_level` - view default level
+
+ :paramref:`.Connection.execution_options.isolation_level`
+ - set per :class:`.Connection` isolation level
+
+ :ref:`SQLite Transaction Isolation <sqlite_isolation_level>`
+
+ :ref:`PostgreSQL Transaction Isolation <postgresql_isolation_level>`
+
+ :ref:`MySQL Transaction Isolation <mysql_isolation_level>`
+
+ :ref:`session_transaction_isolation` - for the ORM
+
+ :param label_length=None: optional integer value which limits
+ the size of dynamically generated column labels to that many
+ characters. If less than 6, labels are generated as
+ "_(counter)". If ``None``, the value of
+ ``dialect.max_identifier_length`` is used instead.
+
+ :param listeners: A list of one or more
+ :class:`~sqlalchemy.interfaces.PoolListener` objects which will
+ receive connection pool events.
+
+ :param logging_name: String identifier which will be used within
+ the "name" field of logging records generated within the
+ "sqlalchemy.engine" logger. Defaults to a hexstring of the
+ object's id.
+
+ :param max_overflow=10: the number of connections to allow in
+ connection pool "overflow", that is connections that can be
+ opened above and beyond the pool_size setting, which defaults
+ to five. this is only used with :class:`~sqlalchemy.pool.QueuePool`.
+
+ :param module=None: reference to a Python module object (the module
+ itself, not its string name). Specifies an alternate DBAPI module to
+ be used by the engine's dialect. Each sub-dialect references a
+ specific DBAPI which will be imported before first connect. This
+ parameter causes the import to be bypassed, and the given module to
+ be used instead. Can be used for testing of DBAPIs as well as to
+ inject "mock" DBAPI implementations into the :class:`.Engine`.
+
+ :param paramstyle=None: The `paramstyle <http://legacy.python.org/dev/peps/pep-0249/#paramstyle>`_
+ to use when rendering bound parameters. This style defaults to the
+ one recommended by the DBAPI itself, which is retrieved from the
+ ``.paramstyle`` attribute of the DBAPI. However, most DBAPIs accept
+ more than one paramstyle, and in particular it may be desirable
+ to change a "named" paramstyle into a "positional" one, or vice versa.
+ When this attribute is passed, it should be one of the values
+ ``"qmark"``, ``"numeric"``, ``"named"``, ``"format"`` or
+ ``"pyformat"``, and should correspond to a parameter style known
+ to be supported by the DBAPI in use.
+
+ :param pool=None: an already-constructed instance of
+ :class:`~sqlalchemy.pool.Pool`, such as a
+ :class:`~sqlalchemy.pool.QueuePool` instance. If non-None, this
+ pool will be used directly as the underlying connection pool
+ for the engine, bypassing whatever connection parameters are
+ present in the URL argument. For information on constructing
+ connection pools manually, see :ref:`pooling_toplevel`.
+
+ :param poolclass=None: a :class:`~sqlalchemy.pool.Pool`
+ subclass, which will be used to create a connection pool
+ instance using the connection parameters given in the URL. Note
+ this differs from ``pool`` in that you don't actually
+ instantiate the pool in this case, you just indicate what type
+ of pool to be used.
+
+ :param pool_logging_name: String identifier which will be used within
+ the "name" field of logging records generated within the
+ "sqlalchemy.pool" logger. Defaults to a hexstring of the object's
+ id.
+
+ :param pool_pre_ping: boolean, if True will enable the connection pool
+ "pre-ping" feature that tests connections for liveness upon
+ each checkout.
+
+ .. versionadded:: 1.2
+
+ .. seealso::
+
+ :ref:`pool_disconnects_pessimistic`
+
+ :param pool_size=5: the number of connections to keep open
+ inside the connection pool. This used with
+ :class:`~sqlalchemy.pool.QueuePool` as
+ well as :class:`~sqlalchemy.pool.SingletonThreadPool`. With
+ :class:`~sqlalchemy.pool.QueuePool`, a ``pool_size`` setting
+ of 0 indicates no limit; to disable pooling, set ``poolclass`` to
+ :class:`~sqlalchemy.pool.NullPool` instead.
+
+ :param pool_recycle=-1: this setting causes the pool to recycle
+ connections after the given number of seconds has passed. It
+ defaults to -1, or no timeout. For example, setting to 3600
+ means connections will be recycled after one hour. Note that
+ MySQL in particular will disconnect automatically if no
+ activity is detected on a connection for eight hours (although
+ this is configurable with the MySQLDB connection itself and the
+ server configuration as well).
+
+ .. seealso::
+
+ :ref:`pool_setting_recycle`
+
+ :param pool_reset_on_return='rollback': set the
+ :paramref:`.Pool.reset_on_return` parameter of the underlying
+ :class:`.Pool` object, which can be set to the values
+ ``"rollback"``, ``"commit"``, or ``None``.
+
+ .. seealso::
+
+ :paramref:`.Pool.reset_on_return`
+
+ :param pool_timeout=30: number of seconds to wait before giving
+ up on getting a connection from the pool. This is only used
+ with :class:`~sqlalchemy.pool.QueuePool`.
+
+ :param pool_use_lifo=False: use LIFO (last-in-first-out) when retrieving
+ connections from :class:`.QueuePool` instead of FIFO
+ (first-in-first-out). Using LIFO, a server-side timeout scheme can
+ reduce the number of connections used during non- peak periods of
+ use. When planning for server-side timeouts, ensure that a recycle or
+ pre-ping strategy is in use to gracefully handle stale connections.
+
+ .. versionadded:: 1.3
+
+ .. seealso::
+
+ :ref:`pool_use_lifo`
+
+ :ref:`pool_disconnects`
+
+ :param plugins: string list of plugin names to load. See
+ :class:`.CreateEnginePlugin` for background.
+
+ .. versionadded:: 1.2.3
+
+
+ """ # noqa
+
+ if "strategy" in kwargs:
+ strat = kwargs.pop("strategy")
+ if strat == "mock":
+ return create_mock_engine(url, **kwargs)
+ else:
+ raise exc.ArgumentError("unknown strategy: %r" % strat)
+
+ # create url.URL object
+ u = _url.make_url(url)
+
+ plugins = u._instantiate_plugins(kwargs)
+
+ u.query.pop("plugin", None)
+ kwargs.pop("plugins", None)
+
+ entrypoint = u._get_entrypoint()
+ dialect_cls = entrypoint.get_dialect_cls(u)
+
+ if kwargs.pop("_coerce_config", False):
+
+ def pop_kwarg(key, default=None):
+ value = kwargs.pop(key, default)
+ if key in dialect_cls.engine_config_types:
+ value = dialect_cls.engine_config_types[key](value)
+ return value
+
+ else:
+ pop_kwarg = kwargs.pop
+
+ dialect_args = {}
+ # consume dialect arguments from kwargs
+ for k in util.get_cls_kwargs(dialect_cls):
+ if k in kwargs:
+ dialect_args[k] = pop_kwarg(k)
+
+ dbapi = kwargs.pop("module", None)
+ if dbapi is None:
+ dbapi_args = {}
+ for k in util.get_func_kwargs(dialect_cls.dbapi):
+ if k in kwargs:
+ dbapi_args[k] = pop_kwarg(k)
+ dbapi = dialect_cls.dbapi(**dbapi_args)
+
+ dialect_args["dbapi"] = dbapi
+
+ for plugin in plugins:
+ plugin.handle_dialect_kwargs(dialect_cls, dialect_args)
+
+ # create dialect
+ dialect = dialect_cls(**dialect_args)
+
+ # assemble connection arguments
+ (cargs, cparams) = dialect.create_connect_args(u)
+ cparams.update(pop_kwarg("connect_args", {}))
+ cargs = list(cargs) # allow mutability
+
+ # look for existing pool or create
+ pool = pop_kwarg("pool", None)
+ if pool is None:
+
+ def connect(connection_record=None):
+ if dialect._has_events:
+ for fn in dialect.dispatch.do_connect:
+ connection = fn(dialect, connection_record, cargs, cparams)
+ if connection is not None:
+ return connection
+ return dialect.connect(*cargs, **cparams)
+
+ creator = pop_kwarg("creator", connect)
+
+ poolclass = pop_kwarg("poolclass", None)
+ if poolclass is None:
+ poolclass = dialect_cls.get_pool_class(u)
+ pool_args = {"dialect": dialect}
+
+ # consume pool arguments from kwargs, translating a few of
+ # the arguments
+ translate = {
+ "logging_name": "pool_logging_name",
+ "echo": "echo_pool",
+ "timeout": "pool_timeout",
+ "recycle": "pool_recycle",
+ "events": "pool_events",
+ "reset_on_return": "pool_reset_on_return",
+ "pre_ping": "pool_pre_ping",
+ "use_lifo": "pool_use_lifo",
+ }
+ for k in util.get_cls_kwargs(poolclass):
+ tk = translate.get(k, k)
+ if tk in kwargs:
+ pool_args[k] = pop_kwarg(tk)
+
+ for plugin in plugins:
+ plugin.handle_pool_kwargs(poolclass, pool_args)
+
+ pool = poolclass(creator, **pool_args)
+ else:
+ if isinstance(pool, poollib.dbapi_proxy._DBProxy):
+ pool = pool.get_pool(*cargs, **cparams)
+ else:
+ pool = pool
+
+ pool._dialect = dialect
+
+ # create engine.
+ engineclass = base.Engine
+ engine_args = {}
+ for k in util.get_cls_kwargs(engineclass):
+ if k in kwargs:
+ engine_args[k] = pop_kwarg(k)
+
+ _initialize = kwargs.pop("_initialize", True)
+
+ # all kwargs should be consumed
+ if kwargs:
+ raise TypeError(
+ "Invalid argument(s) %s sent to create_engine(), "
+ "using configuration %s/%s/%s. Please check that the "
+ "keyword arguments are appropriate for this combination "
+ "of components."
+ % (
+ ",".join("'%s'" % k for k in kwargs),
+ dialect.__class__.__name__,
+ pool.__class__.__name__,
+ engineclass.__name__,
+ )
+ )
+
+ engine = engineclass(pool, dialect, u, **engine_args)
+
+ if _initialize:
+ do_on_connect = dialect.on_connect()
+ if do_on_connect:
+
+ def on_connect(dbapi_connection, connection_record):
+ conn = getattr(
+ dbapi_connection, "_sqla_unwrap", dbapi_connection
+ )
+ if conn is None:
+ return
+ do_on_connect(conn)
+
+ event.listen(pool, "first_connect", on_connect)
+ event.listen(pool, "connect", on_connect)
+
+ def first_connect(dbapi_connection, connection_record):
+ c = base.Connection(
+ engine, connection=dbapi_connection, _has_events=False
+ )
+ c._execution_options = util.immutabledict()
+ dialect.initialize(c)
+ dialect.do_rollback(c.connection)
+
+ event.listen(pool, "first_connect", first_connect, once=True)
+
+ dialect_cls.engine_created(engine)
+ if entrypoint is not dialect_cls:
+ entrypoint.engine_created(engine)
+
+ for plugin in plugins:
+ plugin.engine_created(engine)
+
+ return engine
+
+
+def engine_from_config(configuration, prefix="sqlalchemy.", **kwargs):
+ """Create a new Engine instance using a configuration dictionary.
+
+ The dictionary is typically produced from a config file.
+
+ The keys of interest to ``engine_from_config()`` should be prefixed, e.g.
+ ``sqlalchemy.url``, ``sqlalchemy.echo``, etc. The 'prefix' argument
+ indicates the prefix to be searched for. Each matching key (after the
+ prefix is stripped) is treated as though it were the corresponding keyword
+ argument to a :func:`.create_engine` call.
+
+ The only required key is (assuming the default prefix) ``sqlalchemy.url``,
+ which provides the :ref:`database URL <database_urls>`.
+
+ A select set of keyword arguments will be "coerced" to their
+ expected type based on string values. The set of arguments
+ is extensible per-dialect using the ``engine_config_types`` accessor.
+
+ :param configuration: A dictionary (typically produced from a config file,
+ but this is not a requirement). Items whose keys start with the value
+ of 'prefix' will have that prefix stripped, and will then be passed to
+ :ref:`create_engine`.
+
+ :param prefix: Prefix to match and then strip from keys
+ in 'configuration'.
+
+ :param kwargs: Each keyword argument to ``engine_from_config()`` itself
+ overrides the corresponding item taken from the 'configuration'
+ dictionary. Keyword arguments should *not* be prefixed.
+
+ """
+
+ options = dict(
+ (key[len(prefix) :], configuration[key])
+ for key in configuration
+ if key.startswith(prefix)
+ )
+ options["_coerce_config"] = True
+ options.update(kwargs)
+ url = options.pop("url")
+ return create_engine(url, **options)
("pool_recycle", util.asint),
("pool_size", util.asint),
("max_overflow", util.asint),
- ("pool_threadlocal", util.asbool),
]
)
"""
- @util.deprecated(
- "1.3",
- "The :meth:`.Engine.contextual_connect` and "
- ":meth:`.Connection.contextual_connect` methods are deprecated. This "
- "method is an artifact of the threadlocal engine strategy which is "
- "also to be deprecated. For explicit connections from an "
- ":class:`.Engine`, use the :meth:`.Engine.connect` method.",
- )
- def contextual_connect(self, *arg, **kw):
- """Return a :class:`.Connection` object which may be part of an ongoing
- context.
-
- Depending on context, this may be ``self`` if this object
- is already an instance of :class:`.Connection`, or a newly
- procured :class:`.Connection` if this object is an instance
- of :class:`.Engine`.
-
- """
-
- return self._contextual_connect(*arg, **kw)
-
- def _contextual_connect(self):
- raise NotImplementedError()
-
@util.deprecated(
"0.7",
"The :meth:`.Connectable.create` method is deprecated and will be "
--- /dev/null
+# engine/mock.py
+# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
+# <see AUTHORS file>
+#
+# This module is part of SQLAlchemy and is released under
+# the MIT License: http://www.opensource.org/licenses/mit-license.php
+
+from operator import attrgetter
+
+from . import base
+from . import url as _url
+from .. import util
+from ..sql import ddl
+from ..sql import schema
+
+
+class MockConnection(base.Connectable):
+ def __init__(self, dialect, execute):
+ self._dialect = dialect
+ self.execute = execute
+
+ engine = property(lambda s: s)
+ dialect = property(attrgetter("_dialect"))
+ name = property(lambda s: s._dialect.name)
+
+ schema_for_object = schema._schema_getter(None)
+
+ def connect(self, **kwargs):
+ return self
+
+ def execution_options(self, **kw):
+ return self
+
+ def compiler(self, statement, parameters, **kwargs):
+ return self._dialect.compiler(
+ statement, parameters, engine=self, **kwargs
+ )
+
+ def create(self, entity, **kwargs):
+ kwargs["checkfirst"] = False
+
+ ddl.SchemaGenerator(self.dialect, self, **kwargs).traverse_single(
+ entity
+ )
+
+ def drop(self, entity, **kwargs):
+ kwargs["checkfirst"] = False
+
+ ddl.SchemaDropper(self.dialect, self, **kwargs).traverse_single(entity)
+
+ def _run_visitor(
+ self, visitorcallable, element, connection=None, **kwargs
+ ):
+ kwargs["checkfirst"] = False
+ visitorcallable(self.dialect, self, **kwargs).traverse_single(element)
+
+ def execute(self, object_, *multiparams, **params):
+ raise NotImplementedError()
+
+
+def create_mock_engine(url, executor, **kw):
+ """Create a "mock" engine used for echoing DDL.
+
+ This is a utility function used for debugging or storing the output of DDL
+ sequences as generated by :meth:`.MetaData.create_all` and related methods.
+
+ The function accepts a URL which is used only to determine the kind of
+ dialect to be used, as well as an "executor" callable function which
+ will receive a SQL expression object and parameters, which can then be
+ echoed or otherwise printed. The executor's return value is not handled,
+ nor does the engine allow regular string statements to be invoked, and
+ is therefore only useful for DDL that is sent to the database without
+ receiving any results.
+
+ E.g.::
+
+ from sqlalchemy import create_mock_engine
+
+ def dump(sql, *multiparams, **params):
+ print(sql.compile(dialect=engine.dialect))
+
+ engine = create_mock_engine('postgresql://', dump)
+ metadata.create_all(engine, checkfirst=False)
+
+ :param url: A string URL which typically needs to contain only the
+ database backend name.
+
+ :param executor: a callable which receives the arguments ``sql``,
+ ``*multiparams`` and ``**params``. The ``sql`` parameter is typically
+ an instance of :class:`.DDLElement`, which can then be compiled into a
+ string using :meth:`.DDLElement.compile`.
+
+ .. versionadded:: 1.4 - the :func:`.create_mock_engine` function replaces
+ the previous "mock" engine strategy used with :func:`.create_engine`.
+
+ .. seealso::
+
+ :ref:`faq_ddl_as_string`
+
+ """
+
+ # create url.URL object
+ u = _url.make_url(url)
+
+ dialect_cls = u.get_dialect()
+
+ dialect_args = {}
+ # consume dialect arguments from kwargs
+ for k in util.get_cls_kwargs(dialect_cls):
+ if k in kw:
+ dialect_args[k] = kwargs.pop(k)
+
+ # create dialect
+ dialect = dialect_cls(**dialect_args)
+
+ return MockConnection(dialect, executor)
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-"""Strategies for creating new instances of Engine types.
+"""Deprecated mock engine strategy used by Alembic.
-These are semi-private implementation classes which provide the
-underlying behavior for the "strategy" keyword argument available on
-:func:`~sqlalchemy.engine.create_engine`. Current available options are
-``plain``, ``threadlocal``, and ``mock``.
-New strategies can be added via new ``EngineStrategy`` classes.
"""
-from operator import attrgetter
+from .mock import MockConnection # noqa
-from . import base
-from . import threadlocal
-from . import url
-from .. import event
-from .. import pool as poollib
-from .. import util
-from ..sql import schema
-
-strategies = {}
-
-
-class EngineStrategy(object):
- """An adaptor that processes input arguments and produces an Engine.
-
- Provides a ``create`` method that receives input arguments and
- produces an instance of base.Engine or a subclass.
-
- """
-
- def __init__(self):
- strategies[self.name] = self
-
- def create(self, *args, **kwargs):
- """Given arguments, returns a new Engine instance."""
-
- raise NotImplementedError()
-
-
-class DefaultEngineStrategy(EngineStrategy):
- """Base class for built-in strategies."""
-
- def create(self, name_or_url, **kwargs):
- # create url.URL object
- u = url.make_url(name_or_url)
-
- plugins = u._instantiate_plugins(kwargs)
-
- u.query.pop("plugin", None)
- kwargs.pop("plugins", None)
-
- entrypoint = u._get_entrypoint()
- dialect_cls = entrypoint.get_dialect_cls(u)
-
- if kwargs.pop("_coerce_config", False):
-
- def pop_kwarg(key, default=None):
- value = kwargs.pop(key, default)
- if key in dialect_cls.engine_config_types:
- value = dialect_cls.engine_config_types[key](value)
- return value
-
- else:
- pop_kwarg = kwargs.pop
-
- dialect_args = {}
- # consume dialect arguments from kwargs
- for k in util.get_cls_kwargs(dialect_cls):
- if k in kwargs:
- dialect_args[k] = pop_kwarg(k)
-
- dbapi = kwargs.pop("module", None)
- if dbapi is None:
- dbapi_args = {}
- for k in util.get_func_kwargs(dialect_cls.dbapi):
- if k in kwargs:
- dbapi_args[k] = pop_kwarg(k)
- dbapi = dialect_cls.dbapi(**dbapi_args)
-
- dialect_args["dbapi"] = dbapi
-
- for plugin in plugins:
- plugin.handle_dialect_kwargs(dialect_cls, dialect_args)
-
- # create dialect
- dialect = dialect_cls(**dialect_args)
-
- # assemble connection arguments
- (cargs, cparams) = dialect.create_connect_args(u)
- cparams.update(pop_kwarg("connect_args", {}))
- cargs = list(cargs) # allow mutability
-
- # look for existing pool or create
- pool = pop_kwarg("pool", None)
- if pool is None:
-
- def connect(connection_record=None):
- if dialect._has_events:
- for fn in dialect.dispatch.do_connect:
- connection = fn(
- dialect, connection_record, cargs, cparams
- )
- if connection is not None:
- return connection
- return dialect.connect(*cargs, **cparams)
-
- creator = pop_kwarg("creator", connect)
-
- poolclass = pop_kwarg("poolclass", None)
- if poolclass is None:
- poolclass = dialect_cls.get_pool_class(u)
- pool_args = {"dialect": dialect}
-
- # consume pool arguments from kwargs, translating a few of
- # the arguments
- translate = {
- "logging_name": "pool_logging_name",
- "echo": "echo_pool",
- "timeout": "pool_timeout",
- "recycle": "pool_recycle",
- "events": "pool_events",
- "use_threadlocal": "pool_threadlocal",
- "reset_on_return": "pool_reset_on_return",
- "pre_ping": "pool_pre_ping",
- "use_lifo": "pool_use_lifo",
- }
- for k in util.get_cls_kwargs(poolclass):
- tk = translate.get(k, k)
- if tk in kwargs:
- pool_args[k] = pop_kwarg(tk)
-
- for plugin in plugins:
- plugin.handle_pool_kwargs(poolclass, pool_args)
-
- pool = poolclass(creator, **pool_args)
- else:
- if isinstance(pool, poollib.dbapi_proxy._DBProxy):
- pool = pool.get_pool(*cargs, **cparams)
- else:
- pool = pool
-
- pool._dialect = dialect
-
- # create engine.
- engineclass = self.engine_cls
- engine_args = {}
- for k in util.get_cls_kwargs(engineclass):
- if k in kwargs:
- engine_args[k] = pop_kwarg(k)
-
- _initialize = kwargs.pop("_initialize", True)
-
- # all kwargs should be consumed
- if kwargs:
- raise TypeError(
- "Invalid argument(s) %s sent to create_engine(), "
- "using configuration %s/%s/%s. Please check that the "
- "keyword arguments are appropriate for this combination "
- "of components."
- % (
- ",".join("'%s'" % k for k in kwargs),
- dialect.__class__.__name__,
- pool.__class__.__name__,
- engineclass.__name__,
- )
- )
-
- engine = engineclass(pool, dialect, u, **engine_args)
-
- if _initialize:
- do_on_connect = dialect.on_connect()
- if do_on_connect:
-
- def on_connect(dbapi_connection, connection_record):
- conn = getattr(
- dbapi_connection, "_sqla_unwrap", dbapi_connection
- )
- if conn is None:
- return
- do_on_connect(conn)
-
- event.listen(pool, "first_connect", on_connect)
- event.listen(pool, "connect", on_connect)
-
- def first_connect(dbapi_connection, connection_record):
- c = base.Connection(
- engine, connection=dbapi_connection, _has_events=False
- )
- c._execution_options = util.immutabledict()
- dialect.initialize(c)
- dialect.do_rollback(c.connection)
-
- event.listen(pool, "first_connect", first_connect, once=True)
-
- dialect_cls.engine_created(engine)
- if entrypoint is not dialect_cls:
- entrypoint.engine_created(engine)
-
- for plugin in plugins:
- plugin.engine_created(engine)
-
- return engine
-
-
-class PlainEngineStrategy(DefaultEngineStrategy):
- """Strategy for configuring a regular Engine."""
-
- name = "plain"
- engine_cls = base.Engine
-
-
-PlainEngineStrategy()
-
-
-class ThreadLocalEngineStrategy(DefaultEngineStrategy):
- """Strategy for configuring an Engine with threadlocal behavior."""
-
- name = "threadlocal"
- engine_cls = threadlocal.TLEngine
-
-
-ThreadLocalEngineStrategy()
-
-
-class MockEngineStrategy(EngineStrategy):
- """Strategy for configuring an Engine-like object with mocked execution.
-
- Produces a single mock Connectable object which dispatches
- statement execution to a passed-in function.
-
- """
-
- name = "mock"
-
- def create(self, name_or_url, executor, **kwargs):
- # create url.URL object
- u = url.make_url(name_or_url)
-
- dialect_cls = u.get_dialect()
-
- dialect_args = {}
- # consume dialect arguments from kwargs
- for k in util.get_cls_kwargs(dialect_cls):
- if k in kwargs:
- dialect_args[k] = kwargs.pop(k)
-
- # create dialect
- dialect = dialect_cls(**dialect_args)
-
- return MockEngineStrategy.MockConnection(dialect, executor)
-
- class MockConnection(base.Connectable):
- def __init__(self, dialect, execute):
- self._dialect = dialect
- self.execute = execute
-
- engine = property(lambda s: s)
- dialect = property(attrgetter("_dialect"))
- name = property(lambda s: s._dialect.name)
-
- schema_for_object = schema._schema_getter(None)
-
- def contextual_connect(self, **kwargs):
- return self
-
- def connect(self, **kwargs):
- return self
-
- def execution_options(self, **kw):
- return self
-
- def compiler(self, statement, parameters, **kwargs):
- return self._dialect.compiler(
- statement, parameters, engine=self, **kwargs
- )
-
- def create(self, entity, **kwargs):
- kwargs["checkfirst"] = False
- from sqlalchemy.engine import ddl
-
- ddl.SchemaGenerator(self.dialect, self, **kwargs).traverse_single(
- entity
- )
-
- def drop(self, entity, **kwargs):
- kwargs["checkfirst"] = False
- from sqlalchemy.engine import ddl
-
- ddl.SchemaDropper(self.dialect, self, **kwargs).traverse_single(
- entity
- )
-
- def _run_visitor(
- self, visitorcallable, element, connection=None, **kwargs
- ):
- kwargs["checkfirst"] = False
- visitorcallable(self.dialect, self, **kwargs).traverse_single(
- element
- )
-
- def execute(self, object_, *multiparams, **params):
- raise NotImplementedError()
-
-
-MockEngineStrategy()
+class MockEngineStrategy(object):
+ MockConnection = MockConnection
+++ /dev/null
-# engine/threadlocal.py
-# Copyright (C) 2005-2019 the SQLAlchemy authors and contributors
-# <see AUTHORS file>
-#
-# This module is part of SQLAlchemy and is released under
-# the MIT License: http://www.opensource.org/licenses/mit-license.php
-
-"""Provides a thread-local transactional wrapper around the root Engine class.
-
-The ``threadlocal`` module is invoked when using the
-``strategy="threadlocal"`` flag with :func:`~sqlalchemy.engine.create_engine`.
-This module is semi-private and is invoked automatically when the threadlocal
-engine strategy is used.
-"""
-
-import weakref
-
-from . import base
-from .. import util
-
-
-class TLConnection(base.Connection):
- def __init__(self, *arg, **kw):
- super(TLConnection, self).__init__(*arg, **kw)
- self.__opencount = 0
-
- def _increment_connect(self):
- self.__opencount += 1
- return self
-
- def close(self):
- if self.__opencount == 1:
- base.Connection.close(self)
- self.__opencount -= 1
-
- def _force_close(self):
- self.__opencount = 0
- base.Connection.close(self)
-
-
-class TLEngine(base.Engine):
- """An Engine that includes support for thread-local managed
- transactions.
-
- """
-
- _tl_connection_cls = TLConnection
-
- @util.deprecated(
- "1.3",
- "The 'threadlocal' engine strategy is deprecated, and will be "
- "removed in a future release. The strategy is no longer relevant "
- "to modern usage patterns (including that of the ORM "
- ":class:`.Session` object) which make use of a :class:`.Connection` "
- "object in order to invoke statements.",
- )
- def __init__(self, *args, **kwargs):
- super(TLEngine, self).__init__(*args, **kwargs)
- self._connections = util.threading.local()
-
- def contextual_connect(self, **kw):
- return self._contextual_connect(**kw)
-
- def _contextual_connect(self, **kw):
- if not hasattr(self._connections, "conn"):
- connection = None
- else:
- connection = self._connections.conn()
-
- if connection is None or connection.closed:
- # guards against pool-level reapers, if desired.
- # or not connection.connection.is_valid:
- connection = self._tl_connection_cls(
- self,
- self._wrap_pool_connect(self.pool.connect, connection),
- **kw
- )
- self._connections.conn = weakref.ref(connection)
-
- return connection._increment_connect()
-
- def begin_twophase(self, xid=None):
- if not hasattr(self._connections, "trans"):
- self._connections.trans = []
- self._connections.trans.append(
- self._contextual_connect().begin_twophase(xid=xid)
- )
- return self
-
- def begin_nested(self):
- if not hasattr(self._connections, "trans"):
- self._connections.trans = []
- self._connections.trans.append(
- self._contextual_connect().begin_nested()
- )
- return self
-
- def begin(self):
- if not hasattr(self._connections, "trans"):
- self._connections.trans = []
- self._connections.trans.append(self._contextual_connect().begin())
- return self
-
- def __enter__(self):
- return self
-
- def __exit__(self, type_, value, traceback):
- if type_ is None:
- self.commit()
- else:
- self.rollback()
-
- def prepare(self):
- if (
- not hasattr(self._connections, "trans")
- or not self._connections.trans
- ):
- return
- self._connections.trans[-1].prepare()
-
- def commit(self):
- if (
- not hasattr(self._connections, "trans")
- or not self._connections.trans
- ):
- return
- trans = self._connections.trans.pop(-1)
- trans.commit()
-
- def rollback(self):
- if (
- not hasattr(self._connections, "trans")
- or not self._connections.trans
- ):
- return
- trans = self._connections.trans.pop(-1)
- trans.rollback()
-
- def dispose(self):
- self._connections = util.threading.local()
- super(TLEngine, self).dispose()
-
- @property
- def closed(self):
- return (
- not hasattr(self._connections, "conn")
- or self._connections.conn() is None
- or self._connections.conn().closed
- )
-
- def close(self):
- if not self.closed:
- self._contextual_connect().close()
- connection = self._connections.conn()
- connection._force_close()
- del self._connections.conn
- self._connections.trans = []
-
- def __repr__(self):
- return "TLEngine(%r)" % self.url
"given Connection's Engine"
)
else:
- conn = bind._contextual_connect()
+ conn = bind.connect()
if execution_options:
conn = conn.execution_options(**execution_options)
engine, execution_options
)
else:
- conn = engine._contextual_connect(**kw)
+ conn = engine.connect(**kw)
if execution_options:
conn = conn.execution_options(**execution_options)
return conn
_dialect = _ConnDialect()
@util.deprecated_params(
- use_threadlocal=(
- "1.3",
- "The :paramref:`.Pool.use_threadlocal` parameter is "
- "deprecated and will be removed in a future release.",
- ),
listeners=(
"0.7",
":class:`.PoolListener` is deprecated in favor of the "
":class:`.PoolEvents` listener interface. The "
":paramref:`.Pool.listeners` parameter will be removed in a "
"future release.",
- ),
+ )
)
def __init__(
self,
creator,
recycle=-1,
echo=None,
- use_threadlocal=False,
logging_name=None,
reset_on_return=True,
listeners=None,
:ref:`dbengine_logging` - further detail on how to configure
logging.
- :param use_threadlocal: If set to True, repeated calls to
- :meth:`connect` within the same application thread will be
- guaranteed to return the same connection object that is already
- checked out. This is a legacy use case and the flag has no
- effect when using the pool with a :class:`.Engine` object.
-
:param reset_on_return: Determine steps to take on
connections as they are returned to the pool.
reset_on_return can have any of these values:
self._creator = creator
self._recycle = recycle
self._invalidate_time = 0
- self._use_threadlocal = use_threadlocal
self._pre_ping = pre_ping
if reset_on_return in ("rollback", True, reset_rollback):
self._reset_on_return = reset_rollback
"""
interfaces.PoolListener._adapt_listener(self, listener)
- def unique_connection(self):
- """Produce a DBAPI connection that is not referenced by any
- thread-local context.
-
- This method is equivalent to :meth:`.Pool.connect` when the
- :paramref:`.Pool.use_threadlocal` flag is not set to True.
- When :paramref:`.Pool.use_threadlocal` is True, the
- :meth:`.Pool.unique_connection` method provides a means of bypassing
- the threadlocal context.
-
- """
- return _ConnectionFairy._checkout(self)
-
def _create_connection(self):
"""Called by subclasses to create a new ConnectionRecord."""
the pool.
"""
- if not self._use_threadlocal:
- return _ConnectionFairy._checkout(self)
-
- try:
- rec = self._threadconns.current()
- except AttributeError:
- pass
- else:
- if rec is not None:
- return rec._checkout_existing()
-
- return _ConnectionFairy._checkout(self, self._threadconns)
+ return _ConnectionFairy._checkout(self)
def _return_conn(self, record):
"""Given a _ConnectionRecord, return it to the :class:`.Pool`.
has its ``close()`` method called.
"""
- if self._use_threadlocal:
- try:
- del self._threadconns.current
- except AttributeError:
- pass
self._do_return_conn(record)
def _do_get(self):
recycle=self._recycle,
echo=self.echo,
logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
reset_on_return=self._reset_on_return,
_dispatch=self.dispatch,
dialect=self._dialect,
recycle=self._recycle,
echo=self.echo,
logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
reset_on_return=self._reset_on_return,
_dispatch=self.dispatch,
dialect=self._dialect,
recycle=self._recycle,
echo=self.echo,
logging_name=self._orig_logging_name,
- use_threadlocal=self._use_threadlocal,
reset_on_return=self._reset_on_return,
_dispatch=self.dispatch,
dialect=self._dialect,
return c
def connect(self):
- # vendored from Pool to include use_threadlocal behavior
+ # vendored from Pool to include the now removed use_threadlocal
+ # behavior
try:
rec = self._fairy.current()
except AttributeError:
return self.__class__(
creator=self._creator,
recycle=self._recycle,
- use_threadlocal=self._use_threadlocal,
reset_on_return=self._reset_on_return,
echo=self.echo,
logging_name=self._orig_logging_name,
"""
- from sqlalchemy import create_engine
+ from sqlalchemy import create_mock_engine
if not dialect_name:
dialect_name = config.db.name
d = engine.dialect
return "\n".join(str(s.compile(dialect=d)) for s in engine.mock)
- engine = create_engine(
- dialect_name + "://", strategy="mock", executor=executor
+ engine = create_mock_engine(
+ dialect_name + "://", executor
)
assert not hasattr(engine, "mock")
engine.mock = buffer
import sqlalchemy as tsa
from sqlalchemy import create_engine
+from sqlalchemy import create_mock_engine
from sqlalchemy import event
from sqlalchemy import Integer
from sqlalchemy import MetaData
def executor(*a, **kw):
return None
- engine = create_engine(
- testing.db.name + "://", strategy="mock", executor=executor
+ engine = create_mock_engine(
+ testing.db.name + "://", executor
)
# fmt: off
engine.dialect.identifier_preparer = \
import re
-import time
import sqlalchemy as tsa
from sqlalchemy import column
from sqlalchemy import create_engine
-from sqlalchemy import engine_from_config
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy import inspect
-from sqlalchemy import INT
from sqlalchemy import Integer
from sqlalchemy import literal
from sqlalchemy import MetaData
from sqlalchemy import pool
from sqlalchemy import select
-from sqlalchemy import Sequence
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import TypeDecorator
-from sqlalchemy import VARCHAR
from sqlalchemy.engine.base import Engine
+from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.interfaces import ConnectionProxy
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
-from sqlalchemy.testing.engines import testing_engine
-from sqlalchemy.testing.mock import call
from sqlalchemy.testing.mock import Mock
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-from sqlalchemy.testing.util import gc_collect
from sqlalchemy.testing.util import lazy_gc
-from .test_parseconnect import mock_dbapi
-
-tlengine = None
class SomeException(Exception):
pass
-def _tlengine_deprecated():
- return testing.expect_deprecated(
- "The 'threadlocal' engine strategy is deprecated"
- )
+class CreateEngineTest(fixtures.TestBase):
+ def test_strategy_keyword_mock(self):
+ def executor(x, y):
+ pass
+
+ with testing.expect_deprecated(
+ "The create_engine.strategy keyword is deprecated, and the "
+ "only argument accepted is 'mock'"
+ ):
+ e = create_engine(
+ "postgresql://", strategy="mock", executor=executor
+ )
+
+ assert isinstance(e, MockConnection)
+
+ def test_strategy_keyword_unknown(self):
+ with testing.expect_deprecated(
+ "The create_engine.strategy keyword is deprecated, and the "
+ "only argument accepted is 'mock'"
+ ):
+ assert_raises_message(
+ tsa.exc.ArgumentError,
+ "unknown strategy: 'threadlocal'",
+ create_engine,
+ "postgresql://",
+ strategy="threadlocal",
+ )
class TableNamesOrderByTest(fixtures.TestBase):
eq_(tnames, ["t1", "t2", "t3"])
-class CreateEngineTest(fixtures.TestBase):
- def test_pool_threadlocal_from_config(self):
- dbapi = mock_dbapi
-
- config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test",
- "sqlalchemy.pool_threadlocal": "false",
- }
-
- e = engine_from_config(config, module=dbapi, _initialize=False)
- eq_(e.pool._use_threadlocal, False)
-
- config = {
- "sqlalchemy.url": "postgresql://scott:tiger@somehost/test",
- "sqlalchemy.pool_threadlocal": "true",
- }
-
- with testing.expect_deprecated(
- "The Pool.use_threadlocal parameter is deprecated"
- ):
- e = engine_from_config(config, module=dbapi, _initialize=False)
- eq_(e.pool._use_threadlocal, True)
-
-
-class RecycleTest(fixtures.TestBase):
- __backend__ = True
-
- def test_basic(self):
- with testing.expect_deprecated(
- "The Pool.use_threadlocal parameter is deprecated"
- ):
- engine = engines.reconnecting_engine(
- options={"pool_threadlocal": True}
- )
-
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- conn = engine.contextual_connect()
- eq_(conn.execute(select([1])).scalar(), 1)
- conn.close()
-
- # set the pool recycle down to 1.
- # we aren't doing this inline with the
- # engine create since cx_oracle takes way
- # too long to create the 1st connection and don't
- # want to build a huge delay into this test.
-
- engine.pool._recycle = 1
-
- # kill the DB connection
- engine.test_shutdown()
-
- # wait until past the recycle period
- time.sleep(2)
-
- # can connect, no exception
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- conn = engine.contextual_connect()
- eq_(conn.execute(select([1])).scalar(), 1)
- conn.close()
-
-
-class TLTransactionTest(fixtures.TestBase):
- __requires__ = ("ad_hoc_engines",)
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- global users, metadata, tlengine
-
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
- metadata = MetaData()
- users = Table(
- "query_users",
- metadata,
- Column(
- "user_id",
- INT,
- Sequence("query_users_id_seq", optional=True),
- primary_key=True,
- ),
- Column("user_name", VARCHAR(20)),
- test_needs_acid=True,
- )
- metadata.create_all(tlengine)
-
- def teardown(self):
- tlengine.execute(users.delete()).close()
-
- @classmethod
- def teardown_class(cls):
- tlengine.close()
- metadata.drop_all(tlengine)
- tlengine.dispose()
-
- def setup(self):
-
- # ensure tests start with engine closed
-
- tlengine.close()
-
- @testing.crashes(
- "oracle", "TNS error of unknown origin occurs on the buildbot."
- )
- def test_rollback_no_trans(self):
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
-
- # shouldn't fail
- tlengine.rollback()
-
- tlengine.begin()
- tlengine.rollback()
-
- # shouldn't fail
- tlengine.rollback()
-
- def test_commit_no_trans(self):
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
-
- # shouldn't fail
- tlengine.commit()
-
- tlengine.begin()
- tlengine.rollback()
-
- # shouldn't fail
- tlengine.commit()
-
- def test_prepare_no_trans(self):
- with _tlengine_deprecated():
- tlengine = testing_engine(options=dict(strategy="threadlocal"))
-
- # shouldn't fail
- tlengine.prepare()
-
- tlengine.begin()
- tlengine.rollback()
-
- # shouldn't fail
- tlengine.prepare()
-
- def test_connection_close(self):
- """test that when connections are closed for real, transactions
- are rolled back and disposed."""
-
- c = tlengine.contextual_connect()
- c.begin()
- assert c.in_transaction()
- c.close()
- assert not c.in_transaction()
-
- def test_transaction_close(self):
- c = tlengine.contextual_connect()
- t = c.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- t2 = c.begin()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- t2.close()
- result = c.execute("select * from query_users")
- assert len(result.fetchall()) == 4
- t.close()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- c.close()
- external_connection.close()
-
- def test_rollback(self):
- """test a basic rollback"""
-
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.rollback()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- external_connection.close()
-
- def test_commit(self):
- """test a basic commit"""
-
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.commit()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 3
- finally:
- external_connection.close()
-
- def test_with_interface(self):
- trans = tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- trans.commit()
-
- trans = tlengine.begin()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- trans.__exit__(Exception, "fake", None)
- trans = tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- trans.__exit__(None, None, None)
- eq_(
- tlengine.execute(
- users.select().order_by(users.c.user_id)
- ).fetchall(),
- [(1, "user1"), (2, "user2"), (4, "user4")],
- )
-
- def test_commits(self):
- connection = tlengine.connect()
- assert (
- connection.execute("select count(*) from query_users").scalar()
- == 0
- )
- connection.close()
- connection = tlengine.contextual_connect()
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- transaction.commit()
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
- transaction.commit()
- transaction = connection.begin()
- result = connection.execute("select * from query_users")
- rows = result.fetchall()
- assert len(rows) == 3, "expected 3 got %d" % len(rows)
- transaction.commit()
- connection.close()
-
- def test_rollback_off_conn(self):
-
- # test that a TLTransaction opened off a TLConnection allows
- # that TLConnection to be aware of the transactional context
-
- conn = tlengine.contextual_connect()
- trans = conn.begin()
- conn.execute(users.insert(), user_id=1, user_name="user1")
- conn.execute(users.insert(), user_id=2, user_name="user2")
- conn.execute(users.insert(), user_id=3, user_name="user3")
- trans.rollback()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- conn.close()
- external_connection.close()
-
- def test_morerollback_off_conn(self):
-
- # test that an existing TLConnection automatically takes place
- # in a TLTransaction opened on a second TLConnection
-
- conn = tlengine.contextual_connect()
- conn2 = tlengine.contextual_connect()
- trans = conn2.begin()
- conn.execute(users.insert(), user_id=1, user_name="user1")
- conn.execute(users.insert(), user_id=2, user_name="user2")
- conn.execute(users.insert(), user_id=3, user_name="user3")
- trans.rollback()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 0
- finally:
- conn.close()
- conn2.close()
- external_connection.close()
-
- def test_commit_off_connection(self):
- conn = tlengine.contextual_connect()
- trans = conn.begin()
- conn.execute(users.insert(), user_id=1, user_name="user1")
- conn.execute(users.insert(), user_id=2, user_name="user2")
- conn.execute(users.insert(), user_id=3, user_name="user3")
- trans.commit()
- external_connection = tlengine.connect()
- result = external_connection.execute("select * from query_users")
- try:
- assert len(result.fetchall()) == 3
- finally:
- conn.close()
- external_connection.close()
-
- def test_nesting_rollback(self):
- """tests nesting of transactions, rollback at the end"""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.execute(users.insert(), user_id=5, user_name="user5")
- tlengine.commit()
- tlengine.rollback()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 0
- )
- finally:
- external_connection.close()
-
- def test_nesting_commit(self):
- """tests nesting of transactions, commit at the end."""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.execute(users.insert(), user_id=5, user_name="user5")
- tlengine.commit()
- tlengine.commit()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 5
- )
- finally:
- external_connection.close()
-
- def test_mixed_nesting(self):
- """tests nesting of transactions off the TLEngine directly
- inside of transactions off the connection from the TLEngine"""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- conn = tlengine.contextual_connect()
- trans = conn.begin()
- trans2 = conn.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=5, user_name="user5")
- tlengine.execute(users.insert(), user_id=6, user_name="user6")
- tlengine.execute(users.insert(), user_id=7, user_name="user7")
- tlengine.commit()
- tlengine.execute(users.insert(), user_id=8, user_name="user8")
- tlengine.commit()
- trans2.commit()
- trans.rollback()
- conn.close()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 0
- )
- finally:
- external_connection.close()
-
- def test_more_mixed_nesting(self):
- """tests nesting of transactions off the connection from the
- TLEngine inside of transactions off the TLEngine directly."""
-
- external_connection = tlengine.connect()
- self.assert_(
- external_connection.connection
- is not tlengine.contextual_connect().connection
- )
- tlengine.begin()
- connection = tlengine.contextual_connect()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin()
- connection.execute(users.insert(), user_id=2, user_name="user2")
- connection.execute(users.insert(), user_id=3, user_name="user3")
- trans = connection.begin()
- connection.execute(users.insert(), user_id=4, user_name="user4")
- connection.execute(users.insert(), user_id=5, user_name="user5")
- trans.commit()
- tlengine.commit()
- tlengine.rollback()
- connection.close()
- try:
- self.assert_(
- external_connection.scalar("select count(*) from query_users")
- == 0
- )
- finally:
- external_connection.close()
-
- @testing.requires.savepoints
- def test_nested_subtransaction_rollback(self):
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin_nested()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.rollback()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.commit()
- tlengine.close()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (3,)],
- )
- tlengine.close()
-
- @testing.requires.savepoints
- @testing.crashes(
- "oracle+zxjdbc",
- "Errors out and causes subsequent tests to " "deadlock",
- )
- def test_nested_subtransaction_commit(self):
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin_nested()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.commit()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.commit()
- tlengine.close()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (2,), (3,)],
- )
- tlengine.close()
-
- @testing.requires.savepoints
- def test_rollback_to_subtransaction(self):
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.begin_nested()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.begin()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.rollback()
- tlengine.rollback()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.commit()
- tlengine.close()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (4,)],
- )
- tlengine.close()
-
- def test_connections(self):
- """tests that contextual_connect is threadlocal"""
-
- c1 = tlengine.contextual_connect()
- c2 = tlengine.contextual_connect()
- assert c1.connection is c2.connection
- c2.close()
- assert not c1.closed
- assert not tlengine.closed
-
- @testing.requires.independent_cursors
- def test_result_closing(self):
- """tests that contextual_connect is threadlocal"""
-
- r1 = tlengine.execute(select([1]))
- r2 = tlengine.execute(select([1]))
- r1.fetchone()
- r2.fetchone()
- r1.close()
- assert r2.connection is r1.connection
- assert not r2.connection.closed
- assert not tlengine.closed
-
- # close again, nothing happens since resultproxy calls close()
- # only once
-
- r1.close()
- assert r2.connection is r1.connection
- assert not r2.connection.closed
- assert not tlengine.closed
- r2.close()
- assert r2.connection.closed
- assert tlengine.closed
-
- @testing.crashes(
- "oracle+cx_oracle", "intermittent failures on the buildbot"
- )
- def test_dispose(self):
- with _tlengine_deprecated():
- eng = testing_engine(options=dict(strategy="threadlocal"))
- eng.execute(select([1]))
- eng.dispose()
- eng.execute(select([1]))
-
- @testing.requires.two_phase_transactions
- def test_two_phase_transaction(self):
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=1, user_name="user1")
- tlengine.prepare()
- tlengine.commit()
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=2, user_name="user2")
- tlengine.commit()
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=3, user_name="user3")
- tlengine.rollback()
- tlengine.begin_twophase()
- tlengine.execute(users.insert(), user_id=4, user_name="user4")
- tlengine.prepare()
- tlengine.rollback()
- eq_(
- tlengine.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,), (2,)],
- )
-
-
-class ConvenienceExecuteTest(fixtures.TablesTest):
- __backend__ = True
-
- @classmethod
- def define_tables(cls, metadata):
- cls.table = Table(
- "exec_test",
- metadata,
- Column("a", Integer),
- Column("b", Integer),
- test_needs_acid=True,
- )
-
- def _trans_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
-
- return go
-
- def _trans_rollback_fn(self, is_transaction=False):
- def go(conn, x, value=None):
- if is_transaction:
- conn = conn.connection
- conn.execute(self.table.insert().values(a=x, b=value))
- raise SomeException("breakage")
-
- return go
-
- def _assert_no_data(self):
- eq_(
- testing.db.scalar(
- select([func.count("*")]).select_from(self.table)
- ),
- 0,
- )
-
- def _assert_fn(self, x, value=None):
- eq_(testing.db.execute(self.table.select()).fetchall(), [(x, value)])
-
- def test_transaction_tlocal_engine_ctx_commit(self):
- fn = self._trans_fn()
- with _tlengine_deprecated():
- engine = engines.testing_engine(
- options=dict(strategy="threadlocal", pool=testing.db.pool)
- )
- ctx = engine.begin()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- self._assert_fn(5, value=8)
-
- def test_transaction_tlocal_engine_ctx_rollback(self):
- fn = self._trans_rollback_fn()
- with _tlengine_deprecated():
- engine = engines.testing_engine(
- options=dict(strategy="threadlocal", pool=testing.db.pool)
- )
- ctx = engine.begin()
- assert_raises_message(
- Exception,
- "breakage",
- testing.run_as_contextmanager,
- ctx,
- fn,
- 5,
- value=8,
- )
- self._assert_no_data()
-
-
def _proxy_execute_deprecated():
return (
testing.expect_deprecated("ConnectionProxy.execute is deprecated."),
options=dict(implicit_returning=False, proxy=MyProxy())
)
- with testing.expect_deprecated(
- "ConnectionProxy.execute is deprecated.",
- "ConnectionProxy.cursor_execute is deprecated.",
- "The 'threadlocal' engine strategy is deprecated",
- ):
-
- tl_engine = engines.testing_engine(
- options=dict(
- implicit_returning=False,
- proxy=MyProxy(),
- strategy="threadlocal",
- )
- )
-
- for engine in (plain_engine, tl_engine):
+ for engine in (plain_engine,):
m = MetaData(engine)
t1 = Table(
"t1",
self.dbapi = dbapi
self.ProgrammingError = sqlite3.ProgrammingError
- def test_dont_touch_non_dbapi_exception_on_contextual_connect(self):
- dbapi = self.dbapi
- dbapi.connect = Mock(side_effect=TypeError("I'm not a DBAPI error"))
-
- e = create_engine("sqlite://", module=dbapi)
- e.dialect.is_disconnect = is_disconnect = Mock()
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- assert_raises_message(
- TypeError, "I'm not a DBAPI error", e.contextual_connect
- )
- eq_(is_disconnect.call_count, 0)
-
- def test_invalidate_on_contextual_connect(self):
- """test that is_disconnect() is called during connect.
-
- interpretation of connection failures are not supported by
- every backend.
-
- """
-
- dbapi = self.dbapi
- dbapi.connect = Mock(
- side_effect=self.ProgrammingError(
- "Cannot operate on a closed database."
- )
- )
- e = create_engine("sqlite://", module=dbapi)
- try:
- with testing.expect_deprecated(
- r"The Engine.contextual_connect\(\) method is deprecated"
- ):
- e.contextual_connect()
- assert False
- except tsa.exc.DBAPIError as de:
- assert de.connection_invalidated
-
class HandleErrorTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
assert counts == [1, 2, 2]
-class PoolTest(PoolTestBase):
- def test_manager(self):
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(MockDBAPI(), use_threadlocal=True)
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- c1 = manager.connect("foo.db")
- c2 = manager.connect("foo.db")
- c3 = manager.connect("bar.db")
- c4 = manager.connect("foo.db", bar="bat")
- c5 = manager.connect("foo.db", bar="hoho")
- c6 = manager.connect("foo.db", bar="bat")
-
- assert c1.cursor() is not None
- assert c1 is c2
- assert c1 is not c3
- assert c4 is c6
- assert c4 is not c5
-
- def test_manager_with_key(self):
-
- dbapi = MockDBAPI()
-
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(dbapi, use_threadlocal=True)
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- c1 = manager.connect("foo.db", sa_pool_key="a")
- c2 = manager.connect("foo.db", sa_pool_key="b")
- c3 = manager.connect("bar.db", sa_pool_key="a")
-
- assert c1.cursor() is not None
- assert c1 is not c2
- assert c1 is c3
-
- eq_(dbapi.connect.mock_calls, [call("foo.db"), call("foo.db")])
-
- def test_bad_args(self):
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(MockDBAPI())
- manager.connect(None)
-
- def test_non_thread_local_manager(self):
- with testing.expect_deprecated(
- r"The pool.manage\(\) function is deprecated,"
- ):
- manager = pool.manage(MockDBAPI(), use_threadlocal=False)
-
- connection = manager.connect("foo.db")
- connection2 = manager.connect("foo.db")
-
- self.assert_(connection.cursor() is not None)
- self.assert_(connection is not connection2)
-
- def test_threadlocal_del(self):
- self._do_testthreadlocal(useclose=False)
-
- def test_threadlocal_close(self):
- self._do_testthreadlocal(useclose=True)
-
- def _do_testthreadlocal(self, useclose=False):
- dbapi = MockDBAPI()
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- for p in (
- pool.QueuePool(
- creator=dbapi.connect,
- pool_size=3,
- max_overflow=-1,
- use_threadlocal=True,
- ),
- pool.SingletonThreadPool(
- creator=dbapi.connect, use_threadlocal=True
- ),
- ):
- c1 = p.connect()
- c2 = p.connect()
- self.assert_(c1 is c2)
- c3 = p.unique_connection()
- self.assert_(c3 is not c1)
- if useclose:
- c2.close()
- else:
- c2 = None
- c2 = p.connect()
- self.assert_(c1 is c2)
- self.assert_(c3 is not c1)
- if useclose:
- c2.close()
- else:
- c2 = None
- lazy_gc()
- if useclose:
- c1 = p.connect()
- c2 = p.connect()
- c3 = p.connect()
- c3.close()
- c2.close()
- self.assert_(c1.connection is not None)
- c1.close()
- c1 = c2 = c3 = None
-
- # extra tests with QueuePool to ensure connections get
- # __del__()ed when dereferenced
-
- if isinstance(p, pool.QueuePool):
- lazy_gc()
- self.assert_(p.checkedout() == 0)
- c1 = p.connect()
- c2 = p.connect()
- if useclose:
- c2.close()
- c1.close()
- else:
- c2 = None
- c1 = None
- lazy_gc()
- self.assert_(p.checkedout() == 0)
-
- def test_mixed_close(self):
- pool._refs.clear()
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c2 = p.connect()
- assert c1 is c2
- c1.close()
- c2 = None
- assert p.checkedout() == 1
- c1 = None
- lazy_gc()
- assert p.checkedout() == 0
- lazy_gc()
- assert not pool._refs
-
-
-class QueuePoolTest(PoolTestBase):
- def test_threadfairy(self):
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c1.close()
- c2 = p.connect()
- assert c2.connection is not None
-
- def test_trick_the_counter(self):
- """this is a "flaw" in the connection pool; since threadlocal
- uses a single ConnectionFairy per thread with an open/close
- counter, you can fool the counter into giving you a
- ConnectionFairy with an ambiguous counter. i.e. its not true
- reference counting."""
-
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c2 = p.connect()
- assert c1 is c2
- c1.close()
- c2 = p.connect()
- c2.close()
- self.assert_(p.checkedout() != 0)
- c2.close()
- self.assert_(p.checkedout() == 0)
-
- @testing.requires.predictable_gc
- def test_weakref_kaboom(self):
- with testing.expect_deprecated(
- r".*Pool.use_threadlocal parameter is deprecated"
- ):
- p = self._queuepool_fixture(
- pool_size=3, max_overflow=-1, use_threadlocal=True
- )
- c1 = p.connect()
- c2 = p.connect()
- c1.close()
- c2 = None
- del c1
- del c2
- gc_collect()
- assert p.checkedout() == 0
- c3 = p.connect()
- assert c3 is not None
-
-
class ExplicitAutoCommitDeprecatedTest(fixtures.TestBase):
"""test the 'autocommit' flag on select() and text() objects.
import sqlalchemy as tsa
from sqlalchemy import bindparam
from sqlalchemy import create_engine
+from sqlalchemy import create_mock_engine
from sqlalchemy import event
from sqlalchemy import func
from sqlalchemy import INT
def dump(sql, *multiparams, **params):
buf.write(util.text_type(sql.compile(dialect=engine.dialect)))
- engine = create_engine("postgresql://", strategy="mock", executor=dump)
+ engine = create_mock_engine("postgresql://", executor=dump)
return engine, buf
def test_sequence_not_duped(self):
):
cursor_stmts.append((str(statement), parameters, None))
- with testing.expect_deprecated(
- "The 'threadlocal' engine strategy is deprecated"
- ):
- tl_engine = engines.testing_engine(
- options=dict(implicit_returning=False, strategy="threadlocal")
- )
-
for engine in [
engines.testing_engine(options=dict(implicit_returning=False)),
- tl_engine,
engines.testing_engine(
options=dict(implicit_returning=False)
).connect(),
creator=lambda: dbapi.connect(delay=0.05),
pool_size=2,
max_overflow=1,
- use_threadlocal=False,
timeout=3,
)
timeouts = []
p2 = p.recreate()
assert p2.size() == 1
assert p2._reset_on_return is pool.reset_none
- assert p2._use_threadlocal is False
assert p2._max_overflow == 0
def test_reconnect(self):
conn.close()
+ @testing.requires.independent_connections
def test_multiple_invalidate(self):
c1 = self.engine.connect()
c2 = self.engine.connect()
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import assertions
from sqlalchemy.testing import AssertsCompiledSQL
-from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import is_
]
-class TLTransactionTest(fixtures.MappedTest):
- run_dispose_bind = "once"
- __backend__ = True
-
- @classmethod
- def setup_bind(cls):
- with testing.expect_deprecated(
- ".*'threadlocal' engine strategy is deprecated"
- ):
- return engines.testing_engine(options=dict(strategy="threadlocal"))
-
- @classmethod
- def define_tables(cls, metadata):
- Table(
- "users",
- metadata,
- Column(
- "id", Integer, primary_key=True, test_needs_autoincrement=True
- ),
- Column("name", String(20)),
- test_needs_acid=True,
- )
-
- @classmethod
- def setup_classes(cls):
- class User(cls.Basic):
- pass
-
- @classmethod
- def setup_mappers(cls):
- users, User = cls.tables.users, cls.classes.User
-
- mapper(User, users)
-
- @testing.exclude("mysql", "<", (5, 0, 3), "FIXME: unknown")
- def test_session_nesting(self):
- User = self.classes.User
-
- sess = create_session(bind=self.bind)
- self.bind.begin()
- u = User(name="ed")
- sess.add(u)
- sess.flush()
- self.bind.commit()
-
-
class DeprecatedSessionFeatureTest(_fixtures.FixtureTest):
run_inserts = None
eq_(
bind.mock_calls,
[
- mock.call._contextual_connect(),
- mock.call._contextual_connect().execution_options(
+ mock.call.connect(),
+ mock.call.connect().execution_options(
isolation_level="FOO"
),
- mock.call._contextual_connect().execution_options().begin(),
+ mock.call.connect().execution_options().begin(),
],
)
- eq_(c1, bind._contextual_connect().execution_options())
+ eq_(c1, bind.connect().execution_options())
def test_execution_options_ignored_mid_transaction(self):
bind = mock.Mock()
conn = mock.Mock(engine=bind)
- bind._contextual_connect = mock.Mock(return_value=conn)
+ bind.connect = mock.Mock(return_value=conn)
sess = Session(bind=bind)
sess.execute("select 1")
with expect_warnings(