]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
support insert of table columns in specific positions
authorMike Bayer <mike_mp@zzzcomputing.com>
Thu, 19 Jun 2025 16:39:17 +0000 (12:39 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 2 Jul 2025 21:22:40 +0000 (17:22 -0400)
Added method :meth:`.TableClause.insert_column` to complement
:meth:`.TableClause.append_column`, which inserts the given column at a
specific index.   This can be helpful for prepending primary key columns to
tables, etc.

Fixes: #7910
Change-Id: Ife1eb8ad90aa61d38c457a86312cfe5d0d471106

doc/build/changelog/unreleased_21/7910.rst [new file with mode: 0644]
lib/sqlalchemy/sql/base.py
lib/sqlalchemy/sql/schema.py
lib/sqlalchemy/sql/selectable.py
test/sql/test_metadata.py

diff --git a/doc/build/changelog/unreleased_21/7910.rst b/doc/build/changelog/unreleased_21/7910.rst
new file mode 100644 (file)
index 0000000..3a95e7e
--- /dev/null
@@ -0,0 +1,9 @@
+.. change::
+    :tags: usecase, sql
+    :tickets: 7910
+
+    Added method :meth:`.TableClause.insert_column` to complement
+    :meth:`.TableClause.append_column`, which inserts the given column at a
+    specific index.   This can be helpful for prepending primary key columns to
+    tables, etc.
+
index fe6cdf6a07b7cd2a69c0f206fde801c2a22cb221..73f809198461a624dca709f95342ec71b0d29d2b 100644 (file)
@@ -1929,7 +1929,9 @@ class ColumnCollection(Generic[_COLKEY, _COL_co]):
         self._index.update({k: (k, col) for k, col, _ in reversed(collection)})
 
     def add(
-        self, column: ColumnElement[Any], key: Optional[_COLKEY] = None
+        self,
+        column: ColumnElement[Any],
+        key: Optional[_COLKEY] = None,
     ) -> None:
         """Add a column to this :class:`_sql.ColumnCollection`.
 
@@ -1960,6 +1962,7 @@ class ColumnCollection(Generic[_COLKEY, _COL_co]):
             (colkey, _column, _ColumnMetrics(self, _column))
         )
         self._colset.add(_column._deannotate())
+
         self._index[l] = (colkey, _column)
         if colkey not in self._index:
             self._index[colkey] = (colkey, _column)
@@ -2155,7 +2158,11 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
     """
 
     def add(  # type: ignore[override]
-        self, column: _NAMEDCOL, key: Optional[str] = None
+        self,
+        column: _NAMEDCOL,
+        key: Optional[str] = None,
+        *,
+        index: Optional[int] = None,
     ) -> None:
         if key is not None and column.key != key:
             raise exc.ArgumentError(
@@ -2175,21 +2182,42 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
             if existing is column:
                 return
 
-            self.replace(column)
+            self.replace(column, index=index)
 
             # pop out memoized proxy_set as this
             # operation may very well be occurring
             # in a _make_proxy operation
             util.memoized_property.reset(column, "proxy_set")
         else:
-            self._append_new_column(key, column)
+            self._append_new_column(key, column, index=index)
+
+    def _append_new_column(
+        self, key: str, named_column: _NAMEDCOL, *, index: Optional[int] = None
+    ) -> None:
+        collection_length = len(self._collection)
+
+        if index is None:
+            l = collection_length
+        else:
+            if index < 0:
+                index = max(0, collection_length + index)
+            l = index
+
+        if index is None:
+            self._collection.append(
+                (key, named_column, _ColumnMetrics(self, named_column))
+            )
+        else:
+            self._collection.insert(
+                index, (key, named_column, _ColumnMetrics(self, named_column))
+            )
 
-    def _append_new_column(self, key: str, named_column: _NAMEDCOL) -> None:
-        l = len(self._collection)
-        self._collection.append(
-            (key, named_column, _ColumnMetrics(self, named_column))
-        )
         self._colset.add(named_column._deannotate())
+
+        if index is not None:
+            for idx in reversed(range(index, collection_length)):
+                self._index[idx + 1] = self._index[idx]
+
         self._index[l] = (key, named_column)
         self._index[key] = (key, named_column)
 
@@ -2249,7 +2277,9 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
     def replace(
         self,
         column: _NAMEDCOL,
+        *,
         extra_remove: Optional[Iterable[_NAMEDCOL]] = None,
+        index: Optional[int] = None,
     ) -> None:
         """add the given column to this collection, removing unaliased
         versions of this column  as well as existing columns with the
@@ -2281,14 +2311,15 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
             remove_col.add(self._index[column.key][1])
 
         if not remove_col:
-            self._append_new_column(column.key, column)
+            self._append_new_column(column.key, column, index=index)
             return
         new_cols: List[Tuple[str, _NAMEDCOL, _ColumnMetrics[_NAMEDCOL]]] = []
-        replaced = False
-        for k, col, metrics in self._collection:
+        replace_index = None
+
+        for idx, (k, col, metrics) in enumerate(self._collection):
             if col in remove_col:
-                if not replaced:
-                    replaced = True
+                if replace_index is None:
+                    replace_index = idx
                     new_cols.append(
                         (column.key, column, _ColumnMetrics(self, column))
                     )
@@ -2302,8 +2333,26 @@ class DedupeColumnCollection(ColumnCollection[str, _NAMEDCOL]):
                 for metrics in self._proxy_index.get(rc, ()):
                     metrics.dispose(self)
 
-        if not replaced:
-            new_cols.append((column.key, column, _ColumnMetrics(self, column)))
+        if replace_index is None:
+            if index is not None:
+                new_cols.insert(
+                    index, (column.key, column, _ColumnMetrics(self, column))
+                )
+
+            else:
+                new_cols.append(
+                    (column.key, column, _ColumnMetrics(self, column))
+                )
+        elif index is not None:
+            to_move = new_cols[replace_index]
+            effective_positive_index = (
+                index if index >= 0 else max(0, len(new_cols) + index)
+            )
+            new_cols.insert(index, to_move)
+            if replace_index > effective_positive_index:
+                del new_cols[replace_index + 1]
+            else:
+                del new_cols[replace_index]
 
         self._colset.add(column._deannotate())
         self._collection[:] = new_cols
index 079fac98cc161608a46a933b164c05f57ea2ddc6..ddb6db62ef5d348d5fca0a1ba983907016082076 100644 (file)
@@ -353,7 +353,7 @@ class Table(
         @util.ro_non_memoized_property
         def foreign_keys(self) -> Set[ForeignKey]: ...
 
-    _columns: DedupeColumnCollection[Column[Any]]
+    _columns: DedupeColumnCollection[Column[Any]]  # type: ignore[assignment]
 
     _sentinel_column: Optional[Column[Any]]
 
@@ -1200,8 +1200,55 @@ class Table(
         """
         self._extra_dependencies.add(table)
 
+    def _insert_col_impl(
+        self,
+        column: ColumnClause[Any],
+        *,
+        index: Optional[int] = None,
+        replace_existing: bool = False,
+    ) -> None:
+        try:
+            column._set_parent_with_dispatch(
+                self,
+                allow_replacements=replace_existing,
+                all_names={c.name: c for c in self.c},
+                index=index,
+            )
+        except exc.DuplicateColumnError as de:
+            raise exc.DuplicateColumnError(
+                f"{de.args[0]} Specify replace_existing=True to "
+                "Table.append_column() or Table.insert_column() to replace an "
+                "existing column."
+            ) from de
+
+    def insert_column(
+        self,
+        column: ColumnClause[Any],
+        index: int,
+        *,
+        replace_existing: bool = False,
+    ) -> None:
+        """Insert a :class:`_schema.Column` to this :class:`_schema.Table` at
+        a specific position.
+
+        Behavior is identical to :meth:`.Table.append_column` except that
+        the index position can be controlled using the
+        :paramref:`.Table.insert_column.index`
+        parameter.
+
+        :param replace_existing:
+         see :paramref:`.Table.append_column.replace_existing`
+        :param index: integer index to insert the new column.
+
+        .. versionadded:: 2.1
+
+        """
+        self._insert_col_impl(
+            column, index=index, replace_existing=replace_existing
+        )
+
     def append_column(
-        self, column: ColumnClause[Any], replace_existing: bool = False
+        self, column: ColumnClause[Any], *, replace_existing: bool = False
     ) -> None:
         """Append a :class:`_schema.Column` to this :class:`_schema.Table`.
 
@@ -1226,20 +1273,13 @@ class Table(
             version of sqlalchemy will instead rise a warning.
 
             .. versionadded:: 1.4.0
-        """
 
-        try:
-            column._set_parent_with_dispatch(
-                self,
-                allow_replacements=replace_existing,
-                all_names={c.name: c for c in self.c},
-            )
-        except exc.DuplicateColumnError as de:
-            raise exc.DuplicateColumnError(
-                f"{de.args[0]} Specify replace_existing=True to "
-                "Table.append_column() to replace an "
-                "existing column."
-            ) from de
+        .. seealso::
+
+            :meth:`.Table.insert_column`
+
+        """
+        self._insert_col_impl(column, replace_existing=replace_existing)
 
     def append_constraint(self, constraint: Union[Index, Constraint]) -> None:
         """Append a :class:`_schema.Constraint` to this
@@ -2313,6 +2353,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
         *,
         all_names: Dict[str, Column[Any]],
         allow_replacements: bool,
+        index: Optional[int] = None,
         **kw: Any,
     ) -> None:
         table = parent
@@ -2377,7 +2418,7 @@ class Column(DialectKWArgs, SchemaItem, ColumnClause[_T]):
                 "reflection operation, specify autoload_replace=False to "
                 "prevent this replacement."
             )
