from sqlalchemy import types as sqltypes
from sqlalchemy.events import SchemaEventTarget
from sqlalchemy.util import OrderedDict
+from sqlalchemy.util import topological
+from ..util import exc
from ..util.sqla_compat import _columns_for_constraint
from ..util.sqla_compat import _fk_is_self_referential
from ..util.sqla_compat import _is_type_bound
reflect_args,
reflect_kwargs,
naming_convention,
+ partial_reordering,
):
self.operations = operations
self.table_name = table_name
("column_reflect", operations.impl.autogen_column_reflect)
)
self.naming_convention = naming_convention
+ self.partial_reordering = partial_reordering
self.batch = []
@property
reflected = True
batch_impl = ApplyBatchImpl(
- existing_table, self.table_args, self.table_kwargs, reflected
+ existing_table,
+ self.table_args,
+ self.table_kwargs,
+ reflected,
+ partial_reordering=self.partial_reordering,
)
for opname, arg, kw in self.batch:
fn = getattr(batch_impl, opname)
self.batch.append(("alter_column", arg, kw))
def add_column(self, *arg, **kw):
+ if (
+ "insert_before" in kw or "insert_after" in kw
+ ) and not self._should_recreate():
+ raise exc.CommandError(
+ "Can't specify insert_before or insert_after when using "
+ "ALTER; please specify recreate='always'"
+ )
self.batch.append(("add_column", arg, kw))
def drop_column(self, *arg, **kw):
class ApplyBatchImpl(object):
- def __init__(self, table, table_args, table_kwargs, reflected):
+ def __init__(
+ self, table, table_args, table_kwargs, reflected, partial_reordering=()
+ ):
self.table = table # this is a Table object
self.table_args = table_args
self.table_kwargs = table_kwargs
self.temp_table_name = self._calc_temp_name(table.name)
self.new_table = None
+
+ self.partial_reordering = partial_reordering # tuple of tuples
+ self.add_col_ordering = () # tuple of tuples
+
self.column_transfers = OrderedDict(
(c.name, {"expr": c}) for c in self.table.c
)
+ self.existing_ordering = list(self.column_transfers)
+
self.reflected = reflected
self._grab_table_elements()
for k in self.table.kwargs:
self.table_kwargs.setdefault(k, self.table.kwargs[k])
+ def _adjust_self_columns_for_partial_reordering(self):
+ pairs = set()
+
+ col_by_idx = list(self.columns)
+
+ if self.partial_reordering:
+ for tuple_ in self.partial_reordering:
+ for index, elem in enumerate(tuple_):
+ if index > 0:
+ pairs.add((tuple_[index - 1], elem))
+ else:
+ for index, elem in enumerate(self.existing_ordering):
+ if index > 0:
+ pairs.add((col_by_idx[index - 1], elem))
+
+ pairs.update(self.add_col_ordering)
+
+ # this can happen if some columns were dropped and not removed
+ # from existing_ordering. this should be prevented already, but
+ # conservatively making sure this didn't happen
+ pairs = [p for p in pairs if p[0] != p[1]]
+
+ sorted_ = list(
+ topological.sort(pairs, col_by_idx, deterministic_order=True)
+ )
+ self.columns = OrderedDict((k, self.columns[k]) for k in sorted_)
+ self.column_transfers = OrderedDict(
+ (k, self.column_transfers[k]) for k in sorted_
+ )
+
def _transfer_elements_to_new_table(self):
assert self.new_table is None, "Can only create new table once"
m = MetaData()
schema = self.table.schema
+ if self.partial_reordering or self.add_col_ordering:
+ self._adjust_self_columns_for_partial_reordering()
+
self.new_table = new_table = Table(
self.temp_table_name,
m,
if autoincrement is not None:
existing.autoincrement = bool(autoincrement)
- def add_column(self, table_name, column, **kw):
+ def _setup_dependencies_for_add_column(
+ self, colname, insert_before, insert_after
+ ):
+ index_cols = self.existing_ordering
+ col_indexes = {name: i for i, name in enumerate(index_cols)}
+
+ if not self.partial_reordering:
+ if insert_after:
+ if not insert_before:
+ if insert_after in col_indexes:
+ # insert after an existing column
+ idx = col_indexes[insert_after] + 1
+ if idx < len(index_cols):
+ insert_before = index_cols[idx]
+ else:
+ # insert after a column that is also new
+ insert_before = dict(self.add_col_ordering)[
+ insert_after
+ ]
+ if insert_before:
+ if not insert_after:
+ if insert_before in col_indexes:
+ # insert before an existing column
+ idx = col_indexes[insert_before] - 1
+ if idx >= 0:
+ insert_after = index_cols[idx]
+ else:
+ # insert before a column that is also new
+ insert_after = dict(
+ (b, a) for a, b in self.add_col_ordering
+ )[insert_before]
+
+ if insert_before:
+ self.add_col_ordering += ((colname, insert_before),)
+ if insert_after:
+ self.add_col_ordering += ((insert_after, colname),)
+
+ if (
+ not self.partial_reordering
+ and not insert_before
+ and not insert_after
+ and col_indexes
+ ):
+ self.add_col_ordering += ((index_cols[-1], colname),)
+
+ def add_column(
+ self, table_name, column, insert_before=None, insert_after=None, **kw
+ ):
+ self._setup_dependencies_for_add_column(
+ column.name, insert_before, insert_after
+ )
# we copy the column because operations.add_column()
# gives us a Column that is part of a Table already.
self.columns[column.name] = column.copy(schema=self.table.schema)
)
del self.columns[column.name]
del self.column_transfers[column.name]
+ self.existing_ordering.remove(column.name)
def add_constraint(self, const):
if not const.name:
from alembic.testing import mock
from alembic.testing import TestBase
from alembic.testing.fixtures import op_fixture
+from alembic.util import exc as alembic_exc
from alembic.util.sqla_compat import sqla_14
def setUp(self):
self.op = Operations(mock.Mock(opts={}))
- def _simple_fixture(self, table_args=(), table_kwargs={}):
+ def _simple_fixture(self, table_args=(), table_kwargs={}, **kw):
m = MetaData()
t = Table(
"tname",
Column("x", String(10)),
Column("y", Integer),
)
- return ApplyBatchImpl(t, table_args, table_kwargs, False)
+ return ApplyBatchImpl(t, table_args, table_kwargs, False, **kw)
def _uq_fixture(self, table_args=(), table_kwargs={}):
m = MetaData()
new_table = self._assert_impl(impl, colnames=["id", "x", "y", "g"])
eq_(new_table.c.g.name, "g")
+ def test_partial_reordering(self):
+ impl = self._simple_fixture(partial_reordering=[("x", "id", "y")])
+ new_table = self._assert_impl(impl, colnames=["x", "id", "y"])
+ eq_(new_table.c.x.name, "x")
+
+ def test_add_col_partial_reordering(self):
+ impl = self._simple_fixture(partial_reordering=[("id", "x", "g", "y")])
+ col = Column("g", Integer)
+ # operations.add_column produces a table
+ t = self.op.schema_obj.table("tname", col) # noqa
+ impl.add_column("tname", col)
+ new_table = self._assert_impl(impl, colnames=["id", "x", "g", "y"])
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_before(self):
+ impl = self._simple_fixture()
+ col = Column("g", Integer)
+ # operations.add_column produces a table
+ t = self.op.schema_obj.table("tname", col) # noqa
+ impl.add_column("tname", col, insert_before="x")
+ new_table = self._assert_impl(impl, colnames=["id", "g", "x", "y"])
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_before_beginning(self):
+ impl = self._simple_fixture()
+ impl.add_column("tname", Column("g", Integer), insert_before="id")
+ new_table = self._assert_impl(impl, colnames=["g", "id", "x", "y"])
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_before_middle(self):
+ impl = self._simple_fixture()
+ impl.add_column("tname", Column("g", Integer), insert_before="y")
+ new_table = self._assert_impl(impl, colnames=["id", "x", "g", "y"])
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_after_middle(self):
+ impl = self._simple_fixture()
+ impl.add_column("tname", Column("g", Integer), insert_after="id")
+ new_table = self._assert_impl(impl, colnames=["id", "g", "x", "y"])
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_after_penultimate(self):
+ impl = self._simple_fixture()
+ impl.add_column("tname", Column("g", Integer), insert_after="x")
+ self._assert_impl(impl, colnames=["id", "x", "g", "y"])
+
+ def test_add_col_insert_after_end(self):
+ impl = self._simple_fixture()
+ impl.add_column("tname", Column("g", Integer), insert_after="y")
+ new_table = self._assert_impl(impl, colnames=["id", "x", "y", "g"])
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_after_plus_no_order(self):
+ impl = self._simple_fixture()
+ # operations.add_column produces a table
+ impl.add_column("tname", Column("g", Integer), insert_after="id")
+ impl.add_column("tname", Column("q", Integer))
+ new_table = self._assert_impl(
+ impl, colnames=["id", "g", "x", "y", "q"]
+ )
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_no_order_plus_insert_after(self):
+ impl = self._simple_fixture()
+ col = Column("g", Integer)
+ # operations.add_column produces a table
+ t = self.op.schema_obj.table("tname", col) # noqa
+ impl.add_column("tname", Column("q", Integer))
+ impl.add_column("tname", Column("g", Integer), insert_after="id")
+ new_table = self._assert_impl(
+ impl, colnames=["id", "g", "x", "y", "q"]
+ )
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_after_another_insert(self):
+ impl = self._simple_fixture()
+ impl.add_column("tname", Column("g", Integer), insert_after="id")
+ impl.add_column("tname", Column("q", Integer), insert_after="g")
+ new_table = self._assert_impl(
+ impl, colnames=["id", "g", "q", "x", "y"]
+ )
+ eq_(new_table.c.g.name, "g")
+
+ def test_add_col_insert_before_another_insert(self):
+ impl = self._simple_fixture()
+ impl.add_column("tname", Column("g", Integer), insert_after="id")
+ impl.add_column("tname", Column("q", Integer), insert_before="g")
+ new_table = self._assert_impl(
+ impl, colnames=["id", "q", "g", "x", "y"]
+ )
+ eq_(new_table.c.g.name, "g")
+
def test_add_server_default(self):
impl = self._simple_fixture()
impl.alter_column("tname", "y", server_default="10")
{"id": 5, "data": "d5", "x": 9, "data2": "hi"},
]
)
+ eq_(
+ [col["name"] for col in inspect(config.db).get_columns("foo")],
+ ["id", "data", "x", "data2"],
+ )
+
+ def test_add_column_insert_before_recreate(self):
+ with self.op.batch_alter_table("foo", recreate="always") as batch_op:
+ batch_op.add_column(
+ Column("data2", String(50), server_default="hi"),
+ insert_before="data",
+ )
+ self._assert_data(
+ [
+ {"id": 1, "data": "d1", "x": 5, "data2": "hi"},
+ {"id": 2, "data": "22", "x": 6, "data2": "hi"},
+ {"id": 3, "data": "8.5", "x": 7, "data2": "hi"},
+ {"id": 4, "data": "9.46", "x": 8, "data2": "hi"},
+ {"id": 5, "data": "d5", "x": 9, "data2": "hi"},
+ ]
+ )
+ eq_(
+ [col["name"] for col in inspect(config.db).get_columns("foo")],
+ ["id", "data2", "data", "x"],
+ )
+
+ def test_add_column_insert_after_recreate(self):
+ with self.op.batch_alter_table("foo", recreate="always") as batch_op:
+ batch_op.add_column(
+ Column("data2", String(50), server_default="hi"),
+ insert_after="data",
+ )
+ self._assert_data(
+ [
+ {"id": 1, "data": "d1", "x": 5, "data2": "hi"},
+ {"id": 2, "data": "22", "x": 6, "data2": "hi"},
+ {"id": 3, "data": "8.5", "x": 7, "data2": "hi"},
+ {"id": 4, "data": "9.46", "x": 8, "data2": "hi"},
+ {"id": 5, "data": "d5", "x": 9, "data2": "hi"},
+ ]
+ )
+ eq_(
+ [col["name"] for col in inspect(config.db).get_columns("foo")],
+ ["id", "data", "data2", "x"],
+ )
+
+ def test_add_column_insert_before_raise_on_alter(self):
+ def go():
+ with self.op.batch_alter_table("foo") as batch_op:
+ batch_op.add_column(
+ Column("data2", String(50), server_default="hi"),
+ insert_before="data",
+ )
+
+ assert_raises_message(
+ alembic_exc.CommandError,
+ "Can't specify insert_before or insert_after when using ALTER",
+ go,
+ )
def test_add_column_recreate(self):
with self.op.batch_alter_table("foo", recreate="always") as batch_op:
{"id": 5, "data": "d5", "x": 9, "data2": "hi"},
]
)
+ eq_(
+ [col["name"] for col in inspect(config.db).get_columns("foo")],
+ ["id", "data", "x", "data2"],
+ )
def test_create_drop_index(self):
insp = inspect(config.db)