import logging
import os
+from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter
from pathlib import Path
from pathlib import PurePath
from threading import Event
-from threading import Thread
from time import monotonic
from time import sleep
from typing import Final
class Handler(FileSystemEventHandler):
+ def __init__(self, pool: ThreadPoolExecutor) -> None:
+ super().__init__()
+ self._pool = pool
+
def on_created(self, event):
- Thread(target=_consume_wait_unmodified, args=(event.src_path,)).start()
+ self._pool.submit(_consume_wait_unmodified, event.src_path)
def on_moved(self, event):
- Thread(target=_consume_wait_unmodified, args=(event.dest_path,)).start()
+ self._pool.submit(_consume_wait_unmodified, event.dest_path)
class Command(BaseCommand):
timeout = self.testing_timeout_s
logger.debug(f"Configuring timeout to {timeout}s")
- observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
- observer.schedule(Handler(), directory, recursive=recursive)
- observer.start()
- try:
- while observer.is_alive():
- observer.join(timeout)
- if self.stop_flag.is_set():
- observer.stop()
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
+ with ThreadPoolExecutor(max_workers=4) as pool:
+ observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
+ observer.schedule(Handler(pool), directory, recursive=recursive)
+ observer.start()
+ try:
+ while observer.is_alive():
+ observer.join(timeout)
+ if self.stop_flag.is_set():
+ observer.stop()
+ except KeyboardInterrupt:
+ observer.stop()
+ observer.join()
def handle_inotify(self, directory, recursive, is_testing: bool):
logger.info(f"Using inotify to watch directory for changes: {directory}")