]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Rewrote the reflection of indexes to use sys.
authorMike Bayer <mike_mp@zzzcomputing.com>
Sat, 23 Oct 2010 22:09:36 +0000 (18:09 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sat, 23 Oct 2010 22:09:36 +0000 (18:09 -0400)
catalogs, so that column names of any configuration
(spaces, embedded commas, etc.) can be reflected.
Note that reflection of indexes requires SQL
Server 2005 or greater.  [ticket:1770]

CHANGES
lib/sqlalchemy/dialects/mssql/base.py
lib/sqlalchemy/dialects/mssql/information_schema.py
test/dialect/test_mssql.py

diff --git a/CHANGES b/CHANGES
index 6df2e8006e835bbe26f70f47fc7cfb6cf96d7d07..9eab740d690f2511ae2120377089410639257443 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -260,6 +260,12 @@ CHANGES
     
    - Fixed bug where aliasing of tables with "schema" would
      fail to compile properly.  [ticket:1943]
+
+   - Rewrote the reflection of indexes to use sys. 
+     catalogs, so that column names of any configuration
+     (spaces, embedded commas, etc.) can be reflected.
+     Note that reflection of indexes requires SQL 
+     Server 2005 or greater.  [ticket:1770]
      
 - informix
    - *Major* cleanup / modernization of the Informix 
index 222737dbe2604df68ea562ca9a2c46873cbf3c64..5c3b72647211d490e916de0153d3dd3d1bba086c 100644 (file)
@@ -114,6 +114,8 @@ Known Issues
 ------------
 
 * No support for more than one ``IDENTITY`` column per table
+* reflection of indexes does not work with versions older than
+  SQL Server 2005
 
 """
 import datetime, decimal, inspect, operator, sys, re
@@ -1124,26 +1126,55 @@ class MSDialect(default.DefaultDialect):
         view_names = [r[0] for r in connection.execute(s)]
         return view_names
 
-    # The cursor reports it is closed after executing the sp.
     @reflection.cache
     def get_indexes(self, connection, tablename, schema=None, **kw):
+        # using system catalogs, don't support index reflection
+        # below MS 2005
+        if self.server_version_info < MS_2005_VERSION:
+            return []
+        
         current_schema = schema or self.default_schema_name
-        col_finder = re.compile("(\w+)")
         full_tname = "%s.%s" % (current_schema, tablename)
-        indexes = []
-        s = sql.text("exec sp_helpindex '%s'" % full_tname)
-        rp = connection.execute(s)
-        if rp.closed:
-            # did not work for this setup.
-            return []
+
+        rp = connection.execute(
+            sql.text("select ind.index_id, ind.is_unique, ind.name "
+                "from sys.indexes as ind join sys.tables as tab on "
+                "ind.object_id=tab.object_id "
+                "join sys.schemas as sch on sch.schema_id=tab.schema_id "
+                "where tab.name = :tabname "
+                "and sch.name=:schname "
+                "and ind.is_primary_key=0", 
+                bindparams=[
+                    sql.bindparam('tabname', tablename, sqltypes.Unicode),
+                    sql.bindparam('schname', current_schema, sqltypes.Unicode)
+                ]
+            )
+        )
+        indexes = {}
         for row in rp:
-            if 'primary key' not in row['index_description']:
-                indexes.append({
-                    'name' : row['index_name'],
-                    'column_names' : col_finder.findall(row['index_keys']),
-                    'unique': 'unique' in row['index_description']
-                })
-        return indexes
+            indexes[row['index_id']] = {
+                'name':row['name'],
+                'unique':row['is_unique'] == 1,
+                'column_names':[]
+            }
+        rp = connection.execute(
+            sql.text("select ind_col.index_id, col.name from sys.columns as col "
+                        "join sys.index_columns as ind_col on "
+                        "ind_col.column_id=col.column_id "
+                        "join sys.tables as tab on tab.object_id=col.object_id "
+                        "join sys.schemas as sch on sch.schema_id=tab.schema_id "
+                        "where tab.name=:tabname "
+                        "and sch.name=:schname",
+                        bindparams=[
+                            sql.bindparam('tabname', tablename, sqltypes.Unicode),
+                            sql.bindparam('schname', current_schema, sqltypes.Unicode)
+                        ]),
+            )
+        for row in rp:
+            if row['index_id'] in indexes:
+                indexes[row['index_id']]['column_names'].append(row['name'])
+        
+        return indexes.values()
 
     @reflection.cache
     def get_view_definition(self, connection, viewname, schema=None, **kw):
index cd1606dbfdc6964bb2291bc3bccbf32b148f227b..4dd6436cda673937b1e43b6de8229ee165000cd2 100644 (file)
@@ -1,3 +1,5 @@
+# TODO: should be using the sys. catalog with SQL Server, not information schema
+
 from sqlalchemy import Table, MetaData, Column, ForeignKey
 from sqlalchemy.types import String, Unicode, Integer, TypeDecorator
 
index e840f9c0c975b56595ffa1f187b196ea971f938b..f9912317c6144121736982bd9d8359d011a7b35c 100644 (file)
@@ -439,37 +439,79 @@ class ReflectionTest(TestBase, ComparesTables):
         finally:
             meta.drop_all()
 
+    @testing.provide_metadata
     def test_identity(self):
-        meta = MetaData(testing.db)
         table = Table(
-            'identity_test', meta,
+            'identity_test', metadata,
             Column('col1', Integer, Sequence('fred', 2, 3), primary_key=True)
         )
         table.create()
 
         meta2 = MetaData(testing.db)
-        try:
-            table2 = Table('identity_test', meta2, autoload=True)
-            sequence = isinstance(table2.c['col1'].default, schema.Sequence) \
-                                    and table2.c['col1'].default
-            assert sequence.start == 2
-            assert sequence.increment == 3
-        finally:
-            table.drop()
+        table2 = Table('identity_test', meta2, autoload=True)
+        sequence = isinstance(table2.c['col1'].default, schema.Sequence) \
+                                and table2.c['col1'].default
+        assert sequence.start == 2
+        assert sequence.increment == 3
     
     @testing.emits_warning("Did not recognize")
+    @testing.provide_metadata
     def test_skip_types(self):
-        meta = MetaData(testing.db)
         testing.db.execute("""
             create table foo (id integer primary key, data xml)
         """)
-        try:
-            t1 = Table('foo', meta, autoload=True)
-            assert isinstance(t1.c.id.type, Integer)
-            assert isinstance(t1.c.data.type, types.NullType)
-        finally:
-            testing.db.execute("drop table foo")
+        t1 = Table('foo', metadata, autoload=True)
+        assert isinstance(t1.c.id.type, Integer)
+        assert isinstance(t1.c.data.type, types.NullType)
+
+    @testing.provide_metadata
+    def test_indexes_cols(self):
+        
+        t1 = Table('t', metadata, Column('x', Integer), Column('y', Integer))
+        Index('foo', t1.c.x, t1.c.y)
+        metadata.create_all()
+        
+        m2 = MetaData()
+        t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+        
+        eq_(
+            set(list(t2.indexes)[0].columns),
+            set([t2.c['x'], t2.c.y])
+        )
+
+    @testing.provide_metadata
+    def test_indexes_cols_with_commas(self):
+        
+        t1 = Table('t', metadata, 
+                        Column('x, col', Integer, key='x'), 
+                        Column('y', Integer)
+                    )
+        Index('foo', t1.c.x, t1.c.y)
+        metadata.create_all()
+        
+        m2 = MetaData()
+        t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
+        
+        eq_(
+            set(list(t2.indexes)[0].columns),
+            set([t2.c['x, col'], t2.c.y])
+        )
+    
+    @testing.provide_metadata
+    def test_indexes_cols_with_spaces(self):
+        
+        t1 = Table('t', metadata, Column('x col', Integer, key='x'), 
+                                    Column('y', Integer))
+        Index('foo', t1.c.x, t1.c.y)
+        metadata.create_all()
+        
+        m2 = MetaData()
+        t2 = Table('t', m2, autoload=True, autoload_with=testing.db)
         
+        eq_(
+            set(list(t2.indexes)[0].columns),
+            set([t2.c['x col'], t2.c.y])
+        )
         
 class QueryUnicodeTest(TestBase):