From: Daniele Varrazzo Date: Fri, 6 Oct 2023 13:13:39 +0000 (+0200) Subject: feat(pool): add `check_connection()` method X-Git-Tag: pool-3.2.0~7^2~4 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=4575176cbaa2149eb9a74f95467403e69a9c0745;p=thirdparty%2Fpsycopg.git feat(pool): add `check_connection()` method The method is public, will be documented as available to use as check parameter, but also used internally by the `check()` method. --- diff --git a/docs/api/pool.rst b/docs/api/pool.rst index 69104e870..265b6f62d 100644 --- a/docs/api/pool.rst +++ b/docs/api/pool.rst @@ -218,6 +218,10 @@ The `!ConnectionPool` class .. automethod:: resize .. automethod:: check + .. automethod:: check_connection + + .. versionadded:: 3.2 + .. automethod:: get_stats .. automethod:: pop_stats @@ -327,6 +331,10 @@ listed here. .. automethod:: wait .. automethod:: resize .. automethod:: check + .. automethod:: check_connection + + .. versionadded:: 3.2 + .. automethod:: getconn .. automethod:: putconn diff --git a/docs/news_pool.rst b/docs/news_pool.rst index 636dc9a8c..a60184a06 100644 --- a/docs/news_pool.rst +++ b/docs/news_pool.rst @@ -15,6 +15,7 @@ psycopg_pool 3.2.0 (unreleased) - Add support for async `!reconnect_failed` callbacks in `AsyncConnectionPool` (:ticket:`#520`). +- Add `ConnectionPool.check_connection()` method. - Make connection pool classes generic on the connection type (:ticket:`#559`). - Raise a warning if sync pools rely an implicit `!open=True` and the pool context is not used. In the future the default will become `!False` diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 7c1cf6339..0a44757da 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -535,9 +535,7 @@ class ConnectionPool(Generic[CT], BasePool): # Check for broken connections try: - conn.execute("SELECT 1") - if conn.pgconn.transaction_status == TransactionStatus.INTRANS: - conn.rollback() + self.check_connection(conn) except Exception: self._stats[self._CONNECTIONS_LOST] += 1 logger.warning("discarding broken connection: %s", conn) @@ -545,6 +543,25 @@ class ConnectionPool(Generic[CT], BasePool): else: self._add_to_pool(conn) + @staticmethod + def check_connection(conn: CT) -> None: + """ + A simple check to verify that a connection is still working. + + Return quietly if the connection is still working, otherwise raise + an exception. + + Used internally by `check()`, but also available for client usage. + """ + if conn.autocommit: + conn.execute("SELECT 1") + else: + conn.autocommit = True + try: + conn.execute("SELECT 1") + finally: + conn.autocommit = False + def reconnect_failed(self) -> None: """ Called when reconnection failed for longer than `reconnect_timeout`. diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index a75418392..81d5db47f 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -563,9 +563,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool): # Check for broken connections try: - await conn.execute("SELECT 1") - if conn.pgconn.transaction_status == TransactionStatus.INTRANS: - await conn.rollback() + await self.check_connection(conn) except Exception: self._stats[self._CONNECTIONS_LOST] += 1 logger.warning("discarding broken connection: %s", conn) @@ -573,6 +571,35 @@ class AsyncConnectionPool(Generic[ACT], BasePool): else: await self._add_to_pool(conn) + @staticmethod + async def check_connection(conn: ACT) -> None: + """ + A simple check to verify that a connection is still working. + + Return quietly if the connection is still working, otherwise raise + an exception. + + Used internally by `check()`, but also available for client usage. + """ + if conn.autocommit: + await conn.execute("SELECT 1") + else: + if True: # ASYNC + # NOTE: with Psycopg 3.2 we could use conn.set_autocommit() in + # the sync code too, but we want the pool to be compatible with + # previous versions too. + await conn.set_autocommit(True) + try: + await conn.execute("SELECT 1") + finally: + await conn.set_autocommit(False) + else: + conn.autocommit = True + try: + conn.execute("SELECT 1") + finally: + conn.autocommit = False + async def reconnect_failed(self) -> None: """ Called when reconnection failed for longer than `reconnect_timeout`. diff --git a/tests/pool/test_pool_common.py b/tests/pool/test_pool_common.py index df76266a5..d891e2b02 100644 --- a/tests/pool/test_pool_common.py +++ b/tests/pool/test_pool_common.py @@ -577,6 +577,24 @@ def test_debug_deadlock(pool_cls, dsn): logger.setLevel(old_level) +@pytest.mark.crdb_skip("pg_terminate_backend") +@pytest.mark.parametrize("autocommit", [True, False]) +def test_check_connection(pool_cls, conn_cls, dsn, autocommit): + conn = conn_cls.connect(dsn) + conn.set_autocommit(autocommit) + pool_cls.check_connection(conn) + assert not conn.closed + assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE + + with conn_cls.connect(dsn) as conn2: + conn2.execute("select pg_terminate_backend(%s)", [conn.info.backend_pid]) + + with pytest.raises(psycopg.OperationalError): + pool_cls.check_connection(conn) + + assert conn.closed + + @skip_sync def test_cancellation_in_queue(pool_cls, dsn): # https://github.com/psycopg/psycopg/issues/509 diff --git a/tests/pool/test_pool_common_async.py b/tests/pool/test_pool_common_async.py index f23165253..09b41d5db 100644 --- a/tests/pool/test_pool_common_async.py +++ b/tests/pool/test_pool_common_async.py @@ -596,6 +596,24 @@ async def test_debug_deadlock(pool_cls, dsn): logger.setLevel(old_level) +@pytest.mark.crdb_skip("pg_terminate_backend") +@pytest.mark.parametrize("autocommit", [True, False]) +async def test_check_connection(pool_cls, aconn_cls, dsn, autocommit): + conn = await aconn_cls.connect(dsn) + await conn.set_autocommit(autocommit) + await pool_cls.check_connection(conn) + assert not conn.closed + assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE + + async with await aconn_cls.connect(dsn) as conn2: + await conn2.execute("select pg_terminate_backend(%s)", [conn.info.backend_pid]) + + with pytest.raises(psycopg.OperationalError): + await pool_cls.check_connection(conn) + + assert conn.closed + + @skip_sync async def test_cancellation_in_queue(pool_cls, dsn): # https://github.com/psycopg/psycopg/issues/509