From: Daniele Varrazzo Date: Fri, 6 Oct 2023 16:30:18 +0000 (+0200) Subject: fix(pool): clean up logging in getconn check X-Git-Tag: pool-3.2.0~7^2~2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=330f765175907da2a21a8c4aa5a2d55fd327d84b;p=thirdparty%2Fpsycopg.git fix(pool): clean up logging in getconn check If a check callback is set, assume that failed checks on getconn are just part of normal operative life: don't consider them warnings. --- diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 9945b96f4..4a7ce0dfd 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -252,7 +252,7 @@ class ConnectionPool(Generic[CT], BasePool): try: self._check_connection(conn) except Exception: - self.putconn(conn) + self._putconn(conn, from_getconn=True) else: logger.info("connection given by %r", self.name) return conn @@ -344,11 +344,14 @@ class ConnectionPool(Generic[CT], BasePool): if self._maybe_close_connection(conn): return + self._putconn(conn, from_getconn=False) + + def _putconn(self, conn: CT, from_getconn: bool) -> None: # Use a worker to perform eventual maintenance work in a separate task if self._reset: - self.run_task(ReturnConnection(self, conn)) + self.run_task(ReturnConnection(self, conn, from_getconn=from_getconn)) else: - self._return_connection(conn) + self._return_connection(conn, from_getconn=from_getconn) def _maybe_close_connection(self, conn: CT) -> bool: """Close a returned connection if necessary. @@ -699,17 +702,25 @@ class ConnectionPool(Generic[CT], BasePool): else: self._growing = False - def _return_connection(self, conn: CT) -> None: + def _return_connection(self, conn: CT, from_getconn: bool) -> None: """ Return a connection to the pool after usage. """ - self._reset_connection(conn) - if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: - self._stats[self._RETURNS_BAD] += 1 - # Connection no more in working state: create a new one. - self.run_task(AddConnection(self)) - logger.warning("discarding closed connection: %s", conn) - return + if from_getconn: + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._CONNECTIONS_LOST] += 1 + # Connection no more in working state: create a new one. + self.run_task(AddConnection(self)) + logger.info("not serving connection found broken") + return + else: + self._reset_connection(conn) + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + # Connection no more in working state: create a new one. + self.run_task(AddConnection(self)) + logger.warning("discarding closed connection: %s", conn) + return # Check if the connection is past its best before date if conn._expire_at <= monotonic(): @@ -758,7 +769,10 @@ class ConnectionPool(Generic[CT], BasePool): status = conn.pgconn.transaction_status if status == TransactionStatus.IDLE: pass - elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR): + elif status == TransactionStatus.UNKNOWN: + # Connection closed + return + elif status == TransactionStatus.INTRANS or status == TransactionStatus.INERROR: # Connection returned with an active transaction logger.warning("rolling back returned connection: %s", conn) try: @@ -776,7 +790,7 @@ class ConnectionPool(Generic[CT], BasePool): logger.warning("closing returned connection: %s", conn) conn.close() - if not conn.closed and self._reset: + if self._reset: try: self._reset(conn) status = conn.pgconn.transaction_status @@ -957,12 +971,13 @@ class AddConnection(MaintenanceTask): class ReturnConnection(MaintenanceTask): """Clean up and return a connection to the pool.""" - def __init__(self, pool: ConnectionPool[Any], conn: CT): + def __init__(self, pool: ConnectionPool[Any], conn: CT, from_getconn: bool): super().__init__(pool) self.conn = conn + self.from_getconn = from_getconn def _run(self, pool: ConnectionPool[Any]) -> None: - pool._return_connection(self.conn) + pool._return_connection(self.conn, from_getconn=self.from_getconn) class ShrinkPool(MaintenanceTask): diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index ed8d08b13..67220d10f 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -272,7 +272,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): try: await self._check_connection(conn) except Exception: - await self.putconn(conn) + await self._putconn(conn, from_getconn=True) else: logger.info("connection given by %r", self.name) return conn @@ -365,11 +365,14 @@ class AsyncConnectionPool(Generic[ACT], BasePool): if await self._maybe_close_connection(conn): return + await self._putconn(conn, from_getconn=False) + + async def _putconn(self, conn: ACT, from_getconn: bool) -> None: # Use a worker to perform eventual maintenance work in a separate task if self._reset: - self.run_task(ReturnConnection(self, conn)) + self.run_task(ReturnConnection(self, conn, from_getconn=from_getconn)) else: - await self._return_connection(conn) + await self._return_connection(conn, from_getconn=from_getconn) async def _maybe_close_connection(self, conn: ACT) -> bool: """Close a returned connection if necessary. @@ -745,17 +748,26 @@ class AsyncConnectionPool(Generic[ACT], BasePool): else: self._growing = False - async def _return_connection(self, conn: ACT) -> None: + async def _return_connection(self, conn: ACT, from_getconn: bool) -> None: """ Return a connection to the pool after usage. """ - await self._reset_connection(conn) - if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: - self._stats[self._RETURNS_BAD] += 1 - # Connection no more in working state: create a new one. - self.run_task(AddConnection(self)) - logger.warning("discarding closed connection: %s", conn) - return + if from_getconn: + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._CONNECTIONS_LOST] += 1 + # Connection no more in working state: create a new one. + self.run_task(AddConnection(self)) + logger.info("not serving connection found broken") + return + + else: + await self._reset_connection(conn) + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + # Connection no more in working state: create a new one. + self.run_task(AddConnection(self)) + logger.warning("discarding closed connection: %s", conn) + return # Check if the connection is past its best before date if conn._expire_at <= monotonic(): @@ -806,7 +818,11 @@ class AsyncConnectionPool(Generic[ACT], BasePool): if status == TransactionStatus.IDLE: pass - elif status in (TransactionStatus.INTRANS, TransactionStatus.INERROR): + elif status == TransactionStatus.UNKNOWN: + # Connection closed + return + + elif status == TransactionStatus.INTRANS or status == TransactionStatus.INERROR: # Connection returned with an active transaction logger.warning("rolling back returned connection: %s", conn) try: @@ -825,7 +841,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): logger.warning("closing returned connection: %s", conn) await conn.close() - if not conn.closed and self._reset: + if self._reset: try: await self._reset(conn) status = conn.pgconn.transaction_status @@ -1006,12 +1022,13 @@ class AddConnection(MaintenanceTask): class ReturnConnection(MaintenanceTask): """Clean up and return a connection to the pool.""" - def __init__(self, pool: AsyncConnectionPool[Any], conn: ACT): + def __init__(self, pool: AsyncConnectionPool[Any], conn: ACT, from_getconn: bool): super().__init__(pool) self.conn = conn + self.from_getconn = from_getconn async def _run(self, pool: AsyncConnectionPool[Any]) -> None: - await pool._return_connection(self.conn) + await pool._return_connection(self.conn, from_getconn=self.from_getconn) class ShrinkPool(MaintenanceTask):