From 5b09d8179e6bd329829f7c196cb7c05f8fca20bd Mon Sep 17 00:00:00 2001 From: Michael Trier Date: Tue, 23 Dec 2008 04:08:13 +0000 Subject: [PATCH] Merge branch 'collation' --- CHANGES | 24 +- .../databases/information_schema.py | 1 + lib/sqlalchemy/databases/mssql.py | 342 ++++++++++++++++-- test/dialect/mssql.py | 314 ++++++++++++++++ 4 files changed, 645 insertions(+), 36 deletions(-) diff --git a/CHANGES b/CHANGES index 615d911207..955301f2dc 100644 --- a/CHANGES +++ b/CHANGES @@ -38,6 +38,9 @@ CHANGES classes used for both the dynamic collection and the queries built from it. +- mssql + - Added in the MSReal and MSNText types. + - bugfixes, behavioral changes - general - orm @@ -199,6 +202,10 @@ CHANGES new doc section "Custom Comparators". - mssql + - Added collation support through the use of a new collation + argument. This is supported on the following types: char, + nchar, varchar, nvarchar, text, ntext. [ticket:1248] + - Changes to the connection string parameters favor DSN as the default specification for pyodbc. See the mssql.py docstring for detailed usage instructions. @@ -206,14 +213,15 @@ CHANGES - Added experimental support of savepoints. It currently does not work fully with sessions. - - Support for three levels of column nullability: NULL, NOT NULL, - and the database's configured default. The default Column - configuration (nullable=True) will now generate NULL in the DDL. - Previously no specification was emitted and the database default - would take effect (usually NULL, but not always). To explicitly - request the database default, configure columns with - nullable=None and no specification will be emitted in DDL. This - is backwards incompatible behavior. [ticket:1243] + - Support for three levels of column nullability: NULL, + NOT NULL, and the database's configured default. The default + Column configuration (nullable=True) will now generate NULL + in the DDL. Previously no specification was emitted and the + database default would take effect (usually NULL, but not + always). To explicitly request the database default, + configure columns with nullable=None and no specification + will be emitted in DDL. This is backwards incompatible + behavior. [ticket:1243] - postgres - Calling alias.execute() in conjunction with diff --git a/lib/sqlalchemy/databases/information_schema.py b/lib/sqlalchemy/databases/information_schema.py index 659bfc33a0..b15082ac2e 100644 --- a/lib/sqlalchemy/databases/information_schema.py +++ b/lib/sqlalchemy/databases/information_schema.py @@ -29,6 +29,7 @@ columns = Table("columns", ischema, Column("numeric_precision", Integer), Column("numeric_scale", Integer), Column("column_default", Integer), + Column("collation_name", String), schema="information_schema") constraints = Table("table_constraints", ischema, diff --git a/lib/sqlalchemy/databases/mssql.py b/lib/sqlalchemy/databases/mssql.py index 8fb6bfa4ab..7d2cae5b90 100644 --- a/lib/sqlalchemy/databases/mssql.py +++ b/lib/sqlalchemy/databases/mssql.py @@ -151,6 +151,21 @@ optional and will default to 1,1. * Support for auto-fetching of ``@@IDENTITY/@@SCOPE_IDENTITY()`` on ``INSERT`` +Collation Support +----------------- + +MSSQL specific string types support a collation parameter that +creates a column-level specific collation for the column. The +collation parameter accepts a Windows Collation Name or a SQL +Collation Name. Supported types are MSChar, MSNChar, MSString, +MSNVarchar, MSText, and MSNText. For example:: + + Column('login', String(32, collation='Latin1_General_CI_AS')) + +will yield:: + + login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL + LIMIT/OFFSET Support -------------------- @@ -194,7 +209,7 @@ Known Issues does **not** work around """ -import datetime, operator, re, sys, urllib +import datetime, inspect, operator, re, sys, urllib from sqlalchemy import sql, schema, exc, util from sqlalchemy.sql import compiler, expression, operators as sqlops, functions as sql_functions @@ -206,6 +221,39 @@ from decimal import Decimal as _python_Decimal MSSQL_RESERVED_WORDS = set(['function']) +class _StringType(object): + """Base for MSSQL string types.""" + + def __init__(self, collation=None, **kwargs): + self.collation = kwargs.get('collate', collation) + + def _extend(self, spec): + """Extend a string-type declaration with standard SQL + COLLATE annotations. + """ + + if self.collation: + collation = 'COLLATE %s' % self.collation + else: + collation = None + + return ' '.join([c for c in (spec, collation) + if c is not None]) + + def __repr__(self): + attributes = inspect.getargspec(self.__init__)[0][1:] + attributes.extend(inspect.getargspec(_StringType.__init__)[0][1:]) + + params = {} + for attr in attributes: + val = getattr(self, attr) + if val is not None and val is not False: + params[attr] = val + + return "%s(%s)" % (self.__class__.__name__, + ', '.join(['%s=%r' % (k, params[k]) for k in params])) + + class MSNumeric(sqltypes.Numeric): def result_processor(self, dialect): if self.asdecimal: @@ -242,9 +290,13 @@ class MSNumeric(sqltypes.Numeric): else: return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': self.precision, 'scale' : self.scale} + class MSFloat(sqltypes.Float): def get_col_spec(self): - return "FLOAT(%(precision)s)" % {'precision': self.precision} + if self.precision is None: + return "FLOAT" + else: + return "FLOAT(%(precision)s)" % {'precision': self.precision} def bind_processor(self, dialect): def process(value): @@ -254,22 +306,52 @@ class MSFloat(sqltypes.Float): return None return process + +class MSReal(MSFloat): + """A type for ``real`` numbers.""" + + def __init__(self): + """ + Construct a Real. + + """ + super(MSReal, self).__init__(precision=24) + + def adapt(self, impltype): + return impltype() + + def bind_processor(self, dialect): + def process(value): + if value is not None: + return float(value) + else: + return value + return process + + def get_col_spec(self): + return "REAL" + + class MSInteger(sqltypes.Integer): def get_col_spec(self): return "INTEGER" + class MSBigInteger(MSInteger): def get_col_spec(self): return "BIGINT" + class MSTinyInteger(MSInteger): def get_col_spec(self): return "TINYINT" + class MSSmallInteger(MSInteger): def get_col_spec(self): return "SMALLINT" + class MSDateTime(sqltypes.DateTime): def __init__(self, *a, **kw): super(MSDateTime, self).__init__(False) @@ -277,6 +359,7 @@ class MSDateTime(sqltypes.DateTime): def get_col_spec(self): return "DATETIME" + class MSSmallDate(sqltypes.Date): def __init__(self, *a, **kw): super(MSSmallDate, self).__init__(False) @@ -292,6 +375,7 @@ class MSSmallDate(sqltypes.Date): return value return process + class MSDate(sqltypes.Date): def __init__(self, *a, **kw): super(MSDate, self).__init__(False) @@ -307,6 +391,7 @@ class MSDate(sqltypes.Date): return value return process + class MSTime(sqltypes.Time): __zero_date = datetime.date(1900, 1, 1) @@ -334,6 +419,7 @@ class MSTime(sqltypes.Time): return value return process + class MSDateTime_adodbapi(MSDateTime): def result_processor(self, dialect): def process(value): @@ -344,6 +430,7 @@ class MSDateTime_adodbapi(MSDateTime): return value return process + class MSDateTime_pyodbc(MSDateTime): def bind_processor(self, dialect): def process(value): @@ -352,6 +439,7 @@ class MSDateTime_pyodbc(MSDateTime): return value return process + class MSDate_pyodbc(MSDate): def bind_processor(self, dialect): def process(value): @@ -360,41 +448,233 @@ class MSDate_pyodbc(MSDate): return value return process -class MSText(sqltypes.Text): + +class MSText(_StringType, sqltypes.Text): + """MSSQL TEXT type, for variable-length text up to 2^31 characters.""" + + def __init__(self, *args, **kwargs): + """Construct a TEXT. + + :param collation: Optional, a column-level collation for this string + value. Accepts a Windows Collation Name or a SQL Collation Name. + + """ + _StringType.__init__(self, **kwargs) + sqltypes.Text.__init__(self, None, + convert_unicode=kwargs.get('convert_unicode', False), + assert_unicode=kwargs.get('assert_unicode', None)) + def get_col_spec(self): if self.dialect.text_as_varchar: - return "VARCHAR(max)" + return self._extend("VARCHAR(max)") else: - return "TEXT" + return self._extend("TEXT") + + +class MSNText(_StringType, sqltypes.UnicodeText): + """MSSQL NTEXT type, for variable-length unicode text up to 2^30 + characters.""" + + def __init__(self, *args, **kwargs): + """Construct a NTEXT. + + :param collation: Optional, a column-level collation for this string + value. Accepts a Windows Collation Name or a SQL Collation Name. + + """ + _StringType.__init__(self, **kwargs) + sqltypes.UnicodeText.__init__(self, None, + convert_unicode=kwargs.get('convert_unicode', True), + assert_unicode=kwargs.get('assert_unicode', 'warn')) + + def get_col_spec(self): + if self.dialect.text_as_varchar: + return self._extend("NVARCHAR(max)") + else: + return self._extend("NTEXT") + + +class MSString(_StringType, sqltypes.String): + """MSSQL VARCHAR type, for variable-length non-Unicode data with a maximum + of 8,000 characters.""" + + def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs): + """Construct a VARCHAR. + + :param length: Optinal, maximum data length, in characters. + + :param convert_unicode: defaults to False. If True, convert + ``unicode`` data sent to the database to a ``str`` + bytestring, and convert bytestrings coming back from the + database into ``unicode``. + + Bytestrings are encoded using the dialect's + :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which + defaults to `utf-8`. + + If False, may be overridden by + :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`. + + :param assert_unicode: + + If None (the default), no assertion will take place unless + overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`. + + If 'warn', will issue a runtime warning if a ``str`` + instance is used as a bind value. + + If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`. + + :param collation: Optional, a column-level collation for this string + value. Accepts a Windows Collation Name or a SQL Collation Name. + + """ + _StringType.__init__(self, **kwargs) + sqltypes.String.__init__(self, length=length, + convert_unicode=convert_unicode, + assert_unicode=assert_unicode) -class MSString(sqltypes.String): def get_col_spec(self): - return "VARCHAR" + (self.length and "(%d)" % self.length or "") + if self.length: + return self._extend("VARCHAR(%s)" % self.length) + else: + return self._extend("VARCHAR") + + +class MSNVarchar(_StringType, sqltypes.Unicode): + """MSSQL NVARCHAR type. + + For variable-length unicode character data up to 4,000 characters.""" + + def __init__(self, length=None, **kwargs): + """Construct a NVARCHAR. + + :param length: Optional, Maximum data length, in characters. + + :param collation: Optional, a column-level collation for this string + value. Accepts a Windows Collation Name or a SQL Collation Name. + + """ + _StringType.__init__(self, **kwargs) + sqltypes.Unicode.__init__(self, length=length, + convert_unicode=kwargs.get('convert_unicode', True), + assert_unicode=kwargs.get('assert_unicode', 'warn')) + + def adapt(self, impltype): + return impltype(length=self.length, + convert_unicode=self.convert_unicode, + assert_unicode=self.assert_unicode, + collation=self.collation) -class MSNVarchar(sqltypes.Unicode): def get_col_spec(self): if self.length: - return "NVARCHAR(%(length)s)" % {'length' : self.length} - elif self.dialect.text_as_varchar: - return "NVARCHAR(max)" + return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length}) else: - return "NTEXT" + return self._extend("NVARCHAR") -class AdoMSNVarchar(MSNVarchar): + +class AdoMSNVarchar(_StringType, sqltypes.Unicode): """overrides bindparam/result processing to not convert any unicode strings""" + + def __init__(self, length=None, **kwargs): + """Construct a NVARCHAR. + + :param length: Optional, Maximum data length, in characters. + + :param collation: Optional, a column-level collation for this string + value. Accepts a Windows Collation Name or a SQL Collation Name. + + """ + _StringType.__init__(self, **kwargs) + sqltypes.Unicode.__init__(self, length=length, + convert_unicode=kwargs.get('convert_unicode', True), + assert_unicode=kwargs.get('assert_unicode', 'warn')) + def bind_processor(self, dialect): return None def result_processor(self, dialect): return None -class MSChar(sqltypes.CHAR): def get_col_spec(self): - return "CHAR(%(length)s)" % {'length' : self.length} + if self.length: + return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length}) + else: + return self._extend("NVARCHAR") + + +class MSChar(_StringType, sqltypes.CHAR): + """MSSQL CHAR type, for fixed-length non-Unicode data with a maximum + of 8,000 characters.""" + + def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs): + """Construct a CHAR. + + :param length: Optinal, maximum data length, in characters. + + :param convert_unicode: defaults to False. If True, convert + ``unicode`` data sent to the database to a ``str`` + bytestring, and convert bytestrings coming back from the + database into ``unicode``. + + Bytestrings are encoded using the dialect's + :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which + defaults to `utf-8`. + + If False, may be overridden by + :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`. + + :param assert_unicode: + + If None (the default), no assertion will take place unless + overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`. + + If 'warn', will issue a runtime warning if a ``str`` + instance is used as a bind value. + + If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`. + + :param collation: Optional, a column-level collation for this string + value. Accepts a Windows Collation Name or a SQL Collation Name. + + """ + _StringType.__init__(self, **kwargs) + sqltypes.CHAR.__init__(self, length=length, + convert_unicode=convert_unicode, + assert_unicode=assert_unicode) + + def get_col_spec(self): + if self.length: + return self._extend("CHAR(%s)" % self.length) + else: + return self._extend("CHAR") + + +class MSNChar(_StringType, sqltypes.NCHAR): + """MSSQL NCHAR type. + + For fixed-length unicode character data up to 4,000 characters.""" + + def __init__(self, length=None, **kwargs): + """Construct an NCHAR. + + :param length: Optional, Maximum data length, in characters. + + :param collation: Optional, a column-level collation for this string + value. Accepts a Windows Collation Name or a SQL Collation Name. + + """ + _StringType.__init__(self, **kwargs) + sqltypes.NCHAR.__init__(self, length=length, + convert_unicode=kwargs.get('convert_unicode', True), + assert_unicode=kwargs.get('assert_unicode', 'warn')) -class MSNChar(sqltypes.NCHAR): def get_col_spec(self): - return "NCHAR(%(length)s)" % {'length' : self.length} + if self.length: + return self._extend("NCHAR(%(length)s)" % {'length' : self.length}) + else: + return self._extend("NCHAR") + class MSBinary(sqltypes.Binary): def get_col_spec(self): @@ -557,6 +837,7 @@ class MSSQLDialect(default.DefaultDialect): sqltypes.Binary : MSBinary, sqltypes.Boolean : MSBoolean, sqltypes.Text : MSText, + sqltypes.UnicodeText : MSNText, sqltypes.CHAR: MSChar, sqltypes.NCHAR: MSNChar, sqltypes.TIMESTAMP: MSTimeStamp, @@ -572,7 +853,7 @@ class MSSQLDialect(default.DefaultDialect): 'char' : MSChar, 'nchar' : MSNChar, 'text' : MSText, - 'ntext' : MSText, + 'ntext' : MSNText, 'decimal' : MSNumeric, 'numeric' : MSNumeric, 'float' : MSFloat, @@ -670,7 +951,7 @@ class MSSQLDialect(default.DefaultDialect): def type_descriptor(self, typeobj): newobj = sqltypes.adapt_type(typeobj, self.colspecs) # Some types need to know about the dialect - if isinstance(newobj, (MSText, MSNVarchar)): + if isinstance(newobj, (MSText, MSNText)): newobj.dialect = self return newobj @@ -728,14 +1009,15 @@ class MSSQLDialect(default.DefaultDialect): if row is None: break found_table = True - (name, type, nullable, charlen, numericprec, numericscale, default) = ( + (name, type, nullable, charlen, numericprec, numericscale, default, collation) = ( row[columns.c.column_name], row[columns.c.data_type], row[columns.c.is_nullable] == 'YES', row[columns.c.character_maximum_length], row[columns.c.numeric_precision], row[columns.c.numeric_scale], - row[columns.c.column_default] + row[columns.c.column_default], + row[columns.c.collation_name] ) if include_columns and name not in include_columns: continue @@ -745,8 +1027,14 @@ class MSSQLDialect(default.DefaultDialect): if a is not None: args.append(a) coltype = self.ischema_names.get(type, None) + + kwargs = {} + if coltype in (MSString, MSChar, MSNVarchar, AdoMSNVarchar, MSNChar, MSText, MSNText): + if collation: + kwargs.update(collation=collation) + if coltype == MSText or (coltype == MSString and charlen == -1): - coltype = MSText() + coltype = MSText(**kwargs) else: if coltype is None: util.warn("Did not recognize type '%s' of column '%s'" % @@ -755,7 +1043,7 @@ class MSSQLDialect(default.DefaultDialect): elif coltype in (MSNVarchar, AdoMSNVarchar) and charlen == -1: args[0] = None - coltype = coltype(*args) + coltype = coltype(*args, **kwargs) colargs = [] if default is not None: colargs.append(schema.DefaultClause(sql.text(default))) @@ -912,16 +1200,14 @@ class MSSQLDialect_pyodbc(MSSQLDialect): return module colspecs = MSSQLDialect.colspecs.copy() - if supports_unicode: - colspecs[sqltypes.Unicode] = AdoMSNVarchar colspecs[sqltypes.Date] = MSDate_pyodbc colspecs[sqltypes.DateTime] = MSDateTime_pyodbc - ischema_names = MSSQLDialect.ischema_names.copy() - if supports_unicode: - ischema_names['nvarchar'] = AdoMSNVarchar ischema_names['smalldatetime'] = MSDate_pyodbc ischema_names['datetime'] = MSDateTime_pyodbc + if supports_unicode: + colspecs[sqltypes.Unicode] = AdoMSNVarchar + ischema_names['nvarchar'] = AdoMSNVarchar def make_connect_string(self, keys, query): if 'max_identifier_length' in keys: diff --git a/test/dialect/mssql.py b/test/dialect/mssql.py index e38ee82b78..440221cb6d 100755 --- a/test/dialect/mssql.py +++ b/test/dialect/mssql.py @@ -7,6 +7,7 @@ from sqlalchemy.sql import table, column from sqlalchemy.databases import mssql import sqlalchemy.engine.url as url from testlib import * +from testlib.testing import eq_ class CompileTest(TestBase, AssertsCompiledSQL): @@ -503,5 +504,318 @@ class TypesTest(TestBase): except: assert False + +class TypesTest2(TestBase, AssertsExecutionResults): + "Test Microsoft SQL Server column types" + + __only_on__ = 'mssql' + + def test_money(self): + "Exercise type specification for money types." + + columns = [ + # column type, args, kwargs, expected ddl + (mssql.MSMoney, [], {}, + 'MONEY'), + (mssql.MSSmallMoney, [], {}, + 'SMALLMONEY'), + ] + + table_args = ['test_mssql_money', MetaData(testing.db)] + for index, spec in enumerate(columns): + type_, args, kw, res = spec + table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) + + money_table = Table(*table_args) + gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + + for col in money_table.c: + index = int(col.name[1:]) + testing.eq_(gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) + self.assert_(repr(col)) + + try: + money_table.create(checkfirst=True) + assert True + except: + raise + money_table.drop() + + def test_binary(self): + "Exercise type specification for binary types." + + columns = [ + # column type, args, kwargs, expected ddl + (mssql.MSBinary, [], {}, + 'IMAGE') + ] + + table_args = ['test_mssql_binary', MetaData(testing.db)] + for index, spec in enumerate(columns): + type_, args, kw, res = spec + table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) + + binary_table = Table(*table_args) + gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + + for col in binary_table.c: + index = int(col.name[1:]) + testing.eq_(gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) + self.assert_(repr(col)) + + try: + binary_table.create(checkfirst=True) + assert True + except: + raise + binary_table.drop() + + def test_boolean(self): + "Exercise type specification for boolean type." + + columns = [ + # column type, args, kwargs, expected ddl + (mssql.MSBoolean, [], {}, + 'BIT'), + ] + + table_args = ['test_mssql_boolean', MetaData(testing.db)] + for index, spec in enumerate(columns): + type_, args, kw, res = spec + table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) + + boolean_table = Table(*table_args) + gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + + for col in boolean_table.c: + index = int(col.name[1:]) + testing.eq_(gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) + self.assert_(repr(col)) + + try: + boolean_table.create(checkfirst=True) + assert True + except: + raise + boolean_table.drop() + + def test_numeric(self): + "Exercise type specification and options for numeric types." + + columns = [ + # column type, args, kwargs, expected ddl + (mssql.MSNumeric, [], {}, + 'NUMERIC(10, 2)'), + (mssql.MSNumeric, [None], {}, + 'NUMERIC'), + (mssql.MSNumeric, [12], {}, + 'NUMERIC(12, 2)'), + (mssql.MSNumeric, [12, 4], {}, + 'NUMERIC(12, 4)'), + + (mssql.MSFloat, [], {}, + 'FLOAT(10)'), + (mssql.MSFloat, [None], {}, + 'FLOAT'), + (mssql.MSFloat, [12], {}, + 'FLOAT(12)'), + (mssql.MSReal, [], {}, + 'REAL'), + + (mssql.MSInteger, [], {}, + 'INTEGER'), + (mssql.MSBigInteger, [], {}, + 'BIGINT'), + (mssql.MSTinyInteger, [], {}, + 'TINYINT'), + (mssql.MSSmallInteger, [], {}, + 'SMALLINT'), + ] + + table_args = ['test_mssql_numeric', MetaData(testing.db)] + for index, spec in enumerate(columns): + type_, args, kw, res = spec + table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) + + numeric_table = Table(*table_args) + gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + + for col in numeric_table.c: + index = int(col.name[1:]) + testing.eq_(gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) + self.assert_(repr(col)) + + try: + numeric_table.create(checkfirst=True) + assert True + except: + raise + numeric_table.drop() + + def test_char(self): + """Exercise COLLATE-ish options on string types.""" + + columns = [ + (mssql.MSChar, [], {}, + 'CHAR'), + (mssql.MSChar, [1], {}, + 'CHAR(1)'), + (mssql.MSChar, [1], {'collation': 'Latin1_General_CI_AS'}, + 'CHAR(1) COLLATE Latin1_General_CI_AS'), + + (mssql.MSNChar, [], {}, + 'NCHAR'), + (mssql.MSNChar, [1], {}, + 'NCHAR(1)'), + (mssql.MSNChar, [1], {'collation': 'Latin1_General_CI_AS'}, + 'NCHAR(1) COLLATE Latin1_General_CI_AS'), + + (mssql.MSString, [], {}, + 'VARCHAR'), + (mssql.MSString, [1], {}, + 'VARCHAR(1)'), + (mssql.MSString, ['max'], {}, + 'VARCHAR(max)'), + (mssql.MSString, [1], {'collation': 'Latin1_General_CI_AS'}, + 'VARCHAR(1) COLLATE Latin1_General_CI_AS'), + + (mssql.MSNVarchar, [], {}, + 'NVARCHAR'), + (mssql.MSNVarchar, [1], {}, + 'NVARCHAR(1)'), + (mssql.MSNVarchar, ['max'], {}, + 'NVARCHAR(max)'), + (mssql.MSNVarchar, [1], {'collation': 'Latin1_General_CI_AS'}, + 'NVARCHAR(1) COLLATE Latin1_General_CI_AS'), + + (mssql.MSText, [], {}, + 'TEXT'), + (mssql.MSText, [], {'collation': 'Latin1_General_CI_AS'}, + 'TEXT COLLATE Latin1_General_CI_AS'), + + (mssql.MSNText, [], {}, + 'NTEXT'), + (mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'}, + 'NTEXT COLLATE Latin1_General_CI_AS'), + ] + + table_args = ['test_mssql_charset', MetaData(testing.db)] + for index, spec in enumerate(columns): + type_, args, kw, res = spec + table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None)) + + charset_table = Table(*table_args) + gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None) + + for col in charset_table.c: + index = int(col.name[1:]) + testing.eq_(gen.get_column_specification(col), + "%s %s" % (col.name, columns[index][3])) + self.assert_(repr(col)) + + try: + charset_table.create(checkfirst=True) + assert True + except: + raise + charset_table.drop() + + def test_timestamp(self): + """Exercise TIMESTAMP column.""" + + meta = MetaData(testing.db) + + try: + columns = [ + (TIMESTAMP, + 'TIMESTAMP'), + (mssql.MSTimeStamp, + 'TIMESTAMP'), + ] + for idx, (spec, expected) in enumerate(columns): + t = Table('mssql_ts%s' % idx, meta, + Column('id', Integer, primary_key=True), + Column('t', spec, nullable=None)) + testing.eq_(colspec(t.c.t), "t %s" % expected) + self.assert_(repr(t.c.t)) + try: + t.create(checkfirst=True) + assert True + except: + raise + t.drop() + finally: + meta.drop_all() + + def test_autoincrement(self): + meta = MetaData(testing.db) + try: + Table('ai_1', meta, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True)) + Table('ai_2', meta, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True)) + Table('ai_3', meta, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_y', Integer, primary_key=True)) + Table('ai_4', meta, + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False), + Column('int_n2', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table('ai_5', meta, + Column('int_y', Integer, primary_key=True), + Column('int_n', Integer, DefaultClause('0'), + primary_key=True, autoincrement=False)) + Table('ai_6', meta, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table('ai_7', meta, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True), + Column('int_y', Integer, primary_key=True)) + Table('ai_8', meta, + Column('o1', String(1), DefaultClause('x'), + primary_key=True), + Column('o2', String(1), DefaultClause('x'), + primary_key=True)) + meta.create_all() + + table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4', + 'ai_5', 'ai_6', 'ai_7', 'ai_8'] + mr = MetaData(testing.db) + mr.reflect(only=table_names) + + for tbl in [mr.tables[name] for name in table_names]: + for c in tbl.c: + if c.name.startswith('int_y'): + assert c.autoincrement + elif c.name.startswith('int_n'): + assert not c.autoincrement + tbl.insert().execute() + if 'int_y' in tbl.c: + assert select([tbl.c.int_y]).scalar() == 1 + assert list(tbl.select().execute().fetchone()).count(1) == 1 + else: + assert 1 not in list(tbl.select().execute().fetchone()) + finally: + meta.drop_all() + +def colspec(c): + return testing.db.dialect.schemagenerator(testing.db.dialect, + testing.db, None, None).get_column_specification(c) + + if __name__ == "__main__": testenv.main() -- 2.47.3