]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
postgres:
authorMike Bayer <mike_mp@zzzcomputing.com>
Fri, 29 Jun 2007 23:50:25 +0000 (23:50 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Fri, 29 Jun 2007 23:50:25 +0000 (23:50 +0000)
    - added support for reflection of domains [ticket:570]
    - types which are missing during reflection resolve to Null type
      instead of raising an error
    - moved reflection/types/query unit tests specific to postgres to new
      postgres unittest module

CHANGES
lib/sqlalchemy/databases/postgres.py
test/dialect/alltests.py
test/dialect/postgres.py [new file with mode: 0644]
test/engine/reflection.py
test/sql/query.py
test/sql/testtypes.py

diff --git a/CHANGES b/CHANGES
index c77f76318cafc983c395392ca34c3070ab5ca61d..9bfca28cd8daf1bfc25c97843c9a19f29507f998 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -66,6 +66,9 @@
       (i.e., if a textual execute() was issued).
 - postgres
     - fixed escaping of the modulo operator [ticket:624]
+    - added support for reflection of domains [ticket:570]
+    - types which are missing during reflection resolve to Null type
+      instead of raising an error
 - sqlite
     - sqlite better handles datetime/date/time objects mixed and matched
       with various Date/Time/DateTime columns
index a514b9de0267b73256be23af4d59e06ef54c1fc7..80ca5fbe95f6e1e4d7c8f71d4e5abbcfea1eba58 100644 (file)
@@ -4,7 +4,7 @@
 # This module is part of SQLAlchemy and is released under
 # the MIT License: http://www.opensource.org/licenses/mit-license.php
 
-import datetime, string, types, re, random
+import datetime, string, types, re, random, warnings
 
 from sqlalchemy import util, sql, schema, ansisql, exceptions
 from sqlalchemy.engine import base, default
@@ -248,7 +248,7 @@ class PGDialect(ansisql.ANSIDialect):
             self.version = 1
         self.use_information_schema = use_information_schema
         self.paramstyle = 'pyformat'
-
+        
     def dbapi(cls):
         try:
             import psycopg2 as psycopg
@@ -395,6 +395,8 @@ class PGDialect(ansisql.ANSIDialect):
             if not rows:
                 raise exceptions.NoSuchTableError(table.name)
 
+            domains = self._load_domains(connection)
+            
             for name, format_type, default, notnull, attnum, table_oid in rows:
                 ## strip (30) from character varying(30)
                 attype = re.search('([^\(]+)', format_type).group(1)
@@ -433,8 +435,28 @@ class PGDialect(ansisql.ANSIDialect):
                 elif attype == 'timestamp without time zone':
                     kwargs['timezone'] = False
 
-                coltype = ischema_names[attype]
-                coltype = coltype(*args, **kwargs)
+                if attype in ischema_names:
+                    coltype = ischema_names[attype]
+                else:
+                    if attype in domains:
+                        domain = domains[attype]
+                        if domain['attype'] in ischema_names:
+                            # A table can't override whether the domain is nullable.
+                            nullable = domain['nullable']
+
+                            if domain['default'] and not default:
+                                # It can, however, override the default value, but can't set it to null.
+                                default = domain['default']
+                            coltype = ischema_names[domain['attype']]
+                    else:
+                        coltype=None
+
+                if coltype:
+                    coltype = coltype(*args, **kwargs)
+                else:
+                    warnings.warn(RuntimeWarning("Did not recognize type '%s' of column '%s'" % (attype, name)))
+                    coltype = sqltypes.NULLTYPE
+
                 colargs= []
                 if default is not None:
                     match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
@@ -493,7 +515,49 @@ class PGDialect(ansisql.ANSIDialect):
                         refspec.append(".".join([referred_table, column]))
 
                 table.append_constraint(schema.ForeignKeyConstraint(constrained_columns, refspec, conname))
+                
+    def _load_domains(self, connection):
+        if hasattr(self, '_domains'):
+            return self._domains
+            
+        ## Load data types for domains:
+        SQL_DOMAINS = """
+            SELECT t.typname as "name",
+                   pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
+                   not t.typnotnull as "nullable",
+                   t.typdefault as "default",
+                   pg_catalog.pg_type_is_visible(t.oid) as "visible",
+                   n.nspname as "schema"
+            FROM pg_catalog.pg_type t
+                 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
+                 LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid
+            WHERE t.typtype = 'd'
+        """
 
+        s = sql.text(SQL_DOMAINS, typemap={'attname':sqltypes.Unicode})
+        c = connection.execute(s)
+
+        domains = {}
+        for domain in c.fetchall():
+            ## strip (30) from character varying(30)
+            attype = re.search('([^\(]+)', domain['attype']).group(1)
+            if domain['visible']:
+                # 'visible' just means whether or not the domain is in a 
+            # schema that's on the search path -- or not overridden by
+            # a schema with higher precedence. If it's not visible,
+                # it will be prefixed with the schema-name when it's used.
+                name = domain['name']
+            else:
+                name = "%s.%s" % (domain['schema'], domain['name'])
+
+            domains[name] = {'attype':attype, 'nullable': domain['nullable'], 'default': domain['default']}
+
+        self._domains = domains
+        
+        return self._domains
+        
+        
+        
 class PGCompiler(ansisql.ANSICompiler):
     def visit_insert_column(self, column, parameters):
         # all column primary key inserts must be explicitly present
index 6a11b85a3f0967098f9c9cd7647095cdd1b6eec7..f4b39dd6f363edccdd5baa194f4bb79dad74bce6 100644 (file)
@@ -4,6 +4,7 @@ import unittest
 def suite():
     modules_to_test = (
         'dialect.mysql',
+        'dialect.postgres',
         )
     alltests = unittest.TestSuite()
     for name in modules_to_test:
diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py
new file mode 100644 (file)
index 0000000..254309d
--- /dev/null
@@ -0,0 +1,195 @@
+from testbase import AssertMixin
+import testbase
+from sqlalchemy import *
+from sqlalchemy.databases import postgres
+import datetime
+
+db = testbase.db
+
+class DomainReflectionTest(AssertMixin):
+    "Test PostgreSQL domains"
+
+    @testbase.supported('postgres')
+    def setUpAll(self):
+        self.con = db.connect()
+        self.con.execute('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42')
+        self.con.execute('CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0')
+        self.con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
+        self.con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
+        self.con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')
+
+    @testbase.supported('postgres')
+    def tearDownAll(self):
+        self.con.execute('DROP TABLE testtable')
+        self.con.execute('DROP TABLE alt_schema.testtable')
+        self.con.execute('DROP TABLE crosschema')
+        self.con.execute('DROP DOMAIN testdomain')
+        self.con.execute('DROP DOMAIN alt_schema.testdomain')
+
+    @testbase.supported('postgres')
+    def test_table_is_reflected(self):
+        metadata = BoundMetaData(db)
+        table = Table('testtable', metadata, autoload=True)
+        self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
+        self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)
+        
+    @testbase.supported('postgres')
+    def test_domain_is_reflected(self):
+        metadata = BoundMetaData(db)
+        table = Table('testtable', metadata, autoload=True)
+        self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value")
+        self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
+
+    @testbase.supported('postgres')
+    def test_table_is_reflected_alt_schema(self):
+        metadata = BoundMetaData(db)
+        table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+        self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
+        self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)
+
+    @testbase.supported('postgres')
+    def test_schema_domain_is_reflected(self):
+        metadata = BoundMetaData(db)
+        table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+        self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+        self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
+
+    @testbase.supported('postgres')
+    def test_crosschema_domain_is_reflected(self):
+        metadata = BoundMetaData(db)
+        table = Table('crosschema', metadata, autoload=True)
+        self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+        self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
+
+class MiscTest(AssertMixin):
+    @testbase.supported('postgres')
+    def test_date_reflection(self):
+        m1 = BoundMetaData(testbase.db)
+        t1 = Table('pgdate', m1, 
+            Column('date1', DateTime(timezone=True)),
+            Column('date2', DateTime(timezone=False))
+            )
+        m1.create_all()
+        try:
+            m2 = BoundMetaData(testbase.db)
+            t2 = Table('pgdate', m2, autoload=True)
+            assert t2.c.date1.type.timezone is True
+            assert t2.c.date2.type.timezone is False
+        finally:
+            m1.drop_all()
+
+    @testbase.supported('postgres')
+    def test_checksfor_sequence(self):
+        meta1 = BoundMetaData(testbase.db)
+        t = Table('mytable', meta1, 
+            Column('col1', Integer, Sequence('fooseq')))
+        try:
+            testbase.db.execute("CREATE SEQUENCE fooseq")
+            t.create()
+        finally:
+            t.drop()
+
+    @testbase.supported('postgres')
+    def test_schema_reflection(self):
+        """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
+
+        meta1 = BoundMetaData(testbase.db)
+        users = Table('users', meta1,
+            Column('user_id', Integer, primary_key = True),
+            Column('user_name', String(30), nullable = False),
+            schema="alt_schema"
+            )
+
+        addresses = Table('email_addresses', meta1,
+            Column('address_id', Integer, primary_key = True),
+            Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
+            Column('email_address', String(20)),
+            schema="alt_schema"
+        )
+        meta1.create_all()
+        try:
+            meta2 = BoundMetaData(testbase.db)
+            addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
+            users = Table('users', meta2, mustexist=True, schema="alt_schema")
+
+            print users
+            print addresses
+            j = join(users, addresses)
+            print str(j.onclause)
+            self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
+        finally:
+            meta1.drop_all()
+
+    @testbase.supported('postgres')
+    def test_preexecute_passivedefault(self):
+        """test that when we get a primary key column back 
+        from reflecting a table which has a default value on it, we pre-execute
+        that PassiveDefault upon insert."""
+        
+        try:
+            meta = BoundMetaData(testbase.db)
+            testbase.db.execute("""
+             CREATE TABLE speedy_users
+             (
+                 speedy_user_id   SERIAL     PRIMARY KEY,
+
+                 user_name        VARCHAR    NOT NULL,
+                 user_password    VARCHAR    NOT NULL
+             );
+            """, None)
+
+            t = Table("speedy_users", meta, autoload=True)
+            r = t.insert().execute(user_name='user', user_password='lala')
+            assert r.last_inserted_ids() == [1]
+            l = t.select().execute().fetchall()
+            assert l == [(1, 'user', 'lala')]
+        finally:
+            testbase.db.execute("drop table speedy_users", None)
+
+class TimezoneTest(AssertMixin):
+    """test timezone-aware datetimes.  psycopg will return a datetime with a tzinfo attached to it,
+    if postgres returns it.  python then will not let you compare a datetime with a tzinfo to a datetime
+    that doesnt have one.  this test illustrates two ways to have datetime types with and without timezone
+    info. """
+    @testbase.supported('postgres')
+    def setUpAll(self):
+        global tztable, notztable, metadata
+        metadata = BoundMetaData(testbase.db)
+
+        # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
+        tztable = Table('tztable', metadata,
+            Column("id", Integer, primary_key=True),
+            Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
+            Column("name", String(20)),
+        )
+        notztable = Table('notztable', metadata,
+            Column("id", Integer, primary_key=True),
+            Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
+            Column("name", String(20)),
+        )
+        metadata.create_all()
+    @testbase.supported('postgres')
+    def tearDownAll(self):
+        metadata.drop_all()
+
+    @testbase.supported('postgres')
+    def test_with_timezone(self):
+        # get a date with a tzinfo
+        somedate = testbase.db.connect().scalar(func.current_timestamp().select())
+        tztable.insert().execute(id=1, name='row1', date=somedate)
+        c = tztable.update(tztable.c.id==1).execute(name='newname')
+        x = c.last_updated_params()
+        print x['date'] == somedate
+
+    @testbase.supported('postgres')
+    def test_without_timezone(self):
+        # get a date without a tzinfo
+        somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
+        notztable.insert().execute(id=1, name='row1', date=somedate)
+        c = notztable.update(notztable.c.id==1).execute(name='newname')
+        x = c.last_updated_params()
+        print x['date'] == somedate
+
+    
+if __name__ == "__main__":
+    testbase.main()
index ccb6f834275adc5885fd6792c79ee4e7776733fa..ee373f43e8e5357c4d49e4af806c83ad31f89627 100644 (file)
@@ -106,23 +106,6 @@ class ReflectionTest(PersistTest):
             addresses.drop()
             users.drop()
             
-    @testbase.supported('postgres')
-    def testpgdates(self):
-        m1 = BoundMetaData(testbase.db)
-        t1 = Table('pgdate', m1, 
-            Column('date1', DateTime(timezone=True)),
-            Column('date2', DateTime(timezone=False))
-            )
-        m1.create_all()
-        try:
-            m2 = BoundMetaData(testbase.db)
-            t2 = Table('pgdate', m2, autoload=True)
-            assert t2.c.date1.type.timezone is True
-            assert t2.c.date2.type.timezone is False
-        finally:
-            m1.drop_all()
-            
-            
     def testoverridecolumns(self):
         """test that you can override columns which contain foreign keys to other reflected tables"""
         meta = BoundMetaData(testbase.db)
@@ -237,17 +220,6 @@ class ReflectionTest(PersistTest):
         finally:
             table.drop(checkfirst=True)
             
-    @testbase.supported('postgres')
-    def testredundantsequence(self):
-        """test that sequences get checked for, before create"""
-        meta1 = BoundMetaData(testbase.db)
-        t = Table('mytable', meta1, 
-            Column('col1', Integer, Sequence('fooseq')))
-        try:
-            testbase.db.execute("CREATE SEQUENCE fooseq")
-            t.create()
-        finally:
-            t.drop()
     
     def test_pks_not_uniques(self):
         """test that primary key reflection not tripped up by unique indexes"""
@@ -523,36 +495,6 @@ class SchemaTest(PersistTest):
         assert buf.index("CREATE TABLE someschema.table1") > -1
         assert buf.index("CREATE TABLE someschema.table2") > -1
     
-    @testbase.supported('postgres')
-    def testpg(self):
-        """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
-        
-        meta1 = BoundMetaData(testbase.db)
-        users = Table('users', meta1,
-            Column('user_id', Integer, primary_key = True),
-            Column('user_name', String(30), nullable = False),
-            schema="alt_schema"
-            )
-
-        addresses = Table('email_addresses', meta1,
-            Column('address_id', Integer, primary_key = True),
-            Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
-            Column('email_address', String(20)),
-            schema="alt_schema"
-        )
-        meta1.create_all()
-        try:
-            meta2 = BoundMetaData(testbase.db)
-            addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
-            users = Table('users', meta2, mustexist=True, schema="alt_schema")
-
-            print users
-            print addresses
-            j = join(users, addresses)
-            print str(j.onclause)
-            self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
-        finally:
-            meta1.drop_all()
         
 if __name__ == "__main__":
     testbase.main()        
