]> git.ipfire.org Git - thirdparty/sqlalchemy/alembic.git/commitdiff
Add operation implementation replacement
authorjustanothercatgirl <sotov@twistea.su>
Wed, 12 Nov 2025 16:51:03 +0000 (11:51 -0500)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 14 Nov 2025 19:43:11 +0000 (14:43 -0500)
Added :paramref:`.Operations.implementation_for.replace` parameter to
:meth:`.Operations.implementation_for`, allowing replacement of existing
operation implementations.  This allows for existing operations such as
:class:`.CreateTableOp` to be extended directly.  Pull request courtesy
justanothercatgirl.

Fixes: #1750
Closes: #1751
Pull-request: https://github.com/sqlalchemy/alembic/pull/1751
Pull-request-sha: 5eef47af3f8f14ff736f60a7dd7adc3bbfa47c62

Change-Id: I69526c3111d41640264f226f6655dc61f83595e5

alembic/operations/base.py
alembic/testing/fixtures.py
alembic/util/langhelpers.py
docs/build/api/operations.rst
docs/build/unreleased/1750.rst [new file with mode: 0644]
tests/test_op.py

index 26c3272427200fc8eb4b67b4e92f6573b6e3fbe2..bea7f9dc2c3fa86daa45f12a1e473f952396e502 100644 (file)
@@ -202,19 +202,32 @@ class AbstractOperations(util.ModuleClsProxy):
         return register
 
     @classmethod
-    def implementation_for(cls, op_cls: Any) -> Callable[[_C], _C]:
+    def implementation_for(
+        cls, op_cls: Any, replace: bool = False
+    ) -> Callable[[_C], _C]:
         """Register an implementation for a given :class:`.MigrateOperation`.
 
+        :param replace: when True, allows replacement of an already
+         registered implementation for the given operation class. This
+         enables customization of built-in operations such as
+         :class:`.CreateTableOp` by providing an alternate implementation
+         that can augment, modify, or conditionally invoke the default
+         behavior.
+
+         .. versionadded:: 1.17.2
+
         This is part of the operation extensibility API.
 
         .. seealso::
 
-            :ref:`operation_plugins` - example of use
+            :ref:`operation_plugins`
+
+            :ref:`operations_extending_builtin`
 
         """
 
         def decorate(fn: _C) -> _C:
-            cls._to_impl.dispatch_for(op_cls)(fn)
+            cls._to_impl.dispatch_for(op_cls, replace=replace)(fn)
             return fn
 
         return decorate
index 62084dccb92e9940376c1e1954dc2846f1137abb..73e421259d8a1e90dab750ed3b90f166da0a3de9 100644 (file)
@@ -96,6 +96,23 @@ class TestBase(SQLAlchemyTestBase):
 
             _connection_fixture_connection = None
 
+    @testing.fixture
+    def restore_operations(self):
+        """Restore runners for modified operations"""
+
+        saved_impls = None
+        op_cls = None
+
+        def _save_attrs(_op_cls):
+            nonlocal saved_impls, op_cls
+            saved_impls = _op_cls._to_impl._registry.copy()
+            op_cls = _op_cls
+
+        yield _save_attrs
+
+        if op_cls is not None and saved_impls is not None:
+            op_cls._to_impl._registry = saved_impls
+
     @config.fixture()
     def metadata(self, request):
         """Provide bound MetaData for a single test, dropping afterwards."""
index 80d88cbcec56e280c55c395d6f00b3a72c5946f1..baba898f17ebf708cfa26f9263b4d586c8a49e8f 100644 (file)
@@ -270,13 +270,18 @@ class Dispatcher:
         self.uselist = uselist
 
     def dispatch_for(
-        self, target: Any, qualifier: str = "default"
+        self, target: Any, qualifier: str = "default", replace: bool = False
     ) -> Callable[[_C], _C]:
         def decorate(fn: _C) -> _C:
             if self.uselist:
                 self._registry.setdefault((target, qualifier), []).append(fn)
             else:
