# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/python/black
- rev: 22.3.0
+ rev: 23.3.0
hooks:
- id: black
- repo: https://github.com/sqlalchemyorg/zimports
- rev: v0.4.0
+ rev: v0.6.0
hooks:
- id: zimports
args:
- --keep-unused-type-checking
- repo: https://github.com/pycqa/flake8
- rev: 3.9.2
+ rev: 6.0.0
hooks:
- id: flake8
additional_dependencies:
opts: Optional[dict] = None,
autogenerate: bool = True,
) -> None:
-
if (
autogenerate
and migration_context is not None
parent_names["schema_qualified_table_name"] = table_name
for fn in self._name_filters:
-
if not fn(name, type_, parent_names):
return False
else:
def _produce_net_changes(
autogen_context: AutogenContext, upgrade_ops: UpgradeOps
) -> None:
-
connection = autogen_context.connection
assert connection is not None
include_schemas = autogen_context.opts.get("include_schemas", False)
upgrade_ops: UpgradeOps,
autogen_context: AutogenContext,
) -> None:
-
default_schema = inspector.bind.dialect.default_schema_name
# tables coming from the connection will not have "schema"
)
sqla_compat._reflect_table(inspector, t)
if autogen_context.run_object_filters(t, tname, "table", True, None):
-
modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
comparators.dispatch("table")(
if autogen_context.run_object_filters(
metadata_table, tname, "table", False, conn_table
):
-
modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
with _compare_columns(
s,
autogen_context,
inspector,
):
-
comparators.dispatch("table")(
autogen_context,
modify_table_ops,
conn_table: Optional[Table],
metadata_table: Optional[Table],
) -> None:
-
inspector = autogen_context.inspector
is_create_table = conn_table is None
is_drop_table = metadata_table is None
_uq_constraint_sig(uqs_dupe_indexes[overlap]).sig
not in unnamed_metadata_uqs
):
-
conn_unique_constraints.discard(uqs_dupe_indexes[overlap])
elif overlap not in metadata_ix_names:
conn_indexes.discard(conn_ix_names[overlap])
conn_col: Column[Any],
metadata_col: Column[Any],
) -> None:
-
metadata_col_nullable = metadata_col.nullable
conn_col_nullable = conn_col.nullable
alter_column_op.existing_nullable = conn_col_nullable
conn_col: Column[Any],
metadata_col: Column[Any],
) -> None:
-
if metadata_col.table._autoincrement_column is metadata_col:
alter_column_op.kw["autoincrement"] = True
elif metadata_col.autoincrement is True:
conn_col: Column[Any],
metadata_col: Column[Any],
) -> None:
-
conn_type = conn_col.type
alter_column_op.existing_type = conn_type
metadata_type = metadata_col.type
def _render_server_default_for_compare(
metadata_default: Optional[Any], autogen_context: AutogenContext
) -> Optional[str]:
-
if isinstance(metadata_default, sa_schema.DefaultClause):
if isinstance(metadata_default.arg, str):
metadata_default = metadata_default.arg
conn_col: Column[Any],
metadata_col: Column[Any],
) -> Optional[bool]:
-
metadata_default = metadata_col.server_default
conn_col_default = conn_col.server_default
if conn_col_default is None and metadata_default is None:
conn_col: Column[Any],
metadata_col: Column[Any],
) -> Optional[Literal[False]]:
-
assert autogen_context.dialect is not None
if not autogen_context.dialect.supports_comments:
return None
conn_table: Optional[Table],
metadata_table: Optional[Table],
) -> None:
-
# if we're doing CREATE TABLE, all FKs are created
# inline within the table def
if conn_table is None or metadata_table is None:
conn_table: Optional[Table],
metadata_table: Optional[Table],
) -> None:
-
assert autogen_context.dialect is not None
if not autogen_context.dialect.supports_comments:
return
op_container: ops.OpContainer,
autogen_context: AutogenContext,
) -> str:
-
buf = StringIO()
printer = PythonPrinter(buf)
def _render_create_table_comment(
autogen_context: AutogenContext, op: ops.CreateTableCommentOp
) -> str:
-
templ = (
"{prefix}create_table_comment(\n"
"{indent}'{tname}',\n"
def _render_drop_table_comment(
autogen_context: AutogenContext, op: ops.DropTableCommentOp
) -> str:
-
templ = (
"{prefix}drop_table_comment(\n"
"{indent}'{tname}',\n"
def _add_fk_constraint(
autogen_context: AutogenContext, op: ops.CreateForeignKeyOp
) -> str:
-
args = [repr(_render_gen_name(autogen_context, op.constraint_name))]
if not autogen_context._has_batch:
args.append(repr(_ident(op.source_table)))
def _drop_constraint(
autogen_context: AutogenContext, op: ops.DropConstraintOp
) -> str:
-
if autogen_context._has_batch:
template = "%(prefix)sdrop_constraint" "(%(name)r, type_=%(type)r)"
else:
@renderers.dispatch_for(ops.AddColumnOp)
def _add_column(autogen_context: AutogenContext, op: ops.AddColumnOp) -> str:
-
schema, tname, column = op.schema, op.table_name, op.column
if autogen_context._has_batch:
template = "%(prefix)sadd_column(%(column)s)"
@renderers.dispatch_for(ops.DropColumnOp)
def _drop_column(autogen_context: AutogenContext, op: ops.DropColumnOp) -> str:
-
schema, tname, column_name = op.schema, op.table_name, op.column_name
if autogen_context._has_batch:
def _alter_column(
autogen_context: AutogenContext, op: ops.AlterColumnOp
) -> str:
-
tname = op.table_name
cname = op.column_name
server_default = op.modify_server_default
is_server_default: bool = False,
) -> str:
if isinstance(value, sql.ClauseElement):
-
if wrap_in_text:
template = "%(prefix)stext(%(sql)r)"
else:
opts: List[Tuple[str, Any]] = []
if column.server_default:
-
rendered = _render_server_default( # type:ignore[assignment]
column.server_default, autogen_context
)
def _populate_render_fk_opts(
constraint: ForeignKeyConstraint, opts: List[Tuple[str, str]]
) -> None:
-
if constraint.onupdate:
opts.append(("onupdate", repr(constraint.onupdate)))
if constraint.ondelete:
for sc in script.walk_revisions(
base=base or "base", head=head or "heads"
):
-
if indicate_current:
sc._db_current_indicator = sc.revision in currents
class Config:
-
r"""Represent an Alembic configuration.
Within an ``env.py`` script, this is available
and fn.__name__[0] != "_"
and fn.__module__ == "alembic.command"
):
-
spec = compat.inspect_getfullargspec(fn)
if spec[3] is not None:
positional = spec[0][1 : -len(spec[3])]
return False
def _compare_identity_default(self, metadata_identity, inspector_identity):
-
# ignored contains the attributes that were not considered
# because assumed to their default values in the db.
diff, ignored = _compare_identity_options(
existing_nullable: Optional[bool] = None,
**kw: Any,
) -> None:
-
if nullable is not None:
if type_ is not None:
# the NULL/NOT NULL alter will handle
rendered_metadata_default,
rendered_inspector_default,
):
-
if rendered_metadata_default is not None:
-
rendered_metadata_default = re.sub(
r"[\(\) \"\']", "", rendered_metadata_default
)
metadata_unique_constraints,
metadata_indexes,
):
-
# TODO: if SQLA 1.0, make use of "duplicates_index"
# metadata
removed = set()
self._exec(CreateIndex(index))
def prep_table_for_batch(self, batch_impl, table):
-
for constraint in table.constraints:
if (
constraint.name is not None
existing_autoincrement: Optional[bool] = None,
**kw: Any,
) -> None:
-
using = kw.pop("postgresql_using", None)
if using is not None and type_ is None:
metadata_unique_constraints,
metadata_indexes,
):
-
doubled_constraints = {
index
for index in conn_indexes
def _postgresql_autogenerate_prefix(autogen_context: AutogenContext) -> str:
-
imports = autogen_context.imports
if imports is not None:
imports.add("from sqlalchemy.dialects import postgresql")
rendered_metadata_default: Optional[str],
rendered_inspector_default: Optional[str],
) -> bool:
-
if rendered_metadata_default is not None:
rendered_metadata_default = re.sub(
r"^\((.+)\)$", r"\1", rendered_metadata_default
metadata_unique_constraints,
metadata_indexes,
):
-
self._skip_functional_indexes(metadata_indexes, conn_indexes)
for const in (
list(self.named_constraints.values()) + self.unnamed_constraints
):
-
const_columns = {c.key for c in _columns_for_constraint(const)}
if not const_columns.issubset(self.column_transfers):
)
def to_constraint(self) -> Constraint:
-
if self._reverse is not None:
constraint = self._reverse.to_constraint()
constraint.name = self.constraint_name
def from_constraint(
cls, constraint: Constraint
) -> CreateUniqueConstraintOp:
-
constraint_table = sqla_compat._table_for_constraint(constraint)
uq_constraint = cast("UniqueConstraint", constraint)
@classmethod
def from_constraint(cls, constraint: Constraint) -> CreateForeignKeyOp:
-
fk_constraint = cast("ForeignKeyConstraint", constraint)
kw: dict = {}
if fk_constraint.onupdate:
return False
def reverse(self) -> AlterColumnOp:
-
kw = self.kw.copy()
kw["existing_type"] = self.existing_type
kw["existing_nullable"] = self.existing_nullable
def alter_column(
operations: "Operations", operation: "ops.AlterColumnOp"
) -> None:
-
compiler = operations.impl.dialect.statement_compiler(
operations.impl.dialect, None
)
from ..operations import Operations
if TYPE_CHECKING:
-
from sqlalchemy.engine import URL
from sqlalchemy.engine.base import Connection
from sqlalchemy.sql.elements import ClauseElement
if start_from_rev == "base":
start_from_rev = None
elif start_from_rev is not None and self.script:
-
start_from_rev = [
cast("Script", self.script.get_revision(sfr)).revision
for sfr in util.to_list(start_from_rev)
assert self._migrations_fn is not None
for step in self._migrations_fn(heads, self):
with self.begin_transaction(_per_migration=True):
-
if self.as_sql and not head_maintainer.heads:
# for offline mode, include a CREATE TABLE from
# the base
rendered_metadata_default: Optional[str],
rendered_column_default: Optional[str],
) -> bool:
-
if self._user_compare_server_default is False:
return False
class MigrationStep:
-
from_revisions_no_deps: Tuple[str, ...]
to_revisions_no_deps: Tuple[str, ...]
is_upgrade: bool
multiple_heads="Multiple heads are present; please specify a "
"single target revision"
):
-
heads_revs = self.get_revisions(heads)
steps = []
dests = self.get_revisions(revision) or [None]
for dest in dests:
-
if dest is None:
# dest is 'base'. Return a "delete branch" migration
# for all applicable heads.
and not parent._is_real_branch_point
and not parent.is_merge_point
):
-
parent.branch_labels.update(revision.branch_labels)
if parent.down_revision:
parent = map_[parent.down_revision]
omit_immediate_dependencies: bool = False,
include_dependencies: bool = True,
) -> Iterator[Any]:
-
if omit_immediate_dependencies:
def fn(rev):
check: bool = False,
include_dependencies: bool = True,
) -> Iterator[Revision]:
-
if include_dependencies:
def fn(rev):
current_candidate_idx = 0
while current_heads:
-
candidate = current_heads[current_candidate_idx]
for check_head_index, ancestors in enumerate(ancestors_by_idx):
def console_scripts(
path: str, options: dict, ignore_output: bool = False
) -> None:
-
try:
entrypoint_name = options["entrypoint"]
except KeyError as ke:
def _assert_raises(
except_cls, callable_, args, kwargs, msg=None, check_context=False
):
-
with _expect_raises(except_cls, msg, check_context) as ec:
callable_(*args, **kwargs)
return ec.error
def eq_ignore_whitespace(a, b, msg=None):
-
a = re.sub(r"^\s+?|\n", "", a)
a = re.sub(r" {2,}", " ", a)
b = re.sub(r"^\s+?|\n", "", b)
if name is None or name == "default":
return default.DefaultDialect()
else:
-
d = sqla_compat._create_url(name).get_dialect()()
if name == "postgresql":
def staging_env(create=True, template="generic", sourceless=False):
-
cfg = _testing_config()
if create:
-
path = os.path.join(_get_staging_directory(), "scripts")
assert not os.path.exists(path), (
"staging directory %s already exists; poor cleanup?" % path
def make_sourceless(path, style):
-
import py_compile
py_compile.compile(path)
literal_binds=False,
native_boolean=None,
):
-
opts = {}
if naming_convention:
opts["target_metadata"] = MetaData(naming_convention=naming_convention)
class AlterColRoundTripFixture:
-
# since these tests are about syntax, use more recent SQLAlchemy as some of
# the type / server default compare logic might not work on older
# SQLAlchemy versions as seems to be the case for SQLAlchemy 1.1 on Oracle
return_ops=False,
max_identifier_length=None,
):
-
if max_identifier_length:
dialect = self.bind.dialect
existing_length = dialect.max_identifier_length
return decorate
def dispatch(self, obj: Any, qualifier: str = "default") -> Any:
-
if isinstance(obj, str):
targets: Sequence = [obj]
elif isinstance(obj, type):
"""
if not os.path.isabs(fname) and ":" in fname:
-
tokens = fname.split(":")
# from https://importlib-resources.readthedocs.io/en/latest/migration.html#pkg-resources-resource-filename # noqa E501
constraint, _alembic_quote=False
)
else:
-
# prior to SQLAlchemy 1.4, work around quoting logic to get at the
# final compiled name without quotes.
if hasattr(constraint.name, "quote"):
__backend__ = True
def test_uses_explcit_schema_in_default_one(self):
-
default_schema = self.bind.dialect.default_schema_name
m1 = MetaData()
eq_(diffs, [])
def test_uses_explcit_schema_in_default_two(self):
-
default_schema = self.bind.dialect.default_schema_name
m1 = MetaData()
eq_(diffs[0][1].c.keys(), ["y"])
def test_uses_explcit_schema_in_default_three(self):
-
default_schema = self.bind.dialect.default_schema_name
m1 = MetaData()
eq_(self.bind.dialect.default_schema_name, None)
def test_no_default_schema(self):
-
m1 = MetaData()
m2 = MetaData()
def test_compare_type(
self, impl_fixture, inspected_type, metadata_type, expected
):
-
is_(
impl_fixture.compare_type(
Column("x", inspected_type), Column("x", metadata_type)
symbols = ["someothertable", "sometable"]
def test_autogen(self):
-
uo = ops.UpgradeOps(ops=[])
ctx = self.autogen_context
req = config.requirements.reflects_indexes_column_sorting
if flatten:
-
flat = list(itertools.chain.from_iterable(diff_pairs))
for f1, f2 in with_sort:
flat.extend([(f1, req), (f2, req)])
)
def test_generic_array_type(self):
-
eq_ignore_whitespace(
autogenerate.render._repr_type(
types.ARRAY(Integer), self.autogen_context
)
def test_render_variant(self):
-
self.autogen_context.opts["user_module_prefix"] = None
type_ = (
class RenderNamingConventionTest(TestBase):
def setUp(self):
-
convention = {
"ix": "ix_%(custom)s_%(column_0_label)s",
"uq": "uq_%(custom)s_%(table_name)s_%(column_0_name)s",
class BatchAPITest(TestBase):
@contextmanager
def _fixture(self, schema=None):
-
migration_context = mock.Mock(
opts={},
impl=mock.MagicMock(__dialect__="sqlite", connection=object()),
)
def test_init_w_package(self):
-
path = os.path.join(_get_staging_directory(), "foobar")
with mock.patch("alembic.command.open") as open_:
)
def test_external_nested_render_sqla_type(self):
-
eq_ignore_whitespace(
autogenerate.render._repr_type(
EXT_ARRAY(sqla_types.Integer), self.autogen_context
)
def test_external_nested_render_external_type(self):
-
eq_ignore_whitespace(
autogenerate.render._repr_type(
EXT_ARRAY(FOOBARTYPE), self.autogen_context
command.downgrade(self.cfg, "%s:%s" % (b, a), sql=True)
def test_running_comments_not_in_sql(self):
-
message = "this is a very long \nand multiline\nmessage"
d = command.revision(self.cfg, message=message)
context.assert_("CREATE INDEX name ON tname (foo(x))")
def test_add_column_schema_hard_quoting(self):
-
context = op_fixture("postgresql")
op.add_column(
"somename",
)
def test_rename_table_schema_hard_quoting(self):
-
context = op_fixture("postgresql")
op.rename_table(
"t1", "t2", schema=quoted_name("some.schema", quote=True)
context.assert_('ALTER TABLE "some.schema".t1 RENAME TO t2')
def test_add_constraint_schema_hard_quoting(self):
-
context = op_fixture("postgresql")
op.create_check_constraint(
"ck_user_name_len",
), mock.patch(
"alembic.script.write_hooks.subprocess"
) as mock_subprocess:
-
rev = command.revision(self.cfg, message="x")
eq_(importlib_metadata_get.mock_calls, [mock.call("console_scripts")])
@combinations(True, False)
def test_filename_interpolation(self, posix):
-
input_config = """
[post_write_hooks]
hooks = black
)
def test_path_in_config(self):
-
input_config = """
[post_write_hooks]
hooks = black
)
def test_postgresql_array_type(self):
-
eq_ignore_whitespace(
autogenerate.render._repr_type(
ARRAY(Integer), self.autogen_context
)
def test_generic_array_type(self):
-
eq_ignore_whitespace(
autogenerate.render._repr_type(
types.ARRAY(Integer), self.autogen_context
@config.requirements.btree_gist
def test_exclude_const_unchanged(self):
-
m1 = MetaData()
m2 = MetaData()
)
def test_three_branches_end_in_single_branch(self):
-
self._assert_iteration(
["merge", "fe1b1"],
"a3",
)
def test_two_branches_to_root(self):
-
# here we want 'a3' as a "stop" branch point, but *not*
# 'db1', as we don't have multiple traversals on db1
self._assert_iteration(
)
def test_three_branches_to_root(self):
-
# in this case, both "a3" and "db1" are stop points
self._assert_iteration(
["merge", "fe1b1"],
)
def test_three_branches_end_multiple_bases(self):
-
# in this case, both "a3" and "db1" are stop points
self._assert_iteration(
["merge", "fe1b1"],
)
def test_three_branches_end_multiple_bases_exclusive(self):
-
self._assert_iteration(
["merge", "fe1b1"],
["cb1", "cb2"],
clear_staging_env()
def _test_ignore_file_py(self, fname):
-
command.revision(self.cfg, message="some rev")
script = ScriptDirectory.from_config(self.cfg)
path = os.path.join(script.versions, fname)
@testing.fixture
def multi_base_fixture(self):
-
self.env = staging_env()
self.cfg = _multi_dir_testing_config()
self.cfg.set_main_option("recursive_version_locations", "true")
@config.requirements.comments
def test_create_table_with_comment_ignored(self):
-
context = op_fixture("sqlite")
op.create_table(
"t2",
@config.requirements.comments
def test_add_column_with_comment_ignored(self):
-
context = op_fixture("sqlite")
op.add_column("t1", Column("c1", Integer, comment="c1 comment"))
context.assert_("ALTER TABLE t1 ADD COLUMN c1 INTEGER")
)
def test_invalid_relative_upgrade_path(self):
-
assert_raises_message(
util.CommandError,
"Relative revision -2 didn't produce 2 migrations",
)
def test_downgrade_path(self):
-
self._assert_downgrade(
self.c.revision,
self.e.revision,
)
def test_relative_downgrade_path(self):
-
self._assert_downgrade(
"-1", self.c.revision, [self.down_(self.c)], {self.b.revision}
)
)
def test_invalid_relative_downgrade_path(self):
-
assert_raises_message(
util.CommandError,
"Relative revision -5 didn't produce 5 migrations",
)
def test_invalid_move_rev_to_none(self):
-
assert_raises_message(
util.CommandError,
r"Destination %s is not a valid downgrade "
)
def test_invalid_move_higher_to_lower(self):
-
assert_raises_message(
util.CommandError,
r"Destination %s is not a valid downgrade "
)
def test_upgrade_single_branch(self):
-
self._assert_upgrade(
self.d1.revision,
self.b.revision,
)
def test_relative_upgrade(self):
-
self._assert_upgrade(
"c2branch@head-1",
self.b.revision,
)
def test_mergepoint_to_only_one_side_downgrade(self):
-
self._assert_downgrade(
self.b1.revision,
(self.d2.revision, self.d1.revision),
clear_staging_env()
def test_mergepoint_to_only_one_side_upgrade(self):
-
self._assert_upgrade(
self.d1.revision,
(self.d3.revision, self.d2.revision, self.b1.revision),
)
def test_mergepoint_to_two_sides_upgrade(self):
-
self._assert_upgrade(
self.d1.revision,
(self.d3.revision, self.b2.revision, self.b1.revision),
clear_staging_env()
def test_dependencies_are_normalized(self):
-
heads = [self.b4.revision]
self._assert_downgrade(
)
def test_upgrade_across_merge_point(self):
-
eq_(
self.env._upgrade_revs(self.f.revision, self.b.revision),
[
)
def test_downgrade_across_merge_point(self):
-
eq_(
self.env._downgrade_revs(self.b.revision, self.f.revision),
[
"sqlalchemy.sql.type_api.",
"sqlalchemy.sql.functions.",
"sqlalchemy.sql.dml.",
- "typing."
+ "typing.",
]
ADDITIONAL_ENV = {
"MigrationContext": MigrationContext,
pydocstyle<4.0.0
# used by flake8-rst-docstrings
pygments
- black==22.3.0
+ black==23.3.0
commands =
flake8 ./alembic/ ./tests/ setup.py docs/build/conf.py {posargs}
black --check setup.py tests alembic