]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
- Table columns and constraints can be overridden on a
authorMike Bayer <mike_mp@zzzcomputing.com>
Sun, 10 Feb 2008 23:39:09 +0000 (23:39 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Sun, 10 Feb 2008 23:39:09 +0000 (23:39 +0000)
an existing table (such as a table that was already
reflected) using the 'useexisting=True' flag, which now
takes into account the arguments passed along with it.
- fixed one element of [ticket:910]
- refactored reflection test

CHANGES
lib/sqlalchemy/schema.py
test/engine/metadata.py
test/engine/reflection.py
test/testlib/__init__.py
test/testlib/testing.py

diff --git a/CHANGES b/CHANGES
index 90124c05a15254bbad92a281521f3acec7264304..e94826405a15de84511f154a30b1bda36ee4c8d5 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -10,6 +10,11 @@ CHANGES
       MetaData instances and executed automatically when those
       objects are created and/or dropped.
 
+    - Table columns and constraints can be overridden on a 
+      an existing table (such as a table that was already
+      reflected) using the 'useexisting=True' flag, which now
+      takes into account the arguments passed along with it.
+      
     - Added a callable-based DDL events interface, adds hooks
       before and after Tables and MetaData create and drop.
 
index 83f282b2464633155230b7dec10f09d1a788aef9..3a2ff97a7b893fb04db0f1b45b86fee9e4b486ff 100644 (file)
@@ -93,13 +93,14 @@ class _TableSingleton(expression._FigureVisitName):
         key = _get_table_key(name, schema)
         try:
             table = metadata.tables[key]
-            if args:
-                if not useexisting:
-                    raise exceptions.ArgumentError("Table '%s' is already defined for this MetaData instance." % key)
+            if not useexisting and table._cant_override(*args, **kwargs):
+                raise exceptions.InvalidRequestError("Table '%s' is already defined for this MetaData instance.  Specify 'useexisting=True' to redefine options and columns on an existing Table object." % key)
+            else:
+                table._init_existing(*args, **kwargs)
             return table
         except KeyError:
             if mustexist:
-                raise exceptions.ArgumentError("Table '%s' not defined" % (key))
+                raise exceptions.InvalidRequestError("Table '%s' not defined" % (key))
             try:
                 return type.__call__(self, name, metadata, *args, **kwargs)
             except:
@@ -201,26 +202,16 @@ class Table(SchemaItem, expression.TableClause):
         self.primary_key = PrimaryKeyConstraint()
         self._foreign_keys = util.OrderedSet()
         self.ddl_listeners = util.defaultdict(list)
-        self.quote = kwargs.pop('quote', False)
-        self.quote_schema = kwargs.pop('quote_schema', False)
+        self.kwargs = {}
         if self.schema is not None:
             self.fullname = "%s.%s" % (self.schema, self.name)
         else:
             self.fullname = self.name
-        self.owner = kwargs.pop('owner', None)
-        if kwargs.get('info'):
-            self._info = kwargs.pop('info')
 
         autoload = kwargs.pop('autoload', False)
         autoload_with = kwargs.pop('autoload_with', None)
         include_columns = kwargs.pop('include_columns', None)
 
-        # validate remaining kwargs that they all specify DB prefixes
-        if len([k for k in kwargs if not re.match(r'^(?:%s)_' % '|'.join(databases.__all__), k)]):
-            raise TypeError("Invalid argument(s) for Table: %s" % repr(kwargs.keys()))
-
-        self.kwargs = kwargs
-
         self._set_parent(metadata)
         # load column definitions from the database if 'autoload' is defined
         # we do it after the table is in the singleton dictionary to support
@@ -233,8 +224,44 @@ class Table(SchemaItem, expression.TableClause):
 
         # initialize all the column, etc. objects.  done after
         # reflection to allow user-overrides
-        self._init_items(*args)
+        self.__post_init(*args, **kwargs)
+    
+    def _init_existing(self, *args, **kwargs):
+        autoload = kwargs.pop('autoload', False)
+        autoload_with = kwargs.pop('autoload_with', None)
+        schema = kwargs.pop('schema', None)
+        if schema and schema != self.schema:
+            raise exceptions.ArgumentError("Can't change schema of existing table from '%s' to '%s'", (self.schema, schema))
+            
+        include_columns = kwargs.pop('include_columns', None)
+        if include_columns:
+            for c in self.c:
+                if c.name not in include_columns:
+                    self.c.remove(c)
+        self.__post_init(*args, **kwargs)
+    
+    def _cant_override(self, *args, **kwargs):
+        """return True if the given arguments cannot be sent to a table that already exists.
+        
+        the 'useexisting' flag overrides this.
+        """
+
+        return bool(args) or bool(util.Set(kwargs).difference(['autoload', 'autoload_with', 'schema']))
+        
+    def __post_init(self, *args, **kwargs):
+        self.quote = kwargs.pop('quote', False)
+        self.quote_schema = kwargs.pop('quote_schema', False)
+        self.owner = kwargs.pop('owner', None)
+        if kwargs.get('info'):
+            self._info = kwargs.pop('info')
 
+        # validate remaining kwargs that they all specify DB prefixes
+        if len([k for k in kwargs if not re.match(r'^(?:%s)_' % '|'.join(databases.__all__), k)]):
+            raise TypeError("Invalid argument(s) for Table: %s" % repr(kwargs.keys()))
+        self.kwargs.update(kwargs)
+
+        self._init_items(*args)
+        
     def key(self):
         return _get_table_key(self.name, self.schema)
     key = property(key)
@@ -916,9 +943,10 @@ class ForeignKeyConstraint(Constraint):
 
     def _set_parent(self, table):
         self.table = table
-        table.constraints.add(self)
-        for (c, r) in zip(self.__colnames, self.__refcolnames):
-            self.append_element(c,r)
+        if self not in table.constraints:
+            table.constraints.add(self)
+            for (c, r) in zip(self.__colnames, self.__refcolnames):
+                self.append_element(c,r)
 
     def append_element(self, col, refcol):
         fk = ForeignKey(refcol, constraint=self, name=self.name, onupdate=self.onupdate, ondelete=self.ondelete, use_alter=self.use_alter)
@@ -948,8 +976,8 @@ class PrimaryKeyConstraint(Constraint):
     def _set_parent(self, table):
         self.table = table
         table.primary_key = self
-        for c in self.__colnames:
-            self.add(table.c[c])
+        for name in self.__colnames:
+            self.add(table.c[name])
 
     def add(self, col):
         self.columns.add(col)
index 923f0334ef3637e68afd24d070bd64c3f066574b..dd4ee51d30609e18194dca8f691da8d761f31265 100644 (file)
@@ -2,8 +2,9 @@ import testenv; testenv.configure_for_tests()
 from sqlalchemy import *
 from sqlalchemy import exceptions
 from testlib import *
+import pickle
 
-class MetaDataTest(PersistTest):
+class MetaDataTest(PersistTest, ComparesTables):
     def test_metadata_connect(self):
         metadata = MetaData()
         t1 = Table('table1', metadata, Column('col1', Integer, primary_key=True),
@@ -29,10 +30,88 @@ class MetaDataTest(PersistTest):
                 t2 = Table('table1', metadata, Column('col1', Integer, primary_key=True),
                     Column('col2', String(20)))
                 assert False
-            except exceptions.ArgumentError, e:
-                assert str(e) == "Table 'table1' is already defined for this MetaData instance."
+            except exceptions.InvalidRequestError, e:
+                assert str(e) == "Table 'table1' is already defined for this MetaData instance.  Specify 'useexisting=True' to redefine options and columns on an existing Table object."
         finally:
             metadata.drop_all()
 
+    @testing.exclude('mysql', '<', (4, 1, 1))
+    def test_to_metadata(self):
+        meta = MetaData()
+
+        table = Table('mytable', meta,
+            Column('myid', Integer, primary_key=True),
+            Column('name', String(40), nullable=False),
+            Column('description', String(30), CheckConstraint("description='hi'")),
+            UniqueConstraint('name'),
+            test_needs_fk=True,
+        )
+
+        table2 = Table('othertable', meta,
+            Column('id', Integer, primary_key=True),
+            Column('myid', Integer, ForeignKey('mytable.myid')),
+            test_needs_fk=True,
+            )
+
+        def test_to_metadata():
+            meta2 = MetaData()
+            table_c = table.tometadata(meta2)
+            table2_c = table2.tometadata(meta2)
+            return (table_c, table2_c)
+
+        def test_pickle():
+            meta.bind = testing.db
+            meta2 = pickle.loads(pickle.dumps(meta))
+            assert meta2.bind is None
+            meta3 = pickle.loads(pickle.dumps(meta2))
+            return (meta2.tables['mytable'], meta2.tables['othertable'])
+
+        def test_pickle_via_reflect():
+            # this is the most common use case, pickling the results of a
+            # database reflection
+            meta2 = MetaData(bind=testing.db)
+            t1 = Table('mytable', meta2, autoload=True)
+            t2 = Table('othertable', meta2, autoload=True)
+            meta3 = pickle.loads(pickle.dumps(meta2))
+            assert meta3.bind is None
+            assert meta3.tables['mytable'] is not t1
+            return (meta3.tables['mytable'], meta3.tables['othertable'])
+
+        meta.create_all(testing.db)
+        try:
+            for test, has_constraints in ((test_to_metadata, True), (test_pickle, True), (test_pickle_via_reflect, False)):
+                table_c, table2_c = test()
+                self.assert_tables_equal(table, table_c)
+                self.assert_tables_equal(table2, table2_c)
+
+                assert table is not table_c
+                assert table.primary_key is not table_c.primary_key
+                assert list(table2_c.c.myid.foreign_keys)[0].column is table_c.c.myid
+                assert list(table2_c.c.myid.foreign_keys)[0].column is not table.c.myid
+
+                # constraints dont get reflected for any dialect right now
+                if has_constraints:
+                    for c in table_c.c.description.constraints:
+                        if isinstance(c, CheckConstraint):
+                            break
+                    else:
+                        assert False
+                    assert c.sqltext=="description='hi'"
+
+                    for c in table_c.constraints:
+                        if isinstance(c, UniqueConstraint):
+                            break
+                    else:
+                        assert False
+                    assert c.columns.contains_column(table_c.c.name)
+                    assert not c.columns.contains_column(table.c.name)
+        finally:
+            meta.drop_all(testing.db)
+
+    def test_nonexistent(self):
+        self.assertRaises(exceptions.NoSuchTableError, Table,
+                          'fake_table',
+                          MetaData(testing.db), autoload=True)
+
 if __name__ == '__main__':
     testenv.main()
index 2bbbea1aab474b356c218b1d56e3b0d27b861e33..3aa6630eb321f17bb045d1673b1458d408e8c366 100644 (file)
@@ -1,5 +1,5 @@
 import testenv; testenv.configure_for_tests()
-import pickle, StringIO, unicodedata
+import StringIO, unicodedata
 from sqlalchemy import *
 from sqlalchemy import exceptions
 from sqlalchemy import types as sqltypes
@@ -7,54 +7,25 @@ from testlib import *
 from testlib import engines
 
 
-class ReflectionTest(PersistTest):
+class ReflectionTest(PersistTest, ComparesTables):
 
     @testing.exclude('mysql', '<', (4, 1, 1))
-    def testbasic(self):
-        use_function_defaults = testing.against('postgres', 'oracle', 'maxdb')
-
-        use_string_defaults = (use_function_defaults or
-                               testing.db.engine.__module__.endswith('sqlite'))
-
-        if use_function_defaults:
-            defval = func.current_date()
-            deftype = Date
-        else:
-            defval = "3"
-            deftype = Integer
-
-        if use_string_defaults:
-            deftype2 = Text
-            defval2 = "im a default"
-            deftype3 = Date
-            if testing.against('oracle'):
-                defval3 = text("to_date('09-09-1999', 'MM-DD-YYYY')")
-            elif testing.against('maxdb'):
-                defval3 = '19990909'
-            else:
-                defval3 = '1999-09-09'
-        else:
-            deftype2, deftype3 = Integer, Integer
-            defval2, defval3 = "15", "16"
-
+    def test_basic_reflection(self):
         meta = MetaData(testing.db)
 
         users = Table('engine_users', meta,
-            Column('user_id', INT, primary_key = True),
-            Column('user_name', VARCHAR(20), nullable = False),
-            Column('test1', CHAR(5), nullable = False),
-            Column('test2', FLOAT(5), nullable = False),
+            Column('user_id', INT, primary_key=True),
+            Column('user_name', VARCHAR(20), nullable=False),
+            Column('test1', CHAR(5), nullable=False),
+            Column('test2', Float(5), nullable=False),
             Column('test3', Text),
-            Column('test4', DECIMAL, nullable = False),
-            Column('test5', TIMESTAMP),
+            Column('test4', Numeric, nullable = False),
+            Column('test5', DateTime),
             Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')),
-            Column('test6', DateTime, nullable = False),
+            Column('test6', DateTime, nullable=False),
             Column('test7', Text),
             Column('test8', Binary),
-            Column('test_passivedefault', deftype, PassiveDefault(defval)),
             Column('test_passivedefault2', Integer, PassiveDefault("5")),
-            Column('test_passivedefault3', deftype2, PassiveDefault(defval2)),
-            Column('test_passivedefault4', deftype3, PassiveDefault(defval3)),
             Column('test9', Binary(100)),
             Column('test10', Boolean),
             Column('test_numeric', Numeric(None, None)),
@@ -67,119 +38,41 @@ class ReflectionTest(PersistTest):
             Column('email_address', String(20)),
             test_needs_fk=True,
         )
-        meta.drop_all()
-
-        users.create()
-        addresses.create()
-
-        # clear out table registry
-        meta.clear()
-
-        try:
-            addresses = Table('engine_email_addresses', meta, autoload = True)
-            # reference the addresses foreign key col, which will require users to be
-            # reflected at some point
-            users = Table('engine_users', meta, autoload = True)
-            assert users.c.user_id in users.primary_key
-            assert len(users.primary_key) == 1
-        finally:
-            addresses.drop()
-            users.drop()
-
-        # a hack to remove the defaults we got reflecting from postgres
-        # SERIAL columns, since they reference sequences that were just dropped.
-        # PG 8.1 doesnt want to create them if the underlying sequence doesnt exist
-        users.c.user_id.default = None
-        addresses.c.address_id.default = None
+        meta.create_all()
 
-        users.create()
-        addresses.create()
         try:
-            print users
-            print addresses
-            j = join(users, addresses)
-            print str(j.onclause)
-            self.assert_((users.c.user_id==addresses.c.remote_user_id).compare(j.onclause))
+            meta2 = MetaData()
+            reflected_users = Table('engine_users', meta2, autoload=True, autoload_with=testing.db)
+            reflected_addresses = Table('engine_email_addresses', meta2, autoload=True, autoload_with=testing.db)
+            self.assert_tables_equal(users, reflected_users)
+            self.assert_tables_equal(addresses, reflected_addresses)
         finally:
             addresses.drop()
             users.drop()
 
-    def test_autoload_partial(self):
+    def test_include_columns(self):
         meta = MetaData(testing.db)
-        foo = Table('foo', meta,
-            Column('a', String(30)),
-            Column('b', String(30)),
-            Column('c', String(30)),
-            Column('d', String(30)),
-            Column('e', String(30)),
-            Column('f', String(30)),
-            )
+        foo = Table('foo', meta, *[Column(n, String(30)) for n in ['a', 'b', 'c', 'd', 'e', 'f']])
         meta.create_all()
         try:
             meta2 = MetaData(testing.db)
-            foo2 = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e'])
+            foo = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e'])
             # test that cols come back in original order
-            assert [c.name for c in foo2.c] == ['b', 'e', 'f']
+            self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f'])
             for c in ('b', 'f', 'e'):
-                assert c in foo2.c
+                assert c in foo.c
             for c in ('a', 'c', 'd'):
-                assert c not in foo2.c
-        finally:
-            meta.drop_all()
-
-    def test_override_create_fkcols(self):
-        """test that you can override columns and create new foreign keys to other reflected tables
-        which have no foreign keys.  this is common with MySQL MyISAM tables."""
-
-        meta = MetaData(testing.db)
-        users = Table('users', meta,
-            Column('id', Integer, primary_key=True),
-            Column('name', String(30)))
-        addresses = Table('addresses', meta,
-            Column('id', Integer, primary_key=True),
-            Column('street', String(30)),
-            Column('user_id', Integer))
-
-        meta.create_all()
-        try:
-            meta2 = MetaData(testing.db)
-            a2 = Table('addresses', meta2,
-                Column('user_id', Integer, ForeignKey('users.id')),
-                autoload=True)
-            u2 = Table('users', meta2, autoload=True)
-
-            assert len(a2.c.user_id.foreign_keys) == 1
-            assert len(a2.foreign_keys) == 1
-            assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
-            assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
-            assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
-            assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
-
+                assert c not in foo.c
+                
+            # test against a table which is already reflected
             meta3 = MetaData(testing.db)
-            u3 = Table('users', meta3, autoload=True)
-            a3 = Table('addresses', meta3,
-                Column('user_id', Integer, ForeignKey('users.id')),
-                autoload=True)
-
-            assert u3.join(a3).onclause == u3.c.id==a3.c.user_id
-
-            meta4 = MetaData(testing.db)
-            u4 = Table('users', meta4,
-                       Column('id', Integer, key='u_id', primary_key=True),
-                       autoload=True)
-            a4 = Table('addresses', meta4,
-                       Column('id', Integer, key='street', primary_key=True),
-                       Column('street', String(30), key='user_id'),
-                       Column('user_id', Integer, ForeignKey('users.u_id'),
-                              key='id'),
-                       autoload=True)
-
-            assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id)
-            assert list(u4.primary_key) == [u4.c.u_id]
-            assert len(u4.columns) == 2
-            assert len(u4.constraints) == 1
-            assert len(a4.columns) == 3
-            assert len(a4.constraints) == 2
+            foo = Table('foo', meta3, autoload=True)
+            foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], useexisting=True)
+            self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f'])
+            for c in ('b', 'f', 'e'):
+                assert c in foo.c
+            for c in ('a', 'c', 'd'):
+                assert c not in foo.c
         finally:
             meta.drop_all()
 
