try:
self._check_connection(conn)
except Exception:
- self.putconn(conn)
+ self._putconn(conn, from_getconn=True)
else:
logger.info("connection given by %r", self.name)
return conn
if self._maybe_close_connection(conn):
return
+ self._putconn(conn, from_getconn=False)
+
+ def _putconn(self, conn: CT, from_getconn: bool) -> None:
# Use a worker to perform eventual maintenance work in a separate task
if self._reset:
- self.run_task(ReturnConnection(self, conn))
+ self.run_task(ReturnConnection(self, conn, from_getconn=from_getconn))
else:
- self._return_connection(conn)
+ self._return_connection(conn, from_getconn=from_getconn)
def _maybe_close_connection(self, conn: CT) -> bool:
"""Close a returned connection if necessary.
else:
self._growing = False
- def _return_connection(self, conn: CT) -> None:
+ def _return_connection(self, conn: CT, from_getconn: bool) -> None:
"""
Return a connection to the pool after usage.
"""
- self._reset_connection(conn)
- if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
- self._stats[self._RETURNS_BAD] += 1
- # Connection no more in working state: create a new one.
- self.run_task(AddConnection(self))
- logger.warning("discarding closed connection: %s", conn)
- return
+ if from_getconn:
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._CONNECTIONS_LOST] += 1
+ # Connection no more in working state: create a new one.
+ self.run_task(AddConnection(self))
+ logger.info("not serving connection found broken")
+ return
+ else:
+ self._reset_connection(conn)
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ # Connection no more in working state: create a new one.
+ self.run_task(AddConnection(self))
+ logger.warning("discarding closed connection: %s", conn)
+ return
# Check if the connection is past its best before date
if conn._expire_at <= monotonic():
status = conn.pgconn.transaction_status
if status == TransactionStatus.IDLE:
pass
- elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
+ elif status == TransactionStatus.UNKNOWN:
+ # Connection closed
+ return
+ elif status == TransactionStatus.INTRANS or status == TransactionStatus.INERROR:
# Connection returned with an active transaction
logger.warning("rolling back returned connection: %s", conn)
try:
logger.warning("closing returned connection: %s", conn)
conn.close()
- if not conn.closed and self._reset:
+ if self._reset:
try:
self._reset(conn)
status = conn.pgconn.transaction_status
class ReturnConnection(MaintenanceTask):
"""Clean up and return a connection to the pool."""
- def __init__(self, pool: ConnectionPool[Any], conn: CT):
+ def __init__(self, pool: ConnectionPool[Any], conn: CT, from_getconn: bool):
super().__init__(pool)
self.conn = conn
+ self.from_getconn = from_getconn
def _run(self, pool: ConnectionPool[Any]) -> None:
- pool._return_connection(self.conn)
+ pool._return_connection(self.conn, from_getconn=self.from_getconn)
class ShrinkPool(MaintenanceTask):
try:
await self._check_connection(conn)
except Exception:
- await self.putconn(conn)
+ await self._putconn(conn, from_getconn=True)
else:
logger.info("connection given by %r", self.name)
return conn
if await self._maybe_close_connection(conn):
return
+ await self._putconn(conn, from_getconn=False)
+
+ async def _putconn(self, conn: ACT, from_getconn: bool) -> None:
# Use a worker to perform eventual maintenance work in a separate task
if self._reset:
- self.run_task(ReturnConnection(self, conn))
+ self.run_task(ReturnConnection(self, conn, from_getconn=from_getconn))
else:
- await self._return_connection(conn)
+ await self._return_connection(conn, from_getconn=from_getconn)
async def _maybe_close_connection(self, conn: ACT) -> bool:
"""Close a returned connection if necessary.
else:
self._growing = False
- async def _return_connection(self, conn: ACT) -> None:
+ async def _return_connection(self, conn: ACT, from_getconn: bool) -> None:
"""
Return a connection to the pool after usage.
"""
- await self._reset_connection(conn)
- if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
- self._stats[self._RETURNS_BAD] += 1
- # Connection no more in working state: create a new one.
- self.run_task(AddConnection(self))
- logger.warning("discarding closed connection: %s", conn)
- return
+ if from_getconn:
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._CONNECTIONS_LOST] += 1
+ # Connection no more in working state: create a new one.
+ self.run_task(AddConnection(self))
+ logger.info("not serving connection found broken")
+ return
+
+ else:
+ await self._reset_connection(conn)
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ # Connection no more in working state: create a new one.
+ self.run_task(AddConnection(self))
+ logger.warning("discarding closed connection: %s", conn)
+ return
# Check if the connection is past its best before date
if conn._expire_at <= monotonic():
if status == TransactionStatus.IDLE:
pass
- elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR):
+ elif status == TransactionStatus.UNKNOWN:
+ # Connection closed
+ return
+
+ elif status == TransactionStatus.INTRANS or status == TransactionStatus.INERROR:
# Connection returned with an active transaction
logger.warning("rolling back returned connection: %s", conn)
try:
logger.warning("closing returned connection: %s", conn)
await conn.close()
- if not conn.closed and self._reset:
+ if self._reset:
try:
await self._reset(conn)
status = conn.pgconn.transaction_status
class ReturnConnection(MaintenanceTask):
"""Clean up and return a connection to the pool."""
- def __init__(self, pool: AsyncConnectionPool[Any], conn: ACT):
+ def __init__(self, pool: AsyncConnectionPool[Any], conn: ACT, from_getconn: bool):
super().__init__(pool)
self.conn = conn
+ self.from_getconn = from_getconn
async def _run(self, pool: AsyncConnectionPool[Any]) -> None:
- await pool._return_connection(self.conn)
+ await pool._return_connection(self.conn, from_getconn=self.from_getconn)
class ShrinkPool(MaintenanceTask):