self,
conninfo: str = "",
*,
- kwargs: Optional[Dict[str, Any]] = None,
- min_size: int = 4,
- max_size: Optional[int] = None,
- open: bool = True,
- name: Optional[str] = None,
- timeout: float = 30.0,
- max_waiting: int = 0,
- max_lifetime: float = 60 * 60.0,
- max_idle: float = 10 * 60.0,
- reconnect_timeout: float = 5 * 60.0,
- reconnect_failed: Optional[Callable[["BasePool[ConnectionType]"], None]] = None,
- num_workers: int = 3,
+ kwargs: Optional[Dict[str, Any]],
+ min_size: int,
+ max_size: Optional[int],
+ open: bool,
+ name: Optional[str],
+ timeout: float,
+ max_waiting: int,
+ max_lifetime: float,
+ max_idle: float,
+ reconnect_timeout: float,
+ reconnect_failed: Optional[Callable[["BasePool[ConnectionType]"], None]],
+ num_workers: int,
):
min_size, max_size = self._check_size(min_size, max_size)
import logging
import threading
-from typing import Any, Optional, Tuple
+from typing import Any, Callable, Dict, Optional, Tuple, Type
from psycopg import Connection
from psycopg.pq import TransactionStatus
+from .base import BasePool
from .pool import ConnectionPool, AddConnection
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
class _BaseNullConnectionPool:
- def __init__(
- self, conninfo: str = "", min_size: int = 0, *args: Any, **kwargs: Any
- ):
- super().__init__( # type: ignore[call-arg]
- conninfo, *args, min_size=min_size, **kwargs
- )
-
def _check_size(self, min_size: int, max_size: Optional[int]) -> Tuple[int, int]:
if max_size is None:
max_size = min_size
class NullConnectionPool(_BaseNullConnectionPool, ConnectionPool):
+ def __init__(
+ self,
+ conninfo: str = "",
+ *,
+ open: bool = True,
+ connection_class: Type[Connection[Any]] = Connection,
+ configure: Optional[Callable[[Connection[Any]], None]] = None,
+ reset: Optional[Callable[[Connection[Any]], None]] = None,
+ kwargs: Optional[Dict[str, Any]] = None,
+ # Note: default value changed to 0.
+ min_size: int = 0,
+ max_size: Optional[int] = None,
+ name: Optional[str] = None,
+ timeout: float = 30.0,
+ max_waiting: int = 0,
+ max_lifetime: float = 60 * 60.0,
+ max_idle: float = 10 * 60.0,
+ reconnect_timeout: float = 5 * 60.0,
+ reconnect_failed: Optional[Callable[[BasePool[Connection[Any]]], None]] = None,
+ num_workers: int = 3,
+ ):
+ super().__init__(
+ conninfo,
+ open=open,
+ connection_class=connection_class,
+ configure=configure,
+ reset=reset,
+ kwargs=kwargs,
+ min_size=min_size,
+ max_size=max_size,
+ name=name,
+ timeout=timeout,
+ max_waiting=max_waiting,
+ max_lifetime=max_lifetime,
+ max_idle=max_idle,
+ reconnect_timeout=reconnect_timeout,
+ reconnect_failed=reconnect_failed,
+ num_workers=num_workers,
+ )
+
def wait(self, timeout: float = 30.0) -> None:
"""
Create a connection for test.
import asyncio
import logging
-from typing import Any, Optional
+from typing import Any, Awaitable, Callable, Dict, Optional, Type
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
+from .base import BasePool
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
from .null_pool import _BaseNullConnectionPool
class AsyncNullConnectionPool(_BaseNullConnectionPool, AsyncConnectionPool):
+ def __init__(
+ self,
+ conninfo: str = "",
+ *,
+ open: bool = True,
+ connection_class: Type[AsyncConnection[Any]] = AsyncConnection,
+ configure: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
+ reset: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
+ kwargs: Optional[Dict[str, Any]] = None,
+ # Note: default value changed to 0.
+ min_size: int = 0,
+ max_size: Optional[int] = None,
+ name: Optional[str] = None,
+ timeout: float = 30.0,
+ max_waiting: int = 0,
+ max_lifetime: float = 60 * 60.0,
+ max_idle: float = 10 * 60.0,
+ reconnect_timeout: float = 5 * 60.0,
+ reconnect_failed: Optional[
+ Callable[[BasePool[AsyncConnection[None]]], None]
+ ] = None,
+ num_workers: int = 3,
+ ):
+ super().__init__(
+ conninfo,
+ open=open,
+ connection_class=connection_class,
+ configure=configure,
+ reset=reset,
+ kwargs=kwargs,
+ min_size=min_size,
+ max_size=max_size,
+ name=name,
+ timeout=timeout,
+ max_waiting=max_waiting,
+ max_lifetime=max_lifetime,
+ max_idle=max_idle,
+ reconnect_timeout=reconnect_timeout,
+ reconnect_failed=reconnect_failed,
+ num_workers=num_workers,
+ )
+
async def wait(self, timeout: float = 30.0) -> None:
self._check_open_getconn()
connection_class: Type[Connection[Any]] = Connection,
configure: Optional[Callable[[Connection[Any]], None]] = None,
reset: Optional[Callable[[Connection[Any]], None]] = None,
- **kwargs: Any,
+ kwargs: Optional[Dict[str, Any]] = None,
+ min_size: int = 4,
+ max_size: Optional[int] = None,
+ name: Optional[str] = None,
+ timeout: float = 30.0,
+ max_waiting: int = 0,
+ max_lifetime: float = 60 * 60.0,
+ max_idle: float = 10 * 60.0,
+ reconnect_timeout: float = 5 * 60.0,
+ reconnect_failed: Optional[Callable[[BasePool[Connection[Any]]], None]] = None,
+ num_workers: int = 3,
):
self.connection_class = connection_class
self._configure = configure
self._tasks: "Queue[MaintenanceTask]" = Queue()
self._workers: List[threading.Thread] = []
- super().__init__(conninfo, **kwargs)
+ super().__init__(
+ conninfo,
+ kwargs=kwargs,
+ min_size=min_size,
+ max_size=max_size,
+ open=open,
+ name=name,
+ timeout=timeout,
+ max_waiting=max_waiting,
+ max_lifetime=max_lifetime,
+ max_idle=max_idle,
+ reconnect_timeout=reconnect_timeout,
+ reconnect_failed=reconnect_failed,
+ num_workers=num_workers,
+ )
if open:
self.open()
connection_class: Type[AsyncConnection[Any]] = AsyncConnection,
configure: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
reset: Optional[Callable[[AsyncConnection[Any]], Awaitable[None]]] = None,
- **kwargs: Any,
+ kwargs: Optional[Dict[str, Any]] = None,
+ min_size: int = 4,
+ max_size: Optional[int] = None,
+ name: Optional[str] = None,
+ timeout: float = 30.0,
+ max_waiting: int = 0,
+ max_lifetime: float = 60 * 60.0,
+ max_idle: float = 10 * 60.0,
+ reconnect_timeout: float = 5 * 60.0,
+ reconnect_failed: Optional[
+ Callable[[BasePool[AsyncConnection[Any]]], None]
+ ] = None,
+ num_workers: int = 3,
):
self.connection_class = connection_class
self._configure = configure
self._sched_runner: Optional[Task[None]] = None
self._workers: List[Task[None]] = []
- super().__init__(conninfo, **kwargs)
+ super().__init__(
+ conninfo,
+ kwargs=kwargs,
+ min_size=min_size,
+ max_size=max_size,
+ open=open,
+ name=name,
+ timeout=timeout,
+ max_waiting=max_waiting,
+ max_lifetime=max_lifetime,
+ max_idle=max_idle,
+ reconnect_timeout=reconnect_timeout,
+ reconnect_failed=reconnect_failed,
+ num_workers=num_workers,
+ )
if open:
self._open()