--- /dev/null
+.. change::
+ :tags: usecase, sql
+ :tickets: 7910
+
+ Added method :meth:`.TableClause.insert_column` to complement
+ :meth:`.TableClause.append_column`, which inserts the given column at a
+ specific index. This can be helpful for prepending primary key columns to
+ tables, etc.
+
self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
def add(
- self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
+ self,
+ column: ColumnElement[Any],
+ key: Optional[_COLKEY] = None,
) -> None:
"""Add a column to this :class:`_sql.ColumnCollection`.
(colkey, _column, _ColumnMetrics(self, _column))
)
self._colset.add(_column._deannotate())
+
self._index[l] = (colkey, _column)
if colkey not in self._index:
self._index[colkey] = (colkey, _column)
"""
def add( # type: ignore[override]
- self, column: _NAMEDCOL, key: Optional[str] = None
+ self,
+ column: _NAMEDCOL,
+ key: Optional[str] = None,
+ *,
+ index: Optional[int] = None,
) -> None:
if key is not None and column.key != key:
raise exc.ArgumentError(
if existing is column:
return
- self.replace(column)
+ self.replace(column, index=index)
# pop out memoized proxy_set as this
# operation may very well be occurring
# in a _make_proxy operation
util.memoized_property.reset(column, "proxy_set")
else:
- self._append_new_column(key, column)
+ self._append_new_column(key, column, index=index)
+
+ def _append_new_column(
+ self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
+ ) -> None:
+ collection_length = len(self._collection)
+
+ if index is None:
+ l = collection_length
+ else:
+ if index < 0:
+ index = max(0, collection_length + index)
+ l = index
+
+ if index is None:
+ self._collection.append(
+ (key, named_column, _ColumnMetrics(self, named_column))
+ )
+ else:
+ self._collection.insert(
+ index, (key, named_column, _ColumnMetrics(self, named_column))
+ )
- def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
- l = len(self._collection)
- self._collection.append(
- (key, named_column, _ColumnMetrics(self, named_column))
- )
self._colset.add(named_column._deannotate())
+
+ if index is not None:
+ for idx in reversed(range(index, collection_length)):
+ self._index[idx + 1] = self._index[idx]
+
self._index[l] = (key, named_column)
self._index[key] = (key, named_column)
def replace(
self,
column: _NAMEDCOL,
+ *,
extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
+ index: Optional[int] = None,
) -> None:
"""add the given column to this collection, removing unaliased
versions of this column as well as existing columns with the
remove_col.add(self._index[column.key][1])
if not remove_col:
- self._append_new_column(column.key, column)
+ self._append_new_column(column.key, column, index=index)
return
new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
- replaced = False
- for k, col, metrics in self._collection:
+ replace_index = None
+
+ for idx, (k, col, metrics) in enumerate(self._collection):
if col in remove_col:
- if not replaced:
- replaced = True
+ if replace_index is None:
+ replace_index = idx
new_cols.append(
(column.key, column, _ColumnMetrics(self, column))
)
for metrics in self._proxy_index.get(rc, ()):
metrics.dispose(self)
- if not replaced:
- new_cols.append((column.key, column, _ColumnMetrics(self, column)))
+ if replace_index is None:
+ if index is not None:
+ new_cols.insert(
+ index, (column.key, column, _ColumnMetrics(self, column))
+ )
+
+ else:
+ new_cols.append(
+ (column.key, column, _ColumnMetrics(self, column))
+ )
+ elif index is not None:
+ to_move = new_cols[replace_index]
+ effective_positive_index = (
+ index if index >= 0 else max(0, len(new_cols) + index)
+ )
+ new_cols.insert(index, to_move)
+ if replace_index > effective_positive_index:
+ del new_cols[replace_index + 1]
+ else:
+ del new_cols[replace_index]
self._colset.add(column._deannotate())
self._collection[:] = new_cols
@util.ro_non_memoized_property
def foreign_keys(self) -> Set[ForeignKey]: ...
- _columns: DedupeColumnCollection[Column[Any]]
+ _columns: DedupeColumnCollection[Column[Any]] # type: ignore[assignment]
_sentinel_column: Optional[Column[Any]]
"""
self._extra_dependencies.add(table)
+ def _insert_col_impl(
+ self,
+ column: ColumnClause[Any],
+ *,
+ index: Optional[int] = None,
+ replace_existing: bool = False,
+ ) -> None:
+ try:
+ column._set_parent_with_dispatch(
+ self,
+ allow_replacements=replace_existing,
+ all_names={c.name: c for c in self.c},
+ index=index,
+ )
+ except exc.DuplicateColumnError as de:
+ raise exc.DuplicateColumnError(
+ f"{de.args[0]} Specify replace_existing=True to "
+ "Table.append_column() or Table.insert_column() to replace an "
+ "existing column."
+ ) from de
+
+ def insert_column(
+ self,
+ column: ColumnClause[Any],
+ index: int,
+ *,
+ replace_existing: bool = False,
+ ) -> None:
+ """Insert a :class:`_schema.Column` to this :class:`_schema.Table` at
+ a specific position.
+
+ Behavior is identical to :meth:`.Table.append_column` except that
+ the index position can be controlled using the
+ :paramref:`.Table.insert_column.index`
+ parameter.
+
+ :param replace_existing:
+ see :paramref:`.Table.append_column.replace_existing`
+ :param index: integer index to insert the new column.
+
+ .. versionadded:: 2.1
+
+ """
+ self._insert_col_impl(
+ column, index=index, replace_existing=replace_existing
+ )
+
def append_column(
- self, column: ColumnClause[Any], replace_existing: bool = False
+ self, column: ColumnClause[Any], *, replace_existing: bool = False
) -> None:
"""Append a :class:`_schema.Column` to this :class:`_schema.Table`.
version of sqlalchemy will instead rise a warning.
.. versionadded:: 1.4.0
- """
- try:
- column._set_parent_with_dispatch(
- self,
- allow_replacements=replace_existing,
- all_names={c.name: c for c in self.c},
- )
- except exc.DuplicateColumnError as de:
- raise exc.DuplicateColumnError(
- f"{de.args[0]} Specify replace_existing=True to "
- "Table.append_column() to replace an "
- "existing column."
- ) from de
+ .. seealso::
+
+ :meth:`.Table.insert_column`
+
+ """
+ self._insert_col_impl(column, replace_existing=replace_existing)
def append_constraint(self, constraint: Union[Index, Constraint]) -> None:
"""Append a :class:`_schema.Constraint` to this
*,
all_names: Dict[str, Column[Any]],
allow_replacements: bool,
+ index: Optional[int] = None,
**kw: Any,
) -> None:
table = parent
"reflection operation, specify autoload_replace=False to "
"prevent this replacement."
)
- table._columns.replace(self, extra_remove=extra_remove)
+ table._columns.replace(self, extra_remove=extra_remove, index=index)
all_names[self.name] = self
self.table = table
doesn't support having a primary key or column
-level defaults, so implicit returning doesn't apply."""
+ _columns: DedupeColumnCollection[ColumnClause[Any]]
+
@util.ro_memoized_property
def _autoincrement_column(self) -> Optional[ColumnClause[Any]]:
"""No PK or default support so no autoincrement column."""
def __init__(self, name: str, *columns: ColumnClause[Any], **kw: Any):
super().__init__()
self.name = name
- self._columns = DedupeColumnCollection()
+ self._columns = DedupeColumnCollection() # type: ignore[unused-ignore]
self.primary_key = ColumnSet() # type: ignore
self.foreign_keys = set() # type: ignore
for c in columns:
def description(self) -> str:
return self.name
- def append_column(self, c: ColumnClause[Any]) -> None:
+ def _insert_col_impl(
+ self,
+ c: ColumnClause[Any],
+ *,
+ index: Optional[int] = None,
+ ) -> None:
existing = c.table
if existing is not None and existing is not self:
raise exc.ArgumentError(
"column object '%s' already assigned to table '%s'"
% (c.key, existing)
)
-
- self._columns.add(c)
+ self._columns.add(c, index=index)
c.table = self
+ def append_column(self, c: ColumnClause[Any]) -> None:
+ self._insert_col_impl(c)
+
+ def insert_column(self, c: ColumnClause[Any], index: int) -> None:
+ self._insert_col_impl(c, index=index)
+
@util.preload_module("sqlalchemy.sql.dml")
def insert(self) -> util.preloaded.sql_dml.Insert:
"""Generate an :class:`_sql.Insert` construct against this
Column("col", String),
)
+ @testing.combinations(
+ ((0,),), ((0, 1),), ((1, 2),), ((3,),), ((-2,),), argnames="positions"
+ )
+ @testing.variation("add_to_pk", [True, False])
+ @testing.variation("existing_pk", [True, False])
+ def test_insert_column_table(self, positions, add_to_pk, existing_pk):
+ t = Table(
+ "t",
+ MetaData(),
+ Column("a", Integer, primary_key=bool(existing_pk)),
+ Column("b", Integer),
+ Column("c", Integer),
+ )
+ expected_cols = ["a", "b", "c"]
+
+ if existing_pk:
+ expected_pk_cols = ["a"]
+ else:
+ expected_pk_cols = []
+
+ for pos in positions:
+ t.insert_column(
+ Column(f"i{pos}", Integer, primary_key=bool(add_to_pk)), pos
+ )
+ expected_cols.insert(pos, f"i{pos}")
+ if add_to_pk:
+ expected_pk_cols.append(f"i{pos}")
+ eq_([c.key for c in t.c], expected_cols)
+
+ eq_([c.key for c in t.primary_key], expected_pk_cols)
+
+ @testing.combinations(-4, -3, -2, -1, 0, 1, 2, 3)
+ def test_replace_col_with_index(self, new_index):
+ t = Table(
+ "t",
+ MetaData(),
+ Column("a", Integer),
+ Column("b", Integer),
+ Column("c", Integer),
+ Column("d", Integer),
+ )
+ newcol = Column("b", String)
+
+ expected = ["a", "q", "c", "d"]
+ expected.insert(new_index, "b")
+ expected.remove("q")
+
+ t.insert_column(newcol, index=new_index, replace_existing=True)
+ is_(t.c.b, newcol)
+ is_(t.c.b.type._type_affinity, String)
+
+ eq_([c.key for c in t.c], expected)
+
+ effective_positive_index = (
+ new_index if new_index >= 0 else max(0, 4 + new_index)
+ )
+ if effective_positive_index > 1:
+ # because we replaced
+ effective_positive_index -= 1
+
+ is_(t.c[effective_positive_index], newcol)
+
+ @testing.combinations(
+ ((0,),), ((0, 1),), ((1, 2),), ((3,),), argnames="positions"
+ )
+ def test_insert_column_tableclause(self, positions):
+ t = table(
+ "t",
+ column("a", Integer),
+ column("b", Integer),
+ column("c", Integer),
+ )
+
+ expected_cols = ["a", "b", "c"]
+ for pos in positions:
+ t.insert_column(column(f"i{pos}", Integer), pos)
+ expected_cols.insert(pos, f"i{pos}")
+
+ eq_([c.key for c in t.c], expected_cols)
+
def test_append_column_existing_name(self):
t = Table("t", MetaData(), Column("col", Integer))
with testing.expect_raises_message(
exc.DuplicateColumnError,
r"A column with name 'col' is already present in table 't'. "
- r"Specify replace_existing=True to Table.append_column\(\) to "
+ r"Specify replace_existing=True to Table.append_column\(\) or "
+ r"Table.insert_column\(\) to "
r"replace an existing column.",
):
t.append_column(Column("col", String))
with testing.expect_raises_message(
exc.DuplicateColumnError,
r"A column with key 'c2' is already present in table 't'. "
- r"Specify replace_existing=True to Table.append_column\(\) "
- r"to replace an existing column.",
+ r"Specify replace_existing=True to Table.append_column\(\) or "
+ r"Table.insert_column\(\) to "
+ r"replace an existing column.",
):
t.append_column(Column("col", String, key="c2"))
with expect_raises_message(
exc.DuplicateColumnError,
r"A column with name 'b' is already present in table 'users'. "
- r"Specify replace_existing=True to Table.append_column\(\) "
- r"to replace an existing column.",
+ r"Specify replace_existing=True to Table.append_column\(\) ",
):
t1.append_column(Column("b", String, key="b2"))
return