index 632246fad97f36e66333f9b44a52c4e801fdf6cf..2fe5715c201330992d64cc41fc5c16531c6f8bdd 100644 (file)
@@ -163,51 +163,6 @@ class QueryTest(PersistTest):
             default_metadata.drop_all()
             default_metadata.clear()
  
-    @testbase.supported('postgres')
-    def testpassiveoverride(self):
-        """primarily for postgres, tests that when we get a primary key column back 
-        from reflecting a table which has a default value on it, we pre-execute
-        that PassiveDefault upon insert, even though PassiveDefault says 
-        "let the database execute this", because in postgres we must have all the primary
-        key values in memory before insert; otherwise we cant locate the just inserted row."""
-        try:
-            meta = BoundMetaData(testbase.db)
-            testbase.db.execute("""
-             CREATE TABLE speedy_users
-             (
-                 speedy_user_id   SERIAL     PRIMARY KEY,
-            
-                 user_name        VARCHAR    NOT NULL,
-                 user_password    VARCHAR    NOT NULL
-             );
-            """, None)
-            
-            t = Table("speedy_users", meta, autoload=True)
-            t.insert().execute(user_name='user', user_password='lala')
-            l = t.select().execute().fetchall()
-            self.assert_(l == [(1, 'user', 'lala')])
-        finally:
-            testbase.db.execute("drop table speedy_users", None)
-
-    @testbase.supported('postgres')
-    def testschema(self):
-        meta1 = BoundMetaData(testbase.db)
-        test_table = Table('my_table', meta1,
-                    Column('id', Integer, primary_key=True),
-                    Column('data', String(20), nullable=False),
-                    schema='alt_schema'
-                 )
-        test_table.create()
-        try:
-            # plain insert
-            test_table.insert().execute(data='test')
-
-            meta2 = BoundMetaData(testbase.db)
-            test_table = Table('my_table', meta2, autoload=True, schema='alt_schema')
-            test_table.insert().execute(data='test')
-
-        finally:
-            test_table.drop()
 
     def test_repeated_bindparams(self):
         """test that a BindParam can be used more than once.  
@@ -395,6 +350,7 @@ class QueryTest(PersistTest):
             
     @testbase.supported('postgres')
     def test_functions_with_cols(self):
+        # TODO: shouldn't this work on oracle too?
         x = testbase.db.func.current_date().execute().scalar()
         y = testbase.db.func.current_date().select().execute().scalar()
         z = testbase.db.func.current_date().scalar()
index e723575e58c0dcdcea8bd01e2b5728868f3b4572..c3dff527210b5cf7ba1dfad7747c1a522124176f 100644 (file)
@@ -375,51 +375,6 @@ class IntervalTest(AssertMixin):
         interval_table.insert().execute(interval=delta)
         assert interval_table.select().execute().fetchone()['interval'] == delta
         
-        
-class TimezoneTest(AssertMixin):
-    """test timezone-aware datetimes.  psycopg will return a datetime with a tzinfo attached to it,
-    if postgres returns it.  python then will not let you compare a datetime with a tzinfo to a datetime
-    that doesnt have one.  this test illustrates two ways to have datetime types with and without timezone
-    info. """
-    @testbase.supported('postgres')
-    def setUpAll(self):
-        global tztable, notztable, metadata
-        metadata = BoundMetaData(testbase.db)
-        
-        # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
-        tztable = Table('tztable', metadata,
-            Column("id", Integer, primary_key=True),
-            Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
-            Column("name", String(20)),
-        )
-        notztable = Table('notztable', metadata,
-            Column("id", Integer, primary_key=True),
-            Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
-            Column("name", String(20)),
-        )
-        metadata.create_all()
-    @testbase.supported('postgres')
-    def tearDownAll(self):
-        metadata.drop_all()
-    
-    @testbase.supported('postgres')
-    def testtz(self):
-        # get a date with a tzinfo
-        somedate = testbase.db.connect().scalar(func.current_timestamp().select())
-        tztable.insert().execute(id=1, name='row1', date=somedate)
-        c = tztable.update(tztable.c.id==1).execute(name='newname')
-        x = c.last_updated_params()
-        print x['date'] == somedate
-        
-    @testbase.supported('postgres')
-    def testnotz(self):
-        # get a date without a tzinfo
-        somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
-        notztable.insert().execute(id=1, name='row1', date=somedate)
-        c = notztable.update(notztable.c.id==1).execute(name='newname')
-        x = c.last_updated_params()
-        print x['date'] == somedate
-        
 class BooleanTest(AssertMixin):
     def setUpAll(self):
         global bool_table