from time import monotonic
from random import random
-from typing import Any, Callable, Dict, Generic, Optional, Tuple
+from typing import Any, Dict, Generic, Optional, Tuple
from psycopg import errors as e
from psycopg.abc import ConnectionType
max_lifetime: float,
max_idle: float,
reconnect_timeout: float,
- reconnect_failed: Optional[Callable[["BasePool[ConnectionType]"], None]],
num_workers: int,
):
min_size, max_size = self._check_size(min_size, max_size)
self.conninfo = conninfo
self.kwargs: Dict[str, Any] = kwargs or {}
- self._reconnect_failed: Callable[["BasePool[ConnectionType]"], None]
- self._reconnect_failed = reconnect_failed or (lambda pool: None)
self.name = name
self._min_size = min_size
self._max_size = max_size
from psycopg import Connection
from psycopg.pq import TransactionStatus
-from .base import BasePool
from .pool import ConnectionPool, AddConnection
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
- reconnect_failed: Optional[Callable[[BasePool[Connection[Any]]], None]] = None,
+ reconnect_failed: Optional[Callable[["NullConnectionPool"], None]] = None,
num_workers: int = 3,
):
super().__init__(
max_lifetime=max_lifetime,
max_idle=max_idle,
reconnect_timeout=reconnect_timeout,
- reconnect_failed=reconnect_failed,
num_workers=num_workers,
)
import asyncio
import logging
-from typing import Any, Awaitable, Callable, Dict, Optional, Type
+from typing import Any, Awaitable, Callable, Dict, Optional, Type, Union
from psycopg import AsyncConnection
from psycopg.pq import TransactionStatus
-from .base import BasePool
from .errors import PoolTimeout, TooManyRequests
from ._compat import ConnectionTimeout
from .null_pool import _BaseNullConnectionPool
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
reconnect_failed: Optional[
- Callable[[BasePool[AsyncConnection[None]]], None]
+ Union[
+ Callable[["AsyncNullConnectionPool"], None],
+ Callable[["AsyncNullConnectionPool"], Awaitable[None]],
+ ]
] = None,
num_workers: int = 3,
):
max_lifetime=max_lifetime,
max_idle=max_idle,
reconnect_timeout=reconnect_timeout,
- reconnect_failed=reconnect_failed,
num_workers=num_workers,
)
max_lifetime: float = 60 * 60.0,
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
- reconnect_failed: Optional[Callable[[BasePool[Connection[Any]]], None]] = None,
+ reconnect_failed: Optional[Callable[["ConnectionPool"], None]] = None,
num_workers: int = 3,
):
self.connection_class = connection_class
self._configure = configure
self._reset = reset
+ self._reconnect_failed: Callable[["ConnectionPool"], None]
+ self._reconnect_failed = reconnect_failed or (lambda pool: None)
+
self._lock = threading.RLock()
self._waiting = Deque["WaitingClient"]()
max_lifetime=max_lifetime,
max_idle=max_idle,
reconnect_timeout=reconnect_timeout,
- reconnect_failed=reconnect_failed,
num_workers=num_workers,
)
from time import monotonic
from types import TracebackType
from typing import Any, AsyncIterator, Awaitable, Callable
-from typing import Dict, List, Optional, Sequence, Type
+from typing import Dict, List, Optional, Sequence, Type, Union
from weakref import ref
from contextlib import asynccontextmanager
max_idle: float = 10 * 60.0,
reconnect_timeout: float = 5 * 60.0,
reconnect_failed: Optional[
- Callable[[BasePool[AsyncConnection[Any]]], None]
+ Union[
+ Callable[["AsyncConnectionPool"], None],
+ Callable[["AsyncConnectionPool"], Awaitable[None]],
+ ]
] = None,
num_workers: int = 3,
):
self._configure = configure
self._reset = reset
+ self._reconnect_failed: Union[
+ Callable[["AsyncConnectionPool"], None],
+ Callable[["AsyncConnectionPool"], Awaitable[None]],
+ ]
+ self._reconnect_failed = reconnect_failed or (lambda pool: None)
+
# asyncio objects, created on open to attach them to the right loop.
self._lock: asyncio.Lock
self._sched: AsyncScheduler
max_lifetime=max_lifetime,
max_idle=max_idle,
reconnect_timeout=reconnect_timeout,
- reconnect_failed=reconnect_failed,
num_workers=num_workers,
)
else:
await self._add_to_pool(conn)
- def reconnect_failed(self) -> None:
+ async def reconnect_failed(self) -> None:
"""
Called when reconnection failed for longer than `reconnect_timeout`.
"""
- self._reconnect_failed(self)
+ if asyncio.iscoroutinefunction(self._reconnect_failed):
+ await self._reconnect_failed(self)
+ else:
+ self._reconnect_failed(self)
def run_task(self, task: "MaintenanceTask") -> None:
"""Run a maintenance task in a worker."""
# If we have given up with a growing attempt, allow a new one.
if growing and self._growing:
self._growing = False
- self.reconnect_failed()
+ await self.reconnect_failed()
else:
attempt.update_delay(now)
await self.schedule_task(
@pytest.mark.slow
@pytest.mark.timing
-async def test_reconnect_failure(proxy):
+@pytest.mark.parametrize("async_cb", [True, False])
+async def test_reconnect_failure(proxy, async_cb):
proxy.start()
t1 = None
- def failed(pool):
- assert pool.name == "this-one"
- nonlocal t1
- t1 = time()
+ if async_cb:
+
+ async def failed(pool):
+ assert pool.name == "this-one"
+ nonlocal t1
+ t1 = time()
+
+ else:
+
+ def failed(pool):
+ assert pool.name == "this-one"
+ nonlocal t1
+ t1 = time()
async with pool.AsyncConnectionPool(
proxy.client_dsn,