From: Mike Bayer Date: Fri, 1 Jul 2005 05:07:16 +0000 (+0000) Subject: Initial revision X-Git-Tag: rel_0_1_0~920 X-Git-Url: http://git.ipfire.org/gitweb.cgi?a=commitdiff_plain;h=17b1723c3a483646c2ac18a622aaa697af2358c3;p=thirdparty%2Fsqlalchemy%2Fsqlalchemy.git Initial revision --- diff --git a/lib/sqlalchemy/databases/__init__.py b/lib/sqlalchemy/databases/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/test/pool.py b/test/pool.py new file mode 100644 index 0000000000..6de08412d5 --- /dev/null +++ b/test/pool.py @@ -0,0 +1,72 @@ +from testbase import PersistTest +import unittest, sys, os + +from pysqlite2 import dbapi2 as sqlite +import sqlalchemy.pool as pool + +class PoolTest(PersistTest): + + def setUp(self): + pool.clear_managers() + + def testmanager(self): + manager = pool.manage(sqlite) + + connection = manager.connect('foo.db') + connection2 = manager.connect('foo.db') + connection3 = manager.connect('bar.db') + + print "connection " + repr(connection) + self.assert_(connection.cursor() is not None) + self.assert_(connection is connection2) + self.assert_(connection2 is not connection3) + + def testnonthreadlocalmanager(self): + manager = pool.manage(sqlite, use_threadlocal = False) + + connection = manager.connect('foo.db') + connection2 = manager.connect('foo.db') + + print "connection " + repr(connection) + + self.assert_(connection.cursor() is not None) + self.assert_(connection is not connection2) + + def testqueuepool(self): + p = pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False, echo = False) + + def status(pool): + tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout()) + print "Pool size: %d Connections in pool: %d Current Overflow: %d Current Checked out connections: %d" % tup + return tup + + c1 = p.connect() + self.assert_(status(p) == (3,0,-2,1)) + c2 = p.connect() + self.assert_(status(p) == (3,0,-1,2)) + c3 = p.connect() + self.assert_(status(p) == (3,0,0,3)) + c4 = p.connect() + self.assert_(status(p) == (3,0,1,4)) + c5 = p.connect() + self.assert_(status(p) == (3,0,2,5)) + c6 = p.connect() + self.assert_(status(p) == (3,0,3,6)) + c4 = c3 = c2 = None + self.assert_(status(p) == (3,3,3,3)) + c1 = c5 = c6 = None + self.assert_(status(p) == (3,3,0,0)) + c1 = p.connect() + c2 = p.connect() + self.assert_(status(p) == (3, 1, 0, 2)) + c2 = None + self.assert_(status(p) == (3, 2, 0, 1)) + + def tearDown(self): + for file in ('foo.db', 'bar.db'): + if os.access(file, os.F_OK): + os.remove(file) + + +if __name__ == "__main__": + unittest.main()