from __future__ import annotations
+import logging
import os
from typing import Callable
from typing import List
from alembic.config import Config
from alembic.script.base import Script
+log = logging.getLogger(__name__)
def list_templates(config):
"""List available templates.
return scripts
+def check(
+ config: "Config",
+) -> Union[Optional["Script"], List[Optional["Script"]]]:
+ """Checks if the revision command with autogenerate has pending upgrade ops to run.
+
+ :param config: a :class:`.Config` object.
+
+ """
+
+ script_directory = ScriptDirectory.from_config(config)
+
+ command_args = dict(
+ message=None,
+ autogenerate=True,
+ sql=False,
+ head="head",
+ splice=False,
+ branch_label=None,
+ version_path=None,
+ rev_id=None,
+ depends_on=None,
+ )
+ revision_context = autogen.RevisionContext(
+ config,
+ script_directory,
+ command_args,
+ )
+
+ def retrieve_migrations(rev, context):
+ revision_context.run_autogenerate(rev, context)
+ return []
+
+ with EnvironmentContext(
+ config,
+ script_directory,
+ fn=retrieve_migrations,
+ as_sql=False,
+ template_args=revision_context.template_args,
+ revision_context=revision_context,
+ ):
+ script_directory.run_env()
+
+ # the revision_context now has MigrationScript structure(s) present.
+
+ migration_script = revision_context.generated_revisions[-1]
+ diffs = migration_script.upgrade_ops.as_diffs()
+ if diffs:
+ raise util.RevisionOpsNotEmptyError(f"Revision has upgrade ops to run: {diffs}.")
+ else:
+ log.info("Revision has no upgrade ops to run.")
+
+
def merge(
config: "Config",
revisions: str,
from typing import cast
from sqlalchemy import exc as sqla_exc
-from sqlalchemy import text
+from sqlalchemy import VARCHAR, text
from sqlalchemy.engine import Engine
+from sqlalchemy.sql.schema import Column
from alembic import __version__
from alembic import command
command.revision(self.cfg, sql=True)
+class CheckTest(TestBase):
+ def setUp(self):
+ self.env = staging_env()
+ self.cfg = _sqlite_testing_config()
+
+ def tearDown(self):
+ clear_staging_env()
+
+ def _env_fixture(self, version_table_pk=True):
+ env_file_fixture(
+ """
+
+from sqlalchemy import MetaData, engine_from_config
+target_metadata = MetaData()
+
+engine = engine_from_config(
+ config.get_section(config.config_ini_section),
+ prefix='sqlalchemy.')
+
+connection = engine.connect()
+
+context.configure(
+ connection=connection, target_metadata=target_metadata,
+ version_table_pk=%r
+)
+
+try:
+ with context.begin_transaction():
+ context.run_migrations()
+finally:
+ connection.close()
+ engine.dispose()
+
+"""
+ % (version_table_pk,)
+ )
+
+ def test_check_no_changes(self):
+ self._env_fixture()
+ command.check(self.cfg) # no problem
+
+ def test_check_changes_detected(self):
+ self._env_fixture()
+ with mock.patch(
+ "alembic.operations.ops.UpgradeOps.as_diffs",
+ return_value=[('remove_column', None, 'foo', Column('old_data', VARCHAR()))]
+ ):
+ assert_raises_message(
+ util.RevisionOpsNotEmptyError,
+ "Revision has upgrade ops to run:",
+ command.check,
+ self.cfg,
+ )
+
+
class _StampTest:
def _assert_sql(self, emitted_sql, origin, destinations):
ins_expr = (