% (migration_context.script.env_py_location)
)
- include_symbol = opts.get("include_symbol", None)
include_object = opts.get("include_object", None)
+ include_name = opts.get("include_name", None)
object_filters = []
- if include_symbol:
-
- def include_symbol_filter(
- object_, name, type_, reflected, compare_to
- ):
- if type_ == "table":
- return include_symbol(name, object_.schema)
- else:
- return True
-
- object_filters.append(include_symbol_filter)
+ name_filters = []
if include_object:
object_filters.append(include_object)
+ if include_name:
+ name_filters.append(include_name)
self._object_filters = object_filters
+ self._name_filters = name_filters
self.migration_context = migration_context
if self.migration_context is not None:
yield
self._has_batch = False
- def run_filters(self, object_, name, type_, reflected, compare_to):
+ def run_name_filters(self, name, type_, parent_names):
+ """Run the context's name filters and return True if the targets
+ should be part of the autogenerate operation.
+
+ This method should be run for every kind of name encountered within the
+ reflection side of an autogenerate operation, giving the environment
+ the chance to filter what names should be reflected as database
+ objects. The filters here are produced directly via the
+ :paramref:`.EnvironmentContext.configure.include_name` parameter.
+
+ """
+
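+        # if a schema-level parent is present, derive a schema-qualified
+        # table name up front for the convenience of the name filters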
+ if "schema_name" in parent_names:
+ if type_ == "table":
+ table_name = name
+ else:
+ table_name = parent_names["table_name"]
+ schema_name = parent_names["schema_name"]
+ if schema_name:
+ parent_names["schema_qualified_table_name"] = "%s.%s" % (
+ schema_name,
+ table_name,
+ )
+ else:
+ parent_names["schema_qualified_table_name"] = table_name
+
+        for fn in self._name_filters:
+            if not fn(name, type_, parent_names):
+                return False
+        else:
+            return True
+
+ def run_object_filters(self, object_, name, type_, reflected, compare_to):
"""Run the context's object filters and return True if the targets
should be part of the autogenerate operation.
an autogenerate operation, giving the environment the chance
to filter what objects should be included in the comparison.
The filters here are produced directly via the
- :paramref:`.EnvironmentContext.configure.include_object`
- and :paramref:`.EnvironmentContext.configure.include_symbol`
- functions, if present.
+ :paramref:`.EnvironmentContext.configure.include_object` parameter.
"""
for fn in self._object_filters:
else:
return True
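+
+    # legacy name, retained as an alias for run_object_filters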
+ run_filters = run_object_filters
+
@util.memoized_property
def sorted_tables(self):
"""Return an aggregate of the :attr:`.MetaData.sorted_tables` collection(s).
else:
schemas = [None]
+ schemas = {
+ s for s in schemas if autogen_context.run_name_filters(s, "schema", {})
+ }
+
comparators.dispatch("schema", autogen_context.dialect.name)(
autogen_context, upgrade_ops, schemas
)
)
version_table = autogen_context.migration_context.version_table
- for s in schemas:
- tables = set(inspector.get_table_names(schema=s))
- if s == version_table_schema:
+ for schema_name in schemas:
+ tables = set(inspector.get_table_names(schema=schema_name))
+ if schema_name == version_table_schema:
tables = tables.difference(
[autogen_context.migration_context.version_table]
)
- conn_table_names.update(zip([s] * len(tables), tables))
+
+ conn_table_names.update(
+ (schema_name, tname)
+ for tname in tables
+ if autogen_context.run_name_filters(
+ tname, "table", {"schema_name": schema_name}
+ )
+ )
metadata_table_names = OrderedSet(
[(table.schema, table.name) for table in autogen_context.sorted_tables]
for s, tname in metadata_table_names.difference(conn_table_names):
name = "%s.%s" % (s, tname) if s else tname
metadata_table = tname_to_table[(s, tname)]
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
metadata_table, tname, "table", False, None
):
upgrade_ops.ops.append(
# fmt: on
)
sqla_compat._reflect_table(inspector, t, None)
- if autogen_context.run_filters(t, tname, "table", True, None):
+ if autogen_context.run_object_filters(t, tname, "table", True, None):
modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
metadata_table = tname_to_table[(s, tname)]
conn_table = existing_metadata.tables[name]
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
metadata_table, tname, "table", False, conn_table
):
metadata_cols_by_name = dict(
(c.name, c) for c in metadata_table.c if not c.system
)
- conn_col_names = dict((c.name, c) for c in conn_table.c)
+ conn_col_names = dict(
+ (c.name, c)
+ for c in conn_table.c
+ if autogen_context.run_name_filters(
+ c.name, "column", {"table_name": tname, "schema_name": schema}
+ )
+ )
metadata_col_names = OrderedSet(sorted(metadata_cols_by_name))
for cname in metadata_col_names.difference(conn_col_names):
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
metadata_cols_by_name[cname], cname, "column", False, None
):
modify_table_ops.ops.append(
for colname in metadata_col_names.intersection(conn_col_names):
metadata_col = metadata_cols_by_name[colname]
conn_col = conn_table.c[colname]
- if not autogen_context.run_filters(
+ if not autogen_context.run_object_filters(
metadata_col, colname, "column", False, conn_col
):
continue
yield
for cname in set(conn_col_names).difference(metadata_col_names):
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
conn_table.c[cname], cname, "column", True, None
):
modify_table_ops.ops.append(
# not being present
pass
else:
+ conn_uniques = [
+ uq
+ for uq in conn_uniques
+ if autogen_context.run_name_filters(
+ uq["name"],
+ "unique_constraint",
+ {"table_name": tname, "schema_name": schema},
+ )
+ ]
for uq in conn_uniques:
if uq.get("duplicates_index"):
unique_constraints_duplicate_unique_indexes = True
conn_indexes = inspector.get_indexes(tname, schema=schema)
except NotImplementedError:
pass
+ else:
+ conn_indexes = [
+ ix
+ for ix in conn_indexes
+ if autogen_context.run_name_filters(
+ ix["name"],
+ "index",
+ {"table_name": tname, "schema_name": schema},
+ )
+ ]
# 2. convert conn-level objects from raw inspector records
# into schema objects
def obj_added(obj):
if obj.is_index:
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
obj.const, obj.name, "index", False, None
):
modify_ops.ops.append(ops.CreateIndexOp.from_index(obj.const))
if is_create_table or is_drop_table:
# unique constraints are created inline with table defs
return
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
obj.const, obj.name, "unique_constraint", False, None
):
modify_ops.ops.append(
# be sure what we're doing here
return
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
obj.const, obj.name, "index", True, None
):
modify_ops.ops.append(ops.DropIndexOp.from_index(obj.const))
# if the whole table is being dropped, we don't need to
# consider unique constraint separately
return
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
obj.const, obj.name, "unique_constraint", True, None
):
modify_ops.ops.append(
def obj_changed(old, new, msg):
if old.is_index:
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
new.const, new.name, "index", False, old.const
):
log.info(
modify_ops.ops.append(ops.DropIndexOp.from_index(old.const))
modify_ops.ops.append(ops.CreateIndexOp.from_index(new.const))
else:
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
new.const, new.name, "unique_constraint", False, old.const
):
log.info(
if isinstance(fk, sa_schema.ForeignKeyConstraint)
)
- conn_fks = inspector.get_foreign_keys(tname, schema=schema)
+ conn_fks = [
+ fk
+ for fk in inspector.get_foreign_keys(tname, schema=schema)
+ if autogen_context.run_name_filters(
+ fk["name"],
+ "foreign_key_constraint",
+ {"table_name": tname, "schema_name": schema},
+ )
+ ]
backend_reflects_fk_options = conn_fks and "options" in conn_fks[0]
)
def _add_fk(obj, compare_to):
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
obj.const, obj.name, "foreign_key_constraint", False, compare_to
):
modify_table_ops.ops.append(
)
def _remove_fk(obj, compare_to):
- if autogen_context.run_filters(
+ if autogen_context.run_object_filters(
obj.const, obj.name, "foreign_key_constraint", True, compare_to
):
modify_table_ops.ops.append(
template_args=None,
render_as_batch=False,
target_metadata=None,
- include_symbol=None,
+ include_name=None,
include_object=None,
include_schemas=False,
process_revision_directives=None,
:paramref:`.EnvironmentContext.configure.compare_type`
+ :param include_name: A callable function which is given
+ the chance to return ``True`` or ``False`` for any database reflected
+ object based on its name, including database schema names when
+ the :paramref:`.EnvironmentContext.configure.include_schemas` flag
+ is set to ``True``.
+
+ The function accepts the following positional arguments:
+
+ * ``name``: the name of the object, such as schema name or table name.
+ Will be ``None`` when indicating the default schema name of the
+ database connection.
+ * ``type``: a string describing the type of object; currently
+ ``"schema"``, ``"table"``, ``"column"``, ``"index"``,
+ ``"unique_constraint"``, or ``"foreign_key_constraint"``
+         * ``parent_names``: a dictionary of "parent" object names that are
+           relative to the name being given. Keys in this dictionary may
+           include: ``"schema_name"``, ``"table_name"``, and for table-level
+           names, ``"schema_qualified_table_name"``.
+
+ E.g.::
+
+ def include_name(name, type_, parent_names):
+ if type_ == "schema":
+ return name in ["schema_one", "schema_two"]
+ else:
+ return True
+
+ context.configure(
+ # ...
+ include_schemas = True,
+ include_name = include_name
+ )
+
+ .. versionadded:: 1.5
+
+ .. seealso::
+
+ :ref:`autogenerate_include_hooks`
+
+ :paramref:`.EnvironmentContext.configure.include_object`
+
+ :paramref:`.EnvironmentContext.configure.include_schemas`
+
+
:param include_object: A callable function which is given
the chance to return ``True`` or ``False`` for any object,
indicating if the given object should be considered in the
* ``type``: a string describing the type of object; currently
``"table"``, ``"column"``, ``"index"``, ``"unique_constraint"``,
or ``"foreign_key_constraint"``
-
- .. versionadded:: 0.7.0 Support for indexes and unique constraints
- within the
- :paramref:`~.EnvironmentContext.configure.include_object` hook.
-
- .. versionadded:: 0.7.1 Support for foreign keys within the
- :paramref:`~.EnvironmentContext.configure.include_object` hook.
-
* ``reflected``: ``True`` if the given object was produced based on
table reflection, ``False`` if it's from a local :class:`.MetaData`
object.
include_object = include_object
)
- :paramref:`.EnvironmentContext.configure.include_object` can also
- be used to filter on specific schemas to include or omit, when
- the :paramref:`.EnvironmentContext.configure.include_schemas`
- flag is set to ``True``. The :attr:`.Table.schema` attribute
- on each :class:`.Table` object reflected will indicate the name of the
- schema from which the :class:`.Table` originates.
+ For the use case of omitting specific schemas from a target database
+ when :paramref:`.EnvironmentContext.configure.include_schemas` is
+ set to ``True``, the :attr:`~sqlalchemy.schema.Table.schema`
+ attribute can be checked for each :class:`~sqlalchemy.schema.Table`
+         object passed to the hook; however, it is much more efficient
+ to filter on schemas before reflection of objects takes place
+ using the :paramref:`.EnvironmentContext.configure.include_name`
+ hook.
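+
+         E.g., a sketch of such a check, using a hypothetical schema name::
+
+            def include_object(object_, name, type_, reflected, compare_to):
+                # "scratch_schema" is a placeholder name for illustration
+                if type_ == "table" and object_.schema == "scratch_schema":
+                    return False
+                else:
+                    return True
+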
.. versionadded:: 0.6.0
.. seealso::
- :paramref:`.EnvironmentContext.configure.include_schemas`
-
- :param include_symbol: A callable function which, given a table name
- and schema name (may be ``None``), returns ``True`` or ``False``,
- indicating if the given table should be considered in the
- autogenerate sweep.
-
- .. deprecated:: 0.6.0
- :paramref:`.EnvironmentContext.configure.include_symbol`
- is superceded by the more generic
- :paramref:`.EnvironmentContext.configure.include_object`
- parameter.
-
- E.g.::
+ :ref:`autogenerate_include_hooks`
- def include_symbol(tablename, schema):
- return tablename not in ("skip_table_one", "skip_table_two")
-
- context.configure(
- # ...
- include_symbol = include_symbol
- )
-
- .. seealso::
+ :paramref:`.EnvironmentContext.configure.include_name`
:paramref:`.EnvironmentContext.configure.include_schemas`
- :paramref:`.EnvironmentContext.configure.include_object`
-
:param render_as_batch: if True, commands which alter elements
within a table will be placed under a ``with batch_alter_table():``
directive, so that batch migrations will take place.
:meth:`~sqlalchemy.engine.reflection.Inspector.get_schema_names`
method, and include all differences in tables found across all
those schemas. When using this option, you may want to also
- use the :paramref:`.EnvironmentContext.configure.include_object`
- option to specify a callable which
+ use the :paramref:`.EnvironmentContext.configure.include_name`
+ parameter to specify a callable which
can filter the tables/schemas that get included.
.. seealso::
+ :ref:`autogenerate_include_hooks`
+
+ :paramref:`.EnvironmentContext.configure.include_name`
+
:paramref:`.EnvironmentContext.configure.include_object`
:param render_item: Callable that can be used to override how
opts["template_args"].update(template_args)
opts["transaction_per_migration"] = transaction_per_migration
opts["target_metadata"] = target_metadata
- opts["include_symbol"] = include_symbol
+ opts["include_name"] = include_name
opts["include_object"] = include_object
opts["include_schemas"] = include_schemas
opts["render_as_batch"] = render_as_batch
autogeneration of multiple :class:`~sqlalchemy.schema.MetaData`
collections.
+.. _autogenerate_include_hooks:
+
+Controlling What is Autogenerated
+---------------------------------
+
+The autogenerate process scans across all table objects within
+the database that is referred to by the current database connection.
+
+The objects scanned within the target database connection include:
+
+* The "default" schema currently referred towards by the database connection.
+
+* If :paramref:`.EnvironmentContext.configure.include_schemas` is set to
+ ``True``, all non-default "schemas", which are those names returned by the
+ :meth:`~sqlalchemy.engine.reflection.Inspector.get_schema_names` method of
+ :class:`~sqlalchemy.engine.reflection.Inspector`. The SQLAlchemy document
+ :ref:`sqla:schema_table_schema_name` discusses the concept of a
+ "schema" in detail.
+
+* Within each "schema", all tables present are scanned using the
+ :meth:`~sqlalchemy.engine.reflection.Inspector.get_table_names` method of
+ :class:`~sqlalchemy.engine.reflection.Inspector`.
+
+* Within each "table", most sub-objects of each
+  :class:`~sqlalchemy.schema.Table` construct are scanned, including columns
+  and some forms of constraints. This process ultimately involves the use of
+  methods on :class:`~sqlalchemy.engine.reflection.Inspector` including
+  :meth:`~sqlalchemy.engine.reflection.Inspector.get_columns`,
+  :meth:`~sqlalchemy.engine.reflection.Inspector.get_indexes`,
+  :meth:`~sqlalchemy.engine.reflection.Inspector.get_unique_constraints`, and
+  :meth:`~sqlalchemy.engine.reflection.Inspector.get_foreign_keys` (as of this
+  writing, CHECK constraints and primary key constraints are not yet
+  included); a sketch of these calls is shown below.
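+
+A rough sketch of the reflection calls involved, assuming a hypothetical
+database URL, might look like::
+
+    from sqlalchemy import create_engine
+    from sqlalchemy import inspect
+
+    # hypothetical connection URL, for illustration only
+    engine = create_engine("postgresql://scott:tiger@localhost/test")
+    inspector = inspect(engine)
+
+    # non-default schemas are scanned only when include_schemas=True
+    for schema_name in inspector.get_schema_names():
+        for table_name in inspector.get_table_names(schema=schema_name):
+            # per-table sub-objects considered by autogenerate; some
+            # dialects may raise NotImplementedError for constraints
+            columns = inspector.get_columns(table_name, schema=schema_name)
+            indexes = inspector.get_indexes(table_name, schema=schema_name)
+            uniques = inspector.get_unique_constraints(
+                table_name, schema=schema_name
+            )
+            fks = inspector.get_foreign_keys(table_name, schema=schema_name)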
+
+Omitting Schema Names from the Autogenerate Process
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+As the above set of database objects is typically compared to the contents of
+a single :class:`~sqlalchemy.schema.MetaData` object, particularly when the
+:paramref:`.EnvironmentContext.configure.include_schemas` flag is enabled,
+there is an important need to filter out unwanted "schemas", which for some
+database backends might be the list of all the databases present. This
+filtering is best performed using the
+:paramref:`.EnvironmentContext.configure.include_name` hook, which provides a
+callable that may return a boolean true/false
+indicating if a particular schema name should be included::
+
+ def include_name(name, type_, parent_names):
+ if type_ == "schema":
+ # note this will not include the default schema
+ return name in ["schema_one", "schema_two"]
+ else:
+ return True
+
+ context.configure(
+ # ...
+ include_schemas = True,
+ include_name = include_name
+ )
+
+Above, when the list of schema names is first retrieved, the names will be
+filtered through the ``include_name`` function so that only schemas
+named ``"schema_one"`` and ``"schema_two"`` will be considered by the
+autogenerate process.
+
+In order to include **the default schema**, that is, the schema that is
+referred to by the database connection **without** any explicit
+schema being specified, the name passed to the hook is ``None``. To alter
+our above example to also include the default schema, we compare to
+``None`` as well::
+
+ def include_name(name, type_, parent_names):
+ if type_ == "schema":
+            # this **will** include the default schema
+ return name in [None, "schema_one", "schema_two"]
+ else:
+ return True
+
+ context.configure(
+ # ...
+ include_schemas = True,
+ include_name = include_name
+ )
+
+Omitting Table Names from the Autogenerate Process
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The :paramref:`.EnvironmentContext.configure.include_name` hook is also
+the most appropriate means of limiting which table names in the target
+database are considered. If a target database has many tables that are not
+part of the :class:`~sqlalchemy.schema.MetaData`, the autogenerate process
+will normally assume these are extraneous tables in the database to be
+dropped, and it will generate a :meth:`.Operations.drop_table` operation
+for each. To prevent this, the :paramref:`.EnvironmentContext.configure.include_name`
+hook may be used to search for each name within the
+:attr:`~sqlalchemy.schema.MetaData.tables` collection of the
+:class:`~sqlalchemy.schema.MetaData` object and ensure names
+which aren't present are not included::
+
+ target_metadata = MyModel.metadata
+
+ def include_name(name, type_, parent_names):
+ if type_ == "table":
+ return name in target_metadata.tables
+ else:
+ return True
+
+ context.configure(
+ # ...
+ target_metadata = target_metadata,
+ include_name = include_name,
+ include_schemas = False
+ )
+
+The above example is limited to table names present in the default schema only.
+In order to search within a :class:`~sqlalchemy.schema.MetaData` collection for
+schema-qualified table names as well, note that a table present in a
+non-default schema is keyed under a name of the form
+``<schemaname>.<tablename>``. The
+:paramref:`.EnvironmentContext.configure.include_name` hook will present
+this schema name on a per-tablename basis in the ``parent_names`` dictionary,
+using the key ``"schema_name"``, which refers to the name of the
+schema currently being considered, or ``None`` if the schema is the default
+schema of the database connection::
+
+ # example fragment
+
+ if parent_names["schema_name"] is None:
+ return name in target_metadata.tables
+ else:
+ # build out schema-qualified name explicitly...
+ return (
+ "%s.%s" % (parent_names["schema_name"], name) in
+ target_metadata.tables
+ )
+
+More simply, the ``parent_names`` dictionary also includes the
+dot-concatenated name, already constructed, under the key
+``"schema_qualified_table_name"``; for tables in the default schema, this is
+simply the table name with no dot or schema qualifier. So the
+full example of omitting tables with schema support may look like::
+
+ target_metadata = MyModel.metadata
+
+ def include_name(name, type_, parent_names):
+        if type_ == "schema":
+ return name in [None, "schema_one", "schema_two"]
+ elif type_ == "table":
+ # use schema_qualified_table_name directly
+ return (
+ parent_names["schema_qualified_table_name"] in
+ target_metadata.tables
+ )
+ else:
+ return True
+
+ context.configure(
+ # ...
+ target_metadata = target_metadata,
+ include_name = include_name,
+ include_schemas = True
+ )
+
+The ``parent_names`` dictionary will also include the key ``"table_name"``
+when the name being considered is that of a column or constraint object
+local to a particular table.
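+
+As a brief sketch of using this key, the hypothetical filter below skips
+reflected columns and constraints that are local to a table named
+``"legacy_table"``::
+
+    def include_name(name, type_, parent_names):
+        if "table_name" in parent_names:
+            # "legacy_table" is a placeholder table name for illustration
+            return parent_names["table_name"] != "legacy_table"
+        else:
+            return True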
+
+The :paramref:`.EnvironmentContext.configure.include_name` hook only refers
+to **reflected** objects, and not those located within the target
+:class:`~sqlalchemy.schema.MetaData` collection. For more fine-grained
+rules that include both :class:`~sqlalchemy.schema.MetaData` and reflected
+objects, the :paramref:`.EnvironmentContext.configure.include_object` hook
+discussed in the next section is more appropriate.
+
+.. versionadded:: 1.5 added the :paramref:`.EnvironmentContext.configure.include_name`
+ hook.
+
+Omitting Based on Object
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The :paramref:`.EnvironmentContext.configure.include_object` hook provides
+for object-level inclusion/exclusion rules based on the
+:class:`~sqlalchemy.schema.Table` object being reflected as well as the
+elements within it. This hook can be used to limit objects both from the
+local :class:`~sqlalchemy.schema.MetaData` collection and from the target
+database. The limitation is that by the time this hook is invoked for a
+database object, that object has already been fully reflected, which can be
+expensive if a large number of objects are to be omitted. The example below
+illustrates a fine-grained rule that will skip changes on
+:class:`~sqlalchemy.schema.Column` objects that have a user-defined flag
+``skip_autogenerate`` placed into the :attr:`~sqlalchemy.schema.Column.info`
+dictionary::
+
+ def include_object(object, name, type_, reflected, compare_to):
+ if (type_ == "column" and
+ not reflected and
+ object.info.get("skip_autogenerate", False)):
+ return False
+ else:
+ return True
+
+ context.configure(
+ # ...
+ include_object = include_object
+ )
+
Comparing and Rendering Types
------------------------------
--- /dev/null
+.. change::
+ :tags: feature, autogenerate
+ :tickets: 650
+
+ Added new hook :paramref:`.EnvironmentContext.configure.include_name`,
+ which complements the
+ :paramref:`.EnvironmentContext.configure.include_object` hook by providing
+    a means of filtering objects by name **before** the SQLAlchemy
+    reflection process takes place, and notably includes explicit support
+    for passing each schema name when
+    :paramref:`.EnvironmentContext.configure.include_schemas` is set to True.
+    This is especially important for environments that make use of
+    :paramref:`.EnvironmentContext.configure.include_schemas` where schemas
+    are actually databases (e.g. MySQL), in order to prevent reflection
+    sweeps of the entire server.
+
+ .. seealso::
+
+ :ref:`autogenerate_include_hooks` - new documentation section
+
+.. change::
+ :tags: removed, autogenerate
+
+    The long-deprecated
+    :paramref:`.EnvironmentContext.configure.include_symbol` hook is removed.
+ The :paramref:`.EnvironmentContext.configure.include_object`
+ and :paramref:`.EnvironmentContext.configure.include_name`
+ hooks both achieve the goals of this hook.
+
_default_object_filters = _default_include_object
+_default_name_filters = None
+
class ModelOne(object):
__requires__ = ("unique_constraint_reflection",)
"alembic_module_prefix": "op.",
"sqlalchemy_module_prefix": "sa.",
"include_object": _default_object_filters,
+ "include_name": _default_name_filters,
}
if self.configure_opts:
ctx_opts.update(self.configure_opts)
def tearDown(self):
self.conn.close()
- def _update_context(self, object_filters=None, include_schemas=None):
+ def _update_context(
+ self, object_filters=None, name_filters=None, include_schemas=None
+ ):
if include_schemas is not None:
self.autogen_context.opts["include_schemas"] = include_schemas
if object_filters is not None:
self.autogen_context._object_filters = [object_filters]
+ if name_filters is not None:
+ self.autogen_context._name_filters = [name_filters]
return self.autogen_context
include_schemas=False,
opts=None,
object_filters=_default_object_filters,
+ name_filters=_default_name_filters,
return_ops=False,
max_identifier_length=None,
):
"alembic_module_prefix": "op.",
"sqlalchemy_module_prefix": "sa.",
"include_object": object_filters,
+ "include_name": name_filters,
"include_schemas": include_schemas,
}
if opts:
"downgrade_token": "downgrades",
"alembic_module_prefix": "op.",
"sqlalchemy_module_prefix": "sa.",
- "include_symbol": lambda name, schema: False,
+                "include_object": lambda obj, *args: False,
},
)
template_args = {}
from alembic.testing.env import clear_staging_env
from alembic.testing.env import staging_env
from alembic.util import CommandError
+from ._autogen_fixtures import _default_name_filters
from ._autogen_fixtures import _default_object_filters
from ._autogen_fixtures import AutogenFixtureTest
from ._autogen_fixtures import AutogenTest
eq_(diffs[0][0], "add_table")
eq_(diffs[0][1].schema, None)
+ def test_default_schema_omitted_by_table_name_upgrade(self):
+ def include_name(name, type_, parent_names):
+ if type_ == "table":
+ retval = name in ["t1", "t6"]
+ if retval:
+ eq_(parent_names["schema_name"], None)
+ eq_(parent_names["schema_qualified_table_name"], name)
+ else:
+ eq_(parent_names["schema_name"], config.test_schema)
+ eq_(
+ parent_names["schema_qualified_table_name"],
+ "%s.%s" % (config.test_schema, name),
+ )
+ return retval
+ else:
+ return True
+
+ self._update_context(name_filters=include_name, include_schemas=True)
+ uo = ops.UpgradeOps(ops=[])
+ autogenerate._produce_net_changes(self.autogen_context, uo)
+
+ diffs = uo.as_diffs()
+ eq_(
+ {(d[0], d[1].name) for d in diffs},
+ {
+ ("add_table", "t3"),
+ ("add_table", "t4"),
+ ("remove_table", "t1"),
+ ("add_table", "t7"),
+ },
+ )
+
+ def test_default_schema_omitted_by_schema_name_upgrade(self):
+ def include_name(name, type_, parent_names):
+ if type_ == "schema":
+ assert not parent_names
+ return name is None
+ else:
+ return True
+
+ self._update_context(name_filters=include_name, include_schemas=True)
+ uo = ops.UpgradeOps(ops=[])
+ autogenerate._produce_net_changes(self.autogen_context, uo)
+
+ diffs = uo.as_diffs()
+ eq_(
+ {(d[0], d[1].name) for d in diffs},
+ {
+ ("add_table", "t3"),
+ ("add_table", "t4"),
+ ("remove_table", "t1"),
+ ("add_table", "t7"),
+ },
+ )
+
def test_alt_schema_included_upgrade(self):
def include_object(obj, name, type_, reflected, compare_to):
if type_ == "table":
eq_(diffs[0][0], "add_table")
eq_(diffs[0][1].schema, config.test_schema)
+ def test_alt_schema_included_by_schema_name(self):
+ def include_name(name, type_, parent_names):
+ if type_ == "schema":
+ assert not parent_names
+ return name == config.test_schema
+ else:
+ return True
+
+ self._update_context(name_filters=include_name, include_schemas=True)
+ uo = ops.UpgradeOps(ops=[])
+ autogenerate._produce_net_changes(self.autogen_context, uo)
+
+        # "t1" is not in the drops because it lives in the default schema,
+        # which was not reflected; "t6" is in the adds because it is a
+        # metadata table in the default schema, which was omitted from
+        # reflection, so autogenerate sees it as missing from the database
+ diffs = uo.as_diffs()
+ eq_(
+ {(d[0], d[1].name) for d in diffs},
+ {
+ ("add_table", "t3"),
+ ("add_table", "t6"),
+ ("add_table", "t4"),
+ ("remove_table", "t2"),
+ },
+ )
+
def test_default_schema_omitted_downgrade(self):
def include_object(obj, name, type_, reflected, compare_to):
if type_ == "table":
eq_(diffs[10][3].table.name, "user")
assert isinstance(diffs[10][3].type, String)
- def test_include_symbol(self):
-
- diffs = []
-
- def include_symbol(name, schema=None):
- return name in ("address", "order")
+ def test_include_object(self):
+ def include_object(obj, name, type_, reflected, compare_to):
+ assert obj.name == name
+ if type_ == "table":
+ if reflected:
+ assert obj.metadata is not self.m2
+ else:
+ assert obj.metadata is self.m2
+ return name in ("address", "order", "user")
+ elif type_ == "column":
+ if reflected:
+ assert obj.table.metadata is not self.m2
+ else:
+ assert obj.table.metadata is self.m2
+ return name != "street"
+ else:
+ return True
context = MigrationContext.configure(
connection=self.bind.connect(),
"compare_type": True,
"compare_server_default": True,
"target_metadata": self.m2,
- "include_symbol": include_symbol,
+ "include_object": include_object,
},
)
context, context.opts["target_metadata"]
)
- alter_cols = set(
- [
- d[2]
+ alter_cols = (
+ set(
+ [
+ d[2]
+ for d in self._flatten_diffs(diffs)
+ if d[0].startswith("modify")
+ ]
+ )
+ .union(
+ d[3].name
+ for d in self._flatten_diffs(diffs)
+ if d[0] == "add_column"
+ )
+ .union(
+ d[1].name
for d in self._flatten_diffs(diffs)
- if d[0].startswith("modify")
- ]
+ if d[0] == "add_table"
+ )
)
- eq_(alter_cols, set(["order"]))
+ eq_(alter_cols, set(["user_id", "order", "user"]))
- def test_include_object(self):
- def include_object(obj, name, type_, reflected, compare_to):
- assert obj.name == name
+ def test_include_name(self):
+ all_names = set()
+
+ def include_name(name, type_, parent_names):
+ all_names.add((name, type_, parent_names.get("table_name", None)))
if type_ == "table":
- if reflected:
- assert obj.metadata is not self.m2
- else:
- assert obj.metadata is self.m2
+ eq_(
+ parent_names,
+ {"schema_name": None, "schema_qualified_table_name": name},
+ )
return name in ("address", "order", "user")
elif type_ == "column":
- if reflected:
- assert obj.table.metadata is not self.m2
- else:
- assert obj.table.metadata is self.m2
return name != "street"
else:
return True
"compare_type": True,
"compare_server_default": True,
"target_metadata": self.m2,
- "include_object": include_object,
+ "include_name": include_name,
},
)
diffs = autogenerate.compare_metadata(
context, context.opts["target_metadata"]
)
+ eq_(
+ all_names,
+ {
+ (None, "schema", None),
+ ("user", "table", None),
+ ("id", "column", "user"),
+ ("name", "column", "user"),
+ ("a1", "column", "user"),
+ ("pw", "column", "user"),
+ ("pw_idx", "index", "user"),
+ ("order", "table", None),
+ ("order_id", "column", "order"),
+ ("amount", "column", "order"),
+ ("address", "table", None),
+ ("id", "column", "address"),
+ ("email_address", "column", "address"),
+ ("extra", "table", None),
+ },
+ )
alter_cols = (
set(
if d[0] == "add_table"
)
)
- eq_(alter_cols, set(["user_id", "order", "user"]))
+ eq_(alter_cols, {"user_id", "order", "user", "street", "item"})
def test_skip_null_type_comparison_reflected(self):
ac = ops.AlterColumnOp("sometable", "somecol")
"alembic_module_prefix": "op.",
"sqlalchemy_module_prefix": "sa.",
"include_object": _default_object_filters,
+ "include_name": _default_name_filters,
}
if self.configure_opts:
ctx_opts.update(self.configure_opts)
eq_(diffs[1][2], "order")
eq_(diffs[1][3], metadata.tables["order"].c.user_id)
- def test_compare_metadata_include_symbol(self):
+ def test_compare_metadata_include_name(self):
metadata = self.m2
- def include_symbol(table_name, schema_name):
- return table_name in ("extra", "order")
+ all_names = set()
+
+ def include_name(name, type_, parent_names):
+ all_names.add((name, type_, parent_names.get("table_name", None)))
+ if type_ == "table":
+ return name in ("extra", "order")
+ elif type_ == "column":
+ return name != "amount"
+ else:
+ return True
context = MigrationContext.configure(
connection=self.bind.connect(),
opts={
"compare_type": True,
"compare_server_default": True,
- "include_symbol": include_symbol,
+ "include_name": include_name,
},
)
diffs = autogenerate.compare_metadata(context, metadata)
+ eq_(
+ all_names,
+ {
+ ("user", "table", None),
+ ("order", "table", None),
+ ("address", "table", None),
+ (None, "schema", None),
+ ("amount", "column", "order"),
+ ("extra", "table", None),
+ ("order_id", "column", "order"),
+ },
+ )
- eq_(diffs[0][0], "remove_table")
- eq_(diffs[0][1].name, "extra")
-
- eq_(diffs[1][0], "add_column")
- eq_(diffs[1][1], None)
- eq_(diffs[1][2], "order")
- eq_(diffs[1][3], metadata.tables["order"].c.user_id)
-
- eq_(diffs[2][0][0], "modify_type")
- eq_(diffs[2][0][1], None)
- eq_(diffs[2][0][2], "order")
- eq_(diffs[2][0][3], "amount")
- eq_(repr(diffs[2][0][5]), "NUMERIC(precision=8, scale=2)")
- eq_(repr(diffs[2][0][6]), "Numeric(precision=10, scale=2)")
-
- eq_(diffs[2][1][0], "modify_nullable")
- eq_(diffs[2][1][2], "order")
- eq_(diffs[2][1][5], False)
- eq_(diffs[2][1][6], True)
+ eq_(
+ {
+ (
+ d[0],
+ d[3].name if d[0] == "add_column" else d[1].name,
+ d[2] if d[0] == "add_column" else None,
+ )
+ for d in diffs
+ },
+ {
+ ("remove_table", "extra", None),
+ ("add_fk", None, None),
+ ("add_column", "amount", "order"),
+ ("add_table", "user", None),
+ ("add_table", "item", None),
+ ("add_column", "user_id", "order"),
+ ("add_table", "address", None),
+ },
+ )
def test_compare_metadata_as_sql(self):
context = MigrationContext.configure(
from sqlalchemy import String
from sqlalchemy import Table
+from alembic.testing import combinations
from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import mock
__backend__ = True
__requires__ = ("fk_names",)
+ @combinations(("object",), ("name",))
@config.requirements.no_name_normalize
- def test_remove_connection_fk(self):
+ def test_remove_connection_fk(self, hook_type):
m1 = MetaData()
m2 = MetaData()
mysql_engine="InnoDB",
)
- def include_object(object_, name, type_, reflected, compare_to):
- return not (
- isinstance(object_, ForeignKeyConstraint)
- and type_ == "foreign_key_constraint"
- and reflected
- and name == "fk1"
- )
-
- diffs = self._fixture(m1, m2, object_filters=include_object)
+ if hook_type == "object":
+
+ def include_object(object_, name, type_, reflected, compare_to):
+ return not (
+ isinstance(object_, ForeignKeyConstraint)
+ and type_ == "foreign_key_constraint"
+ and reflected
+ and name == "fk1"
+ )
+
+ diffs = self._fixture(m1, m2, object_filters=include_object)
+ elif hook_type == "name":
+
+ def include_name(name, type_, parent_names):
+ if name == "fk1":
+                    # on MariaDB an index of the same name may also appear
+                    if type_ == "index":
+ return True
+ eq_(type_, "foreign_key_constraint")
+ eq_(
+ parent_names,
+ {
+ "schema_name": None,
+ "table_name": "t",
+ "schema_qualified_table_name": "t",
+ },
+ )
+ return False
+ else:
+ return True
+
+ diffs = self._fixture(m1, m2, name_filters=include_name)
self._assert_fk_diff(
diffs[0],
)
eq_(len(diffs), 1)
+ @combinations(("object",), ("name",))
@config.requirements.no_name_normalize
- def test_change_fk(self):
+ def test_change_fk(self, hook_type):
m1 = MetaData()
m2 = MetaData()
)
)
- def include_object(object_, name, type_, reflected, compare_to):
- return not (
- isinstance(object_, ForeignKeyConstraint)
- and type_ == "foreign_key_constraint"
- and name == "fk1"
- )
+ if hook_type == "object":
- diffs = self._fixture(m1, m2, object_filters=include_object)
+ def include_object(object_, name, type_, reflected, compare_to):
+ return not (
+ isinstance(object_, ForeignKeyConstraint)
+ and type_ == "foreign_key_constraint"
+ and name == "fk1"
+ )
- self._assert_fk_diff(
- diffs[0], "remove_fk", "t", ["y"], "ref_a", ["a"], name="fk2"
- )
- self._assert_fk_diff(
- diffs[1],
- "add_fk",
- "t",
- ["y", "z"],
- "ref_b",
- ["a", "b"],
- name="fk2",
- )
- eq_(len(diffs), 2)
+ diffs = self._fixture(m1, m2, object_filters=include_object)
+ elif hook_type == "name":
+
+ def include_name(name, type_, parent_names):
+ if type_ == "index":
+                    return True  # MariaDB may reflect an FK-backing index
+
+ if name == "fk1":
+ eq_(type_, "foreign_key_constraint")
+ eq_(
+ parent_names,
+ {
+ "schema_name": None,
+ "table_name": "t",
+ "schema_qualified_table_name": "t",
+ },
+ )
+ return False
+ else:
+ return True
+
+ diffs = self._fixture(m1, m2, name_filters=include_name)
+
+ if hook_type == "object":
+ self._assert_fk_diff(
+ diffs[0], "remove_fk", "t", ["y"], "ref_a", ["a"], name="fk2"
+ )
+ self._assert_fk_diff(
+ diffs[1],
+ "add_fk",
+ "t",
+ ["y", "z"],
+ "ref_b",
+ ["a", "b"],
+ name="fk2",
+ )
+ eq_(len(diffs), 2)
+ elif hook_type == "name":
+ eq_(
+ {(d[0], d[1].name) for d in diffs},
+ {("add_fk", "fk2"), ("add_fk", "fk1"), ("remove_fk", "fk2")},
+ )
class AutogenerateFKOptionsTest(AutogenFixtureTest, TestBase):
from sqlalchemy import UniqueConstraint
from alembic.testing import assertions
+from alembic.testing import combinations
from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import TestBase
class IncludeHooksTest(AutogenFixtureTest, TestBase):
__backend__ = True
- def test_remove_connection_index(self):
+ @combinations(("name",), ("object",))
+ def test_remove_connection_index(self, hook_type):
m1 = MetaData()
m2 = MetaData()
Table("t", m2, Column("x", Integer), Column("y", Integer))
- def include_object(object_, name, type_, reflected, compare_to):
- if type_ == "unique_constraint":
- return False
- return not (
- isinstance(object_, Index)
- and type_ == "index"
- and reflected
- and name == "ix1"
- )
+ if hook_type == "object":
- diffs = self._fixture(m1, m2, object_filters=include_object)
+ def include_object(object_, name, type_, reflected, compare_to):
+ if type_ == "unique_constraint":
+ return False
+ return not (
+ isinstance(object_, Index)
+ and type_ == "index"
+ and reflected
+ and name == "ix1"
+ )
+
+ diffs = self._fixture(m1, m2, object_filters=include_object)
+ elif hook_type == "name":
+ all_names = set()
+
+ def include_name(name, type_, parent_names):
+ all_names.add((name, type_))
+ if name == "ix1":
+ eq_(type_, "index")
+ eq_(
+ parent_names,
+ {
+ "table_name": "t",
+ "schema_name": None,
+ "schema_qualified_table_name": "t",
+ },
+ )
+ return False
+ else:
+ return True
+
+ diffs = self._fixture(m1, m2, name_filters=include_name)
+ eq_(
+ all_names,
+ {
+ ("ix1", "index"),
+ ("ix2", "index"),
+ ("y", "column"),
+ ("t", "table"),
+ (None, "schema"),
+ ("x", "column"),
+ },
+ )
eq_(diffs[0][0], "remove_index")
eq_(diffs[0][1].name, "ix2")
eq_(len(diffs), 1)
+ @combinations(("name",), ("object",))
@config.requirements.unique_constraint_reflection
@config.requirements.reflects_unique_constraints_unambiguously
- def test_remove_connection_uq(self):
+ def test_remove_connection_uq(self, hook_type):
m1 = MetaData()
m2 = MetaData()
Table("t", m2, Column("x", Integer), Column("y", Integer))
- def include_object(object_, name, type_, reflected, compare_to):
- if type_ == "index":
- return False
- return not (
- isinstance(object_, UniqueConstraint)
- and type_ == "unique_constraint"
- and reflected
- and name == "uq1"
- )
+ if hook_type == "object":
- diffs = self._fixture(m1, m2, object_filters=include_object)
+ def include_object(object_, name, type_, reflected, compare_to):
+ if type_ == "index":
+ return False
+ return not (
+ isinstance(object_, UniqueConstraint)
+ and type_ == "unique_constraint"
+ and reflected
+ and name == "uq1"
+ )
+
+ diffs = self._fixture(m1, m2, object_filters=include_object)
+ elif hook_type == "name":
+ all_names = set()
+
+ def include_name(name, type_, parent_names):
+ if type_ == "index":
+                    return False  # PostgreSQL may reflect these as indexes
+
+ all_names.add((name, type_))
+
+ if name == "uq1":
+ eq_(type_, "unique_constraint")
+ eq_(
+ parent_names,
+ {
+ "table_name": "t",
+ "schema_name": None,
+ "schema_qualified_table_name": "t",
+ },
+ )
+ return False
+ return True
+
+ diffs = self._fixture(m1, m2, name_filters=include_name)
+ eq_(
+ all_names,
+ {
+ ("t", "table"),
+ (None, "schema"),
+ ("uq2", "unique_constraint"),
+ ("x", "column"),
+ ("y", "column"),
+ ("uq1", "unique_constraint"),
+ },
+ )
eq_(diffs[0][0], "remove_constraint")
eq_(diffs[0][1].name, "uq2")