From 3f3d84e754a4485caadd2cd520e372172a951565 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Fri, 29 Jun 2007 23:50:25 +0000 Subject: [PATCH] postgres: - added support for reflection of domains [ticket:570] - types which are missing during reflection resolve to Null type instead of raising an error - moved reflection/types/query unit tests specific to postgres to new postgres unittest module --- CHANGES | 3 + lib/sqlalchemy/databases/postgres.py | 72 +++++++++- test/dialect/alltests.py | 1 + test/dialect/postgres.py | 195 +++++++++++++++++++++++++++ test/engine/reflection.py | 58 -------- test/sql/query.py | 46 +------ test/sql/testtypes.py | 45 ------- 7 files changed, 268 insertions(+), 152 deletions(-) create mode 100644 test/dialect/postgres.py diff --git a/CHANGES b/CHANGES index c77f76318c..9bfca28cd8 100644 --- a/CHANGES +++ b/CHANGES @@ -66,6 +66,9 @@ (i.e., if a textual execute() was issued). - postgres - fixed escaping of the modulo operator [ticket:624] + - added support for reflection of domains [ticket:570] + - types which are missing during reflection resolve to Null type + instead of raising an error - sqlite - sqlite better handles datetime/date/time objects mixed and matched with various Date/Time/DateTime columns diff --git a/lib/sqlalchemy/databases/postgres.py b/lib/sqlalchemy/databases/postgres.py index a514b9de02..80ca5fbe95 100644 --- a/lib/sqlalchemy/databases/postgres.py +++ b/lib/sqlalchemy/databases/postgres.py @@ -4,7 +4,7 @@ # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php -import datetime, string, types, re, random +import datetime, string, types, re, random, warnings from sqlalchemy import util, sql, schema, ansisql, exceptions from sqlalchemy.engine import base, default @@ -248,7 +248,7 @@ class PGDialect(ansisql.ANSIDialect): self.version = 1 self.use_information_schema = use_information_schema self.paramstyle = 'pyformat' - + def dbapi(cls): try: import psycopg2 as psycopg @@ -395,6 +395,8 @@ class PGDialect(ansisql.ANSIDialect): if not rows: raise exceptions.NoSuchTableError(table.name) + domains = self._load_domains(connection) + for name, format_type, default, notnull, attnum, table_oid in rows: ## strip (30) from character varying(30) attype = re.search('([^\(]+)', format_type).group(1) @@ -433,8 +435,28 @@ class PGDialect(ansisql.ANSIDialect): elif attype == 'timestamp without time zone': kwargs['timezone'] = False - coltype = ischema_names[attype] - coltype = coltype(*args, **kwargs) + if attype in ischema_names: + coltype = ischema_names[attype] + else: + if attype in domains: + domain = domains[attype] + if domain['attype'] in ischema_names: + # A table can't override whether the domain is nullable. + nullable = domain['nullable'] + + if domain['default'] and not default: + # It can, however, override the default value, but can't set it to null. + default = domain['default'] + coltype = ischema_names[domain['attype']] + else: + coltype=None + + if coltype: + coltype = coltype(*args, **kwargs) + else: + warnings.warn(RuntimeWarning("Did not recognize type '%s' of column '%s'" % (attype, name))) + coltype = sqltypes.NULLTYPE + colargs= [] if default is not None: match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) @@ -493,7 +515,49 @@ class PGDialect(ansisql.ANSIDialect): refspec.append(".".join([referred_table, column])) table.append_constraint(schema.ForeignKeyConstraint(constrained_columns, refspec, conname)) + + def _load_domains(self, connection): + if hasattr(self, '_domains'): + return self._domains + + ## Load data types for domains: + SQL_DOMAINS = """ + SELECT t.typname as "name", + pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype", + not t.typnotnull as "nullable", + t.typdefault as "default", + pg_catalog.pg_type_is_visible(t.oid) as "visible", + n.nspname as "schema" + FROM pg_catalog.pg_type t + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace + LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid + WHERE t.typtype = 'd' + """ + s = sql.text(SQL_DOMAINS, typemap={'attname':sqltypes.Unicode}) + c = connection.execute(s) + + domains = {} + for domain in c.fetchall(): + ## strip (30) from character varying(30) + attype = re.search('([^\(]+)', domain['attype']).group(1) + if domain['visible']: + # 'visible' just means whether or not the domain is in a + # schema that's on the search path -- or not overriden by + # a schema with higher presedence. If it's not visible, + # it will be prefixed with the schema-name when it's used. + name = domain['name'] + else: + name = "%s.%s" % (domain['schema'], domain['name']) + + domains[name] = {'attype':attype, 'nullable': domain['nullable'], 'default': domain['default']} + + self._domains = domains + + return self._domains + + + class PGCompiler(ansisql.ANSICompiler): def visit_insert_column(self, column, parameters): # all column primary key inserts must be explicitly present diff --git a/test/dialect/alltests.py b/test/dialect/alltests.py index 6a11b85a3f..f4b39dd6f3 100644 --- a/test/dialect/alltests.py +++ b/test/dialect/alltests.py @@ -4,6 +4,7 @@ import unittest def suite(): modules_to_test = ( 'dialect.mysql', + 'dialect.postgres', ) alltests = unittest.TestSuite() for name in modules_to_test: diff --git a/test/dialect/postgres.py b/test/dialect/postgres.py new file mode 100644 index 0000000000..254309d228 --- /dev/null +++ b/test/dialect/postgres.py @@ -0,0 +1,195 @@ +from testbase import AssertMixin +import testbase +from sqlalchemy import * +from sqlalchemy.databases import postgres +import datetime + +db = testbase.db + +class DomainReflectionTest(AssertMixin): + "Test PostgreSQL domains" + + @testbase.supported('postgres') + def setUpAll(self): + self.con = db.connect() + self.con.execute('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42') + self.con.execute('CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0') + self.con.execute('CREATE TABLE testtable (question integer, answer testdomain)') + self.con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)') + self.con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)') + + @testbase.supported('postgres') + def tearDownAll(self): + self.con.execute('DROP TABLE testtable') + self.con.execute('DROP TABLE alt_schema.testtable') + self.con.execute('DROP TABLE crosschema') + self.con.execute('DROP DOMAIN testdomain') + self.con.execute('DROP DOMAIN alt_schema.testdomain') + + @testbase.supported('postgres') + def test_table_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True) + self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns") + self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger) + + @testbase.supported('postgres') + def test_domain_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True) + self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value") + self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.") + + @testbase.supported('postgres') + def test_table_is_reflected_alt_schema(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True, schema='alt_schema') + self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns") + self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger) + + @testbase.supported('postgres') + def test_schema_domain_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('testtable', metadata, autoload=True, schema='alt_schema') + self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value") + self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.") + + @testbase.supported('postgres') + def test_crosschema_domain_is_reflected(self): + metadata = BoundMetaData(db) + table = Table('crosschema', metadata, autoload=True) + self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value") + self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.") + +class MiscTest(AssertMixin): + @testbase.supported('postgres') + def test_date_reflection(self): + m1 = BoundMetaData(testbase.db) + t1 = Table('pgdate', m1, + Column('date1', DateTime(timezone=True)), + Column('date2', DateTime(timezone=False)) + ) + m1.create_all() + try: + m2 = BoundMetaData(testbase.db) + t2 = Table('pgdate', m2, autoload=True) + assert t2.c.date1.type.timezone is True + assert t2.c.date2.type.timezone is False + finally: + m1.drop_all() + + @testbase.supported('postgres') + def test_checksfor_sequence(self): + meta1 = BoundMetaData(testbase.db) + t = Table('mytable', meta1, + Column('col1', Integer, Sequence('fooseq'))) + try: + testbase.db.execute("CREATE SEQUENCE fooseq") + t.create() + finally: + t.drop() + + @testbase.supported('postgres') + def test_schema_reflection(self): + """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user""" + + meta1 = BoundMetaData(testbase.db) + users = Table('users', meta1, + Column('user_id', Integer, primary_key = True), + Column('user_name', String(30), nullable = False), + schema="alt_schema" + ) + + addresses = Table('email_addresses', meta1, + Column('address_id', Integer, primary_key = True), + Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), + Column('email_address', String(20)), + schema="alt_schema" + ) + meta1.create_all() + try: + meta2 = BoundMetaData(testbase.db) + addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema") + users = Table('users', meta2, mustexist=True, schema="alt_schema") + + print users + print addresses + j = join(users, addresses) + print str(j.onclause) + self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause)) + finally: + meta1.drop_all() + + @testbase.supported('postgres') + def test_preexecute_passivedefault(self): + """test that when we get a primary key column back + from reflecting a table which has a default value on it, we pre-execute + that PassiveDefault upon insert.""" + + try: + meta = BoundMetaData(testbase.db) + testbase.db.execute(""" + CREATE TABLE speedy_users + ( + speedy_user_id SERIAL PRIMARY KEY, + + user_name VARCHAR NOT NULL, + user_password VARCHAR NOT NULL + ); + """, None) + + t = Table("speedy_users", meta, autoload=True) + r = t.insert().execute(user_name='user', user_password='lala') + assert r.last_inserted_ids() == [1] + l = t.select().execute().fetchall() + assert l == [(1, 'user', 'lala')] + finally: + testbase.db.execute("drop table speedy_users", None) + +class TimezoneTest(AssertMixin): + """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it, + if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime + that doesnt have one. this test illustrates two ways to have datetime types with and without timezone + info. """ + @testbase.supported('postgres') + def setUpAll(self): + global tztable, notztable, metadata + metadata = BoundMetaData(testbase.db) + + # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE + tztable = Table('tztable', metadata, + Column("id", Integer, primary_key=True), + Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()), + Column("name", String(20)), + ) + notztable = Table('notztable', metadata, + Column("id", Integer, primary_key=True), + Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))), + Column("name", String(20)), + ) + metadata.create_all() + @testbase.supported('postgres') + def tearDownAll(self): + metadata.drop_all() + + @testbase.supported('postgres') + def test_with_timezone(self): + # get a date with a tzinfo + somedate = testbase.db.connect().scalar(func.current_timestamp().select()) + tztable.insert().execute(id=1, name='row1', date=somedate) + c = tztable.update(tztable.c.id==1).execute(name='newname') + x = c.last_updated_params() + print x['date'] == somedate + + @testbase.supported('postgres') + def test_without_timezone(self): + # get a date without a tzinfo + somedate = datetime.datetime(2005, 10,20, 11, 52, 00) + notztable.insert().execute(id=1, name='row1', date=somedate) + c = notztable.update(notztable.c.id==1).execute(name='newname') + x = c.last_updated_params() + print x['date'] == somedate + + +if __name__ == "__main__": + testbase.main() diff --git a/test/engine/reflection.py b/test/engine/reflection.py index ccb6f83427..ee373f43e8 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -106,23 +106,6 @@ class ReflectionTest(PersistTest): addresses.drop() users.drop() - @testbase.supported('postgres') - def testpgdates(self): - m1 = BoundMetaData(testbase.db) - t1 = Table('pgdate', m1, - Column('date1', DateTime(timezone=True)), - Column('date2', DateTime(timezone=False)) - ) - m1.create_all() - try: - m2 = BoundMetaData(testbase.db) - t2 = Table('pgdate', m2, autoload=True) - assert t2.c.date1.type.timezone is True - assert t2.c.date2.type.timezone is False - finally: - m1.drop_all() - - def testoverridecolumns(self): """test that you can override columns which contain foreign keys to other reflected tables""" meta = BoundMetaData(testbase.db) @@ -237,17 +220,6 @@ class ReflectionTest(PersistTest): finally: table.drop(checkfirst=True) - @testbase.supported('postgres') - def testredundantsequence(self): - """test that sequences get checked for, before create""" - meta1 = BoundMetaData(testbase.db) - t = Table('mytable', meta1, - Column('col1', Integer, Sequence('fooseq'))) - try: - testbase.db.execute("CREATE SEQUENCE fooseq") - t.create() - finally: - t.drop() def test_pks_not_uniques(self): """test that primary key reflection not tripped up by unique indexes""" @@ -523,36 +495,6 @@ class SchemaTest(PersistTest): assert buf.index("CREATE TABLE someschema.table1") > -1 assert buf.index("CREATE TABLE someschema.table2") > -1 - @testbase.supported('postgres') - def testpg(self): - """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user""" - - meta1 = BoundMetaData(testbase.db) - users = Table('users', meta1, - Column('user_id', Integer, primary_key = True), - Column('user_name', String(30), nullable = False), - schema="alt_schema" - ) - - addresses = Table('email_addresses', meta1, - Column('address_id', Integer, primary_key = True), - Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), - Column('email_address', String(20)), - schema="alt_schema" - ) - meta1.create_all() - try: - meta2 = BoundMetaData(testbase.db) - addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema") - users = Table('users', meta2, mustexist=True, schema="alt_schema") - - print users - print addresses - j = join(users, addresses) - print str(j.onclause) - self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause)) - finally: - meta1.drop_all() if __name__ == "__main__": testbase.main() diff --git a/test/sql/query.py b/test/sql/query.py index 632246fad9..2fe5715c20 100644 --- a/test/sql/query.py +++ b/test/sql/query.py @@ -163,51 +163,6 @@ class QueryTest(PersistTest): default_metadata.drop_all() default_metadata.clear() - @testbase.supported('postgres') - def testpassiveoverride(self): - """primarily for postgres, tests that when we get a primary key column back - from reflecting a table which has a default value on it, we pre-execute - that PassiveDefault upon insert, even though PassiveDefault says - "let the database execute this", because in postgres we must have all the primary - key values in memory before insert; otherwise we cant locate the just inserted row.""" - try: - meta = BoundMetaData(testbase.db) - testbase.db.execute(""" - CREATE TABLE speedy_users - ( - speedy_user_id SERIAL PRIMARY KEY, - - user_name VARCHAR NOT NULL, - user_password VARCHAR NOT NULL - ); - """, None) - - t = Table("speedy_users", meta, autoload=True) - t.insert().execute(user_name='user', user_password='lala') - l = t.select().execute().fetchall() - self.assert_(l == [(1, 'user', 'lala')]) - finally: - testbase.db.execute("drop table speedy_users", None) - - @testbase.supported('postgres') - def testschema(self): - meta1 = BoundMetaData(testbase.db) - test_table = Table('my_table', meta1, - Column('id', Integer, primary_key=True), - Column('data', String(20), nullable=False), - schema='alt_schema' - ) - test_table.create() - try: - # plain insert - test_table.insert().execute(data='test') - - meta2 = BoundMetaData(testbase.db) - test_table = Table('my_table', meta2, autoload=True, schema='alt_schema') - test_table.insert().execute(data='test') - - finally: - test_table.drop() def test_repeated_bindparams(self): """test that a BindParam can be used more than once. @@ -395,6 +350,7 @@ class QueryTest(PersistTest): @testbase.supported('postgres') def test_functions_with_cols(self): + # TODO: shouldnt this work on oracle too ? x = testbase.db.func.current_date().execute().scalar() y = testbase.db.func.current_date().select().execute().scalar() z = testbase.db.func.current_date().scalar() diff --git a/test/sql/testtypes.py b/test/sql/testtypes.py index e723575e58..c3dff52721 100644 --- a/test/sql/testtypes.py +++ b/test/sql/testtypes.py @@ -375,51 +375,6 @@ class IntervalTest(AssertMixin): interval_table.insert().execute(interval=delta) assert interval_table.select().execute().fetchone()['interval'] == delta - -class TimezoneTest(AssertMixin): - """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it, - if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime - that doesnt have one. this test illustrates two ways to have datetime types with and without timezone - info. """ - @testbase.supported('postgres') - def setUpAll(self): - global tztable, notztable, metadata - metadata = BoundMetaData(testbase.db) - - # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE - tztable = Table('tztable', metadata, - Column("id", Integer, primary_key=True), - Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()), - Column("name", String(20)), - ) - notztable = Table('notztable', metadata, - Column("id", Integer, primary_key=True), - Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))), - Column("name", String(20)), - ) - metadata.create_all() - @testbase.supported('postgres') - def tearDownAll(self): - metadata.drop_all() - - @testbase.supported('postgres') - def testtz(self): - # get a date with a tzinfo - somedate = testbase.db.connect().scalar(func.current_timestamp().select()) - tztable.insert().execute(id=1, name='row1', date=somedate) - c = tztable.update(tztable.c.id==1).execute(name='newname') - x = c.last_updated_params() - print x['date'] == somedate - - @testbase.supported('postgres') - def testnotz(self): - # get a date without a tzinfo - somedate = datetime.datetime(2005, 10,20, 11, 52, 00) - notztable.insert().execute(id=1, name='row1', date=somedate) - c = notztable.update(notztable.c.id==1).execute(name='newname') - x = c.last_updated_params() - print x['date'] == somedate - class BooleanTest(AssertMixin): def setUpAll(self): global bool_table -- 2.47.2