]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
Merge branch 'collation'
authorMichael Trier <mtrier@gmail.com>
Tue, 23 Dec 2008 04:08:13 +0000 (04:08 +0000)
committerMichael Trier <mtrier@gmail.com>
Tue, 23 Dec 2008 04:08:13 +0000 (04:08 +0000)
CHANGES
lib/sqlalchemy/databases/information_schema.py
lib/sqlalchemy/databases/mssql.py
test/dialect/mssql.py

diff --git a/CHANGES b/CHANGES
index 615d91120773ddd9f1b779c62f31c6d981e0e4c1..955301f2dc7d92e35141c3dee65df50b44ad79f5 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -38,6 +38,9 @@ CHANGES
       classes used for both the dynamic collection and the queries
       built from it.
 
+- mssql
+    - Added in the MSReal and MSNText types.
+
 - bugfixes, behavioral changes
 - general
 - orm
@@ -199,6 +202,10 @@ CHANGES
       new doc section "Custom Comparators".
     
 - mssql
+    - Added collation support through the use of a new collation
+      argument. This is supported on the following types: char,
+      nchar, varchar, nvarchar, text, ntext. [ticket:1248]
+
     - Changes to the connection string parameters favor DSN as the
       default specification for pyodbc. See the mssql.py docstring
       for detailed usage instructions.
@@ -206,14 +213,15 @@ CHANGES
     - Added experimental support of savepoints. It
       currently does not work fully with sessions.
 
-    - Support for three levels of column nullability: NULL, NOT NULL,
-      and the database's configured default.  The default Column
-      configuration (nullable=True) will now generate NULL in the DDL.
-      Previously no specification was emitted and the database default
-      would take effect (usually NULL, but not always).  To explicitly
-      request the database default, configure columns with
-      nullable=None and no specification will be emitted in DDL. This
-      is backwards incompatible behavior. [ticket:1243]
+    - Support for three levels of column nullability: NULL,
+      NOT NULL, and the database's configured default.  The default
+      Column configuration (nullable=True) will now generate NULL
+      in the DDL. Previously no specification was emitted and the
+      database default would take effect (usually NULL, but not
+      always).  To explicitly request the database default,
+      configure columns with nullable=None and no specification
+      will be emitted in DDL. This is backwards incompatible
+      behavior. [ticket:1243]
 
 - postgres
     - Calling alias.execute() in conjunction with
