log = logging.getLogger(__name__)
+class _ProxyTransaction(object):
+ def __init__(self, migration_context):
+ self.migration_context = migration_context
+
+ @property
+ def _proxied_transaction(self):
+ return self.migration_context._transaction
+
+ def rollback(self):
+ self._proxied_transaction.rollback()
+
+ def commit(self):
+ self._proxied_transaction.commit()
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, type_, value, traceback):
+ self._proxied_transaction.__exit__(type_, value, traceback)
+
+
class MigrationContext(object):
"""Represent the database state made available to a migration
"transaction_per_migration", False
)
self.on_version_apply_callbacks = opts.get("on_version_apply", ())
+ self._transaction = None
if as_sql:
self.connection = self._stdout_connection(connection)
return MigrationContext(dialect, connection, opts, environment_context)
+ @contextmanager
+ def autocommit_block(self):
+ """Enter an "autocommit" block, for databases that support AUTOCOMMIT
+ isolation levels.
+
+ This special directive is intended to support the occasional database
+ DDL or system operation that specifically has to be run outside of
+ any kind of transaction block. The PostgreSQL database platform
+ is the most common target for this style of operation, as many
+ of its DDL operations must be run outside of transaction blocks, even
+ though the database overall supports transactional DDL.
+
+ The method is used as a context manager within a migration script, by
+ calling on :meth:`.Operations.get_context` to retrieve the
+ :class:`.MigrationContext`, then invoking
+ :meth:`.MigrationContext.autocommit_block` using the ``with:``
+ statement::
+
+ def upgrade():
+ with op.get_context().autocommit_block():
+ op.execute("ALTER TYPE mood ADD VALUE 'soso'")
+
+ Above, a PostgreSQL "ALTER TYPE..ADD VALUE" directive is emitted,
+ which must be run outside of a transaction block at the database level.
+ The :meth:`.MigrationContext.autocommit_block` method makes use of the
+ SQLAlchemy ``AUTOCOMMIT`` isolation level setting, which against the
+ psycogp2 DBAPI corresponds to the ``connection.autocommit`` setting,
+ to ensure that the database driver is not inside of a DBAPI level
+ transaction block.
+
+ .. warning::
+
+ As is necessary, **the database transaction preceding the block is
+ unconditionally committed**. This means that the run of migrations
+ preceding the operation will be committed, before the overall
+ migration operation is complete.
+
+ It is recommended that when an application includes migrations with
+ "autocommit" blocks, that
+ :paramref:`.EnvironmentContext.transaction_per_migration` be used
+ so that the calling environment is tuned to expect short per-file
+ migrations whether or not one of them has an autocommit block.
+
+
+ .. versionadded:: 1.1.1
+
+ """
+ _in_connection_transaction = self._in_connection_transaction()
+
+ if self.impl.transactional_ddl:
+ if self.as_sql:
+ self.impl.emit_commit()
+
+ elif _in_connection_transaction:
+ assert self._transaction is not None
+
+ self._transaction.commit()
+ self._transaction = None
+
+ if not self.as_sql:
+ current_level = self.connection.get_isolation_level()
+ self.connection.execution_options(isolation_level="AUTOCOMMIT")
+ try:
+ yield
+ finally:
+ if not self.as_sql:
+ self.connection.execution_options(
+ isolation_level=current_level
+ )
+
+ if self.impl.transactional_ddl:
+ if self.as_sql:
+ self.impl.emit_begin()
+
+ elif _in_connection_transaction:
+ self._transaction = self.bind.begin()
+
def begin_transaction(self, _per_migration=False):
+ """Begin a logical transaction for migration operations.
+
+ This method is used within an ``env.py`` script to demarcate where
+ the outer "transaction" for a series of migrations begins. Example::
+
+ def run_migrations_online():
+ connectable = create_engine(...)
+
+ with connectable.connect() as connection:
+ context.configure(
+ connection=connection, target_metadata=target_metadata
+ )
+
+ with context.begin_transaction():
+ context.run_migrations()
+
+ Above, :meth:`.MigrationContext.begin_transaction` is used to demarcate
+ where the outer logical transaction occurs around the
+ :meth:`.MigrationContext.run_migrations` operation.
+
+ A "Logical" transaction means that the operation may or may not
+ correspond to a real database transaction. If the target database
+ supports transactional DDL (or
+ :paramref:`.EnvironmentContext.configure.transactional_ddl` is true),
+ the :paramref:`.EnvironmentContext.configure.transaction_per_migration`
+ flag is not set, and the migration is against a real database
+ connection (as opposed to using "offline" ``--sql`` mode), a real
+ transaction will be started. If ``--sql`` mode is in effect, the
+ operation would instead correspond to a string such as "BEGIN" being
+ emitted to the string output.
+
+ The returned object is a Python context manager that should only be
+ used in the context of a ``with:`` statement as indicated above.
+ The object has no other guaranteed API features present.
+
+ .. seealso::
+
+ :meth:`.MigrationContext.autocommit_block`
+
+ """
transaction_now = _per_migration == self._transaction_per_migration
if not transaction_now:
return begin_commit()
else:
- return self.bind.begin()
+ self._transaction = self.bind.begin()
+ return _ProxyTransaction(self)
def get_current_revision(self):
"""Return the current revision, usually that which is present
from alembic.environment import EnvironmentContext
from alembic.migration import MigrationContext
from alembic.script import ScriptDirectory
+from alembic.testing import assert_raises
from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import is_
+from alembic.testing import is_false
+from alembic.testing import is_true
from alembic.testing import mock
from alembic.testing.assertions import expect_warnings
from alembic.testing.env import _no_sql_testing_config
from alembic.testing.env import write_script
from alembic.testing.fixtures import capture_context_buffer
from alembic.testing.fixtures import TestBase
+from alembic.util import compat
class EnvironmentTest(TestBase):
env.run_migrations()
eq_(migration_fn.mock_calls, [mock.call((), env._migration_context)])
+
+
+class MigrationTransactionTest(TestBase):
+ __backend__ = True
+
+ conn = None
+
+ def _fixture(self, opts):
+ self.conn = conn = config.db.connect()
+
+ if opts.get("as_sql", False):
+ self.context = MigrationContext.configure(
+ dialect=conn.dialect, opts=opts
+ )
+ self.context.output_buffer = (
+ self.context.impl.output_buffer
+ ) = compat.StringIO()
+ else:
+ self.context = MigrationContext.configure(
+ connection=conn, opts=opts
+ )
+ return self.context
+
+ def teardown(self):
+ if self.conn:
+ self.conn.close()
+
+ def test_proxy_transaction_rollback(self):
+ context = self._fixture(
+ {"transaction_per_migration": True, "transactional_ddl": True}
+ )
+
+ is_false(self.conn.in_transaction())
+ proxy = context.begin_transaction(_per_migration=True)
+ is_true(self.conn.in_transaction())
+ proxy.rollback()
+ is_false(self.conn.in_transaction())
+
+ def test_proxy_transaction_commit(self):
+ context = self._fixture(
+ {"transaction_per_migration": True, "transactional_ddl": True}
+ )
+ proxy = context.begin_transaction(_per_migration=True)
+ is_true(self.conn.in_transaction())
+ proxy.commit()
+ is_false(self.conn.in_transaction())
+
+ def test_proxy_transaction_contextmanager_commit(self):
+ context = self._fixture(
+ {"transaction_per_migration": True, "transactional_ddl": True}
+ )
+ proxy = context.begin_transaction(_per_migration=True)
+ is_true(self.conn.in_transaction())
+ with proxy:
+ pass
+ is_false(self.conn.in_transaction())
+
+ def test_proxy_transaction_contextmanager_rollback(self):
+ context = self._fixture(
+ {"transaction_per_migration": True, "transactional_ddl": True}
+ )
+ proxy = context.begin_transaction(_per_migration=True)
+ is_true(self.conn.in_transaction())
+
+ def go():
+ with proxy:
+ raise Exception("hi")
+
+ assert_raises(Exception, go)
+ is_false(self.conn.in_transaction())
+
+ def test_transaction_per_migration_transactional_ddl(self):
+ context = self._fixture(
+ {"transaction_per_migration": True, "transactional_ddl": True}
+ )
+
+ is_false(self.conn.in_transaction())
+
+ with context.begin_transaction():
+ is_false(self.conn.in_transaction())
+ with context.begin_transaction(_per_migration=True):
+ is_true(self.conn.in_transaction())
+
+ is_false(self.conn.in_transaction())
+ is_false(self.conn.in_transaction())
+
+ def test_transaction_per_migration_non_transactional_ddl(self):
+ context = self._fixture(
+ {"transaction_per_migration": True, "transactional_ddl": False}
+ )
+
+ is_false(self.conn.in_transaction())
+
+ with context.begin_transaction():
+ is_false(self.conn.in_transaction())
+ with context.begin_transaction(_per_migration=True):
+ is_false(self.conn.in_transaction())
+
+ is_false(self.conn.in_transaction())
+ is_false(self.conn.in_transaction())
+
+ def test_transaction_per_all_transactional_ddl(self):
+ context = self._fixture({"transactional_ddl": True})
+
+ is_false(self.conn.in_transaction())
+
+ with context.begin_transaction():
+ is_true(self.conn.in_transaction())
+ with context.begin_transaction(_per_migration=True):
+ is_true(self.conn.in_transaction())
+
+ is_true(self.conn.in_transaction())
+ is_false(self.conn.in_transaction())
+
+ def test_transaction_per_all_non_transactional_ddl(self):
+ context = self._fixture({"transactional_ddl": False})
+
+ is_false(self.conn.in_transaction())
+
+ with context.begin_transaction():
+ is_false(self.conn.in_transaction())
+ with context.begin_transaction(_per_migration=True):
+ is_false(self.conn.in_transaction())
+
+ is_false(self.conn.in_transaction())
+ is_false(self.conn.in_transaction())
+
+ def test_transaction_per_all_sqlmode(self):
+ context = self._fixture({"as_sql": True})
+
+ context.execute("step 1")
+ with context.begin_transaction():
+ context.execute("step 2")
+ with context.begin_transaction(_per_migration=True):
+ context.execute("step 3")
+
+ context.execute("step 4")
+ context.execute("step 5")
+
+ if context.impl.transactional_ddl:
+ self._assert_impl_steps(
+ "step 1",
+ "BEGIN",
+ "step 2",
+ "step 3",
+ "step 4",
+ "COMMIT",
+ "step 5",
+ )
+ else:
+ self._assert_impl_steps(
+ "step 1", "step 2", "step 3", "step 4", "step 5"
+ )
+
+ def test_transaction_per_migration_sqlmode(self):
+ context = self._fixture(
+ {"as_sql": True, "transaction_per_migration": True}
+ )
+
+ context.execute("step 1")
+ with context.begin_transaction():
+ context.execute("step 2")
+ with context.begin_transaction(_per_migration=True):
+ context.execute("step 3")
+
+ context.execute("step 4")
+ context.execute("step 5")
+
+ if context.impl.transactional_ddl:
+ self._assert_impl_steps(
+ "step 1",
+ "step 2",
+ "BEGIN",
+ "step 3",
+ "COMMIT",
+ "step 4",
+ "step 5",
+ )
+ else:
+ self._assert_impl_steps(
+ "step 1", "step 2", "step 3", "step 4", "step 5"
+ )
+
+ @config.requirements.autocommit_isolation
+ def test_autocommit_block(self):
+ context = self._fixture({"transaction_per_migration": True})
+
+ is_false(self.conn.in_transaction())
+
+ with context.begin_transaction():
+ is_false(self.conn.in_transaction())
+ with context.begin_transaction(_per_migration=True):
+ if context.impl.transactional_ddl:
+ is_true(self.conn.in_transaction())
+ else:
+ is_false(self.conn.in_transaction())
+
+ with context.autocommit_block():
+ is_false(self.conn.in_transaction())
+
+ if context.impl.transactional_ddl:
+ is_true(self.conn.in_transaction())
+ else:
+ is_false(self.conn.in_transaction())
+
+ is_false(self.conn.in_transaction())
+ is_false(self.conn.in_transaction())
+
+ @config.requirements.autocommit_isolation
+ def test_autocommit_block_no_transaction(self):
+ context = self._fixture({"transaction_per_migration": True})
+
+ is_false(self.conn.in_transaction())
+
+ with context.autocommit_block():
+ is_false(self.conn.in_transaction())
+ is_false(self.conn.in_transaction())
+
+ def test_autocommit_block_transactional_ddl_sqlmode(self):
+ context = self._fixture(
+ {
+ "transaction_per_migration": True,
+ "transactional_ddl": True,
+ "as_sql": True,
+ }
+ )
+
+ with context.begin_transaction():
+ context.execute("step 1")
+ with context.begin_transaction(_per_migration=True):
+ context.execute("step 2")
+
+ with context.autocommit_block():
+ context.execute("step 3")
+
+ context.execute("step 4")
+
+ context.execute("step 5")
+
+ self._assert_impl_steps(
+ "step 1",
+ "BEGIN",
+ "step 2",
+ "COMMIT",
+ "step 3",
+ "BEGIN",
+ "step 4",
+ "COMMIT",
+ "step 5",
+ )
+
+ def test_autocommit_block_nontransactional_ddl_sqlmode(self):
+ context = self._fixture(
+ {
+ "transaction_per_migration": True,
+ "transactional_ddl": False,
+ "as_sql": True,
+ }
+ )
+
+ with context.begin_transaction():
+ context.execute("step 1")
+ with context.begin_transaction(_per_migration=True):
+ context.execute("step 2")
+
+ with context.autocommit_block():
+ context.execute("step 3")
+
+ context.execute("step 4")
+
+ context.execute("step 5")
+
+ self._assert_impl_steps(
+ "step 1", "step 2", "step 3", "step 4", "step 5"
+ )
+
+ def _assert_impl_steps(self, *steps):
+ to_check = self.context.output_buffer.getvalue()
+
+ self.context.impl.output_buffer = buf = compat.StringIO()
+ for step in steps:
+ if step == "BEGIN":
+ self.context.impl.emit_begin()
+ elif step == "COMMIT":
+ self.context.impl.emit_commit()
+ else:
+ self.context.impl._exec(step)
+
+ eq_(to_check, buf.getvalue())