inotify = INotify()
inotify_flags = flags.CLOSE_WRITE | flags.MOVED_TO | flags.MODIFY
if recursive:
- descriptor = inotify.add_watch_recursive(directory, inotify_flags)
+ inotify.add_watch_recursive(directory, inotify_flags)
else:
- descriptor = inotify.add_watch(directory, inotify_flags)
+ inotify.add_watch(directory, inotify_flags)
inotify_debounce_secs: Final[float] = settings.CONSUMER_INOTIFY_DELAY
inotify_debounce_ms: Final[int] = inotify_debounce_secs * 1000
notified_files = {}
- while not finished:
- try:
- for event in inotify.read(timeout=timeout_ms):
- path = inotify.get_path(event.wd) if recursive else directory
- filepath = os.path.join(path, event.name)
- if flags.MODIFY in flags.from_mask(event.mask):
- notified_files.pop(filepath, None)
+ try:
+ while not finished:
+ try:
+ for event in inotify.read(timeout=timeout_ms):
+ path = inotify.get_path(event.wd) if recursive else directory
+ filepath = os.path.join(path, event.name)
+ if flags.MODIFY in flags.from_mask(event.mask):
+ notified_files.pop(filepath, None)
+ else:
+ notified_files[filepath] = monotonic()
+
+ # Check the files against the timeout
+ still_waiting = {}
+ # last_event_time is time of the last inotify event for this file
+ for filepath, last_event_time in notified_files.items():
+ # Current time - last time over the configured timeout
+ waited_long_enough = (
+ monotonic() - last_event_time
+ ) > inotify_debounce_secs
+
+ # Also make sure the file exists still, some scanners might write a
+ # temporary file first
+ file_still_exists = os.path.exists(filepath) and os.path.isfile(
+ filepath,
+ )
+
+ if waited_long_enough and file_still_exists:
+ _consume(filepath)
+ elif file_still_exists:
+ still_waiting[filepath] = last_event_time
+
+ # These files are still waiting to hit the timeout
+ notified_files = still_waiting
+
+ # If files are waiting, need to exit read() to check them
+ # Otherwise, go back to infinite sleep time, but only if not testing
+ if len(notified_files) > 0:
+ timeout_ms = inotify_debounce_ms
+ elif is_testing:
+ timeout_ms = self.testing_timeout_ms
else:
- notified_files[filepath] = monotonic()
-
- # Check the files against the timeout
- still_waiting = {}
- # last_event_time is time of the last inotify event for this file
- for filepath, last_event_time in notified_files.items():
- # Current time - last time over the configured timeout
- waited_long_enough = (
- monotonic() - last_event_time
- ) > inotify_debounce_secs
-
- # Also make sure the file exists still, some scanners might write a
- # temporary file first
- file_still_exists = os.path.exists(filepath) and os.path.isfile(
- filepath,
- )
-
- if waited_long_enough and file_still_exists:
- _consume(filepath)
- elif file_still_exists:
- still_waiting[filepath] = last_event_time
-
- # These files are still waiting to hit the timeout
- notified_files = still_waiting
-
- # If files are waiting, need to exit read() to check them
- # Otherwise, go back to infinite sleep time, but only if not testing
- if len(notified_files) > 0:
- timeout_ms = inotify_debounce_ms
- elif is_testing:
- timeout_ms = self.testing_timeout_ms
- else:
- timeout_ms = None
-
- if self.stop_flag.is_set():
- logger.debug("Finishing because event is set")
- finished = True
+ timeout_ms = None
- except KeyboardInterrupt:
- logger.info("Received SIGINT, stopping inotify")
- finished = True
+ if self.stop_flag.is_set():
+ logger.debug("Finishing because event is set")
+ finished = True
- inotify.rm_watch(descriptor)
- inotify.close()
+ except KeyboardInterrupt:
+ logger.info("Received SIGINT, stopping inotify")
+ finished = True
+ finally:
+ inotify.close()