From: Daniele Varrazzo Date: Wed, 10 Mar 2021 02:22:07 +0000 (+0100) Subject: Add pool stats X-Git-Tag: 3.0.dev0~87^2~15 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=ee188511efb5ad213ded109d3fab34f232933c7d;p=thirdparty%2Fpsycopg.git Add pool stats --- diff --git a/psycopg3/psycopg3/pool/base.py b/psycopg3/psycopg3/pool/base.py index c2d15ccc4..b4c94f72c 100644 --- a/psycopg3/psycopg3/pool/base.py +++ b/psycopg3/psycopg3/pool/base.py @@ -7,10 +7,15 @@ psycopg3 connection pool base class and functionalities. import logging from random import random from typing import Any, Callable, Deque, Dict, Generic, Optional -from collections import deque +from typing import TYPE_CHECKING +from collections import Counter, deque from ..proto import ConnectionType +if TYPE_CHECKING: + from typing import Counter as TCounter + + logger = logging.getLogger(__name__) @@ -19,6 +24,23 @@ class BasePool(Generic[ConnectionType]): # Used to generate pool names _num_pool = 0 + # Stats keys + _POOL_MIN = "pool_min" + _POOL_MAX = "pool_max" + _POOL_SIZE = "pool_size" + _POOL_AVAILABLE = "pool_available" + _QUEUE_LENGTH = "queue_length" + _USAGE_MS = "usage_ms" + _REQUESTS_NUM = "requests_num" + _REQUESTS_QUEUED = "requests_queued" + _REQUESTS_WAIT_MS = "requests_wait_ms" + _REQUESTS_TIMEOUTS = "requests_timeouts" + _RETURNS_BAD = "returns_bad" + _CONNECTIONS_NUM = "connections_num" + _CONNECTIONS_MS = "connections_ms" + _CONNECTIONS_ERRORS = "connections_errors" + _CONNECTIONS_LOST = "connections_lost" + def __init__( self, conninfo: str = "", @@ -62,6 +84,7 @@ class BasePool(Generic[ConnectionType]): self._nconns = minconn # currently in the pool, out, being prepared self._pool: Deque[ConnectionType] = deque() + self._stats: "TCounter[str]" = Counter() # Min number of connections in the pool in a max_idle unit of time. # It is reset periodically by the ShrinkPool scheduled task. @@ -93,6 +116,36 @@ class BasePool(Generic[ConnectionType]): """`!True` if the pool is closed.""" return self._closed + def get_stats(self) -> Dict[str, int]: + """ + Return current stats about the pool usage. + """ + rv = dict(self._stats) + rv.update(self._get_measures()) + return rv + + def pop_stats(self) -> Dict[str, int]: + """ + Return current stats about the pool usage. + + After the call, all the counters are reset to zero. + """ + stats, self._stats = self._stats, Counter() + rv = dict(stats) + rv.update(self._get_measures()) + return rv + + def _get_measures(self) -> Dict[str, int]: + """ + Return immediate measures of the pool (not counters). + """ + return { + self._POOL_MIN: self._minconn, + self._POOL_MAX: self._maxconn, + self._POOL_SIZE: self._nconns, + self._POOL_AVAILABLE: len(self._pool), + } + @classmethod def _jitter(cls, value: float, min_pc: float, max_pc: float) -> float: """ diff --git a/psycopg3/psycopg3/pool/pool.py b/psycopg3/psycopg3/pool/pool.py index 65ad83857..ddc954019 100644 --- a/psycopg3/psycopg3/pool/pool.py +++ b/psycopg3/psycopg3/pool/pool.py @@ -10,7 +10,7 @@ from abc import ABC, abstractmethod from time import monotonic from queue import Queue, Empty from types import TracebackType -from typing import Any, Callable, Deque, Iterator, List, Optional, Type +from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Type from weakref import ref from contextlib import contextmanager from collections import deque @@ -129,10 +129,13 @@ class ConnectionPool(BasePool[Connection]): replace it with a new one. """ conn = self.getconn(timeout=timeout) + t0 = monotonic() try: with conn: yield conn finally: + t1 = monotonic() + self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0)) self.putconn(conn) def getconn(self, timeout: Optional[float] = None) -> Connection: @@ -146,6 +149,7 @@ class ConnectionPool(BasePool[Connection]): you don't want a depleted pool. """ logger.info("connection requested to %r", self.name) + self._stats[self._REQUESTS_NUM] += 1 # Critical section: decide here if there's a connection ready # or if the client needs to wait. with self._lock: @@ -160,8 +164,10 @@ class ConnectionPool(BasePool[Connection]): self._nconns_min = len(self._pool) else: # No connection available: put the client in the waiting queue + t0 = monotonic() pos = WaitingClient() self._waiting.append(pos) + self._stats[self._REQUESTS_QUEUED] += 1 # If there is space for the pool to grow, let's do it if self._nconns < self._maxconn: @@ -176,7 +182,14 @@ class ConnectionPool(BasePool[Connection]): if pos: if timeout is None: timeout = self.timeout - conn = pos.wait(timeout=timeout) + try: + conn = pos.wait(timeout=timeout) + except Exception: + self._stats[self._REQUESTS_TIMEOUTS] += 1 + raise + finally: + t1 = monotonic() + self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0)) # Tell the connection it belongs to a pool to avoid closing on __exit__ # Note that this property shouldn't be set while the connection is in @@ -313,6 +326,7 @@ class ConnectionPool(BasePool[Connection]): try: conn.execute("select 1") except Exception: + self._stats[self._CONNECTIONS_LOST] += 1 logger.warning("discarding broken connection: %s", conn) self.run_task(AddConnection(self)) else: @@ -372,7 +386,17 @@ class ConnectionPool(BasePool[Connection]): def _connect(self) -> Connection: """Return a new connection configured for the pool.""" - conn = Connection.connect(self.conninfo, **self.kwargs) + self._stats[self._CONNECTIONS_NUM] += 1 + t0 = monotonic() + try: + conn = Connection.connect(self.conninfo, **self.kwargs) + except Exception: + self._stats[self._CONNECTIONS_ERRORS] += 1 + raise + else: + t1 = monotonic() + self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0)) + conn._pool = self if self._configure: @@ -430,6 +454,7 @@ class ConnectionPool(BasePool[Connection]): """ self._reset_connection(conn) if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 # Connection no more in working state: create a new one. self.run_task(AddConnection(self)) logger.warning("discarding closed connection: %s", conn) @@ -543,6 +568,11 @@ class ConnectionPool(BasePool[Connection]): ) to_close.close() + def _get_measures(self) -> Dict[str, int]: + rv = super()._get_measures() + rv[self._QUEUE_LENGTH] = len(self._waiting) + return rv + class WaitingClient: """A position in a queue for a client waiting for a connection.""" diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 212776d47..369d25a45 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -817,6 +817,101 @@ def test_async_pool_not_supported(dsn): pool.AsyncConnectionPool(dsn) +@pytest.mark.slow +def test_stats_measures(dsn): + def worker(n): + with p.connection() as conn: + conn.execute("select pg_sleep(0.2)") + + with pool.ConnectionPool(dsn, minconn=2, maxconn=4) as p: + p.wait(2.0) + + stats = p.get_stats() + assert stats["pool_min"] == 2 + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 2 + assert stats["pool_available"] == 2 + assert stats["queue_length"] == 0 + + ts = [Thread(target=worker, args=(i,)) for i in range(3)] + [t.start() for t in ts] + sleep(0.1) + stats = p.get_stats() + [t.join() for t in ts] + assert stats["pool_min"] == 2 + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 3 + assert stats["pool_available"] == 0 + assert stats["queue_length"] == 0 + + p.wait(2.0) + ts = [Thread(target=worker, args=(i,)) for i in range(7)] + [t.start() for t in ts] + sleep(0.1) + stats = p.get_stats() + [t.join() for t in ts] + assert stats["pool_min"] == 2 + assert stats["pool_max"] == 4 + assert stats["pool_size"] == 4 + assert stats["pool_available"] == 0 + assert stats["queue_length"] == 3 + + +@pytest.mark.slow +def test_stats_usage(dsn): + def worker(n): + try: + with p.connection(timeout=0.3) as conn: + conn.execute("select pg_sleep(0.2)") + except pool.PoolTimeout: + pass + + with pool.ConnectionPool(dsn, minconn=3) as p: + p.wait(2.0) + + ts = [Thread(target=worker, args=(i,)) for i in range(7)] + [t.start() for t in ts] + [t.join() for t in ts] + stats = p.get_stats() + assert stats["requests_num"] == 7 + assert stats["requests_queued"] == 4 + assert 850 <= stats["requests_wait_ms"] <= 950 + assert stats["requests_timeouts"] == 1 + assert 1150 <= stats["usage_ms"] <= 1250 + assert stats.get("returns_bad", 0) == 0 + + with p.connection() as conn: + conn.close() + p.wait() + stats = p.pop_stats() + assert stats["requests_num"] == 8 + assert stats["returns_bad"] == 1 + with p.connection(): + pass + assert p.get_stats()["requests_num"] == 1 + + +@pytest.mark.slow +def test_stats_connect(dsn, proxy, monkeypatch): + proxy.start() + delay_connection(monkeypatch, 0.2) + with pool.ConnectionPool(proxy.client_dsn, minconn=3) as p: + p.wait() + stats = p.get_stats() + assert stats["connections_num"] == 3 + assert stats.get("connections_errors", 0) == 0 + assert stats.get("connections_lost", 0) == 0 + assert 600 <= stats["connections_ms"] < 1200 + + proxy.stop() + p.check() + sleep(0.1) + stats = p.get_stats() + assert stats["connections_num"] > 3 + assert stats["connections_errors"] > 0 + assert stats["connections_lost"] == 3 + + def delay_connection(monkeypatch, sec): """ Return a _connect_gen function delayed by the amount of seconds