from pathlib import Path
from pathlib import PurePath
from threading import Thread
+from time import monotonic
from time import sleep
+from typing import Final
from django.conf import settings
from django.core.management.base import BaseCommand
logger.warning(f"Not consuming file {filepath}: Unknown file extension.")
return
+ # Total wait time: up to 500ms
+ os_error_retry_count: Final[int] = 50
+ os_error_retry_wait: Final[float] = 0.01
+
+ read_try_count = 0
+ file_open_ok = False
+
+ while (read_try_count < os_error_retry_count) and not file_open_ok:
+ try:
+ with open(filepath, "rb"):
+ file_open_ok = True
+ except OSError:
+ read_try_count += 1
+ sleep(os_error_retry_wait)
+
+ if read_try_count >= os_error_retry_count:
+ logger.warning(f"Not consuming file {filepath}: OS reports file as busy still")
+ return
+
tag_ids = None
try:
if settings.CONSUMER_SUBDIRS_AS_TAGS:
logger.debug(f"Waiting for file {file} to remain unmodified")
mtime = -1
+ size = -1
current_try = 0
while current_try < settings.CONSUMER_POLLING_RETRY_COUNT:
try:
- new_mtime = os.stat(file).st_mtime
+ stat_data = os.stat(file)
+ new_mtime = stat_data.st_mtime
+ new_size = stat_data.st_size
except FileNotFoundError:
logger.debug(
f"File {file} moved while waiting for it to remain " f"unmodified.",
)
return
- if new_mtime == mtime:
+ if new_mtime == mtime and new_size == size:
_consume(file)
return
mtime = new_mtime
+ size = new_size
sleep(settings.CONSUMER_POLLING_DELAY)
current_try += 1
descriptor = inotify.add_watch(directory, inotify_flags)
try:
+
+ inotify_debounce: Final[float] = 0.5
+ notified_files = {}
+
while not self.stop_flag:
+
for event in inotify.read(timeout=1000):
if recursive:
path = inotify.get_path(event.wd)
else:
path = directory
filepath = os.path.join(path, event.name)
- _consume(filepath)
+ notified_files[filepath] = monotonic()
+
+ # Check the files against the timeout
+ still_waiting = {}
+ for filepath in notified_files:
+ # Time of the last inotify event for this file
+ last_event_time = notified_files[filepath]
+ if (monotonic() - last_event_time) > inotify_debounce:
+ _consume(filepath)
+ else:
+ still_waiting[filepath] = last_event_time
+ # These files are still waiting to hit the timeout
+ notified_files = still_waiting
+
except KeyboardInterrupt:
pass
f'_is_ignored("{file_path}") != {expected_ignored}',
)
+ @mock.patch("documents.management.commands.document_consumer.open")
+ def test_consume_file_busy(self, open_mock):
+
+ # Calling this mock always raises this
+ open_mock.side_effect = OSError
+
+ self.t_start()
+
+ f = os.path.join(self.dirs.consumption_dir, "my_file.pdf")
+ shutil.copy(self.sample_file, f)
+
+ self.wait_for_task_mock_call()
+
+ self.task_mock.assert_not_called()
+
@override_settings(
CONSUMER_POLLING=1,