matrix:
python-version: ['3.8', '3.9', '3.10']
fail-fast: false
- env:
- # Enable Tika end to end testing
- TIKA_LIVE: 1
- # Enable paperless_mail testing against real server
- PAPERLESS_MAIL_TEST_HOST: ${{ secrets.TEST_MAIL_HOST }}
- PAPERLESS_MAIL_TEST_USER: ${{ secrets.TEST_MAIL_USER }}
- PAPERLESS_MAIL_TEST_PASSWD: ${{ secrets.TEST_MAIL_PASSWD }}
- # Enable Gotenberg end to end testing
- GOTENBERG_LIVE: 1
steps:
-
name: Checkout
pipenv --python ${{ steps.setup-python.outputs.python-version }} run pip list
-
name: Tests
+ env:
+ PAPERLESS_CI_TEST: 1
+ # Enable paperless_mail testing against real server
+ PAPERLESS_MAIL_TEST_HOST: ${{ secrets.TEST_MAIL_HOST }}
+ PAPERLESS_MAIL_TEST_USER: ${{ secrets.TEST_MAIL_USER }}
+ PAPERLESS_MAIL_TEST_PASSWD: ${{ secrets.TEST_MAIL_PASSWD }}
run: |
cd src/
pipenv --python ${{ steps.setup-python.outputs.python-version }} run pytest -ra
},
"tika-client": {
"hashes": [
- "sha256:6f2afab12eb46cd7b4ed6c34c9c2a1791a45d2f479c0da0076936dc6dbfe8061",
- "sha256:f2c23cb76677b7b8be70e2d95ac3418ed046b1514bff920f7460beae1ca3342b"
+ "sha256:43b53816b3783c9c77e16df314cad5ad66ab606391c26ad4bc94a784d473a156",
+ "sha256:e1ef3447b4307059e4a836e3786088498637323733f83a2f807b77f998d77610"
],
"index": "pypi",
- "version": "==0.0.2"
+ "version": "==0.0.3"
},
"tornado": {
"hashes": [
def assertIsNotDir(self, path: Union[PathLike, str]):
self.assertFalse(Path(path).resolve().is_dir(), f"Dir does exist: {path}")
+ def assertFilesEqual(
+ self,
+ path1: Union[PathLike, str],
+ path2: Union[PathLike, str],
+ ):
+ path1 = Path(path1)
+ path2 = Path(path2)
+ import hashlib
+
+ hash1 = hashlib.sha256(path1.read_bytes()).hexdigest()
+ hash2 = hashlib.sha256(path2.read_bytes()).hexdigest()
+
+ self.assertEqual(hash1, hash2, "File SHA256 mismatch")
+
class ConsumerProgressMixin:
def setUp(self) -> None:
-import os
import re
from html import escape
-from io import BytesIO
-from io import StringIO
+from pathlib import Path
+from typing import List
import httpx
from bleach import clean
from django.utils.timezone import is_naive
from django.utils.timezone import make_aware
from humanfriendly import format_size
+from imap_tools import MailAttachment
from imap_tools import MailMessage
-from tika import parser
+from tika_client import TikaClient
from documents.parsers import DocumentParser
from documents.parsers import ParseError
class MailDocumentParser(DocumentParser):
"""
This parser uses imap_tools to parse .eml files, generates pdf using
- gotenbergs and sends the html part to a local tika server for text extraction.
+ Gotenberg and sends the html part to a Tika server for text extraction.
"""
gotenberg_server = settings.TIKA_GOTENBERG_ENDPOINT
tika_server = settings.TIKA_ENDPOINT
logging_name = "paperless.parsing.mail"
- _parsed = None
- def get_parsed(self, document_path) -> MailMessage:
- if not self._parsed:
- try:
- with open(document_path, "rb") as eml:
- self._parsed = MailMessage.from_bytes(eml.read())
- except Exception as err:
- raise ParseError(
- f"Could not parse {document_path}: {err}",
- ) from err
- if not self._parsed.from_values:
- self._parsed = None
- raise ParseError(
- f"Could not parse {document_path}: Missing 'from'",
- )
-
- return self._parsed
-
- def get_thumbnail(self, document_path, mime_type, file_name=None):
+ def get_thumbnail(self, document_path: Path, mime_type: str, file_name=None):
if not self.archive_path:
self.archive_path = self.generate_pdf(document_path)
self.logging_group,
)
- def extract_metadata(self, document_path, mime_type):
+ def extract_metadata(self, document_path: Path, mime_type: str):
result = []
try:
- mail = self.get_parsed(document_path)
+ mail = self.parse_file_to_message(document_path)
except ParseError as e:
self.log.warning(
f"Error while fetching document metadata for {document_path}: {e}",
result.sort(key=lambda item: (item["prefix"], item["key"]))
return result
- def parse(self, document_path, mime_type, file_name=None):
+ def parse(self, document_path: Path, mime_type: str, file_name=None):
+ """
+ Parses the given .eml into formatted text, based on the decoded email.
+
+ """
+
def strip_text(text: str):
+ """
+ Reduces the spacing of the given text string
+ """
text = re.sub(r"\s+", " ", text)
text = re.sub(r"(\n *)+", "\n", text)
return text.strip()
- mail = self.get_parsed(document_path)
+ def build_formatted_text(mail_message: MailMessage) -> str:
+ """
+ Constructs a formatted string, based on the given email. Basically tries
+ to get most of the email content, included front matter, into a nice string
+ """
+ fmt_text = f"Subject: {mail_message.subject}\n\n"
+ fmt_text += f"From: {mail_message.from_values.full}\n\n"
+ to_list = [address.full for address in mail_message.to_values]
+ fmt_text += f"To: {', '.join(to_list)}\n\n"
+ if mail_message.cc_values:
+ fmt_text += (
+ f"CC: {', '.join(address.full for address in mail.cc_values)}\n\n"
+ )
+ if mail_message.bcc_values:
+ fmt_text += (
+ f"BCC: {', '.join(address.full for address in mail.bcc_values)}\n\n"
+ )
+ if mail_message.attachments:
+ att = []
+ for a in mail.attachments:
+ att.append(f"{a.filename} ({format_size(a.size, binary=True)})")
+ fmt_text += f"Attachments: {', '.join(att)}\n\n"
+
+ if mail.html:
+ fmt_text += "HTML content: " + strip_text(self.tika_parse(mail.html))
- self.text = f"Subject: {mail.subject}\n\n"
- self.text += f"From: {mail.from_values.full}\n\n"
- self.text += f"To: {', '.join(address.full for address in mail.to_values)}\n\n"
- if len(mail.cc_values) >= 1:
- self.text += (
- f"CC: {', '.join(address.full for address in mail.cc_values)}\n\n"
- )
- if len(mail.bcc_values) >= 1:
- self.text += (
- f"BCC: {', '.join(address.full for address in mail.bcc_values)}\n\n"
- )
- if len(mail.attachments) >= 1:
- att = []
- for a in mail.attachments:
- att.append(f"{a.filename} ({format_size(a.size, binary=True)})")
+ fmt_text += f"\n\n{strip_text(mail.text)}"
- self.text += f"Attachments: {', '.join(att)}\n\n"
+ return fmt_text
- if mail.html:
- self.text += "HTML content: " + strip_text(self.tika_parse(mail.html))
+ self.log.debug(f"Parsing file {document_path.name} into an email")
+ mail = self.parse_file_to_message(document_path)
- self.text += f"\n\n{strip_text(mail.text)}"
+ self.log.debug("Building formatted text from email")
+ self.text = build_formatted_text(mail)
if is_naive(mail.date):
self.date = make_aware(mail.date)
else:
self.date = mail.date
- self.archive_path = self.generate_pdf(document_path)
+ self.log.debug("Creating a PDF from the email")
+ self.archive_path = self.generate_pdf(mail)
+
+ @staticmethod
+ def parse_file_to_message(filepath: Path) -> MailMessage:
+ """
+ Parses the given .eml file into a MailMessage object
+ """
+ try:
+ with filepath.open("rb") as eml:
+ parsed = MailMessage.from_bytes(eml.read())
+ if parsed.from_values is None:
+ raise ParseError(
+ f"Could not parse {filepath}: Missing 'from'",
+ )
+ except Exception as err:
+ raise ParseError(
+ f"Could not parse {filepath}: {err}",
+ ) from err
+
+ return parsed
def tika_parse(self, html: str):
+ """
+ Sends the given HTML to the Tika server at self.tika_server and returns
+ the stripped "X-TIKA:content" value of the response, or an empty string
+ when the response has no such key.
+
+ Raises ParseError for any error raised while talking to the server.
+ """
self.log.info("Sending content to Tika server")
try:
- parsed = parser.from_buffer(html, self.tika_server)
+ with TikaClient(tika_url=self.tika_server) as client:
+ parsed = client.tika.as_text.from_buffer(html, "text/html")
+
+ if "X-TIKA:content" in parsed.data:
+ return parsed.data["X-TIKA:content"].strip()
+ return ""
except Exception as err:
raise ParseError(
f"Could not parse content with tika server at "
f"{self.tika_server}: {err}",
) from err
- if parsed["content"]:
- return parsed["content"]
- else:
- return ""
- def generate_pdf(self, document_path):
- pdf_collection = []
- url_merge = self.gotenberg_server + "/forms/pdfengines/merge"
- pdf_path = os.path.join(self.tempdir, "merged.pdf")
- mail = self.get_parsed(document_path)
+ def generate_pdf(self, mail_message: MailMessage) -> Path:
+ archive_path = Path(self.tempdir) / "merged.pdf"
- pdf_collection.append(("1_mail.pdf", self.generate_pdf_from_mail(mail)))
+ mail_pdf_file = self.generate_pdf_from_mail(mail_message)
- if not mail.html:
- with open(pdf_path, "wb") as file:
- file.write(pdf_collection[0][1])
- file.close()
- return pdf_path
+ # If no HTML content, create the PDF from the message
+ # Otherwise, create 2 PDFs and merge them with Gotenberg
+ if not mail_message.html:
+ archive_path.write_bytes(mail_pdf_file.read_bytes())
else:
- pdf_collection.append(
- (
- "2_html.pdf",
- self.generate_pdf_from_html(mail.html, mail.attachments),
- ),
- )
+ url_merge = self.gotenberg_server + "/forms/pdfengines/merge"
- files = {}
- for name, content in pdf_collection:
- files[name] = (name, BytesIO(content))
- headers = {}
- try:
- response = httpx.post(url_merge, files=files, headers=headers)
- response.raise_for_status() # ensure we notice bad responses
- except Exception as err:
- raise ParseError(f"Error while converting document to PDF: {err}") from err
+ pdf_of_html_content = self.generate_pdf_from_html(
+ mail_message.html,
+ mail_message.attachments,
+ )
- with open(pdf_path, "wb") as file:
- file.write(response.content)
- file.close()
+ pdf_collection = {
+ "1_mail.pdf": ("1_mail.pdf", mail_pdf_file, "application/pdf"),
+ "2_html.pdf": ("2_html.pdf", pdf_of_html_content, "application/pdf"),
+ }
- return pdf_path
+ try:
+ # Open a handle to each file, replacing the tuple
+ for filename in pdf_collection:
+ file_multi_part = pdf_collection[filename]
+ pdf_collection[filename] = (
+ file_multi_part[0],
+ file_multi_part[1].open("rb"),
+ file_multi_part[2],
+ )
+
+ response = httpx.post(url_merge, files=pdf_collection)
+ response.raise_for_status() # ensure we notice bad responses
- @staticmethod
- def mail_to_html(mail: MailMessage) -> StringIO:
- data = {}
+ archive_path.write_bytes(response.content)
- def clean_html(text: str):
+ except Exception as err:
+ raise ParseError(
+ f"Error while merging email HTML into PDF: {err}",
+ ) from err
+ finally:
+ for filename in pdf_collection:
+ file_multi_part_handle = pdf_collection[filename][1]
+ file_multi_part_handle.close()
+
+ return archive_path
+
+ def mail_to_html(self, mail: MailMessage) -> Path:
+ """
+ Converts the given email into an HTML file, formatted
+ based on the given template
+ """
+
+ def clean_html(text: str) -> str:
+ """
+ Attempts to clean, escape and linkify the given HTML string
+ """
if isinstance(text, list):
text = "\n".join([str(e) for e in text])
if type(text) != str:
text = text.replace("\n", "<br>")
return text
+ data = {}
+
data["subject"] = clean_html(mail.subject)
if data["subject"]:
data["subject_label"] = "Subject"
data["date"] = clean_html(mail.date.astimezone().strftime("%Y-%m-%d %H:%M"))
data["content"] = clean_html(mail.text.strip())
- html = StringIO()
-
from django.template.loader import render_to_string
- rendered = render_to_string("email_msg_template.html", context=data)
-
- html.write(rendered)
- html.seek(0)
+ html_file = Path(self.tempdir) / "email_as_html.html"
+ html_file.write_text(render_to_string("email_msg_template.html", context=data))
- return html
+ return html_file
- def generate_pdf_from_mail(self, mail):
+ def generate_pdf_from_mail(self, mail: MailMessage) -> Path:
+ """
+ Creates a PDF based on the given email, using the email's values in a
+ an HTML template
+ """
url = self.gotenberg_server + "/forms/chromium/convert/html"
self.log.info("Converting mail to PDF")
- css_file = os.path.join(os.path.dirname(__file__), "templates/output.css")
+ css_file = Path(__file__).parent / "templates" / "output.css"
+ email_html_file = self.mail_to_html(mail)
- with open(css_file, "rb") as css_handle:
+ print(css_file)
+ print(email_html_file)
+
+ with css_file.open("rb") as css_handle, email_html_file.open(
+ "rb",
+ ) as email_html_handle:
files = {
- "html": ("index.html", self.mail_to_html(mail)),
- "css": ("output.css", css_handle),
+ "html": ("index.html", email_html_handle, "text/html"),
+ "css": ("output.css", css_handle, "text/css"),
}
headers = {}
data = {
response.raise_for_status() # ensure we notice bad responses
except Exception as err:
raise ParseError(
- f"Error while converting document to PDF: {err}",
+ f"Error while converting email to PDF: {err}",
) from err
- return response.content
+ email_as_pdf_file = Path(self.tempdir) / "email_as_pdf.pdf"
+ email_as_pdf_file.write_bytes(response.content)
+
+ return email_as_pdf_file
+
+ def generate_pdf_from_html(
+ self,
+ orig_html: str,
+ attachments: List[MailAttachment],
+ ) -> Path:
+ """
+ Generates a PDF file based on the HTML and attachments of the email
+ """
- @staticmethod
- def transform_inline_html(html, attachments):
def clean_html_script(text: str):
compiled_open = re.compile(re.escape("<script"), re.IGNORECASE)
text = compiled_open.sub("<div hidden ", text)
text = compiled_close.sub("</div", text)
return text
- html_clean = clean_html_script(html)
- files = []
+ url = self.gotenberg_server + "/forms/chromium/convert/html"
+ self.log.info("Converting html to PDF")
+
+ tempdir = Path(self.tempdir)
+
+ html_clean = clean_html_script(orig_html)
- for a in attachments:
- name_cid = "cid:" + a.content_id
+ files = {}
+
+ for attachment in attachments:
+ # Clean the attachment name to be valid
+ name_cid = f"cid:{attachment.content_id}"
name_clean = "".join(e for e in name_cid if e.isalnum())
- files.append((name_clean, BytesIO(a.payload)))
- html_clean = html_clean.replace(name_cid, name_clean)
- files.append(("index.html", StringIO(html_clean)))
+ # Write attachment payload to a temp file
+ temp_file = tempdir / name_clean
+ temp_file.write_bytes(attachment.payload)
- return files
+ # Store the attachment for upload
+ files[name_clean] = (name_clean, temp_file, attachment.content_type)
- def generate_pdf_from_html(self, orig_html, attachments):
- url = self.gotenberg_server + "/forms/chromium/convert/html"
- self.log.info("Converting html to PDF")
+ # Replace as needed the name with the clean name
+ html_clean = html_clean.replace(name_cid, name_clean)
- files = {}
- for name, file in self.transform_inline_html(orig_html, attachments):
- files[name] = (name, file)
+ # Now store the cleaned up HTML version
+ html_clean_file = tempdir / "index.html"
+ html_clean_file.write_text(html_clean)
+
+ files["index.html"] = ("index.html", html_clean_file, "text/html")
- headers = {}
data = {
"marginTop": "0.1",
"marginBottom": "0.1",
"scale": "1.0",
}
try:
+ # Open a handle to each file, replacing the tuple
+ for filename in files:
+ file_multi_part = files[filename]
+ files[filename] = (
+ file_multi_part[0],
+ file_multi_part[1].open("rb"),
+ file_multi_part[2],
+ )
+
response = httpx.post(
url,
files=files,
- headers=headers,
data=data,
)
response.raise_for_status() # ensure we notice bad responses
except Exception as err:
raise ParseError(f"Error while converting document to PDF: {err}") from err
-
- return response.content
+ finally:
+ # Ensure all file handles as closed
+ for filename in files:
+ file_multi_part_handle = files[filename][1]
+ file_multi_part_handle.close()
+
+ html_pdf = tempdir / "html.pdf"
+ html_pdf.write_bytes(response.content)
+ return html_pdf
import datetime
-import os
+from pathlib import Path
from unittest import mock
+import httpx
from django.test import TestCase
from documents.parsers import ParseError
from documents.tests.utils import FileSystemAssertsMixin
from paperless_mail.parsers import MailDocumentParser
+from paperless_tika.tests.utils import HttpxMockMixin
-class TestParser(FileSystemAssertsMixin, TestCase):
- SAMPLE_FILES = os.path.join(os.path.dirname(__file__), "samples")
+class BaseMailParserTestCase(TestCase):
+ """
+ Basic setup for the below test cases
+ """
+
+ SAMPLE_DIR = Path(__file__).parent / "samples"
def setUp(self) -> None:
+ super().setUp()
self.parser = MailDocumentParser(logging_group=None)
def tearDown(self) -> None:
+ super().tearDown()
self.parser.cleanup()
- def test_get_parsed_missing_file(self):
+
+class TestEmailFileParsing(FileSystemAssertsMixin, BaseMailParserTestCase):
+ """
+ Tests around reading a file and parsing it into a
+ MailMessage
+ """
+
+ def test_parse_error_missing_file(self):
"""
GIVEN:
- Fresh parser
- An Exception is thrown
"""
# Check if exception is raised when parsing fails.
+ test_file = self.SAMPLE_DIR / "doesntexist.eml"
+
+ self.assertIsNotFile(test_file)
self.assertRaises(
ParseError,
- self.parser.get_parsed,
- os.path.join(self.SAMPLE_FILES, "na"),
+ self.parser.parse,
+ test_file,
+ "messages/rfc822",
)
- def test_get_parsed_broken_file(self):
+ def test_parse_error_invalid_email(self):
"""
GIVEN:
- Fresh parser
# Check if exception is raised when the mail is faulty.
self.assertRaises(
ParseError,
- self.parser.get_parsed,
- os.path.join(self.SAMPLE_FILES, "broken.eml"),
+ self.parser.parse,
+ self.SAMPLE_DIR / "broken.eml",
+ "messages/rfc822",
)
- def test_get_parsed_simple_text_mail(self):
+ def test_parse_simple_text_email_file(self):
"""
GIVEN:
- Fresh parser
- The content of the mail should be available in the parse result.
"""
# Parse Test file and check relevant content
- parsed1 = self.parser.get_parsed(
- os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
+ parsed1 = self.parser.parse_file_to_message(
+ self.SAMPLE_DIR / "simple_text.eml",
)
self.assertEqual(parsed1.date.year, 2022)
self.assertEqual(parsed1.text, "This is just a simple Text Mail.\n")
self.assertEqual(parsed1.to, ("some@one.de",))
- def test_get_parsed_reparse(self):
- """
- GIVEN:
- - An E-Mail was parsed
- WHEN:
- - Another .eml file should be parsed
- THEN:
- - The parser should not retry to parse and return the old results
- """
- # Parse Test file and check relevant content
- parsed1 = self.parser.get_parsed(
- os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
- )
- # Check if same parsed object as before is returned, even if another file is given.
- parsed2 = self.parser.get_parsed(
- os.path.join(os.path.join(self.SAMPLE_FILES, "html.eml")),
- )
- self.assertEqual(parsed1, parsed2)
-
- @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
- @mock.patch("paperless_mail.parsers.make_thumbnail_from_pdf")
- def test_get_thumbnail(
- self,
- mock_make_thumbnail_from_pdf: mock.MagicMock,
- mock_generate_pdf: mock.MagicMock,
- ):
- """
- GIVEN:
- - An E-Mail was parsed
- WHEN:
- - The Thumbnail is requested
- THEN:
- - The parser should call the functions which generate the thumbnail
- """
- mocked_return = "Passing the return value through.."
- mock_make_thumbnail_from_pdf.return_value = mocked_return
-
- mock_generate_pdf.return_value = "Mocked return value.."
- thumb = self.parser.get_thumbnail(
- os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
- "message/rfc822",
- )
- self.assertEqual(
- self.parser.archive_path,
- mock_make_thumbnail_from_pdf.call_args_list[0].args[0],
- )
- self.assertEqual(
- self.parser.tempdir,
- mock_make_thumbnail_from_pdf.call_args_list[0].args[1],
- )
- self.assertEqual(mocked_return, thumb)
+class TestEmailMetadataExtraction(BaseMailParserTestCase):
+ """
+ Tests extraction of metadata from an email
+ """
def test_extract_metadata_fail(self):
"""
"""
# Validate Metadata parsing returns the expected results
metadata = self.parser.extract_metadata(
- os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
+ self.SAMPLE_DIR / "simple_text.eml",
"message/rfc822",
)
metadata,
)
- def test_parse_na(self):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - parsing is attempted with nonexistent file
- THEN:
- - Exception is thrown
- """
- # Check if exception is raised when parsing fails.
- self.assertRaises(
- ParseError,
- self.parser.parse,
- os.path.join(self.SAMPLE_FILES, "na"),
- "message/rfc822",
- )
- @mock.patch("paperless_mail.parsers.MailDocumentParser.tika_parse")
+class TestEmailThumbnailGenerate(BaseMailParserTestCase):
+ """
+ Tests the correct generation of an thumbnail for an email
+ """
+
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
- def test_parse_html_eml(self, n, mock_tika_parse: mock.MagicMock):
+ @mock.patch("paperless_mail.parsers.make_thumbnail_from_pdf")
+ def test_get_thumbnail(
+ self,
+ mock_make_thumbnail_from_pdf: mock.MagicMock,
+ mock_generate_pdf: mock.MagicMock,
+ ):
"""
GIVEN:
- - Fresh start
+ - An E-Mail was parsed
WHEN:
- - parsing is done with html mail
+ - The Thumbnail is requested
THEN:
- - Tika is called, parsed information from non html parts is available
+ - The parser should call the functions which generate the thumbnail
"""
- # Validate parsing returns the expected results
- text_expected = "Subject: HTML Message\n\nFrom: Name <someone@example.de>\n\nTo: someone@example.de\n\nAttachments: IntM6gnXFm00FEV5.png (6.89 KiB), 600+kbfile.txt (600.24 KiB)\n\nHTML content: tika return\n\nSome Text and an embedded image."
- mock_tika_parse.return_value = "tika return"
-
- self.parser.parse(os.path.join(self.SAMPLE_FILES, "html.eml"), "message/rfc822")
+ mocked_return = "Passing the return value through.."
+ mock_make_thumbnail_from_pdf.return_value = mocked_return
- self.assertEqual(text_expected, self.parser.text)
- self.assertEqual(
- datetime.datetime(
- 2022,
- 10,
- 15,
- 11,
- 23,
- 19,
- tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
- ),
- self.parser.date,
- )
+ mock_generate_pdf.return_value = "Mocked return value.."
- @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
- def test_parse_simple_eml(self, m: mock.MagicMock):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - parsing is done with non html mail
- THEN:
- - parsed information is available
- """
- # Validate parsing returns the expected results
+ test_file = self.SAMPLE_DIR / "simple_text.eml"
- self.parser.parse(
- os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
+ thumb = self.parser.get_thumbnail(
+ test_file,
"message/rfc822",
)
- text_expected = "Subject: Simple Text Mail\n\nFrom: Some One <mail@someserver.de>\n\nTo: some@one.de\n\nCC: asdasd@æsdasd.de, asdadasdasdasda.asdasd@æsdasd.de\n\nBCC: fdf@fvf.de\n\n\n\nThis is just a simple Text Mail."
- self.assertEqual(text_expected, self.parser.text)
- self.assertEqual(
- datetime.datetime(
- 2022,
- 10,
- 12,
- 21,
- 40,
- 43,
- tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
- ),
- self.parser.date,
+
+ mock_generate_pdf.assert_called_once_with(
+ test_file,
+ )
+ mock_make_thumbnail_from_pdf.assert_called_once_with(
+ "Mocked return value..",
+ self.parser.tempdir,
+ None,
)
- # Just check if tried to generate archive, the unittest for generate_pdf() goes deeper.
- m.assert_called()
+ self.assertEqual(mocked_return, thumb)
+
- @mock.patch("paperless_mail.parsers.parser.from_buffer")
- def test_tika_parse_unsuccessful(self, mock_from_buffer: mock.MagicMock):
+class TestTikaHtmlParse(HttpxMockMixin, BaseMailParserTestCase):
+ def test_tika_parse_unsuccessful(self):
"""
GIVEN:
- Fresh start
- the parser should return an empty string
"""
# Check unsuccessful parsing
- mock_from_buffer.return_value = {"content": None}
- parsed = self.parser.tika_parse(None)
+ self.httpx_mock.add_response(
+ json={"Content-Type": "text/html", "X-TIKA:Parsed-By": []},
+ )
+ parsed = self.parser.tika_parse("None")
self.assertEqual("", parsed)
- @mock.patch("paperless_mail.parsers.parser.from_buffer")
- def test_tika_parse(self, mock_from_buffer: mock.MagicMock):
+ def test_tika_parse(self):
"""
GIVEN:
- Fresh start
html = '<html><head><meta http-equiv="content-type" content="text/html; charset=UTF-8"></head><body><p>Some Text</p></body></html>'
expected_text = "Some Text"
- # Check successful parsing
- mock_from_buffer.return_value = {"content": expected_text}
+ self.httpx_mock.add_response(
+ json={
+ "Content-Type": "text/html",
+ "X-TIKA:Parsed-By": [],
+ "X-TIKA:content": expected_text,
+ },
+ )
parsed = self.parser.tika_parse(html)
self.assertEqual(expected_text, parsed.strip())
- mock_from_buffer.assert_called_with(html, self.parser.tika_server)
+ self.assertIn(self.parser.tika_server, str(self.httpx_mock.get_request().url))
- @mock.patch("paperless_mail.parsers.parser.from_buffer")
- def test_tika_parse_exception(self, mock_from_buffer: mock.MagicMock):
+ def test_tika_parse_exception(self):
"""
GIVEN:
- Fresh start
"""
html = '<html><head><meta http-equiv="content-type" content="text/html; charset=UTF-8"></head><body><p>Some Text</p></body></html>'
- # Check ParseError
- def my_side_effect():
- raise Exception("Test")
+ self.httpx_mock.add_response(status_code=httpx.codes.INTERNAL_SERVER_ERROR)
- mock_from_buffer.side_effect = my_side_effect
self.assertRaises(ParseError, self.parser.tika_parse, html)
def test_tika_parse_unreachable(self):
self.parser.tika_server = ""
self.assertRaises(ParseError, self.parser.tika_parse, html)
- @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_mail")
- @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_html")
- def test_generate_pdf_parse_error(self, m: mock.MagicMock, n: mock.MagicMock):
+
+class TestParser(FileSystemAssertsMixin, HttpxMockMixin, BaseMailParserTestCase):
+ def test_parse_no_file(self):
"""
GIVEN:
- Fresh start
WHEN:
- - pdf generation is requested but gotenberg can not be reached
+ - parsing is attempted with nonexistent file
THEN:
- - a ParseError Exception is thrown
+ - Exception is thrown
"""
- m.return_value = b""
- n.return_value = b""
-
- # Check if exception is raised when the pdf can not be created.
- self.parser.gotenberg_server = ""
+ # Check if exception is raised when parsing fails.
self.assertRaises(
ParseError,
- self.parser.generate_pdf,
- os.path.join(self.SAMPLE_FILES, "html.eml"),
+ self.parser.parse,
+ self.SAMPLE_DIR / "na.eml",
+ "message/rfc822",
)
- def test_generate_pdf_exception(self):
+ @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
+ def test_parse_eml_simple(self, mock_generate_pdf: mock.MagicMock):
"""
GIVEN:
- Fresh start
WHEN:
- - pdf generation is requested but parsing throws an exception
+ - parsing is done with non html mail
THEN:
- - a ParseError Exception is thrown
+ - parsed information is available
"""
- # Check if exception is raised when the mail can not be parsed.
- self.assertRaises(
- ParseError,
- self.parser.generate_pdf,
- os.path.join(self.SAMPLE_FILES, "broken.eml"),
+ # Validate parsing returns the expected results
+
+ self.parser.parse(
+ self.SAMPLE_DIR / "simple_text.eml",
+ "message/rfc822",
+ )
+ text_expected = (
+ "Subject: Simple Text Mail\n\n"
+ "From: Some One <mail@someserver.de>\n\n"
+ "To: some@one.de\n\n"
+ "CC: asdasd@æsdasd.de, asdadasdasdasda.asdasd@æsdasd.de\n\n"
+ "BCC: fdf@fvf.de\n\n"
+ "\n\nThis is just a simple Text Mail."
+ )
+ self.assertEqual(text_expected, self.parser.text)
+ self.assertEqual(
+ datetime.datetime(
+ 2022,
+ 10,
+ 12,
+ 21,
+ 40,
+ 43,
+ tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
+ ),
+ self.parser.date,
)
- @mock.patch("paperless_mail.parsers.requests.post")
- @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_mail")
- @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_html")
- def test_generate_pdf(
- self,
- mock_generate_pdf_from_html: mock.MagicMock,
- mock_generate_pdf_from_mail: mock.MagicMock,
- mock_post: mock.MagicMock,
- ):
+ # Just check if tried to generate archive, the unittest for generate_pdf() goes deeper.
+ mock_generate_pdf.assert_called()
+
+ @mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf")
+ def test_parse_eml_html(self, mock_generate_pdf: mock.MagicMock):
"""
GIVEN:
- Fresh start
WHEN:
- - pdf generation is requested
+ - parsing is done with html mail
THEN:
- - gotenberg is called and the resulting file is returned
+ - Tika is called, parsed information from non html parts is available
"""
- mock_generate_pdf_from_mail.return_value = b"Mail Return"
- mock_generate_pdf_from_html.return_value = b"HTML Return"
+ # Validate parsing returns the expected results
+ text_expected = (
+ "Subject: HTML Message\n\n"
+ "From: Name <someone@example.de>\n\n"
+ "To: someone@example.de\n\n"
+ "Attachments: IntM6gnXFm00FEV5.png (6.89 KiB), 600+kbfile.txt (600.24 KiB)\n\n"
+ "HTML content: tika return\n\n"
+ "Some Text and an embedded image."
+ )
+
+ self.httpx_mock.add_response(
+ json={
+ "Content-Type": "text/html",
+ "X-TIKA:Parsed-By": [],
+ "X-TIKA:content": "tika return",
+ },
+ )
- mock_response = mock.MagicMock()
- mock_response.content = b"Content"
- mock_post.return_value = mock_response
- pdf_path = self.parser.generate_pdf(os.path.join(self.SAMPLE_FILES, "html.eml"))
- self.assertIsFile(pdf_path)
+ self.parser.parse(self.SAMPLE_DIR / "html.eml", "message/rfc822")
- mock_generate_pdf_from_mail.assert_called_once_with(
- self.parser.get_parsed(None),
- )
- mock_generate_pdf_from_html.assert_called_once_with(
- self.parser.get_parsed(None).html,
- self.parser.get_parsed(None).attachments,
- )
- self.assertEqual(
- self.parser.gotenberg_server + "/forms/pdfengines/merge",
- mock_post.call_args.args[0],
- )
- self.assertEqual({}, mock_post.call_args.kwargs["headers"])
- self.assertEqual(
- b"Mail Return",
- mock_post.call_args.kwargs["files"]["1_mail.pdf"][1].read(),
- )
+ mock_generate_pdf.assert_called_once()
+ self.assertEqual(text_expected, self.parser.text)
self.assertEqual(
- b"HTML Return",
- mock_post.call_args.kwargs["files"]["2_html.pdf"][1].read(),
+ datetime.datetime(
+ 2022,
+ 10,
+ 15,
+ 11,
+ 23,
+ 19,
+ tzinfo=datetime.timezone(datetime.timedelta(seconds=7200)),
+ ),
+ self.parser.date,
)
- mock_response.raise_for_status.assert_called_once()
-
- with open(pdf_path, "rb") as file:
- self.assertEqual(b"Content", file.read())
-
- def test_mail_to_html(self):
+ def test_generate_pdf_parse_error(self):
"""
GIVEN:
- Fresh start
WHEN:
- - conversion from eml to html is requested
+ - pdf generation is requested but gotenberg fails
THEN:
- - html should be returned
+ - a ParseError Exception is thrown
"""
- mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
- html_handle = self.parser.mail_to_html(mail)
- html_received = html_handle.read()
-
- with open(
- os.path.join(self.SAMPLE_FILES, "html.eml.html"),
- ) as html_expected_handle:
- html_expected = html_expected_handle.read()
+ self.httpx_mock.add_response(status_code=httpx.codes.INTERNAL_SERVER_ERROR)
- self.assertHTMLEqual(html_expected, html_received)
+ self.assertRaises(
+ ParseError,
+ self.parser.parse,
+ self.SAMPLE_DIR / "simple_text.eml",
+ "message/rfc822",
+ )
- @mock.patch("paperless_mail.parsers.requests.post")
- @mock.patch("paperless_mail.parsers.MailDocumentParser.mail_to_html")
- def test_generate_pdf_from_mail(
- self,
- mock_mail_to_html: mock.MagicMock,
- mock_post: mock.MagicMock,
- ):
+ def test_generate_pdf_simple_email(self):
"""
GIVEN:
- - Fresh start
+ - Simple text email with no HTML content
WHEN:
- - conversion of PDF from .eml is requested
+ - Email is parsed
THEN:
- - gotenberg should be called with valid intermediary html files, the resulting pdf is returned
+ - Gotenberg is called to generate a PDF from HTML
+ - Archive file is generated
"""
- mock_response = mock.MagicMock()
- mock_response.content = b"Content"
- mock_post.return_value = mock_response
- mock_mail_to_html.return_value = "Testresponse"
+ self.httpx_mock.add_response(
+ url="http://localhost:3000/forms/chromium/convert/html",
+ method="POST",
+ content=(self.SAMPLE_DIR / "simple_text.eml.pdf").read_bytes(),
+ )
- mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
+ self.parser.parse(self.SAMPLE_DIR / "simple_text.eml", "message/rfc822")
- retval = self.parser.generate_pdf_from_mail(mail)
- self.assertEqual(b"Content", retval)
+ self.assertIsNotNone(self.parser.archive_path)
- mock_mail_to_html.assert_called_once_with(mail)
- self.assertEqual(
- self.parser.gotenberg_server + "/forms/chromium/convert/html",
- mock_post.call_args.args[0],
- )
- self.assertDictEqual({}, mock_post.call_args.kwargs["headers"])
- self.assertDictEqual(
- {
- "marginTop": "0.1",
- "marginBottom": "0.1",
- "marginLeft": "0.1",
- "marginRight": "0.1",
- "paperWidth": "8.27",
- "paperHeight": "11.7",
- "scale": "1.0",
- "pdfFormat": "PDF/A-2b",
+ def test_generate_pdf_html_email(self):
+ """
+ GIVEN:
+ - email with HTML content
+ WHEN:
+ - Email is parsed
+ THEN:
+ - Gotenberg is called to generate a PDF from HTML
+ - Gotenberg is used to merge the two PDFs
+ - Archive file is generated
+ """
+ self.httpx_mock.add_response(
+ url="http://localhost:9998/tika/text",
+ method="PUT",
+ json={
+ "Content-Type": "text/html",
+ "X-TIKA:Parsed-By": [],
+ "X-TIKA:content": "This is some Tika HTML text",
},
- mock_post.call_args.kwargs["data"],
)
- self.assertEqual(
- "Testresponse",
- mock_post.call_args.kwargs["files"]["html"][1],
+ self.httpx_mock.add_response(
+ url="http://localhost:3000/forms/chromium/convert/html",
+ method="POST",
+ content=(self.SAMPLE_DIR / "html.eml.pdf").read_bytes(),
)
- self.assertEqual(
- "output.css",
- mock_post.call_args.kwargs["files"]["css"][0],
+ self.httpx_mock.add_response(
+ url="http://localhost:3000/forms/pdfengines/merge",
+ method="POST",
+ content=b"Pretend merged PDF content",
)
+ self.parser.parse(self.SAMPLE_DIR / "html.eml", "message/rfc822")
- mock_response.raise_for_status.assert_called_once()
+ self.assertIsNotNone(self.parser.archive_path)
- def test_transform_inline_html(self):
+ def test_generate_pdf_html_email_html_to_pdf_failure(self):
"""
GIVEN:
- - Fresh start
+ - email with HTML content
WHEN:
- - transforming of html content from an email with an inline image attachment is requested
+ - Email is parsed
+ - Conversion of email HTML content to PDF fails
THEN:
- - html is returned and sanitized
- """
-
- class MailAttachmentMock:
- def __init__(self, payload, content_id):
- self.payload = payload
- self.content_id = content_id
+ - ParseError is raised
+ """
+ self.httpx_mock.add_response(
+ url="http://localhost:9998/tika/text",
+ method="PUT",
+ json={
+ "Content-Type": "text/html",
+ "X-TIKA:Parsed-By": [],
+ "X-TIKA:content": "This is some Tika HTML text",
+ },
+ )
+ self.httpx_mock.add_response(
+ url="http://localhost:3000/forms/chromium/convert/html",
+ method="POST",
+ content=(self.SAMPLE_DIR / "html.eml.pdf").read_bytes(),
+ )
+ self.httpx_mock.add_response(
+ url="http://localhost:3000/forms/chromium/convert/html",
+ method="POST",
+ status_code=httpx.codes.INTERNAL_SERVER_ERROR,
+ )
+ with self.assertRaises(ParseError):
+ self.parser.parse(self.SAMPLE_DIR / "html.eml", "message/rfc822")
- result = None
+    def test_generate_pdf_html_email_merge_failure(self):
+        """
+        GIVEN:
+            - An email containing HTML content
+        WHEN:
+            - The email is parsed
+            - Merging of the generated PDFs fails
+        THEN:
+            - ParseError is raised
+        """
+        # Mocked responses, registered in the order listed: Tika text
+        # extraction, HTML -> PDF conversion, then the failing PDF merge.
+        mocked_responses = (
+            {
+                "url": "http://localhost:9998/tika/text",
+                "method": "PUT",
+                "json": {
+                    "Content-Type": "text/html",
+                    "X-TIKA:Parsed-By": [],
+                    "X-TIKA:content": "This is some Tika HTML text",
+                },
+            },
+            {
+                "url": "http://localhost:3000/forms/chromium/convert/html",
+                "method": "POST",
+                "content": (self.SAMPLE_DIR / "html.eml.pdf").read_bytes(),
+            },
+            {
+                "url": "http://localhost:3000/forms/pdfengines/merge",
+                "method": "POST",
+                "status_code": httpx.codes.INTERNAL_SERVER_ERROR,
+            },
+        )
+        for response_kwargs in mocked_responses:
+            self.httpx_mock.add_response(**response_kwargs)
+
+        self.assertRaises(
+            ParseError,
+            self.parser.parse,
+            self.SAMPLE_DIR / "html.eml",
+            "message/rfc822",
+        )
- with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
- with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
- html = html_file.read()
- png = png_file.read()
- attachments = [
- MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
- ]
- result = self.parser.transform_inline_html(html, attachments)
+ def test_mail_to_html(self):
+ """
+ GIVEN:
+ - Email message with HTML content
+ WHEN:
+ - Email is parsed
+ THEN:
+ - Resulting HTML is as expected
+ """
+ mail = self.parser.parse_file_to_message(self.SAMPLE_DIR / "html.eml")
+ html_file = self.parser.mail_to_html(mail)
+ expected_html_file = self.SAMPLE_DIR / "html.eml.html"
- resulting_html = result[-1][1].read()
- self.assertTrue(result[-1][0] == "index.html")
- self.assertIn(result[0][0], resulting_html)
- self.assertNotIn("<script", resulting_html.lower())
+ self.assertHTMLEqual(expected_html_file.read_text(), html_file.read_text())
- @mock.patch("paperless_mail.parsers.requests.post")
- def test_generate_pdf_from_html(self, mock_post: mock.MagicMock):
+ def test_generate_pdf_from_mail(
+ self,
+ ):
"""
GIVEN:
- - Fresh start
+ - Email message with HTML content
WHEN:
- - generating pdf from html with inline attachments is attempted
+ - Email is parsed
THEN:
- - gotenberg is called with the correct parameters and the resulting pdf is returned
+ - Gotenberg is used to convert HTML to PDF
"""
- class MailAttachmentMock:
- def __init__(self, payload, content_id):
- self.payload = payload
- self.content_id = content_id
+ self.httpx_mock.add_response(content=b"Content")
- mock_response = mock.MagicMock()
- mock_response.content = b"Content"
- mock_post.return_value = mock_response
+ mail = self.parser.parse_file_to_message(self.SAMPLE_DIR / "html.eml")
- result = None
+ retval = self.parser.generate_pdf_from_mail(mail)
+ self.assertEqual(b"Content", retval.read_bytes())
- with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
- with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
- html = html_file.read()
- png = png_file.read()
- attachments = [
- MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
- ]
- result = self.parser.generate_pdf_from_html(html, attachments)
+ request = self.httpx_mock.get_request()
self.assertEqual(
+ str(request.url),
self.parser.gotenberg_server + "/forms/chromium/convert/html",
- mock_post.call_args.args[0],
- )
- self.assertDictEqual({}, mock_post.call_args.kwargs["headers"])
- self.assertDictEqual(
- {
- "marginTop": "0.1",
- "marginBottom": "0.1",
- "marginLeft": "0.1",
- "marginRight": "0.1",
- "paperWidth": "8.27",
- "paperHeight": "11.7",
- "scale": "1.0",
- },
- mock_post.call_args.kwargs["data"],
)
-
- # read to assert it is a file like object.
- mock_post.call_args.kwargs["files"]["cidpart1pNdUSz0sD3NqVtPgexamplede"][
- 1
- ].read()
- mock_post.call_args.kwargs["files"]["index.html"][1].read()
-
- mock_response.raise_for_status.assert_called_once()
-
- self.assertEqual(b"Content", result)
import os
import time
from unittest import mock
-from urllib.error import HTTPError
-from urllib.request import urlopen
+import httpx
import pytest
from django.test import TestCase
from imagehash import average_hash
from pdfminer.high_level import extract_text
from PIL import Image
-from documents.parsers import run_convert
from documents.tests.utils import FileSystemAssertsMixin
-from paperless_mail.parsers import MailDocumentParser
+from paperless_mail.tests.test_parsers import BaseMailParserTestCase
-class TestParserLive(FileSystemAssertsMixin, TestCase):
- SAMPLE_FILES = os.path.join(os.path.dirname(__file__), "samples")
+class MailAttachmentMock:
+    """
+    Lightweight attachment stand-in for tests, exposing only the
+    payload, content_id and content_type attributes.
+    """
+
+    def __init__(self, payload, content_id):
+        # The content type is fixed to PNG rather than supplied by the caller
+        self.content_type = "image/png"
+        self.content_id, self.payload = content_id, payload
- def setUp(self) -> None:
- self.parser = MailDocumentParser(logging_group=None)
- def tearDown(self) -> None:
- self.parser.cleanup()
+@pytest.mark.skipif(
+    "PAPERLESS_CI_TEST" not in os.environ,
+    reason="No Gotenberg/Tika servers to test with",
+)
+class TestUrlCanary(TestCase):
+    """
+    Verify certain URLs are still available so testing is still valid
+    """
+
+    def test_online_image_exception_on_not_available(self):
+        """
+        GIVEN:
+            - Fresh start
+        WHEN:
+            - nonexistent image is requested
+        THEN:
+            - An exception shall be thrown
+
+        A public image is used in the html sample file. We have no control
+        over whether this image stays online forever, so here we check that
+        we can detect when it is not available anymore.
+        """
+        with self.assertRaises(httpx.HTTPStatusError) as cm:
+            resp = httpx.get(
+                "https://upload.wikimedia.org/wikipedia/en/f/f7/nonexistent.png",
+            )
+            # httpx does not raise on 4xx/5xx by itself; ask for it explicitly
+            resp.raise_for_status()
+
+        self.assertEqual(cm.exception.response.status_code, httpx.codes.NOT_FOUND)
+
+    def test_is_online_image_still_available(self):
+        """
+        GIVEN:
+            - Fresh start
+        WHEN:
+            - A public image used in the html sample file is requested
+        THEN:
+            - No exception shall be thrown
+
+        A public image is used in the html sample file. We have no control
+        over whether this image stays online forever, so here we check that
+        it is still there.
+        """
+
+        # Now check the URL used in samples/sample.html
+        resp = httpx.get("https://upload.wikimedia.org/wikipedia/en/f/f7/RickRoll.png")
+        resp.raise_for_status()
+
+
+@pytest.mark.skipif(
+ "PAPERLESS_CI_TEST" not in os.environ,
+ reason="No Gotenberg/Tika servers to test with",
+)
+class TestParserLive(FileSystemAssertsMixin, BaseMailParserTestCase):
@staticmethod
def imagehash(file, hash_size=18):
return f"{average_hash(Image.open(file), hash_size)}"
result = method_or_callable(*args)
succeeded = True
- except Exception as e:
+            except httpx.HTTPError as e:
+                # Retry on HTTP errors: log the failure, wait, and double the
+                # wait time before the next attempt.
                 print(f"{e} during try #{retry_count}", flush=True)
                 retry_count = retry_count + 1
                 time.sleep(retry_time)
                 retry_time = retry_time * 2.0
+            except Exception:
+                # Anything that is not an HTTP error is not retried;
+                # propagate it to the caller immediately.
+                raise
self.assertTrue(
succeeded,
THEN:
- The returned thumbnail image file is as expected
"""
- mock_generate_pdf.return_value = os.path.join(
- self.SAMPLE_FILES,
- "simple_text.eml.pdf",
- )
+ mock_generate_pdf.return_value = self.SAMPLE_DIR / "simple_text.eml.pdf"
thumb = self.parser.get_thumbnail(
- os.path.join(self.SAMPLE_FILES, "simple_text.eml"),
+ self.SAMPLE_DIR / "simple_text.eml",
"message/rfc822",
)
self.assertIsFile(thumb)
- expected = os.path.join(self.SAMPLE_FILES, "simple_text.eml.pdf.webp")
+ expected = self.SAMPLE_DIR / "simple_text.eml.pdf.webp"
self.assertEqual(
self.imagehash(thumb),
f"Created Thumbnail {thumb} differs from expected file {expected}",
)
- @pytest.mark.skipif(
- "TIKA_LIVE" not in os.environ,
- reason="No tika server",
- )
def test_tika_parse_successful(self):
"""
GIVEN:
parsed = self.parser.tika_parse(html)
self.assertEqual(expected_text, parsed.strip())
- @pytest.mark.skipif(
- "TIKA_LIVE" not in os.environ,
- reason="No tika server",
- )
- def test_tika_parse_unsuccessful(self):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - tika parsing fails
- THEN:
- - the parser should return an empty string
- """
- # Check unsuccessful parsing
- parsed = self.parser.tika_parse(None)
- self.assertEqual("", parsed)
-
- @pytest.mark.skipif(
- "GOTENBERG_LIVE" not in os.environ,
- reason="No gotenberg server",
- )
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_mail")
@mock.patch("paperless_mail.parsers.MailDocumentParser.generate_pdf_from_html")
def test_generate_pdf_gotenberg_merging(
THEN:
- gotenberg is called to merge files and the resulting file is returned
"""
- with open(os.path.join(self.SAMPLE_FILES, "first.pdf"), "rb") as first:
- mock_generate_pdf_from_mail.return_value = first.read()
+ mock_generate_pdf_from_mail.return_value = self.SAMPLE_DIR / "first.pdf"
+ mock_generate_pdf_from_html.return_value = self.SAMPLE_DIR / "second.pdf"
- with open(os.path.join(self.SAMPLE_FILES, "second.pdf"), "rb") as second:
- mock_generate_pdf_from_html.return_value = second.read()
+ msg = self.parser.parse_file_to_message(
+ self.SAMPLE_DIR / "html.eml",
+ )
pdf_path = self.util_call_with_backoff(
self.parser.generate_pdf,
- [os.path.join(self.SAMPLE_FILES, "html.eml")],
+ [msg],
)
self.assertIsFile(pdf_path)
expected = (
"first\tPDF\tto\tbe\tmerged.\n\n\x0csecond\tPDF\tto\tbe\tmerged.\n\n\x0c"
)
- self.assertEqual(expected, extracted)
- @pytest.mark.skipif(
- "GOTENBERG_LIVE" not in os.environ,
- reason="No gotenberg server",
- )
- def test_generate_pdf_from_mail_no_convert(self):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - pdf generation from simple eml file is requested
- THEN:
- - gotenberg is called and the resulting file is returned and contains the expected text.
- """
- mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
-
- pdf_path = os.path.join(self.parser.tempdir, "html.eml.pdf")
-
- with open(pdf_path, "wb") as file:
- file.write(
- self.util_call_with_backoff(self.parser.generate_pdf_from_mail, [mail]),
- )
-
- extracted = extract_text(pdf_path)
- expected = extract_text(os.path.join(self.SAMPLE_FILES, "html.eml.pdf"))
self.assertEqual(expected, extracted)
- @pytest.mark.skipif(
- "GOTENBERG_LIVE" not in os.environ,
- reason="No gotenberg server",
- )
def test_generate_pdf_from_mail(self):
"""
GIVEN:
THEN:
- gotenberg is called and the resulting file is returned and look as expected.
"""
- mail = self.parser.get_parsed(os.path.join(self.SAMPLE_FILES, "html.eml"))
-
- pdf_path = os.path.join(self.parser.tempdir, "html.eml.pdf")
- with open(pdf_path, "wb") as file:
- file.write(
- self.util_call_with_backoff(self.parser.generate_pdf_from_mail, [mail]),
- )
-
- converted = os.path.join(
- self.parser.tempdir,
- "html.eml.pdf.webp",
+ self.util_call_with_backoff(
+ self.parser.parse,
+ [self.SAMPLE_DIR / "html.eml", "message/rfc822"],
)
- run_convert(
- density=300,
- scale="500x5000>",
- alpha="remove",
- strip=True,
- trim=False,
- auto_orient=True,
- input_file=f"{pdf_path}", # Do net define an index to convert all pages.
- output_file=converted,
- logging_group=None,
- )
- self.assertIsFile(converted)
- thumb_hash = self.imagehash(converted)
- # The created pdf is not reproducible. But the converted image should always look the same.
- expected_hash = self.imagehash(
- os.path.join(self.SAMPLE_FILES, "html.eml.pdf.webp"),
- )
- self.assertEqual(
- thumb_hash,
- expected_hash,
- f"PDF looks different. Check if {converted} looks weird.",
- )
-
- @pytest.mark.skipif(
- "GOTENBERG_LIVE" not in os.environ,
- reason="No gotenberg server",
- )
- def test_generate_pdf_from_html_no_convert(self):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - pdf generation from html eml file is requested
- THEN:
- - gotenberg is called and the resulting file is returned and contains the expected text.
- """
+ # Check the archive PDF
+ archive_path = self.parser.get_archive_path()
+ archive_text = extract_text(archive_path)
+ expected_archive_text = extract_text(self.SAMPLE_DIR / "html.eml.pdf")
- class MailAttachmentMock:
- def __init__(self, payload, content_id):
- self.payload = payload
- self.content_id = content_id
-
- result = None
-
- with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
- with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
- html = html_file.read()
- png = png_file.read()
- attachments = [
- MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
- ]
- result = self.util_call_with_backoff(
- self.parser.generate_pdf_from_html,
- [html, attachments],
- )
-
- pdf_path = os.path.join(self.parser.tempdir, "sample.html.pdf")
-
- with open(pdf_path, "wb") as file:
- file.write(result)
-
- extracted = extract_text(pdf_path)
- expected = extract_text(os.path.join(self.SAMPLE_FILES, "sample.html.pdf"))
- self.assertEqual(expected, extracted)
+        # The archive also includes the HTML content, so check for
+        # containment (assertIn) rather than equality
+ self.assertIn(expected_archive_text, archive_text)
- @pytest.mark.skipif(
- "GOTENBERG_LIVE" not in os.environ,
- reason="No gotenberg server",
- )
- def test_generate_pdf_from_html(self):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - pdf generation from html eml file is requested
- THEN:
- - gotenberg is called and the resulting file is returned and look as expected.
- """
-
- class MailAttachmentMock:
- def __init__(self, payload, content_id):
- self.payload = payload
- self.content_id = content_id
-
- result = None
-
- with open(os.path.join(self.SAMPLE_FILES, "sample.html")) as html_file:
- with open(os.path.join(self.SAMPLE_FILES, "sample.png"), "rb") as png_file:
- html = html_file.read()
- png = png_file.read()
- attachments = [
- MailAttachmentMock(png, "part1.pNdUSz0s.D3NqVtPg@example.de"),
- ]
- result = self.util_call_with_backoff(
- self.parser.generate_pdf_from_html,
- [html, attachments],
- )
-
- pdf_path = os.path.join(self.parser.tempdir, "sample.html.pdf")
-
- with open(pdf_path, "wb") as file:
- file.write(result)
-
- converted = os.path.join(self.parser.tempdir, "sample.html.pdf.webp")
- run_convert(
- density=300,
- scale="500x5000>",
- alpha="remove",
- strip=True,
- trim=False,
- auto_orient=True,
- input_file=f"{pdf_path}", # Do net define an index to convert all pages.
- output_file=converted,
- logging_group=None,
+ # Check the thumbnail
+ generated_thumbnail = self.parser.get_thumbnail(
+ self.SAMPLE_DIR / "html.eml",
+ "message/rfc822",
)
- self.assertIsFile(converted)
- thumb_hash = self.imagehash(converted)
+ generated_thumbnail_hash = self.imagehash(generated_thumbnail)
# The created pdf is not reproducible. But the converted image should always look the same.
- expected_hash = self.imagehash(
- os.path.join(self.SAMPLE_FILES, "sample.html.pdf.webp"),
- )
+ expected_hash = self.imagehash(self.SAMPLE_DIR / "html.eml.pdf.webp")
self.assertEqual(
- thumb_hash,
+ generated_thumbnail_hash,
expected_hash,
- f"PDF looks different. Check if {converted} looks weird. "
- f"If Rick Astley is shown, Gotenberg loads from web which is bad for Mail content.",
- )
-
- @pytest.mark.skipif(
- "GOTENBERG_LIVE" not in os.environ,
- reason="No gotenberg server",
- )
- def test_online_image_exception_on_not_available(self):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - nonexistent image is requested
- THEN:
- - An exception shall be thrown
- """
- """
- A public image is used in the html sample file. We have no control
- whether this image stays online forever, so here we check if we can detect if is not
- available anymore.
- """
-
- # Start by Testing if nonexistent URL really throws an Exception
- self.assertRaises(
- HTTPError,
- urlopen,
- "https://upload.wikimedia.org/wikipedia/en/f/f7/nonexistent.png",
+ f"PDF looks different. Check if {generated_thumbnail} looks weird.",
)
-
- @pytest.mark.skipif(
- "GOTENBERG_LIVE" not in os.environ,
- reason="No gotenberg server",
- )
- def test_is_online_image_still_available(self):
- """
- GIVEN:
- - Fresh start
- WHEN:
- - A public image used in the html sample file is requested
- THEN:
- - No exception shall be thrown
- """
- """
- A public image is used in the html sample file. We have no control
- whether this image stays online forever, so here we check if it is still there
- """
-
- # Now check the URL used in samples/sample.html
- urlopen("https://upload.wikimedia.org/wikipedia/en/f/f7/RickRoll.png")
import os
from pathlib import Path
-import dateutil.parser
import httpx
from django.conf import settings
-from tika import parser
+from tika_client import TikaClient
from documents.parsers import DocumentParser
from documents.parsers import ParseError
)
def extract_metadata(self, document_path, mime_type):
- tika_server = settings.TIKA_ENDPOINT
-
- # tika does not support a PathLike, only strings
- # ensure this is a string
- document_path = str(document_path)
-
try:
- parsed = parser.from_file(document_path, tika_server)
+ with TikaClient(tika_url=settings.TIKA_ENDPOINT) as client:
+ parsed = client.metadata.from_file(document_path, mime_type)
+ return [
+ {
+ "namespace": "",
+ "prefix": "",
+ "key": key,
+ "value": parsed.data[key],
+ }
+ for key in parsed.data
+ ]
except Exception as e:
self.log.warning(
f"Error while fetching document metadata for {document_path}: {e}",
)
return []
- return [
- {
- "namespace": "",
- "prefix": "",
- "key": key,
- "value": parsed["metadata"][key],
- }
- for key in parsed["metadata"]
- ]
-
- def parse(self, document_path: Path, mime_type, file_name=None):
+ def parse(self, document_path: Path, mime_type: str, file_name=None):
self.log.info(f"Sending {document_path} to Tika server")
- tika_server = settings.TIKA_ENDPOINT
-
- # tika does not support a PathLike, only strings
- # ensure this is a string
- document_path = str(document_path)
try:
- parsed = parser.from_file(document_path, tika_server)
+ with TikaClient(tika_url=settings.TIKA_ENDPOINT) as client:
+ parsed = client.tika.as_text.from_file(document_path, mime_type)
except Exception as err:
raise ParseError(
f"Could not parse {document_path} with tika server at "
- f"{tika_server}: {err}",
+ f"{settings.TIKA_ENDPOINT}: {err}",
) from err
- self.text = parsed["content"].strip()
-
- try:
- self.date = dateutil.parser.isoparse(parsed["metadata"]["Creation-Date"])
- except Exception as e:
- self.log.warning(
- f"Unable to extract date for document {document_path}: {e}",
- )
-
+ self.text = parsed.content.strip()
+ self.date = parsed.metadata.created
self.archive_path = self.convert_to_pdf(document_path, file_name)
def convert_to_pdf(self, document_path, file_name):
from paperless_tika.parsers import TikaDocumentParser
-@pytest.mark.skipif("TIKA_LIVE" not in os.environ, reason="No tika server")
+@pytest.mark.skipif(
+ "PAPERLESS_CI_TEST" not in os.environ,
+ reason="No Gotenberg/Tika servers to test with",
+)
class TestTikaParserAgainstServer(TestCase):
"""
This test case tests the Tika parsing against a live tika server,
def tearDown(self) -> None:
self.parser.cleanup()
- def try_parse_with_wait(self, test_file, mime_type):
+ def try_parse_with_wait(self, test_file: Path, mime_type: str):
"""
For whatever reason, the image started during the test pipeline likes to
segfault sometimes, when run with the exact files that usually pass.
from django.test import TestCase
from django.test import override_settings
-from requests import Response
+from httpx import Request
+from httpx import Response
from rest_framework import status
from documents.parsers import ParseError
from paperless_tika.parsers import TikaDocumentParser
+from paperless_tika.tests.utils import HttpxMockMixin
-class TestTikaParser(TestCase):
+class TestTikaParser(HttpxMockMixin, TestCase):
def setUp(self) -> None:
self.parser = TikaDocumentParser(logging_group=None)
def tearDown(self) -> None:
self.parser.cleanup()
- @mock.patch("paperless_tika.parsers.parser.from_file")
- @mock.patch("paperless_tika.parsers.requests.post")
- def test_parse(self, post, from_file):
- from_file.return_value = {
- "content": "the content",
- "metadata": {"Creation-Date": "2020-11-21"},
- }
- response = Response()
- response._content = b"PDF document"
- response.status_code = status.HTTP_200_OK
- post.return_value = response
+ def test_parse(self):
+ # Pretend parse response
+ self.httpx_mock.add_response(
+ json={
+ "Content-Type": "application/vnd.oasis.opendocument.text",
+ "X-TIKA:Parsed-By": [],
+ "X-TIKA:content": "the content",
+ "dcterms:created": "2020-11-21T00:00:00",
+ },
+ )
+ # Pretend convert to PDF response
+ self.httpx_mock.add_response(content=b"PDF document")
+
+ file = Path(os.path.join(self.parser.tempdir, "input.odt"))
+ file.touch()
- file = os.path.join(self.parser.tempdir, "input.odt")
- Path(file).touch()
self.parser.parse(file, "application/vnd.oasis.opendocument.text")
self.assertEqual(self.parser.text, "the content")
self.assertEqual(self.parser.date, datetime.datetime(2020, 11, 21))
- @mock.patch("paperless_tika.parsers.parser.from_file")
- def test_metadata(self, from_file):
- from_file.return_value = {
- "metadata": {"Creation-Date": "2020-11-21", "Some-key": "value"},
- }
+ def test_metadata(self):
+ self.httpx_mock.add_response(
+ json={
+ "Content-Type": "application/vnd.oasis.opendocument.text",
+ "X-TIKA:Parsed-By": [],
+ "Some-key": "value",
+ "dcterms:created": "2020-11-21T00:00:00",
+ },
+ )
- file = os.path.join(self.parser.tempdir, "input.odt")
- Path(file).touch()
+ file = Path(os.path.join(self.parser.tempdir, "input.odt"))
+ file.touch()
metadata = self.parser.extract_metadata(
file,
"application/vnd.oasis.opendocument.text",
)
- self.assertTrue("Creation-Date" in [m["key"] for m in metadata])
+ self.assertTrue("dcterms:created" in [m["key"] for m in metadata])
self.assertTrue("Some-key" in [m["key"] for m in metadata])
- @mock.patch("paperless_tika.parsers.parser.from_file")
- @mock.patch("paperless_tika.parsers.requests.post")
- def test_convert_failure(self, post, from_file):
+ def test_convert_failure(self):
"""
GIVEN:
- Document needs to be converted to PDF
THEN:
- Parse error is raised
"""
- from_file.return_value = {
- "content": "the content",
- "metadata": {"Creation-Date": "2020-11-21"},
- }
- response = Response()
- response._content = b"PDF document"
- response.status_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- post.return_value = response
+ # Pretend convert to PDF response
+ self.httpx_mock.add_response(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR)
- file = os.path.join(self.parser.tempdir, "input.odt")
- Path(file).touch()
+ file = Path(os.path.join(self.parser.tempdir, "input.odt"))
+ file.touch()
with self.assertRaises(ParseError):
self.parser.convert_to_pdf(file, None)
- @mock.patch("paperless_tika.parsers.requests.post")
+ @mock.patch("paperless_tika.parsers.httpx.post")
def test_request_pdf_a_format(self, post: mock.Mock):
"""
GIVEN:
THEN:
- Request to Gotenberg contains the expected PDF/A format string
"""
- file = os.path.join(self.parser.tempdir, "input.odt")
- Path(file).touch()
+ file = Path(os.path.join(self.parser.tempdir, "input.odt"))
+ file.touch()
- response = Response()
- response._content = b"PDF document"
- response.status_code = status.HTTP_200_OK
+ response = Response(status_code=status.HTTP_200_OK)
+ response.request = Request("POST", "/somewhere/")
post.return_value = response
for setting, expected_key in [