object_filters,
inspector, metadata, diffs, autogen_context):
+ default_schema = inspector.bind.dialect.default_schema_name
+
+ # tables coming from the connection will not have "schema"
+ # set if it matches default_schema_name; so we need a list
+ # of table names from local metadata that also have "None" if schema
+ # == default_schema_name. Most setups will be like this anyway but
+ # some are not (see #170)
+ metadata_table_names_no_dflt_schema = OrderedSet([
+ (schema if schema != default_schema else None, tname)
+ for schema, tname in metadata_table_names
+ ])
+
+ # to adjust for the MetaData collection storing the tables either
+ # as "schemaname.tablename" or just "tablename", create a new lookup
+ # which will match the "non-default-schema" keys to the Table object.
+ tname_to_table = dict(
+ (
+ no_dflt_schema,
+ metadata.tables[sa_schema._get_table_key(tname, schema)]
+ )
+ for no_dflt_schema, (schema, tname) in zip(
+ metadata_table_names_no_dflt_schema,
+ metadata_table_names)
+ )
+ metadata_table_names = metadata_table_names_no_dflt_schema
+
for s, tname in metadata_table_names.difference(conn_table_names):
name = '%s.%s' % (s, tname) if s else tname
- metadata_table = metadata.tables[sa_schema._get_table_key(tname, s)]
+ metadata_table = tname_to_table[(s, tname)]
if _run_filters(metadata_table, tname, "table", False, None, object_filters):
- diffs.append(("add_table", metadata.tables[name]))
+ diffs.append(("add_table", metadata_table))
log.info("Detected added table %r", name)
_compare_indexes_and_uniques(s, tname, object_filters,
None,
for s, tname in sorted(existing_tables):
name = '%s.%s' % (s, tname) if s else tname
- metadata_table = metadata.tables[name]
+ metadata_table = tname_to_table[(s, tname)]
conn_table = existing_metadata.tables[name]
if _run_filters(metadata_table, tname, "table", False, conn_table, object_filters):
cls.m1.drop_all(cls.bind)
clear_staging_env()
+class AutogenFixtureTest(object):
+ def _fixture(self, m1, m2, include_schemas=False):
+ self.metadata, model_metadata = m1, m2
+ self.metadata.create_all(self.bind)
+
+ with self.bind.connect() as conn:
+ self.context = context = MigrationContext.configure(
+ connection=conn,
+ opts={
+ 'compare_type': True,
+ 'compare_server_default': True,
+ 'target_metadata': model_metadata,
+ 'upgrade_token': "upgrades",
+ 'downgrade_token': "downgrades",
+ 'alembic_module_prefix': 'op.',
+ 'sqlalchemy_module_prefix': 'sa.',
+ }
+ )
+
+ connection = context.bind
+ autogen_context = {
+ 'imports': set(),
+ 'connection': connection,
+ 'dialect': connection.dialect,
+ 'context': context
+ }
+ diffs = []
+ autogenerate._produce_net_changes(connection, model_metadata, diffs,
+ autogen_context,
+ object_filters=_default_object_filters,
+ include_schemas=include_schemas
+ )
+ return diffs
+
+ reports_unnamed_constraints = False
+
+ def setUp(self):
+ staging_env()
+ self.bind = self._get_bind()
+
+ def tearDown(self):
+ self.metadata.drop_all(self.bind)
+ clear_staging_env()
+
+ @classmethod
+ def _get_bind(cls):
+ return sqlite_db()
+
class AutogenCrossSchemaTest(AutogenTest, TestCase):
@classmethod
eq_(diffs[0][1].schema, self.test_schema_name)
+class AutogenDefaultSchemaTest(AutogenFixtureTest, TestCase):
+ @classmethod
+ def _get_bind(cls):
+ cls.test_schema_name = "test_schema"
+ return db_for_dialect('postgresql')
+
+ def test_uses_explcit_schema_in_default_one(self):
+
+ default_schema = self.bind.dialect.default_schema_name
+
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table('a', m1, Column('x', String(50)))
+ Table('a', m2, Column('x', String(50)), schema=default_schema)
+
+ diffs = self._fixture(m1, m2, include_schemas=True)
+ eq_(diffs, [])
+
+ def test_uses_explcit_schema_in_default_two(self):
+
+ default_schema = self.bind.dialect.default_schema_name
+
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table('a', m1, Column('x', String(50)))
+ Table('a', m2, Column('x', String(50)), schema=default_schema)
+ Table('a', m2, Column('y', String(50)), schema="test_schema")
+
+ diffs = self._fixture(m1, m2, include_schemas=True)
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], "add_table")
+ eq_(diffs[0][1].schema, "test_schema")
+ eq_(diffs[0][1].c.keys(), ['y'])
+
+ def test_uses_explcit_schema_in_default_three(self):
+
+ default_schema = self.bind.dialect.default_schema_name
+
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table('a', m1, Column('y', String(50)), schema="test_schema")
+
+ Table('a', m2, Column('x', String(50)), schema=default_schema)
+ Table('a', m2, Column('y', String(50)), schema="test_schema")
+
+
+ diffs = self._fixture(m1, m2, include_schemas=True)
+ eq_(len(diffs), 1)
+ eq_(diffs[0][0], "add_table")
+ eq_(diffs[0][1].schema, default_schema)
+ eq_(diffs[0][1].c.keys(), ['x'])
+
+
class ModelOne(object):
schema = None
### end Alembic commands ###""" % {"schema": self.schema})
-class AutogenerateUniqueIndexTest(TestCase):
+
+class AutogenerateUniqueIndexTest(AutogenFixtureTest, TestCase):
def test_index_flag_becomes_named_unique_constraint(self):
m1 = MetaData()
eq_(diffs, [])
- def _fixture(self, m1, m2, include_schemas=False):
- self.metadata, model_metadata = m1, m2
- self.metadata.create_all(self.bind)
-
- with self.bind.connect() as conn:
- self.context = context = MigrationContext.configure(
- connection=conn,
- opts={
- 'compare_type': True,
- 'compare_server_default': True,
- 'target_metadata': model_metadata,
- 'upgrade_token': "upgrades",
- 'downgrade_token': "downgrades",
- 'alembic_module_prefix': 'op.',
- 'sqlalchemy_module_prefix': 'sa.',
- }
- )
-
- connection = context.bind
- autogen_context = {
- 'imports': set(),
- 'connection': connection,
- 'dialect': connection.dialect,
- 'context': context
- }
- diffs = []
- autogenerate._produce_net_changes(connection, model_metadata, diffs,
- autogen_context,
- object_filters=_default_object_filters,
- include_schemas=include_schemas
- )
- return diffs
-
- reports_unnamed_constraints = False
-
- def setUp(self):
- staging_env()
- self.bind = self._get_bind()
-
- def tearDown(self):
- self.metadata.drop_all(self.bind)
- clear_staging_env()
-
- @classmethod
- def _get_bind(cls):
- return sqlite_db()
-
-
class PGUniqueIndexTest(AutogenerateUniqueIndexTest):
reports_unnamed_constraints = True
eq_(diffs[0][0], "add_table")
eq_(len(diffs), 1)
+
class MySQLUniqueIndexTest(AutogenerateUniqueIndexTest):
reports_unnamed_constraints = True