]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Narrowed the assumption made when reflecting
authorMike Bayer <mike_mp@zzzcomputing.com>
Tue, 9 Aug 2011 23:45:20 +0000 (19:45 -0400)
committerMike Bayer <mike_mp@zzzcomputing.com>
Tue, 9 Aug 2011 23:45:20 +0000 (19:45 -0400)
a foreign-key referenced table with schema in
the current search path; an explicit schema will
be applied to the referenced table only if
it actually matches that of the referencing table,
which also has an explicit schema.   Previously
it was assumed that "current" schema was synonymous
with the full search_path.  [ticket:2249]

CHANGES
lib/sqlalchemy/dialects/postgresql/base.py
test/dialect/test_postgresql.py
test/engine/test_reflection.py

diff --git a/CHANGES b/CHANGES
index 190169abdbedd2e61d93597bd470739e44df1b8a..485774dc270ef2b7b10e2975fdcb9f8f8bcdc200 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -41,6 +41,15 @@ CHANGES
     source for server side cursor names; 
     conflicts have been reported in rare cases.
 
+  - Narrowed the assumption made when reflecting
+    a foreign-key referenced table with schema in
+    the current search path; an explicit schema will
+    be applied to the referenced table only if 
+    it actually matches that of the referencing table,
+    which also has an explicit schema.   Previously
+    it was assumed that "current" schema was synonymous
+    with the full search_path.  [ticket:2249]
+
 - mssql
   - "0" is accepted as an argument for limit() which
     will produce "TOP 0". [ticket:2222]
index 4c66ee91a572729ae0f0e3be9153a90c4f1ee656..5ff20ce734971b97aea4deef74e06cb95484ed8f 100644 (file)
@@ -51,6 +51,35 @@ parameter are ``READ_COMMITTED``, ``READ_UNCOMMITTED``, ``REPEATABLE_READ``,
 and ``SERIALIZABLE``.  Note that the psycopg2 dialect does *not* use this
 technique and uses psycopg2-specific APIs (see that dialect for details).
 
+Remote / Cross-Schema Table Introspection
+-----------------------------------------
+
+Tables can be introspected from any accessible schema, including
+inter-schema foreign key relationships.   However, care must be taken
+when specifying the "schema" argument for a given :class:`.Table`, when
+the given schema is also present in PostgreSQL's ``search_path`` variable
+for the current connection.
+
+If a FOREIGN KEY constraint reports that the remote table's schema is within
+the current ``search_path``, the "schema" attribute of the resulting
+:class:`.Table` will be set to ``None``, unless the actual schema of the
+remote table matches that of the referencing table, and the "schema" argument
+was explicitly stated on the referencing table.
+
+The best practice here is to not use the ``schema`` argument 
+on :class:`.Table` for any schemas that are present in ``search_path``.
+``search_path`` defaults to "public", but care should be taken
+to inspect the actual value using::
+
+    SHOW search_path;
+
+Prior to version 0.7.3, cross-schema foreign keys when the schemas
+were also in the ``search_path`` could make an incorrect assumption
+if the schemas were explicitly stated on each :class:`.Table`.
+
+Background on PG's ``search_path`` is at: 
+http://www.postgresql.org/docs/9.0/static/ddl-schemas.html#DDL-SCHEMAS-PATH
+
 INSERT/UPDATE...RETURNING
 -------------------------
 
@@ -1298,10 +1327,19 @@ class PGDialect(default.DefaultDialect):
         preparer = self.identifier_preparer
         table_oid = self.get_table_oid(connection, table_name, schema,
                                        info_cache=kw.get('info_cache'))
+
         FK_SQL = """
-          SELECT conname, pg_catalog.pg_get_constraintdef(oid, true) as condef
-          FROM  pg_catalog.pg_constraint r
-          WHERE r.conrelid = :table AND r.contype = 'f'
+          SELECT r.conname, 
+                pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
+                n.nspname as conschema
+          FROM  pg_catalog.pg_constraint r,
+                pg_namespace n,
+                pg_class c
+
+          WHERE r.conrelid = :table AND 
+                r.contype = 'f' AND
+                c.oid = confrelid AND
+                n.oid = c.relnamespace
           ORDER BY 1
         """
 
@@ -1310,20 +1348,24 @@ class PGDialect(default.DefaultDialect):
                                 'condef':sqltypes.Unicode})
         c = connection.execute(t, table=table_oid)
         fkeys = []
-        for conname, condef in c.fetchall():
+        for conname, condef, conschema in c.fetchall():
             m = re.search('FOREIGN KEY \((.*?)\) REFERENCES '
                             '(?:(.*?)\.)?(.*?)\((.*?)\)', condef).groups()
             constrained_columns, referred_schema, \
                     referred_table, referred_columns = m
             constrained_columns = [preparer._unquote_identifier(x) 
                         for x in re.split(r'\s*,\s*', constrained_columns)]
+
             if referred_schema:
                 referred_schema =\
                                 preparer._unquote_identifier(referred_schema)
-            elif schema is not None and schema == self.default_schema_name:
-                # no schema (i.e. its the default schema), and the table we're
-                # reflecting has the default schema explicit, then use that.
-                # i.e. try to use the user's conventions
+            elif schema is not None and schema == conschema:
+                # no schema was returned by pg_get_constraintdef().  This
+                # means the schema is in the search path.   We will leave
+                # it as None, unless the actual schema, which we pull out
+                # from pg_namespace even though pg_get_constraintdef() doesn't
+                # want to give it to us, matches that of the referencing table,
+                # and an explicit schema was given for the referencing table.
                 referred_schema = schema
             referred_table = preparer._unquote_identifier(referred_table)
             referred_columns = [preparer._unquote_identifier(x) 
index 97221449ecba608360fd0f4b854e59c477340560..9171ff819dd455e20570ff112d90cbd4420c4fe4 100644 (file)
@@ -1255,6 +1255,242 @@ class DistinctOnTest(fixtures.TestBase, AssertsCompiledSQL):
             "t.b AS b FROM t) AS sq WHERE t.id = sq.id"
             )
 
+class ReflectionTest(fixtures.TestBase):
+    @testing.provide_metadata
+    def test_pg_weirdchar_reflection(self):
+        meta1 = self.metadata
+        subject = Table('subject', meta1, Column('id$', Integer,
+                        primary_key=True))
+        referer = Table('referer', meta1, Column('id', Integer,
+                        primary_key=True), Column('ref', Integer,
+                        ForeignKey('subject.id$')))
+        meta1.create_all()
+        meta2 = MetaData(testing.db)
+        subject = Table('subject', meta2, autoload=True)
+        referer = Table('referer', meta2, autoload=True)
+        self.assert_((subject.c['id$']
+                     == referer.c.ref).compare(
+                        subject.join(referer).onclause))
+
+    @testing.provide_metadata
+    def test_renamed_sequence_reflection(self):
+        metadata = self.metadata
+        t = Table('t', metadata, Column('id', Integer, primary_key=True))
+        metadata.create_all()
+        m2 = MetaData(testing.db)
+        t2 = Table('t', m2, autoload=True, implicit_returning=False)
+        eq_(t2.c.id.server_default.arg.text,
+            "nextval('t_id_seq'::regclass)")
+        r = t2.insert().execute()
+        eq_(r.inserted_primary_key, [1])
+        testing.db.connect().execution_options(autocommit=True).\
+                execute('alter table t_id_seq rename to foobar_id_seq'
+                )
+        m3 = MetaData(testing.db)
+        t3 = Table('t', m3, autoload=True, implicit_returning=False)
+        eq_(t3.c.id.server_default.arg.text,
+            "nextval('foobar_id_seq'::regclass)")
+        r = t3.insert().execute()
+        eq_(r.inserted_primary_key, [2])
+
+    @testing.provide_metadata
+    def test_schema_reflection(self):
+        """note: this test requires that the 'test_schema' schema be
+        separate and accessible by the test user"""
+
+        meta1 = self.metadata
+
+        users = Table('users', meta1, Column('user_id', Integer,
+                      primary_key=True), Column('user_name',
+                      String(30), nullable=False), schema='test_schema')
+        addresses = Table(
+            'email_addresses',
+            meta1,
+            Column('address_id', Integer, primary_key=True),
+            Column('remote_user_id', Integer,
+                   ForeignKey(users.c.user_id)),
+            Column('email_address', String(20)),
+            schema='test_schema',
+            )
+        meta1.create_all()
+        meta2 = MetaData(testing.db)
+        addresses = Table('email_addresses', meta2, autoload=True,
+                          schema='test_schema')
+        users = Table('users', meta2, mustexist=True,
+                      schema='test_schema')
+        j = join(users, addresses)
+        self.assert_((users.c.user_id
+                     == addresses.c.remote_user_id).compare(j.onclause))
+
+    @testing.provide_metadata
+    def test_schema_reflection_2(self):
+        meta1 = self.metadata
+        subject = Table('subject', meta1, Column('id', Integer,
+                        primary_key=True))
+        referer = Table('referer', meta1, Column('id', Integer,
+                        primary_key=True), Column('ref', Integer,
+                        ForeignKey('subject.id')), schema='test_schema')
+        meta1.create_all()
+        meta2 = MetaData(testing.db)
+        subject = Table('subject', meta2, autoload=True)
+        referer = Table('referer', meta2, schema='test_schema',
+                        autoload=True)
+        self.assert_((subject.c.id
+                     == referer.c.ref).compare(
+                        subject.join(referer).onclause))
+
+    @testing.provide_metadata
+    def test_schema_reflection_3(self):
+        meta1 = self.metadata
+        subject = Table('subject', meta1, Column('id', Integer,
+                        primary_key=True), schema='test_schema_2')
+        referer = Table('referer', meta1, Column('id', Integer,
+                        primary_key=True), Column('ref', Integer,
+                        ForeignKey('test_schema_2.subject.id')),
+                        schema='test_schema')
+        meta1.create_all()
+        meta2 = MetaData(testing.db)
+        subject = Table('subject', meta2, autoload=True,
+                        schema='test_schema_2')
+        referer = Table('referer', meta2, schema='test_schema',
+                        autoload=True)
+        self.assert_((subject.c.id
+                     == referer.c.ref).compare(
+                        subject.join(referer).onclause))
+
+    def test_schema_reflection_multi_search_path(self):
+        """test the 'set the same schema' rule when
+        multiple schemas/search paths are in effect."""
+
+        db = engines.testing_engine()
+        conn = db.connect()
+        trans = conn.begin()
+        try:
+            conn.execute("set search_path to test_schema_2, "
+                                "test_schema, public")
+            conn.dialect.default_schema_name = "test_schema_2"
+
+            conn.execute("""
+            CREATE TABLE test_schema.some_table (
+                id SERIAL not null primary key
+            )
+            """)
+
+            conn.execute("""
+            CREATE TABLE test_schema_2.some_other_table (
+                id SERIAL not null primary key,
+                sid INTEGER REFERENCES test_schema.some_table(id)
+            )
+            """)
+
+            m1 = MetaData()
+
+            t2_schema = Table('some_other_table', 
+                                m1, 
+                                schema="test_schema_2", 
+                                autoload=True, 
+                                autoload_with=conn)
+            t1_schema = Table('some_table', 
+                                m1, 
+                                schema="test_schema", 
+                                autoload=True,
+                                autoload_with=conn)
+
+            t2_no_schema = Table('some_other_table', 
+                                m1, 
+                                autoload=True, 
+                                autoload_with=conn)
+
+            t1_no_schema = Table('some_table', 
+                                m1, 
+                                autoload=True, 
+                                autoload_with=conn)
+
+            # OK, this because, "test_schema" is 
+            # in the search path, and might as well be
+            # the default too.  why would we assign
+            # a "schema" to the Table ?
+            assert t2_schema.c.sid.references(
+                                t1_no_schema.c.id)
+
+            assert t2_no_schema.c.sid.references(
+                                t1_no_schema.c.id)
+
+        finally:
+            trans.rollback()
+            conn.close()
+            db.dispose()
+
+    @testing.provide_metadata
+    def test_index_reflection(self):
+        """ Reflecting partial & expression-based indexes should warn
+        """
+
+        metadata = self.metadata
+
+        t1 = Table('party', metadata, Column('id', String(10),
+                   nullable=False), Column('name', String(20),
+                   index=True), Column('aname', String(20)))
+        metadata.create_all()
+        testing.db.execute("""
+          create index idx1 on party ((id || name))
+        """)
+        testing.db.execute("""
+          create unique index idx2 on party (id) where name = 'test'
+        """)
+        testing.db.execute("""
+            create index idx3 on party using btree
+                (lower(name::text), lower(aname::text))
+        """)
+
+        def go():
+            m2 = MetaData(testing.db)
+            t2 = Table('party', m2, autoload=True)
+            assert len(t2.indexes) == 2
+
+            # Make sure indexes are in the order we expect them in
+
+            tmp = [(idx.name, idx) for idx in t2.indexes]
+            tmp.sort()
+            r1, r2 = [idx[1] for idx in tmp]
+            assert r1.name == 'idx2'
+            assert r1.unique == True
+            assert r2.unique == False
+            assert [t2.c.id] == r1.columns
+            assert [t2.c.name] == r2.columns
+
+        testing.assert_warnings(go,
+            [
+                'Skipped unsupported reflection of '
+                'expression-based index idx1',
+                'Predicate of partial index idx2 ignored during '
+                'reflection',
+                'Skipped unsupported reflection of '
+                'expression-based index idx3'
+            ])
+
+    @testing.provide_metadata
+    def test_index_reflection_modified(self):
+        """reflect indexes when a column name has changed - PG 9 
+        does not update the name of the column in the index def.
+        [ticket:2141]
+
+        """
+
+        metadata = self.metadata
+
+        t1 = Table('t', metadata,
+            Column('id', Integer, primary_key=True),
+            Column('x', Integer)
+        )
+        metadata.create_all()
+        conn = testing.db.connect().execution_options(autocommit=True)
+        conn.execute("CREATE INDEX idx1 ON t (x)")
+        conn.execute("ALTER TABLE t RENAME COLUMN x to y")
+
+        ind = testing.db.dialect.get_indexes(conn, "t", None)
+        eq_(ind, [{'unique': False, 'column_names': [u'y'], 'name': u'idx1'}])
+        conn.close()
 
 class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
 
