Add async pool stats
author     Daniele Varrazzo <daniele.varrazzo@gmail.com>
           Wed, 10 Mar 2021 18:34:00 +0000 (19:34 +0100)
committer  Daniele Varrazzo <daniele.varrazzo@gmail.com>
           Fri, 12 Mar 2021 04:07:25 +0000 (05:07 +0100)
psycopg3/psycopg3/pool/async_pool.py
tests/pool/test_pool_async.py

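The diff below adds a set of usage counters to AsyncConnectionPool, exposed through the get_stats() and pop_stats() methods exercised by the new tests. As a rough usage sketch (not part of this commit; key names and the import path are taken from the test suite and repository layout), an application could periodically drain the counters and forward them to a metrics or logging system:

import asyncio
from psycopg3 import pool  # import path assumed from the repository layout

async def report_stats(p: "pool.AsyncConnectionPool", every: float = 60.0) -> None:
    # Hypothetical monitoring helper: drain the cumulative counters at a
    # fixed interval. pop_stats() returns the counters and resets them,
    # so each report covers only the last interval.
    while True:
        await asyncio.sleep(every)
        stats = p.pop_stats()
        print(
            "pool %r: requests=%s queued=%s wait_ms=%s usage_ms=%s"
            % (
                p.name,
                stats.get("requests_num", 0),
                stats.get("requests_queued", 0),
                stats.get("requests_wait_ms", 0),
                stats.get("usage_ms", 0),
            )
        )
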
diff --git a/psycopg3/psycopg3/pool/async_pool.py b/psycopg3/psycopg3/pool/async_pool.py
index dd729732f645fb613b4f5a9ed8e361ea0de4baf0..b1a78eb2a07e1d43b047ab06873a0e08ab7ac319 100644
--- a/psycopg3/psycopg3/pool/async_pool.py
+++ b/psycopg3/psycopg3/pool/async_pool.py
@@ -11,7 +11,7 @@ from abc import ABC, abstractmethod
 from time import monotonic
 from types import TracebackType
 from typing import Any, AsyncIterator, Awaitable, Callable, Deque
-from typing import List, Optional, Type
+from typing import Dict, List, Optional, Type
 from weakref import ref
 from collections import deque
 
@@ -79,43 +79,6 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
         # remained unused.
         self.run_task(Schedule(self, ShrinkPool(self), self.max_idle))
 
-    def run_task(self, task: "MaintenanceTask") -> None:
-        """Run a maintenance task in a worker thread."""
-        self._tasks.put_nowait(task)
-
-    async def schedule_task(
-        self, task: "MaintenanceTask", delay: float
-    ) -> None:
-        """Run a maintenance task in a worker thread in the future."""
-        await self._sched.enter(delay, task.tick)
-
-    @classmethod
-    async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None:
-        """Runner to execute pending maintenance task.
-
-        The function is designed to run as a separate thread.
-
-        Block on the queue *q*, run a task received. Finish running if a
-        StopWorker is received.
-        """
-        while True:
-            task = await q.get()
-
-            if isinstance(task, StopWorker):
-                logger.debug("terminating working task")
-                return
-
-            # Run the task. Make sure don't die in the attempt.
-            try:
-                await task.run()
-            except Exception as ex:
-                logger.warning(
-                    "task run %s failed: %s: %s",
-                    task,
-                    ex.__class__.__name__,
-                    ex,
-                )
-
     async def wait(self, timeout: float = 30.0) -> None:
         """
         Wait for the pool to be full after init.
@@ -156,10 +119,13 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
         replace it with a new one.
         """
         conn = await self.getconn(timeout=timeout)
+        t0 = monotonic()
         try:
             async with conn:
                 yield conn
         finally:
+            t1 = monotonic()
+            self._stats[self._USAGE_MS] += int(1000.0 * (t1 - t0))
             await self.putconn(conn)
 
     async def getconn(
@@ -175,6 +141,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
         you don't want a depleted pool.
         """
         logger.info("connection requested to %r", self.name)
+        self._stats[self._REQUESTS_NUM] += 1
         # Critical section: decide here if there's a connection ready
         # or if the client needs to wait.
         async with self._lock:
@@ -189,8 +156,10 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
                     self._nconns_min = len(self._pool)
             else:
                 # No connection available: put the client in the waiting queue
+                t0 = monotonic()
                 pos = AsyncClient()
                 self._waiting.append(pos)
+                self._stats[self._REQUESTS_QUEUED] += 1
 
                 # If there is space for the pool to grow, let's do it
                 if self._nconns < self._maxconn:
@@ -205,7 +174,14 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
         if pos:
             if timeout is None:
                 timeout = self.timeout
-            conn = await pos.wait(timeout=timeout)
+            try:
+                conn = await pos.wait(timeout=timeout)
+            except Exception:
+                self._stats[self._REQUESTS_TIMEOUTS] += 1
+                raise
+            finally:
+                t1 = monotonic()
+                self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
 
         # Tell the connection it belongs to a pool to avoid closing on __exit__
         # Note that this property shouldn't be set while the connection is in
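
The two hunks above show the pattern used for all the elapsed-time counters: the measured call is bracketed with monotonic() and the difference is accumulated into self._stats as integer milliseconds (usage_ms for the time a connection is checked out, requests_wait_ms for the time spent in the waiting queue; connections_ms below follows the same approach). A minimal, self-contained sketch of that pattern, using a hypothetical timed() helper instead of the inline code of the commit:

from contextlib import contextmanager
from time import monotonic
from typing import Dict, Iterator

@contextmanager
def timed(stats: Dict[str, int], key: str) -> Iterator[None]:
    # Hypothetical helper mirroring the inline pattern in the diff:
    # bracket the operation with monotonic() and accumulate integer ms,
    # counting the elapsed time even when the operation raises.
    t0 = monotonic()
    try:
        yield
    finally:
        stats[key] += int(1000.0 * (monotonic() - t0))

# Example: accumulate the time a connection stays checked out.
stats = {"usage_ms": 0}
with timed(stats, "usage_ms"):
    pass  # work with the connection here
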
@@ -343,6 +319,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
             try:
                 await conn.execute("select 1")
             except Exception:
+                self._stats[self._CONNECTIONS_LOST] += 1
                 logger.warning("discarding broken connection: %s", conn)
                 self.run_task(AddConnection(self))
             else:
@@ -354,11 +331,58 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
         """
         self._reconnect_failed(self)
 
+    def run_task(self, task: "MaintenanceTask") -> None:
+        """Run a maintenance task in a worker thread."""
+        self._tasks.put_nowait(task)
+
+    async def schedule_task(
+        self, task: "MaintenanceTask", delay: float
+    ) -> None:
+        """Run a maintenance task in a worker thread in the future."""
+        await self._sched.enter(delay, task.tick)
+
+    @classmethod
+    async def worker(cls, q: "asyncio.Queue[MaintenanceTask]") -> None:
+        """Runner to execute pending maintenance task.
+
+        The function is designed to run as a separate thread.
+
+        Block on the queue *q*, run a task received. Finish running if a
+        StopWorker is received.
+        """
+        while True:
+            task = await q.get()
+
+            if isinstance(task, StopWorker):
+                logger.debug("terminating working task")
+                return
+
+            # Run the task. Make sure don't die in the attempt.
+            try:
+                await task.run()
+            except Exception as ex:
+                logger.warning(
+                    "task run %s failed: %s: %s",
+                    task,
+                    ex.__class__.__name__,
+                    ex,
+                )
+
     async def _connect(self) -> AsyncConnection:
         """Return a new connection configured for the pool."""
-        conn = await self.connection_class.connect(
-            self.conninfo, **self.kwargs
-        )
+        self._stats[self._CONNECTIONS_NUM] += 1
+        t0 = monotonic()
+        try:
+            conn = await self.connection_class.connect(
+                self.conninfo, **self.kwargs
+            )
+        except Exception:
+            self._stats[self._CONNECTIONS_ERRORS] += 1
+            raise
+        else:
+            t1 = monotonic()
+            self._stats[self._CONNECTIONS_MS] += int(1000.0 * (t1 - t0))
+
         conn._pool = self
 
         if self._configure:
@@ -420,6 +444,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
         """
         await self._reset_connection(conn)
         if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+            self._stats[self._RETURNS_BAD] += 1
             # Connection no more in working state: create a new one.
             self.run_task(AddConnection(self))
             logger.warning("discarding closed connection: %s", conn)
@@ -533,6 +558,11 @@ class AsyncConnectionPool(BasePool[AsyncConnection]):
             )
             await to_close.close()
 
+    def _get_measures(self) -> Dict[str, int]:
+        rv = super()._get_measures()
+        rv[self._QUEUE_LENGTH] = len(self._waiting)
+        return rv
+
 
 class AsyncClient:
     """A position in a queue for a client waiting for a connection."""
diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py
index 1aab09cc41ea8033ccf70264ac184322125ffb59..95404af551ae68a1e3a2b1e3f7680be803bf3180 100644
--- a/tests/pool/test_pool_async.py
+++ b/tests/pool/test_pool_async.py
@@ -828,6 +828,98 @@ async def test_check(dsn, caplog):
         assert pid not in pids2
 
 
+@pytest.mark.slow
+async def test_stats_measures(dsn):
+    async def worker(n):
+        async with p.connection() as conn:
+            await conn.execute("select pg_sleep(0.2)")
+
+    async with pool.AsyncConnectionPool(dsn, minconn=2, maxconn=4) as p:
+        await p.wait(2.0)
+
+        stats = p.get_stats()
+        assert stats["pool_min"] == 2
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 2
+        assert stats["pool_available"] == 2
+        assert stats["queue_length"] == 0
+
+        ts = [create_task(worker(i)) for i in range(3)]
+        await asyncio.sleep(0.1)
+        stats = p.get_stats()
+        await asyncio.gather(*ts)
+        assert stats["pool_min"] == 2
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 3
+        assert stats["pool_available"] == 0
+        assert stats["queue_length"] == 0
+
+        await p.wait(2.0)
+        ts = [create_task(worker(i)) for i in range(7)]
+        await asyncio.sleep(0.1)
+        stats = p.get_stats()
+        await asyncio.gather(*ts)
+        assert stats["pool_min"] == 2
+        assert stats["pool_max"] == 4
+        assert stats["pool_size"] == 4
+        assert stats["pool_available"] == 0
+        assert stats["queue_length"] == 3
+
+
+@pytest.mark.slow
+async def test_stats_usage(dsn):
+    async def worker(n):
+        try:
+            async with p.connection(timeout=0.3) as conn:
+                await conn.execute("select pg_sleep(0.2)")
+        except pool.PoolTimeout:
+            pass
+
+    async with pool.AsyncConnectionPool(dsn, minconn=3) as p:
+        await p.wait(2.0)
+
+        ts = [create_task(worker(i)) for i in range(7)]
+        await asyncio.gather(*ts)
+        stats = p.get_stats()
+        assert stats["requests_num"] == 7
+        assert stats["requests_queued"] == 4
+        assert 850 <= stats["requests_wait_ms"] <= 950
+        assert stats["requests_timeouts"] == 1
+        assert 1150 <= stats["usage_ms"] <= 1250
+        assert stats.get("returns_bad", 0) == 0
+
+        async with p.connection() as conn:
+            await conn.close()
+        await p.wait()
+        stats = p.pop_stats()
+        assert stats["requests_num"] == 8
+        assert stats["returns_bad"] == 1
+        async with p.connection():
+            pass
+        assert p.get_stats()["requests_num"] == 1
+
+
+@pytest.mark.slow
+async def test_stats_connect(dsn, proxy, monkeypatch):
+    proxy.start()
+    delay_connection(monkeypatch, 0.2)
+    async with pool.AsyncConnectionPool(proxy.client_dsn, minconn=3) as p:
+        await p.wait()
+        stats = p.get_stats()
+        assert stats["connections_num"] == 3
+        assert stats.get("connections_errors", 0) == 0
+        assert stats.get("connections_lost", 0) == 0
+        assert 600 <= stats["connections_ms"] < 1200
+
+        proxy.stop()
+        await p.check()
+        await asyncio.sleep(0.1)
+        stats = p.get_stats()
+        assert stats["connections_num"] > 3
+        assert stats["connections_errors"] > 0
+        assert stats["connections_lost"] == 3
+
+
 def delay_connection(monkeypatch, sec):
     """
     Return a _connect_gen function delayed by the amount of seconds