git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Add pool stats
author: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Wed, 10 Mar 2021 02:22:07 +0000 (03:22 +0100)
committer: Daniele Varrazzo <daniele.varrazzo@gmail.com>
Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
psycopg3/psycopg3/pool/base.py
psycopg3/psycopg3/pool/pool.py
tests/pool/test_pool.py

index c2d15ccc40362c44ac5e8c1447da9ad73db23c65..b4c94f72c53406b1406501c797c92f0c9a95aaff 100644 (file)
@@ -7,10 +7,15 @@ psycopg3 connection pool base class and functionalities.
 import logging
 from random import random
 from typing import Any, Callable, Deque, Dict, Generic, Optional
-from collections import deque
+from typing import TYPE_CHECKING
+from collections import Counter, deque
 
 from ..proto import ConnectionType
 
+if TYPE_CHECKING:
+    from typing import Counter as TCounter
+
+
 logger = logging.getLogger(__name__)
 
 
@@ -19,6 +24,23 @@ class BasePool(Generic[ConnectionType]):
     # Used to generate pool names
     _num_pool = 0
 
+    # Stats keys
+    _POOL_MIN = "pool_min"
+    _POOL_MAX = "pool_max"
+    _POOL_SIZE = "pool_size"
+    _POOL_AVAILABLE = "pool_available"
+    _QUEUE_LENGTH = "queue_length"
+    _USAGE_MS = "usage_ms"
+    _REQUESTS_NUM = "requests_num"
+    _REQUESTS_QUEUED = "requests_queued"
+    _REQUESTS_WAIT_MS = "requests_wait_ms"
+    _REQUESTS_TIMEOUTS = "requests_timeouts"
+    _RETURNS_BAD = "returns_bad"
+    _CONNECTIONS_NUM = "connections_num"
+    _CONNECTIONS_MS = "connections_ms"
+    _CONNECTIONS_ERRORS = "connections_errors"
+    _CONNECTIONS_LOST = "connections_lost"
+
     def __init__(
         self,
         conninfo: str = "",
@@ -62,6 +84,7 @@ class BasePool(Generic[ConnectionType]):
 
         self._nconns = minconn  # currently in the pool, out, being prepared
         self._pool: Deque[ConnectionType] = deque()
+        self._stats: "TCounter[str]" = Counter()
 
         # Min number of connections in the pool in a max_idle unit of time.
         # It is reset periodically by the ShrinkPool scheduled task.
@@ -93,6 +116,36 @@ class BasePool(Generic[ConnectionType]):
         """`!True` if the pool is closed."""
         return self._closed
 
+    def get_stats(self) -> Dict[str, int]:
+        """
+        Return current stats about the pool usage.
+        """
+        rv = dict(self._stats)
+        rv.update(self._get_measures())
+        return rv
+
+    def pop_stats(self) -> Dict[str, int]:
+        """
+        Return current stats about the pool usage.
+
+        After the call, all the counters are reset to zero.
+        """
+        stats, self._stats = self._stats, Counter()
+        rv = dict(stats)
+        rv.update(self._get_measures())
+        return rv
+
+    def _get_measures(self) -> Dict[str, int]:
+        """
+        Return immediate measures of the pool (not counters).
+        """
+        return {
+            self._POOL_MIN: self._minconn,
+            self._POOL_MAX: self._maxconn,
+            self._POOL_SIZE: self._nconns,
+            self._POOL_AVAILABLE: len(self._pool),
+        }
+
     @classmethod
     def _jitter(cls, value: float, min_pc: float, max_pc: float) -> float:
         """
index 65ad838572a0ba0b58de6ae483b22e89e33b3a6a..ddc954019a4cac28546ca84aeb66b65254016611 100644 (file)
@@ -10,7 +10,7 @@ from abc import ABC, abstractmethod
 from time import monotonic
 from queue import Queue, Empty
 from types import TracebackType
-from typing import Any, Callable, Deque, Iterator, List, Optional, Type
+from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Type
 from weakref import ref
 from contextlib import contextmanager
 from collections import deque
@@ -129,10 +129,13 @@ class ConnectionPool(BasePool[Connection]):
         replace it with a new one.
         """
         conn = self.getconn(timeout=timeout)
+        t0 = monotonic()
         try:
             with conn:
                 yield conn
         finally:
+            t1 = monotonic()
+            self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
             self.putconn(conn)
 
     def getconn(self, timeout: Optional[float] = None) -> Connection:
@@ -146,6 +149,7 @@ class ConnectionPool(BasePool[Connection]):
         you don't want a depleted pool.
         """
         logger.info("connection requested to %r", self.name)
+        self._stats[self._REQUESTS_NUM] += 1
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         with self._lock:
@@ -160,8 +164,10 @@ class ConnectionPool(BasePool[Connection]):
                     self._nconns_min = len(self._pool)
             else:
                 # No connection available: put the client in the waiting queue
+                t0 = monotonic()
                 pos = WaitingClient()
                 self._waiting.append(pos)
+                self._stats[self._REQUESTS_QUEUED] += 1
 
                 # If there is space for the pool to grow, let's do it
                 if self._nconns < self._maxconn:
@@ -176,7 +182,14 @@ class ConnectionPool(BasePool[Connection]):
         if pos:
             if timeout is None:
                 timeout = self.timeout
-            conn = pos.wait(timeout=timeout)
+            try:
+                conn = pos.wait(timeout=timeout)
+            except Exception:
+                self._stats[self._REQUESTS_TIMEOUTS] += 1
+                raise
+            finally:
+                t1 = monotonic()
+                self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
 
         # Tell the connection it belongs to a pool to avoid closing on __exit__
         # Note that this property shouldn't be set while the connection is in
@@ -313,6 +326,7 @@ class ConnectionPool(BasePool[Connection]):
             try:
                 conn.execute("select 1")
             except Exception:
+                self._stats[self._CONNECTIONS_LOST] += 1
                 logger.warning("discarding broken connection: %s", conn)
                 self.run_task(AddConnection(self))
             else:
@@ -372,7 +386,17 @@ class ConnectionPool(BasePool[Connection]):
 
     def _connect(self) -> Connection:
         """Return a new connection configured for the pool."""
-        conn = Connection.connect(self.conninfo, **self.kwargs)
+        self._stats[self._CONNECTIONS_NUM] += 1
+        t0 = monotonic()
+        try:
+            conn = Connection.connect(self.conninfo, **self.kwargs)
+        except Exception:
+            self._stats[self._CONNECTIONS_ERRORS] += 1
+            raise
+        else:
+            t1 = monotonic()
+            self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0))
+
         conn._pool = self
 
         if self._configure:
@@ -430,6 +454,7 @@ class ConnectionPool(BasePool[Connection]):
         """
         self._reset_connection(conn)
         if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+            self._stats[self._RETURNS_BAD] += 1
             # Connection no more in working state: create a new one.
             self.run_task(AddConnection(self))
             logger.warning("discarding closed connection: %s", conn)
@@ -543,6 +568,11 @@ class ConnectionPool(BasePool[Connection]):
             )
             to_close.close()
 
