]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
fix(pool): create asyncio objects on open rather than on init
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Thu, 3 Nov 2022 16:42:58 +0000 (17:42 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 4 Nov 2022 16:31:55 +0000 (17:31 +0100)
If created on init, they get attached to the wrong loop and fail to
work.

Close #219

psycopg_pool/psycopg_pool/pool_async.py
tests/pool/test_pool_async_noasyncio.py [new file with mode: 0644]

index 11d7cef42f42142b78ac3abd7142856f00973308..b7bb52a58fe7bfcfb159eb7179cb92bc0036c236 100644 (file)
@@ -41,15 +41,17 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
         self._configure = configure
         self._reset = reset
 
-        self._lock = asyncio.Lock()
+        # asyncio objects, created on open to attach them to the right loop.
+        self._lock: asyncio.Lock
+        self._sched: AsyncScheduler
+        self._tasks: "asyncio.Queue[MaintenanceTask]"
+
         self._waiting = Deque["AsyncClient"]()
 
         # to notify that the pool is full
         self._pool_full_event: Optional[asyncio.Event] = None
 
-        self._sched = AsyncScheduler()
         self._sched_runner: Optional[Task[None]] = None
-        self._tasks: "asyncio.Queue[MaintenanceTask]" = asyncio.Queue()
         self._workers: List[Task[None]] = []
 
         super().__init__(conninfo, **kwargs)
@@ -99,10 +101,11 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
         logger.info("connection requested from %r", self.name)
         self._stats[self._REQUESTS_NUM] += 1
 
+        self._check_open_getconn()
+
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         async with self._lock:
-            self._check_open_getconn()
             conn = await self._get_ready_connection(timeout)
             if not conn:
                 # No connection available: put the client in the waiting queue
@@ -185,6 +188,12 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
         return True
 
     async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
+        # Make sure the lock is created after there is an event loop
+        try:
+            self._lock
+        except AttributeError:
+            self._lock = asyncio.Lock()
+
         async with self._lock:
             self._open()
 
@@ -197,6 +206,16 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
 
         self._check_open()
 
+        # Create these objects now to attach them to the right loop.
+        # See #219
+        self._tasks = asyncio.Queue()
+        self._sched = AsyncScheduler()
+        # This has been most likely, but not necessarily, created in `open()`.
+        try:
+            self._lock
+        except AttributeError:
+            self._lock = asyncio.Lock()
+
         self._closed = False
         self._opened = True
 
diff --git a/tests/pool/test_pool_async_noasyncio.py b/tests/pool/test_pool_async_noasyncio.py
new file mode 100644 (file)
index 0000000..56cd50b
--- /dev/null
@@ -0,0 +1,53 @@
+# These tests relate to AsyncConnectionPool, but are not marked asyncio
+# because they rely on the pool initialization outside the asyncio loop.
+
+import asyncio
+
+import pytest
+
+try:
+    import psycopg_pool as pool
+except ImportError:
+    # Tests should have been skipped if the package is not available
+    pass
+
+
+@pytest.mark.slow
+def test_reconnect_after_max_lifetime(dsn):
+    # See issue #219, pool created before the loop.
+    p = pool.AsyncConnectionPool(dsn, min_size=1, max_lifetime=0.2, open=False)
+
+    async def test():
+        try:
+            await p.open()
+            ns = []
+            for i in range(5):
+                async with p.connection() as conn:
+                    cur = await conn.execute("select 1")
+                    ns.append(await cur.fetchone())
+                await asyncio.sleep(0.2)
+            assert len(ns) == 5
+        finally:
+            await p.close()
+
+    asyncio.run(asyncio.wait_for(test(), timeout=2.0))
+
+
+@pytest.mark.slow
+def test_working_created_before_loop(dsn):
+    p = pool.AsyncNullConnectionPool(dsn, open=False)
+
+    async def test():
+        try:
+            await p.open()
+            ns = []
+            for i in range(5):
+                async with p.connection() as conn:
+                    cur = await conn.execute("select 1")
+                    ns.append(await cur.fetchone())
+                await asyncio.sleep(0.2)
+            assert len(ns) == 5
+        finally:
+            await p.close()
+
+    asyncio.run(asyncio.wait_for(test(), timeout=2.0))