]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Corrected the "has_sequence" query to take current schema,
authorMike Bayer <mike_mp@zzzcomputing.com>
Wed, 21 Oct 2009 04:47:13 +0000 (04:47 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Wed, 21 Oct 2009 04:47:13 +0000 (04:47 +0000)
or explicit sequence-stated schema, into account.
[ticket:1576]

CHANGES
lib/sqlalchemy/databases/postgres.py
test/engine/test_reflection.py

diff --git a/CHANGES b/CHANGES
index 4a6b28bde30cc4e8fe18f8e5834a1feefa0232e6..fb16947b28d3267ff2daebd578eba67c04a3b534 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -39,7 +39,11 @@ CHANGES
       via a new postgres.PGDoublePrecision object.  
       This is postgresql.DOUBLE_PRECISION in 0.6.
       [ticket:1085]
-
+    
+    - Corrected the "has_sequence" query to take current schema,
+      or explicit sequence-stated schema, into account.
+      [ticket:1576]
+      
 - mssql
     - Changed the name of TrustedConnection to 
       Trusted_Connection when constructing pyodbc connect
index 03d0377a0ee410c5aad5338205a333c6b8d68860..be9148835d360ad4d71968f4da6214ca89b54c8e 100644 (file)
@@ -426,11 +426,32 @@ class PGDialect(default.DefaultDialect):
             cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=current_schema() and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding)});
         else:
             cursor = connection.execute("""select relname from pg_class c join pg_namespace n on n.oid=c.relnamespace where n.nspname=%(schema)s and lower(relname)=%(name)s""", {'name':table_name.lower().encode(self.encoding), 'schema':schema});
-        return bool( not not cursor.rowcount )
+        try:
+            return bool(cursor.fetchone())
+        finally:
+            cursor.close()
 
-    def has_sequence(self, connection, sequence_name):
-        cursor = connection.execute('''SELECT relname FROM pg_class WHERE relkind = 'S' AND relnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname NOT LIKE 'pg_%%' AND nspname != 'information_schema' AND relname = %(seqname)s);''', {'seqname': sequence_name.encode(self.encoding)})
-        return bool(not not cursor.rowcount)
+    def has_sequence(self, connection, sequence_name, schema=None):
+        if schema is None:
+            cursor = connection.execute(
+                        sql.text("SELECT relname FROM pg_class c join pg_namespace n on "
+                            "n.oid=c.relnamespace where relkind='S' and n.nspname=current_schema() and lower(relname)=:name",
+                            bindparams=[sql.bindparam('name', unicode(sequence_name.lower()), type_=sqltypes.Unicode)] 
+                        )
+                    )
+        else:
+            cursor = connection.execute(
+                        sql.text("SELECT relname FROM pg_class c join pg_namespace n on "
+                            "n.oid=c.relnamespace where relkind='S' and n.nspname=:schema and lower(relname)=:name",
+                            bindparams=[sql.bindparam('name', unicode(sequence_name.lower()), type_=sqltypes.Unicode),
+                                sql.bindparam('schema', unicode(schema), type_=sqltypes.Unicode)] 
+                        )
+                    )
+        
+        try:
+            return bool(cursor.fetchone())
+        finally:
+            cursor.close()
 
     def is_disconnect(self, e):
         if isinstance(e, self.dbapi.OperationalError):
@@ -821,7 +842,7 @@ class PGSchemaGenerator(compiler.SchemaGenerator):
         return colspec
 
     def visit_sequence(self, sequence):
-        if not sequence.optional and (not self.checkfirst or not self.dialect.has_sequence(self.connection, sequence.name)):
+        if not sequence.optional and (not self.checkfirst or not self.dialect.has_sequence(self.connection, sequence.name, schema=sequence.schema)):
             self.append("CREATE SEQUENCE %s" % self.preparer.format_sequence(sequence))
             self.execute()
 
@@ -845,7 +866,7 @@ class PGSchemaGenerator(compiler.SchemaGenerator):
 
 class PGSchemaDropper(compiler.SchemaDropper):
     def visit_sequence(self, sequence):
-        if not sequence.optional and (not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name)):
+        if not sequence.optional and (not self.checkfirst or self.dialect.has_sequence(self.connection, sequence.name, schema=sequence.schema)):
             self.append("DROP SEQUENCE %s" % self.preparer.format_sequence(sequence))
             self.execute()
 
index ea80776a6a1bb4217fb7542a3738a9c722468537..a0c8242e5747dd66e610dcff6a20041ce3e8e25e 100644 (file)
@@ -802,22 +802,43 @@ class SchemaTest(TestBase):
         finally:
             metadata.drop_all()
 
-
 class HasSequenceTest(TestBase):
-    @classmethod
-    def setup_class(cls):
-        global metadata, users
+
+    @testing.requires.sequences
+    def test_has_sequence(self):
         metadata = MetaData()
         users = Table('users', metadata,
                       Column('user_id', sa.Integer, sa.Sequence('user_id_seq'), primary_key=True),
                       Column('user_name', sa.String(40)),
                       )
+        metadata.create_all(bind=testing.db)
+        try:
+            eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)
+        finally:
+            metadata.drop_all(bind=testing.db)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)
 
+    
     @testing.requires.sequences
-    def test_hassequence(self):
-        metadata.create_all(bind=testing.db)
+    def test_has_sequence_schema(self):
+        # this test will probably fail on oracle , firebird since they don't have 
+        # the "alt_schema" thing going on.  this is all cleared up in 0.6.
+        
+        test_schema = "alt_schema"
+        s1 = sa.Sequence('user_id_seq', schema=test_schema)
+        s2 = sa.Sequence('user_id_seq')
+        
+        # this is good for PG, oracle, and firebird version 2.  In 0.6
+        # we use the CreateSequence/DropSequence construct
+        testing.db.execute("create sequence %s.user_id_seq" % test_schema)
+        testing.db.execute("create sequence user_id_seq")
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), True)
         eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)
-        metadata.drop_all(bind=testing.db)
+        testing.db.execute("drop sequence %s.user_id_seq" % test_schema)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False)
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), True)
+        testing.db.execute("drop sequence user_id_seq")
+        eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq', schema=test_schema), False)
         eq_(testing.db.dialect.has_sequence(testing.db, 'user_id_seq'), False)