import contextlib
-from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy import inspect
from . import compare
from . import render
@util.memoized_property
def inspector(self):
- return Inspector.from_engine(self.connection)
+ return inspect(self.connection)
@contextlib.contextmanager
def _within_batch(self):
import re
from sqlalchemy import event
+from sqlalchemy import inspect
from sqlalchemy import schema as sa_schema
from sqlalchemy import types as sqltypes
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.util import OrderedSet
from alembic.ddl.base import _fk_spec
connection = autogen_context.connection
include_schemas = autogen_context.opts.get("include_schemas", False)
- inspector = Inspector.from_engine(connection)
+ inspector = inspect(connection)
default_schema = connection.dialect.default_schema_name
if include_schemas:
--- /dev/null
+.. change::
+ :tags: change
+
+ The internal inspection routines no longer use SQLAlchemy's
+ ``Inspector.from_engine()`` method, which is expected to be deprecated in
+ 1.4. The ``inspect()`` function is now used.
+
from sqlalchemy import event
from sqlalchemy import ForeignKey
from sqlalchemy import Index
+from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import Numeric
from sqlalchemy import Text
from sqlalchemy import text
from sqlalchemy import UniqueConstraint
-from sqlalchemy.engine.reflection import Inspector
from alembic import autogenerate
from alembic import util
eq_([elem.column.name for elem in diff[1].elements], target_columns)
if conditional_name is not None:
if conditional_name == "servergenerated":
- fks = Inspector.from_engine(self.bind).get_foreign_keys(
- source_table
- )
+ fks = inspect(self.bind).get_foreign_keys(source_table)
server_fk_name = fks[0]["name"]
eq_(diff[1].name, server_fk_name)
else:
from sqlalchemy import ForeignKey
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy import Index
+from sqlalchemy import inspect
from sqlalchemy import INTEGER
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import UniqueConstraint
from sqlalchemy import VARCHAR
from sqlalchemy.dialects import sqlite
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.types import NULLTYPE
from sqlalchemy.types import VARBINARY
def test_dont_barf_on_already_reflected(self):
from sqlalchemy.util import OrderedSet
- inspector = Inspector.from_engine(self.bind)
+ inspector = inspect(self.bind)
uo = ops.UpgradeOps(ops=[])
autogenerate.compare._compare_tables(
OrderedSet([(None, "extra"), (None, "user")]),
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy import func
from sqlalchemy import Index
+from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import UniqueConstraint
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.schema import CreateIndex
from sqlalchemy.schema import CreateTable
from sqlalchemy.sql import column
type_=Integer,
existing_type=Boolean(create_constraint=True, name="ck1"),
)
- insp = Inspector.from_engine(config.db)
+ insp = inspect(config.db)
eq_(
[
self._boolean_fixture()
with self.op.batch_alter_table("hasbool") as batch_op:
batch_op.drop_column("x")
- insp = Inspector.from_engine(config.db)
+ insp = inspect(config.db)
assert "x" not in (c["name"] for c in insp.get_columns("hasbool"))
batch_op.alter_column(
"x", type_=Boolean(create_constraint=True, name="ck1")
)
- insp = Inspector.from_engine(config.db)
+ insp = inspect(config.db)
if exclusions.against(config, "sqlite"):
eq_(
batch_op.alter_column("data", type_=String(30))
batch_op.create_index("ix_data", ["data"])
- insp = Inspector.from_engine(config.db)
+ insp = inspect(config.db)
eq_(
set(
(ix["name"], tuple(ix["column_names"]))
"data", new_column_name="newdata", existing_type=String(50)
)
- insp = Inspector.from_engine(self.conn)
+ insp = inspect(self.conn)
eq_(
[
(
"data", new_column_name="newdata", existing_type=String(50)
)
- insp = Inspector.from_engine(self.conn)
+ insp = inspect(self.conn)
- insp = Inspector.from_engine(self.conn)
eq_(
[
(
batch_op.drop_column("id")
batch_op.add_column(Column("id", Integer))
- pk_const = Inspector.from_engine(self.conn).get_pk_constraint("foo")
+ pk_const = inspect(self.conn).get_pk_constraint("foo")
eq_(pk_const["constrained_columns"], [])
def test_drop_pk_col_readd_pk_col(self):
batch_op.drop_column("id")
batch_op.add_column(Column("id", Integer, primary_key=True))
- pk_const = Inspector.from_engine(self.conn).get_pk_constraint("foo")
+ pk_const = inspect(self.conn).get_pk_constraint("foo")
eq_(pk_const["constrained_columns"], ["id"])
def test_drop_pk_col_readd_col_also_pk_const(self):
batch_op.add_column(Column("id", Integer))
batch_op.create_primary_key("newpk", ["id"])
- pk_const = Inspector.from_engine(self.conn).get_pk_constraint("foo")
+ pk_const = inspect(self.conn).get_pk_constraint("foo")
eq_(pk_const["constrained_columns"], ["id"])
def test_add_pk_constraint(self):
with self.op.batch_alter_table("nopk", recreate="always") as batch_op:
batch_op.create_primary_key("newpk", ["a", "b"])
- pk_const = Inspector.from_engine(self.conn).get_pk_constraint("nopk")
+ pk_const = inspect(self.conn).get_pk_constraint("nopk")
with config.requirements.reflects_pk_names.fail_if():
eq_(pk_const["name"], "newpk")
eq_(pk_const["constrained_columns"], ["a", "b"])
"bar", naming_convention=naming_convention
) as batch_op:
batch_op.drop_constraint("fk_bar_foo_id_foo", type_="foreignkey")
- eq_(Inspector.from_engine(self.conn).get_foreign_keys("bar"), [])
+ eq_(inspect(self.conn).get_foreign_keys("bar"), [])
def test_drop_column_fk_recreate(self):
with self.op.batch_alter_table("foo", recreate="always") as batch_op:
)
def test_create_drop_index(self):
- insp = Inspector.from_engine(config.db)
+ insp = inspect(config.db)
eq_(insp.get_indexes("foo"), [])
with self.op.batch_alter_table("foo", recreate="always") as batch_op:
]
)
- insp = Inspector.from_engine(config.db)
+ insp = inspect(config.db)
eq_(
[
dict(
with self.op.batch_alter_table("foo", recreate="always") as batch_op:
batch_op.drop_index("ix_data")
- insp = Inspector.from_engine(config.db)
+ insp = inspect(config.db)
eq_(insp.get_indexes("foo"), [])
from dateutil import tz
import sqlalchemy as sa
-from sqlalchemy.engine.reflection import Inspector
+from sqlalchemy import inspect
from alembic import autogenerate
from alembic import command
with self._env_fixture(process_revision_directives, m):
command.upgrade(self.cfg, "heads")
- eq_(
- Inspector.from_engine(self.engine).get_table_names(),
- ["alembic_version"],
- )
+ eq_(inspect(self.engine).get_table_names(), ["alembic_version"])
command.revision(
self.cfg, message="some message", autogenerate=True
command.upgrade(self.cfg, "model1@head")
eq_(
- Inspector.from_engine(self.engine).get_table_names(),
+ inspect(self.engine).get_table_names(),
["alembic_version", "t"],
)
command.upgrade(self.cfg, "model2@head")
- eq_(
- Inspector.from_engine(self.engine).get_table_names(),
- ["alembic_version"],
- )
+ eq_(inspect(self.engine).get_table_names(), ["alembic_version"])
def test_programmatic_command_option(self):
def process_revision_directives(context, rev, generate_revisions):
from sqlalchemy import DateTime
from sqlalchemy import Float
from sqlalchemy import func
+from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import text
-from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql import column
from alembic import autogenerate
t1.create(self.bind)
- insp = Inspector.from_engine(self.bind)
+ insp = inspect(self.bind)
cols = insp.get_columns(t1.name)
insp_col = Column(
"somecol", cols[0]["type"], server_default=text(cols[0]["default"])
def _compare_default(self, t1, t2, col, rendered):
t1.create(self.bind, checkfirst=True)
- insp = Inspector.from_engine(self.bind)
+ insp = inspect(self.bind)
cols = insp.get_columns(t1.name)
ctx = self.autogen_context.migration_context
from sqlalchemy import Column
+from sqlalchemy import inspect
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy import Table
-from sqlalchemy.engine.reflection import Inspector
from alembic import migration
from alembic.testing import assert_raises
connection=self.connection, opts={"version_table": "version_table"}
)
eq_(context.get_current_revision(), None)
- insp = Inspector(self.connection)
+ insp = inspect(self.connection)
assert "version_table" not in insp.get_table_names()
def test_get_current_revision(self):
def test_stamp_api_creates_table(self):
context = self.make_one(connection=self.connection)
assert (
- "alembic_version"
- not in Inspector(self.connection).get_table_names()
+ "alembic_version" not in inspect(self.connection).get_table_names()
)
script = mock.Mock(
context.stamp(script, "b")
eq_(context.get_current_heads(), ("a", "b"))
- assert (
- "alembic_version" in Inspector(self.connection).get_table_names()
- )
+ assert "alembic_version" in inspect(self.connection).get_table_names()
class UpdateRevTest(TestBase):