(i.e., if a textual execute() was issued).
- postgres
- fixed escaping of the modulo operator [ticket:624]
+ - added support for reflection of domains [ticket:570]
+ - types which are missing during reflection resolve to Null type
+ instead of raising an error
- sqlite
- sqlite better handles datetime/date/time objects mixed and matched
with various Date/Time/DateTime columns
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
-import datetime, string, types, re, random
+import datetime, string, types, re, random, warnings
from sqlalchemy import util, sql, schema, ansisql, exceptions
from sqlalchemy.engine import base, default
self.version = 1
self.use_information_schema = use_information_schema
self.paramstyle = 'pyformat'
-
+
def dbapi(cls):
try:
import psycopg2 as psycopg
if not rows:
raise exceptions.NoSuchTableError(table.name)
+ domains = self._load_domains(connection)
+
for name, format_type, default, notnull, attnum, table_oid in rows:
## strip (30) from character varying(30)
attype = re.search('([^\(]+)', format_type).group(1)
elif attype == 'timestamp without time zone':
kwargs['timezone'] = False
- coltype = ischema_names[attype]
- coltype = coltype(*args, **kwargs)
+ if attype in ischema_names:
+ coltype = ischema_names[attype]
+ else:
+ if attype in domains:
+ domain = domains[attype]
+ if domain['attype'] in ischema_names:
+ # A table can't override whether the domain is nullable.
+ nullable = domain['nullable']
+
+ if domain['default'] and not default:
+ # It can, however, override the default value, but can't set it to null.
+ default = domain['default']
+ coltype = ischema_names[domain['attype']]
+ else:
+ coltype=None
+
+ if coltype:
+ coltype = coltype(*args, **kwargs)
+ else:
+ warnings.warn(RuntimeWarning("Did not recognize type '%s' of column '%s'" % (attype, name)))
+ coltype = sqltypes.NULLTYPE
+
colargs= []
if default is not None:
match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
refspec.append(".".join([referred_table, column]))
table.append_constraint(schema.ForeignKeyConstraint(constrained_columns, refspec, conname))
+
+ def _load_domains(self, connection):
+ if hasattr(self, '_domains'):
+ return self._domains
+
+ ## Load data types for domains:
+ SQL_DOMAINS = """
+ SELECT t.typname as "name",
+ pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
+ not t.typnotnull as "nullable",
+ t.typdefault as "default",
+ pg_catalog.pg_type_is_visible(t.oid) as "visible",
+ n.nspname as "schema"
+ FROM pg_catalog.pg_type t
+ LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
+ LEFT JOIN pg_catalog.pg_constraint r ON t.oid = r.contypid
+ WHERE t.typtype = 'd'
+ """
+ s = sql.text(SQL_DOMAINS, typemap={'attname':sqltypes.Unicode})
+ c = connection.execute(s)
+
+ domains = {}
+ for domain in c.fetchall():
+ ## strip (30) from character varying(30)
+ attype = re.search('([^\(]+)', domain['attype']).group(1)
+ if domain['visible']:
+ # 'visible' just means whether or not the domain is in a
+ # schema that's on the search path -- or not overriden by
+ # a schema with higher presedence. If it's not visible,
+ # it will be prefixed with the schema-name when it's used.
+ name = domain['name']
+ else:
+ name = "%s.%s" % (domain['schema'], domain['name'])
+
+ domains[name] = {'attype':attype, 'nullable': domain['nullable'], 'default': domain['default']}
+
+ self._domains = domains
+
+ return self._domains
+
+
+
class PGCompiler(ansisql.ANSICompiler):
def visit_insert_column(self, column, parameters):
# all column primary key inserts must be explicitly present
def suite():
modules_to_test = (
'dialect.mysql',
+ 'dialect.postgres',
)
alltests = unittest.TestSuite()
for name in modules_to_test:
--- /dev/null
+from testbase import AssertMixin
+import testbase
+from sqlalchemy import *
+from sqlalchemy.databases import postgres
+import datetime
+
+db = testbase.db
+
+class DomainReflectionTest(AssertMixin):
+ "Test PostgreSQL domains"
+
+ @testbase.supported('postgres')
+ def setUpAll(self):
+ self.con = db.connect()
+ self.con.execute('CREATE DOMAIN testdomain INTEGER NOT NULL DEFAULT 42')
+ self.con.execute('CREATE DOMAIN alt_schema.testdomain INTEGER DEFAULT 0')
+ self.con.execute('CREATE TABLE testtable (question integer, answer testdomain)')
+ self.con.execute('CREATE TABLE alt_schema.testtable(question integer, answer alt_schema.testdomain, anything integer)')
+ self.con.execute('CREATE TABLE crosschema (question integer, answer alt_schema.testdomain)')
+
+ @testbase.supported('postgres')
+ def tearDownAll(self):
+ self.con.execute('DROP TABLE testtable')
+ self.con.execute('DROP TABLE alt_schema.testtable')
+ self.con.execute('DROP TABLE crosschema')
+ self.con.execute('DROP DOMAIN testdomain')
+ self.con.execute('DROP DOMAIN alt_schema.testdomain')
+
+ @testbase.supported('postgres')
+ def test_table_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True)
+ self.assertEquals(set(table.columns.keys()), set(['question', 'answer']), "Columns of reflected table didn't equal expected columns")
+ self.assertEquals(table.c.answer.type.__class__, postgres.PGInteger)
+
+ @testbase.supported('postgres')
+ def test_domain_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True)
+ self.assertEquals(str(table.columns.answer.default.arg), '42', "Reflected default value didn't equal expected value")
+ self.assertFalse(table.columns.answer.nullable, "Expected reflected column to not be nullable.")
+
+ @testbase.supported('postgres')
+ def test_table_is_reflected_alt_schema(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+ self.assertEquals(set(table.columns.keys()), set(['question', 'answer', 'anything']), "Columns of reflected table didn't equal expected columns")
+ self.assertEquals(table.c.anything.type.__class__, postgres.PGInteger)
+
+ @testbase.supported('postgres')
+ def test_schema_domain_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('testtable', metadata, autoload=True, schema='alt_schema')
+ self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+ self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
+
+ @testbase.supported('postgres')
+ def test_crosschema_domain_is_reflected(self):
+ metadata = BoundMetaData(db)
+ table = Table('crosschema', metadata, autoload=True)
+ self.assertEquals(str(table.columns.answer.default.arg), '0', "Reflected default value didn't equal expected value")
+ self.assertTrue(table.columns.answer.nullable, "Expected reflected column to be nullable.")
+
+class MiscTest(AssertMixin):
+ @testbase.supported('postgres')
+ def test_date_reflection(self):
+ m1 = BoundMetaData(testbase.db)
+ t1 = Table('pgdate', m1,
+ Column('date1', DateTime(timezone=True)),
+ Column('date2', DateTime(timezone=False))
+ )
+ m1.create_all()
+ try:
+ m2 = BoundMetaData(testbase.db)
+ t2 = Table('pgdate', m2, autoload=True)
+ assert t2.c.date1.type.timezone is True
+ assert t2.c.date2.type.timezone is False
+ finally:
+ m1.drop_all()
+
+ @testbase.supported('postgres')
+ def test_checksfor_sequence(self):
+ meta1 = BoundMetaData(testbase.db)
+ t = Table('mytable', meta1,
+ Column('col1', Integer, Sequence('fooseq')))
+ try:
+ testbase.db.execute("CREATE SEQUENCE fooseq")
+ t.create()
+ finally:
+ t.drop()
+
+ @testbase.supported('postgres')
+ def test_schema_reflection(self):
+ """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
+
+ meta1 = BoundMetaData(testbase.db)
+ users = Table('users', meta1,
+ Column('user_id', Integer, primary_key = True),
+ Column('user_name', String(30), nullable = False),
+ schema="alt_schema"
+ )
+
+ addresses = Table('email_addresses', meta1,
+ Column('address_id', Integer, primary_key = True),
+ Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
+ Column('email_address', String(20)),
+ schema="alt_schema"
+ )
+ meta1.create_all()
+ try:
+ meta2 = BoundMetaData(testbase.db)
+ addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
+ users = Table('users', meta2, mustexist=True, schema="alt_schema")
+
+ print users
+ print addresses
+ j = join(users, addresses)
+ print str(j.onclause)
+ self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
+ finally:
+ meta1.drop_all()
+
+ @testbase.supported('postgres')
+ def test_preexecute_passivedefault(self):
+ """test that when we get a primary key column back
+ from reflecting a table which has a default value on it, we pre-execute
+ that PassiveDefault upon insert."""
+
+ try:
+ meta = BoundMetaData(testbase.db)
+ testbase.db.execute("""
+ CREATE TABLE speedy_users
+ (
+ speedy_user_id SERIAL PRIMARY KEY,
+
+ user_name VARCHAR NOT NULL,
+ user_password VARCHAR NOT NULL
+ );
+ """, None)
+
+ t = Table("speedy_users", meta, autoload=True)
+ r = t.insert().execute(user_name='user', user_password='lala')
+ assert r.last_inserted_ids() == [1]
+ l = t.select().execute().fetchall()
+ assert l == [(1, 'user', 'lala')]
+ finally:
+ testbase.db.execute("drop table speedy_users", None)
+
+class TimezoneTest(AssertMixin):
+ """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it,
+ if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime
+ that doesnt have one. this test illustrates two ways to have datetime types with and without timezone
+ info. """
+ @testbase.supported('postgres')
+ def setUpAll(self):
+ global tztable, notztable, metadata
+ metadata = BoundMetaData(testbase.db)
+
+ # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
+ tztable = Table('tztable', metadata,
+ Column("id", Integer, primary_key=True),
+ Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
+ Column("name", String(20)),
+ )
+ notztable = Table('notztable', metadata,
+ Column("id", Integer, primary_key=True),
+ Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
+ Column("name", String(20)),
+ )
+ metadata.create_all()
+ @testbase.supported('postgres')
+ def tearDownAll(self):
+ metadata.drop_all()
+
+ @testbase.supported('postgres')
+ def test_with_timezone(self):
+ # get a date with a tzinfo
+ somedate = testbase.db.connect().scalar(func.current_timestamp().select())
+ tztable.insert().execute(id=1, name='row1', date=somedate)
+ c = tztable.update(tztable.c.id==1).execute(name='newname')
+ x = c.last_updated_params()
+ print x['date'] == somedate
+
+ @testbase.supported('postgres')
+ def test_without_timezone(self):
+ # get a date without a tzinfo
+ somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
+ notztable.insert().execute(id=1, name='row1', date=somedate)
+ c = notztable.update(notztable.c.id==1).execute(name='newname')
+ x = c.last_updated_params()
+ print x['date'] == somedate
+
+
+if __name__ == "__main__":
+ testbase.main()
addresses.drop()
users.drop()
- @testbase.supported('postgres')
- def testpgdates(self):
- m1 = BoundMetaData(testbase.db)
- t1 = Table('pgdate', m1,
- Column('date1', DateTime(timezone=True)),
- Column('date2', DateTime(timezone=False))
- )
- m1.create_all()
- try:
- m2 = BoundMetaData(testbase.db)
- t2 = Table('pgdate', m2, autoload=True)
- assert t2.c.date1.type.timezone is True
- assert t2.c.date2.type.timezone is False
- finally:
- m1.drop_all()
-
-
def testoverridecolumns(self):
"""test that you can override columns which contain foreign keys to other reflected tables"""
meta = BoundMetaData(testbase.db)
finally:
table.drop(checkfirst=True)
- @testbase.supported('postgres')
- def testredundantsequence(self):
- """test that sequences get checked for, before create"""
- meta1 = BoundMetaData(testbase.db)
- t = Table('mytable', meta1,
- Column('col1', Integer, Sequence('fooseq')))
- try:
- testbase.db.execute("CREATE SEQUENCE fooseq")
- t.create()
- finally:
- t.drop()
def test_pks_not_uniques(self):
"""test that primary key reflection not tripped up by unique indexes"""
assert buf.index("CREATE TABLE someschema.table1") > -1
assert buf.index("CREATE TABLE someschema.table2") > -1
- @testbase.supported('postgres')
- def testpg(self):
- """note: this test requires that the 'alt_schema' schema be separate and accessible by the test user"""
-
- meta1 = BoundMetaData(testbase.db)
- users = Table('users', meta1,
- Column('user_id', Integer, primary_key = True),
- Column('user_name', String(30), nullable = False),
- schema="alt_schema"
- )
-
- addresses = Table('email_addresses', meta1,
- Column('address_id', Integer, primary_key = True),
- Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
- Column('email_address', String(20)),
- schema="alt_schema"
- )
- meta1.create_all()
- try:
- meta2 = BoundMetaData(testbase.db)
- addresses = Table('email_addresses', meta2, autoload=True, schema="alt_schema")
- users = Table('users', meta2, mustexist=True, schema="alt_schema")
-
- print users
- print addresses
- j = join(users, addresses)
- print str(j.onclause)
- self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
- finally:
- meta1.drop_all()
if __name__ == "__main__":
testbase.main()
default_metadata.drop_all()
default_metadata.clear()
- @testbase.supported('postgres')
- def testpassiveoverride(self):
- """primarily for postgres, tests that when we get a primary key column back
- from reflecting a table which has a default value on it, we pre-execute
- that PassiveDefault upon insert, even though PassiveDefault says
- "let the database execute this", because in postgres we must have all the primary
- key values in memory before insert; otherwise we cant locate the just inserted row."""
- try:
- meta = BoundMetaData(testbase.db)
- testbase.db.execute("""
- CREATE TABLE speedy_users
- (
- speedy_user_id SERIAL PRIMARY KEY,
-
- user_name VARCHAR NOT NULL,
- user_password VARCHAR NOT NULL
- );
- """, None)
-
- t = Table("speedy_users", meta, autoload=True)
- t.insert().execute(user_name='user', user_password='lala')
- l = t.select().execute().fetchall()
- self.assert_(l == [(1, 'user', 'lala')])
- finally:
- testbase.db.execute("drop table speedy_users", None)
-
- @testbase.supported('postgres')
- def testschema(self):
- meta1 = BoundMetaData(testbase.db)
- test_table = Table('my_table', meta1,
- Column('id', Integer, primary_key=True),
- Column('data', String(20), nullable=False),
- schema='alt_schema'
- )
- test_table.create()
- try:
- # plain insert
- test_table.insert().execute(data='test')
-
- meta2 = BoundMetaData(testbase.db)
- test_table = Table('my_table', meta2, autoload=True, schema='alt_schema')
- test_table.insert().execute(data='test')
-
- finally:
- test_table.drop()
def test_repeated_bindparams(self):
"""test that a BindParam can be used more than once.
@testbase.supported('postgres')
def test_functions_with_cols(self):
+ # TODO: shouldnt this work on oracle too ?
x = testbase.db.func.current_date().execute().scalar()
y = testbase.db.func.current_date().select().execute().scalar()
z = testbase.db.func.current_date().scalar()
interval_table.insert().execute(interval=delta)
assert interval_table.select().execute().fetchone()['interval'] == delta
-
-class TimezoneTest(AssertMixin):
- """test timezone-aware datetimes. psycopg will return a datetime with a tzinfo attached to it,
- if postgres returns it. python then will not let you compare a datetime with a tzinfo to a datetime
- that doesnt have one. this test illustrates two ways to have datetime types with and without timezone
- info. """
- @testbase.supported('postgres')
- def setUpAll(self):
- global tztable, notztable, metadata
- metadata = BoundMetaData(testbase.db)
-
- # current_timestamp() in postgres is assumed to return TIMESTAMP WITH TIMEZONE
- tztable = Table('tztable', metadata,
- Column("id", Integer, primary_key=True),
- Column("date", DateTime(timezone=True), onupdate=func.current_timestamp()),
- Column("name", String(20)),
- )
- notztable = Table('notztable', metadata,
- Column("id", Integer, primary_key=True),
- Column("date", DateTime(timezone=False), onupdate=cast(func.current_timestamp(), DateTime(timezone=False))),
- Column("name", String(20)),
- )
- metadata.create_all()
- @testbase.supported('postgres')
- def tearDownAll(self):
- metadata.drop_all()
-
- @testbase.supported('postgres')
- def testtz(self):
- # get a date with a tzinfo
- somedate = testbase.db.connect().scalar(func.current_timestamp().select())
- tztable.insert().execute(id=1, name='row1', date=somedate)
- c = tztable.update(tztable.c.id==1).execute(name='newname')
- x = c.last_updated_params()
- print x['date'] == somedate
-
- @testbase.supported('postgres')
- def testnotz(self):
- # get a date without a tzinfo
- somedate = datetime.datetime(2005, 10,20, 11, 52, 00)
- notztable.insert().execute(id=1, name='row1', date=somedate)
- c = notztable.update(notztable.c.id==1).execute(name='newname')
- x = c.last_updated_params()
- print x['date'] == somedate
-
class BooleanTest(AssertMixin):
def setUpAll(self):
global bool_table