import logging
import re
+from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.util import OrderedSet
from sqlalchemy import schema as sa_schema, types as sqltypes
inspector.reflecttable(t, None)
conn_column_info[(s, tname)] = t
+ if hasattr(inspector, 'get_unique_constraints'):
+ can_inspect_uniques = True
+ else:
+ log.warn(
+ "Unique constraints have not been inspected because the version "
+ "of SQLAlchemy in use does not support it. Please see "
+ "SQLAlchemy's documentation for which versions' "
+ "sqlalchemy.engine.reflection.Inspector object include "
+ "get_unique_constraints()."
+ )
+ can_inspect_uniques = False
+ c_uniques = {}
+
for s, tname in sorted(existing_tables):
name = '%s.%s' % (s, tname) if s else tname
metadata_table = metadata.tables[name]
_compare_columns(s, tname, object_filters,
conn_table,
metadata_table,
- diffs, autogen_context)
+ diffs, autogen_context, inspector)
+ if can_inspect_uniques:
+ c_uniques = _compare_uniques(s, tname,
+ object_filters, conn_table, metadata_table,
+ diffs, autogen_context, inspector)
+ _compare_indexes(s, tname, object_filters,
+ conn_table,
+ metadata_table,
+ diffs, autogen_context, inspector,
+ can_inspect_uniques, c_uniques)
# TODO:
- # index add/drop
# table constraints
# sequences
###################################################
# element comparison
+def _make_index(params, conn_table):
+ return sa_schema.Index(
+ params['name'],
+ *[conn_table.c[cname] for cname in params['column_names']],
+ unique=params['unique']
+ )
+
+def _make_unique_constraint(params, conn_table):
+ return sa_schema.UniqueConstraint(
+ *[conn_table.c[cname] for cname in params['column_names']],
+ name=params['name']
+ )
+
def _compare_columns(schema, tname, object_filters, conn_table, metadata_table,
- diffs, autogen_context):
+ diffs, autogen_context, inspector):
name = '%s.%s' % (schema, tname) if schema else tname
metadata_cols_by_name = dict((c.name, c) for c in metadata_table.c)
conn_col_names = dict((c.name, c) for c in conn_table.c)
metadata_col_names = OrderedSet(sorted(metadata_cols_by_name))
-
for cname in metadata_col_names.difference(conn_col_names):
if _run_filters(metadata_cols_by_name[cname], cname,
"column", False, None, object_filters):
if col_diff:
diffs.append(col_diff)
+
+def _compare_uniques(schema, tname, object_filters, conn_table,
+ metadata_table, diffs, autogen_context, inspector):
+
+ m_objs = dict(
+ (i.name or _autogenerate_unique_constraint_name(i), i) \
+ for i in metadata_table.constraints \
+ if isinstance(i, sa_schema.UniqueConstraint)
+ )
+ m_keys = set(m_objs.keys())
+
+ if hasattr(inspector, 'get_unique_constraints'):
+ try:
+ conn_uniques = inspector.get_unique_constraints(tname)
+ except NoSuchTableError:
+ conn_uniques = []
+ else:
+ conn_uniques = []
+ c_objs = dict(
+ (i['name'] or _autogenerate_unique_constraint_name({
+ 'table': conn_table, 'columns': i['columns']}),
+ _make_unique_constraint(i, conn_table)) \
+ for i in conn_uniques
+ )
+ c_keys = set(c_objs.keys())
+
+ for key in (m_keys - c_keys):
+ meta = m_objs[key]
+ diffs.append(("add_constraint", meta))
+ log.info("Detected added unique constraint '%s' on %s",
+ key, ', '.join([
+ "'%s'" % y.name for y in meta.columns
+ ])
+ )
+
+ for key in (c_keys - m_keys):
+ diffs.append(("remove_constraint", c_objs[key]))
+ log.info("Detected removed unique constraint '%s' on '%s'",
+ key, tname
+ )
+
+ for key in (m_keys & c_keys):
+ meta = m_objs[key]
+ conn = c_objs[key]
+ conn_cols = [col.name for col in conn.columns]
+ meta_cols = [col.name for col in meta.columns]
+
+ if meta_cols != conn_cols:
+ diffs.append(("remove_constraint", conn))
+ diffs.append(("add_constraint", meta))
+ log.info("Detected changed unique constraint '%s' on '%s':%s",
+ key, tname, ' columns %r to %r' % (conn_cols, meta_cols)
+ )
+
+ # inspector.get_indexes() can conflate indexes and unique
+ # constraints when unique constraints are implemented by the database
+ # as an index. so we pass uniques to _compare_indexes() for
+ # deduplication
+ return c_keys
+
+def _compare_indexes(schema, tname, object_filters, conn_table,
+ metadata_table, diffs, autogen_context, inspector,
+ can_inspect_uniques, c_uniques_keys):
+
+ try:
+ c_objs = dict(
+ (i['name'], _make_index(i, conn_table)) \
+ for i in inspector.get_indexes(tname)
+ )
+ except NoSuchTableError:
+ c_objs = {}
+
+ # deduplicate between conn uniques and indexes, because either:
+ # 1. a backend reports uniques as indexes, because uniques
+ # are implemented as a type of index.
+ # 2. our SQLA version does not reflect uniques
+ # in either case, we need to avoid comparing a connection index
+ # for what we can tell from the metadata is meant as a unique constraint
+ if not can_inspect_uniques:
+ c_uniques_keys = set([
+ i.name or _autogenerate_unique_constraint_name(i) \
+ for i in metadata_table.constraints \
+ if isinstance(i, sa_schema.UniqueConstraint)]
+ )
+ for name in c_objs.keys():
+ if name in c_uniques_keys:
+ c_objs.pop(name)
+
+ c_keys = set(c_objs.keys())
+
+ m_objs = dict(
+ (i.name, i) for i in metadata_table.indexes \
+ if i.name not in c_uniques_keys
+ )
+ m_keys = set(m_objs.keys())
+
+ for key in (m_keys - c_keys):
+ meta = m_objs[key]
+ diffs.append(("add_index", meta))
+ log.info("Detected added index '%s' on %s",
+ key, ', '.join([
+ "'%s'" % y.name for y in meta.expressions
+ ])
+ )
+
+ for key in (c_keys - m_keys):
+ diffs.append(("remove_index", c_objs[key]))
+ log.info("Detected removed index '%s' on '%s'", key, tname)
+
+ for key in (m_keys & c_keys):
+
+ meta = m_objs[key]
+ conn = c_objs[key]
+ conn_exps = [exp.name for exp in conn.expressions]
+ meta_exps = [exp.name for exp in meta.expressions]
+
+ # todo: kwargs can differ, e.g., changing the type of index
+ # we can't detect this via the inspection API, though
+ if (meta.unique or False != conn.unique or False)\
+ or meta_exps != conn_exps:
+ diffs.append(("remove_index", conn))
+ diffs.append(("add_index", meta))
+
+ msg = []
+ if meta.unique or False != conn.unique or False:
+ msg.append(' unique=%r to unique=%r' % (
+ conn.unique, meta.unique
+ ))
+ if meta_exps != conn_exps:
+ msg.append(' columns %r to %r' % (
+ conn_exps, meta_exps
+ ))
+ log.info("Detected changed index '%s' on '%s':%s",
+ key, tname, ', '.join(msg)
+ )
+
def _compare_nullable(schema, tname, cname, conn_col,
metadata_col_nullable, diffs,
autogen_context):
metadata_type = metadata_col.type
if conn_type._type_affinity is sqltypes.NullType:
log.info("Couldn't determine database type "
- "for column '%s.%s'" % (tname, cname))
+ "for column '%s.%s'", tname, cname)
return
if metadata_type._type_affinity is sqltypes.NullType:
log.info("Column '%s.%s' has no type within "
- "the model; can't compare" % (tname, cname))
+ "the model; can't compare", tname, cname)
return
isdiff = autogen_context['context']._compare_type(conn_col, metadata_col)
_commands = {
"table": (_drop_table, _add_table),
"column": (_drop_column, _add_column),
+ "index": (_drop_index, _add_index),
+ "constraint": (_drop_constraint, _add_constraint),
}
cmd_callables = _commands[cmd_type]
text += ")"
return text
+def _add_index(index, autogen_context):
+ """
+ Generate Alembic operations for the CREATE INDEX of an
+ :class:`~sqlalchemy.schema.Index` instance.
+ """
+ text = "op.create_index('%(name)s', '%(table)s', %(columns)s, unique=%(unique)r%(schema)s%(kwargs)s)" % {
+ 'name': index.name,
+ 'table': index.table,
+ 'columns': [exp.name for exp in index.expressions],
+ 'unique': index.unique or False,
+ 'schema': (", schema='%s'" % index.table.schema) if index.table.schema else '',
+ 'kwargs': (', '+', '.join(
+ ["%s='%s'" % (key, val) for key, val in index.kwargs.items()]))\
+ if len(index.kwargs) else ''
+ }
+ return text
+
+def _drop_index(index, autogen_context):
+ """
+ Generate Alembic operations for the DROP INDEX of an
+ :class:`~sqlalchemy.schema.Index` instance.
+ """
+ text = "op.drop_index('%s', '%s')" % (index.name, index.table)
+ return text
+
+def _autogenerate_unique_constraint_name(constraint):
+ """
+ In order to both create and drop a constraint, we need a name known
+ ahead of time.
+ """
+ return 'uq_%s_%s' % (
+ str(constraint.table).replace('.', '_'),
+ '_'.join([col.name for col in constraint.columns])
+ )
+
+def _add_unique_constraint(constraint, autogen_context):
+ """
+ Generate Alembic operations for the ALTER TABLE .. ADD CONSTRAINT ...
+ UNIQUE of a :class:`~sqlalchemy.schema.UniqueConstraint` instance.
+ """
+ text = "%(prefix)screate_unique_constraint('%(name)s', '%(table)s', %(columns)s"\
+ "%(deferrable)s%(initially)s%(schema)s)" % {
+ 'prefix': _alembic_autogenerate_prefix(autogen_context),
+ 'name': constraint.name or _autogenerate_unique_constraint_name(constraint),
+ 'table': constraint.table,
+ 'columns': [col.name for col in constraint.columns],
+ 'deferrable': (", deferrable='%s'" % constraint.deferrable) if constraint.deferrable else '',
+ 'initially': (", initially='%s'" % constraint.initially) if constraint.initially else '',
+ 'schema': (", schema='%s'" % constraint.table.schema) if constraint.table.schema else ''
+ }
+ return text
+
+def _add_fk_constraint(constraint, autogen_context):
+ raise NotImplementedError()
+
+def _add_pk_constraint(constraint, autogen_context):
+ raise NotImplementedError()
+
+def _add_check_constraint(constraint, autogen_context):
+ raise NotImplementedError()
+
+def _add_constraint(constraint, autogen_context):
+ """
+ Dispatcher for the different types of constraints.
+ """
+ funcs = {
+ "unique_constraint": _add_unique_constraint,
+ "foreign_key_constraint": _add_fk_constraint,
+ "primary_key_constraint": _add_pk_constraint,
+ "check_constraint": _add_check_constraint,
+ "column_check_constraint": _add_check_constraint,
+ }
+ return funcs[constraint.__visit_name__](constraint, autogen_context)
+
+def _drop_constraint(constraint, autogen_context):
+ """
+ Generate Alembic operations for the ALTER TABLE ... DROP CONSTRAINT
+ of a :class:`~sqlalchemy.schema.UniqueConstraint` instance.
+ """
+ text = "%(prefix)sdrop_constraint('%(name)s', '%(table)s')" % {
+ 'prefix': _alembic_autogenerate_prefix(autogen_context),
+ 'name': constraint.name or _autogenerate_unique_constraint_name(constraint),
+ 'table': constraint.table,
+ }
+ return text
+
def _add_column(schema, tname, column, autogen_context):
text = "%(prefix)sadd_column(%(tname)r, %(column)s" % {
"prefix": _alembic_autogenerate_prefix(autogen_context),
def _alembic_autogenerate_prefix(autogen_context):
return autogen_context['opts']['alembic_module_prefix'] or ''
-
def _user_defined_render(type_, object_, autogen_context):
if 'opts' in autogen_context and \
'render_item' in autogen_context['opts']:
Numeric, CHAR, ForeignKey, DATETIME, INTEGER, \
TypeDecorator, CheckConstraint, Unicode, Enum,\
UniqueConstraint, Boolean, ForeignKeyConstraint,\
- PrimaryKeyConstraint
+ PrimaryKeyConstraint, Index, func
from sqlalchemy.types import NULLTYPE, TIMESTAMP
from sqlalchemy.dialects import mysql
from sqlalchemy.engine.reflection import Inspector
Table('address', m,
Column('id', Integer, primary_key=True),
Column('email_address', String(100), nullable=False),
+ Index('address_email_address_idx', 'email_address')
)
Table('order', m,
Column('order_id', Integer, primary_key=True),
Column("amount", Numeric(8, 2), nullable=False,
server_default="0"),
+ CheckConstraint('amount >= 0', name='ck_order_amount')
)
Table('extra', m,
Table('user', m,
Column('id', Integer, primary_key=True),
- Column('name', String(50), nullable=False),
- Column('a1', Text, server_default="x"),
+ Column('name', String(50), nullable=False, index=True),
+ Column('a1', Text, server_default="x")
)
Table('address', m,
Column('id', Integer, primary_key=True),
- Column('email_address', String(100), nullable=False),
+ Column('email_address', String(100), unique=True, nullable=False),
Column('street', String(50))
)
Table('order', m,
Column('order_id', Integer, primary_key=True),
- Column("amount", Numeric(10, 2), nullable=True,
+ Column('amount', Numeric(10, 2), nullable=True,
server_default="0"),
Column('user_id', Integer, ForeignKey('user.id')),
+ CheckConstraint('amount > -1', name='ck_order_amount'),
+ UniqueConstraint('order_id', 'user_id',
+ name='order_order_id_user_id_unique'
+ ),
+ Index('order_user_id_amount_idx', 'user_id', 'amount')
)
Table('item', m,
)
return m
+
def _model_three():
m = MetaData()
return m
return m
+
def _default_include_object(obj, name, type_, reflected, compare_to):
if type_ == "table":
return name in ("parent", "child",
def _get_model_schema(cls):
return _model_two(schema=cls.test_schema_name)
-
def test_diffs(self):
"""test generation of diff rules"""
eq_(diffs[2][2], "address")
eq_(diffs[2][3], metadata.tables['%s.address' % self.test_schema_name].c.street)
- eq_(diffs[3][0], "add_column")
- eq_(diffs[3][1], self.test_schema_name)
- eq_(diffs[3][2], "order")
- eq_(diffs[3][3], metadata.tables['%s.order' % self.test_schema_name].c.user_id)
+ eq_(diffs[3][0], "add_constraint")
+
+ eq_(diffs[4][0], "add_column")
+ eq_(diffs[4][1], self.test_schema_name)
+ eq_(diffs[4][2], "order")
+ eq_(diffs[4][3], metadata.tables['%s.order' % self.test_schema_name].c.user_id)
- eq_(diffs[4][0][0], "modify_type")
- eq_(diffs[4][0][1], self.test_schema_name)
- eq_(diffs[4][0][2], "order")
- eq_(diffs[4][0][3], "amount")
- eq_(repr(diffs[4][0][5]), "NUMERIC(precision=8, scale=2)")
- eq_(repr(diffs[4][0][6]), "Numeric(precision=10, scale=2)")
+ eq_(diffs[5][0][0], "modify_type")
+ eq_(diffs[5][0][1], self.test_schema_name)
+ eq_(diffs[5][0][2], "order")
+ eq_(diffs[5][0][3], "amount")
+ eq_(repr(diffs[5][0][5]), "NUMERIC(precision=8, scale=2)")
+ eq_(repr(diffs[5][0][6]), "Numeric(precision=10, scale=2)")
+ eq_(diffs[6][0], "add_constraint")
+ eq_(diffs[6][1].name, "order_order_id_user_id_unique")
- eq_(diffs[5][0], 'remove_column')
- eq_(diffs[5][3].name, 'pw')
- eq_(diffs[6][0][0], "modify_default")
- eq_(diffs[6][0][1], self.test_schema_name)
- eq_(diffs[6][0][2], "user")
- eq_(diffs[6][0][3], "a1")
- eq_(diffs[6][0][6].arg, "x")
+ eq_(diffs[7][0], "add_index")
+ eq_(diffs[7][1].name, "order_user_id_amount_idx")
- eq_(diffs[7][0][0], 'modify_nullable')
- eq_(diffs[7][0][5], True)
- eq_(diffs[7][0][6], False)
+ eq_(diffs[8][0], 'remove_column')
+ eq_(diffs[8][3].name, 'pw')
+
+ eq_(diffs[9][0][0], "modify_default")
+ eq_(diffs[9][0][1], self.test_schema_name)
+ eq_(diffs[9][0][2], "user")
+ eq_(diffs[9][0][3], "a1")
+ eq_(diffs[9][0][6].arg, "x")
+
+ eq_(diffs[10][0][0], 'modify_nullable')
+ eq_(diffs[10][0][5], True)
+ eq_(diffs[10][0][6], False)
def test_render_nothing(self):
context = MigrationContext.configure(
pass
### end Alembic commands ###""")
- def test_render_diffs(self):
- """test a full render including indentation"""
+ def test_render_diffs_extras(self):
+ """test a full render including indentation (include and schema)"""
template_args = {}
autogenerate._produce_migration_diffs(
include_object=_default_include_object,
include_schemas=True
)
+
eq_(re.sub(r"u'", "'", template_args['upgrades']),
"""### commands auto generated by Alembic - please adjust! ###
op.create_table('item',
)
op.drop_table('extra', schema='%(schema)s')
op.add_column('address', sa.Column('street', sa.String(length=50), nullable=True), schema='%(schema)s')
+ op.create_unique_constraint('uq_test_schema_address_email_address', 'test_schema.address', ['email_address'], schema='test_schema')
op.add_column('order', sa.Column('user_id', sa.Integer(), nullable=True), schema='%(schema)s')
op.alter_column('order', 'amount',
existing_type=sa.NUMERIC(precision=8, scale=2),
nullable=True,
existing_server_default='0::numeric',
schema='%(schema)s')
+ op.create_unique_constraint('order_order_id_user_id_unique', 'test_schema.order', ['order_id', 'user_id'], schema='test_schema')
+ op.create_index('order_user_id_amount_idx', 'test_schema.order', ['user_id', 'amount'], unique=False, schema='test_schema')
op.drop_column('user', 'pw', schema='%(schema)s')
op.alter_column('user', 'a1',
existing_type=sa.TEXT(),
existing_type=sa.VARCHAR(length=50),
nullable=False,
schema='%(schema)s')
+ op.create_index('ix_test_schema_user_name', 'test_schema.user', ['name'], unique=False, schema='test_schema')
### end Alembic commands ###""" % {"schema": self.test_schema_name})
eq_(re.sub(r"u'", "'", template_args['downgrades']),
"""### commands auto generated by Alembic - please adjust! ###
+ op.drop_index('ix_test_schema_user_name', 'test_schema.user')
op.alter_column('user', 'name',
existing_type=sa.VARCHAR(length=50),
nullable=True,
existing_nullable=True,
schema='%(schema)s')
op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), nullable=True), schema='%(schema)s')
+ op.drop_index('order_user_id_amount_idx', 'test_schema.order')
+ op.drop_constraint('order_order_id_user_id_unique', 'test_schema.order')
op.alter_column('order', 'amount',
existing_type=sa.Numeric(precision=10, scale=2),
type_=sa.NUMERIC(precision=8, scale=2),
existing_server_default='0::numeric',
schema='%(schema)s')
op.drop_column('order', 'user_id', schema='%(schema)s')
+ op.drop_constraint('uq_test_schema_address_email_address', 'test_schema.address')
op.drop_column('address', 'street', schema='%(schema)s')
op.create_table('extra',
sa.Column('x', sa.CHAR(length=1), autoincrement=False, nullable=True),
eq_(diffs[2][2], "address")
eq_(diffs[2][3], metadata.tables['address'].c.street)
- eq_(diffs[3][0], "add_column")
- eq_(diffs[3][1], None)
- eq_(diffs[3][2], "order")
- eq_(diffs[3][3], metadata.tables['order'].c.user_id)
+ eq_(diffs[4][0], "remove_index")
+ eq_(diffs[4][1].name, "address_email_address_idx")
- eq_(diffs[4][0][0], "modify_type")
- eq_(diffs[4][0][1], None)
- eq_(diffs[4][0][2], "order")
- eq_(diffs[4][0][3], "amount")
- eq_(repr(diffs[4][0][5]), "NUMERIC(precision=8, scale=2)")
- eq_(repr(diffs[4][0][6]), "Numeric(precision=10, scale=2)")
+ eq_(diffs[5][0], "add_column")
+ eq_(diffs[5][1], None)
+ eq_(diffs[5][2], "order")
+ eq_(diffs[5][3], metadata.tables['order'].c.user_id)
+ eq_(diffs[6][0][0], "modify_type")
+ eq_(diffs[6][0][1], None)
+ eq_(diffs[6][0][2], "order")
+ eq_(diffs[6][0][3], "amount")
+ eq_(repr(diffs[6][0][5]), "NUMERIC(precision=8, scale=2)")
+ eq_(repr(diffs[6][0][6]), "Numeric(precision=10, scale=2)")
- eq_(diffs[5][0], 'remove_column')
- eq_(diffs[5][3].name, 'pw')
+ eq_(diffs[7][0], "add_constraint")
+ eq_(diffs[7][1].name, "order_order_id_user_id_unique")
- eq_(diffs[6][0][0], "modify_default")
- eq_(diffs[6][0][1], None)
- eq_(diffs[6][0][2], "user")
- eq_(diffs[6][0][3], "a1")
- eq_(diffs[6][0][6].arg, "x")
+ eq_(diffs[8][0], 'add_index')
+ eq_(diffs[8][1].name, 'order_user_id_amount_idx')
+
+ eq_(diffs[9][0], 'remove_column')
+ eq_(diffs[9][3].name, 'pw')
- eq_(diffs[7][0][0], 'modify_nullable')
- eq_(diffs[7][0][5], True)
- eq_(diffs[7][0][6], False)
+ eq_(diffs[10][0][0], "modify_default")
+ eq_(diffs[10][0][1], None)
+ eq_(diffs[10][0][2], "user")
+ eq_(diffs[10][0][3], "a1")
+ eq_(diffs[10][0][6].arg, "x")
+
+ eq_(diffs[11][0][0], 'modify_nullable')
+ eq_(diffs[11][0][5], True)
+ eq_(diffs[11][0][6], False)
def test_render_nothing(self):
context = MigrationContext.configure(
)
template_args = {}
autogenerate._produce_migration_diffs(context, template_args, set())
+
eq_(re.sub(r"u'", "'", template_args['upgrades']),
"""### commands auto generated by Alembic - please adjust! ###
pass
pass
### end Alembic commands ###""")
- def test_render_diffs(self):
+ def test_render_diffs_standard(self):
"""test a full render including indentation"""
metadata = self.m2
template_args = {}
autogenerate._produce_migration_diffs(self.context, template_args, set())
+
eq_(re.sub(r"u'", "'", template_args['upgrades']),
"""### commands auto generated by Alembic - please adjust! ###
op.create_table('item',
)
op.drop_table('extra')
op.add_column('address', sa.Column('street', sa.String(length=50), nullable=True))
+ op.create_unique_constraint('uq_address_email_address', 'address', ['email_address'])
+ op.drop_index('address_email_address_idx', 'address')
op.add_column('order', sa.Column('user_id', sa.Integer(), nullable=True))
op.alter_column('order', 'amount',
existing_type=sa.NUMERIC(precision=8, scale=2),
type_=sa.Numeric(precision=10, scale=2),
nullable=True,
existing_server_default='0')
+ op.create_unique_constraint('order_order_id_user_id_unique', 'order', ['order_id', 'user_id'])
+ op.create_index('order_user_id_amount_idx', 'order', ['user_id', 'amount'], unique=False)
op.drop_column('user', 'pw')
op.alter_column('user', 'a1',
existing_type=sa.TEXT(),
op.alter_column('user', 'name',
existing_type=sa.VARCHAR(length=50),
nullable=False)
+ op.create_index('ix_user_name', 'user', ['name'], unique=False)
### end Alembic commands ###""")
eq_(re.sub(r"u'", "'", template_args['downgrades']),
"""### commands auto generated by Alembic - please adjust! ###
+ op.drop_index('ix_user_name', 'user')
op.alter_column('user', 'name',
existing_type=sa.VARCHAR(length=50),
nullable=True)
server_default=None,
existing_nullable=True)
op.add_column('user', sa.Column('pw', sa.VARCHAR(length=50), nullable=True))
+ op.drop_index('order_user_id_amount_idx', 'order')
+ op.drop_constraint('order_order_id_user_id_unique', 'order')
op.alter_column('order', 'amount',
existing_type=sa.Numeric(precision=10, scale=2),
type_=sa.NUMERIC(precision=8, scale=2),
nullable=False,
existing_server_default='0')
op.drop_column('order', 'user_id')
+ op.create_index('address_email_address_idx', 'address', ['email_address'], unique=False)
+ op.drop_constraint('uq_address_email_address', 'address')
op.drop_column('address', 'street')
op.create_table('extra',
sa.Column('x', sa.CHAR(), nullable=True),
)
+class AutogenerateUniqueIndexTest(AutogenTest, TestCase):
+
+ @classmethod
+ def _get_db_schema(cls):
+ m = MetaData()
+
+ Table('user', m,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(50), nullable=False, index=True),
+ Column('a1', Text, server_default="x")
+ )
+
+ Table('address', m,
+ Column('id', Integer, primary_key=True),
+ Column('email_address', String(100), nullable=False)
+ )
+
+ Table('order', m,
+ Column('order_id', Integer, primary_key=True),
+ Column('amount', Numeric(10, 2), nullable=True),
+ Column('user_id', Integer, ForeignKey('user.id')),
+ CheckConstraint('amount > -1', name='ck_order_amount'),
+ UniqueConstraint('order_id', 'user_id',
+ name='order_order_id_user_id_unique'
+ ),
+ Index('order_user_id_amount_idx', 'user_id', 'amount')
+ )
+ return m
+
+
+ @classmethod
+ def _get_model_schema(cls):
+ m = MetaData()
+
+ Table('user', m,
+ Column('id', Integer, primary_key=True),
+ Column('name', String(50), nullable=False, unique=True),
+ Column('a1', Text, server_default="x")
+ )
+
+ Table('address', m,
+ Column('id', Integer, primary_key=True),
+ Column('email_address', String(100), unique=True, nullable=False)
+ )
+
+ Table('order', m,
+ Column('order_id', Integer, primary_key=True),
+ Column('amount', Numeric(10, 2), nullable=True),
+ Column('user_id', Integer, ForeignKey('user.id')),
+ UniqueConstraint('order_id', 'user_id',
+ name='order_order_id_user_id_unique'
+ ),
+ Index('order_user_id_amount_idx', 'user_id', 'amount', unique=True),
+ CheckConstraint('amount >= 0', name='ck_order_amount')
+ )
+
+ return m
+
+ @classmethod
+ @requires_07
+ def setup_class(cls):
+ staging_env()
+ cls.bind = cls._get_bind()
+ cls.m2 = cls._get_db_schema()
+ cls.m2.create_all(cls.bind)
+ cls.m5 = cls._get_model_schema()
+
+ conn = cls.bind.connect()
+ cls.context = context = MigrationContext.configure(
+ connection=conn,
+ opts={
+ 'compare_type': True,
+ 'compare_server_default': True,
+ 'target_metadata': cls.m5,
+ 'upgrade_token': "upgrades",
+ 'downgrade_token': "downgrades",
+ 'alembic_module_prefix': 'op.',
+ 'sqlalchemy_module_prefix': 'sa.',
+ }
+ )
+
+ connection = context.bind
+ cls.autogen_context = {
+ 'imports': set(),
+ 'connection': connection,
+ 'dialect': connection.dialect,
+ 'context': context
+ }
+
+ @classmethod
+ def teardown_class(cls):
+ cls.m2.drop_all(cls.bind)
+ clear_staging_env()
+
+ def test_diffs(self):
+ """test generation of diff rules"""
+
+ metadata = self.m5
+ connection = self.context.bind
+ diffs = []
+ autogenerate._produce_net_changes(connection, metadata, diffs,
+ self.autogen_context,
+ object_filters=_default_object_filters,
+ )
+
+ eq_(diffs[0][0], "add_constraint")
+ eq_(diffs[0][1].table.name, "address")
+
+ eq_(diffs[1][0], "remove_index")
+ eq_(diffs[1][1].name, "order_user_id_amount_idx")
+ eq_(diffs[1][1].unique, False)
+
+ eq_(diffs[2][0], "add_index")
+ eq_(diffs[2][1].name, "order_user_id_amount_idx")
+ eq_(diffs[2][1].unique, True)
+
+ eq_(diffs[3][0], "add_constraint")
+ eq_(diffs[3][1].table.name, "user")
+
+ eq_(diffs[4][0], "remove_index")
+ eq_(diffs[4][1].name, "ix_user_name")
+
+
class AutogenerateCustomCompareTypeTest(AutogenTest, TestCase):
@classmethod
def _get_db_schema(cls):
'dialect': mysql.dialect()
}
+ def test_render_add_index(self):
+ """
+ autogenerate._add_index
+ """
+ m = MetaData()
+ t = Table('test', m,
+ Column('id', Integer, primary_key=True),
+ Column('active', Boolean()),
+ Column('code', String(255)),
+ )
+ idx = Index('test_active_code_idx', t.c.active, t.c.code)
+ eq_ignore_whitespace(
+ autogenerate._add_index(idx, self.autogen_context),
+ "op.create_index('test_active_code_idx', 'test', "
+ "['active', 'code'], unique=False)"
+ )
+
+ def test_render_add_index_schema(self):
+ """
+ autogenerate._add_index using schema
+ """
+ m = MetaData()
+ t = Table('test', m,
+ Column('id', Integer, primary_key=True),
+ Column('active', Boolean()),
+ Column('code', String(255)),
+ schema='CamelSchema'
+ )
+ idx = Index('test_active_code_idx', t.c.active, t.c.code)
+ eq_ignore_whitespace(
+ autogenerate._add_index(idx, self.autogen_context),
+ "op.create_index('test_active_code_idx', 'CamelSchema.test', "
+ "['active', 'code'], unique=False, schema='CamelSchema')"
+ )
+
+ # def test_render_add_index_func(self):
+ # """
+ # autogenerate._drop_index using func -- TODO: SQLA needs to
+ # reflect expressions as well as columns
+ # """
+ # m = MetaData()
+ # t = Table('test', m,
+ # Column('id', Integer, primary_key=True),
+ # Column('active', Boolean()),
+ # Column('code', String(255)),
+ # )
+ # idx = Index('test_active_lower_code_idx', t.c.active, func.lower(t.c.code))
+ # eq_ignore_whitespace(
+ # autogenerate._add_index(idx, self.autogen_context),
+ # ""
+ # )
+
+ def test_drop_index(self):
+ """
+ autogenerate._drop_index
+ """
+ m = MetaData()
+ t = Table('test', m,
+ Column('id', Integer, primary_key=True),
+ Column('active', Boolean()),
+ Column('code', String(255)),
+ )
+ idx = Index('test_active_code_idx', t.c.active, t.c.code)
+ eq_ignore_whitespace(
+ autogenerate._drop_index(idx, self.autogen_context),
+ "op.drop_index('test_active_code_idx', 'test')"
+ )
+
+ def test_add_unique_constraint(self):
+ """
+ autogenerate._add_unique_constraint
+ """
+ m = MetaData()
+ t = Table('test', m,
+ Column('id', Integer, primary_key=True),
+ Column('active', Boolean()),
+ Column('code', String(255)),
+ )
+ uq = UniqueConstraint(t.c.code, name='uq_test_code')
+ eq_ignore_whitespace(
+ autogenerate._add_unique_constraint(uq, self.autogen_context),
+ "op.create_unique_constraint('uq_test_code', 'test', ['code'])"
+ )
+
+ def test_drop_constraint(self):
+ """
+ autogenerate._drop_constraint
+ """
+ m = MetaData()
+ t = Table('test', m,
+ Column('id', Integer, primary_key=True),
+ Column('active', Boolean()),
+ Column('code', String(255)),
+ )
+ uq = UniqueConstraint(t.c.code, name='uq_test_code')
+ eq_ignore_whitespace(
+ autogenerate._drop_constraint(uq, self.autogen_context),
+ "op.drop_constraint('uq_test_code', 'test')"
+ )
+
def test_render_table_upgrade(self):
m = MetaData()
t = Table('test', m,