From: Mike Bayer Date: Thu, 19 Jun 2025 16:39:17 +0000 (-0400) Subject: support insert of table columns in specific positions X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=637ecd40eb02b66973728a1516bc83a82f9c34ae;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git support insert of table columns in specific positions Added method :meth:`.TableClause.insert_column` to complement :meth:`.TableClause.append_column`, which inserts the given column at a specific index. This can be helpful for prepending primary key columns to tables, etc. Fixes: #7910 Change-Id: Ife1eb8ad90aa61d38c457a86312cfe5d0d471106 --- diff --git a/doc/build/changelog/unreleased_21/7910.rst b/doc/build/changelog/unreleased_21/7910.rst new file mode 100644 index 0000000000..3a95e7ea19 --- /dev/null +++ b/doc/build/changelog/unreleased_21/7910.rst @@ -0,0 +1,9 @@ +.. change:: + :tags: usecase, sql + :tickets: 7910 + + Added method :meth:`.TableClause.insert_column` to complement + :meth:`.TableClause.append_column`, which inserts the given column at a + specific index. This can be helpful for prepending primary key columns to + tables, etc. + diff --git a/lib/sqlalchemy/sql/base.py b/lib/sqlalchemy/sql/base.py index fe6cdf6a07..73f8091984 100644 --- a/lib/sqlalchemy/sql/base.py +++ b/lib/sqlalchemy/sql/base.py @@ -1929,7 +1929,9 @@ class ColumnCollection(Generic[_COLKEY, _COL_co]): self._index.update({k: (k, col) for k, col, _ in reversed(collection)}) def add( - self, column: ColumnElement[Any], key: Optional[_COLKEY] = None + self, + column: ColumnElement[Any], + key: Optional[_COLKEY] = None, ) -> None: """Add a column to this :class:`_sql.ColumnCollection`. @@ -1960,6 +1962,7 @@ class ColumnCollection(Generic[_COLKEY, _COL_co]): (colkey, _column, _ColumnMetrics(self, _column)) ) self._colset.add(_column._deannotate()) + self._index[l] = (colkey, _column) if colkey not in self._index: self._index[colkey] = (colkey, _column) @@ -2155,7 +2158,11 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): """ def add( # type: ignore[override] - self, column: _NAMEDCOL, key: Optional[str] = None + self, + column: _NAMEDCOL, + key: Optional[str] = None, + *, + index: Optional[int] = None, ) -> None: if key is not None and column.key != key: raise exc.ArgumentError( @@ -2175,21 +2182,42 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): if existing is column: return - self.replace(column) + self.replace(column, index=index) # pop out memoized proxy_set as this # operation may very well be occurring # in a _make_proxy operation util.memoized_property.reset(column, "proxy_set") else: - self._append_new_column(key, column) + self._append_new_column(key, column, index=index) + + def _append_new_column( + self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None + ) -> None: + collection_length = len(self._collection) + + if index is None: + l = collection_length + else: + if index < 0: + index = max(0, collection_length + index) + l = index + + if index is None: + self._collection.append( + (key, named_column, _ColumnMetrics(self, named_column)) + ) + else: + self._collection.insert( + index, (key, named_column, _ColumnMetrics(self, named_column)) + ) - def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None: - l = len(self._collection) - self._collection.append( - (key, named_column, _ColumnMetrics(self, named_column)) - ) self._colset.add(named_column._deannotate()) + + if index is not None: + for idx in reversed(range(index, collection_length)): + self._index[idx + 1] = self._index[idx] + self._index[l] = (key, named_column) self._index[key] = (key, named_column) @@ -2249,7 +2277,9 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): def replace( self, column: _NAMEDCOL, + *, extra_remove: Optional[Iterable[_NAMEDCOL]] = None, + index: Optional[int] = None, ) -> None: """add the given column to this collection, removing unaliased versions of this column as well as existing columns with the @@ -2281,14 +2311,15 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): remove_col.add(self._index[column.key][1]) if not remove_col: - self._append_new_column(column.key, column) + self._append_new_column(column.key, column, index=index) return new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = [] - replaced = False - for k, col, metrics in self._collection: + replace_index = None + + for idx, (k, col, metrics) in enumerate(self._collection): if col in remove_col: - if not replaced: - replaced = True + if replace_index is None: + replace_index = idx new_cols.append( (column.key, column, _ColumnMetrics(self, column)) ) @@ -2302,8 +2333,26 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]): for metrics in self._proxy_index.get(rc, ()): metrics.dispose(self) - if not replaced: - new_cols.append((column.key, column, _ColumnMetrics(self, column))) + if replace_index is None: + if index is not None: + new_cols.insert( + index, (column.key, column, _ColumnMetrics(self, column)) + ) + + else: + new_cols.append( + (column.key, column, _ColumnMetrics(self, column)) + ) + elif index is not None: + to_move = new_cols[replace_index] + effective_positive_index = ( + index if index >= 0 else max(0, len(new_cols) + index) + ) + new_cols.insert(index, to_move) + if replace_index > effective_positive_index: + del new_cols[replace_index + 1] + else: + del new_cols[replace_index] self._colset.add(column._deannotate()) self._collection[:] = new_cols diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py index 079fac98cc..ddb6db62ef 100644 --- a/lib/sqlalchemy/sql/schema.py +++ b/lib/sqlalchemy/sql/schema.py @@ -353,7 +353,7 @@ class Table( @util.ro_non_memoized_property def foreign_keys(self) -> Set[ForeignKey]: ... - _columns: DedupeColumnCollection[Column[Any]] + _columns: DedupeColumnCollection[Column[Any]] # type: ignore[assignment] _sentinel_column: Optional[Column[Any]] @@ -1200,8 +1200,55 @@ class Table( """ self._extra_dependencies.add(table) + def _insert_col_impl( + self, + column: ColumnClause[Any], + *, + index: Optional[int] = None, + replace_existing: bool = False, + ) -> None: + try: + column._set_parent_with_dispatch( + self, + allow_replacements=replace_existing, + all_names={c.name: c for c in self.c}, + index=index, + ) + except exc.DuplicateColumnError as de: + raise exc.DuplicateColumnError( + f"{de.args[0]} Specify replace_existing=True to " + "Table.append_column() or Table.insert_column() to replace an " + "existing column." + ) from de + + def insert_column( + self, + column: ColumnClause[Any], + index: int, + *, + replace_existing: bool = False, + ) -> None: + """Insert a :class:`_schema.Column` to this :class:`_schema.Table` at + a specific position. + + Behavior is identical to :meth:`.Table.append_column` except that + the index position can be controlled using the + :paramref:`.Table.insert_column.index` + parameter. + + :param replace_existing: + see :paramref:`.Table.append_column.replace_existing` + :param index: integer index to insert the new column. + + .. versionadded:: 2.1 + + """ + self._insert_col_impl( + column, index=index, replace_existing=replace_existing + ) + def append_column( - self, column: ColumnClause[Any], replace_existing: bool = False + self, column: ColumnClause[Any], *, replace_existing: bool = False ) -> None: """Append a :class:`_schema.Column` to this :class:`_schema.Table`. @@ -1226,20 +1273,13 @@ class Table( version of sqlalchemy will instead rise a warning. .. versionadded:: 1.4.0 - """ - try: - column._set_parent_with_dispatch( - self, - allow_replacements=replace_existing, - all_names={c.name: c for c in self.c}, - ) - except exc.DuplicateColumnError as de: - raise exc.DuplicateColumnError( - f"{de.args[0]} Specify replace_existing=True to " - "Table.append_column() to replace an " - "existing column." - ) from de + .. seealso:: + + :meth:`.Table.insert_column` + + """ + self._insert_col_impl(column, replace_existing=replace_existing) def append_constraint(self, constraint: Union[Index, Constraint]) -> None: """Append a :class:`_schema.Constraint` to this @@ -2313,6 +2353,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): *, all_names: Dict[str, Column[Any]], allow_replacements: bool, + index: Optional[int] = None, **kw: Any, ) -> None: table = parent @@ -2377,7 +2418,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]): "reflection operation, specify autoload_replace=False to " "prevent this replacement." ) - table._columns.replace(self, extra_remove=extra_remove) + table._columns.replace(self, extra_remove=extra_remove, index=index) all_names[self.name] = self self.table = table diff --git a/lib/sqlalchemy/sql/selectable.py b/lib/sqlalchemy/sql/selectable.py index 73b936d24f..349f189302 100644 --- a/lib/sqlalchemy/sql/selectable.py +++ b/lib/sqlalchemy/sql/selectable.py @@ -3127,6 +3127,8 @@ class TableClause(roles.DMLTableRole, Immutable, NamedFromClause): doesn't support having a primary key or column -level defaults, so implicit returning doesn't apply.""" + _columns: DedupeColumnCollection[ColumnClause[Any]] + @util.ro_memoized_property def _autoincrement_column(self) -> Optional[ColumnClause[Any]]: """No PK or default support so no autoincrement column.""" @@ -3135,7 +3137,7 @@ class TableClause(roles.DMLTableRole, Immutable, NamedFromClause): def __init__(self, name: str, *columns: ColumnClause[Any], **kw: Any): super().__init__() self.name = name - self._columns = DedupeColumnCollection() + self._columns = DedupeColumnCollection() # type: ignore[unused-ignore] self.primary_key = ColumnSet() # type: ignore self.foreign_keys = set() # type: ignore for c in columns: @@ -3174,17 +3176,27 @@ class TableClause(roles.DMLTableRole, Immutable, NamedFromClause): def description(self) -> str: return self.name - def append_column(self, c: ColumnClause[Any]) -> None: + def _insert_col_impl( + self, + c: ColumnClause[Any], + *, + index: Optional[int] = None, + ) -> None: existing = c.table if existing is not None and existing is not self: raise exc.ArgumentError( "column object '%s' already assigned to table '%s'" % (c.key, existing) ) - - self._columns.add(c) + self._columns.add(c, index=index) c.table = self + def append_column(self, c: ColumnClause[Any]) -> None: + self._insert_col_impl(c) + + def insert_column(self, c: ColumnClause[Any], index: int) -> None: + self._insert_col_impl(c, index=index) + @util.preload_module("sqlalchemy.sql.dml") def insert(self) -> util.preloaded.sql_dml.Insert: """Generate an :class:`_sql.Insert` construct against this diff --git a/test/sql/test_metadata.py b/test/sql/test_metadata.py index e963fca6a3..690206f54d 100644 --- a/test/sql/test_metadata.py +++ b/test/sql/test_metadata.py @@ -1907,13 +1907,94 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): Column("col", String), ) + @testing.combinations( + ((0,),), ((0, 1),), ((1, 2),), ((3,),), ((-2,),), argnames="positions" + ) + @testing.variation("add_to_pk", [True, False]) + @testing.variation("existing_pk", [True, False]) + def test_insert_column_table(self, positions, add_to_pk, existing_pk): + t = Table( + "t", + MetaData(), + Column("a", Integer, primary_key=bool(existing_pk)), + Column("b", Integer), + Column("c", Integer), + ) + expected_cols = ["a", "b", "c"] + + if existing_pk: + expected_pk_cols = ["a"] + else: + expected_pk_cols = [] + + for pos in positions: + t.insert_column( + Column(f"i{pos}", Integer, primary_key=bool(add_to_pk)), pos + ) + expected_cols.insert(pos, f"i{pos}") + if add_to_pk: + expected_pk_cols.append(f"i{pos}") + eq_([c.key for c in t.c], expected_cols) + + eq_([c.key for c in t.primary_key], expected_pk_cols) + + @testing.combinations(-4, -3, -2, -1, 0, 1, 2, 3) + def test_replace_col_with_index(self, new_index): + t = Table( + "t", + MetaData(), + Column("a", Integer), + Column("b", Integer), + Column("c", Integer), + Column("d", Integer), + ) + newcol = Column("b", String) + + expected = ["a", "q", "c", "d"] + expected.insert(new_index, "b") + expected.remove("q") + + t.insert_column(newcol, index=new_index, replace_existing=True) + is_(t.c.b, newcol) + is_(t.c.b.type._type_affinity, String) + + eq_([c.key for c in t.c], expected) + + effective_positive_index = ( + new_index if new_index >= 0 else max(0, 4 + new_index) + ) + if effective_positive_index > 1: + # because we replaced + effective_positive_index -= 1 + + is_(t.c[effective_positive_index], newcol) + + @testing.combinations( + ((0,),), ((0, 1),), ((1, 2),), ((3,),), argnames="positions" + ) + def test_insert_column_tableclause(self, positions): + t = table( + "t", + column("a", Integer), + column("b", Integer), + column("c", Integer), + ) + + expected_cols = ["a", "b", "c"] + for pos in positions: + t.insert_column(column(f"i{pos}", Integer), pos) + expected_cols.insert(pos, f"i{pos}") + + eq_([c.key for c in t.c], expected_cols) + def test_append_column_existing_name(self): t = Table("t", MetaData(), Column("col", Integer)) with testing.expect_raises_message( exc.DuplicateColumnError, r"A column with name 'col' is already present in table 't'. " - r"Specify replace_existing=True to Table.append_column\(\) to " + r"Specify replace_existing=True to Table.append_column\(\) or " + r"Table.insert_column\(\) to " r"replace an existing column.", ): t.append_column(Column("col", String)) @@ -1924,8 +2005,9 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL): with testing.expect_raises_message( exc.DuplicateColumnError, r"A column with key 'c2' is already present in table 't'. " - r"Specify replace_existing=True to Table.append_column\(\) " - r"to replace an existing column.", + r"Specify replace_existing=True to Table.append_column\(\) or " + r"Table.insert_column\(\) to " + r"replace an existing column.", ): t.append_column(Column("col", String, key="c2")) @@ -3029,8 +3111,7 @@ class UseExistingTest(testing.AssertsCompiledSQL, fixtures.TablesTest): with expect_raises_message( exc.DuplicateColumnError, r"A column with name 'b' is already present in table 'users'. " - r"Specify replace_existing=True to Table.append_column\(\) " - r"to replace an existing column.", + r"Specify replace_existing=True to Table.append_column\(\) ", ): t1.append_column(Column("b", String, key="b2")) return