from abc import ABC, abstractmethod
from queue import Queue, Empty
from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, Tuple
+from weakref import ref
from contextlib import contextmanager
from collections import deque
logger.debug("returning connection to %r", self.name)
# If the pool is closed just close the connection instead of returning
- # it to the poo. For extra refcare remove the pool reference from it.
+ # it to the pool. For extra refcare remove the pool reference from it.
if self._closed:
conn._pool = None
conn.close()
"task run %s failed: %s: %s", task, e.__class__.__name__, e
)
- # delete reference loops which may keep the pool alive
- del task.pool
if isinstance(task, StopWorker):
return
- del task
def _connect(self) -> Connection:
"""Return a new connection configured for the pool."""
"""A task run asynchronously to maintain the pool state."""
def __init__(self, pool: ConnectionPool):
- self.pool = pool
+ self.pool = ref(pool)
logger.debug("task created: %s", self)
def __repr__(self) -> str:
- return (
- f"<{self.__class__.__name__} {self.pool.name!r} at 0x{id(self):x}>"
- )
+ pool = self.pool()
+ name = repr(pool.name) if pool else "<pool is gone>"
+ return f"<{self.__class__.__name__} {name} at 0x{id(self):x}>"
def run(self) -> None:
+ pool = self.pool()
+ if not pool:
+ # Pool has been deleted. Quietly discard operation.
+ return
+
logger.debug("task running: %s", self)
- self._run()
+ self._run(pool)
@abstractmethod
- def _run(self) -> None:
+ def _run(self, pool: ConnectionPool) -> None:
...
class StopWorker(MaintenanceTask):
"""Signal the maintenance thread to terminate."""
- def _run(self) -> None:
+ def _run(self, pool: ConnectionPool) -> None:
pass
class AddConnection(MaintenanceTask):
"""Add a new connection into to the pool."""
- def _run(self) -> None:
- conn = self.pool._connect()
- self.pool._add_to_pool(conn)
+ def _run(self, pool: ConnectionPool) -> None:
+ conn = pool._connect()
+ pool._add_to_pool(conn)
class AddInitialConnection(AddConnection):
super().__init__(pool)
self.event = event
- def _run(self) -> None:
- super()._run()
- if len(self.pool._pool) >= self.pool._nconns:
+ def _run(self, pool: ConnectionPool) -> None:
+ super()._run(pool)
+ if len(pool._pool) >= pool._nconns:
self.event.set()
super().__init__(pool)
self.conn = conn
- def _run(self) -> None:
- self.pool._add_to_pool(self.conn)
+ def _run(self, pool: ConnectionPool) -> None:
+ pool._add_to_pool(self.conn)