parameters# update (:ticket:`#851`).
+psycopg_pool 3.2.8 (unreleased)
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+- Don't lose connections if a `~asyncio.CancelledError` is raised in a check
+ (:tickets:`#1123, #1208`)
+
+
Current release
---------------
from time import monotonic
from types import TracebackType
from typing import Any, Generic, cast
+from asyncio import CancelledError
from weakref import ref
from contextlib import contextmanager
from collections import deque
conn = self._getconn_unchecked(deadline - monotonic())
try:
self._check_connection(conn)
- except Exception:
+ except (Exception, CancelledError):
self._putconn(conn, from_getconn=True)
else:
logger.info("connection given by %r", self.name)
if not conn:
try:
conn = pos.wait(timeout=timeout)
- except Exception:
+ except BaseException:
self._stats[self._REQUESTS_ERRORS] += 1
raise
finally:
return
try:
self._check(conn)
- except Exception as e:
+ except BaseException as e:
logger.info("connection failed check: %s", e)
raise
# Check for broken connections
try:
self.check_connection(conn)
- except Exception:
+ except (Exception, CancelledError):
self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
self.run_task(AddConnection(self))
# Run the task. Make sure don't die in the attempt.
try:
task.run()
- except Exception as ex:
+ except (Exception, CancelledError) as ex:
logger.warning(
"task run %s failed: %s: %s", task, ex.__class__.__name__, ex
)
t0 = monotonic()
try:
conn = self.connection_class.connect(conninfo, **kwargs)
- except Exception:
+ except (Exception, CancelledError):
self._stats[self._CONNECTIONS_ERRORS] += 1
raise
else:
try:
conn = self._connect()
- except Exception as ex:
+ except (Exception, CancelledError) as ex:
logger.warning("error connecting in %r: %s", self.name, ex)
if attempt.time_to_give_up(now):
logger.warning(
raise e.ProgrammingError(
f"connection left in status {sname} by reset function {self._reset}: discarded"
)
- except Exception as ex:
+ except (Exception, CancelledError) as ex:
logger.warning("error resetting connection: %s", ex)
self._close_connection(conn)
from time import monotonic
from types import TracebackType
from typing import Any, Generic, cast
+from asyncio import CancelledError
from weakref import ref
from contextlib import asynccontextmanager
from collections import deque
conn = await self._getconn_unchecked(deadline - monotonic())
try:
await self._check_connection(conn)
- except Exception:
+ except (Exception, CancelledError):
await self._putconn(conn, from_getconn=True)
else:
logger.info("connection given by %r", self.name)
if not conn:
try:
conn = await pos.wait(timeout=timeout)
- except Exception:
+ except BaseException:
self._stats[self._REQUESTS_ERRORS] += 1
raise
finally:
return
try:
await self._check(conn)
- except Exception as e:
+ except BaseException as e:
logger.info("connection failed check: %s", e)
raise
# Check for broken connections
try:
await self.check_connection(conn)
- except Exception:
+ except (Exception, CancelledError):
self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
self.run_task(AddConnection(self))
# Run the task. Make sure don't die in the attempt.
try:
await task.run()
- except Exception as ex:
+ except (Exception, CancelledError) as ex:
logger.warning(
"task run %s failed: %s: %s", task, ex.__class__.__name__, ex
)
t0 = monotonic()
try:
conn = await self.connection_class.connect(conninfo, **kwargs)
- except Exception:
+ except (Exception, CancelledError):
self._stats[self._CONNECTIONS_ERRORS] += 1
raise
else:
try:
conn = await self._connect()
- except Exception as ex:
+ except (Exception, CancelledError) as ex:
logger.warning("error connecting in %r: %s", self.name, ex)
if attempt.time_to_give_up(now):
logger.warning(
f"connection left in status {sname} by reset function"
f" {self._reset}: discarded"
)
- except Exception as ex:
+ except (Exception, CancelledError) as ex:
logger.warning("error resetting connection: %s", ex)
await self._close_connection(conn)
import logging
from time import time
from typing import Any
+from asyncio import CancelledError
import pytest
assert cur.fetchone() == (1,)
+@skip_sync
+def test_cancel_on_check(pool_cls, dsn):
+ do_cancel = True
+
+ def check(conn):
+ nonlocal do_cancel
+ if do_cancel:
+ do_cancel = False
+ raise CancelledError()
+
+ pool_cls.check_connection(conn)
+
+ with pool_cls(dsn, min_size=min_size(pool_cls, 1), check=check, timeout=1.0) as p:
+ try:
+ with p.connection() as conn:
+ conn.execute("select 1")
+ except CancelledError:
+ pass
+
+ with p.connection() as conn:
+ conn.execute("select 1")
+
+
def min_size(pool_cls, num=1):
"""Return the minimum min_size supported by the pool class."""
if pool_cls is pool.ConnectionPool:
import logging
from time import time
from typing import Any
+from asyncio import CancelledError
import pytest
assert await cur.fetchone() == (1,)
+@skip_sync
+async def test_cancel_on_check(pool_cls, dsn):
+ do_cancel = True
+
+ async def check(conn):
+ nonlocal do_cancel
+ if do_cancel:
+ do_cancel = False
+ raise CancelledError()
+
+ await pool_cls.check_connection(conn)
+
+ async with pool_cls(
+ dsn, min_size=min_size(pool_cls, 1), check=check, timeout=1.0
+ ) as p:
+ try:
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+ except CancelledError:
+ pass
+
+ async with p.connection() as conn:
+ await conn.execute("select 1")
+
+
def min_size(pool_cls, num=1):
"""Return the minimum min_size supported by the pool class."""
if pool_cls is pool.AsyncConnectionPool: