- Raise a `RuntimeWarning` instead of a `DeprecationWarning` if an async pool
is open in the constructor.
+- Fix connections possibly left in the pool after closing (:ticket:`#784`).
Current release
from time import monotonic
from types import TracebackType
from typing import Any, Iterator, cast, Dict, Generic, List
-from typing import Optional, Sequence, Type
+from typing import Optional, Type
from weakref import ref
from contextlib import contextmanager
if getattr(self, "_closed", True):
return
- self._stop_workers()
+ self._stop_workers(timeout=5.0)
def _check_open_getconn(self) -> None:
super()._check_open_getconn()
self._closed = True
logger.debug("pool %r closed", self.name)
- # Take waiting client and pool connections out of the state
- waiting = list(self._waiting)
- self._waiting.clear()
- connections = list(self._pool)
- self._pool.clear()
+ # Stop the worker, wait for the threads to finish.
+ self._stop_workers(timeout=timeout)
- # Now that the flag _closed is set, getconn will fail immediately,
- # putconn will just close the returned connection.
- self._stop_workers(waiting, connections, timeout)
+ # Signal to eventual clients in the queue that business is closed.
+ while self._waiting:
+ pos = self._waiting.pop()
+ pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
- def _stop_workers(
- self,
- waiting_clients: Sequence[WaitingClient[CT]] = (),
- connections: Sequence[CT] = (),
- timeout: float | None = None,
- ) -> None:
+ # Close the connections still in the pool
+ while self._pool:
+ conn = self._pool.pop()
+ conn.close()
+
+ # Now that the flag _closed is set, getconn will fail immediately,
+ # putconn will just close the returned connection.
+
+ def _stop_workers(self, timeout: float | None = None) -> None:
# Stop the scheduler
self._sched.enter(0, None)
for _ in workers:
self.run_task(StopWorker(self))
- # Signal to eventual clients in the queue that business is closed.
- for pos in waiting_clients:
- pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
-
- # Close the connections still in the pool
- for conn in connections:
- conn.close()
+ if self._sched_runner: # likely
+ workers.append(self._sched_runner)
+ self._sched_runner = None
# Wait for the worker tasks to terminate
- assert self._sched_runner is not None
- sched_runner, self._sched_runner = (self._sched_runner, None)
- gather(sched_runner, *workers, timeout=timeout)
+ gather(*workers, timeout=timeout)
def __enter__(self) -> Self:
self._open_implicit = False
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, cast, Dict, Generic, List
-from typing import Optional, Sequence, Type
+from typing import Optional, Type
from weakref import ref
from contextlib import asynccontextmanager
if getattr(self, "_closed", True):
return
- self._stop_workers()
+ self._stop_workers(timeout=5.0)
def _check_open_getconn(self) -> None:
super()._check_open_getconn()
self._closed = True
logger.debug("pool %r closed", self.name)
- # Take waiting client and pool connections out of the state
- waiting = list(self._waiting)
- self._waiting.clear()
- connections = list(self._pool)
- self._pool.clear()
+ # Stop the worker, wait for the threads to finish.
+ await self._stop_workers(timeout=timeout)
+
+ # Signal to eventual clients in the queue that business is closed.
+ while self._waiting:
+ pos = self._waiting.pop()
+ await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
+
+ # Close the connections still in the pool
+ while self._pool:
+ conn = self._pool.pop()
+ await conn.close()
# Now that the flag _closed is set, getconn will fail immediately,
# putconn will just close the returned connection.
- await self._stop_workers(waiting, connections, timeout)
- async def _stop_workers(
- self,
- waiting_clients: Sequence[WaitingClient[ACT]] = (),
- connections: Sequence[ACT] = (),
- timeout: float | None = None,
- ) -> None:
+ async def _stop_workers(self, timeout: float | None = None) -> None:
# Stop the scheduler
await self._sched.enter(0, None)
for _ in workers:
self.run_task(StopWorker(self))
- # Signal to eventual clients in the queue that business is closed.
- for pos in waiting_clients:
- await pos.fail(PoolClosed(f"the pool {self.name!r} is closed"))
-
- # Close the connections still in the pool
- for conn in connections:
- await conn.close()
+ if self._sched_runner: # likely
+ workers.append(self._sched_runner)
+ self._sched_runner = None
# Wait for the worker tasks to terminate
- assert self._sched_runner is not None
- sched_runner, self._sched_runner = self._sched_runner, None
- await agather(sched_runner, *workers, timeout=timeout)
+ await agather(*workers, timeout=timeout)
async def __aenter__(self) -> Self:
self._open_implicit = False