from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, Awaitable, Callable, Deque
-from typing import List, Optional, Type
+from typing import Dict, List, Optional, Type
from weakref import ref
from collections import deque
# remained unused.
self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
- def run_task(self, task: "MaintenanceTask") -> None:
- """Run a maintenance task in a worker thread."""
- self._tasks.put_nowait(task)
-
- async def schedule_task(
- self, task: "MaintenanceTask", delay: float
- ) -> None:
- """Run a maintenance task in a worker thread in the future."""
- await self._sched.enter(delay, task.tick)
-
- @classmethod
- async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None:
- """Runner to execute pending maintenance task.
-
- The function is designed to run as a separate thread.
-
- Block on the queue *q*, run a task received. Finish running if a
- StopWorker is received.
- """
- while True:
- task = await q.get()
-
- if isinstance(task, StopWorker):
- logger.debug("terminating working task")
- return
-
- # Run the task. Make sure don't die in the attempt.
- try:
- await task.run()
- except Exception as ex:
- logger.warning(
- "task run %s failed: %s: %s",
- task,
- ex.__class__.__name__,
- ex,
- )
-
async def wait(self, timeout: float = 30.0) -> None:
"""
Wait for the pool to be full after init.
replace it with a new one.
"""
conn = await self.getconn(timeout=timeout)
+ t0 = monotonic()
try:
async with conn:
yield conn
finally:
+ t1 = monotonic()
+ self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
await self.putconn(conn)
async def getconn(
you don't want a depleted pool.
"""
logger.info("connection requested to %r", self.name)
+ self._stats[self._REQUESTS_NUM] += 1
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
async with self._lock:
self._nconns_min = len(self._pool)
else:
# No connection available: put the client in the waiting queue
+ t0 = monotonic()
pos = AsyncClient()
self._waiting.append(pos)
+ self._stats[self._REQUESTS_QUEUED] += 1
# If there is space for the pool to grow, let's do it
if self._nconns < self._maxconn:
if pos:
if timeout is None:
timeout = self.timeout
- conn = await pos.wait(timeout=timeout)
+ try:
+ conn = await pos.wait(timeout=timeout)
+ except Exception:
+ self._stats[self._REQUESTS_TIMEOUTS] += 1
+ raise
+ finally:
+ t1 = monotonic()
+ self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
# Tell the connection it belongs to a pool to avoid closing on __exit__
# Note that this property shouldn't be set while the connection is in
try:
await conn.execute("select 1")
except Exception:
+ self._stats[self._CONNECTIONS_LOST] += 1
logger.warning("discarding broken connection: %s", conn)
self.run_task(AddConnection(self))
else:
"""
self._reconnect_failed(self)
+ def run_task(self, task: "MaintenanceTask") -> None:
+ """Run a maintenance task in a worker thread."""
+ self._tasks.put_nowait(task)
+
+ async def schedule_task(
+ self, task: "MaintenanceTask", delay: float
+ ) -> None:
+ """Run a maintenance task in a worker thread in the future."""
+ await self._sched.enter(delay, task.tick)
+
+ @classmethod
+ async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None:
+ """Runner to execute pending maintenance task.
+
+ The function is designed to run as a separate thread.
+
+ Block on the queue *q*, run a task received. Finish running if a
+ StopWorker is received.
+ """
+ while True:
+ task = await q.get()
+
+ if isinstance(task, StopWorker):
+ logger.debug("terminating working task")
+ return
+
+ # Run the task. Make sure don't die in the attempt.
+ try:
+ await task.run()
+ except Exception as ex:
+ logger.warning(
+ "task run %s failed: %s: %s",
+ task,
+ ex.__class__.__name__,
+ ex,
+ )
+
async def _connect(self) -> AsyncConnection:
"""Return a new connection configured for the pool."""
- conn = await self.connection_class.connect(
- self.conninfo, **self.kwargs
- )
+ self._stats[self._CONNECTIONS_NUM] += 1
+ t0 = monotonic()
+ try:
+ conn = await self.connection_class.connect(
+ self.conninfo, **self.kwargs
+ )
+ except Exception:
+ self._stats[self._CONNECTIONS_ERRORS] += 1
+ raise
+ else:
+ t1 = monotonic()
+ self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0))
+
conn._pool = self
if self._configure:
"""
await self._reset_connection(conn)
if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
# Connection no more in working state: create a new one.
self.run_task(AddConnection(self))
logger.warning("discarding closed connection: %s", conn)
)
await to_close.close()
+ def _get_measures(self) -> Dict[str, int]:
+ rv = super()._get_measures()
+ rv[self._QUEUE_LENGTH] = len(self._waiting)
+ return rv
+
class AsyncClient:
"""A position in a queue for a client waiting for a connection."""
assert pid not in pids2
+@pytest.mark.slow
+async def test_stats_measures(dsn):
+ async def worker(n):
+ async with p.connection() as conn:
+ await conn.execute("select pg_sleep(0.2)")
+
+ async with pool.AsyncConnectionPool(dsn, minconn=2, maxconn=4) as p:
+ await p.wait(2.0)
+
+ stats = p.get_stats()
+ assert stats["pool_min"] == 2
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 2
+ assert stats["pool_available"] == 2
+ assert stats["queue_length"] == 0
+
+ ts = [create_task(worker(i)) for i in range(3)]
+ await asyncio.sleep(0.1)
+ stats = p.get_stats()
+ await asyncio.gather(*ts)
+ assert stats["pool_min"] == 2
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 3
+ assert stats["pool_available"] == 0
+ assert stats["queue_length"] == 0
+
+ await p.wait(2.0)
+ ts = [create_task(worker(i)) for i in range(7)]
+ await asyncio.sleep(0.1)
+ stats = p.get_stats()
+ await asyncio.gather(*ts)
+ assert stats["pool_min"] == 2
+ assert stats["pool_max"] == 4
+ assert stats["pool_size"] == 4
+ assert stats["pool_available"] == 0
+ assert stats["queue_length"] == 3
+
+
+@pytest.mark.slow
+async def test_stats_usage(dsn):
+ async def worker(n):
+ try:
+ async with p.connection(timeout=0.3) as conn:
+ await conn.execute("select pg_sleep(0.2)")
+ except pool.PoolTimeout:
+ pass
+
+ async with pool.AsyncConnectionPool(dsn, minconn=3) as p:
+ await p.wait(2.0)
+
+ ts = [create_task(worker(i)) for i in range(7)]
+ await asyncio.gather(*ts)
+ stats = p.get_stats()
+ assert stats["requests_num"] == 7
+ assert stats["requests_queued"] == 4
+ assert 850 <= stats["requests_wait_ms"] <= 950
+ assert stats["requests_timeouts"] == 1
+ assert 1150 <= stats["usage_ms"] <= 1250
+ assert stats.get("returns_bad", 0) == 0
+
+ async with p.connection() as conn:
+ await conn.close()
+ await p.wait()
+ stats = p.pop_stats()
+ assert stats["requests_num"] == 8
+ assert stats["returns_bad"] == 1
+ async with p.connection():
+ pass
+ assert p.get_stats()["requests_num"] == 1
+
+
+@pytest.mark.slow
+async def test_stats_connect(dsn, proxy, monkeypatch):
+ proxy.start()
+ delay_connection(monkeypatch, 0.2)
+ async with pool.AsyncConnectionPool(proxy.client_dsn, minconn=3) as p:
+ await p.wait()
+ stats = p.get_stats()
+ assert stats["connections_num"] == 3
+ assert stats.get("connections_errors", 0) == 0
+ assert stats.get("connections_lost", 0) == 0
+ assert 600 <= stats["connections_ms"] < 1200
+
+ proxy.stop()
+ await p.check()
+ await asyncio.sleep(0.1)
+ stats = p.get_stats()
+ assert stats["connections_num"] > 3
+ assert stats["connections_errors"] > 0
+ assert stats["connections_lost"] == 3
+
+
def delay_connection(monkeypatch, sec):
"""
Return a _connect_gen function delayed by the amount of seconds