From: Daniele Varrazzo Date: Sat, 8 Jan 2022 17:35:41 +0000 (+0100) Subject: Refactor pools to reduce code duplication X-Git-Tag: pool-3.1~21^2~1 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=b32a005303800a9e950d16cb280e34c635453654;p=thirdparty%2Fpsycopg.git Refactor pools to reduce code duplication Use common getconn/putconn implementations and implement only the different behaviours in subclasses. --- diff --git a/psycopg_pool/psycopg_pool/base.py b/psycopg_pool/psycopg_pool/base.py index 1e3187b55..a5ffab929 100644 --- a/psycopg_pool/psycopg_pool/base.py +++ b/psycopg_pool/psycopg_pool/base.py @@ -8,10 +8,10 @@ from time import monotonic from random import random from typing import Any, Callable, Dict, Generic, Optional, Tuple -from psycopg.abc import ConnectionType from psycopg import errors as e -from .errors import PoolClosed +from psycopg.abc import ConnectionType +from .errors import PoolClosed from ._compat import Counter, Deque diff --git a/psycopg_pool/psycopg_pool/null_pool.py b/psycopg_pool/psycopg_pool/null_pool.py index 479799b28..62e1b3f85 100644 --- a/psycopg_pool/psycopg_pool/null_pool.py +++ b/psycopg_pool/psycopg_pool/null_pool.py @@ -6,14 +6,12 @@ Psycopg null connection pools import logging import threading -from time import monotonic from typing import Any, Optional, Tuple from psycopg import Connection from psycopg.pq import TransactionStatus -from .pool import ConnectionPool, WaitingClient -from .pool import AddConnection, ReturnConnection +from .pool import ConnectionPool, AddConnection from .errors import PoolTimeout, TooManyRequests from ._compat import ConnectionTimeout @@ -46,6 +44,10 @@ class _BaseNullConnectionPool: # or to grow/shrink. return + def _maybe_grow_pool(self) -> None: + # null pools don't grow + pass + class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool): def wait(self, timeout: float = 30.0) -> None: @@ -79,79 +81,37 @@ class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool): logger.info("pool %r is ready to use", self.name) - def getconn(self, timeout: Optional[float] = None) -> Connection[Any]: - logger.info("connection requested from %r", self.name) - self._stats[self._REQUESTS_NUM] += 1 - - # Critical section: decide here if there's a connection ready - # or if the client needs to wait. - with self._lock: - self._check_open_getconn() - - pos: Optional[WaitingClient] = None - if self.max_size == 0 or self._nconns < self.max_size: - # Create a new connection for the client - try: - conn = self._connect(timeout=timeout) - except ConnectionTimeout as ex: - raise PoolTimeout(str(ex)) from None - self._nconns += 1 - else: - if self.max_waiting and len(self._waiting) >= self.max_waiting: - self._stats[self._REQUESTS_ERRORS] += 1 - raise TooManyRequests( - f"the pool {self.name!r} has aleady" - f" {len(self._waiting)} requests waiting" - ) - - # No connection available: put the client in the waiting queue - t0 = monotonic() - pos = WaitingClient() - self._waiting.append(pos) - self._stats[self._REQUESTS_QUEUED] += 1 - - # If we are in the waiting queue, wait to be assigned a connection - # (outside the critical section, so only the waiting client is locked) - if pos: - if timeout is None: - timeout = self.timeout + def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[Connection[Any]]: + conn: Optional[Connection[Any]] = None + if self.max_size == 0 or self._nconns < self.max_size: + # Create a new connection for the client try: - conn = pos.wait(timeout=timeout) - except Exception: - self._stats[self._REQUESTS_ERRORS] += 1 - raise - finally: - t1 = monotonic() - self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0)) - - # Tell the connection it belongs to a pool to avoid closing on __exit__ - conn._pool = self - logger.info("connection given by %r", self.name) + conn = self._connect(timeout=timeout) + except ConnectionTimeout as ex: + raise PoolTimeout(str(ex)) from None + self._nconns += 1 + + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has aleady" + f" {len(self._waiting)} requests waiting" + ) return conn - def putconn(self, conn: Connection[Any]) -> None: - # Quick check to discard the wrong connection - self._check_pool_putconn(conn) - - logger.info("returning connection to %r", self.name) - - # Close the connection if no client is waiting for it, or if the pool - # is closed. For extra refcare remove the pool reference from it. - # Maintain the stats. + def _maybe_close_connection(self, conn: Connection[Any]) -> bool: with self._lock: - if self._closed or not self._waiting: - conn._pool = None - if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: - self._stats[self._RETURNS_BAD] += 1 - conn.close() - self._nconns -= 1 - return - - # Use a worker to perform eventual maintenance work in a separate thread - if self._reset: - self.run_task(ReturnConnection(self, conn)) - else: - self._return_connection(conn) + if not self._closed and self._waiting: + return False + + conn._pool = None + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + conn.close() + self._nconns -= 1 + return True def resize(self, min_size: int, max_size: Optional[int] = None) -> None: """Change the size of the pool during runtime. diff --git a/psycopg_pool/psycopg_pool/null_pool_async.py b/psycopg_pool/psycopg_pool/null_pool_async.py index 690122310..694e8f091 100644 --- a/psycopg_pool/psycopg_pool/null_pool_async.py +++ b/psycopg_pool/psycopg_pool/null_pool_async.py @@ -6,17 +6,15 @@ psycopg asynchronous null connection pool import asyncio import logging -from time import monotonic from typing import Any, Optional +from psycopg import AsyncConnection from psycopg.pq import TransactionStatus -from psycopg.connection_async import AsyncConnection from .errors import PoolTimeout, TooManyRequests from ._compat import ConnectionTimeout from .null_pool import _BaseNullConnectionPool -from .pool_async import AsyncConnectionPool, AsyncClient -from .pool_async import AddConnection, ReturnConnection +from .pool_async import AsyncConnectionPool, AddConnection logger = logging.getLogger("psycopg.pool") @@ -45,81 +43,41 @@ class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool): logger.info("pool %r is ready to use", self.name) - async def getconn( - self, timeout: Optional[float] = None - ) -> AsyncConnection[Any]: - logger.info("connection requested from %r", self.name) - self._stats[self._REQUESTS_NUM] += 1 - - # Critical section: decide here if there's a connection ready - # or if the client needs to wait. - async with self._lock: - self._check_open_getconn() - - pos: Optional[AsyncClient] = None - if self.max_size == 0 or self._nconns < self.max_size: - # Create a new connection for the client - try: - conn = await self._connect(timeout=timeout) - except ConnectionTimeout as ex: - raise PoolTimeout(str(ex)) from None - self._nconns += 1 - else: - if self.max_waiting and len(self._waiting) >= self.max_waiting: - self._stats[self._REQUESTS_ERRORS] += 1 - raise TooManyRequests( - f"the pool {self.name!r} has aleady" - f" {len(self._waiting)} requests waiting" - ) - - # No connection available: put the client in the waiting queue - t0 = monotonic() - pos = AsyncClient() - self._waiting.append(pos) - self._stats[self._REQUESTS_QUEUED] += 1 - - # If we are in the waiting queue, wait to be assigned a connection - # (outside the critical section, so only the waiting client is locked) - if pos: - if timeout is None: - timeout = self.timeout + async def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[AsyncConnection[Any]]: + conn: Optional[AsyncConnection[Any]] = None + if self.max_size == 0 or self._nconns < self.max_size: + # Create a new connection for the client try: - conn = await pos.wait(timeout=timeout) - except Exception: - self._stats[self._REQUESTS_ERRORS] += 1 - raise - finally: - t1 = monotonic() - self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0)) - - # Tell the connection it belongs to a pool to avoid closing on __exit__ - conn._pool = self - logger.info("connection given by %r", self.name) + conn = await self._connect(timeout=timeout) + except ConnectionTimeout as ex: + raise PoolTimeout(str(ex)) from None + self._nconns += 1 + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has aleady" + f" {len(self._waiting)} requests waiting" + ) return conn - async def putconn(self, conn: AsyncConnection[Any]) -> None: - # Quick check to discard the wrong connection - self._check_pool_putconn(conn) - - logger.info("returning connection to %r", self.name) - + async def _maybe_close_connection( + self, conn: AsyncConnection[Any] + ) -> bool: # Close the connection if no client is waiting for it, or if the pool # is closed. For extra refcare remove the pool reference from it. # Maintain the stats. async with self._lock: - if self._closed or not self._waiting: - conn._pool = None - if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: - self._stats[self._RETURNS_BAD] += 1 - await conn.close() - self._nconns -= 1 - return - - # Use a worker to perform eventual maintenance work in a separate task - if self._reset: - self.run_task(ReturnConnection(self, conn)) - else: - await self._return_connection(conn) + if not self._closed and self._waiting: + return False + + conn._pool = None + if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN: + self._stats[self._RETURNS_BAD] += 1 + await conn.close() + self._nconns -= 1 + return True async def resize( self, min_size: int, max_size: Optional[int] = None diff --git a/psycopg_pool/psycopg_pool/pool.py b/psycopg_pool/psycopg_pool/pool.py index 06683ba90..c9f7c0c7a 100644 --- a/psycopg_pool/psycopg_pool/pool.py +++ b/psycopg_pool/psycopg_pool/pool.py @@ -142,21 +142,8 @@ class ConnectionPool(BasePool[Connection[Any]]): # or if the client needs to wait. with self._lock: self._check_open_getconn() - - pos: Optional[WaitingClient] = None - if self._pool: - # Take a connection ready out of the pool - conn = self._pool.popleft() - if len(self._pool) < self._nconns_min: - self._nconns_min = len(self._pool) - else: - if self.max_waiting and len(self._waiting) >= self.max_waiting: - self._stats[self._REQUESTS_ERRORS] += 1 - raise TooManyRequests( - f"the pool {self.name!r} has aleady" - f" {len(self._waiting)} requests waiting" - ) - + conn = self._get_ready_connection(timeout) + if not conn: # No connection available: put the client in the waiting queue t0 = monotonic() pos = WaitingClient() @@ -164,19 +151,11 @@ class ConnectionPool(BasePool[Connection[Any]]): self._stats[self._REQUESTS_QUEUED] += 1 # If there is space for the pool to grow, let's do it - # Allow only one thread at time to grow the pool (or returning - # connections might be starved). - if self._nconns < self._max_size and not self._growing: - self._nconns += 1 - logger.info( - "growing pool %r to %s", self.name, self._nconns - ) - self._growing = True - self.run_task(AddConnection(self, growing=True)) + self._maybe_grow_pool() # If we are in the waiting queue, wait to be assigned a connection # (outside the critical section, so only the waiting client is locked) - if pos: + if not conn: if timeout is None: timeout = self.timeout try: @@ -195,6 +174,34 @@ class ConnectionPool(BasePool[Connection[Any]]): logger.info("connection given by %r", self.name) return conn + def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[Connection[Any]]: + """Return a connection, if the client deserves one.""" + conn: Optional[Connection[Any]] = None + if self._pool: + # Take a connection ready out of the pool + conn = self._pool.popleft() + if len(self._pool) < self._nconns_min: + self._nconns_min = len(self._pool) + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has aleady" + f" {len(self._waiting)} requests waiting" + ) + return conn + + def _maybe_grow_pool(self) -> None: + # Allow only one thread at time to grow the pool (or returning + # connections might be starved). + if self._nconns >= self._max_size or self._growing: + return + self._nconns += 1 + logger.info("growing pool %r to %s", self.name, self._nconns) + self._growing = True + self.run_task(AddConnection(self, growing=True)) + def putconn(self, conn: Connection[Any]) -> None: """Return a connection to the loving hands of its pool. @@ -206,11 +213,7 @@ class ConnectionPool(BasePool[Connection[Any]]): logger.info("returning connection to %r", self.name) - # If the pool is closed just close the connection instead of returning - # it to the pool. For extra refcare remove the pool reference from it. - if self._closed: - conn._pool = None - conn.close() + if self._maybe_close_connection(conn): return # Use a worker to perform eventual maintenance work in a separate thread @@ -219,6 +222,20 @@ class ConnectionPool(BasePool[Connection[Any]]): else: self._return_connection(conn) + def _maybe_close_connection(self, conn: Connection[Any]) -> bool: + """Close a returned connection if necessary. + + Return `!True if the connection was closed. + """ + # If the pool is closed just close the connection instead of returning + # it to the pool. For extra refcare remove the pool reference from it. + if not self._closed: + return False + + conn._pool = None + conn.close() + return True + def open(self, wait: bool = False, timeout: float = 30.0) -> None: """Open the pool by starting connecting and and accepting clients. diff --git a/psycopg_pool/psycopg_pool/pool_async.py b/psycopg_pool/psycopg_pool/pool_async.py index 7fe772ebc..e9bb2adc4 100644 --- a/psycopg_pool/psycopg_pool/pool_async.py +++ b/psycopg_pool/psycopg_pool/pool_async.py @@ -15,8 +15,8 @@ from weakref import ref from contextlib import asynccontextmanager from psycopg import errors as e +from psycopg import AsyncConnection from psycopg.pq import TransactionStatus -from psycopg.connection_async import AsyncConnection from .base import ConnectionAttempt, BasePool from .sched import AsyncScheduler @@ -109,40 +109,20 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): # or if the client needs to wait. async with self._lock: self._check_open_getconn() - - pos: Optional[AsyncClient] = None - if self._pool: - # Take a connection ready out of the pool - conn = self._pool.popleft() - if len(self._pool) < self._nconns_min: - self._nconns_min = len(self._pool) - else: - if self.max_waiting and len(self._waiting) >= self.max_waiting: - self._stats[self._REQUESTS_ERRORS] += 1 - raise TooManyRequests( - f"the pool {self.name!r} has aleady" - f" {len(self._waiting)} requests waiting" - ) - + conn = await self._get_ready_connection(timeout) + if not conn: # No connection available: put the client in the waiting queue t0 = monotonic() pos = AsyncClient() self._waiting.append(pos) self._stats[self._REQUESTS_QUEUED] += 1 - # Allow only one task at time to grow the pool (or returning - # connections might be starved). - if self._nconns < self._max_size and not self._growing: - self._nconns += 1 - logger.info( - "growing pool %r to %s", self.name, self._nconns - ) - self._growing = True - self.run_task(AddConnection(self, growing=True)) + # If there is space for the pool to grow, let's do it + self._maybe_grow_pool() # If we are in the waiting queue, wait to be assigned a connection # (outside the critical section, so only the waiting client is locked) - if pos: + if not conn: if timeout is None: timeout = self.timeout try: @@ -161,16 +141,37 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): logger.info("connection given by %r", self.name) return conn + async def _get_ready_connection( + self, timeout: Optional[float] + ) -> Optional[AsyncConnection[Any]]: + conn: Optional[AsyncConnection[Any]] = None + if self._pool: + # Take a connection ready out of the pool + conn = self._pool.popleft() + if len(self._pool) < self._nconns_min: + self._nconns_min = len(self._pool) + elif self.max_waiting and len(self._waiting) >= self.max_waiting: + self._stats[self._REQUESTS_ERRORS] += 1 + raise TooManyRequests( + f"the pool {self.name!r} has aleady" + f" {len(self._waiting)} requests waiting" + ) + return conn + + def _maybe_grow_pool(self) -> None: + # Allow only one task at time to grow the pool (or returning + # connections might be starved). + if self._nconns < self._max_size and not self._growing: + self._nconns += 1 + logger.info("growing pool %r to %s", self.name, self._nconns) + self._growing = True + self.run_task(AddConnection(self, growing=True)) + async def putconn(self, conn: AsyncConnection[Any]) -> None: self._check_pool_putconn(conn) logger.info("returning connection to %r", self.name) - - # If the pool is closed just close the connection instead of returning - # it to the pool. For extra refcare remove the pool reference from it. - if self._closed: - conn._pool = None - await conn.close() + if await self._maybe_close_connection(conn): return # Use a worker to perform eventual maintenance work in a separate task @@ -179,6 +180,18 @@ class AsyncConnectionPool(BasePool[AsyncConnection[Any]]): else: await self._return_connection(conn) + async def _maybe_close_connection( + self, conn: AsyncConnection[Any] + ) -> bool: + # If the pool is closed just close the connection instead of returning + # it to the pool. For extra refcare remove the pool reference from it. + if not self._closed: + return False + + conn._pool = None + await conn.close() + return True + async def open(self, wait: bool = False, timeout: float = 30.0) -> None: async with self._lock: self._open()