+import dataclasses
+import email.contentmanager
import os
+import random
import uuid
from collections import namedtuple
from typing import ContextManager
+from typing import List
+from typing import Union
from unittest import mock
from django.core.management import call_command
from documents.tests.utils import DirectoriesMixin
from imap_tools import EmailAddress
from imap_tools import MailboxFolderSelectError
+from imap_tools import MailMessage
from imap_tools import MailMessageFlags
from paperless_mail import tasks
from paperless_mail.mail import MailAccountHandler
from paperless_mail.models import MailRule
+@dataclasses.dataclass
+class _AttachmentDef(object):
+ filename: str = "a_file.pdf"
+ maintype: str = "application/pdf"
+ subtype: str = "pdf"
+ disposition: str = "attachment"
+ content: bytes = b"a PDF document"
+
+
class BogusFolderManager:
current_folder = "INBOX"
pass
def __init__(self):
- self.messages = []
- self.messages_spam = []
+ self.messages: List[MailMessage] = []
+ self.messages_spam: List[MailMessage] = []
def login(self, username, password):
if not (username == "admin" and password == "secret"):
if "BODY" in criteria:
body = criteria[criteria.index("BODY") + 1].strip('"')
- msg = filter(lambda m: body in m.body, msg)
+ msg = filter(lambda m: body in m.text, msg)
if "FROM" in criteria:
from_ = criteria[criteria.index("FROM") + 1].strip('"')
def move(self, uid_list, folder):
if folder == "spam":
- self.messages_spam.append(
+ self.messages_spam += list(
filter(lambda m: m.uid in uid_list, self.messages),
)
self.messages = list(filter(lambda m: m.uid not in uid_list, self.messages))
raise Exception()
+_used_uids = set()
+
+
def create_message(
- num_attachments=1,
- body="",
- subject="the suject",
- from_="noone@mail.com",
- seen=False,
- flagged=False,
-):
- message = namedtuple("MailMessage", [])
-
- message.uid = uuid.uuid4()
- message.subject = subject
- message.attachments = []
- message.from_ = from_
- message.body = body
- for i in range(num_attachments):
- message.attachments.append(create_attachment(filename=f"file_{i}.pdf"))
-
- message.seen = seen
- message.flagged = flagged
-
- return message
-
-
-def create_attachment(
- filename="the_file.pdf",
- content_disposition="attachment",
- payload=b"a PDF document",
-):
- attachment = namedtuple("Attachment", [])
- attachment.filename = filename
- attachment.content_disposition = content_disposition
- attachment.payload = payload
- return attachment
+ attachments: Union[int, List[_AttachmentDef]] = 1,
+ body: str = "",
+ subject: str = "the suject",
+ from_: str = "noone@mail.com",
+ seen: bool = False,
+ flagged: bool = False,
+) -> MailMessage:
+ email_msg = email.message.EmailMessage()
+ # TODO: This does NOT set the UID
+ email_msg["Message-ID"] = str(uuid.uuid4())
+ email_msg["Subject"] = subject
+ email_msg["From"] = from_
+ email_msg.set_content(body)
+
+ # Either add some default number of attachments
+ # or the provided attachments
+ if isinstance(attachments, int):
+ for i in range(attachments):
+ attachment = _AttachmentDef(filename=f"file_{i}.pdf")
+ email_msg.add_attachment(
+ attachment.content,
+ maintype=attachment.maintype,
+ subtype=attachment.subtype,
+ disposition=attachment.disposition,
+ filename=attachment.filename,
+ )
+ else:
+ for attachment in attachments:
+ email_msg.add_attachment(
+ attachment.content,
+ maintype=attachment.maintype,
+ subtype=attachment.subtype,
+ disposition=attachment.disposition,
+ filename=attachment.filename,
+ )
+
+ # Convert the EmailMessage to an imap_tools MailMessage
+ imap_msg = MailMessage.from_bytes(email_msg.as_bytes())
+
+ # TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in,
+ # based on how imap_tools uses regex to extract it.
+ # This should be a large enough pool
+ uid = random.randint(1, 10000)
+ while uid in _used_uids:
+ uid = random.randint(1, 10000)
+ _used_uids.add(uid)
+
+ imap_msg._raw_uid_data = f"UID {uid}".encode()
+
+ imap_msg.seen = seen
+ imap_msg.flagged = flagged
+
+ return imap_msg
def fake_magic_from_buffer(buffer, mime=False):
message = create_message(
subject="the message title",
from_="Myself",
- num_attachments=2,
+ attachments=2,
)
account = MailAccount()
self.assertEqual(result, 0)
def test_handle_unknown_mime_type(self):
- message = create_message()
- message.attachments = [
- create_attachment(filename="f1.pdf"),
- create_attachment(
- filename="f2.json",
- payload=b"{'much': 'payload.', 'so': 'json', 'wow': true}",
- ),
- ]
+ message = create_message(
+ attachments=[
+ _AttachmentDef(filename="f1.pdf"),
+ _AttachmentDef(
+ filename="f2.json",
+ content=b"{'much': 'payload.', 'so': 'json', 'wow': true}",
+ ),
+ ],
+ )
account = MailAccount()
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
self.assertEqual(kwargs["override_filename"], "f1.pdf")
def test_handle_disposition(self):
- message = create_message()
- message.attachments = [
- create_attachment(filename="f1.pdf", content_disposition="inline"),
- create_attachment(filename="f2.pdf", content_disposition="attachment"),
- ]
+ message = create_message(
+ attachments=[
+ _AttachmentDef(
+ filename="f1.pdf",
+ disposition="inline",
+ ),
+ _AttachmentDef(filename="f2.pdf"),
+ ],
+ )
account = MailAccount()
rule = MailRule(assign_title_from=MailRule.TITLE_FROM_FILENAME, account=account)
self.assertEqual(kwargs["override_filename"], "f2.pdf")
def test_handle_inline_files(self):
- message = create_message()
- message.attachments = [
- create_attachment(filename="f1.pdf", content_disposition="inline"),
- create_attachment(filename="f2.pdf", content_disposition="attachment"),
- ]
+ message = create_message(
+ attachments=[
+ _AttachmentDef(
+ filename="f1.pdf",
+ disposition="inline",
+ ),
+ _AttachmentDef(filename="f2.pdf"),
+ ],
+ )
account = MailAccount()
rule = MailRule(
self.assertEqual(self.async_task.call_count, 2)
def test_filename_filter(self):
- message = create_message()
- message.attachments = [
- create_attachment(filename="f1.pdf"),
- create_attachment(filename="f2.pdf"),
- create_attachment(filename="f3.pdf"),
- create_attachment(filename="f2.png"),
- ]
+ message = create_message(
+ attachments=[
+ _AttachmentDef(filename="f1.pdf"),
+ _AttachmentDef(filename="f2.pdf"),
+ _AttachmentDef(filename="f3.pdf"),
+ _AttachmentDef(filename="f2.png"),
+ ],
+ )
tests = [
("*.pdf", ["f1.pdf", "f2.pdf", "f3.pdf"]),
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.ACTION_MARK_READ,
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.ACTION_DELETE,
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.ACTION_FLAG,
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.ACTION_MOVE,
self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
+
self.mail_account_handler.handle_mail_account(account)
+
self.assertEqual(self.async_task.call_count, 1)
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
)
def test_error_skip_account(self):
- account_faulty = MailAccount.objects.create(
+ _ = MailAccount.objects.create(
name="test",
imap_server="",
username="admin",
username="admin",
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.ACTION_MOVE,
username="admin",
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.ACTION_MOVE,
order=1,
folder="uuuhhhh",
)
- rule2 = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule2",
account=account,
action=MailRule.ACTION_MOVE,
username="admin",
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
account=account,
action=MailRule.ACTION_MOVE,
username="admin",
password="secret",
)
- rule = MailRule.objects.create(
+ _ = MailRule.objects.create(
name="testrule",
filter_from="amazon@amazon.de",
account=account,