]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
feat(pool): add `check` connection parameter
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 6 Oct 2023 13:15:04 +0000 (15:15 +0200)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 14 Oct 2023 07:45:35 +0000 (09:45 +0200)
Allow to specify a callback to call on getconn and discard the
connection if not in working state.

docs/advanced/pool.rst
docs/api/pool.rst
docs/news_pool.rst
psycopg_pool/psycopg_pool/null_pool.py
psycopg_pool/psycopg_pool/null_pool_async.py
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py
tests/pool/test_pool_common.py
tests/pool/test_pool_common_async.py

index 440199c04804d5ee21d53861def128384e130238..b12cd13175dd835d3de293a095a9231eb1b5be4c 100644 (file)
@@ -460,5 +460,5 @@ Metric                  Meaning
                         server
  ``connections_errors`` Number of failed connection attempts
  ``connections_lost``   Number of connections lost identified by
-                        `~ConnectionPool.check()`
+                        `~ConnectionPool.check()` or by the `!check` callback
 ======================= =====================================================
index 265b6f62d0b12d7abaa2598080f105095a96c485..949a5224a9ef3657f8b953597383cef507e3e590 100644 (file)
@@ -82,6 +82,15 @@ The `!ConnectionPool` class
                 documentation for more details.
    :type open: `!bool`, default: `!True`
 
