Just check for None.
self.conninfo = conninfo
self.kwargs: Dict[str, Any] = kwargs or {}
- self._reconnect_failed: Callable[["BasePool[ConnectionType]"], None]
- self._reconnect_failed = reconnect_failed or (lambda pool: None)
+ self._reconnect_failed = reconnect_failed
self.name = name
self._min_size = min_size
self._max_size = max_size
"""
Called when reconnection failed for longer than `reconnect_timeout`.
"""
+ if not self._reconnect_failed:
+ return
+
self._reconnect_failed(self)
def run_task(self, task: "MaintenanceTask") -> None:
"""
Called when reconnection failed for longer than `reconnect_timeout`.
"""
+ if not self._reconnect_failed:
+ return
+
self._reconnect_failed(self)
def run_task(self, task: "MaintenanceTask") -> None: