import os
import tempfile
import uuid
+from enum import Enum
from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run
pass
-MESSAGE_DOCUMENT_ALREADY_EXISTS = "document_already_exists"
-MESSAGE_ASN_ALREADY_EXISTS = "asn_already_exists"
-MESSAGE_ASN_RANGE = "asn_value_out_of_range"
-MESSAGE_FILE_NOT_FOUND = "file_not_found"
-MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND = "pre_consume_script_not_found"
-MESSAGE_PRE_CONSUME_SCRIPT_ERROR = "pre_consume_script_error"
-MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND = "post_consume_script_not_found"
-MESSAGE_POST_CONSUME_SCRIPT_ERROR = "post_consume_script_error"
-MESSAGE_NEW_FILE = "new_file"
-MESSAGE_UNSUPPORTED_TYPE = "unsupported_type"
-MESSAGE_PARSING_DOCUMENT = "parsing_document"
-MESSAGE_GENERATING_THUMBNAIL = "generating_thumbnail"
-MESSAGE_PARSE_DATE = "parse_date"
-MESSAGE_SAVE_DOCUMENT = "save_document"
-MESSAGE_FINISHED = "finished"
+class ConsumerStatusShortMessage(str, Enum):
+ DOCUMENT_ALREADY_EXISTS = "document_already_exists"
+ ASN_ALREADY_EXISTS = "asn_already_exists"
+ ASN_RANGE = "asn_value_out_of_range"
+ FILE_NOT_FOUND = "file_not_found"
+ PRE_CONSUME_SCRIPT_NOT_FOUND = "pre_consume_script_not_found"
+ PRE_CONSUME_SCRIPT_ERROR = "pre_consume_script_error"
+ POST_CONSUME_SCRIPT_NOT_FOUND = "post_consume_script_not_found"
+ POST_CONSUME_SCRIPT_ERROR = "post_consume_script_error"
+ NEW_FILE = "new_file"
+ UNSUPPORTED_TYPE = "unsupported_type"
+ PARSING_DOCUMENT = "parsing_document"
+ GENERATING_THUMBNAIL = "generating_thumbnail"
+ PARSE_DATE = "parse_date"
+ SAVE_DOCUMENT = "save_document"
+ FINISHED = "finished"
+ FAILED = "failed"
+
+
+class ConsumerFilePhase(str, Enum):
+ STARTED = "STARTED"
+ WORKING = "WORKING"
+ SUCCESS = "SUCCESS"
+ FAILED = "FAILED"
class Consumer(LoggingMixin):
def _send_progress(
self,
- current_progress,
- max_progress,
- status,
- message=None,
+ current_progress: int,
+ max_progress: int,
+ status: ConsumerFilePhase,
+ message: Optional[ConsumerStatusShortMessage] = None,
document_id=None,
): # pragma: no cover
payload = {
def _fail(
self,
- message,
- log_message=None,
+ message: ConsumerStatusShortMessage,
+ log_message: Optional[str] = None,
exc_info=None,
exception: Optional[Exception] = None,
):
- self._send_progress(100, 100, "FAILED", message)
+ self._send_progress(100, 100, ConsumerFilePhase.FAILED, message)
self.log.error(log_message or message, exc_info=exc_info)
raise ConsumerError(f"{self.filename}: {log_message or message}") from exception
self.channel_layer = get_channel_layer()
def pre_check_file_exists(self):
+ """
+ Confirm the input file still exists where it should
+ """
if not os.path.isfile(self.path):
self._fail(
- MESSAGE_FILE_NOT_FOUND,
+ ConsumerStatusShortMessage.FILE_NOT_FOUND,
f"Cannot consume {self.path}: File not found.",
)
def pre_check_duplicate(self):
+ """
+ Using the MD5 of the file, check this exact file doesn't already exist
+ """
with open(self.path, "rb") as f:
checksum = hashlib.md5(f.read()).hexdigest()
existing_doc = Document.objects.filter(
if settings.CONSUMER_DELETE_DUPLICATES:
os.unlink(self.path)
self._fail(
- MESSAGE_DOCUMENT_ALREADY_EXISTS,
+ ConsumerStatusShortMessage.DOCUMENT_ALREADY_EXISTS,
f"Not consuming {self.filename}: It is a duplicate of"
f" {existing_doc.get().title} (#{existing_doc.get().pk})",
)
def pre_check_directories(self):
+ """
+ Ensure all required directories exist before attempting to use them
+ """
os.makedirs(settings.SCRATCH_DIR, exist_ok=True)
os.makedirs(settings.THUMBNAIL_DIR, exist_ok=True)
os.makedirs(settings.ORIGINALS_DIR, exist_ok=True)
or self.override_asn > Document.ARCHIVE_SERIAL_NUMBER_MAX
):
self._fail(
- MESSAGE_ASN_RANGE,
+ ConsumerStatusShortMessage.ASN_RANGE,
f"Not consuming {self.filename}: "
f"Given ASN {self.override_asn} is out of range "
f"[{Document.ARCHIVE_SERIAL_NUMBER_MIN:,}, "
)
if Document.objects.filter(archive_serial_number=self.override_asn).exists():
self._fail(
- MESSAGE_ASN_ALREADY_EXISTS,
+ ConsumerStatusShortMessage.ASN_ALREADY_EXISTS,
f"Not consuming {self.filename}: Given ASN already exists!",
)
def run_pre_consume_script(self):
+ """
+ If one is configured and exists, run the pre-consume script and
+ handle its output and/or errors
+ """
if not settings.PRE_CONSUME_SCRIPT:
return
if not os.path.isfile(settings.PRE_CONSUME_SCRIPT):
self._fail(
- MESSAGE_PRE_CONSUME_SCRIPT_NOT_FOUND,
+ ConsumerStatusShortMessage.PRE_CONSUME_SCRIPT_NOT_FOUND,
f"Configured pre-consume script "
f"{settings.PRE_CONSUME_SCRIPT} does not exist.",
)
except Exception as e:
self._fail(
- MESSAGE_PRE_CONSUME_SCRIPT_ERROR,
+ ConsumerStatusShortMessage.PRE_CONSUME_SCRIPT_ERROR,
f"Error while executing pre-consume script: {e}",
exc_info=True,
exception=e,
)
def run_post_consume_script(self, document: Document):
+ """
+ If one is configured and exists, run the pre-consume script and
+ handle its output and/or errors
+ """
if not settings.POST_CONSUME_SCRIPT:
return
if not os.path.isfile(settings.POST_CONSUME_SCRIPT):
self._fail(
- MESSAGE_POST_CONSUME_SCRIPT_NOT_FOUND,
+ ConsumerStatusShortMessage.POST_CONSUME_SCRIPT_NOT_FOUND,
f"Configured post-consume script "
f"{settings.POST_CONSUME_SCRIPT} does not exist.",
)
except Exception as e:
self._fail(
- MESSAGE_POST_CONSUME_SCRIPT_ERROR,
+ ConsumerStatusShortMessage.POST_CONSUME_SCRIPT_ERROR,
f"Error while executing post-consume script: {e}",
exc_info=True,
exception=e,
self.override_asn = override_asn
self.override_owner_id = override_owner_id
- self._send_progress(0, 100, "STARTING", MESSAGE_NEW_FILE)
+ self._send_progress(
+ 0,
+ 100,
+ ConsumerFilePhase.STARTED,
+ ConsumerStatusShortMessage.NEW_FILE,
+ )
# Make sure that preconditions for consuming the file are met.
)
if not parser_class:
tempdir.cleanup()
- self._fail(MESSAGE_UNSUPPORTED_TYPE, f"Unsupported mime type {mime_type}")
+ self._fail(
+ ConsumerStatusShortMessage.UNSUPPORTED_TYPE,
+ f"Unsupported mime type {mime_type}",
+ )
# Notify all listeners that we're going to do some work.
def progress_callback(current_progress, max_progress): # pragma: no cover
# recalculate progress to be within 20 and 80
p = int((current_progress / max_progress) * 50 + 20)
- self._send_progress(p, 100, "WORKING")
+ self._send_progress(p, 100, ConsumerFilePhase.WORKING)
# This doesn't parse the document yet, but gives us a parser.
archive_path = None
try:
- self._send_progress(20, 100, "WORKING", MESSAGE_PARSING_DOCUMENT)
+ self._send_progress(
+ 20,
+ 100,
+ ConsumerFilePhase.WORKING,
+ ConsumerStatusShortMessage.PARSING_DOCUMENT,
+ )
self.log.debug(f"Parsing {self.filename}...")
document_parser.parse(self.path, mime_type, self.filename)
self.log.debug(f"Generating thumbnail for {self.filename}...")
- self._send_progress(70, 100, "WORKING", MESSAGE_GENERATING_THUMBNAIL)
+ self._send_progress(
+ 70,
+ 100,
+ ConsumerFilePhase.WORKING,
+ ConsumerStatusShortMessage.GENERATING_THUMBNAIL,
+ )
thumbnail = document_parser.get_thumbnail(
self.path,
mime_type,
text = document_parser.get_text()
date = document_parser.get_date()
if date is None:
- self._send_progress(90, 100, "WORKING", MESSAGE_PARSE_DATE)
+ self._send_progress(
+ 90,
+ 100,
+ ConsumerFilePhase.WORKING,
+ ConsumerStatusShortMessage.PARSE_DATE,
+ )
date = parse_date(self.filename, text)
archive_path = document_parser.get_archive_path()
classifier = load_classifier()
- self._send_progress(95, 100, "WORKING", MESSAGE_SAVE_DOCUMENT)
+ self._send_progress(
+ 95,
+ 100,
+ ConsumerFilePhase.WORKING,
+ ConsumerStatusShortMessage.SAVE_DOCUMENT,
+ )
# now that everything is done, we can start to store the document
# in the system. This will be a transaction and reasonably fast.
try:
self.log.info(f"Document {document} consumption finished")
- self._send_progress(100, 100, "SUCCESS", MESSAGE_FINISHED, document.id)
+ self._send_progress(
+ 100,
+ 100,
+ ConsumerFilePhase.SUCCESS,
+ ConsumerStatusShortMessage.FINISHED,
+ document.id,
+ )
# Return the most up to date fields
document.refresh_from_db()