]> git.ipfire.org Git - thirdparty/psycopg.git/commitdiff
Refactor pools to reduce code duplication
authorDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sat, 8 Jan 2022 17:35:41 +0000 (18:35 +0100)
committerDaniele Varrazzo <daniele.varrazzo@gmail.com>
Sun, 9 Jan 2022 17:48:04 +0000 (18:48 +0100)
Use common getconn/putconn implementations and implement only the
different behaviours in subclasses.

psycopg_pool/psycopg_pool/base.py
psycopg_pool/psycopg_pool/null_pool.py
psycopg_pool/psycopg_pool/null_pool_async.py
psycopg_pool/psycopg_pool/pool.py
psycopg_pool/psycopg_pool/pool_async.py

index 1e3187b5510d8d5f6c5be7c5a791e89ecb6f3cef..a5ffab9295dac7ff82592666edf5e57a6e86bcc6 100644 (file)
@@ -8,10 +8,10 @@ from time import monotonic
 from random import random
 from typing import Any, Callable, Dict, Generic, Optional, Tuple
 
-from psycopg.abc import ConnectionType
 from psycopg import errors as e
-from .errors import PoolClosed
+from psycopg.abc import ConnectionType
 
+from .errors import PoolClosed
 from ._compat import Counter, Deque
 
 
index 479799b28f7d59d74904159f7adc19fa06686535..62e1b3f859adab2fe6f91ec67f5efef2c58762b4 100644 (file)
@@ -6,14 +6,12 @@ Psycopg null connection pools
 
 import logging
 import threading
-from time import monotonic
 from typing import Any, Optional, Tuple
 
 from psycopg import Connection
 from psycopg.pq import TransactionStatus
 
-from .pool import ConnectionPool, WaitingClient
-from .pool import AddConnection, ReturnConnection
+from .pool import ConnectionPool, AddConnection
 from .errors import PoolTimeout, TooManyRequests
 from ._compat import ConnectionTimeout
 
@@ -46,6 +44,10 @@ class _BaseNullConnectionPool:
         # or to grow/shrink.
         return
 
+    def _maybe_grow_pool(self) -> None:
+        # null pools don't grow
+        pass
+
 
 class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool):
     def wait(self, timeout: float = 30.0) -> None:
@@ -79,79 +81,37 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool):
 
         logger.info("pool %r is ready to use", self.name)
 
-    def getconn(self, timeout: Optional[float] = None) -> Connection[Any]:
-        logger.info("connection requested from %r", self.name)
-        self._stats[self._REQUESTS_NUM] += 1
-
-        # Critical section: decide here if there's a connection ready
-        # or if the client needs to wait.
-        with self._lock:
-            self._check_open_getconn()
-
-            pos: Optional[WaitingClient] = None
-            if self.max_size == 0 or self._nconns < self.max_size:
-                # Create a new connection for the client
-                try:
-                    conn = self._connect(timeout=timeout)
-                except ConnectionTimeout as ex:
-                    raise PoolTimeout(str(ex)) from None
-                self._nconns += 1
-            else:
-                if self.max_waiting and len(self._waiting) >= self.max_waiting:
-                    self._stats[self._REQUESTS_ERRORS] += 1
-                    raise TooManyRequests(
-                        f"the pool {self.name!r} has aleady"
-                        f" {len(self._waiting)} requests waiting"
-                    )
-
-                # No connection available: put the client in the waiting queue
-                t0 = monotonic()
-                pos = WaitingClient()
-                self._waiting.append(pos)
-                self._stats[self._REQUESTS_QUEUED] += 1
-
-        # If we are in the waiting queue, wait to be assigned a connection
-        # (outside the critical section, so only the waiting client is locked)
-        if pos:
-            if timeout is None:
-                timeout = self.timeout
+    def _get_ready_connection(
+        self, timeout: Optional[float]
+    ) -> Optional[Connection[Any]]:
+        conn: Optional[Connection[Any]] = None
+        if self.max_size == 0 or self._nconns < self.max_size:
+            # Create a new connection for the client
             try:
-                conn = pos.wait(timeout=timeout)
-            except Exception:
-                self._stats[self._REQUESTS_ERRORS] += 1
-                raise
-            finally:
-                t1 = monotonic()
-                self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
-
-        # Tell the connection it belongs to a pool to avoid closing on __exit__
-        conn._pool = self
-        logger.info("connection given by %r", self.name)
+                conn = self._connect(timeout=timeout)
+            except ConnectionTimeout as ex:
+                raise PoolTimeout(str(ex)) from None
+            self._nconns += 1
+
+        elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+            self._stats[self._REQUESTS_ERRORS] += 1
+            raise TooManyRequests(
+                f"the pool {self.name!r} has aleady"
+                f" {len(self._waiting)} requests waiting"
+            )
         return conn
 
-    def putconn(self, conn: Connection[Any]) -> None:
-        # Quick check to discard the wrong connection
-        self._check_pool_putconn(conn)
-
-        logger.info("returning connection to %r", self.name)
-
-        # Close the connection if no client is waiting for it, or if the pool
-        # is closed. For extra refcare remove the pool reference from it.
-        # Maintain the stats.
+    def _maybe_close_connection(self, conn: Connection[Any]) -> bool:
         with self._lock:
-            if self._closed or not self._waiting:
-                conn._pool = None
-                if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
-                    self._stats[self._RETURNS_BAD] += 1
-                conn.close()
-                self._nconns -= 1
-                return
-
-        # Use a worker to perform eventual maintenance work in a separate thread
-        if self._reset:
-            self.run_task(ReturnConnection(self, conn))
-        else:
-            self._return_connection(conn)
+            if not self._closed and self._waiting:
+                return False
+
+            conn._pool = None
+            if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+                self._stats[self._RETURNS_BAD] += 1
+            conn.close()
+            self._nconns -= 1
+            return True
 
     def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
         """Change the size of the pool during runtime.
index 69012231018500b2d266a1eaf27b36a7c24b3879..694e8f0912e5bac1269108d360a5f737a1781486 100644 (file)
@@ -6,17 +6,15 @@ psycopg asynchronous null connection pool
 
 import asyncio
 import logging
-from time import monotonic
 from typing import Any, Optional
 
+from psycopg import AsyncConnection
 from psycopg.pq import TransactionStatus
-from psycopg.connection_async import AsyncConnection
 
 from .errors import PoolTimeout, TooManyRequests
 from ._compat import ConnectionTimeout
 from .null_pool import _BaseNullConnectionPool
-from .pool_async import AsyncConnectionPool, AsyncClient
-from .pool_async import AddConnection, ReturnConnection
+from .pool_async import AsyncConnectionPool, AddConnection
 
 logger = logging.getLogger("psycopg.pool")
 
@@ -45,81 +43,41 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool):
 
         logger.info("pool %r is ready to use", self.name)
 
-    async def getconn(
-        self, timeout: Optional[float] = None
-    ) -> AsyncConnection[Any]:
-        logger.info("connection requested from %r", self.name)
-        self._stats[self._REQUESTS_NUM] += 1
-
-        # Critical section: decide here if there's a connection ready
-        # or if the client needs to wait.
-        async with self._lock:
-            self._check_open_getconn()
-
-            pos: Optional[AsyncClient] = None
-            if self.max_size == 0 or self._nconns < self.max_size:
-                # Create a new connection for the client
-                try:
-                    conn = await self._connect(timeout=timeout)
-                except ConnectionTimeout as ex:
-                    raise PoolTimeout(str(ex)) from None
-                self._nconns += 1
-            else:
-                if self.max_waiting and len(self._waiting) >= self.max_waiting:
-                    self._stats[self._REQUESTS_ERRORS] += 1
-                    raise TooManyRequests(
-                        f"the pool {self.name!r} has aleady"
-                        f" {len(self._waiting)} requests waiting"
-                    )
-
-                # No connection available: put the client in the waiting queue
-                t0 = monotonic()
-                pos = AsyncClient()
-                self._waiting.append(pos)
-                self._stats[self._REQUESTS_QUEUED] += 1
-
-        # If we are in the waiting queue, wait to be assigned a connection
-        # (outside the critical section, so only the waiting client is locked)
-        if pos:
-            if timeout is None:
-                timeout = self.timeout
+    async def _get_ready_connection(
+        self, timeout: Optional[float]
+    ) -> Optional[AsyncConnection[Any]]:
+        conn: Optional[AsyncConnection[Any]] = None
+        if self.max_size == 0 or self._nconns < self.max_size:
+            # Create a new connection for the client
             try:
-                conn = await pos.wait(timeout=timeout)
-            except Exception:
-                self._stats[self._REQUESTS_ERRORS] += 1
-                raise
-            finally:
-                t1 = monotonic()
-                self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
-
-        # Tell the connection it belongs to a pool to avoid closing on __exit__
-        conn._pool = self
-        logger.info("connection given by %r", self.name)
+                conn = await self._connect(timeout=timeout)
+            except ConnectionTimeout as ex:
+                raise PoolTimeout(str(ex)) from None
+            self._nconns += 1
+        elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+            self._stats[self._REQUESTS_ERRORS] += 1
+            raise TooManyRequests(
+                f"the pool {self.name!r} has aleady"
+                f" {len(self._waiting)} requests waiting"
+            )
         return conn
 
-    async def putconn(self, conn: AsyncConnection[Any]) -> None:
-        # Quick check to discard the wrong connection
-        self._check_pool_putconn(conn)
-
-        logger.info("returning connection to %r", self.name)
-
+    async def _maybe_close_connection(
+        self, conn: AsyncConnection[Any]
+    ) -> bool:
         # Close the connection if no client is waiting for it, or if the pool
         # is closed. For extra refcare remove the pool reference from it.
         # Maintain the stats.
         async with self._lock:
-            if self._closed or not self._waiting:
-                conn._pool = None
-                if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
-                    self._stats[self._RETURNS_BAD] += 1
-                await conn.close()
-                self._nconns -= 1
-                return
-
-        # Use a worker to perform eventual maintenance work in a separate task
-        if self._reset:
-            self.run_task(ReturnConnection(self, conn))
-        else:
-            await self._return_connection(conn)
+            if not self._closed and self._waiting:
+                return False
+
+            conn._pool = None
+            if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+                self._stats[self._RETURNS_BAD] += 1
+            await conn.close()
+            self._nconns -= 1
+            return True
 
     async def resize(
         self, min_size: int, max_size: Optional[int] = None
index 06683ba90e9ae146215387f805fc310d0940d52b..c9f7c0c7a1d45d9a61cbf20e8c8e3fd562d19982 100644 (file)
@@ -142,21 +142,8 @@ class ConnectionPool(BasePool[Connection[Any]]):
         # or if the client needs to wait.
         with self._lock:
             self._check_open_getconn()
-
-            pos: Optional[WaitingClient] = None
-            if self._pool:
-                # Take a connection ready out of the pool
-                conn = self._pool.popleft()
-                if len(self._pool) < self._nconns_min:
-                    self._nconns_min = len(self._pool)
-            else:
-                if self.max_waiting and len(self._waiting) >= self.max_waiting:
-                    self._stats[self._REQUESTS_ERRORS] += 1
-                    raise TooManyRequests(
-                        f"the pool {self.name!r} has aleady"
-                        f" {len(self._waiting)} requests waiting"
-                    )
-
+            conn = self._get_ready_connection(timeout)
+            if not conn:
                 # No connection available: put the client in the waiting queue
                 t0 = monotonic()
                 pos = WaitingClient()
@@ -164,19 +151,11 @@ class ConnectionPool(BasePool[Connection[Any]]):
                 self._stats[self._REQUESTS_QUEUED] += 1
 
                 # If there is space for the pool to grow, let's do it
-                # Allow only one thread at time to grow the pool (or returning
-                # connections might be starved).
-                if self._nconns < self._max_size and not self._growing:
-                    self._nconns += 1
-                    logger.info(
-                        "growing pool %r to %s", self.name, self._nconns
-                    )
-                    self._growing = True
-                    self.run_task(AddConnection(self, growing=True))
+                self._maybe_grow_pool()
 
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
-        if pos:
+        if not conn:
             if timeout is None:
                 timeout = self.timeout
             try:
@@ -195,6 +174,34 @@ class ConnectionPool(BasePool[Connection[Any]]):
         logger.info("connection given by %r", self.name)
         return conn
 
+    def _get_ready_connection(
+        self, timeout: Optional[float]
+    ) -> Optional[Connection[Any]]:
+        """Return a connection, if the client deserves one."""
+        conn: Optional[Connection[Any]] = None
+        if self._pool:
+            # Take a connection ready out of the pool
+            conn = self._pool.popleft()
+            if len(self._pool) < self._nconns_min:
+                self._nconns_min = len(self._pool)
+        elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+            self._stats[self._REQUESTS_ERRORS] += 1
+            raise TooManyRequests(
+                f"the pool {self.name!r} has aleady"
+                f" {len(self._waiting)} requests waiting"
+            )
+        return conn
+
+    def _maybe_grow_pool(self) -> None:
+        # Allow only one thread at time to grow the pool (or returning
+        # connections might be starved).
+        if self._nconns >= self._max_size or self._growing:
+            return
+        self._nconns += 1
+        logger.info("growing pool %r to %s", self.name, self._nconns)
+        self._growing = True
+        self.run_task(AddConnection(self, growing=True))
+
     def putconn(self, conn: Connection[Any]) -> None:
         """Return a connection to the loving hands of its pool.
 
@@ -206,11 +213,7 @@ class ConnectionPool(BasePool[Connection[Any]]):
 
         logger.info("returning connection to %r", self.name)
 
-        # If the pool is closed just close the connection instead of returning
-        # it to the pool. For extra refcare remove the pool reference from it.
-        if self._closed:
-            conn._pool = None
-            conn.close()
+        if self._maybe_close_connection(conn):
             return
 
         # Use a worker to perform eventual maintenance work in a separate thread
@@ -219,6 +222,20 @@ class ConnectionPool(BasePool[Connection[Any]]):
         else:
             self._return_connection(conn)
 
+    def _maybe_close_connection(self, conn: Connection[Any]) -> bool:
+        """Close a returned connection if necessary.
+
+        Return `!True if the connection was closed.
+        """
+        # If the pool is closed just close the connection instead of returning
+        # it to the pool. For extra refcare remove the pool reference from it.
+        if not self._closed:
+            return False
+
+        conn._pool = None
+        conn.close()
+        return True
+
     def open(self, wait: bool = False, timeout: float = 30.0) -> None:
         """Open the pool by starting connecting and and accepting clients.
 
index 7fe772ebc531bdd02789dc672fa3a4d4b6c2bc06..e9bb2adc4310e7689f7d2db58c9028439bbe4167 100644 (file)
@@ -15,8 +15,8 @@ from weakref import ref
 from contextlib import asynccontextmanager
 
 from psycopg import errors as e
+from psycopg import AsyncConnection
 from psycopg.pq import TransactionStatus
-from psycopg.connection_async import AsyncConnection
 
 from .base import ConnectionAttempt, BasePool
 from .sched import AsyncScheduler
@@ -109,40 +109,20 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
         # or if the client needs to wait.
         async with self._lock:
             self._check_open_getconn()
-
-            pos: Optional[AsyncClient] = None
-            if self._pool:
-                # Take a connection ready out of the pool
-                conn = self._pool.popleft()
-                if len(self._pool) < self._nconns_min:
-                    self._nconns_min = len(self._pool)
-            else:
-                if self.max_waiting and len(self._waiting) >= self.max_waiting:
-                    self._stats[self._REQUESTS_ERRORS] += 1
-                    raise TooManyRequests(
-                        f"the pool {self.name!r} has aleady"
-                        f" {len(self._waiting)} requests waiting"
-                    )
-
+            conn = await self._get_ready_connection(timeout)
+            if not conn:
                 # No connection available: put the client in the waiting queue
                 t0 = monotonic()
                 pos = AsyncClient()
                 self._waiting.append(pos)
                 self._stats[self._REQUESTS_QUEUED] += 1
 
-                # Allow only one task at time to grow the pool (or returning
-                # connections might be starved).
-                if self._nconns < self._max_size and not self._growing:
-                    self._nconns += 1
-                    logger.info(
-                        "growing pool %r to %s", self.name, self._nconns
-                    )
-                    self._growing = True
-                    self.run_task(AddConnection(self, growing=True))
+                # If there is space for the pool to grow, let's do it
+                self._maybe_grow_pool()
 
         # If we are in the waiting queue, wait to be assigned a connection
         # (outside the critical section, so only the waiting client is locked)
-        if pos:
+        if not conn:
             if timeout is None:
                 timeout = self.timeout
             try:
@@ -161,16 +141,37 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
         logger.info("connection given by %r", self.name)
         return conn
 
+    async def _get_ready_connection(
+        self, timeout: Optional[float]
+    ) -> Optional[AsyncConnection[Any]]:
+        conn: Optional[AsyncConnection[Any]] = None
+        if self._pool:
+            # Take a connection ready out of the pool
+            conn = self._pool.popleft()
+            if len(self._pool) < self._nconns_min:
+                self._nconns_min = len(self._pool)
+        elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+            self._stats[self._REQUESTS_ERRORS] += 1
+            raise TooManyRequests(
+                f"the pool {self.name!r} has aleady"
+                f" {len(self._waiting)} requests waiting"
+            )
+        return conn
+
+    def _maybe_grow_pool(self) -> None:
+        # Allow only one task at time to grow the pool (or returning
+        # connections might be starved).
+        if self._nconns < self._max_size and not self._growing:
+            self._nconns += 1
+            logger.info("growing pool %r to %s", self.name, self._nconns)
+            self._growing = True
+            self.run_task(AddConnection(self, growing=True))
+
     async def putconn(self, conn: AsyncConnection[Any]) -> None:
         self._check_pool_putconn(conn)
 
         logger.info("returning connection to %r", self.name)
-
-        # If the pool is closed just close the connection instead of returning
-        # it to the pool. For extra refcare remove the pool reference from it.
-        if self._closed:
-            conn._pool = None
-            await conn.close()
+        if await self._maybe_close_connection(conn):
             return
 
         # Use a worker to perform eventual maintenance work in a separate task
@@ -179,6 +180,18 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
         else:
             await self._return_connection(conn)
 
+    async def _maybe_close_connection(
+        self, conn: AsyncConnection[Any]
+    ) -> bool:
+        # If the pool is closed just close the connection instead of returning
+        # it to the pool. For extra refcare remove the pool reference from it.
+        if not self._closed:
+            return False
+
+        conn._pool = None
+        await conn.close()
+        return True
+
     async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
         async with self._lock:
             self._open()