--- /dev/null
+.. change::
+ :tags: change, engine
+ :tickets: 7122
+
+ Some small API changes regarding engines and dialects:
+
+ * The :meth:`.Dialect.set_isolation_level` and :meth:`.Dialect.get_isolation_level`
+   dialect methods will always be passed the raw DBAPI connection
+
+ * The :class:`.Connection` and :class:`.Engine` classes no longer share a base
+ ``Connectable`` superclass, which has been removed.
+
+ * Added a new interface class :class:`.PoolProxiedConnection` - this is the
+   public-facing interface for the familiar :class:`._ConnectionFairy`
+   class, which itself remains private.
.. autoclass:: sqlalchemy.engine.Compiled
:members:
+.. autoclass:: sqlalchemy.engine.interfaces.DBAPIConnection
+ :members:
+ :undoc-members:
+
+.. autoclass:: sqlalchemy.engine.interfaces.DBAPICursor
+ :members:
+ :undoc-members:
+
+.. autoclass:: sqlalchemy.engine.interfaces.DBAPIType
+ :members:
+ :undoc-members:
+
.. autoclass:: sqlalchemy.sql.compiler.DDLCompiler
:members:
:inherited-members:
.. autoclass:: StaticPool
+.. autoclass:: PoolProxiedConnection
+ :members:
+
.. autoclass:: _ConnectionFairy
    :members:
:undoc-members:
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedColumn
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedComputed
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedCheckConstraint
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedForeignKeyConstraint
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedIdentity
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedIndex
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedPrimaryKeyConstraint
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedUniqueConstraint
+ :members:
+ :inherited-members: dict
+
+.. autoclass:: sqlalchemy.engine.interfaces.ReflectedTableComment
+ :members:
+ :inherited-members: dict
+
+
.. _metadata_reflection_dbagnostic_types:
Reflecting with Database-Agnostic Types
]
)
- def get_isolation_level_values(self, dbapi_conn):
- return super().get_isolation_level_values(dbapi_conn) + ["AUTOCOMMIT"]
+ def get_isolation_level_values(self, dbapi_connection):
+ return super().get_isolation_level_values(dbapi_connection) + [
+ "AUTOCOMMIT"
+ ]
- def set_isolation_level(self, connection, level):
+ def set_isolation_level(self, dbapi_connection, level):
# adjust for ConnectionFairy being present
# allows attribute set e.g. "connection.autocommit = True"
# to work properly
- if hasattr(connection, "dbapi_connection"):
- connection = connection.dbapi_connection
if level == "AUTOCOMMIT":
- connection.autocommit = True
+ dbapi_connection.autocommit = True
else:
- connection.autocommit = False
- super(PyODBCConnector, self).set_isolation_level(connection, level)
+ dbapi_connection.autocommit = False
+ super(PyODBCConnector, self).set_isolation_level(
+ dbapi_connection, level
+ )
]
)
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_connection):
return list(self._isolation_lookup)
- def set_isolation_level(self, connection, level):
- cursor = connection.cursor()
- cursor.execute("SET TRANSACTION ISOLATION LEVEL %s" % level)
+ def set_isolation_level(self, dbapi_connection, level):
+ cursor = dbapi_connection.cursor()
+ cursor.execute(f"SET TRANSACTION ISOLATION LEVEL {level}")
cursor.close()
if level == "SNAPSHOT":
- connection.commit()
+ dbapi_connection.commit()
- def get_isolation_level(self, connection):
+ def get_isolation_level(self, dbapi_connection):
last_error = None
views = ("sys.dm_exec_sessions", "sys.dm_pdw_nodes_exec_sessions")
for view in views:
- cursor = connection.cursor()
+ cursor = dbapi_connection.cursor()
try:
cursor.execute(
- """
+ f"""
SELECT CASE transaction_isolation_level
WHEN 0 THEN NULL
WHEN 1 THEN 'READ UNCOMMITTED'
WHEN 3 THEN 'REPEATABLE READ'
WHEN 4 THEN 'SERIALIZABLE'
WHEN 5 THEN 'SNAPSHOT' END AS TRANSACTION_ISOLATION_LEVEL
- FROM %s
+ FROM {view}
where session_id = @@SPID
"""
- % view
)
val = cursor.fetchone()[0]
except self.dbapi.Error as err:
# DefaultDialect, so the warning here is all that displays
util.warn(
"Could not fetch transaction isolation level, "
- "tried views: %s; final error was: %s" % (views, last_error)
+ f"tried views: {views}; final error was: {last_error}"
)
raise NotImplementedError(
"Can't fetch isolation level on this particular "
- "SQL Server version. tried views: %s; final error was: %s"
- % (views, last_error)
+ f"SQL Server version. tried views: {views}; final "
+ f"error was: {last_error}"
)
def initialize(self, connection):
else:
return False
- def get_isolation_level_values(self, dbapi_conn):
- return super().get_isolation_level_values(dbapi_conn) + ["AUTOCOMMIT"]
+ def get_isolation_level_values(self, dbapi_connection):
+ return super().get_isolation_level_values(dbapi_connection) + [
+ "AUTOCOMMIT"
+ ]
- def set_isolation_level(self, connection, level):
+ def set_isolation_level(self, dbapi_connection, level):
if level == "AUTOCOMMIT":
- connection.autocommit(True)
+ dbapi_connection.autocommit(True)
else:
- connection.autocommit(False)
+ dbapi_connection.autocommit(False)
super(MSDialect_pymssql, self).set_isolation_level(
- connection, level
+ dbapi_connection, level
)
"REPEATABLE READ",
)
- def set_isolation_level(self, dbapi_conn, level):
- cursor = dbapi_conn.cursor()
- cursor.execute("SET SESSION TRANSACTION ISOLATION LEVEL %s" % level)
+ def set_isolation_level(self, dbapi_connection, level):
+ cursor = dbapi_connection.cursor()
+ cursor.execute(f"SET SESSION TRANSACTION ISOLATION LEVEL {level}")
cursor.execute("COMMIT")
cursor.close()
- def get_isolation_level(self, connection):
- cursor = connection.cursor()
+ def get_isolation_level(self, dbapi_connection):
+ cursor = dbapi_connection.cursor()
if self._is_mysql and self.server_version_info >= (5, 7, 20):
cursor.execute("SELECT @@transaction_isolation")
else:
else:
return cset_name()
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_connection):
return (
"SERIALIZABLE",
"READ UNCOMMITTED",
"AUTOCOMMIT",
)
- def set_isolation_level(self, dbapi_conn, level):
+ def set_isolation_level(self, dbapi_connection, level):
if level == "AUTOCOMMIT":
- dbapi_conn.autocommit(True)
+ dbapi_connection.autocommit(True)
else:
- dbapi_conn.autocommit(False)
+ dbapi_connection.autocommit(False)
super(MySQLDialect_mysqldb, self).set_isolation_level(
- dbapi_conn, level
+ dbapi_connection, level
)
# use the default
return None
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_connection):
return ["READ COMMITTED", "SERIALIZABLE"]
def get_default_isolation_level(self, dbapi_conn):
super(OracleDialect_cx_oracle, self).initialize(connection)
self._detect_decimal_char(connection)
- def get_isolation_level(self, connection):
+ def get_isolation_level(self, dbapi_connection):
# sources:
# general idea of transaction id, have to start one, etc.
# Oracle tuple comparison without using IN:
# https://www.sql-workbench.eu/comparison/tuple_comparison.html
- with connection.cursor() as cursor:
+ with dbapi_connection.cursor() as cursor:
# this is the only way to ensure a transaction is started without
# actually running DML. There's no way to see the configured
# isolation level without getting it from v$transaction which
return result
- def get_isolation_level_values(self, dbapi_conn):
- return super().get_isolation_level_values(dbapi_conn) + ["AUTOCOMMIT"]
+ def get_isolation_level_values(self, dbapi_connection):
+ return super().get_isolation_level_values(dbapi_connection) + [
+ "AUTOCOMMIT"
+ ]
- def set_isolation_level(self, connection, level):
- if hasattr(connection, "dbapi_connection"):
- dbapi_connection = connection.dbapi_connection
- else:
- dbapi_connection = connection
+ def set_isolation_level(self, dbapi_connection, level):
if level == "AUTOCOMMIT":
dbapi_connection.autocommit = True
else:
dbapi_connection.autocommit = False
- connection.rollback()
- with connection.cursor() as cursor:
- cursor.execute("ALTER SESSION SET ISOLATION_LEVEL=%s" % level)
+ dbapi_connection.rollback()
+ with dbapi_connection.cursor() as cursor:
+ cursor.execute(f"ALTER SESSION SET ISOLATION_LEVEL={level}")
def _detect_decimal_char(self, connection):
# we have the option to change this setting upon connect,
# requires that "dsn" be present as a blank string.
return ([""], opts)
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_connection):
return (
"AUTOCOMMIT",
"READ COMMITTED",
"SERIALIZABLE": "serializable",
}
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_connection):
return list(self._isolation_lookup)
- def set_isolation_level(self, connection, level):
- connection.set_isolation_level(self._isolation_lookup[level])
+ def set_isolation_level(self, dbapi_connection, level):
+ dbapi_connection.set_isolation_level(self._isolation_lookup[level])
def set_readonly(self, connection, value):
connection.readonly = value
"REPEATABLE READ",
)
- def set_isolation_level(self, connection, level):
- cursor = connection.cursor()
+ def set_isolation_level(self, dbapi_connection, level):
+ cursor = dbapi_connection.cursor()
cursor.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION "
- "ISOLATION LEVEL %s" % level
+ f"ISOLATION LEVEL {level}"
)
cursor.execute("COMMIT")
cursor.close()
- def get_isolation_level(self, connection):
- cursor = connection.cursor()
+ def get_isolation_level(self, dbapi_connection):
+ cursor = dbapi_connection.cursor()
cursor.execute("show transaction isolation level")
val = cursor.fetchone()[0]
cursor.close()
# connection was closed normally
return "connection is closed" in str(e)
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_connection):
return (
"AUTOCOMMIT",
"READ COMMITTED",
"SERIALIZABLE",
)
- def set_isolation_level(self, connection, level):
+ def set_isolation_level(self, dbapi_connection, level):
level = level.replace("_", " ")
- # adjust for ConnectionFairy possibly being present
- if hasattr(connection, "dbapi_connection"):
- connection = connection.dbapi_connection
-
if level == "AUTOCOMMIT":
- connection.autocommit = True
+ dbapi_connection.autocommit = True
else:
- connection.autocommit = False
- cursor = connection.cursor()
+ dbapi_connection.autocommit = False
+ cursor = dbapi_connection.cursor()
cursor.execute(
"SET SESSION CHARACTERISTICS AS TRANSACTION "
- "ISOLATION LEVEL %s" % level
+ f"ISOLATION LEVEL {level}"
)
cursor.execute("COMMIT")
cursor.close()
return val == "on"
- def set_client_encoding(self, connection, client_encoding):
- # adjust for ConnectionFairy possibly being present
- if hasattr(connection, "dbapi_connection"):
- connection = connection.dbapi_connection
-
- cursor = connection.cursor()
- cursor.execute("SET CLIENT_ENCODING TO '" + client_encoding + "'")
+ def _set_client_encoding(self, dbapi_connection, client_encoding):
+ cursor = dbapi_connection.cursor()
+ cursor.execute(
+ f"""SET CLIENT_ENCODING TO '{
+ client_encoding.replace("'", "''")
+ }'"""
+ )
cursor.execute("COMMIT")
cursor.close()
if self.client_encoding is not None:
def on_connect(conn):
- self.set_client_encoding(conn, self.client_encoding)
+ self._set_client_encoding(conn, self.client_encoding)
fns.append(on_connect)
connection.isolation_level = isolation_level
def get_isolation_level(self, dbapi_connection):
- if hasattr(dbapi_connection, "dbapi_connection"):
- dbapi_connection = dbapi_connection.dbapi_connection
-
status_before = dbapi_connection.info.transaction_status
value = super().get_isolation_level(dbapi_connection)
dbapi_connection.rollback()
return value
- def set_isolation_level(self, connection, level):
- connection = getattr(connection, "dbapi_connection", connection)
+ def set_isolation_level(self, dbapi_connection, level):
if level == "AUTOCOMMIT":
self._do_isolation_level(
- connection, autocommit=True, isolation_level=None
+ dbapi_connection, autocommit=True, isolation_level=None
)
else:
self._do_isolation_level(
- connection,
+ dbapi_connection,
autocommit=False,
isolation_level=self._isolation_lookup[level],
)
super(PGDialect_psycopg2, self).initialize(connection)
self._has_native_hstore = (
self.use_native_hstore
- and self._hstore_oids(connection.connection) is not None
+ and self._hstore_oids(connection.connection.dbapi_connection)
+ is not None
)
# PGDialect.initialize() checks server version for <= 8.2 and sets
"SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE,
}
- def set_isolation_level(self, connection, level):
- connection.set_isolation_level(self._isolation_lookup[level])
+ def set_isolation_level(self, dbapi_connection, level):
+ dbapi_connection.set_isolation_level(self._isolation_lookup[level])
def set_readonly(self, connection, value):
connection.readonly = value
fns = []
if self.client_encoding is not None:
- def on_connect(conn):
- conn.set_client_encoding(self.client_encoding)
+ def on_connect(dbapi_conn):
+ dbapi_conn.set_client_encoding(self.client_encoding)
fns.append(on_connect)
if self.dbapi and self.use_native_uuid:
- def on_connect(conn):
- extras.register_uuid(None, conn)
+ def on_connect(dbapi_conn):
+ extras.register_uuid(None, dbapi_conn)
fns.append(on_connect)
if self.dbapi and self.use_native_hstore:
- def on_connect(conn):
- hstore_oids = self._hstore_oids(conn)
+ def on_connect(dbapi_conn):
+ hstore_oids = self._hstore_oids(dbapi_conn)
if hstore_oids is not None:
oid, array_oid = hstore_oids
kw = {"oid": oid}
kw["array_oid"] = array_oid
- extras.register_hstore(conn, **kw)
+ extras.register_hstore(dbapi_conn, **kw)
fns.append(on_connect)
if self.dbapi and self._json_deserializer:
- def on_connect(conn):
+ def on_connect(dbapi_conn):
extras.register_default_json(
- conn, loads=self._json_deserializer
+ dbapi_conn, loads=self._json_deserializer
)
extras.register_default_jsonb(
- conn, loads=self._json_deserializer
+ dbapi_conn, loads=self._json_deserializer
)
fns.append(on_connect)
if fns:
- def on_connect(conn):
+ def on_connect(dbapi_conn):
for fn in fns:
- fn(conn)
+ fn(dbapi_conn)
return on_connect
else:
)
@util.memoized_instancemethod
- def _hstore_oids(self, conn):
+ def _hstore_oids(self, dbapi_connection):
+
extras = self._psycopg2_extras
- if hasattr(conn, "dbapi_connection"):
- conn = conn.dbapi_connection
- oids = extras.HstoreAdapter.get_oids(conn)
+ oids = extras.HstoreAdapter.get_oids(dbapi_connection)
if oids is not None and oids[0]:
return oids[0:2]
else:
{"READ UNCOMMITTED": 1, "SERIALIZABLE": 0}
)
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_connection):
return list(self._isolation_lookup)
- def set_isolation_level(self, connection, level):
+ def set_isolation_level(self, dbapi_connection, level):
isolation_level = self._isolation_lookup[level]
- cursor = connection.cursor()
- cursor.execute("PRAGMA read_uncommitted = %d" % isolation_level)
+ cursor = dbapi_connection.cursor()
+ cursor.execute(f"PRAGMA read_uncommitted = {isolation_level}")
cursor.close()
- def get_isolation_level(self, connection):
- cursor = connection.cursor()
+ def get_isolation_level(self, dbapi_connection):
+ cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA read_uncommitted")
res = cursor.fetchone()
if res:
}
)
- def set_isolation_level(self, connection, level):
- if hasattr(connection, "dbapi_connection"):
- dbapi_connection = connection.dbapi_connection
- else:
- dbapi_connection = connection
+ def set_isolation_level(self, dbapi_connection, level):
if level == "AUTOCOMMIT":
dbapi_connection.isolation_level = None
else:
dbapi_connection.isolation_level = ""
return super(SQLiteDialect_pysqlite, self).set_isolation_level(
- connection, level
+ dbapi_connection, level
)
def on_connect(self):
return None
return re.search(a, b) is not None
- def set_regexp(connection):
- if hasattr(connection, "dbapi_connection"):
- dbapi_connection = connection.dbapi_connection
- else:
- dbapi_connection = connection
+ def set_regexp(dbapi_connection):
dbapi_connection.create_function(
"regexp",
2,
from .interfaces import AdaptedConnection
from .interfaces import BindTyping
from .interfaces import Compiled
-from .interfaces import Connectable
from .interfaces import CreateEnginePlugin
from .interfaces import Dialect
from .interfaces import ExceptionContext
# the MIT License: https://www.opensource.org/licenses/mit-license.php
import contextlib
import sys
+import typing
+from typing import Any
+from typing import Mapping
+from typing import Optional
+from typing import Union
from .interfaces import BindTyping
-from .interfaces import Connectable
from .interfaces import ConnectionEventsTarget
from .interfaces import ExceptionContext
from .util import _distill_params_20
from ..sql import compiler
from ..sql import util as sql_util
+if typing.TYPE_CHECKING:
+ from .interfaces import Dialect
+ from .url import URL
+ from ..pool import Pool
+ from ..pool import PoolProxiedConnection
"""Defines :class:`_engine.Connection` and :class:`_engine.Engine`.
NO_OPTIONS = util.immutabledict()
-class Connection(Connectable):
+class Connection(ConnectionEventsTarget):
"""Provides high-level functionality for a wrapped DB-API connection.
The :class:`_engine.Connection` object is procured by calling
return self._dbapi_connection is None and not self.closed
@property
- def connection(self):
+ def connection(self) -> "PoolProxiedConnection":
"""The underlying DB-API connection managed by this Connection.
This is a SQLAlchemy connection-pool proxied connection
"""
try:
- return self.dialect.get_isolation_level(self.connection)
+ return self.dialect.get_isolation_level(
+ self.connection.dbapi_connection
+ )
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
def __init__(
self,
- pool,
- dialect,
- url,
- logging_name=None,
- echo=None,
- query_cache_size=500,
- execution_options=None,
- hide_parameters=False,
+ pool: "Pool",
+ dialect: "Dialect",
+ url: "URL",
+ logging_name: Optional[str] = None,
+ echo: Union[None, str, bool] = None,
+ query_cache_size: int = 500,
+ execution_options: Optional[Mapping[str, Any]] = None,
+ hide_parameters: bool = False,
):
self.pool = pool
self.url = url
constructs::
engine = create_engine("mysql+mysqldb://scott:tiger@hostname/dbname",
- encoding='latin1', echo=True)
+ pool_recycle=3600, echo=True)
The string form of the URL is
``dialect[+driver]://user:password@host/dbname[?key=value..]``, where
engine = engineclass(pool, dialect, u, **engine_args)
if _initialize:
+
do_on_connect = dialect.on_connect_url(u)
if do_on_connect:
def first_connect(dbapi_connection, connection_record):
c = base.Connection(
engine,
- connection=dbapi_connection,
+ connection=poollib._AdhocProxiedConnection(
+ dbapi_connection, connection_record
+ ),
_has_events=False,
# reconnecting will be a reentrant condition, so if the
# connection goes away, Connection is then closed
dialect = context.dialect
translate_colname = context._translate_colname
- description_decoder = (
- dialect._description_decoder
- if dialect.description_encoding
- else None
- )
normalize_name = (
dialect.normalize_name if dialect.requires_name_normalize else None
)
colname = rec[0]
coltype = rec[1]
- if description_decoder:
- colname = description_decoder(colname)
-
if translate_colname:
colname, untranslated = translate_colname(colname)
# *not* the FLOAT type however.
supports_native_decimal = False
- description_encoding = None
-
name = "default"
# length at which to truncate
NO_CACHE_KEY = NO_CACHE_KEY
NO_DIALECT_SUPPORT = NO_DIALECT_SUPPORT
+ # TODO: this is not to be part of 2.0. implement rudimentary binary
+ # literals for SQLite, PostgreSQL, MySQL only within
+ # _Binary.literal_processor
+ _legacy_binary_type_literal_encoding = "utf-8"
+
@util.deprecated_params(
empty_in_strategy=(
"1.4",
)
def __init__(
self,
- encoding="utf-8",
paramstyle=None,
isolation_level=None,
dbapi=None,
)
self.bind_typing = interfaces.BindTyping.SETINPUTSIZES
- self.encoding = encoding
self.positional = False
self._ischema = None
self.dbapi = dbapi
try:
self.default_isolation_level = self.get_default_isolation_level(
- connection.connection
+ connection.connection.dbapi_connection
)
except NotImplementedError:
self.default_isolation_level = None
"""Define core interfaces used by the engine system."""
from enum import Enum
-
+from typing import Any
+from typing import Callable
+from typing import Dict
+from typing import List
+from typing import Mapping
+from typing import Optional
+from typing import Sequence
+from typing import Tuple
+from typing import Type
+from typing import TYPE_CHECKING
+from typing import Union
+
+from ..pool import PoolProxiedConnection
from ..sql.compiler import Compiled # noqa
from ..sql.compiler import TypeCompiler # noqa
+from ..util.typing import _TypeToInstance
+from ..util.typing import NotRequired
+from ..util.typing import Protocol
+from ..util.typing import TypedDict
+
+if TYPE_CHECKING:
+ from .base import Connection
+ from .base import Engine
+ from .url import URL
+ from ..sql.compiler import DDLCompiler
+ from ..sql.compiler import IdentifierPreparer
+ from ..sql.compiler import SQLCompiler
+ from ..sql.type_api import TypeEngine
+
+
+class DBAPIConnection(Protocol):
+ """protocol representing a :pep:`249` database connection.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ `Connection Objects <https://www.python.org/dev/peps/pep-0249/#connection-objects>`_
+ - in :pep:`249`
+
+ """ # noqa: E501
+
+ def close(self) -> None:
+ ...
+
+ def commit(self) -> None:
+ ...
+
+ def cursor(self) -> "DBAPICursor":
+ ...
+
+ def rollback(self) -> None:
+ ...
+
+
+class DBAPIType(Protocol):
+ """protocol representing a :pep:`249` database type.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ `Type Objects <https://www.python.org/dev/peps/pep-0249/#type-objects>`_
+ - in :pep:`249`
+
+ """ # noqa: E501
+
+
+class DBAPICursor(Protocol):
+ """protocol representing a :pep:`249` database cursor.
+
+ .. versionadded:: 2.0
+
+ .. seealso::
+
+ `Cursor Objects <https://www.python.org/dev/peps/pep-0249/#cursor-objects>`_
+ - in :pep:`249`
+
+ """ # noqa: E501
+
+ @property
+ def description(
+ self,
+ ) -> Sequence[
+ Tuple[
+ str,
+ "DBAPIType",
+ Optional[int],
+ Optional[int],
+ Optional[int],
+ Optional[int],
+ Optional[bool],
+ ]
+ ]:
+ """The description attribute of the Cursor.
+
+ .. seealso::
+
+ `cursor.description <https://www.python.org/dev/peps/pep-0249/#description>`_
+ - in :pep:`249`
+
+
+ """ # noqa: E501
+ ...
+
+ @property
+ def rowcount(self) -> int:
+ ...
+
+ arraysize: int
+
+ def close(self) -> None:
+ ...
+
+ def execute(
+ self,
+ operation: Any,
+ parameters: Optional[Union[Sequence[Any], Mapping[str, Any]]],
+ ) -> Any:
+ ...
+
+ def executemany(
+ self,
+ operation: Any,
+ parameters: Sequence[Union[Sequence[Any], Mapping[str, Any]]],
+ ) -> Any:
+ ...
+
+ def fetchone(self) -> Optional[Any]:
+ ...
+
+ def fetchmany(self, size: int = ...) -> Sequence[Any]:
+ ...
+
+ def fetchall(self) -> Sequence[Any]:
+ ...
+
+ def setinputsizes(self, sizes: Sequence[Any]) -> None:
+ ...
+
+ def setoutputsize(self, size: Any, column: Any) -> None:
+ ...
+
+ def callproc(self, procname: str, parameters: Sequence[Any] = ...) -> Any:
+ ...
+
+ def nextset(self) -> Optional[bool]:
+ ...
+
+
+class ReflectedIdentity(TypedDict):
+ """represent the reflected IDENTITY structure of a column, corresponding
+ to the :class:`_schema.Identity` construct.
+
+ The :class:`.ReflectedIdentity` structure is part of the
+ :class:`.ReflectedColumn` structure, which is returned by the
+ :meth:`.Inspector.get_columns` method.
+
+ """
+
+ always: bool
+ """type of identity column"""
+
+ on_null: bool
+ """indicates ON NULL"""
+
+ start: int
+ """starting index of the sequence"""
+
+ increment: int
+ """increment value of the sequence"""
+
+ minvalue: int
+ """the minimum value of the sequence."""
+
+ maxvalue: int
+ """the maximum value of the sequence."""
+
+ nominvalue: bool
+ """no minimum value of the sequence."""
+
+ nomaxvalue: bool
+ """no maximum value of the sequence."""
+
+ cycle: bool
+ """allows the sequence to wrap around when the maxvalue
+ or minvalue has been reached."""
+
+ cache: Optional[int]
+ """number of future values in the
+ sequence which are calculated in advance."""
+
+ order: bool
+ """if true, renders the ORDER keyword."""
+
+
+class ReflectedComputed(TypedDict):
+ """Represent the reflected elements of a computed column, corresponding
+ to the :class:`_schema.Computed` construct.
+
+ The :class:`.ReflectedComputed` structure is part of the
+ :class:`.ReflectedColumn` structure, which is returned by the
+ :meth:`.Inspector.get_columns` method.
+
+ """
+
+ sqltext: str
+ """the expression used to generate this column returned
+ as a string SQL expression"""
+
+ persisted: bool
+ """indicates if the value is stored or computed on demand"""
+
+
+class ReflectedColumn(TypedDict):
+ """Dictionary representing the reflected elements corresponding to
+ a :class:`_schema.Column` object.
+
+ The :class:`.ReflectedColumn` structure is returned by the
+ :class:`.Inspector.get_columns` method.
+
+ """
+
+ name: str
+ """column name"""
+
+ type: "TypeEngine"
+ """column type represented as a :class:`.TypeEngine` instance."""
+
+ nullable: bool
+ """column nullability"""
+
+ default: str
+ """column default expression as a SQL string"""
+
+ autoincrement: NotRequired[bool]
+ """database-dependent autoincrement flag.
+
+ This flag indicates if the column has a database-side "autoincrement"
+ flag of some kind. Within SQLAlchemy, other kinds of columns may
+ also act as an "autoincrement" column without necessarily having
+ such a flag on them.
+
+ See :paramref:`_schema.Column.autoincrement` for more background on
+ "autoincrement".
+
+ """
+
+ comment: NotRequired[Optional[str]]
+ """comment for the column, if present"""
+
+ computed: NotRequired[Optional[ReflectedComputed]]
+ """indicates this column is computed at insert (possibly update) time by
+ the database."""
+
+ identity: NotRequired[Optional[ReflectedIdentity]]
+ """indicates this column is an IDENTITY column"""
+
+ dialect_options: NotRequired[Dict[str, Any]]
+ """Additional dialect-specific options detected for this reflected
+ object"""
+
+
+class ReflectedCheckConstraint(TypedDict):
+ """Dictionary representing the reflected elements corresponding to
+ :class:`.CheckConstraint`.
+
+ The :class:`.ReflectedCheckConstraint` structure is returned by the
+ :meth:`.Inspector.get_check_constraints` method.
+
+ """
+
+ name: Optional[str]
+ """constraint name"""
+
+ sqltext: str
+ """the check constraint's SQL expression"""
+
+ dialect_options: NotRequired[Dict[str, Any]]
+ """Additional dialect-specific options detected for this reflected
+ object"""
+
+
+class ReflectedUniqueConstraint(TypedDict):
+ """Dictionary representing the reflected elements corresponding to
+ :class:`.UniqueConstraint`.
+
+ The :class:`.ReflectedUniqueConstraint` structure is returned by the
+ :meth:`.Inspector.get_unique_constraints` method.
+
+ """
+
+ name: Optional[str]
+ """constraint name"""
+
+ column_names: List[str]
+ """column names which comprise the constraint"""
+
+ dialect_options: NotRequired[Dict[str, Any]]
+ """Additional dialect-specific options detected for this reflected
+ object"""
+
+
+class ReflectedPrimaryKeyConstraint(TypedDict):
+ """Dictionary representing the reflected elements corresponding to
+ :class:`.PrimaryKeyConstraint`.
+
+ The :class:`.ReflectedPrimaryKeyConstraint` structure is returned by the
+ :meth:`.Inspector.get_pk_constraint` method.
+
+ """
+
+ name: Optional[str]
+ """constraint name"""
+
+ constrained_columns: List[str]
+ """column names which comprise the constraint"""
+
+ dialect_options: NotRequired[Dict[str, Any]]
+ """Additional dialect-specific options detected for this reflected
+ object"""
+
+
+class ReflectedForeignKeyConstraint(TypedDict):
+ """Dictionary representing the reflected elements corresponding to
+ :class:`.ForeignKeyConstraint`.
+
+ The :class:`.ReflectedForeignKeyConstraint` structure is returned by
+ the :meth:`.Inspector.get_foreign_keys` method.
+
+ """
+
+ name: Optional[str]
+ """constraint name"""
+
+ constrained_columns: List[str]
+ """local column names which comprise the constraint"""
+
+ referred_schema: Optional[str]
+ """schema name of the table being referenced"""
+
+ referred_table: str
+ """name of the table being referenced"""
+
+ referred_columns: List[str]
+ """referenced column names"""
+
+ dialect_options: NotRequired[Dict[str, Any]]
+ """Additional dialect-specific options detected for this reflected
+ object"""
+
+
+class ReflectedIndex(TypedDict):
+ """Dictionary representing the reflected elements corresponding to
+ :class:`.Index`.
+
+ The :class:`.ReflectedIndex` structure is returned by the
+ :meth:`.Inspector.get_indexes` method.
+
+ """
+
+ name: Optional[str]
+ """constraint name"""
+
+ column_names: List[str]
+ """column names which the index refers towards"""
+
+ unique: bool
+ """whether or not the index has a unique flag"""
+
+ duplicates_constraint: NotRequired[bool]
+ """boolean indicating this index mirrors a unique constraint of the same
+ name"""
+
+ include_columns: NotRequired[List[str]]
+ """columns to include in the INCLUDE clause for supporting databases.
+
+ .. deprecated:: 2.0
+
+ Legacy value, will be replaced with
+ ``d["dialect_options"][<dialect name>]["include"]``
+
+ """
+
+ column_sorting: NotRequired[Dict[str, Tuple[str]]]
+ """optional dict mapping column names to tuple of sort keywords,
+ which may include ``asc``, ``desc``, ``nulls_first``, ``nulls_last``."""
+
+ dialect_options: NotRequired[Dict[str, Any]]
+ """Additional dialect-specific options detected for this reflected
+ object"""
+
+
+class ReflectedTableComment(TypedDict):
+ """Dictionary representing the reflected comment corresponding to
+ the :attr:`_schema.Table.comment` attribute.
+
+ The :class:`.ReflectedTableComment` structure is returned by the
+ :meth:`.Inspector.get_table_comment` method.
+
+ """
+
+ text: str
+ """text of the comment"""
class BindTyping(Enum):
directly. Instead, subclass :class:`.default.DefaultDialect` or
descendant class.
- All dialects include the following attributes. There are many other
- attributes that may be supported as well:
+ """
- ``name``
- identifying name for the dialect from a DBAPI-neutral point of view
+ name: str
+ """identifying name for the dialect from a DBAPI-neutral point of view
(i.e. 'sqlite')
+ """
- ``driver``
- identifying name for the dialect's DBAPI
+ driver: str
+ """identifying name for the dialect's DBAPI"""
- ``positional``
- True if the paramstyle for this Dialect is positional.
+ positional: bool
+ """True if the paramstyle for this Dialect is positional."""
- ``paramstyle``
- the paramstyle to be used (some DB-APIs support multiple
+ paramstyle: str
+ """the paramstyle to be used (some DB-APIs support multiple
paramstyles).
+ """
- ``encoding``
- type of encoding to use for unicode, usually defaults to
- 'utf-8'.
+ statement_compiler: Type["SQLCompiler"]
+ """a :class:`.Compiled` class used to compile SQL statements"""
- ``statement_compiler``
- a :class:`.Compiled` class used to compile SQL statements
+ ddl_compiler: Type["DDLCompiler"]
+ """a :class:`.Compiled` class used to compile DDL statements"""
- ``ddl_compiler``
- a :class:`.Compiled` class used to compile DDL statements
+ type_compiler: _TypeToInstance["TypeCompiler"]
+ """a :class:`.Compiled` class used to compile SQL type objects"""
- ``server_version_info``
- a tuple containing a version number for the DB backend in use.
- This value is only available for supporting dialects, and is
- typically populated during the initial connection to the database.
+ preparer: Type["IdentifierPreparer"]
+ """a :class:`.IdentifierPreparer` class used to
+ quote identifiers.
+ """
- ``default_schema_name``
- the name of the default schema. This value is only available for
- supporting dialects, and is typically populated during the
- initial connection to the database.
+ identifier_preparer: "IdentifierPreparer"
+ """This element will refer to an instance of :class:`.IdentifierPreparer`
+ once a :class:`.DefaultDialect` has been constructed.
- ``execution_ctx_cls``
- a :class:`.ExecutionContext` class used to handle statement execution
+ """
- ``execute_sequence_format``
- either the 'tuple' or 'list' type, depending on what cursor.execute()
- accepts for the second argument (they vary).
+ server_version_info: Optional[Tuple[Any, ...]]
+ """a tuple containing a version number for the DB backend in use.
- ``preparer``
- a :class:`~sqlalchemy.sql.compiler.IdentifierPreparer` class used to
- quote identifiers.
+ This value is only available for supporting dialects, and is
+ typically populated during the initial connection to the database.
+ """
- ``supports_alter``
- ``True`` if the database supports ``ALTER TABLE`` - used only for
- generating foreign key constraints in certain circumstances
+ default_schema_name: Optional[str]
+ """the name of the default schema. This value is only available for
+ supporting dialects, and is typically populated during the
+ initial connection to the database.
- ``max_identifier_length``
- The maximum length of identifier names.
+ """
- ``supports_sane_rowcount``
- Indicate whether the dialect properly implements rowcount for
+ execution_ctx_cls: Type["ExecutionContext"]
+ """a :class:`.ExecutionContext` class used to handle statement execution"""
+
+ execute_sequence_format: Union[Type[Tuple[Any, ...]], Type[List[Any]]]
+ """either the 'tuple' or 'list' type, depending on what cursor.execute()
+ accepts for the second argument (they vary)."""
+
+ supports_alter: bool
+ """``True`` if the database supports ``ALTER TABLE`` - used only for
+ generating foreign key constraints in certain circumstances
+ """
+
+ max_identifier_length: int
+ """The maximum length of identifier names."""
+
+ supports_sane_rowcount: bool
+ """Indicate whether the dialect properly implements rowcount for
``UPDATE`` and ``DELETE`` statements.
+ """
- ``supports_sane_multi_rowcount``
- Indicate whether the dialect properly implements rowcount for
+ supports_sane_multi_rowcount: bool
+ """Indicate whether the dialect properly implements rowcount for
``UPDATE`` and ``DELETE`` statements when executed via
executemany.
+ """
+
+ supports_default_values: bool
+ """Indicates if the construct ``INSERT INTO tablename DEFAULT
+ VALUES`` is supported
+ """
- ``preexecute_autoincrement_sequences``
- True if 'implicit' primary key functions must be executed separately
+ preexecute_autoincrement_sequences: bool
+ """True if 'implicit' primary key functions must be executed separately
in order to get their value. This is currently oriented towards
PostgreSQL.
+ """
+
+ implicit_returning: bool
+ """For dialects that support RETURNING, indicate RETURNING may be used
+ to fetch newly generated primary key values and other defaults from
+ an INSERT statement automatically.
+
+ .. seealso::
+
+ :paramref:`_schema.Table.implicit_returning`
+
+ """
- ``colspecs``
- A dictionary of TypeEngine classes from sqlalchemy.types mapped
+ colspecs: Dict[Type["TypeEngine[Any]"], Type["TypeEngine[Any]"]]
+ """A dictionary of TypeEngine classes from sqlalchemy.types mapped
to subclasses that are specific to the dialect class. This
dictionary is class-level only and is not accessed from the
dialect instance itself.
+ """
- ``supports_default_values``
- Indicates if the construct ``INSERT INTO tablename DEFAULT
- VALUES`` is supported
-
- ``supports_sequences``
- Indicates if the dialect supports CREATE SEQUENCE or similar.
+ supports_sequences: bool
+ """Indicates if the dialect supports CREATE SEQUENCE or similar."""
- ``sequences_optional``
- If True, indicates if the "optional" flag on the Sequence() construct
+ sequences_optional: bool
+ """If True, indicates if the :paramref:`_schema.Sequence.optional`
+ parameter on the :class:`_schema.Sequence` construct
should signal to not generate a CREATE SEQUENCE. Applies only to
dialects that support sequences. Currently used only to allow PostgreSQL
SERIAL to be used on a column that specifies Sequence() for usage on
other backends.
+ """
- ``supports_native_enum``
- Indicates if the dialect supports a native ENUM construct.
- This will prevent types.Enum from generating a CHECK
- constraint when that type is used.
+ supports_native_enum: bool
+ """Indicates if the dialect supports a native ENUM construct.
+ This will prevent :class:`_types.Enum` from generating a CHECK
+ constraint when that type is used in "native" mode.
+ """
- ``supports_native_boolean``
- Indicates if the dialect supports a native boolean construct.
- This will prevent types.Boolean from generating a CHECK
+ supports_native_boolean: bool
+ """Indicates if the dialect supports a native boolean construct.
+ This will prevent :class:`_types.Boolean` from generating a CHECK
constraint when that type is used.
+ """
- ``dbapi_exception_translation_map``
- A dictionary of names that will contain as values the names of
+ dbapi_exception_translation_map: Dict[str, str]
+ """A dictionary of names that will contain as values the names of
pep-249 exceptions ("IntegrityError", "OperationalError", etc)
keyed to alternate class names, to support the case where a
DBAPI has exception classes that aren't named as they are
referred to (e.g. IntegrityError = MyException). In the vast
majority of cases this dictionary is empty.
+ """
- .. versionadded:: 1.0.5
+ supports_comments: bool
+ """Indicates the dialect supports comment DDL on tables and columns."""
- """
+ inline_comments: bool
+ """Indicates the dialect supports comment DDL that's inline with the
+ definition of a Table or Column. If False, this implies that ALTER must
+ be used to set table and column comments."""
_has_events = False
- supports_statement_cache = True
+ supports_statement_cache: bool = True
"""indicates if this dialect supports caching.
All dialects that are compatible with statement caching should set this
"""
- def create_connect_args(self, url):
+ def create_connect_args(
+ self, url: "URL"
+ ) -> Tuple[Tuple[str], Mapping[str, Any]]:
"""Build DB-API compatible connection arguments.
Given a :class:`.URL` object, returns a tuple
raise NotImplementedError()
@classmethod
- def type_descriptor(cls, typeobj):
+ def type_descriptor(cls, typeobj: "TypeEngine") -> "TypeEngine":
"""Transform a generic type to a dialect-specific type.
Dialect classes will usually use the
raise NotImplementedError()
- def initialize(self, connection):
+ def initialize(self, connection: "Connection") -> None:
"""Called during strategized creation of the dialect with a
connection.
pass
- def get_columns(self, connection, table_name, schema=None, **kw):
- """Return information about columns in `table_name`.
+ def get_columns(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw
+ ) -> List[ReflectedColumn]:
+ """Return information about columns in ``table_name``.
Given a :class:`_engine.Connection`, a string
- `table_name`, and an optional string `schema`, return column
- information as a list of dictionaries with these keys:
-
- name
- the column's name
-
- type
- [sqlalchemy.types#TypeEngine]
-
- nullable
- boolean
-
- default
- the column's default value
-
- autoincrement
- boolean
-
- sequence
- a dictionary of the form
- {'name' : str, 'start' :int, 'increment': int, 'minvalue': int,
- 'maxvalue': int, 'nominvalue': bool, 'nomaxvalue': bool,
- 'cycle': bool, 'cache': int, 'order': bool}
+ ``table_name``, and an optional string ``schema``, return column
+ information as a list of dictionaries
+ corresponding to the :class:`.ReflectedColumn` dictionary.
- Additional column attributes may be present.
"""
raise NotImplementedError()
- def get_pk_constraint(self, connection, table_name, schema=None, **kw):
+ def get_pk_constraint(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> ReflectedPrimaryKeyConstraint:
"""Return information about the primary key constraint on
table_name`.
Given a :class:`_engine.Connection`, a string
- `table_name`, and an optional string `schema`, return primary
- key information as a dictionary with these keys:
+ ``table_name``, and an optional string ``schema``, return primary
+ key information as a dictionary corresponding to the
+ :class:`.ReflectedPrimaryKeyConstraint` dictionary.
- constrained_columns
- a list of column names that make up the primary key
-
- name
- optional name of the primary key constraint.
"""
raise NotImplementedError()
- def get_foreign_keys(self, connection, table_name, schema=None, **kw):
- """Return information about foreign_keys in `table_name`.
+ def get_foreign_keys(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> List[ReflectedForeignKeyConstraint]:
+ """Return information about foreign_keys in ``table_name``.
Given a :class:`_engine.Connection`, a string
- `table_name`, and an optional string `schema`, return foreign
- key information as a list of dicts with these keys:
-
- name
- the constraint's name
-
- constrained_columns
- a list of column names that make up the foreign key
-
- referred_schema
- the name of the referred schema
+ ``table_name``, and an optional string ``schema``, return foreign
+ key information as a list of dicts corresponding to the
+ :class:`.ReflectedForeignKeyConstraint` dictionary.
- referred_table
- the name of the referred table
-
- referred_columns
- a list of column names in the referred table that correspond to
- constrained_columns
"""
raise NotImplementedError()
- def get_table_names(self, connection, schema=None, **kw):
- """Return a list of table names for `schema`."""
+ def get_table_names(
+ self, connection: "Connection", schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
+ """Return a list of table names for ``schema``."""
raise NotImplementedError()
- def get_temp_table_names(self, connection, schema=None, **kw):
+ def get_temp_table_names(
+ self, connection: "Connection", schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
"""Return a list of temporary table names on the given connection,
if supported by the underlying backend.
raise NotImplementedError()
- def get_view_names(self, connection, schema=None, **kw):
+ def get_view_names(
+ self, connection: "Connection", schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
"""Return a list of all view names available in the database.
:param schema: schema name to query, if not the default schema.
raise NotImplementedError()
- def get_sequence_names(self, connection, schema=None, **kw):
+ def get_sequence_names(
+ self, connection: "Connection", schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
"""Return a list of all sequence names available in the database.
:param schema: schema name to query, if not the default schema.
raise NotImplementedError()
- def get_temp_view_names(self, connection, schema=None, **kw):
+ def get_temp_view_names(
+ self, connection: "Connection", schema: Optional[str] = None, **kw: Any
+ ) -> List[str]:
"""Return a list of temporary view names on the given connection,
if supported by the underlying backend.
raise NotImplementedError()
- def get_view_definition(self, connection, view_name, schema=None, **kw):
+ def get_view_definition(
+ self,
+ connection: "Connection",
+ view_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> str:
"""Return view definition.
Given a :class:`_engine.Connection`, a string
- `view_name`, and an optional string `schema`, return the view
+ ``view_name``, and an optional string ``schema``, return the view
definition.
"""
raise NotImplementedError()
- def get_indexes(self, connection, table_name, schema=None, **kw):
- """Return information about indexes in `table_name`.
+ def get_indexes(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> List[ReflectedIndex]:
+ """Return information about indexes in ``table_name``.
Given a :class:`_engine.Connection`, a string
- `table_name` and an optional string `schema`, return index
- information as a list of dictionaries with these keys:
-
- name
- the index's name
-
- column_names
- list of column names in order
+ ``table_name`` and an optional string ``schema``, return index
+ information as a list of dictionaries corresponding to the
+ :class:`.ReflectedIndex` dictionary.
- unique
- boolean
"""
raise NotImplementedError()
def get_unique_constraints(
- self, connection, table_name, schema=None, **kw
- ):
- r"""Return information about unique constraints in `table_name`.
-
- Given a string `table_name` and an optional string `schema`, return
- unique constraint information as a list of dicts with these keys:
-
- name
- the unique constraint's name
-
- column_names
- list of column names in order
-
- \**kw
- other options passed to the dialect's get_unique_constraints()
- method.
-
- .. versionadded:: 0.9.0
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> List[ReflectedUniqueConstraint]:
+ r"""Return information about unique constraints in ``table_name``.
+
+ Given a string ``table_name`` and an optional string ``schema``, return
+ unique constraint information as a list of dicts corresponding
+ to the :class:`.ReflectedUniqueConstraint` dictionary.
"""
raise NotImplementedError()
- def get_check_constraints(self, connection, table_name, schema=None, **kw):
- r"""Return information about check constraints in `table_name`.
-
- Given a string `table_name` and an optional string `schema`, return
- check constraint information as a list of dicts with these keys:
+ def get_check_constraints(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> List[ReflectedCheckConstraint]:
+ r"""Return information about check constraints in ``table_name``.
- * ``name`` -
- the check constraint's name
+ Given a string ``table_name`` and an optional string ``schema``, return
+ check constraint information as a list of dicts corresponding
+ to the :class:`.ReflectedCheckConstraint` dictionary.
- * ``sqltext`` -
- the check constraint's SQL expression
+ """
- * ``**kw`` -
- other options passed to the dialect's get_check_constraints()
- method.
+ raise NotImplementedError()
- .. versionadded:: 1.1.0
+ def get_table_options(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> Dict[str, Any]:
+ r"""Return the "options" for the table identified by ``table_name``
+ as a dictionary.
"""
- raise NotImplementedError()
-
- def get_table_comment(self, connection, table_name, schema=None, **kw):
- r"""Return the "comment" for the table identified by `table_name`.
+ def get_table_comment(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> ReflectedTableComment:
+ r"""Return the "comment" for the table identified by ``table_name``.
- Given a string `table_name` and an optional string `schema`, return
- table comment information as a dictionary with this key:
+ Given a string ``table_name`` and an optional string ``schema``, return
+ table comment information as a dictionary corresponding to the
+ :class:`.ReflectedTableComment` dictionary.
- text
- text of the comment
- Raises ``NotImplementedError`` for dialects that don't support
- comments.
+ :raise: ``NotImplementedError`` for dialects that don't support
+ comments.
.. versionadded:: 1.2
raise NotImplementedError()
- def normalize_name(self, name):
+ def normalize_name(self, name: str) -> str:
"""convert the given name to lowercase if it is detected as
case insensitive.
"""
raise NotImplementedError()
- def denormalize_name(self, name):
+ def denormalize_name(self, name: str) -> str:
"""convert the given name to a case insensitive identifier
for the backend if it is an all-lowercase name.
"""
raise NotImplementedError()
- def has_table(self, connection, table_name, schema=None, **kw):
+ def has_table(
+ self,
+ connection: "Connection",
+ table_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> bool:
"""For internal dialect use, check the existence of a particular table
or view in the database.
raise NotImplementedError()
- def has_index(self, connection, table_name, index_name, schema=None):
+ def has_index(
+ self,
+ connection: "Connection",
+ table_name: str,
+ index_name: str,
+ schema: Optional[str] = None,
+ ) -> bool:
"""Check the existence of a particular index name in the database.
Given a :class:`_engine.Connection` object, a string
- `table_name` and string index name, return True if an index of the
+ ``table_name`` and string index name, return True if an index of the
given name on the given table exists, false otherwise.
The :class:`.DefaultDialect` implements this in terms of the
raise NotImplementedError()
- def has_sequence(self, connection, sequence_name, schema=None, **kw):
+ def has_sequence(
+ self,
+ connection: "Connection",
+ sequence_name: str,
+ schema: Optional[str] = None,
+ **kw: Any
+ ) -> bool:
"""Check the existence of a particular sequence in the database.
Given a :class:`_engine.Connection` object and a string
raise NotImplementedError()
- def _get_server_version_info(self, connection):
+ def _get_server_version_info(self, connection: "Connection") -> Any:
"""Retrieve the server version info from the given connection.
This is used by the default implementation to populate the
raise NotImplementedError()
- def _get_default_schema_name(self, connection):
+ def _get_default_schema_name(self, connection: "Connection") -> str:
"""Return the string name of the currently selected schema from
the given connection.
raise NotImplementedError()
- def do_begin(self, dbapi_connection):
+ def do_begin(self, dbapi_connection: PoolProxiedConnection) -> None:
"""Provide an implementation of ``connection.begin()``, given a
DB-API connection.
raise NotImplementedError()
- def do_rollback(self, dbapi_connection):
+ def do_rollback(self, dbapi_connection: PoolProxiedConnection) -> None:
"""Provide an implementation of ``connection.rollback()``, given
a DB-API connection.
raise NotImplementedError()
- def do_commit(self, dbapi_connection):
+ def do_commit(self, dbapi_connection: PoolProxiedConnection) -> None:
"""Provide an implementation of ``connection.commit()``, given a
DB-API connection.
raise NotImplementedError()
- def do_close(self, dbapi_connection):
+ def do_close(self, dbapi_connection: PoolProxiedConnection) -> None:
"""Provide an implementation of ``connection.close()``, given a DBAPI
connection.
raise NotImplementedError()
- def do_set_input_sizes(self, cursor, list_of_tuples, context):
+ def do_set_input_sizes(
+ self,
+ cursor: DBAPICursor,
+ list_of_tuples: List[Tuple[str, Any, "TypeEngine"]],
+ context: "ExecutionContext",
+ ) -> Any:
"""invoke the cursor.setinputsizes() method with appropriate arguments
This hook is called if the :attr:`.Dialect.bind_typing` attribute is
"""
raise NotImplementedError()
- def create_xid(self):
+ def create_xid(self) -> Any:
"""Create a two-phase transaction ID.
This id will be passed to do_begin_twophase(),
raise NotImplementedError()
- def do_savepoint(self, connection, name):
+ def do_savepoint(self, connection: "Connection", name: str) -> None:
"""Create a savepoint with the given name.
:param connection: a :class:`_engine.Connection`.
raise NotImplementedError()
- def do_rollback_to_savepoint(self, connection, name):
+ def do_rollback_to_savepoint(
+ self, connection: "Connection", name: str
+ ) -> None:
"""Rollback a connection to the named savepoint.
:param connection: a :class:`_engine.Connection`.
raise NotImplementedError()
- def do_release_savepoint(self, connection, name):
+ def do_release_savepoint(
+ self, connection: "Connection", name: str
+ ) -> None:
"""Release the named savepoint on a connection.
:param connection: a :class:`_engine.Connection`.
raise NotImplementedError()
- def do_begin_twophase(self, connection, xid):
+ def do_begin_twophase(self, connection: "Connection", xid: Any) -> None:
"""Begin a two phase transaction on the given connection.
:param connection: a :class:`_engine.Connection`.
raise NotImplementedError()
- def do_prepare_twophase(self, connection, xid):
+ def do_prepare_twophase(self, connection: "Connection", xid: Any) -> None:
"""Prepare a two phase transaction on the given connection.
:param connection: a :class:`_engine.Connection`.
raise NotImplementedError()
def do_rollback_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
+ self,
+ connection: "Connection",
+ xid: Any,
+ is_prepared: bool = True,
+ recover: bool = False,
+ ) -> None:
"""Rollback a two phase transaction on the given connection.
:param connection: a :class:`_engine.Connection`.
raise NotImplementedError()
def do_commit_twophase(
- self, connection, xid, is_prepared=True, recover=False
- ):
+ self,
+ connection: "Connection",
+ xid: Any,
+ is_prepared: bool = True,
+ recover: bool = False,
+ ) -> None:
"""Commit a two phase transaction on the given connection.
raise NotImplementedError()
- def do_recover_twophase(self, connection):
+ def do_recover_twophase(self, connection: "Connection") -> None:
"""Recover list of uncommitted prepared two phase transaction
identifiers on the given connection.
raise NotImplementedError()
- def do_executemany(self, cursor, statement, parameters, context=None):
+ def do_executemany(
+ self,
+ cursor: DBAPICursor,
+ statement: str,
+ parameters: List[Union[Dict[str, Any], Tuple[Any, ...]]],
+ context: Optional["ExecutionContext"] = None,
+ ) -> None:
"""Provide an implementation of ``cursor.executemany(statement,
parameters)``."""
raise NotImplementedError()
- def do_execute(self, cursor, statement, parameters, context=None):
+ def do_execute(
+ self,
+ cursor: DBAPICursor,
+ statement: str,
+ parameters: Union[Mapping[str, Any], Tuple[Any, ...]],
+ context: Optional["ExecutionContext"] = None,
+ ):
"""Provide an implementation of ``cursor.execute(statement,
parameters)``."""
raise NotImplementedError()
def do_execute_no_params(
- self, cursor, statement, parameters, context=None
+ self,
+ cursor: DBAPICursor,
+ statement: str,
+ context: Optional["ExecutionContext"] = None,
):
"""Provide an implementation of ``cursor.execute(statement)``.
raise NotImplementedError()
- def is_disconnect(self, e, connection, cursor):
+ def is_disconnect(
+ self,
+ e: Exception,
+ connection: Optional[PoolProxiedConnection],
+ cursor: DBAPICursor,
+ ) -> bool:
"""Return True if the given DB-API error indicates an invalid
connection"""
raise NotImplementedError()
- def connect(self, *cargs, **cparams):
+ def connect(self, *cargs: Any, **cparams: Any) -> Any:
r"""Establish a connection using this dialect's DBAPI.
The default implementation of this method is::
"""
- def on_connect_url(self, url):
+ def on_connect_url(self, url: "URL") -> Optional[Callable[[Any], Any]]:
"""return a callable which sets up a newly created DBAPI connection.
This method is a new hook that supersedes the
"""
return self.on_connect()
- def on_connect(self):
+ def on_connect(self) -> Optional[Callable[[Any], Any]]:
"""return a callable which sets up a newly created DBAPI connection.
The callable should accept a single argument "conn" which is the
"""
return None
- def reset_isolation_level(self, dbapi_conn):
+ def reset_isolation_level(self, dbapi_connection: DBAPIConnection) -> None:
"""Given a DBAPI connection, revert its isolation to the default.
Note that this is a dialect-level method which is used as part
raise NotImplementedError()
- def set_isolation_level(self, dbapi_conn, level):
+ def set_isolation_level(
+ self, dbapi_connection: DBAPIConnection, level: str
+ ) -> None:
"""Given a DBAPI connection, set its isolation level.
Note that this is a dialect-level method which is used as part
raise NotImplementedError()
- def get_isolation_level(self, dbapi_conn):
+ def get_isolation_level(self, dbapi_connection: DBAPIConnection) -> str:
"""Given a DBAPI connection, return its isolation level.
When working with a :class:`_engine.Connection` object,
raise NotImplementedError()
- def get_default_isolation_level(self, dbapi_conn):
+ def get_default_isolation_level(self, dbapi_conn: Any) -> str:
"""Given a DBAPI connection, return its isolation level, or
a default isolation level if one cannot be retrieved.
"""
raise NotImplementedError()
- def get_isolation_level_values(self, dbapi_conn):
+ def get_isolation_level_values(self, dbapi_conn: Any) -> List[str]:
"""return a sequence of string isolation level names that are accepted
by this dialect.
raise NotImplementedError()
@classmethod
- def get_dialect_cls(cls, url):
+ def get_dialect_cls(cls, url: "URL") -> Type:
"""Given a URL, return the :class:`.Dialect` that will be used.
This is a hook that allows an external plugin to provide functionality
return cls
@classmethod
- def get_async_dialect_cls(cls, url):
+ def get_async_dialect_cls(cls, url: "URL") -> Type:
"""Given a URL, return the :class:`.Dialect` that will be used by
an async engine.
return cls.get_dialect_cls(url)
@classmethod
- def load_provisioning(cls):
+ def load_provisioning(cls) -> None:
"""set up the provision.py module for this dialect.
For dialects that include a provision.py module that sets up
"""
@classmethod
- def engine_created(cls, engine):
+ def engine_created(cls, engine: "Engine") -> None:
"""A convenience hook called before returning the final
:class:`_engine.Engine`.
"""
- def get_driver_connection(self, connection):
+ def get_driver_connection(self, connection: PoolProxiedConnection) -> Any:
"""Returns the connection object as returned by the external driver
package.
"""
-class Connectable(ConnectionEventsTarget):
- """Interface for an object which supports execution of SQL constructs.
-
- This is the base for :class:`_engine.Connection` and similar objects.
-
- .. versionchanged:: 2.0 :class:`_engine.Connectable` is no longer the
- base class for :class:`_engine.Engine`, replaced with
- :class:`_engine.ConnectionEventsTarget`.
-
- """
-
- engine = None
- """The :class:`_engine.Engine` instance referred to by this
- :class:`.Connectable`.
-
- """
-
- dialect = None
- """The :class:`_engine.Dialect` instance referred to by this
- :class:`.Connectable`.
-
- """
-
- def execute(self, object_, *multiparams, **params):
- """Executes the given construct and returns a
- :class:`_result.Result`.
-
- """
- raise NotImplementedError()
-
- def scalar(self, object_, *multiparams, **params):
- """Executes and returns the first column of the first row.
-
- The underlying cursor is closed after execution.
-
- """
- raise NotImplementedError()
-
-
class ExceptionContext:
"""Encapsulate information about an error condition in progress.
from operator import attrgetter
-from . import base
from . import url as _url
from .. import util
-class MockConnection(base.Connectable):
+class MockConnection:
def __init__(self, dialect, execute):
self._dialect = dialect
self.execute = execute
import contextlib
-from .base import Connectable
from .base import Connection
from .base import Engine
from .. import exc
def __init__(self, bind):
"""Initialize a new :class:`_reflection.Inspector`.
- :param bind: a :class:`~sqlalchemy.engine.Connectable`,
+ :param bind: a :class:`~sqlalchemy.engine.Connection`,
which is typically an instance of
:class:`~sqlalchemy.engine.Engine` or
:class:`~sqlalchemy.engine.Connection`.
"""Construct a new dialect-specific Inspector object from the given
engine or connection.
- :param bind: a :class:`~sqlalchemy.engine.Connectable`,
- which is typically an instance of
- :class:`~sqlalchemy.engine.Engine` or
- :class:`~sqlalchemy.engine.Connection`.
+ :param bind: a :class:`~sqlalchemy.engine.Connection`
+ or :class:`~sqlalchemy.engine.Engine`.
This method differs from direct a direct constructor call of
:class:`_reflection.Inspector` in that the
"""
return cls._construct(cls._init_legacy, bind)
- @inspection._inspects(Connectable)
- def _connectable_insp(bind):
- # this method should not be used unless some unusual case
- # has subclassed "Connectable"
-
- return Inspector._construct(Inspector._init_legacy, bind)
-
@inspection._inspects(Engine)
def _engine_insp(bind):
return Inspector._construct(Inspector._init_engine, bind)
"""
from . import events
+from .base import _AdhocProxiedConnection
from .base import _ConnectionFairy
from .base import _ConnectionRecord
from .base import _finalize_fairy
from .base import Pool
+from .base import PoolProxiedConnection
from .base import reset_commit
from .base import reset_none
from .base import reset_rollback
__all__ = [
"Pool",
+ "PoolProxiedConnection",
"reset_commit",
"reset_none",
"reset_rollback",
from collections import deque
import time
+from typing import Any
+from typing import Dict
+from typing import Optional
+from typing import TYPE_CHECKING
import weakref
from .. import event
from .. import log
from .. import util
+if TYPE_CHECKING:
+ from ..engine.interfaces import DBAPIConnection
reset_rollback = util.symbol("reset_rollback")
reset_commit = util.symbol("reset_commit")
_strong_ref_connection_records = {}
-class _ConnectionFairy:
+class PoolProxiedConnection:
+ """interface for the wrapper connection that is used by the connection
+ pool.
+
+ :class:`.PoolProxiedConnection` is basically the public-facing interface
+ for the :class:`._ConnectionFairy` implementation object, users familiar
+ with :class:`._ConnectionFairy` can consider this object to be
+ equivalent.
+
+ .. versionadded:: 2.0
+
+ """
+
+ __slots__ = ()
+
+ @util.memoized_property
+ def dbapi_connection(self) -> "DBAPIConnection":
+ """A reference to the actual DBAPI connection being tracked.
+
+ .. seealso::
+
+ :attr:`.PoolProxiedConnection.driver_connection`
+
+ :attr:`.PoolProxiedConnection.dbapi_connection`
+
+ :ref:`faq_dbapi_connection`
+
+ """
+ raise NotImplementedError()
+
+ @property
+ def driver_connection(self) -> Any:
+ """The connection object as returned by the driver after a connect.
+
+ .. seealso::
+
+ :attr:`.PoolProxiedConnection.dbapi_connection`
+
+ :attr:`._ConnectionRecord.driver_connection`
+
+ :ref:`faq_dbapi_connection`
+
+ """
+ raise NotImplementedError()
+
+ @property
+ def is_valid(self) -> bool:
+ """Return True if this :class:`.PoolProxiedConnection` still refers
+ to an active DBAPI connection."""
+
+ raise NotImplementedError()
+
+ @util.memoized_property
+ def info(self) -> Dict[str, Any]:
+ """Info dictionary associated with the underlying DBAPI connection
+ referred to by this :class:`.ConnectionFairy`, allowing user-defined
+ data to be associated with the connection.
+
+ The data here will follow along with the DBAPI connection including
+ after it is returned to the connection pool and used again
+ in subsequent instances of :class:`._ConnectionFairy`. It is shared
+ with the :attr:`._ConnectionRecord.info` and
+ :attr:`_engine.Connection.info`
+ accessors.
+
+ The dictionary associated with a particular DBAPI connection is
+ discarded when the connection itself is discarded.
+
+ """
+
+ raise NotImplementedError()
+
+ @property
+ def record_info(self) -> Dict[str, Any]:
+ """Info dictionary associated with the :class:`._ConnectionRecord
+ container referred to by this :class:`.PoolProxiedConnection`.
+
+ Unlike the :attr:`.PoolProxiedConnection.info` dictionary, the lifespan
+ of this dictionary is persistent across connections that are
+ disconnected and/or invalidated within the lifespan of a
+ :class:`._ConnectionRecord`.
+
+ """
+
+ raise NotImplementedError()
+
+ def invalidate(
+ self, e: Optional[Exception] = None, soft: bool = False
+ ) -> None:
+ """Mark this connection as invalidated.
+
+ This method can be called directly, and is also called as a result
+ of the :meth:`_engine.Connection.invalidate` method. When invoked,
+ the DBAPI connection is immediately closed and discarded from
+ further use by the pool. The invalidation mechanism proceeds
+ via the :meth:`._ConnectionRecord.invalidate` internal method.
+
+ :param e: an exception object indicating a reason for the invalidation.
+
+ :param soft: if True, the connection isn't closed; instead, this
+ connection will be recycled on next checkout.
+
+ .. seealso::
+
+ :ref:`pool_connection_invalidation`
+
+
+ """
+ raise NotImplementedError()
+
+ def detach(self) -> None:
+ """Separate this connection from its Pool.
+
+ This means that the connection will no longer be returned to the
+ pool when closed, and will instead be literally closed. The
+ containing ConnectionRecord is separated from the DB-API connection,
+ and will create a new connection when next used.
+
+ Note that any overall connection limiting constraints imposed by a
+ Pool implementation may be violated after a detach, as the detached
+ connection is removed from the pool's knowledge and control.
+
+ """
+
+ raise NotImplementedError()
+
+ def close(self) -> None:
+ """Release this connection back to the pool.
+
+ The :meth:`.PoolProxiedConnection.close` method shadows the
+ :pep:`249` ``.close()`` method, altering its behavior to instead
+ :term:`release` the proxied connection back to the connection pool.
+
+ Upon release to the pool, whether the connection stays "opened" and
+ pooled in the Python process, versus actually closed out and removed
+ from the Python process, is based on the pool implementation in use and
+ its configuration and current state.
+
+ """
+ raise NotImplementedError()
+
+
+class _AdhocProxiedConnection(PoolProxiedConnection):
+    """provides the :class:`.PoolProxiedConnection` interface for cases where
+    the DBAPI connection is not actually proxied.
+
+    This is used by the engine internals to pass a consistent
+    :class:`.PoolProxiedConnection` object to consuming dialects in response to
+    pool events that may not always have the :class:`._ConnectionFairy`
+    available.
+
+    """
+
+    __slots__ = ("dbapi_connection", "_connection_record")
+
+    def __init__(self, dbapi_connection, connection_record):
+        self.dbapi_connection = dbapi_connection
+        self._connection_record = connection_record
+
+    @property
+    def driver_connection(self):
+        return self._connection_record.driver_connection
+
+    @property
+    def connection(self):
+        """An alias to :attr:`._ConnectionFairy.dbapi_connection`.
+
+        This alias is deprecated, please use the new name.
+
+        .. deprecated:: 1.4.24
+
+        """
+        # note: ``__slots__`` declares ``dbapi_connection`` (no leading
+        # underscore); referencing ``self._dbapi_connection`` here would fall
+        # through to ``__getattr__`` and be proxied to the DBAPI connection,
+        # raising AttributeError rather than returning the tracked connection.
+        return self.dbapi_connection
+
+    @property
+    def is_valid(self):
+        raise AttributeError("is_valid not implemented by this proxy")
+
+    @property
+    def record_info(self):
+        return self._connection_record.record_info
+
+    def cursor(self, *args, **kwargs):
+        """Return a new DBAPI cursor for the underlying connection.
+
+        This method is a proxy for the ``connection.cursor()`` DBAPI
+        method.
+
+        """
+        return self.dbapi_connection.cursor(*args, **kwargs)
+
+    def __getattr__(self, key):
+        return getattr(self.dbapi_connection, key)
+
+
+class _ConnectionFairy(PoolProxiedConnection):
"""Proxies a DBAPI connection and provides return-on-dereference
support.
This is an internal object used by the :class:`_pool.Pool` implementation
to provide context management to a DBAPI connection delivered by
- that :class:`_pool.Pool`.
+ that :class:`_pool.Pool`. The public facing interface for this class
+ is described by the :class:`.PoolProxiedConnection` class.
The name "fairy" is inspired by the fact that the
:class:`._ConnectionFairy` object's lifespan is transitory, as it lasts
self._connection_record = connection_record
self._echo = echo
- dbapi_connection = None
- """A reference to the actual DBAPI connection being tracked.
-
- .. versionadded:: 1.4.24
-
- .. seealso::
-
- :attr:`._ConnectionFairy.driver_connection`
-
- :attr:`._ConnectionRecord.dbapi_connection`
-
- :ref:`faq_dbapi_connection`
-
- """
-
_connection_record = None
"""A reference to the :class:`._ConnectionRecord` object associated
with the DBAPI connection.
# try to checkin a second time.
del fairy
+ # never called, this is for code linters
+ raise
+
attempts -= 1
pool.logger.info("Reconnection attempts exhausted on checkout")
referred to by this :class:`.ConnectionFairy`, allowing user-defined
data to be associated with the connection.
- The data here will follow along with the DBAPI connection including
- after it is returned to the connection pool and used again
- in subsequent instances of :class:`._ConnectionFairy`. It is shared
- with the :attr:`._ConnectionRecord.info` and
- :attr:`_engine.Connection.info`
- accessors.
-
- The dictionary associated with a particular DBAPI connection is
- discarded when the connection itself is discarded.
+ See :attr:`.PoolProxiedConnection.info` for full description.
"""
return self._connection_record.info
"""Info dictionary associated with the :class:`._ConnectionRecord
container referred to by this :class:`.ConnectionFairy`.
- Unlike the :attr:`._ConnectionFairy.info` dictionary, the lifespan
- of this dictionary is persistent across connections that are
- disconnected and/or invalidated within the lifespan of a
- :class:`._ConnectionRecord`.
-
- .. versionadded:: 1.1
+ See :attr:`.PoolProxiedConnection.record_info` for full description.
"""
if self._connection_record:
def invalidate(self, e=None, soft=False):
"""Mark this connection as invalidated.
- This method can be called directly, and is also called as a result
- of the :meth:`_engine.Connection.invalidate` method. When invoked,
- the DBAPI connection is immediately closed and discarded from
- further use by the pool. The invalidation mechanism proceeds
- via the :meth:`._ConnectionRecord.invalidate` internal method.
-
- :param e: an exception object indicating a reason for the invalidation.
-
- :param soft: if True, the connection isn't closed; instead, this
- connection will be recycled on next checkout.
-
- .. versionadded:: 1.0.3
+ See :meth:`.PoolProxiedConnection.invalidate` for full description.
.. seealso::
def detach(self):
"""Separate this connection from its Pool.
- This means that the connection will no longer be returned to the
- pool when closed, and will instead be literally closed. The
- containing ConnectionRecord is separated from the DB-API connection,
- and will create a new connection when next used.
+ See :meth:`.PoolProxiedConnection.detach` for full description.
- Note that any overall connection limiting constraints imposed by a
- Pool implementation may be violated after a detach, as the detached
- connection is removed from the pool's knowledge and control.
"""
if self._connection_record is not None:
self._pool.dispatch.detach(self.dbapi_connection, rec)
def close(self):
+ """Release this connection back to the pool.
+
+ See :meth:`.PoolProxiedConnection.close` for full description.
+
+ """
self._counter -= 1
if self._counter == 0:
self._checkin()
from .base import Pool
from .. import event
-from ..engine.base import Engine
+from .. import util
class PoolEvents(event.Events):
_target_class_doc = "SomeEngineOrPool"
_dispatch_target = Pool
+ @util.preload_module("sqlalchemy.engine")
@classmethod
def _accept_with(cls, target):
+ Engine = util.preloaded.engine.Engine
+
if isinstance(target, type):
if issubclass(target, Engine):
return Pool
def literal_processor(self, dialect):
def process(value):
- value = value.decode(dialect.encoding).replace("'", "''")
+ # TODO: this is useless for real world scenarios; implement
+ # real binary literals
+ value = value.decode(
+ dialect._legacy_binary_type_literal_encoding
+ ).replace("'", "''")
return "'%s'" % value
return process
import collections
import re
+import typing
+from typing import Any
+from typing import Dict
+from typing import Optional
import warnings
import weakref
from .util import gc_collect
from .. import event
from .. import pool
+from ..util.typing import Literal
+
+
+if typing.TYPE_CHECKING:
+ from ..engine import Engine
+ from ..engine.url import URL
+ from ..ext.asyncio import AsyncEngine
class ConnectionKiller:
return engine
+@typing.overload
+def testing_engine(
+ url: Optional["URL"] = None,
+ options: Optional[Dict[str, Any]] = None,
+ asyncio: Literal[False] = False,
+ transfer_staticpool: bool = False,
+) -> "Engine":
+ ...
+
+
+@typing.overload
+def testing_engine(
+ url: Optional["URL"] = None,
+ options: Optional[Dict[str, Any]] = None,
+ asyncio: Literal[True] = True,
+ transfer_staticpool: bool = False,
+) -> "AsyncEngine":
+ ...
+
+
def testing_engine(
url=None,
options=None,
asyncio=False,
transfer_staticpool=False,
):
- """Produce an engine configured by --options with optional overrides."""
-
if asyncio:
from sqlalchemy.ext.asyncio import create_async_engine as create_engine
else:
eq_(conn.get_isolation_level(), non_default)
- conn.dialect.reset_isolation_level(conn.connection)
+ conn.dialect.reset_isolation_level(
+ conn.connection.dbapi_connection
+ )
eq_(conn.get_isolation_level(), existing)
c2 = conn.execution_options(isolation_level="AUTOCOMMIT")
self._test_conn_autocommits(c2, True)
- c2.dialect.reset_isolation_level(c2.connection)
+ c2.dialect.reset_isolation_level(c2.connection.dbapi_connection)
self._test_conn_autocommits(conn, False)
import platform
import sys
+py311 = sys.version_info >= (3, 11)
py39 = sys.version_info >= (3, 9)
py38 = sys.version_info >= (3, 8)
pypy = platform.python_implementation() == "PyPy"
--- /dev/null
+from typing import Any
+from typing import Generic
+from typing import overload
+from typing import Type
+from typing import TypeVar
+
+from . import compat
+
+if compat.py38:
+ from typing import Literal
+ from typing import Protocol
+ from typing import TypedDict
+else:
+ from typing_extensions import Literal # noqa
+ from typing_extensions import Protocol # noqa
+ from typing_extensions import TypedDict # noqa
+
+
+if compat.py311:
+ from typing import NotRequired # noqa
+else:
+ from typing_extensions import NotRequired # noqa
+
+
+_T = TypeVar("_T")
+
+
+class _TypeToInstance(Generic[_T]):
+ @overload
+ def __get__(self, instance: None, owner: Any) -> Type[_T]:
+ ...
+
+ @overload
+ def __get__(self, instance: object, owner: Any) -> _T:
+ ...
+
+ @overload
+ def __set__(self, instance: None, value: Type[_T]) -> None:
+ ...
+
+ @overload
+ def __set__(self, instance: object, value: _T) -> None:
+ ...
install_requires =
importlib-metadata;python_version<"3.8"
greenlet != 0.4.17;python_version>='3' and (platform_machine=='aarch64' or (platform_machine=='ppc64le' or (platform_machine=='x86_64' or (platform_machine=='amd64' or (platform_machine=='AMD64' or (platform_machine=='win32' or platform_machine=='WIN32'))))))
+ typing-extensions >= 4;python_version<"3.11"
[options.extras_require]
asyncio =
eng = testing_engine()
isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection
+ eng.connect().connection.dbapi_connection
)
level = self._non_default_isolation_level()
ne_(isolation_level, level)
eng = testing_engine(options=dict(isolation_level=level))
- eq_(eng.dialect.get_isolation_level(eng.connect().connection), level)
+ eq_(
+ eng.dialect.get_isolation_level(
+ eng.connect().connection.dbapi_connection
+ ),
+ level,
+ )
# check that it stays
conn = eng.connect()
- eq_(eng.dialect.get_isolation_level(conn.connection), level)
+ eq_(
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
+ level,
+ )
conn.close()
conn = eng.connect()
- eq_(eng.dialect.get_isolation_level(conn.connection), level)
+ eq_(
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
+ level,
+ )
conn.close()
def test_default_level(self):
eng = testing_engine(options=dict())
isolation_level = eng.dialect.get_isolation_level(
- eng.connect().connection
+ eng.connect().connection.dbapi_connection
)
eq_(isolation_level, self._default_isolation_level())
eng = testing_engine(options=dict())
conn = eng.connect()
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._default_isolation_level(),
)
eng.dialect.set_isolation_level(
- conn.connection, self._non_default_isolation_level()
+ conn.connection.dbapi_connection,
+ self._non_default_isolation_level(),
)
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._non_default_isolation_level(),
)
- eng.dialect.reset_isolation_level(conn.connection)
+ eng.dialect.reset_isolation_level(conn.connection.dbapi_connection)
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._default_isolation_level(),
)
)
conn = eng.connect()
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._non_default_isolation_level(),
)
eng.dialect.set_isolation_level(
- conn.connection, self._default_isolation_level()
+ conn.connection.dbapi_connection, self._default_isolation_level()
)
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._default_isolation_level(),
)
- eng.dialect.reset_isolation_level(conn.connection)
+ eng.dialect.reset_isolation_level(conn.connection.dbapi_connection)
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._non_default_isolation_level(),
)
conn.close()
)
c2 = eng.connect()
eq_(
- eng.dialect.get_isolation_level(c1.connection),
+ eng.dialect.get_isolation_level(c1.connection.dbapi_connection),
self._non_default_isolation_level(),
)
eq_(
- eng.dialect.get_isolation_level(c2.connection),
+ eng.dialect.get_isolation_level(c2.connection.dbapi_connection),
self._default_isolation_level(),
)
c1.close()
c2.close()
c3 = eng.connect()
eq_(
- eng.dialect.get_isolation_level(c3.connection),
+ eng.dialect.get_isolation_level(c3.connection.dbapi_connection),
self._default_isolation_level(),
)
c4 = eng.connect()
eq_(
- eng.dialect.get_isolation_level(c4.connection),
+ eng.dialect.get_isolation_level(c4.connection.dbapi_connection),
self._default_isolation_level(),
)
# was never set, so we are on original value
eq_(
- eng.dialect.get_isolation_level(c1.connection),
+ eng.dialect.get_isolation_level(c1.connection.dbapi_connection),
self._default_isolation_level(),
)
)
conn = eng.connect()
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._non_default_isolation_level(),
)
conn = eng.connect()
eq_(
- eng.dialect.get_isolation_level(conn.connection),
+ eng.dialect.get_isolation_level(conn.connection.dbapi_connection),
self._non_default_isolation_level(),
)
@async_test
async def test_engine_eq_ne(self, async_engine):
e2 = _async_engine.AsyncEngine(async_engine.sync_engine)
- e3 = testing.engines.testing_engine(
- asyncio=True, transfer_staticpool=True
- )
+ e3 = engines.testing_engine(asyncio=True, transfer_staticpool=True)
eq_(async_engine, e2)
ne_(async_engine, e3)
result.all()
try:
- engine = testing_engine(
+ engine = engines.testing_engine(
asyncio=True, transfer_staticpool=False
)