from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import Unicode
-from sqlalchemy import util
-from sqlalchemy.dialects import sqlite
-from sqlalchemy.schema import CreateSequence
from sqlalchemy.schema import CreateTable
-from sqlalchemy.schema import DropSequence
from sqlalchemy.sql import literal_column
from sqlalchemy.sql import select
from sqlalchemy.sql import text
from sqlalchemy.testing import expect_warnings
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import mock
-from sqlalchemy.testing.assertsql import AllOf
-from sqlalchemy.testing.assertsql import CompiledSQL
-from sqlalchemy.testing.assertsql import EachOf
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
from sqlalchemy.types import TypeDecorator
from sqlalchemy.util import u
-t = f = f2 = ts = currenttime = metadata = default_generator = None
-
-
class DDLTest(fixtures.TestBase, AssertsCompiledSQL):
__dialect__ = "default"
)
-class DefaultTest(fixtures.TestBase):
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- global t, f, f2, ts, currenttime, metadata, default_generator
-
- db = testing.db
- metadata = MetaData(db)
- default_generator = {"x": 50}
-
- def mydefault():
- default_generator["x"] += 1
- return default_generator["x"]
-
- def myupdate_with_ctx(ctx):
- conn = ctx.connection
- return conn.execute(sa.select([sa.text("13")])).scalar()
-
- def mydefault_using_connection(ctx):
- conn = ctx.connection
- return conn.execute(sa.select([sa.text("12")])).scalar()
-
- use_function_defaults = testing.against("postgresql", "mssql")
- is_oracle = testing.against("oracle")
-
- class MyClass(object):
- @classmethod
- def gen_default(cls, ctx):
- return "hi"
-
- class MyType(TypeDecorator):
- impl = String(50)
-
- def process_bind_param(self, value, dialect):
- if value is not None:
- value = "BIND" + value
- return value
-
- # select "count(1)" returns different results on different DBs also
- # correct for "current_date" compatible as column default, value
- # differences
- currenttime = func.current_date(type_=sa.Date, bind=db)
- if is_oracle:
- ts = db.scalar(
- sa.select(
- [
- func.trunc(
- func.current_timestamp(),
- sa.literal_column("'DAY'"),
- type_=sa.Date,
- )
- ]
- )
- )
- assert isinstance(ts, datetime.date) and not isinstance(
- ts, datetime.datetime
- )
- f = sa.select([func.length("abcdef")], bind=db).scalar()
- f2 = sa.select([func.length("abcdefghijk")], bind=db).scalar()
- # TODO: engine propagation across nested functions not working
- currenttime = func.trunc(
- currenttime, sa.literal_column("'DAY'"), bind=db, type_=sa.Date
- )
- def1 = currenttime
- def2 = func.trunc(
- sa.text("current_timestamp"),
- sa.literal_column("'DAY'"),
- type_=sa.Date,
- )
-
- deftype = sa.Date
- elif use_function_defaults:
- f = sa.select([func.length("abcdef")], bind=db).scalar()
- f2 = sa.select([func.length("abcdefghijk")], bind=db).scalar()
- def1 = currenttime
- deftype = sa.Date
- if testing.against("mssql"):
- def2 = sa.text("getdate()")
- else:
- def2 = sa.text("current_date")
- ts = db.scalar(func.current_date())
- else:
- f = len("abcdef")
- f2 = len("abcdefghijk")
- def1 = def2 = "3"
- ts = 3
- deftype = Integer
-
- t = Table(
- "default_test1",
- metadata,
- # python function
- Column("col1", Integer, primary_key=True, default=mydefault),
- # python literal
- Column(
- "col2",
- String(20),
- default="imthedefault",
- onupdate="im the update",
- ),
- # preexecute expression
- Column(
- "col3",
- Integer,
- default=func.length("abcdef"),
- onupdate=func.length("abcdefghijk"),
- ),
- # SQL-side default from sql expression
- Column("col4", deftype, server_default=def1),
- # SQL-side default from literal expression
- Column("col5", deftype, server_default=def2),
- # preexecute + update timestamp
- Column("col6", sa.Date, default=currenttime, onupdate=currenttime),
- Column("boolcol1", sa.Boolean, default=True),
- Column("boolcol2", sa.Boolean, default=False),
- # python function which uses ExecutionContext
- Column(
- "col7",
- Integer,
- default=mydefault_using_connection,
- onupdate=myupdate_with_ctx,
- ),
- # python builtin
- Column(
- "col8",
- sa.Date,
- default=datetime.date.today,
- onupdate=datetime.date.today,
- ),
- # combo
- Column("col9", String(20), default="py", server_default="ddl"),
- # python method w/ context
- Column("col10", String(20), default=MyClass.gen_default),
- # fixed default w/ type that has bound processor
- Column("col11", MyType(), default="foo"),
- )
-
- t.create()
-
- @classmethod
- def teardown_class(cls):
- t.drop()
-
- def teardown(self):
- default_generator["x"] = 50
- t.delete().execute()
-
+class DefaultObjectTest(fixtures.TestBase):
def test_bad_arg_signature(self):
ex_msg = (
"ColumnDefault Python function takes zero "
c = sa.ColumnDefault(fn)
c.arg("context")
- @testing.fails_on("firebird", "Data type unknown")
- def test_standalone(self):
- c = testing.db.engine.connect()
- x = c.execute(t.c.col1.default)
- y = t.c.col2.default.execute()
- z = c.execute(t.c.col3.default)
- assert 50 <= x <= 57
- eq_(y, "imthedefault")
- eq_(z, f)
- eq_(f2, 11)
-
- def test_py_vs_server_default_detection(self):
- def has_(name, *wanted):
- slots = [
- "default",
- "onupdate",
- "server_default",
- "server_onupdate",
- ]
- col = tbl.c[name]
- for slot in wanted:
- slots.remove(slot)
- assert getattr(col, slot) is not None, getattr(col, slot)
- for slot in slots:
- assert getattr(col, slot) is None, getattr(col, slot)
-
- tbl = t
- has_("col1", "default")
- has_("col2", "default", "onupdate")
- has_("col3", "default", "onupdate")
- has_("col4", "server_default")
- has_("col5", "server_default")
- has_("col6", "default", "onupdate")
- has_("boolcol1", "default")
- has_("boolcol2", "default")
- has_("col7", "default", "onupdate")
- has_("col8", "default", "onupdate")
- has_("col9", "default", "server_default")
+ def _check_default_slots(self, tbl, name, *wanted):
+ slots = [
+ "default",
+ "onupdate",
+ "server_default",
+ "server_onupdate",
+ ]
+ col = tbl.c[name]
+ for slot in wanted:
+ slots.remove(slot)
+ assert getattr(col, slot) is not None, getattr(col, slot)
+ for slot in slots:
+ assert getattr(col, slot) is None, getattr(col, slot)
+
+ def test_py_vs_server_default_detection_one(self):
+ has_ = self._check_default_slots
+
+ metadata = MetaData()
+ tbl = Table(
+ "default_test",
+ metadata,
+ # python function
+ Column("col1", Integer, primary_key=True, default="1"),
+ # python literal
+ Column(
+ "col2",
+ String(20),
+ default="imthedefault",
+ onupdate="im the update",
+ ),
+ # preexecute expression
+ Column(
+ "col3",
+ Integer,
+ default=func.length("abcdef"),
+ onupdate=func.length("abcdefghijk"),
+ ),
+ # SQL-side default from sql expression
+ Column("col4", Integer, server_default="1"),
+ # SQL-side default from literal expression
+ Column("col5", Integer, server_default="1"),
+ # preexecute + update timestamp
+ Column(
+ "col6",
+ sa.Date,
+ default=datetime.datetime.today,
+ onupdate=datetime.datetime.today,
+ ),
+ Column("boolcol1", sa.Boolean, default=True),
+ Column("boolcol2", sa.Boolean, default=False),
+ # python function which uses ExecutionContext
+ Column("col7", Integer, default=lambda: 5, onupdate=lambda: 10,),
+ # python builtin
+ Column(
+ "col8",
+ sa.Date,
+ default=datetime.date.today,
+ onupdate=datetime.date.today,
+ ),
+ Column("col9", String(20), default="py", server_default="ddl"),
+ )
+
+ has_(tbl, "col1", "default")
+ has_(tbl, "col2", "default", "onupdate")
+ has_(tbl, "col3", "default", "onupdate")
+ has_(tbl, "col4", "server_default")
+ has_(tbl, "col5", "server_default")
+ has_(tbl, "col6", "default", "onupdate")
+ has_(tbl, "boolcol1", "default")
+ has_(tbl, "boolcol2", "default")
+ has_(tbl, "col7", "default", "onupdate")
+ has_(tbl, "col8", "default", "onupdate")
+ has_(tbl, "col9", "default", "server_default")
+
+ def test_py_vs_server_default_detection_two(self):
+ has_ = self._check_default_slots
+ metadata = MetaData()
ColumnDefault, DefaultClause = sa.ColumnDefault, sa.DefaultClause
- t2 = Table(
+ tbl = Table(
"t2",
- MetaData(),
+ metadata,
Column("col1", Integer, Sequence("foo")),
Column(
"col2", Integer, default=Sequence("foo"), server_default="y"
onupdate="z",
),
)
- tbl = t2
- has_("col1", "default")
- has_("col2", "default", "server_default")
- has_("col3", "default", "server_default")
- has_("col4", "default", "server_default", "server_onupdate")
- has_("col5", "default", "server_default", "onupdate")
- has_("col6", "default", "server_default", "onupdate")
- has_("col7", "default", "server_default", "onupdate")
+ has_(tbl, "col1", "default")
+ has_(tbl, "col2", "default", "server_default")
+ has_(tbl, "col3", "default", "server_default")
+ has_(tbl, "col4", "default", "server_default", "server_onupdate")
+ has_(tbl, "col5", "default", "server_default", "onupdate")
+ has_(tbl, "col6", "default", "server_default", "onupdate")
+ has_(tbl, "col7", "default", "server_default", "onupdate")
has_(
- "col8", "default", "server_default", "onupdate", "server_onupdate"
+ tbl,
+ "col8",
+ "default",
+ "server_default",
+ "onupdate",
+ "server_onupdate",
)
- @testing.fails_on("firebird", "Data type unknown")
- def test_insert(self):
- r = t.insert().execute()
+ def test_no_embed_in_sql(self):
+ """Using a DefaultGenerator, Sequence, DefaultClause
+ in the columns, where clause of a select, or in the values
+ clause of insert, update, raises an informative error"""
+
+ t = Table("some_table", MetaData(), Column("id", Integer))
+ for const in (
+ sa.Sequence("y"),
+ sa.ColumnDefault("y"),
+ sa.DefaultClause("y"),
+ ):
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ r"SQL expression for WHERE/HAVING role expected, "
+ r"got \[(?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)\]",
+ t.select,
+ [const],
+ )
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ "SQL expression element expected, got %s"
+ % const.__class__.__name__,
+ t.insert().values,
+ col4=const,
+ )
+ assert_raises_message(
+ sa.exc.ArgumentError,
+ "SQL expression element expected, got %s"
+ % const.__class__.__name__,
+ t.update().values,
+ col4=const,
+ )
+
+
+class DefaultRoundTripTest(fixtures.TablesTest):
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ default_generator = cls.default_generator = {"x": 50}
+
+ def mydefault():
+ default_generator["x"] += 1
+ return default_generator["x"]
+
+ def myupdate_with_ctx(ctx):
+ conn = ctx.connection
+ return conn.execute(sa.select([sa.text("13")])).scalar()
+
+ def mydefault_using_connection(ctx):
+ conn = ctx.connection
+ return conn.execute(sa.select([sa.text("12")])).scalar()
+
+ use_function_defaults = testing.against("postgresql", "mssql")
+ is_oracle = testing.against("oracle")
+
+ class MyClass(object):
+ @classmethod
+ def gen_default(cls, ctx):
+ return "hi"
+
+ class MyType(TypeDecorator):
+ impl = String(50)
+
+ def process_bind_param(self, value, dialect):
+ if value is not None:
+ value = "BIND" + value
+ return value
+
+ cls.f = 6
+ cls.f2 = 11
+ with testing.db.connect() as conn:
+ currenttime = cls.currenttime = func.current_date(type_=sa.Date)
+ if is_oracle:
+ ts = conn.scalar(
+ sa.select(
+ [
+ func.trunc(
+ func.current_timestamp(),
+ sa.literal_column("'DAY'"),
+ type_=sa.Date,
+ )
+ ]
+ )
+ )
+ currenttime = cls.currenttime = func.trunc(
+ currenttime, sa.literal_column("'DAY'"), type_=sa.Date
+ )
+ def1 = currenttime
+ def2 = func.trunc(
+ sa.text("current_timestamp"),
+ sa.literal_column("'DAY'"),
+ type_=sa.Date,
+ )
+
+ deftype = sa.Date
+ elif use_function_defaults:
+ def1 = currenttime
+ deftype = sa.Date
+ if testing.against("mssql"):
+ def2 = sa.text("getdate()")
+ else:
+ def2 = sa.text("current_date")
+ ts = conn.scalar(func.current_date())
+ else:
+ def1 = def2 = "3"
+ ts = 3
+ deftype = Integer
+
+ cls.ts = ts
+
+ Table(
+ "default_test",
+ metadata,
+ # python function
+ Column("col1", Integer, primary_key=True, default=mydefault),
+ # python literal
+ Column(
+ "col2",
+ String(20),
+ default="imthedefault",
+ onupdate="im the update",
+ ),
+ # preexecute expression
+ Column(
+ "col3",
+ Integer,
+ default=func.length("abcdef"),
+ onupdate=func.length("abcdefghijk"),
+ ),
+ # SQL-side default from sql expression
+ Column("col4", deftype, server_default=def1),
+ # SQL-side default from literal expression
+ Column("col5", deftype, server_default=def2),
+ # preexecute + update timestamp
+ Column("col6", sa.Date, default=currenttime, onupdate=currenttime),
+ Column("boolcol1", sa.Boolean, default=True),
+ Column("boolcol2", sa.Boolean, default=False),
+ # python function which uses ExecutionContext
+ Column(
+ "col7",
+ Integer,
+ default=mydefault_using_connection,
+ onupdate=myupdate_with_ctx,
+ ),
+ # python builtin
+ Column(
+ "col8",
+ sa.Date,
+ default=datetime.date.today,
+ onupdate=datetime.date.today,
+ ),
+ # combo
+ Column("col9", String(20), default="py", server_default="ddl"),
+ # python method w/ context
+ Column("col10", String(20), default=MyClass.gen_default),
+ # fixed default w/ type that has bound processor
+ Column("col11", MyType(), default="foo"),
+ )
+
+ def teardown(self):
+ self.default_generator["x"] = 50
+ super(DefaultRoundTripTest, self).teardown()
+
+ def test_standalone(self, connection):
+ t = self.tables.default_test
+ x = connection.execute(t.c.col1.default)
+ y = connection.execute(t.c.col2.default)
+ z = connection.execute(t.c.col3.default)
+ assert 50 <= x <= 57
+ eq_(y, "imthedefault")
+ eq_(z, self.f)
+
+ def test_insert(self, connection):
+ t = self.tables.default_test
+
+ r = connection.execute(t.insert())
assert r.lastrow_has_defaults()
eq_(
set(r.context.postfetch_cols),
set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]),
)
- r = t.insert().inline().execute()
+ r = connection.execute(t.insert().inline())
assert r.lastrow_has_defaults()
eq_(
set(r.context.postfetch_cols),
set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]),
)
- t.insert().execute()
+ connection.execute(t.insert())
- ctexec = sa.select(
- [currenttime.label("now")], bind=testing.db
+ ctexec = connection.execute(
+ sa.select([self.currenttime.label("now")])
).scalar()
- result = t.select().order_by(t.c.col1).execute()
+ result = connection.execute(t.select().order_by(t.c.col1))
today = datetime.date.today()
eq_(
result.fetchall(),
(
x,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
],
)
- t.insert().execute(col9=None)
+ connection.execute(t.insert(), dict(col9=None))
+
+ # TODO: why are we looking at 'r' when we just executed something
+ # else ?
assert r.lastrow_has_defaults()
+
eq_(
set(r.context.postfetch_cols),
set([t.c.col3, t.c.col5, t.c.col4, t.c.col6]),
)
eq_(
- t.select(t.c.col1 == 54).execute().fetchall(),
+ list(connection.execute(t.select().where(t.c.col1 == 54))),
[
(
54,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
],
)
- def test_insertmany(self):
- t.insert().execute({}, {}, {})
+ def test_insertmany(self, connection):
+ t = self.tables.default_test
+
+ connection.execute(t.insert(), [{}, {}, {}])
- ctexec = currenttime.scalar()
- result = t.select().order_by(t.c.col1).execute()
+ ctexec = connection.scalar(self.currenttime)
+ result = connection.execute(t.select().order_by(t.c.col1))
today = datetime.date.today()
eq_(
- result.fetchall(),
+ list(result),
[
(
51,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
(
52,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
(
53,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
)
@testing.requires.multivalues_inserts
- def test_insert_multivalues(self):
+ def test_insert_multivalues(self, connection):
+ t = self.tables.default_test
- t.insert().values([{}, {}, {}]).execute()
+ connection.execute(t.insert().values([{}, {}, {}]))
- ctexec = currenttime.scalar()
- result = t.select().order_by(t.c.col1).execute()
+ ctexec = connection.execute(self.currenttime).scalar()
+ result = connection.execute(t.select().order_by(t.c.col1))
today = datetime.date.today()
eq_(
- result.fetchall(),
+ list(result),
[
(
51,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
(
52,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
(
53,
"imthedefault",
- f,
- ts,
- ts,
+ self.f,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
],
)
- def test_no_embed_in_sql(self):
- """Using a DefaultGenerator, Sequence, DefaultClause
- in the columns, where clause of a select, or in the values
- clause of insert, update, raises an informative error"""
-
- for const in (
- sa.Sequence("y"),
- sa.ColumnDefault("y"),
- sa.DefaultClause("y"),
- ):
- assert_raises_message(
- sa.exc.ArgumentError,
- r"SQL expression for WHERE/HAVING role expected, "
- r"got \[(?:Sequence|ColumnDefault|DefaultClause)\('y'.*\)\]",
- t.select,
- [const],
- )
- assert_raises_message(
- sa.exc.ArgumentError,
- "SQL expression element expected, got %s"
- % const.__class__.__name__,
- t.insert().values,
- col4=const,
- )
- assert_raises_message(
- sa.exc.ArgumentError,
- "SQL expression element expected, got %s"
- % const.__class__.__name__,
- t.update().values,
- col4=const,
- )
-
- def test_missing_many_param(self):
+ def test_missing_many_param(self, connection):
+ t = self.tables.default_test
assert_raises_message(
exc.StatementError,
"A value is required for bind parameter 'col7', in parameter "
"group 1",
- t.insert().execute,
+ connection.execute,
+ t.insert(),
{"col4": 7, "col7": 12, "col8": 19},
{"col4": 7, "col8": 19},
{"col4": 7, "col7": 12, "col8": 19},
)
- def test_insert_values(self):
- t.insert(values={"col3": 50}).execute()
- result = t.select().execute()
+ def test_insert_values(self, connection):
+ t = self.tables.default_test
+ connection.execute(t.insert().values(col3=50))
+ result = connection.execute(t.select().order_by(t.c.col1))
eq_(50, result.first()._mapping["col3"])
- @testing.fails_on("firebird", "Data type unknown")
- def test_updatemany(self):
- # MySQL-Python 1.2.2 breaks functions in execute_many :(
- if testing.against(
- "mysql+mysqldb"
- ) and testing.db.dialect.dbapi.version_info[:3] == (1, 2, 2):
- return
+ def test_updatemany(self, connection):
+ t = self.tables.default_test
- t.insert().execute({}, {}, {})
+ connection.execute(t.insert(), [{}, {}, {}])
- t.update(t.c.col1 == sa.bindparam("pkval")).execute(
- {"pkval": 51, "col7": None, "col8": None, "boolcol1": False}
+ connection.execute(
+ t.update().where(t.c.col1 == sa.bindparam("pkval")),
+ {"pkval": 51, "col7": None, "col8": None, "boolcol1": False},
)
- t.update(t.c.col1 == sa.bindparam("pkval")).execute(
- {"pkval": 51}, {"pkval": 52}, {"pkval": 53}
+ connection.execute(
+ t.update().where(t.c.col1 == sa.bindparam("pkval")),
+ [{"pkval": 51}, {"pkval": 52}, {"pkval": 53}],
)
- result = t.select().execute()
- ctexec = currenttime.scalar()
+ ctexec = connection.scalar(self.currenttime)
today = datetime.date.today()
+ result = connection.execute(t.select().order_by(t.c.col1))
eq_(
- result.fetchall(),
+ list(result),
[
(
51,
"im the update",
- f2,
- ts,
- ts,
+ self.f2,
+ self.ts,
+ self.ts,
ctexec,
False,
False,
(
52,
"im the update",
- f2,
- ts,
- ts,
+ self.f2,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
(
53,
"im the update",
- f2,
- ts,
- ts,
+ self.f2,
+ self.ts,
+ self.ts,
ctexec,
True,
False,
],
)
- @testing.fails_on("firebird", "Data type unknown")
- def test_update(self):
- r = t.insert().execute()
+ def test_update(self, connection):
+ t = self.tables.default_test
+ r = connection.execute(t.insert())
pk = r.inserted_primary_key[0]
- t.update(t.c.col1 == pk).execute(col4=None, col5=None)
- ctexec = currenttime.scalar()
- result = t.select(t.c.col1 == pk).execute()
+ connection.execute(
+ t.update().where(t.c.col1 == pk), dict(col4=None, col5=None)
+ )
+ ctexec = connection.scalar(self.currenttime)
+ result = connection.execute(t.select().where(t.c.col1 == pk))
result = result.first()
eq_(
result,
(
pk,
"im the update",
- f2,
+ self.f2,
None,
None,
ctexec,
"BINDfoo",
),
)
- eq_(11, f2)
- @testing.fails_on("firebird", "Data type unknown")
- def test_update_values(self):
- r = t.insert().execute()
+ def test_update_values(self, connection):
+ t = self.tables.default_test
+ r = connection.execute(t.insert())
pk = r.inserted_primary_key[0]
- t.update(t.c.col1 == pk, values={"col3": 55}).execute()
- result = t.select(t.c.col1 == pk).execute()
- result = result.first()
- eq_(55, result._mapping["col3"])
+ connection.execute(t.update().where(t.c.col1 == pk).values(col3=55))
+ result = connection.execute(t.select().where(t.c.col1 == pk))
+ row = result.first()
+ eq_(55, row._mapping["col3"])
class CTEDefaultTest(fixtures.TablesTest):
Column("u", Integer, onupdate=1),
)
- def _test_a_in_b(self, a, b):
+ @testing.combinations(
+ ("update", "select", testing.requires.ctes_on_dml),
+ ("update", "select", testing.requires.ctes_on_dml),
+ ("update", "select", testing.requires.ctes_on_dml),
+ ("select", "update"),
+ ("select", "insert"),
+ argnames="a, b",
+ )
+ def test_a_in_b(self, a, b, connection):
q = self.tables.q
p = self.tables.p
- with testing.db.connect() as conn:
- if a == "delete":
- conn.execute(q.insert().values(y=10, z=1))
- cte = q.delete().where(q.c.z == 1).returning(q.c.z).cte("c")
- expected = None
- elif a == "insert":
- cte = q.insert().values(z=1, y=10).returning(q.c.z).cte("c")
- expected = (2, 10)
- elif a == "update":
- conn.execute(q.insert().values(x=5, y=10, z=1))
- cte = (
- q.update()
- .where(q.c.z == 1)
- .values(x=7)
- .returning(q.c.z)
- .cte("c")
- )
- expected = (7, 5)
- elif a == "select":
- conn.execute(q.insert().values(x=5, y=10, z=1))
- cte = sa.select([q.c.z]).cte("c")
- expected = (5, 10)
-
- if b == "select":
- conn.execute(p.insert().values(s=1))
- stmt = select([p.c.s, cte.c.z]).where(p.c.s == cte.c.z)
- elif b == "insert":
- sel = select([1, cte.c.z])
- stmt = (
- p.insert()
- .from_select(["s", "t"], sel)
- .returning(p.c.s, p.c.t)
- )
- elif b == "delete":
- stmt = (
- p.insert().values(s=1, t=cte.c.z).returning(p.c.s, cte.c.z)
- )
- elif b == "update":
- conn.execute(p.insert().values(s=1))
- stmt = (
- p.update()
- .values(t=5)
- .where(p.c.s == cte.c.z)
- .returning(p.c.u, cte.c.z)
- )
- eq_(conn.execute(stmt).fetchall(), [(1, 1)])
-
- eq_(conn.execute(select([q.c.x, q.c.y])).fetchone(), expected)
-
- @testing.requires.ctes_on_dml
- def test_update_in_select(self):
- self._test_a_in_b("update", "select")
-
- @testing.requires.ctes_on_dml
- def test_delete_in_select(self):
- self._test_a_in_b("update", "select")
-
- @testing.requires.ctes_on_dml
- def test_insert_in_select(self):
- self._test_a_in_b("update", "select")
-
- def test_select_in_update(self):
- self._test_a_in_b("select", "update")
-
- def test_select_in_insert(self):
- self._test_a_in_b("select", "insert")
+ conn = connection
+ if a == "delete":
+ conn.execute(q.insert().values(y=10, z=1))
+ cte = q.delete().where(q.c.z == 1).returning(q.c.z).cte("c")
+ expected = None
+ elif a == "insert":
+ cte = q.insert().values(z=1, y=10).returning(q.c.z).cte("c")
+ expected = (2, 10)
+ elif a == "update":
+ conn.execute(q.insert().values(x=5, y=10, z=1))
+ cte = (
+ q.update()
+ .where(q.c.z == 1)
+ .values(x=7)
+ .returning(q.c.z)
+ .cte("c")
+ )
+ expected = (7, 5)
+ elif a == "select":
+ conn.execute(q.insert().values(x=5, y=10, z=1))
+ cte = sa.select([q.c.z]).cte("c")
+ expected = (5, 10)
+
+ if b == "select":
+ conn.execute(p.insert().values(s=1))
+ stmt = select([p.c.s, cte.c.z]).where(p.c.s == cte.c.z)
+ elif b == "insert":
+ sel = select([1, cte.c.z])
+ stmt = (
+ p.insert().from_select(["s", "t"], sel).returning(p.c.s, p.c.t)
+ )
+ elif b == "delete":
+ stmt = p.insert().values(s=1, t=cte.c.z).returning(p.c.s, cte.c.z)
+ elif b == "update":
+ conn.execute(p.insert().values(s=1))
+ stmt = (
+ p.update()
+ .values(t=5)
+ .where(p.c.s == cte.c.z)
+ .returning(p.c.u, cte.c.z)
+ )
+ eq_(list(conn.execute(stmt)), [(1, 1)])
- # TODO: updates / inserts can be run in one statement w/ CTE ?
- # deletes?
+ eq_(conn.execute(select([q.c.x, q.c.y])).first(), expected)
class PKDefaultTest(fixtures.TablesTest):
self._test_autoincrement(testing.db)
def test_autoincrement_transaction(self):
- con = testing.db.connect()
- tx = con.begin()
- try:
- try:
- self._test_autoincrement(con)
- except Exception:
- try:
- tx.rollback()
- except Exception:
- pass
- raise
- else:
- tx.commit()
- finally:
- con.close()
+ with testing.db.begin() as conn:
+ self._test_autoincrement(conn)
class EmptyInsertTest(fixtures.TestBase):
__backend__ = True
- @testing.exclude("sqlite", "<", (3, 3, 8), "no empty insert support")
@testing.fails_on("oracle", "FIXME: unknown")
@testing.provide_metadata
- def test_empty_insert(self):
+ def test_empty_insert(self, connection):
t1 = Table(
"t1",
self.metadata,
Column("is_true", Boolean, server_default=("1")),
)
- self.metadata.create_all()
- t1.insert().execute()
- eq_(1, select([func.count(text("*"))], from_obj=t1).scalar())
- eq_(True, t1.select().scalar())
+ self.metadata.create_all(connection)
+ connection.execute(t1.insert())
+ eq_(
+ 1,
+ connection.scalar(select([func.count(text("*"))]).select_from(t1)),
+ )
+ eq_(True, connection.scalar(t1.select()))
-class AutoIncrementTest(fixtures.TablesTest):
+class AutoIncrementTest(fixtures.TestBase):
__requires__ = ("identity",)
- run_define_tables = "each"
__backend__ = True
- @classmethod
- def define_tables(cls, metadata):
- """Each test manipulates self.metadata individually."""
-
- @testing.exclude("sqlite", "<", (3, 4), "no database support")
- def test_autoincrement_single_col(self):
+ @testing.provide_metadata
+ def test_autoincrement_single_col(self, connection):
single = Table(
"single", self.metadata, Column("id", Integer, primary_key=True)
)
- single.create()
+ self.metadata.create_all(connection)
- r = single.insert().execute()
+ r = connection.execute(single.insert())
id_ = r.inserted_primary_key[0]
eq_(id_, 1)
- eq_(1, sa.select([func.count(sa.text("*"))], from_obj=single).scalar())
-
- def test_autoincrement_fk(self):
- nodes = Table(
- "nodes",
- self.metadata,
- Column("id", Integer, primary_key=True),
- Column("parent_id", Integer, ForeignKey("nodes.id")),
- Column("data", String(30)),
- )
- nodes.create()
-
- r = nodes.insert().execute(data="foo")
- id_ = r.inserted_primary_key[0]
- nodes.insert().execute(data="bar", parent_id=id_)
+ eq_(connection.scalar(sa.select([single.c.id])), 1)
def test_autoinc_detection_no_affinity(self):
class MyType(TypeDecorator):
assert x._autoincrement_column is None
@testing.only_on("sqlite")
- def test_non_autoincrement(self):
+ @testing.provide_metadata
+ def test_non_autoincrement(self, connection):
# sqlite INT primary keys can be non-unique! (only for ints)
nonai = Table(
"nonaitest",
Column("id", Integer, autoincrement=False, primary_key=True),
Column("data", String(20)),
)
- nonai.create()
-
- def go():
- # postgresql + mysql strict will fail on first row,
- # mysql in legacy mode fails on second row
- nonai.insert().execute(data="row 1")
- nonai.insert().execute(data="row 2")
+ nonai.create(connection)
# just testing SQLite for now, it passes
with expect_warnings(".*has no Python-side or server-side default.*"):
- go()
+ # postgresql + mysql strict will fail on first row,
+ # mysql in legacy mode fails on second row
+ connection.execute(nonai.insert(), dict(data="row 1"))
+ connection.execute(nonai.insert(), dict(data="row 2"))
@testing.metadata_fixture(ddl="function")
def dataset_no_autoinc(self, metadata):
return dataset_no_autoinc
def test_col_w_optional_sequence_non_autoinc_no_firing(
- self, dataset_no_autoinc
+ self, dataset_no_autoinc, connection
):
"""this is testing that a Table which includes a Sequence, when
run against a DB that does not support sequences, the Sequence
"""
dataset_no_autoinc.c.set_id.default.optional = True
- with testing.db.connect() as conn:
- conn.execute(dataset_no_autoinc.insert())
- eq_(
- conn.scalar(
- select([func.count("*")]).select_from(dataset_no_autoinc)
- ),
- 1,
- )
+ connection.execute(dataset_no_autoinc.insert())
+ eq_(
+ connection.scalar(
+ select([func.count("*")]).select_from(dataset_no_autoinc)
+ ),
+ 1,
+ )
@testing.fails_if(testing.requires.sequences)
def test_col_w_nonoptional_sequence_non_autoinc_no_firing(
- self, dataset_no_autoinc
+ self, dataset_no_autoinc, connection
):
"""When the sequence is not optional and sequences are supported,
the test fails because we didn't create the sequence.
"""
dataset_no_autoinc.c.set_id.default.optional = False
- with testing.db.connect() as conn:
- conn.execute(dataset_no_autoinc.insert())
- eq_(
- conn.scalar(
- select([func.count("*")]).select_from(dataset_no_autoinc)
- ),
- 1,
- )
-
-
-class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
- __dialect__ = "default"
- __backend__ = True
-
- def test_create_drop_ddl(self):
- self.assert_compile(
- CreateSequence(Sequence("foo_seq")), "CREATE SEQUENCE foo_seq"
- )
-
- self.assert_compile(
- CreateSequence(Sequence("foo_seq", start=5)),
- "CREATE SEQUENCE foo_seq START WITH 5",
- )
-
- self.assert_compile(
- CreateSequence(Sequence("foo_seq", increment=2)),
- "CREATE SEQUENCE foo_seq INCREMENT BY 2",
- )
-
- self.assert_compile(
- CreateSequence(Sequence("foo_seq", increment=2, start=5)),
- "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 5",
- )
-
- self.assert_compile(
- CreateSequence(
- Sequence("foo_seq", increment=2, start=0, minvalue=0)
- ),
- "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 MINVALUE 0",
- )
-
- self.assert_compile(
- CreateSequence(
- Sequence("foo_seq", increment=2, start=1, maxvalue=5)
- ),
- "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 MAXVALUE 5",
- )
-
- self.assert_compile(
- CreateSequence(
- Sequence("foo_seq", increment=2, start=1, nomaxvalue=True)
- ),
- "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 NO MAXVALUE",
- )
-
- self.assert_compile(
- CreateSequence(
- Sequence("foo_seq", increment=2, start=0, nominvalue=True)
- ),
- "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 NO MINVALUE",
- )
-
- self.assert_compile(
- CreateSequence(
- Sequence("foo_seq", start=1, maxvalue=10, cycle=True)
- ),
- "CREATE SEQUENCE foo_seq START WITH 1 MAXVALUE 10 CYCLE",
- )
-
- self.assert_compile(
- CreateSequence(Sequence("foo_seq", cache=1000, order=True)),
- "CREATE SEQUENCE foo_seq CACHE 1000 ORDER",
- )
-
- self.assert_compile(
- CreateSequence(Sequence("foo_seq", order=True)),
- "CREATE SEQUENCE foo_seq ORDER",
- )
-
- self.assert_compile(
- DropSequence(Sequence("foo_seq")), "DROP SEQUENCE foo_seq"
- )
-
-
-class SequenceExecTest(fixtures.TestBase):
- __requires__ = ("sequences",)
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- cls.seq = Sequence("my_sequence")
- cls.seq.create(testing.db)
-
- @classmethod
- def teardown_class(cls):
- cls.seq.drop(testing.db)
-
- def _assert_seq_result(self, ret):
- """asserts return of next_value is an int"""
-
- assert isinstance(ret, util.int_types)
- assert ret > 0
-
- def test_implicit_connectionless(self):
- s = Sequence("my_sequence", metadata=MetaData(testing.db))
- self._assert_seq_result(s.execute())
-
- def test_explicit(self):
- s = Sequence("my_sequence")
- self._assert_seq_result(s.execute(testing.db))
-
- def test_explicit_optional(self):
- """test dialect executes a Sequence, returns nextval, whether
- or not "optional" is set """
-
- s = Sequence("my_sequence", optional=True)
- self._assert_seq_result(s.execute(testing.db))
-
- def test_func_implicit_connectionless_execute(self):
- """test func.next_value().execute()/.scalar() works
- with connectionless execution. """
-
- s = Sequence("my_sequence", metadata=MetaData(testing.db))
- self._assert_seq_result(s.next_value().execute().scalar())
-
- def test_func_explicit(self):
- s = Sequence("my_sequence")
- self._assert_seq_result(testing.db.scalar(s.next_value()))
-
- def test_func_implicit_connectionless_scalar(self):
- """test func.next_value().execute()/.scalar() works. """
-
- s = Sequence("my_sequence", metadata=MetaData(testing.db))
- self._assert_seq_result(s.next_value().scalar())
-
- def test_func_embedded_select(self):
- """test can use next_value() in select column expr"""
-
- s = Sequence("my_sequence")
- self._assert_seq_result(testing.db.scalar(select([s.next_value()])))
-
- @testing.fails_on("oracle", "ORA-02287: sequence number not allowed here")
- @testing.provide_metadata
- def test_func_embedded_whereclause(self):
- """test can use next_value() in whereclause"""
-
- metadata = self.metadata
- t1 = Table("t", metadata, Column("x", Integer))
- t1.create(testing.db)
- testing.db.execute(t1.insert(), [{"x": 1}, {"x": 300}, {"x": 301}])
- s = Sequence("my_sequence")
+ connection.execute(dataset_no_autoinc.insert())
eq_(
- testing.db.execute(
- t1.select().where(t1.c.x > s.next_value())
- ).fetchall(),
- [(300,), (301,)],
- )
-
- @testing.provide_metadata
- def test_func_embedded_valuesbase(self):
- """test can use next_value() in values() of _ValuesBase"""
-
- metadata = self.metadata
- t1 = Table("t", metadata, Column("x", Integer))
- t1.create(testing.db)
- s = Sequence("my_sequence")
- testing.db.execute(t1.insert().values(x=s.next_value()))
- self._assert_seq_result(testing.db.scalar(t1.select()))
-
- @testing.requires.supports_lastrowid
- @testing.provide_metadata
- def test_inserted_pk_no_returning_w_lastrowid(self):
- """test inserted_primary_key contains the pk when
- pk_col=next_value(), lastrowid is supported."""
-
- metadata = self.metadata
- e = engines.testing_engine(options={"implicit_returning": False})
- s = Sequence("my_sequence")
- metadata.bind = e
- t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
- t1.create()
- r = e.execute(t1.insert().values(x=s.next_value()))
- self._assert_seq_result(r.inserted_primary_key[0])
-
- @testing.requires.no_lastrowid_support
- @testing.provide_metadata
- def test_inserted_pk_no_returning_no_lastrowid(self):
- """test inserted_primary_key contains [None] when
- pk_col=next_value(), implicit returning is not used."""
-
- metadata = self.metadata
- e = engines.testing_engine(options={"implicit_returning": False})
- s = Sequence("my_sequence")
- metadata.bind = e
- t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
- t1.create()
- r = e.execute(t1.insert().values(x=s.next_value()))
- eq_(r.inserted_primary_key, [None])
-
- @testing.requires.returning
- @testing.provide_metadata
- def test_inserted_pk_implicit_returning(self):
- """test inserted_primary_key contains the result when
- pk_col=next_value(), when implicit returning is used."""
-
- metadata = self.metadata
- e = engines.testing_engine(options={"implicit_returning": True})
- s = Sequence("my_sequence")
- metadata.bind = e
- t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
- t1.create()
- r = e.execute(t1.insert().values(x=s.next_value()))
- self._assert_seq_result(r.inserted_primary_key[0])
-
-
-class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
- __requires__ = ("sequences",)
- __backend__ = True
-
- @testing.fails_on("firebird", "no FB support for start/increment")
- def test_start_increment(self):
- for seq in (
- Sequence("foo_seq"),
- Sequence("foo_seq", start=8),
- Sequence("foo_seq", increment=5),
- ):
- seq.create(testing.db)
- try:
- values = [testing.db.execute(seq) for i in range(3)]
- start = seq.start or 1
- inc = seq.increment or 1
- assert values == list(range(start, start + inc * 3, inc))
-
- finally:
- seq.drop(testing.db)
-
- def _has_sequence(self, name):
- return testing.db.dialect.has_sequence(testing.db, name)
-
- def test_nextval_render(self):
- """test dialect renders the "nextval" construct,
- whether or not "optional" is set """
-
- for s in (Sequence("my_seq"), Sequence("my_seq", optional=True)):
- assert str(s.next_value().compile(dialect=testing.db.dialect)) in (
- "nextval('my_seq')",
- "nextval(my_seq)",
- "gen_id(my_seq, 1)",
- "my_seq.nextval",
- )
-
- def test_nextval_unsupported(self):
- """test next_value() used on non-sequence platform
- raises NotImplementedError."""
-
- s = Sequence("my_seq")
- d = sqlite.dialect()
- assert_raises_message(
- NotImplementedError,
- "Dialect 'sqlite' does not support sequence increments.",
- s.next_value().compile,
- dialect=d,
- )
-
- def test_checkfirst_sequence(self):
- s = Sequence("my_sequence")
- s.create(testing.db, checkfirst=False)
- assert self._has_sequence("my_sequence")
- s.create(testing.db, checkfirst=True)
- s.drop(testing.db, checkfirst=False)
- assert not self._has_sequence("my_sequence")
- s.drop(testing.db, checkfirst=True)
-
- def test_checkfirst_metadata(self):
- m = MetaData()
- Sequence("my_sequence", metadata=m)
- m.create_all(testing.db, checkfirst=False)
- assert self._has_sequence("my_sequence")
- m.create_all(testing.db, checkfirst=True)
- m.drop_all(testing.db, checkfirst=False)
- assert not self._has_sequence("my_sequence")
- m.drop_all(testing.db, checkfirst=True)
-
- def test_checkfirst_table(self):
- m = MetaData()
- s = Sequence("my_sequence")
- t = Table("t", m, Column("c", Integer, s, primary_key=True))
- t.create(testing.db, checkfirst=False)
- assert self._has_sequence("my_sequence")
- t.create(testing.db, checkfirst=True)
- t.drop(testing.db, checkfirst=False)
- assert not self._has_sequence("my_sequence")
- t.drop(testing.db, checkfirst=True)
-
- @testing.provide_metadata
- def test_table_overrides_metadata_create(self):
- metadata = self.metadata
- Sequence("s1", metadata=metadata)
- s2 = Sequence("s2", metadata=metadata)
- s3 = Sequence("s3")
- t = Table("t", metadata, Column("c", Integer, s3, primary_key=True))
- assert s3.metadata is metadata
-
- t.create(testing.db, checkfirst=True)
- s3.drop(testing.db)
-
- # 't' is created, and 's3' won't be
- # re-created since it's linked to 't'.
- # 's1' and 's2' are, however.
- metadata.create_all(testing.db)
- assert self._has_sequence("s1")
- assert self._has_sequence("s2")
- assert not self._has_sequence("s3")
-
- s2.drop(testing.db)
- assert self._has_sequence("s1")
- assert not self._has_sequence("s2")
-
- metadata.drop_all(testing.db)
- assert not self._has_sequence("s1")
- assert not self._has_sequence("s2")
-
- @testing.requires.returning
- @testing.provide_metadata
- def test_freestanding_sequence_via_autoinc(self):
- t = Table(
- "some_table",
- self.metadata,
- Column(
- "id",
- Integer,
- autoincrement=True,
- primary_key=True,
- default=Sequence(
- "my_sequence", metadata=self.metadata
- ).next_value(),
- ),
- )
- self.metadata.create_all(testing.db)
-
- result = testing.db.execute(t.insert())
- eq_(result.inserted_primary_key, [1])
-
-
-cartitems = sometable = metadata = None
-
-
-class TableBoundSequenceTest(fixtures.TestBase):
- __requires__ = ("sequences",)
- __backend__ = True
-
- @classmethod
- def setup_class(cls):
- global cartitems, sometable, metadata
- metadata = MetaData(testing.db)
- cartitems = Table(
- "cartitems",
- metadata,
- Column(
- "cart_id", Integer, Sequence("cart_id_seq"), primary_key=True
- ),
- Column("description", String(40)),
- Column("createdate", sa.DateTime()),
- )
- sometable = Table(
- "Manager",
- metadata,
- Column("obj_id", Integer, Sequence("obj_id_seq")),
- Column("name", String(128)),
- Column(
- "id",
- Integer,
- Sequence("Manager_id_seq", optional=True),
- primary_key=True,
+ connection.scalar(
+ select([func.count("*")]).select_from(dataset_no_autoinc)
),
- )
-
- metadata.create_all()
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
-
- def test_insert_via_seq(self):
- cartitems.insert().execute(description="hi")
- cartitems.insert().execute(description="there")
- r = cartitems.insert().execute(description="lala")
-
- assert r.inserted_primary_key and r.inserted_primary_key[0] is not None
- id_ = r.inserted_primary_key[0]
-
- eq_(
1,
- sa.select(
- [func.count(cartitems.c.cart_id)],
- sa.and_(
- cartitems.c.description == "lala",
- cartitems.c.cart_id == id_,
- ),
- ).scalar(),
- )
-
- cartitems.select().execute().fetchall()
-
- def test_seq_nonpk(self):
- """test sequences fire off as defaults on non-pk columns"""
-
- engine = engines.testing_engine(options={"implicit_returning": False})
- result = engine.execute(sometable.insert(), name="somename")
-
- assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
-
- result = engine.execute(sometable.insert(), name="someother")
- assert set(result.postfetch_cols()) == set([sometable.c.obj_id])
-
- sometable.insert().execute({"name": "name3"}, {"name": "name4"})
- eq_(
- sometable.select().order_by(sometable.c.id).execute().fetchall(),
- [
- (1, "somename", 1),
- (2, "someother", 2),
- (3, "name3", 3),
- (4, "name4", 4),
- ],
- )
-
-
-class SequenceAsServerDefaultTest(
- testing.AssertsExecutionResults, fixtures.TablesTest
-):
- __requires__ = ("sequences_as_server_defaults",)
- __backend__ = True
-
- run_create_tables = "each"
-
- @classmethod
- def define_tables(cls, metadata):
- m = metadata
-
- s = Sequence("t_seq", metadata=m)
- Table(
- "t_seq_test",
- m,
- Column("id", Integer, s, server_default=s.next_value()),
- Column("data", String(50)),
- )
-
- s2 = Sequence("t_seq_2", metadata=m)
- Table(
- "t_seq_test_2",
- m,
- Column("id", Integer, server_default=s2.next_value()),
- Column("data", String(50)),
- )
-
- def test_default_textual_w_default(self):
- with testing.db.connect() as conn:
- conn.exec_driver_sql(
- "insert into t_seq_test (data) values ('some data')"
- )
-
- eq_(conn.exec_driver_sql("select id from t_seq_test").scalar(), 1)
-
- def test_default_core_w_default(self):
- t_seq_test = self.tables.t_seq_test
- with testing.db.connect() as conn:
- conn.execute(t_seq_test.insert().values(data="some data"))
-
- eq_(conn.scalar(select([t_seq_test.c.id])), 1)
-
- def test_default_textual_server_only(self):
- with testing.db.connect() as conn:
- conn.exec_driver_sql(
- "insert into t_seq_test_2 (data) values ('some data')"
- )
-
- eq_(
- conn.exec_driver_sql("select id from t_seq_test_2").scalar(), 1
- )
-
- def test_default_core_server_only(self):
- t_seq_test = self.tables.t_seq_test_2
- with testing.db.connect() as conn:
- conn.execute(t_seq_test.insert().values(data="some data"))
-
- eq_(conn.scalar(select([t_seq_test.c.id])), 1)
-
- def test_drop_ordering(self):
- self.assert_sql_execution(
- testing.db,
- lambda: self.metadata.drop_all(checkfirst=False),
- AllOf(
- CompiledSQL("DROP TABLE t_seq_test_2", {}),
- EachOf(
- CompiledSQL("DROP TABLE t_seq_test", {}),
- CompiledSQL(
- "DROP SEQUENCE t_seq", # dropped as part of t_seq_test
- {},
- ),
- ),
- ),
- CompiledSQL(
- "DROP SEQUENCE t_seq_2", # dropped as part of metadata level
- {},
- ),
)
implicit_returning=implicit_returning,
)
- t.create()
- r = t.insert().values(data=5).execute()
-
- # we don't pre-fetch 'server_default'.
- if "server_default" in kw and (
- not testing.db.dialect.implicit_returning or not implicit_returning
- ):
- eq_(r.inserted_primary_key, [None])
- else:
- eq_(r.inserted_primary_key, ["INT_1"])
- r.close()
+ with testing.db.connect() as conn:
+ t.create(conn)
+ r = conn.execute(t.insert().values(data=5))
+
+ # we don't pre-fetch 'server_default'.
+ if "server_default" in kw and (
+ not testing.db.dialect.implicit_returning
+ or not implicit_returning
+ ):
+ eq_(r.inserted_primary_key, [None])
+ else:
+ eq_(r.inserted_primary_key, ["INT_1"])
- eq_(t.select().execute().first(), ("INT_1", 5))
+ eq_(conn.execute(t.select()).first(), ("INT_1", 5))
def test_plain(self):
# among other things, tests that autoincrement
__backend__ = True
@testing.provide_metadata
- def test_string_default_none_on_insert(self):
+ def test_string_default_none_on_insert(self, connection):
"""Test that without implicit returning, we return None for
a string server default.
Column("data", String(10)),
implicit_returning=False,
)
- metadata.create_all()
- r = t.insert().execute(data="data")
+ metadata.create_all(connection)
+ r = connection.execute(t.insert(), dict(data="data"))
eq_(r.inserted_primary_key, [None])
- eq_(t.select().execute().fetchall(), [("key_one", "data")])
+ eq_(list(connection.execute(t.select())), [("key_one", "data")])
@testing.requires.returning
@testing.provide_metadata
- def test_string_default_on_insert_with_returning(self):
+ def test_string_default_on_insert_with_returning(self, connection):
"""With implicit_returning, we get a string PK default back no
problem."""
),
Column("data", String(10)),
)
- metadata.create_all()
- r = t.insert().execute(data="data")
+ metadata.create_all(connection)
+ r = connection.execute(t.insert(), dict(data="data"))
eq_(r.inserted_primary_key, ["key_one"])
- eq_(t.select().execute().fetchall(), [("key_one", "data")])
+ eq_(list(connection.execute(t.select())), [("key_one", "data")])
@testing.provide_metadata
- def test_int_default_none_on_insert(self):
+ def test_int_default_none_on_insert(self, connection):
metadata = self.metadata
t = Table(
"x",
implicit_returning=False,
)
assert t._autoincrement_column is None
- metadata.create_all()
- r = t.insert().execute(data="data")
+ metadata.create_all(connection)
+ r = connection.execute(t.insert(), dict(data="data"))
eq_(r.inserted_primary_key, [None])
if testing.against("sqlite"):
- eq_(t.select().execute().fetchall(), [(1, "data")])
+ eq_(list(connection.execute(t.select())), [(1, "data")])
else:
- eq_(t.select().execute().fetchall(), [(5, "data")])
+ eq_(list(connection.execute(t.select())), [(5, "data")])
@testing.provide_metadata
- def test_autoincrement_reflected_from_server_default(self):
+ def test_autoincrement_reflected_from_server_default(self, connection):
metadata = self.metadata
t = Table(
"x",
implicit_returning=False,
)
assert t._autoincrement_column is None
- metadata.create_all()
+ metadata.create_all(connection)
- m2 = MetaData(metadata.bind)
- t2 = Table("x", m2, autoload=True, implicit_returning=False)
+ m2 = MetaData()
+ t2 = Table("x", m2, autoload_with=connection, implicit_returning=False)
assert t2._autoincrement_column is None
@testing.provide_metadata
- def test_int_default_none_on_insert_reflected(self):
+ def test_int_default_none_on_insert_reflected(self, connection):
metadata = self.metadata
Table(
"x",
Column("data", String(10)),
implicit_returning=False,
)
- metadata.create_all()
+ metadata.create_all(connection)
- m2 = MetaData(metadata.bind)
- t2 = Table("x", m2, autoload=True, implicit_returning=False)
+ m2 = MetaData()
+ t2 = Table("x", m2, autoload_with=connection, implicit_returning=False)
- r = t2.insert().execute(data="data")
+ r = connection.execute(t2.insert(), dict(data="data"))
eq_(r.inserted_primary_key, [None])
if testing.against("sqlite"):
- eq_(t2.select().execute().fetchall(), [(1, "data")])
+ eq_(list(connection.execute(t2.select())), [(1, "data")])
else:
- eq_(t2.select().execute().fetchall(), [(5, "data")])
+ eq_(list(connection.execute(t2.select())), [(5, "data")])
@testing.requires.returning
@testing.provide_metadata
- def test_int_default_on_insert_with_returning(self):
+ def test_int_default_on_insert_with_returning(self, connection):
metadata = self.metadata
t = Table(
"x",
Column("data", String(10)),
)
- metadata.create_all()
- r = t.insert().execute(data="data")
+ metadata.create_all(connection)
+ r = connection.execute(t.insert(), dict(data="data"))
eq_(r.inserted_primary_key, [5])
- eq_(t.select().execute().fetchall(), [(5, "data")])
+ eq_(list(connection.execute(t.select())), [(5, "data")])
class UnicodeDefaultsTest(fixtures.TestBase):
)
-class InsertFromSelectTest(fixtures.TestBase):
+class InsertFromSelectTest(fixtures.TablesTest):
__backend__ = True
- def _fixture(self):
- data = Table(
- "data", self.metadata, Column("x", Integer), Column("y", Integer)
- )
- data.create()
- testing.db.execute(data.insert(), {"x": 2, "y": 5}, {"x": 7, "y": 12})
- return data
+ @classmethod
+ def define_tables(cls, metadata):
+ Table("data", metadata, Column("x", Integer), Column("y", Integer))
+
+ @classmethod
+ def insert_data(cls):
+ data = cls.tables.data
+
+ with testing.db.connect() as conn:
+ conn.execute(data.insert(), [{"x": 2, "y": 5}, {"x": 7, "y": 12}])
@testing.provide_metadata
- def test_insert_from_select_override_defaults(self):
- data = self._fixture()
+ def test_insert_from_select_override_defaults(self, connection):
+ data = self.tables.data
table = Table(
"sometable",
Column("y", Integer),
)
- table.create()
+ table.create(connection)
sel = select([data.c.x, data.c.y])
ins = table.insert().from_select(["x", "y"], sel)
- testing.db.execute(ins)
+ connection.execute(ins)
eq_(
- testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+ list(connection.execute(table.select().order_by(table.c.x))),
[(2, 12, 5), (7, 12, 12)],
)
@testing.provide_metadata
- def test_insert_from_select_fn_defaults(self):
- data = self._fixture()
+ def test_insert_from_select_fn_defaults(self, connection):
+ data = self.tables.data
counter = itertools.count(1)
Column("y", Integer),
)
- table.create()
+ table.create(connection)
sel = select([data.c.x, data.c.y])
ins = table.insert().from_select(["x", "y"], sel)
- testing.db.execute(ins)
+ connection.execute(ins)
# counter is only called once!
eq_(
- testing.db.execute(table.select().order_by(table.c.x)).fetchall(),
+ list(connection.execute(table.select().order_by(table.c.x))),
[(2, 1, 5), (7, 1, 12)],
)
some_table.c.x.default.arg = gen_default
return fn
- def _test(self, exec_type, usemethod):
+ @testing.combinations(
+ ("single", "attribute"),
+ ("single", "method"),
+ ("executemany", "attribute"),
+ ("executemany", "method"),
+ ("multivalues", "method", testing.requires.multivalues_inserts),
+ argnames="exec_type, usemethod",
+ )
+ def test_parameters(self, exec_type, usemethod, connection):
collect = mock.Mock()
@self._fixture
else:
stmt, params = table.insert(), parameters
- with testing.db.connect() as conn:
- conn.execute(stmt, params)
+ connection.execute(stmt, params)
eq_(
collect.mock_calls,
[mock.call({"y": param["y"], "x": None}) for param in parameters],
)
-
- def test_single_w_attribute(self):
- self._test("single", "attribute")
-
- def test_single_w_method(self):
- self._test("single", "method")
-
- def test_executemany_w_attribute(self):
- self._test("executemany", "attribute")
-
- def test_executemany_w_method(self):
- self._test("executemany", "method")
-
- @testing.requires.multivalues_inserts
- def test_multivalued_w_method(self):
- self._test("multivalues", "method")
--- /dev/null
+import sqlalchemy as sa
+from sqlalchemy import Integer
+from sqlalchemy import MetaData
+from sqlalchemy import Sequence
+from sqlalchemy import String
+from sqlalchemy import testing
+from sqlalchemy import util
+from sqlalchemy.dialects import sqlite
+from sqlalchemy.schema import CreateSequence
+from sqlalchemy.schema import DropSequence
+from sqlalchemy.sql import select
+from sqlalchemy.testing import assert_raises_message
+from sqlalchemy.testing import engines
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+from sqlalchemy.testing.assertsql import AllOf
+from sqlalchemy.testing.assertsql import CompiledSQL
+from sqlalchemy.testing.assertsql import EachOf
+from sqlalchemy.testing.schema import Column
+from sqlalchemy.testing.schema import Table
+
+
+class SequenceDDLTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __dialect__ = "default"
+ __backend__ = True
+
+ def test_create_drop_ddl(self):
+ self.assert_compile(
+ CreateSequence(Sequence("foo_seq")), "CREATE SEQUENCE foo_seq"
+ )
+
+ self.assert_compile(
+ CreateSequence(Sequence("foo_seq", start=5)),
+ "CREATE SEQUENCE foo_seq START WITH 5",
+ )
+
+ self.assert_compile(
+ CreateSequence(Sequence("foo_seq", increment=2)),
+ "CREATE SEQUENCE foo_seq INCREMENT BY 2",
+ )
+
+ self.assert_compile(
+ CreateSequence(Sequence("foo_seq", increment=2, start=5)),
+ "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 5",
+ )
+
+ self.assert_compile(
+ CreateSequence(
+ Sequence("foo_seq", increment=2, start=0, minvalue=0)
+ ),
+ "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 MINVALUE 0",
+ )
+
+ self.assert_compile(
+ CreateSequence(
+ Sequence("foo_seq", increment=2, start=1, maxvalue=5)
+ ),
+ "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 MAXVALUE 5",
+ )
+
+ self.assert_compile(
+ CreateSequence(
+ Sequence("foo_seq", increment=2, start=1, nomaxvalue=True)
+ ),
+ "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1 NO MAXVALUE",
+ )
+
+ self.assert_compile(
+ CreateSequence(
+ Sequence("foo_seq", increment=2, start=0, nominvalue=True)
+ ),
+ "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 0 NO MINVALUE",
+ )
+
+ self.assert_compile(
+ CreateSequence(
+ Sequence("foo_seq", start=1, maxvalue=10, cycle=True)
+ ),
+ "CREATE SEQUENCE foo_seq START WITH 1 MAXVALUE 10 CYCLE",
+ )
+
+ self.assert_compile(
+ CreateSequence(Sequence("foo_seq", cache=1000, order=True)),
+ "CREATE SEQUENCE foo_seq CACHE 1000 ORDER",
+ )
+
+ self.assert_compile(
+ CreateSequence(Sequence("foo_seq", order=True)),
+ "CREATE SEQUENCE foo_seq ORDER",
+ )
+
+ self.assert_compile(
+ DropSequence(Sequence("foo_seq")), "DROP SEQUENCE foo_seq"
+ )
+
+
+class LegacySequenceExecTest(fixtures.TestBase):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+ @classmethod
+ def setup_class(cls):
+ cls.seq = Sequence("my_sequence")
+ cls.seq.create(testing.db)
+
+ @classmethod
+ def teardown_class(cls):
+ cls.seq.drop(testing.db)
+
+ def _assert_seq_result(self, ret):
+ """asserts return of next_value is an int"""
+
+ assert isinstance(ret, util.int_types)
+ assert ret > 0
+
+ def test_implicit_connectionless(self):
+ s = Sequence("my_sequence", metadata=MetaData(testing.db))
+ self._assert_seq_result(s.execute())
+
+ def test_explicit(self, connection):
+ s = Sequence("my_sequence")
+ self._assert_seq_result(s.execute(connection))
+
+ def test_explicit_optional(self):
+ """test dialect executes a Sequence, returns nextval, whether
+ or not "optional" is set """
+
+ s = Sequence("my_sequence", optional=True)
+ self._assert_seq_result(s.execute(testing.db))
+
+ def test_func_implicit_connectionless_execute(self):
+ """test func.next_value().execute()/.scalar() works
+ with connectionless execution. """
+
+ s = Sequence("my_sequence", metadata=MetaData(testing.db))
+ self._assert_seq_result(s.next_value().execute().scalar())
+
+ def test_func_explicit(self):
+ s = Sequence("my_sequence")
+ self._assert_seq_result(testing.db.scalar(s.next_value()))
+
+ def test_func_implicit_connectionless_scalar(self):
+ """test func.next_value().execute()/.scalar() works. """
+
+ s = Sequence("my_sequence", metadata=MetaData(testing.db))
+ self._assert_seq_result(s.next_value().scalar())
+
+ def test_func_embedded_select(self):
+ """test can use next_value() in select column expr"""
+
+ s = Sequence("my_sequence")
+ self._assert_seq_result(testing.db.scalar(select([s.next_value()])))
+
+
+class SequenceExecTest(fixtures.TestBase):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+ @classmethod
+ def setup_class(cls):
+ cls.seq = Sequence("my_sequence")
+ cls.seq.create(testing.db)
+
+ @classmethod
+ def teardown_class(cls):
+ cls.seq.drop(testing.db)
+
+ def _assert_seq_result(self, ret):
+ """asserts return of next_value is an int"""
+
+ assert isinstance(ret, util.int_types)
+ assert ret > 0
+
+ def test_execute(self, connection):
+ s = Sequence("my_sequence")
+ self._assert_seq_result(connection.execute(s))
+
+ def test_execute_optional(self, connection):
+ """test dialect executes a Sequence, returns nextval, whether
+ or not "optional" is set """
+
+ s = Sequence("my_sequence", optional=True)
+ self._assert_seq_result(connection.execute(s))
+
+ def test_execute_next_value(self, connection):
+ """test func.next_value().execute()/.scalar() works
+ with connectionless execution. """
+
+ s = Sequence("my_sequence")
+ self._assert_seq_result(connection.scalar(s.next_value()))
+
+ def test_execute_optional_next_value(self, connection):
+ """test func.next_value().execute()/.scalar() works
+ with connectionless execution. """
+
+ s = Sequence("my_sequence", optional=True)
+ self._assert_seq_result(connection.scalar(s.next_value()))
+
+ def test_func_embedded_select(self, connection):
+ """test can use next_value() in select column expr"""
+
+ s = Sequence("my_sequence")
+ self._assert_seq_result(connection.scalar(select([s.next_value()])))
+
+ @testing.fails_on("oracle", "ORA-02287: sequence number not allowed here")
+ @testing.provide_metadata
+ def test_func_embedded_whereclause(self, connection):
+ """test can use next_value() in whereclause"""
+
+ metadata = self.metadata
+ t1 = Table("t", metadata, Column("x", Integer))
+ t1.create(testing.db)
+ connection.execute(t1.insert(), [{"x": 1}, {"x": 300}, {"x": 301}])
+ s = Sequence("my_sequence")
+ eq_(
+ list(
+ connection.execute(t1.select().where(t1.c.x > s.next_value()))
+ ),
+ [(300,), (301,)],
+ )
+
+ @testing.provide_metadata
+ def test_func_embedded_valuesbase(self, connection):
+ """test can use next_value() in values() of _ValuesBase"""
+
+ metadata = self.metadata
+ t1 = Table("t", metadata, Column("x", Integer))
+ t1.create(testing.db)
+ s = Sequence("my_sequence")
+ connection.execute(t1.insert().values(x=s.next_value()))
+ self._assert_seq_result(connection.scalar(t1.select()))
+
+ @testing.requires.supports_lastrowid
+ @testing.provide_metadata
+ def test_inserted_pk_no_returning_w_lastrowid(self):
+ """test inserted_primary_key contains the pk when
+ pk_col=next_value(), lastrowid is supported."""
+
+ metadata = self.metadata
+ t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+ t1.create(testing.db)
+ e = engines.testing_engine(options={"implicit_returning": False})
+ s = Sequence("my_sequence")
+
+ with e.connect() as conn:
+ r = conn.execute(t1.insert().values(x=s.next_value()))
+ self._assert_seq_result(r.inserted_primary_key[0])
+
+ @testing.requires.no_lastrowid_support
+ @testing.provide_metadata
+ def test_inserted_pk_no_returning_no_lastrowid(self):
+ """test inserted_primary_key contains [None] when
+ pk_col=next_value(), implicit returning is not used."""
+
+ metadata = self.metadata
+ t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+ t1.create(testing.db)
+
+ e = engines.testing_engine(options={"implicit_returning": False})
+ s = Sequence("my_sequence")
+ with e.connect() as conn:
+ r = conn.execute(t1.insert().values(x=s.next_value()))
+ eq_(r.inserted_primary_key, [None])
+
+ @testing.requires.returning
+ @testing.provide_metadata
+ def test_inserted_pk_implicit_returning(self):
+ """test inserted_primary_key contains the result when
+ pk_col=next_value(), when implicit returning is used."""
+
+ metadata = self.metadata
+ s = Sequence("my_sequence")
+ t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+ t1.create(testing.db)
+
+ e = engines.testing_engine(options={"implicit_returning": True})
+ with e.connect() as conn:
+ r = conn.execute(t1.insert().values(x=s.next_value()))
+ self._assert_seq_result(r.inserted_primary_key[0])
+
+
+class SequenceTest(fixtures.TestBase, testing.AssertsCompiledSQL):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+ @testing.combinations(
+ (Sequence("foo_seq"),),
+ (Sequence("foo_seq", start=8),),
+ (Sequence("foo_seq", increment=5),),
+ )
+ def test_start_increment(self, seq):
+ seq.create(testing.db)
+ try:
+ with testing.db.connect() as conn:
+ values = [conn.execute(seq) for i in range(3)]
+ start = seq.start or 1
+ inc = seq.increment or 1
+ eq_(values, list(range(start, start + inc * 3, inc)))
+
+ finally:
+ seq.drop(testing.db)
+
+ def _has_sequence(self, connection, name):
+ return testing.db.dialect.has_sequence(connection, name)
+
+ def test_nextval_unsupported(self):
+ """test next_value() used on non-sequence platform
+ raises NotImplementedError."""
+
+ s = Sequence("my_seq")
+ d = sqlite.dialect()
+ assert_raises_message(
+ NotImplementedError,
+ "Dialect 'sqlite' does not support sequence increments.",
+ s.next_value().compile,
+ dialect=d,
+ )
+
+ def test_checkfirst_sequence(self, connection):
+ s = Sequence("my_sequence")
+ s.create(connection, checkfirst=False)
+ assert self._has_sequence(connection, "my_sequence")
+ s.create(connection, checkfirst=True)
+ s.drop(connection, checkfirst=False)
+ assert not self._has_sequence(connection, "my_sequence")
+ s.drop(connection, checkfirst=True)
+
+ def test_checkfirst_metadata(self, connection):
+ m = MetaData()
+ Sequence("my_sequence", metadata=m)
+ m.create_all(connection, checkfirst=False)
+ assert self._has_sequence(connection, "my_sequence")
+ m.create_all(connection, checkfirst=True)
+ m.drop_all(connection, checkfirst=False)
+ assert not self._has_sequence(connection, "my_sequence")
+ m.drop_all(connection, checkfirst=True)
+
+ def test_checkfirst_table(self, connection):
+ m = MetaData()
+ s = Sequence("my_sequence")
+ t = Table("t", m, Column("c", Integer, s, primary_key=True))
+ t.create(connection, checkfirst=False)
+ assert self._has_sequence(connection, "my_sequence")
+ t.create(connection, checkfirst=True)
+ t.drop(connection, checkfirst=False)
+ assert not self._has_sequence(connection, "my_sequence")
+ t.drop(connection, checkfirst=True)
+
+ @testing.provide_metadata
+ def test_table_overrides_metadata_create(self, connection):
+ metadata = self.metadata
+ Sequence("s1", metadata=metadata)
+ s2 = Sequence("s2", metadata=metadata)
+ s3 = Sequence("s3")
+ t = Table("t", metadata, Column("c", Integer, s3, primary_key=True))
+ assert s3.metadata is metadata
+
+ t.create(connection, checkfirst=True)
+ s3.drop(connection)
+
+ # 't' is created, and 's3' won't be
+ # re-created since it's linked to 't'.
+ # 's1' and 's2' are, however.
+ metadata.create_all(connection)
+ assert self._has_sequence(connection, "s1")
+ assert self._has_sequence(connection, "s2")
+ assert not self._has_sequence(connection, "s3")
+
+ s2.drop(connection)
+ assert self._has_sequence(connection, "s1")
+ assert not self._has_sequence(connection, "s2")
+
+ metadata.drop_all(connection)
+ assert not self._has_sequence(connection, "s1")
+ assert not self._has_sequence(connection, "s2")
+
+ @testing.requires.returning
+ @testing.provide_metadata
+ def test_freestanding_sequence_via_autoinc(self, connection):
+ t = Table(
+ "some_table",
+ self.metadata,
+ Column(
+ "id",
+ Integer,
+ autoincrement=True,
+ primary_key=True,
+ default=Sequence(
+ "my_sequence", metadata=self.metadata
+ ).next_value(),
+ ),
+ )
+ self.metadata.create_all(connection)
+
+ result = connection.execute(t.insert())
+ eq_(result.inserted_primary_key, [1])
+
+
+class TableBoundSequenceTest(fixtures.TablesTest):
+ __requires__ = ("sequences",)
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "cartitems",
+ metadata,
+ Column(
+ "cart_id", Integer, Sequence("cart_id_seq"), primary_key=True
+ ),
+ Column("description", String(40)),
+ Column("createdate", sa.DateTime()),
+ )
+
+ # a little bit of implicit case sensitive naming test going on here
+ Table(
+ "Manager",
+ metadata,
+ Column("obj_id", Integer, Sequence("obj_id_seq")),
+ Column("name", String(128)),
+ Column(
+ "id",
+ Integer,
+ Sequence("Manager_id_seq", optional=True),
+ primary_key=True,
+ ),
+ )
+
+ def test_insert_via_seq(self, connection):
+ cartitems = self.tables.cartitems
+
+ connection.execute(cartitems.insert(), dict(description="hi"))
+ connection.execute(cartitems.insert(), dict(description="there"))
+ r = connection.execute(cartitems.insert(), dict(description="lala"))
+
+ eq_(r.inserted_primary_key[0], 3)
+
+ eq_(
+ connection.scalar(
+ sa.select([cartitems.c.cart_id]).where(
+ cartitems.c.description == "lala"
+ ),
+ ),
+ 3,
+ )
+
+ def test_seq_nonpk(self):
+ """test sequences fire off as defaults on non-pk columns"""
+
+ sometable = self.tables.Manager
+
+ engine = engines.testing_engine(options={"implicit_returning": False})
+
+ with engine.connect() as conn:
+ result = conn.execute(sometable.insert(), dict(name="somename"))
+
+ eq_(result.postfetch_cols(), [sometable.c.obj_id])
+
+ result = conn.execute(sometable.insert(), dict(name="someother"))
+
+ conn.execute(
+ sometable.insert(), [{"name": "name3"}, {"name": "name4"}]
+ )
+ eq_(
+ list(
+ conn.execute(sometable.select().order_by(sometable.c.id))
+ ),
+ [
+ (1, "somename", 1),
+ (2, "someother", 2),
+ (3, "name3", 3),
+ (4, "name4", 4),
+ ],
+ )
+
+
+class SequenceAsServerDefaultTest(
+ testing.AssertsExecutionResults, fixtures.TablesTest
+):
+ __requires__ = ("sequences_as_server_defaults",)
+ __backend__ = True
+
+ run_create_tables = "each"
+
+ @classmethod
+ def define_tables(cls, metadata):
+ m = metadata
+
+ s = Sequence("t_seq", metadata=m)
+ Table(
+ "t_seq_test",
+ m,
+ Column("id", Integer, s, server_default=s.next_value()),
+ Column("data", String(50)),
+ )
+
+ s2 = Sequence("t_seq_2", metadata=m)
+ Table(
+ "t_seq_test_2",
+ m,
+ Column("id", Integer, server_default=s2.next_value()),
+ Column("data", String(50)),
+ )
+
+ def test_default_textual_w_default(self, connection):
+ connection.exec_driver_sql(
+ "insert into t_seq_test (data) values ('some data')"
+ )
+
+ eq_(
+ connection.exec_driver_sql("select id from t_seq_test").scalar(), 1
+ )
+
+ def test_default_core_w_default(self, connection):
+ t_seq_test = self.tables.t_seq_test
+ connection.execute(t_seq_test.insert().values(data="some data"))
+
+ eq_(connection.scalar(select([t_seq_test.c.id])), 1)
+
+ def test_default_textual_server_only(self, connection):
+ connection.exec_driver_sql(
+ "insert into t_seq_test_2 (data) values ('some data')"
+ )
+
+ eq_(
+ connection.exec_driver_sql("select id from t_seq_test_2").scalar(),
+ 1,
+ )
+
+ def test_default_core_server_only(self, connection):
+ t_seq_test = self.tables.t_seq_test_2
+ connection.execute(t_seq_test.insert().values(data="some data"))
+
+ eq_(connection.scalar(select([t_seq_test.c.id])), 1)
+
+ def test_drop_ordering(self):
+ with self.sql_execution_asserter(testing.db) as asserter:
+ self.metadata.drop_all(checkfirst=False)
+
+ asserter.assert_(
+ AllOf(
+ CompiledSQL("DROP TABLE t_seq_test_2", {}),
+ EachOf(
+ CompiledSQL("DROP TABLE t_seq_test", {}),
+ CompiledSQL(
+ "DROP SEQUENCE t_seq", # dropped as part of t_seq_test
+ {},
+ ),
+ ),
+ ),
+ CompiledSQL(
+ "DROP SEQUENCE t_seq_2", # dropped as part of metadata level
+ {},
+ ),
+ )