:ref:`defaults_sequences`
-:ticket:`4976`
\ No newline at end of file
+:ticket:`4976`
+
+
+.. _change_4235:
+
+Added Sequence support distinct from IDENTITY to SQL Server
+-----------------------------------------------------------
+
+The :class:`.Sequence` construct is now fully functional with Microsoft
+SQL Server. When applied to a :class:`.Column`, the DDL for the table will
+no longer include IDENTITY keywords and instead will rely upon "CREATE SEQUENCE"
+to ensure a sequence is present which will then be used for INSERT statements
+on the table.
+
+The :class:`.Sequence` prior to version 1.3 was used to control parameters for
+the IDENTITY column in SQL Server; this usage emitted deprecation warnings
+throughout 1.3 and is now removed in 1.4. For control of paramters for an
+IDENTITY column, the ``mssql_identity_start`` and ``mssql_identity_increment``
+parameters should be used; see the MSSQL dialect documentation linked below.
+
+
+.. seealso::
+
+ :ref:`mssql_identity`
+
+:ticket:`4235`
+
+:ticket:`4633`
--- /dev/null
+.. change::
+ :tags: sql, mssql
+ :tickets: 4235, 4633
+
+ Added support for "CREATE SEQUENCE" and full :class:`.Sequence` support for
+ Microsoft SQL Server. This removes the deprecated feature of using
+ :class:`.Sequence` objects to manipulate IDENTITY characteristics which
+ should now be performed using ``mssql_identity_start`` and
+ ``mssql_identity_increment`` as documented at :ref:`mssql_identity`. The
+ change includes a new parameter :paramref:`.Sequence.data_type` to
+ accommodate SQL Server's choice of datatype, which for that backend
+ includes INTEGER, BIGINT, and DECIMAL(n, 0). The default starting value
+ for SQL Server's version of :class:`.Sequence` has been set at 1; this
+ default is now emitted within the CREATE SEQUENCE DDL for all backends.
+
+ .. seealso::
+
+ :ref:`change_4235`
\ No newline at end of file
the first
integer primary key column in a :class:`_schema.Table`
will be considered to be the
-identity column and will generate DDL as such::
+identity column - unless it is associated with a :class:`.Sequence` - and will
+generate DDL as such::
from sqlalchemy import Table, MetaData, Column, Integer
the ``mssql_identity_start`` and ``mssql_identity_increment`` parameters
documented at :ref:`mssql_identity`.
+.. versionchanged:: 1.4 Removed the ability to use a :class:`.Sequence`
+ object to modify IDENTITY characteristics. :class:`.Sequence` objects
+ now only manipulate true T-SQL SEQUENCE types.
+
.. note::
There can only be one IDENTITY column on the table. When using
This
is an auxiliary use case suitable for testing and bulk insert scenarios.
+SEQUENCE support
+----------------
+
+The :class:`.Sequence` object now creates "real" sequences, i.e.,
+``CREATE SEQUENCE``. To provide compatibility with other dialects,
+:class:`.Sequence` defaults to a data type of Integer and a start value of 1,
+even though the T-SQL defaults are BIGINT and -9223372036854775808,
+respectively.
+
+.. versionadded:: 1.4.0
+
MAX on VARCHAR / NVARCHAR
-------------------------
from . import information_schema as ischema
from ... import exc
from ... import schema as sa_schema
+from ... import Sequence
from ... import sql
from ... import types as sqltypes
from ... import util
from ...sql import func
from ...sql import quoted_name
from ...sql import util as sql_util
+from ...sql.type_api import to_instance
from ...types import BIGINT
from ...types import BINARY
from ...types import CHAR
if self.isinsert:
tbl = self.compiled.statement.table
- seq_column = tbl._autoincrement_column
- insert_has_sequence = seq_column is not None
+ id_column = tbl._autoincrement_column
+ insert_has_identity = (id_column is not None) and (
+ not isinstance(id_column.default, Sequence)
+ )
- if insert_has_sequence:
+ if insert_has_identity:
compile_state = self.compiled.compile_state
self._enable_identity_insert = (
- seq_column.key in self.compiled_parameters[0]
+ id_column.key in self.compiled_parameters[0]
) or (
compile_state._dict_parameters
and (
- seq_column.key in compile_state._dict_parameters
- or seq_column in compile_state._dict_parameters
+ id_column.key in compile_state._dict_parameters
+ or id_column in compile_state._dict_parameters
)
)
self._select_lastrowid = (
not self.compiled.inline
- and insert_has_sequence
+ and insert_has_identity
and not self.compiled.returning
and not self._enable_identity_insert
and not self.executemany
except Exception:
pass
+ def get_result_cursor_strategy(self, result):
+ if self._result_strategy:
+ return self._result_strategy
+ else:
+ return super(MSExecutionContext, self).get_result_cursor_strategy(
+ result
+ )
+
+ def fire_sequence(self, seq, type_):
+ return self._execute_scalar(
+ (
+ "SELECT NEXT VALUE FOR %s"
+ % self.dialect.identifier_preparer.format_sequence(seq)
+ ),
+ type_,
+ )
+
class MSSQLCompiler(compiler.SQLCompiler):
returning_precedes_values = True
self.process(binary.right),
)
+ def visit_sequence(self, seq, **kw):
+ return "NEXT VALUE FOR %s" % self.preparer.format_sequence(seq)
+
class MSSQLStrictCompiler(MSSQLCompiler):
"in order to generate DDL"
)
- # install an IDENTITY Sequence if we either a sequence or an implicit
- # IDENTITY column
- if isinstance(column.default, sa_schema.Sequence):
-
- if (
- column.default.start is not None
- or column.default.increment is not None
- or column is not column.table._autoincrement_column
- ):
- util.warn_deprecated(
- "Use of Sequence with SQL Server in order to affect the "
- "parameters of the IDENTITY value is deprecated, as "
- "Sequence "
- "will correspond to an actual SQL Server "
- "CREATE SEQUENCE in "
- "a future release. Please use the mssql_identity_start "
- "and mssql_identity_increment parameters.",
- version="1.3",
- )
- if column.default.start == 0:
- start = 0
- else:
- start = column.default.start or 1
-
- colspec += " IDENTITY(%s,%s)" % (
- start,
- column.default.increment or 1,
- )
- elif (
+ if (
column is column.table._autoincrement_column
or column.autoincrement is True
):
- start = column.dialect_options["mssql"]["identity_start"]
- increment = column.dialect_options["mssql"]["identity_increment"]
- colspec += " IDENTITY(%s,%s)" % (start, increment)
+ if not isinstance(column.default, Sequence):
+ start = column.dialect_options["mssql"]["identity_start"]
+ increment = column.dialect_options["mssql"][
+ "identity_increment"
+ ]
+ colspec += " IDENTITY(%s,%s)" % (start, increment)
else:
default = self.get_column_default_string(column)
if default is not None:
text += " PERSISTED"
return text
+ def visit_create_sequence(self, create, **kw):
+
+ if create.element.data_type is not None:
+ data_type = create.element.data_type
+ else:
+ data_type = to_instance(self.dialect.sequence_default_column_type)
+
+ prefix = " AS %s" % self.type_compiler.process(data_type)
+ return super(MSDDLCompiler, self).visit_create_sequence(
+ create, prefix=prefix, **kw
+ )
+
class MSIdentifierPreparer(compiler.IdentifierPreparer):
reserved_words = RESERVED_WORDS
ischema_names = ischema_names
+ supports_sequences = True
+ # T-SQL's actual default is BIGINT
+ sequence_default_column_type = INTEGER
+ # T-SQL's actual default is -9223372036854775808
+ default_sequence_base = 1
+
supports_native_boolean = False
non_native_boolean_check_constraint = False
supports_unicode_binds = True
return c.first() is not None
+ @_db_plus_owner
+ def has_sequence(self, connection, sequencename, dbname, owner, schema):
+ sequences = ischema.sequences
+
+ s = sql.select([sequences.c.sequence_name]).where(
+ sequences.c.sequence_name == sequencename
+ )
+
+ if owner:
+ s = s.where(sequences.c.sequence_schema == owner)
+
+ c = connection.execute(s)
+
+ return c.first() is not None
+
@reflection.cache
def get_schema_names(self, connection, **kw):
s = sql.select(
Column("definition", CoerceUnicode),
schema="sys",
)
+
+sequences = Table(
+ "SEQUENCES",
+ ischema,
+ Column("SEQUENCE_CATALOG", CoerceUnicode, key="sequence_catalog"),
+ Column("SEQUENCE_SCHEMA", CoerceUnicode, key="sequence_schema"),
+ Column("SEQUENCE_NAME", CoerceUnicode, key="sequence_name"),
+ schema="INFORMATION_SCHEMA",
+)
supports_sequences = True
sequences_optional = True
+ sequence_default_column_type = INTEGER
preexecute_autoincrement_sequences = True
postfetch_lastrowid = False
from . import interfaces
from .. import event
from .. import exc
+from .. import Integer
from .. import pool
from .. import processors
from .. import types as sqltypes
# not cx_oracle.
execute_sequence_format = tuple
+ sequence_default_column_type = Integer
+
supports_views = True
supports_sequences = False
sequences_optional = False
drop.element, use_table=True
)
- def visit_create_sequence(self, create, **kw):
+ def visit_create_sequence(self, create, prefix=None, **kw):
text = "CREATE SEQUENCE %s" % self.preparer.format_sequence(
create.element
)
+ if prefix:
+ text += prefix
if create.element.increment is not None:
text += " INCREMENT BY %d" % create.element.increment
- if create.element.start is not None:
- text += " START WITH %d" % create.element.start
+ if create.element.start is None:
+ create.element.start = self.dialect.default_sequence_base
+ text += " START WITH %d" % create.element.start
if create.element.minvalue is not None:
text += " MINVALUE %d" % create.element.minvalue
if create.element.maxvalue is not None:
from .elements import quoted_name
from .elements import TextClause
from .selectable import TableClause
+from .type_api import to_instance
from .visitors import InternalTraversal
from .. import event
from .. import exc
cycle=None,
cache=None,
order=None,
+ data_type=None,
):
"""Construct a :class:`.IdentityOptions` object.
sequence which are calculated in advance.
:param order: optional boolean value; if true, renders the
ORDER keyword.
- name.
+ :param data_type: The type to be returned by the sequence.
+
"""
self.start = start
self.increment = increment
self.cycle = cycle
self.cache = cache
self.order = order
+ if data_type is not None:
+ self.data_type = to_instance(data_type)
+ else:
+ self.data_type = None
class Sequence(IdentityOptions, roles.StatementRole, DefaultGenerator):
schema=None,
cache=None,
order=None,
+ data_type=None,
optional=False,
quote=None,
metadata=None,
"""Construct a :class:`.Sequence` object.
:param name: the name of the sequence.
+
:param start: the starting index of the sequence. This value is
used when the CREATE SEQUENCE command is emitted to the database
as the value of the "START WITH" clause. If ``None``, the
.. versionadded:: 1.1.12
+ :param data_type: The type to be returned by the sequence, for
+ dialects that allow us to choose between INTEGER, BIGINT, etc.
+ (e.g., mssql).
+
+ .. versionadded:: 1.4.0
+
:param optional: boolean value, when ``True``, indicates that this
:class:`.Sequence` object only needs to be explicitly generated
on backends that don't provide another way to generate primary
cycle=cycle,
cache=cache,
order=order,
+ data_type=data_type,
)
self.name = quoted_name(name, quote)
self.optional = optional
)
)
+ # TODO: need to do a get_sequences and drop them also after tables
+
@post
def _reverse_topological(options, file_config):
import sys
from . import exclusions
+from . import fails_on_everything_except
from .. import util
lambda config: not config.db.dialect.supports_is_distinct_from,
"driver doesn't support an IS DISTINCT FROM construct",
)
+
+ @property
+ def emulated_lastrowid_even_with_sequences(self):
+ """"target dialect retrieves cursor.lastrowid or an equivalent
+ after an insert() construct executes, even if the table has a
+ Sequence on it..
+ """
+ return fails_on_everything_except(
+ "mysql", "sqlite+pysqlite", "sqlite+pysqlcipher", "sybase",
+ )
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
+ eq_(
+ row, (conn.dialect.default_sequence_base, "some data",),
+ )
def test_autoincrement_on_insert(self, connection):
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(row, (conn.engine.dialect.default_sequence_base, "some data"))
+ eq_(
+ row, (conn.dialect.default_sequence_base, "some data",),
+ )
@classmethod
def define_tables(cls, metadata):
Table(
"seq_pk",
metadata,
- Column("id", Integer, Sequence("tab_id_seq"), primary_key=True),
+ Column("id", Integer, Sequence("tab_id_seq"), primary_key=True,),
Column("data", String(50)),
)
Column(
"id",
Integer,
- Sequence("tab_id_seq", optional=True),
+ Sequence("tab_id_seq", data_type=Integer, optional=True),
primary_key=True,
),
Column("data", String(50)),
def test_insert_lastrowid(self, connection):
r = connection.execute(self.tables.seq_pk.insert(), data="some data")
- eq_(r.inserted_primary_key, [1])
+ eq_(r.inserted_primary_key, [testing.db.dialect.default_sequence_base])
def test_nextval_direct(self, connection):
r = connection.execute(self.tables.seq_pk.c.id.default)
- eq_(r, 1)
+ eq_(r, testing.db.dialect.default_sequence_base)
@requirements.sequences_optional
def test_optional_seq(self, connection):
def _assert_round_trip(self, table, conn):
row = conn.execute(table.select()).first()
- eq_(row, (1, "some data"))
+ eq_(row, (testing.db.dialect.default_sequence_base, "some data"))
class SequenceCompilerTest(testing.AssertsCompiledSQL, fixtures.TestBase):
from sqlalchemy import PrimaryKeyConstraint
from sqlalchemy import schema
from sqlalchemy import select
-from sqlalchemy import Sequence
from sqlalchemy import sql
from sqlalchemy import String
from sqlalchemy import Table
"PRIMARY KEY (id))",
)
- def test_sequence_start_0(self):
- metadata = MetaData()
- tbl = Table(
- "test",
- metadata,
- Column("id", Integer, Sequence("", 0), primary_key=True),
- )
- with testing.expect_deprecated(
- "Use of Sequence with SQL Server in order to affect "
- ):
- self.assert_compile(
- schema.CreateTable(tbl),
- "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(0,1), "
- "PRIMARY KEY (id))",
- )
-
- def test_sequence_non_primary_key(self):
- metadata = MetaData()
- tbl = Table(
- "test",
- metadata,
- Column("id", Integer, Sequence("", start=5), primary_key=False),
- )
- with testing.expect_deprecated(
- "Use of Sequence with SQL Server in order to affect "
- ):
- self.assert_compile(
- schema.CreateTable(tbl),
- "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))",
- )
-
- def test_sequence_ignore_nullability(self):
- metadata = MetaData()
- tbl = Table(
- "test",
- metadata,
- Column("id", Integer, Sequence("", start=5), nullable=True),
- )
- with testing.expect_deprecated(
- "Use of Sequence with SQL Server in order to affect "
- ):
- self.assert_compile(
- schema.CreateTable(tbl),
- "CREATE TABLE test (id INTEGER NOT NULL IDENTITY(5,1))",
- )
-
def test_table_pkc_clustering(self):
metadata = MetaData()
tbl = Table(
@testing.requires.mssql_freetds
@testing.requires.python2
@testing.provide_metadata
- def test_convert_unicode(self):
+ def test_convert_unicode(self, connection):
meta = self.metadata
t1 = Table(
"unitest_table",
Column("id", Integer, primary_key=True),
Column("descr", mssql.MSText()),
)
- meta.create_all()
- with testing.db.connect() as con:
- con.execute(
- ue(
- "insert into unitest_table values ('abc \xc3\xa9 def')"
- ).encode("UTF-8")
+ meta.create_all(connection)
+ connection.execute(
+ ue("insert into unitest_table values ('abc \xc3\xa9 def')").encode(
+ "UTF-8"
)
- r = con.execute(t1.select()).first()
- assert isinstance(r[1], util.text_type), (
- "%s is %s instead of unicode, working on %s"
- % (r[1], type(r[1]), meta.bind)
- )
- eq_(r[1], util.ue("abc \xc3\xa9 def"))
+ )
+ r = connection.execute(t1.select()).first()
+ assert isinstance(r[1], util.text_type), (
+ "%s is %s instead of unicode, working on %s"
+ % (r[1], type(r[1]), meta.bind)
+ )
+ eq_(r[1], util.ue("abc \xc3\xa9 def"))
class QueryTest(testing.AssertsExecutionResults, fixtures.TestBase):
__only_on__ = "mssql"
__backend__ = True
- def test_fetchid_trigger(self):
+ @testing.provide_metadata
+ def test_fetchid_trigger(self, connection):
+ # TODO: investigate test hang on mssql when connection fixture is used
"""
Verify identity return value on inserting to a trigger table.
with the init parameter 'implicit_returning = False'.
"""
- # todo: this same test needs to be tried in a multithreaded context
+ # TODO: this same test needs to be tried in a multithreaded context
# with multiple threads inserting to the same table.
- # todo: check whether this error also occurs with clients other
+ # TODO: check whether this error also occurs with clients other
# than the SQL Server Native Client. Maybe an assert_raises
# test should be written.
- meta = MetaData(testing.db)
+ meta = self.metadata
t1 = Table(
"t1",
meta,
Column("id", Integer, mssql_identity_start=200, primary_key=True),
Column("descr", String(200)),
)
- meta.create_all()
- con = testing.db.connect()
- con.exec_driver_sql(
- """create trigger paj on t1 for insert as
- insert into t2 (descr) select descr from inserted"""
+
+ event.listen(
+ meta,
+ "after_create",
+ DDL(
+ "create trigger paj on t1 for insert as "
+ "insert into t2 (descr) select descr from inserted"
+ ),
)
- try:
- tr = con.begin()
- r = con.execute(t2.insert(), descr="hello")
- self.assert_(r.inserted_primary_key == [200])
- r = con.execute(t1.insert(), descr="hello")
- self.assert_(r.inserted_primary_key == [100])
+ # this DROP is not actually needed since SQL Server transactional
+ # DDL is reverting it with the connection fixture. however,
+ # since we can use "if exists" it's safe to have this here in
+ # case things change.
+ event.listen(
+ meta, "before_drop", DDL("""drop trigger if exists paj""")
+ )
- finally:
- tr.commit()
- con.exec_driver_sql("""drop trigger paj""")
- meta.drop_all()
+ # seems to work with all linux drivers + backend. not sure
+ # if windows drivers / servers have different behavior here.
+ meta.create_all(connection)
+ r = connection.execute(t2.insert(), descr="hello")
+ self.assert_(r.inserted_primary_key == [200])
+ r = connection.execute(t1.insert(), descr="hello")
+ self.assert_(r.inserted_primary_key == [100])
@testing.provide_metadata
def _test_disable_scope_identity(self):
options=dict(legacy_schema_aliasing=False)
)
meta.bind = eng
- con = eng.connect()
- con.exec_driver_sql("create schema paj")
+ conn = eng.connect()
+ conn.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
tbl = Table(
"test", meta, Column("id", Integer, primary_key=True), schema="paj"
)
- tbl.create()
- tbl.insert().execute({"id": 1})
- eq_(tbl.select().scalar(), 1)
+ tbl.create(conn)
+ conn.execute(tbl.insert(), {"id": 1})
+ eq_(conn.scalar(tbl.select()), 1)
@testing.provide_metadata
def test_insertid_schema_legacy(self):
meta = self.metadata
eng = engines.testing_engine(options=dict(legacy_schema_aliasing=True))
meta.bind = eng
- con = eng.connect()
- con.exec_driver_sql("create schema paj")
+ conn = eng.connect()
+ conn.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
eq_(tbl.select().scalar(), 1)
@testing.provide_metadata
- def test_returning_no_autoinc(self):
+ def test_returning_no_autoinc(self, connection):
meta = self.metadata
table = Table(
"t1",
Column("id", Integer, primary_key=True),
Column("data", String(50)),
)
- table.create()
- result = (
+ table.create(connection)
+ result = connection.execute(
table.insert()
.values(id=1, data=func.lower("SomeString"))
.returning(table.c.id, table.c.data)
- .execute()
)
eq_(result.fetchall(), [(1, "somestring")])
options=dict(legacy_schema_aliasing=False)
)
meta.bind = eng
- con = eng.connect()
- con.exec_driver_sql("create schema paj")
+ conn = eng.connect()
+ conn.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
tbl = Table(
"test", meta, Column("id", Integer, primary_key=True), schema="paj"
)
- tbl.create()
- tbl.insert().execute({"id": 1})
- eq_(tbl.select().scalar(), 1)
- tbl.delete(tbl.c.id == 1).execute()
- eq_(tbl.select().scalar(), None)
+ tbl.create(conn)
+ conn.execute(tbl.insert(), {"id": 1})
+ eq_(conn.scalar(tbl.select()), 1)
+ conn.execute(tbl.delete(tbl.c.id == 1))
+ eq_(conn.scalar(tbl.select()), None)
@testing.provide_metadata
def test_delete_schema_legacy(self):
meta = self.metadata
eng = engines.testing_engine(options=dict(legacy_schema_aliasing=True))
meta.bind = eng
- con = eng.connect()
- con.exec_driver_sql("create schema paj")
+ conn = eng.connect()
+ conn.exec_driver_sql("create schema paj")
@event.listens_for(meta, "after_drop")
def cleanup(target, connection, **kw):
tbl = Table(
"test", meta, Column("id", Integer, primary_key=True), schema="paj"
)
- tbl.create()
- tbl.insert().execute({"id": 1})
- eq_(tbl.select().scalar(), 1)
- tbl.delete(tbl.c.id == 1).execute()
- eq_(tbl.select().scalar(), None)
+ tbl.create(conn)
+ conn.execute(tbl.insert(), {"id": 1})
+ eq_(conn.scalar(tbl.select()), 1)
+ conn.execute(tbl.delete(tbl.c.id == 1))
+ eq_(conn.scalar(tbl.select()), None)
@testing.provide_metadata
- def test_insertid_reserved(self):
+ def test_insertid_reserved(self, connection):
meta = self.metadata
table = Table("select", meta, Column("col", Integer, primary_key=True))
- table.create()
+ table.create(connection)
- table.insert().execute(col=7)
- eq_(table.select().scalar(), 7)
+ connection.execute(table.insert(), {"col": 7})
+ eq_(connection.scalar(table.select()), 7)
class Foo(object):
--- /dev/null
+from decimal import Decimal
+
+from sqlalchemy import BIGINT
+from sqlalchemy import Column
+from sqlalchemy import DECIMAL
+from sqlalchemy import Integer
+from sqlalchemy import Sequence
+from sqlalchemy import String
+from sqlalchemy import Table
+from sqlalchemy.future import select
+from sqlalchemy.testing import eq_
+from sqlalchemy.testing import fixtures
+
+
+class SequenceTest(fixtures.TablesTest):
+ __only_on__ = "mssql"
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "int_seq_t",
+ metadata,
+ Column("id", Integer, default=Sequence("int_seq")),
+ Column("txt", String(50)),
+ )
+
+ Table(
+ "bigint_seq_t",
+ metadata,
+ Column(
+ "id",
+ BIGINT,
+ default=Sequence(
+ "bigint_seq", data_type=BIGINT, start=3000000000
+ ),
+ ),
+ Column("txt", String(50)),
+ )
+
+ Table(
+ "decimal_seq_t",
+ metadata,
+ Column(
+ "id",
+ DECIMAL(10, 0),
+ default=Sequence(
+ "decimal_seq", data_type=DECIMAL(10, 0), start=3000000000,
+ ),
+ ),
+ Column("txt", String(50)),
+ )
+
+ def test_int_seq(self, connection):
+ t = self.tables.int_seq_t
+ connection.execute(t.insert({"txt": "int_seq test"}))
+ result = connection.scalar(select(t.c.id))
+ eq_(result, 1)
+
+ def test_bigint_seq(self, connection):
+ t = self.tables.bigint_seq_t
+ connection.execute(t.insert({"txt": "bigint_seq test"}))
+ result = connection.scalar(select(t.c.id))
+ eq_(result, 3000000000)
+
+ def test_decimal_seq(self, connection):
+ t = self.tables.decimal_seq_t
+ connection.execute(t.insert({"txt": "decimal_seq test"}))
+ result = connection.scalar(select(t.c.id))
+ eq_(result, Decimal("3000000000"))
metadata,
Column(
"id",
- Integer,
+ testing.db.dialect.sequence_default_column_type,
Sequence("numeric_id_seq", optional=True),
primary_key=True,
+ autoincrement=False,
),
Column(
"numericcol", Numeric(precision=38, scale=20, asdecimal=True)
metadata,
Column(
"id",
- Integer,
+ testing.db.dialect.sequence_default_column_type,
Sequence("numeric_id_seq", optional=True),
primary_key=True,
),
"testtable",
metadata,
Column(
- "pk", Integer, Sequence("testtable_pk_seq"), primary_key=True
+ "pk", Integer, Sequence("testtable_pk_seq"), primary_key=True,
),
)
t = Table(
"t",
self.metadata,
- Column("x", Integer, Sequence("t_id_seq"), primary_key=True),
+ Column(
+ "x",
+ testing.db.dialect.sequence_default_column_type,
+ Sequence("t_id_seq"),
+ primary_key=True,
+ ),
implicit_returning=False,
)
self.metadata.create_all(engine)
seq.create(testing.db)
try:
sess = create_session(bind=testing.db)
- eq_(sess.execute(seq), 1)
+ eq_(sess.execute(seq), testing.db.dialect.default_sequence_base)
finally:
seq.drop(testing.db)
@property
def supports_for_update_of(self):
return only_if(lambda config: config.db.dialect.supports_for_update_of)
+
+ @property
+ def sequences_in_other_clauses(self):
+ """sequences allowed in WHERE, GROUP BY, HAVING, etc."""
+ return skip_if(["mssql", "oracle"])
+
+ @property
+ def supports_lastrowid_for_expressions(self):
+ """sequences allowed in WHERE, GROUP BY, HAVING, etc."""
+ return skip_if("mssql")
+
+ @property
+ def supports_sequence_for_autoincrement_column(self):
+ """for mssql, autoincrement means IDENTITY, not sequence"""
+ return skip_if("mssql")
self.assert_compile(
schema.CreateSequence(s1),
- "CREATE SEQUENCE [SCHEMA__none].s1",
+ "CREATE SEQUENCE [SCHEMA__none].s1 START WITH 1",
schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateSequence(s2),
- "CREATE SEQUENCE [SCHEMA_foo].s2",
+ "CREATE SEQUENCE [SCHEMA_foo].s2 START WITH 1",
schema_translate_map=schema_translate_map,
)
self.assert_compile(
schema.CreateSequence(s3),
- "CREATE SEQUENCE [SCHEMA_bar].s3",
+ "CREATE SEQUENCE [SCHEMA_bar].s3 START WITH 1",
schema_translate_map=schema_translate_map,
)
metadata,
Column(
"id",
- Integer,
+ testing.db.dialect.sequence_default_column_type,
Sequence("ai_id_seq", optional=True),
primary_key=True,
),
self.assert_(last not in ids)
ids.add(last)
- eq_(ids, set([1, 2, 3, 4]))
+ eq_(
+ ids,
+ set(
+ range(
+ testing.db.dialect.default_sequence_base,
+ testing.db.dialect.default_sequence_base + 4,
+ )
+ ),
+ )
eq_(
list(bind.execute(aitable.select().order_by(aitable.c.id))),
- [(1, 1, None), (2, None, "row 2"), (3, 3, "row 3"), (4, 4, None)],
+ [
+ (testing.db.dialect.default_sequence_base, 1, None),
+ (testing.db.dialect.default_sequence_base + 1, None, "row 2"),
+ (testing.db.dialect.default_sequence_base + 2, 3, "row 3"),
+ (testing.db.dialect.default_sequence_base + 3, 4, None),
+ ],
)
def test_autoincrement_autocommit(self):
)
return dataset_no_autoinc
+ @testing.skip_if(testing.requires.sequences)
def test_col_w_optional_sequence_non_autoinc_no_firing(
self, dataset_no_autoinc, connection
):
@classmethod
def setup_class(cls):
class MyInteger(TypeDecorator):
- impl = Integer
+ impl = testing.db.dialect.sequence_default_column_type
def process_bind_param(self, value, dialect):
if value is None:
t.create(conn)
r = conn.execute(t.insert().values(data=5))
+ expected_result = "INT_" + str(
+ testing.db.dialect.default_sequence_base
+ if (arg and isinstance(arg[0], Sequence))
+ else 1
+ )
+
# we don't pre-fetch 'server_default'.
if "server_default" in kw and (
not testing.db.dialect.implicit_returning
):
eq_(r.inserted_primary_key, [None])
else:
- eq_(r.inserted_primary_key, ["INT_1"])
+ eq_(
+ r.inserted_primary_key, [expected_result],
+ )
- eq_(conn.execute(t.select()).first(), ("INT_1", 5))
+ eq_(
+ conn.execute(t.select()).first(), (expected_result, 5),
+ )
def test_plain(self):
# among other things, tests that autoincrement
meta,
Column(
"id",
- Integer,
+ testing.db.dialect.sequence_default_column_type,
Sequence("t1idseq", optional=True),
primary_key=True,
),
meta,
Column(
"id",
- Integer,
+ testing.db.dialect.sequence_default_column_type,
Sequence("t2idseq", optional=True),
primary_key=True,
),
Table(
"foo",
metadata,
- Column("id", Integer, Sequence("t_id_seq"), primary_key=True),
+ Column(
+ "id",
+ testing.db.dialect.sequence_default_column_type,
+ Sequence("t_id_seq"),
+ primary_key=True,
+ ),
Column("data", String(50)),
Column("x", Integer),
)
t.insert().values(
id=func.next_value(Sequence("t_id_seq")), data="data", x=5
),
- (1, "data", 5),
+ (testing.db.dialect.default_sequence_base, "data", 5),
)
def test_uppercase(self):
t = self.tables.foo
self._test(
t.insert().values(data="data", x=5),
- (1, "data", 5),
- inserted_primary_key=[1],
+ (testing.db.dialect.default_sequence_base, "data", 5),
+ inserted_primary_key=[testing.db.dialect.default_sequence_base],
)
def test_uppercase_direct_params(self):
returning=(1, 5),
)
- @testing.fails_on(
- "mssql", "lowercase table doesn't support identity insert disable"
- )
def test_direct_params(self):
t = self._fixture()
self._test(
inserted_primary_key=[],
)
- @testing.fails_on(
- "mssql", "lowercase table doesn't support identity insert disable"
- )
@testing.requires.returning
def test_direct_params_returning(self):
t = self._fixture()
self._test(
t.insert().values(id=1, data="data", x=5).returning(t.c.id, t.c.x),
- (1, "data", 5),
- returning=(1, 5),
+ (testing.db.dialect.default_sequence_base, "data", 5),
+ returning=(testing.db.dialect.default_sequence_base, 5),
)
+ @testing.requires.emulated_lastrowid_even_with_sequences
@testing.requires.emulated_lastrowid
def test_implicit_pk(self):
t = self._fixture()
self._test(
t.insert().values(data="data", x=5),
- (1, "data", 5),
+ (testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=[],
)
+ @testing.requires.emulated_lastrowid_even_with_sequences
@testing.requires.emulated_lastrowid
def test_implicit_pk_multi_rows(self):
t = self._fixture()
[(1, "d1", 5), (2, "d2", 6), (3, "d3", 7)],
)
+ @testing.requires.emulated_lastrowid_even_with_sequences
@testing.requires.emulated_lastrowid
def test_implicit_pk_inline(self):
t = self._fixture()
self._test(
t.insert().inline().values(data="data", x=5),
- (1, "data", 5),
+ (testing.db.dialect.default_sequence_base, "data", 5),
inserted_primary_key=[],
)
table = Table(
"tables",
meta,
- Column("id", Integer, seq, primary_key=True),
+ Column("id", Integer, seq, primary_key=True,),
Column("data", String(50)),
)
with testing.db.connect() as conn:
r = connection.execute(
table.insert().values(data="hi").returning(table.c.id)
)
- assert r.first() == (1,)
- assert connection.execute(seq) == 2
+ eq_(r.first(), tuple([testing.db.dialect.default_sequence_base]))
+ eq_(
+ connection.execute(seq),
+ testing.db.dialect.default_sequence_base + 1,
+ )
class KeyReturningTest(fixtures.TestBase, AssertsExecutionResults):
def test_create_drop_ddl(self):
self.assert_compile(
- CreateSequence(Sequence("foo_seq")), "CREATE SEQUENCE foo_seq"
+ CreateSequence(Sequence("foo_seq")),
+ "CREATE SEQUENCE foo_seq START WITH 1",
)
self.assert_compile(
self.assert_compile(
CreateSequence(Sequence("foo_seq", increment=2)),
- "CREATE SEQUENCE foo_seq INCREMENT BY 2",
+ "CREATE SEQUENCE foo_seq INCREMENT BY 2 START WITH 1",
)
self.assert_compile(
self.assert_compile(
CreateSequence(Sequence("foo_seq", cache=1000, order=True)),
- "CREATE SEQUENCE foo_seq CACHE 1000 ORDER",
+ "CREATE SEQUENCE foo_seq START WITH 1 CACHE 1000 ORDER",
)
self.assert_compile(
CreateSequence(Sequence("foo_seq", order=True)),
- "CREATE SEQUENCE foo_seq ORDER",
+ "CREATE SEQUENCE foo_seq START WITH 1 ORDER",
)
self.assert_compile(
"""asserts return of next_value is an int"""
assert isinstance(ret, util.int_types)
- assert ret > 0
+ assert ret >= testing.db.dialect.default_sequence_base
def test_implicit_connectionless(self):
s = Sequence("my_sequence", metadata=MetaData(testing.db))
"""asserts return of next_value is an int"""
assert isinstance(ret, util.int_types)
- assert ret > 0
+ assert ret >= testing.db.dialect.default_sequence_base
def test_execute(self, connection):
s = Sequence("my_sequence")
s = Sequence("my_sequence")
self._assert_seq_result(connection.scalar(select([s.next_value()])))
- @testing.fails_on("oracle", "ORA-02287: sequence number not allowed here")
+ @testing.requires.sequences_in_other_clauses
@testing.provide_metadata
def test_func_embedded_whereclause(self, connection):
"""test can use next_value() in whereclause"""
"""test can use next_value() in values() of _ValuesBase"""
metadata = self.metadata
- t1 = Table("t", metadata, Column("x", Integer))
+ t1 = Table("t", metadata, Column("x", Integer),)
t1.create(testing.db)
s = Sequence("my_sequence")
connection.execute(t1.insert().values(x=s.next_value()))
self._assert_seq_result(connection.scalar(t1.select()))
- @testing.requires.supports_lastrowid
+ @testing.requires.no_lastrowid_support
@testing.provide_metadata
- def test_inserted_pk_no_returning_w_lastrowid(self):
- """test inserted_primary_key contains the pk when
- pk_col=next_value(), lastrowid is supported."""
+ def test_inserted_pk_no_returning_no_lastrowid(self):
+ """test inserted_primary_key contains [None] when
+ pk_col=next_value(), implicit returning is not used."""
metadata = self.metadata
t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
t1.create(testing.db)
+
e = engines.testing_engine(options={"implicit_returning": False})
s = Sequence("my_sequence")
-
with e.connect() as conn:
r = conn.execute(t1.insert().values(x=s.next_value()))
- self._assert_seq_result(r.inserted_primary_key[0])
+ eq_(r.inserted_primary_key, [None])
- @testing.requires.no_lastrowid_support
+ @testing.requires.supports_lastrowid
+ @testing.requires.supports_lastrowid_for_expressions
@testing.provide_metadata
- def test_inserted_pk_no_returning_no_lastrowid(self):
- """test inserted_primary_key contains [None] when
- pk_col=next_value(), implicit returning is not used."""
+ def test_inserted_pk_no_returning_w_lastrowid(self):
+ """test inserted_primary_key contains the pk when
+ pk_col=next_value(), lastrowid is supported."""
metadata = self.metadata
- t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+ t1 = Table("t", metadata, Column("x", Integer, primary_key=True,),)
t1.create(testing.db)
-
e = engines.testing_engine(options={"implicit_returning": False})
s = Sequence("my_sequence")
+
with e.connect() as conn:
r = conn.execute(t1.insert().values(x=s.next_value()))
- eq_(r.inserted_primary_key, [None])
+ self._assert_seq_result(r.inserted_primary_key[0])
@testing.requires.returning
@testing.provide_metadata
metadata = self.metadata
s = Sequence("my_sequence")
- t1 = Table("t", metadata, Column("x", Integer, primary_key=True))
+ t1 = Table("t", metadata, Column("x", Integer, primary_key=True,),)
t1.create(testing.db)
e = engines.testing_engine(options={"implicit_returning": True})
try:
with testing.db.connect() as conn:
values = [conn.execute(seq) for i in range(3)]
- start = seq.start or 1
+ start = seq.start or testing.db.dialect.default_sequence_base
inc = seq.increment or 1
eq_(values, list(range(start, start + inc * 3, inc)))
assert not self._has_sequence(connection, "s2")
@testing.requires.returning
+ @testing.requires.supports_sequence_for_autoincrement_column
@testing.provide_metadata
def test_freestanding_sequence_via_autoinc(self, connection):
t = Table(
"cartitems",
metadata,
Column(
- "cart_id", Integer, Sequence("cart_id_seq"), primary_key=True
+ "cart_id",
+ Integer,
+ Sequence("cart_id_seq"),
+ primary_key=True,
+ autoincrement=False,
),
Column("description", String(40)),
Column("createdate", sa.DateTime()),
Table(
"Manager",
metadata,
- Column("obj_id", Integer, Sequence("obj_id_seq")),
+ Column("obj_id", Integer, Sequence("obj_id_seq"),),
Column("name", String(128)),
Column(
"id",
connection.execute(cartitems.insert(), dict(description="there"))
r = connection.execute(cartitems.insert(), dict(description="lala"))
- eq_(r.inserted_primary_key[0], 3)
+ expected = 2 + testing.db.dialect.default_sequence_base
+ eq_(r.inserted_primary_key[0], expected)
eq_(
connection.scalar(
cartitems.c.description == "lala"
),
),
- 3,
+ expected,
)
def test_seq_nonpk(self):
conn.execute(
sometable.insert(), [{"name": "name3"}, {"name": "name4"}]
)
+
+ dsb = testing.db.dialect.default_sequence_base
eq_(
list(
conn.execute(sometable.select().order_by(sometable.c.id))
),
[
- (1, "somename", 1),
- (2, "someother", 2),
- (3, "name3", 3),
- (4, "name4", 4),
+ (dsb, "somename", dsb,),
+ (dsb + 1, "someother", dsb + 1,),
+ (dsb + 2, "name3", dsb + 2,),
+ (dsb + 3, "name4", dsb + 3,),
],
)