From: Daniele Varrazzo Date: Fri, 12 Mar 2021 14:26:18 +0000 (+0100) Subject: Add pool.max_waiting X-Git-Tag: 3.0.dev0~87^2~4 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=4b849607a7218218ccb776779a13fa5a68181857;p=thirdparty%2Fpsycopg.git Add pool.max_waiting --- diff --git a/docs/api/pool.rst b/docs/api/pool.rst index 1d6983d6b..6cefb955a 100644 --- a/docs/api/pool.rst +++ b/docs/api/pool.rst @@ -86,6 +86,12 @@ The `!ConnectionPool` class the *timeout* default. Default: 30 seconds. :type timeout: `!float` + :param max_waiting: Maximum number of requests that can be queued to the + pool. Adding more requests will fail, raising + `TooManyRequests`. Specifying 0 (the default) means to + upper bound. + :type max_waiting: `!int` + :param max_lifetime: The maximum lifetime of a connection in the pool, in seconds. Connections used for longer get closed and replaced by a new one. The amount is reduced by a @@ -178,6 +184,10 @@ The `!ConnectionPool` class Subclass of `~psycopg3.OperationalError` +.. autoclass:: TooManyRequests() + + Subclass of `~psycopg3.OperationalError` + The `!AsyncConnectionPool` class -------------------------------- diff --git a/psycopg3/psycopg3/pool/__init__.py b/psycopg3/psycopg3/pool/__init__.py index 91f349693..4eeddd885 100644 --- a/psycopg3/psycopg3/pool/__init__.py +++ b/psycopg3/psycopg3/pool/__init__.py @@ -6,11 +6,12 @@ psycopg3 connection pool package from .pool import ConnectionPool from .async_pool import AsyncConnectionPool -from .errors import PoolClosed, PoolTimeout +from .errors import PoolClosed, PoolTimeout, TooManyRequests __all__ = [ "AsyncConnectionPool", "ConnectionPool", "PoolClosed", "PoolTimeout", + "TooManyRequests", ] diff --git a/psycopg3/psycopg3/pool/async_pool.py b/psycopg3/psycopg3/pool/async_pool.py index e268fcd81..f901ebc82 100644 --- a/psycopg3/psycopg3/pool/async_pool.py +++ b/psycopg3/psycopg3/pool/async_pool.py @@ -22,7 +22,7 @@ from ..utils.compat import asynccontextmanager, create_task from .base import ConnectionAttempt, BasePool from .sched import AsyncScheduler -from .errors import PoolClosed, PoolTimeout +from .errors import PoolClosed, PoolTimeout, TooManyRequests logger = logging.getLogger(__name__) @@ -118,7 +118,7 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): async def getconn( self, timeout: Optional[float] = None ) -> AsyncConnection: - logger.info("connection requested to %r", self.name) + logger.info("connection requested from %r", self.name) self._stats[self._REQUESTS_NUM] += 1 # Critical section: decide here if there's a connection ready # or if the client needs to wait. @@ -133,6 +133,12 @@ class AsyncConnectionPool(BasePool[AsyncConnection]): if len(self._pool) < self._nconns_min: self._nconns_min = len(self._pool) else: + if self.max_waiting and len(self._waiting) >= self.max_waiting: + raise TooManyRequests( + f"the pool {self.name!r} has aleady" + f" {len(self._waiting)} requests waiting" + ) + # No connection available: put the client in the waiting queue t0 = monotonic() pos = AsyncClient() diff --git a/psycopg3/psycopg3/pool/base.py b/psycopg3/psycopg3/pool/base.py index 4fba7862e..03d9d860c 100644 --- a/psycopg3/psycopg3/pool/base.py +++ b/psycopg3/psycopg3/pool/base.py @@ -50,6 +50,7 @@ class BasePool(Generic[ConnectionType]): maxconn: Optional[int] = None, name: Optional[str] = None, timeout: float = 30.0, + max_waiting: int = 0, max_lifetime: float = 60 * 60.0, max_idle: float = 10 * 60.0, reconnect_timeout: float = 5 * 60.0, @@ -77,6 +78,7 @@ class BasePool(Generic[ConnectionType]): self._minconn = minconn self._maxconn = maxconn self.timeout = timeout + self.max_waiting = max_waiting self.reconnect_timeout = reconnect_timeout self.max_lifetime = max_lifetime self.max_idle = max_idle diff --git a/psycopg3/psycopg3/pool/errors.py b/psycopg3/psycopg3/pool/errors.py index 12f8fa64a..23eef69d0 100644 --- a/psycopg3/psycopg3/pool/errors.py +++ b/psycopg3/psycopg3/pool/errors.py @@ -17,3 +17,9 @@ class PoolTimeout(e.OperationalError): """The pool couldn't provide a connection in acceptable time.""" __module__ = "psycopg3.pool" + + +class TooManyRequests(e.OperationalError): + """Too many requests in the queue waiting for a connection from the pool.""" + + __module__ = "psycopg3.pool" diff --git a/psycopg3/psycopg3/pool/pool.py b/psycopg3/psycopg3/pool/pool.py index d6e0860f0..ae58d36bf 100644 --- a/psycopg3/psycopg3/pool/pool.py +++ b/psycopg3/psycopg3/pool/pool.py @@ -21,7 +21,7 @@ from ..connection import Connection from .base import ConnectionAttempt, BasePool from .sched import Scheduler -from .errors import PoolClosed, PoolTimeout +from .errors import PoolClosed, PoolTimeout, TooManyRequests logger = logging.getLogger(__name__) @@ -132,8 +132,8 @@ class ConnectionPool(BasePool[Connection]): """Context manager to obtain a connection from the pool. Returned the connection immediately if available, otherwise wait up to - *timeout* or `self.timeout` and throw `PoolTimeout` if a connection is - not available in time. + *timeout* or `self.timeout` seconds and throw `PoolTimeout` if a + connection is not available in time. Upon context exit, return the connection to the pool. Apply the normal :ref:`connection context behaviour ` (commit/rollback @@ -161,7 +161,7 @@ class ConnectionPool(BasePool[Connection]): failing to do so will deplete the pool. A depleted pool is a sad pool: you don't want a depleted pool. """ - logger.info("connection requested to %r", self.name) + logger.info("connection requested from %r", self.name) self._stats[self._REQUESTS_NUM] += 1 # Critical section: decide here if there's a connection ready # or if the client needs to wait. @@ -176,6 +176,12 @@ class ConnectionPool(BasePool[Connection]): if len(self._pool) < self._nconns_min: self._nconns_min = len(self._pool) else: + if self.max_waiting and len(self._waiting) >= self.max_waiting: + raise TooManyRequests( + f"the pool {self.name!r} has aleady" + f" {len(self._waiting)} requests waiting" + ) + # No connection available: put the client in the waiting queue t0 = monotonic() pos = WaitingClient() diff --git a/tests/pool/test_pool.py b/tests/pool/test_pool.py index 8be90ca37..64a728aa7 100644 --- a/tests/pool/test_pool.py +++ b/tests/pool/test_pool.py @@ -2,7 +2,7 @@ import sys import logging import weakref from time import sleep, time -from threading import Thread +from threading import Thread, Event from collections import Counter import pytest @@ -291,6 +291,40 @@ def test_queue(dsn, retries): assert len(set(r[2] for r in results)) == 2, results +@pytest.mark.slow +def test_queue_size(dsn): + def worker(t, ev=None): + try: + with p.connection(): + if ev: + ev.set() + sleep(t) + except pool.TooManyRequests as e: + errors.append(e) + else: + success.append(True) + + errors = [] + success = [] + + with pool.ConnectionPool(dsn, minconn=1, max_waiting=3) as p: + p.wait() + ev = Event() + t = Thread(target=worker, args=(0.3, ev)) + t.start() + ev.wait() + + ts = [Thread(target=worker, args=(0.1,)) for i in range(4)] + [t.start() for t in ts] + [t.join() for t in ts] + + assert len(success) == 4 + assert len(errors) == 1 + assert isinstance(errors[0], pool.TooManyRequests) + assert p.name in str(errors[0]) + assert str(p.max_waiting) in str(errors[0]) + + @pytest.mark.slow def test_queue_timeout(dsn): def worker(n): diff --git a/tests/pool/test_pool_async.py b/tests/pool/test_pool_async.py index 59159496a..3c92e7ba3 100644 --- a/tests/pool/test_pool_async.py +++ b/tests/pool/test_pool_async.py @@ -309,6 +309,38 @@ async def test_queue(dsn, retries): assert len(set(r[2] for r in results)) == 2, results +@pytest.mark.slow +async def test_queue_size(dsn): + async def worker(t, ev=None): + try: + async with p.connection(): + if ev: + ev.set() + await asyncio.sleep(t) + except pool.TooManyRequests as e: + errors.append(e) + else: + success.append(True) + + errors = [] + success = [] + + async with pool.AsyncConnectionPool(dsn, minconn=1, max_waiting=3) as p: + await p.wait() + ev = asyncio.Event() + create_task(worker(0.3, ev)) + await ev.wait() + + ts = [create_task(worker(0.1)) for i in range(4)] + await asyncio.gather(*ts) + + assert len(success) == 4 + assert len(errors) == 1 + assert isinstance(errors[0], pool.TooManyRequests) + assert p.name in str(errors[0]) + assert str(p.max_waiting) in str(errors[0]) + + @pytest.mark.slow async def test_queue_timeout(dsn): async def worker(n):