/docs/build/_build/
/pysqlite_test_schema.db
*.sqlite3
+.mypy_cache/
from . import context
from . import op
-__version__ = "1.14.2"
+__version__ = "1.15.0"
(inspector),
# fmt: on
)
- sqla_compat._reflect_table(inspector, t)
+ inspector.reflect_table(t, include_columns=None)
if autogen_context.run_object_filters(t, tname, "table", True, None):
modify_table_ops = ops.ModifyTableOps(tname, [], schema=s)
_compat_autogen_column_reflect(inspector),
# fmt: on
)
- sqla_compat._reflect_table(inspector, t)
+ inspector.reflect_table(t, include_columns=None)
conn_column_info[(s, tname)] = t
for s, tname in sorted(existing_tables, key=lambda x: (x[0] or "", x[1])):
return False
if sqla_compat._server_default_is_computed(metadata_default):
- # return False in case of a computed column as the server
- # default. Note that DDL for adding or removing "GENERATED AS" from
- # an existing column is not currently known for any backend.
- # Once SQLAlchemy can reflect "GENERATED" as the "computed" element,
- # we would also want to ignore and/or warn for changes vs. the
- # metadata (or support backend specific DDL if applicable).
- if not sqla_compat.has_computed_reflection:
- return False
-
- else:
- return (
- _compare_computed_default( # type:ignore[func-returns-value]
- autogen_context,
- alter_column_op,
- schema,
- tname,
- cname,
- conn_col,
- metadata_col,
- )
- )
+ return _compare_computed_default( # type:ignore[func-returns-value]
+ autogen_context,
+ alter_column_op,
+ schema,
+ tname,
+ cname,
+ conn_col,
+ metadata_col,
+ )
if sqla_compat._server_default_is_computed(conn_col_default):
_warn_computed_not_supported(tname, cname)
return False
if TYPE_CHECKING:
from typing import Literal
+ from sqlalchemy import Computed
+ from sqlalchemy import Identity
from sqlalchemy.sql.base import DialectKWArgs
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.sql.elements import TextClause
from alembic.config import Config
from alembic.operations.ops import MigrationScript
from alembic.operations.ops import ModifyTableOps
- from alembic.util.sqla_compat import Computed
- from alembic.util.sqla_compat import Identity
MAX_PYTHON_ARGS = 255
+ [
"%s=%s"
% (key, _render_potential_expr(val, autogen_context))
- for key, val in sqla_compat._column_kwargs(column).items()
+ for key, val in column.kwargs.items()
]
)
),
if TYPE_CHECKING:
from typing import Any
+ from sqlalchemy import Computed
+ from sqlalchemy import Identity
from sqlalchemy.sql.compiler import Compiled
from sqlalchemy.sql.compiler import DDLCompiler
from sqlalchemy.sql.elements import TextClause
from sqlalchemy.sql.type_api import TypeEngine
from .impl import DefaultImpl
- from ..util.sqla_compat import Computed
- from ..util.sqla_compat import Identity
_ServerDefault = Union["TextClause", "FetchedValue", "Function[Any]", str]
if self.as_sql:
for row in rows:
self._exec(
- sqla_compat._insert_inline(table).values(
+ table.insert()
+ .inline()
+ .values(
**{
k: (
sqla_compat._literal_bindparam(
else:
if rows:
if multiinsert:
- self._exec(
- sqla_compat._insert_inline(table), multiparams=rows
- )
+ self._exec(table.insert().inline(), multiparams=rows)
else:
for row in rows:
- self._exec(
- sqla_compat._insert_inline(table).values(**row)
- )
+ self._exec(table.insert().inline().values(**row))
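# A minimal sketch (illustrative; table and column names are hypothetical) of
# the SQLAlchemy 1.4+ spelling that replaces the removed _insert_inline()
# shim: Insert.inline() keeps column defaults rendered inline rather than
# pre-executed, which is what insert(inline=True) did on 1.3.
from sqlalchemy import Column, Integer, MetaData, String, Table

accounts = Table(
    "accounts",
    MetaData(),
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)
stmt = accounts.insert().inline().values(id=1, name="alice")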
def _tokenize_column_type(self, column: Column) -> Params:
definition: str
diff, ignored = _compare_identity_options(
metadata_identity,
inspector_identity,
- sqla_compat.Identity(),
+ schema.Identity(),
skip={"always"},
)
set(meta_d).union(insp_d),
)
if sqla_compat.identity_has_dialect_kwargs:
+ assert hasattr(default_io, "dialect_kwargs")
# use only the dialect kwargs in inspector_io since metadata_io
# can have options for many backends
check_dicts(
getattr(metadata_io, "dialect_kwargs", {}),
getattr(inspector_io, "dialect_kwargs", {}),
- default_io.dialect_kwargs, # type: ignore[union-attr]
+ default_io.dialect_kwargs,
getattr(inspector_io, "dialect_kwargs", {}),
)
from .impl import DefaultImpl
from .. import util
from ..util import sqla_compat
-from ..util.sqla_compat import _is_mariadb
from ..util.sqla_compat import _is_type_bound
from ..util.sqla_compat import compiles
# note that SQLAlchemy as of 1.2 does not yet support
# DROP CONSTRAINT for MySQL/MariaDB, so we implement fully
# here.
- if _is_mariadb(compiler.dialect):
+ if compiler.dialect.is_mariadb: # type: ignore[attr-defined]
return "ALTER TABLE %s DROP CONSTRAINT %s" % (
compiler.preparer.format_table(constraint.table),
compiler.preparer.format_constraint(constraint),
from sqlalchemy import Column
from sqlalchemy import Float
+from sqlalchemy import Identity
from sqlalchemy import literal_column
from sqlalchemy import Numeric
+from sqlalchemy import select
from sqlalchemy import text
from sqlalchemy import types as sqltypes
from sqlalchemy.dialects.postgresql import BIGINT
conn = self.connection
assert conn is not None
return not conn.scalar(
- sqla_compat._select(
- literal_column(conn_col_default) == metadata_default
- )
+ select(literal_column(conn_col_default) == metadata_default)
)
def alter_column( # type:ignore[override]
)
else:
text += "SET %s " % compiler.get_identity_options(
- sqla_compat.Identity(**{attr: getattr(identity, attr)})
+ Identity(**{attr: getattr(identity, attr)})
)
return text
from typing import Union
from sqlalchemy import cast
+from sqlalchemy import Computed
from sqlalchemy import JSON
from sqlalchemy import schema
from sqlalchemy import sql
) and isinstance(col.server_default.arg, sql.ClauseElement):
return True
elif (
- isinstance(col.server_default, util.sqla_compat.Computed)
+ isinstance(col.server_default, Computed)
and col.server_default.persisted
):
return True
from sqlalchemy import MetaData
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import schema as sql_schema
+from sqlalchemy import select
from sqlalchemy import Table
from sqlalchemy import types as sqltypes
from sqlalchemy.sql.schema import SchemaEventTarget
from ..util.sqla_compat import _ensure_scope_for_ddl
from ..util.sqla_compat import _fk_is_self_referential
from ..util.sqla_compat import _idx_table_bound_expressions
-from ..util.sqla_compat import _insert_inline
from ..util.sqla_compat import _is_type_bound
from ..util.sqla_compat import _remove_column_from_collection
from ..util.sqla_compat import _resolve_for_variant
-from ..util.sqla_compat import _select
from ..util.sqla_compat import constraint_name_defined
from ..util.sqla_compat import constraint_name_string
try:
op_impl._exec(
- _insert_inline(self.new_table).from_select(
+ self.new_table.insert()
+ .inline()
+ .from_select(
list(
k
for k, transfer in self.column_transfers.items()
if "expr" in transfer
),
- _select(
+ select(
*[
transfer["expr"]
for transfer in self.column_transfers.values()
from . import ops
from .base import Operations
from ..util.sqla_compat import _copy
-from ..util.sqla_compat import sqla_14
if TYPE_CHECKING:
from sqlalchemy.sql.schema import Table
def drop_table(operations: "Operations", operation: "ops.DropTableOp") -> None:
kw = {}
if operation.if_exists is not None:
- if not sqla_14:
- raise NotImplementedError("SQLAlchemy 1.4+ required")
-
kw["if_exists"] = operation.if_exists
operations.impl.drop_table(
operation.to_table(operations.migration_context), **kw
idx = operation.to_index(operations.migration_context)
kw = {}
if operation.if_not_exists is not None:
- if not sqla_14:
- raise NotImplementedError("SQLAlchemy 1.4+ required")
-
kw["if_not_exists"] = operation.if_not_exists
operations.impl.create_index(idx, **kw)
def drop_index(operations: "Operations", operation: "ops.DropIndexOp") -> None:
kw = {}
if operation.if_exists is not None:
- if not sqla_14:
- raise NotImplementedError("SQLAlchemy 1.4+ required")
-
kw["if_exists"] = operation.if_exists
operations.impl.drop_index(
) -> "Table":
kw = {}
if operation.if_not_exists is not None:
- if not sqla_14:
- raise NotImplementedError("SQLAlchemy 1.4+ required")
-
kw["if_not_exists"] = operation.if_not_exists
table = operation.to_table(operations.migration_context)
operations.impl.create_table(table, **kw)
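# A minimal usage sketch (illustrative; assumes a typical migration script,
# names are hypothetical): with SQLAlchemy 1.4 as the floor, the
# if_exists / if_not_exists flags are forwarded without any version guard.
from alembic import op


def upgrade():
    # renders CREATE INDEX IF NOT EXISTS on backends that support it
    op.create_index(
        "ix_account_name", "account", ["name"], if_not_exists=True
    )


def downgrade():
    # renders DROP INDEX IF EXISTS
    op.drop_index("ix_account_name", table_name="account", if_exists=True)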
from sqlalchemy import Column
from sqlalchemy import literal_column
+from sqlalchemy import select
from sqlalchemy.engine import Engine
from sqlalchemy.engine import url as sqla_url
from sqlalchemy.engine.strategies import MockEngineStrategy
from .. import util
from ..util import sqla_compat
from ..util.compat import EncodedIO
-from ..util.sqla_compat import _select
if TYPE_CHECKING:
from sqlalchemy.engine import Dialect
return tuple(
row[0]
for row in self.connection.execute(
- _select(self._version.c.version_num)
+ select(self._version.c.version_num)
)
)
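# A minimal sketch (illustrative) of why the _select() shim could be dropped:
# since SQLAlchemy 1.4, select() accepts columns positionally, so no
# compatibility wrapper is needed around calls like the one above.
from sqlalchemy import Column, MetaData, String, Table, select

version_table = Table(
    "alembic_version", MetaData(), Column("version_num", String(32))
)
stmt = select(version_table.c.version_num)  # 1.4/2.0 style; 1.3 required select([...])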
from sqlalchemy import exc as sa_exc
from sqlalchemy.engine import default
+from sqlalchemy.engine import URL
from sqlalchemy.testing.assertions import _expect_warnings
from sqlalchemy.testing.assertions import eq_ # noqa
from sqlalchemy.testing.assertions import is_ # noqa
from sqlalchemy.testing.assertions import ne_ # noqa
from sqlalchemy.util import decorator
-from ..util import sqla_compat
-
def _assert_proper_exception_context(exception):
"""assert that any exception we're catching does not have a __context__
if name is None or name == "default":
return default.DefaultDialect()
else:
- d = sqla_compat._create_url(name).get_dialect()()
+ d = URL.create(name).get_dialect()()
if name == "postgresql":
d.implicit_returning = True
def _sqlite_file_db(tempname="foo.db", future=False, scope=None, **options):
dir_ = _join_path(_get_staging_directory(), "scripts")
url = "sqlite:///%s/%s" % (dir_, tempname)
- if scope and util.sqla_14:
+ if scope:
options["scope"] = scope
return testing_util.testing_engine(url=url, future=future, options=options)
from typing import Dict
from sqlalchemy import Column
+from sqlalchemy import create_mock_engine
from sqlalchemy import inspect
from sqlalchemy import MetaData
from sqlalchemy import String
from sqlalchemy.testing import config
from sqlalchemy.testing import mock
from sqlalchemy.testing.assertions import eq_
+from sqlalchemy.testing.fixtures import FutureEngineMixin
from sqlalchemy.testing.fixtures import TablesTest as SQLAlchemyTablesTest
from sqlalchemy.testing.fixtures import TestBase as SQLAlchemyTestBase
from ..migration import MigrationContext
from ..operations import Operations
from ..util import sqla_compat
-from ..util.sqla_compat import create_mock_engine
-from ..util.sqla_compat import sqla_14
from ..util.sqla_compat import sqla_2
pass
-if sqla_14:
- from sqlalchemy.testing.fixtures import FutureEngineMixin
-else:
-
- class FutureEngineMixin: # type:ignore[no-redef]
- __requires__ = ("sqlalchemy_14",)
-
-
FutureEngineMixin.is_sqlalchemy_future = True
opts["as_sql"] = as_sql
if literal_binds:
opts["literal_binds"] = literal_binds
- if not sqla_14 and dialect == "mariadb":
- ctx_dialect = _get_dialect("mysql")
- ctx_dialect.server_version_info = (10, 4, 0, "MariaDB")
- else:
- ctx_dialect = _get_dialect(dialect)
+ ctx_dialect = _get_dialect(dialect)
if native_boolean is not None:
ctx_dialect.supports_native_boolean = native_boolean
# this is new as of SQLAlchemy 1.2.7 and is used by SQL Server,
from sqlalchemy.testing.requirements import Requirements
from alembic import util
-from alembic.util import sqla_compat
from ..testing import exclusions
def reflects_fk_options(self):
return exclusions.closed()
- @property
- def sqlalchemy_14(self):
- return exclusions.skip_if(
- lambda config: not util.sqla_14,
- "SQLAlchemy 1.4 or greater required",
- )
-
@property
def sqlalchemy_1x(self):
return exclusions.skip_if(
else:
return True
- return self.sqlalchemy_14 + exclusions.only_if(go)
+ return exclusions.only_if(go)
@property
def comments(self):
def computed_columns(self):
return exclusions.closed()
- @property
- def computed_columns_api(self):
- return exclusions.only_if(
- exclusions.BooleanPredicate(sqla_compat.has_computed)
- )
-
- @property
- def computed_reflects_normally(self):
- return exclusions.only_if(
- exclusions.BooleanPredicate(sqla_compat.has_computed_reflection)
- )
-
- @property
- def computed_reflects_as_server_default(self):
- return exclusions.closed()
-
- @property
- def computed_doesnt_reflect_as_server_default(self):
- return exclusions.closed()
-
@property
def autoincrement_on_composite_pk(self):
return exclusions.closed()
@property
def identity_columns_alter(self):
return exclusions.closed()
-
- @property
- def identity_columns_api(self):
- return exclusions.only_if(
- exclusions.BooleanPredicate(sqla_compat.has_identity)
- )
from ._autogen_fixtures import AutogenFixtureTest
from ... import testing
-from ...testing import config
from ...testing import eq_
-from ...testing import exclusions
from ...testing import is_
from ...testing import is_true
from ...testing import mock
c = diffs[0][3]
eq_(c.name, "foo")
- if config.requirements.computed_reflects_normally.enabled:
- is_true(isinstance(c.computed, sa.Computed))
- else:
- is_(c.computed, None)
-
- if config.requirements.computed_reflects_as_server_default.enabled:
- is_true(isinstance(c.server_default, sa.DefaultClause))
- eq_(str(c.server_default.arg.text), "5")
- elif config.requirements.computed_reflects_normally.enabled:
- is_true(isinstance(c.computed, sa.Computed))
- else:
- is_(c.computed, None)
+ is_true(isinstance(c.computed, sa.Computed))
+ is_true(isinstance(c.server_default, sa.Computed))
@testing.combinations(
lambda: (None, sa.Computed("bar*5")),
),
lambda: (sa.Computed("bar*5"), sa.Computed("bar * 42")),
)
- @config.requirements.computed_reflects_normally
def test_cant_change_computed_warning(self, test_case):
arg_before, arg_after = testing.resolve_lambda(test_case, **locals())
m1 = MetaData()
lambda: (sa.Computed("5"), sa.Computed("5")),
lambda: (sa.Computed("bar*5"), sa.Computed("bar*5")),
lambda: (sa.Computed("bar*5"), sa.Computed("bar * \r\n\t5")),
- (
- lambda: (sa.Computed("bar*5"), None),
- config.requirements.computed_doesnt_reflect_as_server_default,
- ),
)
def test_computed_unchanged(self, test_case):
arg_before, arg_after = testing.resolve_lambda(test_case, **locals())
eq_(mock_warn.mock_calls, [])
eq_(list(diffs), [])
-
- @config.requirements.computed_reflects_as_server_default
- def test_remove_computed_default_on_computed(self):
- """Asserts the current behavior which is that on PG and Oracle,
- the GENERATED ALWAYS AS is reflected as a server default which we can't
- tell is actually "computed", so these come out as a modification to
- the server default.
-
- """
- m1 = MetaData()
- m2 = MetaData()
-
- Table(
- "user",
- m1,
- Column("id", Integer, primary_key=True),
- Column("bar", Integer),
- Column("foo", Integer, sa.Computed("bar + 42")),
- )
-
- Table(
- "user",
- m2,
- Column("id", Integer, primary_key=True),
- Column("bar", Integer),
- Column("foo", Integer),
- )
-
- diffs = self._fixture(m1, m2)
-
- eq_(diffs[0][0][0], "modify_default")
- eq_(diffs[0][0][2], "user")
- eq_(diffs[0][0][3], "foo")
- old = diffs[0][0][-2]
- new = diffs[0][0][-1]
-
- is_(new, None)
- is_true(isinstance(old, sa.DefaultClause))
-
- if exclusions.against(config, "postgresql"):
- eq_(str(old.arg.text), "(bar + 42)")
- elif exclusions.against(config, "oracle"):
- eq_(str(old.arg.text), '"BAR"+42')
from sqlalchemy import exc as sa_exc
-from ..util import sqla_14
-
def setup_filters():
"""Set global warning behavior for the test suite."""
# some selected deprecations...
warnings.filterwarnings("error", category=DeprecationWarning)
- if not sqla_14:
- # 1.3 uses pkg_resources in PluginLoader
- warnings.filterwarnings(
- "ignore",
- "pkg_resources is deprecated as an API",
- DeprecationWarning,
- )
try:
import pytest
except ImportError:
from .pyfiles import load_python_file as load_python_file
from .pyfiles import pyc_file_from_path as pyc_file_from_path
from .pyfiles import template_to_file as template_to_file
-from .sqla_compat import has_computed as has_computed
-from .sqla_compat import sqla_13 as sqla_13
-from .sqla_compat import sqla_14 as sqla_14
from .sqla_compat import sqla_2 as sqla_2
-
-
-if not sqla_13:
- raise CommandError("SQLAlchemy 1.3.0 or greater is required.")
from sqlalchemy.engine import url
-from . import sqla_compat
-
log = logging.getLogger(__name__)
# disable "no handler found" errors
def obfuscate_url_pw(input_url: str) -> str:
- u = url.make_url(input_url)
- return sqla_compat.url_render_as_string(u, hide_password=True) # type: ignore # noqa: E501
+ return url.make_url(input_url).render_as_string(hide_password=True)
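# A minimal sketch (illustrative) of the behavior relied on above:
# URL.render_as_string(hide_password=True) masks only the password component.
from sqlalchemy.engine import url

u = url.make_url("postgresql://scott:tiger@localhost/app")
print(u.render_as_string(hide_password=True))
# postgresql://scott:***@localhost/app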
def warn(msg: str, stacklevel: int = 2) -> None:
from typing import Dict
from typing import Iterable
from typing import Iterator
-from typing import Mapping
from typing import Optional
from typing import Protocol
from typing import Set
from typing import Union
from sqlalchemy import __version__
from sqlalchemy import inspect
from sqlalchemy import schema
from sqlalchemy import sql
from sqlalchemy import types as sqltypes
-from sqlalchemy.engine import url
from sqlalchemy.schema import CheckConstraint
from sqlalchemy.schema import Column
from sqlalchemy.schema import ForeignKeyConstraint
from sqlalchemy.sql.base import DialectKWArgs
from sqlalchemy.sql.elements import BindParameter
from sqlalchemy.sql.elements import ColumnClause
-from sqlalchemy.sql.elements import quoted_name
from sqlalchemy.sql.elements import TextClause
from sqlalchemy.sql.elements import UnaryExpression
from sqlalchemy.sql.visitors import traverse
from typing_extensions import TypeGuard
+if True:
+ from sqlalchemy.sql.naming import _NONE_NAME as _NONE_NAME # type: ignore[attr-defined] # noqa: E501
+
if TYPE_CHECKING:
from sqlalchemy import ClauseElement
+ from sqlalchemy import Identity
from sqlalchemy import Index
from sqlalchemy import Table
from sqlalchemy.engine import Connection
from sqlalchemy.engine import Dialect
from sqlalchemy.engine import Transaction
- from sqlalchemy.engine.reflection import Inspector
from sqlalchemy.sql.base import ColumnCollection
from sqlalchemy.sql.compiler import SQLCompiler
- from sqlalchemy.sql.dml import Insert
from sqlalchemy.sql.elements import ColumnElement
from sqlalchemy.sql.schema import Constraint
from sqlalchemy.sql.schema import SchemaItem
- from sqlalchemy.sql.selectable import Select
- from sqlalchemy.sql.selectable import TableClause
_CE = TypeVar("_CE", bound=Union["ColumnElement[Any]", "SchemaItem"])
_vers = tuple(
[_safe_int(x) for x in re.findall(r"(\d+|[abc]\d)", __version__)]
)
-sqla_13 = _vers >= (1, 3)
-sqla_14 = _vers >= (1, 4)
# https://docs.sqlalchemy.org/en/latest/changelog/changelog_14.html#change-0c6e0cc67dfe6fac5164720e57ef307d
sqla_14_18 = _vers >= (1, 4, 18)
sqla_14_26 = _vers >= (1, 4, 26)
sqla_2 = _vers >= (2,)
sqlalchemy_version = __version__
-try:
- from sqlalchemy.sql.naming import _NONE_NAME as _NONE_NAME # type: ignore[attr-defined] # noqa: E501
-except ImportError:
- from sqlalchemy.sql.elements import _NONE_NAME as _NONE_NAME # type: ignore # noqa: E501
-
-
-class _Unsupported:
- "Placeholder for unsupported SQLAlchemy classes"
-
-
if TYPE_CHECKING:
def compiles(
else:
from sqlalchemy.ext.compiler import compiles
-try:
- from sqlalchemy import Computed as Computed
-except ImportError:
- if not TYPE_CHECKING:
- class Computed(_Unsupported):
- pass
+identity_has_dialect_kwargs = issubclass(schema.Identity, DialectKWArgs)
- has_computed = False
- has_computed_reflection = False
-else:
- has_computed = True
- has_computed_reflection = _vers >= (1, 3, 16)
-
-try:
- from sqlalchemy import Identity as Identity
-except ImportError:
- if not TYPE_CHECKING:
- class Identity(_Unsupported):
- pass
+def _get_identity_options_dict(
+ identity: Union[Identity, schema.Sequence, None],
+ dialect_kwargs: bool = False,
+) -> Dict[str, Any]:
+ if identity is None:
+ return {}
+ elif identity_has_dialect_kwargs:
+ assert hasattr(identity, "_as_dict")
+ as_dict = identity._as_dict()
+ if dialect_kwargs:
+ assert isinstance(identity, DialectKWArgs)
+ as_dict.update(identity.dialect_kwargs)
+ else:
+ as_dict = {}
+ if isinstance(identity, schema.Identity):
+ # always=None means something different than always=False
+ as_dict["always"] = identity.always
+ if identity.on_null is not None:
+ as_dict["on_null"] = identity.on_null
+ # attributes common to Identity and Sequence
+ attrs = (
+ "start",
+ "increment",
+ "minvalue",
+ "maxvalue",
+ "nominvalue",
+ "nomaxvalue",
+ "cycle",
+ "cache",
+ "order",
+ )
+ as_dict.update(
+ {
+ key: getattr(identity, key, None)
+ for key in attrs
+ if getattr(identity, key, None) is not None
+ }
+ )
+ return as_dict
- has_identity = False
-else:
- identity_has_dialect_kwargs = issubclass(Identity, DialectKWArgs)
-
- def _get_identity_options_dict(
- identity: Union[Identity, schema.Sequence, None],
- dialect_kwargs: bool = False,
- ) -> Dict[str, Any]:
- if identity is None:
- return {}
- elif identity_has_dialect_kwargs:
- as_dict = identity._as_dict() # type: ignore
- if dialect_kwargs:
- assert isinstance(identity, DialectKWArgs)
- as_dict.update(identity.dialect_kwargs)
- else:
- as_dict = {}
- if isinstance(identity, Identity):
- # always=None means something different than always=False
- as_dict["always"] = identity.always
- if identity.on_null is not None:
- as_dict["on_null"] = identity.on_null
- # attributes common to Identity and Sequence
- attrs = (
- "start",
- "increment",
- "minvalue",
- "maxvalue",
- "nominvalue",
- "nomaxvalue",
- "cycle",
- "cache",
- "order",
- )
- as_dict.update(
- {
- key: getattr(identity, key, None)
- for key in attrs
- if getattr(identity, key, None) is not None
- }
- )
- return as_dict
-
- has_identity = True
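# A minimal sketch (illustrative; values are hypothetical) of how the helper
# above is intended to be used: both the metadata Identity and the reflected
# Identity are flattened to plain dicts, which autogenerate can then compare
# key by key (the exact keys returned depend on the SQLAlchemy version).
from sqlalchemy import Identity

meta_opts = _get_identity_options_dict(Identity(start=2, cycle=True))
refl_opts = _get_identity_options_dict(Identity(start=2))
changed_keys = {
    k
    for k in set(meta_opts) | set(refl_opts)
    if meta_opts.get(k) != refl_opts.get(k)
}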
if sqla_2:
from sqlalchemy.sql.base import _NoneName
_ConstraintName = Union[None, str, _NoneName]
-
_ConstraintNameDefined = Union[str, _NoneName]
return name is _NONE_NAME or isinstance(name, (str, _NoneName))
-def constraint_name_string(
- name: _ConstraintName,
-) -> TypeGuard[str]:
+def constraint_name_string(name: _ConstraintName) -> TypeGuard[str]:
return isinstance(name, str)
-def constraint_name_or_none(
- name: _ConstraintName,
-) -> Optional[str]:
+def constraint_name_or_none(name: _ConstraintName) -> Optional[str]:
return name if constraint_name_string(name) else None
yield
-def url_render_as_string(url, hide_password=True):
- if sqla_14:
- return url.render_as_string(hide_password=hide_password)
- else:
- return url.__to_string__(hide_password=hide_password)
-
-
def _safe_begin_connection_transaction(
connection: Connection,
) -> Transaction:
- transaction = _get_connection_transaction(connection)
+ transaction = connection.get_transaction()
if transaction:
return transaction
else:
def _safe_commit_connection_transaction(
connection: Connection,
) -> None:
- transaction = _get_connection_transaction(connection)
+ transaction = connection.get_transaction()
if transaction:
transaction.commit()
def _safe_rollback_connection_transaction(
connection: Connection,
) -> None:
- transaction = _get_connection_transaction(connection)
+ transaction = connection.get_transaction()
if transaction:
transaction.rollback()
return schema_item.copy(**kw) # type: ignore[union-attr]
-def _get_connection_transaction(
- connection: Connection,
-) -> Optional[Transaction]:
- if sqla_14:
- return connection.get_transaction()
- else:
- r = connection._root # type: ignore[attr-defined]
- return r._Connection__transaction
-
-
-def _create_url(*arg, **kw) -> url.URL:
- if hasattr(url.URL, "create"):
- return url.URL.create(*arg, **kw)
- else:
- return url.URL(*arg, **kw)
-
-
def _connectable_has_table(
connectable: Connection, tablename: str, schemaname: Union[str, None]
) -> bool:
-    if sqla_14:
-        return inspect(connectable).has_table(tablename, schemaname)
-    else:
-        return connectable.dialect.has_table(
-            connectable, tablename, schemaname
-        )
+    return inspect(connectable).has_table(tablename, schemaname)
def _exec_on_inspector(inspector, statement, **params):
- if sqla_14:
- with inspector._operation_context() as conn:
- return conn.execute(statement, params)
- else:
- return inspector.bind.execute(statement, params)
+ with inspector._operation_context() as conn:
+ return conn.execute(statement, params)
def _nullability_might_be_unset(metadata_column):
- if not sqla_14:
- return metadata_column.nullable
- else:
- from sqlalchemy.sql import schema
+ from sqlalchemy.sql import schema
- return (
- metadata_column._user_defined_nullable is schema.NULL_UNSPECIFIED
- )
+ return metadata_column._user_defined_nullable is schema.NULL_UNSPECIFIED
def _server_default_is_computed(*server_default) -> bool:
- if not has_computed:
- return False
- else:
- return any(isinstance(sd, Computed) for sd in server_default)
+ return any(isinstance(sd, schema.Computed) for sd in server_default)
def _server_default_is_identity(*server_default) -> bool:
- if not sqla_14:
- return False
- else:
- return any(isinstance(sd, Identity) for sd in server_default)
+ return any(isinstance(sd, schema.Identity) for sd in server_default)
def _table_for_constraint(constraint: Constraint) -> Table:
return list(constraint.columns)
-def _reflect_table(inspector: Inspector, table: Table) -> None:
- if sqla_14:
- return inspector.reflect_table(table, None)
- else:
- return inspector.reflecttable( # type: ignore[attr-defined]
- table, None
- )
-
-
def _resolve_for_variant(type_, dialect):
if _type_has_variants(type_):
base_type, mapping = _get_variant_mapping(type_)
return type_
-if hasattr(sqltypes.TypeEngine, "_variant_mapping"):
+if hasattr(sqltypes.TypeEngine, "_variant_mapping"): # 2.0
def _type_has_variants(type_):
return bool(type_._variant_mapping)
return compiler.render_literal_bindparam(element, **kw)
-def _column_kwargs(col: Column) -> Mapping:
- if sqla_13:
- return col.kwargs
- else:
- return {}
-
-
def _get_constraint_final_name(
constraint: Union[Index, Constraint], dialect: Optional[Dialect]
) -> Optional[str]:
if constraint.name is None:
return None
assert dialect is not None
- if sqla_14:
- # for SQLAlchemy 1.4 we would like to have the option to expand
- # the use of "deferred" names for constraints as well as to have
- # some flexibility with "None" name and similar; make use of new
- # SQLAlchemy API to return what would be the final compiled form of
- # the name for this dialect.
- return dialect.identifier_preparer.format_constraint(
- constraint, _alembic_quote=False
- )
- else:
- # prior to SQLAlchemy 1.4, work around quoting logic to get at the
- # final compiled name without quotes.
- if hasattr(constraint.name, "quote"):
- # might be quoted_name, might be truncated_name, keep it the
- # same
- quoted_name_cls: type = type(constraint.name)
- else:
- quoted_name_cls = quoted_name
-
- new_name = quoted_name_cls(str(constraint.name), quote=False)
- constraint = constraint.__class__(name=new_name)
-
- if isinstance(constraint, schema.Index):
- # name should not be quoted.
- d = dialect.ddl_compiler(dialect, None) # type: ignore[arg-type]
- return d._prepared_index_name(constraint)
- else:
- # name should not be quoted.
- return dialect.identifier_preparer.format_constraint(constraint)
+    # since SQLAlchemy 1.4 we have the option to expand the use of
+    # "deferred" names for constraints, as well as some flexibility with
+    # "None" names and similar; use the SQLAlchemy API to return what
+    # would be the final compiled form of the name for this dialect.
+ return dialect.identifier_preparer.format_constraint(
+ constraint, _alembic_quote=False
+ )
def _constraint_is_named(
constraint: Union[Constraint, Index], dialect: Optional[Dialect]
) -> bool:
- if sqla_14:
- if constraint.name is None:
- return False
- assert dialect is not None
- name = dialect.identifier_preparer.format_constraint(
- constraint, _alembic_quote=False
- )
- return name is not None
- else:
- return constraint.name is not None
-
-
-def _is_mariadb(mysql_dialect: Dialect) -> bool:
- if sqla_14:
- return mysql_dialect.is_mariadb # type: ignore[attr-defined]
- else:
- return bool(
- mysql_dialect.server_version_info
- and mysql_dialect._is_mariadb # type: ignore[attr-defined]
- )
-
-
-def _mariadb_normalized_version_info(mysql_dialect):
- return mysql_dialect._mariadb_normalized_version_info
-
-
-def _insert_inline(table: Union[TableClause, Table]) -> Insert:
- if sqla_14:
- return table.insert().inline()
- else:
- return table.insert(inline=True) # type: ignore[call-arg]
-
-
-if sqla_14:
- from sqlalchemy import create_mock_engine
-
- # weird mypy workaround
- from sqlalchemy import select as _sa_select
-
- _select = _sa_select
-else:
- from sqlalchemy import create_engine
-
- def create_mock_engine(url, executor, **kw): # type: ignore[misc]
- return create_engine(
- "postgresql://", strategy="mock", executor=executor
- )
-
- def _select(*columns, **kw) -> Select:
- return sql.select(list(columns), **kw) # type: ignore[call-overload]
+ if constraint.name is None:
+ return False
+ assert dialect is not None
+ name = dialect.identifier_preparer.format_constraint(
+ constraint, _alembic_quote=False
+ )
+ return name is not None
def is_expression_index(index: Index) -> bool:
==========
.. changelog::
- :version: 1.14.2
+ :version: 1.15.0
:include_notes_from: unreleased
.. changelog::
Alembic's install process will ensure that SQLAlchemy_
is installed, in addition to other dependencies. Alembic will work with
-SQLAlchemy as of version **1.3.0**.
+SQLAlchemy as of version **1.4.0**.
-.. versionchanged:: 1.5.0 Support for SQLAlchemy older than 1.3.0 was dropped.
+.. versionchanged:: 1.15.0 Support for SQLAlchemy older than 1.4.0 was dropped.
-Alembic supports Python versions **3.8 and above**
+Alembic supports Python versions **3.9 and above**
-.. versionchanged:: 1.13 Alembic now supports Python 3.8 and newer.
+.. versionchanged:: 1.15 Alembic now supports Python 3.9 and newer.
.. _versioning_scheme:
--- /dev/null
+.. change::
+ :tags: change, general
+
+ Support for Python 3.8 is dropped as of Alembic 1.15.0. Python 3.8 is EOL.
--- /dev/null
+.. change::
+ :tags: change, general
+
+    Support for SQLAlchemy 1.3, which reached EOL in 2021, is dropped as of
+    Alembic 1.15.0.
packages = find_namespace:
include_package_data = true
zip_safe = false
-python_requires = >=3.8
+python_requires = >=3.9
install_requires =
- SQLAlchemy>=1.3.0
+ SQLAlchemy>=1.4.0
Mako
- importlib-metadata;python_version<"3.9"
- importlib-resources;python_version<"3.9"
typing-extensions>=4.12
[options.extras_require]
tz =
- backports.zoneinfo;python_version<"3.9"
tzdata
[options.package_data]
@property
def long_names(self):
- if sqla_compat.sqla_14:
- return exclusions.skip_if("oracle<18")
- else:
- return exclusions.skip_if("oracle")
+ return exclusions.skip_if("oracle<18")
@property
def reflects_pk_names(self):
def computed_columns(self):
# TODO: in theory if these could come from SQLAlchemy dialects
# that would be helpful
- return self.computed_columns_api + exclusions.skip_if(
+ return exclusions.skip_if(
["postgresql < 12", "sqlite < 3.31", "mysql < 5.7"]
)
- @property
- def computed_reflects_as_server_default(self):
- # note that this rule will go away when SQLAlchemy correctly
- # supports reflection of the "computed" construct; the element
- # will consistently be present as both column.computed and
- # column.server_default for all supported backends.
- return (
- self.computed_columns
- + exclusions.only_if(
- ["postgresql", "oracle"],
- "backend reflects computed construct as a server default",
- )
- + exclusions.skip_if(self.computed_reflects_normally)
- )
-
- @property
- def computed_doesnt_reflect_as_server_default(self):
- # note that this rule will go away when SQLAlchemy correctly
- # supports reflection of the "computed" construct; the element
- # will consistently be present as both column.computed and
- # column.server_default for all supported backends.
- return (
- self.computed_columns
- + exclusions.skip_if(
- ["postgresql", "oracle"],
- "backend reflects computed construct as a server default",
- )
- + exclusions.skip_if(self.computed_reflects_normally)
- )
-
@property
def check_constraint_reflection(self):
return exclusions.fails_on_everything_except(
# they prevent a column's name from being changed due to a bug in
# MariaDB 10.2 as well as MySQL 8.0.16
if exclusions.against(config, ["mysql", "mariadb"]):
- if sqla_compat._is_mariadb(config.db.dialect):
- mnvi = sqla_compat._mariadb_normalized_version_info
- norm_version_info = mnvi(config.db.dialect)
+ if config.db.dialect.is_mariadb:
+ norm_version_info = (
+ config.db.dialect._mariadb_normalized_version_info
+ )
return norm_version_info >= (10, 2) and norm_version_info < (
10,
2,
# 1. we have mysql / mariadb and
# 2. it enforces check constraints
if exclusions.against(config, ["mysql", "mariadb"]):
- if sqla_compat._is_mariadb(config.db.dialect):
- mnvi = sqla_compat._mariadb_normalized_version_info
- norm_version_info = mnvi(config.db.dialect)
+ if config.db.dialect.is_mariadb:
+ norm_version_info = (
+ config.db.dialect._mariadb_normalized_version_info
+ )
return norm_version_info >= (10, 2)
else:
norm_version_info = config.db.dialect.server_version_info
lambda config: exclusions.against(config, "mysql")
and (
(
- not config.db.dialect._is_mariadb
+ not config.db.dialect.is_mariadb
and exclusions.against(config, "mysql >= 5.7")
)
or (
)
def _mssql_json(self, config):
- if not sqla_compat.sqla_14:
- return False
- else:
- return exclusions.against(config, "mssql")
+ return exclusions.against(config, "mssql")
def _sqlite_json(self, config):
- if not sqla_compat.sqla_14:
- return False
- elif not exclusions.against(config, "sqlite >= 3.9"):
+ if not exclusions.against(config, "sqlite >= 3.9"):
return False
else:
with config.db.connect() as conn:
def identity_columns(self):
# TODO: in theory if these could come from SQLAlchemy dialects
# that would be helpful
- return self.identity_columns_api + exclusions.only_on(
+ return exclusions.only_on(
["postgresql >= 10", "oracle >= 12", "mssql"]
)
def identity_columns_alter(self):
# TODO: in theory if these could come from SQLAlchemy dialects
# that would be helpful
- return self.identity_columns_api + exclusions.only_on(
- ["postgresql >= 10", "oracle >= 12"]
- )
+ return exclusions.only_on(["postgresql >= 10", "oracle >= 12"])
@property
def legacy_engine(self):
eq_(diffs, [])
@config.requirements.covering_indexes
- @config.requirements.sqlalchemy_14
def test_nothing_changed_covering_index(self):
m1 = MetaData()
m2 = MetaData()
")",
)
- @config.requirements.computed_columns_api
def test_render_add_column_computed(self):
c = sa.Computed("5")
op_obj = ops.AddColumnOp("foo", Column("x", Integer, c))
"sa.Computed('5', ), nullable=True))",
)
- @config.requirements.computed_columns_api
@testing.combinations((True,), (False,))
def test_render_add_column_computed_persisted(self, persisted):
op_obj = ops.AddColumnOp(
"sa.Computed('5', persisted=%s), nullable=True))" % persisted,
)
- @config.requirements.computed_columns_api
def test_render_alter_column_computed_modify_default(self):
op_obj = ops.AlterColumnOp(
"sometable", "somecolumn", modify_server_default=sa.Computed("7")
"server_default=sa.Computed('7', ))",
)
- @config.requirements.computed_columns_api
def test_render_alter_column_computed_existing_default(self):
op_obj = ops.AlterColumnOp(
"sometable",
"existing_server_default=sa.Computed('42', ))",
)
- @config.requirements.computed_columns_api
@testing.combinations((True,), (False,))
def test_render_alter_column_computed_modify_default_persisted(
self, persisted
"=sa.Computed('7', persisted=%s))" % persisted,
)
- @config.requirements.computed_columns_api
@testing.combinations((True,), (False,))
def test_render_alter_column_computed_existing_default_persisted(
self, persisted
),
)
- @config.requirements.identity_columns_api
@identity_comb
def test_render_add_column_identity(self, kw, text):
col = Column("x", Integer, sa.Identity(**kw))
"%s, nullable=%r))" % (text, col.nullable),
)
- @config.requirements.identity_columns_api
@identity_comb
def test_render_alter_column_add_identity(self, kw, text):
op_obj = ops.AlterColumnOp(
"server_default=%s)" % text,
)
- @config.requirements.identity_columns_api
def test_render_alter_column_drop_identity(self):
op_obj = ops.AlterColumnOp(
"foo",
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
+from sqlalchemy import Computed
from sqlalchemy import DateTime
from sqlalchemy import Enum
from sqlalchemy import ForeignKey
from sqlalchemy import ForeignKeyConstraint
from sqlalchemy import func
+from sqlalchemy import Identity
from sqlalchemy import Index
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import JSON
from sqlalchemy import MetaData
from sqlalchemy import PrimaryKeyConstraint
+from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Text
from alembic.util import exc as alembic_exc
from alembic.util.sqla_compat import _NONE_NAME
from alembic.util.sqla_compat import _safe_commit_connection_transaction
-from alembic.util.sqla_compat import _select
-from alembic.util.sqla_compat import has_computed
-from alembic.util.sqla_compat import has_identity
-from alembic.util.sqla_compat import sqla_14
-
-if has_computed:
- from alembic.util.sqla_compat import Computed
-
-if has_identity:
- from alembic.util.sqla_compat import Identity
class BatchApplyTest(TestBase):
"schema": args["schema"],
"name": name,
"type": impl.new_table.c[name].type,
- "cast_label": name if sqla_14 else "anon_1",
+ "cast_label": name,
}
if (
impl.new_table.c[name].type._type_affinity
"data INTEGER, x INTEGER, toj JSON, fromj TEXT, PRIMARY KEY (id))",
"INSERT INTO _alembic_tmp_foo (id, data, x, toj, fromj) "
"SELECT foo.id, "
- "CAST(foo.data AS INTEGER) AS %s, foo.x, foo.toj, "
- "CAST(foo.fromj AS TEXT) AS %s FROM foo"
- % (
- ("data" if sqla_14 else "anon_1"),
- ("fromj" if sqla_14 else "anon_2"),
- ),
+ "CAST(foo.data AS INTEGER) AS data, foo.x, foo.toj, "
+ "CAST(foo.fromj AS TEXT) AS fromj FROM foo",
"DROP TABLE foo",
"ALTER TABLE _alembic_tmp_foo RENAME TO foo",
)
"CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
"data VARCHAR(50), x INTEGER, y INTEGER, PRIMARY KEY (id))",
"INSERT INTO _alembic_tmp_foo (id, data, x, y) SELECT foo.id, "
- "foo.data, foo.x, CAST(foo.y AS INTEGER) AS %s FROM foo"
- % (("y" if sqla_14 else "anon_1"),),
+ "foo.data, foo.x, CAST(foo.y AS INTEGER) AS y FROM foo",
"DROP TABLE foo",
"ALTER TABLE _alembic_tmp_foo RENAME TO foo",
)
"data VARCHAR(50), x INTEGER, y BOOLEAN, PRIMARY KEY (id), "
"CONSTRAINT ck1 CHECK (y IN (0, 1)))",
"INSERT INTO _alembic_tmp_foo (id, data, x, y) SELECT foo.id, "
- "foo.data, foo.x, CAST(foo.y AS BOOLEAN) AS %s FROM foo"
- % (("y" if sqla_14 else "anon_1"),),
+ "foo.data, foo.x, CAST(foo.y AS BOOLEAN) AS y FROM foo",
"DROP TABLE foo",
"ALTER TABLE _alembic_tmp_foo RENAME TO foo",
)
"CREATE TABLE _alembic_tmp_foo (id INTEGER NOT NULL, "
"data INTEGER, x INTEGER, PRIMARY KEY (id))",
"INSERT INTO _alembic_tmp_foo (id, data, x) SELECT foo.id, "
- "CAST(foo.data AS INTEGER) AS %s, foo.x FROM foo"
- % (("data" if sqla_14 else "anon_1"),),
+ "CAST(foo.data AS INTEGER) AS data, foo.x FROM foo",
"DROP TABLE foo",
"ALTER TABLE _alembic_tmp_foo RENAME TO foo",
"CREATE UNIQUE INDEX ix_data ON foo (data)",
batch_op.alter_column("x", type_=DateTime())
eq_(
- self.conn.execute(_select(t.c.x)).fetchall(),
+ self.conn.execute(select(t.c.x)).fetchall(),
[(datetime.datetime(2012, 5, 18, 15, 32, 5),)],
)
with self.conn.begin():
self.conn.execute(t.insert())
- res = self.conn.execute(_select(t.c.x))
- if sqla_14:
- assert res.scalar_one_or_none() is not None
- else:
- row = res.fetchone()
- assert row["x"] is not None
+ res = self.conn.execute(select(t.c.x))
+ assert res.scalar_one_or_none() is not None
def test_drop_col_schematype(self):
self._boolean_fixture()
def _assert_data(self, data, tablename="foo"):
res = self.conn.execute(text("select * from %s" % tablename))
- if sqla_14:
- res = res.mappings()
+ res = res.mappings()
eq_([dict(row) for row in res], data)
def test_ix_existing(self):
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
+from sqlalchemy import Computed
from sqlalchemy import exc
from sqlalchemy import ForeignKey
+from sqlalchemy import Identity
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from alembic.testing.fixtures import capture_context_buffer
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
-from alembic.util import sqla_compat
class FullEnvironmentTests(TestBase):
"exec('alter table t drop constraint ' + @const_name)"
)
- @config.requirements.computed_columns_api
def test_add_column_computed(self):
context = op_fixture("mssql")
op.add_column(
"t1",
- Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ Column("some_column", Integer, Computed("foo * 5")),
)
context.assert_("ALTER TABLE t1 ADD some_column AS (foo * 5)")
)
@combinations(
- (lambda: sqla_compat.Computed("foo * 5"), lambda: None),
- (lambda: None, lambda: sqla_compat.Computed("foo * 5")),
+ (lambda: Computed("foo * 5"), lambda: None),
+ (lambda: None, lambda: Computed("foo * 5")),
(
- lambda: sqla_compat.Computed("foo * 42"),
- lambda: sqla_compat.Computed("foo * 5"),
+ lambda: Computed("foo * 42"),
+ lambda: Computed("foo * 5"),
),
)
@config.requirements.computed_columns
context = op_fixture("mssql")
op.add_column(
"t1",
- Column("some_column", Integer, sqla_compat.Identity(**kw)),
+ Column("some_column", Integer, Identity(**kw)),
)
if "start" in kw or "increment" in kw:
options = "(%s,%s)" % (
)
@combinations(
- (lambda: sqla_compat.Identity(), lambda: None),
- (lambda: None, lambda: sqla_compat.Identity()),
+ (lambda: Identity(), lambda: None),
+ (lambda: None, lambda: Identity()),
(
- lambda: sqla_compat.Identity(),
- lambda: sqla_compat.Identity(),
+ lambda: Identity(),
+ lambda: Identity(),
),
)
@config.requirements.identity_columns
from sqlalchemy import Boolean
from sqlalchemy import Column
+from sqlalchemy import Computed
from sqlalchemy import DATETIME
from sqlalchemy import exc
from sqlalchemy import Float
from sqlalchemy import func
+from sqlalchemy import Identity
from sqlalchemy import inspect
from sqlalchemy import Integer
from sqlalchemy import MetaData
from alembic.testing.fixtures import AlterColRoundTripFixture
from alembic.testing.fixtures import op_fixture
from alembic.testing.fixtures import TestBase
-from alembic.util import sqla_compat
class MySQLOpTest(TestBase):
op.drop_table_comment("t2", existing_comment="t2 table", schema="foo")
context.assert_("ALTER TABLE foo.t2 COMMENT ''")
- @config.requirements.computed_columns_api
def test_add_column_computed(self):
context = op_fixture("mysql")
op.add_column(
"t1",
- Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ Column("some_column", Integer, Computed("foo * 5")),
)
context.assert_(
"ALTER TABLE t1 ADD COLUMN some_column "
)
@combinations(
- (lambda: sqla_compat.Computed("foo * 5"), lambda: None),
- (lambda: None, lambda: sqla_compat.Computed("foo * 5")),
+ (lambda: Computed("foo * 5"), lambda: None),
+ (lambda: None, lambda: Computed("foo * 5")),
(
- lambda: sqla_compat.Computed("foo * 42"),
- lambda: sqla_compat.Computed("foo * 5"),
+ lambda: Computed("foo * 42"),
+ lambda: Computed("foo * 5"),
),
)
- @config.requirements.computed_columns_api
def test_alter_column_computed_not_supported(self, sd, esd):
op_fixture("mysql")
assert_raises_message(
)
@combinations(
- (lambda: sqla_compat.Identity(), lambda: None),
- (lambda: None, lambda: sqla_compat.Identity()),
- (
- lambda: sqla_compat.Identity(),
- lambda: sqla_compat.Identity(),
- ),
+ (lambda: Identity(), lambda: None),
+ (lambda: None, lambda: Identity()),
+ (lambda: Identity(), lambda: Identity()),
)
- @config.requirements.identity_columns_api
def test_alter_column_identity_not_supported(self, sd, esd):
op_fixture()
assert_raises_message(
insp = inspect(self.bind)
cols = insp.get_columns(t1.name)
refl = Table(t1.name, MetaData())
- sqla_compat._reflect_table(insp, refl)
+ insp.reflect_table(refl, include_columns=None)
ctx = self.autogen_context["context"]
return ctx.impl.compare_server_default(
refl.c[cols[0]["name"]], col, rendered, cols[0]["default"]
from sqlalchemy import Boolean
from sqlalchemy import CheckConstraint
from sqlalchemy import Column
+from sqlalchemy import Computed
from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import ForeignKey
+from sqlalchemy import Identity
from sqlalchemy import Index
from sqlalchemy import Integer
from sqlalchemy import MetaData
context.assert_("ALTER TABLE foo.t ALTER COLUMN c DROP DEFAULT")
@combinations(
- (lambda: sqla_compat.Computed("foo * 5"), lambda: None),
- (lambda: None, lambda: sqla_compat.Computed("foo * 5")),
+ (lambda: Computed("foo * 5"), lambda: None),
+ (lambda: None, lambda: Computed("foo * 5")),
(
- lambda: sqla_compat.Computed("foo * 42"),
- lambda: sqla_compat.Computed("foo * 5"),
+ lambda: Computed("foo * 42"),
+ lambda: Computed("foo * 5"),
),
)
- @config.requirements.computed_columns_api
def test_alter_column_computed_not_supported(self, sd, esd):
op_fixture()
assert_raises_message(
)
@combinations(
- (lambda: sqla_compat.Identity(), lambda: None),
- (lambda: None, lambda: sqla_compat.Identity()),
- (
- lambda: sqla_compat.Identity(),
- lambda: sqla_compat.Identity(),
- ),
+ (lambda: Identity(), lambda: None),
+ (lambda: None, lambda: Identity()),
+ (lambda: Identity(), lambda: Identity()),
)
- @config.requirements.identity_columns_api
def test_alter_column_identity_not_supported(self, sd, esd):
op_fixture()
assert_raises_message(
op.create_index("ik_test", "t1", ["foo", "bar"])
context.assert_("CREATE INDEX ik_test ON t1 (foo, bar)")
- @config.requirements.sqlalchemy_14
def test_create_index_if_not_exists(self):
context = op_fixture()
op.create_index("ik_test", "t1", ["foo", "bar"], if_not_exists=True)
op.drop_index("ik_test", schema="foo")
context.assert_("DROP INDEX foo.ik_test")
- @config.requirements.sqlalchemy_14
def test_drop_index_if_exists(self):
context = op_fixture()
op.drop_index("ik_test", if_exists=True)
op.drop_table("tb_test", schema="foo")
context.assert_("DROP TABLE foo.tb_test")
- @config.requirements.sqlalchemy_14
def test_drop_table_if_exists(self):
context = op_fixture()
op.drop_table("tb_test", if_exists=True)
"FOREIGN KEY(foo_bar) REFERENCES foo (bar))"
)
- @config.requirements.sqlalchemy_14
def test_create_table_if_not_exists(self):
context = op_fixture()
op.create_table(
("after_drop", "tb_test"),
]
- @config.requirements.sqlalchemy_14
def test_run_async_error(self):
op_fixture()
import sqlalchemy as sa
from sqlalchemy import Column
+from sqlalchemy import Computed
from sqlalchemy import exc
+from sqlalchemy import Identity
from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import Table
context = op_fixture("oracle")
op.add_column(
"t1",
- Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ Column("some_column", Integer, Computed("foo * 5")),
)
context.assert_(
"ALTER TABLE t1 ADD some_column "
)
@combinations(
- (lambda: sqla_compat.Computed("foo * 5"), lambda: None),
- (lambda: None, lambda: sqla_compat.Computed("foo * 5")),
+ (lambda: Computed("foo * 5"), lambda: None),
+ (lambda: None, lambda: Computed("foo * 5")),
(
- lambda: sqla_compat.Computed("foo * 42"),
- lambda: sqla_compat.Computed("foo * 5"),
+ lambda: Computed("foo * 42"),
+ lambda: Computed("foo * 5"),
),
)
@config.requirements.computed_columns
Column(
"some_column",
Integer,
- sqla_compat.Identity(**self._adapt_identity_kw(kw)),
+ Identity(**self._adapt_identity_kw(kw)),
),
)
qualification = self._identity_qualification(kw)
op.alter_column(
"t1",
"some_column",
- server_default=sqla_compat.Identity(**self._adapt_identity_kw(kw)),
+ server_default=Identity(**self._adapt_identity_kw(kw)),
existing_server_default=None,
)
qualification = self._identity_qualification(kw)
"t1",
"some_column",
server_default=None,
- existing_server_default=sqla_compat.Identity(),
+ existing_server_default=Identity(),
)
context.assert_("ALTER TABLE t1 MODIFY some_column DROP IDENTITY")
op.alter_column(
"t1",
"some_column",
- server_default=sqla_compat.Identity(
- **self._adapt_identity_kw(updated)
- ),
- existing_server_default=sqla_compat.Identity(
+ server_default=Identity(**self._adapt_identity_kw(updated)),
+ existing_server_default=Identity(
**self._adapt_identity_kw(existing)
),
)
Column(
"id",
Integer,
- sqla_compat.Identity(
+ Identity(
**self._adapt_identity_kw(
dict(start=2, oracle_on_null=True)
)
Column(
"id",
Integer,
- sa.Identity(
+ Identity(
**self._adapt_identity_kw(
dict(start=2, oracle_on_null=False)
)
from sqlalchemy import BigInteger
from sqlalchemy import Boolean
from sqlalchemy import Column
+from sqlalchemy import Computed
from sqlalchemy import DateTime
from sqlalchemy import exc
from sqlalchemy import Float
from sqlalchemy import func
+from sqlalchemy import Identity
from sqlalchemy import Index
from sqlalchemy import inspect
from sqlalchemy import Integer
from alembic.testing.fixtures import TablesTest
from alembic.testing.fixtures import TestBase
from alembic.testing.suite._autogen_fixtures import AutogenFixtureTest
-from alembic.util import sqla_compat
class PostgresqlOpTest(TestBase):
"CREATE INDEX CONCURRENTLY geocoded ON locations (coordinates)"
)
- @config.requirements.sqlalchemy_14
def test_create_index_postgresql_include(self):
context = op_fixture("postgresql")
op.create_index(
op.create_index("i", "t", ["c1", "c2"], unique=False)
context.assert_("CREATE INDEX i ON t (c1, c2)")
- @config.requirements.sqlalchemy_14
def test_create_index_postgresql_if_not_exists(self):
context = op_fixture("postgresql")
op.create_index("i", "t", ["c1", "c2"], if_not_exists=True)
op.drop_index("geocoded", postgresql_concurrently=True)
context.assert_("DROP INDEX CONCURRENTLY geocoded")
- @config.requirements.sqlalchemy_14
def test_drop_index_postgresql_if_exists(self):
context = op_fixture("postgresql")
op.drop_index("geocoded", if_exists=True)
context = op_fixture("postgresql")
op.add_column(
"t1",
- Column("some_column", Integer, sqla_compat.Computed("foo * 5")),
+ Column("some_column", Integer, Computed("foo * 5")),
)
context.assert_(
"ALTER TABLE t1 ADD COLUMN some_column "
)
@combinations(
- (lambda: sqla_compat.Computed("foo * 5"), lambda: None),
- (lambda: None, lambda: sqla_compat.Computed("foo * 5")),
+ (lambda: Computed("foo * 5"), lambda: None),
+ (lambda: None, lambda: Computed("foo * 5")),
(
- lambda: sqla_compat.Computed("foo * 42"),
- lambda: sqla_compat.Computed("foo * 5"),
+ lambda: Computed("foo * 42"),
+ lambda: Computed("foo * 5"),
),
)
@config.requirements.computed_columns
context = op_fixture("postgresql")
op.add_column(
"t1",
- Column("some_column", Integer, sqla_compat.Identity(**kw)),
+ Column("some_column", Integer, Identity(**kw)),
)
qualification = "ALWAYS" if kw.get("always", False) else "BY DEFAULT"
options = " (%s)" % text if text else ""
op.alter_column(
"t1",
"some_column",
- server_default=sqla_compat.Identity(**kw),
+ server_default=Identity(**kw),
existing_server_default=None,
)
qualification = "ALWAYS" if kw.get("always", False) else "BY DEFAULT"
"t1",
"some_column",
server_default=None,
- existing_server_default=sqla_compat.Identity(),
+ existing_server_default=Identity(),
)
context.assert_(
"ALTER TABLE t1 ALTER COLUMN some_column DROP IDENTITY"
op.alter_column(
"t1",
"some_column",
- server_default=sqla_compat.Identity(**updated),
- existing_server_default=sqla_compat.Identity(**existing),
+ server_default=Identity(**updated),
+ existing_server_default=Identity(**existing),
)
context.assert_("ALTER TABLE t1 ALTER COLUMN some_column %s" % text)
from alembic.script import ScriptDirectory
from alembic.testing import assert_raises_message
from alembic.testing import assertions
-from alembic.testing import config
from alembic.testing import eq_
from alembic.testing import expect_raises_message
from alembic.testing import mock
@staticmethod
def _branched_connection_env():
- if config.requirements.sqlalchemy_14.enabled:
- connect_warning = (
- 'r"The Connection.connect\\(\\) method is considered legacy"'
- )
- close_warning = (
- 'r"The .close\\(\\) method on a '
- "so-called 'branched' connection\""
- )
- else:
- connect_warning = close_warning = ""
+ connect_warning = (
+ 'r"The Connection.connect\\(\\) method is considered legacy"'
+ )
+ close_warning = (
+ 'r"The .close\\(\\) method on a '
+ "so-called 'branched' connection\""
+ )
env_file_fixture(
textwrap.dedent(
with self._patch_environment(
transactional_ddl=False, transaction_per_migration=False
):
- if config.requirements.sqlalchemy_14.enabled:
- if self.is_sqlalchemy_future:
- with testing.expect_raises_message(
- sa.exc.InvalidRequestError,
- r".*already",
- ):
- command.upgrade(self.cfg, c)
- else:
- with testing.expect_sqlalchemy_deprecated_20(
- r"Calling .begin\(\) when a transaction "
- "is already begun"
- ):
- command.upgrade(self.cfg, c)
+ if self.is_sqlalchemy_future:
+ with testing.expect_raises_message(
+ sa.exc.InvalidRequestError,
+ r".*already",
+ ):
+ command.upgrade(self.cfg, c)
else:
- command.upgrade(self.cfg, c)
+ with testing.expect_sqlalchemy_deprecated_20(
+ r"Calling .begin\(\) when a transaction "
+ "is already begun"
+ ):
+ command.upgrade(self.cfg, c)
def test_raise_when_rev_leaves_open_transaction_tpm(self):
a, b, c = self._opened_transaction_fixture()
with self._patch_environment(
transactional_ddl=False, transaction_per_migration=True
):
- if config.requirements.sqlalchemy_14.enabled:
- if self.is_sqlalchemy_future:
- with testing.expect_raises_message(
- sa.exc.InvalidRequestError,
- r".*already",
- ):
- command.upgrade(self.cfg, c)
- else:
- with testing.expect_sqlalchemy_deprecated_20(
- r"Calling .begin\(\) when a transaction is "
- "already begun"
- ):
- command.upgrade(self.cfg, c)
+ if self.is_sqlalchemy_future:
+ with testing.expect_raises_message(
+ sa.exc.InvalidRequestError,
+ r".*already",
+ ):
+ command.upgrade(self.cfg, c)
else:
- command.upgrade(self.cfg, c)
+ with testing.expect_sqlalchemy_deprecated_20(
+ r"Calling .begin\(\) when a transaction is "
+ "already begun"
+ ):
+ command.upgrade(self.cfg, c)
def test_noerr_rev_leaves_open_transaction_transactional_ddl(self):
a, b, c = self._opened_transaction_fixture()
with self._patch_environment(
transactional_ddl=True, transaction_per_migration=False
):
- if config.requirements.sqlalchemy_14.enabled:
- if self.is_sqlalchemy_future:
- with testing.expect_raises_message(
- sa.exc.InvalidRequestError,
- r".*already",
- ):
- command.upgrade(self.cfg, c)
- else:
- with testing.expect_sqlalchemy_deprecated_20(
- r"Calling .begin\(\) when a transaction "
- "is already begun"
- ):
- command.upgrade(self.cfg, c)
+ if self.is_sqlalchemy_future:
+ with testing.expect_raises_message(
+ sa.exc.InvalidRequestError,
+ r".*already",
+ ):
+ command.upgrade(self.cfg, c)
else:
- command.upgrade(self.cfg, c)
+ with testing.expect_sqlalchemy_deprecated_20(
+ r"Calling .begin\(\) when a transaction "
+ "is already begun"
+ ):
+ command.upgrade(self.cfg, c)
def test_noerr_transaction_opened_externally(self):
a, b, c = self._opened_transaction_fixture()
[testenv]
cov_args=--cov=alembic --cov-report term --cov-report xml
-deps=pytest>4.6,<8.2
+deps=pytest>4.6,<8.4
pytest-xdist
- sqla13: pytest<7
- sqla13: {[tox]SQLA_REPO}@rel_1_3#egg=sqlalchemy
sqla14: {[tox]SQLA_REPO}@rel_1_4#egg=sqlalchemy
sqla20: {[tox]SQLA_REPO}@rel_2_0#egg=sqlalchemy
sqlamain: {[tox]SQLA_REPO}#egg=sqlalchemy
oracle: cx_oracle>=7
mssql: pyodbc
cov: pytest-cov
- sqlalchemy: sqlalchemy>=1.3.0
+ sqlalchemy: sqlalchemy>=1.4.0
mako
- backports.zoneinfo;python_version<"3.9"
tzdata
zimports
- black==24.10.0;python_version>"3.8"
+ black==24.10.0
greenlet>=1
setenv=
BASECOMMAND=python -m pytest {tty:--color=yes} --rootdir {toxinidir}
WORKERS={env:TOX_WORKERS:-n4}
- sqla079: WORKERS=--dropfirst
cov: COVERAGE={[testenv]cov_args}
sqlite: SQLITE={env:TOX_SQLITE:--db sqlite}
postgresql: POSTGRESQL={env:TOX_POSTGRESQL:--db postgresql}
pydocstyle<4.0.0
# used by flake8-rst-docstrings
pygments
- black==24.10.0;python_version>"3.8"
+ black==24.10.0
commands =
flake8 ./alembic/ ./tests/ setup.py docs/build/conf.py {posargs}
black --check setup.py tests alembic
sqlalchemy>=2
mako
zimports
- black==24.10.0;python_version>"3.8"
+ black==24.10.0
commands = python tools/write_pyi.py