* Support for auto-fetching of ``@@IDENTITY/@@SCOPE_IDENTITY()`` on
``INSERT``
+Collation Support
+-----------------
+
+MSSQL-specific string types support a collation parameter that
+specifies a column-level collation for the column. The collation
+parameter accepts a Windows Collation Name or a SQL Collation Name.
+Supported types are MSChar, MSNChar, MSString, MSNVarchar, MSText,
+and MSNText. For example::
+
+ Column('login', String(32, collation='Latin1_General_CI_AS'))
+
+will yield::
+
+ login VARCHAR(32) COLLATE Latin1_General_CI_AS NULL
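+
+The MSSQL-specific types accept the parameter directly as well. As an
+illustrative sketch (the column name is arbitrary, and the types are
+assumed to be imported from ``sqlalchemy.databases.mssql``)::
+
+  Column('comment', mssql.MSNVarchar(200, collation='Latin1_General_CI_AS'))
+
+would render::
+
+  comment NVARCHAR(200) COLLATE Latin1_General_CI_AS NULL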
+
LIMIT/OFFSET Support
--------------------
does **not** work around
"""
-import datetime, operator, re, sys, urllib
+import datetime, inspect, operator, re, sys, urllib
from sqlalchemy import sql, schema, exc, util
from sqlalchemy.sql import compiler, expression, operators as sqlops, functions as sql_functions
MSSQL_RESERVED_WORDS = set(['function'])
+class _StringType(object):
+ """Base for MSSQL string types."""
+
+ def __init__(self, collation=None, **kwargs):
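+        # 'collate' is accepted as an alternate keyword for 'collation'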
+ self.collation = kwargs.get('collate', collation)
+
+ def _extend(self, spec):
+ """Extend a string-type declaration with standard SQL
+ COLLATE annotations.
+ """
+
+ if self.collation:
+ collation = 'COLLATE %s' % self.collation
+ else:
+ collation = None
+
+ return ' '.join([c for c in (spec, collation)
+ if c is not None])
+
+ def __repr__(self):
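+        # list the constructor arguments of the concrete type plus those
+        # of _StringType, and report any whose value is not None/False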
+ attributes = inspect.getargspec(self.__init__)[0][1:]
+ attributes.extend(inspect.getargspec(_StringType.__init__)[0][1:])
+
+ params = {}
+ for attr in attributes:
+ val = getattr(self, attr)
+ if val is not None and val is not False:
+ params[attr] = val
+
+ return "%s(%s)" % (self.__class__.__name__,
+ ', '.join(['%s=%r' % (k, params[k]) for k in params]))
+
+
class MSNumeric(sqltypes.Numeric):
def result_processor(self, dialect):
if self.asdecimal:
else:
return "NUMERIC(%(precision)s, %(scale)s)" % {'precision': self.precision, 'scale' : self.scale}
+
class MSFloat(sqltypes.Float):
def get_col_spec(self):
- return "FLOAT(%(precision)s)" % {'precision': self.precision}
+ if self.precision is None:
+ return "FLOAT"
+ else:
+ return "FLOAT(%(precision)s)" % {'precision': self.precision}
def bind_processor(self, dialect):
def process(value):
return None
return process
+
+class MSReal(MSFloat):
+ """A type for ``real`` numbers."""
+
+    def __init__(self):
+        """Construct a Real."""
+ super(MSReal, self).__init__(precision=24)
+
+ def adapt(self, impltype):
+ return impltype()
+
+ def bind_processor(self, dialect):
+ def process(value):
+ if value is not None:
+ return float(value)
+ else:
+ return value
+ return process
+
+ def get_col_spec(self):
+ return "REAL"
+
+
class MSInteger(sqltypes.Integer):
def get_col_spec(self):
return "INTEGER"
+
class MSBigInteger(MSInteger):
def get_col_spec(self):
return "BIGINT"
+
class MSTinyInteger(MSInteger):
def get_col_spec(self):
return "TINYINT"
+
class MSSmallInteger(MSInteger):
def get_col_spec(self):
return "SMALLINT"
+
class MSDateTime(sqltypes.DateTime):
def __init__(self, *a, **kw):
super(MSDateTime, self).__init__(False)
def get_col_spec(self):
return "DATETIME"
+
class MSSmallDate(sqltypes.Date):
def __init__(self, *a, **kw):
super(MSSmallDate, self).__init__(False)
return value
return process
+
class MSDate(sqltypes.Date):
def __init__(self, *a, **kw):
super(MSDate, self).__init__(False)
return value
return process
+
class MSTime(sqltypes.Time):
__zero_date = datetime.date(1900, 1, 1)
return value
return process
+
class MSDateTime_adodbapi(MSDateTime):
def result_processor(self, dialect):
def process(value):
return value
return process
+
class MSDateTime_pyodbc(MSDateTime):
def bind_processor(self, dialect):
def process(value):
return value
return process
+
class MSDate_pyodbc(MSDate):
def bind_processor(self, dialect):
def process(value):
return value
return process
-class MSText(sqltypes.Text):
+
+class MSText(_StringType, sqltypes.Text):
+ """MSSQL TEXT type, for variable-length text up to 2^31 characters."""
+
+ def __init__(self, *args, **kwargs):
+ """Construct a TEXT.
+
+ :param collation: Optional, a column-level collation for this string
+ value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+ """
+ _StringType.__init__(self, **kwargs)
+ sqltypes.Text.__init__(self, None,
+ convert_unicode=kwargs.get('convert_unicode', False),
+ assert_unicode=kwargs.get('assert_unicode', None))
+
def get_col_spec(self):
if self.dialect.text_as_varchar:
- return "VARCHAR(max)"
+ return self._extend("VARCHAR(max)")
else:
- return "TEXT"
+ return self._extend("TEXT")
+
+
+class MSNText(_StringType, sqltypes.UnicodeText):
+ """MSSQL NTEXT type, for variable-length unicode text up to 2^30
+ characters."""
+
+ def __init__(self, *args, **kwargs):
+ """Construct a NTEXT.
+
+ :param collation: Optional, a column-level collation for this string
+ value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+ """
+ _StringType.__init__(self, **kwargs)
+ sqltypes.UnicodeText.__init__(self, None,
+ convert_unicode=kwargs.get('convert_unicode', True),
+ assert_unicode=kwargs.get('assert_unicode', 'warn'))
+
+ def get_col_spec(self):
+ if self.dialect.text_as_varchar:
+ return self._extend("NVARCHAR(max)")
+ else:
+ return self._extend("NTEXT")
+
+
+class MSString(_StringType, sqltypes.String):
+ """MSSQL VARCHAR type, for variable-length non-Unicode data with a maximum
+ of 8,000 characters."""
+
+ def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs):
+ """Construct a VARCHAR.
+
+        :param length: Optional, maximum data length, in characters.
+
+ :param convert_unicode: defaults to False. If True, convert
+ ``unicode`` data sent to the database to a ``str``
+ bytestring, and convert bytestrings coming back from the
+ database into ``unicode``.
+
+ Bytestrings are encoded using the dialect's
+ :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which
+ defaults to `utf-8`.
+
+ If False, may be overridden by
+ :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`.
+
+ :param assert_unicode:
+
+ If None (the default), no assertion will take place unless
+ overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`.
+
+ If 'warn', will issue a runtime warning if a ``str``
+ instance is used as a bind value.
+
+ If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`.
+
+ :param collation: Optional, a column-level collation for this string
+ value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+ """
+ _StringType.__init__(self, **kwargs)
+ sqltypes.String.__init__(self, length=length,
+ convert_unicode=convert_unicode,
+ assert_unicode=assert_unicode)
-class MSString(sqltypes.String):
def get_col_spec(self):
- return "VARCHAR" + (self.length and "(%d)" % self.length or "")
+ if self.length:
+ return self._extend("VARCHAR(%s)" % self.length)
+ else:
+ return self._extend("VARCHAR")
+
+
+class MSNVarchar(_StringType, sqltypes.Unicode):
+ """MSSQL NVARCHAR type.
+
+ For variable-length unicode character data up to 4,000 characters."""
+
+ def __init__(self, length=None, **kwargs):
+ """Construct a NVARCHAR.
+
+        :param length: Optional, maximum data length, in characters.
+
+ :param collation: Optional, a column-level collation for this string
+ value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+ """
+ _StringType.__init__(self, **kwargs)
+ sqltypes.Unicode.__init__(self, length=length,
+ convert_unicode=kwargs.get('convert_unicode', True),
+ assert_unicode=kwargs.get('assert_unicode', 'warn'))
+
+ def adapt(self, impltype):
+ return impltype(length=self.length,
+ convert_unicode=self.convert_unicode,
+ assert_unicode=self.assert_unicode,
+ collation=self.collation)
-class MSNVarchar(sqltypes.Unicode):
def get_col_spec(self):
if self.length:
- return "NVARCHAR(%(length)s)" % {'length' : self.length}
- elif self.dialect.text_as_varchar:
- return "NVARCHAR(max)"
+ return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length})
else:
- return "NTEXT"
+ return self._extend("NVARCHAR")
-class AdoMSNVarchar(MSNVarchar):
+
+class AdoMSNVarchar(_StringType, sqltypes.Unicode):
"""overrides bindparam/result processing to not convert any unicode strings"""
+
+ def __init__(self, length=None, **kwargs):
+ """Construct a NVARCHAR.
+
+        :param length: Optional, maximum data length, in characters.
+
+ :param collation: Optional, a column-level collation for this string
+ value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+ """
+ _StringType.__init__(self, **kwargs)
+ sqltypes.Unicode.__init__(self, length=length,
+ convert_unicode=kwargs.get('convert_unicode', True),
+ assert_unicode=kwargs.get('assert_unicode', 'warn'))
+
def bind_processor(self, dialect):
return None
def result_processor(self, dialect):
return None
-class MSChar(sqltypes.CHAR):
def get_col_spec(self):
- return "CHAR(%(length)s)" % {'length' : self.length}
+ if self.length:
+ return self._extend("NVARCHAR(%(length)s)" % {'length' : self.length})
+ else:
+ return self._extend("NVARCHAR")
+
+
+class MSChar(_StringType, sqltypes.CHAR):
+ """MSSQL CHAR type, for fixed-length non-Unicode data with a maximum
+ of 8,000 characters."""
+
+ def __init__(self, length=None, convert_unicode=False, assert_unicode=None, **kwargs):
+ """Construct a CHAR.
+
+        :param length: Optional, maximum data length, in characters.
+
+ :param convert_unicode: defaults to False. If True, convert
+ ``unicode`` data sent to the database to a ``str``
+ bytestring, and convert bytestrings coming back from the
+ database into ``unicode``.
+
+ Bytestrings are encoded using the dialect's
+ :attr:`~sqlalchemy.engine.base.Dialect.encoding`, which
+ defaults to `utf-8`.
+
+ If False, may be overridden by
+ :attr:`sqlalchemy.engine.base.Dialect.convert_unicode`.
+
+ :param assert_unicode:
+
+ If None (the default), no assertion will take place unless
+ overridden by :attr:`sqlalchemy.engine.base.Dialect.assert_unicode`.
+
+ If 'warn', will issue a runtime warning if a ``str``
+ instance is used as a bind value.
+
+ If true, will raise an :exc:`sqlalchemy.exc.InvalidRequestError`.
+
+ :param collation: Optional, a column-level collation for this string
+ value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+ """
+ _StringType.__init__(self, **kwargs)
+ sqltypes.CHAR.__init__(self, length=length,
+ convert_unicode=convert_unicode,
+ assert_unicode=assert_unicode)
+
+ def get_col_spec(self):
+ if self.length:
+ return self._extend("CHAR(%s)" % self.length)
+ else:
+ return self._extend("CHAR")
+
+
+class MSNChar(_StringType, sqltypes.NCHAR):
+ """MSSQL NCHAR type.
+
+ For fixed-length unicode character data up to 4,000 characters."""
+
+ def __init__(self, length=None, **kwargs):
+ """Construct an NCHAR.
+
+        :param length: Optional, maximum data length, in characters.
+
+ :param collation: Optional, a column-level collation for this string
+ value. Accepts a Windows Collation Name or a SQL Collation Name.
+
+ """
+ _StringType.__init__(self, **kwargs)
+ sqltypes.NCHAR.__init__(self, length=length,
+ convert_unicode=kwargs.get('convert_unicode', True),
+ assert_unicode=kwargs.get('assert_unicode', 'warn'))
-class MSNChar(sqltypes.NCHAR):
def get_col_spec(self):
- return "NCHAR(%(length)s)" % {'length' : self.length}
+ if self.length:
+ return self._extend("NCHAR(%(length)s)" % {'length' : self.length})
+ else:
+ return self._extend("NCHAR")
+
class MSBinary(sqltypes.Binary):
def get_col_spec(self):
sqltypes.Binary : MSBinary,
sqltypes.Boolean : MSBoolean,
sqltypes.Text : MSText,
+ sqltypes.UnicodeText : MSNText,
sqltypes.CHAR: MSChar,
sqltypes.NCHAR: MSNChar,
sqltypes.TIMESTAMP: MSTimeStamp,
'char' : MSChar,
'nchar' : MSNChar,
'text' : MSText,
- 'ntext' : MSText,
+ 'ntext' : MSNText,
'decimal' : MSNumeric,
'numeric' : MSNumeric,
'float' : MSFloat,
def type_descriptor(self, typeobj):
newobj = sqltypes.adapt_type(typeobj, self.colspecs)
# Some types need to know about the dialect
- if isinstance(newobj, (MSText, MSNVarchar)):
+ if isinstance(newobj, (MSText, MSNText)):
newobj.dialect = self
return newobj
if row is None:
break
found_table = True
- (name, type, nullable, charlen, numericprec, numericscale, default) = (
+ (name, type, nullable, charlen, numericprec, numericscale, default, collation) = (
row[columns.c.column_name],
row[columns.c.data_type],
row[columns.c.is_nullable] == 'YES',
row[columns.c.character_maximum_length],
row[columns.c.numeric_precision],
row[columns.c.numeric_scale],
- row[columns.c.column_default]
+ row[columns.c.column_default],
+ row[columns.c.collation_name]
)
if include_columns and name not in include_columns:
continue
if a is not None:
args.append(a)
coltype = self.ischema_names.get(type, None)
+
+ kwargs = {}
+ if coltype in (MSString, MSChar, MSNVarchar, AdoMSNVarchar, MSNChar, MSText, MSNText):
+ if collation:
+ kwargs.update(collation=collation)
+
if coltype == MSText or (coltype == MSString and charlen == -1):
- coltype = MSText()
+ coltype = MSText(**kwargs)
else:
if coltype is None:
util.warn("Did not recognize type '%s' of column '%s'" %
elif coltype in (MSNVarchar, AdoMSNVarchar) and charlen == -1:
args[0] = None
- coltype = coltype(*args)
+ coltype = coltype(*args, **kwargs)
colargs = []
if default is not None:
colargs.append(schema.DefaultClause(sql.text(default)))
return module
colspecs = MSSQLDialect.colspecs.copy()
- if supports_unicode:
- colspecs[sqltypes.Unicode] = AdoMSNVarchar
colspecs[sqltypes.Date] = MSDate_pyodbc
colspecs[sqltypes.DateTime] = MSDateTime_pyodbc
-
ischema_names = MSSQLDialect.ischema_names.copy()
- if supports_unicode:
- ischema_names['nvarchar'] = AdoMSNVarchar
ischema_names['smalldatetime'] = MSDate_pyodbc
ischema_names['datetime'] = MSDateTime_pyodbc
+ if supports_unicode:
+ colspecs[sqltypes.Unicode] = AdoMSNVarchar
+ ischema_names['nvarchar'] = AdoMSNVarchar
def make_connect_string(self, keys, query):
if 'max_identifier_length' in keys:
from sqlalchemy.databases import mssql
import sqlalchemy.engine.url as url
from testlib import *
+from testlib.testing import eq_
class CompileTest(TestBase, AssertsCompiledSQL):
except:
assert False
+
+class TypesTest2(TestBase, AssertsExecutionResults):
+ "Test Microsoft SQL Server column types"
+
+ __only_on__ = 'mssql'
+
+ def test_money(self):
+ "Exercise type specification for money types."
+
+ columns = [
+ # column type, args, kwargs, expected ddl
+ (mssql.MSMoney, [], {},
+ 'MONEY'),
+ (mssql.MSSmallMoney, [], {},
+ 'SMALLMONEY'),
+ ]
+
+ table_args = ['test_mssql_money', MetaData(testing.db)]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res = spec
+ table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+ money_table = Table(*table_args)
+ gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+ for col in money_table.c:
+ index = int(col.name[1:])
+ testing.eq_(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+ self.assert_(repr(col))
+
+ try:
+ money_table.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ money_table.drop()
+
+ def test_binary(self):
+ "Exercise type specification for binary types."
+
+ columns = [
+ # column type, args, kwargs, expected ddl
+ (mssql.MSBinary, [], {},
+ 'IMAGE')
+ ]
+
+ table_args = ['test_mssql_binary', MetaData(testing.db)]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res = spec
+ table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+ binary_table = Table(*table_args)
+ gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+ for col in binary_table.c:
+ index = int(col.name[1:])
+ testing.eq_(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+ self.assert_(repr(col))
+
+ try:
+ binary_table.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ binary_table.drop()
+
+ def test_boolean(self):
+ "Exercise type specification for boolean type."
+
+ columns = [
+ # column type, args, kwargs, expected ddl
+ (mssql.MSBoolean, [], {},
+ 'BIT'),
+ ]
+
+ table_args = ['test_mssql_boolean', MetaData(testing.db)]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res = spec
+ table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+ boolean_table = Table(*table_args)
+ gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+ for col in boolean_table.c:
+ index = int(col.name[1:])
+ testing.eq_(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+ self.assert_(repr(col))
+
+ try:
+ boolean_table.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ boolean_table.drop()
+
+ def test_numeric(self):
+ "Exercise type specification and options for numeric types."
+
+ columns = [
+ # column type, args, kwargs, expected ddl
+ (mssql.MSNumeric, [], {},
+ 'NUMERIC(10, 2)'),
+ (mssql.MSNumeric, [None], {},
+ 'NUMERIC'),
+ (mssql.MSNumeric, [12], {},
+ 'NUMERIC(12, 2)'),
+ (mssql.MSNumeric, [12, 4], {},
+ 'NUMERIC(12, 4)'),
+
+ (mssql.MSFloat, [], {},
+ 'FLOAT(10)'),
+ (mssql.MSFloat, [None], {},
+ 'FLOAT'),
+ (mssql.MSFloat, [12], {},
+ 'FLOAT(12)'),
+ (mssql.MSReal, [], {},
+ 'REAL'),
+
+ (mssql.MSInteger, [], {},
+ 'INTEGER'),
+ (mssql.MSBigInteger, [], {},
+ 'BIGINT'),
+ (mssql.MSTinyInteger, [], {},
+ 'TINYINT'),
+ (mssql.MSSmallInteger, [], {},
+ 'SMALLINT'),
+ ]
+
+ table_args = ['test_mssql_numeric', MetaData(testing.db)]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res = spec
+ table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+ numeric_table = Table(*table_args)
+ gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+ for col in numeric_table.c:
+ index = int(col.name[1:])
+ testing.eq_(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+ self.assert_(repr(col))
+
+ try:
+ numeric_table.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ numeric_table.drop()
+
+ def test_char(self):
+ """Exercise COLLATE-ish options on string types."""
+
+ columns = [
+ (mssql.MSChar, [], {},
+ 'CHAR'),
+ (mssql.MSChar, [1], {},
+ 'CHAR(1)'),
+ (mssql.MSChar, [1], {'collation': 'Latin1_General_CI_AS'},
+ 'CHAR(1) COLLATE Latin1_General_CI_AS'),
+
+ (mssql.MSNChar, [], {},
+ 'NCHAR'),
+ (mssql.MSNChar, [1], {},
+ 'NCHAR(1)'),
+ (mssql.MSNChar, [1], {'collation': 'Latin1_General_CI_AS'},
+ 'NCHAR(1) COLLATE Latin1_General_CI_AS'),
+
+ (mssql.MSString, [], {},
+ 'VARCHAR'),
+ (mssql.MSString, [1], {},
+ 'VARCHAR(1)'),
+ (mssql.MSString, ['max'], {},
+ 'VARCHAR(max)'),
+ (mssql.MSString, [1], {'collation': 'Latin1_General_CI_AS'},
+ 'VARCHAR(1) COLLATE Latin1_General_CI_AS'),
+
+ (mssql.MSNVarchar, [], {},
+ 'NVARCHAR'),
+ (mssql.MSNVarchar, [1], {},
+ 'NVARCHAR(1)'),
+ (mssql.MSNVarchar, ['max'], {},
+ 'NVARCHAR(max)'),
+ (mssql.MSNVarchar, [1], {'collation': 'Latin1_General_CI_AS'},
+ 'NVARCHAR(1) COLLATE Latin1_General_CI_AS'),
+
+ (mssql.MSText, [], {},
+ 'TEXT'),
+ (mssql.MSText, [], {'collation': 'Latin1_General_CI_AS'},
+ 'TEXT COLLATE Latin1_General_CI_AS'),
+
+ (mssql.MSNText, [], {},
+ 'NTEXT'),
+ (mssql.MSNText, [], {'collation': 'Latin1_General_CI_AS'},
+ 'NTEXT COLLATE Latin1_General_CI_AS'),
+ ]
+
+ table_args = ['test_mssql_charset', MetaData(testing.db)]
+ for index, spec in enumerate(columns):
+ type_, args, kw, res = spec
+ table_args.append(Column('c%s' % index, type_(*args, **kw), nullable=None))
+
+ charset_table = Table(*table_args)
+ gen = testing.db.dialect.schemagenerator(testing.db.dialect, testing.db, None, None)
+
+ for col in charset_table.c:
+ index = int(col.name[1:])
+ testing.eq_(gen.get_column_specification(col),
+ "%s %s" % (col.name, columns[index][3]))
+ self.assert_(repr(col))
+
+ try:
+ charset_table.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ charset_table.drop()
+
+ def test_timestamp(self):
+ """Exercise TIMESTAMP column."""
+
+ meta = MetaData(testing.db)
+
+ try:
+ columns = [
+ (TIMESTAMP,
+ 'TIMESTAMP'),
+ (mssql.MSTimeStamp,
+ 'TIMESTAMP'),
+ ]
+ for idx, (spec, expected) in enumerate(columns):
+ t = Table('mssql_ts%s' % idx, meta,
+ Column('id', Integer, primary_key=True),
+ Column('t', spec, nullable=None))
+ testing.eq_(colspec(t.c.t), "t %s" % expected)
+ self.assert_(repr(t.c.t))
+ try:
+ t.create(checkfirst=True)
+ assert True
+ except:
+ raise
+ t.drop()
+ finally:
+ meta.drop_all()
+
+ def test_autoincrement(self):
+ meta = MetaData(testing.db)
+ try:
+ Table('ai_1', meta,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True))
+ Table('ai_2', meta,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True))
+ Table('ai_3', meta,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_4', meta,
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False),
+ Column('int_n2', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table('ai_5', meta,
+ Column('int_y', Integer, primary_key=True),
+ Column('int_n', Integer, DefaultClause('0'),
+ primary_key=True, autoincrement=False))
+ Table('ai_6', meta,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_7', meta,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('int_y', Integer, primary_key=True))
+ Table('ai_8', meta,
+ Column('o1', String(1), DefaultClause('x'),
+ primary_key=True),
+ Column('o2', String(1), DefaultClause('x'),
+ primary_key=True))
+ meta.create_all()
+
+ table_names = ['ai_1', 'ai_2', 'ai_3', 'ai_4',
+ 'ai_5', 'ai_6', 'ai_7', 'ai_8']
+ mr = MetaData(testing.db)
+ mr.reflect(only=table_names)
+
+ for tbl in [mr.tables[name] for name in table_names]:
+ for c in tbl.c:
+ if c.name.startswith('int_y'):
+ assert c.autoincrement
+ elif c.name.startswith('int_n'):
+ assert not c.autoincrement
+ tbl.insert().execute()
+ if 'int_y' in tbl.c:
+ assert select([tbl.c.int_y]).scalar() == 1
+ assert list(tbl.select().execute().fetchone()).count(1) == 1
+ else:
+ assert 1 not in list(tbl.select().execute().fetchone())
+ finally:
+ meta.drop_all()
+
+def colspec(c):
+ return testing.db.dialect.schemagenerator(testing.db.dialect,
+ testing.db, None, None).get_column_specification(c)
+
+
if __name__ == "__main__":
testenv.main()