+    def _get_measures(self) -> Dict[str, int]:
+        rv = super()._get_measures()
+        rv[self._QUEUE_LENGTH] = len(self._waiting)
+        return rv
+
 
 class WaitingClient:
     """A position in a queue for a client waiting for a connection."""
index 212776d471ce19b216510d2aa4b3edc2d2b03bbf..369d25a45a6cfcea1c82853685e7557dd6b485b8 100644 (file)
@@ -817,6 +817,101 @@ def test_async_pool_not_supported(dsn):
         pool.AsyncConnectionPool(dsn)
 
 
+@pytest.mark.slow
+def test_stats_measures(dsn):
+    def worker(n):
+        with p.connection() as conn:
+            conn.execute("select pg_sleep(0.2)")
+
+    with pool.ConnectionPool(dsn, minconn=2, maxconn=4) as p:
+        p.wait(2.0)
+
+        stats = p.get_stats()
+        assert stats["pool_min"] == 2
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 2
+        assert stats["pool_available"] == 2
+        assert stats["queue_length"] == 0
+
+        ts = [Thread(target=worker, args=(i,)) for i in range(3)]
+        [t.start() for t in ts]
+        sleep(0.1)
+        stats = p.get_stats()
+        [t.join() for t in ts]
+        assert stats["pool_min"] == 2
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 3
+        assert stats["pool_available"] == 0
+        assert stats["queue_length"] == 0
+
+        p.wait(2.0)
+        ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+        [t.start() for t in ts]
+        sleep(0.1)
+        stats = p.get_stats()
+        [t.join() for t in ts]
+        assert stats["pool_min"] == 2
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 4
+        assert stats["pool_available"] == 0
+        assert stats["queue_length"] == 3
+
+
+@pytest.mark.slow
+def test_stats_usage(dsn):
+    def worker(n):
+        try:
+            with p.connection(timeout=0.3) as conn:
+                conn.execute("select pg_sleep(0.2)")
+        except pool.PoolTimeout:
+            pass
+
+    with pool.ConnectionPool(dsn, minconn=3) as p:
+        p.wait(2.0)
+
+        ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+        [t.start() for t in ts]
+        [t.join() for t in ts]
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_timeouts"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        with p.connection() as conn:
+            conn.close()
+        p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
+
+
+@pytest.mark.slow
+def test_stats_connect(dsn, proxy, monkeypatch):
+    proxy.start()
+    delay_connection(monkeypatch, 0.2)
+    with pool.ConnectionPool(proxy.client_dsn, minconn=3) as p:
+        p.wait()
+        stats = p.get_stats()
+        assert stats["connections_num"] == 3
+        assert stats.get("connections_errors", 0) == 0
+        assert stats.get("connections_lost", 0) == 0
+        assert 600 <= stats["connections_ms"] < 1200
+
+        proxy.stop()
+        p.check()
+        sleep(0.1)
+        stats = p.get_stats()
+        assert stats["connections_num"] > 3
+        assert stats["connections_errors"] > 0
+        assert stats["connections_lost"] == 3
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds