]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat(pool): add `check_connection()` method
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 6 Oct 2023 13:13:39 +0000 (15:13 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 14 Oct 2023 07:40:20 +0000 (09:40 +0200)
The method is public, will be documented as available to use as check
parameter, but also used internally by the `check()` method.

docs/api/pool.rst
docs/news_pool.rst
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py
tests/pool/test_pool_common.py
tests/pool/test_pool_common_async.py

index 69104e8708bd8a25af2163f404b11929f0f81a39..265b6f62d0b12d7abaa2598080f105095a96c485 100644 (file)
@@ -218,6 +218,10 @@ The `!ConnectionPool` class
 
    .. automethod:: resize
    .. automethod:: check
+   .. automethod:: check_connection
+
+      .. versionadded:: 3.2
+
    .. automethod:: get_stats
    .. automethod:: pop_stats
 
@@ -327,6 +331,10 @@ listed here.
    .. automethod:: wait
    .. automethod:: resize
    .. automethod:: check
+   .. automethod:: check_connection
+
+      .. versionadded:: 3.2
+
    .. automethod:: getconn
    .. automethod:: putconn
 
index 636dc9a8c03d7a6b3317f1c1b5db0764f07502aa..a60184a063e106689576a6c697ef8d5b925405ed 100644 (file)
@@ -15,6 +15,7 @@ psycopg_pool 3.2.0 (unreleased)
 
 - Add support for async `!reconnect_failed` callbacks in `AsyncConnectionPool`
   (:ticket:`#520`).
+- Add `ConnectionPool.check_connection()` method.
 - Make connection pool classes generic on the connection type (:ticket:`#559`).
 - Raise a warning if sync pools rely an implicit `!open=True` and the
   pool context is not used. In the future the default will become `!False`
index 7c1cf63398825f663ae2078ca9fb1c2dfc7962e6..0a44757da334c0d86fa6152c1d456bd5180f0a66 100644 (file)
@@ -535,9 +535,7 @@ class ConnectionPool(Generic[CT], BasePool):
 
             # Check for broken connections
             try:
-                conn.execute("SELECT 1")
-                if conn.pgconn.transaction_status == TransactionStatus.INTRANS:
-                    conn.rollback()
+                self.check_connection(conn)
             except Exception:
                 self._stats[self._CONNECTIONS_LOST] += 1
                 logger.warning("discarding broken connection: %s", conn)
@@ -545,6 +543,25 @@ class ConnectionPool(Generic[CT], BasePool):
             else:
                 self._add_to_pool(conn)
 
+    @staticmethod
+    def check_connection(conn: CT) -> None:
+        """
+        A simple check to verify that a connection is still working.
+
+        Return quietly if the connection is still working, otherwise raise
+        an exception.
+
+        Used internally by `check()`, but also available for client usage.
+        """
+        if conn.autocommit:
+            conn.execute("SELECT 1")
+        else:
+            conn.autocommit = True
+            try:
+                conn.execute("SELECT 1")
+            finally:
+                conn.autocommit = False
+
     def reconnect_failed(self) -> None:
         """
         Called when reconnection failed for longer than `reconnect_timeout`.
index a7541839236d526f13f79ff9d64132b595fe99c9..81d5db47fcfe61f3b26d938764a82c1f26e115b6 100644 (file)
@@ -563,9 +563,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
 
             # Check for broken connections
             try:
-                await conn.execute("SELECT 1")
-                if conn.pgconn.transaction_status == TransactionStatus.INTRANS:
-                    await conn.rollback()
+                await self.check_connection(conn)
             except Exception:
                 self._stats[self._CONNECTIONS_LOST] += 1
                 logger.warning("discarding broken connection: %s", conn)
@@ -573,6 +571,35 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
             else:
                 await self._add_to_pool(conn)
 
+    @staticmethod
+    async def check_connection(conn: ACT) -> None:
+        """
+        A simple check to verify that a connection is still working.
+
+        Return quietly if the connection is still working, otherwise raise
+        an exception.
+
+        Used internally by `check()`, but also available for client usage.
+        """
+        if conn.autocommit:
+            await conn.execute("SELECT 1")
+        else:
+            if True:  # ASYNC
+                # NOTE: with Psycopg 3.2 we could use conn.set_autocommit() in
+                # the sync code too, but we want the pool to be compatible with
+                # previous versions too.
+                await conn.set_autocommit(True)
+                try:
+                    await conn.execute("SELECT 1")
+                finally:
+                    await conn.set_autocommit(False)
+            else:
+                conn.autocommit = True
+                try:
+                    conn.execute("SELECT 1")
+                finally:
+                    conn.autocommit = False
+
     async def reconnect_failed(self) -> None:
         """
         Called when reconnection failed for longer than `reconnect_timeout`.
index df76266a5aa1b46fcab2151b9017721341af59a5..d891e2b02cf7715de23905447f25f6bca245700a 100644 (file)
@@ -577,6 +577,24 @@ def test_debug_deadlock(pool_cls, dsn):
         logger.setLevel(old_level)
 
 
+@pytest.mark.crdb_skip("pg_terminate_backend")
+@pytest.mark.parametrize("autocommit", [True, False])
+def test_check_connection(pool_cls, conn_cls, dsn, autocommit):
+    conn = conn_cls.connect(dsn)
+    conn.set_autocommit(autocommit)
+    pool_cls.check_connection(conn)
+    assert not conn.closed
+    assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
+
+    with conn_cls.connect(dsn) as conn2:
+        conn2.execute("select pg_terminate_backend(%s)", [conn.info.backend_pid])
+
+    with pytest.raises(psycopg.OperationalError):
+        pool_cls.check_connection(conn)
+
+    assert conn.closed
+
+
 @skip_sync
 def test_cancellation_in_queue(pool_cls, dsn):
     # https://github.com/psycopg/psycopg/issues/509
index f231652534604b1632b46d59426f62a4a17184b9..09b41d5db4bd8d69e9554ce98e9615e5f3b84117 100644 (file)
@@ -596,6 +596,24 @@ async def test_debug_deadlock(pool_cls, dsn):
         logger.setLevel(old_level)
 
 
+@pytest.mark.crdb_skip("pg_terminate_backend")
+@pytest.mark.parametrize("autocommit", [True, False])
+async def test_check_connection(pool_cls, aconn_cls, dsn, autocommit):
+    conn = await aconn_cls.connect(dsn)
+    await conn.set_autocommit(autocommit)
+    await pool_cls.check_connection(conn)
+    assert not conn.closed
+    assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
+
+    async with await aconn_cls.connect(dsn) as conn2:
+        await conn2.execute("select pg_terminate_backend(%s)", [conn.info.backend_pid])
+
+    with pytest.raises(psycopg.OperationalError):
+        await pool_cls.check_connection(conn)
+
+    assert conn.closed
+
+
 @skip_sync
 async def test_cancellation_in_queue(pool_cls, dsn):
     # https://github.com/psycopg/psycopg/issues/509