# Copyright (C) 2021 The Psycopg Team
+from time import monotonic
from random import random
-from typing import Any, Callable, Dict, Generic, Optional
+from typing import Any, Callable, Dict, Generic, Optional, Tuple
from psycopg.abc import ConnectionType
from psycopg import errors as e
] = None,
num_workers: int = 3,
):
- if max_size is None:
- max_size = min_size
- if max_size < min_size:
- raise ValueError("max_size must be greater or equal than min_size")
+ min_size, max_size = self._check_size(min_size, max_size)
+
if not name:
num = BasePool._num_pool = BasePool._num_pool + 1
name = f"pool-{num}"
"""`!True` if the pool is closed."""
return self._closed
+ def _check_size(
+ self, min_size: int, max_size: Optional[int]
+ ) -> Tuple[int, int]:
+ if max_size is None:
+ max_size = min_size
+ if max_size < min_size:
+ raise ValueError("max_size must be greater or equal than min_size")
+
+ return min_size, max_size
+
def _check_open(self) -> None:
if self._closed and self._opened:
raise e.OperationalError(
else:
raise PoolClosed(f"the pool {self.name!r} is not open yet")
+ def _check_pool_putconn(self, conn: ConnectionType) -> None:
+ pool = getattr(conn, "_pool", None)
+ if pool is self:
+ return
+
+ if pool:
+ msg = f"it comes from pool {pool.name!r}"
+ else:
+ msg = "it doesn't come from any pool"
+ raise ValueError(
+ f"can't return connection to pool {self.name!r}, {msg}: {conn}"
+ )
+
def get_stats(self) -> Dict[str, int]:
"""
Return current stats about the pool usage.
"""
return value * (1.0 + ((max_pc - min_pc) * random()) + min_pc)
+ def _set_connection_expiry_date(self, conn: ConnectionType) -> None:
+ """Set an expiry date on a connection.
+
+ Add some randomness to avoid mass reconnection.
+ """
+ conn._expire_at = monotonic() + self._jitter(
+ self.max_lifetime, -0.05, 0.0
+ )
+
class ConnectionAttempt:
"""Keep the state of a connection attempt."""
"""
logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
+
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
with self._lock:
it if you use the much more comfortable `connection()` context manager.
"""
# Quick check to discard the wrong connection
- pool = getattr(conn, "_pool", None)
- if pool is not self:
- if pool:
- msg = f"it comes from pool {pool.name!r}"
- else:
- msg = "it doesn't come from any pool"
- raise ValueError(
- f"can't return connection to pool {self.name!r}, {msg}: {conn}"
- )
+ self._check_pool_putconn(conn)
logger.info("returning connection to %r", self.name)
self._check_open()
self._start_workers()
+ self._start_initial_tasks()
self._closed = False
self._opened = True
for t in self._workers:
t.start()
+ def _start_initial_tasks(self) -> None:
# populate the pool with initial min_size connections in background
for i in range(self._nconns):
self.run_task(AddConnection(self))
self,
waiting_clients: Sequence["WaitingClient"] = (),
connections: Sequence[Connection[Any]] = (),
- timeout: float = 0,
+ timeout: float = 0.0,
) -> None:
# Stop the scheduler
def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
"""Change the size of the pool during runtime."""
- if max_size is None:
- max_size = min_size
- if max_size < min_size:
- raise ValueError("max_size must be greater or equal than min_size")
+ min_size, max_size = self._check_size(min_size, max_size)
ngrow = max(0, min_size - self._min_size)
self._configure(conn)
status = conn.pgconn.transaction_status
if status != TransactionStatus.IDLE:
- nstatus = TransactionStatus(status).name
+ sname = TransactionStatus(status).name
raise e.ProgrammingError(
- f"connection left in status {nstatus} by configure function"
+ f"connection left in status {sname} by configure function"
f" {self._configure}: discarded"
)
# Set an expiry date, with some randomness to avoid mass reconnection
- conn._expire_at = monotonic() + self._jitter(
- self.max_lifetime, -0.05, 0.0
- )
+ self._set_connection_expiry_date(conn)
return conn
def _add_connection(
self._reset(conn)
status = conn.pgconn.transaction_status
if status != TransactionStatus.IDLE:
- nstatus = TransactionStatus(status).name
+ sname = TransactionStatus(status).name
raise e.ProgrammingError(
- f"connection left in status {nstatus} by reset function"
+ f"connection left in status {sname} by reset function"
f" {self._reset}: discarded"
)
except Exception as ex:
) -> AsyncConnection[Any]:
logger.info("connection requested from %r", self.name)
self._stats[self._REQUESTS_NUM] += 1
+
# Critical section: decide here if there's a connection ready
# or if the client needs to wait.
async with self._lock:
return conn
async def putconn(self, conn: AsyncConnection[Any]) -> None:
- # Quick check to discard the wrong connection
- pool = getattr(conn, "_pool", None)
- if pool is not self:
- if pool:
- msg = f"it comes from pool {pool.name!r}"
- else:
- msg = "it doesn't come from any pool"
- raise ValueError(
- f"can't return connection to pool {self.name!r}, {msg}: {conn}"
- )
+ self._check_pool_putconn(conn)
logger.info("returning connection to %r", self.name)
self._check_open()
self._start_workers()
+ self._start_initial_tasks()
self._closed = False
self._opened = True
)
self._workers.append(t)
+ def _start_initial_tasks(self) -> None:
# populate the pool with initial min_size connections in background
for i in range(self._nconns):
self.run_task(AddConnection(self))
self,
waiting_clients: Sequence["AsyncClient"] = (),
connections: Sequence[AsyncConnection[Any]] = (),
- timeout: float = 0,
+ timeout: float = 0.0,
) -> None:
# Stop the scheduler
await self._sched.enter(0, None)
async def resize(
self, min_size: int, max_size: Optional[int] = None
) -> None:
- if max_size is None:
- max_size = min_size
- if max_size < min_size:
- raise ValueError("max_size must be greater or equal than min_size")
+ min_size, max_size = self._check_size(min_size, max_size)
ngrow = max(0, min_size - self._min_size)
await self._configure(conn)
status = conn.pgconn.transaction_status
if status != TransactionStatus.IDLE:
- nstatus = TransactionStatus(status).name
+ sname = TransactionStatus(status).name
raise e.ProgrammingError(
- f"connection left in status {nstatus} by configure function"
+ f"connection left in status {sname} by configure function"
f" {self._configure}: discarded"
)
# Set an expiry date, with some randomness to avoid mass reconnection
- conn._expire_at = monotonic() + self._jitter(
- self.max_lifetime, -0.05, 0.0
- )
+ self._set_connection_expiry_date(conn)
return conn
async def _add_connection(
await self._reset(conn)
status = conn.pgconn.transaction_status
if status != TransactionStatus.IDLE:
- nstatus = TransactionStatus(status).name
+ sname = TransactionStatus(status).name
raise e.ProgrammingError(
- f"connection left in status {nstatus} by reset function"
+ f"connection left in status {sname} by reset function"
f" {self._reset}: discarded"
)
except Exception as ex: