+import testbase
from testbase import PersistTest
import unittest, sys, os, time
-from pysqlite2 import dbapi2 as sqlite
import sqlalchemy.pool as pool
-import sqlalchemy.exceptions as exceptions
+import sqlalchemy.exceptions as exceptions
+
+class MockDBAPI(object):
+ def connect(self, argument):
+ return MockConnection()
+class MockConnection(object):
+ def close(self):
+ pass
+ def cursor(self):
+ return MockCursor()
+class MockCursor(object):
+ def close(self):
+ pass
+mock_dbapi = MockDBAPI()
+
class PoolTest(PersistTest):
def setUp(self):
pool.clear_managers()
-
+
def testmanager(self):
- manager = pool.manage(sqlite)
+ manager = pool.manage(mock_dbapi)
connection = manager.connect('foo.db')
connection2 = manager.connect('foo.db')
self.assert_(connection2 is not connection3)
def testbadargs(self):
- manager = pool.manage(sqlite)
+ manager = pool.manage(mock_dbapi)
try:
connection = manager.connect(None)
pass
def testnonthreadlocalmanager(self):
- manager = pool.manage(sqlite, use_threadlocal = False)
+ manager = pool.manage(mock_dbapi, use_threadlocal = False)
connection = manager.connect('foo.db')
connection2 = manager.connect('foo.db')
self._do_testqueuepool(useclose=True)
def _do_testqueuepool(self, useclose=False):
-
- p = pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False, echo = False)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = False, echo = False)
def status(pool):
tup = (pool.size(), pool.checkedin(), pool.overflow(), pool.checkedout())
self.assert_(status(p) == (3, 2, 0, 1))
def test_timeout(self):
- p = pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, echo = False, timeout=2)
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = 0, use_threadlocal = False, echo = False, timeout=2)
c1 = p.get()
c2 = p.get()
c3 = p.get()
def _do_testthreadlocal(self, useclose=False):
for p in (
- pool.QueuePool(creator = lambda: sqlite.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo = False),
- pool.SingletonThreadPool(creator = lambda: sqlite.connect('foo.db'), use_threadlocal = True)
+ pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True, echo = False),
+ pool.SingletonThreadPool(creator = lambda: mock_dbapi.connect('foo.db'), use_threadlocal = True)
):
c1 = p.connect()
c2 = p.connect()
def tearDown(self):
pool.clear_managers()
- for file in ('foo.db', 'bar.db'):
- if os.access(file, os.F_OK):
- os.remove(file)
if __name__ == "__main__":