import testenv; testenv.configure_for_tests()
import threading, time, gc
-from sqlalchemy import pool, interfaces
+from sqlalchemy import pool, interfaces, create_engine
import testlib.sa as tsa
-from testlib import TestBase
+from testlib import TestBase, testing
mcid = 1
def __init__(self):
if hasattr(self, 'connect'):
self.connect = self.inst_connect
+ if hasattr(self, 'first_connect'):
+ self.first_connect = self.inst_first_connect
if hasattr(self, 'checkout'):
self.checkout = self.inst_checkout
if hasattr(self, 'checkin'):
self.clear()
def clear(self):
self.connected = []
+ self.first_connected = []
self.checked_out = []
self.checked_in = []
- def assert_total(innerself, conn, cout, cin):
+ def assert_total(innerself, conn, fconn, cout, cin):
self.assert_(len(innerself.connected) == conn)
+ self.assert_(len(innerself.first_connected) == fconn)
self.assert_(len(innerself.checked_out) == cout)
self.assert_(len(innerself.checked_in) == cin)
- def assert_in(innerself, item, in_conn, in_cout, in_cin):
+ def assert_in(innerself, item, in_conn, in_fconn, in_cout, in_cin):
self.assert_((item in innerself.connected) == in_conn)
+ self.assert_((item in innerself.first_connected) == in_fconn)
self.assert_((item in innerself.checked_out) == in_cout)
self.assert_((item in innerself.checked_in) == in_cin)
def inst_connect(self, con, record):
assert con is not None
assert record is not None
self.connected.append(con)
+ def inst_first_connect(self, con, record):
+ print "first_connect(%s, %s)" % (con, record)
+ assert con is not None
+ assert record is not None
+ self.first_connected.append(con)
def inst_checkout(self, con, record, proxy):
print "checkout(%s, %s, %s)" % (con, record, proxy)
assert con is not None
class ListenConnect(InstrumentingListener):
def connect(self, con, record):
pass
+ class ListenFirstConnect(InstrumentingListener):
+ def first_connect(self, con, record):
+ pass
class ListenCheckOut(InstrumentingListener):
def checkout(self, con, record, proxy, num):
pass
return pool.QueuePool(creator=lambda: dbapi.connect('foo.db'),
use_threadlocal=False, **kw)
- def assert_listeners(p, total, conn, cout, cin):
+ def assert_listeners(p, total, conn, fconn, cout, cin):
for instance in (p, p.recreate()):
self.assert_(len(instance.listeners) == total)
self.assert_(len(instance._on_connect) == conn)
+ self.assert_(len(instance._on_first_connect) == fconn)
self.assert_(len(instance._on_checkout) == cout)
self.assert_(len(instance._on_checkin) == cin)
p = _pool()
- assert_listeners(p, 0, 0, 0, 0)
+ assert_listeners(p, 0, 0, 0, 0, 0)
p.add_listener(ListenAll())
- assert_listeners(p, 1, 1, 1, 1)
+ assert_listeners(p, 1, 1, 1, 1, 1)
p.add_listener(ListenConnect())
- assert_listeners(p, 2, 2, 1, 1)
+ assert_listeners(p, 2, 2, 1, 1, 1)
+
+ p.add_listener(ListenFirstConnect())
+ assert_listeners(p, 3, 2, 2, 1, 1)
p.add_listener(ListenCheckOut())
- assert_listeners(p, 3, 2, 2, 1)
+ assert_listeners(p, 4, 2, 2, 2, 1)
p.add_listener(ListenCheckIn())
- assert_listeners(p, 4, 2, 2, 2)
+ assert_listeners(p, 5, 2, 2, 2, 2)
del p
print "----"
snoop = ListenAll()
p = _pool(listeners=[snoop])
- assert_listeners(p, 1, 1, 1, 1)
+ assert_listeners(p, 1, 1, 1, 1, 1)
c = p.connect()
- snoop.assert_total(1, 1, 0)
+ snoop.assert_total(1, 1, 1, 0)
cc = c.connection
- snoop.assert_in(cc, True, True, False)
+ snoop.assert_in(cc, True, True, True, False)
c.close()
- snoop.assert_in(cc, True, True, True)
+ snoop.assert_in(cc, True, True, True, True)
del c, cc
snoop.clear()
# this one depends on immediate gc
c = p.connect()
cc = c.connection
- snoop.assert_in(cc, False, True, False)
- snoop.assert_total(0, 1, 0)
+ snoop.assert_in(cc, False, False, True, False)
+ snoop.assert_total(0, 0, 1, 0)
del c, cc
- snoop.assert_total(0, 1, 1)
+ snoop.assert_total(0, 0, 1, 1)
p.dispose()
snoop.clear()
c = p.connect()
c.close()
c = p.connect()
- snoop.assert_total(1, 2, 1)
+ snoop.assert_total(1, 0, 2, 1)
c.close()
- snoop.assert_total(1, 2, 2)
+ snoop.assert_total(1, 0, 2, 2)
# invalidation
p.dispose()
snoop.clear()
c = p.connect()
- snoop.assert_total(1, 1, 0)
+ snoop.assert_total(1, 0, 1, 0)
c.invalidate()
- snoop.assert_total(1, 1, 1)
+ snoop.assert_total(1, 0, 1, 1)
c.close()
- snoop.assert_total(1, 1, 1)
+ snoop.assert_total(1, 0, 1, 1)
del c
- snoop.assert_total(1, 1, 1)
+ snoop.assert_total(1, 0, 1, 1)
c = p.connect()
- snoop.assert_total(2, 2, 1)
+ snoop.assert_total(2, 0, 2, 1)
c.close()
del c
- snoop.assert_total(2, 2, 2)
+ snoop.assert_total(2, 0, 2, 2)
# detached
p.dispose()
snoop.clear()
c = p.connect()
- snoop.assert_total(1, 1, 0)
+ snoop.assert_total(1, 0, 1, 0)
c.detach()
- snoop.assert_total(1, 1, 0)
+ snoop.assert_total(1, 0, 1, 0)
c.close()
del c
- snoop.assert_total(1, 1, 0)
+ snoop.assert_total(1, 0, 1, 0)
c = p.connect()
- snoop.assert_total(2, 2, 0)
+ snoop.assert_total(2, 0, 2, 0)
c.close()
del c
- snoop.assert_total(2, 2, 1)
+ snoop.assert_total(2, 0, 2, 1)
def test_listeners_callables(self):
dbapi = MockDBAPI()
c.close()
assert counts == [1, 2, 3]
+ def test_listener_after_oninit(self):
+ """Test that listeners are called after OnInit is removed"""
+ called = []
+ def listener(*args):
+ called.append(True)
+ listener.connect = listener
+ engine = create_engine(testing.db.url)
+ engine.pool.add_listener(listener)
+ engine.execute('select 1')
+ assert called, "Listener not called on connect"
+
+
class QueuePoolTest(PoolTestBase):
def testqueuepool_del(self):