-import threading, time
-from sqlalchemy import pool, interfaces, select, event
+import threading
+import time
+from sqlalchemy import pool, select, event
import sqlalchemy as tsa
from sqlalchemy import testing
from sqlalchemy.testing.util import gc_collect, lazy_gc
def test_bad_args(self):
manager = pool.manage(MockDBAPI())
- connection = manager.connect(None)
+ manager.connect(None)
def test_non_thread_local_manager(self):
- manager = pool.manage(MockDBAPI(), use_threadlocal = False)
+ manager = pool.manage(MockDBAPI(), use_threadlocal=False)
connection = manager.connect('foo.db')
connection2 = manager.connect('foo.db')
engine.execute(select([1])).close()
eq_(
- canary, ["listen_one","listen_four", "listen_two","listen_three"]
+ canary,
+ ["listen_one", "listen_four", "listen_two", "listen_three"]
)
def test_listen_targets_per_subclass(self):
try:
c4 = p.connect()
assert False
- except tsa.exc.TimeoutError, e:
+ except tsa.exc.TimeoutError:
assert int(time.time() - now) == 2
def test_timeout_race(self):
# them back to the start of do_get()
dbapi = MockDBAPI()
p = pool.QueuePool(
- creator = lambda: dbapi.connect(delay=.05),
- pool_size = 2,
- max_overflow = 1, use_threadlocal = False, timeout=3)
+ creator=lambda: dbapi.connect(delay=.05),
+ pool_size=2,
+ max_overflow=1, use_threadlocal=False, timeout=3)
timeouts = []
def checkout():
for x in xrange(1):
now = time.time()
try:
c1 = p.connect()
- except tsa.exc.TimeoutError, e:
+ except tsa.exc.TimeoutError:
timeouts.append(time.time() - now)
continue
time.sleep(4)
assert t >= 3, "Not all timeouts were >= 3 seconds %r" % timeouts
# normally, the timeout should under 4 seconds,
# but on a loaded down buildbot it can go up.
- assert t < 10, "Not all timeouts were < 10 seconds %r" % timeouts
+ assert t < 14, "Not all timeouts were < 14 seconds %r" % timeouts
def _test_overflow(self, thread_count, max_overflow):
gc_collect()
for i in range(2):
t = threading.Thread(target=waiter, args=(p, ))
- t.setDaemon(True) # so the tests dont hang if this fails
+ t.setDaemon(True) # so the tests dont hang if this fails
t.start()
c1.invalidate()
assert id(c3.connection) != c_id
def test_invalidate(self):
- p = self._queuepool_fixture(pool_size=1, max_overflow=0)
+ p = self._queuepool_fixture(pool_size=1, max_overflow=0)
c1 = p.connect()
c_id = c1.connection.id
c1.close()
class AssertionPoolTest(PoolTestBase):
def test_connect_error(self):
dbapi = MockDBAPI()
- p = pool.AssertionPool(creator = lambda: dbapi.connect('foo.db'))
+ p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
c1 = p.connect()
assert_raises(AssertionError, p.connect)
def test_connect_multiple(self):
dbapi = MockDBAPI()
- p = pool.AssertionPool(creator = lambda: dbapi.connect('foo.db'))
+ p = pool.AssertionPool(creator=lambda: dbapi.connect('foo.db'))
c1 = p.connect()
c1.close()
c2 = p.connect()
class NullPoolTest(PoolTestBase):
def test_reconnect(self):
dbapi = MockDBAPI()
- p = pool.NullPool(creator = lambda: dbapi.connect('foo.db'))
+ p = pool.NullPool(creator=lambda: dbapi.connect('foo.db'))
c1 = p.connect()
c_id = c1.connection.id
- c1.close(); c1=None
+ c1.close()
+ c1 = None
c1 = p.connect()
dbapi.raise_error = True