import logging
from random import random
from typing import Any, Callable, Deque, Dict, Generic, Optional
-from collections import deque
+from typing import TYPE_CHECKING
+from collections import Counter, deque
from ..proto import ConnectionType
+if TYPE_CHECKING:
+ from typing import Counter as TCounter
+
+
logger = logging.getLogger(__name__)
# Used to generate pool names
_num_pool = 0
+ # Stats keys
+ _POOL_MIN = "pool_min"
+ _POOL_MAX = "pool_max"
+ _POOL_SIZE = "pool_size"
+ _POOL_AVAILABLE = "pool_available"
+ _QUEUE_LENGTH = "queue_length"
+ _USAGE_MS = "usage_ms"
+ _REQUESTS_NUM = "requests_num"
+ _REQUESTS_QUEUED = "requests_queued"
+ _REQUESTS_WAIT_MS = "requests_wait_ms"
+ _REQUESTS_TIMEOUTS = "requests_timeouts"
+ _RETURNS_BAD = "returns_bad"
+ _CONNECTIONS_NUM = "connections_num"
+ _CONNECTIONS_MS = "connections_ms"
+ _CONNECTIONS_ERRORS = "connections_errors"
+ _CONNECTIONS_LOST = "connections_lost"
+
def __init__(
self,
conninfo: str = "",
self._nconns = minconn # currently in the pool, out, being prepared
self._pool: Deque[ConnectionType] = deque()
+ self._stats: "TCounter[str]" = Counter()
# Min number of connections in the pool in a max_idle unit of time.
# It is reset periodically by the ShrinkPool scheduled task.
"""`!True` if the pool is closed."""
return self._closed
+ def get_stats(self) -> Dict[str, int]:
+ """
+ Return current stats about the pool usage.
+ """
+ # Snapshot: cumulative counters (Counter entries) merged with the
+ # instantaneous gauges from _get_measures(); counters are NOT reset.
+ rv = dict(self._stats)
+ rv.update(self._get_measures())
+ return rv
+
+ def pop_stats(self) -> Dict[str, int]:
+ """
+ Return current stats about the pool usage.
+
+ After the call, all the counters are reset to zero.
+ """
+ # Swap in a fresh Counter so later increments start from zero.
+ # NOTE(review): the swap is a plain tuple assignment, not guarded by
+ # self._lock — assumes a concurrent increment lost across the swap is
+ # acceptable for stats; confirm this is intentional.
+ stats, self._stats = self._stats, Counter()
+ rv = dict(stats)
+ rv.update(self._get_measures())
+ return rv
+
+ def _get_measures(self) -> Dict[str, int]:
+ """
+ Return immediate measures of the pool (not counters).
+ """
+ # Gauges sampled at call time: configured min/max, connections
+ # currently managed (in pool, lent out, or being prepared), and
+ # connections idle in the pool right now.
+ return {
+ self._POOL_MIN: self._minconn,
+ self._POOL_MAX: self._maxconn,
+ self._POOL_SIZE: self._nconns,
+ self._POOL_AVAILABLE: len(self._pool),
+ }
+
@classmethod
def _jitter(cls, value: float, min_pc: float, max_pc: float) -> float:
"""
from time import monotonic
from queue import Queue, Empty
from types import TracebackType
-from typing import Any, Callable, Deque, Iterator, List, Optional, Type
+from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Type
from weakref import ref
from contextlib import contextmanager
from collections import deque
replace it with a new one.
"""
conn = self.getconn(timeout=timeout)
+ t0 = monotonic()
try:
with conn:
yield conn
finally:
+ t1 = monotonic()
+ self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
self.putconn(conn)
def getconn(self, timeout: Optional[float] = None) -> Connection:
you don't want a depleted pool.
"""
logger.info("connection requested to %r", self.name)
+ self._stats[self._REQUESTS_NUM] += 1
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
with self._lock:
self._nconns_min = len(self._pool)
else:
# No connection available: put the client in the waiting queue
+ t0 = monotonic()
pos = WaitingClient()
self._waiting.append(pos)
+ self._stats[self._REQUESTS_QUEUED] += 1
# If there is space for the pool to grow, let's do it
if self._nconns < self._maxconn:
if pos:
if timeout is None:
timeout = self.timeout
- conn = pos.wait(timeout=timeout)
+ try:
+ conn = pos.wait(timeout=timeout)
+ except Exception:
+ self._stats[self._REQUESTS_TIMEOUTS] += 1
+ raise
+ finally:
+ t1 = monotonic()
+ self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
# Tell the connection it belongs to a pool to avoid closing on __exit__
# Note that this property shouldn't be set while the connection is in
try:
conn.execute("select 1")
except Exception:
+ self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
self.run_task(AddConnection(self))
else:
def _connect(self) -> Connection:
"""Return a new connection configured for the pool."""
- conn = Connection.connect(self.conninfo, **self.kwargs)
+ self._stats[self._CONNECTIONS_NUM] += 1
+ t0 = monotonic()
+ try:
+ conn = Connection.connect(self.conninfo, **self.kwargs)
+ except Exception:
+ self._stats[self._CONNECTIONS_ERRORS] += 1
+ raise
+ else:
+ t1 = monotonic()
+ self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0))
+
conn._pool = self
if self._configure:
"""
self._reset_connection(conn)
if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
# Connection no more in working state: create a new one.
self.run_task(AddConnection(self))
logger.warning("discarding closed connection: %s", conn)
)
to_close.close()
+ def _get_measures(self) -> Dict[str, int]:
+ # Extend the base gauges with the number of clients currently
+ # waiting for a connection (the "queue_length" stat).
+ rv = super()._get_measures()
+ rv[self._QUEUE_LENGTH] = len(self._waiting)
+ return rv
+
class WaitingClient:
"""A position in a queue for a client waiting for a connection."""
pool.AsyncConnectionPool(dsn)
+@pytest.mark.slow
+def test_stats_measures(dsn):
+ # Verify the gauge (non-counter) stats under three load levels:
+ # idle, moderately busy (pool growing), and saturated (clients queued).
+ def worker(n):
+ with p.connection() as conn:
+ conn.execute("select pg_sleep(0.2)")
+
+ with pool.ConnectionPool(dsn, minconn=2, maxconn=4) as p:
+ p.wait(2.0)
+
+ # Idle pool: minconn connections created, all available, no queue.
+ stats = p.get_stats()
+ assert stats["pool_min"] == 2
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 2
+ assert stats["pool_available"] == 2
+ assert stats["queue_length"] == 0
+
+ # 3 workers on a 2-connection pool: sampled mid-flight (sleep 0.1 while
+ # each worker holds a connection for 0.2s) the pool has grown to 3,
+ # with nothing idle but nobody queued yet.
+ ts = [Thread(target=worker, args=(i,)) for i in range(3)]
+ [t.start() for t in ts]
+ sleep(0.1)
+ stats = p.get_stats()
+ [t.join() for t in ts]
+ assert stats["pool_min"] == 2
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 3
+ assert stats["pool_available"] == 0
+ assert stats["queue_length"] == 3
+
+ # 7 workers vs maxconn=4: pool saturated at its max, 3 clients waiting.
+ p.wait(2.0)
+ ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+ [t.start() for t in ts]
+ sleep(0.1)
+ stats = p.get_stats()
+ [t.join() for t in ts]
+ assert stats["pool_min"] == 2
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 4
+ assert stats["pool_available"] == 0
+ assert stats["queue_length"] == 3
+
+
+@pytest.mark.slow
+def test_stats_usage(dsn):
+ # Verify the counter stats: requests, queuing, wait time, timeouts,
+ # usage time, bad returns — and that pop_stats() resets them.
+ def worker(n):
+ try:
+ with p.connection(timeout=0.3) as conn:
+ conn.execute("select pg_sleep(0.2)")
+ except pool.PoolTimeout:
+ pass
+
+ with pool.ConnectionPool(dsn, minconn=3) as p:
+ p.wait(2.0)
+
+ # 7 workers on 3 connections, 0.2s use each, 0.3s timeout: 3 served
+ # immediately, 4 queued, of which one times out (wait_ms and usage_ms
+ # bounds follow from that schedule, with 50ms slack each way).
+ ts = [Thread(target=worker, args=(i,)) for i in range(7)]
+ [t.start() for t in ts]
+ [t.join() for t in ts]
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_timeouts"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ # Counter keys never incremented are absent from the dict, hence .get().
+ assert stats.get("returns_bad", 0) == 0
+
+ # Closing a connection inside the block makes putconn count it as a
+ # bad return; pop_stats() then zeroes the counters, so the following
+ # request starts a fresh count.
+ with p.connection() as conn:
+ conn.close()
+ p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
+
+
+@pytest.mark.slow
+def test_stats_connect(dsn, proxy, monkeypatch):
+ # Verify the connection-level counters, using a proxy to inject a
+ # 0.2s connect delay and then to sever the connections.
+ proxy.start()
+ delay_connection(monkeypatch, 0.2)
+ with pool.ConnectionPool(proxy.client_dsn, minconn=3) as p:
+ p.wait()
+ # 3 connections at >=0.2s each; upper bound allows for serial or
+ # partially parallel connection attempts.
+ stats = p.get_stats()
+ assert stats["connections_num"] == 3
+ assert stats.get("connections_errors", 0) == 0
+ assert stats.get("connections_lost", 0) == 0
+ assert 600 <= stats["connections_ms"] < 1200
+
+ # Kill the proxy: check() finds all 3 connections broken (lost) and
+ # the reconnection attempts fail, raising the attempt/error counters.
+ proxy.stop()
+ p.check()
+ sleep(0.1)
+ stats = p.get_stats()
+ assert stats["connections_num"] > 3
+ assert stats["connections_errors"] > 0
+ assert stats["connections_lost"] == 3
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds