]> git.ipfire.org Git - thirdparty/sqlalchemy/sqlalchemy.git/commitdiff
added separate test for metadata
authorMike Bayer <mike_mp@zzzcomputing.com>
Mon, 11 Jun 2007 19:31:19 +0000 (19:31 +0000)
committerMike Bayer <mike_mp@zzzcomputing.com>
Mon, 11 Jun 2007 19:31:19 +0000 (19:31 +0000)
some cleanup to query.py unit test
added settable "engine" property on MetaData

lib/sqlalchemy/schema.py
test/engine/alltests.py
test/engine/metadata.py [new file with mode: 0644]
test/sql/defaults.py
test/sql/query.py

index 492210061e867005062eed4585a4d6bc4ff6672d..b651774a245bee1570b6d476525677a66662a6fe 100644 (file)
@@ -1122,6 +1122,16 @@ class MetaData(SchemaItem):
         else:
             self._engine = engine_or_url
 
+    def _get_engine(self):
+        # we are checking is_bound() because _engine wires 
+        # into SchemaItem's _engine mechanism, which raises an error,
+        # whereas we just want to return None.
+        if not self.is_bound():
+            return None
+        return self._engine
+
+    engine = property(_get_engine, connect)
+    
     def clear(self):
         self.tables.clear()
 
@@ -1181,10 +1191,6 @@ class MetaData(SchemaItem):
     def _derived_metadata(self):
         return self
 
-    def _get_engine(self):
-        if not self.is_bound():
-            return None
-        return self._engine
 
 class BoundMetaData(MetaData):
     """``MetaData`` for which the first argument is a required Engine, url string, or URL instance.
@@ -1241,7 +1247,8 @@ thread-local basis.
             return self.context._engine
         else:
             return None
-    engine=property(_get_engine)
+
+    engine = property(_get_engine, connect)
 
 class SchemaVisitor(sql.ClauseVisitor):
     """Define the visiting for ``SchemaItem`` objects."""
index 5b96bc1148df397e7f68be14a2f9e77df2ecfac0..722f06256d592503b56cf885c138a73da78f6372 100644 (file)
@@ -9,6 +9,7 @@ def suite():
         'engine.pool', 
         'engine.reconnect',
         'engine.execute',
+        'engine.metadata',
         'engine.transaction',
         
         # schema/tables
diff --git a/test/engine/metadata.py b/test/engine/metadata.py
new file mode 100644 (file)
index 0000000..95b601c
--- /dev/null
@@ -0,0 +1,32 @@
+import testbase
+from sqlalchemy import *
+
+class MetaDataTest(testbase.PersistTest):
+    def test_global_metadata(self):
+         t1 = Table('table1', Column('col1', Integer, primary_key=True),
+             Column('col2', String(20)))
+         t2 = Table('table2', Column('col1', Integer, primary_key=True),
+             Column('col2', String(20)))
+
+         assert t1.c.col1
+         global_connect(testbase.db)
+         default_metadata.create_all()
+         try:
+             assert t1.count().scalar() == 0
+         finally:
+             default_metadata.drop_all()
+             default_metadata.clear()
+
+    def test_metadata_connect(self):
+        metadata = MetaData()
+        t1 = Table('table1', metadata, Column('col1', Integer, primary_key=True),
+            Column('col2', String(20)))
+        metadata.engine = testbase.db
+        metadata.create_all()
+        try:
+            assert t1.count().scalar() == 0
+        finally:
+            metadata.drop_all()
+    
+if __name__ == '__main__':
+    testbase.main()
\ No newline at end of file
index 992a3afc03e23402dfaf4f212e9b18fc12dc06fb..9867a52f36c9d656405bbfa6de17676335761891 100644 (file)
@@ -125,6 +125,33 @@ class DefaultTest(PersistTest):
         l = l.fetchone()
         self.assert_(l['col3'] == 55)
 
+    @testbase.supported('postgres')
+    def testpassiveoverride(self):
+        """primarily for postgres, tests that when we get a primary key column back 
+        from reflecting a table which has a default value on it, we pre-execute
+        that PassiveDefault upon insert, even though PassiveDefault says 
+        "let the database execute this", because in postgres we must have all the primary
+        key values in memory before insert; otherwise we cant locate the just inserted row."""
+
+        try:
+            meta = BoundMetaData(testbase.db)
+            testbase.db.execute("""
+             CREATE TABLE speedy_users
+             (
+                 speedy_user_id   SERIAL     PRIMARY KEY,
+
+                 user_name        VARCHAR    NOT NULL,
+                 user_password    VARCHAR    NOT NULL
+             );
+            """, None)
+
+            t = Table("speedy_users", meta, autoload=True)
+            t.insert().execute(user_name='user', user_password='lala')
+            l = t.select().execute().fetchall()
+            self.assert_(l == [(1, 'user', 'lala')])
+        finally:
+            testbase.db.execute("drop table speedy_users", None)
+
 class AutoIncrementTest(PersistTest):
     @testbase.supported('postgres', 'mysql')
     def testnonautoincrement(self):
index d788544c096646a6404ef0f1cdcbd39a978633f8..5d5a734108b530ba63bd97607028410e95d9fa20 100644 (file)
@@ -24,91 +24,49 @@ class QueryTest(PersistTest):
             Column('address', String(30)))
         metadata.create_all()
     
-    def setUp(self):
-        self.users = users
     def tearDown(self):
-        self.users.delete().execute()
+        users.delete().execute()
     
     def tearDownAll(self):
         metadata.drop_all()
         
     def testinsert(self):
-        self.users.insert().execute(user_id = 7, user_name = 'jack')
-        print repr(self.users.select().execute().fetchall())
+        users.insert().execute(user_id = 7, user_name = 'jack')
+        assert users.count().scalar() == 1
         
     def testupdate(self):
 
-        self.users.insert().execute(user_id = 7, user_name = 'jack')
-        print repr(self.users.select().execute().fetchall())
+        users.insert().execute(user_id = 7, user_name = 'jack')
+        assert users.count().scalar() == 1
 
-        self.users.update(self.users.c.user_id == 7).execute(user_name = 'fred')
-        print repr(self.users.select().execute().fetchall())
+        users.update(users.c.user_id == 7).execute(user_name = 'fred')
+        assert users.select(users.c.user_id==7).execute().fetchone()['user_name'] == 'fred'
 
     def testrowiteration(self):
-        self.users.insert().execute(user_id = 7, user_name = 'jack')
-        self.users.insert().execute(user_id = 8, user_name = 'ed')
-        self.users.insert().execute(user_id = 9, user_name = 'fred')
-        r = self.users.select().execute()
+        users.insert().execute(user_id = 7, user_name = 'jack')
+        users.insert().execute(user_id = 8, user_name = 'ed')
+        users.insert().execute(user_id = 9, user_name = 'fred')
+        r = users.select().execute()
         l = []
         for row in r:
             l.append(row)
         self.assert_(len(l) == 3)
 
     def test_fetchmany(self):
-        self.users.insert().execute(user_id = 7, user_name = 'jack') 
-        self.users.insert().execute(user_id = 8, user_name = 'ed') 
-        self.users.insert().execute(user_id = 9, user_name = 'fred') 
-        r = self.users.select().execute() 
+        users.insert().execute(user_id = 7, user_name = 'jack') 
+        users.insert().execute(user_id = 8, user_name = 'ed') 
+        users.insert().execute(user_id = 9, user_name = 'fred') 
+        r = users.select().execute() 
         l = [] 
         for row in r.fetchmany(size=2): 
             l.append(row) 
         self.assert_(len(l) == 2, "fetchmany(size=2) got %s rows" % len(l))
         
     def test_compiled_execute(self):
-        s = select([self.users], self.users.c.user_id==bindparam('id')).compile()
+        users.insert().execute(user_id = 7, user_name = 'jack') 
+        s = select([users], users.c.user_id==bindparam('id')).compile()
         c = testbase.db.connect()
-        print repr(c.execute(s, id=7).fetchall())
-    def test_global_metadata(self):
-        t1 = Table('table1', Column('col1', Integer, primary_key=True),
-            Column('col2', String(20)))
-        t2 = Table('table2', Column('col1', Integer, primary_key=True),
-            Column('col2', String(20)))
-   
-        assert t1.c.col1
-        global_connect(testbase.db)
-        default_metadata.create_all()
-        try:
-            assert t1.count().scalar() == 0
-        finally:
-            default_metadata.drop_all()
-            default_metadata.clear()
-    @testbase.supported('postgres')
-    def testpassiveoverride(self):
-        """primarily for postgres, tests that when we get a primary key column back 
-        from reflecting a table which has a default value on it, we pre-execute
-        that PassiveDefault upon insert, even though PassiveDefault says 
-        "let the database execute this", because in postgres we must have all the primary
-        key values in memory before insert; otherwise we cant locate the just inserted row."""
-        try:
-            meta = BoundMetaData(testbase.db)
-            testbase.db.execute("""
-             CREATE TABLE speedy_users
-             (
-                 speedy_user_id   SERIAL     PRIMARY KEY,
-            
-                 user_name        VARCHAR    NOT NULL,
-                 user_password    VARCHAR    NOT NULL
-             );
-            """, None)
-            
-            t = Table("speedy_users", meta, autoload=True)
-            t.insert().execute(user_name='user', user_password='lala')
-            l = t.select().execute().fetchall()
-            self.assert_(l == [(1, 'user', 'lala')])
-        finally:
-            testbase.db.execute("drop table speedy_users", None)
+        assert c.execute(s, id=7).fetchall()[0]['user_id'] == 7
 
     @testbase.supported('postgres')
     def testschema(self):
@@ -133,61 +91,61 @@ class QueryTest(PersistTest):
     def test_repeated_bindparams(self):
         """test that a BindParam can be used more than once.  
         this should be run for dbs with both positional and named paramstyles."""
-        self.users.insert().execute(user_id = 7, user_name = 'jack')
-        self.users.insert().execute(user_id = 8, user_name = 'fred')
+        users.insert().execute(user_id = 7, user_name = 'jack')
+        users.insert().execute(user_id = 8, user_name = 'fred')
 
         u = bindparam('userid')
-        s = self.users.select(or_(self.users.c.user_name==u, self.users.c.user_name==u))
+        s = users.select(or_(users.c.user_name==u, users.c.user_name==u))
         r = s.execute(userid='fred').fetchall()
         assert len(r) == 1
     
     def test_bindparam_shortname(self):
         """test the 'shortname' field on BindParamClause."""
-        self.users.insert().execute(user_id = 7, user_name = 'jack')
-        self.users.insert().execute(user_id = 8, user_name = 'fred')
+        users.insert().execute(user_id = 7, user_name = 'jack')
+        users.insert().execute(user_id = 8, user_name = 'fred')
         u = bindparam('userid', shortname='someshortname')
-        s = self.users.select(self.users.c.user_name==u)
+        s = users.select(users.c.user_name==u)
         r = s.execute(someshortname='fred').fetchall()
         assert len(r) == 1
         
     def testdelete(self):
-        self.users.insert().execute(user_id = 7, user_name = 'jack')
-        self.users.insert().execute(user_id = 8, user_name = 'fred')
-        print repr(self.users.select().execute().fetchall())
+        users.insert().execute(user_id = 7, user_name = 'jack')
+        users.insert().execute(user_id = 8, user_name = 'fred')
+        print repr(users.select().execute().fetchall())
 
-        self.users.delete(self.users.c.user_name == 'fred').execute()
+        users.delete(users.c.user_name == 'fred').execute()
         
-        print repr(self.users.select().execute().fetchall())
+        print repr(users.select().execute().fetchall())
         
     def testselectlimit(self):
-        self.users.insert().execute(user_id=1, user_name='john')
-        self.users.insert().execute(user_id=2, user_name='jack')
-        self.users.insert().execute(user_id=3, user_name='ed')
-        self.users.insert().execute(user_id=4, user_name='wendy')
-        self.users.insert().execute(user_id=5, user_name='laura')
-        self.users.insert().execute(user_id=6, user_name='ralph')
-        self.users.insert().execute(user_id=7, user_name='fido')
-        r = self.users.select(limit=3, order_by=[self.users.c.user_id]).execute().fetchall()
+        users.insert().execute(user_id=1, user_name='john')
+        users.insert().execute(user_id=2, user_name='jack')
+        users.insert().execute(user_id=3, user_name='ed')
+        users.insert().execute(user_id=4, user_name='wendy')
+        users.insert().execute(user_id=5, user_name='laura')
+        users.insert().execute(user_id=6, user_name='ralph')
+        users.insert().execute(user_id=7, user_name='fido')
+        r = users.select(limit=3, order_by=[users.c.user_id]).execute().fetchall()
         self.assert_(r == [(1, 'john'), (2, 'jack'), (3, 'ed')], repr(r))
         
     @testbase.unsupported('mssql')
     def testselectlimitoffset(self):
-        self.users.insert().execute(user_id=1, user_name='john')
-        self.users.insert().execute(user_id=2, user_name='jack')
-        self.users.insert().execute(user_id=3, user_name='ed')
-        self.users.insert().execute(user_id=4, user_name='wendy')
-        self.users.insert().execute(user_id=5, user_name='laura')
-        self.users.insert().execute(user_id=6, user_name='ralph')
-        self.users.insert().execute(user_id=7, user_name='fido')
-        r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+        users.insert().execute(user_id=1, user_name='john')
+        users.insert().execute(user_id=2, user_name='jack')
+        users.insert().execute(user_id=3, user_name='ed')
+        users.insert().execute(user_id=4, user_name='wendy')
+        users.insert().execute(user_id=5, user_name='laura')
+        users.insert().execute(user_id=6, user_name='ralph')
+        users.insert().execute(user_id=7, user_name='fido')
+        r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
         self.assert_(r==[(3, 'ed'), (4, 'wendy'), (5, 'laura')])
-        r = self.users.select(offset=5, order_by=[self.users.c.user_id]).execute().fetchall()
+        r = users.select(offset=5, order_by=[users.c.user_id]).execute().fetchall()
         self.assert_(r==[(6, 'ralph'), (7, 'fido')])
         
     @testbase.supported('mssql')
     def testselectlimitoffset_mssql(self):
         try:
-            r = self.users.select(limit=3, offset=2, order_by=[self.users.c.user_id]).execute().fetchall()
+            r = users.select(limit=3, offset=2, order_by=[users.c.user_id]).execute().fetchall()
             assert False # InvalidRequestError should have been raised
         except exceptions.InvalidRequestError:
             pass
@@ -210,29 +168,29 @@ class QueryTest(PersistTest):
             datetable.drop()
             
     def test_column_accessor(self):
-        self.users.insert().execute(user_id=1, user_name='john')
-        self.users.insert().execute(user_id=2, user_name='jack')
-        r = self.users.select(self.users.c.user_id==2).execute().fetchone()
-        self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
-        self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+        users.insert().execute(user_id=1, user_name='john')
+        users.insert().execute(user_id=2, user_name='jack')
+        r = users.select(users.c.user_id==2).execute().fetchone()
+        self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+        self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
 
         r = text("select * from query_users where user_id=2", engine=testbase.db).execute().fetchone()
-        self.assert_(r.user_id == r['user_id'] == r[self.users.c.user_id] == 2)
-        self.assert_(r.user_name == r['user_name'] == r[self.users.c.user_name] == 'jack')
+        self.assert_(r.user_id == r['user_id'] == r[users.c.user_id] == 2)
+        self.assert_(r.user_name == r['user_name'] == r[users.c.user_name] == 'jack')
         
     def test_keys(self):
-        self.users.insert().execute(user_id=1, user_name='foo')
-        r = self.users.select().execute().fetchone()
+        users.insert().execute(user_id=1, user_name='foo')
+        r = users.select().execute().fetchone()
         self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
 
     def test_items(self):
-        self.users.insert().execute(user_id=1, user_name='foo')
-        r = self.users.select().execute().fetchone()
+        users.insert().execute(user_id=1, user_name='foo')
+        r = users.select().execute().fetchone()
         self.assertEqual([(x[0].lower(), x[1]) for x in r.items()], [('user_id', 1), ('user_name', 'foo')])
 
     def test_len(self):
-        self.users.insert().execute(user_id=1, user_name='foo')
-        r = self.users.select().execute().fetchone()
+        users.insert().execute(user_id=1, user_name='foo')
+        r = users.select().execute().fetchone()
         self.assertEqual(len(r), 2)
         r.close()
         r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
@@ -330,8 +288,8 @@ class QueryTest(PersistTest):
         
     def test_column_order_with_simple_query(self):
         # should return values in column definition order
-        self.users.insert().execute(user_id=1, user_name='foo')
-        r = self.users.select(self.users.c.user_id==1).execute().fetchone()
+        users.insert().execute(user_id=1, user_name='foo')
+        r = users.select(users.c.user_id==1).execute().fetchone()
         self.assertEqual(r[0], 1)
         self.assertEqual(r[1], 'foo')
         self.assertEqual([x.lower() for x in r.keys()], ['user_id', 'user_name'])
@@ -339,7 +297,7 @@ class QueryTest(PersistTest):
         
     def test_column_order_with_text_query(self):
         # should return values in query order
-        self.users.insert().execute(user_id=1, user_name='foo')
+        users.insert().execute(user_id=1, user_name='foo')
         r = testbase.db.execute('select user_name, user_id from query_users', {}).fetchone()
         self.assertEqual(r[0], 'foo')
         self.assertEqual(r[1], 1)