and ``SERIALIZABLE``. Note that the psycopg2 dialect does *not* use this
technique and uses psycopg2-specific APIs (see that dialect for details).
+Remote / Cross-Schema Table Introspection
+-----------------------------------------
+
+Tables can be introspected from any accessible schema, including
+inter-schema foreign key relationships. However, care must be taken
+when specifying the "schema" argument for a given :class:`.Table`, when
+the given schema is also present in PostgreSQL's ``search_path`` variable
+for the current connection.
+
+If a FOREIGN KEY constraint reports that the remote table's schema is within
+the current ``search_path``, the "schema" attribute of the resulting
+:class:`.Table` will be set to ``None``, unless the actual schema of the
+remote table matches that of the referencing table, and the "schema" argument
+was explicitly stated on the referencing table.
+
+The best practice here is to not use the ``schema`` argument
+on :class:`.Table` for any schemas that are present in ``search_path``.
+``search_path`` defaults to "public", but care should be taken
+to inspect the actual value using::
+
+ SHOW search_path;
+
+Prior to version 0.7.3, cross-schema foreign keys when the schemas
+were also in the ``search_path`` could make an incorrect assumption
+if the schemas were explicitly stated on each :class:`.Table`.
+
+Background on PG's ``search_path`` is at:
+http://www.postgresql.org/docs/9.0/static/ddl-schemas.html#DDL-SCHEMAS-PATH
+
INSERT/UPDATE...RETURNING
-------------------------
preparer = self.identifier_preparer
table_oid = self.get_table_oid(connection, table_name, schema,
info_cache=kw.get('info_cache'))
+
FK_SQL = """
- SELECT conname, pg_catalog.pg_get_constraintdef(oid, true) as condef
- FROM pg_catalog.pg_constraint r
- WHERE r.conrelid = :table AND r.contype = 'f'
+ SELECT r.conname,
+ pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
+ n.nspname as conschema
+ FROM pg_catalog.pg_constraint r,
+ pg_namespace n,
+ pg_class c
+
+ WHERE r.conrelid = :table AND
+ r.contype = 'f' AND
+ c.oid = confrelid AND
+ n.oid = c.relnamespace
ORDER BY 1
"""
'condef':sqltypes.Unicode})
c = connection.execute(t, table=table_oid)
fkeys = []
- for conname, condef in c.fetchall():
+ for conname, condef, conschema in c.fetchall():
m = re.search('FOREIGN KEY \((.*?)\) REFERENCES '
'(?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups()
constrained_columns, referred_schema, \
referred_table, referred_columns = m
constrained_columns = [preparer._unquote_identifier(x)
for x in re.split(r'\s*,\s*', constrained_columns)]
+
if referred_schema:
referred_schema =\
preparer._unquote_identifier(referred_schema)
- elif schema is not None and schema == self.default_schema_name:
- # no schema (i.e. its the default schema), and the table we're
- # reflecting has the default schema explicit, then use that.
- # i.e. try to use the user's conventions
+ elif schema is not None and schema == conschema:
+ # no schema was returned by pg_get_constraintdef(). This
+ # means the schema is in the search path. We will leave
+ # it as None, unless the actual schema, which we pull out
+ # from pg_namespace even though pg_get_constraintdef() doesn't
+ # want to give it to us, matches that of the referencing table,
+ # and an explicit schema was given for the referencing table.
referred_schema = schema
referred_table = preparer._unquote_identifier(referred_table)
referred_columns = [preparer._unquote_identifier(x)
"t.b AS b FROM t) AS sq WHERE t.id = sq.id"
)
+class ReflectionTest(fixtures.TestBase):
+ @testing.provide_metadata
+ def test_pg_weirdchar_reflection(self):
+ meta1 = self.metadata
+ subject = Table('subject', meta1, Column('id$', Integer,
+ primary_key=True))
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('subject.id$')))
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ subject = Table('subject', meta2, autoload=True)
+ referer = Table('referer', meta2, autoload=True)
+ self.assert_((subject.c['id$']
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
+
+ @testing.provide_metadata
+ def test_renamed_sequence_reflection(self):
+ metadata = self.metadata
+ t = Table('t', metadata, Column('id', Integer, primary_key=True))
+ metadata.create_all()
+ m2 = MetaData(testing.db)
+ t2 = Table('t', m2, autoload=True, implicit_returning=False)
+ eq_(t2.c.id.server_default.arg.text,
+ "nextval('t_id_seq'::regclass)")
+ r = t2.insert().execute()
+ eq_(r.inserted_primary_key, [1])
+ testing.db.connect().execution_options(autocommit=True).\
+ execute('alter table t_id_seq rename to foobar_id_seq'
+ )
+ m3 = MetaData(testing.db)
+ t3 = Table('t', m3, autoload=True, implicit_returning=False)
+ eq_(t3.c.id.server_default.arg.text,
+ "nextval('foobar_id_seq'::regclass)")
+ r = t3.insert().execute()
+ eq_(r.inserted_primary_key, [2])
+
+ @testing.provide_metadata
+ def test_schema_reflection(self):
+ """note: this test requires that the 'test_schema' schema be
+ separate and accessible by the test user"""
+
+ meta1 = self.metadata
+
+ users = Table('users', meta1, Column('user_id', Integer,
+ primary_key=True), Column('user_name',
+ String(30), nullable=False), schema='test_schema')
+ addresses = Table(
+ 'email_addresses',
+ meta1,
+ Column('address_id', Integer, primary_key=True),
+ Column('remote_user_id', Integer,
+ ForeignKey(users.c.user_id)),
+ Column('email_address', String(20)),
+ schema='test_schema',
+ )
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ addresses = Table('email_addresses', meta2, autoload=True,
+ schema='test_schema')
+ users = Table('users', meta2, mustexist=True,
+ schema='test_schema')
+ j = join(users, addresses)
+ self.assert_((users.c.user_id
+ == addresses.c.remote_user_id).compare(j.onclause))
+
+ @testing.provide_metadata
+ def test_schema_reflection_2(self):
+ meta1 = self.metadata
+ subject = Table('subject', meta1, Column('id', Integer,
+ primary_key=True))
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('subject.id')), schema='test_schema')
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ subject = Table('subject', meta2, autoload=True)
+ referer = Table('referer', meta2, schema='test_schema',
+ autoload=True)
+ self.assert_((subject.c.id
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
+
+ @testing.provide_metadata
+ def test_schema_reflection_3(self):
+ meta1 = self.metadata
+ subject = Table('subject', meta1, Column('id', Integer,
+ primary_key=True), schema='test_schema_2')
+ referer = Table('referer', meta1, Column('id', Integer,
+ primary_key=True), Column('ref', Integer,
+ ForeignKey('test_schema_2.subject.id')),
+ schema='test_schema')
+ meta1.create_all()
+ meta2 = MetaData(testing.db)
+ subject = Table('subject', meta2, autoload=True,
+ schema='test_schema_2')
+ referer = Table('referer', meta2, schema='test_schema',
+ autoload=True)
+ self.assert_((subject.c.id
+ == referer.c.ref).compare(
+ subject.join(referer).onclause))
+
+ def test_schema_reflection_multi_search_path(self):
+ """test the 'set the same schema' rule when
+ multiple schemas/search paths are in effect."""
+
+ db = engines.testing_engine()
+ conn = db.connect()
+ trans = conn.begin()
+ try:
+ conn.execute("set search_path to test_schema_2, "
+ "test_schema, public")
+ conn.dialect.default_schema_name = "test_schema_2"
+
+ conn.execute("""
+ CREATE TABLE test_schema.some_table (
+ id SERIAL not null primary key
+ )
+ """)
+
+ conn.execute("""
+ CREATE TABLE test_schema_2.some_other_table (
+ id SERIAL not null primary key,
+ sid INTEGER REFERENCES test_schema.some_table(id)
+ )
+ """)
+
+ m1 = MetaData()
+
+ t2_schema = Table('some_other_table',
+ m1,
+ schema="test_schema_2",
+ autoload=True,
+ autoload_with=conn)
+ t1_schema = Table('some_table',
+ m1,
+ schema="test_schema",
+ autoload=True,
+ autoload_with=conn)
+
+ t2_no_schema = Table('some_other_table',
+ m1,
+ autoload=True,
+ autoload_with=conn)
+
+ t1_no_schema = Table('some_table',
+ m1,
+ autoload=True,
+ autoload_with=conn)
+
+ # OK, this because, "test_schema" is
+ # in the search path, and might as well be
+ # the default too. why would we assign
+ # a "schema" to the Table ?
+ assert t2_schema.c.sid.references(
+ t1_no_schema.c.id)
+
+ assert t2_no_schema.c.sid.references(
+ t1_no_schema.c.id)
+
+ finally:
+ trans.rollback()
+ conn.close()
+ db.dispose()
+
+ @testing.provide_metadata
+ def test_index_reflection(self):
+ """ Reflecting partial & expression-based indexes should warn
+ """
+
+ metadata = self.metadata
+
+ t1 = Table('party', metadata, Column('id', String(10),
+ nullable=False), Column('name', String(20),
+ index=True), Column('aname', String(20)))
+ metadata.create_all()
+ testing.db.execute("""
+ create index idx1 on party ((id || name))
+ """)
+ testing.db.execute("""
+ create unique index idx2 on party (id) where name = 'test'
+ """)
+ testing.db.execute("""
+ create index idx3 on party using btree
+ (lower(name::text), lower(aname::text))
+ """)
+
+ def go():
+ m2 = MetaData(testing.db)
+ t2 = Table('party', m2, autoload=True)
+ assert len(t2.indexes) == 2
+
+ # Make sure indexes are in the order we expect them in
+
+ tmp = [(idx.name, idx) for idx in t2.indexes]
+ tmp.sort()
+ r1, r2 = [idx[1] for idx in tmp]
+ assert r1.name == 'idx2'
+ assert r1.unique == True
+ assert r2.unique == False
+ assert [t2.c.id] == r1.columns
+ assert [t2.c.name] == r2.columns
+
+ testing.assert_warnings(go,
+ [
+ 'Skipped unsupported reflection of '
+ 'expression-based index idx1',
+ 'Predicate of partial index idx2 ignored during '
+ 'reflection',
+ 'Skipped unsupported reflection of '
+ 'expression-based index idx3'
+ ])
+
+ @testing.provide_metadata
+ def test_index_reflection_modified(self):
+ """reflect indexes when a column name has changed - PG 9
+ does not update the name of the column in the index def.
+ [ticket:2141]
+
+ """
+
+ metadata = self.metadata
+
+ t1 = Table('t', metadata,
+ Column('id', Integer, primary_key=True),
+ Column('x', Integer)
+ )
+ metadata.create_all()
+ conn = testing.db.connect().execution_options(autocommit=True)
+ conn.execute("CREATE INDEX idx1 ON t (x)")
+ conn.execute("ALTER TABLE t RENAME COLUMN x to y")
+
+ ind = testing.db.dialect.get_indexes(conn, "t", None)
+ eq_(ind, [{'unique': False, 'column_names': [u'y'], 'name': u'idx1'}])
+ conn.close()
class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
assert 'will create implicit sequence' in msgs
assert 'will create implicit index' in msgs
- def test_pg_weirdchar_reflection(self):
- meta1 = MetaData(testing.db)
- subject = Table('subject', meta1, Column('id$', Integer,
- primary_key=True))
- referer = Table('referer', meta1, Column('id', Integer,
- primary_key=True), Column('ref', Integer,
- ForeignKey('subject.id$')))
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- subject = Table('subject', meta2, autoload=True)
- referer = Table('referer', meta2, autoload=True)
- print str(subject.join(referer).onclause)
- self.assert_((subject.c['id$']
- == referer.c.ref).compare(
- subject.join(referer).onclause))
- finally:
- meta1.drop_all()
@testing.fails_on('+zxjdbc',
"Can't infer the SQL type to use for an instance "
finally:
t.drop(checkfirst=True)
- @testing.provide_metadata
- def test_renamed_sequence_reflection(self):
- metadata = self.metadata
- t = Table('t', metadata, Column('id', Integer, primary_key=True))
- metadata.create_all()
- m2 = MetaData(testing.db)
- t2 = Table('t', m2, autoload=True, implicit_returning=False)
- eq_(t2.c.id.server_default.arg.text,
- "nextval('t_id_seq'::regclass)")
- r = t2.insert().execute()
- eq_(r.inserted_primary_key, [1])
- testing.db.connect().execution_options(autocommit=True).\
- execute('alter table t_id_seq rename to foobar_id_seq'
- )
- m3 = MetaData(testing.db)
- t3 = Table('t', m3, autoload=True, implicit_returning=False)
- eq_(t3.c.id.server_default.arg.text,
- "nextval('foobar_id_seq'::regclass)")
- r = t3.insert().execute()
- eq_(r.inserted_primary_key, [2])
-
- def test_schema_reflection(self):
- """note: this test requires that the 'test_schema' schema be
- separate and accessible by the test user"""
-
- meta1 = MetaData(testing.db)
- users = Table('users', meta1, Column('user_id', Integer,
- primary_key=True), Column('user_name',
- String(30), nullable=False), schema='test_schema')
- addresses = Table(
- 'email_addresses',
- meta1,
- Column('address_id', Integer, primary_key=True),
- Column('remote_user_id', Integer,
- ForeignKey(users.c.user_id)),
- Column('email_address', String(20)),
- schema='test_schema',
- )
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- addresses = Table('email_addresses', meta2, autoload=True,
- schema='test_schema')
- users = Table('users', meta2, mustexist=True,
- schema='test_schema')
- print users
- print addresses
- j = join(users, addresses)
- print str(j.onclause)
- self.assert_((users.c.user_id
- == addresses.c.remote_user_id).compare(j.onclause))
- finally:
- meta1.drop_all()
-
- def test_schema_reflection_2(self):
- meta1 = MetaData(testing.db)
- subject = Table('subject', meta1, Column('id', Integer,
- primary_key=True))
- referer = Table('referer', meta1, Column('id', Integer,
- primary_key=True), Column('ref', Integer,
- ForeignKey('subject.id')), schema='test_schema')
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- subject = Table('subject', meta2, autoload=True)
- referer = Table('referer', meta2, schema='test_schema',
- autoload=True)
- print str(subject.join(referer).onclause)
- self.assert_((subject.c.id
- == referer.c.ref).compare(
- subject.join(referer).onclause))
- finally:
- meta1.drop_all()
-
- def test_schema_reflection_3(self):
- meta1 = MetaData(testing.db)
- subject = Table('subject', meta1, Column('id', Integer,
- primary_key=True), schema='test_schema_2')
- referer = Table('referer', meta1, Column('id', Integer,
- primary_key=True), Column('ref', Integer,
- ForeignKey('test_schema_2.subject.id')),
- schema='test_schema')
- meta1.create_all()
- try:
- meta2 = MetaData(testing.db)
- subject = Table('subject', meta2, autoload=True,
- schema='test_schema_2')
- referer = Table('referer', meta2, schema='test_schema',
- autoload=True)
- print str(subject.join(referer).onclause)
- self.assert_((subject.c.id
- == referer.c.ref).compare(
- subject.join(referer).onclause))
- finally:
- meta1.drop_all()
-
def test_schema_roundtrips(self):
meta = MetaData(testing.db)
users = Table('users', meta, Column('id', Integer,
finally:
testing.db.execute('drop table speedy_users')
- @testing.provide_metadata
- def test_index_reflection(self):
- """ Reflecting partial & expression-based indexes should warn
- """
-
- metadata = self.metadata
-
- t1 = Table('party', metadata, Column('id', String(10),
- nullable=False), Column('name', String(20),
- index=True), Column('aname', String(20)))
- metadata.create_all()
- testing.db.execute("""
- create index idx1 on party ((id || name))
- """)
- testing.db.execute("""
- create unique index idx2 on party (id) where name = 'test'
- """)
- testing.db.execute("""
- create index idx3 on party using btree
- (lower(name::text), lower(aname::text))
- """)
-
- def go():
- m2 = MetaData(testing.db)
- t2 = Table('party', m2, autoload=True)
- assert len(t2.indexes) == 2
-
- # Make sure indexes are in the order we expect them in
-
- tmp = [(idx.name, idx) for idx in t2.indexes]
- tmp.sort()
- r1, r2 = [idx[1] for idx in tmp]
- assert r1.name == 'idx2'
- assert r1.unique == True
- assert r2.unique == False
- assert [t2.c.id] == r1.columns
- assert [t2.c.name] == r2.columns
-
- testing.assert_warnings(go,
- [
- 'Skipped unsupported reflection of '
- 'expression-based index idx1',
- 'Predicate of partial index idx2 ignored during '
- 'reflection',
- 'Skipped unsupported reflection of '
- 'expression-based index idx3'
- ])
-
- @testing.provide_metadata
- def test_index_reflection_modified(self):
- """reflect indexes when a column name has changed - PG 9
- does not update the name of the column in the index def.
- [ticket:2141]
-
- """
-
- metadata = self.metadata
-
- t1 = Table('t', metadata,
- Column('id', Integer, primary_key=True),
- Column('x', Integer)
- )
- metadata.create_all()
- conn = testing.db.connect().execution_options(autocommit=True)
- conn.execute("CREATE INDEX idx1 ON t (x)")
- conn.execute("ALTER TABLE t RENAME COLUMN x to y")
-
- ind = testing.db.dialect.get_indexes(conn, "t", None)
- eq_(ind, [{'unique': False, 'column_names': [u'y'], 'name': u'idx1'}])
- conn.close()
@testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion')
@testing.fails_on('pypostgresql',