if _run_filters(metadata_table, tname, "table", False, None, object_filters):
diffs.append(("add_table", metadata.tables[name]))
log.info("Detected added table %r", name)
- _compare_indexes(s, tname, object_filters,
+ _compare_indexes_and_uniques(s, tname, object_filters,
None,
metadata_table,
- diffs, autogen_context, inspector,
- set())
+ diffs, autogen_context, inspector)
removal_metadata = sa_schema.MetaData()
for s, tname in conn_table_names.difference(metadata_table_names):
conn_table,
metadata_table,
diffs, autogen_context, inspector)
- c_uniques = _compare_uniques(s, tname,
- object_filters, conn_table, metadata_table,
- diffs, autogen_context, inspector)
- _compare_indexes(s, tname, object_filters,
+ _compare_indexes_and_uniques(s, tname, object_filters,
conn_table,
metadata_table,
- diffs, autogen_context, inspector,
- c_uniques)
+ diffs, autogen_context, inspector)
# TODO:
# table constraints
if col_diff:
diffs.append(col_diff)
-class _uq_constraint_sig(object):
+class _constraint_sig(object):
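+ # equality and hashing are delegated to the wrapped constraint so that
+ # Index and UniqueConstraint facades can live together in sets and
+ # dicts; subclasses add .name, .sig and .column_names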
+ def __eq__(self, other):
+ return self.const == other.const
+
+ def __ne__(self, other):
+ return self.const != other.const
+
+ def __hash__(self):
+ return hash(self.const)
+
+class _uq_constraint_sig(_constraint_sig):
+ is_index = False
+ is_unique = True
+
def __init__(self, const):
self.const = const
self.name = const.name
self.sig = tuple(sorted([col.name for col in const.columns]))
- def __eq__(self, other):
- if self.name is not None and other.name is not None:
- return other.name == self.name
- else:
- return self.sig == other.sig
+ @property
+ def column_names(self):
+ return [col.name for col in self.const.columns]
- def __ne__(self, other):
- return not self.__eq__(other)
+class _ix_constraint_sig(_constraint_sig):
+ is_index = True
- def __hash__(self):
- return hash(self.sig)
+ def __init__(self, const):
+ self.const = const
+ self.name = const.name
+ self.sig = tuple(sorted([col.name for col in const.columns]))
+ self.is_unique = bool(const.unique)
+
+ @property
+ def column_names(self):
+ return _get_index_column_names(self.const)
+
+def _get_index_column_names(idx):
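+ # Index.expressions is only available as of SQLAlchemy 0.8; fall back
+ # to .columns on older versions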
+ if compat.sqla_08:
+ return [exp.name for exp in idx.expressions]
+ else:
+ return [col.name for col in idx.columns]
-def _compare_uniques(schema, tname, object_filters, conn_table,
+def _compare_indexes_and_uniques(schema, tname, object_filters, conn_table,
metadata_table, diffs, autogen_context, inspector):
- m_objs = dict(
- (_uq_constraint_sig(uq), uq) for uq in metadata_table.constraints
- if isinstance(uq, sa_schema.UniqueConstraint)
+ # 1a. get raw indexes and unique constraints from metadata ...
+ metadata_unique_constraints = set(uq for uq in metadata_table.constraints
+ if isinstance(uq, sa_schema.UniqueConstraint)
)
- m_keys = set(m_objs.keys())
+ metadata_indexes = set(metadata_table.indexes)
- if hasattr(inspector, "get_unique_constraints"):
+ # 1b. ... and from connection
+ if conn_table is not None and hasattr(inspector, "get_unique_constraints"):
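+ # conn_table is None for tables that exist only in the metadata
+ # (newly added tables), and get_unique_constraints() is only
+ # available on recent SQLAlchemy versions, hence the hasattr() check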
try:
conn_uniques = inspector.get_unique_constraints(tname)
- except NotImplementedError:
- return None
- except NoSuchTableError:
+ except (NotImplementedError, NoSuchTableError):
conn_uniques = []
else:
- return None
+ conn_uniques = []
- c_objs = dict(
- (_uq_constraint_sig(uq), uq)
- for uq in
- (_make_unique_constraint(uq_def, conn_table) for uq_def in conn_uniques)
- )
- c_keys = set(c_objs)
-
- c_obj_by_name = dict((uq.name, uq) for uq in c_objs.values())
- m_obj_by_name = dict((uq.name, uq) for uq in m_objs.values())
-
- # for constraints that are named the same on both sides,
- # keep these as a single "drop"/"add" so that the ordering
- # comes out correctly
- names_equal = set(c_obj_by_name).intersection(m_obj_by_name)
- for name_equal in names_equal:
- m_keys.remove(_uq_constraint_sig(m_obj_by_name[name_equal]))
- c_keys.remove(_uq_constraint_sig(c_obj_by_name[name_equal]))
-
- for key in m_keys.difference(c_keys):
- meta_constraint = m_objs[key]
- diffs.append(("add_constraint", meta_constraint))
- log.info("Detected added unique constraint '%s' on %s",
- key, ', '.join([
- "'%s'" % y.name for y in meta_constraint.columns
- ])
- )
- for key in c_keys.difference(m_keys):
- diffs.append(("remove_constraint", c_objs[key]))
- log.info("Detected removed unique constraint '%s' on '%s'",
- key, tname
- )
+ try:
+ conn_indexes = inspector.get_indexes(tname)
+ except NoSuchTableError:
+ conn_indexes = []
+
+ # 2. convert conn-level objects from raw inspector records
+ # into schema objects
+ conn_uniques = set(_make_unique_constraint(uq_def, conn_table)
+ for uq_def in conn_uniques)
+ conn_indexes = set(_make_index(ix, conn_table) for ix in conn_indexes)
+
+ # 3. give the dialect a chance to omit indexes and constraints that
+ # we know are either added implicitly by the DB or that the DB
+ # can't accurately report on
+ autogen_context['context'].impl.\
+ correct_for_autogen_constraints(
+ conn_uniques, conn_indexes,
+ metadata_unique_constraints,
+ metadata_indexes
+ )
+
+ # 4. organize the constraints into "signature" collections; the
+ # _constraint_sig() objects provide a consistent facade over both
+ # Index and UniqueConstraint so we can easily work with them
+ # interchangeably
+ metadata_unique_constraints = set(_uq_constraint_sig(uq)
+ for uq in metadata_unique_constraints
+ )
+
+ metadata_indexes = set(_ix_constraint_sig(ix) for ix in metadata_indexes)
+
+ conn_unique_constraints = set(_uq_constraint_sig(uq) for uq in conn_uniques)
+
+ conn_indexes = set(_ix_constraint_sig(ix) for ix in conn_indexes)
+
+ # 5. index things by name, for those objects that have names
+ metadata_names = dict(
+ (c.name, c) for c in
+ metadata_unique_constraints.union(metadata_indexes)
+ if c.name is not None)
+
+ conn_uniques_by_name = dict((c.name, c) for c in conn_unique_constraints)
+ conn_indexes_by_name = dict((c.name, c) for c in conn_indexes)
+
+ conn_names = dict((c.name, c) for c in
+ conn_unique_constraints.union(conn_indexes)
+ if c.name is not None)
+
+ doubled_constraints = dict(
+ (name, (conn_uniques_by_name[name], conn_indexes_by_name[name]))
+ for name in set(conn_uniques_by_name).intersection(conn_indexes_by_name)
+ )
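+ # (e.g. MySQL and PostgreSQL report a UNIQUE constraint both as a
+ # constraint and as a unique index under the same name; see
+ # assumption 4 below)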
- for meta_constraint, conn_constraint in [
- (m_objs[key], c_objs[key]) for key in m_keys.intersection(c_keys)
- ] + [
- (m_obj_by_name[key], c_obj_by_name[key]) for key in names_equal
- ]:
- conn_cols = [col.name for col in conn_constraint.columns]
- meta_cols = [col.name for col in meta_constraint.columns]
-
- if meta_cols != conn_cols:
- diffs.append(("remove_constraint", conn_constraint))
- diffs.append(("add_constraint", meta_constraint))
- log.info("Detected changed unique constraint '%s' on '%s':%s",
- meta_constraint.name, tname, ' columns %r to %r' % (conn_cols, meta_cols)
+ # 6. index things by "column signature", to help with unnamed unique
+ # constraints.
+ conn_uniques_by_sig = dict((uq.sig, uq) for uq in conn_unique_constraints)
+ metadata_uniques_by_sig = dict(
+ (uq.sig, uq) for uq in metadata_unique_constraints)
+ metadata_indexes_by_sig = dict(
+ (ix.sig, ix) for ix in metadata_indexes)
+ unnamed_metadata_uniques = dict((uq.sig, uq) for uq in
+ metadata_unique_constraints if uq.name is None)
+
+ # assumptions:
+ # 1. a unique constraint or an index from the connection *always*
+ # has a name.
+ # 2. an index on the metadata side *always* has a name.
+ # 3. a unique constraint on the metadata side *might* have a name.
+ # 4. The backend may double up indexes as unique constraints and
+ # vice versa (e.g. MySQL, PostgreSQL)
+
+ def obj_added(obj):
+ if obj.is_index:
+ diffs.append(("add_index", obj.const))
+ log.info("Detected added index '%s' on %s",
+ obj.name, ', '.join([
+ "'%s'" % obj.column_names
+ ])
+ )
+ else:
+ diffs.append(("add_constraint", obj.const))
+ log.info("Detected added unique constraint '%s' on %s",
+ obj.name, ', '.join([
+ "'%s'" % obj.column_names
+ ])
)
- # inspector.get_indexes() can conflate indexes and unique
- # constraints when unique constraints are implemented by the database
- # as an index. so we pass uniques to _compare_indexes() for
- # deduplication
- return c_keys
-
-def _get_index_column_names(idx):
- if compat.sqla_08:
- return [exp.name for exp in idx.expressions]
- else:
- return [col.name for col in idx.columns]
+ def obj_removed(obj):
+ if obj.is_index:
+ diffs.append(("remove_index", obj.const))
+ log.info("Detected removed index '%s' on '%s'", obj.name, tname)
+ else:
+ diffs.append(("remove_constraint", obj.const))
+ log.info("Detected removed unique constraint '%s' on '%s'",
+ obj.name, tname
+ )
-def _compare_indexes(schema, tname, object_filters, conn_table,
- metadata_table, diffs, autogen_context, inspector,
- c_uniques_keys):
+ def obj_changed(old, new, msg):
+ if old.is_index:
+ log.info("Detected changed index '%s' on '%s':%s",
+ old.name, tname, ', '.join(msg)
+ )
+ diffs.append(("remove_index", old.const))
+ diffs.append(("add_index", new.const))
+ else:
+ log.info("Detected changed unique constraint '%s' on '%s':%s",
+ old.name, tname, ', '.join(msg)
+ )
+ diffs.append(("remove_constraint", old.const))
+ diffs.append(("add_constraint", new.const))
+ for added_name in sorted(set(metadata_names).difference(conn_names)):
+ obj = metadata_names[added_name]
+ obj_added(obj)
- try:
- reflected_indexes = inspector.get_indexes(tname)
- except NoSuchTableError:
- c_objs = {}
- else:
- c_objs = dict(
- (i['name'], _make_index(i, conn_table))
- for i in reflected_indexes
- )
- m_objs = dict((i.name, i) for i in metadata_table.indexes)
-
- # deduplicate between conn uniques and indexes, because either:
- # 1. a backend reports uniques as indexes, because uniques
- # are implemented as a type of index.
- # 2. our backend and/or SQLA version does not reflect uniques
- # in either case, we need to avoid comparing a connection index
- # for what we can tell from the metadata is meant as a unique constraint
- if c_uniques_keys is None:
- c_uniques_keys = set(
- i.name for i in metadata_table.constraints \
- if isinstance(i, sa_schema.UniqueConstraint) and i.name is not None
- )
- else:
- c_uniques_keys = set(uq.name for uq in c_uniques_keys if uq.name is not None)
-
- c_keys = set(c_objs).difference(c_uniques_keys)
- m_keys = set(m_objs).difference(c_uniques_keys)
-
- for key in m_keys.difference(c_keys):
- meta = m_objs[key]
- diffs.append(("add_index", meta))
- log.info("Detected added index '%s' on %s",
- key, ', '.join([
- "'%s'" % _get_index_column_names(meta)
- ])
- )
+ for existing_name in sorted(set(metadata_names).intersection(conn_names)):
+ metadata_obj = metadata_names[existing_name]
- for key in c_keys.difference(m_keys):
- diffs.append(("remove_index", c_objs[key]))
- log.info("Detected removed index '%s' on '%s'", key, tname)
-
- for key in m_keys.intersection(c_keys):
- meta_index = m_objs[key]
- conn_index = c_objs[key]
- # TODO: why don't we just render the DDL here
- # so we can compare the string output fully
- conn_exps = _get_index_column_names(conn_index)
- meta_exps = _get_index_column_names(meta_index)
-
- # convert between both Nones (SQLA ticket #2825) on the metadata
- # side and zeroes on the reflection side.
- if bool(meta_index.unique) is not bool(conn_index.unique) \
- or meta_exps != conn_exps:
- diffs.append(("remove_index", conn_index))
- diffs.append(("add_index", meta_index))
+ if existing_name in doubled_constraints:
+ conn_uq, conn_idx = doubled_constraints[existing_name]
+ if metadata_obj.is_index:
+ conn_obj = conn_idx
+ else:
+ conn_obj = conn_uq
+ else:
+ conn_obj = conn_names[existing_name]
+ if conn_obj.is_index != metadata_obj.is_index:
+ obj_removed(conn_obj)
+ obj_added(metadata_obj)
+ else:
msg = []
- if meta_index.unique is not conn_index.unique:
+ if conn_obj.is_unique != metadata_obj.is_unique:
msg.append(' unique=%r to unique=%r' % (
- conn_index.unique, meta_index.unique
+ conn_obj.is_unique, metadata_obj.is_unique
))
- if meta_exps != conn_exps:
+ if conn_obj.sig != metadata_obj.sig:
msg.append(' columns %r to %r' % (
- conn_exps, meta_exps
+ conn_obj.sig, metadata_obj.sig
))
- log.info("Detected changed index '%s' on '%s':%s",
- key, tname, ', '.join(msg)
- )
+
+ if msg:
+ obj_changed(conn_obj, metadata_obj, msg)
+
+ for removed_name in sorted(set(conn_names).difference(metadata_names)):
+ conn_obj = conn_names[removed_name]
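+ # an unnamed unique constraint in the metadata with the same column
+ # signature still accounts for this reflected constraint; it is not
+ # a removal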
+ if not conn_obj.is_index and conn_obj.sig in unnamed_metadata_uniques:
+ continue
+ elif removed_name in doubled_constraints:
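+ # a constraint/index pair reported under one name by the backend;
+ # only drop both halves if neither signature survives in the metadata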
+ if conn_obj.sig not in metadata_indexes_by_sig and \
+ conn_obj.sig not in metadata_uniques_by_sig:
+ conn_uq, conn_idx = doubled_constraints[removed_name]
+ obj_removed(conn_uq)
+ obj_removed(conn_idx)
+ else:
+ obj_removed(conn_obj)
+
+ for uq_sig in unnamed_metadata_uniques:
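+ # unnamed metadata uniques can only be matched by column signature;
+ # any signature not present among the reflected unique constraints
+ # is an addition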
+ if uq_sig not in conn_uniques_by_sig:
+ obj_added(unnamed_metadata_uniques[uq_sig])
+
def _compare_nullable(schema, tname, cname, conn_col,
metadata_col_nullable, diffs,
return m
-
+names_in_this_test = set()
def _default_include_object(obj, name, type_, reflected, compare_to):
if type_ == "table":
- return name in ("parent", "child",
- "user", "order", "item",
- "address", "extra", "col_change")
+ return name in names_in_this_test
else:
return True
_default_object_filters = [
_default_include_object
]
+from sqlalchemy import event
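+# record the name of every Table constructed during the test run so that
+# _default_include_object only passes tables created by these tests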
+@event.listens_for(Table, "after_parent_attach")
+def new_table(table, parent):
+ names_in_this_test.add(table.name)
class AutogenTest(object):
@classmethod
@classmethod
def _get_bind(cls):
- return db_for_dialect('mysql') #sqlite_db()
+ return db_for_dialect('mysql')
@classmethod
def _get_db_schema(cls):
class AutogenerateUniqueIndexTest(TestCase):
-
- def _fixture_one(self):
+ def test_index_flag_becomes_named_unique_constraint(self):
m1 = MetaData()
m2 = MetaData()
Table('user', m1,
Column('id', Integer, primary_key=True),
Column('name', String(50), nullable=False, index=True),
- Column('a1', Text, server_default="x")
+ Column('a1', String(10), server_default="x")
)
Table('user', m2,
Column('id', Integer, primary_key=True),
Column('name', String(50), nullable=False),
- Column('a1', Text, server_default="x"),
+ Column('a1', String(10), server_default="x"),
UniqueConstraint("name", name="uq_user_name")
)
- return m1, m2
- def test_index_flag_becomes_named_unique_constraint(self):
- diffs = self._fixture(self._fixture_one)
+ diffs = self._fixture(m1, m2)
eq_(diffs[0][0], "add_constraint")
eq_(diffs[0][1].name, "uq_user_name")
eq_(diffs[1][0], "remove_index")
eq_(diffs[1][1].name, "ix_user_name")
- def _fixture_two(self):
+
+ def test_add_unique_constraint(self):
m1 = MetaData()
m2 = MetaData()
Table('address', m1,
Column('qpr', String(10), index=True),
UniqueConstraint("email_address", name="uq_email_address")
)
- return m1, m2
- def test_add_unique_constraint(self):
- diffs = self._fixture(self._fixture_two)
+ diffs = self._fixture(m1, m2)
eq_(diffs[0][0], "add_constraint")
eq_(diffs[0][1].name, "uq_email_address")
- def _fixture_three(self):
+
+ def test_index_becomes_unique(self):
m1 = MetaData()
m2 = MetaData()
Table('order', m1,
),
Index('order_user_id_amount_idx', 'user_id', 'amount', unique=True),
)
- return m1, m2
- def test_index_becomes_unique(self):
- diffs = self._fixture(self._fixture_three)
+ diffs = self._fixture(m1, m2)
eq_(diffs[0][0], "remove_index")
eq_(diffs[0][1].name, "order_user_id_amount_idx")
eq_(diffs[0][1].unique, False)
eq_(diffs[1][1].name, "order_user_id_amount_idx")
eq_(diffs[1][1].unique, True)
- def _fixture_four(self):
+
+ def test_mismatch_db_named_col_flag(self):
m1 = MetaData()
m2 = MetaData()
Table('item', m1,
Table('item', m2,
Column('x', Integer, unique=True)
)
- return m1, m2
- def test_mismatch_db_named_col_flag(self):
- diffs = self._fixture(self._fixture_four)
+ diffs = self._fixture(m1, m2)
eq_(diffs, [])
- def _fixture_five(self):
+ def test_new_table_added(self):
m1 = MetaData()
m2 = MetaData()
Table('extra', m2,
Column('bar', Integer),
Index('newtable_idx', 'bar')
)
- return m1, m2
- def test_new_table_added(self):
- diffs = self._fixture(self._fixture_five)
+ diffs = self._fixture(m1, m2)
eq_(diffs[0][0], "add_table")
eq_(diffs[2][0], "add_index")
eq_(diffs[2][1].name, "newtable_idx")
- def _fixture_six(self):
+
+ def test_named_cols_changed(self):
m1 = MetaData()
m2 = MetaData()
Table('col_change', m1,
Column('y', Integer),
UniqueConstraint('x', 'y', name="nochange")
)
- return m1, m2
- def test_named_cols_changed(self):
- diffs = self._fixture(self._fixture_six)
+ diffs = self._fixture(m1, m2)
eq_(diffs[0][0], "remove_constraint")
eq_(diffs[0][1].name, "nochange")
eq_(diffs[1][0], "add_constraint")
eq_(diffs[1][1].name, "nochange")
- def _fixture(self, fn):
- staging_env()
- self.bind = sqlite_db()
- self.metadata, model_metadata = fn()
+
+ def test_nothing_changed_one(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table('nothing_changed', m1,
+ Column('x', String(20), unique=True, index=True)
+ )
+
+ Table('nothing_changed', m2,
+ Column('x', String(20), unique=True, index=True)
+ )
+
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])
+
+
+ def test_nothing_changed_two(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table('nothing_changed', m1,
+ Column('id1', Integer, primary_key=True),
+ Column('id2', Integer, primary_key=True),
+ Column('x', String(20), unique=True)
+ )
+ Table('nothing_changed_related', m1,
+ Column('id1', Integer),
+ Column('id2', Integer),
+ ForeignKeyConstraint(['id1', 'id2'], ['nothing_changed.id1', 'nothing_changed.id2'])
+ )
+
+ Table('nothing_changed', m2,
+ Column('id1', Integer, primary_key=True),
+ Column('id2', Integer, primary_key=True),
+ Column('x', String(20), unique=True)
+ )
+ Table('nothing_changed_related', m2,
+ Column('id1', Integer),
+ Column('id2', Integer),
+ ForeignKeyConstraint(['id1', 'id2'], ['nothing_changed.id1', 'nothing_changed.id2'])
+ )
+
+ diffs = self._fixture(m1, m2)
+ eq_(diffs, [])
+
+
+ def test_unnamed_cols_changed(self):
+ m1 = MetaData()
+ m2 = MetaData()
+ Table('col_change', m1,
+ Column('x', Integer),
+ Column('y', Integer),
+ UniqueConstraint('x')
+ )
+ Table('col_change', m2,
+ Column('x', Integer),
+ Column('y', Integer),
+ UniqueConstraint('x', 'y')
+ )
+
+ diffs = self._fixture(m1, m2)
+
+ diffs = set((cmd,
+ ('x' in obj.name) if obj.name is not None else False)
+ for cmd, obj in diffs)
+ if self.reports_unnamed_constraints:
+ assert ("remove_constraint", True) in diffs
+ assert ("add_constraint", False) in diffs
+
+ def test_remove_named_unique_index(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table('remove_idx', m1,
+ Column('x', Integer),
+ Index('xidx', 'x', unique=True)
+ )
+ Table('remove_idx', m2,
+ Column('x', Integer),
+ )
+
+ diffs = self._fixture(m1, m2)
+
+ diffs = set((cmd, obj.name) for cmd, obj in diffs)
+ assert ("remove_index", "xidx") in diffs
+
+
+ def test_remove_named_unique_constraint(self):
+ m1 = MetaData()
+ m2 = MetaData()
+
+ Table('remove_idx', m1,
+ Column('x', Integer),
+ UniqueConstraint('x', name='xidx')
+ )
+ Table('remove_idx', m2,
+ Column('x', Integer),
+ )
+
+ diffs = self._fixture(m1, m2)
+
+ diffs = set((cmd, obj.name) for cmd, obj in diffs)
+ assert ("remove_constraint", "xidx") in diffs
+
+ def _fixture(self, m1, m2):
+ self.metadata, model_metadata = m1, m2
self.metadata.create_all(self.bind)
conn = self.bind.connect()
)
return diffs
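+ # subclasses set this to True for backends whose inspector reflects
+ # unnamed unique constraints (see test_unnamed_cols_changed)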
+ reports_unnamed_constraints = False
+
+ def setUp(self):
+ staging_env()
+ self.bind = self._get_bind()
+
def tearDown(self):
self.metadata.drop_all(self.bind)
clear_staging_env()
+ @classmethod
+ def _get_bind(cls):
+ return sqlite_db()
class PGUniqueIndexTest(AutogenerateUniqueIndexTest):
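+ # PostgreSQL assigns generated names to unnamed unique constraints,
+ # so the inspector reports them back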
+ reports_unnamed_constraints = True
+
@classmethod
def _get_bind(cls):
return db_for_dialect('postgresql')
+class MySQLUniqueIndexTest(AutogenerateUniqueIndexTest):
+ reports_unnamed_constraints = True
+
+ @classmethod
+ def _get_bind(cls):
+ return db_for_dialect('mysql')
+
class AutogenerateCustomCompareTypeTest(AutogenTest, TestCase):
@classmethod