self.named_constraints = {}
self.unnamed_constraints = []
self.indexes = {}
+ self.new_indexes = {}
for const in self.table.constraints:
if _is_type_bound(const):
continue
self._setup_referent(m, const)
new_table.append_constraint(const_copy)
- for index in self.indexes.values():
- Index(index.name,
- unique=index.unique,
- *[new_table.c[col] for col in index.columns.keys()],
- **index.kwargs)
+ def _gather_indexes_from_both_tables(self):
+     """Return all Index objects to emit after the table swap.
+
+     Combines ``self.indexes`` (presumably the indexes carried over from
+     the original table -- TODO confirm where these are populated) with
+     ``self.new_indexes`` (indexes added via batch ``create_index()``),
+     the latter rebuilt against the new table's columns.
+     """
+     idx = []
+     idx.extend(self.indexes.values())
+     for index in self.new_indexes.values():
+         idx.append(
+             Index(
+                 index.name,
+                 unique=index.unique,
+                 # re-target the index at the replacement table so the
+                 # compiled DDL references its columns, not the old table's
+                 *[self.new_table.c[col] for col in index.columns.keys()],
+                 **index.kwargs)
+         )
+     return idx
def _setup_referent(self, metadata, constraint):
spec = constraint.elements[0]._get_colspec()
self.table.name,
schema=self.table.schema
)
+ self.new_table.name = self.table.name
+ try:
+ for idx in self._gather_indexes_from_both_tables():
+ op_impl.create_index(idx)
+ finally:
+ self.new_table.name = "_alembic_batch_temp"
def alter_column(self, table_name, column_name,
nullable=None,
raise ValueError("No such constraint: '%s'" % const.name)
def create_index(self, idx):
-     self.indexes[idx.name] = idx
+     # Track indexes added during the batch separately from pre-existing
+     # ones, so they can be created against the final table after the swap.
+     self.new_indexes[idx.name] = idx
def drop_index(self, idx):
try:
create_stmt = re.sub(r'[\n\t]', '', create_stmt)
idx_stmt = ""
- for idx in impl.new_table.indexes:
+ for idx in impl.indexes.values():
idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect))
+ for idx in impl.new_indexes.values():
+ impl.new_table.name = impl.table.name
+ idx_stmt += str(CreateIndex(idx).compile(dialect=context.dialect))
+ impl.new_table.name = '_alembic_batch_temp'
idx_stmt = re.sub(r'[\n\t]', '', idx_stmt)
if ddl_contains:
expected = [
create_stmt,
]
- if impl.new_table.indexes:
- expected.append(idx_stmt)
if schema:
args = {"schema": "%s." % schema}
'ALTER TABLE %(schema)s_alembic_batch_temp '
'RENAME TO %(schema)stname' % args
])
+ if idx_stmt:
+ expected.append(idx_stmt)
context.assert_(*expected)
return impl.new_table
'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
'data VARCHAR(50), '
'x INTEGER, PRIMARY KEY (id))',
- 'CREATE UNIQUE INDEX ix_data ON _alembic_batch_temp (data)',
'INSERT INTO _alembic_batch_temp (id, data, x) '
'SELECT foo.id, foo.data, foo.x FROM foo',
'DROP TABLE foo',
- 'ALTER TABLE _alembic_batch_temp RENAME TO foo'
+ 'ALTER TABLE _alembic_batch_temp RENAME TO foo',
+ 'CREATE UNIQUE INDEX ix_data ON foo (data)',
)
context.clear_assertions()
context.assert_(
'CREATE TABLE _alembic_batch_temp (id INTEGER NOT NULL, '
'data INTEGER, x INTEGER, PRIMARY KEY (id))',
- 'CREATE UNIQUE INDEX ix_data ON _alembic_batch_temp (data)',
'INSERT INTO _alembic_batch_temp (id, data, x) SELECT foo.id, '
'CAST(foo.data AS INTEGER) AS anon_1, foo.x FROM foo',
'DROP TABLE foo',
- 'ALTER TABLE _alembic_batch_temp RENAME TO foo'
+ 'ALTER TABLE _alembic_batch_temp RENAME TO foo',
+ 'CREATE UNIQUE INDEX ix_data ON foo (data)',
)
context.clear_assertions()
)
return nopk
+ def _table_w_index_fixture(self):
+     """Create and return a table ('t_w_ix') that already carries an
+     index ('ix_thing'), for tests exercising batch operations against
+     tables with pre-existing indexes."""
+     t = Table(
+         't_w_ix', self.metadata,
+         Column('id', Integer, primary_key=True),
+         Column('thing', Integer),
+         Column('data', String(20)),
+     )
+     Index('ix_thing', t.c.thing)
+     t.create(self.conn)
+     return t
+
def tearDown(self):
self.metadata.drop_all(self.conn)
self.conn.close()
data
)
+ def test_ix_existing(self):
+     """A pre-existing index must survive a batch table rebuild, and an
+     index created inside the batch must land on the final table name."""
+     self._table_w_index_fixture()
+
+     with self.op.batch_alter_table("t_w_ix") as batch_op:
+         batch_op.alter_column('data', type_=String(30))
+         batch_op.create_index("ix_data", ["data"])
+
+     insp = Inspector.from_engine(config.db)
+     eq_(
+         sorted(insp.get_indexes('t_w_ix'), key=lambda idx: idx['name']),
+         [
+             # 'unique' reported as 0/1 by the inspector here
+             {'unique': 0, 'name': 'ix_data', 'column_names': ['data']},
+             {'unique': 0, 'name': 'ix_thing', 'column_names': ['thing']}
+         ]
+     )
+
def test_fk_points_to_me_auto(self):
self._test_fk_points_to_me("auto")