--- /dev/null
+.. change::
+ :tags: feature, mssql
+ :tickets: 4086
+
+ Added a new :class:`.mssql.TIMESTAMP` datatype, that
+ correctly acts like a binary datatype for SQL Server
+ rather than a datetime type, as SQL Server breaks the
+ SQL standard here. Also added :class:`.mssql.ROWVERSION`,
+ as the "TIMESTAMP" type in SQL Server is deprecated in
+ favor of ROWVERSION.
.. autoclass:: REAL
:members: __init__
+.. autoclass:: ROWVERSION
+ :members: __init__
.. autoclass:: SMALLDATETIME
:members: __init__
:members: __init__
+.. autoclass:: TIMESTAMP
+ :members: __init__
+
.. autoclass:: TINYINT
:members: __init__
INTEGER, BIGINT, SMALLINT, TINYINT, VARCHAR, NVARCHAR, CHAR, \
NCHAR, TEXT, NTEXT, DECIMAL, NUMERIC, FLOAT, DATETIME,\
DATETIME2, DATETIMEOFFSET, DATE, TIME, SMALLDATETIME, \
- BINARY, VARBINARY, BIT, REAL, IMAGE, TIMESTAMP,\
+ BINARY, VARBINARY, BIT, REAL, IMAGE, TIMESTAMP, ROWVERSION, \
MONEY, SMALLMONEY, UNIQUEIDENTIFIER, SQL_VARIANT, dialect
'INTEGER', 'BIGINT', 'SMALLINT', 'TINYINT', 'VARCHAR', 'NVARCHAR', 'CHAR',
'NCHAR', 'TEXT', 'NTEXT', 'DECIMAL', 'NUMERIC', 'FLOAT', 'DATETIME',
'DATETIME2', 'DATETIMEOFFSET', 'DATE', 'TIME', 'SMALLDATETIME',
- 'BINARY', 'VARBINARY', 'BIT', 'REAL', 'IMAGE', 'TIMESTAMP',
+ 'BINARY', 'VARBINARY', 'BIT', 'REAL', 'IMAGE', 'TIMESTAMP', 'ROWVERSION',
'MONEY', 'SMALLMONEY', 'UNIQUEIDENTIFIER', 'SQL_VARIANT', 'dialect'
)
"""
+import codecs
import datetime
import operator
import re
from ...engine import reflection, default
from ... import types as sqltypes
from ...types import INTEGER, BIGINT, SMALLINT, DECIMAL, NUMERIC, \
- FLOAT, TIMESTAMP, DATETIME, DATE, BINARY,\
+ FLOAT, DATETIME, DATE, BINARY,\
TEXT, VARCHAR, NVARCHAR, CHAR, NCHAR
super(_StringType, self).__init__(collation=collation)
+class TIMESTAMP(sqltypes._Binary):
+ """Implement the SQL Server TIMESTAMP type.
+
+ Note this is **completely different** than the SQL Standard
+ TIMESTAMP type, which is not supported by SQL Server. It
+ is a read-only datatype that does not support INSERT of values.
+
+ .. versionadded:: 1.2
+
+ .. seealso::
+
+ :class:`.mssql.ROWVERSION`
+
+ """
+
+ __visit_name__ = 'TIMESTAMP'
+
+ # expected by _Binary to be present
+ length = None
+
+ def __init__(self, convert_int=False):
+ """Construct a TIMESTAMP or ROWVERSION type.
+
+ :param convert_int: if True, binary integer values will
+ be converted to integers on read.
+
+ .. versionadded:: 1.2
+
+ """
+ self.convert_int = convert_int
+
+ def result_processor(self, dialect, coltype):
+ super_ = super(TIMESTAMP, self).result_processor(dialect, coltype)
+ if self.convert_int:
+ def process(value):
+ value = super_(value)
+ if value is not None:
+ # https://stackoverflow.com/a/30403242/34549
+ value = int(codecs.encode(value, 'hex'), 16)
+ return value
+ return process
+ else:
+ return super_
+
+
+class ROWVERSION(TIMESTAMP):
+ """Implement the SQL Server ROWVERSION type.
+
+ The ROWVERSION datatype is a SQL Server synonym for the TIMESTAMP
+ datatype, however current SQL Server documentation suggests using
+ ROWVERSION for new datatypes going forward.
+
+ The ROWVERSION datatype does **not** reflect (e.g. introspect) from the
+ database as itself; the returned datatype will be
+ :class:`.mssql.TIMESTAMP`.
+
+ This is a read-only datatype that does not support INSERT of values.
+
+ .. versionadded:: 1.2
+
+ .. seealso::
+
+ :class:`.mssql.TIMESTAMP`
+
+ """
+
+ __visit_name__ = 'ROWVERSION'
+
+
class NTEXT(sqltypes.UnicodeText):
"""MSSQL NTEXT type, for variable-length unicode text up to 2^30
else:
return "TIME"
+ def visit_TIMESTAMP(self, type_, **kw):
+ return "TIMESTAMP"
+
+ def visit_ROWVERSION(self, type_, **kw):
+ return "ROWVERSION"
+
def visit_DATETIME2(self, type_, **kw):
precision = getattr(type_, 'precision', None)
if precision is not None:
import datetime
import os
from sqlalchemy import Table, Column, MetaData, Float, \
- Integer, String, Boolean, TIMESTAMP, Sequence, Numeric, select, \
+ Integer, String, Boolean, Sequence, Numeric, select, \
Date, Time, DateTime, DefaultClause, PickleType, text, Text, \
UnicodeText, LargeBinary
+from sqlalchemy.dialects.mssql import TIMESTAMP, ROWVERSION
from sqlalchemy import types, schema
from sqlalchemy import util
from sqlalchemy.databases import mssql
from sqlalchemy.testing import emits_warning_on
import decimal
from sqlalchemy.util import b
+from sqlalchemy import inspect
+from sqlalchemy.sql import sqltypes
+import sqlalchemy as sa
+import codecs
class TimeTypeTest(fixtures.TestBase):
eq_(r, exp)
+class RowVersionTest(fixtures.TablesTest):
+ __only_on__ = 'mssql'
+ __backend__ = True
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ 'rv_t', metadata,
+ Column('data', String(50)),
+ Column('rv', ROWVERSION)
+ )
+
+ Table(
+ 'ts_t', metadata,
+ Column('data', String(50)),
+ Column('rv', TIMESTAMP)
+ )
+
+ def test_rowversion_reflection(self):
+ # ROWVERSION is only a synonym for TIMESTAMP
+ insp = inspect(testing.db)
+ assert isinstance(
+ insp.get_columns('rv_t')[1]['type'], TIMESTAMP
+ )
+
+ def test_timestamp_reflection(self):
+ insp = inspect(testing.db)
+ assert isinstance(
+ insp.get_columns('ts_t')[1]['type'], TIMESTAMP
+ )
+
+ def test_class_hierarchy(self):
+ """TIMESTAMP and ROWVERSION aren't datetime types, theyre binary."""
+
+ assert issubclass(TIMESTAMP, sqltypes._Binary)
+ assert issubclass(ROWVERSION, sqltypes._Binary)
+
+ def test_round_trip_ts(self):
+ self._test_round_trip('ts_t', TIMESTAMP, False)
+
+ def test_round_trip_rv(self):
+ self._test_round_trip('rv_t', ROWVERSION, False)
+
+ def test_round_trip_ts_int(self):
+ self._test_round_trip('ts_t', TIMESTAMP, True)
+
+ def test_round_trip_rv_int(self):
+ self._test_round_trip('rv_t', ROWVERSION, True)
+
+ def _test_round_trip(self, tab, cls, convert_int):
+ t = Table(
+ tab, MetaData(),
+ Column('data', String(50)),
+ Column('rv', cls(convert_int=convert_int))
+ )
+
+ with testing.db.connect() as conn:
+ conn.execute(t.insert().values(data='foo'))
+ last_ts_1 = conn.scalar("SELECT @@DBTS")
+
+ if convert_int:
+ last_ts_1 = int(codecs.encode(last_ts_1, 'hex'), 16)
+
+ eq_(conn.scalar(select([t.c.rv])), last_ts_1)
+
+ conn.execute(
+ t.update().values(data='bar').where(t.c.data == 'foo'))
+ last_ts_2 = conn.scalar("SELECT @@DBTS")
+ if convert_int:
+ last_ts_2 = int(codecs.encode(last_ts_2, 'hex'), 16)
+
+ eq_(conn.scalar(select([t.c.rv])), last_ts_2)
+
+ def test_cant_insert_rowvalue(self):
+ self._test_cant_insert(self.tables.rv_t)
+
+ def test_cant_insert_timestamp(self):
+ self._test_cant_insert(self.tables.ts_t)
+
+ def _test_cant_insert(self, tab):
+ with testing.db.connect() as conn:
+ assert_raises_message(
+ sa.exc.DBAPIError,
+ r".*Cannot insert an explicit value into a timestamp column.",
+ conn.execute,
+ tab.insert().values(data='ins', rv=b'000')
+ )
+
+
class TypeDDLTest(fixtures.TestBase):
def test_boolean(self):
"IMAGE"
)
- def test_timestamp(self):
- """Exercise TIMESTAMP column."""
-
- dialect = mssql.dialect()
-
- metadata = MetaData()
- spec, expected = (TIMESTAMP, 'TIMESTAMP')
- t = Table(
- 'mssql_ts', metadata,
- Column('id', Integer, primary_key=True),
- Column('t', spec, nullable=None))
- gen = dialect.ddl_compiler(dialect, schema.CreateTable(t))
- testing.eq_(gen.get_column_specification(t.c.t), "t %s" % expected)
- self.assert_(repr(t.c.t))
-
def test_money(self):
"""Exercise type specification for money types."""
Column('test3', sa.Text),
Column('test4', sa.Numeric(10, 2), nullable=False),
Column('test5', sa.Date),
- Column('test5_1', sa.TIMESTAMP),
Column('parent_user_id', sa.Integer,
sa.ForeignKey('%susers.user_id' % schema_prefix)),
Column('test6', sa.Date, nullable=False),