rendered_metadata_default = "'%s'" % rendered_metadata_default
return not self.connection.scalar(
- "SELECT %s = %s" % (conn_col_default, rendered_metadata_default)
+ text(
+ "SELECT %s = %s"
+ % (conn_col_default, rendered_metadata_default)
+ )
)
def alter_column(
r"nextval\('(.+?)'::regclass\)", column_info["default"]
)
if seq_match:
- info = inspector.bind.execute(
+ info = sqla_compat._exec_on_inspector(
+ inspector,
text(
"select c.relname, a.attname "
"from pg_class as c join "
from .. import ddl
from .. import util
+from ..util import sqla_compat
from ..util.compat import callable
from ..util.compat import EncodedIO
"got %r" % connection,
stacklevel=3,
)
+
dialect = connection.dialect
elif url:
url = sqla_url.make_url(url)
self.connection.execute(self._version.delete())
def _has_version_table(self):
- return self.connection.dialect.has_table(
+ return sqla_compat._connectable_has_table(
self.connection, self.version_table, self.version_table_schema
)
from sqlalchemy.testing import engines # noqa
from sqlalchemy.testing import mock # noqa
from sqlalchemy.testing import provide_metadata # noqa
+from sqlalchemy.testing import uses_deprecated # noqa
from sqlalchemy.testing.config import requirements as requires # noqa
from alembic import util # noqa
"once", category=pytest.PytestDeprecationWarning
)
+ from sqlalchemy import exc
+
+ if hasattr(exc, "RemovedIn20Warning"):
+ warnings.filterwarnings(
+ "error",
+ category=exc.RemovedIn20Warning,
+ message=".*Engine.execute",
+ )
+ warnings.filterwarnings(
+ "error",
+ category=exc.RemovedIn20Warning,
+ message=".*Passing a string",
+ )
+
# override selected SQLAlchemy pytest hooks with vendored functionality
def stop_test_class(cls):
"SQLAlchemy 1.3 or greater required",
)
+ @property
+ def sqlalchemy_14(self):
+ return exclusions.skip_if(
+ lambda config: not util.sqla_14,
+ "SQLAlchemy 1.4 or greater required",
+ )
+
@property
def sqlalchemy_1115(self):
return exclusions.skip_if(
from .sqla_compat import sqla_120 # noqa
from .sqla_compat import sqla_1216 # noqa
from .sqla_compat import sqla_13 # noqa
+from .sqla_compat import sqla_14 # noqa
if not sqla_110:
import re
from sqlalchemy import __version__
+from sqlalchemy import inspect
from sqlalchemy import schema
from sqlalchemy import sql
from sqlalchemy import types as sqltypes
AUTOINCREMENT_DEFAULT = "auto"
+def _connectable_has_table(connectable, tablename, schemaname):
+ if sqla_14:
+ return inspect(connectable).has_table(tablename, schemaname)
+ else:
+ return connectable.dialect.has_table(
+ connectable, tablename, schemaname
+ )
+
+
+def _exec_on_inspector(inspector, statement, **params):
+ if sqla_14:
+ with inspector._operation_context() as conn:
+ return conn.execute(statement, params)
+ else:
+ return inspector.bind.execute(statement, params)
+
+
def _server_default_is_computed(column):
if not has_computed:
return False
+from sqlalchemy import text
+
from alembic.testing import exclusions
from alembic.testing.requirements import SuiteRequirements
from alembic.util import sqla_compat
def check(config):
if not exclusions.against(config, "postgresql"):
return False
- count = config.db.scalar(
- "SELECT count(*) FROM pg_extension "
- "WHERE extname='%s'" % name
- )
+ with config.db.connect() as conn:
+ count = conn.scalar(
+ text(
+ "SELECT count(*) FROM pg_extension "
+ "WHERE extname='%s'" % name
+ )
+ )
return bool(count)
return exclusions.only_if(check, "needs %s extension" % name)
eq_(
[
dict(row)
- for row in self.conn.execute("select * from %s" % tablename)
+ for row in self.conn.execute(
+ text("select * from %s" % tablename)
+ )
],
data,
)
import re
from sqlalchemy import exc as sqla_exc
+from sqlalchemy import text
from alembic import command
from alembic import config
r2 = command.revision(self.cfg)
db = _sqlite_file_db()
command.upgrade(self.cfg, "head")
- assert_raises(
- sqla_exc.IntegrityError,
- db.execute,
- "insert into alembic_version values ('%s')" % r2.revision,
- )
+ with db.connect() as conn:
+ assert_raises(
+ sqla_exc.IntegrityError,
+ conn.execute,
+ text(
+ "insert into alembic_version values ('%s')" % r2.revision
+ ),
+ )
def test_err_correctly_raised_on_dupe_rows_no_pk(self):
self._env_fixture(version_table_pk=False)
r2 = command.revision(self.cfg)
db = _sqlite_file_db()
command.upgrade(self.cfg, "head")
- db.execute("insert into alembic_version values ('%s')" % r2.revision)
+ with db.connect() as conn:
+ conn.execute(
+ text("insert into alembic_version values ('%s')" % r2.revision)
+ )
assert_raises_message(
util.CommandError,
"Online migration expected to match one row when "
eng = _sqlite_file_db()
with eng.connect() as conn:
result = conn.execute(
- "update alembic_version set version_num='fake'"
+ text("update alembic_version set version_num='fake'")
)
eq_(result.rowcount, 1)
def test_stamp_creates_table(self):
command.stamp(self.cfg, "head")
- eq_(
- self.bind.scalar("select version_num from alembic_version"), self.b
- )
+ with self.bind.connect() as conn:
+ eq_(
+ conn.scalar(text("select version_num from alembic_version")),
+ self.b,
+ )
def test_stamp_existing_upgrade(self):
command.stamp(self.cfg, self.a)
command.stamp(self.cfg, self.b)
- eq_(
- self.bind.scalar("select version_num from alembic_version"), self.b
- )
+ with self.bind.connect() as conn:
+ eq_(
+ conn.scalar(text("select version_num from alembic_version")),
+ self.b,
+ )
def test_stamp_existing_downgrade(self):
command.stamp(self.cfg, self.b)
command.stamp(self.cfg, self.a)
- eq_(
- self.bind.scalar("select version_num from alembic_version"), self.a
- )
+ with self.bind.connect() as conn:
+ eq_(
+ conn.scalar(text("select version_num from alembic_version")),
+ self.a,
+ )
def test_stamp_version_already_there(self):
command.stamp(self.cfg, self.b)
command.stamp(self.cfg, self.b)
- eq_(
- self.bind.scalar("select version_num from alembic_version"), self.b
- )
+ with self.bind.connect() as conn:
+ eq_(
+ conn.scalar(text("select version_num from alembic_version")),
+ self.b,
+ )
class EditTest(TestBase):
#!coding: utf-8
from alembic import command
+from alembic import testing
from alembic.environment import EnvironmentContext
from alembic.migration import MigrationContext
from alembic.script import ScriptDirectory
command.upgrade(self.cfg, "arev", sql=True)
assert "do some SQL thing with a % percent sign %" in buf.getvalue()
+ @testing.uses_deprecated(
+ r"The Engine.execute\(\) function/method is considered legacy"
+ )
def test_warning_on_passing_engine(self):
env = self._fixture()
self.conn = conn = config.db.connect()
with conn.begin():
- conn.execute("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy');")
+ conn.execute(
+ text("CREATE TYPE mood AS ENUM ('sad', 'ok', 'happy')")
+ )
def tearDown(self):
with self.conn.begin():
- self.conn.execute("DROP TYPE mood")
+ self.conn.execute(text("DROP TYPE mood"))
def test_alter_enum(self):
context = MigrationContext.configure(connection=self.conn)
with context.begin_transaction(_per_migration=True):
with context.autocommit_block():
- context.execute("ALTER TYPE mood ADD VALUE 'soso'")
+ context.execute(text("ALTER TYPE mood ADD VALUE 'soso'"))
class PGOfflineEnumTest(TestBase):
@classmethod
def setup_class(cls):
cls.bind = config.db
- cls.bind.execute(
+ with config.db.connect() as conn:
+ conn.execute(
+ text(
+ """
+ create table tab (
+ col varchar(50)
+ )
"""
- create table tab (
- col varchar(50)
+ )
)
- """
- )
- cls.bind.execute(
+ conn.execute(
+ text(
+ """
+ insert into tab (col) values
+ ('old data 1'),
+ ('old data 2.1'),
+ ('old data 3')
"""
- insert into tab (col) values
- ('old data 1'),
- ('old data 2.1'),
- ('old data 3')
- """
- )
+ )
+ )
@classmethod
def teardown_class(cls):
- cls.bind.execute("drop table tab")
+ with cls.bind.connect() as conn:
+ conn.execute(text("drop table tab"))
def setUp(self):
self.conn = self.bind.connect()
)
eq_(
self.conn.execute(
- "select count(*) from tab where col='new data'"
+ text("select count(*) from tab where col='new data'")
).scalar(),
1,
)