import logging
import os
-from concurrent.futures import ThreadPoolExecutor
from fnmatch import filter
from pathlib import Path
from pathlib import PurePath
from django.conf import settings
from django.core.management.base import BaseCommand
from django.core.management.base import CommandError
-from watchdog.events import FileSystemEventHandler
-from watchdog.observers.polling import PollingObserver
+from watchfiles import Change
+from watchfiles import DefaultFilter
+from watchfiles import watch
from documents.data_models import ConsumableDocument
from documents.data_models import DocumentMetadataOverrides
logger.exception("Error while consuming document")
-def _consume_wait_unmodified(file: str) -> None:
- """
- Waits for the given file to appear unmodified based on file size
- and modification time. Will wait a configured number of seconds
- and retry a configured number of times before either consuming or
- giving up
- """
- if _is_ignored(file):
- return
-
- logger.debug(f"Waiting for file {file} to remain unmodified")
- mtime = -1
- size = -1
- current_try = 0
- while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
- try:
- stat_data = os.stat(file)
- new_mtime = stat_data.st_mtime
- new_size = stat_data.st_size
- except FileNotFoundError:
- logger.debug(
- f"File {file} moved while waiting for it to remain unmodified.",
- )
- return
- if new_mtime == mtime and new_size == size:
- _consume(file)
- return
- mtime = new_mtime
- size = new_size
- sleep(settings.CONSUMER_POLLING_DELAY)
- current_try += 1
-
- logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
-
-
-class Handler(FileSystemEventHandler):
- def __init__(self, pool: ThreadPoolExecutor) -> None:
- super().__init__()
- self._pool = pool
-
- def on_created(self, event):
- self._pool.submit(_consume_wait_unmodified, event.src_path)
-
- def on_moved(self, event):
- self._pool.submit(_consume_wait_unmodified, event.dest_path)
-
-
class Command(BaseCommand):
"""
On every iteration of an infinite loop, consume what we can from the
# Also only for testing, configures in one place the timeout used before checking
# the stop flag
testing_timeout_s: Final[float] = 0.5
- testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
+ testing_timeout_ms: Final[int] = int(testing_timeout_s * 1000)
def add_arguments(self, parser):
parser.add_argument(
)
def handle(self, *args, **options):
- directory = options["directory"]
- recursive = settings.CONSUMER_RECURSIVE
+ directory: Final[Path] = Path(options["directory"]).resolve()
+ is_recursive: Final[bool] = settings.CONSUMER_RECURSIVE
+ is_oneshot: Final[bool] = options["oneshot"]
+ is_testing: Final[bool] = options["testing"]
if not directory:
raise CommandError("CONSUMPTION_DIR does not appear to be set.")
- directory = os.path.abspath(directory)
-
- if not os.path.isdir(directory):
+ if not directory.exists():
raise CommandError(f"Consumption directory {directory} does not exist")
+ if not directory.is_dir():
+ raise CommandError(f"Consumption directory {directory} is not a directory")
+
# Consumer will need this
settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
- if recursive:
- for dirpath, _, filenames in os.walk(directory):
- for filename in filenames:
- filepath = os.path.join(dirpath, filename)
- _consume(filepath)
- else:
- for entry in os.scandir(directory):
- _consume(entry.path)
+ # Check for existing files at startup
+ glob_str = "**/*" if is_recursive else "*"
- if options["oneshot"]:
- return
+ for filepath in directory.glob(glob_str):
+ _consume(filepath)
- if settings.CONSUMER_POLLING == 0 and INotify:
- self.handle_inotify(directory, recursive, options["testing"])
- else:
- if INotify is None and settings.CONSUMER_POLLING == 0: # pragma: no cover
- logger.warning("Using polling as INotify import failed")
- self.handle_polling(directory, recursive, options["testing"])
-
- logger.debug("Consumer exiting.")
-
- def handle_polling(self, directory, recursive, is_testing: bool):
- logger.info(f"Polling directory for changes: {directory}")
-
- timeout = None
- if is_testing:
- timeout = self.testing_timeout_s
- logger.debug(f"Configuring timeout to {timeout}s")
-
- polling_interval = settings.CONSUMER_POLLING
- if polling_interval == 0: # pragma: no cover
- # Only happens if INotify failed to import
- logger.warning("Using polling of 10s, consider setting this")
- polling_interval = 10
-
- with ThreadPoolExecutor(max_workers=4) as pool:
- observer = PollingObserver(timeout=polling_interval)
- observer.schedule(Handler(pool), directory, recursive=recursive)
- observer.start()
- try:
- while observer.is_alive():
- observer.join(timeout)
- if self.stop_flag.is_set():
- observer.stop()
- except KeyboardInterrupt:
- observer.stop()
- observer.join()
-
- def handle_inotify(self, directory, recursive, is_testing: bool):
- logger.info(f"Using inotify to watch directory for changes: {directory}")
+ if is_oneshot:
+ logger.info("One shot consume requested, exiting")
+ return
- timeout_ms = None
- if is_testing:
- timeout_ms = self.testing_timeout_ms
- logger.debug(f"Configuring timeout to {timeout_ms}ms")
+ use_polling: Final[bool] = settings.CONSUMER_POLLING != 0
+ poll_delay_ms: Final[int] = int(settings.CONSUMER_POLLING * 1000)
- inotify = INotify()
- inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
- if recursive:
- descriptor = inotify.add_watch_recursive(directory, inotify_flags)
+ if use_polling:
+ logger.info(
+ f"Polling {directory} for changes every {settings.CONSUMER_POLLING}s ",
+ )
else:
- descriptor = inotify.add_watch(directory, inotify_flags)
+ logger.info(f"Using inotify to watch {directory} for changes")
- inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
- inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
+ read_timeout_ms = 0
+ if options["testing"]:
+ read_timeout_ms = self.testing_timeout_ms
+ logger.debug(f"Configuring initial timeout to {read_timeout_ms}ms")
- finished = False
+ inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
+ inotify_debounce_ms: Final[int] = int(inotify_debounce_secs * 1000)
- notified_files = {}
+ filter = DefaultFilter(ignore_entity_patterns={r"__paperless_write_test_\d+__"})
- while not finished:
+ notified_files: dict[Path, float] = {}
+ while not self.stop_flag.is_set():
try:
- for event in inotify.read(timeout=timeout_ms):
- path = inotify.get_path(event.wd) if recursive else directory
- filepath = os.path.join(path, event.name)
- if flags.MODIFY in flags.from_mask(event.mask):
- notified_files.pop(filepath, None)
- else:
- notified_files[filepath] = monotonic()
-
- # Check the files against the timeout
- still_waiting = {}
- # last_event_time is time of the last inotify event for this file
- for filepath, last_event_time in notified_files.items():
- # Current time - last time over the configured timeout
- waited_long_enough = (
- monotonic() - last_event_time
- ) > inotify_debounce_secs
-
- # Also make sure the file exists still, some scanners might write a
- # temporary file first
- file_still_exists = os.path.exists(filepath) and os.path.isfile(
- filepath,
- )
-
- if waited_long_enough and file_still_exists:
- _consume(filepath)
- elif file_still_exists:
- still_waiting[filepath] = last_event_time
-
- # These files are still waiting to hit the timeout
- notified_files = still_waiting
-
- # If files are waiting, need to exit read() to check them
- # Otherwise, go back to infinite sleep time, but only if not testing
+ for changes in watch(
+ directory,
+ watch_filter=filter,
+ rust_timeout=read_timeout_ms,
+ yield_on_timeout=True,
+ force_polling=use_polling,
+ poll_delay_ms=poll_delay_ms,
+ recursive=is_recursive,
+ stop_event=self.stop_flag,
+ ):
+ for change_type, path in changes:
+ path = Path(path).resolve()
+ logger.info(f"Got {change_type.name} for {path}")
+
+ match change_type:
+ case Change.added | Change.modified:
+ logger.info(
+ f"New event time for {path} at {monotonic()}",
+ )
+ notified_files[path] = monotonic()
+ case Change.deleted:
+ notified_files.pop(path, None)
+
+ logger.info("Checking for files that are ready")
+
+ # Check the files against the timeout
+ still_waiting = {}
+ # last_event_time is the time of the last filesystem event for this file
+ for filepath, last_event_time in notified_files.items():
+ # Current time - last time over the configured timeout
+ waited_long_enough = (
+ monotonic() - last_event_time
+ ) > inotify_debounce_secs
+
+ # Also make sure the file still exists; some scanners might write a
+ # temporary file first
+ file_still_exists = filepath.exists() and filepath.is_file()
+
+ logger.info(
+ f"{filepath} - {waited_long_enough} - {file_still_exists}",
+ )
+
+ if waited_long_enough and file_still_exists:
+ logger.info(f"Consuming {filepath}")
+ _consume(filepath)
+ elif file_still_exists:
+ still_waiting[filepath] = last_event_time
+
+ # These files are still waiting to hit the timeout
+ notified_files = still_waiting
+
+ # Always exit the watch loop to reconfigure the timeout
+ break
+
if len(notified_files) > 0:
- timeout_ms = inotify_debounce_ms
+ logger.info("Using inotify_debounce_ms")
+ read_timeout_ms = inotify_debounce_ms
elif is_testing:
- timeout_ms = self.testing_timeout_ms
+ logger.info("Using testing_timeout_ms")
+ read_timeout_ms = self.testing_timeout_ms
else:
- timeout_ms = None
-
- if self.stop_flag.is_set():
- logger.debug("Finishing because event is set")
- finished = True
-
+ logger.info("No files in waiting, configuring indefinite timeout")
+ read_timeout_ms = 0
+ logger.info(f"Configuring timeout to {read_timeout_ms}ms")
except KeyboardInterrupt:
- logger.info("Received SIGINT, stopping inotify")
- finished = True
+ self.stop_flag.set()
- inotify.rm_watch(descriptor)
- inotify.close()
+ logger.debug("Consumer exiting.")