]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
Limit the number of threads waiting for files to be ready during polling
authorTrenton H <797416+stumpylog@users.noreply.github.com>
Mon, 20 Mar 2023 18:07:33 +0000 (11:07 -0700)
committerTrenton H <797416+stumpylog@users.noreply.github.com>
Tue, 21 Mar 2023 14:46:57 +0000 (07:46 -0700)
src/documents/management/commands/document_consumer.py

index c3f6bbed4871d5c9031b6d1f4e117e647a0d972e..d4ace3f1bf402f6a515490cd2e29945aee27c783 100644 (file)
@@ -1,10 +1,10 @@
 import logging
 import os
+from concurrent.futures import ThreadPoolExecutor
 from fnmatch import filter
 from pathlib import Path
 from pathlib import PurePath
 from threading import Event
-from threading import Thread
 from time import monotonic
 from time import sleep
 from typing import Final
@@ -168,11 +168,15 @@ def _consume_wait_unmodified(file: str) -> None:
 
 
 class Handler(FileSystemEventHandler):
+    def __init__(self, pool: ThreadPoolExecutor) -> None:
+        super().__init__()
+        self._pool = pool
+
     def on_created(self, event):
-        Thread(target=_consume_wait_unmodified, args=(event.src_path,)).start()
+        self._pool.submit(_consume_wait_unmodified, event.src_path)
 
     def on_moved(self, event):
-        Thread(target=_consume_wait_unmodified, args=(event.dest_path,)).start()
+        self._pool.submit(_consume_wait_unmodified, event.dest_path)
 
 
 class Command(BaseCommand):
@@ -246,17 +250,18 @@ class Command(BaseCommand):
             timeout = self.testing_timeout_s
             logger.debug(f"Configuring timeout to {timeout}s")
 
-        observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
-        observer.schedule(Handler(), directory, recursive=recursive)
-        observer.start()
-        try:
-            while observer.is_alive():
-                observer.join(timeout)
-                if self.stop_flag.is_set():
-                    observer.stop()
-        except KeyboardInterrupt:
-            observer.stop()
-        observer.join()
+        with ThreadPoolExecutor(max_workers=4) as pool:
+            observer = PollingObserver(timeout=settings.CONSUMER_POLLING)
+            observer.schedule(Handler(pool), directory, recursive=recursive)
+            observer.start()
+            try:
+                while observer.is_alive():
+                    observer.join(timeout)
+                    if self.stop_flag.is_set():
+                        observer.stop()
+            except KeyboardInterrupt:
+                observer.stop()
+            observer.join()
 
     def handle_inotify(self, directory, recursive, is_testing: bool):
         logger.info(f"Using inotify to watch directory for changes: {directory}")