if self._is_disconnect:
del self._is_disconnect
dbapi_conn_wrapper = self.connection
+ self.engine.pool._invalidate(dbapi_conn_wrapper, e)
self.invalidate(e)
- self.engine.pool._invalidate(dbapi_conn_wrapper)
if self.should_close_with_result:
self.close()
return _ConnectionRecord(self)
- def _invalidate(self, connection):
+ def _invalidate(self, connection, exception=None):
"""Mark all connections established within the generation
of the given connection as invalidated.
rec = getattr(connection, "_connection_record", None)
if not rec or self._invalidate_time < rec.starttime:
self._invalidate_time = time.time()
+ if getattr(connection, 'is_valid', False):
+ connection.invalidate(exception)
def recreate(self):
"""
if self.connection is None:
- raise exc.InvalidRequestError("This connection is closed")
+ util.warn("Can't invalidate an already-closed connection.")
+ return
if self._connection_record:
self._connection_record.invalidate(e=e)
self.connection = None
gc_collect()
dbapi = MockDBAPI()
+ mutex = threading.Lock()
def creator():
time.sleep(.05)
- return dbapi.connect()
+ with mutex:
+ return dbapi.connect()
p = pool.QueuePool(creator=creator,
pool_size=3, timeout=2,
# two conns
time.sleep(.2)
p._invalidate(c2)
- c2.invalidate()
for t in threads:
t.join(join_timeout)
p1 = pool.QueuePool(creator=creator,
pool_size=1, timeout=None,
max_overflow=0)
- #p2 = pool.NullPool(creator=creator2)
def waiter(p):
conn = p.connect()
canary.append(2)
time.sleep(.5)
eq_(canary, [1])
- c1.invalidate()
+ # this also calls invalidate()
+ # on c1
p1._invalidate(c1)
for t in threads: