From: Daniele Varrazzo Date: Wed, 10 Mar 2021 18:34:00 +0000 (+0100) Subject: Add async pool stats X-Git-Tag: 3.0.dev0~87^2~12 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=17e9cb5b6a883ddac520cd7bcbc75abe92fa21ca;p=thirdparty%2Fpsycopg.git Add async pool stats --- diff --git a/psycopg3/psycopg3/pool/async_pool.py b/psycopg3/psycopg3/pool/async_pool.py index dd729732f..b1a78eb2a 100644 --- a/psycopg3/psycopg3/pool/async_pool.py +++ b/psycopg3/psycopg3/pool/async_pool.py @@ -11,7 +11,7 @@ from abc import ABC, abstractmethod from time import monotonic from types import TracebackType from typing import Any, AsyncIterator, Awaitable, Callable, Deque -from typing import List, Optional, Type +from typing import Dict, List, Optional, Type from weakref import ref from collections import deque @@ -79,43 +79,6 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): # remained unused. self.run_task(Schedule(self, ShrinkPool(self), self.max_idle)) - def run_task(self, task: "MaintenanceTask") -> None: - """Run a maintenance task in a worker thread.""" - self._tasks.put_nowait(task) - - async def schedule_task( - self, task: "MaintenanceTask", delay: float - ) -> None: - """Run a maintenance task in a worker thread in the future.""" - await self._sched.enter(delay, task.tick) - - @classmethod - async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None: - """Runner to execute pending maintenance task. - - The function is designed to run as a separate thread. - - Block on the queue *q*, run a task received. Finish running if a - StopWorker is received. - """ - while True: - task = await q.get() - - if isinstance(task, StopWorker): - logger.debug("terminating working task") - return - - # Run the task. Make sure don't die in the attempt. - try: - await task.run() - except Exception as ex: - logger.warning( - "task run %s failed: %s: %s", - task, - ex.__class__.__name__, - ex, - ) - async def wait(self, timeout: float = 30.0) -> None: """ Wait for the pool to be full after init. @@ -156,10 +119,13 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): replace it with a new one. """ conn = await self.getconn(timeout=timeout) + t0 = monotonic() try: async with conn: yield conn finally: + t1 = monotonic() + self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0)) await self.putconn(conn) async def getconn( @@ -175,6 +141,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): you don't want a depleted pool. """ logger.info("connection requested to %r", self.name) + self._stats[self._REQUESTS_NUM] += 1 # Critical section: decide here if there's a connection ready # or if the client needs to wait. async with self._lock: @@ -189,8 +156,10 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): self._nconns_min = len(self._pool) else: # No connection available: put the client in the waiting queue + t0 = monotonic() pos = AsyncClient() self._waiting.append(pos) + self._stats[self._REQUESTS_QUEUED] += 1 # If there is space for the pool to grow, let's do it if self._nconns < self._maxconn: @@ -205,7 +174,14 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): if pos: if timeout is None: timeout = self.timeout - conn = await pos.wait(timeout=timeout) + try: + conn = await pos.wait(timeout=timeout) + except Exception: + self._stats[self._REQUESTS_TIMEOUTS] += 1 + raise + finally: + t1 = monotonic() + self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0)) # Tell the connection it belongs to a pool to avoid closing on __exit__ # Note that this property shouldn't be set while the connection is in @@ -343,6 +319,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): try: await conn.execute("select 1") except Exception: + self._stats[self._CONNECTIONS_LOST] += 1 logger.warning("discarding broken connection: %s", conn) self.run_task(AddConnection(self)) else: @@ -354,11 +331,58 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): """ self._reconnect_failed(self) + def run_task(self, task: "MaintenanceTask") -> None: + """Run a maintenance task in a worker thread.""" + self._tasks.put_nowait(task) + + async def schedule_task( + self, task: "MaintenanceTask", delay: float + ) -> None: + """Run a maintenance task in a worker thread in the future.""" + await self._sched.enter(delay, task.tick) + + @classmethod + async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None: + """Runner to execute pending maintenance task. + + The function is designed to run as a separate thread. + + Block on the queue *q*, run a task received. Finish running if a + StopWorker is received. + """ + while True: + task = await q.get() + + if isinstance(task, StopWorker): + logger.debug("terminating working task") + return + + # Run the task. Make sure don't die in the attempt. + try: + await task.run() + except Exception as ex: + logger.warning( + "task run %s failed: %s: %s", + task, + ex.__class__.__name__, + ex, + ) + async def _connect(self) -> AsyncConnection: """Return a new connection configured for the pool.""" - conn = await self.connection_class.connect( - self.conninfo, **self.kwargs - ) + self._stats[self._CONNECTIONS_NUM] += 1 + t0 = monotonic() + try: + conn = await self.connection_class.connect( + self.conninfo, **self.kwargs + ) + except Exception: + self._stats[self._CONNECTIONS_ERRORS] += 1 + raise + else: + t1 = monotonic() + self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0)) + conn._pool = self if self._configure: @@ -420,6 +444,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): """ await self._reset_connection(conn) if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 # Connection no more in working state: create a new one. self.run_task(AddConnection(self)) logger.warning("discarding closed connection: %s", conn) @@ -533,6 +558,11 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): ) await to_close.close() + def _get_measures(self) -> Dict[str, int]: + rv = super()._get_measures() + rv[self._QUEUE_LENGTH] = len(self._waiting) + return rv + class AsyncClient: """A position in a queue for a client waiting for a connection.""" diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index 1aab09cc4..95404af55 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -828,6 +828,98 @@ async def test_check(dsn, caplog): assert pid not in pids2 +@pytest.mark.slow +async def test_stats_measures(dsn): + async def worker(n): + async with p.connection() as conn: + await conn.execute("select pg_sleep(0.2)") + + async with pool.AsyncConnectionPool(dsn, minconn=2, maxconn=4) as p: + await p.wait(2.0) + + stats = p.get_stats() + assert stats["pool_min"] == 2 + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 2 + assert stats["pool_available"] == 2 + assert stats["queue_length"] == 0 + + ts = [create_task(worker(i)) for i in range(3)] + await asyncio.sleep(0.1) + stats = p.get_stats() + await asyncio.gather(*ts) + assert stats["pool_min"] == 2 + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 3 + assert stats["pool_available"] == 0 + assert stats["queue_length"] == 0 + + await p.wait(2.0) + ts = [create_task(worker(i)) for i in range(7)] + await asyncio.sleep(0.1) + stats = p.get_stats() + await asyncio.gather(*ts) + assert stats["pool_min"] == 2 + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 4 + assert stats["pool_available"] == 0 + assert stats["queue_length"] == 3 + + +@pytest.mark.slow +async def test_stats_usage(dsn): + async def worker(n): + try: + async with p.connection(timeout=0.3) as conn: + await conn.execute("select pg_sleep(0.2)") + except pool.PoolTimeout: + pass + + async with pool.AsyncConnectionPool(dsn, minconn=3) as p: + await p.wait(2.0) + + ts = [create_task(worker(i)) for i in range(7)] + await asyncio.gather(*ts) + stats = p.get_stats() + assert stats["requests_num"] == 7 + assert stats["requests_queued"] == 4 + assert 850 <= stats["requests_wait_ms"] <= 950 + assert stats["requests_timeouts"] == 1 + assert 1150 <= stats["usage_ms"] <= 1250 + assert stats.get("returns_bad", 0) == 0 + + async with p.connection() as conn: + await conn.close() + await p.wait() + stats = p.pop_stats() + assert stats["requests_num"] == 8 + assert stats["returns_bad"] == 1 + async with p.connection(): + pass + assert p.get_stats()["requests_num"] == 1 + + +@pytest.mark.slow +async def test_stats_connect(dsn, proxy, monkeypatch): + proxy.start() + delay_connection(monkeypatch, 0.2) + async with pool.AsyncConnectionPool(proxy.client_dsn, minconn=3) as p: + await p.wait() + stats = p.get_stats() + assert stats["connections_num"] == 3 + assert stats.get("connections_errors", 0) == 0 + assert stats.get("connections_lost", 0) == 0 + assert 600 <= stats["connections_ms"] < 1200 + + proxy.stop() + await p.check() + await asyncio.sleep(0.1) + stats = p.get_stats() + assert stats["connections_num"] > 3 + assert stats["connections_errors"] > 0 + assert stats["connections_lost"] == 3 + + def delay_connection(monkeypatch, sec): """ Return a _connect_gen function delayed by the amount of seconds