------------
* No support for more than one ``IDENTITY`` column per table
+* reflection of indexes does not work with versions older than
+ SQL Server 2005
"""
import datetime, decimal, inspect, operator, sys, re
view_names = [r[0] for r in connection.execute(s)]
return view_names
- # The cursor reports it is closed after executing the sp.
@reflection.cache
def get_indexes(self, connection, tablename, schema=None, **kw):
+ # using system catalogs, don't support index reflection
+ # below MS 2005
+ if self.server_version_info < MS_2005_VERSION:
+ return []
+
current_schema = schema or self.default_schema_name
- col_finder = re.compile("(\w+)")
full_tname = "%s.%s" % (current_schema, tablename)
- indexes = []
- s = sql.text("exec sp_helpindex '%s'" % full_tname)
- rp = connection.execute(s)
- if rp.closed:
- # did not work for this setup.
- return []
+
+ rp = connection.execute(
+ sql.text("select ind.index_id, ind.is_unique, ind.name "
+ "from sys.indexes as ind join sys.tables as tab on "
+ "ind.object_id=tab.object_id "
+ "join sys.schemas as sch on sch.schema_id=tab.schema_id "
+ "where tab.name = :tabname "
+ "and sch.name=:schname "
+ "and ind.is_primary_key=0",
+ bindparams=[
+ sql.bindparam('tabname', tablename, sqltypes.Unicode),
+ sql.bindparam('schname', current_schema, sqltypes.Unicode)
+ ]
+ )
+ )
+ indexes = {}
for row in rp:
- if 'primary key' not in row['index_description']:
- indexes.append({
- 'name' : row['index_name'],
- 'column_names' : col_finder.findall(row['index_keys']),
- 'unique': 'unique' in row['index_description']
- })
- return indexes
+ indexes[row['index_id']] = {
+ 'name':row['name'],
+ 'unique':row['is_unique'] == 1,
+ 'column_names':[]
+ }
+ rp = connection.execute(
+ sql.text("select ind_col.index_id, col.name from sys.columns as col "
+ "join sys.index_columns as ind_col on "
+ "ind_col.column_id=col.column_id "
+ "join sys.tables as tab on tab.object_id=col.object_id "
+ "join sys.schemas as sch on sch.schema_id=tab.schema_id "
+ "where tab.name=:tabname "
+ "and sch.name=:schname",
+ bindparams=[
+ sql.bindparam('tabname', tablename, sqltypes.Unicode),
+ sql.bindparam('schname', current_schema, sqltypes.Unicode)
+ ]),
+ )
+ for row in rp:
+ if row['index_id'] in indexes:
+ indexes[row['index_id']]['column_names'].append(row['name'])
+
+ return indexes.values()
@reflection.cache
def get_view_definition(self, connection, viewname, schema=None, **kw):
finally:
meta.drop_all()
+ @testing.provide_metadata
def test_identity(self):
- meta = MetaData(testing.db)
table = Table(
- 'identity_test', meta,
+ 'identity_test', metadata,
Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True)
)
table.create()
meta2 = MetaData(testing.db)
- try:
- table2 = Table('identity_test', meta2, autoload=True)
- sequence = isinstance(table2.c['col1'].default, schema.Sequence) \
- and table2.c['col1'].default
- assert sequence.start == 2
- assert sequence.increment == 3
- finally:
- table.drop()
+ table2 = Table('identity_test', meta2, autoload=True)
+ sequence = isinstance(table2.c['col1'].default, schema.Sequence) \
+ and table2.c['col1'].default
+ assert sequence.start == 2
+ assert sequence.increment == 3
@testing.emits_warning("Did not recognize")
+ @testing.provide_metadata
def test_skip_types(self):
- meta = MetaData(testing.db)
testing.db.execute("""
create table foo (id integer primary key, data xml)
""")
- try:
- t1 = Table('foo', meta, autoload=True)
- assert isinstance(t1.c.id.type, Integer)
- assert isinstance(t1.c.data.type, types.NullType)
- finally:
- testing.db.execute("drop table foo")
+ t1 = Table('foo', metadata, autoload=True)
+ assert isinstance(t1.c.id.type, Integer)
+ assert isinstance(t1.c.data.type, types.NullType)
+
+ @testing.provide_metadata
+ def test_indexes_cols(self):
+
+ t1 = Table('t', metadata, Column('x', Integer), Column('y', Integer))
+ Index('foo', t1.c.x, t1.c.y)
+ metadata.create_all()
+
+ m2 = MetaData()
+ t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+
+ eq_(
+ set(list(t2.indexes)[0].columns),
+ set([t2.c['x'], t2.c.y])
+ )
+
+ @testing.provide_metadata
+ def test_indexes_cols_with_commas(self):
+
+ t1 = Table('t', metadata,
+ Column('x, col', Integer, key='x'),
+ Column('y', Integer)
+ )
+ Index('foo', t1.c.x, t1.c.y)
+ metadata.create_all()
+
+ m2 = MetaData()
+ t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+
+ eq_(
+ set(list(t2.indexes)[0].columns),
+ set([t2.c['x, col'], t2.c.y])
+ )
+
+ @testing.provide_metadata
+ def test_indexes_cols_with_spaces(self):
+
+ t1 = Table('t', metadata, Column('x col', Integer, key='x'),
+ Column('y', Integer))
+ Index('foo', t1.c.x, t1.c.y)
+ metadata.create_all()
+
+ m2 = MetaData()
+ t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+ eq_(
+ set(list(t2.indexes)[0].columns),
+ set([t2.c['x col'], t2.c.y])
+ )
class QueryUnicodeTest(TestBase):