From: justanothercatgirl Date: Wed, 12 Nov 2025 16:51:03 +0000 (-0500) Subject: Add operation implementation replacement X-Git-Tag: rel_1_17_2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=74c54dd455ac8e5aecc24bdc597a7d2b401ae82c;p=thirdparty%2Fsqlalchemy%2Falembic.git Add operation implementation replacement Added :paramref:`.Operations.implementation_for.replace` parameter to :meth:`.Operations.implementation_for`, allowing replacement of existing operation implementations. This allows for existing operations such as :class:`.CreateTableOp` to be extended directly. Pull request courtesy justanothercatgirl. Fixes: #1750 Closes: #1751 Pull-request: https://github.com/sqlalchemy/alembic/pull/1751 Pull-request-sha: 5eef47af3f8f14ff736f60a7dd7adc3bbfa47c62 Change-Id: I69526c3111d41640264f226f6655dc61f83595e5 --- diff --git a/alembic/operations/base.py b/alembic/operations/base.py index 26c32724..bea7f9dc 100644 --- a/alembic/operations/base.py +++ b/alembic/operations/base.py @@ -202,19 +202,32 @@ class AbstractOperations(util.ModuleClsProxy): return register @classmethod - def implementation_for(cls, op_cls: Any) -> Callable[[_C], _C]: + def implementation_for( + cls, op_cls: Any, replace: bool = False + ) -> Callable[[_C], _C]: """Register an implementation for a given :class:`.MigrateOperation`. + :param replace: when True, allows replacement of an already + registered implementation for the given operation class. This + enables customization of built-in operations such as + :class:`.CreateTableOp` by providing an alternate implementation + that can augment, modify, or conditionally invoke the default + behavior. + + .. versionadded:: 1.17.2 + This is part of the operation extensibility API. .. seealso:: - :ref:`operation_plugins` - example of use + :ref:`operation_plugins` + + :ref:`operations_extending_builtin` """ def decorate(fn: _C) -> _C: - cls._to_impl.dispatch_for(op_cls)(fn) + cls._to_impl.dispatch_for(op_cls, replace=replace)(fn) return fn return decorate diff --git a/alembic/testing/fixtures.py b/alembic/testing/fixtures.py index 62084dcc..73e42125 100644 --- a/alembic/testing/fixtures.py +++ b/alembic/testing/fixtures.py @@ -96,6 +96,23 @@ class TestBase(SQLAlchemyTestBase): _connection_fixture_connection = None + @testing.fixture + def restore_operations(self): + """Restore runners for modified operations""" + + saved_impls = None + op_cls = None + + def _save_attrs(_op_cls): + nonlocal saved_impls, op_cls + saved_impls = _op_cls._to_impl._registry.copy() + op_cls = _op_cls + + yield _save_attrs + + if op_cls is not None and saved_impls is not None: + op_cls._to_impl._registry = saved_impls + @config.fixture() def metadata(self, request): """Provide bound MetaData for a single test, dropping afterwards.""" diff --git a/alembic/util/langhelpers.py b/alembic/util/langhelpers.py index 80d88cbc..baba898f 100644 --- a/alembic/util/langhelpers.py +++ b/alembic/util/langhelpers.py @@ -270,13 +270,18 @@ class Dispatcher: self.uselist = uselist def dispatch_for( - self, target: Any, qualifier: str = "default" + self, target: Any, qualifier: str = "default", replace: bool = False ) -> Callable[[_C], _C]: def decorate(fn: _C) -> _C: if self.uselist: self._registry.setdefault((target, qualifier), []).append(fn) else: - assert (target, qualifier) not in self._registry + if (target, qualifier) in self._registry and not replace: + raise ValueError( + "Can not set dispatch function for object " + f"{target!r}: key already exists. To replace " + "existing function, use replace=True." + ) self._registry[(target, qualifier)] = fn return fn diff --git a/docs/build/api/operations.rst b/docs/build/api/operations.rst index 85d03be2..fd30bf8f 100644 --- a/docs/build/api/operations.rst +++ b/docs/build/api/operations.rst @@ -163,3 +163,79 @@ The built-in operation objects are listed below. .. automodule:: alembic.operations.ops :members: + +.. _operations_extending_builtin: + +Extending Existing Operations +============================== + +.. versionadded:: 1.17.2 + +The :paramref:`.Operations.implementation_for.replace` parameter allows +replacement of existing operation implementations, including built-in +operations such as :class:`.CreateTableOp`. This enables customization of +migration execution for purposes such as logging operations, running +integrity checks, conditionally canceling operations, or adapting +operations with dialect-specific options. + +The example below illustrates replacing the implementation of +:class:`.CreateTableOp` to log each table creation to a separate metadata +table:: + + from alembic import op + from alembic.operations import Operations + from alembic.operations.ops import CreateTableOp + from alembic.operations.toimpl import create_table as _create_table + from sqlalchemy import MetaData, Table, Column, String + + # Define a metadata table to track table operations + log_table = Table( + "table_metadata_log", + MetaData(), + Column("operation", String), + Column("table_name", String), + ) + + @Operations.implementation_for(CreateTableOp, replace=True) + def create_table_with_logging(operations, operation): + # First, run the original CREATE TABLE implementation + _create_table(operations, operation) + + # Then, log the operation to the metadata table + operations.execute( + log_table.insert().values( + operation="create", + table_name=operation.table_name + ) + ) + +The above code can be placed in the ``env.py`` file to ensure it is loaded +before migrations run. Once registered, all ``op.create_table()`` calls +within migration scripts will use the augmented implementation. + +The original implementation is imported from :mod:`alembic.operations.toimpl` +and invoked within the replacement implementation. The ``replace`` parameter +also enables conditional execution or complete replacement of operation +behavior. The example below demonstrates skipping a :class:`.CreateTableOp` +based on custom logic:: + + from alembic.operations import Operations + from alembic.operations.ops import CreateTableOp + from alembic.operations.toimpl import create_table as _create_table + + @Operations.implementation_for(CreateTableOp, replace=True) + def create_table_conditional(operations, operation): + # Check if the table should be created based on custom logic + if should_create_table(operation.table_name): + _create_table(operations, operation) + else: + # Skip creation and optionally log + operations.execute( + "-- Skipped creation of table %s" % operation.table_name + ) + + def should_create_table(table_name): + # Custom logic to determine if table should be created + # For example, check a configuration or metadata table + return table_name not in get_ignored_tables() + diff --git a/docs/build/unreleased/1750.rst b/docs/build/unreleased/1750.rst new file mode 100644 index 00000000..7837c648 --- /dev/null +++ b/docs/build/unreleased/1750.rst @@ -0,0 +1,13 @@ +.. change:: + :tags: feature, operations + :tickets: 1750 + + Added :paramref:`.Operations.implementation_for.replace` parameter to + :meth:`.Operations.implementation_for`, allowing replacement of existing + operation implementations. This allows for existing operations such as + :class:`.CreateTableOp` to be extended directly. Pull request courtesy + justanothercatgirl. + + .. seealso:: + + :ref:`operations_extending_builtin` diff --git a/tests/test_op.py b/tests/test_op.py index 473b586e..f03024de 100644 --- a/tests/test_op.py +++ b/tests/test_op.py @@ -28,6 +28,8 @@ from alembic.operations import MigrateOperation from alembic.operations import Operations from alembic.operations import ops from alembic.operations import schemaobj +from alembic.operations.toimpl import create_table as _create_table +from alembic.operations.toimpl import drop_table as _drop_table from alembic.testing import assert_raises_message from alembic.testing import combinations from alembic.testing import config @@ -1362,6 +1364,7 @@ class SQLModeOpTest(TestBase): class CustomOpTest(TestBase): def test_custom_op(self): + @Operations.register_operation("create_sequence") class CreateSequenceOp(MigrateOperation): """Create a SEQUENCE.""" @@ -1385,6 +1388,87 @@ class CustomOpTest(TestBase): op.create_sequence("foob") context.assert_("CREATE SEQUENCE foob") + def test_replace_op(self, restore_operations): + restore_operations(Operations) + context = op_fixture() + + log_table = Table( + "log_table", + MetaData(), + Column("action", String), + Column("table_name", String), + ) + + @Operations.implementation_for(ops.CreateTableOp, replace=True) + def create_table_proxy_log(operations, operation): + _create_table(operations, operation) + operations.execute( + log_table.insert().values(["create", operation.table_name]) + ) + + op.create_table("some_table", Column("colname", Integer)) + + @Operations.implementation_for(ops.CreateTableOp, replace=True) + def create_table_proxy_invert(operations, operation): + _drop_table(operations, ops.DropTableOp(operation.table_name)) + operations.execute( + log_table.insert().values(["delete", operation.table_name]) + ) + + op.create_table("some_table") + + context.assert_( + "CREATE TABLE some_table (colname INTEGER)", + "INSERT INTO log_table (action, table_name) " + "VALUES (:action, :table_name)", + "DROP TABLE some_table", + "INSERT INTO log_table (action, table_name) " + "VALUES (:action, :table_name)", + ) + + def test_replace_error(self): + with expect_raises_message( + ValueError, + "Can not set dispatch function for object " + ": " + "key already exists. To replace existing function, use " + "replace=True.", + ): + + @Operations.implementation_for(ops.CreateTableOp) + def create_table(operations, operation): + pass + + def test_replace_custom_op(self, restore_operations): + restore_operations(Operations) + context = op_fixture() + + @Operations.register_operation("create_user") + class CreateUserOp(MigrateOperation): + def __init__(self, user_name): + self.user_name = user_name + + @classmethod + def create_user(cls, operations, user_name): + op = CreateUserOp(user_name) + return operations.invoke(op) + + @Operations.implementation_for(CreateUserOp, replace=True) + def create_user(operations, operation): + operations.execute("CREATE USER %s" % operation.user_name) + + op.create_user("bob") + + @Operations.implementation_for(CreateUserOp, replace=True) + def create_user_alternative(operations, operation): + operations.execute( + "CREATE ROLE %s WITH LOGIN" % operation.user_name + ) + + op.create_user("bob") + + context.assert_("CREATE USER bob", "CREATE ROLE bob WITH LOGIN") + class ObjectFromToTest(TestBase): """Test operation round trips for to_obj() / from_obj().