downgrade_ops=upgrade_ops.reverse(),
)
- render._render_migration_script(
+ render._render_python_into_templatevars(
autogen_context, migration_script, template_args
)
migration_context = None
"""The :class:`.MigrationContext` established by the ``env.py`` script."""
- def __init__(self, migration_context, metadata=None, opts=None):
+ def __init__(
+ self, migration_context, metadata=None,
+ opts=None, autogenerate=True):
- if migration_context is not None and migration_context.as_sql:
+ if autogenerate and \
+ migration_context is not None and migration_context.as_sql:
raise util.CommandError(
"autogenerate can't use as_sql=True as it prevents querying "
"the database for schema information")
if opts is None:
opts = migration_context.opts
+
self.metadata = metadata = opts.get('target_metadata', None) \
if metadata is None else metadata
for k, v in self.template_args.items():
template_args.setdefault(k, v)
- if migration_script._autogen_context is not None:
- render._render_migration_script(
- migration_script._autogen_context, migration_script,
- template_args
+ if getattr(migration_script, '_needs_render', False):
+ autogen_context = self._last_autogen_context
+
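+            # start a fresh import collection for this script; imports already
+            # present on the MigrationScript are merged in, and the render
+            # step below may add more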
+ autogen_context._imports = set()
+ if migration_script.imports:
+                autogen_context._imports.update(migration_script.imports)
+ render._render_python_into_templatevars(
+ autogen_context, migration_script, template_args
)
return self.script_directory.generate_revision(
depends_on=migration_script.depends_on,
**template_args)
- def run_autogenerate(self, rev, context):
- if self.command_args['sql']:
- raise util.CommandError(
- "Using --sql with --autogenerate does not make any sense")
- if set(self.script_directory.get_revisions(rev)) != \
- set(self.script_directory.get_revisions("heads")):
- raise util.CommandError("Target database is not up to date.")
-
- autogen_context = AutogenContext(context)
-
- migration_script = self.generated_revisions[0]
-
- compare._populate_migration_script(autogen_context, migration_script)
+ def run_autogenerate(self, rev, migration_context):
+ self._run_environment(rev, migration_context, True)
+
+ def run_no_autogenerate(self, rev, migration_context):
+ self._run_environment(rev, migration_context, False)
+
+ def _run_environment(self, rev, migration_context, autogenerate):
+ if autogenerate:
+ if self.command_args['sql']:
+ raise util.CommandError(
+ "Using --sql with --autogenerate does not make any sense")
+ if set(self.script_directory.get_revisions(rev)) != \
+ set(self.script_directory.get_revisions("heads")):
+ raise util.CommandError("Target database is not up to date.")
+
+ upgrade_token = migration_context.opts['upgrade_token']
+ downgrade_token = migration_context.opts['downgrade_token']
+
+ migration_script = self.generated_revisions[-1]
+ if not getattr(migration_script, '_needs_render', False):
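+            # first pass for this revision: adopt the configured tokens onto
+            # the op containers created by _default_revision()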
+ migration_script.upgrade_ops_list[-1].upgrade_token = upgrade_token
+ migration_script.downgrade_ops_list[-1].downgrade_token = \
+ downgrade_token
+ migration_script._needs_render = True
+ else:
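+            # a subsequent run_migrations() pass; append new, empty op
+            # containers keyed to the tokens configured for this pass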
+ migration_script._upgrade_ops.append(
+ ops.UpgradeOps([], upgrade_token=upgrade_token)
+ )
+ migration_script._downgrade_ops.append(
+ ops.DowngradeOps([], downgrade_token=downgrade_token)
+ )
- hook = context.opts.get('process_revision_directives', None)
- if hook:
- hook(context, rev, self.generated_revisions)
+ self._last_autogen_context = autogen_context = \
+ AutogenContext(migration_context, autogenerate=autogenerate)
- for migration_script in self.generated_revisions:
- migration_script._autogen_context = autogen_context
+ if autogenerate:
+ compare._populate_migration_script(
+ autogen_context, migration_script)
- def run_no_autogenerate(self, rev, context):
- hook = context.opts.get('process_revision_directives', None)
+ hook = migration_context.opts['process_revision_directives']
if hook:
- hook(context, rev, self.generated_revisions)
+ hook(migration_context, rev, self.generated_revisions)
for migration_script in self.generated_revisions:
- migration_script._autogen_context = None
+ migration_script._needs_render = True
def _default_revision(self):
op = ops.MigrationScript(
rev_id=self.command_args['rev_id'] or util.rev_id(),
message=self.command_args['message'],
- imports=set(),
upgrade_ops=ops.UpgradeOps([]),
downgrade_ops=ops.DowngradeOps([]),
head=self.command_args['head'],
version_path=self.command_args['version_path'],
depends_on=self.command_args['depends_on']
)
- op._autogen_context = None
return op
def generate_scripts(self):
def _populate_migration_script(autogen_context, migration_script):
- _produce_net_changes(autogen_context, migration_script.upgrade_ops)
- migration_script.upgrade_ops.reverse_into(migration_script.downgrade_ops)
+ upgrade_ops = migration_script.upgrade_ops_list[-1]
+ downgrade_ops = migration_script.downgrade_ops_list[-1]
+
+ _produce_net_changes(autogen_context, upgrade_ops)
+ upgrade_ops.reverse_into(downgrade_ops)
comparators = util.Dispatcher(uselist=True)
return text
-def _render_migration_script(autogen_context, migration_script, template_args):
- opts = autogen_context.opts
+def _render_python_into_templatevars(
+ autogen_context, migration_script, template_args):
imports = autogen_context._imports
- template_args[opts['upgrade_token']] = _indent(_render_cmd_body(
- migration_script.upgrade_ops, autogen_context))
- template_args[opts['downgrade_token']] = _indent(_render_cmd_body(
- migration_script.downgrade_ops, autogen_context))
+
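+    # each UpgradeOps / DowngradeOps pair renders under its own token,
+    # e.g. "engine1_upgrades" / "engine1_downgrades" in the multidb case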
+ for upgrade_ops, downgrade_ops in zip(
+ migration_script.upgrade_ops_list,
+ migration_script.downgrade_ops_list):
+ template_args[upgrade_ops.upgrade_token] = _indent(
+ _render_cmd_body(upgrade_ops, autogen_context))
+ template_args[downgrade_ops.downgrade_token] = _indent(
+ _render_cmd_body(downgrade_ops, autogen_context))
template_args['imports'] = "\n".join(sorted(imports))
@_traverse.dispatch_for(ops.MigrationScript)
def _traverse_script(self, context, revision, directive):
- ret = self._traverse_for(context, revision, directive.upgrade_ops)
- if len(ret) != 1:
- raise ValueError(
- "Can only return single object for UpgradeOps traverse")
- directive.upgrade_ops = ret[0]
- ret = self._traverse_for(context, revision, directive.downgrade_ops)
- if len(ret) != 1:
- raise ValueError(
- "Can only return single object for DowngradeOps traverse")
- directive.downgrade_ops = ret[0]
+ upgrade_ops_list = []
+ for upgrade_ops in directive.upgrade_ops_list:
+            ret = self._traverse_for(context, revision, upgrade_ops)
+ if len(ret) != 1:
+ raise ValueError(
+ "Can only return single object for UpgradeOps traverse")
+ upgrade_ops_list.append(ret[0])
+ directive.upgrade_ops = upgrade_ops_list
+
+ downgrade_ops_list = []
+ for downgrade_ops in directive.downgrade_ops_list:
+            ret = self._traverse_for(context, revision, downgrade_ops)
+ if len(ret) != 1:
+ raise ValueError(
+ "Can only return single object for DowngradeOps traverse")
+ downgrade_ops_list.append(ret[0])
+ directive.downgrade_ops = downgrade_ops_list
@_traverse.dispatch_for(ops.OpContainer)
def _traverse_op_container(self, context, revision, directive):
"""
+ def __init__(self, ops=(), upgrade_token="upgrades"):
+ super(UpgradeOps, self).__init__(ops=ops)
+ self.upgrade_token = upgrade_token
+
def reverse_into(self, downgrade_ops):
downgrade_ops.ops[:] = list(reversed(
[op.reverse() for op in self.ops]
"""
+ def __init__(self, ops=(), downgrade_token="downgrades"):
+ super(DowngradeOps, self).__init__(ops=ops)
+ self.downgrade_token = downgrade_token
+
def reverse(self):
return UpgradeOps(
ops=list(reversed(
A normal :class:`.MigrationScript` object would contain a single
:class:`.UpgradeOps` and a single :class:`.DowngradeOps` directive.
+ These are accessible via the ``.upgrade_ops`` and ``.downgrade_ops``
+ attributes.
+
+ In the case of an autogenerate operation that runs multiple times,
+ such as the multiple database example in the "multidb" template,
+ the ``.upgrade_ops`` and ``.downgrade_ops`` attributes are disabled,
+ and instead these objects should be accessed via the ``.upgrade_ops_list``
+ and ``.downgrade_ops_list`` list-based attributes. These latter
+ attributes are always available at the very least as single-element lists.
+
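+    For example, a ``process_revision_directives`` hook that needs to work
+    for both single- and multiple-pass autogenerate runs can iterate the
+    list form unconditionally (an illustrative sketch)::
+
+        def process_revision_directives(context, revision, directives):
+            migration_script = directives[0]
+            all_upgrade_ops = []
+            for upgrade_ops in migration_script.upgrade_ops_list:
+                # one UpgradeOps per run_migrations() call
+                all_upgrade_ops.extend(upgrade_ops.ops)
+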
+ .. versionchanged:: 0.8.1 the ``.upgrade_ops`` and ``.downgrade_ops``
+ attributes should be accessed via the ``.upgrade_ops_list``
+ and ``.downgrade_ops_list`` attributes if multiple autogenerate
+ passes proceed on the same :class:`.MigrationScript` object.
.. seealso::
def __init__(
self, rev_id, upgrade_ops, downgrade_ops,
message=None,
- imports=None, head=None, splice=None,
+ imports=set(), head=None, splice=None,
branch_label=None, version_path=None, depends_on=None):
self.rev_id = rev_id
self.message = message
self.depends_on = depends_on
self.upgrade_ops = upgrade_ops
self.downgrade_ops = downgrade_ops
+
+ @property
+ def upgrade_ops(self):
+ """An instance of :class:`.UpgradeOps`.
+
+ .. seealso::
+
+ :attr:`.MigrationScript.upgrade_ops_list`
+ """
+ if len(self._upgrade_ops) > 1:
+ raise ValueError(
+ "This MigrationScript instance has a multiple-entry "
+ "list for UpgradeOps; please use the "
+ "upgrade_ops_list attribute.")
+ elif not self._upgrade_ops:
+ return None
+ else:
+ return self._upgrade_ops[0]
+
+ @upgrade_ops.setter
+ def upgrade_ops(self, upgrade_ops):
+ self._upgrade_ops = util.to_list(upgrade_ops)
+ for elem in self._upgrade_ops:
+ assert isinstance(elem, UpgradeOps)
+
+ @property
+ def downgrade_ops(self):
+ """An instance of :class:`.DowngradeOps`.
+
+ .. seealso::
+
+ :attr:`.MigrationScript.downgrade_ops_list`
+ """
+ if len(self._downgrade_ops) > 1:
+ raise ValueError(
+ "This MigrationScript instance has a multiple-entry "
+ "list for DowngradeOps; please use the "
+ "downgrade_ops_list attribute.")
+ elif not self._downgrade_ops:
+ return None
+ else:
+ return self._downgrade_ops[0]
+
+ @downgrade_ops.setter
+ def downgrade_ops(self, downgrade_ops):
+ self._downgrade_ops = util.to_list(downgrade_ops)
+ for elem in self._downgrade_ops:
+ assert isinstance(elem, DowngradeOps)
+
+ @property
+ def upgrade_ops_list(self):
+ """A list of :class:`.UpgradeOps` instances.
+
+ This is used in place of the :attr:`.MigrationScript.upgrade_ops`
+ attribute when dealing with a revision operation that does
+ multiple autogenerate passes.
+
+ .. versionadded:: 0.8.1
+
+ """
+ return self._upgrade_ops
+
+ @property
+ def downgrade_ops_list(self):
+ """A list of :class:`.DowngradeOps` instances.
+
+ This is used in place of the :attr:`.MigrationScript.downgrade_ops`
+ attribute when dealing with a revision operation that does
+ multiple autogenerate passes.
+
+ .. versionadded:: 0.8.1
+
+ """
+ return self._downgrade_ops
+
path = os.path.join(_get_staging_directory(), 'scripts')
if os.path.exists(path):
shutil.rmtree(path)
- command.init(cfg, path)
+ command.init(cfg, path, template=template)
if sourceless:
try:
# do an import so that a .pyc/.pyo is generated.
f.write(txt)
-def _sqlite_file_db():
+def _sqlite_file_db(tempname="foo.db"):
dir_ = os.path.join(_get_staging_directory(), 'scripts')
- url = "sqlite:///%s/foo.db" % dir_
+ url = "sqlite:///%s/%s" % (dir_, tempname)
return engines.testing_engine(url=url)
""" % (dir_, url, "true" if sourceless else "false"))
+
+
def _multi_dir_testing_config(sourceless=False):
dir_ = os.path.join(_get_staging_directory(), 'scripts')
url = "sqlite:///%s/foo.db" % dir_
""" % (c, b))
return a, b, c
+
+
+def _multidb_testing_config(engines):
+ """alembic.ini fixture to work exactly with the 'multidb' template"""
+
+ dir_ = os.path.join(_get_staging_directory(), 'scripts')
+
+ databases = ", ".join(
+ engines.keys()
+ )
+ engines = "\n\n".join(
+ "[%s]\n"
+ "sqlalchemy.url = %s" % (key, value.url)
+ for key, value in engines.items()
+ )
+
+ return _write_config_file("""
+[alembic]
+script_location = %s
+sourceless = false
+
+databases = %s
+
+%s
+[loggers]
+keys = root
+
+[handlers]
+keys = console
+
+[logger_root]
+level = WARN
+handlers = console
+qualname =
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatters]
+keys = generic
+
+[formatter_generic]
+format = %%(levelname)-5.5s [%%(name)s] %%(message)s
+datefmt = %%H:%%M:%%S
+ """ % (dir_, databases, engines)
+ )
from ..util.compat import py33
if py33:
- from unittest.mock import MagicMock, Mock, call, patch
+ from unittest.mock import MagicMock, Mock, call, patch, ANY
else:
try:
- from mock import MagicMock, Mock, call, patch # noqa
+ from mock import MagicMock, Mock, call, patch, ANY # noqa
except ImportError:
raise ImportError(
"SQLAlchemy's test suite requires the "
.. autoclass:: alembic.autogenerate.rewriter.Rewriter
:members:
+.. _autogen_customizing_multiengine_revision:
+
+Revision Generation with Multiple Engines / ``run_migrations()`` calls
+----------------------------------------------------------------------
+
+A lesser-used technique which allows autogenerated migrations to run
+against multiple database backends at once, generating changes into
+a single migration script, is illustrated in the
+provided ``multidb`` template. This template features a special ``env.py``
+which iterates through multiple :class:`~sqlalchemy.engine.Engine` instances
+and calls upon :meth:`.MigrationContext.run_migrations` for each::
+
+ for name, rec in engines.items():
+ logger.info("Migrating database %s" % name)
+ context.configure(
+ connection=rec['connection'],
+ upgrade_token="%s_upgrades" % name,
+ downgrade_token="%s_downgrades" % name,
+ target_metadata=target_metadata.get(name)
+ )
+ context.run_migrations(engine_name=name)
+
+Above, :meth:`.MigrationContext.run_migrations` is run multiple times,
+once for each engine. Within the context of autogeneration, each time
+the method is called the :paramref:`~.EnvironmentContext.configure.upgrade_token`
+and :paramref:`~.EnvironmentContext.configure.downgrade_token` parameters
+are changed, so that the collection of template variables gains distinct
+entries for each engine, which are then referred to explicitly
+within ``script.py.mako``.
+
+In terms of the
+:paramref:`.EnvironmentContext.configure.process_revision_directives` hook,
+the behavior here is that the ``process_revision_directives`` hook
+is invoked **multiple times, once for each call to
+context.run_migrations()**. This means that if
+a multi-``run_migrations()`` approach is to be combined with the
+``process_revision_directives`` hook, care must be taken to use the
+hook appropriately.
+
+The first point to note is that when a **second** call to
+``run_migrations()`` occurs, the ``.upgrade_ops`` and ``.downgrade_ops``
+attributes are **converted into Python lists**, and new
+:class:`.UpgradeOps` and :class:`.DowngradeOps` objects are appended
+to these lists.  Each :class:`.UpgradeOps` object maintains an
+``.upgrade_token`` attribute and each :class:`.DowngradeOps` object a
+``.downgrade_token`` attribute; these determine the template token under
+which each object's contents are rendered.
+
+For example, a multi-engine run that has the engine names ``engine1``
+and ``engine2`` will generate tokens of ``engine1_upgrades``,
+``engine1_downgrades``, ``engine2_upgrades`` and ``engine2_downgrades`` as
+it runs. The resulting migration structure would look like this::
+
+ from alembic.operations import ops
+ import sqlalchemy as sa
+
+ migration_script = ops.MigrationScript(
+ 'eced083f5df',
+ [
+ ops.UpgradeOps(
+ ops=[
+ # upgrade operations for "engine1"
+ ],
+ upgrade_token="engine1_upgrades"
+ ),
+ ops.UpgradeOps(
+ ops=[
+ # upgrade operations for "engine2"
+ ],
+ upgrade_token="engine2_upgrades"
+ ),
+ ],
+ [
+ ops.DowngradeOps(
+ ops=[
+ # downgrade operations for "engine1"
+ ],
+ downgrade_token="engine1_downgrades"
+ ),
+ ops.DowngradeOps(
+ ops=[
+ # downgrade operations for "engine2"
+ ],
+ downgrade_token="engine2_downgrades"
+ )
+ ],
+ message='migration message'
+ )
+
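+When rendered through the ``multidb`` template's ``script.py.mako``, each
+token then populates its own section of the revision file, producing one
+pair of upgrade/downgrade functions per engine.  For ``engine1``, the
+rendered output would look roughly like the following (an illustrative
+sketch, not the literal template output)::
+
+    def upgrade_engine1():
+        op.create_table(
+            'e1t1',
+            sa.Column('x', sa.Integer(), nullable=True)
+        )
+
+
+    def downgrade_engine1():
+        op.drop_table('e1t1')
+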
+
+Given the above, the following guidelines should be considered when
+the ``env.py`` script calls upon :meth:`.MigrationContext.run_migrations`
+multiple times when running autogenerate:
+
+* If the ``process_revision_directives`` hook aims to **add elements
+ based on inspection of the current database /
+ connection**, it should do its operation **on each iteration**. This is
+ so that each time the hook runs, the database is available.
+
+* Alternatively, if the ``process_revision_directives`` hook aims to
+  **modify the list of migration directives in place**, it should be
+  called **only on the last iteration**, so that the hook isn't handed an
+  ever-growing structure that it has already modified on previous
+  iterations.  A sketch of this approach follows this list.
+
+* The :class:`.Rewriter` object, if used, should be called **only on the
+ last iteration**, because it will always deliver all directives every time,
+ so again to avoid double/triple/etc. processing of directives it should
+ be called only when the structure is complete.
+
+* The :attr:`.MigrationScript.upgrade_ops_list` and
+ :attr:`.MigrationScript.downgrade_ops_list` attributes should be consulted
+ when referring to the collection of :class:`.UpgradeOps` and
+ :class:`.DowngradeOps` objects.
+
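+As a brief sketch of the latter two guidelines, a hook configured in the
+``multidb`` version of ``env.py`` can use the length of
+:attr:`.MigrationScript.upgrade_ops_list` to detect that the final
+``run_migrations()`` call has occurred before modifying or rewriting the
+structure.  The sketch below assumes the engine names are available from
+the template's ``databases`` configuration option via the ``config``
+object already present in ``env.py``; the exact means of counting the
+expected passes is up to the ``env.py`` script::
+
+    from alembic.autogenerate import rewriter
+    from alembic.operations import ops
+
+    # simplified parsing of the comma-separated engine names
+    db_names = config.get_main_option("databases").split(", ")
+
+    writer = rewriter.Rewriter()
+
+    @writer.rewrites(ops.AddColumnOp)
+    def add_column_nullable(context, revision, op):
+        # illustrative rewrite; runs against the fully assembled structure
+        op.column.nullable = True
+        return op
+
+    def process_revision_directives(context, revision, directives):
+        script = directives[0]
+        # each run_migrations() call contributes one UpgradeOps /
+        # DowngradeOps pair; only rewrite once all of them are present
+        if len(script.upgrade_ops_list) == len(db_names):
+            writer(context, revision, directives)
+
+The ``process_revision_directives`` function above is then passed to
+:meth:`.EnvironmentContext.configure` via the
+:paramref:`.EnvironmentContext.configure.process_revision_directives`
+parameter in the same way as in the single-engine case.
+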
+.. versionchanged:: 0.8.1 - multiple calls to
+ :meth:`.MigrationContext.run_migrations` within an autogenerate operation,
+ such as that proposed within the ``multidb`` script template,
+ are now accommodated by the new extensible migration system
+ introduced in 0.8.0.
+
+
.. _autogen_custom_ops:
Autogenerating Custom Operation Directives
.. changelog::
:version: 0.8.1
+ .. change::
+ :tags: bug, autogenerate
+ :tickets: 318
+
+        Fixed a regression in 0.8 whereby the "multidb" environment template
+ failed to produce independent migration script segments for the
+ output template. This was due to the reorganization of the script
+ rendering system for 0.8. To accommodate this change, the
+ :class:`.MigrationScript` structure will in the case of multiple
+ calls to :meth:`.MigrationContext.run_migrations` produce lists
+ for the :attr:`.MigrationScript.upgrade_ops` and
+ :attr:`.MigrationScript.downgrade_ops` attributes; each :class:`.UpgradeOps`
+ and :class:`.DowngradeOps` instance keeps track of its own
+ ``upgrade_token`` and ``downgrade_token``, and each are rendered
+ individually.
+
+ .. seealso::
+
+ :ref:`autogen_customizing_multiengine_revision` - additional detail
+ on the workings of the
+ :paramref:`.EnvironmentContext.configure.process_revision_directives`
+ parameter when multiple calls to :meth:`.MigrationContext.run_migrations`
+ are made.
+
+
.. change::
:tags: feature, autogenerate
:tickets: 317
"github": "https://github.com/zzzeek/alembic/pull/%s",
}
+autodoc_default_flags = ["members"]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
def test_create_rev_autogen_db_not_up_to_date(self):
self._env_fixture()
- command.revision(self.cfg)
+ assert command.revision(self.cfg)
assert_raises_message(
util.CommandError,
"Target database is not up to date.",
_get_staging_directory, _no_sql_testing_config, env_file_fixture, \
script_file_fixture, _testing_config, _sqlite_testing_config, \
three_rev_fixture, _multi_dir_testing_config, write_script,\
- _sqlite_file_db
+ _sqlite_file_db, _multidb_testing_config
from alembic import command
from alembic.script import ScriptDirectory
from alembic.environment import EnvironmentContext
self.cfg.set_main_option("revision_environment", "true")
script = ScriptDirectory.from_config(self.cfg)
- # MARKMARK
self.model1 = util.rev_id()
self.model2 = util.rev_id()
self.model3 = util.rev_id()
),
ops.MigrationScript(
util.rev_id(),
- existing_downgrades,
+ ops.UpgradeOps(ops=existing_downgrades.ops),
ops.DowngradeOps(),
version_path=os.path.join(
_get_staging_directory(), "model2"),
)
+class ScriptAccessorTest(TestBase):
+ def test_upgrade_downgrade_ops_list_accessors(self):
+ u1 = ops.UpgradeOps(ops=[])
+ d1 = ops.DowngradeOps(ops=[])
+ m1 = ops.MigrationScript(
+ "somerev", u1, d1
+ )
+ is_(
+ m1.upgrade_ops, u1
+ )
+ is_(
+ m1.downgrade_ops, d1
+ )
+ u2 = ops.UpgradeOps(ops=[])
+ d2 = ops.DowngradeOps(ops=[])
+ m1._upgrade_ops.append(u2)
+ m1._downgrade_ops.append(d2)
+
+ assert_raises_message(
+ ValueError,
+ "This MigrationScript instance has a multiple-entry list for "
+ "UpgradeOps; please use the upgrade_ops_list attribute.",
+ getattr, m1, "upgrade_ops"
+ )
+ assert_raises_message(
+ ValueError,
+ "This MigrationScript instance has a multiple-entry list for "
+ "DowngradeOps; please use the downgrade_ops_list attribute.",
+ getattr, m1, "downgrade_ops"
+ )
+ eq_(m1.upgrade_ops_list, [u1, u2])
+ eq_(m1.downgrade_ops_list, [d1, d2])
+
+
+class ImportsTest(TestBase):
+ def setUp(self):
+ self.env = staging_env()
+ self.cfg = _sqlite_testing_config()
+
+ def tearDown(self):
+ clear_staging_env()
+
+ def _env_fixture(self, target_metadata):
+ self.engine = engine = _sqlite_file_db()
+
+ def run_env(self):
+ from alembic import context
+
+ with engine.connect() as connection:
+ context.configure(
+ connection=connection,
+ target_metadata=target_metadata)
+ with context.begin_transaction():
+ context.run_migrations()
+
+ return mock.patch(
+ "alembic.script.base.ScriptDirectory.run_env",
+ run_env
+ )
+
+ def test_imports_in_script(self):
+ from sqlalchemy import MetaData, Table, Column
+ from sqlalchemy.dialects.mysql import VARCHAR
+
+ type_ = VARCHAR(20, charset='utf8', national=True)
+
+ m = MetaData()
+
+ Table(
+ 't', m,
+ Column('x', type_)
+ )
+
+ with self._env_fixture(m):
+ rev = command.revision(
+ self.cfg, message="some message",
+ autogenerate=True)
+
+ with open(rev.path) as file_:
+ assert "from sqlalchemy.dialects import mysql" in file_.read()
+
+
+class MultiContextTest(TestBase):
+ """test the multidb template for autogenerate front-to-back"""
+
+ def setUp(self):
+ self.engine1 = _sqlite_file_db(tempname='eng1.db')
+ self.engine2 = _sqlite_file_db(tempname='eng2.db')
+ self.engine3 = _sqlite_file_db(tempname='eng3.db')
+
+ self.env = staging_env(template="multidb")
+ self.cfg = _multidb_testing_config({
+ "engine1": self.engine1,
+ "engine2": self.engine2,
+ "engine3": self.engine3
+ })
+
+ def _write_metadata(self, meta):
+ path = os.path.join(_get_staging_directory(), 'scripts', 'env.py')
+ with open(path) as env_:
+ existing_env = env_.read()
+ existing_env = existing_env.replace(
+ "target_metadata = {}",
+ meta)
+ with open(path, "w") as env_:
+ env_.write(existing_env)
+
+ def tearDown(self):
+ clear_staging_env()
+
+ def test_autogen(self):
+ self._write_metadata(
+ """
+import sqlalchemy as sa
+
+m1 = sa.MetaData()
+m2 = sa.MetaData()
+m3 = sa.MetaData()
+target_metadata = {"engine1": m1, "engine2": m2, "engine3": m3}
+
+sa.Table('e1t1', m1, sa.Column('x', sa.Integer))
+sa.Table('e2t1', m2, sa.Column('y', sa.Integer))
+sa.Table('e3t1', m3, sa.Column('z', sa.Integer))
+
+"""
+ )
+
+ rev = command.revision(
+ self.cfg, message="some message",
+ autogenerate=True
+ )
+ with mock.patch.object(rev.module, "op") as op_mock:
+ rev.module.upgrade_engine1()
+ eq_(
+ op_mock.mock_calls[-1],
+ mock.call.create_table('e1t1', mock.ANY)
+ )
+ rev.module.upgrade_engine2()
+ eq_(
+ op_mock.mock_calls[-1],
+ mock.call.create_table('e2t1', mock.ANY)
+ )
+ rev.module.upgrade_engine3()
+ eq_(
+ op_mock.mock_calls[-1],
+ mock.call.create_table('e3t1', mock.ANY)
+ )
+ rev.module.downgrade_engine1()
+ eq_(
+ op_mock.mock_calls[-1],
+ mock.call.drop_table('e1t1')
+ )
+ rev.module.downgrade_engine2()
+ eq_(
+ op_mock.mock_calls[-1],
+ mock.call.drop_table('e2t1')
+ )
+ rev.module.downgrade_engine3()
+ eq_(
+ op_mock.mock_calls[-1],
+ mock.call.drop_table('e3t1')
+ )
+
+
class RewriterTest(TestBase):
def test_all_traverse(self):
writer = autogenerate.Rewriter()