From: Daniele Varrazzo Date: Wed, 10 Apr 2024 21:58:24 +0000 (+0200) Subject: fix(pool): make sure there are no connection in the pool after close() X-Git-Tag: 3.2.0~47 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=910383f0b23516f895f720a5e9f5a268b25fef89;p=thirdparty%2Fpsycopg.git fix(pool): make sure there are no connection in the pool after close() The case has been reported in #784. While not easy to reproduce, it seems that it might be caused by the pool being closed while a worker is still trying to create a connection, which will be put in the _pool state after supposedly no other operation should have been performed. Stop the workers and then empty the pool only after they have stopped to run. Also refactor the cleanup of the pool and waiting queue, moving them to close(). There is no reason why a method called "stop workers" should empty them, and there is no other code path that use such feature. Close #784. --- diff --git a/docs/news_pool.rst b/docs/news_pool.rst index 503202db2..99269b5c8 100644 --- a/docs/news_pool.rst +++ b/docs/news_pool.rst @@ -15,6 +15,7 @@ psycopg_pool 3.2.2 (unreleased) - Raise a `RuntimeWarning` instead of a `DeprecationWarning` if an async pool is open in the constructor. +- Fix connections possibly left in the pool after closing (:ticket:`#784`). Current release diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index e08476e60..f1bdc9707 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -15,7 +15,7 @@ from abc import ABC, abstractmethod from time import monotonic from types import TracebackType from typing import Any, Iterator, cast, Dict, Generic, List -from typing import Optional, Sequence, Type +from typing import Optional, Type from weakref import ref from contextlib import contextmanager @@ -106,7 +106,7 @@ class ConnectionPool(Generic[CT], BasePool): if getattr(self, "_closed", True): return - self._stop_workers() + self._stop_workers(timeout=5.0) def _check_open_getconn(self) -> None: super()._check_open_getconn() @@ -421,22 +421,23 @@ class ConnectionPool(Generic[CT], BasePool): self._closed = True logger.debug("pool %r closed", self.name) - # Take waiting client and pool connections out of the state - waiting = list(self._waiting) - self._waiting.clear() - connections = list(self._pool) - self._pool.clear() + # Stop the worker, wait for the threads to finish. + self._stop_workers(timeout=timeout) - # Now that the flag _closed is set, getconn will fail immediately, - # putconn will just close the returned connection. - self._stop_workers(waiting, connections, timeout) + # Signal to eventual clients in the queue that business is closed. + while self._waiting: + pos = self._waiting.pop() + pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) - def _stop_workers( - self, - waiting_clients: Sequence[WaitingClient[CT]] = (), - connections: Sequence[CT] = (), - timeout: float | None = None, - ) -> None: + # Close the connections still in the pool + while self._pool: + conn = self._pool.pop() + conn.close() + + # Now that the flag _closed is set, getconn will fail immediately, + # putconn will just close the returned connection. + + def _stop_workers(self, timeout: float | None = None) -> None: # Stop the scheduler self._sched.enter(0, None) @@ -445,18 +446,12 @@ class ConnectionPool(Generic[CT], BasePool): for _ in workers: self.run_task(StopWorker(self)) - # Signal to eventual clients in the queue that business is closed. - for pos in waiting_clients: - pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) - - # Close the connections still in the pool - for conn in connections: - conn.close() + if self._sched_runner: # likely + workers.append(self._sched_runner) + self._sched_runner = None # Wait for the worker tasks to terminate - assert self._sched_runner is not None - sched_runner, self._sched_runner = (self._sched_runner, None) - gather(sched_runner, *workers, timeout=timeout) + gather(*workers, timeout=timeout) def __enter__(self) -> Self: self._open_implicit = False diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index e70781b60..39c0e6bbd 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -12,7 +12,7 @@ from abc import ABC, abstractmethod from time import monotonic from types import TracebackType from typing import Any, AsyncIterator, cast, Dict, Generic, List -from typing import Optional, Sequence, Type +from typing import Optional, Type from weakref import ref from contextlib import asynccontextmanager @@ -111,7 +111,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): if getattr(self, "_closed", True): return - self._stop_workers() + self._stop_workers(timeout=5.0) def _check_open_getconn(self) -> None: super()._check_open_getconn() @@ -453,22 +453,23 @@ class AsyncConnectionPool(Generic[ACT], BasePool): self._closed = True logger.debug("pool %r closed", self.name) - # Take waiting client and pool connections out of the state - waiting = list(self._waiting) - self._waiting.clear() - connections = list(self._pool) - self._pool.clear() + # Stop the worker, wait for the threads to finish. + await self._stop_workers(timeout=timeout) + + # Signal to eventual clients in the queue that business is closed. + while self._waiting: + pos = self._waiting.pop() + await pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) + + # Close the connections still in the pool + while self._pool: + conn = self._pool.pop() + await conn.close() # Now that the flag _closed is set, getconn will fail immediately, # putconn will just close the returned connection. - await self._stop_workers(waiting, connections, timeout) - async def _stop_workers( - self, - waiting_clients: Sequence[WaitingClient[ACT]] = (), - connections: Sequence[ACT] = (), - timeout: float | None = None, - ) -> None: + async def _stop_workers(self, timeout: float | None = None) -> None: # Stop the scheduler await self._sched.enter(0, None) @@ -477,18 +478,12 @@ class AsyncConnectionPool(Generic[ACT], BasePool): for _ in workers: self.run_task(StopWorker(self)) - # Signal to eventual clients in the queue that business is closed. - for pos in waiting_clients: - await pos.fail(PoolClosed(f"the pool {self.name!r} is closed")) - - # Close the connections still in the pool - for conn in connections: - await conn.close() + if self._sched_runner: # likely + workers.append(self._sched_runner) + self._sched_runner = None # Wait for the worker tasks to terminate - assert self._sched_runner is not None - sched_runner, self._sched_runner = self._sched_runner, None - await agather(sched_runner, *workers, timeout=timeout) + await agather(*workers, timeout=timeout) async def __aenter__(self) -> Self: self._open_implicit = False