--- /dev/null
+.. change::
+ :tags: usecase, engine
+ :tickets: 5131
+
+ The :meth:`.Connection.connect` method is deprecated as is the concept of
+ "connection branching", which copies a :class:`.Connection` into a new one
+ that has a no-op ".close()" method. This pattern is oriented around the
+ "connectionless execution" concept which is also being removed in 2.0.
class PGInspector(reflection.Inspector):
- def __init__(self, conn):
- reflection.Inspector.__init__(self, conn)
-
def get_table_oid(self, table_name, schema=None):
"""Return the OID for the given table name."""
self.__transaction = None
self.__savepoint_seq = 0
self.should_close_with_result = close_with_result
+ if close_with_result:
+ util.warn_deprecated_20(
+ '"Connectionless" execution, which refers to running '
+ "SQL commands using the Engine.execute() (or "
+ "some_statement.execute()) method without "
+ "calling .connect() or .begin() to get a Connection, is "
+ "deprecated and will be removed SQLAlchemy 2.0"
+ )
+
self.__invalid = False
self.__can_reconnect = True
self._echo = self.engine._should_log_info()
return self.connection.info
+ @util.deprecated_20(":meth:`.Connection.connect`")
def connect(self, close_with_result=False):
"""Returns a branched version of this :class:`.Connection`.
"""
if self.__branch_from:
+ util.warn_deprecated(
+ "The .close() method on a so-called 'branched' connection is "
+ "deprecated as of 1.4, as are 'branched' connections overall, "
+ "and will be removed in a future release. If this is a "
+ "default-handling function, don't close the connection."
+ )
try:
del self.__connection
except AttributeError:
resource to be returned to the connection pool.
"""
-
connection = self.connect(close_with_result=True)
return connection.execute(statement, *multiparams, **params)
parameters = []
if compiled.positional:
for compiled_params in self.compiled_parameters:
- param = []
- for key in positiontup:
- if key in processors:
- param.append(processors[key](compiled_params[key]))
- else:
- param.append(compiled_params[key])
+ param = [
+ processors[key](compiled_params[key])
+ if key in processors
+ else compiled_params[key]
+ for key in positiontup
+ ]
parameters.append(dialect.execute_sequence_format(param))
else:
encode = not dialect.supports_unicode_statements
else:
return autocommit
- def _execute_scalar(self, stmt, type_):
+ def _execute_scalar(self, stmt, type_, parameters=None):
"""Execute a string statement on the current cursor, returning a
scalar result.
):
stmt = self.dialect._encoder(stmt)[0]
- if self.dialect.positional:
- default_params = self.dialect.execute_sequence_format()
- else:
- default_params = {}
+ if not parameters:
+ if self.dialect.positional:
+ parameters = self.dialect.execute_sequence_format()
+ else:
+ parameters = {}
- conn._cursor_execute(self.cursor, stmt, default_params, context=self)
+ conn._cursor_execute(self.cursor, stmt, parameters, context=self)
r = self.cursor.fetchone()[0]
if type_ is not None:
# apply type post processors to the result
self.current_column = column
return default.arg(self)
elif default.is_clause_element:
- # TODO: expensive branching here should be
- # pulled into _exec_scalar()
- conn = self.connection
- if not default._arg_is_typed:
- default_arg = expression.type_coerce(default.arg, type_)
- else:
- default_arg = default.arg
- c = expression.select([default_arg]).compile(bind=conn)
- return conn._execute_compiled(c, (), {}).scalar()
+ return self._exec_default_clause_element(column, default, type_)
else:
return default.arg
+ def _exec_default_clause_element(self, column, default, type_):
+ # execute a default that's a complete clause element. Here, we have
+ # to re-implement a miniature version of the compile->parameters->
+ # cursor.execute() sequence, since we don't want to modify the state
+ # of the connection / result in progress or create new connection/
+ # result objects etc.
+ # .. versionchanged:: 1.4
+
+ if not default._arg_is_typed:
+ default_arg = expression.type_coerce(default.arg, type_)
+ else:
+ default_arg = default.arg
+ compiled = expression.select([default_arg]).compile(
+ dialect=self.dialect
+ )
+ compiled_params = compiled.construct_params()
+ processors = compiled._bind_processors
+ if compiled.positional:
+ positiontup = compiled.positiontup
+ parameters = self.dialect.execute_sequence_format(
+ [
+ processors[key](compiled_params[key])
+ if key in processors
+ else compiled_params[key]
+ for key in positiontup
+ ]
+ )
+ else:
+ parameters = dict(
+ (
+ key,
+ processors[key](compiled_params[key])
+ if key in processors
+ else compiled_params[key],
+ )
+ for key in compiled_params
+ )
+ return self._execute_scalar(
+ util.text_type(compiled), type_, parameters=parameters
+ )
+
current_parameters = None
"""A dictionary of parameters applied to the current row.
raise NotImplementedError()
+@util.deprecated_20_cls(
+ ":class:`.Connectable`",
+ alternative=(
+ "The :class:`.Engine` will be the only Core "
+ "object that features a .connect() method, and the "
+ ":class:`.Connection` will be the only object that features "
+ "an .execute() method."
+ ),
+ constructor=None,
+)
class Connectable(object):
"""Interface for an object which supports execution of SQL constructs.
"""
- @util.deprecated(
- "0.7",
- "The :meth:`.Connectable.create` method is deprecated and will be "
- "removed in a future release. Please use the ``.create()`` method "
- "on specific schema objects to emit DDL sequences, including "
- ":meth:`.Table.create`, :meth:`.Index.create`, and "
- ":meth:`.MetaData.create_all`.",
- )
- def create(self, entity, **kwargs):
- """Emit CREATE statements for the given schema entity.
- """
-
- raise NotImplementedError()
-
- @util.deprecated(
- "0.7",
- "The :meth:`.Connectable.drop` method is deprecated and will be "
- "removed in a future release. Please use the ``.drop()`` method "
- "on specific schema objects to emit DDL sequences, including "
- ":meth:`.Table.drop`, :meth:`.Index.drop`, and "
- ":meth:`.MetaData.drop_all`.",
- )
- def drop(self, entity, **kwargs):
- """Emit DROP statements for the given schema entity.
- """
-
- raise NotImplementedError()
-
def execute(self, object_, *multiparams, **params):
"""Executes the given construct and returns a :class:`.ResultProxy`."""
raise NotImplementedError()
'name' attribute..
"""
+import contextlib
+
from .base import Connectable
+from .base import Connection
+from .base import Engine
from .. import exc
from .. import inspection
from .. import sql
fetched metadata.
A :class:`.Inspector` object is usually created via the
- :func:`.inspect` function::
+ :func:`.inspect` function, which may be passed an :class:`.Engine`
+ or a :class:`.Connection`::
from sqlalchemy import inspect, create_engine
engine = create_engine('...')
insp = inspect(engine)
- The inspection method above is equivalent to using the
- :meth:`.Inspector.from_engine` method, i.e.::
-
- engine = create_engine('...')
- insp = Inspector.from_engine(engine)
-
- Where above, the :class:`~sqlalchemy.engine.interfaces.Dialect` may opt
- to return an :class:`.Inspector` subclass that provides additional
- methods specific to the dialect's target database.
+ Where above, the :class:`~sqlalchemy.engine.interfaces.Dialect` associated
+ with the engine may opt to return an :class:`.Inspector` subclass that
+ provides additional methods specific to the dialect's target database.
"""
+ @util.deprecated(
+ "1.4",
+ "The __init__() method on :class:`.Inspector` is deprecated and "
+ "will be removed in a future release. Please use the "
+ ":func:`.sqlalchemy.inspect` "
+ "function on an :class:`.Engine` or :class:`.Connection` in order to "
+ "acquire an :class:`.Inspector`.",
+ )
def __init__(self, bind):
"""Initialize a new :class:`.Inspector`.
:meth:`.Inspector.from_engine`
"""
- # this might not be a connection, it could be an engine.
- self.bind = bind
+ return self._init_legacy(bind)
+
+ @classmethod
+ def _construct(cls, init, bind):
- # set the engine
+ if hasattr(bind.dialect, "inspector"):
+ cls = bind.dialect.inspector
+
+ self = cls.__new__(cls)
+ init(self, bind)
+ return self
+
+ def _init_legacy(self, bind):
if hasattr(bind, "engine"):
- self.engine = bind.engine
+ self._init_connection(bind)
else:
- self.engine = bind
+ self._init_engine(bind)
- if self.engine is bind:
- # if engine, ensure initialized
- bind.connect().close()
+ def _init_engine(self, engine):
+ self.bind = self.engine = engine
+ engine.connect().close()
+ self._op_context_requires_connect = True
+ self.dialect = self.engine.dialect
+ self.info_cache = {}
+ def _init_connection(self, connection):
+ self.bind = connection
+ self.engine = connection.engine
+ self._op_context_requires_connect = False
self.dialect = self.engine.dialect
self.info_cache = {}
@classmethod
+ @util.deprecated(
+ "1.4",
+ "The from_engine() method on :class:`.Inspector` is deprecated and "
+ "will be removed in a future release. Please use the "
+ ":func:`.sqlalchemy.inspect` "
+ "function on an :class:`.Engine` or :class:`.Connection` in order to "
+ "acquire an :class:`.Inspector`.",
+ )
def from_engine(cls, bind):
"""Construct a new dialect-specific Inspector object from the given
engine or connection.
See the example at :class:`.Inspector`.
"""
- if hasattr(bind.dialect, "inspector"):
- return bind.dialect.inspector(bind)
- return Inspector(bind)
+ return cls._construct(cls._init_legacy, bind)
@inspection._inspects(Connectable)
- def _insp(bind):
- return Inspector.from_engine(bind)
+ def _connectable_insp(bind):
+ # this method should not be used unless some unusual case
+ # has subclassed "Connectable"
+
+ return Inspector._construct(Inspector._init_legacy, bind)
+
+ @inspection._inspects(Engine)
+ def _engine_insp(bind):
+ return Inspector._construct(Inspector._init_engine, bind)
+
+ @inspection._inspects(Connection)
+ def _connection_insp(bind):
+ return Inspector._construct(Inspector._init_connection, bind)
+
+ @contextlib.contextmanager
+ def _operation_context(self):
+ """Return a context that optimizes for multiple operations on a single
+ transaction.
+
+ This essentially allows connect()/close() to be called if we detected
+ that we're against an :class:`.Engine` and not a :class:`.Connection`.
+
+ """
+ if self._op_context_requires_connect:
+ conn = self.bind.connect()
+ else:
+ conn = self.bind
+ try:
+ yield conn
+ finally:
+ if self._op_context_requires_connect:
+ conn.close()
+
+ @contextlib.contextmanager
+ def _inspection_context(self):
+ """Return an :class:`.Inspector` from this one that will run all
+ operations on a single connection.
+
+ """
+
+ with self._operation_context() as conn:
+ sub_insp = self._construct(self.__class__._init_connection, conn)
+ sub_insp.info_cache = self.info_cache
+ yield sub_insp
@property
def default_schema_name(self):
"""
if hasattr(self.dialect, "get_schema_names"):
- return self.dialect.get_schema_names(
- self.bind, info_cache=self.info_cache
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_schema_names(
+ conn, info_cache=self.info_cache
+ )
return []
def get_table_names(self, schema=None):
"""
- return self.dialect.get_table_names(
- self.bind, schema, info_cache=self.info_cache
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_table_names(
+ conn, schema, info_cache=self.info_cache
+ )
def has_table(self, table_name, schema=None):
"""Return True if the backend has a table of the given name.
"""
# TODO: info_cache?
- return self.dialect.has_table(self.bind, table_name, schema)
+ with self._operation_context() as conn:
+ return self.dialect.has_table(conn, table_name, schema)
def get_sorted_table_and_fkc_names(self, schema=None):
"""Return dependency-sorted table and foreign key constraint names in
with an already-given :class:`.MetaData`.
"""
- if hasattr(self.dialect, "get_table_names"):
+
+ with self._operation_context() as conn:
tnames = self.dialect.get_table_names(
- self.bind, schema, info_cache=self.info_cache
+ conn, schema, info_cache=self.info_cache
)
- else:
- tnames = self.engine.table_names(schema)
tuples = set()
remaining_fkcs = set()
.. versionadded:: 1.0.0
"""
- return self.dialect.get_temp_table_names(
- self.bind, info_cache=self.info_cache
- )
+
+ with self._operation_context() as conn:
+ return self.dialect.get_temp_table_names(
+ conn, info_cache=self.info_cache
+ )
def get_temp_view_names(self):
"""return a list of temporary view names for the current bind.
.. versionadded:: 1.0.0
"""
- return self.dialect.get_temp_view_names(
- self.bind, info_cache=self.info_cache
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_temp_view_names(
+ conn, info_cache=self.info_cache
+ )
def get_table_options(self, table_name, schema=None, **kw):
"""Return a dictionary of options specified when the table of the
"""
if hasattr(self.dialect, "get_table_options"):
- return self.dialect.get_table_options(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_table_options(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
return {}
def get_view_names(self, schema=None):
"""
- return self.dialect.get_view_names(
- self.bind, schema, info_cache=self.info_cache
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_view_names(
+ conn, schema, info_cache=self.info_cache
+ )
def get_view_definition(self, view_name, schema=None):
"""Return definition for `view_name`.
"""
- return self.dialect.get_view_definition(
- self.bind, view_name, schema, info_cache=self.info_cache
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_view_definition(
+ conn, view_name, schema, info_cache=self.info_cache
+ )
def get_columns(self, table_name, schema=None, **kw):
"""Return information about columns in `table_name`.
"""
- col_defs = self.dialect.get_columns(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ col_defs = self.dialect.get_columns(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
for col_def in col_defs:
# make this easy and only return instances for coltype
coltype = col_def["type"]
primary key information as a list of column names.
"""
- return self.dialect.get_pk_constraint(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )["constrained_columns"]
+ with self._operation_context() as conn:
+ return self.dialect.get_pk_constraint(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )["constrained_columns"]
def get_pk_constraint(self, table_name, schema=None, **kw):
"""Return information about primary key constraint on `table_name`.
use :class:`.quoted_name`.
"""
- return self.dialect.get_pk_constraint(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_pk_constraint(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_foreign_keys(self, table_name, schema=None, **kw):
"""Return information about foreign_keys in `table_name`.
"""
- return self.dialect.get_foreign_keys(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_foreign_keys(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_indexes(self, table_name, schema=None, **kw):
"""Return information about indexes in `table_name`.
"""
- return self.dialect.get_indexes(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_indexes(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_unique_constraints(self, table_name, schema=None, **kw):
"""Return information about unique constraints in `table_name`.
"""
- return self.dialect.get_unique_constraints(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_unique_constraints(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_table_comment(self, table_name, schema=None, **kw):
"""Return information about the table comment for ``table_name``.
"""
- return self.dialect.get_table_comment(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_table_comment(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
def get_check_constraints(self, table_name, schema=None, **kw):
"""Return information about check constraints in `table_name`.
"""
- return self.dialect.get_check_constraints(
- self.bind, table_name, schema, info_cache=self.info_cache, **kw
- )
+ with self._operation_context() as conn:
+ return self.dialect.get_check_constraints(
+ conn, table_name, schema, info_cache=self.info_cache, **kw
+ )
def reflecttable(
self,
if bind is None:
bind = _bind_or_error(self)
- with bind.connect() as conn:
- insp = inspection.inspect(conn)
-
+ with inspection.inspect(bind)._inspection_context() as insp:
reflect_opts = {
"autoload_with": insp,
"extend_existing": extend_existing,
from .assertions import eq_ignore_whitespace # noqa
from .assertions import eq_regex # noqa
from .assertions import expect_deprecated # noqa
+from .assertions import expect_deprecated_20 # noqa
from .assertions import expect_warnings # noqa
from .assertions import in_ # noqa
from .assertions import is_ # noqa
return _expect_warnings(sa_exc.SADeprecationWarning, messages, **kw)
+def expect_deprecated_20(*messages, **kw):
+ return _expect_warnings(sa_exc.RemovedIn20Warning, messages, **kw)
+
+
def emits_warning_on(db, *messages):
"""Mark a test as emitting a warning on a specific dialect.
from ... import String
from ... import testing
from ... import types as sql_types
-from ...engine.reflection import Inspector
from ...schema import DDL
from ...schema import Index
from ...sql.elements import quoted_name
def test_deprecated_get_primary_keys(self):
meta = self.metadata
users = self.tables.users
- insp = Inspector(meta.bind)
+ insp = inspect(meta.bind)
assert_raises_message(
sa_exc.SADeprecationWarning,
r".*get_primary_keys\(\) method is deprecated",
from .compat import zip_longest # noqa
from .deprecations import deprecated # noqa
from .deprecations import deprecated_20 # noqa
+from .deprecations import deprecated_20_cls # noqa
from .deprecations import deprecated_cls # noqa
from .deprecations import deprecated_params # noqa
from .deprecations import inject_docstring_text # noqa
def warn_deprecated_20(msg, stacklevel=3):
- msg += "(Background on SQLAlchemy 2.0 at: http://sqlalche.me/e/b8d9)"
+ msg += " (Background on SQLAlchemy 2.0 at: http://sqlalche.me/e/b8d9)"
warnings.warn(msg, exc.RemovedIn20Warning, stacklevel=stacklevel)
return decorate
+def deprecated_20_cls(clsname, alternative=None, constructor="__init__"):
+ message = (
+ ".. deprecated:: 2.0 The %s class is considered legacy as of the "
+ "1.x series of SQLAlchemy and will be removed in 2.0." % clsname
+ )
+
+ if alternative:
+ message += " " + alternative
+
+ def decorate(cls):
+ return _decorate_cls_with_warning(
+ cls, constructor, exc.RemovedIn20Warning, message, message
+ )
+
+ return decorate
+
+
def deprecated(
version, message=None, add_deprecation_to_docstring=True, warning=None
):
def deprecated_20(api_name, alternative=None, **kw):
message = (
- "The %s() function/method is considered legacy as of the "
+ "The %s function/method is considered legacy as of the "
"1.x series of SQLAlchemy and will be removed in 2.0." % api_name
)
if alternative:
message += " " + alternative
- message += " (Background on SQLAlchemy 2.0 at: http://sqlalche.me/e/b8d9)"
-
return deprecated(
"2.0", message=message, warning=exc.RemovedIn20Warning, **kw
)
):
doc = cls.__doc__ is not None and cls.__doc__ or ""
if docstring_header is not None:
- docstring_header %= dict(func=constructor)
+ if constructor is not None:
+ docstring_header %= dict(func=constructor)
+
+ if issubclass(wtype, exc.RemovedIn20Warning):
+ docstring_header += (
+ " (Background on SQLAlchemy 2.0 at: "
+ ":ref:`migration_20_toplevel`)"
+ )
doc = inject_docstring_text(doc, docstring_header, 1)
if type(cls) is type:
clsdict = dict(cls.__dict__)
clsdict["__doc__"] = doc
+ clsdict.pop("__dict__", None)
cls = type(cls.__name__, cls.__bases__, clsdict)
- constructor_fn = clsdict[constructor]
+ if constructor is not None:
+ constructor_fn = clsdict[constructor]
+
else:
cls.__doc__ = doc
- constructor_fn = getattr(cls, constructor)
-
- setattr(
- cls,
- constructor,
- _decorate_with_warning(constructor_fn, wtype, message, None),
- )
-
+ if constructor is not None:
+ constructor_fn = getattr(cls, constructor)
+
+ if constructor is not None:
+ setattr(
+ cls,
+ constructor,
+ _decorate_with_warning(constructor_fn, wtype, message, None),
+ )
return cls
message = _sanitize_restructured_text(message)
+ if issubclass(wtype, exc.RemovedIn20Warning):
+ doc_only = (
+ " (Background on SQLAlchemy 2.0 at: "
+ ":ref:`migration_20_toplevel`)"
+ )
+ warning_only = (
+ " (Background on SQLAlchemy 2.0 at: http://sqlalche.me/e/b8d9)"
+ )
+ else:
+ doc_only = warning_only = ""
+
@decorator
def warned(fn, *args, **kwargs):
skip_warning = kwargs.pop("_sa_skip_warning", False)
if not skip_warning:
- warnings.warn(message, wtype, stacklevel=3)
+ warnings.warn(message + warning_only, wtype, stacklevel=3)
return fn(*args, **kwargs)
doc = func.__doc__ is not None and func.__doc__ or ""
if docstring_header is not None:
docstring_header %= dict(func=func.__name__)
+ docstring_header += doc_only
+
doc = inject_docstring_text(doc, docstring_header, 1)
decorated = warned(func)
from sqlalchemy.dialects.mssql import base
from sqlalchemy.dialects.mssql.information_schema import CoerceUnicode
from sqlalchemy.dialects.mssql.information_schema import tables
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.testing import AssertsCompiledSQL
from sqlalchemy.testing import ComparesTables
from sqlalchemy.testing import eq_
self.metadata.drop_all()
def test_inspect_view_definition(self):
- inspector = Inspector.from_engine(testing.db)
+ inspector = inspect(testing.db)
view_def = inspector.get_view_definition("huge_named_view")
eq_(view_def, self.view_str)
from sqlalchemy.dialects.postgresql import INTEGER
from sqlalchemy.dialects.postgresql import INTERVAL
from sqlalchemy.dialects.postgresql import TSRANGE
-from sqlalchemy.engine import reflection
from sqlalchemy.sql.schema import CheckConstraint
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
metadata=self.metadata,
)
enum_type.create(conn)
- inspector = reflection.Inspector.from_engine(conn.engine)
+ inspector = inspect(conn)
eq_(
inspector.get_enums("test_schema"),
[
"cat", "dog", "rat", name="pet", metadata=self.metadata
)
enum_type.create(testing.db)
- inspector = reflection.Inspector.from_engine(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_enums(),
[
)
enum_type.create(testing.db)
schema_enum_type.create(testing.db)
- inspector = reflection.Inspector.from_engine(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_enums(),
def test_inspect_enum_empty(self):
enum_type = postgresql.ENUM(name="empty", metadata=self.metadata)
enum_type.create(testing.db)
- inspector = reflection.Inspector.from_engine(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_enums(),
from sqlalchemy import util
from sqlalchemy.dialects.sqlite import base as sqlite
from sqlalchemy.dialects.sqlite import pysqlite as pysqlite_dialect
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.engine.url import make_url
from sqlalchemy.schema import CreateTable
from sqlalchemy.schema import FetchedValue
def test_foreign_key_name_is_none(self):
# and not "0"
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("b")
eq_(
fks,
)
def test_foreign_key_name_is_not_none(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("c")
eq_(
fks,
)
def test_foreign_key_implicit_parent(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("implicit_referrer")
eq_(
fks,
)
def test_foreign_key_composite_implicit_parent(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("implicit_referrer_comp")
eq_(
fks,
def test_foreign_key_implicit_missing_parent(self):
# test when the FK refers to a non-existent table and column names
# aren't given. only sqlite allows this case to exist
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("implicit_referrer_comp_fake")
# the referred table doesn't exist but the operation does not fail
eq_(
)
def test_unnamed_inline_foreign_key(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("e")
eq_(
fks,
)
def test_unnamed_inline_foreign_key_quoted(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("e1")
eq_(
fks,
)
def test_foreign_key_composite_broken_casing(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("j")
eq_(
fks,
)
def test_foreign_key_ondelete_onupdate(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
fks = inspector.get_foreign_keys("onud_test")
eq_(
fks,
)
def test_dont_reflect_autoindex(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(inspector.get_indexes("o"), [])
eq_(
inspector.get_indexes("o", include_auto_indexes=True),
def test_create_index_with_schema(self):
"""Test creation of index with explicit schema"""
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_indexes("l", schema="main"),
[
)
def test_unique_constraint_named(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_unique_constraints("f"),
[{"column_names": ["x"], "name": "foo_fx"}],
)
def test_unique_constraint_named_broken_casing(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_unique_constraints("h"),
[{"column_names": ["x"], "name": "foo_hx"}],
)
def test_unique_constraint_named_broken_temp(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_unique_constraints("g"),
[{"column_names": ["x"], "name": "foo_gx"}],
)
def test_unique_constraint_unnamed_inline(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_unique_constraints("d"),
[{"column_names": ["x"], "name": None}],
)
def test_unique_constraint_unnamed_inline_quoted(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_unique_constraints("d1"),
[{"column_names": ["some ( STUPID n,ame"], "name": None}],
)
def test_unique_constraint_unnamed_normal(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_unique_constraints("m"),
[{"column_names": ["x"], "name": None}],
)
def test_unique_constraint_unnamed_normal_temporary(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_unique_constraints("n"),
[{"column_names": ["x"], "name": None}],
)
def test_primary_key_constraint_named(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_pk_constraint("p"),
{"constrained_columns": ["id"], "name": "pk_name"},
)
def test_primary_key_constraint_unnamed(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_pk_constraint("q"),
{"constrained_columns": ["id"], "name": None},
)
def test_primary_key_constraint_no_pk(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_pk_constraint("d"),
{"constrained_columns": [], "name": None},
)
def test_check_constraint(self):
- inspector = Inspector(testing.db)
+ inspector = inspect(testing.db)
eq_(
inspector.get_check_constraints("cp"),
[
assert not conn.closed
assert conn.closed
- def test_bind_close_conn(self):
- e = testing.db
- conn = e.connect()
- with conn.connect() as c2:
- assert not c2.closed
- assert not conn.closed
- assert c2.closed
-
def test_create_drop_explicit(self):
metadata = MetaData()
table = Table("test_table", metadata, Column("foo", Integer))
from sqlalchemy import testing
from sqlalchemy import text
from sqlalchemy import TypeDecorator
+from sqlalchemy.engine import reflection
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.mock import MockConnection
from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import engines
from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
+from sqlalchemy.testing import is_
from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing.mock import Mock
pass
+class ConnectionlessDeprecationTest(fixtures.TestBase):
+ """test various things associated with "connectionless" executions."""
+
+ def test_inspector_constructor_engine(self):
+ with testing.expect_deprecated(
+ r"The __init__\(\) method on Inspector is deprecated and will "
+ r"be removed in a future release."
+ ):
+ i1 = reflection.Inspector(testing.db)
+
+ is_(i1.bind, testing.db)
+
+ def test_inspector_constructor_connection(self):
+ with testing.db.connect() as conn:
+ with testing.expect_deprecated(
+ r"The __init__\(\) method on Inspector is deprecated and "
+ r"will be removed in a future release."
+ ):
+ i1 = reflection.Inspector(conn)
+
+ is_(i1.bind, conn)
+ is_(i1.engine, testing.db)
+
+ def test_inspector_from_engine(self):
+ with testing.expect_deprecated(
+ r"The from_engine\(\) method on Inspector is deprecated and will "
+ r"be removed in a future release."
+ ):
+ i1 = reflection.Inspector.from_engine(testing.db)
+
+ is_(i1.bind, testing.db)
+
+ def test_bind_close_conn(self):
+ e = testing.db
+ conn = e.connect()
+
+ with testing.expect_deprecated(
+ r"The .close\(\) method on a so-called 'branched' "
+ "connection is deprecated"
+ ):
+ with conn.connect() as c2:
+ assert not c2.closed
+ assert not conn.closed
+ assert c2.closed
+
+
class CreateEngineTest(fixtures.TestBase):
def test_strategy_keyword_mock(self):
def executor(x, y):
):
table_names = testing.db.table_names()
is_true(set(table_names).issuperset(metadata.tables))
+
+
+class ExecutionOptionsTest(fixtures.TestBase):
+ def test_branched_connection_execution_options(self):
+ engine = engines.testing_engine("sqlite://")
+
+ conn = engine.connect()
+ c2 = conn.execution_options(foo="bar")
+
+ with testing.expect_deprecated_20(
+ r"The Connection.connect\(\) function/method is considered "
+ ):
+ c2_branch = c2.connect()
+ eq_(c2_branch._execution_options, {"foo": "bar"})
eq_(c1._execution_options, {"foo": "bar"})
eq_(c2._execution_options, {"foo": "bar", "bat": "hoho"})
- def test_branched_connection_execution_options(self):
- engine = testing_engine("sqlite://")
-
- conn = engine.connect()
- c2 = conn.execution_options(foo="bar")
- c2_branch = c2.connect()
- eq_(c2_branch._execution_options, {"foo": "bar"})
-
def test_get_engine_execution_options(self):
engine = testing_engine("sqlite://")
engine.dialect = Mock()
from sqlalchemy.util import ue
-metadata, users = None, None
-
-
class ReflectionTest(fixtures.TestBase, ComparesTables):
__backend__ = True
test_needs_fk=True,
)
- meta.create_all()
- meta2 = MetaData(testing.db)
+ meta.create_all(testing.db)
+ meta2 = MetaData()
a2 = Table(
"addresses",
meta2,
Column("user_id", sa.Integer, sa.ForeignKey("users.id")),
- autoload=True,
+ autoload_with=testing.db,
)
- u2 = Table("users", meta2, autoload=True)
+ u2 = Table("users", meta2, autoload_with=testing.db)
s = sa.select([a2]).subquery()
assert s.c.user_id is not None
assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
assert u2.join(a2).onclause.compare(u2.c.id == a2.c.user_id)
- meta2 = MetaData(testing.db)
+ meta2 = MetaData()
u2 = Table(
"users",
meta2,
Column("id", sa.Integer, primary_key=True),
- autoload=True,
+ autoload_with=testing.db,
)
a2 = Table(
"addresses",
meta2,
Column("id", sa.Integer, primary_key=True),
Column("user_id", sa.Integer, sa.ForeignKey("users.id")),
- autoload=True,
+ autoload_with=testing.db,
)
s = sa.select([a2]).subquery()
for attr in test_attrs:
eq_(getattr(fk, attr), getattr(ref, attr))
+ @testing.provide_metadata
def test_pks_not_uniques(self):
"""test that primary key reflection not tripped up by unique
indexes"""
- testing.db.execute(
- """
- CREATE TABLE book (
- id INTEGER NOT NULL,
- title VARCHAR(100) NOT NULL,
- series INTEGER,
- series_id INTEGER,
- UNIQUE(series, series_id),
- PRIMARY KEY(id)
- )"""
- )
- try:
- metadata = MetaData(bind=testing.db)
- book = Table("book", metadata, autoload=True)
- assert book.primary_key.contains_column(book.c.id)
- assert not book.primary_key.contains_column(book.c.series)
- assert len(book.primary_key) == 1
- finally:
- testing.db.execute("drop table book")
+ with testing.db.begin() as conn:
+ conn.execute(
+ """
+ CREATE TABLE book (
+ id INTEGER NOT NULL,
+ title VARCHAR(100) NOT NULL,
+ series INTEGER,
+ series_id INTEGER,
+ UNIQUE(series, series_id),
+ PRIMARY KEY(id)
+ )"""
+ )
+
+ book = Table("book", self.metadata, autoload_with=testing.db)
+ assert book.primary_key.contains_column(book.c.id)
+ assert not book.primary_key.contains_column(book.c.series)
+ eq_(len(book.primary_key), 1)
def test_fk_error(self):
metadata = MetaData(testing.db)
metadata.create_all,
)
+ @testing.provide_metadata
def test_composite_pks(self):
"""test reflection of a composite primary key"""
- testing.db.execute(
- """
- CREATE TABLE book (
- id INTEGER NOT NULL,
- isbn VARCHAR(50) NOT NULL,
- title VARCHAR(100) NOT NULL,
- series INTEGER NOT NULL,
- series_id INTEGER NOT NULL,
- UNIQUE(series, series_id),
- PRIMARY KEY(id, isbn)
- )"""
- )
- try:
- metadata = MetaData(bind=testing.db)
- book = Table("book", metadata, autoload=True)
- assert book.primary_key.contains_column(book.c.id)
- assert book.primary_key.contains_column(book.c.isbn)
- assert not book.primary_key.contains_column(book.c.series)
- assert len(book.primary_key) == 2
- finally:
- testing.db.execute("drop table book")
+ with testing.db.begin() as conn:
+ conn.execute(
+ """
+ CREATE TABLE book (
+ id INTEGER NOT NULL,
+ isbn VARCHAR(50) NOT NULL,
+ title VARCHAR(100) NOT NULL,
+ series INTEGER NOT NULL,
+ series_id INTEGER NOT NULL,
+ UNIQUE(series, series_id),
+ PRIMARY KEY(id, isbn)
+ )"""
+ )
+ book = Table("book", self.metadata, autoload_with=testing.db)
+ assert book.primary_key.contains_column(book.c.id)
+ assert book.primary_key.contains_column(book.c.isbn)
+ assert not book.primary_key.contains_column(book.c.series)
+ eq_(len(book.primary_key), 2)
@testing.exclude("mysql", "<", (4, 1, 1), "innodb funkiness")
@testing.provide_metadata
"dialect %s doesn't have a has_schema method"
% testing.db.dialect.name
)
- eq_(
- testing.db.dialect.has_schema(
- testing.db, testing.config.test_schema
- ),
- True,
- )
- eq_(
- testing.db.dialect.has_schema(testing.db, "sa_fake_schema_123"),
- False,
- )
+ with testing.db.connect() as conn:
+ eq_(
+ testing.db.dialect.has_schema(
+ conn, testing.config.test_schema
+ ),
+ True,
+ )
+ eq_(
+ testing.db.dialect.has_schema(conn, "sa_fake_schema_123"),
+ False,
+ )
@testing.requires.schemas
@testing.requires.cross_schema_fk_reflection
@testing.requires.views
def _create_views(con, schema=None):
- for table_name in ("users", "email_addresses"):
- fullname = table_name
- if schema:
- fullname = "%s.%s" % (schema, table_name)
- view_name = fullname + "_v"
- query = "CREATE VIEW %s AS SELECT * FROM %s" % (view_name, fullname)
- con.execute(sa.sql.text(query))
+ with testing.db.connect() as conn:
+ for table_name in ("users", "email_addresses"):
+ fullname = table_name
+ if schema:
+ fullname = "%s.%s" % (schema, table_name)
+ view_name = fullname + "_v"
+ query = "CREATE VIEW %s AS SELECT * FROM %s" % (
+ view_name,
+ fullname,
+ )
+ conn.execute(sa.sql.text(query))
@testing.requires.views
def _drop_views(con, schema=None):
- for table_name in ("email_addresses", "users"):
- fullname = table_name
- if schema:
- fullname = "%s.%s" % (schema, table_name)
- view_name = fullname + "_v"
- query = "DROP VIEW %s" % view_name
- con.execute(sa.sql.text(query))
+ with testing.db.connect() as conn:
+ for table_name in ("email_addresses", "users"):
+ fullname = table_name
+ if schema:
+ fullname = "%s.%s" % (schema, table_name)
+ view_name = fullname + "_v"
+ query = "DROP VIEW %s" % view_name
+ conn.execute(sa.sql.text(query))
class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL):
def mydefault_using_connection(ctx):
conn = ctx.connection
- try:
- return conn.execute(sa.select([sa.text("12")])).scalar()
- finally:
- # ensure a "close()" on this connection does nothing,
- # since its a "branched" connection
- conn.close()
+ return conn.execute(sa.select([sa.text("12")])).scalar()
use_function_defaults = testing.against("postgresql", "mssql")
is_oracle = testing.against("oracle")
"Could not locate column in row for column 'text1.b'",
lambda: row[text1.c.b],
)
+
+
+class DefaultTest(fixtures.TestBase):
+ __backend__ = True
+
+ @testing.provide_metadata
+ def test_close_on_branched(self):
+ metadata = self.metadata
+
+ def mydefault_using_connection(ctx):
+ conn = ctx.connection
+ try:
+ return conn.execute(select([text("12")])).scalar()
+ finally:
+ # ensure a "close()" on this connection does nothing,
+ # since its a "branched" connection
+ conn.close()
+
+ table = Table(
+ "foo",
+ metadata,
+ Column("x", Integer),
+ Column("y", Integer, default=mydefault_using_connection),
+ )
+
+ metadata.create_all(testing.db)
+ with testing.db.connect() as conn:
+ with testing.expect_deprecated(
+ r"The .close\(\) method on a so-called 'branched' "
+ r"connection is deprecated as of 1.4, as are "
+ r"'branched' connections overall"
+ ):
+ conn.execute(table.insert().values(x=5))
+
+ eq_(conn.execute(select([table])).first(), (5, 12))