# See https://pre-commit.com/hooks.html for more hooks
repos:
- repo: https://github.com/python/black
- rev: 23.3.0
+ rev: 24.1.1
hooks:
- id: black
migration_script = self.generated_revisions[-1]
if not getattr(migration_script, "_needs_render", False):
migration_script.upgrade_ops_list[-1].upgrade_token = upgrade_token
- migration_script.downgrade_ops_list[
- -1
- ].downgrade_token = downgrade_token
+ migration_script.downgrade_ops_list[-1].downgrade_token = (
+ downgrade_token
+ )
migration_script._needs_render = True
else:
migration_script._upgrade_ops.append(
prefix=_alembic_autogenerate_prefix(autogen_context),
tname=op.table_name,
comment="%r" % op.comment if op.comment is not None else None,
- existing="%r" % op.existing_comment
- if op.existing_comment is not None
- else None,
+ existing=(
+ "%r" % op.existing_comment
+ if op.existing_comment is not None
+ else None
+ ),
schema="'%s'" % op.schema if op.schema is not None else None,
indent=" ",
)
return templ.format(
prefix=_alembic_autogenerate_prefix(autogen_context),
tname=op.table_name,
- existing="%r" % op.existing_comment
- if op.existing_comment is not None
- else None,
+ existing=(
+ "%r" % op.existing_comment
+ if op.existing_comment is not None
+ else None
+ ),
schema="'%s'" % op.schema if op.schema is not None else None,
indent=" ",
)
_get_index_rendered_expressions(index, autogen_context)
),
"unique": index.unique or False,
- "schema": (", schema=%r" % _ident(index.table.schema))
- if index.table.schema
- else "",
+ "schema": (
+ (", schema=%r" % _ident(index.table.schema))
+ if index.table.schema
+ else ""
+ ),
"kwargs": ", " + ", ".join(opts) if opts else "",
}
return text
idx: Index, autogen_context: AutogenContext
) -> List[str]:
return [
- repr(_ident(getattr(exp, "name", None)))
- if isinstance(exp, sa_schema.Column)
- else _render_potential_expr(exp, autogen_context, is_index=True)
+ (
+ repr(_ident(getattr(exp, "name", None)))
+ if isinstance(exp, sa_schema.Column)
+ else _render_potential_expr(exp, autogen_context, is_index=True)
+ )
for exp in idx.expressions
]
)
return "%(prefix)sCheckConstraint(%(sqltext)s%(opts)s)" % {
"prefix": _sqlalchemy_autogenerate_prefix(autogen_context),
- "opts": ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
- if opts
- else "",
+ "opts": (
+ ", " + (", ".join("%s=%s" % (k, v) for k, v in opts))
+ if opts
+ else ""
+ ),
"sqltext": _render_potential_expr(
constraint.sqltext, autogen_context, wrap_in_text=False
),
@overload
def get_section(
self, name: str, default: None = ...
- ) -> Optional[Dict[str, str]]:
- ...
+ ) -> Optional[Dict[str, str]]: ...
# "default" here could also be a TypeVar
# _MT = TypeVar("_MT", bound=Mapping[str, str]),
@overload
def get_section(
self, name: str, default: Dict[str, str]
- ) -> Dict[str, str]:
- ...
+ ) -> Dict[str, str]: ...
@overload
def get_section(
self, name: str, default: Mapping[str, str]
- ) -> Union[Dict[str, str], Mapping[str, str]]:
- ...
+ ) -> Union[Dict[str, str], Mapping[str, str]]: ...
def get_section(
self, name: str, default: Optional[Mapping[str, str]] = None
return default
@overload
- def get_main_option(self, name: str, default: str) -> str:
- ...
+ def get_main_option(self, name: str, default: str) -> str: ...
@overload
def get_main_option(
self, name: str, default: Optional[str] = None
- ) -> Optional[str]:
- ...
+ ) -> Optional[str]: ...
def get_main_option(
self, name: str, default: Optional[str] = None
self.target_table,
tuple(self.target_columns),
) + (
- (None if onupdate.lower() == "no action" else onupdate.lower())
- if onupdate
- else None,
- (None if ondelete.lower() == "no action" else ondelete.lower())
- if ondelete
- else None,
+ (
+ (None if onupdate.lower() == "no action" else onupdate.lower())
+ if onupdate
+ else None
+ ),
+ (
+ (None if ondelete.lower() == "no action" else ondelete.lower())
+ if ondelete
+ else None
+ ),
# convert initially + deferrable into one three-state value
- "initially_deferrable"
- if initially and initially.lower() == "deferred"
- else "deferrable"
- if deferrable
- else "not deferrable",
+ (
+ "initially_deferrable"
+ if initially and initially.lower() == "deferred"
+ else "deferrable" if deferrable else "not deferrable"
+ ),
)
@util.memoized_property
class AlterTable(DDLElement):
-
"""Represent an ALTER TABLE statement.
Only the string name and optional schema name of the table
return "%s %s %s" % (
alter_table(compiler, element.table_name, element.schema),
alter_column(compiler, element.column_name),
- "SET DEFAULT %s" % format_server_default(compiler, element.default)
- if element.default is not None
- else "DROP DEFAULT",
+ (
+ "SET DEFAULT %s" % format_server_default(compiler, element.default)
+ if element.default is not None
+ else "DROP DEFAULT"
+ ),
)
class DefaultImpl(metaclass=ImplMeta):
-
"""Provide the entrypoint for major migration operations,
including database-specific behavioral variances.
self._exec(
sqla_compat._insert_inline(table).values(
**{
- k: sqla_compat._literal_bindparam(
- k, v, type_=table.c[k].type
- )
- if not isinstance(
- v, sqla_compat._literal_bindparam
+ k: (
+ sqla_compat._literal_bindparam(
+ k, v, type_=table.c[k].type
+ )
+ if not isinstance(
+ v, sqla_compat._literal_bindparam
+ )
+ else v
)
- else v
for k, v in row.items()
}
)
column_name,
schema=schema,
newname=name if name is not None else column_name,
- nullable=nullable
- if nullable is not None
- else existing_nullable
- if existing_nullable is not None
- else True,
+ nullable=(
+ nullable
+ if nullable is not None
+ else (
+ existing_nullable
+ if existing_nullable is not None
+ else True
+ )
+ ),
type_=type_ if type_ is not None else existing_type,
- default=server_default
- if server_default is not False
- else existing_server_default,
- autoincrement=autoincrement
- if autoincrement is not None
- else existing_autoincrement,
- comment=comment
- if comment is not False
- else existing_comment,
+ default=(
+ server_default
+ if server_default is not False
+ else existing_server_default
+ ),
+ autoincrement=(
+ autoincrement
+ if autoincrement is not None
+ else existing_autoincrement
+ ),
+ comment=(
+ comment if comment is not False else existing_comment
+ ),
)
)
elif (
column_name,
schema=schema,
newname=name if name is not None else column_name,
- nullable=nullable
- if nullable is not None
- else existing_nullable
- if existing_nullable is not None
- else True,
+ nullable=(
+ nullable
+ if nullable is not None
+ else (
+ existing_nullable
+ if existing_nullable is not None
+ else True
+ )
+ ),
type_=type_ if type_ is not None else existing_type,
- default=server_default
- if server_default is not False
- else existing_server_default,
- autoincrement=autoincrement
- if autoincrement is not None
- else existing_autoincrement,
- comment=comment
- if comment is not False
- else existing_comment,
+ default=(
+ server_default
+ if server_default is not False
+ else existing_server_default
+ ),
+ autoincrement=(
+ autoincrement
+ if autoincrement is not None
+ else existing_autoincrement
+ ),
+ comment=(
+ comment if comment is not False else existing_comment
+ ),
)
)
elif server_default is not False:
return "%s ALTER COLUMN %s %s" % (
alter_table(compiler, element.table_name, element.schema),
format_column_name(compiler, element.column_name),
- "SET DEFAULT %s" % format_server_default(compiler, element.default)
- if element.default is not None
- else "DROP DEFAULT",
+ (
+ "SET DEFAULT %s" % format_server_default(compiler, element.default)
+ if element.default is not None
+ else "DROP DEFAULT"
+ ),
)
return "%s %s %s" % (
alter_table(compiler, element.table_name, element.schema),
alter_column(compiler, element.column_name),
- "DEFAULT %s" % format_server_default(compiler, element.default)
- if element.default is not None
- else "DEFAULT NULL",
+ (
+ "DEFAULT %s" % format_server_default(compiler, element.default)
+ if element.default is not None
+ else "DEFAULT NULL"
+ ),
)
return self.migration_context
@overload
- def invoke(self, operation: CreateTableOp) -> Table:
- ...
+ def invoke(self, operation: CreateTableOp) -> Table: ...
@overload
def invoke(
DropTableOp,
ExecuteSQLOp,
],
- ) -> None:
- ...
+ ) -> None: ...
@overload
- def invoke(self, operation: MigrateOperation) -> Any:
- ...
+ def invoke(self, operation: MigrateOperation) -> Any: ...
def invoke(self, operation: MigrateOperation) -> Any:
"""Given a :class:`.MigrateOperation`, invoke it in terms of
info=self.info.copy() if self.info else {},
prefixes=list(self.prefixes) if self.prefixes else [],
schema=self.schema,
- _constraints_included=self._reverse._constraints_included
- if self._reverse
- else False,
+ _constraints_included=(
+ self._reverse._constraints_included if self._reverse else False
+ ),
**self.table_kw,
)
return t
t = sa_schema.Table(name, m, *cols, **kw)
constraints = [
- sqla_compat._copy(elem, target_table=t)
- if getattr(elem, "parent", None) is not t
- and getattr(elem, "parent", None) is not None
- else elem
+ (
+ sqla_compat._copy(elem, target_table=t)
+ if getattr(elem, "parent", None) is not t
+ and getattr(elem, "parent", None) is not None
+ else elem
+ )
for elem in columns
if isinstance(elem, (Constraint, Index))
]
class EnvironmentContext(util.ModuleClsProxy):
-
"""A configurational facade made available in an ``env.py`` script.
The :class:`.EnvironmentContext` acts as a *facade* to the more
return self.context_opts.get("tag", None) # type: ignore[no-any-return] # noqa: E501
@overload
- def get_x_argument(self, as_dictionary: Literal[False]) -> List[str]:
- ...
+ def get_x_argument(self, as_dictionary: Literal[False]) -> List[str]: ...
@overload
- def get_x_argument(self, as_dictionary: Literal[True]) -> Dict[str, str]:
- ...
+ def get_x_argument(
+ self, as_dictionary: Literal[True]
+ ) -> Dict[str, str]: ...
@overload
def get_x_argument(
self, as_dictionary: bool = ...
- ) -> Union[List[str], Dict[str, str]]:
- ...
+ ) -> Union[List[str], Dict[str, str]]: ...
def get_x_argument(
self, as_dictionary: bool = False
class MigrationContext:
-
"""Represent the database state made available to a migration
script.
log.info("Generating static SQL")
log.info(
"Will assume %s DDL.",
- "transactional"
- if self.impl.transactional_ddl
- else "non-transactional",
+ (
+ "transactional"
+ if self.impl.transactional_ddl
+ else "non-transactional"
+ ),
)
@classmethod
# except that it will not know it's in "autocommit" and will
# emit deprecation warnings when an autocommit action takes
# place.
- self.connection = (
- self.impl.connection
- ) = base_connection.execution_options(isolation_level="AUTOCOMMIT")
+ self.connection = self.impl.connection = (
+ base_connection.execution_options(isolation_level="AUTOCOMMIT")
+ )
# sqlalchemy future mode will "autobegin" in any case, so take
# control of that "transaction" here
if TYPE_CHECKING:
@property
- def doc(self) -> Optional[str]:
- ...
+ def doc(self) -> Optional[str]: ...
@property
def name(self) -> str:
class ScriptDirectory:
-
"""Provides operations upon an Alembic script directory.
This object is useful to get information as to current revisions,
if depends_on:
with self._catch_revision_errors():
resolved_depends_on = [
- dep
- if dep in rev.branch_labels # maintain branch labels
- else rev.revision # resolve partial revision identifiers
+ (
+ dep
+ if dep in rev.branch_labels # maintain branch labels
+ else rev.revision
+ ) # resolve partial revision identifiers
for rev, dep in [
(not_none(self.revision_map.get_revision(dep)), dep)
for dep in util.to_list(depends_on)
class Script(revision.Revision):
-
"""Represent a single revision file in a ``versions/`` directory.
The :class:`.Script` instance is returned by methods
if head_indicators or tree_indicators:
text += "%s%s%s" % (
" (head)" if self._is_real_head else "",
- " (effective head)"
- if self.is_head and not self._is_real_head
- else "",
+ (
+ " (effective head)"
+ if self.is_head and not self._is_real_head
+ else ""
+ ),
" (current)" if self._db_current_indicator else "",
)
if tree_indicators:
inclusive: bool,
implicit_base: bool,
assert_relative_length: bool,
- ) -> Tuple[Set[Revision], Tuple[Optional[_RevisionOrBase], ...]]:
- ...
+ ) -> Tuple[Set[Revision], Tuple[Optional[_RevisionOrBase], ...]]: ...
class RevisionError(Exception):
resolved_target = target
resolved_test_against_revs = [
- self._revision_for_ident(test_against_rev)
- if not isinstance(test_against_rev, Revision)
- else test_against_rev
+ (
+ self._revision_for_ident(test_against_rev)
+ if not isinstance(test_against_rev, Revision)
+ else test_against_rev
+ )
for test_against_rev in util.to_tuple(
test_against_revs, default=()
)
# each time but it was getting complicated
current_heads[current_candidate_idx] = heads_to_add[0]
current_heads.extend(heads_to_add[1:])
- ancestors_by_idx[
- current_candidate_idx
- ] = get_ancestors(heads_to_add[0])
+ ancestors_by_idx[current_candidate_idx] = (
+ get_ancestors(heads_to_add[0])
+ )
ancestors_by_idx.extend(
get_ancestors(head) for head in heads_to_add[1:]
)
branch_label = symbol
# Walk down the tree to find downgrade target.
rev = self._walk(
- start=self.get_revision(symbol)
- if branch_label is None
- else self.get_revision("%s@%s" % (branch_label, symbol)),
+ start=(
+ self.get_revision(symbol)
+ if branch_label is None
+ else self.get_revision(
+ "%s@%s" % (branch_label, symbol)
+ )
+ ),
steps=rel_int,
no_overwalk=assert_relative_length,
)
)
return (
self._walk(
- start=self.get_revision(symbol)
- if branch_label is None
- else self.get_revision("%s@%s" % (branch_label, symbol)),
+ start=(
+ self.get_revision(symbol)
+ if branch_label is None
+ else self.get_revision(
+ "%s@%s" % (branch_label, symbol)
+ )
+ ),
steps=relative,
no_overwalk=assert_relative_length,
),
@overload
-def tuple_rev_as_scalar(rev: None) -> None:
- ...
+def tuple_rev_as_scalar(rev: None) -> None: ...
@overload
def tuple_rev_as_scalar(
rev: Union[Tuple[_T, ...], List[_T]]
-) -> Union[_T, Tuple[_T, ...], List[_T]]:
- ...
+) -> Union[_T, Tuple[_T, ...], List[_T]]: ...
def tuple_rev_as_scalar(
"x",
column.name,
existing_type=column.type,
- existing_server_default=column.server_default
- if column.server_default is not None
- else False,
+ existing_server_default=(
+ column.server_default
+ if column.server_default is not None
+ else False
+ ),
existing_nullable=True if column.nullable else False,
# existing_comment=column.comment,
nullable=to_.get("nullable", None),
new_col["type"],
new_col.get("default", None),
compare.get("type", old_col["type"]),
- compare["server_default"].text
- if "server_default" in compare
- else column.server_default.arg.text
- if column.server_default is not None
- else None,
+ (
+ compare["server_default"].text
+ if "server_default" in compare
+ else (
+ column.server_default.arg.text
+ if column.server_default is not None
+ else None
+ )
+ ),
)
self.context = MigrationContext.configure(
dialect=conn.dialect, opts=opts
)
- self.context.output_buffer = (
- self.context.impl.output_buffer
- ) = io.StringIO()
+ self.context.output_buffer = self.context.impl.output_buffer = (
+ io.StringIO()
+ )
else:
self.context = MigrationContext.configure(
connection=conn, opts=opts
@overload
-def to_tuple(x: Any, default: Tuple[Any, ...]) -> Tuple[Any, ...]:
- ...
+def to_tuple(x: Any, default: Tuple[Any, ...]) -> Tuple[Any, ...]: ...
@overload
-def to_tuple(x: None, default: Optional[_T] = ...) -> _T:
- ...
+def to_tuple(x: None, default: Optional[_T] = ...) -> _T: ...
@overload
def to_tuple(
x: Any, default: Optional[Tuple[Any, ...]] = None
-) -> Tuple[Any, ...]:
- ...
+) -> Tuple[Any, ...]: ...
def to_tuple(
class _CompilerProtocol(Protocol):
- def __call__(self, element: Any, compiler: Any, **kw: Any) -> str:
- ...
+ def __call__(self, element: Any, compiler: Any, **kw: Any) -> str: ...
def _safe_int(value: str) -> Union[int, str]:
def compiles(
element: Type[ClauseElement], *dialects: str
- ) -> Callable[[_CompilerProtocol], _CompilerProtocol]:
- ...
+ ) -> Callable[[_CompilerProtocol], _CompilerProtocol]: ...
else:
from sqlalchemy.ext.compiler import compiles
database in process.
"""
+
import logging
import sys
ignore =
A003,
D,
- E203,E305,E711,E712,E721,E722,E741,
+ E203,E305,E704,E711,E712,E721,E722,E741,
N801,N802,N806,
RST304,RST303,RST299,RST399,
W503,W504
diffs = {
(
cmd,
- isinstance(obj, (UniqueConstraint, Index))
- if obj.name is not None
- else False,
+ (
+ isinstance(obj, (UniqueConstraint, Index))
+ if obj.name is not None
+ else False
+ ),
)
for cmd, obj in diffs
}
class NoUqReportsIndAsUqTest(NoUqReflectionIndexTest):
-
"""this test suite simulates the condition where:
a. the dialect doesn't report unique constraints
class AutogenRenderTest(TestBase):
-
"""test individual directives"""
def setUp(self):
)
args["tname_colnames"] = ", ".join(
- "CAST(%(schema)stname.%(name)s AS %(type)s) AS %(cast_label)s"
- % {
- "schema": args["schema"],
- "name": name,
- "type": impl.new_table.c[name].type,
- "cast_label": name if sqla_14 else "anon_1",
- }
- if (
- impl.new_table.c[name].type._type_affinity
- is not impl.table.c[name].type._type_affinity
+ (
+ "CAST(%(schema)stname.%(name)s AS %(type)s) AS %(cast_label)s"
+ % {
+ "schema": args["schema"],
+ "name": name,
+ "type": impl.new_table.c[name].type,
+ "cast_label": name if sqla_14 else "anon_1",
+ }
+ if (
+ impl.new_table.c[name].type._type_affinity
+ is not impl.table.c[name].type._type_affinity
+ )
+ else "%(schema)stname.%(name)s"
+ % {"schema": args["schema"], "name": name}
)
- else "%(schema)stname.%(name)s"
- % {"schema": args["schema"], "name": name}
for name in colnames
if name in impl.table.c
)
"""Test op functions against MSSQL."""
+
from __future__ import annotations
from typing import Any
expected_nullability = not existing_nullability
args["nullable"] = expected_nullability
else:
- args[
- "existing_nullable"
- ] = expected_nullability = existing_nullability
+ args["existing_nullable"] = expected_nullability = (
+ existing_nullability
+ )
op.alter_column("t", "c", **args)
class BranchFromMergepointTest(MigrationTest):
-
"""this is a form that will come up frequently in the
"many independent roots with cross-dependencies" case.
class BranchFrom3WayMergepointTest(MigrationTest):
-
"""this is a form that will come up frequently in the
"many independent roots with cross-dependencies" case.
backports.zoneinfo;python_version<"3.9"
tzdata
zimports
- black==23.3.0
+ black==24.1.1
greenlet>=1
pydocstyle<4.0.0
# used by flake8-rst-docstrings
pygments
- black==23.3.0
+ black==24.1.1
commands =
flake8 ./alembic/ ./tests/ setup.py docs/build/conf.py {posargs}
black --check setup.py tests alembic