Now alembic supports only python from version 3.6.
Change-Id: Iccf124c2d74af801d90a16c9003cdad318768625
For more elaborate CI-style test running, the tox script provided will
run against various Python / database targets. For a basic run against
-Python 3.8 using an in-memory SQLite database::
+Python 3.9 using an in-memory SQLite database::
- tox -e py38-sqlite
+ tox -e py39-sqlite
The tox runner contains a series of target combinations that can run
against various combinations of databases. The test suite can be
run against SQLite with "backend" tests also running against a PostgreSQL
database::
- tox -e py38-sqlite-postgresql
+ tox -e py39-sqlite-postgresql
Or to run just "backend" tests against a MySQL database::
- tox -e py38-mysql-backendonly
+ tox -e py39-mysql-backendonly
Running against backends other than SQLite requires that a database of that
vendor be available at a specific URL. See "Setting Up Databases" below
[db]
postgresql=postgresql://username:pass@hostname/dbname
-Now when we run ``tox -e py27-postgresql``, it will use our custom URL instead
+Now when we run ``tox -e py39-postgresql``, it will use our custom URL instead
of the fixed one in setup.cfg.
Database Configuration
import sys
-from . import context # noqa
-from . import op # noqa
+from . import context
+from . import op
from .runtime import environment
from .runtime import migration
-__version__ = "1.6.6"
+__version__ = "1.7.0"
sys.modules["alembic.migration"] = migration
sys.modules["alembic.environment"] = environment
-from .api import _render_migration_diffs # noqa
-from .api import compare_metadata # noqa
-from .api import produce_migrations # noqa
-from .api import render_python_code # noqa
-from .api import RevisionContext # noqa
-from .compare import _produce_net_changes # noqa
-from .compare import comparators # noqa
-from .render import render_op_text # noqa
-from .render import renderers # noqa
-from .rewriter import Rewriter # noqa
+from .api import _render_migration_diffs
+from .api import compare_metadata
+from .api import produce_migrations
+from .api import render_python_code
+from .api import RevisionContext
+from .compare import _produce_net_changes
+from .compare import comparators
+from .render import render_op_text
+from .render import renderers
+from .rewriter import Rewriter
)
-class AutogenContext(object):
+class AutogenContext:
"""Maintains configuration and state that's specific to an
autogenerate operation."""
return result
-class RevisionContext(object):
+class RevisionContext:
"""Maintains configuration and state that's specific to a revision
file generation operation."""
from .render import _user_defined_render
from .. import util
from ..operations import ops
-from ..util import compat
from ..util import sqla_compat
log = logging.getLogger(__name__)
log.info("Detected removed column '%s.%s'", name, cname)
-class _constraint_sig(object):
+class _constraint_sig:
def md_name_to_sql_name(self, context):
return sqla_compat._get_constraint_final_name(
self.const, context.dialect
return rendered
if isinstance(metadata_default, sa_schema.DefaultClause):
- if isinstance(metadata_default.arg, compat.string_types):
+ if isinstance(metadata_default.arg, str):
metadata_default = metadata_default.arg
else:
metadata_default = str(
compile_kwargs={"literal_binds": True},
)
)
- if isinstance(metadata_default, compat.string_types):
+ if isinstance(metadata_default, str):
if metadata_col.type._type_affinity is sqltypes.String:
metadata_default = re.sub(r"^'|'$", "", metadata_default)
return repr(metadata_default)
from collections import OrderedDict
+from io import StringIO
import re
from mako.pygen import PythonPrinter
from ..util import compat
from ..util import sqla_compat
from ..util.compat import string_types
-from ..util.compat import StringIO
MAX_PYTHON_ARGS = 255
return text
-class _f_name(object):
+class _f_name:
def __init__(self, prefix, name):
self.prefix = prefix
self.name = name
if name is None:
return name
elif isinstance(name, sql.elements.quoted_name):
- if compat.py2k:
- # the attempt to encode to ascii here isn't super ideal,
- # however we are trying to cut down on an explosion of
- # u'' literals only when py2k + SQLA 0.9, in particular
- # makes unit tests testing code generation very difficult
- try:
- return name.encode("ascii")
- except UnicodeError:
- return compat.text_type(name)
- else:
- return compat.text_type(name)
+ return compat.text_type(name)
elif isinstance(name, compat.string_types):
return name
from alembic.operations import ops
-class Rewriter(object):
+class Rewriter:
"""A helper object that allows easy 'rewriting' of ops streams.
The :class:`.Rewriter` object is intended to be passed along
from argparse import ArgumentParser
+from configparser import ConfigParser
import inspect
import os
import sys
from . import command
from . import util
from .util import compat
-from .util.compat import SafeConfigParser
-class Config(object):
+class Config:
r"""Represent an Alembic configuration.
is **copied** to a new one, stored locally as the attribute
``.config_args``. When the :attr:`.Config.file_config` attribute is
first invoked, the replacement variable ``here`` will be added to this
- dictionary before the dictionary is passed to ``SafeConfigParser()``
+ dictionary before the dictionary is passed to ``ConfigParser()``
to parse the .ini file.
:param attributes: optional dictionary of arbitrary Python keys/values,
else:
here = ""
self.config_args["here"] = here
- file_config = SafeConfigParser(self.config_args)
+ file_config = ConfigParser(self.config_args)
if self.config_file_name:
file_config.read([self.config_file_name])
else:
return self.get_section_option(self.config_ini_section, name, default)
-class CommandLine(object):
+class CommandLine:
def __init__(self, prog=None):
self._generate_args(prog)
-from . import mssql # noqa
-from . import mysql # noqa
-from . import oracle # noqa
-from . import postgresql # noqa
-from . import sqlite # noqa
-from .impl import DefaultImpl # noqa
+from . import mssql
+from . import mysql
+from . import oracle
+from . import postgresql
+from . import sqlite
+from .impl import DefaultImpl
from ..util import sqla_compat
from ..util.compat import string_types
from ..util.compat import text_type
-from ..util.compat import with_metaclass
class ImplMeta(type):
Params = namedtuple("Params", ["token0", "tokens", "args", "kwargs"])
-class DefaultImpl(with_metaclass(ImplMeta)):
+class DefaultImpl(metaclass=ImplMeta):
"""Provide the entrypoint for major migration operations,
including database-specific behavioral variances.
if None in (conn_col_default, rendered_metadata_default):
return not defaults_equal
- if compat.py2k:
- # look for a python 2 "u''" string and filter
- m = re.match(r"^u'(.*)'$", rendered_metadata_default)
- if m:
- rendered_metadata_default = "'%s'" % m.group(1)
-
# check for unquoted string and quote for PG String types
if (
not isinstance(inspector_column.type, Numeric)
-from . import toimpl # noqa
+from . import toimpl
from .base import BatchOperations
from .base import Operations
from .ops import MigrateOperation
from . import schemaobj
from .. import util
from ..util import sqla_compat
-from ..util.compat import exec_
from ..util.compat import inspect_formatargspec
from ..util.compat import inspect_getargspec
)
globals_ = {"op_cls": op_cls}
lcl = {}
- exec_(func_text, globals_, lcl)
+ exec(func_text, globals_, lcl)
setattr(cls, name, lcl[name])
fn.__func__.__doc__ = (
"This method is proxied on "
from ..util.sqla_compat import _select
-class BatchOperationsImpl(object):
+class BatchOperationsImpl:
def __init__(
self,
operations,
self.batch.append(("create_column_comment", (column,), {}))
-class ApplyBatchImpl(object):
+class ApplyBatchImpl:
def __init__(
self,
impl,
from ..util import sqla_compat
-class MigrateOperation(object):
+class MigrateOperation:
"""base class for migration command and organization objects.
This system is part of the operation extensibility API.
from .. import util
from ..util import sqla_compat
-from ..util.compat import raise_
from ..util.compat import string_types
-class SchemaObjects(object):
+class SchemaObjects:
def __init__(self, migration_context=None):
self.migration_context = migration_context
try:
const = types[type_]
except KeyError as ke:
- raise_(
- TypeError(
- "'type' can be one of %s"
- % ", ".join(sorted(repr(x) for x in types))
- ),
- from_=ke,
- )
+ raise TypeError(
+ "'type' can be one of %s"
+ % ", ".join(sorted(repr(x) for x in types))
+ ) from ke
else:
const = const(name=name)
t.append_constraint(const)
from .. import ddl
from .. import util
from ..util import sqla_compat
-from ..util.compat import callable
from ..util.compat import EncodedIO
log = logging.getLogger(__name__)
-class _ProxyTransaction(object):
+class _ProxyTransaction:
def __init__(self, migration_context):
self.migration_context = migration_context
self.migration_context._transaction = None
-class MigrationContext(object):
+class MigrationContext:
"""Represent the database state made available to a migration
script.
)
-class HeadMaintainer(object):
+class HeadMaintainer:
def __init__(self, context, heads):
self.context = context
self.heads = set(heads)
self._update_version(from_, to_)
-class MigrationInfo(object):
+class MigrationInfo:
"""Exposes information about a migration step to a callback listener.
The :class:`.MigrationInfo` object is available exclusively for the
return self.revision_map.get_revisions(self.destination_revision_ids)
-class MigrationStep(object):
+class MigrationStep:
@property
def name(self):
return self.migration_fn.__name__
-from .base import Script # noqa
-from .base import ScriptDirectory # noqa
+from .base import Script
+from .base import ScriptDirectory
__all__ = ["ScriptDirectory", "Script"]
from . import write_hooks
from .. import util
from ..runtime import migration
-from ..util import compat
try:
from dateutil import tz
_split_on_space_comma_colon = re.compile(r", *|(?: +)|\:")
-class ScriptDirectory(object):
+class ScriptDirectory:
"""Provides operations upon an Alembic script directory.
"ancestor/descendant revisions along the same branch"
)
ancestor = ancestor % {"start": start, "end": end}
- compat.raise_(util.CommandError(ancestor), from_=rna)
+ raise util.CommandError(ancestor) from rna
except revision.MultipleHeads as mh:
if not multiple_heads:
multiple_heads = (
"head_arg": end or mh.argument,
"heads": util.format_as_comma(mh.heads),
}
- compat.raise_(util.CommandError(multiple_heads), from_=mh)
+ raise util.CommandError(multiple_heads) from mh
except revision.ResolutionError as re:
if resolution is None:
resolution = "Can't locate revision identified by '%s'" % (
re.argument
)
- compat.raise_(util.CommandError(resolution), from_=re)
+ raise util.CommandError(resolution) from re
except revision.RevisionError as err:
- compat.raise_(util.CommandError(err.args[0]), from_=err)
+ raise util.CommandError(err.args[0]) from err
def walk_revisions(self, base="base", head="heads"):
"""Iterate through all revisions.
try:
Script.verify_rev_id(revid)
except revision.RevisionError as err:
- compat.raise_(util.CommandError(err.args[0]), from_=err)
+ raise util.CommandError(err.args[0]) from err
with self._catch_revision_errors(
multiple_heads=(
try:
script = Script._from_path(self, path)
except revision.RevisionError as err:
- compat.raise_(util.CommandError(err.args[0]), from_=err)
+ raise util.CommandError(err.args[0]) from err
if branch_labels and not script.branch_labels:
raise util.CommandError(
"Version %s specified branch_labels %s, however the "
super(DependencyLoopDetected, self).__init__(revision)
-class RevisionMap(object):
+class RevisionMap:
"""Maintains a map of :class:`.Revision` objects.
:class:`.RevisionMap` is used by :class:`.ScriptDirectory` to maintain
try:
nonbranch_rev = self._revision_for_ident(branch_label)
except ResolutionError as re:
- util.raise_(
- ResolutionError(
- "No such branch: '%s'" % branch_label, branch_label
- ),
- from_=re,
- )
+ raise ResolutionError(
+ "No such branch: '%s'" % branch_label, branch_label
+ ) from re
+
else:
return nonbranch_rev
else:
# No relative destination given, revision specified is absolute.
branch_label, _, symbol = target.rpartition("@")
if not branch_label:
- branch_label is None
+ branch_label = None
return branch_label, self.get_revision(symbol)
def _parse_upgrade_target(
return needs, targets
-class Revision(object):
+class Revision:
"""Base class for revisioned objects.
The :class:`.Revision` class is the base of the more public-facing
try:
hook = _registry[name]
except KeyError as ke:
- compat.raise_(
- util.CommandError("No formatter with name '%s' registered" % name),
- from_=ke,
- )
+ raise util.CommandError(
+ "No formatter with name '%s' registered" % name
+ ) from ke
else:
return hook(revision, options)
try:
type_ = opts["type"]
except KeyError as ke:
- compat.raise_(
- util.CommandError(
- "Key %s.type is required for post write hook %r"
- % (name, name)
- ),
- from_=ke,
- )
+ raise util.CommandError(
+ "Key %s.type is required for post write hook %r" % (name, name)
+ ) from ke
else:
util.status(
'Running post write hook "%s"' % name,
try:
entrypoint_name = options["entrypoint"]
except KeyError as ke:
- compat.raise_(
- util.CommandError(
- "Key %s.entrypoint is required for post write hook %r"
- % (options["_hook_name"], options["_hook_name"])
- ),
- from_=ke,
- )
+ raise util.CommandError(
+ "Key %s.entrypoint is required for post write hook %r"
+ % (options["_hook_name"], options["_hook_name"])
+ ) from ke
iter_ = pkg_resources.iter_entry_points("console_scripts", entrypoint_name)
impl = next(iter_)
cwd = options.get("cwd", None)
-from sqlalchemy.testing import config # noqa
-from sqlalchemy.testing import emits_warning # noqa
-from sqlalchemy.testing import engines # noqa
-from sqlalchemy.testing import exclusions # noqa
-from sqlalchemy.testing import mock # noqa
-from sqlalchemy.testing import provide_metadata # noqa
-from sqlalchemy.testing import uses_deprecated # noqa
-from sqlalchemy.testing.config import combinations # noqa
-from sqlalchemy.testing.config import fixture # noqa
-from sqlalchemy.testing.config import requirements as requires # noqa
+from sqlalchemy.testing import config
+from sqlalchemy.testing import emits_warning
+from sqlalchemy.testing import engines
+from sqlalchemy.testing import exclusions
+from sqlalchemy.testing import mock
+from sqlalchemy.testing import provide_metadata
+from sqlalchemy.testing import uses_deprecated
+from sqlalchemy.testing.config import combinations
+from sqlalchemy.testing.config import fixture
+from sqlalchemy.testing.config import requirements as requires
-from alembic import util # noqa
-from .assertions import assert_raises # noqa
-from .assertions import assert_raises_message # noqa
-from .assertions import emits_python_deprecation_warning # noqa
-from .assertions import eq_ # noqa
-from .assertions import eq_ignore_whitespace # noqa
-from .assertions import expect_raises # noqa
-from .assertions import expect_raises_message # noqa
-from .assertions import expect_sqlalchemy_deprecated # noqa
-from .assertions import expect_sqlalchemy_deprecated_20 # noqa
-from .assertions import expect_warnings # noqa
-from .assertions import is_ # noqa
-from .assertions import is_false # noqa
-from .assertions import is_not_ # noqa
-from .assertions import is_true # noqa
-from .assertions import ne_ # noqa
-from .fixtures import TestBase # noqa
-from .util import resolve_lambda # noqa
+from .assertions import assert_raises
+from .assertions import assert_raises_message
+from .assertions import emits_python_deprecation_warning
+from .assertions import eq_
+from .assertions import eq_ignore_whitespace
+from .assertions import expect_raises
+from .assertions import expect_raises_message
+from .assertions import expect_sqlalchemy_deprecated
+from .assertions import expect_sqlalchemy_deprecated_20
+from .assertions import expect_warnings
+from .assertions import is_
+from .assertions import is_false
+from .assertions import is_not_
+from .assertions import is_true
+from .assertions import ne_
+from .fixtures import TestBase
+from .util import resolve_lambda
try:
from sqlalchemy.testing import asyncio
from sqlalchemy.util import decorator
from ..util import sqla_compat
-from ..util.compat import py3k
def _assert_proper_exception_context(exception):
"""
- if not util.py3k:
- return
-
if (
exception.__context__ is not exception.__cause__
and not exception.__suppress_context__
return ec.error
-class _ErrorContainer(object):
+class _ErrorContainer:
error = None
def eq_ignore_whitespace(a, b, msg=None):
- # sqlalchemy.testing.assertion has this function
- # but not with the special "!U" detection part
a = re.sub(r"^\s+?|\n", "", a)
a = re.sub(r" {2,}", " ", a)
b = re.sub(r"^\s+?|\n", "", b)
b = re.sub(r" {2,}", " ", b)
- # convert for unicode string rendering,
- # using special escape character "!U"
- if py3k:
- b = re.sub(r"!U", "", b)
- else:
- b = re.sub(r"!U", "u", b)
-
assert a == b, msg or "%r != %r" % (a, b)
#!coding: utf-8
-
+import importlib.machinery
import os
import shutil
import textwrap
from .. import util
from ..script import Script
from ..script import ScriptDirectory
-from ..util.compat import get_current_bytecode_suffixes
-from ..util.compat import has_pep3147
-from ..util.compat import u
def _get_staging_directory():
py_compile.compile(path)
- if style == "simple" and has_pep3147():
+ if style == "simple":
pyc_path = util.pyc_file_from_path(path)
- suffix = get_current_bytecode_suffixes()[0]
+ suffix = importlib.machinery.BYTECODE_SUFFIXES[0]
filepath, ext = os.path.splitext(path)
simple_pyc_path = filepath + suffix
shutil.move(pyc_path, simple_pyc_path)
pyc_path = simple_pyc_path
- elif style == "pep3147" and not has_pep3147():
- raise NotImplementedError()
else:
assert style in ("pep3147", "simple")
pyc_path = util.pyc_file_from_path(path)
write_script(
script,
b,
- u(
- """# coding: utf-8
+ f"""# coding: utf-8
"Rev B, méil, %3"
-revision = '{}'
-down_revision = '{}'
+revision = '{b}'
+down_revision = '{a}'
from alembic import op
def downgrade():
op.execute("DROP STEP 2")
-"""
- ).format(b, a),
+""",
encoding="utf-8",
)
# coding: utf-8
+import configparser
from contextlib import contextmanager
import io
import re
from ..environment import EnvironmentContext
from ..migration import MigrationContext
from ..operations import Operations
-from ..util import compat
from ..util import sqla_compat
-from ..util.compat import configparser
from ..util.compat import string_types
from ..util.compat import text_type
from ..util.sqla_compat import create_mock_engine
from sqlalchemy.testing.fixtures import FutureEngineMixin
else:
- class FutureEngineMixin(object):
+ class FutureEngineMixin:
__requires__ = ("sqlalchemy_14",)
from .env import _sqlite_file_db
from sqlalchemy import event
- buf = compat.StringIO()
+ buf = io.StringIO()
eng = _sqlite_file_db()
if naming_convention:
opts["target_metadata"] = MetaData(naming_convention=naming_convention)
- class buffer_(object):
+ class buffer_:
def __init__(self):
self.lines = []
return context
-class AlterColRoundTripFixture(object):
+class AlterColRoundTripFixture:
# since these tests are about syntax, use more recent SQLAlchemy as some of
# the type / server default compare logic might not work on older
lambda: sys.version_info < (3,), "Python version 3.xx is required."
)
- @property
- def pep3147(self):
-
- return exclusions.only_if(lambda config: util.compat.has_pep3147())
-
@property
def comments(self):
return exclusions.only_if(
from sqlalchemy import util
-class CompareTable(object):
+class CompareTable:
def __init__(self, table):
self.table = table
return not self.__eq__(other)
-class CompareColumn(object):
+class CompareColumn:
def __init__(self, column):
self.column = column
return not self.__eq__(other)
-class CompareIndex(object):
+class CompareIndex:
def __init__(self, index):
self.index = index
return not self.__eq__(other)
-class CompareCheckConstraint(object):
+class CompareCheckConstraint:
def __init__(self, constraint):
self.constraint = constraint
return not self.__eq__(other)
-class CompareForeignKey(object):
+class CompareForeignKey:
def __init__(self, constraint):
self.constraint = constraint
return not self.__eq__(other)
-class ComparePrimaryKey(object):
+class ComparePrimaryKey:
def __init__(self, constraint):
self.constraint = constraint
return not self.__eq__(other)
-class CompareUniqueConstraint(object):
+class CompareUniqueConstraint:
def __init__(self, constraint):
self.constraint = constraint
_default_name_filters = None
-class ModelOne(object):
+class ModelOne:
__requires__ = ("unique_constraint_reflection",)
schema = None
return m
-class _ComparesFKs(object):
+class _ComparesFKs:
def _assert_fk_diff(
self,
diff,
from ...testing import eq_
from ...testing import mock
from ...testing import TestBase
-from ...util import compat
-
-py3k = compat.py3k
class AutogenerateCommentsTest(AutogenFixtureTest, TestBase):
from ...testing import eq_
from ...testing import is_
from ...testing import TestBase
-from ...util import compat
-
-py3k = compat.py3k
class AlterColumnTest(AutogenFixtureTest, TestBase):
from ...testing import eq_
from ...testing import mock
from ...testing import TestBase
-from ...util import compat
-
-py3k = compat.py3k
class AutogenerateForeignKeysTest(AutogenFixtureTest, TestBase):
-#!coding: utf-8
+import io
+
from ...migration import MigrationContext
from ...testing import assert_raises
from ...testing import config
from ...testing import is_false
from ...testing import is_true
from ...testing.fixtures import TestBase
-from ...util import compat
class MigrationTransactionTest(TestBase):
)
self.context.output_buffer = (
self.context.impl.output_buffer
- ) = compat.StringIO()
+ ) = io.StringIO()
else:
self.context = MigrationContext.configure(
connection=conn, opts=opts
def _assert_impl_steps(self, *steps):
to_check = self.context.output_buffer.getvalue()
- self.context.impl.output_buffer = buf = compat.StringIO()
+ self.context.impl.output_buffer = buf = io.StringIO()
for step in steps:
if step == "BEGIN":
self.context.impl.emit_begin()
-from .compat import raise_ # noqa
-from .editor import open_in_editor # noqa
+from .editor import open_in_editor
from .exc import CommandError
-from .langhelpers import _with_legacy_names # noqa
-from .langhelpers import asbool # noqa
-from .langhelpers import dedupe_tuple # noqa
-from .langhelpers import Dispatcher # noqa
-from .langhelpers import immutabledict # noqa
-from .langhelpers import memoized_property # noqa
-from .langhelpers import ModuleClsProxy # noqa
-from .langhelpers import rev_id # noqa
-from .langhelpers import to_list # noqa
-from .langhelpers import to_tuple # noqa
-from .langhelpers import unique_list # noqa
-from .messaging import err # noqa
-from .messaging import format_as_comma # noqa
-from .messaging import msg # noqa
-from .messaging import obfuscate_url_pw # noqa
-from .messaging import status # noqa
-from .messaging import warn # noqa
-from .messaging import write_outstream # noqa
-from .pyfiles import coerce_resource_to_filename # noqa
-from .pyfiles import load_python_file # noqa
-from .pyfiles import pyc_file_from_path # noqa
-from .pyfiles import template_to_file # noqa
-from .sqla_compat import has_computed # noqa
-from .sqla_compat import sqla_13 # noqa
-from .sqla_compat import sqla_14 # noqa
+from .langhelpers import _with_legacy_names
+from .langhelpers import asbool
+from .langhelpers import dedupe_tuple
+from .langhelpers import Dispatcher
+from .langhelpers import immutabledict
+from .langhelpers import memoized_property
+from .langhelpers import ModuleClsProxy
+from .langhelpers import rev_id
+from .langhelpers import to_list
+from .langhelpers import to_tuple
+from .langhelpers import unique_list
+from .messaging import err
+from .messaging import format_as_comma
+from .messaging import msg
+from .messaging import obfuscate_url_pw
+from .messaging import status
+from .messaging import warn
+from .messaging import write_outstream
+from .pyfiles import coerce_resource_to_filename
+from .pyfiles import load_python_file
+from .pyfiles import pyc_file_from_path
+from .pyfiles import template_to_file
+from .sqla_compat import has_computed
+from .sqla_compat import sqla_13
+from .sqla_compat import sqla_14
if not sqla_13:
import inspect
import io
import os
-import sys
-py2k = sys.version_info.major < 3
-py3k = sys.version_info.major >= 3
-py36 = sys.version_info >= (3, 6)
is_posix = os.name == "posix"
-
ArgSpec = collections.namedtuple(
"ArgSpec", ["args", "varargs", "keywords", "defaults"]
)
nargs = co.co_argcount
names = co.co_varnames
- nkwargs = co.co_kwonlyargcount if py3k else 0
+ nkwargs = co.co_kwonlyargcount
args = list(names[:nargs])
nargs += nkwargs
return ArgSpec(args, varargs, varkw, func.__defaults__)
-if py3k:
- from io import StringIO
-else:
- # accepts strings
- from StringIO import StringIO # noqa
-
-if py3k:
- import builtins as compat_builtins
-
- string_types = (str,)
- binary_type = bytes
- text_type = str
-
- def callable(fn): # noqa
- return hasattr(fn, "__call__")
-
- def u(s):
- return s
-
- def ue(s):
- return s
-
- range = range # noqa
-else:
- import __builtin__ as compat_builtins
-
- string_types = (basestring,) # noqa
- binary_type = str
- text_type = unicode # noqa
- callable = callable # noqa
-
- def u(s):
- return unicode(s, "utf-8") # noqa
-
- def ue(s):
- return unicode(s, "unicode_escape") # noqa
-
- range = xrange # noqa
-
-if py3k:
- import collections.abc as collections_abc
-else:
- import collections as collections_abc # noqa
-
-if py3k:
-
- def _formatannotation(annotation, base_module=None):
- """vendored from python 3.7"""
-
- if getattr(annotation, "__module__", None) == "typing":
- return repr(annotation).replace("typing.", "")
- if isinstance(annotation, type):
- if annotation.__module__ in ("builtins", base_module):
- return annotation.__qualname__
- return annotation.__module__ + "." + annotation.__qualname__
- return repr(annotation)
-
- def inspect_formatargspec(
- args,
- varargs=None,
- varkw=None,
- defaults=None,
- kwonlyargs=(),
- kwonlydefaults={},
- annotations={},
- formatarg=str,
- formatvarargs=lambda name: "*" + name,
- formatvarkw=lambda name: "**" + name,
- formatvalue=lambda value: "=" + repr(value),
- formatreturns=lambda text: " -> " + text,
- formatannotation=_formatannotation,
- ):
- """Copy formatargspec from python 3.7 standard library.
-
- Python 3 has deprecated formatargspec and requested that Signature
- be used instead, however this requires a full reimplementation
- of formatargspec() in terms of creating Parameter objects and such.
- Instead of introducing all the object-creation overhead and having
- to reinvent from scratch, just copy their compatibility routine.
-
- """
-
- def formatargandannotation(arg):
- result = formatarg(arg)
- if arg in annotations:
- result += ": " + formatannotation(annotations[arg])
- return result
-
- specs = []
- if defaults:
- firstdefault = len(args) - len(defaults)
- for i, arg in enumerate(args):
- spec = formatargandannotation(arg)
- if defaults and i >= firstdefault:
- spec = spec + formatvalue(defaults[i - firstdefault])
- specs.append(spec)
- if varargs is not None:
- specs.append(formatvarargs(formatargandannotation(varargs)))
- else:
- if kwonlyargs:
- specs.append("*")
- if kwonlyargs:
- for kwonlyarg in kwonlyargs:
- spec = formatargandannotation(kwonlyarg)
- if kwonlydefaults and kwonlyarg in kwonlydefaults:
- spec += formatvalue(kwonlydefaults[kwonlyarg])
- specs.append(spec)
- if varkw is not None:
- specs.append(formatvarkw(formatargandannotation(varkw)))
- result = "(" + ", ".join(specs) + ")"
- if "return" in annotations:
- result += formatreturns(formatannotation(annotations["return"]))
+string_types = (str,)
+binary_type = bytes
+text_type = str
+
+
+def _formatannotation(annotation, base_module=None):
+ """vendored from python 3.7"""
+
+ if getattr(annotation, "__module__", None) == "typing":
+ return repr(annotation).replace("typing.", "")
+ if isinstance(annotation, type):
+ if annotation.__module__ in ("builtins", base_module):
+ return annotation.__qualname__
+ return annotation.__module__ + "." + annotation.__qualname__
+ return repr(annotation)
+
+
+def inspect_formatargspec(
+ args,
+ varargs=None,
+ varkw=None,
+ defaults=None,
+ kwonlyargs=(),
+ kwonlydefaults={},
+ annotations={},
+ formatarg=str,
+ formatvarargs=lambda name: "*" + name,
+ formatvarkw=lambda name: "**" + name,
+ formatvalue=lambda value: "=" + repr(value),
+ formatreturns=lambda text: " -> " + text,
+ formatannotation=_formatannotation,
+):
+ """Copy formatargspec from python 3.7 standard library.
+
+ Python 3 has deprecated formatargspec and requested that Signature
+ be used instead, however this requires a full reimplementation
+ of formatargspec() in terms of creating Parameter objects and such.
+ Instead of introducing all the object-creation overhead and having
+ to reinvent from scratch, just copy their compatibility routine.
+
+ """
+
+ def formatargandannotation(arg):
+ result = formatarg(arg)
+ if arg in annotations:
+ result += ": " + formatannotation(annotations[arg])
return result
-
-else:
- from inspect import formatargspec as inspect_formatargspec # noqa
-
-
-if py3k:
- from configparser import ConfigParser as SafeConfigParser
- import configparser
-else:
- from ConfigParser import SafeConfigParser # noqa
- import ConfigParser as configparser # noqa
-
-if py2k:
- from mako.util import parse_encoding
-
-if py3k:
- import importlib.machinery
-
- import importlib.util
-
- def load_module_py(module_id, path):
- spec = importlib.util.spec_from_file_location(module_id, path)
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
- return module
-
- def load_module_pyc(module_id, path):
- spec = importlib.util.spec_from_file_location(module_id, path)
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
- return module
-
- def get_bytecode_suffixes():
- try:
- return importlib.machinery.BYTECODE_SUFFIXES
- except AttributeError:
- return importlib.machinery.DEBUG_BYTECODE_SUFFIXES
-
- def get_current_bytecode_suffixes():
- if py3k:
- suffixes = importlib.machinery.BYTECODE_SUFFIXES
- else:
- if sys.flags.optimize:
- suffixes = importlib.machinery.OPTIMIZED_BYTECODE_SUFFIXES
- else:
- suffixes = importlib.machinery.BYTECODE_SUFFIXES
-
- return suffixes
-
- def has_pep3147():
- return True
-
-
-else:
- import imp
-
- def load_module_py(module_id, path): # noqa
- with open(path, "rb") as fp:
- mod = imp.load_source(module_id, path, fp)
- if py2k:
- source_encoding = parse_encoding(fp)
- if source_encoding:
- mod._alembic_source_encoding = source_encoding
- del sys.modules[module_id]
- return mod
-
- def load_module_pyc(module_id, path): # noqa
- with open(path, "rb") as fp:
- mod = imp.load_compiled(module_id, path, fp)
- # no source encoding here
- del sys.modules[module_id]
- return mod
-
- def get_current_bytecode_suffixes():
- if sys.flags.optimize:
- return [".pyo"] # e.g. .pyo
- else:
- return [".pyc"] # e.g. .pyc
-
- def has_pep3147():
- return False
-
-
-try:
- exec_ = getattr(compat_builtins, "exec")
-except AttributeError:
- # Python 2
- def exec_(func_text, globals_, lcl):
- exec("exec func_text in globals_, lcl")
-
-
-################################################
-# cross-compatible metaclass implementation
-# Copyright (c) 2010-2012 Benjamin Peterson
-
-
-def with_metaclass(meta, base=object):
- """Create a base class with a metaclass."""
- return meta("%sBase" % meta.__name__, (base,), {})
-
-
-################################################
-
-if py3k:
-
- def raise_(
- exception, with_traceback=None, replace_context=None, from_=False
- ):
- r"""implement "raise" with cause support.
-
- :param exception: exception to raise
- :param with_traceback: will call exception.with_traceback()
- :param replace_context: an as-yet-unsupported feature. This is
- an exception object which we are "replacing", e.g., it's our
- "cause" but we don't want it printed. Basically just what
- ``__suppress_context__`` does but we don't want to suppress
- the enclosing context, if any. So for now we make it the
- cause.
- :param from\_: the cause. this actually sets the cause and doesn't
- hope to hide it someday.
-
- """
- if with_traceback is not None:
- exception = exception.with_traceback(with_traceback)
-
- if from_ is not False:
- exception.__cause__ = from_
- elif replace_context is not None:
- # no good solution here, we would like to have the exception
- # have only the context of replace_context.__context__ so that the
- # intermediary exception does not change, but we can't figure
- # that out.
- exception.__cause__ = replace_context
-
- try:
- raise exception
- finally:
- # credit to
- # https://cosmicpercolator.com/2016/01/13/exception-leaks-in-python-2-and-3/
- # as the __traceback__ object creates a cycle
- del exception, replace_context, from_, with_traceback
-
-
-else:
- exec(
- "def raise_(exception, with_traceback=None, replace_context=None, "
- "from_=False):\n"
- " if with_traceback:\n"
- " raise type(exception), exception, with_traceback\n"
- " else:\n"
- " raise exception\n"
- )
+ specs = []
+ if defaults:
+ firstdefault = len(args) - len(defaults)
+ for i, arg in enumerate(args):
+ spec = formatargandannotation(arg)
+ if defaults and i >= firstdefault:
+ spec = spec + formatvalue(defaults[i - firstdefault])
+ specs.append(spec)
+ if varargs is not None:
+ specs.append(formatvarargs(formatargandannotation(varargs)))
+ else:
+ if kwonlyargs:
+ specs.append("*")
+ if kwonlyargs:
+ for kwonlyarg in kwonlyargs:
+ spec = formatargandannotation(kwonlyarg)
+ if kwonlydefaults and kwonlyarg in kwonlydefaults:
+ spec += formatvalue(kwonlydefaults[kwonlyarg])
+ specs.append(spec)
+ if varkw is not None:
+ specs.append(formatvarkw(formatargandannotation(varkw)))
+ result = "(" + ", ".join(specs) + ")"
+ if "return" in annotations:
+ result += formatreturns(formatannotation(annotations["return"]))
+ return result
# produce a wrapper that allows encoded text to stream
class EncodedIO(io.TextIOWrapper):
def close(self):
pass
-
-
-if py2k:
- # in Py2K, the io.* package is awkward because it does not
- # easily wrap the file type (e.g. sys.stdout) and I can't
- # figure out at all how to wrap StringIO.StringIO
- # and also might be user specified too. So create a full
- # adapter.
-
- class ActLikePy3kIO(object):
-
- """Produce an object capable of wrapping either
- sys.stdout (e.g. file) *or* StringIO.StringIO().
-
- """
-
- def _false(self):
- return False
-
- def _true(self):
- return True
-
- readable = seekable = _false
- writable = _true
- closed = False
-
- def __init__(self, file_):
- self.file_ = file_
-
- def write(self, text):
- return self.file_.write(text)
-
- def flush(self):
- return self.file_.flush()
-
- class EncodedIO(EncodedIO):
- def __init__(self, file_, encoding):
- super(EncodedIO, self).__init__(
- ActLikePy3kIO(file_), encoding=encoding
- )
from subprocess import check_call
from .compat import is_posix
-from .compat import raise_
from .exc import CommandError
editor = _find_editor(environ)
check_call([editor, filename])
except Exception as exc:
- raise_(CommandError("Error executing editor (%s)" % (exc,)), from_=exc)
+ raise CommandError("Error executing editor (%s)" % (exc,)) from exc
def _find_editor(environ=None):
import collections
+from collections.abc import Iterable
import textwrap
import uuid
import warnings
-from .compat import callable
-from .compat import collections_abc
-from .compat import exec_
+from sqlalchemy.util import asbool # noqa
+from sqlalchemy.util import immutabledict # noqa
+from sqlalchemy.util import memoized_property # noqa
+from sqlalchemy.util import to_list # noqa
+from sqlalchemy.util import unique_list # noqa
+
from .compat import inspect_getargspec
-from .compat import raise_
from .compat import string_types
-from .compat import with_metaclass
class _ModuleClsMeta(type):
cls._update_module_proxies(key)
-class ModuleClsProxy(with_metaclass(_ModuleClsMeta)):
+class ModuleClsProxy(metaclass=_ModuleClsMeta):
"""Create module level proxy functions for the
methods on a given class.
fn = getattr(cls, name)
def _name_error(name, from_):
- raise_(
- NameError(
- "Can't invoke function '%s', as the proxy object has "
- "not yet been "
- "established for the Alembic '%s' class. "
- "Try placing this code inside a callable."
- % (name, cls.__name__)
- ),
- from_=from_,
- )
+ raise NameError(
+ "Can't invoke function '%s', as the proxy object has "
+ "not yet been "
+ "established for the Alembic '%s' class. "
+ "Try placing this code inside a callable."
+ % (name, cls.__name__)
+ ) from from_
globals_["_name_error"] = _name_error
}
)
lcl = {}
- exec_(func_text, globals_, lcl)
+ exec(func_text, globals_, lcl)
return lcl[name]
return decorate
-def asbool(value):
- return value is not None and value.lower() == "true"
-
-
def rev_id():
return uuid.uuid4().hex[-12:]
-def to_list(x, default=None):
- if x is None:
- return default
- elif isinstance(x, string_types):
- return [x]
- elif isinstance(x, collections_abc.Iterable):
- return list(x)
- else:
- return [x]
-
-
def to_tuple(x, default=None):
if x is None:
return default
elif isinstance(x, string_types):
return (x,)
- elif isinstance(x, collections_abc.Iterable):
+ elif isinstance(x, Iterable):
return tuple(x)
else:
return (x,)
-def unique_list(seq, hashfunc=None):
- seen = set()
- seen_add = seen.add
- if not hashfunc:
- return [x for x in seq if x not in seen and not seen_add(x)]
- else:
- return [
- x
- for x in seq
- if hashfunc(x) not in seen and not seen_add(hashfunc(x))
- ]
-
-
def dedupe_tuple(tup):
return tuple(unique_list(tup))
-class memoized_property(object):
-
- """A read-only @property that is only evaluated once."""
-
- def __init__(self, fget, doc=None):
- self.fget = fget
- self.__doc__ = doc or fget.__doc__
- self.__name__ = fget.__name__
-
- def __get__(self, obj, cls):
- if obj is None:
- return self
- obj.__dict__[self.__name__] = result = self.fget(obj)
- return result
-
-
-class immutabledict(dict):
- def _immutable(self, *arg, **kw):
- raise TypeError("%s object is immutable" % self.__class__.__name__)
-
- __delitem__ = (
- __setitem__
- ) = __setattr__ = clear = pop = popitem = setdefault = update = _immutable
-
- def __new__(cls, *args):
- new = dict.__new__(cls)
- dict.__init__(new, *args)
- return new
-
- def __init__(self, *args):
- pass
-
- def __reduce__(self):
- return immutabledict, (dict(self),)
-
- def union(self, d):
- if not self:
- return immutabledict(d)
- else:
- d2 = immutabledict(self)
- dict.update(d2, d)
- return d2
-
- def __repr__(self):
- return "immutabledict(%s)" % dict.__repr__(self)
-
-
-class Dispatcher(object):
+class Dispatcher:
def __init__(self, uselist=False):
self._registry = {}
self.uselist = uselist
+from collections.abc import Iterable
import logging
import sys
import textwrap
from . import sqla_compat
from .compat import binary_type
-from .compat import collections_abc
from .compat import string_types
log = logging.getLogger(__name__)
return ""
elif isinstance(value, string_types):
return value
- elif isinstance(value, collections_abc.Iterable):
+ elif isinstance(value, Iterable):
return ", ".join(value)
else:
raise ValueError("Don't know how to comma-format %r" % value)
+import importlib
+import importlib.machinery
+import importlib.util
import os
import re
import tempfile
from mako import exceptions
from mako.template import Template
-from .compat import get_current_bytecode_suffixes
-from .compat import has_pep3147
-from .compat import load_module_py
-from .compat import load_module_pyc
-from .compat import py3k
from .exc import CommandError
def pyc_file_from_path(path):
"""Given a python source path, locate the .pyc."""
- if has_pep3147():
- if py3k:
- import importlib
-
- candidate = importlib.util.cache_from_source(path)
- else:
- import imp
-
- candidate = imp.cache_from_source(path)
- if os.path.exists(candidate):
- return candidate
+ candidate = importlib.util.cache_from_source(path)
+ if os.path.exists(candidate):
+ return candidate
# even for pep3147, fall back to the old way of finding .pyc files,
# to support sourceless operation
filepath, ext = os.path.splitext(path)
- for ext in get_current_bytecode_suffixes():
+ for ext in importlib.machinery.BYTECODE_SUFFIXES:
if os.path.exists(filepath + ext):
return filepath + ext
else:
elif ext in (".pyc", ".pyo"):
module = load_module_pyc(module_id, path)
return module
+
+
+def load_module_py(module_id, path):
+ spec = importlib.util.spec_from_file_location(module_id, path)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
+
+
+def load_module_pyc(module_id, path):
+ spec = importlib.util.spec_from_file_location(module_id, path)
+ module = importlib.util.module_from_spec(spec)
+ spec.loader.exec_module(module)
+ return module
represent any named set of SQL text to send to a "CREATE" statement of
some kind::
- class ReplaceableObject(object):
+ class ReplaceableObject:
def __init__(self, name, sqltext):
self.name = name
self.sqltext = sqltext
:meth:`.ScriptDirectory.get_heads`
-Support Non-Ascii Migration Scripts / Messages under Python 2
-==============================================================
-
-To work with a migration file that has non-ascii characters in it under Python
-2, the ``script.py.mako`` file inside of the Alembic environment has to have an
-encoding comment added to the top that will render into a ``.py`` file:
-
-.. code-block:: mako
-
- <%text># coding: utf-8</%text>
-
-Additionally, individual fields if they are to have non-ascii characters in
-them may require decode operations on the template values. Such as, if the
-revision message given on the command line to ``alembic revision`` has
-non-ascii characters in it, under Python 2 the command interface passes this
-through as bytes, and Alembic has no decode step built in for this as it is not
-necessary under Python 3. To decode, add a decoding step to the template for
-each variable that potentially may have non-ascii characters within it. An
-example of applying this to the "message" field is as follows:
-
-.. code-block:: mako
-
- <%!
- import sys
- %>\
- <%text># coding: utf-8</%text>
- """${message.decode("utf-8") \
- if sys.version_info < (3, ) \
- and isinstance(message, str) else message}
-
- Revision ID: ${up_revision}
- Revises: ${down_revision | comma,n}
- Create Date: ${create_date}
-
- """
- from alembic import op
- import sqlalchemy as sa
- ${imports if imports else ""}
-
- # revision identifiers, used by Alembic.
- revision = ${repr(up_revision)}
- down_revision = ${repr(down_revision)}
- branch_labels = ${repr(branch_labels)}
- depends_on = ${repr(depends_on)}
-
-
- def upgrade():
- ${upgrades if upgrades else "pass"}
-
-
- def downgrade():
- ${downgrades if downgrades else "pass"}
Using Asyncio with Alembic
==========================
.. versionchanged:: 1.5.0 Support for SQLAlchemy older than 1.3.0 was dropped.
-Alembic supports Python versions 2.7, 3.6 and above.
+Alembic supports Python versions **3.6 and above**.
-.. versionchanged:: 1.5.0 Support for Python 3.5 was dropped.
+.. versionchanged:: 1.7 Alembic now supports Python 3.6 and newer; support
+ for Python 2.7 has been dropped.
Community
=========
--- /dev/null
+.. change::
+ :tags: python
+
+ Alembic 1.7 now supports Python 3.6 and above; support for prior versions
+ including Python 2.7 has been dropped.
License :: OSI Approved :: MIT License
Operating System :: OS Independent
Programming Language :: Python
- Programming Language :: Python :: 2
- Programming Language :: Python :: 2.7
Programming Language :: Python :: 3
Programming Language :: Python :: 3.6
Programming Language :: Python :: 3.7
packages = find:
include_package_data = true
zip_safe = false
-python_requires = >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*, !=3.5.*
+python_requires = >=3.6
install_requires =
SQLAlchemy>=1.3.0
exclude = .venv,.git,.tox,dist,doc,*egg,build
import-order-style = google
application-import-names = alembic,tests
-
+per-file-ignores =
+ **/__init__.py:F401
[sqla_testing]
requirement_cls=tests.requirements:DefaultRequirements
-import sys
-
from sqlalchemy import BIGINT
from sqlalchemy import BigInteger
from sqlalchemy import Boolean
# TODO: we should make an adaptation of CompareMetadataToInspectorTest that is
# more well suited towards generic backends (2021-06-10)
-py3k = sys.version_info >= (3,)
-
class AutogenCrossSchemaTest(AutogenTest, TestBase):
__only_on__ = "postgresql"
eq_(len(diffs), 0)
-class ModelOne(object):
+class ModelOne:
__requires__ = ("unique_constraint_reflection",)
schema = None
object_filters=include_object, include_schemas=True
)
- class ExtFunction(object):
+ class ExtFunction:
pass
extfunc = ExtFunction()
-import sys
-
from sqlalchemy import Column
from sqlalchemy import ForeignKey
from sqlalchemy import ForeignKeyConstraint
# subset of the tests here. @zzzeek can work on this at a later point.
# (2021-06-10)
-py3k = sys.version_info >= (3,)
-
-class NoUqReflection(object):
+class NoUqReflection:
__requires__ = ()
def setUp(self):
import re
-import sys
import sqlalchemy as sa # noqa
from sqlalchemy import BigInteger
from alembic.testing import mock
from alembic.testing import TestBase
from alembic.testing.fixtures import op_fixture
-from alembic.util import compat
-
-py3k = sys.version_info >= (3,)
class AutogenRenderTest(TestBase):
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.create_index('test_lower_code_idx', 'test', "
- "[sa.text(!U'lower(code)')], unique=False)",
+ "[sa.text('lower(code)')], unique=False)",
)
def test_render_add_index_cast(self):
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.create_index('test_lower_code_idx', 'test', "
- "[sa.text(!U'CAST(code AS VARCHAR)')], unique=False)",
+ "[sa.text('CAST(code AS VARCHAR)')], unique=False)",
)
def test_render_add_index_desc(self):
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.create_index('test_desc_code_idx', 'test', "
- "[sa.text(!U'code DESC')], unique=False)",
+ "[sa.text('code DESC')], unique=False)",
)
def test_drop_index(self):
def test_render_table_w_unicode_name(self):
m = MetaData()
t = Table(
- compat.ue("\u0411\u0435\u0437"),
+ "\u0411\u0435\u0437",
m,
Column("id", Integer, primary_key=True),
)
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.create_table(%r,"
"sa.Column('id', sa.Integer(), nullable=False),"
- "sa.PrimaryKeyConstraint('id'))" % compat.ue("\u0411\u0435\u0437"),
+ "sa.PrimaryKeyConstraint('id'))" % "\u0411\u0435\u0437",
)
def test_render_table_w_unicode_schema(self):
"test",
m,
Column("id", Integer, primary_key=True),
- schema=compat.ue("\u0411\u0435\u0437"),
+ schema="\u0411\u0435\u0437",
)
op_obj = ops.CreateTableOp.from_table(t)
eq_ignore_whitespace(
"op.create_table('test',"
"sa.Column('id', sa.Integer(), nullable=False),"
"sa.PrimaryKeyConstraint('id'),"
- "schema=%r)" % compat.ue("\u0411\u0435\u0437"),
+ "schema=%r)" % "\u0411\u0435\u0437",
)
def test_render_table_w_unsupported_constraint(self):
)
def test_render_unicode_server_default(self):
- default = compat.ue(
+ default = (
"\u0411\u0435\u0437 "
"\u043d\u0430\u0437\u0432\u0430\u043d\u0438\u044f"
)
self.autogen_context,
None,
),
- "sa.CheckConstraint(!U'im a constraint', name='cc1')",
+ "sa.CheckConstraint('im a constraint', name='cc1')",
)
def test_render_check_constraint_sqlexpr(self):
self.autogen_context,
None,
),
- "sa.CheckConstraint(!U'c > 5 AND c < 10')",
+ "sa.CheckConstraint('c > 5 AND c < 10')",
)
def test_render_check_constraint_literal_binds(self):
self.autogen_context,
None,
),
- "sa.CheckConstraint(!U'c > 5 AND c < 10')",
+ "sa.CheckConstraint('c > 5 AND c < 10')",
)
def test_render_unique_constraint_opts(self):
"t",
m,
Column("c", Integer),
- schema=compat.ue("\u0411\u0435\u0437"),
+ schema="\u0411\u0435\u0437",
)
op_obj = ops.AddConstraintOp.from_constraint(UniqueConstraint(t.c.c))
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.create_unique_constraint(None, 't', ['c'], "
- "schema=%r)" % compat.ue("\u0411\u0435\u0437"),
+ "schema=%r)" % "\u0411\u0435\u0437",
)
def test_render_modify_nullable_w_default(self):
"# ### commands auto generated by Alembic - please adjust! ###\n"
" op.create_table('sometable',\n"
" sa.Column('x', sa.DateTime(), "
- "server_default=sa.text(!U'now()'), nullable=True)\n"
+ "server_default=sa.text('now()'), nullable=True)\n"
" )\n"
" # ### end Alembic commands ###",
)
"# ### commands auto generated by Alembic - please adjust! ###\n"
" op.create_table('sometable',\n"
" sa.Column('x', sa.DateTime(), "
- "server_default=sa.text(!U'(CURRENT_TIMESTAMP)'), nullable=True)\n"
+ "server_default=sa.text('(CURRENT_TIMESTAMP)'), nullable=True)\n"
" )\n"
" # ### end Alembic commands ###",
)
eq_ignore_whitespace(
result,
"sa.Column('updated_at', sa.TIMESTAMP(), "
- "server_default=sa.text(!U'now()'), "
+ "server_default=sa.text('now()'), "
"nullable=False)",
)
eq_ignore_whitespace(
result,
"sa.Column('updated_at', sa.Boolean(), "
- "server_default=sa.text(!U'0'), "
+ "server_default=sa.text('0'), "
"nullable=False)",
)
eq_ignore_whitespace(
result,
"sa.Column('updated_at', sa.TIMESTAMP(), "
- "server_default=sa.text(!U'now()'), "
+ "server_default=sa.text('now()'), "
"nullable=False)",
)
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.alter_column('sometable', 'somecolumn', "
"existing_type=sa.Integer(), nullable=True, "
- "existing_server_default=sa.text(!U'5'))",
+ "existing_server_default=sa.text('5'))",
)
def test_render_executesql_plaintext(self):
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.add_column('foo', sa.Column('x', sa.Integer(), "
- "sa.Computed(!U'5', ), nullable=True))",
+ "sa.Computed('5', ), nullable=True))",
)
@config.requirements.computed_columns_api
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.add_column('foo', sa.Column('x', sa.Integer(), "
- "sa.Computed(!U'5', persisted=%s), nullable=True))" % persisted,
+ "sa.Computed('5', persisted=%s), nullable=True))" % persisted,
)
@config.requirements.computed_columns_api
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.alter_column('sometable', 'somecolumn', "
- "server_default=sa.Computed(!U'7', ))",
+ "server_default=sa.Computed('7', ))",
)
@config.requirements.computed_columns_api
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.alter_column('sometable', 'somecolumn', "
- "existing_server_default=sa.Computed(!U'42', ))",
+ "existing_server_default=sa.Computed('42', ))",
)
@config.requirements.computed_columns_api
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.alter_column('sometable', 'somecolumn', server_default"
- "=sa.Computed(!U'7', persisted=%s))" % persisted,
+ "=sa.Computed('7', persisted=%s))" % persisted,
)
@config.requirements.computed_columns_api
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.alter_column('sometable', 'somecolumn', "
- "existing_server_default=sa.Computed(!U'42', persisted=%s))"
+ "existing_server_default=sa.Computed('42', persisted=%s))"
% persisted,
)
eq_ignore_whitespace(
autogenerate.render_op_text(self.autogen_context, op_obj),
"op.create_table('t',sa.Column('c', sa.Integer(), nullable=True),"
- "sa.CheckConstraint(!U'c > 5', name=op.f('ck_ct_t')))",
+ "sa.CheckConstraint('c > 5', name=op.f('ck_ct_t')))",
)
def test_inline_fk(self):
autogenerate.render._render_check_constraint(
ck, self.autogen_context, None
),
- "sa.CheckConstraint(!U'im a constraint', name=op.f('ck_t_cc1'))",
+ "sa.CheckConstraint('im a constraint', name=op.f('ck_t_cc1'))",
)
def test_create_table_plus_add_index_in_modify(self):
from contextlib import contextmanager
import inspect
from io import BytesIO
+from io import StringIO
from io import TextIOWrapper
import os
import re
from alembic.testing.fixtures import capture_context_buffer
from alembic.testing.fixtures import capture_engine_context_buffer
from alembic.testing.fixtures import TestBase
-from alembic.util import compat
from alembic.util.sqla_compat import _connectable_has_table
-class _BufMixin(object):
+class _BufMixin:
def _buf_fixture(self):
# try to simulate how sys.stdout looks - we send it u''
# but then it's trying to encode to something.
command.revision(self.cfg, sql=True)
-class _StampTest(object):
+class _StampTest:
def _assert_sql(self, emitted_sql, origin, destinations):
ins_expr = (
r"INSERT INTO alembic_version \(version_num\) "
)
def test_version_text(self):
- buf = compat.StringIO()
- to_mock = "sys.stdout" if util.compat.py3k else "sys.stderr"
+ buf = StringIO()
+ to_mock = "sys.stdout"
with mock.patch(to_mock, buf):
try:
from alembic.testing.env import staging_env
from alembic.testing.fixtures import capture_db
from alembic.testing.fixtures import TestBase
-from alembic.util import compat
class FileConfigTest(TestBase):
def test_utf8_unicode(self):
stdout = mock.Mock(encoding="latin-1")
cfg = config.Config(stdout=stdout)
- cfg.print_stdout(compat.u("méil %s %s"), "x", "y")
+ cfg.print_stdout("méil %s %s", "x", "y")
eq_(
stdout.mock_calls,
- [mock.call.write(compat.u("méil x y")), mock.call.write("\n")],
+ [mock.call.write("méil x y"), mock.call.write("\n")],
)
def test_ascii_unicode(self):
stdout = mock.Mock(encoding=None)
cfg = config.Config(stdout=stdout)
- cfg.print_stdout(compat.u("méil %s %s"), "x", "y")
+ cfg.print_stdout("méil %s %s", "x", "y")
eq_(
stdout.mock_calls,
[mock.call.write("m?il x y"), mock.call.write("\n")],
def test_only_formats_output_with_args(self):
stdout = mock.Mock(encoding=None)
cfg = config.Config(stdout=stdout)
- cfg.print_stdout(compat.u("test 3%"))
+ cfg.print_stdout("test 3%")
eq_(
stdout.mock_calls,
[mock.call.write("test 3%"), mock.call.write("\n")],
import os
from os.path import join
+from unittest.mock import patch
from alembic import util
from alembic.testing import combinations
from alembic.testing import expect_raises_message
-from alembic.testing import mock
from alembic.testing.fixtures import TestBase
class TestHelpers(TestBase):
def common(self, cb, is_posix=True):
- with mock.patch(
- "alembic.util.editor.check_call"
- ) as check_call, mock.patch(
+ with patch("alembic.util.editor.check_call") as check_call, patch(
"alembic.util.editor.exists"
- ) as exists, mock.patch(
+ ) as exists, patch(
"alembic.util.editor.is_posix",
new=is_posix,
- ), mock.patch(
+ ), patch(
"os.pathsep", new=":" if is_posix else ";"
):
cb(check_call, exists)
autogenerate.render_op_text(autogen_context, op_obj),
"""op.create_index('foo_idx', 't', \
['x', 'y'], unique=False, """
- """postgresql_where=sa.text(!U"y = 'something'"))""",
+ """postgresql_where=sa.text("y = 'something'"))""",
)
def test_render_server_default_native_boolean(self):
eq_ignore_whitespace(
result,
"sa.Column('updated_at', sa.Boolean(), "
- "server_default=sa.text(!U'false'), "
+ "server_default=sa.text('false'), "
"nullable=False)",
)
autogenerate.render_op_text(autogen_context, op_obj),
"op.create_exclude_constraint('t_excl_x', "
"'t', (sa.column('x'), '>'), "
- "where=sa.text(!U'x != 2'), using='gist')",
+ "where=sa.text('x != 2'), using='gist')",
)
def test_add_exclude_constraint_case_sensitive(self):
autogenerate.render_op_text(autogen_context, op_obj),
"op.create_exclude_constraint('t_excl_x', 'TTAble', "
"(sa.column('XColumn'), '>'), "
- "where=sa.text(!U'\"XColumn\" != 2'), using='gist')",
+ "where=sa.text('\"XColumn\" != 2'), using='gist')",
)
def test_inline_exclude_constraint(self):
"op.create_table('t',sa.Column('x', sa.String(), nullable=True),"
"sa.Column('y', sa.String(), nullable=True),"
"postgresql.ExcludeConstraint((sa.column('x'), '>'), "
- "where=sa.text(!U'x != 2'), using='gist', name='t_excl_x')"
+ "where=sa.text('x != 2'), using='gist', name='t_excl_x')"
")",
)
"nullable=True),"
"sa.Column('YColumn', sa.String(), nullable=True),"
"postgresql.ExcludeConstraint((sa.column('XColumn'), '>'), "
- "where=sa.text(!U'\"XColumn\" != 2'), using='gist', "
+ "where=sa.text('\"XColumn\" != 2'), using='gist', "
"name='TExclX'))",
)
from alembic.util import compat
-class PatchEnvironment(object):
+class PatchEnvironment:
branched_connection = False
@contextmanager
id_="r",
)
class NewFangledSourcelessApplyVersionsTest(ApplyVersionsFunctionalTest):
-
- __requires__ = ("pep3147",)
+ pass
class CallbackEnvironmentTest(ApplyVersionsFunctionalTest):
script,
self.a,
(
- compat.u(
- """# coding: utf-8
+ """# coding: utf-8
from __future__ import unicode_literals
revision = '%s'
down_revision = None
op.execute("drôle de petite voix m’a réveillé")
"""
- )
% self.a
),
encoding="utf-8",
bytes_io=True, output_encoding="utf-8"
) as buf:
command.upgrade(self.cfg, self.a, sql=True)
- assert compat.u("« S’il vous plaît…").encode("utf-8") in buf.getvalue()
+ assert "« S’il vous plaît…".encode("utf-8") in buf.getvalue()
class VersionNameTemplateTest(TestBase):
class NewFangledEnvOnlySourcelessIgnoreFilesTest(IgnoreFilesTest):
sourceless = "pep3147_envonly"
- __requires__ = ("pep3147",)
-
class NewFangledEverythingSourcelessIgnoreFilesTest(IgnoreFilesTest):
sourceless = "pep3147_everything"
- __requires__ = ("pep3147",)
-
class SourcelessNeedsFlagTest(TestBase):
def setUp(self):
eq_ignore_whitespace(
result,
"sa.Column('date_value', sa.DateTime(), "
- "server_default=sa.text(!U\"(datetime('now', 'localtime'))\"), "
+ "server_default=sa.text(\"(datetime('now', 'localtime'))\"), "
"nullable=True)",
)
eq_ignore_whitespace(
result,
"sa.Column('date_value', sa.DateTime(), "
- "server_default=sa.text(!U\"(datetime('now', 'localtime'))\"), "
+ "server_default=sa.text(\"(datetime('now', 'localtime'))\"), "
"nullable=True)",
)
postgresql: psycopg2>=2.7
mysql: mysqlclient>=1.4.0
mysql: pymysql
- oracle: cx_oracle>=7,<8;python_version<"3"
- oracle: cx_oracle>=7;python_version>="3"
+ oracle: cx_oracle>=7
mssql: pyodbc
cov: pytest-cov
sqlalchemy: sqlalchemy>=1.3.0