+   :param check: A callback to check that a connection is working correctly
+                 when obtained by the pool. The callback is called at every
+                 `getconn()` or `connection()`: the connection is only passed
+                 to the client if the callback doesn't throw an exception.
+                 By default no check is made on the connection. You can
+                 provide the `check_connection()` pool static method if you
+                 want to perform a simple check.
+   :type check: `Callable[[Connection], None]`
+
    :param configure: A callback to configure a connection after creation.
                      Useful, for instance, to configure its adapters. If the
                      connection is used to run internal queries (to inspect the
@@ -151,11 +160,12 @@ The `!ConnectionPool` class
    :type num_workers: `!int`, default: 3
 
    .. versionchanged:: 3.1
-
-        added `!open` parameter to init method.
+        added `!open` parameter to the constructor.
 
    .. versionchanged:: 3.2
+        added `!check` parameter to the constructor.
 
+   .. versionchanged:: 3.2
         The class is generic and `!connection_class` provides types type
         variable. See :ref:`pool-generic`.
 
@@ -257,8 +267,8 @@ class but its blocking methods are implemented as `!async` coroutines. It
 returns instances of `~psycopg.AsyncConnection`, or of its subclass if
 specified so in the `!connection_class` parameter.
 
-Only the functions with different signature from `!ConnectionPool` are
-listed here.
+Only the functions and parameters with different signature from
+`!ConnectionPool` are listed here.
 
 .. autoclass:: AsyncConnectionPool
 
@@ -266,6 +276,10 @@ listed here.
                             be an `!AsyncConnection` subclass.
    :type connection_class: `!type`, default: `~psycopg.AsyncConnection`
 
+   :param check: A callback to check that a connection is working correctly
+                 when obtained by the pool.
+   :type check: `async Callable[[Connection], None]`
+
    :param configure: A callback to configure a connection after creation.
    :type configure: `async Callable[[AsyncConnection], None]`
 
@@ -278,6 +292,9 @@ listed here.
    :type reconnect_failed: `Callable[[AsyncConnectionPool], None]` or
         `async Callable[[AsyncConnectionPool], None]`
 
+   .. versionchanged:: 3.2
+        added `!check` parameter to the constructor.
+
    .. versionchanged:: 3.2
         The `!reconnect_failed` parameter can be `!async`.
 
index a60184a063e106689576a6c697ef8d5b925405ed..3a424bae17ecd0cc2db3703121d473ed881d8a2e 100644 (file)
@@ -15,7 +15,8 @@ psycopg_pool 3.2.0 (unreleased)
 
 - Add support for async `!reconnect_failed` callbacks in `AsyncConnectionPool`
   (:ticket:`#520`).
-- Add `ConnectionPool.check_connection()` method.
+- Add `!check` parameter to the pool constructor and
+  `~ConnectionPool.check_connection()` method. (:ticket:`#656`).
 - Make connection pool classes generic on the connection type (:ticket:`#559`).
 - Raise a warning if sync pools rely an implicit `!open=True` and the
   pool context is not used. In the future the default will become `!False`
@@ -94,7 +95,7 @@ psycopg_pool 3.1.0
 ------------------
 
 - Add :ref:`null-pool` (:ticket:`#148`).
-- Add `ConnectionPool.open()` and ``open`` parameter to the pool init
+- Add `ConnectionPool.open()` and `!open` parameter to the pool constructor
   (:ticket:`#151`).
 - Drop support for Python 3.6.
 
index 60c5dff34e2983193a46920adb2fbc826122b672..7f67c1cf11ecaa16e0d0392ca28adadcf2f45c7b 100644 (file)
@@ -33,6 +33,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
         conninfo: str = "",
         *,
         open: bool | None = ...,
+        check: Optional[ConnectionCB[CT]] = ...,
         configure: Optional[ConnectionCB[CT]] = ...,
         reset: Optional[ConnectionCB[CT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -56,6 +57,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
         *,
         open: bool | None = ...,
         connection_class: Type[CT],
+        check: Optional[ConnectionCB[CT]] = ...,
         configure: Optional[ConnectionCB[CT]] = ...,
         reset: Optional[ConnectionCB[CT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -78,6 +80,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
         *,
         open: bool | None = None,
         connection_class: Type[CT] = cast(Type[CT], Connection),
+        check: Optional[ConnectionCB[CT]] = None,
         configure: Optional[ConnectionCB[CT]] = None,
         reset: Optional[ConnectionCB[CT]] = None,
         kwargs: Optional[Dict[str, Any]] = None,
@@ -96,6 +99,7 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool[CT]):
             conninfo,
             open=open,
             connection_class=connection_class,
+            check=check,
             configure=configure,
             reset=reset,
             kwargs=kwargs,
index a4a33e46aaab8ef2f68c3cc61a13120e840e298e..51bde7d52da90477a50a1f24eaa1c280b7d37881 100644 (file)
@@ -30,6 +30,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT])
         conninfo: str = "",
         *,
         open: bool | None = ...,
+        check: Optional[AsyncConnectionCB[ACT]] = ...,
         configure: Optional[AsyncConnectionCB[ACT]] = ...,
         reset: Optional[AsyncConnectionCB[ACT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -53,6 +54,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT])
         *,
         open: bool | None = ...,
         connection_class: Type[ACT],
+        check: Optional[AsyncConnectionCB[ACT]] = ...,
         configure: Optional[AsyncConnectionCB[ACT]] = ...,
         reset: Optional[AsyncConnectionCB[ACT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -75,6 +77,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT])
         *,
         open: bool | None = None,
         connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
+        check: Optional[AsyncConnectionCB[ACT]] = None,
         configure: Optional[AsyncConnectionCB[ACT]] = None,
         reset: Optional[AsyncConnectionCB[ACT]] = None,
         kwargs: Optional[Dict[str, Any]] = None,
@@ -93,6 +96,7 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool[ACT])
             conninfo,
             open=open,
             connection_class=connection_class,
+            check=check,
             configure=configure,
             reset=reset,
             kwargs=kwargs,
index 0a44757da334c0d86fa6152c1d456bd5180f0a66..9945b96f411c30bf808cf3eb4cb6e6df12ad9ac5 100644 (file)
@@ -46,6 +46,7 @@ class ConnectionPool(Generic[CT], BasePool):
         conninfo: str = "",
         *,
         open: bool | None = ...,
+        check: Optional[ConnectionCB[CT]] = ...,
         configure: Optional[ConnectionCB[CT]] = ...,
         reset: Optional[ConnectionCB[CT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -69,6 +70,7 @@ class ConnectionPool(Generic[CT], BasePool):
         *,
         open: bool | None = ...,
         connection_class: Type[CT],
+        check: Optional[ConnectionCB[CT]] = ...,
         configure: Optional[ConnectionCB[CT]] = ...,
         reset: Optional[ConnectionCB[CT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -91,6 +93,7 @@ class ConnectionPool(Generic[CT], BasePool):
         *,
         open: bool | None = None,
         connection_class: Type[CT] = cast(Type[CT], Connection),
+        check: Optional[ConnectionCB[CT]] = None,
         configure: Optional[ConnectionCB[CT]] = None,
         reset: Optional[ConnectionCB[CT]] = None,
         kwargs: Optional[Dict[str, Any]] = None,
@@ -106,6 +109,7 @@ class ConnectionPool(Generic[CT], BasePool):
         num_workers: int = 3,
     ):
         self.connection_class = connection_class
+        self._check = check
         self._configure = configure
         self._reset = reset
 
@@ -309,7 +313,13 @@ class ConnectionPool(Generic[CT], BasePool):
         return conn
 
     def _check_connection(self, conn: CT) -> None:
-        pass
+        if not self._check:
+            return
+        try:
+            self._check(conn)
+        except Exception as e:
+            logger.info("connection failed check: %s", e)
+            raise
 
     def _maybe_grow_pool(self) -> None:
         # Allow only one task at time to grow the pool (or returning
@@ -551,7 +561,8 @@ class ConnectionPool(Generic[CT], BasePool):
         Return quietly if the connection is still working, otherwise raise
         an exception.
 
-        Used internally by `check()`, but also available for client usage.
+        Used internally by `check()`, but also available for client usage,
+        for instance as `!check` callback when a pool is created.
         """
         if conn.autocommit:
             conn.execute("SELECT 1")
index 81d5db47fcfe61f3b26d938764a82c1f26e115b6..ed8d08b13b34e90fd0e36dd4b8a3ef9f53cedd2d 100644 (file)
@@ -45,6 +45,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         conninfo: str = "",
         *,
         open: bool | None = ...,
+        check: Optional[AsyncConnectionCB[ACT]] = ...,
         configure: Optional[AsyncConnectionCB[ACT]] = ...,
         reset: Optional[AsyncConnectionCB[ACT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -68,6 +69,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         *,
         open: bool | None = ...,
         connection_class: Type[ACT],
+        check: Optional[AsyncConnectionCB[ACT]] = ...,
         configure: Optional[AsyncConnectionCB[ACT]] = ...,
         reset: Optional[AsyncConnectionCB[ACT]] = ...,
         kwargs: Optional[Dict[str, Any]] = ...,
@@ -90,6 +92,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         *,
         open: bool | None = None,
         connection_class: Type[ACT] = cast(Type[ACT], AsyncConnection),
+        check: Optional[AsyncConnectionCB[ACT]] = None,
         configure: Optional[AsyncConnectionCB[ACT]] = None,
         reset: Optional[AsyncConnectionCB[ACT]] = None,
         kwargs: Optional[Dict[str, Any]] = None,
@@ -105,6 +108,7 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         num_workers: int = 3,
     ):
         self.connection_class = connection_class
+        self._check = check
         self._configure = configure
         self._reset = reset
 
@@ -330,7 +334,13 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         return conn
 
     async def _check_connection(self, conn: ACT) -> None:
-        pass
+        if not self._check:
+            return
+        try:
+            await self._check(conn)
+        except Exception as e:
+            logger.info("connection failed check: %s", e)
+            raise
 
     def _maybe_grow_pool(self) -> None:
         # Allow only one task at time to grow the pool (or returning
@@ -579,7 +589,8 @@ class AsyncConnectionPool(Generic[ACT], BasePool):
         Return quietly if the connection is still working, otherwise raise
         an exception.
 
-        Used internally by `check()`, but also available for client usage.
+        Used internally by `check()`, but also available for client usage,
+        for instance as `!check` callback when a pool is created.
         """
         if conn.autocommit:
             await conn.execute("SELECT 1")
index d891e2b02cf7715de23905447f25f6bca245700a..72274074e408858b4994118f6558e8986df1d1f8 100644 (file)
@@ -595,6 +595,20 @@ def test_check_connection(pool_cls, conn_cls, dsn, autocommit):
     assert conn.closed
 
 
+def test_check_init(pool_cls, dsn):
+    checked = False
+
+    def check(conn):
+        nonlocal checked
+        checked = True
+
+    with pool_cls(dsn, check=check) as p:
+        with p.connection(timeout=1.0) as conn:
+            conn.execute("select 1")
+
+    assert checked
+
+
 @skip_sync
 def test_cancellation_in_queue(pool_cls, dsn):
     # https://github.com/psycopg/psycopg/issues/509
index 09b41d5db4bd8d69e9554ce98e9615e5f3b84117..90a06fbf4a276c7a1d728ee4d58aca4e50c2875c 100644 (file)
@@ -614,6 +614,20 @@ async def test_check_connection(pool_cls, aconn_cls, dsn, autocommit):
     assert conn.closed
 
 
+async def test_check_init(pool_cls, dsn):
+    checked = False
+
+    async def check(conn):
+        nonlocal checked
+        checked = True
+
+    async with pool_cls(dsn, check=check) as p:
+        async with p.connection(timeout=1.0) as conn:
+            await conn.execute("select 1")
+
+    assert checked
+
+
 @skip_sync
 async def test_cancellation_in_queue(pool_cls, dsn):
     # https://github.com/psycopg/psycopg/issues/509