index 659bfc33a0ad49e34c9c7bf4f1994167b25c0fe4..b15082ac2ebb6afa0b4ee97553730f19dceb3d02 100644 (file)
@@ -29,6 +29,7 @@ columns = Table("columns", ischema,
     Column("numeric_precision", Integer),
     Column("numeric_scale", Integer),
     Column("column_default", Integer),
+    Column("collation_name", String),
     schema="information_schema")
 
 constraints = Table("table_constraints", ischema,
index 8fb6bfa4ab9b8ae0ec5686628f0b793556d67492..7d2cae5b906271174db786ab1dc122b20b87e665 100644 (file)
@@ -151,6 +151,21 @@ optional and will default to 1,1.
 * Support for auto-fetching of ``@@IDENTITY/@@SCOPE_IDENTITY()`` on
   ``INSERT``
 
+Collation Support
+-----------------
+
+MSSQL specific string types support a collation parameter that
+creates a column-level specific collation for the column. The
+collation parameter accepts a Windows Collation Name or a SQL
+Collation Name. Supported types are MSChar, MSNChar, MSString,
+MSNVarchar, MSText, and MSNText. For example::
+
+    Column('login', String(32, collation='Latin1_General_CI_AS'))
+
+will yield::
+
+    login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
+
 LIMIT/OFFSET Support
 --------------------
 
@@ -194,7 +209,7 @@ Known Issues
   does **not** work around
 
 """
-import datetime, operator, re, sys, urllib
+import datetime, inspect, operator, re, sys, urllib
 
 from sqlalchemy import sql, schema, exc, util
 from sqlalchemy.sql import compiler, expression, operators as sqlops, functions as sql_functions
@@ -206,6 +221,39 @@ from decimal import Decimal as _python_Decimal
 
 MSSQL_RESERVED_WORDS = set(['function'])
 
+class _StringType(object):
+    """Base for MSSQL string types."""
+
+    def __init__(self, collation=None, **kwargs):
+        self.collation = kwargs.get('collate', collation)
+
+    def _extend(self, spec):
+        """Extend a string-type declaration with standard SQL
+        COLLATE annotations.
+        """
+
+        if self.collation:
+            collation = 'COLLATE %s' % self.collation
+        else:
+            collation = None
+
+        return ' '.join([c for c in (spec, collation)
+                         if c is not None])
+
+    def __repr__(self):
+        attributes = inspect.getargspec(self.__init__)[0][1:]
+        attributes.extend(inspect.getargspec(_StringType.__init__)[0][1:])
+
+        params = {}
+        for attr in attributes:
+            val = getattr(self, attr)
+            if val is not None and val is not False:
+                params[attr] = val
+
+        return "%s(%s)" % (self.__class__.__name__,
+                           ', '.join(['%s=%r' % (k, params[k]) for k in params]))
+
+
 class MSNumeric(sqltypes.Numeric):
     def result_processor(self, dialect):
         if self.asdecimal:
@@ -242,9 +290,13 @@ class MSNumeric(sqltypes.Numeric):
         else:
             return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': self.precision, 'scale' : self.scale}
 
+
 class MSFloat(sqltypes.Float):
     def get_col_spec(self):
-        return "FLOAT(%(precision)s)" % {'precision': self.precision}
+        if self.precision is None:
+            return "FLOAT"
+        else:
+            return "FLOAT(%(precision)s)" % {'precision': self.precision}
 
     def bind_processor(self, dialect):
         def process(value):
@@ -254,22 +306,52 @@ class MSFloat(sqltypes.Float):
             return None
         return process
 
+
+class MSReal(MSFloat):
+    """A type for ``real`` numbers."""
+
+    def __init__(self):
+        """
+        Construct a Real.
+
+        """
+        super(MSReal, self).__init__(precision=24)
+
+    def adapt(self, impltype):
+        return impltype()
+
+    def bind_processor(self, dialect):
+        def process(value):
+            if value is not None:
+                return float(value)
+            else:
+                return value
+        return process
+
+    def get_col_spec(self):
+        return "REAL"
+
+
 class MSInteger(sqltypes.Integer):
     def get_col_spec(self):
         return "INTEGER"
 
+
 class MSBigInteger(MSInteger):
     def get_col_spec(self):
         return "BIGINT"
 
+
 class MSTinyInteger(MSInteger):
     def get_col_spec(self):
         return "TINYINT"
 
+
 class MSSmallInteger(MSInteger):
     def get_col_spec(self):
         return "SMALLINT"
 
+
 class MSDateTime(sqltypes.DateTime):
     def __init__(self, *a, **kw):
         super(MSDateTime, self).__init__(False)
@@ -277,6 +359,7 @@ class MSDateTime(sqltypes.DateTime):
     def get_col_spec(self):
         return "DATETIME"
 
+
 class MSSmallDate(sqltypes.Date):
     def __init__(self, *a, **kw):
         super(MSSmallDate, self).__init__(False)
@@ -292,6 +375,7 @@ class MSSmallDate(sqltypes.Date):
             return value
         return process
 
+
 class MSDate(sqltypes.Date):
     def __init__(self, *a, **kw):
         super(MSDate, self).__init__(False)
@@ -307,6 +391,7 @@ class MSDate(sqltypes.Date):
             return value
         return process
 
+
 class MSTime(sqltypes.Time):
     __zero_date = datetime.date(1900, 1, 1)
 
@@ -334,6 +419,7 @@ class MSTime(sqltypes.Time):
             return value
         return process
 
+
 class MSDateTime_adodbapi(MSDateTime):
     def result_processor(self, dialect):
         def process(value):
@@ -344,6 +430,7 @@ class MSDateTime_adodbapi(MSDateTime):
             return value
         return process
 
+
 class MSDateTime_pyodbc(MSDateTime):
     def bind_processor(self, dialect):
         def process(value):
@@ -352,6 +439,7 @@ class MSDateTime_pyodbc(MSDateTime):
             return value
         return process
 
+
 class MSDate_pyodbc(MSDate):
     def bind_processor(self, dialect):
         def process(value):
@@ -360,41 +448,233 @@ class MSDate_pyodbc(MSDate):
             return value
         return process
 
-class MSText(sqltypes.Text):
+
+class MSText(_StringType, sqltypes.Text):
+    """MSSQL TEXT type, for variable-length text up to 2^31 characters."""
+
+    def __init__(self, *args, **kwargs):
+        """Construct a TEXT.
+
+        :param collation: Optional, a column-level collation for this string
+          value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+        """
+        _StringType.__init__(self, **kwargs)
+        sqltypes.Text.__init__(self, None,
+                convert_unicode=kwargs.get('convert_unicode', False),
+                assert_unicode=kwargs.get('assert_unicode', None))
+
     def get_col_spec(self):
         if self.dialect.text_as_varchar:
-            return "VARCHAR(max)"
+            return self._extend("VARCHAR(max)")
         else:
-            return "TEXT"
+            return self._extend("TEXT")
+
+
+class MSNText(_StringType, sqltypes.UnicodeText):
+    """MSSQL NTEXT type, for variable-length unicode text up to 2^30
+    characters."""
+
+    def __init__(self, *args, **kwargs):
+        """Construct a NTEXT.
+
+        :param collation: Optional, a column-level collation for this string
+          value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+        """
+        _StringType.__init__(self, **kwargs)
+        sqltypes.UnicodeText.__init__(self, None,
+                convert_unicode=kwargs.get('convert_unicode', True),
+                assert_unicode=kwargs.get('assert_unicode', 'warn'))
+
+    def get_col_spec(self):
+        if self.dialect.text_as_varchar:
+            return self._extend("NVARCHAR(max)")
+        else:
+            return self._extend("NTEXT")
+
+
+class MSString(_StringType, sqltypes.String):
+    """MSSQL VARCHAR type, for variable-length non-Unicode data with a maximum
+    of 8,000 characters."""
+
+    def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs):
+        """Construct a VARCHAR.
+
+        :param length: Optinal, maximum data length, in characters.
+
+        :param convert_unicode: defaults to False.  If True, convert
+          ``unicode`` data sent to the database to a ``str``
+          bytestring, and convert bytestrings coming back from the
+          database into ``unicode``.
+
+          Bytestrings are encoded using the dialect's
+          :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which
+          defaults to `utf-8`.
+
+          If False, may be overridden by
+          :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`.
+
+        :param assert_unicode:
+
+          If None (the default), no assertion will take place unless
+          overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`.
+
+          If 'warn', will issue a runtime warning if a ``str``
+          instance is used as a bind value.
+
+          If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`.
+
+        :param collation: Optional, a column-level collation for this string
+          value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+        """
+        _StringType.__init__(self, **kwargs)
+        sqltypes.String.__init__(self, length=length,
+                convert_unicode=convert_unicode,
+                assert_unicode=assert_unicode)
 
-class MSString(sqltypes.String):
     def get_col_spec(self):
-        return "VARCHAR" + (self.length and "(%d)" % self.length or "")
+        if self.length:
+            return self._extend("VARCHAR(%s)" % self.length)
+        else:
+            return self._extend("VARCHAR")
+
+
+class MSNVarchar(_StringType, sqltypes.Unicode):
+    """MSSQL NVARCHAR type.
+
+    For variable-length unicode character data up to 4,000 characters."""
+
+    def __init__(self, length=None, **kwargs):
+        """Construct a NVARCHAR.
+
+        :param length: Optional, Maximum data length, in characters.
+
+        :param collation: Optional, a column-level collation for this string
+          value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+        """
+        _StringType.__init__(self, **kwargs)
+        sqltypes.Unicode.__init__(self, length=length,
+                convert_unicode=kwargs.get('convert_unicode', True),
+                assert_unicode=kwargs.get('assert_unicode', 'warn'))
+
+    def adapt(self, impltype):
+        return impltype(length=self.length,
+                        convert_unicode=self.convert_unicode,
+                        assert_unicode=self.assert_unicode,
+                        collation=self.collation)
 
-class MSNVarchar(sqltypes.Unicode):
     def get_col_spec(self):
         if self.length:
-            return "NVARCHAR(%(length)s)" % {'length' : self.length}
-        elif self.dialect.text_as_varchar:
-            return "NVARCHAR(max)"
+            return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length})
         else:
-            return "NTEXT"
+            return self._extend("NVARCHAR")
 
-class AdoMSNVarchar(MSNVarchar):
+
+class AdoMSNVarchar(_StringType, sqltypes.Unicode):
     """overrides bindparam/result processing to not convert any unicode strings"""
+
+    def __init__(self, length=None, **kwargs):
+        """Construct a NVARCHAR.
+
+        :param length: Optional, Maximum data length, in characters.
+
+        :param collation: Optional, a column-level collation for this string
+          value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+        """
+        _StringType.__init__(self, **kwargs)
+        sqltypes.Unicode.__init__(self, length=length,
+                convert_unicode=kwargs.get('convert_unicode', True),
+                assert_unicode=kwargs.get('assert_unicode', 'warn'))
+
     def bind_processor(self, dialect):
         return None
 
     def result_processor(self, dialect):
         return None
 
-class MSChar(sqltypes.CHAR):
     def get_col_spec(self):
-        return "CHAR(%(length)s)" % {'length' : self.length}
+        if self.length:
+            return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length})
+        else:
+            return self._extend("NVARCHAR")
+
+
+class MSChar(_StringType, sqltypes.CHAR):
+    """MSSQL CHAR type, for fixed-length non-Unicode data with a maximum
+    of 8,000 characters."""
+
+    def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs):
+        """Construct a CHAR.
+
+        :param length: Optinal, maximum data length, in characters.
+
+        :param convert_unicode: defaults to False.  If True, convert
+          ``unicode`` data sent to the database to a ``str``
+          bytestring, and convert bytestrings coming back from the
+          database into ``unicode``.
+
+          Bytestrings are encoded using the dialect's
+          :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which
+          defaults to `utf-8`.
+
+          If False, may be overridden by
+          :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`.
+
+        :param assert_unicode:
+
+          If None (the default), no assertion will take place unless
+          overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`.
+
+          If 'warn', will issue a runtime warning if a ``str``
+          instance is used as a bind value.
+
+          If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`.
+
+        :param collation: Optional, a column-level collation for this string
+          value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+        """
+        _StringType.__init__(self, **kwargs)
+        sqltypes.CHAR.__init__(self, length=length,
+                convert_unicode=convert_unicode,
+                assert_unicode=assert_unicode)
+
+    def get_col_spec(self):
+        if self.length:
+            return self._extend("CHAR(%s)" % self.length)
+        else:
+            return self._extend("CHAR")
+
+
+class MSNChar(_StringType, sqltypes.NCHAR):
+    """MSSQL NCHAR type.
+
+    For fixed-length unicode character data up to 4,000 characters."""
+
+    def __init__(self, length=None, **kwargs):
+        """Construct an NCHAR.
+
+        :param length: Optional, Maximum data length, in characters.
+
+        :param collation: Optional, a column-level collation for this string
+          value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+        """
+        _StringType.__init__(self, **kwargs)
+        sqltypes.NCHAR.__init__(self, length=length,
+                convert_unicode=kwargs.get('convert_unicode', True),
+                assert_unicode=kwargs.get('assert_unicode', 'warn'))
 
-class MSNChar(sqltypes.NCHAR):
     def get_col_spec(self):
-        return "NCHAR(%(length)s)" % {'length' : self.length}
+        if self.length:
+            return self._extend("NCHAR(%(length)s)" % {'length' : self.length})
+        else:
+            return self._extend("NCHAR")
+
 
 class MSBinary(sqltypes.Binary):
     def get_col_spec(self):
@@ -557,6 +837,7 @@ class MSSQLDialect(default.DefaultDialect):
         sqltypes.Binary : MSBinary,
         sqltypes.Boolean : MSBoolean,
         sqltypes.Text : MSText,
+        sqltypes.UnicodeText : MSNText,
         sqltypes.CHAR: MSChar,
         sqltypes.NCHAR: MSNChar,
         sqltypes.TIMESTAMP: MSTimeStamp,
@@ -572,7 +853,7 @@ class MSSQLDialect(default.DefaultDialect):
         'char' : MSChar,
         'nchar' : MSNChar,
         'text' : MSText,
-        'ntext' : MSText,
+        'ntext' : MSNText,
         'decimal' : MSNumeric,
         'numeric' : MSNumeric,
         'float' : MSFloat,
@@ -670,7 +951,7 @@ class MSSQLDialect(default.DefaultDialect):
     def type_descriptor(self, typeobj):
         newobj = sqltypes.adapt_type(typeobj, self.colspecs)
         # Some types need to know about the dialect
-        if isinstance(newobj, (MSText, MSNVarchar)):
+        if isinstance(newobj, (MSText, MSNText)):
             newobj.dialect = self
         return newobj
 
@@ -728,14 +1009,15 @@ class MSSQLDialect(default.DefaultDialect):
             if row is None:
                 break
             found_table = True
-            (name, type, nullable, charlen, numericprec, numericscale, default) = (
+            (name, type, nullable, charlen, numericprec, numericscale, default, collation) = (
                 row[columns.c.column_name],
                 row[columns.c.data_type],
                 row[columns.c.is_nullable] == 'YES',
                 row[columns.c.character_maximum_length],
                 row[columns.c.numeric_precision],
                 row[columns.c.numeric_scale],
-                row[columns.c.column_default]
+                row[columns.c.column_default],
+                row[columns.c.collation_name]
             )
             if include_columns and name not in include_columns:
                 continue
@@ -745,8 +1027,14 @@ class MSSQLDialect(default.DefaultDialect):
                 if a is not None:
                     args.append(a)
             coltype = self.ischema_names.get(type, None)
+
+            kwargs = {}
+            if coltype in (MSString, MSChar, MSNVarchar, AdoMSNVarchar, MSNChar, MSText, MSNText):
+                if collation:
+                    kwargs.update(collation=collation)
+
             if coltype == MSText or (coltype == MSString and charlen == -1):
-                coltype = MSText()
+                coltype = MSText(**kwargs)
             else:
                 if coltype is None:
                     util.warn("Did not recognize type '%s' of column '%s'" %
@@ -755,7 +1043,7 @@ class MSSQLDialect(default.DefaultDialect):
 
                 elif coltype in (MSNVarchar, AdoMSNVarchar) and charlen == -1:
                     args[0] = None
-                coltype = coltype(*args)
+                coltype = coltype(*args, **kwargs)
             colargs = []
             if default is not None:
                 colargs.append(schema.DefaultClause(sql.text(default)))
@@ -912,16 +1200,14 @@ class MSSQLDialect_pyodbc(MSSQLDialect):
         return module
 
     colspecs = MSSQLDialect.colspecs.copy()
-    if supports_unicode:
-        colspecs[sqltypes.Unicode] = AdoMSNVarchar
     colspecs[sqltypes.Date] = MSDate_pyodbc
     colspecs[sqltypes.DateTime] = MSDateTime_pyodbc
-
     ischema_names = MSSQLDialect.ischema_names.copy()
-    if supports_unicode:
-        ischema_names['nvarchar'] = AdoMSNVarchar
     ischema_names['smalldatetime'] = MSDate_pyodbc
     ischema_names['datetime'] = MSDateTime_pyodbc
+    if supports_unicode:
+        colspecs[sqltypes.Unicode] = AdoMSNVarchar
+        ischema_names['nvarchar'] = AdoMSNVarchar
 
     def make_connect_string(self, keys, query):
         if 'max_identifier_length' in keys:
index e38ee82b78f9db9db39278d00f4c8b6f86dc38e7..440221cb6daac907223de39407b4a63c1aabfe63 100755 (executable)
@@ -7,6 +7,7 @@ from sqlalchemy.sql import table, column
 from sqlalchemy.databases import mssql
 import sqlalchemy.engine.url as url
 from testlib import *
+from testlib.testing import eq_
 
 
 class CompileTest(TestBase, AssertsCompiledSQL):
@@ -503,5 +504,318 @@ class TypesTest(TestBase):
         except:
             assert False
 
+
+class TypesTest2(TestBase, AssertsExecutionResults):
+    "Test Microsoft SQL Server column types"
+
+    __only_on__ = 'mssql'
+
+    def test_money(self):
+        "Exercise type specification for money types."
+
+        columns = [
+            # column type, args, kwargs, expected ddl
+            (mssql.MSMoney, [], {},
+             'MONEY'),
+            (mssql.MSSmallMoney, [], {},
+             'SMALLMONEY'),
+           ]
+
+        table_args = ['test_mssql_money', MetaData(testing.db)]
+        for index, spec in enumerate(columns):
+            type_, args, kw, res = spec
+            table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+        money_table = Table(*table_args)
+        gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+        for col in money_table.c:
+            index = int(col.name[1:])
+            testing.eq_(gen.get_column_specification(col),
+                           "%s %s" % (col.name, columns[index][3]))
+            self.assert_(repr(col))
+
+        try:
+            money_table.create(checkfirst=True)
+            assert True
+        except:
+            raise
+        money_table.drop()
+
+    def test_binary(self):
+        "Exercise type specification for binary types."
+
+        columns = [
+            # column type, args, kwargs, expected ddl
+            (mssql.MSBinary, [], {},
+             'IMAGE')
+           ]
+
+        table_args = ['test_mssql_binary', MetaData(testing.db)]
+        for index, spec in enumerate(columns):
+            type_, args, kw, res = spec
+            table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+        binary_table = Table(*table_args)
+        gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+        for col in binary_table.c:
+            index = int(col.name[1:])
+            testing.eq_(gen.get_column_specification(col),
+                           "%s %s" % (col.name, columns[index][3]))
+            self.assert_(repr(col))
+
+        try:
+            binary_table.create(checkfirst=True)
+            assert True
+        except:
+            raise
+        binary_table.drop()
+
+    def test_boolean(self):
+        "Exercise type specification for boolean type."
+
+        columns = [
+            # column type, args, kwargs, expected ddl
+            (mssql.MSBoolean, [], {},
+             'BIT'),
+           ]
+
+        table_args = ['test_mssql_boolean', MetaData(testing.db)]
+        for index, spec in enumerate(columns):
+            type_, args, kw, res = spec
+            table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+        boolean_table = Table(*table_args)
+        gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+        for col in boolean_table.c:
+            index = int(col.name[1:])
+            testing.eq_(gen.get_column_specification(col),
+                           "%s %s" % (col.name, columns[index][3]))
+            self.assert_(repr(col))
+
+        try:
+            boolean_table.create(checkfirst=True)
+            assert True
+        except:
+            raise
+        boolean_table.drop()
+
+    def test_numeric(self):
+        "Exercise type specification and options for numeric types."
+
+        columns = [
+            # column type, args, kwargs, expected ddl
+            (mssql.MSNumeric, [], {},
+             'NUMERIC(10, 2)'),
+            (mssql.MSNumeric, [None], {},
+             'NUMERIC'),
+            (mssql.MSNumeric, [12], {},
+             'NUMERIC(12, 2)'),
+            (mssql.MSNumeric, [12, 4], {},
+             'NUMERIC(12, 4)'),
+
+            (mssql.MSFloat, [], {},
+             'FLOAT(10)'),
+            (mssql.MSFloat, [None], {},
+             'FLOAT'),
+            (mssql.MSFloat, [12], {},
+             'FLOAT(12)'),
+            (mssql.MSReal, [], {},
+             'REAL'),
+
+            (mssql.MSInteger, [], {},
+             'INTEGER'),
+            (mssql.MSBigInteger, [], {},
+             'BIGINT'),
+            (mssql.MSTinyInteger, [], {},
+             'TINYINT'),
+            (mssql.MSSmallInteger, [], {},
+             'SMALLINT'),
+           ]
+
+        table_args = ['test_mssql_numeric', MetaData(testing.db)]
+        for index, spec in enumerate(columns):
+            type_, args, kw, res = spec
+            table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+        numeric_table = Table(*table_args)
+        gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+        for col in numeric_table.c:
+            index = int(col.name[1:])
+            testing.eq_(gen.get_column_specification(col),
+                           "%s %s" % (col.name, columns[index][3]))
+            self.assert_(repr(col))
+
+        try:
+            numeric_table.create(checkfirst=True)
+            assert True
+        except:
+            raise
+        numeric_table.drop()
+
+    def test_char(self):
+        """Exercise COLLATE-ish options on string types."""
+
+        columns = [
+            (mssql.MSChar, [], {},
+             'CHAR'),
+            (mssql.MSChar, [1], {},
+             'CHAR(1)'),
+            (mssql.MSChar, [1], {'collation': 'Latin1_General_CI_AS'},
+             'CHAR(1) COLLATE Latin1_General_CI_AS'),
+
+            (mssql.MSNChar, [], {},
+             'NCHAR'),
+            (mssql.MSNChar, [1], {},
+             'NCHAR(1)'),
+            (mssql.MSNChar, [1], {'collation': 'Latin1_General_CI_AS'},
+             'NCHAR(1) COLLATE Latin1_General_CI_AS'),
+
+            (mssql.MSString, [], {},
+             'VARCHAR'),
+            (mssql.MSString, [1], {},
+             'VARCHAR(1)'),
+            (mssql.MSString, ['max'], {},
+             'VARCHAR(max)'),
+            (mssql.MSString, [1], {'collation': 'Latin1_General_CI_AS'},
+             'VARCHAR(1) COLLATE Latin1_General_CI_AS'),
+
+            (mssql.MSNVarchar, [], {},
+             'NVARCHAR'),
+            (mssql.MSNVarchar, [1], {},
+             'NVARCHAR(1)'),
+            (mssql.MSNVarchar, ['max'], {},
+             'NVARCHAR(max)'),
+            (mssql.MSNVarchar, [1], {'collation': 'Latin1_General_CI_AS'},
+             'NVARCHAR(1) COLLATE Latin1_General_CI_AS'),
+
+            (mssql.MSText, [], {},
+             'TEXT'),
+            (mssql.MSText, [], {'collation': 'Latin1_General_CI_AS'},
+             'TEXT COLLATE Latin1_General_CI_AS'),
+
+            (mssql.MSNText, [], {},
+             'NTEXT'),
+            (mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'},
+             'NTEXT COLLATE Latin1_General_CI_AS'),
+           ]
+
+        table_args = ['test_mssql_charset', MetaData(testing.db)]
+        for index, spec in enumerate(columns):
+            type_, args, kw, res = spec
+            table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+        charset_table = Table(*table_args)
+        gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+        for col in charset_table.c:
+            index = int(col.name[1:])
+            testing.eq_(gen.get_column_specification(col),
+                           "%s %s" % (col.name, columns[index][3]))
+            self.assert_(repr(col))
+
+        try:
+            charset_table.create(checkfirst=True)
+            assert True
+        except:
+            raise
+        charset_table.drop()
+
+    def test_timestamp(self):
+        """Exercise TIMESTAMP column."""
+
+        meta = MetaData(testing.db)
+
+        try:
+            columns = [
+                (TIMESTAMP,
+                 'TIMESTAMP'),
+                (mssql.MSTimeStamp,
+                 'TIMESTAMP'),
+                ]
+            for idx, (spec, expected) in enumerate(columns):
+                t = Table('mssql_ts%s' % idx, meta,
+                          Column('id', Integer, primary_key=True),
+                          Column('t', spec, nullable=None))
+                testing.eq_(colspec(t.c.t), "t %s" % expected)
+                self.assert_(repr(t.c.t))
+                try:
+                    t.create(checkfirst=True)
+                    assert True
+                except:
+                    raise
+                t.drop()
+        finally:
+            meta.drop_all()
+
+    def test_autoincrement(self):
+        meta = MetaData(testing.db)
+        try:
+            Table('ai_1', meta,
+                  Column('int_y', Integer, primary_key=True),
+                  Column('int_n', Integer, DefaultClause('0'),
+                         primary_key=True))
+            Table('ai_2', meta,
+                  Column('int_y', Integer, primary_key=True),
+                  Column('int_n', Integer, DefaultClause('0'),
+                         primary_key=True))
+            Table('ai_3', meta,
+                  Column('int_n', Integer, DefaultClause('0'),
+                         primary_key=True, autoincrement=False),
+                  Column('int_y', Integer, primary_key=True))
+            Table('ai_4', meta,
+                  Column('int_n', Integer, DefaultClause('0'),
+                         primary_key=True, autoincrement=False),
+                  Column('int_n2', Integer, DefaultClause('0'),
+                         primary_key=True, autoincrement=False))
+            Table('ai_5', meta,
+                  Column('int_y', Integer, primary_key=True),
+                  Column('int_n', Integer, DefaultClause('0'),
+                         primary_key=True, autoincrement=False))
+            Table('ai_6', meta,
+                  Column('o1', String(1), DefaultClause('x'),
+                         primary_key=True),
+                  Column('int_y', Integer, primary_key=True))
+            Table('ai_7', meta,
+                  Column('o1', String(1), DefaultClause('x'),
+                         primary_key=True),
+                  Column('o2', String(1), DefaultClause('x'),
+                         primary_key=True),
+                  Column('int_y', Integer, primary_key=True))
+            Table('ai_8', meta,
+                  Column('o1', String(1), DefaultClause('x'),
+                         primary_key=True),
+                  Column('o2', String(1), DefaultClause('x'),
+                         primary_key=True))
+            meta.create_all()
+
+            table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
+                           'ai_5', 'ai_6', 'ai_7', 'ai_8']
+            mr = MetaData(testing.db)
+            mr.reflect(only=table_names)
+
+            for tbl in [mr.tables[name] for name in table_names]:
+                for c in tbl.c:
+                    if c.name.startswith('int_y'):
+                        assert c.autoincrement
+                    elif c.name.startswith('int_n'):
+                        assert not c.autoincrement
+                tbl.insert().execute()
+                if 'int_y' in tbl.c:
+                    assert select([tbl.c.int_y]).scalar() == 1
+                    assert list(tbl.select().execute().fetchone()).count(1) == 1
+                else:
+                    assert 1 not in list(tbl.select().execute().fetchone())
+        finally:
+            meta.drop_all()
+
+def colspec(c):
+    return testing.db.dialect.schemagenerator(testing.db.dialect,
+        testing.db, None, None).get_column_specification(c)
+
+
 if __name__ == "__main__":
     testenv.main()