@@ -221,9 +114,33 @@ class ReflectionTest(PersistTest):
             dialect_module.ischema_names = ischema_names
             t.drop()
 
-    def test_override_fkandpkcol(self):
+    def test_basic_override(self):
+        meta = MetaData(testing.db)
+        table = Table(
+            'override_test', meta,
+            Column('col1', Integer, primary_key=True),
+            Column('col2', String(20)),
+            Column('col3', Numeric)
+        )
+        table.create()
+
+        meta2 = MetaData(testing.db)
+        try:
+            table = Table(
+                'override_test', meta2,
+                Column('col2', Unicode()),
+                Column('col4', String(30)), autoload=True)
+
+            self.assert_(isinstance(table.c.col1.type, Integer))
+            self.assert_(isinstance(table.c.col2.type, Unicode))
+            self.assert_(isinstance(table.c.col4.type, String))
+        finally:
+            table.drop()
+
+    def test_override_pkfk(self):
         """test that you can override columns which contain foreign keys to other reflected tables,
         where the foreign key column is also a primary key column"""
+
         meta = MetaData(testing.db)
         users = Table('users', meta,
             Column('id', Integer, primary_key=True),
@@ -237,7 +154,7 @@ class ReflectionTest(PersistTest):
         try:
             meta2 = MetaData(testing.db)
             a2 = Table('addresses', meta2,
-                Column('id', Integer, ForeignKey('users.id'), primary_key=True),
+                Column('id', Integer, ForeignKey('users.id'), primary_key=True),
                 autoload=True)
             u2 = Table('users', meta2, autoload=True)
 
@@ -245,15 +162,6 @@ class ReflectionTest(PersistTest):
             assert list(u2.primary_key) == [u2.c.id]
             assert u2.join(a2).onclause == u2.c.id==a2.c.id
 
-            # heres what was originally failing, because a2's primary key
-            # had two "id" columns, one of which was not part of a2's "c" collection
-            #class Address(object):pass
-            #mapper(Address, a2)
-            #add1 = Address()
-            #sess = create_session()
-            #sess.save(add1)
-            #sess.flush()
-
             meta3 = MetaData(testing.db)
             u3 = Table('users', meta3, autoload=True)
             a3 = Table('addresses', meta3,
@@ -267,7 +175,63 @@ class ReflectionTest(PersistTest):
         finally:
             meta.drop_all()
 
-    def test_override_existing_fkcols(self):
+    def test_override_nonexistent_fk(self):
+        """test that you can override columns and create new foreign keys to other reflected tables
+        which have no foreign keys.  this is common with MySQL MyISAM tables."""
+
+        meta = MetaData(testing.db)
+        users = Table('users', meta,
+            Column('id', Integer, primary_key=True),
+            Column('name', String(30)))
+        addresses = Table('addresses', meta,
+            Column('id', Integer, primary_key=True),
+            Column('street', String(30)),
+            Column('user_id', Integer))
+
+        meta.create_all()
+        try:
+            meta2 = MetaData(testing.db)
+            a2 = Table('addresses', meta2,
+                Column('user_id', Integer, ForeignKey('users.id')),
+                autoload=True)
+            u2 = Table('users', meta2, autoload=True)
+
+            assert len(a2.c.user_id.foreign_keys) == 1
+            assert len(a2.foreign_keys) == 1
+            assert [c.parent for c in a2.foreign_keys] == [a2.c.user_id]
+            assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
+            assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
+            assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
+
+            meta3 = MetaData(testing.db)
+            u3 = Table('users', meta3, autoload=True)
+            a3 = Table('addresses', meta3,
+                Column('user_id', Integer, ForeignKey('users.id')),
+                autoload=True)
+
+            assert u3.join(a3).onclause == u3.c.id==a3.c.user_id
+
+            meta4 = MetaData(testing.db)
+            u4 = Table('users', meta4,
+                       Column('id', Integer, key='u_id', primary_key=True),
+                       autoload=True)
+            a4 = Table('addresses', meta4,
+                       Column('id', Integer, key='street', primary_key=True),
+                       Column('street', String(30), key='user_id'),
+                       Column('user_id', Integer, ForeignKey('users.u_id'),
+                              key='id'),
+                       autoload=True)
+
+            assert u4.join(a4).onclause.compare(u4.c.u_id==a4.c.id)
+            assert list(u4.primary_key) == [u4.c.u_id]
+            assert len(u4.columns) == 2
+            assert len(u4.constraints) == 1
+            assert len(a4.columns) == 3
+            assert len(a4.constraints) == 2
+        finally:
+            meta.drop_all()
+
+    def test_override_existing_fk(self):
         """test that you can override columns and specify new foreign keys to other reflected tables,
         on columns which *do* already have that foreign key, and that the FK is not duped.
         """
@@ -282,7 +246,6 @@ class ReflectionTest(PersistTest):
             Column('user_id', Integer, ForeignKey('users.id')),
             test_needs_fk=True)
 
-
         meta.create_all()
         try:
             meta2 = MetaData(testing.db)
@@ -315,13 +278,45 @@ class ReflectionTest(PersistTest):
             assert [c.parent for c in a2.c.user_id.foreign_keys] == [a2.c.user_id]
             assert list(a2.c.user_id.foreign_keys)[0].parent is a2.c.user_id
             assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
-
-
         finally:
             meta.drop_all()
+    
+    def test_use_existing(self):
+        meta = MetaData(testing.db)
+        users = Table('users', meta,
+            Column('id', Integer, primary_key=True),
+            Column('name', String(30)),
+            test_needs_fk=True)
+        addresses = Table('addresses', meta,
+            Column('id', Integer,primary_key=True),
+            Column('user_id', Integer, ForeignKey('users.id')),
+            Column('data', String(100)),
+            test_needs_fk=True)
 
+        meta.create_all()
+        try:
+            meta2 = MetaData(testing.db)
+            addresses = Table('addresses', meta2, Column('data', Unicode), autoload=True)
+            try:
+                users = Table('users', meta2, Column('name', Unicode), autoload=True)
+                assert False
+            except exceptions.InvalidRequestError, err:
+                assert str(err) == "Table 'users' is already defined for this MetaData instance.  Specify 'useexisting=True' to redefine options and columns on an existing Table object."
+            
+            users = Table('users', meta2, Column('name', Unicode), autoload=True, useexisting=True)
+            assert isinstance(users.c.name.type, Unicode)
+
+            assert not users.quote
+            
+            users = Table('users', meta2, quote=True, autoload=True, useexisting=True)
+            assert users.quote
+            
+        finally:
+            meta.drop_all()
+            
     def test_pks_not_uniques(self):
         """test that primary key reflection not tripped up by unique indexes"""
+
         testing.db.execute("""
         CREATE TABLE book (
             id INTEGER NOT NULL,
@@ -355,6 +350,7 @@ class ReflectionTest(PersistTest):
 
     def test_composite_pks(self):
         """test reflection of a composite primary key"""
+
         testing.db.execute("""
         CREATE TABLE book (
             id INTEGER NOT NULL,
@@ -380,7 +376,7 @@ class ReflectionTest(PersistTest):
         """test reflection of composite foreign keys"""
 
         meta = MetaData(testing.db)
-        table = Table(
+        multi = Table(
             'multi', meta,
             Column('multi_id', Integer, primary_key=True),
             Column('multi_rev', Integer, primary_key=True),
@@ -389,7 +385,7 @@ class ReflectionTest(PersistTest):
             Column('val', String(100)),
             test_needs_fk=True,
         )
-        table2 = Table('multi2', meta,
+        multi2 = Table('multi2', meta,
             Column('id', Integer, primary_key=True),
             Column('foo', Integer),
             Column('bar', Integer),
@@ -398,128 +394,19 @@ class ReflectionTest(PersistTest):
             ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),
             test_needs_fk=True,
         )
-        assert table.c.multi_hoho
         meta.create_all()
-        meta.clear()
 
         try:
-            table = Table('multi', meta, autoload=True)
-            table2 = Table('multi2', meta, autoload=True)
-
-            print table
-            print table2
+            meta2 = MetaData()
+            table = Table('multi', meta2, autoload=True, autoload_with=testing.db)
+            table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db)
+            self.assert_tables_equal(multi, table)
+            self.assert_tables_equal(multi2, table2)
             j = join(table, table2)
-            print str(j.onclause)
             self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))
-
         finally:
             meta.drop_all()
 
-    @testing.exclude('mysql', '<', (4, 1, 1))
-    def test_to_metadata(self):
-        meta = MetaData()
-
-        table = Table('mytable', meta,
-            Column('myid', Integer, primary_key=True),
-            Column('name', String(40), nullable=False),
-            Column('description', String(30), CheckConstraint("description='hi'")),
-            UniqueConstraint('name'),
-            test_needs_fk=True,
-        )
-
-        table2 = Table('othertable', meta,
-            Column('id', Integer, primary_key=True),
-            Column('myid', Integer, ForeignKey('mytable.myid')),
-            test_needs_fk=True,
-            )
-
-        def test_to_metadata():
-            meta2 = MetaData()
-            table_c = table.tometadata(meta2)
-            table2_c = table2.tometadata(meta2)
-            return (table_c, table2_c)
-
-        def test_pickle():
-            meta.bind = testing.db
-            meta2 = pickle.loads(pickle.dumps(meta))
-            assert meta2.bind is None
-            meta3 = pickle.loads(pickle.dumps(meta2))
-            return (meta2.tables['mytable'], meta2.tables['othertable'])
-
-        def test_pickle_via_reflect():
-            # this is the most common use case, pickling the results of a
-            # database reflection
-            meta2 = MetaData(bind=testing.db)
-            t1 = Table('mytable', meta2, autoload=True)
-            t2 = Table('othertable', meta2, autoload=True)
-            meta3 = pickle.loads(pickle.dumps(meta2))
-            assert meta3.bind is None
-            assert meta3.tables['mytable'] is not t1
-            return (meta3.tables['mytable'], meta3.tables['othertable'])
-
-        meta.create_all(testing.db)
-        try:
-            for test, has_constraints in ((test_to_metadata, True), (test_pickle, True), (test_pickle_via_reflect, False)):
-                table_c, table2_c = test()
-                assert table is not table_c
-                assert table_c.c.myid.primary_key
-                assert isinstance(table_c.c.myid.type, Integer)
-                assert isinstance(table_c.c.name.type, String)
-                assert not table_c.c.name.nullable
-                assert table_c.c.description.nullable
-                assert table.primary_key is not table_c.primary_key
-                assert [x.name for x in table.primary_key] == [x.name for x in table_c.primary_key]
-                assert list(table2_c.c.myid.foreign_keys)[0].column is table_c.c.myid
-                assert list(table2_c.c.myid.foreign_keys)[0].column is not table.c.myid
-
-                # constraints dont get reflected for any dialect right now
-                if has_constraints:
-                    for c in table_c.c.description.constraints:
-                        if isinstance(c, CheckConstraint):
-                            break
-                    else:
-                        assert False
-                    assert c.sqltext=="description='hi'"
-
-                    for c in table_c.constraints:
-                        if isinstance(c, UniqueConstraint):
-                            break
-                    else:
-                        assert False
-                    assert c.columns.contains_column(table_c.c.name)
-                    assert not c.columns.contains_column(table.c.name)
-        finally:
-            meta.drop_all(testing.db)
-
-    def test_nonexistent(self):
-        self.assertRaises(exceptions.NoSuchTableError, Table,
-                          'fake_table',
-                          MetaData(testing.db), autoload=True)
-
-    def testoverride(self):
-        meta = MetaData(testing.db)
-        table = Table(
-            'override_test', meta,
-            Column('col1', Integer, primary_key=True),
-            Column('col2', String(20)),
-            Column('col3', Numeric)
-        )
-        table.create()
-        # clear out table registry
-
-        meta2 = MetaData(testing.db)
-        try:
-            table = Table(
-                'override_test', meta2,
-                Column('col2', Unicode()),
-                Column('col4', String(30)), autoload=True)
-
-            print repr(table)
-            self.assert_(isinstance(table.c.col1.type, Integer))
-            self.assert_(isinstance(table.c.col2.type, Unicode))
-            self.assert_(isinstance(table.c.col4.type, String))
-        finally:
-            table.drop()
 
     @testing.unsupported('oracle')
     def testreserved(self):
@@ -552,7 +439,6 @@ class ReflectionTest(PersistTest):
 
         index_c = Index('else', table_c.c.join)
 
-        #meta.bind.echo = True
         meta.create_all()
 
         index_c.drop()
@@ -701,7 +587,21 @@ class CreateDropTest(PersistTest):
         self.assert_(not Set(metadata.tables) - Set(testing.db.table_names()))
         metadata.drop_all(bind=testing.db)
 
-class UnicodeTest(PersistTest):
+class SchemaManipulationTest(PersistTest):
+    def test_append_constraint_unique(self):
+        meta = MetaData()
+        
+        users = Table('users', meta, Column('id', Integer))
+        addresses = Table('addresses', meta, Column('id', Integer), Column('user_id', Integer))
+        
+        fk = ForeignKeyConstraint(['user_id'],[users.c.id])
+        
+        addresses.append_constraint(fk)
+        addresses.append_constraint(fk)
+        assert len(addresses.c.user_id.foreign_keys) == 1
+        assert addresses.constraints == set([addresses.primary_key, fk])
+        
+class UnicodeReflectionTest(PersistTest):
 
     def test_basic(self):
         try:
@@ -740,7 +640,6 @@ class UnicodeTest(PersistTest):
 
 class SchemaTest(PersistTest):
 
-    # this test should really be in the sql tests somewhere, not engine
     def test_iteration(self):
         metadata = MetaData()
         table1 = Table('table1', metadata,
index 49ef0ca8a347fe9e88db9e16b8226480601b0a81..1cb3647e21daea4e98033696bbffedfe5c0eeb44 100644 (file)
@@ -8,7 +8,7 @@ from testlib.schema import Table, Column
 from testlib.orm import mapper
 import testlib.testing as testing
 from testlib.testing import rowset
-from testlib.testing import PersistTest, AssertMixin, ORMTest, SQLCompileTest
+from testlib.testing import PersistTest, AssertMixin, ORMTest, SQLCompileTest, ComparesTables
 import testlib.profiling as profiling
 import testlib.engines as engines
 from testlib.compat import set, frozenset, sorted, _function_named
@@ -18,6 +18,6 @@ __all__ = ('testing',
            'mapper',
            'Table', 'Column',
            'rowset',
-           'PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest',
+           'PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest', 'ComparesTables',
            'profiling', 'engines',
            'set', 'frozenset', 'sorted', '_function_named')
index 3b7b2992e28341eb8f4d646a9dfb63d09084e39c..5c37f95bd9143eeb9da97364e5d9c015d0e8155b 100644 (file)
@@ -7,10 +7,10 @@ from cStringIO import StringIO
 import testlib.config as config
 from testlib.compat import *
 
-sql, MetaData, clear_mappers, Session, util = None, None, None, None, None
+sql, sqltypes, schema, MetaData, clear_mappers, Session, util = None, None, None, None, None, None, None
 sa_exceptions = None
 
-__all__ = ('PersistTest', 'AssertMixin', 'ORMTest', 'SQLCompileTest')
+__all__ = ('PersistTest', 'AssertMixin', 'ComparesTables', 'ORMTest', 'SQLCompileTest')
 
 _ops = { '<': operator.lt,
          '>': operator.gt,
@@ -485,10 +485,41 @@ class SQLCompileTest(PersistTest):
         if checkparams is not None:
             self.assertEquals(c.construct_params(params), checkparams)
 
+class ComparesTables(object):
+    def assert_tables_equal(self, table, reflected_table):
+        global sqltypes, schema
+        if sqltypes is None:
+            import sqlalchemy.types as sqltypes
+        if schema is None:
+            import sqlalchemy.schema as schema
+        base_mro = sqltypes.TypeEngine.__mro__
+        assert len(table.c) == len(reflected_table.c)
+        for c, reflected_c in zip(table.c, reflected_table.c):
+            self.assertEquals(c.name, reflected_c.name)
+            assert reflected_c is reflected_table.c[c.name]
+            self.assertEquals(c.primary_key, reflected_c.primary_key)
+            self.assertEquals(c.nullable, reflected_c.nullable)
+            assert len(
+                set(type(reflected_c.type).__mro__).difference(base_mro).intersection(
+                set(type(c.type).__mro__).difference(base_mro)
+                )
+            ) > 0, "Type '%s' doesn't correspond to type '%s'" % (reflected_c.type, c.type)
+            
+            if isinstance(c.type, sqltypes.String):
+                self.assertEquals(c.type.length, reflected_c.type.length)
+
+            self.assertEquals(set([f.column.name for f in c.foreign_keys]), set([f.column.name for f in reflected_c.foreign_keys]))
+            if c.default:
+                assert isinstance(reflected_c.default, schema.PassiveDefault)
+            elif not c.primary_key or not against('postgres'):
+                assert reflected_c.default is None
+        
+        assert len(table.primary_key) == len(reflected_table.primary_key)
+        for c in table.primary_key:
+            assert reflected_table.primary_key.columns[c.name]
+
+    
 class AssertMixin(PersistTest):
-    """given a list-based structure of keys/properties which represent information within an object structure, and
-    a list of actual objects, asserts that the list of objects corresponds to the structure."""
-
     def assert_result(self, result, class_, *objects):
         result = list(result)
         print repr(result)