-        table._columns.replace(self, extra_remove=extra_remove)
+        table._columns.replace(self, extra_remove=extra_remove, index=index)
         all_names[self.name] = self
         self.table = table
 
index 73b936d24fafe6a7efb104181113bb38b008e837..349f189302aa268d9a0682d6a92031dcaef19f6e 100644 (file)
@@ -3127,6 +3127,8 @@ class TableClause(roles.DMLTableRole, Immutable, NamedFromClause):
     doesn't support having a primary key or column
     -level defaults, so implicit returning doesn't apply."""
 
+    _columns: DedupeColumnCollection[ColumnClause[Any]]
+
     @util.ro_memoized_property
     def _autoincrement_column(self) -> Optional[ColumnClause[Any]]:
         """No PK or default support so no autoincrement column."""
@@ -3135,7 +3137,7 @@ class TableClause(roles.DMLTableRole, Immutable, NamedFromClause):
     def __init__(self, name: str, *columns: ColumnClause[Any], **kw: Any):
         super().__init__()
         self.name = name
-        self._columns = DedupeColumnCollection()
+        self._columns = DedupeColumnCollection()  # type: ignore[unused-ignore]
         self.primary_key = ColumnSet()  # type: ignore
         self.foreign_keys = set()  # type: ignore
         for c in columns:
@@ -3174,17 +3176,27 @@ class TableClause(roles.DMLTableRole, Immutable, NamedFromClause):
     def description(self) -> str:
         return self.name
 
-    def append_column(self, c: ColumnClause[Any]) -> None:
+    def _insert_col_impl(
+        self,
+        c: ColumnClause[Any],
+        *,
+        index: Optional[int] = None,
+    ) -> None:
         existing = c.table
         if existing is not None and existing is not self:
             raise exc.ArgumentError(
                 "column object '%s' already assigned to table '%s'"
                 % (c.key, existing)
             )
-
-        self._columns.add(c)
+        self._columns.add(c, index=index)
         c.table = self
 
+    def append_column(self, c: ColumnClause[Any]) -> None:
+        self._insert_col_impl(c)
+
+    def insert_column(self, c: ColumnClause[Any], index: int) -> None:
+        self._insert_col_impl(c, index=index)
+
     @util.preload_module("sqlalchemy.sql.dml")
     def insert(self) -> util.preloaded.sql_dml.Insert:
         """Generate an :class:`_sql.Insert` construct against this
index e963fca6a3b5b8ca6595d76902cdb1eee6d8547a..690206f54da478081880030d75be2c163244947e 100644 (file)
@@ -1907,13 +1907,94 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
                 Column("col", String),
             )
 
+    @testing.combinations(
+        ((0,),), ((0, 1),), ((1, 2),), ((3,),), ((-2,),), argnames="positions"
+    )
+    @testing.variation("add_to_pk", [True, False])
+    @testing.variation("existing_pk", [True, False])
+    def test_insert_column_table(self, positions, add_to_pk, existing_pk):
+        t = Table(
+            "t",
+            MetaData(),
+            Column("a", Integer, primary_key=bool(existing_pk)),
+            Column("b", Integer),
+            Column("c", Integer),
+        )
+        expected_cols = ["a", "b", "c"]
+
+        if existing_pk:
+            expected_pk_cols = ["a"]
+        else:
+            expected_pk_cols = []
+
+        for pos in positions:
+            t.insert_column(
+                Column(f"i{pos}", Integer, primary_key=bool(add_to_pk)), pos
+            )
+            expected_cols.insert(pos, f"i{pos}")
+            if add_to_pk:
+                expected_pk_cols.append(f"i{pos}")
+        eq_([c.key for c in t.c], expected_cols)
+
+        eq_([c.key for c in t.primary_key], expected_pk_cols)
+
+    @testing.combinations(-4, -3, -2, -1, 0, 1, 2, 3)
+    def test_replace_col_with_index(self, new_index):
+        t = Table(
+            "t",
+            MetaData(),
+            Column("a", Integer),
+            Column("b", Integer),
+            Column("c", Integer),
+            Column("d", Integer),
+        )
+        newcol = Column("b", String)
+
+        expected = ["a", "q", "c", "d"]
+        expected.insert(new_index, "b")
+        expected.remove("q")
+
+        t.insert_column(newcol, index=new_index, replace_existing=True)
+        is_(t.c.b, newcol)
+        is_(t.c.b.type._type_affinity, String)
+
+        eq_([c.key for c in t.c], expected)
+
+        effective_positive_index = (
+            new_index if new_index >= 0 else max(0, 4 + new_index)
+        )
+        if effective_positive_index > 1:
+            # because we replaced
+            effective_positive_index -= 1
+
+        is_(t.c[effective_positive_index], newcol)
+
+    @testing.combinations(
+        ((0,),), ((0, 1),), ((1, 2),), ((3,),), argnames="positions"
+    )
+    def test_insert_column_tableclause(self, positions):
+        t = table(
+            "t",
+            column("a", Integer),
+            column("b", Integer),
+            column("c", Integer),
+        )
+
+        expected_cols = ["a", "b", "c"]
+        for pos in positions:
+            t.insert_column(column(f"i{pos}", Integer), pos)
+            expected_cols.insert(pos, f"i{pos}")
+
+        eq_([c.key for c in t.c], expected_cols)
+
     def test_append_column_existing_name(self):
         t = Table("t", MetaData(), Column("col", Integer))
 
         with testing.expect_raises_message(
             exc.DuplicateColumnError,
             r"A column with name 'col' is already present in table 't'. "
-            r"Specify replace_existing=True to Table.append_column\(\) to "
+            r"Specify replace_existing=True to Table.append_column\(\) or "
+            r"Table.insert_column\(\) to "
             r"replace an existing column.",
         ):
             t.append_column(Column("col", String))
@@ -1924,8 +2005,9 @@ class TableTest(fixtures.TestBase, AssertsCompiledSQL):
         with testing.expect_raises_message(
             exc.DuplicateColumnError,
             r"A column with key 'c2' is already present in table 't'. "
-            r"Specify replace_existing=True to Table.append_column\(\) "
-            r"to replace an existing column.",
+            r"Specify replace_existing=True to Table.append_column\(\) or "
+            r"Table.insert_column\(\) to "
+            r"replace an existing column.",
         ):
             t.append_column(Column("col", String, key="c2"))
 
@@ -3029,8 +3111,7 @@ class UseExistingTest(testing.AssertsCompiledSQL, fixtures.TablesTest):
             with expect_raises_message(
                 exc.DuplicateColumnError,
                 r"A column with name 'b' is already present in table 'users'. "
-                r"Specify replace_existing=True to Table.append_column\(\) "
-                r"to replace an existing column.",
+                r"Specify replace_existing=True to Table.append_column\(\) ",
             ):
                 t1.append_column(Column("b", String, key="b2"))
             return