class PatchEnvironment(object):
+ branched_connection = False
+
@contextmanager
def _patch_environment(self, transactional_ddl, transaction_per_migration):
conf = EnvironmentContext.configure
# mode
assert not conn[0].in_transaction()
+ @staticmethod
+ def _branched_connection_env():
+ if config.requirements.sqlalchemy_14.enabled:
+ connect_warning = (
+ 'r"The Connection.connect\\(\\) method is considered legacy"'
+ )
+ close_warning = (
+ 'r"The .close\\(\\) method on a '
+ "so-called 'branched' connection\""
+ )
+ else:
+ connect_warning = close_warning = ""
+
+ env_file_fixture(
+ textwrap.dedent(
+ """\
+ import alembic
+ from alembic import context
+ from sqlalchemy import engine_from_config, pool
+ from sqlalchemy.testing import expect_warnings
+
+ config = context.config
+
+ target_metadata = None
+
+ def run_migrations_online():
+ connectable = engine_from_config(
+ config.get_section(config.config_ini_section),
+ prefix='sqlalchemy.',
+ poolclass=pool.NullPool)
+
+ with connectable.connect() as conn:
+
+ with expect_warnings(%(connect_warning)s):
+ connection = conn.connect()
+ try:
+ context.configure(
+ connection=connection,
+ target_metadata=target_metadata,
+ )
+ with context.begin_transaction():
+ context.run_migrations()
+ finally:
+ with expect_warnings(%(close_warning)s):
+ connection.close()
+
+ if context.is_offline_mode():
+ assert False
+ else:
+ run_migrations_online()
+ """
+ % {
+ "connect_warning": connect_warning,
+ "close_warning": close_warning,
+ }
+ )
+ )
+
@testing.combinations(
- (
- False,
- True,
- ),
- (
- True,
- False,
- ),
- (
- True,
- True,
- ),
- argnames="transactional_ddl,transaction_per_migration",
- id_="rr",
+ (False, True, False),
+ (True, False, False),
+ (True, True, False),
+ (False, True, True),
+ (True, False, True),
+ (True, True, True),
+ argnames="transactional_ddl,transaction_per_migration,branched_connection",
+ id_="rrr",
)
class ApplyVersionsFunctionalTest(PatchEnvironment, TestBase):
__only_on__ = "sqlite"
future = False
transactional_ddl = False
transaction_per_migration = True
+ branched_connection = False
def setUp(self):
self.bind = _sqlite_file_db(future=self.future)
self.cfg = _sqlite_testing_config(
sourceless=self.sourceless, future=self.future
)
+ if self.branched_connection:
+ self._branched_connection_env()
def tearDown(self):
clear_staging_env()
# class level combinations can't do the skips for SQLAlchemy 1.3
# so we have a separate class
@testing.combinations(
- (
- False,
- True,
- ),
- (
- True,
- False,
- ),
- (
- True,
- True,
- ),
+ (False, True),
+ (True, False),
+ (True, True),
argnames="transactional_ddl,transaction_per_migration",
id_="rr",
)
else:
self.cfg = _sqlite_testing_config()
+ if self.branched_connection:
+ self._branched_connection_env()
+
script = ScriptDirectory.from_config(self.cfg)
a = util.rev_id()
b = util.rev_id()
c = util.rev_id()
+
script.generate_revision(a, "revision a", refresh=True)
write_script(
script,
# these tests might not be supported anymore; the connection is always
# going to be in a transaction now even on 1.3.
- @testing.combinations((False,), (True, config.requirements.sqlalchemy_14))
- def test_raise_when_rev_leaves_open_transaction(self, future):
- a, b, c = self._opened_transaction_fixture(future)
+ def test_raise_when_rev_leaves_open_transaction(self):
+ a, b, c = self._opened_transaction_fixture()
with self._patch_environment(
transactional_ddl=False, transaction_per_migration=False
):
- if future:
- with testing.expect_raises_message(
- sa.exc.InvalidRequestError,
- "a transaction is already begun",
- ):
- command.upgrade(self.cfg, c)
- elif config.requirements.sqlalchemy_14.enabled:
+ if config.requirements.sqlalchemy_14.enabled:
if self.is_sqlalchemy_future:
with testing.expect_raises_message(
sa.exc.InvalidRequestError,
else:
command.upgrade(self.cfg, c)
- @testing.combinations((False,), (True, config.requirements.sqlalchemy_14))
- def test_raise_when_rev_leaves_open_transaction_tpm(self, future):
- a, b, c = self._opened_transaction_fixture(future)
+ def test_raise_when_rev_leaves_open_transaction_tpm(self):
+ a, b, c = self._opened_transaction_fixture()
with self._patch_environment(
transactional_ddl=False, transaction_per_migration=True
):
- if future:
- with testing.expect_raises_message(
- sa.exc.InvalidRequestError,
- "a transaction is already begun",
- ):
- command.upgrade(self.cfg, c)
- elif config.requirements.sqlalchemy_14.enabled:
+ if config.requirements.sqlalchemy_14.enabled:
if self.is_sqlalchemy_future:
with testing.expect_raises_message(
sa.exc.InvalidRequestError,
else:
command.upgrade(self.cfg, c)
- @testing.combinations((False,), (True, config.requirements.sqlalchemy_14))
- def test_noerr_rev_leaves_open_transaction_transactional_ddl(self, future):
+ def test_noerr_rev_leaves_open_transaction_transactional_ddl(self):
a, b, c = self._opened_transaction_fixture()
with self._patch_environment(
command.stamp(self.cfg, c)
+class BranchedOnlineTransactionalDDLTest(OnlineTransactionalDDLTest):
+ branched_connection = True
+
+
class FutureOnlineTransactionalDDLTest(
FutureEngineMixin, OnlineTransactionalDDLTest
):