try:
conn = pos.wait(timeout=timeout)
except CLIENT_EXCEPTIONS:
+ if pos.conn:
+ self.run_task(ReturnConnection(self, pos.conn, from_getconn=True))
self._stats[self._REQUESTS_ERRORS] += 1
raise
finally:
except CLIENT_EXCEPTIONS as ex:
self.error = ex
- if self.conn:
- return self.conn
- else:
- assert self.error
+ if self.error:
raise self.error
+ else:
+ assert self.conn
+ return self.conn
def set(self, conn: CT) -> bool:
"""Signal the client waiting that a connection is ready.
try:
conn = await pos.wait(timeout=timeout)
except CLIENT_EXCEPTIONS:
+ if pos.conn:
+ self.run_task(ReturnConnection(self, pos.conn, from_getconn=True))
self._stats[self._REQUESTS_ERRORS] += 1
raise
finally:
except CLIENT_EXCEPTIONS as ex:
self.error = ex
- if self.conn:
- return self.conn
- else:
- assert self.error
+ if self.error:
raise self.error
+ else:
+ assert self.conn
+ return self.conn
async def set(self, conn: ACT) -> bool:
"""Signal the client waiting that a connection is ready.
assert await cur.fetchone() == (1,)
+@skip_sync
+@pytest.mark.crdb_skip("backend pid")
+async def test_cancelled_waiter_assigned_conn_is_reclaimed(dsn, monkeypatch):
+ from asyncio import CancelledError
+
+ from psycopg_pool.pool_async import WaitingClient
+
+ from .test_pool_common_async import ensure_waiting
+
+ assigned = AEvent()
+ release = AEvent()
+
+ async def set_blocked(self, conn):
+ async with self._cond:
+ if self.conn or self.error:
+ return False
+
+ self.conn = conn
+ assigned.set()
+ await release.wait()
+ self._cond.notify_all()
+ return True
+
+ monkeypatch.setattr(WaitingClient, "set", set_blocked)
+
+ async with pool.AsyncConnectionPool(dsn, min_size=1, max_size=1, timeout=1) as p:
+ await p.wait()
+
+ held_conn = await p.getconn()
+ held_pid = held_conn.info.backend_pid
+ waiter = spawn(p.getconn)
+ await ensure_waiting(p)
+
+ putter = spawn(p.putconn, args=(held_conn,))
+ await assigned.wait()
+
+ waiter.cancel()
+ release.set()
+
+ try:
+ unexpected_conn = await waiter
+ except CancelledError:
+ pass
+ else:
+ await p.putconn(unexpected_conn)
+ pytest.fail("cancelled waiter returned a connection instead of raising")
+
+ await gather(putter)
+
+ stats = p.get_stats()
+ assert stats["pool_available"] == 1
+ assert stats.get("requests_waiting", 0) == 0
+ assert stats["requests_errors"] == 1
+
+ reclaimed_conn = await p.getconn()
+ try:
+ assert reclaimed_conn.info.backend_pid == held_pid
+ cur = await reclaimed_conn.execute("select 1")
+ assert await cur.fetchone() == (1,)
+ finally:
+ await p.putconn(reclaimed_conn)
+
+
@pytest.mark.slow
@pytest.mark.timing
async def test_check_backoff(dsn, caplog, monkeypatch):