from random import random
from typing import Any, Callable, Dict, Generic, Optional, Tuple
-from psycopg.abc import ConnectionType
from psycopg import errors as e
-from .errors import PoolClosed
+from psycopg.abc import ConnectionType
+from .errors import PoolClosed
from ._compat import Counter, Deque
import logging
import threading
-from time import monotonic
from typing import Any, Optional, Tuple
from psycopg import Connection
from psycopg.pq import TransactionStatus
-from .pool import ConnectionPool, WaitingClient
-from .pool import AddConnection, ReturnConnection
+from .pool import ConnectionPool, AddConnection
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
# or to grow/shrink.
return
+ def _maybe_grow_pool(self) -> None:
+ # null pools don't grow
+ pass
+
class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool):
def wait(self, timeout: float = 30.0) -> None:
logger.info("pool %r is ready to use", self.name)
- def getconn(self, timeout: Optional[float] = None) -> Connection[Any]:
- logger.info("connection requested from %r", self.name)
- self._stats[self._REQUESTS_NUM] += 1
-
- # Critical section: decide here if there's a connection ready
- # or if the client needs to wait.
- with self._lock:
- self._check_open_getconn()
-
- pos: Optional[WaitingClient] = None
- if self.max_size == 0 or self._nconns < self.max_size:
- # Create a new connection for the client
- try:
- conn = self._connect(timeout=timeout)
- except ConnectionTimeout as ex:
- raise PoolTimeout(str(ex)) from None
- self._nconns += 1
- else:
- if self.max_waiting and len(self._waiting) >= self.max_waiting:
- self._stats[self._REQUESTS_ERRORS] += 1
- raise TooManyRequests(
- f"the pool {self.name!r} has aleady"
- f" {len(self._waiting)} requests waiting"
- )
-
- # No connection available: put the client in the waiting queue
- t0 = monotonic()
- pos = WaitingClient()
- self._waiting.append(pos)
- self._stats[self._REQUESTS_QUEUED] += 1
-
- # If we are in the waiting queue, wait to be assigned a connection
- # (outside the critical section, so only the waiting client is locked)
- if pos:
- if timeout is None:
- timeout = self.timeout
+ def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[Connection[Any]]:
+ conn: Optional[Connection[Any]] = None
+ if self.max_size == 0 or self._nconns < self.max_size:
+ # Create a new connection for the client
try:
- conn = pos.wait(timeout=timeout)
- except Exception:
- self._stats[self._REQUESTS_ERRORS] += 1
- raise
- finally:
- t1 = monotonic()
- self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
-
- # Tell the connection it belongs to a pool to avoid closing on __exit__
- conn._pool = self
- logger.info("connection given by %r", self.name)
+ conn = self._connect(timeout=timeout)
+ except ConnectionTimeout as ex:
+ raise PoolTimeout(str(ex)) from None
+ self._nconns += 1
+
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has aleady"
+ f" {len(self._waiting)} requests waiting"
+ )
return conn
- def putconn(self, conn: Connection[Any]) -> None:
- # Quick check to discard the wrong connection
- self._check_pool_putconn(conn)
-
- logger.info("returning connection to %r", self.name)
-
- # Close the connection if no client is waiting for it, or if the pool
- # is closed. For extra refcare remove the pool reference from it.
- # Maintain the stats.
+ def _maybe_close_connection(self, conn: Connection[Any]) -> bool:
with self._lock:
- if self._closed or not self._waiting:
- conn._pool = None
- if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
- self._stats[self._RETURNS_BAD] += 1
- conn.close()
- self._nconns -= 1
- return
-
- # Use a worker to perform eventual maintenance work in a separate thread
- if self._reset:
- self.run_task(ReturnConnection(self, conn))
- else:
- self._return_connection(conn)
+ if not self._closed and self._waiting:
+ return False
+
+ conn._pool = None
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ conn.close()
+ self._nconns -= 1
+ return True
def resize(self, min_size: int, max_size: Optional[int] = None) -> None:
"""Change the size of the pool during runtime.
import asyncio
import logging
-from time import monotonic
from typing import Any, Optional
+from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
-from psycopg.connection_async import AsyncConnection
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
from .null_pool import _BaseNullConnectionPool
-from .pool_async import AsyncConnectionPool, AsyncClient
-from .pool_async import AddConnection, ReturnConnection
+from .pool_async import AsyncConnectionPool, AddConnection
logger = logging.getLogger("psycopg.pool")
logger.info("pool %r is ready to use", self.name)
- async def getconn(
- self, timeout: Optional[float] = None
- ) -> AsyncConnection[Any]:
- logger.info("connection requested from %r", self.name)
- self._stats[self._REQUESTS_NUM] += 1
-
- # Critical section: decide here if there's a connection ready
- # or if the client needs to wait.
- async with self._lock:
- self._check_open_getconn()
-
- pos: Optional[AsyncClient] = None
- if self.max_size == 0 or self._nconns < self.max_size:
- # Create a new connection for the client
- try:
- conn = await self._connect(timeout=timeout)
- except ConnectionTimeout as ex:
- raise PoolTimeout(str(ex)) from None
- self._nconns += 1
- else:
- if self.max_waiting and len(self._waiting) >= self.max_waiting:
- self._stats[self._REQUESTS_ERRORS] += 1
- raise TooManyRequests(
- f"the pool {self.name!r} has aleady"
- f" {len(self._waiting)} requests waiting"
- )
-
- # No connection available: put the client in the waiting queue
- t0 = monotonic()
- pos = AsyncClient()
- self._waiting.append(pos)
- self._stats[self._REQUESTS_QUEUED] += 1
-
- # If we are in the waiting queue, wait to be assigned a connection
- # (outside the critical section, so only the waiting client is locked)
- if pos:
- if timeout is None:
- timeout = self.timeout
+ async def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[AsyncConnection[Any]]:
+ conn: Optional[AsyncConnection[Any]] = None
+ if self.max_size == 0 or self._nconns < self.max_size:
+ # Create a new connection for the client
try:
- conn = await pos.wait(timeout=timeout)
- except Exception:
- self._stats[self._REQUESTS_ERRORS] += 1
- raise
- finally:
- t1 = monotonic()
- self._stats[self._REQUESTS_WAIT_MS] += int(1000.0 * (t1 - t0))
-
- # Tell the connection it belongs to a pool to avoid closing on __exit__
- conn._pool = self
- logger.info("connection given by %r", self.name)
+ conn = await self._connect(timeout=timeout)
+ except ConnectionTimeout as ex:
+ raise PoolTimeout(str(ex)) from None
+ self._nconns += 1
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has aleady"
+ f" {len(self._waiting)} requests waiting"
+ )
return conn
- async def putconn(self, conn: AsyncConnection[Any]) -> None:
- # Quick check to discard the wrong connection
- self._check_pool_putconn(conn)
-
- logger.info("returning connection to %r", self.name)
-
+ async def _maybe_close_connection(
+ self, conn: AsyncConnection[Any]
+ ) -> bool:
# Close the connection if no client is waiting for it, or if the pool
# is closed. For extra refcare remove the pool reference from it.
# Maintain the stats.
async with self._lock:
- if self._closed or not self._waiting:
- conn._pool = None
- if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
- self._stats[self._RETURNS_BAD] += 1
- await conn.close()
- self._nconns -= 1
- return
-
- # Use a worker to perform eventual maintenance work in a separate task
- if self._reset:
- self.run_task(ReturnConnection(self, conn))
- else:
- await self._return_connection(conn)
+ if not self._closed and self._waiting:
+ return False
+
+ conn._pool = None
+ if conn.pgconn.transaction_status == TransactionStatus.UNKNOWN:
+ self._stats[self._RETURNS_BAD] += 1
+ await conn.close()
+ self._nconns -= 1
+ return True
async def resize(
self, min_size: int, max_size: Optional[int] = None
# or if the client needs to wait.
with self._lock:
self._check_open_getconn()
-
- pos: Optional[WaitingClient] = None
- if self._pool:
- # Take a connection ready out of the pool
- conn = self._pool.popleft()
- if len(self._pool) < self._nconns_min:
- self._nconns_min = len(self._pool)
- else:
- if self.max_waiting and len(self._waiting) >= self.max_waiting:
- self._stats[self._REQUESTS_ERRORS] += 1
- raise TooManyRequests(
- f"the pool {self.name!r} has aleady"
- f" {len(self._waiting)} requests waiting"
- )
-
+ conn = self._get_ready_connection(timeout)
+ if not conn:
# No connection available: put the client in the waiting queue
t0 = monotonic()
pos = WaitingClient()
self._stats[self._REQUESTS_QUEUED] += 1
# If there is space for the pool to grow, let's do it
- # Allow only one thread at time to grow the pool (or returning
- # connections might be starved).
- if self._nconns < self._max_size and not self._growing:
- self._nconns += 1
- logger.info(
- "growing pool %r to %s", self.name, self._nconns
- )
- self._growing = True
- self.run_task(AddConnection(self, growing=True))
+ self._maybe_grow_pool()
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
- if pos:
+ if not conn:
if timeout is None:
timeout = self.timeout
try:
logger.info("connection given by %r", self.name)
return conn
+ def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[Connection[Any]]:
+ """Return a connection, if the client deserves one."""
+ conn: Optional[Connection[Any]] = None
+ if self._pool:
+ # Take a connection ready out of the pool
+ conn = self._pool.popleft()
+ if len(self._pool) < self._nconns_min:
+ self._nconns_min = len(self._pool)
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has aleady"
+ f" {len(self._waiting)} requests waiting"
+ )
+ return conn
+
+ def _maybe_grow_pool(self) -> None:
+ # Allow only one thread at time to grow the pool (or returning
+ # connections might be starved).
+ if self._nconns >= self._max_size or self._growing:
+ return
+ self._nconns += 1
+ logger.info("growing pool %r to %s", self.name, self._nconns)
+ self._growing = True
+ self.run_task(AddConnection(self, growing=True))
+
def putconn(self, conn: Connection[Any]) -> None:
"""Return a connection to the loving hands of its pool.
logger.info("returning connection to %r", self.name)
- # If the pool is closed just close the connection instead of returning
- # it to the pool. For extra refcare remove the pool reference from it.
- if self._closed:
- conn._pool = None
- conn.close()
+ if self._maybe_close_connection(conn):
return
# Use a worker to perform eventual maintenance work in a separate thread
else:
self._return_connection(conn)
+ def _maybe_close_connection(self, conn: Connection[Any]) -> bool:
+ """Close a returned connection if necessary.
+
+ Return `!True if the connection was closed.
+ """
+ # If the pool is closed just close the connection instead of returning
+ # it to the pool. For extra refcare remove the pool reference from it.
+ if not self._closed:
+ return False
+
+ conn._pool = None
+ conn.close()
+ return True
+
def open(self, wait: bool = False, timeout: float = 30.0) -> None:
"""Open the pool by starting connecting and and accepting clients.
from contextlib import asynccontextmanager
from psycopg import errors as e
+from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
-from psycopg.connection_async import AsyncConnection
from .base import ConnectionAttempt, BasePool
from .sched import AsyncScheduler
# or if the client needs to wait.
async with self._lock:
self._check_open_getconn()
-
- pos: Optional[AsyncClient] = None
- if self._pool:
- # Take a connection ready out of the pool
- conn = self._pool.popleft()
- if len(self._pool) < self._nconns_min:
- self._nconns_min = len(self._pool)
- else:
- if self.max_waiting and len(self._waiting) >= self.max_waiting:
- self._stats[self._REQUESTS_ERRORS] += 1
- raise TooManyRequests(
- f"the pool {self.name!r} has aleady"
- f" {len(self._waiting)} requests waiting"
- )
-
+ conn = await self._get_ready_connection(timeout)
+ if not conn:
# No connection available: put the client in the waiting queue
t0 = monotonic()
pos = AsyncClient()
self._waiting.append(pos)
self._stats[self._REQUESTS_QUEUED] += 1
- # Allow only one task at time to grow the pool (or returning
- # connections might be starved).
- if self._nconns < self._max_size and not self._growing:
- self._nconns += 1
- logger.info(
- "growing pool %r to %s", self.name, self._nconns
- )
- self._growing = True
- self.run_task(AddConnection(self, growing=True))
+ # If there is space for the pool to grow, let's do it
+ self._maybe_grow_pool()
# If we are in the waiting queue, wait to be assigned a connection
# (outside the critical section, so only the waiting client is locked)
- if pos:
+ if not conn:
if timeout is None:
timeout = self.timeout
try:
logger.info("connection given by %r", self.name)
return conn
+ async def _get_ready_connection(
+ self, timeout: Optional[float]
+ ) -> Optional[AsyncConnection[Any]]:
+ conn: Optional[AsyncConnection[Any]] = None
+ if self._pool:
+ # Take a connection ready out of the pool
+ conn = self._pool.popleft()
+ if len(self._pool) < self._nconns_min:
+ self._nconns_min = len(self._pool)
+ elif self.max_waiting and len(self._waiting) >= self.max_waiting:
+ self._stats[self._REQUESTS_ERRORS] += 1
+ raise TooManyRequests(
+ f"the pool {self.name!r} has aleady"
+ f" {len(self._waiting)} requests waiting"
+ )
+ return conn
+
+ def _maybe_grow_pool(self) -> None:
+ # Allow only one task at time to grow the pool (or returning
+ # connections might be starved).
+ if self._nconns < self._max_size and not self._growing:
+ self._nconns += 1
+ logger.info("growing pool %r to %s", self.name, self._nconns)
+ self._growing = True
+ self.run_task(AddConnection(self, growing=True))
+
async def putconn(self, conn: AsyncConnection[Any]) -> None:
self._check_pool_putconn(conn)
logger.info("returning connection to %r", self.name)
-
- # If the pool is closed just close the connection instead of returning
- # it to the pool. For extra refcare remove the pool reference from it.
- if self._closed:
- conn._pool = None
- await conn.close()
+ if await self._maybe_close_connection(conn):
return
# Use a worker to perform eventual maintenance work in a separate task
else:
await self._return_connection(conn)
+ async def _maybe_close_connection(
+ self, conn: AsyncConnection[Any]
+ ) -> bool:
+ # If the pool is closed just close the connection instead of returning
+ # it to the pool. For extra refcare remove the pool reference from it.
+ if not self._closed:
+ return False
+
+ conn._pool = None
+ await conn.close()
+ return True
+
async def open(self, wait: bool = False, timeout: float = 30.0) -> None:
async with self._lock:
self._open()