- added a check for joining from A->B using join(), along two
different m2m tables. this raises an error in 0.3 but is
possible in 0.4 when aliases are used. [ticket:687]
+- engine
+ - fixed another occasional race condition which could occur
+ when using pool with threadlocal setting
- mssql
- added support for TIME columns (simulated using DATETIME) [ticket:679]
- index names are now quoted when dropping from reflected tables [ticket:684]
return _ConnectionFairy(self).checkout()
try:
- return self._threadconns[thread.get_ident()].connfairy().checkout()
+ return self._threadconns[thread.get_ident()].checkout()
except KeyError:
- agent = _ConnectionFairy(self).checkout()
- self._threadconns[thread.get_ident()] = agent._threadfairy
- return agent
+ agent = _ConnectionFairy(self)
+ self._threadconns[thread.get_ident()] = agent
+ return agent.checkout()
def return_conn(self, agent):
+ if self._use_threadlocal and thread.get_ident() in self._threadconns:
+ del self._threadconns[thread.get_ident()]
self.do_return_conn(agent._connection_record)
def get(self):
self.__pool.log("Error on connect(): %s" % (str(e)))
raise
-class _ThreadFairy(object):
- """Mark a thread identifier as owning a connection, for a thread local pool."""
-
- def __init__(self, connfairy):
- self.connfairy = weakref.ref(connfairy)
-
class _ConnectionFairy(object):
"""Proxy a DBAPI connection object and provides return-on-dereference support."""
def __init__(self, pool):
- self._threadfairy = _ThreadFairy(self)
self._cursors = weakref.WeakKeyDictionary()
self._pool = pool
self.__counter = 0
self._pool.return_conn(self)
self.connection = None
self._connection_record = None
- self._threadfairy = None
self._cursors = None
class _CursorFairy(object):
import testbase
from testbase import PersistTest
-import unittest, sys, os, time
+import unittest, sys, os, time, gc
import threading, thread
import sqlalchemy.pool as pool
assert not con.closed
c1.close()
assert con.closed
-
+
+ def test_threadfairy(self):
+ p = pool.QueuePool(creator = lambda: mock_dbapi.connect('foo.db'), pool_size = 3, max_overflow = -1, use_threadlocal = True)
+ c1 = p.connect()
+ c1.close()
+ c2 = p.connect()
+ assert c2.connection is not None
+
def testthreadlocal_del(self):
self._do_testthreadlocal(useclose=False)
c2.close()
else:
c2 = None
-
+
if useclose:
c1 = p.connect()
c2 = p.connect()
c1 = c2 = c3 = None
- # extra tests with QueuePool to insure connections get __del__()ed when dereferenced
+ # extra tests with QueuePool to ensure connections get __del__()ed when dereferenced
if isinstance(p, pool.QueuePool):
self.assert_(p.checkedout() == 0)
c1 = p.connect()