]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- schema-qualified tables now will place the schemaname
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 30 Mar 2008 21:48:19 +0000 (21:48 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 30 Mar 2008 21:48:19 +0000 (21:48 +0000)
ahead of the tablename in all column expressions as well
as when generating column labels.  This prevents cross-
schema name collisions in all cases [ticket:999]
- the "use_schema" argument to compiler.visit_column() is removed.  It uses
schema in all cases now.
- added a new test to the PG dialect to test roundtrip insert/update/delete/select
statements with full schema qualification

CHANGES
lib/sqlalchemy/databases/mssql.py
lib/sqlalchemy/sql/compiler.py
lib/sqlalchemy/sql/expression.py
test/dialect/postgres.py
test/sql/select.py

diff --git a/CHANGES b/CHANGES
index 27280f225eee98c32c1f577dd96c6599e219ed4c..41ddd52b75ad12e0b7eea17c46df73351f97767f 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -129,6 +129,11 @@ CHANGES
       mapper-config'ed order_by when using select_from()
 
 - sql
+    - schema-qualified tables now will place the schemaname
+      ahead of the tablename in all column expressions as well
+      as when generating column labels.  This prevents cross-
+      schema name collisions in all cases [ticket:999]
+      
     - can now allow selects which correlate all FROM clauses
       and have no FROM themselves.  These are typically
       used in a scalar context, i.e. SELECT x, (SELECT x WHERE y)
index 0ebb84a16ad35a682645f7b7f9b4e1e245e43856..da42ea1055167233105ed392f4ad2fda939e4874 100644 (file)
@@ -987,8 +987,6 @@ class MSSQLCompiler(compiler.DefaultCompiler):
             t = self._schema_aliased_table(column.table)
             if t is not None:
                 return self.process(expression._corresponding_column_or_error(t, column))
-        else:
-            kwargs['use_schema'] = True
         return super(MSSQLCompiler, self).visit_column(column, **kwargs)
 
     def visit_binary(self, binary, **kwargs):
index ee3ecc1f1b0c48fddbf97ad0b21523443f34693a..76e2ca2608eab5b9ebd388d67d098ac55624f8e9 100644 (file)
@@ -249,14 +249,9 @@ class DefaultCompiler(engine.Compiled):
 
         return " ".join([self.process(label.obj), self.operator_string(operators.as_), self.preparer.format_label(label, labelname)])
 
-    def visit_column(self, column, result_map=None, use_schema=False, **kwargs):
-        # there is actually somewhat of a ruleset when you would *not* necessarily
-        # want to truncate a column identifier, if its mapped to the name of a
-        # physical column.  but thats very hard to identify at this point, and
-        # the identifier length should be greater than the id lengths of any physical
-        # columns so should not matter.
-
-        if use_schema and getattr(column, 'table', None) and getattr(column.table, 'schema', None):
+    def visit_column(self, column, result_map=None, **kwargs):
+
+        if getattr(column, 'table', None) and getattr(column.table, 'schema', None):
             schema_prefix = self.preparer.quote(column.table, column.table.schema) + '.'
         else:
             schema_prefix = ''
@@ -278,7 +273,7 @@ class DefaultCompiler(engine.Compiled):
                     return schema_prefix + self.preparer.quote(column.table, ANONYMOUS_LABEL.sub(self._process_anon, column.table.name)) + "." + n
             elif len(column.table.primary_key) != 0:
                 pk = list(column.table.primary_key)[0]
-                return self.visit_column(pk, result_map=result_map, use_schema=use_schema, **kwargs)
+                return self.visit_column(pk, result_map=result_map, **kwargs)
             else:
                 return None
         elif column.table is None or not column.table.named_with_column:
index 3d95948cbe57641cf1d81e6837b1a174df4b9da1..2cd10720ab91a83ba01a962aa80c657bd1d52a78 100644 (file)
@@ -2632,7 +2632,10 @@ class _ColumnClause(ColumnElement):
             return None
         if self.__label is None:
             if self.table is not None and self.table.named_with_column:
-                self.__label = self.table.name + "_" + self.name
+                if getattr(self.table, 'schema', None):
+                    self.__label = "_".join([self.table.schema, self.table.name, self.name])
+                else:
+                    self.__label = "_".join([self.table.name, self.name])
                 counter = 1
                 while self.__label in self.table.c:
                     self.__label = self.__label + "_%d" % counter
index 21b11f114bd9ece7cb1d5f266b68bcbe67b6a3ef..e68fd4d746c7804d8ef3f13fc7daaa7414ab3313 100644 (file)
@@ -568,7 +568,31 @@ class MiscTest(TestBase, AssertsExecutionResults):
             self.assert_((subject.c.id==referer.c.ref).compare(subject.join(referer).onclause))
         finally:
             meta1.drop_all()
-
+    
+    def test_schema_roundtrips(self):
+        meta = MetaData(testing.db)
+        users = Table('users', meta, 
+            Column('id', Integer, primary_key=True),
+            Column('name', String(50)), schema='alt_schema')
+        users.create()
+        try:
+            users.insert().execute(id=1, name='name1')
+            users.insert().execute(id=2, name='name2')
+            users.insert().execute(id=3, name='name3')
+            users.insert().execute(id=4, name='name4')
+            
+            self.assertEquals(users.select().where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
+            self.assertEquals(users.select(use_labels=True).where(users.c.name=='name2').execute().fetchall(), [(2, 'name2')])
+            
+            users.delete().where(users.c.id==3).execute()
+            self.assertEquals(users.select().where(users.c.name=='name3').execute().fetchall(), [])
+            
+            users.update().where(users.c.name=='name4').execute(name='newname')
+            self.assertEquals(users.select(use_labels=True).where(users.c.id==4).execute().fetchall(), [(4, 'newname')])
+            
+        finally:
+            users.drop()
+            
     def test_preexecute_passivedefault(self):
         """test that when we get a primary key column back
         from reflecting a table which has a default value on it, we pre-execute
index b8d1f4f68c4cf0390e624599df596ae40d62567f..06679e9564b146564653f93d3d16129db36b5360 100644 (file)
@@ -1360,16 +1360,16 @@ class SchemaTest(TestBase, AssertsCompiledSQL):
     @testing.fails_on('mssql')
     def test_select(self):
         # these tests will fail with the MS-SQL compiler since it will alias schema-qualified tables
-        self.assert_compile(table4.select(), "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable")
+        self.assert_compile(table4.select(), "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable")
         self.assert_compile(table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi')),
-            "SELECT remotetable.rem_id, remotetable.datatype_id, remotetable.value FROM remote_owner.remotetable WHERE "\
-            "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1")
+            "SELECT remote_owner.remotetable.rem_id, remote_owner.remotetable.datatype_id, remote_owner.remotetable.value FROM remote_owner.remotetable WHERE "\
+            "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
 
         s = table4.select(and_(table4.c.datatype_id==7, table4.c.value=='hi'))
         s.use_labels = True
-        self.assert_compile(s, "SELECT remotetable.rem_id AS remotetable_rem_id, remotetable.datatype_id AS remotetable_datatype_id, remotetable.value "\
-            "AS remotetable_value FROM remote_owner.remotetable WHERE "\
-            "remotetable.datatype_id = :remotetable_datatype_id_1 AND remotetable.value = :remotetable_value_1")
+        self.assert_compile(s, "SELECT remote_owner.remotetable.rem_id AS remote_owner_remotetable_rem_id, remote_owner.remotetable.datatype_id AS remote_owner_remotetable_datatype_id, remote_owner.remotetable.value "\
+            "AS remote_owner_remotetable_value FROM remote_owner.remotetable WHERE "\
+            "remote_owner.remotetable.datatype_id = :remote_owner_remotetable_datatype_id_1 AND remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
 
     def test_alias(self):
         a = alias(table4, 'remtable')
@@ -1378,7 +1378,7 @@ class SchemaTest(TestBase, AssertsCompiledSQL):
 
     def test_update(self):
         self.assert_compile(table4.update(table4.c.value=='test', values={table4.c.datatype_id:12}), "UPDATE remote_owner.remotetable SET datatype_id=:datatype_id "\
-            "WHERE remotetable.value = :remotetable_value_1")
+            "WHERE remote_owner.remotetable.value = :remote_owner_remotetable_value_1")
 
     def test_insert(self):
         self.assert_compile(table4.insert(values=(2, 5, 'test')), "INSERT INTO remote_owner.remotetable (rem_id, datatype_id, value) VALUES "\