self._configure = configure
self._reset = reset
- self._lock = asyncio.Lock()
+ # asyncio objects, created on open to attach them to the right loop.
+ self._lock: asyncio.Lock
+ self._sched: AsyncScheduler
+ self._tasks: "asyncio.Queue[MaintenanceTask]"
+
self._waiting = Deque["AsyncClient"]()
# to notify that the pool is full
self._pool_full_event: Optional[asyncio.Event] = None
- self._sched = AsyncScheduler()
self._sched_runner: Optional[Task[None]] = None
- self._tasks: "asyncio.Queue[MaintenanceTask]" = asyncio.Queue()
self._workers: List[Task[None]] = []
super().__init__(conninfo, **kwargs)
logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
+ self._check_open_getconn()
+
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
async with self._lock:
- self._check_open_getconn()
conn = await self._get_ready_connection(timeout)
if not conn:
# No connection available: put the client in the waiting queue
return True
async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
+ # Make sure the lock is created after there is an event loop
+ try:
+ self._lock
+ except AttributeError:
+ self._lock = asyncio.Lock()
+
async with self._lock:
self._open()
self._check_open()
+ # Create these objects now to attach them to the right loop.
+ # See #219
+ self._tasks = asyncio.Queue()
+ self._sched = AsyncScheduler()
+ # This has been most likely, but not necessarily, created in `open()`.
+ try:
+ self._lock
+ except AttributeError:
+ self._lock = asyncio.Lock()
+
self._closed = False
self._opened = True
--- /dev/null
+# These tests relate to AsyncConnectionPool, but are not marked asyncio
+# because they rely on the pool initialization outside the asyncio loop.
+
+import asyncio
+
+import pytest
+
+try:
+ import psycopg_pool as pool
+except ImportError:
+ # Tests should have been skipped if the package is not available
+ pass
+
+
+@pytest.mark.slow
+def test_reconnect_after_max_lifetime(dsn):
+ # See issue #219, pool created before the loop.
+ p = pool.AsyncConnectionPool(dsn, min_size=1, max_lifetime=0.2, open=False)
+
+ async def test():
+ try:
+ await p.open()
+ ns = []
+ for i in range(5):
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ ns.append(await cur.fetchone())
+ await asyncio.sleep(0.2)
+ assert len(ns) == 5
+ finally:
+ await p.close()
+
+ asyncio.run(asyncio.wait_for(test(), timeout=2.0))
+
+
+@pytest.mark.slow
+def test_working_created_before_loop(dsn):
+ p = pool.AsyncNullConnectionPool(dsn, open=False)
+
+ async def test():
+ try:
+ await p.open()
+ ns = []
+ for i in range(5):
+ async with p.connection() as conn:
+ cur = await conn.execute("select 1")
+ ns.append(await cur.fetchone())
+ await asyncio.sleep(0.2)
+ assert len(ns) == 5
+ finally:
+ await p.close()
+
+ asyncio.run(asyncio.wait_for(test(), timeout=2.0))