]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Experiment with a simpler and combined consumer loop feature-simpler-consume-loop
authorTrenton H <797416+stumpylog@users.noreply.github.com>
Thu, 30 Jan 2025 18:55:23 +0000 (10:55 -0800)
committerTrenton H <797416+stumpylog@users.noreply.github.com>
Thu, 30 Jan 2025 18:55:23 +0000 (10:55 -0800)
src/documents/management/commands/document_consumer.py

index 6b2706733ebb5624c8b17a5448eeda75204a01e5..74f94cfdb63f599ffcfae59084c31a5f5d1b9e1d 100644 (file)
@@ -1,6 +1,5 @@
 import logging
 import os
-from concurrent.futures import ThreadPoolExecutor
 from fnmatch import filter
 from pathlib import Path
 from pathlib import PurePath
@@ -13,8 +12,9 @@ from django import db
 from django.conf import settings
 from django.core.management.base import BaseCommand
 from django.core.management.base import CommandError
-from watchdog.events import FileSystemEventHandler
-from watchdog.observers.polling import PollingObserver
+from watchfiles import Change
+from watchfiles import DefaultFilter
+from watchfiles import watch
 
 from documents.data_models import ConsumableDocument
 from documents.data_models import DocumentMetadataOverrides
@@ -141,53 +141,6 @@ def _consume(filepath: str) -> None:
         logger.exception("Error while consuming document")
 
 
-def _consume_wait_unmodified(file: str) -> None:
-    """
-    Waits for the given file to appear unmodified based on file size
-    and modification time.  Will wait a configured number of seconds
-    and retry a configured number of times before either consuming or
-    giving up
-    """
-    if _is_ignored(file):
-        return
-
-    logger.debug(f"Waiting for file {file} to remain unmodified")
-    mtime = -1
-    size = -1
-    current_try = 0
-    while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
-        try:
-            stat_data = os.stat(file)
-            new_mtime = stat_data.st_mtime
-            new_size = stat_data.st_size
-        except FileNotFoundError:
-            logger.debug(
-                f"File {file} moved while waiting for it to remain unmodified.",
-            )
-            return
-        if new_mtime == mtime and new_size == size:
-            _consume(file)
-            return
-        mtime = new_mtime
-        size = new_size
-        sleep(settings.CONSUMER_POLLING_DELAY)
-        current_try += 1
-
-    logger.error(f"Timeout while waiting on file {file} to remain unmodified.")
-
-
-class Handler(FileSystemEventHandler):
-    def __init__(self, pool: ThreadPoolExecutor) -> None:
-        super().__init__()
-        self._pool = pool
-
-    def on_created(self, event):
-        self._pool.submit(_consume_wait_unmodified, event.src_path)
-
-    def on_moved(self, event):
-        self._pool.submit(_consume_wait_unmodified, event.dest_path)
-
-
 class Command(BaseCommand):
     """
     On every iteration of an infinite loop, consume what we can from the
@@ -199,7 +152,7 @@ class Command(BaseCommand):
     # Also only for testing, configures in one place the timeout used before checking
     # the stop flag
     testing_timeout_s: Final[float] = 0.5
-    testing_timeout_ms: Final[float] = testing_timeout_s * 1000.0
+    testing_timeout_ms: Final[int] = int(testing_timeout_s * 1000)
 
     def add_arguments(self, parser):
         parser.add_argument(
@@ -221,139 +174,121 @@ class Command(BaseCommand):
         )
 
     def handle(self, *args, **options):
-        directory = options["directory"]
-        recursive = settings.CONSUMER_RECURSIVE
+        directory: Final[Path] = Path(options["directory"]).resolve()
+        is_recursive: Final[bool] = settings.CONSUMER_RECURSIVE
+        is_oneshot: Final[bool] = options["oneshot"]
+        is_testing: Final[bool] = options["testing"]
 
         if not directory:
             raise CommandError("CONSUMPTION_DIR does not appear to be set.")
 
-        directory = os.path.abspath(directory)
-
-        if not os.path.isdir(directory):
+        if not directory.exists():
             raise CommandError(f"Consumption directory {directory} does not exist")
 
+        if not directory.is_dir():
+            raise CommandError(f"Consumption directory {directory} is not a directory")
+
         # Consumer will need this
         settings.SCRATCH_DIR.mkdir(parents=True, exist_ok=True)
 
-        if recursive:
-            for dirpath, _, filenames in os.walk(directory):
-                for filename in filenames:
-                    filepath = os.path.join(dirpath, filename)
-                    _consume(filepath)
-        else:
-            for entry in os.scandir(directory):
-                _consume(entry.path)
+        # Check for existing files at startup
+        glob_str = "**/*" if is_recursive else "*"
 
-        if options["oneshot"]:
-            return
+        for filepath in directory.glob(glob_str):
+            _consume(filepath)
 
-        if settings.CONSUMER_POLLING == 0 and INotify:
-            self.handle_inotify(directory, recursive, options["testing"])
-        else:
-            if INotify is None and settings.CONSUMER_POLLING == 0:  # pragma: no cover
-                logger.warning("Using polling as INotify import failed")
-            self.handle_polling(directory, recursive, options["testing"])
-
-        logger.debug("Consumer exiting.")
-
-    def handle_polling(self, directory, recursive, is_testing: bool):
-        logger.info(f"Polling directory for changes: {directory}")
-
-        timeout = None
-        if is_testing:
-            timeout = self.testing_timeout_s
-            logger.debug(f"Configuring timeout to {timeout}s")
-
-        polling_interval = settings.CONSUMER_POLLING
-        if polling_interval == 0:  # pragma: no cover
-            # Only happens if INotify failed to import
-            logger.warning("Using polling of 10s, consider setting this")
-            polling_interval = 10
-
-        with ThreadPoolExecutor(max_workers=4) as pool:
-            observer = PollingObserver(timeout=polling_interval)
-            observer.schedule(Handler(pool), directory, recursive=recursive)
-            observer.start()
-            try:
-                while observer.is_alive():
-                    observer.join(timeout)
-                    if self.stop_flag.is_set():
-                        observer.stop()
-            except KeyboardInterrupt:
-                observer.stop()
-            observer.join()
-
-    def handle_inotify(self, directory, recursive, is_testing: bool):
-        logger.info(f"Using inotify to watch directory for changes: {directory}")
+        if is_oneshot:
+            logger.info("One shot consume requested, exiting")
+            return
 
-        timeout_ms = None
-        if is_testing:
-            timeout_ms = self.testing_timeout_ms
-            logger.debug(f"Configuring timeout to {timeout_ms}ms")
+        use_polling: Final[bool] = settings.CONSUMER_POLLING != 0
+        poll_delay_ms: Final[int] = int(settings.CONSUMER_POLLING * 1000)
 
-        inotify = INotify()
-        inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
-        if recursive:
-            descriptor = inotify.add_watch_recursive(directory, inotify_flags)
+        if use_polling:
+            logger.info(
+                f"Polling {directory} for changes every {settings.CONSUMER_POLLING}s ",
+            )
         else:
-            descriptor = inotify.add_watch(directory, inotify_flags)
+            logger.info(f"Using inotify to watch {directory} for changes")
 
-        inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
-        inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
+        read_timeout_ms = 0
+        if options["testing"]:
+            read_timeout_ms = self.testing_timeout_ms
+            logger.debug(f"Configuring initial timeout to {read_timeout_ms}ms")
 
-        finished = False
+        inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
+        inotify_debounce_ms: Final[int] = int(inotify_debounce_secs * 1000)
 
-        notified_files = {}
+        filter = DefaultFilter(ignore_entity_patterns={r"__paperless_write_test_\d+__"})
 
-        while not finished:
+        notified_files: dict[Path, float] = {}
+        while not self.stop_flag.is_set():
             try:
-                for event in inotify.read(timeout=timeout_ms):
-                    path = inotify.get_path(event.wd) if recursive else directory
-                    filepath = os.path.join(path, event.name)
-                    if flags.MODIFY in flags.from_mask(event.mask):
-                        notified_files.pop(filepath, None)
-                    else:
-                        notified_files[filepath] = monotonic()
-
-                # Check the files against the timeout
-                still_waiting = {}
-                # last_event_time is time of the last inotify event for this file
-                for filepath, last_event_time in notified_files.items():
-                    # Current time - last time over the configured timeout
-                    waited_long_enough = (
-                        monotonic() - last_event_time
-                    ) > inotify_debounce_secs
-
-                    # Also make sure the file exists still, some scanners might write a
-                    # temporary file first
-                    file_still_exists = os.path.exists(filepath) and os.path.isfile(
-                        filepath,
-                    )
-
-                    if waited_long_enough and file_still_exists:
-                        _consume(filepath)
-                    elif file_still_exists:
-                        still_waiting[filepath] = last_event_time
-
-                # These files are still waiting to hit the timeout
-                notified_files = still_waiting
-
-                # If files are waiting, need to exit read() to check them
-                # Otherwise, go back to infinite sleep time, but only if not testing
+                for changes in watch(
+                    directory,
+                    watch_filter=filter,
+                    rust_timeout=read_timeout_ms,
+                    yield_on_timeout=True,
+                    force_polling=use_polling,
+                    poll_delay_ms=poll_delay_ms,
+                    recursive=is_recursive,
+                    stop_event=self.stop_flag,
+                ):
+                    for change_type, path in changes:
+                        path = Path(path).resolve()
+                        logger.info(f"Got {change_type.name} for {path}")
+
+                        match change_type:
+                            case Change.added | Change.modified:
+                                logger.info(
+                                    f"New event time for {path} at {monotonic()}",
+                                )
+                                notified_files[path] = monotonic()
+                            case Change.deleted:
+                                notified_files.pop(path, None)
+
+                    logger.info("Checking for files that are ready")
+
+                    # Check the files against the timeout
+                    still_waiting = {}
+                    # last_event_time is time of the last inotify event for this file
+                    for filepath, last_event_time in notified_files.items():
+                        # Current time - last time over the configured timeout
+                        waited_long_enough = (
+                            monotonic() - last_event_time
+                        ) > inotify_debounce_secs
+
+                        # Also make sure the file exists still, some scanners might write a
+                        # temporary file first
+                        file_still_exists = filepath.exists() and filepath.is_file()
+
+                        logger.info(
+                            f"{filepath} - {waited_long_enough} - {file_still_exists}",
+                        )
+
+                        if waited_long_enough and file_still_exists:
+                            logger.info(f"Consuming {filepath}")
+                            _consume(filepath)
+                        elif file_still_exists:
+                            still_waiting[filepath] = last_event_time
+
+                        # These files are still waiting to hit the timeout
+                        notified_files = still_waiting
+
+                    # Always exit the watch loop to reconfigure the timeout
+                    break
+
                 if len(notified_files) > 0:
-                    timeout_ms = inotify_debounce_ms
+                    logger.info("Using inotify_debounce_ms")
+                    read_timeout_ms = inotify_debounce_ms
                 elif is_testing:
-                    timeout_ms = self.testing_timeout_ms
+                    logger.info("Using testing_timeout_ms")
+                    read_timeout_ms = self.testing_timeout_ms
                 else:
-                    timeout_ms = None
-
-                if self.stop_flag.is_set():
-                    logger.debug("Finishing because event is set")
-                    finished = True
-
+                    logger.info("No files in waiting, configuring indefinite timeout")
+                    read_timeout_ms = 0
+                logger.info(f"Configuring timeout to {read_timeout_ms}ms")
             except KeyboardInterrupt:
-                logger.info("Received SIGINT, stopping inotify")
-                finished = True
+                self.stop_flag.set()
 
-        inotify.rm_watch(descriptor)
-        inotify.close()
+        logger.debug("Consumer exiting.")