@@ -1325,24 +1561,6 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
         assert 'will create implicit sequence' in msgs
         assert 'will create implicit index' in msgs
 
-    def test_pg_weirdchar_reflection(self):
-        meta1 = MetaData(testing.db)
-        subject = Table('subject', meta1, Column('id$', Integer,
-                        primary_key=True))
-        referer = Table('referer', meta1, Column('id', Integer,
-                        primary_key=True), Column('ref', Integer,
-                        ForeignKey('subject.id$')))
-        meta1.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            subject = Table('subject', meta2, autoload=True)
-            referer = Table('referer', meta2, autoload=True)
-            print str(subject.join(referer).onclause)
-            self.assert_((subject.c['id$']
-                         == referer.c.ref).compare(
-                            subject.join(referer).onclause))
-        finally:
-            meta1.drop_all()
 
     @testing.fails_on('+zxjdbc',
                       "Can't infer the SQL type to use for an instance "
@@ -1369,102 +1587,6 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
         finally:
             t.drop(checkfirst=True)
 
-    @testing.provide_metadata
-    def test_renamed_sequence_reflection(self):
-        metadata = self.metadata
-        t = Table('t', metadata, Column('id', Integer, primary_key=True))
-        metadata.create_all()
-        m2 = MetaData(testing.db)
-        t2 = Table('t', m2, autoload=True, implicit_returning=False)
-        eq_(t2.c.id.server_default.arg.text,
-            "nextval('t_id_seq'::regclass)")
-        r = t2.insert().execute()
-        eq_(r.inserted_primary_key, [1])
-        testing.db.connect().execution_options(autocommit=True).\
-                execute('alter table t_id_seq rename to foobar_id_seq'
-                )
-        m3 = MetaData(testing.db)
-        t3 = Table('t', m3, autoload=True, implicit_returning=False)
-        eq_(t3.c.id.server_default.arg.text,
-            "nextval('foobar_id_seq'::regclass)")
-        r = t3.insert().execute()
-        eq_(r.inserted_primary_key, [2])
-
-    def test_schema_reflection(self):
-        """note: this test requires that the 'test_schema' schema be
-        separate and accessible by the test user"""
-
-        meta1 = MetaData(testing.db)
-        users = Table('users', meta1, Column('user_id', Integer,
-                      primary_key=True), Column('user_name',
-                      String(30), nullable=False), schema='test_schema')
-        addresses = Table(
-            'email_addresses',
-            meta1,
-            Column('address_id', Integer, primary_key=True),
-            Column('remote_user_id', Integer,
-                   ForeignKey(users.c.user_id)),
-            Column('email_address', String(20)),
-            schema='test_schema',
-            )
-        meta1.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            addresses = Table('email_addresses', meta2, autoload=True,
-                              schema='test_schema')
-            users = Table('users', meta2, mustexist=True,
-                          schema='test_schema')
-            print users
-            print addresses
-            j = join(users, addresses)
-            print str(j.onclause)
-            self.assert_((users.c.user_id
-                         == addresses.c.remote_user_id).compare(j.onclause))
-        finally:
-            meta1.drop_all()
-
-    def test_schema_reflection_2(self):
-        meta1 = MetaData(testing.db)
-        subject = Table('subject', meta1, Column('id', Integer,
-                        primary_key=True))
-        referer = Table('referer', meta1, Column('id', Integer,
-                        primary_key=True), Column('ref', Integer,
-                        ForeignKey('subject.id')), schema='test_schema')
-        meta1.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            subject = Table('subject', meta2, autoload=True)
-            referer = Table('referer', meta2, schema='test_schema',
-                            autoload=True)
-            print str(subject.join(referer).onclause)
-            self.assert_((subject.c.id
-                         == referer.c.ref).compare(
-                            subject.join(referer).onclause))
-        finally:
-            meta1.drop_all()
-
-    def test_schema_reflection_3(self):
-        meta1 = MetaData(testing.db)
-        subject = Table('subject', meta1, Column('id', Integer,
-                        primary_key=True), schema='test_schema_2')
-        referer = Table('referer', meta1, Column('id', Integer,
-                        primary_key=True), Column('ref', Integer,
-                        ForeignKey('test_schema_2.subject.id')),
-                        schema='test_schema')
-        meta1.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            subject = Table('subject', meta2, autoload=True,
-                            schema='test_schema_2')
-            referer = Table('referer', meta2, schema='test_schema',
-                            autoload=True)
-            print str(subject.join(referer).onclause)
-            self.assert_((subject.c.id
-                         == referer.c.ref).compare(
-                            subject.join(referer).onclause))
-        finally:
-            meta1.drop_all()
-
     def test_schema_roundtrips(self):
         meta = MetaData(testing.db)
         users = Table('users', meta, Column('id', Integer,
@@ -1515,76 +1637,6 @@ class MiscTest(fixtures.TestBase, AssertsExecutionResults, AssertsCompiledSQL):
         finally:
             testing.db.execute('drop table speedy_users')
 
-    @testing.provide_metadata
-    def test_index_reflection(self):
-        """ Reflecting partial & expression-based indexes should warn
-        """
-
-        metadata = self.metadata
-
-        t1 = Table('party', metadata, Column('id', String(10),
-                   nullable=False), Column('name', String(20),
-                   index=True), Column('aname', String(20)))
-        metadata.create_all()
-        testing.db.execute("""
-          create index idx1 on party ((id || name))
-        """)
-        testing.db.execute("""
-          create unique index idx2 on party (id) where name = 'test'
-        """)
-        testing.db.execute("""
-            create index idx3 on party using btree
-                (lower(name::text), lower(aname::text))
-        """)
-
-        def go():
-            m2 = MetaData(testing.db)
-            t2 = Table('party', m2, autoload=True)
-            assert len(t2.indexes) == 2
-
-            # Make sure indexes are in the order we expect them in
-
-            tmp = [(idx.name, idx) for idx in t2.indexes]
-            tmp.sort()
-            r1, r2 = [idx[1] for idx in tmp]
-            assert r1.name == 'idx2'
-            assert r1.unique == True
-            assert r2.unique == False
-            assert [t2.c.id] == r1.columns
-            assert [t2.c.name] == r2.columns
-
-        testing.assert_warnings(go,
-            [
-                'Skipped unsupported reflection of '
-                'expression-based index idx1',
-                'Predicate of partial index idx2 ignored during '
-                'reflection',
-                'Skipped unsupported reflection of '
-                'expression-based index idx3'
-            ])
-
-    @testing.provide_metadata
-    def test_index_reflection_modified(self):
-        """reflect indexes when a column name has changed - PG 9 
-        does not update the name of the column in the index def.
-        [ticket:2141]
-
-        """
-
-        metadata = self.metadata
-
-        t1 = Table('t', metadata,
-            Column('id', Integer, primary_key=True),
-            Column('x', Integer)
-        )
-        metadata.create_all()
-        conn = testing.db.connect().execution_options(autocommit=True)
-        conn.execute("CREATE INDEX idx1 ON t (x)")
-        conn.execute("ALTER TABLE t RENAME COLUMN x to y")
-
-        ind = testing.db.dialect.get_indexes(conn, "t", None)
-        eq_(ind, [{'unique': False, 'column_names': [u'y'], 'name': u'idx1'}])
-        conn.close()
 
     @testing.fails_on('+zxjdbc', 'psycopg2/pg8000 specific assertion')
     @testing.fails_on('pypostgresql',
index e37125aef9892cfb044938df993723b16ed9801d..c79e09ca84bbe4ba825d7ca035b1f00ceae627fe 100644 (file)
@@ -963,6 +963,7 @@ class SchemaTest(fixtures.TestBase):
     @testing.fails_on('sybase', 'FIXME: unknown')
     def test_explicit_default_schema(self):
         engine = testing.db
+        engine.connect().close()
 
         if testing.against('sqlite'):
             # Works for CREATE TABLE main.foo, SELECT FROM main.foo, etc.,