.. automethod:: resize
.. automethod:: check
+ .. automethod:: check_connection
+
+ .. versionadded:: 3.2
+
.. automethod:: get_stats
.. automethod:: pop_stats
.. automethod:: wait
.. automethod:: resize
.. automethod:: check
+ .. automethod:: check_connection
+
+ .. versionadded:: 3.2
+
.. automethod:: getconn
.. automethod:: putconn
- Add support for async `!reconnect_failed` callbacks in `AsyncConnectionPool`
(:ticket:`#520`).
+- Add `ConnectionPool.check_connection()` method.
- Make connection pool classes generic on the connection type (:ticket:`#559`).
- Raise a warning if sync pools rely an implicit `!open=True` and the
pool context is not used. In the future the default will become `!False`
# Check for broken connections
try:
- conn.execute("SELECT 1")
- if conn.pgconn.transaction_status == TransactionStatus.INTRANS:
- conn.rollback()
+ self.check_connection(conn)
except Exception:
self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
else:
self._add_to_pool(conn)
+ @staticmethod
+ def check_connection(conn: CT) -> None:
+ """
+ A simple check to verify that a connection is still working.
+
+ Return quietly if the connection is still working, otherwise raise
+ an exception.
+
+ Used internally by `check()`, but also available for client usage.
+ """
+ if conn.autocommit:
+ conn.execute("SELECT 1")
+ else:
+ conn.autocommit = True
+ try:
+ conn.execute("SELECT 1")
+ finally:
+ conn.autocommit = False
+
def reconnect_failed(self) -> None:
"""
Called when reconnection failed for longer than `reconnect_timeout`.
# Check for broken connections
try:
- await conn.execute("SELECT 1")
- if conn.pgconn.transaction_status == TransactionStatus.INTRANS:
- await conn.rollback()
+ await self.check_connection(conn)
except Exception:
self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
else:
await self._add_to_pool(conn)
+ @staticmethod
+ async def check_connection(conn: ACT) -> None:
+ """
+ A simple check to verify that a connection is still working.
+
+ Return quietly if the connection is still working, otherwise raise
+ an exception.
+
+ Used internally by `check()`, but also available for client usage.
+ """
+ if conn.autocommit:
+ await conn.execute("SELECT 1")
+ else:
+ if True: # ASYNC
+ # NOTE: with Psycopg 3.2 we could use conn.set_autocommit() in
+ # the sync code too, but we want the pool to be compatible with
+ # previous versions too.
+ await conn.set_autocommit(True)
+ try:
+ await conn.execute("SELECT 1")
+ finally:
+ await conn.set_autocommit(False)
+ else:
+ conn.autocommit = True
+ try:
+ conn.execute("SELECT 1")
+ finally:
+ conn.autocommit = False
+
async def reconnect_failed(self) -> None:
"""
Called when reconnection failed for longer than `reconnect_timeout`.
logger.setLevel(old_level)
+@pytest.mark.crdb_skip("pg_terminate_backend")
+@pytest.mark.parametrize("autocommit", [True, False])
+def test_check_connection(pool_cls, conn_cls, dsn, autocommit):
+ conn = conn_cls.connect(dsn)
+ conn.set_autocommit(autocommit)
+ pool_cls.check_connection(conn)
+ assert not conn.closed
+ assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
+
+ with conn_cls.connect(dsn) as conn2:
+ conn2.execute("select pg_terminate_backend(%s)", [conn.info.backend_pid])
+
+ with pytest.raises(psycopg.OperationalError):
+ pool_cls.check_connection(conn)
+
+ assert conn.closed
+
+
@skip_sync
def test_cancellation_in_queue(pool_cls, dsn):
# https://github.com/psycopg/psycopg/issues/509
logger.setLevel(old_level)
+@pytest.mark.crdb_skip("pg_terminate_backend")
+@pytest.mark.parametrize("autocommit", [True, False])
+async def test_check_connection(pool_cls, aconn_cls, dsn, autocommit):
+ conn = await aconn_cls.connect(dsn)
+ await conn.set_autocommit(autocommit)
+ await pool_cls.check_connection(conn)
+ assert not conn.closed
+ assert conn.info.transaction_status == psycopg.pq.TransactionStatus.IDLE
+
+ async with await aconn_cls.connect(dsn) as conn2:
+ await conn2.execute("select pg_terminate_backend(%s)", [conn.info.backend_pid])
+
+ with pytest.raises(psycopg.OperationalError):
+ await pool_cls.check_connection(conn)
+
+ assert conn.closed
+
+
@skip_sync
async def test_cancellation_in_queue(pool_cls, dsn):
# https://github.com/psycopg/psycopg/issues/509