return "Some verbose file description"
-@mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
-class TestMail(
- DirectoriesMixin,
- FileSystemAssertsMixin,
- TestCase,
-):
- def setUp(self):
+class MessageBuilder:
+ def __init__(self):
self._used_uids = set()
- self.bogus_mailbox = BogusMailBox()
-
- patcher = mock.patch("paperless_mail.mail.MailBox")
- m = patcher.start()
- m.return_value = self.bogus_mailbox
- self.addCleanup(patcher.stop)
-
- patcher = mock.patch("paperless_mail.mail.queue_consumption_tasks")
- self._queue_consumption_tasks_mock = patcher.start()
- self.addCleanup(patcher.stop)
-
- self.reset_bogus_mailbox()
-
- self.mail_account_handler = MailAccountHandler()
- super().setUp()
-
def create_message(
self,
attachments: Union[int, list[_AttachmentDef]] = 1,
return imap_msg
- def reset_bogus_mailbox(self):
- self.bogus_mailbox.messages = []
- self.bogus_mailbox.messages_spam = []
- self.bogus_mailbox.messages.append(
- self.create_message(
- subject="Invoice 1",
- from_="amazon@amazon.de",
- to=["me@myselfandi.com", "helpdesk@mydomain.com"],
- body="cables",
- seen=True,
- flagged=False,
- processed=False,
- ),
- )
- self.bogus_mailbox.messages.append(
- self.create_message(
- subject="Invoice 2",
- body="from my favorite electronic store",
- to=["invoices@mycompany.com"],
- seen=False,
- flagged=True,
- processed=True,
- ),
- )
- self.bogus_mailbox.messages.append(
- self.create_message(
- subject="Claim your $10M price now!",
- from_="amazon@amazon-some-indian-site.org",
- to="special@me.me",
- seen=False,
- ),
+
+def reset_bogus_mailbox(bogus_mailbox: BogusMailBox, message_builder: MessageBuilder):
+ bogus_mailbox.messages = []
+ bogus_mailbox.messages_spam = []
+ bogus_mailbox.messages.append(
+ message_builder.create_message(
+ subject="Invoice 1",
+ from_="amazon@amazon.de",
+ to=["me@myselfandi.com", "helpdesk@mydomain.com"],
+ body="cables",
+ seen=True,
+ flagged=False,
+ processed=False,
+ ),
+ )
+ bogus_mailbox.messages.append(
+ message_builder.create_message(
+ subject="Invoice 2",
+ body="from my favorite electronic store",
+ to=["invoices@mycompany.com"],
+ seen=False,
+ flagged=True,
+ processed=True,
+ ),
+ )
+ bogus_mailbox.messages.append(
+ message_builder.create_message(
+ subject="Claim your $10M price now!",
+ from_="amazon@amazon-some-indian-site.org",
+ to=["special@me.me"],
+ seen=False,
+ ),
+ )
+ bogus_mailbox.updateClient()
+
+
+class MailMocker(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
+ def setUp(self):
+ self.bogus_mailbox = BogusMailBox()
+ self.messageBuilder = MessageBuilder()
+
+ reset_bogus_mailbox(self.bogus_mailbox, self.messageBuilder)
+
+ patcher = mock.patch("paperless_mail.mail.MailBox")
+ m = patcher.start()
+ m.return_value = self.bogus_mailbox
+ self.addCleanup(patcher.stop)
+
+ patcher = mock.patch("paperless_mail.mail.queue_consumption_tasks")
+ self._queue_consumption_tasks_mock = patcher.start()
+ self.addCleanup(patcher.stop)
+
+ super().setUp()
+
+ def assert_queue_consumption_tasks_call_args(
+ self,
+ expected_call_args: list[list[dict[str, str]]],
+ ):
+ """
+ Verifies that queue_consumption_tasks has been called with the expected arguments.
+
+ expected_call_args is the following format:
+
+ * List of calls to queue_consumption_tasks, called once per mail, where each element is:
+ * List of signatures for the consume_file task, where each element is:
+ * dictionary containing arguments that need to be present in the consume_file signature.
+
+ """
+
+ # assert number of calls to queue_consumption_tasks match
+ self.assertEqual(
+ len(self._queue_consumption_tasks_mock.call_args_list),
+ len(expected_call_args),
)
- self.bogus_mailbox.updateClient()
+
+ for (mock_args, mock_kwargs), expected_signatures in zip(
+ self._queue_consumption_tasks_mock.call_args_list,
+ expected_call_args,
+ ):
+ consume_tasks = mock_kwargs["consume_tasks"]
+
+ # assert number of consume_file tasks match
+ self.assertEqual(len(consume_tasks), len(expected_signatures))
+
+ for consume_task, expected_signature in zip(
+ consume_tasks,
+ expected_signatures,
+ ):
+ input_doc, overrides = consume_task.args
+
+ # assert the file exists
+ self.assertIsFile(input_doc.original_file)
+
+ # assert all expected arguments are present in the signature
+ for key, value in expected_signature.items():
+ if key == "override_correspondent_id":
+ self.assertEqual(overrides.correspondent_id, value)
+ elif key == "override_filename":
+ self.assertEqual(overrides.filename, value)
+ elif key == "override_title":
+ self.assertEqual(overrides.title, value)
+ else:
+ self.fail("No match for expected arg")
+
+ def apply_mail_actions(self):
+ """
+ Applies pending actions to mails by inspecting calls to the queue_consumption_tasks method.
+ """
+ for args, kwargs in self._queue_consumption_tasks_mock.call_args_list:
+ message = kwargs["message"]
+ rule = kwargs["rule"]
+ apply_mail_action([], rule.pk, message.uid, message.subject, message.date)
+
+
+@mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
+class TestMail(
+ DirectoriesMixin,
+ FileSystemAssertsMixin,
+ TestCase,
+):
+ def setUp(self):
+ self.mailMocker = MailMocker()
+ self.mailMocker.setUp()
+ self.mail_account_handler = MailAccountHandler()
+
+ super().setUp()
def test_get_correspondent(self):
message = namedtuple("MailMessage", [])
self.assertEqual(handler._get_title(message, att, rule), None)
def test_handle_message(self):
- message = self.create_message(
+ message = self.mailMocker.messageBuilder.create_message(
subject="the message title",
from_="Myself",
attachments=2,
self.assertEqual(result, 2)
- self._queue_consumption_tasks_mock.assert_called()
+ self.mailMocker._queue_consumption_tasks_mock.assert_called()
- self.assert_queue_consumption_tasks_call_args(
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[
{"override_title": "file_0", "override_filename": "file_0.pdf"},
result = self.mail_account_handler._handle_message(message, rule)
- self._queue_consumption_tasks_mock.assert_not_called()
+ self.mailMocker._queue_consumption_tasks_mock.assert_not_called()
self.assertEqual(result, 0)
def test_handle_unknown_mime_type(self):
- message = self.create_message(
+ message = self.mailMocker.messageBuilder.create_message(
attachments=[
_AttachmentDef(filename="f1.pdf"),
_AttachmentDef(
result = self.mail_account_handler._handle_message(message, rule)
self.assertEqual(result, 1)
- self.assert_queue_consumption_tasks_call_args(
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[
{"override_filename": "f1.pdf"},
)
def test_handle_disposition(self):
- message = self.create_message(
+ message = self.mailMocker.messageBuilder.create_message(
attachments=[
_AttachmentDef(
filename="f1.pdf",
result = self.mail_account_handler._handle_message(message, rule)
self.assertEqual(result, 1)
- self.assert_queue_consumption_tasks_call_args(
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[
{"override_filename": "f2.pdf"},
)
def test_handle_inline_files(self):
- message = self.create_message(
+ message = self.mailMocker.messageBuilder.create_message(
attachments=[
_AttachmentDef(
filename="f1.pdf",
result = self.mail_account_handler._handle_message(message, rule)
self.assertEqual(result, 2)
- self.assert_queue_consumption_tasks_call_args(
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[
{"override_filename": "f1.pdf"},
- Mail action should not be performed for files excluded
- Mail action should be performed for files included
"""
- message = self.create_message(
+ message = self.mailMocker.messageBuilder.create_message(
attachments=[
_AttachmentDef(filename="f1.pdf"),
_AttachmentDef(filename="f2.pdf"),
for test_case in tests:
with self.subTest(msg=test_case.name):
- self._queue_consumption_tasks_mock.reset_mock()
+ self.mailMocker._queue_consumption_tasks_mock.reset_mock()
account = MailAccount(name=str(uuid.uuid4()))
account.save()
rule = MailRule(
rule.save()
self.mail_account_handler._handle_message(message, rule)
- self.assert_queue_consumption_tasks_call_args(
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[{"override_filename": m} for m in test_case.expected_matches],
],
THEN:
- Mail action should not be performed
"""
- message = self.create_message(
+ message = self.mailMocker.messageBuilder.create_message(
attachments=[
_AttachmentDef(
filename="test.png",
),
],
)
- self.bogus_mailbox.messages.append(message)
+ self.mailMocker.bogus_mailbox.messages.append(message)
account = MailAccount.objects.create(
name="test",
imap_server="",
)
rule.save()
- self.assertEqual(len(self.bogus_mailbox.messages), 4)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 4)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 1)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 1)
def test_handle_mail_account_mark_read(self):
account = MailAccount.objects.create(
action=MailRule.MailAction.MARK_READ,
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 2)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_handle_mail_account_delete(self):
account = MailAccount.objects.create(
filter_subject="Invoice",
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 1)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 1)
def test_handle_mail_account_delete_no_filters(self):
account = MailAccount.objects.create(
maximum_age=0,
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 0)
def test_handle_mail_account_flag(self):
account = MailAccount.objects.create(
filter_subject="Invoice",
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 1)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ 1,
+ )
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
@pytest.mark.flaky(reruns=4)
def test_handle_mail_account_move(self):
filter_subject="Claim",
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages_spam), 0)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 2)
- self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages_spam), 1)
def test_handle_mail_account_move_no_filters(self):
account = MailAccount.objects.create(
maximum_age=0,
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages_spam), 0)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 0)
- self.assertEqual(len(self.bogus_mailbox.messages_spam), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages_spam), 3)
def test_handle_mail_account_tag(self):
account = MailAccount.objects.create(
action_parameter="processed",
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNKEYWORD processed", False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNKEYWORD processed", False)),
+ 0,
+ )
def test_handle_mail_account_tag_gmail(self):
- self.bogus_mailbox._host = "imap.gmail.com"
- self.bogus_mailbox.client.capabilities = ["X-GM-EXT-1"]
+ self.mailMocker.bogus_mailbox._host = "imap.gmail.com"
+ self.mailMocker.bogus_mailbox.client.capabilities = ["X-GM-EXT-1"]
account = MailAccount.objects.create(
name="test",
action_parameter="processed",
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
criteria = NOT(gmail_label="processed")
- self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch(criteria, False)), 2)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 0)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch(criteria, False)), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_tag_mail_action_applemail_wrong_input(self):
self.assertRaises(
action_parameter="apple:green",
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ 2,
+ )
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 0)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(
+ len(self.mailMocker.bogus_mailbox.fetch("UNFLAGGED", False)),
+ 0,
+ )
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_error_login(self):
"""
)
tasks.process_mail_accounts()
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 2)
- self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages_spam), 1)
def test_error_skip_rule(self):
account = MailAccount.objects.create(
)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(len(self.bogus_mailbox.messages), 2)
- self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages_spam), 1)
def test_error_folder_set(self):
"""
folder="uuuhhhh", # Invalid folder name
)
- self.bogus_mailbox.folder.list = mock.Mock(
+ self.mailMocker.bogus_mailbox.folder.list = mock.Mock(
return_value=[FolderInfo("SomeFoldername", "|", ())],
)
self.mail_account_handler.handle_mail_account(account)
- self.bogus_mailbox.folder.list.assert_called_once()
- self._queue_consumption_tasks_mock.assert_not_called()
+ self.mailMocker.bogus_mailbox.folder.list.assert_called_once()
+ self.mailMocker._queue_consumption_tasks_mock.assert_not_called()
def test_error_folder_set_error_listing(self):
"""
folder="uuuhhhh", # Invalid folder name
)
- self.bogus_mailbox.folder.list = mock.Mock(
+ self.mailMocker.bogus_mailbox.folder.list = mock.Mock(
side_effect=MailboxFolderSelectError(None, "uhm"),
)
self.mail_account_handler.handle_mail_account(account)
- self.bogus_mailbox.folder.list.assert_called_once()
- self._queue_consumption_tasks_mock.assert_not_called()
+ self.mailMocker.bogus_mailbox.folder.list.assert_called_once()
+ self.mailMocker._queue_consumption_tasks_mock.assert_not_called()
@mock.patch("paperless_mail.mail.MailAccountHandler._get_correspondent")
def test_error_skip_mail(self, m):
)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
# test that we still consume mail even if some mails throw errors.
- self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
+ self.assertEqual(self.mailMocker._queue_consumption_tasks_mock.call_count, 2)
# faulty mail still in inbox, untouched
- self.assertEqual(len(self.bogus_mailbox.messages), 1)
- self.assertEqual(self.bogus_mailbox.messages[0].from_, "amazon@amazon.de")
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 1)
+ self.assertEqual(
+ self.mailMocker.bogus_mailbox.messages[0].from_,
+ "amazon@amazon.de",
+ )
def test_error_create_correspondent(self):
account = MailAccount.objects.create(
self.mail_account_handler.handle_mail_account(account)
- self._queue_consumption_tasks_mock.assert_called_once()
+ self.mailMocker._queue_consumption_tasks_mock.assert_called_once()
c = Correspondent.objects.get(name="amazon@amazon.de")
- self.assert_queue_consumption_tasks_call_args(
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[
{"override_correspondent_id": c.id},
],
)
- self._queue_consumption_tasks_mock.reset_mock()
- self.reset_bogus_mailbox()
+ self.mailMocker._queue_consumption_tasks_mock.reset_mock()
+ reset_bogus_mailbox(
+ self.mailMocker.bogus_mailbox,
+ self.mailMocker.messageBuilder,
+ )
with mock.patch("paperless_mail.mail.Correspondent.objects.get_or_create") as m:
m.side_effect = DatabaseError()
self.mail_account_handler.handle_mail_account(account)
- self.assert_queue_consumption_tasks_call_args(
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
[
[
{"override_correspondent_id": None},
filter_from=f_from,
filter_to=f_to,
)
- self.reset_bogus_mailbox()
- self._queue_consumption_tasks_mock.reset_mock()
+ reset_bogus_mailbox(
+ self.mailMocker.bogus_mailbox,
+ self.mailMocker.messageBuilder,
+ )
+ self.mailMocker._queue_consumption_tasks_mock.reset_mock()
- self._queue_consumption_tasks_mock.assert_not_called()
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.mailMocker._queue_consumption_tasks_mock.assert_not_called()
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
self.assertEqual(
- len(self.bogus_mailbox.messages),
+ len(self.mailMocker.bogus_mailbox.messages),
3 - expected_mail_count,
)
self.assertEqual(
- self._queue_consumption_tasks_mock.call_count,
+ self.mailMocker._queue_consumption_tasks_mock.call_count,
expected_mail_count,
)
action=MailRule.MailAction.MARK_READ,
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self._queue_consumption_tasks_mock.assert_not_called()
- self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.mailMocker._queue_consumption_tasks_mock.assert_not_called()
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 2)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
+ self.mailMocker.apply_mail_actions()
- self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(self.mailMocker._queue_consumption_tasks_mock.call_count, 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
def test_auth_plain_fallback_fails_still(self):
"""
action=MailRule.MailAction.MARK_READ,
)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
+ self.assertEqual(self.mailMocker._queue_consumption_tasks_mock.call_count, 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 2)
self.mail_account_handler.handle_mail_account(account)
- self.apply_mail_actions()
-
- self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
-
- def assert_queue_consumption_tasks_call_args(
- self,
- expected_call_args: list[list[dict[str, str]]],
- ):
- """
- Verifies that queue_consumption_tasks has been called with the expected arguments.
-
- expected_call_args is the following format:
-
- * List of calls to queue_consumption_tasks, called once per mail, where each element is:
- * List of signatures for the consume_file task, where each element is:
- * dictionary containing arguments that need to be present in the consume_file signature.
-
- """
-
- # assert number of calls to queue_consumption_tasks match
- self.assertEqual(
- len(self._queue_consumption_tasks_mock.call_args_list),
- len(expected_call_args),
- )
-
- for (mock_args, mock_kwargs), expected_signatures in zip(
- self._queue_consumption_tasks_mock.call_args_list,
- expected_call_args,
- ):
- consume_tasks = mock_kwargs["consume_tasks"]
-
- # assert number of consume_file tasks match
- self.assertEqual(len(consume_tasks), len(expected_signatures))
-
- for consume_task, expected_signature in zip(
- consume_tasks,
- expected_signatures,
- ):
- input_doc, overrides = consume_task.args
-
- # assert the file exists
- self.assertIsFile(input_doc.original_file)
-
- # assert all expected arguments are present in the signature
- for key, value in expected_signature.items():
- if key == "override_correspondent_id":
- self.assertEqual(overrides.correspondent_id, value)
- elif key == "override_filename":
- self.assertEqual(overrides.filename, value)
- elif key == "override_title":
- self.assertEqual(overrides.title, value)
- else:
- self.fail("No match for expected arg")
+ self.mailMocker.apply_mail_actions()
- def apply_mail_actions(self):
- """
- Applies pending actions to mails by inspecting calls to the queue_consumption_tasks method.
- """
- for args, kwargs in self._queue_consumption_tasks_mock.call_args_list:
- message = kwargs["message"]
- rule = kwargs["rule"]
- apply_mail_action([], rule.pk, message.uid, message.subject, message.date)
+ self.assertEqual(self.mailMocker._queue_consumption_tasks_mock.call_count, 2)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.fetch("UNSEEN", False)), 0)
+ self.assertEqual(len(self.mailMocker.bogus_mailbox.messages), 3)
class TestManagementCommand(TestCase):
--- /dev/null
+import email
+import email.contentmanager
+import tempfile
+from email.message import Message
+from email.mime.application import MIMEApplication
+from email.mime.multipart import MIMEMultipart
+from unittest import mock
+
+import gnupg
+from django.test import override_settings
+from imap_tools import MailMessage
+
+from paperless_mail.mail import MailAccountHandler
+from paperless_mail.models import MailAccount
+from paperless_mail.models import MailRule
+from paperless_mail.preprocessor import MailMessageDecryptor
+from paperless_mail.tests.test_mail import TestMail
+from paperless_mail.tests.test_mail import _AttachmentDef
+
+
+class MessageEncryptor:
+ def __init__(self):
+ self.gpg_home = tempfile.mkdtemp()
+ self.gpg = gnupg.GPG(gnupghome=self.gpg_home)
+ self._testUser = "testuser@example.com"
+ # Generate a new key
+ input_data = self.gpg.gen_key_input(
+ name_email=self._testUser,
+ passphrase=None,
+ key_type="RSA",
+ key_length=2048,
+ expire_date=0,
+ no_protection=True,
+ )
+ self.gpg.gen_key(input_data)
+
+ @staticmethod
+ def get_email_body_without_headers(email_message: Message) -> bytes:
+ """
+ Filters some relevant headers from an EmailMessage and returns just the body.
+ """
+ message_copy = email.message_from_bytes(email_message.as_bytes())
+
+ message_copy._headers = [
+ header
+ for header in message_copy._headers
+ if header[0].lower() not in ("from", "to", "subject")
+ ]
+ return message_copy.as_bytes()
+
+ def encrypt(self, message):
+ original_email: email.message.Message = message.obj
+ encrypted_data = self.gpg.encrypt(
+ self.get_email_body_without_headers(original_email),
+ self._testUser,
+ armor=True,
+ )
+ if not encrypted_data.ok:
+ raise Exception(f"Encryption failed: {encrypted_data.stderr}")
+ encrypted_email_content = encrypted_data.data
+
+ new_email = MIMEMultipart("encrypted", protocol="application/pgp-encrypted")
+ new_email["From"] = original_email["From"]
+ new_email["To"] = original_email["To"]
+ new_email["Subject"] = original_email["Subject"]
+
+ # Add the control part
+ control_part = MIMEApplication(_data=b"", _subtype="pgp-encrypted")
+ control_part.set_payload("Version: 1")
+ new_email.attach(control_part)
+
+ # Add the encrypted data part
+ encrypted_part = MIMEApplication(_data=b"", _subtype="octet-stream")
+ encrypted_part.set_payload(encrypted_email_content.decode("ascii"))
+ encrypted_part.add_header(
+ "Content-Disposition",
+ 'attachment; filename="encrypted.asc"',
+ )
+ new_email.attach(encrypted_part)
+
+ encrypted_message: MailMessage = MailMessage(
+ [(f"UID {message.uid}".encode(), new_email.as_bytes())],
+ )
+ return encrypted_message
+
+
+class TestMailMessageGpgDecryptor(TestMail):
+ def setUp(self):
+ self.messageEncryptor = MessageEncryptor()
+ with override_settings(
+ EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
+ EMAIL_ENABLE_GPG_DECRYPTOR=True,
+ ):
+ super().setUp()
+
+ def test_preprocessor_is_able_to_run(self):
+ with override_settings(
+ EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
+ EMAIL_ENABLE_GPG_DECRYPTOR=True,
+ ):
+ self.assertTrue(MailMessageDecryptor.able_to_run())
+
+ def test_preprocessor_is_able_to_run2(self):
+ with override_settings(
+ EMAIL_GNUPG_HOME=None,
+ EMAIL_ENABLE_GPG_DECRYPTOR=True,
+ ):
+ self.assertTrue(MailMessageDecryptor.able_to_run())
+
+ def test_is_not_able_to_run_disabled(self):
+ with override_settings(
+ EMAIL_ENABLE_GPG_DECRYPTOR=False,
+ ):
+ self.assertFalse(MailMessageDecryptor.able_to_run())
+
+ def test_is_not_able_to_run_bogus_path(self):
+ with override_settings(
+ EMAIL_ENABLE_GPG_DECRYPTOR=True,
+ EMAIL_GNUPG_HOME="_)@# notapath &%#$",
+ ):
+ self.assertFalse(MailMessageDecryptor.able_to_run())
+
+ def test_fails_at_initialization(self):
+ with (
+ mock.patch("gnupg.GPG.__init__") as mock_run,
+ override_settings(
+ EMAIL_ENABLE_GPG_DECRYPTOR=True,
+ ),
+ ):
+
+ def side_effect(*args, **kwargs):
+ raise OSError("Cannot find 'gpg' binary")
+
+ mock_run.side_effect = side_effect
+
+ handler = MailAccountHandler()
+ self.assertEqual(len(handler._message_preprocessors), 0)
+
+ def test_decrypt_fails(self):
+ encrypted_message, _ = self.create_encrypted_unencrypted_message_pair()
+ empty_gpg_home = tempfile.mkdtemp()
+ with override_settings(
+ EMAIL_ENABLE_GPG_DECRYPTOR=True,
+ EMAIL_GNUPG_HOME=empty_gpg_home,
+ ):
+ message_decryptor = MailMessageDecryptor()
+ self.assertRaises(Exception, message_decryptor.run, encrypted_message)
+
+ def test_decrypt_encrypted_mail(self):
+ """
+ Creates a mail with attachments. Then encrypts it with a new key.
+ Verifies that this encrypted message can be decrypted with attachments intact.
+ """
+ encrypted_message, message = self.create_encrypted_unencrypted_message_pair()
+ headers = message.headers
+ text = message.text
+
+ self.assertEqual(len(encrypted_message.attachments), 1)
+ self.assertEqual(encrypted_message.attachments[0].filename, "encrypted.asc")
+ self.assertEqual(encrypted_message.text, "")
+
+ with override_settings(
+ EMAIL_ENABLE_GPG_DECRYPTOR=True,
+ EMAIL_GNUPG_HOME=self.messageEncryptor.gpg_home,
+ ):
+ message_decryptor = MailMessageDecryptor()
+ self.assertTrue(message_decryptor.able_to_run())
+ decrypted_message = message_decryptor.run(encrypted_message)
+
+ self.assertEqual(len(decrypted_message.attachments), 2)
+ self.assertEqual(decrypted_message.attachments[0].filename, "f1.pdf")
+ self.assertEqual(decrypted_message.attachments[1].filename, "f2.pdf")
+ self.assertEqual(decrypted_message.headers, headers)
+ self.assertEqual(decrypted_message.text, text)
+ self.assertEqual(decrypted_message.uid, message.uid)
+
+ def create_encrypted_unencrypted_message_pair(self):
+ message = self.mailMocker.messageBuilder.create_message(
+ body="Test message with 2 attachments",
+ attachments=[
+ _AttachmentDef(
+ filename="f1.pdf",
+ disposition="inline",
+ ),
+ _AttachmentDef(filename="f2.pdf"),
+ ],
+ )
+ encrypted_message = self.messageEncryptor.encrypt(message)
+ return encrypted_message, message
+
+ def test_handle_encrypted_message(self):
+ message = self.mailMocker.messageBuilder.create_message(
+ subject="the message title",
+ from_="Myself",
+ attachments=2,
+ body="Test mail",
+ )
+
+ encrypted_message = self.messageEncryptor.encrypt(message)
+
+ account = MailAccount.objects.create()
+ rule = MailRule(
+ assign_title_from=MailRule.TitleSource.FROM_FILENAME,
+ consumption_scope=MailRule.ConsumptionScope.EVERYTHING,
+ account=account,
+ )
+ rule.save()
+
+ result = self.mail_account_handler._handle_message(encrypted_message, rule)
+
+ self.assertEqual(result, 3)
+
+ self.mailMocker._queue_consumption_tasks_mock.assert_called()
+
+ self.mailMocker.assert_queue_consumption_tasks_call_args(
+ [
+ [
+ {
+ "override_title": message.subject,
+ "override_filename": f"{message.subject}.eml",
+ },
+ ],
+ [
+ {"override_title": "file_0", "override_filename": "file_0.pdf"},
+ {"override_title": "file_1", "override_filename": "file_1.pdf"},
+ ],
+ ],
+ )