import uuid
from enum import Enum
from pathlib import Path
-from subprocess import CompletedProcess
-from subprocess import run
from typing import TYPE_CHECKING
from typing import Optional
from documents.signals import document_consumption_started
from documents.utils import copy_basic_file_stats
from documents.utils import copy_file_with_basic_stats
+from documents.utils import run_subprocess
class WorkflowTriggerPlugin(
script_env["TASK_ID"] = self.task_id or ""
try:
- completed_proc = run(
- args=[
+ run_subprocess(
+ [
settings.PRE_CONSUME_SCRIPT,
original_file_path,
],
- env=script_env,
- capture_output=True,
+ script_env,
+ self.log,
)
- self._log_script_outputs(completed_proc)
-
- # Raises exception on non-zero output
- completed_proc.check_returncode()
-
except Exception as e:
self._fail(
ConsumerStatusShortMessage.PRE_CONSUME_SCRIPT_ERROR,
script_env["TASK_ID"] = self.task_id or ""
try:
- completed_proc = run(
- args=[
+ run_subprocess(
+ [
settings.POST_CONSUME_SCRIPT,
str(document.pk),
document.get_public_filename(),
str(document.correspondent),
str(",".join(document.tags.all().values_list("name", flat=True))),
],
- env=script_env,
- capture_output=True,
+ script_env,
+ self.log,
)
- self._log_script_outputs(completed_proc)
-
- # Raises exception on non-zero output
- completed_proc.check_returncode()
-
except Exception as e:
self._fail(
ConsumerStatusShortMessage.POST_CONSUME_SCRIPT_ERROR,
except Exception: # pragma: no cover
pass
- def _log_script_outputs(self, completed_process: CompletedProcess):
- """
- Decodes a process stdout and stderr streams and logs them to the main log
- """
- # Log what the script exited as
- self.log.info(
- f"{completed_process.args[0]} exited {completed_process.returncode}",
- )
-
- # Decode the output (if any)
- if len(completed_process.stdout):
- stdout_str = (
- completed_process.stdout.decode("utf8", errors="ignore")
- .strip()
- .split(
- "\n",
- )
- )
- self.log.info("Script stdout:")
- for line in stdout_str:
- self.log.info(line)
-
- if len(completed_process.stderr):
- stderr_str = (
- completed_process.stderr.decode("utf8", errors="ignore")
- .strip()
- .split(
- "\n",
- )
- )
-
- self.log.warning("Script stderr:")
- for line in stderr_str:
- self.log.warning(line)
-
def parse_doc_title_w_placeholders(
title: str,
from pathlib import Path
-from subprocess import run
import img2pdf
from django.conf import settings
from documents.utils import copy_basic_file_stats
from documents.utils import maybe_override_pixel_limit
+from documents.utils import run_subprocess
def convert_from_tiff_to_pdf(tiff_path: Path, target_directory: Path) -> Path:
# Note the save into the temp folder, so as not to trigger a new
# consume
scratch_image = target_directory / tiff_path.name
- run(
+ run_subprocess(
[
settings.CONVERT_BINARY,
"-alpha",
from documents.loggers import LoggingMixin
from documents.signals import document_consumer_declaration
from documents.utils import copy_file_with_basic_stats
+from documents.utils import run_subprocess
# This regular expression will try to find dates in the document at
# hand and will match the following formats:
logger.debug("Execute: " + " ".join(args), extra={"group": logging_group})
- if not subprocess.Popen(args, env=environment).wait() == 0:
- raise ParseError(f"Convert failed at {args}")
+ try:
+ run_subprocess(args, environment, logger)
+ except subprocess.CalledProcessError as e:
+ raise ParseError(f"Convert failed at {args}") from e
+ except Exception as e: # pragma: no cover
+ raise ParseError("Unknown error running convert") from e
def get_default_thumbnail() -> Path:
# Ghostscript doesn't handle WebP outputs
gs_out_path = os.path.join(temp_dir, "gs_out.png")
cmd = [settings.GS_BINARY, "-q", "-sDEVICE=pngalpha", "-o", gs_out_path, in_path]
+
try:
- if not subprocess.Popen(cmd).wait() == 0:
- raise ParseError(f"Thumbnail (gs) failed at {cmd}")
+ try:
+ run_subprocess(cmd, logger=logger)
+ except subprocess.CalledProcessError as e:
+ raise ParseError(f"Thumbnail (gs) failed at {cmd}") from e
# then run convert on the output from gs to make WebP
run_convert(
density=300,
return super().setUp()
- @mock.patch("documents.consumer.run")
+ @mock.patch("documents.consumer.run_subprocess")
@override_settings(PRE_CONSUME_SCRIPT=None)
def test_no_pre_consume_script(self, m):
c = Consumer()
c.run_pre_consume_script()
m.assert_not_called()
- @mock.patch("documents.consumer.run")
+ @mock.patch("documents.consumer.run_subprocess")
@mock.patch("documents.consumer.Consumer._send_progress")
@override_settings(PRE_CONSUME_SCRIPT="does-not-exist")
def test_pre_consume_script_not_found(self, m, m2):
c.working_copy = "path-to-file"
self.assertRaises(ConsumerError, c.run_pre_consume_script)
- @mock.patch("documents.consumer.run")
+ @mock.patch("documents.consumer.run_subprocess")
def test_pre_consume_script(self, m):
with tempfile.NamedTemporaryFile() as script:
with override_settings(PRE_CONSUME_SCRIPT=script.name):
m.assert_called_once()
- args, kwargs = m.call_args
+ args, _ = m.call_args
- command = kwargs["args"]
- environment = kwargs["env"]
+ command = args[0]
+ environment = args[1]
self.assertEqual(command[0], script.name)
self.assertEqual(command[1], "path-to-file")
return super().setUp()
- @mock.patch("documents.consumer.run")
+ @mock.patch("documents.consumer.run_subprocess")
@override_settings(POST_CONSUME_SCRIPT=None)
def test_no_post_consume_script(self, m):
doc = Document.objects.create(title="Test", mime_type="application/pdf")
doc,
)
- @mock.patch("documents.consumer.run")
+ @mock.patch("documents.consumer.run_subprocess")
def test_post_consume_script_simple(self, m):
with tempfile.NamedTemporaryFile() as script:
with override_settings(POST_CONSUME_SCRIPT=script.name):
m.assert_called_once()
- @mock.patch("documents.consumer.run")
+ @mock.patch("documents.consumer.run_subprocess")
def test_post_consume_script_with_correspondent(self, m):
with tempfile.NamedTemporaryFile() as script:
with override_settings(POST_CONSUME_SCRIPT=script.name):
m.assert_called_once()
- _, kwargs = m.call_args
+ args, _ = m.call_args
- command = kwargs["args"]
- environment = kwargs["env"]
+ command = args[0]
+ environment = args[1]
self.assertEqual(command[0], script.name)
self.assertEqual(command[1], str(doc.pk))
+import logging
import shutil
from os import utime
from pathlib import Path
+from subprocess import CompletedProcess
+from subprocess import run
from typing import Optional
from typing import Union
if pixel_count == 0:
pixel_count = None
Image.MAX_IMAGE_PIXELS = pixel_count
+
+
def run_subprocess(
    arguments: list[str],
    env: Optional[dict[str, str]] = None,
    logger: Optional[logging.Logger] = None,
    *,
    check_exit_code: bool = True,
    log_stdout: bool = True,
    log_stderr: bool = True,
) -> CompletedProcess:
    """
    Runs a subprocess and logs its output, checking return code if requested

    Args:
        arguments: The command to execute; the first item is the program and
            is also used as the name shown in the log messages.
        env: Environment passed to the child process. None is handed to
            subprocess.run unchanged.
        logger: Where the exit code and captured output are logged. When
            None, nothing is logged.
        check_exit_code: If True, a non-zero exit code raises an exception
            (after the output has been logged).
        log_stdout: If True, captured stdout is logged at INFO level.
        log_stderr: If True, captured stderr is logged at WARNING level.

    Returns:
        The CompletedProcess, with stdout and stderr captured as bytes.

    Raises:
        subprocess.CalledProcessError: If check_exit_code is True and the
            process exited with a non-zero return code.
    """

    proc_name = arguments[0]

    # check=False on purpose: the outputs must be logged before any failure
    # is raised, so the return code is checked manually at the end
    completed_proc = run(args=arguments, env=env, capture_output=True, check=False)

    if logger is not None:
        logger.info(f"{proc_name} exited {completed_proc.returncode}")

        # Each stream is logged entirely at a single level, header included,
        # so filtering the log by level never separates a header from its lines
        streams = (
            ("stdout", log_stdout, completed_proc.stdout, logger.info),
            ("stderr", log_stderr, completed_proc.stderr, logger.warning),
        )
        for stream_name, enabled, raw_output, emit in streams:
            if not enabled or not raw_output:
                continue

            text = raw_output.decode("utf8", errors="ignore").strip()
            if not text:
                # Only whitespace was written, there is nothing worth logging
                continue

            emit(f"{proc_name} {stream_name}:")
            for line in text.split("\n"):
                emit(line)

    # Last, if requested, after logging outputs
    if check_exit_code:
        completed_proc.check_returncode()

    return completed_proc
import os
import re
-import subprocess
import tempfile
from pathlib import Path
from typing import TYPE_CHECKING
from documents.parsers import ParseError
from documents.parsers import make_thumbnail_from_pdf
from documents.utils import maybe_override_pixel_limit
+from documents.utils import run_subprocess
from paperless.config import OcrConfig
from paperless.models import ArchiveFileChoices
from paperless.models import CleanChoices
def remove_alpha(self, image_path: str) -> Path:
no_alpha_image = Path(self.tempdir) / "image-no-alpha"
- subprocess.run(
+ run_subprocess(
[
settings.CONVERT_BINARY,
"-alpha",
image_path,
no_alpha_image,
],
+ logger=self.log,
)
return no_alpha_image
mode="w+",
dir=self.tempdir,
) as tmp:
- subprocess.run(
+ run_subprocess(
[
"pdftotext",
"-q",
pdf_file,
tmp.name,
],
+ logger=self.log,
)
text = self.read_file_handle_unicode_errors(Path(tmp.name))