import datetime
import hashlib
import os
+import shutil
+import tempfile
import uuid
+from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run
from typing import Optional
def __init__(self):
super().__init__()
- self.path = None
+ self.path: Optional[Path] = None
+ self.original_path: Optional[Path] = None
self.filename = None
self.override_title = None
self.override_correspondent_id = None
self.log("info", f"Executing pre-consume script {settings.PRE_CONSUME_SCRIPT}")
- filepath_arg = os.path.normpath(self.path)
+ working_file_path = str(self.path)
+ original_file_path = str(self.original_path)
script_env = os.environ.copy()
- script_env["DOCUMENT_SOURCE_PATH"] = filepath_arg
+ script_env["DOCUMENT_SOURCE_PATH"] = original_file_path
+ script_env["DOCUMENT_WORKING_PATH"] = working_file_path
try:
completed_proc = run(
args=[
settings.PRE_CONSUME_SCRIPT,
- filepath_arg,
+ original_file_path,
],
env=script_env,
capture_output=True,
exception=e,
)
- def run_post_consume_script(self, document):
+ def run_post_consume_script(self, document: Document):
if not settings.POST_CONSUME_SCRIPT:
return
Return the document object if it was successfully created.
"""
- self.path = path
- self.filename = override_filename or os.path.basename(path)
+ self.path = Path(path).resolve()
+ self.filename = override_filename or self.path.name
self.override_title = override_title
self.override_correspondent_id = override_correspondent_id
self.override_document_type_id = override_document_type_id
self.log("info", f"Consuming {self.filename}")
+ # For the actual work, copy the file into a tempdir
+ self.original_path = self.path
+ tempdir = tempfile.TemporaryDirectory(
+ prefix="paperless-ngx",
+ dir=settings.SCRATCH_DIR,
+ )
+ self.path = Path(tempdir.name) / Path(self.filename)
+ shutil.copy(self.original_path, self.path)
+
# Determine the parser class.
mime_type = magic.from_file(self.path, mime=True)
# Delete the file only if it was successfully consumed
self.log("debug", f"Deleting file {self.path}")
os.unlink(self.path)
+ self.original_path.unlink()
# https://github.com/jonaswinkler/paperless-ng/discussions/1037
shadow_file = os.path.join(
- os.path.dirname(self.path),
- "._" + os.path.basename(self.path),
+ os.path.dirname(self.original_path),
+ "._" + os.path.basename(self.original_path),
)
if os.path.isfile(shadow_file):
)
finally:
document_parser.cleanup()
+ tempdir.cleanup()
self.run_post_consume_script(document)
with tempfile.NamedTemporaryFile() as script:
with override_settings(PRE_CONSUME_SCRIPT=script.name):
c = Consumer()
- c.path = "path-to-file"
+ c.original_path = "path-to-file"
+ c.path = "/tmp/somewhere/path-to-file"
c.run_pre_consume_script()
m.assert_called_once()
args, kwargs = m.call_args
command = kwargs["args"]
+ environment = kwargs["env"]
self.assertEqual(command[0], script.name)
self.assertEqual(command[1], "path-to-file")
+ self.assertDictContainsSubset(
+ {
+ "DOCUMENT_SOURCE_PATH": c.original_path,
+ "DOCUMENT_WORKING_PATH": c.path,
+ },
+ environment,
+ )
+
@mock.patch("documents.consumer.Consumer.log")
def test_script_with_output(self, mocked_log):
"""
m.assert_called_once()
- args, kwargs = m.call_args
+ _, kwargs = m.call_args
command = kwargs["args"]
+ environment = kwargs["env"]
self.assertEqual(command[0], script.name)
self.assertEqual(command[1], str(doc.pk))
self.assertEqual(command[7], "my_bank")
self.assertCountEqual(command[8].split(","), ["a", "b"])
+ self.assertDictContainsSubset(
+ {
+ "DOCUMENT_ID": str(doc.pk),
+ "DOCUMENT_DOWNLOAD_URL": f"/api/documents/{doc.pk}/download/",
+ "DOCUMENT_THUMBNAIL_URL": f"/api/documents/{doc.pk}/thumb/",
+ "DOCUMENT_CORRESPONDENT": "my_bank",
+ "DOCUMENT_TAGS": "a,b",
+ },
+ environment,
+ )
+
def test_script_exit_non_zero(self):
"""
GIVEN: