from psycopg import Connection
from psycopg.pq import TransactionStatus
-from .pool import ConnectionPool, AddConnection
+from .pool import ConnectionPool, AddConnection, SyncConnectFailedCB
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
- reconnect_failed: Optional[Callable[["NullConnectionPool"], None]] = None,
+ reconnect_failed: Optional[SyncConnectFailedCB] = None,
num_workers: int = 3,
):
super().__init__(
import asyncio
import logging
-from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union
+from typing import Any, Awaitable, Callable, Dict, Optional, Type
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
from .null_pool import _BaseNullConnectionPool
-from .pool_async import AsyncConnectionPool, AddConnection
+from .pool_async import AsyncConnectionPool, AddConnection, AsyncConnectFailedCB
logger = logging.getLogger("psycopg.pool")
max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
- reconnect_failed: Optional[
- Union[
- Callable[["AsyncNullConnectionPool"], None],
- Callable[["AsyncNullConnectionPool"], Awaitable[None]],
- ]
- ] = None,
+ reconnect_failed: Optional[AsyncConnectFailedCB] = None,
num_workers: int = 3,
):
super().__init__(
from types import TracebackType
from typing import Any, Callable, Dict, Iterator, List
from typing import Optional, Sequence, Type
+from typing_extensions import TypeAlias
from weakref import ref
from contextlib import contextmanager
logger = logging.getLogger("psycopg.pool")
+SyncConnectFailedCB: TypeAlias = Callable[["ConnectionPool"], None]
+
class ConnectionPool(BasePool[Connection[Any]]):
def __init__(
max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
- reconnect_failed: Optional[Callable[["ConnectionPool"], None]] = None,
+ reconnect_failed: Optional[SyncConnectFailedCB] = None,
num_workers: int = 3,
):
self.connection_class = connection_class
self._configure = configure
self._reset = reset
- self._reconnect_failed: Callable[["ConnectionPool"], None]
+ self._reconnect_failed: SyncConnectFailedCB
self._reconnect_failed = reconnect_failed or (lambda pool: None)
self._lock = threading.RLock()
from types import TracebackType
from typing import Any, AsyncIterator, Awaitable, Callable
from typing import Dict, List, Optional, Sequence, Type, Union
+from typing_extensions import TypeAlias
from weakref import ref
from contextlib import asynccontextmanager
logger = logging.getLogger("psycopg.pool")
+AsyncConnectFailedCB: TypeAlias = Union[
+ Callable[["AsyncConnectionPool"], None],
+ Callable[["AsyncConnectionPool"], Awaitable[None]],
+]
+
class AsyncConnectionPool(BasePool[AsyncConnection[Any]]):
def __init__(
max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
- reconnect_failed: Optional[
- Union[
- Callable[["AsyncConnectionPool"], None],
- Callable[["AsyncConnectionPool"], Awaitable[None]],
- ]
- ] = None,
+ reconnect_failed: Optional[AsyncConnectFailedCB] = None,
num_workers: int = 3,
):
self.connection_class = connection_class
self._configure = configure
self._reset = reset
- self._reconnect_failed: Union[
- Callable[["AsyncConnectionPool"], None],
- Callable[["AsyncConnectionPool"], Awaitable[None]],
- ]
+ self._reconnect_failed: AsyncConnectFailedCB
self._reconnect_failed = reconnect_failed or (lambda pool: None)
# asyncio objects, created on open to attach them to the right loop.