return register
@classmethod
- def implementation_for(cls, op_cls: Any) -> Callable[[_C], _C]:
+ def implementation_for(
+ cls, op_cls: Any, replace: bool = False
+ ) -> Callable[[_C], _C]:
"""Register an implementation for a given :class:`.MigrateOperation`.
+ :param replace: when True, allows replacement of an already
+ registered implementation for the given operation class. This
+ enables customization of built-in operations such as
+ :class:`.CreateTableOp` by providing an alternate implementation
+ that can augment, modify, or conditionally invoke the default
+ behavior.
+
+ .. versionadded:: 1.17.2
+
This is part of the operation extensibility API.
.. seealso::
- :ref:`operation_plugins` - example of use
+ :ref:`operation_plugins`
+
+ :ref:`operations_extending_builtin`
"""
def decorate(fn: _C) -> _C:
- cls._to_impl.dispatch_for(op_cls)(fn)
+ cls._to_impl.dispatch_for(op_cls, replace=replace)(fn)
return fn
return decorate
_connection_fixture_connection = None
+ @testing.fixture
+ def restore_operations(self):
+ """Restore runners for modified operations"""
+
+ saved_impls = None
+ op_cls = None
+
+ def _save_attrs(_op_cls):
+ nonlocal saved_impls, op_cls
+ saved_impls = _op_cls._to_impl._registry.copy()
+ op_cls = _op_cls
+
+ yield _save_attrs
+
+ if op_cls is not None and saved_impls is not None:
+ op_cls._to_impl._registry = saved_impls
+
@config.fixture()
def metadata(self, request):
"""Provide bound MetaData for a single test, dropping afterwards."""
self.uselist = uselist
def dispatch_for(
- self, target: Any, qualifier: str = "default"
+ self, target: Any, qualifier: str = "default", replace: bool = False
) -> Callable[[_C], _C]:
def decorate(fn: _C) -> _C:
if self.uselist:
self._registry.setdefault((target, qualifier), []).append(fn)
else:
- assert (target, qualifier) not in self._registry
+ if (target, qualifier) in self._registry and not replace:
+ raise ValueError(
+ "Can not set dispatch function for object "
+ f"{target!r}: key already exists. To replace "
+ "existing function, use replace=True."
+ )
self._registry[(target, qualifier)] = fn
return fn
.. automodule:: alembic.operations.ops
:members:
+
+.. _operations_extending_builtin:
+
+Extending Existing Operations
+==============================
+
+.. versionadded:: 1.17.2
+
+The :paramref:`.Operations.implementation_for.replace` parameter allows
+replacement of existing operation implementations, including built-in
+operations such as :class:`.CreateTableOp`. This enables customization of
+migration execution for purposes such as logging operations, running
+integrity checks, conditionally canceling operations, or adapting
+operations with dialect-specific options.
+
+The example below illustrates replacing the implementation of
+:class:`.CreateTableOp` to log each table creation to a separate metadata
+table::
+
+ from alembic import op
+ from alembic.operations import Operations
+ from alembic.operations.ops import CreateTableOp
+ from alembic.operations.toimpl import create_table as _create_table
+ from sqlalchemy import MetaData, Table, Column, String
+
+ # Define a metadata table to track table operations
+ log_table = Table(
+ "table_metadata_log",
+ MetaData(),
+ Column("operation", String),
+ Column("table_name", String),
+ )
+
+ @Operations.implementation_for(CreateTableOp, replace=True)
+ def create_table_with_logging(operations, operation):
+ # First, run the original CREATE TABLE implementation
+ _create_table(operations, operation)
+
+ # Then, log the operation to the metadata table
+ operations.execute(
+ log_table.insert().values(
+ operation="create",
+ table_name=operation.table_name
+ )
+ )
+
+The above code can be placed in the ``env.py`` file to ensure it is loaded
+before migrations run. Once registered, all ``op.create_table()`` calls
+within migration scripts will use the augmented implementation.
+
+The original implementation is imported from :mod:`alembic.operations.toimpl`
+and invoked within the replacement implementation. The ``replace`` parameter
+also enables conditional execution or complete replacement of operation
+behavior. The example below demonstrates skipping a :class:`.CreateTableOp`
+based on custom logic::
+
+ from alembic.operations import Operations
+ from alembic.operations.ops import CreateTableOp
+ from alembic.operations.toimpl import create_table as _create_table
+
+ @Operations.implementation_for(CreateTableOp, replace=True)
+ def create_table_conditional(operations, operation):
+ # Check if the table should be created based on custom logic
+ if should_create_table(operation.table_name):
+ _create_table(operations, operation)
+ else:
+ # Skip creation and optionally log
+ operations.execute(
+ "-- Skipped creation of table %s" % operation.table_name
+ )
+
+ def should_create_table(table_name):
+ # Custom logic to determine if table should be created
+ # For example, check a configuration or metadata table
+ return table_name not in get_ignored_tables()
+
--- /dev/null
+.. change::
+ :tags: feature, operations
+ :tickets: 1750
+
+ Added :paramref:`.Operations.implementation_for.replace` parameter to
+ :meth:`.Operations.implementation_for`, allowing replacement of existing
+ operation implementations. This allows for existing operations such as
+ :class:`.CreateTableOp` to be extended directly. Pull request courtesy
+ justanothercatgirl.
+
+ .. seealso::
+
+ :ref:`operations_extending_builtin`
from alembic.operations import Operations
from alembic.operations import ops
from alembic.operations import schemaobj
+from alembic.operations.toimpl import create_table as _create_table
+from alembic.operations.toimpl import drop_table as _drop_table
from alembic.testing import assert_raises_message
from alembic.testing import combinations
from alembic.testing import config
class CustomOpTest(TestBase):
def test_custom_op(self):
+
@Operations.register_operation("create_sequence")
class CreateSequenceOp(MigrateOperation):
"""Create a SEQUENCE."""
op.create_sequence("foob")
context.assert_("CREATE SEQUENCE foob")
+ def test_replace_op(self, restore_operations):
+ restore_operations(Operations)
+ context = op_fixture()
+
+ log_table = Table(
+ "log_table",
+ MetaData(),
+ Column("action", String),
+ Column("table_name", String),
+ )
+
+ @Operations.implementation_for(ops.CreateTableOp, replace=True)
+ def create_table_proxy_log(operations, operation):
+ _create_table(operations, operation)
+ operations.execute(
+ log_table.insert().values(["create", operation.table_name])
+ )
+
+ op.create_table("some_table", Column("colname", Integer))
+
+ @Operations.implementation_for(ops.CreateTableOp, replace=True)
+ def create_table_proxy_invert(operations, operation):
+ _drop_table(operations, ops.DropTableOp(operation.table_name))
+ operations.execute(
+ log_table.insert().values(["delete", operation.table_name])
+ )
+
+ op.create_table("some_table")
+
+ context.assert_(
+ "CREATE TABLE some_table (colname INTEGER)",
+ "INSERT INTO log_table (action, table_name) "
+ "VALUES (:action, :table_name)",
+ "DROP TABLE some_table",
+ "INSERT INTO log_table (action, table_name) "
+ "VALUES (:action, :table_name)",
+ )
+
+ def test_replace_error(self):
+ with expect_raises_message(
+ ValueError,
+ "Can not set dispatch function for object "
+ "<class 'alembic.operations.ops.CreateTableOp'>: "
+ "key already exists. To replace existing function, use "
+ "replace=True.",
+ ):
+
+ @Operations.implementation_for(ops.CreateTableOp)
+ def create_table(operations, operation):
+ pass
+
+ def test_replace_custom_op(self, restore_operations):
+ restore_operations(Operations)
+ context = op_fixture()
+
+ @Operations.register_operation("create_user")
+ class CreateUserOp(MigrateOperation):
+ def __init__(self, user_name):
+ self.user_name = user_name
+
+ @classmethod
+ def create_user(cls, operations, user_name):
+ op = CreateUserOp(user_name)
+ return operations.invoke(op)
+
+ @Operations.implementation_for(CreateUserOp, replace=True)
+ def create_user(operations, operation):
+ operations.execute("CREATE USER %s" % operation.user_name)
+
+ op.create_user("bob")
+
+ @Operations.implementation_for(CreateUserOp, replace=True)
+ def create_user_alternative(operations, operation):
+ operations.execute(
+ "CREATE ROLE %s WITH LOGIN" % operation.user_name
+ )
+
+ op.create_user("bob")
+
+ context.assert_("CREATE USER bob", "CREATE ROLE bob WITH LOGIN")
+
class ObjectFromToTest(TestBase):
"""Test operation round trips for to_obj() / from_obj().