From: Mike Bayer Date: Sat, 23 Oct 2010 22:09:36 +0000 (-0400) Subject: - Rewrote the reflection of indexes to use sys. X-Git-Tag: rel_0_6_5~3 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9c0d6c0a2326d00579c87c140890e6a9b65b6d32;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git - Rewrote the reflection of indexes to use sys. catalogs, so that column names of any configuration (spaces, embedded commas, etc.) can be reflected. Note that reflection of indexes requires SQL Server 2005 or greater. [ticket:1770] --- diff --git a/CHANGES b/CHANGES index 6df2e8006e..9eab740d69 100644 --- a/CHANGES +++ b/CHANGES @@ -260,6 +260,12 @@ CHANGES - Fixed bug where aliasing of tables with "schema" would fail to compile properly. [ticket:1943] + + - Rewrote the reflection of indexes to use sys. + catalogs, so that column names of any configuration + (spaces, embedded commas, etc.) can be reflected. + Note that reflection of indexes requires SQL + Server 2005 or greater. [ticket:1770] - informix - *Major* cleanup / modernization of the Informix diff --git a/lib/sqlalchemy/dialects/mssql/base.py b/lib/sqlalchemy/dialects/mssql/base.py index 222737dbe2..5c3b726472 100644 --- a/lib/sqlalchemy/dialects/mssql/base.py +++ b/lib/sqlalchemy/dialects/mssql/base.py @@ -114,6 +114,8 @@ Known Issues ------------ * No support for more than one ``IDENTITY`` column per table +* reflection of indexes does not work with versions older than + SQL Server 2005 """ import datetime, decimal, inspect, operator, sys, re @@ -1124,26 +1126,55 @@ class MSDialect(default.DefaultDialect): view_names = [r[0] for r in connection.execute(s)] return view_names - # The cursor reports it is closed after executing the sp. @reflection.cache def get_indexes(self, connection, tablename, schema=None, **kw): + # using system catalogs, don't support index reflection + # below MS 2005 + if self.server_version_info < MS_2005_VERSION: + return [] + current_schema = schema or self.default_schema_name - col_finder = re.compile("(\w+)") full_tname = "%s.%s" % (current_schema, tablename) - indexes = [] - s = sql.text("exec sp_helpindex '%s'" % full_tname) - rp = connection.execute(s) - if rp.closed: - # did not work for this setup. - return [] + + rp = connection.execute( + sql.text("select ind.index_id, ind.is_unique, ind.name " + "from sys.indexes as ind join sys.tables as tab on " + "ind.object_id=tab.object_id " + "join sys.schemas as sch on sch.schema_id=tab.schema_id " + "where tab.name = :tabname " + "and sch.name=:schname " + "and ind.is_primary_key=0", + bindparams=[ + sql.bindparam('tabname', tablename, sqltypes.Unicode), + sql.bindparam('schname', current_schema, sqltypes.Unicode) + ] + ) + ) + indexes = {} for row in rp: - if 'primary key' not in row['index_description']: - indexes.append({ - 'name' : row['index_name'], - 'column_names' : col_finder.findall(row['index_keys']), - 'unique': 'unique' in row['index_description'] - }) - return indexes + indexes[row['index_id']] = { + 'name':row['name'], + 'unique':row['is_unique'] == 1, + 'column_names':[] + } + rp = connection.execute( + sql.text("select ind_col.index_id, col.name from sys.columns as col " + "join sys.index_columns as ind_col on " + "ind_col.column_id=col.column_id " + "join sys.tables as tab on tab.object_id=col.object_id " + "join sys.schemas as sch on sch.schema_id=tab.schema_id " + "where tab.name=:tabname " + "and sch.name=:schname", + bindparams=[ + sql.bindparam('tabname', tablename, sqltypes.Unicode), + sql.bindparam('schname', current_schema, sqltypes.Unicode) + ]), + ) + for row in rp: + if row['index_id'] in indexes: + indexes[row['index_id']]['column_names'].append(row['name']) + + return indexes.values() @reflection.cache def get_view_definition(self, connection, viewname, schema=None, **kw): diff --git a/lib/sqlalchemy/dialects/mssql/information_schema.py b/lib/sqlalchemy/dialects/mssql/information_schema.py index cd1606dbfd..4dd6436cda 100644 --- a/lib/sqlalchemy/dialects/mssql/information_schema.py +++ b/lib/sqlalchemy/dialects/mssql/information_schema.py @@ -1,3 +1,5 @@ +# TODO: should be using the sys. catalog with SQL Server, not information schema + from sqlalchemy import Table, MetaData, Column, ForeignKey from sqlalchemy.types import String, Unicode, Integer, TypeDecorator diff --git a/test/dialect/test_mssql.py b/test/dialect/test_mssql.py index e840f9c0c9..f9912317c6 100644 --- a/test/dialect/test_mssql.py +++ b/test/dialect/test_mssql.py @@ -439,37 +439,79 @@ class ReflectionTest(TestBase, ComparesTables): finally: meta.drop_all() + @testing.provide_metadata def test_identity(self): - meta = MetaData(testing.db) table = Table( - 'identity_test', meta, + 'identity_test', metadata, Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True) ) table.create() meta2 = MetaData(testing.db) - try: - table2 = Table('identity_test', meta2, autoload=True) - sequence = isinstance(table2.c['col1'].default, schema.Sequence) \ - and table2.c['col1'].default - assert sequence.start == 2 - assert sequence.increment == 3 - finally: - table.drop() + table2 = Table('identity_test', meta2, autoload=True) + sequence = isinstance(table2.c['col1'].default, schema.Sequence) \ + and table2.c['col1'].default + assert sequence.start == 2 + assert sequence.increment == 3 @testing.emits_warning("Did not recognize") + @testing.provide_metadata def test_skip_types(self): - meta = MetaData(testing.db) testing.db.execute(""" create table foo (id integer primary key, data xml) """) - try: - t1 = Table('foo', meta, autoload=True) - assert isinstance(t1.c.id.type, Integer) - assert isinstance(t1.c.data.type, types.NullType) - finally: - testing.db.execute("drop table foo") + t1 = Table('foo', metadata, autoload=True) + assert isinstance(t1.c.id.type, Integer) + assert isinstance(t1.c.data.type, types.NullType) + + @testing.provide_metadata + def test_indexes_cols(self): + + t1 = Table('t', metadata, Column('x', Integer), Column('y', Integer)) + Index('foo', t1.c.x, t1.c.y) + metadata.create_all() + + m2 = MetaData() + t2 = Table('t', m2, autoload=True, autoload_with=testing.db) + + eq_( + set(list(t2.indexes)[0].columns), + set([t2.c['x'], t2.c.y]) + ) + + @testing.provide_metadata + def test_indexes_cols_with_commas(self): + + t1 = Table('t', metadata, + Column('x, col', Integer, key='x'), + Column('y', Integer) + ) + Index('foo', t1.c.x, t1.c.y) + metadata.create_all() + + m2 = MetaData() + t2 = Table('t', m2, autoload=True, autoload_with=testing.db) + + eq_( + set(list(t2.indexes)[0].columns), + set([t2.c['x, col'], t2.c.y]) + ) + + @testing.provide_metadata + def test_indexes_cols_with_spaces(self): + + t1 = Table('t', metadata, Column('x col', Integer, key='x'), + Column('y', Integer)) + Index('foo', t1.c.x, t1.c.y) + metadata.create_all() + + m2 = MetaData() + t2 = Table('t', m2, autoload=True, autoload_with=testing.db) + eq_( + set(list(t2.indexes)[0].columns), + set([t2.c['x col'], t2.c.y]) + ) class QueryUnicodeTest(TestBase):