if getattr(self, "_closed", True):
return
- self._stop_workers(timeout=5.0)
+ workers = self._signal_stop_worker()
+ gather(*workers, timeout=5.0)
def _check_open_getconn(self) -> None:
super()._check_open_getconn()
self._closed = True
logger.debug("pool %r closed", self.name)
- # Stop the worker, wait for the threads to finish.
- self._stop_workers(timeout=timeout)
+ # Take waiting client and pool connections out of the state
+ waiting = list(self._waiting)
+ self._waiting.clear()
+ connections = list(self._pool)
+ self._pool.clear()
- # Signal to eventual clients in the queue that business is closed.
- while self._waiting:
- pos = self._waiting.pop()
- pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+ # Take the workers out of the pool. Will stop them outside the lock
+ workers = self._signal_stop_worker()
- # Close the connections still in the pool
- while self._pool:
- conn = self._pool.pop()
- conn.close()
+ # Now that the flag _closed is set, getconn will fail immediately,
+ # putconn will just close the returned connection.
- # Now that the flag _closed is set, getconn will fail immediately,
- # putconn will just close the returned connection.
+ # Wait for the worker tasks to terminate
+ gather(*workers, timeout=timeout)
- def _stop_workers(self, timeout: float | None = None) -> None:
+ # Close the connections that were still in the pool
+ for conn in connections:
+ conn.close()
+
+ # Signal to eventual clients in the queue that business is closed.
+ for pos in waiting:
+ pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+ def _signal_stop_worker(self) -> list[Worker]:
# Stop the scheduler
self._sched.enter(0, None)
workers.append(self._sched_runner)
self._sched_runner = None
- # Wait for the worker tasks to terminate
- gather(*workers, timeout=timeout)
+ return workers
def __enter__(self) -> Self:
self._open_implicit = False
# Also disable the warning for open connection in conn.__del__
conn._pool = None
+ # Early bailout in case the pool is closed. Don't add anything to the
+ # state. There is still a remote chance that the pool will be closed
+ # between here and entering the lock. Therefore we will make another
+ # check later.
+ if self._closed:
+ conn.close()
+ return
+
# Critical section: if there is a client waiting give it the connection
# otherwise put it back into the pool.
with self._lock:
+ # Check if the pool was closed by the time we arrived here. It is
+ # unlikely but it doesn't seem impossible, if the worker was adding
+ # this connection while the main process is closing the pool.
+ # Now that we are in the critical section we know for real.
+ if self._closed:
+ conn.close()
+ return
+
while self._waiting:
# If there is a client waiting (which is still waiting and
# hasn't timed out), give it the connection and notify it.
if getattr(self, "_closed", True):
return
- self._stop_workers(timeout=5.0)
+ workers = self._signal_stop_worker()
+ agather(*workers, timeout=5.0)
def _check_open_getconn(self) -> None:
super()._check_open_getconn()
self._closed = True
logger.debug("pool %r closed", self.name)
- # Stop the worker, wait for the threads to finish.
- await self._stop_workers(timeout=timeout)
+ # Take waiting client and pool connections out of the state
+ waiting = list(self._waiting)
+ self._waiting.clear()
+ connections = list(self._pool)
+ self._pool.clear()
- # Signal to eventual clients in the queue that business is closed.
- while self._waiting:
- pos = self._waiting.pop()
- await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
-
- # Close the connections still in the pool
- while self._pool:
- conn = self._pool.pop()
- await conn.close()
+ # Take the workers out of the pool. Will stop them outside the lock
+ workers = await self._signal_stop_worker()
# Now that the flag _closed is set, getconn will fail immediately,
# putconn will just close the returned connection.
- async def _stop_workers(self, timeout: float | None = None) -> None:
+ # Wait for the worker tasks to terminate
+ await agather(*workers, timeout=timeout)
+
+ # Close the connections that were still in the pool
+ for conn in connections:
+ await conn.close()
+
+ # Signal to eventual clients in the queue that business is closed.
+ for pos in waiting:
+ await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+ async def _signal_stop_worker(self) -> list[AWorker]:
# Stop the scheduler
await self._sched.enter(0, None)
workers.append(self._sched_runner)
self._sched_runner = None
- # Wait for the worker tasks to terminate
- await agather(*workers, timeout=timeout)
+ return workers
async def __aenter__(self) -> Self:
self._open_implicit = False
# Also disable the warning for open connection in conn.__del__
conn._pool = None
+ # Early bailout in case the pool is closed. Don't add anything to the
+ # state. There is still a remote chance that the pool will be closed
+ # between here and entering the lock. Therefore we will make another
+ # check later.
+ if self._closed:
+ await conn.close()
+ return
+
# Critical section: if there is a client waiting give it the connection
# otherwise put it back into the pool.
async with self._lock:
+
+ # Check if the pool was closed by the time we arrived here. It is
+ # unlikely but it doesn't seem impossible, if the worker was adding
+ # this connection while the main process is closing the pool.
+ # Now that we are in the critical section we know for real.
+ if self._closed:
+ await conn.close()
+ return
+
while self._waiting:
# If there is a client waiting (which is still waiting and
# hasn't timed out), give it the connection and notify it.