From: Daniele Varrazzo Date: Thu, 3 Nov 2022 16:42:58 +0000 (+0100) Subject: fix(pool): create asyncio objects on open rather than on init X-Git-Tag: pool-3.1.4~1^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=11e0401dea4f167ba764a5a4f230435d22053971;p=thirdparty%2Fpsycopg.git fix(pool): create asyncio objects on open rather than on init If created on init, they get attached to the wrong loop and fail to work. Close #219 --- diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 11d7cef42..b7bb52a58 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -41,15 +41,17 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): self._configure = configure self._reset = reset - self._lock = asyncio.Lock() + # asyncio objects, created on open to attach them to the right loop. + self._lock: asyncio.Lock + self._sched: AsyncScheduler + self._tasks: "asyncio.Queue[MaintenanceTask]" + self._waiting = Deque["AsyncClient"]() # to notify that the pool is full self._pool_full_event: Optional[asyncio.Event] = None - self._sched = AsyncScheduler() self._sched_runner: Optional[Task[None]] = None - self._tasks: "asyncio.Queue[MaintenanceTask]" = asyncio.Queue() self._workers: List[Task[None]] = [] super().__init__(conninfo, **kwargs) @@ -99,10 +101,11 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): logger.info("connection requested from %r", self.name) self._stats[self._REQUESTS_NUM] += 1 + self._check_open_getconn() + # Critical section: decide here if there's a connection ready # or if the client needs to wait. async with self._lock: - self._check_open_getconn() conn = await self._get_ready_connection(timeout) if not conn: # No connection available: put the client in the waiting queue @@ -185,6 +188,12 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): return True async def open(self, wait: bool = False, timeout: float = 30.0) -> None: + # Make sure the lock is created after there is an event loop + try: + self._lock + except AttributeError: + self._lock = asyncio.Lock() + async with self._lock: self._open() @@ -197,6 +206,16 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): self._check_open() + # Create these objects now to attach them to the right loop. + # See #219 + self._tasks = asyncio.Queue() + self._sched = AsyncScheduler() + # This has been most likely, but not necessarily, created in `open()`. + try: + self._lock + except AttributeError: + self._lock = asyncio.Lock() + self._closed = False self._opened = True diff --git a/tests/pool/test_pool_async_noasyncio.py b/tests/pool/test_pool_async_noasyncio.py new file mode 100644 index 000000000..56cd50b81 --- /dev/null +++ b/tests/pool/test_pool_async_noasyncio.py @@ -0,0 +1,53 @@ +# These tests relate to AsyncConnectionPool, but are not marked asyncio +# because they rely on the pool initialization outside the asyncio loop. + +import asyncio + +import pytest + +try: + import psycopg_pool as pool +except ImportError: + # Tests should have been skipped if the package is not available + pass + + +@pytest.mark.slow +def test_reconnect_after_max_lifetime(dsn): + # See issue #219, pool created before the loop. + p = pool.AsyncConnectionPool(dsn, min_size=1, max_lifetime=0.2, open=False) + + async def test(): + try: + await p.open() + ns = [] + for i in range(5): + async with p.connection() as conn: + cur = await conn.execute("select 1") + ns.append(await cur.fetchone()) + await asyncio.sleep(0.2) + assert len(ns) == 5 + finally: + await p.close() + + asyncio.run(asyncio.wait_for(test(), timeout=2.0)) + + +@pytest.mark.slow +def test_working_created_before_loop(dsn): + p = pool.AsyncNullConnectionPool(dsn, open=False) + + async def test(): + try: + await p.open() + ns = [] + for i in range(5): + async with p.connection() as conn: + cur = await conn.execute("select 1") + ns.append(await cur.fetchone()) + await asyncio.sleep(0.2) + assert len(ns) == 5 + finally: + await p.close() + + asyncio.run(asyncio.wait_for(test(), timeout=2.0))