From acc1b0614e22224680707099cf741f83de60e85f Mon Sep 17 00:00:00 2001
From: Mike Bayer
Date: Thu, 9 Apr 2020 18:03:30 -0400
Subject: [PATCH] Modernize test_defaults

Use modern execution patterns; the goal is that these same tests can
work with the future engine.

Break the sequence tests out into a test_sequences suite; sequence tests
that exercise implicit execution patterns move into their own suite,
which will go into test_deprecations eventually.

Change-Id: I27cac9bd265c86ff2a3381ff9f844f60ef991cfc
(cherry picked from commit 4cadeaf6e68d71c2cb36219f72cc4d337e31df88)
---
 lib/sqlalchemy/sql/schema.py           |    2 +-
 lib/sqlalchemy/testing/requirements.py |   18 +
 test/sql/test_defaults.py              | 1572 +++++++++---------------
 test/sql/test_sequences.py             |  552 +++++++++
 4 files changed, 1125 insertions(+), 1019 deletions(-)
 create mode 100644 test/sql/test_sequences.py

diff --git a/lib/sqlalchemy/sql/schema.py b/lib/sqlalchemy/sql/schema.py
index 4d2cc3fee7..f5b10c16eb 100644
--- a/lib/sqlalchemy/sql/schema.py
+++ b/lib/sqlalchemy/sql/schema.py
@@ -2165,7 +2165,7 @@ class DefaultGenerator(_NotAColumnExpr, SchemaItem):
     def execute(self, bind=None, **kwargs):
         if bind is None:
             bind = _bind_or_error(self)
-        return bind._execute_default(self, **kwargs)
+        return bind.execute(self, **kwargs)
 
     def _execute_on_connection(self, connection, multiparams, params):
         return connection._execute_default(self, multiparams, params)
diff --git a/lib/sqlalchemy/testing/requirements.py b/lib/sqlalchemy/testing/requirements.py
index 8720dbae6d..515b18b156 100644
--- a/lib/sqlalchemy/testing/requirements.py
+++ b/lib/sqlalchemy/testing/requirements.py
@@ -422,6 +422,24 @@ class SuiteRequirements(Requirements):
             "no sequence support, or sequences not optional",
         )
 
+    @property
+    def supports_lastrowid(self):
+        """target database / driver supports cursor.lastrowid as a means
+        of retrieving the last inserted primary key value.
+
+        note that if the target DB supports sequences also, this is still
+        assumed to work.  This is a new use case brought on by MariaDB 10.3.
+ + """ + return exclusions.only_if( + [lambda config: config.db.dialect.postfetch_lastrowid] + ) + + @property + def no_lastrowid_support(self): + """the opposite of supports_lastrowid""" + return exclusions.NotPredicate(self.supports_lastrowid) + @property def reflects_pk_names(self): return exclusions.closed() diff --git a/test/sql/test_defaults.py b/test/sql/test_defaults.py index 76ef38e1f1..cbed21e262 100644 --- a/test/sql/test_defaults.py +++ b/test/sql/test_defaults.py @@ -15,11 +15,7 @@ from sqlalchemy import Sequence from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import Unicode -from sqlalchemy import util -from sqlalchemy.dialects import sqlite -from sqlalchemy.schema import CreateSequence from sqlalchemy.schema import CreateTable -from sqlalchemy.schema import DropSequence from sqlalchemy.sql import literal_column from sqlalchemy.sql import select from sqlalchemy.sql import text @@ -30,9 +26,6 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings from sqlalchemy.testing import fixtures from sqlalchemy.testing import mock -from sqlalchemy.testing.assertsql import AllOf -from sqlalchemy.testing.assertsql import CompiledSQL -from sqlalchemy.testing.assertsql import EachOf from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table from sqlalchemy.types import TypeDecorator @@ -41,9 +34,6 @@ from sqlalchemy.util import b from sqlalchemy.util import u -t = f = f2 = ts = currenttime = metadata = default_generator = None - - class DDLTest(fixtures.TestBase, AssertsCompiledSQL): __dialect__ = "default" @@ -142,159 +132,7 @@ class DDLTest(fixtures.TestBase, AssertsCompiledSQL): ) -class DefaultTest(fixtures.TestBase): - __backend__ = True - - @classmethod - def setup_class(cls): - global t, f, f2, ts, currenttime, metadata, default_generator - - db = testing.db - metadata = MetaData(db) - default_generator = {"x": 50} - - def mydefault(): - default_generator["x"] += 1 - return default_generator["x"] - - def myupdate_with_ctx(ctx): - conn = ctx.connection - return conn.execute(sa.select([sa.text("13")])).scalar() - - def mydefault_using_connection(ctx): - conn = ctx.connection - try: - return conn.execute(sa.select([sa.text("12")])).scalar() - finally: - # ensure a "close()" on this connection does nothing, - # since its a "branched" connection - conn.close() - - use_function_defaults = testing.against("postgresql", "mssql") - is_oracle = testing.against("oracle") - - class MyClass(object): - @classmethod - def gen_default(cls, ctx): - return "hi" - - class MyType(TypeDecorator): - impl = String(50) - - def process_bind_param(self, value, dialect): - if value is not None: - value = "BIND" + value - return value - - # select "count(1)" returns different results on different DBs also - # correct for "current_date" compatible as column default, value - # differences - currenttime = func.current_date(type_=sa.Date, bind=db) - if is_oracle: - ts = db.scalar( - sa.select( - [ - func.trunc( - func.current_timestamp(), - sa.literal_column("'DAY'"), - type_=sa.Date, - ) - ] - ) - ) - assert isinstance(ts, datetime.date) and not isinstance( - ts, datetime.datetime - ) - f = sa.select([func.length("abcdef")], bind=db).scalar() - f2 = sa.select([func.length("abcdefghijk")], bind=db).scalar() - # TODO: engine propagation across nested functions not working - currenttime = func.trunc( - currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date - ) - def1 = currenttime - def2 = func.trunc( - 
sa.text("current_timestamp"), - sa.literal_column("'DAY'"), - type_=sa.Date, - ) - - deftype = sa.Date - elif use_function_defaults: - f = sa.select([func.length("abcdef")], bind=db).scalar() - f2 = sa.select([func.length("abcdefghijk")], bind=db).scalar() - def1 = currenttime - deftype = sa.Date - if testing.against("mssql"): - def2 = sa.text("getdate()") - else: - def2 = sa.text("current_date") - ts = db.scalar(func.current_date()) - else: - f = len("abcdef") - f2 = len("abcdefghijk") - def1 = def2 = "3" - ts = 3 - deftype = Integer - - t = Table( - "default_test1", - metadata, - # python function - Column("col1", Integer, primary_key=True, default=mydefault), - # python literal - Column( - "col2", - String(20), - default="imthedefault", - onupdate="im the update", - ), - # preexecute expression - Column( - "col3", - Integer, - default=func.length("abcdef"), - onupdate=func.length("abcdefghijk"), - ), - # SQL-side default from sql expression - Column("col4", deftype, server_default=def1), - # SQL-side default from literal expression - Column("col5", deftype, server_default=def2), - # preexecute + update timestamp - Column("col6", sa.Date, default=currenttime, onupdate=currenttime), - Column("boolcol1", sa.Boolean, default=True), - Column("boolcol2", sa.Boolean, default=False), - # python function which uses ExecutionContext - Column( - "col7", - Integer, - default=mydefault_using_connection, - onupdate=myupdate_with_ctx, - ), - # python builtin - Column( - "col8", - sa.Date, - default=datetime.date.today, - onupdate=datetime.date.today, - ), - # combo - Column("col9", String(20), default="py", server_default="ddl"), - # python method w/ context - Column("col10", String(20), default=MyClass.gen_default), - # fixed default w/ type that has bound processor - Column("col11", MyType(), default="foo"), - ) - - t.create() - - @classmethod - def teardown_class(cls): - t.drop() - - def teardown(self): - default_generator["x"] = 50 - t.delete().execute() - +class DefaultObjectTest(fixtures.TestBase): def test_bad_arg_signature(self): ex_msg = ( "ColumnDefault Python function takes zero " @@ -361,50 +199,89 @@ class DefaultTest(fixtures.TestBase): c = sa.ColumnDefault(fn) c.arg("context") - @testing.fails_on("firebird", "Data type unknown") - def test_standalone(self): - c = testing.db.engine.connect() - x = c.execute(t.c.col1.default) - y = t.c.col2.default.execute() - z = c.execute(t.c.col3.default) - assert 50 <= x <= 57 - eq_(y, "imthedefault") - eq_(z, f) - eq_(f2, 11) - - def test_py_vs_server_default_detection(self): - def has_(name, *wanted): - slots = [ - "default", - "onupdate", - "server_default", - "server_onupdate", - ] - col = tbl.c[name] - for slot in wanted: - slots.remove(slot) - assert getattr(col, slot) is not None, getattr(col, slot) - for slot in slots: - assert getattr(col, slot) is None, getattr(col, slot) - - tbl = t - has_("col1", "default") - has_("col2", "default", "onupdate") - has_("col3", "default", "onupdate") - has_("col4", "server_default") - has_("col5", "server_default") - has_("col6", "default", "onupdate") - has_("boolcol1", "default") - has_("boolcol2", "default") - has_("col7", "default", "onupdate") - has_("col8", "default", "onupdate") - has_("col9", "default", "server_default") + def _check_default_slots(self, tbl, name, *wanted): + slots = [ + "default", + "onupdate", + "server_default", + "server_onupdate", + ] + col = tbl.c[name] + for slot in wanted: + slots.remove(slot) + assert getattr(col, slot) is not None, getattr(col, slot) + for slot in slots: + 
assert getattr(col, slot) is None, getattr(col, slot) + + def test_py_vs_server_default_detection_one(self): + has_ = self._check_default_slots + + metadata = MetaData() + tbl = Table( + "default_test", + metadata, + # python function + Column("col1", Integer, primary_key=True, default="1"), + # python literal + Column( + "col2", + String(20), + default="imthedefault", + onupdate="im the update", + ), + # preexecute expression + Column( + "col3", + Integer, + default=func.length("abcdef"), + onupdate=func.length("abcdefghijk"), + ), + # SQL-side default from sql expression + Column("col4", Integer, server_default="1"), + # SQL-side default from literal expression + Column("col5", Integer, server_default="1"), + # preexecute + update timestamp + Column( + "col6", + sa.Date, + default=datetime.datetime.today, + onupdate=datetime.datetime.today, + ), + Column("boolcol1", sa.Boolean, default=True), + Column("boolcol2", sa.Boolean, default=False), + # python function which uses ExecutionContext + Column("col7", Integer, default=lambda: 5, onupdate=lambda: 10,), + # python builtin + Column( + "col8", + sa.Date, + default=datetime.date.today, + onupdate=datetime.date.today, + ), + Column("col9", String(20), default="py", server_default="ddl"), + ) + + has_(tbl, "col1", "default") + has_(tbl, "col2", "default", "onupdate") + has_(tbl, "col3", "default", "onupdate") + has_(tbl, "col4", "server_default") + has_(tbl, "col5", "server_default") + has_(tbl, "col6", "default", "onupdate") + has_(tbl, "boolcol1", "default") + has_(tbl, "boolcol2", "default") + has_(tbl, "col7", "default", "onupdate") + has_(tbl, "col8", "default", "onupdate") + has_(tbl, "col9", "default", "server_default") + def test_py_vs_server_default_detection_two(self): + has_ = self._check_default_slots + + metadata = MetaData() ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause - t2 = Table( + tbl = Table( "t2", - MetaData(), + metadata, Column("col1", Integer, Sequence("foo")), Column( "col2", Integer, default=Sequence("foo"), server_default="y" @@ -444,40 +321,221 @@ class DefaultTest(fixtures.TestBase): onupdate="z", ), ) - tbl = t2 - has_("col1", "default") - has_("col2", "default", "server_default") - has_("col3", "default", "server_default") - has_("col4", "default", "server_default", "server_onupdate") - has_("col5", "default", "server_default", "onupdate") - has_("col6", "default", "server_default", "onupdate") - has_("col7", "default", "server_default", "onupdate") + has_(tbl, "col1", "default") + has_(tbl, "col2", "default", "server_default") + has_(tbl, "col3", "default", "server_default") + has_(tbl, "col4", "default", "server_default", "server_onupdate") + has_(tbl, "col5", "default", "server_default", "onupdate") + has_(tbl, "col6", "default", "server_default", "onupdate") + has_(tbl, "col7", "default", "server_default", "onupdate") has_( - "col8", "default", "server_default", "onupdate", "server_onupdate" + tbl, + "col8", + "default", + "server_default", + "onupdate", + "server_onupdate", ) - @testing.fails_on("firebird", "Data type unknown") - def test_insert(self): - r = t.insert().execute() + def test_no_embed_in_sql(self): + """Using a DefaultGenerator, Sequence, DefaultClause + in the columns, where clause of a select, or in the values + clause of insert, update, raises an informative error""" + + t = Table( + "some_table", + MetaData(), + Column("id", Integer), + Column("col4", String()), + ) + for const in ( + sa.Sequence("y"), + sa.ColumnDefault("y"), + sa.DefaultClause("y"), + ): + 
assert_raises_message( + sa.exc.ArgumentError, + "SQL expression object expected, got object of type " + "<.* 'list'> instead", + t.select, + [const], + ) + assert_raises_message( + sa.exc.InvalidRequestError, + "cannot be used directly as a column expression.", + str, + t.insert().values(col4=const), + ) + assert_raises_message( + sa.exc.InvalidRequestError, + "cannot be used directly as a column expression.", + str, + t.update().values(col4=const), + ) + + +class DefaultRoundTripTest(fixtures.TablesTest): + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + default_generator = cls.default_generator = {"x": 50} + + def mydefault(): + default_generator["x"] += 1 + return default_generator["x"] + + def myupdate_with_ctx(ctx): + conn = ctx.connection + return conn.execute(sa.select([sa.text("13")])).scalar() + + def mydefault_using_connection(ctx): + conn = ctx.connection + return conn.execute(sa.select([sa.text("12")])).scalar() + + use_function_defaults = testing.against("postgresql", "mssql") + is_oracle = testing.against("oracle") + + class MyClass(object): + @classmethod + def gen_default(cls, ctx): + return "hi" + + class MyType(TypeDecorator): + impl = String(50) + + def process_bind_param(self, value, dialect): + if value is not None: + value = "BIND" + value + return value + + cls.f = 6 + cls.f2 = 11 + with testing.db.connect() as conn: + currenttime = cls.currenttime = func.current_date(type_=sa.Date) + if is_oracle: + ts = conn.scalar( + sa.select( + [ + func.trunc( + func.current_timestamp(), + sa.literal_column("'DAY'"), + type_=sa.Date, + ) + ] + ) + ) + currenttime = cls.currenttime = func.trunc( + currenttime, sa.literal_column("'DAY'"), type_=sa.Date + ) + def1 = currenttime + def2 = func.trunc( + sa.text("current_timestamp"), + sa.literal_column("'DAY'"), + type_=sa.Date, + ) + + deftype = sa.Date + elif use_function_defaults: + def1 = currenttime + deftype = sa.Date + if testing.against("mssql"): + def2 = sa.text("getdate()") + else: + def2 = sa.text("current_date") + ts = conn.scalar(func.current_date()) + else: + def1 = def2 = "3" + ts = 3 + deftype = Integer + + cls.ts = ts + + Table( + "default_test", + metadata, + # python function + Column("col1", Integer, primary_key=True, default=mydefault), + # python literal + Column( + "col2", + String(20), + default="imthedefault", + onupdate="im the update", + ), + # preexecute expression + Column( + "col3", + Integer, + default=func.length("abcdef"), + onupdate=func.length("abcdefghijk"), + ), + # SQL-side default from sql expression + Column("col4", deftype, server_default=def1), + # SQL-side default from literal expression + Column("col5", deftype, server_default=def2), + # preexecute + update timestamp + Column("col6", sa.Date, default=currenttime, onupdate=currenttime), + Column("boolcol1", sa.Boolean, default=True), + Column("boolcol2", sa.Boolean, default=False), + # python function which uses ExecutionContext + Column( + "col7", + Integer, + default=mydefault_using_connection, + onupdate=myupdate_with_ctx, + ), + # python builtin + Column( + "col8", + sa.Date, + default=datetime.date.today, + onupdate=datetime.date.today, + ), + # combo + Column("col9", String(20), default="py", server_default="ddl"), + # python method w/ context + Column("col10", String(20), default=MyClass.gen_default), + # fixed default w/ type that has bound processor + Column("col11", MyType(), default="foo"), + ) + + def teardown(self): + self.default_generator["x"] = 50 + super(DefaultRoundTripTest, self).teardown() + + 
def test_standalone(self, connection): + t = self.tables.default_test + x = connection.execute(t.c.col1.default) + y = connection.execute(t.c.col2.default) + z = connection.execute(t.c.col3.default) + assert 50 <= x <= 57 + eq_(y, "imthedefault") + eq_(z, self.f) + + def test_insert(self, connection): + t = self.tables.default_test + + r = connection.execute(t.insert()) assert r.lastrow_has_defaults() eq_( set(r.context.postfetch_cols), set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]), ) - r = t.insert(inline=True).execute() + r = connection.execute(t.insert(inline=True)) assert r.lastrow_has_defaults() eq_( set(r.context.postfetch_cols), set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]), ) - t.insert().execute() + connection.execute(t.insert()) - ctexec = sa.select( - [currenttime.label("now")], bind=testing.db + ctexec = connection.execute( + sa.select([self.currenttime.label("now")]) ).scalar() - result = t.select().order_by(t.c.col1).execute() + result = connection.execute(t.select().order_by(t.c.col1)) today = datetime.date.today() eq_( result.fetchall(), @@ -485,9 +543,9 @@ class DefaultTest(fixtures.TestBase): ( x, "imthedefault", - f, - ts, - ts, + self.f, + self.ts, + self.ts, ctexec, True, False, @@ -501,22 +559,26 @@ class DefaultTest(fixtures.TestBase): ], ) - t.insert().execute(col9=None) + connection.execute(t.insert(), dict(col9=None)) + + # TODO: why are we looking at 'r' when we just executed something + # else ? assert r.lastrow_has_defaults() + eq_( set(r.context.postfetch_cols), set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]), ) eq_( - t.select(t.c.col1 == 54).execute().fetchall(), + list(connection.execute(t.select().where(t.c.col1 == 54))), [ ( 54, "imthedefault", - f, - ts, - ts, + self.f, + self.ts, + self.ts, ctexec, True, False, @@ -529,21 +591,23 @@ class DefaultTest(fixtures.TestBase): ], ) - def test_insertmany(self): - t.insert().execute({}, {}, {}) + def test_insertmany(self, connection): + t = self.tables.default_test + + connection.execute(t.insert(), [{}, {}, {}]) - ctexec = currenttime.scalar() - result = t.select().order_by(t.c.col1).execute() + ctexec = connection.scalar(self.currenttime) + result = connection.execute(t.select().order_by(t.c.col1)) today = datetime.date.today() eq_( - result.fetchall(), + list(result), [ ( 51, "imthedefault", - f, - ts, - ts, + self.f, + self.ts, + self.ts, ctexec, True, False, @@ -556,9 +620,9 @@ class DefaultTest(fixtures.TestBase): ( 52, "imthedefault", - f, - ts, - ts, + self.f, + self.ts, + self.ts, ctexec, True, False, @@ -571,9 +635,9 @@ class DefaultTest(fixtures.TestBase): ( 53, "imthedefault", - f, - ts, - ts, + self.f, + self.ts, + self.ts, ctexec, True, False, @@ -587,22 +651,23 @@ class DefaultTest(fixtures.TestBase): ) @testing.requires.multivalues_inserts - def test_insert_multivalues(self): + def test_insert_multivalues(self, connection): + t = self.tables.default_test - t.insert().values([{}, {}, {}]).execute() + connection.execute(t.insert().values([{}, {}, {}])) - ctexec = currenttime.scalar() - result = t.select().order_by(t.c.col1).execute() + ctexec = connection.execute(self.currenttime).scalar() + result = connection.execute(t.select().order_by(t.c.col1)) today = datetime.date.today() eq_( - result.fetchall(), + list(result), [ ( 51, "imthedefault", - f, - ts, - ts, + self.f, + self.ts, + self.ts, ctexec, True, False, @@ -615,9 +680,9 @@ class DefaultTest(fixtures.TestBase): ( 52, "imthedefault", - f, - ts, - ts, + self.f, + self.ts, + self.ts, ctexec, True, False, @@ -630,97 +695,67 @@ class 
DefaultTest(fixtures.TestBase): ( 53, "imthedefault", - f, - ts, - ts, - ctexec, - True, - False, - 12, - today, - "py", - "hi", - "BINDfoo", - ), - ], - ) - - def test_no_embed_in_sql(self): - """Using a DefaultGenerator, Sequence, DefaultClause - in the columns, where clause of a select, or in the values - clause of insert, update, raises an informative error""" - - for const in ( - sa.Sequence("y"), - sa.ColumnDefault("y"), - sa.DefaultClause("y"), - ): - assert_raises_message( - sa.exc.ArgumentError, - "SQL expression object expected, got object of type " - "<.* 'list'> instead", - t.select, - [const], - ) - assert_raises_message( - sa.exc.InvalidRequestError, - "cannot be used directly as a column expression.", - str, - t.insert().values(col4=const), - ) - assert_raises_message( - sa.exc.InvalidRequestError, - "cannot be used directly as a column expression.", - str, - t.update().values(col4=const), - ) + self.f, + self.ts, + self.ts, + ctexec, + True, + False, + 12, + today, + "py", + "hi", + "BINDfoo", + ), + ], + ) - def test_missing_many_param(self): + def test_missing_many_param(self, connection): + t = self.tables.default_test assert_raises_message( exc.StatementError, "A value is required for bind parameter 'col7', in parameter " "group 1", - t.insert().execute, + connection.execute, + t.insert(), {"col4": 7, "col7": 12, "col8": 19}, {"col4": 7, "col8": 19}, {"col4": 7, "col7": 12, "col8": 19}, ) - def test_insert_values(self): - t.insert(values={"col3": 50}).execute() - result = t.select().execute() + def test_insert_values(self, connection): + t = self.tables.default_test + connection.execute(t.insert().values(col3=50)) + result = connection.execute(t.select().order_by(t.c.col1)) eq_(50, result.first()["col3"]) - @testing.fails_on("firebird", "Data type unknown") - def test_updatemany(self): - # MySQL-Python 1.2.2 breaks functions in execute_many :( - if testing.against( - "mysql+mysqldb" - ) and testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2): - return + def test_updatemany(self, connection): + t = self.tables.default_test - t.insert().execute({}, {}, {}) + connection.execute(t.insert(), [{}, {}, {}]) - t.update(t.c.col1 == sa.bindparam("pkval")).execute( - {"pkval": 51, "col7": None, "col8": None, "boolcol1": False} + connection.execute( + t.update().where(t.c.col1 == sa.bindparam("pkval")), + {"pkval": 51, "col7": None, "col8": None, "boolcol1": False}, ) - t.update(t.c.col1 == sa.bindparam("pkval")).execute( - {"pkval": 51}, {"pkval": 52}, {"pkval": 53} + connection.execute( + t.update().where(t.c.col1 == sa.bindparam("pkval")), + [{"pkval": 51}, {"pkval": 52}, {"pkval": 53}], ) - result = t.select().execute() - ctexec = currenttime.scalar() + ctexec = connection.scalar(self.currenttime) today = datetime.date.today() + result = connection.execute(t.select().order_by(t.c.col1)) eq_( - result.fetchall(), + list(result), [ ( 51, "im the update", - f2, - ts, - ts, + self.f2, + self.ts, + self.ts, ctexec, False, False, @@ -733,9 +768,9 @@ class DefaultTest(fixtures.TestBase): ( 52, "im the update", - f2, - ts, - ts, + self.f2, + self.ts, + self.ts, ctexec, True, False, @@ -748,9 +783,9 @@ class DefaultTest(fixtures.TestBase): ( 53, "im the update", - f2, - ts, - ts, + self.f2, + self.ts, + self.ts, ctexec, True, False, @@ -763,20 +798,22 @@ class DefaultTest(fixtures.TestBase): ], ) - @testing.fails_on("firebird", "Data type unknown") - def test_update(self): - r = t.insert().execute() + def test_update(self, connection): + t = self.tables.default_test + r = 
connection.execute(t.insert()) pk = r.inserted_primary_key[0] - t.update(t.c.col1 == pk).execute(col4=None, col5=None) - ctexec = currenttime.scalar() - result = t.select(t.c.col1 == pk).execute() + connection.execute( + t.update().where(t.c.col1 == pk), dict(col4=None, col5=None) + ) + ctexec = connection.scalar(self.currenttime) + result = connection.execute(t.select().where(t.c.col1 == pk)) result = result.first() eq_( result, ( pk, "im the update", - f2, + self.f2, None, None, ctexec, @@ -789,16 +826,15 @@ class DefaultTest(fixtures.TestBase): "BINDfoo", ), ) - eq_(11, f2) - @testing.fails_on("firebird", "Data type unknown") - def test_update_values(self): - r = t.insert().execute() + def test_update_values(self, connection): + t = self.tables.default_test + r = connection.execute(t.insert()) pk = r.inserted_primary_key[0] - t.update(t.c.col1 == pk, values={"col3": 55}).execute() - result = t.select(t.c.col1 == pk).execute() - result = result.first() - eq_(55, result["col3"]) + connection.execute(t.update().where(t.c.col1 == pk).values(col3=55)) + result = connection.execute(t.select().where(t.c.col1 == pk)) + row = result.first() + eq_(55, row["col3"]) class CTEDefaultTest(fixtures.TablesTest): @@ -823,79 +859,62 @@ class CTEDefaultTest(fixtures.TablesTest): Column("u", Integer, onupdate=1), ) - def _test_a_in_b(self, a, b): + @testing.combinations( + ("update", "select", testing.requires.ctes_on_dml), + ("update", "select", testing.requires.ctes_on_dml), + ("update", "select", testing.requires.ctes_on_dml), + ("select", "update"), + ("select", "insert"), + argnames="a, b", + ) + def test_a_in_b(self, a, b, connection): q = self.tables.q p = self.tables.p - with testing.db.connect() as conn: - if a == "delete": - conn.execute(q.insert().values(y=10, z=1)) - cte = q.delete().where(q.c.z == 1).returning(q.c.z).cte("c") - expected = None - elif a == "insert": - cte = q.insert().values(z=1, y=10).returning(q.c.z).cte("c") - expected = (2, 10) - elif a == "update": - conn.execute(q.insert().values(x=5, y=10, z=1)) - cte = ( - q.update() - .where(q.c.z == 1) - .values(x=7) - .returning(q.c.z) - .cte("c") - ) - expected = (7, 5) - elif a == "select": - conn.execute(q.insert().values(x=5, y=10, z=1)) - cte = sa.select([q.c.z]).cte("c") - expected = (5, 10) - - if b == "select": - conn.execute(p.insert().values(s=1)) - stmt = select([p.c.s, cte.c.z]) - elif b == "insert": - sel = select([1, cte.c.z]) - stmt = ( - p.insert() - .from_select(["s", "t"], sel) - .returning(p.c.s, p.c.t) - ) - elif b == "delete": - stmt = ( - p.insert().values(s=1, t=cte.c.z).returning(p.c.s, cte.c.z) - ) - elif b == "update": - conn.execute(p.insert().values(s=1)) - stmt = ( - p.update() - .values(t=5) - .where(p.c.s == cte.c.z) - .returning(p.c.u, cte.c.z) - ) - eq_(conn.execute(stmt).fetchall(), [(1, 1)]) - - eq_(conn.execute(select([q.c.x, q.c.y])).fetchone(), expected) - - @testing.requires.ctes_on_dml - def test_update_in_select(self): - self._test_a_in_b("update", "select") - - @testing.requires.ctes_on_dml - def test_delete_in_select(self): - self._test_a_in_b("update", "select") - - @testing.requires.ctes_on_dml - def test_insert_in_select(self): - self._test_a_in_b("update", "select") - - def test_select_in_update(self): - self._test_a_in_b("select", "update") - - def test_select_in_insert(self): - self._test_a_in_b("select", "insert") + conn = connection + if a == "delete": + conn.execute(q.insert().values(y=10, z=1)) + cte = q.delete().where(q.c.z == 1).returning(q.c.z).cte("c") + expected = None + elif 
a == "insert": + cte = q.insert().values(z=1, y=10).returning(q.c.z).cte("c") + expected = (2, 10) + elif a == "update": + conn.execute(q.insert().values(x=5, y=10, z=1)) + cte = ( + q.update() + .where(q.c.z == 1) + .values(x=7) + .returning(q.c.z) + .cte("c") + ) + expected = (7, 5) + elif a == "select": + conn.execute(q.insert().values(x=5, y=10, z=1)) + cte = sa.select([q.c.z]).cte("c") + expected = (5, 10) + + if b == "select": + conn.execute(p.insert().values(s=1)) + stmt = select([p.c.s, cte.c.z]).where(p.c.s == cte.c.z) + elif b == "insert": + sel = select([1, cte.c.z]) + stmt = ( + p.insert().from_select(["s", "t"], sel).returning(p.c.s, p.c.t) + ) + elif b == "delete": + stmt = p.insert().values(s=1, t=cte.c.z).returning(p.c.s, cte.c.z) + elif b == "update": + conn.execute(p.insert().values(s=1)) + stmt = ( + p.update() + .values(t=5) + .where(p.c.s == cte.c.z) + .returning(p.c.u, cte.c.z) + ) + eq_(list(conn.execute(stmt)), [(1, 1)]) - # TODO: updates / inserts can be run in one statement w/ CTE ? - # deletes? + eq_(conn.execute(select([q.c.x, q.c.y])).first(), expected) class PKDefaultTest(fixtures.TablesTest): @@ -1022,75 +1041,45 @@ class PKIncrementTest(fixtures.TablesTest): self._test_autoincrement(testing.db) def test_autoincrement_transaction(self): - con = testing.db.connect() - tx = con.begin() - try: - try: - self._test_autoincrement(con) - except Exception: - try: - tx.rollback() - except Exception: - pass - raise - else: - tx.commit() - finally: - con.close() + with testing.db.begin() as conn: + self._test_autoincrement(conn) class EmptyInsertTest(fixtures.TestBase): __backend__ = True - @testing.exclude("sqlite", "<", (3, 3, 8), "no empty insert support") @testing.fails_on("oracle", "FIXME: unknown") @testing.provide_metadata - def test_empty_insert(self): + def test_empty_insert(self, connection): t1 = Table( "t1", self.metadata, Column("is_true", Boolean, server_default=("1")), ) - self.metadata.create_all() - t1.insert().execute() - eq_(1, select([func.count(text("*"))], from_obj=t1).scalar()) - eq_(True, t1.select().scalar()) + self.metadata.create_all(connection) + connection.execute(t1.insert()) + eq_( + 1, + connection.scalar(select([func.count(text("*"))]).select_from(t1)), + ) + eq_(True, connection.scalar(t1.select())) -class AutoIncrementTest(fixtures.TablesTest): +class AutoIncrementTest(fixtures.TestBase): __requires__ = ("identity",) - run_define_tables = "each" __backend__ = True - @classmethod - def define_tables(cls, metadata): - """Each test manipulates self.metadata individually.""" - - @testing.exclude("sqlite", "<", (3, 4), "no database support") - def test_autoincrement_single_col(self): + @testing.provide_metadata + def test_autoincrement_single_col(self, connection): single = Table( "single", self.metadata, Column("id", Integer, primary_key=True) ) - single.create() + self.metadata.create_all(connection) - r = single.insert().execute() + r = connection.execute(single.insert()) id_ = r.inserted_primary_key[0] eq_(id_, 1) - eq_(1, sa.select([func.count(sa.text("*"))], from_obj=single).scalar()) - - def test_autoincrement_fk(self): - nodes = Table( - "nodes", - self.metadata, - Column("id", Integer, primary_key=True), - Column("parent_id", Integer, ForeignKey("nodes.id")), - Column("data", String(30)), - ) - nodes.create() - - r = nodes.insert().execute(data="foo") - id_ = r.inserted_primary_key[0] - nodes.insert().execute(data="bar", parent_id=id_) + eq_(connection.scalar(sa.select([single.c.id])), 1) def 
test_autoinc_detection_no_affinity(self): class MyType(TypeDecorator): @@ -1127,7 +1116,8 @@ class AutoIncrementTest(fixtures.TablesTest): assert x._autoincrement_column is None @testing.only_on("sqlite") - def test_non_autoincrement(self): + @testing.provide_metadata + def test_non_autoincrement(self, connection): # sqlite INT primary keys can be non-unique! (only for ints) nonai = Table( "nonaitest", @@ -1135,522 +1125,73 @@ class AutoIncrementTest(fixtures.TablesTest): Column("id", Integer, autoincrement=False, primary_key=True), Column("data", String(20)), ) - nonai.create() - - def go(): - # postgresql + mysql strict will fail on first row, - # mysql in legacy mode fails on second row - nonai.insert().execute(data="row 1") - nonai.insert().execute(data="row 2") + nonai.create(connection) # just testing SQLite for now, it passes with expect_warnings(".*has no Python-side or server-side default.*"): - go() + # postgresql + mysql strict will fail on first row, + # mysql in legacy mode fails on second row + connection.execute(nonai.insert(), dict(data="row 1")) + connection.execute(nonai.insert(), dict(data="row 2")) - def test_col_w_sequence_non_autoinc_no_firing(self): - metadata = self.metadata + @testing.metadata_fixture(ddl="function") + def dataset_no_autoinc(self, metadata): # plain autoincrement/PK table in the actual schema Table("x", metadata, Column("set_id", Integer, primary_key=True)) - metadata.create_all() # for the INSERT use a table with a Sequence # and autoincrement=False. Using a ForeignKey # would have the same effect + + some_seq = Sequence("some_seq") + dataset_no_autoinc = Table( "x", MetaData(), Column( "set_id", Integer, - Sequence("some_seq"), + some_seq, primary_key=True, autoincrement=False, ), ) + return dataset_no_autoinc - testing.db.execute(dataset_no_autoinc.insert()) - eq_( - testing.db.scalar( - select([func.count("*")]).select_from(dataset_no_autoinc) - ), - 1, - ) - - -class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL): - __dialect__ = "default" - __backend__ = True - - def test_create_drop_ddl(self): - self.assert_compile( - CreateSequence(Sequence("foo_seq")), "CREATE SEQUENCE foo_seq" - ) - - self.assert_compile( - CreateSequence(Sequence("foo_seq", start=5)), - "CREATE SEQUENCE foo_seq START WITH 5", - ) - - self.assert_compile( - CreateSequence(Sequence("foo_seq", increment=2)), - "CREATE SEQUENCE foo_seq INCREMENT BY 2", - ) - - self.assert_compile( - CreateSequence(Sequence("foo_seq", increment=2, start=5)), - "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 5", - ) - - self.assert_compile( - CreateSequence( - Sequence("foo_seq", increment=2, start=0, minvalue=0) - ), - "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 MINVALUE 0", - ) - - self.assert_compile( - CreateSequence( - Sequence("foo_seq", increment=2, start=1, maxvalue=5) - ), - "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 MAXVALUE 5", - ) - - self.assert_compile( - CreateSequence( - Sequence("foo_seq", increment=2, start=1, nomaxvalue=True) - ), - "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 NO MAXVALUE", - ) - - self.assert_compile( - CreateSequence( - Sequence("foo_seq", increment=2, start=0, nominvalue=True) - ), - "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 NO MINVALUE", - ) - - self.assert_compile( - CreateSequence( - Sequence("foo_seq", start=1, maxvalue=10, cycle=True) - ), - "CREATE SEQUENCE foo_seq START WITH 1 MAXVALUE 10 CYCLE", - ) - - self.assert_compile( - CreateSequence(Sequence("foo_seq", cache=1000, order=True)), - 
"CREATE SEQUENCE foo_seq CACHE 1000 ORDER", - ) - - self.assert_compile( - CreateSequence(Sequence("foo_seq", order=True)), - "CREATE SEQUENCE foo_seq ORDER", - ) - - self.assert_compile( - DropSequence(Sequence("foo_seq")), "DROP SEQUENCE foo_seq" - ) - - -class SequenceExecTest(fixtures.TestBase): - __requires__ = ("sequences",) - __backend__ = True - - @classmethod - def setup_class(cls): - cls.seq = Sequence("my_sequence") - cls.seq.create(testing.db) - - @classmethod - def teardown_class(cls): - cls.seq.drop(testing.db) - - def _assert_seq_result(self, ret): - """asserts return of next_value is an int""" - - assert isinstance(ret, util.int_types) - assert ret > 0 - - def test_implicit_connectionless(self): - s = Sequence("my_sequence", metadata=MetaData(testing.db)) - self._assert_seq_result(s.execute()) - - def test_explicit(self): - s = Sequence("my_sequence") - self._assert_seq_result(s.execute(testing.db)) - - def test_explicit_optional(self): - """test dialect executes a Sequence, returns nextval, whether - or not "optional" is set """ - - s = Sequence("my_sequence", optional=True) - self._assert_seq_result(s.execute(testing.db)) - - def test_func_implicit_connectionless_execute(self): - """test func.next_value().execute()/.scalar() works - with connectionless execution. """ + def test_col_w_optional_sequence_non_autoinc_no_firing( + self, dataset_no_autoinc, connection + ): + """this is testing that a Table which includes a Sequence, when + run against a DB that does not support sequences, the Sequence + does not get in the way. - s = Sequence("my_sequence", metadata=MetaData(testing.db)) - self._assert_seq_result(s.next_value().execute().scalar()) - - def test_func_explicit(self): - s = Sequence("my_sequence") - self._assert_seq_result(testing.db.scalar(s.next_value())) - - def test_func_implicit_connectionless_scalar(self): - """test func.next_value().execute()/.scalar() works. 
""" - - s = Sequence("my_sequence", metadata=MetaData(testing.db)) - self._assert_seq_result(s.next_value().scalar()) - - def test_func_embedded_select(self): - """test can use next_value() in select column expr""" - - s = Sequence("my_sequence") - self._assert_seq_result(testing.db.scalar(select([s.next_value()]))) - - @testing.fails_on("oracle", "ORA-02287: sequence number not allowed here") - @testing.provide_metadata - def test_func_embedded_whereclause(self): - """test can use next_value() in whereclause""" + """ + dataset_no_autoinc.c.set_id.default.optional = True - metadata = self.metadata - t1 = Table("t", metadata, Column("x", Integer)) - t1.create(testing.db) - testing.db.execute(t1.insert(), [{"x": 1}, {"x": 300}, {"x": 301}]) - s = Sequence("my_sequence") + connection.execute(dataset_no_autoinc.insert()) eq_( - testing.db.execute( - t1.select().where(t1.c.x > s.next_value()) - ).fetchall(), - [(300,), (301,)], - ) - - @testing.provide_metadata - def test_func_embedded_valuesbase(self): - """test can use next_value() in values() of _ValuesBase""" - - metadata = self.metadata - t1 = Table("t", metadata, Column("x", Integer)) - t1.create(testing.db) - s = Sequence("my_sequence") - testing.db.execute(t1.insert().values(x=s.next_value())) - self._assert_seq_result(testing.db.scalar(t1.select())) - - @testing.provide_metadata - def test_inserted_pk_no_returning(self): - """test inserted_primary_key contains [None] when - pk_col=next_value(), implicit returning is not used.""" - - metadata = self.metadata - e = engines.testing_engine(options={"implicit_returning": False}) - s = Sequence("my_sequence") - metadata.bind = e - t1 = Table("t", metadata, Column("x", Integer, primary_key=True)) - t1.create() - r = e.execute(t1.insert().values(x=s.next_value())) - eq_(r.inserted_primary_key, [None]) - - @testing.requires.returning - @testing.provide_metadata - def test_inserted_pk_implicit_returning(self): - """test inserted_primary_key contains the result when - pk_col=next_value(), when implicit returning is used.""" - - metadata = self.metadata - e = engines.testing_engine(options={"implicit_returning": True}) - s = Sequence("my_sequence") - metadata.bind = e - t1 = Table("t", metadata, Column("x", Integer, primary_key=True)) - t1.create() - r = e.execute(t1.insert().values(x=s.next_value())) - self._assert_seq_result(r.inserted_primary_key[0]) - - -class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): - __requires__ = ("sequences",) - __backend__ = True - - @testing.fails_on("firebird", "no FB support for start/increment") - def test_start_increment(self): - for seq in ( - Sequence("foo_seq"), - Sequence("foo_seq", start=8), - Sequence("foo_seq", increment=5), - ): - seq.create(testing.db) - try: - values = [testing.db.execute(seq) for i in range(3)] - start = seq.start or 1 - inc = seq.increment or 1 - assert values == list(range(start, start + inc * 3, inc)) - - finally: - seq.drop(testing.db) - - def _has_sequence(self, name): - return testing.db.dialect.has_sequence(testing.db, name) - - def test_nextval_render(self): - """test dialect renders the "nextval" construct, - whether or not "optional" is set """ - - for s in (Sequence("my_seq"), Sequence("my_seq", optional=True)): - assert str(s.next_value().compile(dialect=testing.db.dialect)) in ( - "nextval('my_seq')", - "gen_id(my_seq, 1)", - "my_seq.nextval", - ) - - def test_nextval_unsupported(self): - """test next_value() used on non-sequence platform - raises NotImplementedError.""" - - s = Sequence("my_seq") - d = 
sqlite.dialect() - assert_raises_message( - NotImplementedError, - "Dialect 'sqlite' does not support sequence increments.", - s.next_value().compile, - dialect=d, - ) - - def test_checkfirst_sequence(self): - s = Sequence("my_sequence") - s.create(testing.db, checkfirst=False) - assert self._has_sequence("my_sequence") - s.create(testing.db, checkfirst=True) - s.drop(testing.db, checkfirst=False) - assert not self._has_sequence("my_sequence") - s.drop(testing.db, checkfirst=True) - - def test_checkfirst_metadata(self): - m = MetaData() - Sequence("my_sequence", metadata=m) - m.create_all(testing.db, checkfirst=False) - assert self._has_sequence("my_sequence") - m.create_all(testing.db, checkfirst=True) - m.drop_all(testing.db, checkfirst=False) - assert not self._has_sequence("my_sequence") - m.drop_all(testing.db, checkfirst=True) - - def test_checkfirst_table(self): - m = MetaData() - s = Sequence("my_sequence") - t = Table("t", m, Column("c", Integer, s, primary_key=True)) - t.create(testing.db, checkfirst=False) - assert self._has_sequence("my_sequence") - t.create(testing.db, checkfirst=True) - t.drop(testing.db, checkfirst=False) - assert not self._has_sequence("my_sequence") - t.drop(testing.db, checkfirst=True) - - @testing.provide_metadata - def test_table_overrides_metadata_create(self): - metadata = self.metadata - Sequence("s1", metadata=metadata) - s2 = Sequence("s2", metadata=metadata) - s3 = Sequence("s3") - t = Table("t", metadata, Column("c", Integer, s3, primary_key=True)) - assert s3.metadata is metadata - - t.create(testing.db, checkfirst=True) - s3.drop(testing.db) - - # 't' is created, and 's3' won't be - # re-created since it's linked to 't'. - # 's1' and 's2' are, however. - metadata.create_all(testing.db) - assert self._has_sequence("s1") - assert self._has_sequence("s2") - assert not self._has_sequence("s3") - - s2.drop(testing.db) - assert self._has_sequence("s1") - assert not self._has_sequence("s2") - - metadata.drop_all(testing.db) - assert not self._has_sequence("s1") - assert not self._has_sequence("s2") - - @testing.requires.returning - @testing.provide_metadata - def test_freestanding_sequence_via_autoinc(self): - t = Table( - "some_table", - self.metadata, - Column( - "id", - Integer, - autoincrement=True, - primary_key=True, - default=Sequence( - "my_sequence", metadata=self.metadata - ).next_value(), - ), - ) - self.metadata.create_all(testing.db) - - result = testing.db.execute(t.insert()) - eq_(result.inserted_primary_key, [1]) - - -cartitems = sometable = metadata = None - - -class TableBoundSequenceTest(fixtures.TestBase): - __requires__ = ("sequences",) - __backend__ = True - - @classmethod - def setup_class(cls): - global cartitems, sometable, metadata - metadata = MetaData(testing.db) - cartitems = Table( - "cartitems", - metadata, - Column( - "cart_id", Integer, Sequence("cart_id_seq"), primary_key=True - ), - Column("description", String(40)), - Column("createdate", sa.DateTime()), - ) - sometable = Table( - "Manager", - metadata, - Column("obj_id", Integer, Sequence("obj_id_seq")), - Column("name", String(128)), - Column( - "id", - Integer, - Sequence("Manager_id_seq", optional=True), - primary_key=True, + connection.scalar( + select([func.count("*")]).select_from(dataset_no_autoinc) ), - ) - - metadata.create_all() - - @classmethod - def teardown_class(cls): - metadata.drop_all() - - def test_insert_via_seq(self): - cartitems.insert().execute(description="hi") - cartitems.insert().execute(description="there") - r = 
cartitems.insert().execute(description="lala") - - assert r.inserted_primary_key and r.inserted_primary_key[0] is not None - id_ = r.inserted_primary_key[0] - - eq_( 1, - sa.select( - [func.count(cartitems.c.cart_id)], - sa.and_( - cartitems.c.description == "lala", - cartitems.c.cart_id == id_, - ), - ).scalar(), ) - cartitems.select().execute().fetchall() - - def test_seq_nonpk(self): - """test sequences fire off as defaults on non-pk columns""" + @testing.fails_if(testing.requires.sequences) + def test_col_w_nonoptional_sequence_non_autoinc_no_firing( + self, dataset_no_autoinc, connection + ): + """When the sequence is not optional and sequences are supported, + the test fails because we didn't create the sequence. - engine = engines.testing_engine(options={"implicit_returning": False}) - result = engine.execute(sometable.insert(), name="somename") - - assert set(result.postfetch_cols()) == set([sometable.c.obj_id]) - - result = engine.execute(sometable.insert(), name="someother") - assert set(result.postfetch_cols()) == set([sometable.c.obj_id]) + """ + dataset_no_autoinc.c.set_id.default.optional = False - sometable.insert().execute({"name": "name3"}, {"name": "name4"}) + connection.execute(dataset_no_autoinc.insert()) eq_( - sometable.select().order_by(sometable.c.id).execute().fetchall(), - [ - (1, "somename", 1), - (2, "someother", 2), - (3, "name3", 3), - (4, "name4", 4), - ], - ) - - -class SequenceAsServerDefaultTest( - testing.AssertsExecutionResults, fixtures.TablesTest -): - __requires__ = ("sequences_as_server_defaults",) - __backend__ = True - - run_create_tables = "each" - - @classmethod - def define_tables(cls, metadata): - m = metadata - - s = Sequence("t_seq", metadata=m) - Table( - "t_seq_test", - m, - Column("id", Integer, s, server_default=s.next_value()), - Column("data", String(50)), - ) - - s2 = Sequence("t_seq_2", metadata=m) - Table( - "t_seq_test_2", - m, - Column("id", Integer, server_default=s2.next_value()), - Column("data", String(50)), - ) - - def test_default_textual_w_default(self): - with testing.db.connect() as conn: - conn.execute("insert into t_seq_test (data) values ('some data')") - - eq_(conn.scalar("select id from t_seq_test"), 1) - - def test_default_core_w_default(self): - t_seq_test = self.tables.t_seq_test - with testing.db.connect() as conn: - conn.execute(t_seq_test.insert().values(data="some data")) - - eq_(conn.scalar(select([t_seq_test.c.id])), 1) - - def test_default_textual_server_only(self): - with testing.db.connect() as conn: - conn.execute( - "insert into t_seq_test_2 (data) values ('some data')" - ) - - eq_(conn.scalar("select id from t_seq_test_2"), 1) - - def test_default_core_server_only(self): - t_seq_test = self.tables.t_seq_test_2 - with testing.db.connect() as conn: - conn.execute(t_seq_test.insert().values(data="some data")) - - eq_(conn.scalar(select([t_seq_test.c.id])), 1) - - def test_drop_ordering(self): - self.assert_sql_execution( - testing.db, - lambda: self.metadata.drop_all(checkfirst=False), - AllOf( - CompiledSQL("DROP TABLE t_seq_test_2", {}), - EachOf( - CompiledSQL("DROP TABLE t_seq_test", {}), - CompiledSQL( - "DROP SEQUENCE t_seq", # dropped as part of t_seq_test - {}, - ), - ), - ), - CompiledSQL( - "DROP SEQUENCE t_seq_2", # dropped as part of metadata level - {}, + connection.scalar( + select([func.count("*")]).select_from(dataset_no_autoinc) ), + 1, ) @@ -1697,19 +1238,20 @@ class SpecialTypePKTest(fixtures.TestBase): implicit_returning=implicit_returning, ) - t.create() - r = 
t.insert().values(data=5).execute() - - # we don't pre-fetch 'server_default'. - if "server_default" in kw and ( - not testing.db.dialect.implicit_returning or not implicit_returning - ): - eq_(r.inserted_primary_key, [None]) - else: - eq_(r.inserted_primary_key, ["INT_1"]) - r.close() + with testing.db.connect() as conn: + t.create(conn) + r = conn.execute(t.insert().values(data=5)) + + # we don't pre-fetch 'server_default'. + if "server_default" in kw and ( + not testing.db.dialect.implicit_returning + or not implicit_returning + ): + eq_(r.inserted_primary_key, [None]) + else: + eq_(r.inserted_primary_key, ["INT_1"]) - eq_(t.select().execute().first(), ("INT_1", 5)) + eq_(conn.execute(t.select()).first(), ("INT_1", 5)) def test_plain(self): # among other things, tests that autoincrement @@ -1756,7 +1298,7 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): __backend__ = True @testing.provide_metadata - def test_string_default_none_on_insert(self): + def test_string_default_none_on_insert(self, connection): """Test that without implicit returning, we return None for a string server default. @@ -1776,14 +1318,14 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): Column("data", String(10)), implicit_returning=False, ) - metadata.create_all() - r = t.insert().execute(data="data") + metadata.create_all(connection) + r = connection.execute(t.insert(), dict(data="data")) eq_(r.inserted_primary_key, [None]) - eq_(t.select().execute().fetchall(), [("key_one", "data")]) + eq_(list(connection.execute(t.select())), [("key_one", "data")]) @testing.requires.returning @testing.provide_metadata - def test_string_default_on_insert_with_returning(self): + def test_string_default_on_insert_with_returning(self, connection): """With implicit_returning, we get a string PK default back no problem.""" @@ -1796,13 +1338,13 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): ), Column("data", String(10)), ) - metadata.create_all() - r = t.insert().execute(data="data") + metadata.create_all(connection) + r = connection.execute(t.insert(), dict(data="data")) eq_(r.inserted_primary_key, ["key_one"]) - eq_(t.select().execute().fetchall(), [("key_one", "data")]) + eq_(list(connection.execute(t.select())), [("key_one", "data")]) @testing.provide_metadata - def test_int_default_none_on_insert(self): + def test_int_default_none_on_insert(self, connection): metadata = self.metadata t = Table( "x", @@ -1812,16 +1354,16 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): implicit_returning=False, ) assert t._autoincrement_column is None - metadata.create_all() - r = t.insert().execute(data="data") + metadata.create_all(connection) + r = connection.execute(t.insert(), dict(data="data")) eq_(r.inserted_primary_key, [None]) if testing.against("sqlite"): - eq_(t.select().execute().fetchall(), [(1, "data")]) + eq_(list(connection.execute(t.select())), [(1, "data")]) else: - eq_(t.select().execute().fetchall(), [(5, "data")]) + eq_(list(connection.execute(t.select())), [(5, "data")]) @testing.provide_metadata - def test_autoincrement_reflected_from_server_default(self): + def test_autoincrement_reflected_from_server_default(self, connection): metadata = self.metadata t = Table( "x", @@ -1831,14 +1373,14 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): implicit_returning=False, ) assert t._autoincrement_column is None - metadata.create_all() + metadata.create_all(connection) - m2 = MetaData(metadata.bind) - t2 = Table("x", m2, autoload=True, implicit_returning=False) + m2 = MetaData() + t2 = Table("x", m2, 
autoload_with=connection, implicit_returning=False) assert t2._autoincrement_column is None @testing.provide_metadata - def test_int_default_none_on_insert_reflected(self): + def test_int_default_none_on_insert_reflected(self, connection): metadata = self.metadata Table( "x", @@ -1847,21 +1389,21 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): Column("data", String(10)), implicit_returning=False, ) - metadata.create_all() + metadata.create_all(connection) - m2 = MetaData(metadata.bind) - t2 = Table("x", m2, autoload=True, implicit_returning=False) + m2 = MetaData() + t2 = Table("x", m2, autoload_with=connection, implicit_returning=False) - r = t2.insert().execute(data="data") + r = connection.execute(t2.insert(), dict(data="data")) eq_(r.inserted_primary_key, [None]) if testing.against("sqlite"): - eq_(t2.select().execute().fetchall(), [(1, "data")]) + eq_(list(connection.execute(t2.select())), [(1, "data")]) else: - eq_(t2.select().execute().fetchall(), [(5, "data")]) + eq_(list(connection.execute(t2.select())), [(5, "data")]) @testing.requires.returning @testing.provide_metadata - def test_int_default_on_insert_with_returning(self): + def test_int_default_on_insert_with_returning(self, connection): metadata = self.metadata t = Table( "x", @@ -1870,10 +1412,10 @@ class ServerDefaultsOnPKTest(fixtures.TestBase): Column("data", String(10)), ) - metadata.create_all() - r = t.insert().execute(data="data") + metadata.create_all(connection) + r = connection.execute(t.insert(), dict(data="data")) eq_(r.inserted_primary_key, [5]) - eq_(t.select().execute().fetchall(), [(5, "data")]) + eq_(list(connection.execute(t.select())), [(5, "data")]) class UnicodeDefaultsTest(fixtures.TestBase): @@ -1899,20 +1441,23 @@ class UnicodeDefaultsTest(fixtures.TestBase): ) -class InsertFromSelectTest(fixtures.TestBase): +class InsertFromSelectTest(fixtures.TablesTest): __backend__ = True - def _fixture(self): - data = Table( - "data", self.metadata, Column("x", Integer), Column("y", Integer) - ) - data.create() - testing.db.execute(data.insert(), {"x": 2, "y": 5}, {"x": 7, "y": 12}) - return data + @classmethod + def define_tables(cls, metadata): + Table("data", metadata, Column("x", Integer), Column("y", Integer)) + + @classmethod + def insert_data(cls): + data = cls.tables.data + + with testing.db.connect() as conn: + conn.execute(data.insert(), [{"x": 2, "y": 5}, {"x": 7, "y": 12}]) @testing.provide_metadata - def test_insert_from_select_override_defaults(self): - data = self._fixture() + def test_insert_from_select_override_defaults(self, connection): + data = self.tables.data table = Table( "sometable", @@ -1922,21 +1467,21 @@ class InsertFromSelectTest(fixtures.TestBase): Column("y", Integer), ) - table.create() + table.create(connection) sel = select([data.c.x, data.c.y]) ins = table.insert().from_select(["x", "y"], sel) - testing.db.execute(ins) + connection.execute(ins) eq_( - testing.db.execute(table.select().order_by(table.c.x)).fetchall(), + list(connection.execute(table.select().order_by(table.c.x))), [(2, 12, 5), (7, 12, 12)], ) @testing.provide_metadata - def test_insert_from_select_fn_defaults(self): - data = self._fixture() + def test_insert_from_select_fn_defaults(self, connection): + data = self.tables.data counter = itertools.count(1) @@ -1951,16 +1496,16 @@ class InsertFromSelectTest(fixtures.TestBase): Column("y", Integer), ) - table.create() + table.create(connection) sel = select([data.c.x, data.c.y]) ins = table.insert().from_select(["x", "y"], sel) - testing.db.execute(ins) + 
connection.execute(ins) # counter is only called once! eq_( - testing.db.execute(table.select().order_by(table.c.x)).fetchall(), + list(connection.execute(table.select().order_by(table.c.x))), [(2, 1, 5), (7, 1, 12)], ) @@ -1988,7 +1533,15 @@ class CurrentParametersTest(fixtures.TablesTest): some_table.c.x.default.arg = gen_default return fn - def _test(self, exec_type, usemethod): + @testing.combinations( + ("single", "attribute"), + ("single", "method"), + ("executemany", "attribute"), + ("executemany", "method"), + ("multivalues", "method", testing.requires.multivalues_inserts), + argnames="exec_type, usemethod", + ) + def test_parameters(self, exec_type, usemethod, connection): collect = mock.Mock() @self._fixture @@ -2006,25 +1559,8 @@ class CurrentParametersTest(fixtures.TablesTest): else: stmt, params = table.insert(), parameters - with testing.db.connect() as conn: - conn.execute(stmt, params) + connection.execute(stmt, params) eq_( collect.mock_calls, [mock.call({"y": param["y"], "x": None}) for param in parameters], ) - - def test_single_w_attribute(self): - self._test("single", "attribute") - - def test_single_w_method(self): - self._test("single", "method") - - def test_executemany_w_attribute(self): - self._test("executemany", "attribute") - - def test_executemany_w_method(self): - self._test("executemany", "method") - - @testing.requires.multivalues_inserts - def test_multivalued_w_method(self): - self._test("multivalues", "method") diff --git a/test/sql/test_sequences.py b/test/sql/test_sequences.py new file mode 100644 index 0000000000..9f16576a2f --- /dev/null +++ b/test/sql/test_sequences.py @@ -0,0 +1,552 @@ +import sqlalchemy as sa +from sqlalchemy import Integer +from sqlalchemy import MetaData +from sqlalchemy import Sequence +from sqlalchemy import String +from sqlalchemy import testing +from sqlalchemy import util +from sqlalchemy.dialects import sqlite +from sqlalchemy.schema import CreateSequence +from sqlalchemy.schema import DropSequence +from sqlalchemy.sql import select +from sqlalchemy.testing import assert_raises_message +from sqlalchemy.testing import engines +from sqlalchemy.testing import eq_ +from sqlalchemy.testing import fixtures +from sqlalchemy.testing.assertsql import AllOf +from sqlalchemy.testing.assertsql import CompiledSQL +from sqlalchemy.testing.assertsql import EachOf +from sqlalchemy.testing.schema import Column +from sqlalchemy.testing.schema import Table + + +class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL): + __dialect__ = "default" + __backend__ = True + + def test_create_drop_ddl(self): + self.assert_compile( + CreateSequence(Sequence("foo_seq")), "CREATE SEQUENCE foo_seq" + ) + + self.assert_compile( + CreateSequence(Sequence("foo_seq", start=5)), + "CREATE SEQUENCE foo_seq START WITH 5", + ) + + self.assert_compile( + CreateSequence(Sequence("foo_seq", increment=2)), + "CREATE SEQUENCE foo_seq INCREMENT BY 2", + ) + + self.assert_compile( + CreateSequence(Sequence("foo_seq", increment=2, start=5)), + "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 5", + ) + + self.assert_compile( + CreateSequence( + Sequence("foo_seq", increment=2, start=0, minvalue=0) + ), + "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 MINVALUE 0", + ) + + self.assert_compile( + CreateSequence( + Sequence("foo_seq", increment=2, start=1, maxvalue=5) + ), + "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 MAXVALUE 5", + ) + + self.assert_compile( + CreateSequence( + Sequence("foo_seq", increment=2, start=1, nomaxvalue=True) + ), + 
"CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 NO MAXVALUE", + ) + + self.assert_compile( + CreateSequence( + Sequence("foo_seq", increment=2, start=0, nominvalue=True) + ), + "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 NO MINVALUE", + ) + + self.assert_compile( + CreateSequence( + Sequence("foo_seq", start=1, maxvalue=10, cycle=True) + ), + "CREATE SEQUENCE foo_seq START WITH 1 MAXVALUE 10 CYCLE", + ) + + self.assert_compile( + CreateSequence(Sequence("foo_seq", cache=1000, order=True)), + "CREATE SEQUENCE foo_seq CACHE 1000 ORDER", + ) + + self.assert_compile( + CreateSequence(Sequence("foo_seq", order=True)), + "CREATE SEQUENCE foo_seq ORDER", + ) + + self.assert_compile( + DropSequence(Sequence("foo_seq")), "DROP SEQUENCE foo_seq" + ) + + +class LegacySequenceExecTest(fixtures.TestBase): + __requires__ = ("sequences",) + __backend__ = True + + @classmethod + def setup_class(cls): + cls.seq = Sequence("my_sequence") + cls.seq.create(testing.db) + + @classmethod + def teardown_class(cls): + cls.seq.drop(testing.db) + + def _assert_seq_result(self, ret): + """asserts return of next_value is an int""" + + assert isinstance(ret, util.int_types) + assert ret > 0 + + def test_implicit_connectionless(self): + s = Sequence("my_sequence", metadata=MetaData(testing.db)) + self._assert_seq_result(s.execute()) + + def test_explicit(self, connection): + s = Sequence("my_sequence") + self._assert_seq_result(s.execute(connection)) + + def test_explicit_optional(self): + """test dialect executes a Sequence, returns nextval, whether + or not "optional" is set """ + + s = Sequence("my_sequence", optional=True) + self._assert_seq_result(s.execute(testing.db)) + + def test_func_implicit_connectionless_execute(self): + """test func.next_value().execute()/.scalar() works + with connectionless execution. """ + + s = Sequence("my_sequence", metadata=MetaData(testing.db)) + self._assert_seq_result(s.next_value().execute().scalar()) + + def test_func_explicit(self): + s = Sequence("my_sequence") + self._assert_seq_result(testing.db.scalar(s.next_value())) + + def test_func_implicit_connectionless_scalar(self): + """test func.next_value().execute()/.scalar() works. """ + + s = Sequence("my_sequence", metadata=MetaData(testing.db)) + self._assert_seq_result(s.next_value().scalar()) + + def test_func_embedded_select(self): + """test can use next_value() in select column expr""" + + s = Sequence("my_sequence") + self._assert_seq_result(testing.db.scalar(select([s.next_value()]))) + + +class SequenceExecTest(fixtures.TestBase): + __requires__ = ("sequences",) + __backend__ = True + + @classmethod + def setup_class(cls): + cls.seq = Sequence("my_sequence") + cls.seq.create(testing.db) + + @classmethod + def teardown_class(cls): + cls.seq.drop(testing.db) + + def _assert_seq_result(self, ret): + """asserts return of next_value is an int""" + + assert isinstance(ret, util.int_types) + assert ret > 0 + + def test_execute(self, connection): + s = Sequence("my_sequence") + self._assert_seq_result(connection.execute(s)) + + def test_execute_optional(self, connection): + """test dialect executes a Sequence, returns nextval, whether + or not "optional" is set """ + + s = Sequence("my_sequence", optional=True) + self._assert_seq_result(connection.execute(s)) + + def test_execute_next_value(self, connection): + """test func.next_value().execute()/.scalar() works + with connectionless execution. 
""" + + s = Sequence("my_sequence") + self._assert_seq_result(connection.scalar(s.next_value())) + + def test_execute_optional_next_value(self, connection): + """test func.next_value().execute()/.scalar() works + with connectionless execution. """ + + s = Sequence("my_sequence", optional=True) + self._assert_seq_result(connection.scalar(s.next_value())) + + def test_func_embedded_select(self, connection): + """test can use next_value() in select column expr""" + + s = Sequence("my_sequence") + self._assert_seq_result(connection.scalar(select([s.next_value()]))) + + @testing.fails_on("oracle", "ORA-02287: sequence number not allowed here") + @testing.provide_metadata + def test_func_embedded_whereclause(self, connection): + """test can use next_value() in whereclause""" + + metadata = self.metadata + t1 = Table("t", metadata, Column("x", Integer)) + t1.create(testing.db) + connection.execute(t1.insert(), [{"x": 1}, {"x": 300}, {"x": 301}]) + s = Sequence("my_sequence") + eq_( + list( + connection.execute(t1.select().where(t1.c.x > s.next_value())) + ), + [(300,), (301,)], + ) + + @testing.provide_metadata + def test_func_embedded_valuesbase(self, connection): + """test can use next_value() in values() of _ValuesBase""" + + metadata = self.metadata + t1 = Table("t", metadata, Column("x", Integer)) + t1.create(testing.db) + s = Sequence("my_sequence") + connection.execute(t1.insert().values(x=s.next_value())) + self._assert_seq_result(connection.scalar(t1.select())) + + @testing.requires.supports_lastrowid + @testing.provide_metadata + def test_inserted_pk_no_returning_w_lastrowid(self): + """test inserted_primary_key contains the pk when + pk_col=next_value(), lastrowid is supported.""" + + metadata = self.metadata + t1 = Table("t", metadata, Column("x", Integer, primary_key=True)) + t1.create(testing.db) + e = engines.testing_engine(options={"implicit_returning": False}) + s = Sequence("my_sequence") + + with e.connect() as conn: + r = conn.execute(t1.insert().values(x=s.next_value())) + self._assert_seq_result(r.inserted_primary_key[0]) + + @testing.requires.no_lastrowid_support + @testing.provide_metadata + def test_inserted_pk_no_returning_no_lastrowid(self): + """test inserted_primary_key contains [None] when + pk_col=next_value(), implicit returning is not used.""" + + metadata = self.metadata + t1 = Table("t", metadata, Column("x", Integer, primary_key=True)) + t1.create(testing.db) + + e = engines.testing_engine(options={"implicit_returning": False}) + s = Sequence("my_sequence") + with e.connect() as conn: + r = conn.execute(t1.insert().values(x=s.next_value())) + eq_(r.inserted_primary_key, [None]) + + @testing.requires.returning + @testing.provide_metadata + def test_inserted_pk_implicit_returning(self): + """test inserted_primary_key contains the result when + pk_col=next_value(), when implicit returning is used.""" + + metadata = self.metadata + s = Sequence("my_sequence") + t1 = Table("t", metadata, Column("x", Integer, primary_key=True)) + t1.create(testing.db) + + e = engines.testing_engine(options={"implicit_returning": True}) + with e.connect() as conn: + r = conn.execute(t1.insert().values(x=s.next_value())) + self._assert_seq_result(r.inserted_primary_key[0]) + + +class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL): + __requires__ = ("sequences",) + __backend__ = True + + @testing.combinations( + (Sequence("foo_seq"),), + (Sequence("foo_seq", start=8),), + (Sequence("foo_seq", increment=5),), + ) + def test_start_increment(self, seq): + 
seq.create(testing.db) + try: + with testing.db.connect() as conn: + values = [conn.execute(seq) for i in range(3)] + start = seq.start or 1 + inc = seq.increment or 1 + eq_(values, list(range(start, start + inc * 3, inc))) + + finally: + seq.drop(testing.db) + + def _has_sequence(self, connection, name): + return testing.db.dialect.has_sequence(connection, name) + + def test_nextval_unsupported(self): + """test next_value() used on non-sequence platform + raises NotImplementedError.""" + + s = Sequence("my_seq") + d = sqlite.dialect() + assert_raises_message( + NotImplementedError, + "Dialect 'sqlite' does not support sequence increments.", + s.next_value().compile, + dialect=d, + ) + + def test_checkfirst_sequence(self, connection): + s = Sequence("my_sequence") + s.create(connection, checkfirst=False) + assert self._has_sequence(connection, "my_sequence") + s.create(connection, checkfirst=True) + s.drop(connection, checkfirst=False) + assert not self._has_sequence(connection, "my_sequence") + s.drop(connection, checkfirst=True) + + def test_checkfirst_metadata(self, connection): + m = MetaData() + Sequence("my_sequence", metadata=m) + m.create_all(connection, checkfirst=False) + assert self._has_sequence(connection, "my_sequence") + m.create_all(connection, checkfirst=True) + m.drop_all(connection, checkfirst=False) + assert not self._has_sequence(connection, "my_sequence") + m.drop_all(connection, checkfirst=True) + + def test_checkfirst_table(self, connection): + m = MetaData() + s = Sequence("my_sequence") + t = Table("t", m, Column("c", Integer, s, primary_key=True)) + t.create(connection, checkfirst=False) + assert self._has_sequence(connection, "my_sequence") + t.create(connection, checkfirst=True) + t.drop(connection, checkfirst=False) + assert not self._has_sequence(connection, "my_sequence") + t.drop(connection, checkfirst=True) + + @testing.provide_metadata + def test_table_overrides_metadata_create(self, connection): + metadata = self.metadata + Sequence("s1", metadata=metadata) + s2 = Sequence("s2", metadata=metadata) + s3 = Sequence("s3") + t = Table("t", metadata, Column("c", Integer, s3, primary_key=True)) + assert s3.metadata is metadata + + t.create(connection, checkfirst=True) + s3.drop(connection) + + # 't' is created, and 's3' won't be + # re-created since it's linked to 't'. + # 's1' and 's2' are, however. 
+ metadata.create_all(connection) + assert self._has_sequence(connection, "s1") + assert self._has_sequence(connection, "s2") + assert not self._has_sequence(connection, "s3") + + s2.drop(connection) + assert self._has_sequence(connection, "s1") + assert not self._has_sequence(connection, "s2") + + metadata.drop_all(connection) + assert not self._has_sequence(connection, "s1") + assert not self._has_sequence(connection, "s2") + + @testing.requires.returning + @testing.provide_metadata + def test_freestanding_sequence_via_autoinc(self, connection): + t = Table( + "some_table", + self.metadata, + Column( + "id", + Integer, + autoincrement=True, + primary_key=True, + default=Sequence( + "my_sequence", metadata=self.metadata + ).next_value(), + ), + ) + self.metadata.create_all(connection) + + result = connection.execute(t.insert()) + eq_(result.inserted_primary_key, [1]) + + +class TableBoundSequenceTest(fixtures.TablesTest): + __requires__ = ("sequences",) + __backend__ = True + + @classmethod + def define_tables(cls, metadata): + Table( + "cartitems", + metadata, + Column( + "cart_id", Integer, Sequence("cart_id_seq"), primary_key=True + ), + Column("description", String(40)), + Column("createdate", sa.DateTime()), + ) + + # a little bit of implicit case sensitive naming test going on here + Table( + "Manager", + metadata, + Column("obj_id", Integer, Sequence("obj_id_seq")), + Column("name", String(128)), + Column( + "id", + Integer, + Sequence("Manager_id_seq", optional=True), + primary_key=True, + ), + ) + + def test_insert_via_seq(self, connection): + cartitems = self.tables.cartitems + + connection.execute(cartitems.insert(), dict(description="hi")) + connection.execute(cartitems.insert(), dict(description="there")) + r = connection.execute(cartitems.insert(), dict(description="lala")) + + eq_(r.inserted_primary_key[0], 3) + + eq_( + connection.scalar( + sa.select([cartitems.c.cart_id]).where( + cartitems.c.description == "lala" + ), + ), + 3, + ) + + def test_seq_nonpk(self): + """test sequences fire off as defaults on non-pk columns""" + + sometable = self.tables.Manager + + engine = engines.testing_engine(options={"implicit_returning": False}) + + with engine.connect() as conn: + result = conn.execute(sometable.insert(), dict(name="somename")) + + eq_(result.postfetch_cols(), [sometable.c.obj_id]) + + result = conn.execute(sometable.insert(), dict(name="someother")) + + conn.execute( + sometable.insert(), [{"name": "name3"}, {"name": "name4"}] + ) + eq_( + list( + conn.execute(sometable.select().order_by(sometable.c.id)) + ), + [ + (1, "somename", 1), + (2, "someother", 2), + (3, "name3", 3), + (4, "name4", 4), + ], + ) + + +class SequenceAsServerDefaultTest( + testing.AssertsExecutionResults, fixtures.TablesTest +): + __requires__ = ("sequences_as_server_defaults",) + __backend__ = True + + run_create_tables = "each" + + @classmethod + def define_tables(cls, metadata): + m = metadata + + s = Sequence("t_seq", metadata=m) + Table( + "t_seq_test", + m, + Column("id", Integer, s, server_default=s.next_value()), + Column("data", String(50)), + ) + + s2 = Sequence("t_seq_2", metadata=m) + Table( + "t_seq_test_2", + m, + Column("id", Integer, server_default=s2.next_value()), + Column("data", String(50)), + ) + + def test_default_textual_w_default(self, connection): + connection.execute( + "insert into t_seq_test (data) values ('some data')" + ) + + eq_(connection.execute("select id from t_seq_test").scalar(), 1) + + def test_default_core_w_default(self, connection): + t_seq_test = 
self.tables.t_seq_test + connection.execute(t_seq_test.insert().values(data="some data")) + + eq_(connection.scalar(select([t_seq_test.c.id])), 1) + + def test_default_textual_server_only(self, connection): + connection.execute( + "insert into t_seq_test_2 (data) values ('some data')" + ) + + eq_( + connection.execute("select id from t_seq_test_2").scalar(), 1, + ) + + def test_default_core_server_only(self, connection): + t_seq_test = self.tables.t_seq_test_2 + connection.execute(t_seq_test.insert().values(data="some data")) + + eq_(connection.scalar(select([t_seq_test.c.id])), 1) + + def test_drop_ordering(self): + with self.sql_execution_asserter(testing.db) as asserter: + self.metadata.drop_all(checkfirst=False) + + asserter.assert_( + AllOf( + CompiledSQL("DROP TABLE t_seq_test_2", {}), + EachOf( + CompiledSQL("DROP TABLE t_seq_test", {}), + CompiledSQL( + "DROP SEQUENCE t_seq", # dropped as part of t_seq_test + {}, + ), + ), + ), + CompiledSQL( + "DROP SEQUENCE t_seq_2", # dropped as part of metadata level + {}, + ), + ) -- 2.47.2
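
As an illustrative aside (not part of the commit above): the sketch below summarizes the execution-pattern split that the new test_sequences.py encodes -- the implicit, engine-bound forms kept only in LegacySequenceExecTest versus the explicit Connection forms used by SequenceExecTest and the other modernized tests through the ``connection`` fixture. The engine URL and sequence name here are hypothetical, and a backend with sequence support (e.g. PostgreSQL or Oracle) is assumed; this is a sketch of the usage patterns the tests exercise, not test code itself.

import sqlalchemy as sa

# hypothetical URL; any dialect with sequence support will do
engine = sa.create_engine("postgresql://scott:tiger@localhost/test")

seq = sa.Sequence("my_sequence")
seq.create(engine)

# legacy, implicit forms (exercised only by LegacySequenceExecTest);
# the "connectionless" variants require an engine-bound MetaData:
#     bound = sa.Sequence("my_sequence", metadata=sa.MetaData(engine))
#     bound.execute()                      # DefaultGenerator.execute()
#     bound.next_value().scalar()          # "connectionless" scalar
#     seq.execute(engine)                  # explicit bind, still the legacy API

# explicit forms (SequenceExecTest and the ``connection`` fixture):
with engine.connect() as connection:
    next_id = connection.execute(seq)                       # returns the next value as an int
    via_func = connection.scalar(seq.next_value())          # nextval via the SQL function
    in_select = connection.scalar(sa.select([seq.next_value()]))

seq.drop(engine)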