from sqlalchemy import exc, log, event, events, interfaces, util
from sqlalchemy.util import queue as sqla_queue
-from sqlalchemy.util import threading, pickle, memoized_property, \
+from sqlalchemy.util import threading, memoized_property, \
chop_traceback
proxies = {}
pass
def _serialize(self, *args, **kw):
- return pickle.dumps([args, kw])
+ return tuple(
+ list(args) +
+ [(k, kw[k]) for k in sorted(kw)]
+ )
return dbapi, pool.QueuePool(creator=lambda: dbapi.connect('foo.db'), **kw)
class PoolTest(PoolTestBase):
- def testmanager(self):
+ def test_manager(self):
manager = pool.manage(MockDBAPI(), use_threadlocal=True)
- connection = manager.connect('foo.db')
- connection2 = manager.connect('foo.db')
- connection3 = manager.connect('bar.db')
+ c1 = manager.connect('foo.db')
+ c2 = manager.connect('foo.db')
+ c3 = manager.connect('bar.db')
+ c4 = manager.connect("foo.db", bar="bat")
+ c5 = manager.connect("foo.db", bar="hoho")
+ c6 = manager.connect("foo.db", bar="bat")
- self.assert_(connection.cursor() is not None)
- self.assert_(connection is connection2)
- self.assert_(connection2 is not connection3)
+ assert c1.cursor() is not None
+ assert c1 is c2
+ assert c1 is not c3
+ assert c4 is c6
+ assert c4 is not c5
- def testbadargs(self):
+ def test_bad_args(self):
manager = pool.manage(MockDBAPI())
+ connection = manager.connect(None)
- try:
- connection = manager.connect(None)
- except:
- pass
-
- def testnonthreadlocalmanager(self):
+ def test_non_thread_local_manager(self):
manager = pool.manage(MockDBAPI(), use_threadlocal = False)
connection = manager.connect('foo.db')