raise util.CommandError(
"Can't proceed with --autogenerate option; environment "
"script %s does not provide "
- "a MetaData object to the context." % (
+ "a MetaData object or sequence of objects to the context." % (
migration_context.script.env_py_location
))
else:
return True
+ @util.memoized_property
+ def sorted_tables(self):
+ """Return an aggregate of the :attr:`.MetaData.sorted_tables` collection(s).
+
+ For a sequence of :class:`.MetaData` objects, this
+ concatenates the :attr:`.MetaData.sorted_tables` collection
+ for each individual :class:`.MetaData` in the order of the
+ sequence. It does **not** collate the sorted tables collections.
+
+ .. versionadded:: 0.9.0
+
+ """
+ result = []
+ for m in util.to_list(self.metadata):
+ result.extend(m.sorted_tables)
+ return result
+
+ @util.memoized_property
+ def table_key_to_table(self):
+ """Return an aggregate of the :attr:`.MetaData.tables` dictionaries.
+
+ The :attr:`.MetaData.tables` collection is a dictionary of table key
+ to :class:`.Table`; this method aggregates the dictionary across
+ multiple :class:`.MetaData` objects into one dictionary.
+
+ Duplicate table keys are **not** supported; if two :class:`.MetaData`
+ objects contain the same table key, an exception is raised.
+
+ .. versionadded:: 0.9.0
+
+ """
+ result = {}
+ for m in util.to_list(self.metadata):
+ intersect = set(result).intersection(set(m.tables))
+ if intersect:
+ raise ValueError(
+ "Duplicate table keys across multiple "
+ "MetaData objects: %s" %
+ (", ".join('"%s"' % key for key in sorted(intersect)))
+ )
+
+ result.update(m.tables)
+ return result
+
class RevisionContext(object):
"""Maintains configuration and state that's specific to a revision
def _autogen_for_tables(autogen_context, upgrade_ops, schemas):
inspector = autogen_context.inspector
- metadata = autogen_context.metadata
-
conn_table_names = set()
version_table_schema = \
conn_table_names.update(zip([s] * len(tables), tables))
metadata_table_names = OrderedSet(
- [(table.schema, table.name) for table in metadata.sorted_tables]
+ [(table.schema, table.name) for table in autogen_context.sorted_tables]
).difference([(version_table_schema, version_table)])
_compare_tables(conn_table_names, metadata_table_names,
- inspector, metadata, upgrade_ops, autogen_context)
+ inspector, upgrade_ops, autogen_context)
def _compare_tables(conn_table_names, metadata_table_names,
- inspector, metadata, upgrade_ops, autogen_context):
+ inspector, upgrade_ops, autogen_context):
default_schema = inspector.bind.dialect.default_schema_name
tname_to_table = dict(
(
no_dflt_schema,
- metadata.tables[sa_schema._get_table_key(tname, schema)]
+ autogen_context.table_key_to_table[
+ sa_schema._get_table_key(tname, schema)]
)
for no_dflt_schema, (schema, tname) in zip(
metadata_table_names_no_dflt_schema,
``alembic revision`` is run with the ``--autogenerate`` feature:
:param target_metadata: a :class:`sqlalchemy.schema.MetaData`
- object that
- will be consulted during autogeneration. The tables present
+ object, or a sequence of :class:`~sqlalchemy.schema.MetaData`
+ objects, that will be consulted during autogeneration.
+ The tables present in each :class:`~sqlalchemy.schema.MetaData`
will be compared against
what is locally available on the target
:class:`~sqlalchemy.engine.Connection`
to produce candidate upgrade/downgrade operations.
+ .. versionchanged:: 0.9.0 the
+ :paramref:`.EnvironmentContext.configure.target_metadata`
+ parameter may now be passed a sequence of
+ :class:`~sqlalchemy.schema.MetaData` objects to support
+ autogeneration of multiple :class:`~sqlalchemy.schema.MetaData`
+ collections.
+
:param compare_type: Indicates type comparison behavior during
an autogenerate
operation. Defaults to ``False`` which disables type
the supporting SQLAlchemy dialect.
* Sequence additions, removals - not yet implemented.
+Autogenerating Multiple MetaData collections
+--------------------------------------------
+
+The ``target_metadata`` collection may also be defined as a sequence
+if an application has multiple :class:`~sqlalchemy.schema.MetaData`
+collections involved::
+
+ from myapp.mymodel1 import Model1Base
+ from myapp.mymodel2 import Model2Base
+ target_metadata = [Model1Base.metadata, Model2Base.metadata]
+
+The sequence of :class:`~sqlalchemy.schema.MetaData` collections will be
+consulted in order during the autogenerate process. Note that each
+:class:`~sqlalchemy.schema.MetaData` must contain **unique** table keys
+(e.g. the "key" is the combination of the table's name and schema);
+if two :class:`~sqlalchemy.schema.MetaData` objects contain a table
+with the same schema/name combination, an error is raised.
+
+.. versionchanged:: 0.9.0 the
+ :paramref:`.EnvironmentContext.configure.target_metadata`
+ parameter may now be passed a sequence of
+ :class:`~sqlalchemy.schema.MetaData` objects to support
+ autogeneration of multiple :class:`~sqlalchemy.schema.MetaData`
+ collections.
Comparing and Rendering Types
------------------------------
:version: 0.9.0
:released:
+ .. change:: 38
+ :tags: feature, autogenerate
+ :tickets: 38
+
+ The :paramref:`.EnvironmentContext.configure.target_metadata` parameter
+ may now be optionally specified as a sequence of :class:`.MetaData`
+ objects instead of a single :class:`.MetaData` object. The
+ autogenerate process will process the sequence of :class:`.MetaData`
+ objects in order.
+
.. change:: 369
:tags: bug, commands
:tickets: 369
from alembic.testing import eq_
from alembic.ddl.base import _fk_spec
from alembic.autogenerate import api
+from alembic import util
+from sqlalchemy import event
names_in_this_test = set()
-from sqlalchemy import event
-
@event.listens_for(Table, "after_parent_attach")
def new_table(table, parent):
opts=None, object_filters=_default_object_filters,
return_ops=False):
self.metadata, model_metadata = m1, m2
- self.metadata.create_all(self.bind)
+ for m in util.to_list(self.metadata):
+ m.create_all(self.bind)
with self.bind.connect() as conn:
ctx_opts = {
def tearDown(self):
if hasattr(self, 'metadata'):
- self.metadata.drop_all(self.bind)
+ for m in util.to_list(self.metadata):
+ m.drop_all(self.bind)
clear_staging_env()
autogenerate.compare._compare_tables(
OrderedSet([(None, 'extra'), (None, 'user')]),
OrderedSet(), inspector,
- MetaData(), uo, self.autogen_context
+ uo, self.autogen_context
)
eq_(
[(rec[0], rec[1].name) for rec in uo.as_diffs()],
return m
def test_no_version_table(self):
- diffs = []
ctx = self.autogen_context
- autogenerate._produce_net_changes(ctx, diffs)
- eq_(diffs, [])
+ uo = ops.UpgradeOps(ops=[])
+ autogenerate._produce_net_changes(ctx, uo)
+ eq_(uo.as_diffs(), [])
def test_version_table_in_target(self):
- diffs = []
Table(
self.version_table_name,
self.m2, Column('x', Integer), schema=self.version_table_schema)
ctx = self.autogen_context
- autogenerate._produce_net_changes(ctx, diffs)
- eq_(diffs, [])
+ uo = ops.UpgradeOps(ops=[])
+ autogenerate._produce_net_changes(ctx, uo)
+ eq_(uo.as_diffs(), [])
class AutogenCustomVersionTableSchemaTest(AutogenVersionTableTest):
is_(op.reverse().to_index(), self.ix)
+class MultipleMetaDataTest(AutogenFixtureTest, TestBase):
+ def test_multiple(self):
+ m1a = MetaData()
+ m1b = MetaData()
+ m1c = MetaData()
+
+ m2a = MetaData()
+ m2b = MetaData()
+ m2c = MetaData()
+
+ Table('a', m1a, Column('id', Integer, primary_key=True))
+ Table('b1', m1b, Column('id', Integer, primary_key=True))
+ Table('b2', m1b, Column('id', Integer, primary_key=True))
+ Table('c1', m1c, Column('id', Integer, primary_key=True),
+ Column('x', Integer))
+
+ a = Table('a', m2a, Column('id', Integer, primary_key=True),
+ Column('q', Integer))
+ Table('b1', m2b, Column('id', Integer, primary_key=True))
+ Table('c1', m2c, Column('id', Integer, primary_key=True))
+ c2 = Table('c2', m2c, Column('id', Integer, primary_key=True))
+
+ diffs = self._fixture([m1a, m1b, m1c], [m2a, m2b, m2c])
+ eq_(diffs[0], ('add_table', c2))
+ eq_(diffs[1][0], 'remove_table')
+ eq_(diffs[1][1].name, 'b2')
+ eq_(diffs[2], ('add_column', None, 'a', a.c.q))
+ eq_(diffs[3][0:3], ('remove_column', None, 'c1'))
+ eq_(diffs[3][3].name, 'x')
+
+ def test_empty_list(self):
+ # because they're going to do it....
+
+ diffs = self._fixture([], [])
+ eq_(diffs, [])
+
+ def test_non_list_sequence(self):
+ # we call it "sequence", let's check that
+
+ m1a = MetaData()
+ m1b = MetaData()
+
+ m2a = MetaData()
+ m2b = MetaData()
+
+ Table('a', m1a, Column('id', Integer, primary_key=True))
+ Table('b', m1b, Column('id', Integer, primary_key=True))
+
+ Table('a', m2a, Column('id', Integer, primary_key=True))
+ b = Table('b', m2b, Column('id', Integer, primary_key=True),
+ Column('q', Integer))
+
+ diffs = self._fixture((m1a, m1b), (m2a, m2b))
+ eq_(
+ diffs,
+ [('add_column', None, 'b', b.c.q)]
+ )
+
+ def test_raise_on_dupe(self):
+ m1a = MetaData()
+ m1b = MetaData()
+
+ m2a = MetaData()
+ m2b = MetaData()
+
+ Table('a', m1a, Column('id', Integer, primary_key=True))
+ Table('b1', m1b, Column('id', Integer, primary_key=True))
+ Table('b2', m1b, Column('id', Integer, primary_key=True))
+ Table('b3', m1b, Column('id', Integer, primary_key=True))
+
+ Table('a', m2a, Column('id', Integer, primary_key=True))
+ Table('a', m2b, Column('id', Integer, primary_key=True))
+ Table('b1', m2b, Column('id', Integer, primary_key=True))
+ Table('b2', m2a, Column('id', Integer, primary_key=True))
+ Table('b2', m2b, Column('id', Integer, primary_key=True))
+
+ assert_raises_message(
+ ValueError,
+ 'Duplicate table keys across multiple MetaData objects: "a", "b2"',
+ self._fixture,
+ [m1a, m1b], [m2a, m2b]
+ )
+
+
class AutoincrementTest(AutogenFixtureTest, TestBase):
__backend__ = True
__requires__ = 'integer_subtype_comparisons',
def _expect_default(self, c_expected, col, seq=None):
Table('t', self.metadata, col)
+ self.autogen_context.metadata = self.metadata
+
if seq:
seq._set_metadata(self.metadata)
self.metadata.create_all(config.db)
uo = ops.UpgradeOps(ops=[])
_compare_tables(
set([(None, 't')]), set([]),
- insp, self.metadata, uo, self.autogen_context)
+ insp, uo, self.autogen_context)
diffs = uo.as_diffs()
tab = diffs[0][1]
uo = ops.UpgradeOps(ops=[])
m2 = MetaData()
Table('t', m2, Column('x', BigInteger()))
+ self.autogen_context.metadata = m2
_compare_tables(
set([(None, 't')]), set([(None, 't')]),
- insp, m2, uo, self.autogen_context)
+ insp, uo, self.autogen_context)
diffs = uo.as_diffs()
server_default = diffs[0][0][4]['existing_server_default']
eq_(_render_server_default_for_compare(