the *timeout* default. Default: 30 seconds.
:type timeout: `!float`
+ :param max_waiting: Maximum number of requests that can be queued to the
+ pool. Adding more requests will fail, raising
+                        `TooManyRequests`. Specifying 0 (the default) means no
+                        upper bound.
+ :type max_waiting: `!int`
+
:param max_lifetime: The maximum lifetime of a connection in the pool, in
seconds. Connections used for longer get closed and
replaced by a new one. The amount is reduced by a
Subclass of `~psycopg3.OperationalError`
+.. autoclass:: TooManyRequests()
+
+ Subclass of `~psycopg3.OperationalError`
+
The `!AsyncConnectionPool` class
--------------------------------
from .pool import ConnectionPool
from .async_pool import AsyncConnectionPool
-from .errors import PoolClosed, PoolTimeout
+from .errors import PoolClosed, PoolTimeout, TooManyRequests
__all__ = [
"AsyncConnectionPool",
"ConnectionPool",
"PoolClosed",
"PoolTimeout",
+ "TooManyRequests",
]
from .base import ConnectionAttempt, BasePool
from .sched import AsyncScheduler
-from .errors import PoolClosed, PoolTimeout
+from .errors import PoolClosed, PoolTimeout, TooManyRequests
logger = logging.getLogger(__name__)
async def getconn(
self, timeout: Optional[float] = None
) -> AsyncConnection:
- logger.info("connection requested to %r", self.name)
+ logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
if len(self._pool) < self._nconns_min:
self._nconns_min = len(self._pool)
else:
+ if self.max_waiting and len(self._waiting) >= self.max_waiting:
+ raise TooManyRequests(
+                        f"the pool {self.name!r} has already"
+ f" {len(self._waiting)} requests waiting"
+ )
+
# No connection available: put the client in the waiting queue
t0 = monotonic()
pos = AsyncClient()
maxconn: Optional[int] = None,
name: Optional[str] = None,
timeout: float = 30.0,
+ max_waiting: int = 0,
max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
self._minconn = minconn
self._maxconn = maxconn
self.timeout = timeout
+ self.max_waiting = max_waiting
self.reconnect_timeout = reconnect_timeout
self.max_lifetime = max_lifetime
self.max_idle = max_idle
"""The pool couldn't provide a connection in acceptable time."""
__module__ = "psycopg3.pool"
+
+
+class TooManyRequests(e.OperationalError):
+ """Too many requests in the queue waiting for a connection from the pool."""
+
+ __module__ = "psycopg3.pool"
from .base import ConnectionAttempt, BasePool
from .sched import Scheduler
-from .errors import PoolClosed, PoolTimeout
+from .errors import PoolClosed, PoolTimeout, TooManyRequests
logger = logging.getLogger(__name__)
"""Context manager to obtain a connection from the pool.
Returned the connection immediately if available, otherwise wait up to
- *timeout* or `self.timeout` and throw `PoolTimeout` if a connection is
- not available in time.
+ *timeout* or `self.timeout` seconds and throw `PoolTimeout` if a
+ connection is not available in time.
Upon context exit, return the connection to the pool. Apply the normal
:ref:`connection context behaviour <with-connection>` (commit/rollback
failing to do so will deplete the pool. A depleted pool is a sad pool:
you don't want a depleted pool.
"""
- logger.info("connection requested to %r", self.name)
+ logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
if len(self._pool) < self._nconns_min:
self._nconns_min = len(self._pool)
else:
+ if self.max_waiting and len(self._waiting) >= self.max_waiting:
+ raise TooManyRequests(
+                        f"the pool {self.name!r} has already"
+ f" {len(self._waiting)} requests waiting"
+ )
+
# No connection available: put the client in the waiting queue
t0 = monotonic()
pos = WaitingClient()
import logging
import weakref
from time import sleep, time
-from threading import Thread
+from threading import Thread, Event
from collections import Counter
import pytest
assert len(set(r[2] for r in results)) == 2, results
+@pytest.mark.slow
+def test_queue_size(dsn):
+ def worker(t, ev=None):
+ try:
+ with p.connection():
+ if ev:
+ ev.set()
+ sleep(t)
+ except pool.TooManyRequests as e:
+ errors.append(e)
+ else:
+ success.append(True)
+
+ errors = []
+ success = []
+
+ with pool.ConnectionPool(dsn, minconn=1, max_waiting=3) as p:
+ p.wait()
+ ev = Event()
+ t = Thread(target=worker, args=(0.3, ev))
+ t.start()
+ ev.wait()
+
+ ts = [Thread(target=worker, args=(0.1,)) for i in range(4)]
+ [t.start() for t in ts]
+ [t.join() for t in ts]
+
+ assert len(success) == 4
+ assert len(errors) == 1
+ assert isinstance(errors[0], pool.TooManyRequests)
+ assert p.name in str(errors[0])
+ assert str(p.max_waiting) in str(errors[0])
+
+
@pytest.mark.slow
def test_queue_timeout(dsn):
def worker(n):
assert len(set(r[2] for r in results)) == 2, results
+@pytest.mark.slow
+async def test_queue_size(dsn):
+ async def worker(t, ev=None):
+ try:
+ async with p.connection():
+ if ev:
+ ev.set()
+ await asyncio.sleep(t)
+ except pool.TooManyRequests as e:
+ errors.append(e)
+ else:
+ success.append(True)
+
+ errors = []
+ success = []
+
+ async with pool.AsyncConnectionPool(dsn, minconn=1, max_waiting=3) as p:
+ await p.wait()
+ ev = asyncio.Event()
+ create_task(worker(0.3, ev))
+ await ev.wait()
+
+ ts = [create_task(worker(0.1)) for i in range(4)]
+ await asyncio.gather(*ts)
+
+ assert len(success) == 4
+ assert len(errors) == 1
+ assert isinstance(errors[0], pool.TooManyRequests)
+ assert p.name in str(errors[0])
+ assert str(p.max_waiting) in str(errors[0])
+
+
@pytest.mark.slow
async def test_queue_timeout(dsn):
async def worker(n):