-                assert (target, qualifier) not in self._registry
+                if (target, qualifier) in self._registry and not replace:
+                    raise ValueError(
+                        "Can not set dispatch function for object "
+                        f"{target!r}: key already exists. To replace "
+                        "existing function, use replace=True."
+                    )
                 self._registry[(target, qualifier)] = fn
             return fn
 
index 85d03be27c6a1da964aeeb994501b9ed5b72ae28..fd30bf8fc71bee8c420ef6865a608fcada933112 100644 (file)
@@ -163,3 +163,79 @@ The built-in operation objects are listed below.
 
 .. automodule:: alembic.operations.ops
     :members:
+
+.. _operations_extending_builtin:
+
+Extending Existing Operations
+==============================
+
+.. versionadded:: 1.17.2
+
+The :paramref:`.Operations.implementation_for.replace` parameter allows
+replacement of existing operation implementations, including built-in
+operations such as :class:`.CreateTableOp`. This enables customization of
+migration execution for purposes such as logging operations, running
+integrity checks, conditionally canceling operations, or adapting
+operations with dialect-specific options.
+
+The example below illustrates replacing the implementation of
+:class:`.CreateTableOp` to log each table creation to a separate metadata
+table::
+
+    from alembic import op
+    from alembic.operations import Operations
+    from alembic.operations.ops import CreateTableOp
+    from alembic.operations.toimpl import create_table as _create_table
+    from sqlalchemy import MetaData, Table, Column, String
+
+    # Define a metadata table to track table operations
+    log_table = Table(
+        "table_metadata_log",
+        MetaData(),
+        Column("operation", String),
+        Column("table_name", String),
+    )
+
+    @Operations.implementation_for(CreateTableOp, replace=True)
+    def create_table_with_logging(operations, operation):
+        # First, run the original CREATE TABLE implementation
+        _create_table(operations, operation)
+
+        # Then, log the operation to the metadata table
+        operations.execute(
+            log_table.insert().values(
+                operation="create",
+                table_name=operation.table_name
+            )
+        )
+
+The above code can be placed in the ``env.py`` file to ensure it is loaded
+before migrations run. Once registered, all ``op.create_table()`` calls
+within migration scripts will use the augmented implementation.
+
+The original implementation is imported from :mod:`alembic.operations.toimpl`
+and invoked within the replacement implementation. The ``replace`` parameter
+also enables conditional execution or complete replacement of operation
+behavior. The example below demonstrates skipping a :class:`.CreateTableOp`
+based on custom logic::
+
+    from alembic.operations import Operations
+    from alembic.operations.ops import CreateTableOp
+    from alembic.operations.toimpl import create_table as _create_table
+
+    @Operations.implementation_for(CreateTableOp, replace=True)
+    def create_table_conditional(operations, operation):
+        # Check if the table should be created based on custom logic
+        if should_create_table(operation.table_name):
+            _create_table(operations, operation)
+        else:
+            # Skip creation and optionally log
+            operations.execute(
+                "-- Skipped creation of table %s" % operation.table_name
+            )
+
+    def should_create_table(table_name):
+        # Custom logic to determine if table should be created
+        # For example, check a configuration or metadata table
+        return table_name not in get_ignored_tables()
+
diff --git a/docs/build/unreleased/1750.rst b/docs/build/unreleased/1750.rst
new file mode 100644 (file)
index 0000000..7837c64
--- /dev/null
@@ -0,0 +1,13 @@
+.. change::
+    :tags: feature, operations
+    :tickets: 1750
+
+    Added :paramref:`.Operations.implementation_for.replace` parameter to
+    :meth:`.Operations.implementation_for`, allowing replacement of existing
+    operation implementations.  This allows for existing operations such as
+    :class:`.CreateTableOp` to be extended directly.  Pull request courtesy
+    justanothercatgirl.
+
+    .. seealso::
+
+        :ref:`operations_extending_builtin`
index 473b586ef61fa7e9f2832888489214334722daa8..f03024de7f6b0d3964a2420dfcf5295c20c6dc25 100644 (file)
@@ -28,6 +28,8 @@ from alembic.operations import MigrateOperation
 from alembic.operations import Operations
 from alembic.operations import ops
 from alembic.operations import schemaobj
+from alembic.operations.toimpl import create_table as _create_table
+from alembic.operations.toimpl import drop_table as _drop_table
 from alembic.testing import assert_raises_message
 from alembic.testing import combinations
 from alembic.testing import config
@@ -1362,6 +1364,7 @@ class SQLModeOpTest(TestBase):
 
 class CustomOpTest(TestBase):
     def test_custom_op(self):
+
         @Operations.register_operation("create_sequence")
         class CreateSequenceOp(MigrateOperation):
             """Create a SEQUENCE."""
@@ -1385,6 +1388,87 @@ class CustomOpTest(TestBase):
         op.create_sequence("foob")
         context.assert_("CREATE SEQUENCE foob")
 
+    def test_replace_op(self, restore_operations):
+        restore_operations(Operations)
+        context = op_fixture()
+
+        log_table = Table(
+            "log_table",
+            MetaData(),
+            Column("action", String),
+            Column("table_name", String),
+        )
+
+        @Operations.implementation_for(ops.CreateTableOp, replace=True)
+        def create_table_proxy_log(operations, operation):
+            _create_table(operations, operation)
+            operations.execute(
+                log_table.insert().values(["create", operation.table_name])
+            )
+
+        op.create_table("some_table", Column("colname", Integer))
+
+        @Operations.implementation_for(ops.CreateTableOp, replace=True)
+        def create_table_proxy_invert(operations, operation):
+            _drop_table(operations, ops.DropTableOp(operation.table_name))
+            operations.execute(
+                log_table.insert().values(["delete", operation.table_name])
+            )
+
+        op.create_table("some_table")
+
+        context.assert_(
+            "CREATE TABLE some_table (colname INTEGER)",
+            "INSERT INTO log_table (action, table_name) "
+            "VALUES (:action, :table_name)",
+            "DROP TABLE some_table",
+            "INSERT INTO log_table (action, table_name) "
+            "VALUES (:action, :table_name)",
+        )
+
+    def test_replace_error(self):
+        with expect_raises_message(
+            ValueError,
+            "Can not set dispatch function for object "
+            "<class 'alembic.operations.ops.CreateTableOp'>: "
+            "key already exists. To replace existing function, use "
+            "replace=True.",
+        ):
+
+            @Operations.implementation_for(ops.CreateTableOp)
+            def create_table(operations, operation):
+                pass
+
+    def test_replace_custom_op(self, restore_operations):
+        restore_operations(Operations)
+        context = op_fixture()
+
+        @Operations.register_operation("create_user")
+        class CreateUserOp(MigrateOperation):
+            def __init__(self, user_name):
+                self.user_name = user_name
+
+            @classmethod
+            def create_user(cls, operations, user_name):
+                op = CreateUserOp(user_name)
+                return operations.invoke(op)
+
+        @Operations.implementation_for(CreateUserOp, replace=True)
+        def create_user(operations, operation):
+            operations.execute("CREATE USER %s" % operation.user_name)
+
+        op.create_user("bob")
+
+        @Operations.implementation_for(CreateUserOp, replace=True)
+        def create_user_alternative(operations, operation):
+            operations.execute(
+                "CREATE ROLE %s WITH LOGIN" % operation.user_name
+            )
+
+        op.create_user("bob")
+
+        context.assert_("CREATE USER bob", "CREATE ROLE bob WITH LOGIN")
+
 
 class ObjectFromToTest(TestBase):
     """Test operation round trips for to_obj() / from_obj().