self._reconnect_failed = reconnect_failed
- self._lock = Lock()
+ self._lock: Lock
+ self._sched: Scheduler
+ self._tasks: Queue[MaintenanceTask]
+
self._waiting = Deque[WaitingClient[CT]]()
# to notify that the pool is full
self._pool_full_event: Optional[Event] = None
- self._sched = Scheduler()
self._sched_runner: Optional[threading.Thread] = None
- self._tasks: Queue[MaintenanceTask] = Queue()
self._workers: List[threading.Thread] = []
super().__init__(
because the pool was initialized with *open* = `!True`) but you cannot
currently re-open a closed pool.
"""
+ self._ensure_lock()
+
with self._lock:
self._open()
self._check_open()
+ # A lock has been most likely, but not necessarily, created in `open()`.
+ self._ensure_lock()
+
+ self._tasks = Queue()
+ self._sched = Scheduler()
+
self._closed = False
self._opened = True
self._start_workers()
self._start_initial_tasks()
+ def _ensure_lock(self) -> None:
+ try:
+ self._lock
+ except AttributeError:
+ self._lock = Lock()
+
def _start_workers(self) -> None:
self._sched_runner = spawn(self._sched.run, name=f"{self.name}-scheduler")
assert not self._workers
currently re-open a closed pool.
"""
# Make sure the lock is created after there is an event loop
- try:
- self._lock
- except AttributeError:
- self._lock = ALock()
+ self._ensure_lock()
async with self._lock:
self._open()
if not self._closed:
return
- # Throw a RuntimeError if the pool is open outside a running loop.
- asyncio.get_running_loop()
-
self._check_open()
+ # A lock has been most likely, but not necessarily, created in `open()`.
+ self._ensure_lock()
+
# Create these objects now to attach them to the right loop.
# See #219
self._tasks = AQueue()
self._sched = AsyncScheduler()
- # This has been most likely, but not necessarily, created in `open()`.
- try:
- self._lock
- except AttributeError:
- self._lock = ALock()
self._closed = False
self._opened = True
self._start_workers()
self._start_initial_tasks()
+ def _ensure_lock(self) -> None:
+ """Make sure the pool lock is created.
+
+ In async code, also make sure that the loop is running.
+ """
+
+ # Throw a RuntimeError if the pool is open outside a running loop.
+ asyncio.get_running_loop()
+
+ try:
+ self._lock
+ except AttributeError:
+ self._lock = ALock()
+
def _start_workers(self) -> None:
self._sched_runner = aspawn(self._sched.run, name=f"{self.name}-scheduler")
for i in range(self.num_workers):