from fnmatch import fnmatch
from typing import Dict
from typing import List
+from typing import Union
import magic
import pathvalidate
from imap_tools import MailMessageFlags
from imap_tools import NOT
from imap_tools.mailbox import MailBoxTls
+from imap_tools.query import LogicOperator
from paperless_mail.models import MailAccount
from paperless_mail.models import MailRule
from paperless_mail.models import ProcessedMail
class BaseMailAction:
- def get_criteria(self) -> Dict:
+ """
+ Base class for mail actions. A mail action is performed on a mail after
+ consumption of the document is complete and is used to signal to the user
+ that this mail was processed by paperless via the mail client.
+
+ Furthermore, mail actions reduce the amount of mails to be analyzed by
+ excluding mails on which the action was already performed (i.e., excluding
+ read mails when the action is to mark mails as read).
+ """
+
+ def get_criteria(self) -> Union[Dict, LogicOperator]:
+ """
+ Returns filtering criteria/query for this mail action.
+ """
return {}
- def post_consume(self, M, message_uid, parameter):
- pass # pragma: nocover
+ def post_consume(self, M: MailBox, message_uid: str, parameter: str):
+ """
+ Perform mail action on the given mail uid in the mailbox.
+ """
+ raise NotImplementedError()
class DeleteMailAction(BaseMailAction):
+ """
+ A mail action that deletes mails after processing.
+ """
+
def post_consume(self, M, message_uid, parameter):
M.delete(message_uid)
class MarkReadMailAction(BaseMailAction):
+ """
+ A mail action that marks mails as read after processing.
+ """
+
def get_criteria(self):
return {"seen": False}
class MoveMailAction(BaseMailAction):
+ """
+ A mail action that moves mails to a different folder after processing.
+ """
+
def post_consume(self, M, message_uid, parameter):
M.move(message_uid, parameter)
class FlagMailAction(BaseMailAction):
+ """
+ A mail action that marks mails as important ("star") after processing.
+ """
+
def get_criteria(self):
return {"flagged": False}
class TagMailAction(BaseMailAction):
+ """
+ A mail action that tags mails after processing.
+ """
+
def __init__(self, parameter):
# The custom tag should look like "apple:<color>"
# AppleMail: We only need to check if mails are \Flagged
if self.color:
return {"flagged": False}
-
- return {"no_keyword": self.keyword, "gmail_label": self.keyword}
+ elif self.keyword:
+ return AND(NOT(gmail_label=self.keyword), no_keyword=self.keyword)
+ else:
+ raise ValueError("This should never happen.")
def post_consume(self, M: MailBox, message_uid, parameter):
if re.search(r"gmail\.com$|googlemail\.com$", M._host):
message_subject: str,
message_date: datetime.datetime,
):
+ """
+ This shared task applies the mail action of a particular mail rule to the
+ given mail. Creates a ProcessedMail object, so that the mail won't be
+ processed in the future.
+ """
+
rule = MailRule.objects.get(pk=rule_id)
account = MailAccount.objects.get(pk=rule.account.pk)
message_subject: str,
message_date: datetime.datetime,
):
+ """
+ A shared task that is called whenever something goes wrong during
+ consumption of a file. See queue_consumption_tasks.
+ """
rule = MailRule.objects.get(pk=rule_id)
ProcessedMail.objects.create(
def queue_consumption_tasks(
+ *,
consume_tasks: list[Signature],
rule: MailRule,
message: MailMessage,
):
+ """
+ Queue a list of consumption tasks (Signatures for the consume_file shared
+ task) with celery.
+ """
+
mail_action_task = apply_mail_action.s(
rule_id=rule.pk,
message_uid=message.uid,
def get_rule_action(rule) -> BaseMailAction:
+ """
+ Returns a BaseMailAction instance for the given rule.
+ """
+
if rule.action == MailRule.MailAction.FLAG:
return FlagMailAction()
elif rule.action == MailRule.MailAction.DELETE:
def make_criterias(rule):
+ """
+ Returns criteria to be applied to MailBox.fetch for the given rule.
+ """
+
maximum_age = date.today() - timedelta(days=rule.maximum_age)
criterias = {}
if rule.maximum_age > 0:
if rule.filter_body:
criterias["body"] = rule.filter_body
- return {**criterias, **get_rule_action(rule).get_criteria()}
+ rule_query = get_rule_action(rule).get_criteria()
+ if isinstance(rule_query, dict):
+ return AND(**rule_query, **criterias)
+ else:
+ return AND(rule_query, **criterias)
def get_mailbox(server, port, security) -> MailBox:
+ """
+ Returns the correct MailBox instance for the given configuration.
+ """
+
if security == MailAccount.ImapSecurity.NONE:
mailbox = MailBoxUnencrypted(server, port)
elif security == MailAccount.ImapSecurity.STARTTLS:
class MailAccountHandler(LoggingMixin):
+ """
+ The main class that handles mail accounts.
+
+ * processes all rules for a given mail account
+ * for each mail rule, fetches relevant mails, and queues documents from
+ matching mails for consumption
+ * marks processed mails in the database, so that they won't be processed
+ again
+ * runs mail actions on the mail server, when consumption is completed
+ """
logging_name = "paperless_mail"
self.log("error", f"Error while retrieving correspondent {name}: {e}")
return None
- def get_title(self, message, att, rule):
+ def _get_title(self, message, att, rule):
if rule.assign_title_from == MailRule.TitleSource.FROM_SUBJECT:
return message.subject
"Unknown title selector.",
) # pragma: nocover
- def get_correspondent(self, message: MailMessage, rule):
+ def _get_correspondent(self, message: MailMessage, rule):
c_from = rule.assign_correspondent_from
if c_from == MailRule.CorrespondentSource.FROM_NOTHING:
) # pragma: nocover
def handle_mail_account(self, account: MailAccount):
+ """
+ Main entry method to handle a specific mail account.
+ """
self.renew_logging_group()
for rule in account.rules.order_by("order"):
try:
- total_processed_files += self.handle_mail_rule(
+ total_processed_files += self._handle_mail_rule(
M,
rule,
- supports_gmail_labels,
)
except Exception as e:
self.log(
return total_processed_files
- def handle_mail_rule(
+ def _handle_mail_rule(
self,
M: MailBox,
rule: MailRule,
- supports_gmail_labels: bool = False,
):
self.log("debug", f"Rule {rule}: Selecting folder {rule.folder}")
criterias = make_criterias(rule)
- # Deal with the Gmail label extension
- if "gmail_label" in criterias:
-
- gmail_label = criterias["gmail_label"]
- del criterias["gmail_label"]
-
- if not supports_gmail_labels:
- criterias_imap = AND(**criterias)
- else:
- criterias_imap = AND(NOT(gmail_label=gmail_label), **criterias)
- else:
- criterias_imap = AND(**criterias)
-
self.log(
"debug",
- f"Rule {rule}: Searching folder with criteria " f"{str(criterias_imap)}",
+ f"Rule {rule}: Searching folder with criteria " f"{str(criterias)}",
)
try:
messages = M.fetch(
- criteria=criterias_imap,
+ criteria=criterias,
mark_seen=False,
charset=rule.account.character_set,
)
continue
try:
- processed_files = self.handle_message(message, rule)
+ processed_files = self._handle_message(message, rule)
total_processed_files += processed_files
mails_processed += 1
return total_processed_files
- def handle_message(self, message, rule: MailRule) -> int:
+ def _handle_message(self, message, rule: MailRule) -> int:
processed_elements = 0
# Skip Message handling when only attachments are to be processed but
f"{len(message.attachments)} attachment(s)",
)
- correspondent = self.get_correspondent(message, rule)
+ correspondent = self._get_correspondent(message, rule)
tag_ids = [tag.id for tag in rule.assign_tags.all()]
doc_type = rule.assign_document_type
rule.consumption_scope == MailRule.ConsumptionScope.EML_ONLY
or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING
):
- processed_elements += self.process_eml(
+ processed_elements += self._process_eml(
message,
rule,
correspondent,
rule.consumption_scope == MailRule.ConsumptionScope.ATTACHMENTS_ONLY
or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING
):
- processed_elements += self.process_attachments(
+ processed_elements += self._process_attachments(
message,
rule,
correspondent,
return processed_elements
- def process_attachments(
+ def _process_attachments(
self,
message: MailMessage,
rule: MailRule,
):
continue
- title = self.get_title(message, att, rule)
+ title = self._get_title(message, att, rule)
# don't trust the content type of the attachment. Could be
# generic application/octet-stream.
f"by paperless",
)
- queue_consumption_tasks(consume_tasks, rule, message)
+ queue_consumption_tasks(
+ consume_tasks=consume_tasks,
+ rule=rule,
+ message=message,
+ )
return processed_attachments
- def process_eml(
+ def _process_eml(
self,
message: MailMessage,
rule: MailRule,
override_owner_id=rule.owner.id if rule.owner else None,
)
- queue_consumption_tasks([consume_task], rule, message)
+ queue_consumption_tasks(
+ consume_tasks=[consume_task],
+ rule=rule,
+ message=message,
+ )
processed_elements = 1
return processed_elements
from imap_tools import MailMessageFlags
from imap_tools import NOT
from paperless_mail import tasks
+from paperless_mail.mail import apply_mail_action
from paperless_mail.mail import MailAccountHandler
from paperless_mail.mail import MailError
from paperless_mail.mail import TagMailAction
raise Exception()
-_used_uids = set()
-
-
-def create_message(
- attachments: Union[int, List[_AttachmentDef]] = 1,
- body: str = "",
- subject: str = "the suject",
- from_: str = "noone@mail.com",
- seen: bool = False,
- flagged: bool = False,
- processed: bool = False,
-) -> MailMessage:
- email_msg = email.message.EmailMessage()
- # TODO: This does NOT set the UID
- email_msg["Message-ID"] = str(uuid.uuid4())
- email_msg["Subject"] = subject
- email_msg["From"] = from_
- email_msg.set_content(body)
-
- # Either add some default number of attachments
- # or the provided attachments
- if isinstance(attachments, int):
- for i in range(attachments):
- attachment = _AttachmentDef(filename=f"file_{i}.pdf")
- email_msg.add_attachment(
- attachment.content,
- maintype=attachment.maintype,
- subtype=attachment.subtype,
- disposition=attachment.disposition,
- filename=attachment.filename,
- )
- else:
- for attachment in attachments:
- email_msg.add_attachment(
- attachment.content,
- maintype=attachment.maintype,
- subtype=attachment.subtype,
- disposition=attachment.disposition,
- filename=attachment.filename,
- )
-
- # Convert the EmailMessage to an imap_tools MailMessage
- imap_msg = MailMessage.from_bytes(email_msg.as_bytes())
-
- # TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in,
- # based on how imap_tools uses regex to extract it.
- # This should be a large enough pool
- uid = random.randint(1, 10000)
- while uid in _used_uids:
- uid = random.randint(1, 10000)
- _used_uids.add(uid)
-
- imap_msg._raw_uid_data = f"UID {uid}".encode()
-
- imap_msg.seen = seen
- imap_msg.flagged = flagged
- if processed:
- imap_msg._raw_flag_data.append(b"+FLAGS (processed)")
- MailMessage.flags.fget.cache_clear()
-
- return imap_msg
-
-
def fake_magic_from_buffer(buffer, mime=False):
if mime:
if "PDF" in str(buffer):
@mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
def setUp(self):
+ self._used_uids = set()
+
+ self.bogus_mailbox = BogusMailBox()
+
patcher = mock.patch("paperless_mail.mail.MailBox")
m = patcher.start()
- self.bogus_mailbox = BogusMailBox()
m.return_value = self.bogus_mailbox
self.addCleanup(patcher.stop)
- patcher = mock.patch("paperless_mail.mail.consume_file.delay")
- self.async_task = patcher.start()
+ patcher = mock.patch("paperless_mail.mail.queue_consumption_tasks")
+ self._queue_consumption_tasks_mock = patcher.start()
self.addCleanup(patcher.stop)
self.reset_bogus_mailbox()
self.mail_account_handler = MailAccountHandler()
super().setUp()
+ def create_message(
+ self,
+ attachments: Union[int, List[_AttachmentDef]] = 1,
+ body: str = "",
+ subject: str = "the suject",
+ from_: str = "noone@mail.com",
+ seen: bool = False,
+ flagged: bool = False,
+ processed: bool = False,
+ ) -> MailMessage:
+ email_msg = email.message.EmailMessage()
+ # TODO: This does NOT set the UID
+ email_msg["Message-ID"] = str(uuid.uuid4())
+ email_msg["Subject"] = subject
+ email_msg["From"] = from_
+ email_msg.set_content(body)
+
+ # Either add some default number of attachments
+ # or the provided attachments
+ if isinstance(attachments, int):
+ for i in range(attachments):
+ attachment = _AttachmentDef(filename=f"file_{i}.pdf")
+ email_msg.add_attachment(
+ attachment.content,
+ maintype=attachment.maintype,
+ subtype=attachment.subtype,
+ disposition=attachment.disposition,
+ filename=attachment.filename,
+ )
+ else:
+ for attachment in attachments:
+ email_msg.add_attachment(
+ attachment.content,
+ maintype=attachment.maintype,
+ subtype=attachment.subtype,
+ disposition=attachment.disposition,
+ filename=attachment.filename,
+ )
+
+ # Convert the EmailMessage to an imap_tools MailMessage
+ imap_msg = MailMessage.from_bytes(email_msg.as_bytes())
+
+ # TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in,
+ # based on how imap_tools uses regex to extract it.
+ # This should be a large enough pool
+ uid = random.randint(1, 10000)
+ while uid in self._used_uids:
+ uid = random.randint(1, 10000)
+ self._used_uids.add(uid)
+
+ imap_msg._raw_uid_data = f"UID {uid}".encode()
+
+ imap_msg.seen = seen
+ imap_msg.flagged = flagged
+ if processed:
+ imap_msg._raw_flag_data.append(b"+FLAGS (processed)")
+ MailMessage.flags.fget.cache_clear()
+
+ return imap_msg
+
def reset_bogus_mailbox(self):
self.bogus_mailbox.messages = []
self.bogus_mailbox.messages_spam = []
self.bogus_mailbox.messages.append(
- create_message(
+ self.create_message(
subject="Invoice 1",
from_="amazon@amazon.de",
body="cables",
),
)
self.bogus_mailbox.messages.append(
- create_message(
+ self.create_message(
subject="Invoice 2",
body="from my favorite electronic store",
seen=False,
),
)
self.bogus_mailbox.messages.append(
- create_message(
+ self.create_message(
subject="Claim your $10M price now!",
from_="amazon@amazon-some-indian-site.org",
seen=False,
name="a",
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
)
- self.assertIsNone(handler.get_correspondent(message, rule))
+ self.assertIsNone(handler._get_correspondent(message, rule))
rule = MailRule(
name="b",
assign_correspondent_from=MailRule.CorrespondentSource.FROM_EMAIL,
)
- c = handler.get_correspondent(message, rule)
+ c = handler._get_correspondent(message, rule)
self.assertIsNotNone(c)
self.assertEqual(c.name, "someone@somewhere.com")
- c = handler.get_correspondent(message2, rule)
+ c = handler._get_correspondent(message2, rule)
self.assertIsNotNone(c)
self.assertEqual(c.name, "me@localhost.com")
self.assertEqual(c.id, me_localhost.id)
name="c",
assign_correspondent_from=MailRule.CorrespondentSource.FROM_NAME,
)
- c = handler.get_correspondent(message, rule)
+ c = handler._get_correspondent(message, rule)
self.assertIsNotNone(c)
self.assertEqual(c.name, "Someone!")
- c = handler.get_correspondent(message2, rule)
+ c = handler._get_correspondent(message2, rule)
self.assertIsNotNone(c)
self.assertEqual(c.id, me_localhost.id)
assign_correspondent_from=MailRule.CorrespondentSource.FROM_CUSTOM,
assign_correspondent=someone_else,
)
- c = handler.get_correspondent(message, rule)
+ c = handler._get_correspondent(message, rule)
self.assertEqual(c, someone_else)
def test_get_title(self):
name="a",
assign_title_from=MailRule.TitleSource.FROM_FILENAME,
)
- self.assertEqual(handler.get_title(message, att, rule), "this_is_the_file")
+ self.assertEqual(handler._get_title(message, att, rule), "this_is_the_file")
rule = MailRule(
name="b",
assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
)
- self.assertEqual(handler.get_title(message, att, rule), "the message title")
+ self.assertEqual(handler._get_title(message, att, rule), "the message title")
def test_handle_message(self):
- message = create_message(
+ message = self.create_message(
subject="the message title",
from_="Myself",
attachments=2,
)
rule.save()
- result = self.mail_account_handler.handle_message(message, rule)
+ result = self.mail_account_handler._handle_message(message, rule)
self.assertEqual(result, 2)
- self.assertEqual(len(self.async_task.call_args_list), 2)
-
- args1, kwargs1 = self.async_task.call_args_list[0]
- args2, kwargs2 = self.async_task.call_args_list[1]
-
- self.assertIsFile(kwargs1["path"])
-
- self.assertEqual(kwargs1["override_title"], "file_0")
- self.assertEqual(kwargs1["override_filename"], "file_0.pdf")
-
- self.assertIsFile(kwargs2["path"])
-
- self.assertEqual(kwargs2["override_title"], "file_1")
- self.assertEqual(kwargs2["override_filename"], "file_1.pdf")
+ self.verify_queue_consumption_tasks_call_args(
+ [
+ [
+ {"override_title": "file_0", "override_filename": "file_0.pdf"},
+ {"override_title": "file_1", "override_filename": "file_1.pdf"},
+ ],
+ ],
+ )
def test_handle_empty_message(self):
message = namedtuple("MailMessage", [])
message.attachments = []
rule = MailRule()
- result = self.mail_account_handler.handle_message(message, rule)
+ result = self.mail_account_handler._handle_message(message, rule)
- self.assertFalse(self.async_task.called)
+ self.assertFalse(self._queue_consumption_tasks_mock.called)
self.assertEqual(result, 0)
def test_handle_unknown_mime_type(self):
- message = create_message(
+ message = self.create_message(
attachments=[
_AttachmentDef(filename="f1.pdf"),
_AttachmentDef(
)
rule.save()
- result = self.mail_account_handler.handle_message(message, rule)
+ result = self.mail_account_handler._handle_message(message, rule)
self.assertEqual(result, 1)
- self.assertEqual(self.async_task.call_count, 1)
-
- args, kwargs = self.async_task.call_args
- self.assertIsFile(kwargs["path"])
- self.assertEqual(kwargs["override_filename"], "f1.pdf")
+ self.verify_queue_consumption_tasks_call_args(
+ [
+ [
+ {"override_filename": "f1.pdf"},
+ ],
+ ],
+ )
def test_handle_disposition(self):
- message = create_message(
+ message = self.create_message(
attachments=[
_AttachmentDef(
filename="f1.pdf",
)
rule.save()
- result = self.mail_account_handler.handle_message(message, rule)
-
+ result = self.mail_account_handler._handle_message(message, rule)
self.assertEqual(result, 1)
- self.assertEqual(self.async_task.call_count, 1)
-
- args, kwargs = self.async_task.call_args
- self.assertEqual(kwargs["override_filename"], "f2.pdf")
+ self.verify_queue_consumption_tasks_call_args(
+ [
+ [
+ {"override_filename": "f2.pdf"},
+ ],
+ ],
+ )
def test_handle_inline_files(self):
- message = create_message(
+ message = self.create_message(
attachments=[
_AttachmentDef(
filename="f1.pdf",
)
rule.save()
- result = self.mail_account_handler.handle_message(message, rule)
-
+ result = self.mail_account_handler._handle_message(message, rule)
self.assertEqual(result, 2)
- self.assertEqual(self.async_task.call_count, 2)
+ self.verify_queue_consumption_tasks_call_args(
+ [
+ [
+ {"override_filename": "f1.pdf"},
+ {"override_filename": "f2.pdf"},
+ ],
+ ],
+ )
def test_filename_filter(self):
- message = create_message(
+ message = self.create_message(
attachments=[
_AttachmentDef(filename="f1.pdf"),
_AttachmentDef(filename="f2.pdf"),
)
tests = [
- ("*.pdf", ["f1.pdf", "f1.Pdf", "f2.pdf", "f3.pdf", "file.PDf"]),
+ ("*.pdf", ["f1.pdf", "f2.pdf", "f3.pdf", "file.PDf", "f1.Pdf"]),
("f1.pdf", ["f1.pdf", "f1.Pdf"]),
("f1", []),
- ("*", ["f1.pdf", "f2.pdf", "f3.pdf", "f2.png", "f1.Pdf", "file.PDf"]),
+ ("*", ["f1.pdf", "f2.pdf", "f3.pdf", "f2.png", "file.PDf", "f1.Pdf"]),
("*.png", ["f2.png"]),
]
for (pattern, matches) in tests:
- matches.sort()
- self.async_task.reset_mock()
- account = MailAccount(name=str(uuid.uuid4()))
- account.save()
- rule = MailRule(
- name=str(uuid.uuid4()),
- assign_title_from=MailRule.TitleSource.FROM_FILENAME,
- account=account,
- filter_attachment_filename=pattern,
- )
- rule.save()
-
- result = self.mail_account_handler.handle_message(message, rule)
-
- self.assertEqual(result, len(matches), f"Error with pattern: {pattern}")
- filenames = sorted(
- a[1]["override_filename"] for a in self.async_task.call_args_list
- )
- self.assertListEqual(filenames, matches)
+ with self.subTest(msg=pattern):
+ print(f"PATTERN {pattern}")
+ self._queue_consumption_tasks_mock.reset_mock()
+ account = MailAccount(name=str(uuid.uuid4()))
+ account.save()
+ rule = MailRule(
+ name=str(uuid.uuid4()),
+ assign_title_from=MailRule.TitleSource.FROM_FILENAME,
+ account=account,
+ filter_attachment_filename=pattern,
+ )
+ rule.save()
+
+ self.mail_account_handler._handle_message(message, rule)
+ self.verify_queue_consumption_tasks_call_args(
+ [
+ [{"override_filename": m} for m in matches],
+ ],
+ )
def test_handle_mail_account_mark_read(self):
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
+
self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(self.async_task.call_count, 2)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
filter_subject="Invoice",
)
- self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
+
self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(self.async_task.call_count, 2)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.messages), 1)
def test_handle_mail_account_flag(self):
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)
+
self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(self.async_task.call_count, 1)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 1)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
filter_subject="Claim",
)
- self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
self.mail_account_handler.handle_mail_account(account)
+ self.apply_mail_actions()
- self.assertEqual(self.async_task.call_count, 1)
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 2)
+
self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(self.async_task.call_count, 2)
- self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
def test_handle_mail_account_tag_gmail(self):
self.bogus_mailbox._host = "imap.gmail.com"
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(self.async_task.call_count, 0)
criteria = NOT(gmail_label="processed")
self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 2)
+
self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(self.async_task.call_count, 2)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(self.async_task.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)
+
self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(self.async_task.call_count, 2)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
password="wrong",
)
- with self.assertRaises(MailError) as context:
+ with self.assertRaisesRegex(
+ MailError,
+ "Error while authenticating account",
+ ) as context:
self.mail_account_handler.handle_mail_account(account)
- self.assertTrue(
- str(context).startswith("Error while authenticating account"),
- )
def test_error_skip_account(self):
_ = MailAccount.objects.create(
)
tasks.process_mail_accounts()
- self.assertEqual(self.async_task.call_count, 1)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
)
self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(self.async_task.call_count, 1)
+ self.apply_mail_actions()
+
self.assertEqual(len(self.bogus_mailbox.messages), 2)
self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
self.mail_account_handler.handle_mail_account(account)
self.bogus_mailbox.folder.list.assert_called_once()
- self.assertEqual(self.async_task.call_count, 0)
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
def test_error_folder_set_error_listing(self):
"""
self.mail_account_handler.handle_mail_account(account)
self.bogus_mailbox.folder.list.assert_called_once()
- self.assertEqual(self.async_task.call_count, 0)
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
- @mock.patch("paperless_mail.mail.MailAccountHandler.get_correspondent")
+ @mock.patch("paperless_mail.mail.MailAccountHandler._get_correspondent")
def test_error_skip_mail(self, m):
def get_correspondent_fake(message, rule):
if message.from_ == "amazon@amazon.de":
)
self.mail_account_handler.handle_mail_account(account)
+ self.apply_mail_actions()
# test that we still consume mail even if some mails throw errors.
- self.assertEqual(self.async_task.call_count, 2)
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
# faulty mail still in inbox, untouched
self.assertEqual(len(self.bogus_mailbox.messages), 1)
self.mail_account_handler.handle_mail_account(account)
- self.async_task.assert_called_once()
+ self._queue_consumption_tasks_mock.assert_called_once()
args, kwargs = self.async_task.call_args
c = Correspondent.objects.get(name="amazon@amazon.de")
username="admin",
password="secret",
)
- rule = MailRule.objects.create(
- name="testrule3",
- account=account,
- action=MailRule.MailAction.DELETE,
- filter_subject="Claim",
- )
-
- self.assertEqual(self.async_task.call_count, 0)
-
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(len(self.bogus_mailbox.messages), 2)
- self.assertEqual(self.async_task.call_count, 1)
-
- self.reset_bogus_mailbox()
-
- rule.filter_subject = None
- rule.filter_body = "electronic"
- rule.save()
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(len(self.bogus_mailbox.messages), 2)
- self.assertEqual(self.async_task.call_count, 2)
- self.reset_bogus_mailbox()
-
- rule.filter_from = "amazon"
- rule.filter_body = None
- rule.save()
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(len(self.bogus_mailbox.messages), 1)
- self.assertEqual(self.async_task.call_count, 4)
-
- self.reset_bogus_mailbox()
-
- rule.filter_from = "amazon"
- rule.filter_body = "cables"
- rule.filter_subject = "Invoice"
- rule.save()
- self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.mail_account_handler.handle_mail_account(account)
- self.assertEqual(len(self.bogus_mailbox.messages), 2)
- self.assertEqual(self.async_task.call_count, 5)
+ for (f_body, f_from, f_subject, expected_mail_count) in [
+ (None, None, "Claim", 1),
+ ("electronic", None, None, 1),
+ (None, "amazon", None, 2),
+ ("cables", "amazon", "Invoice", 1),
+ ]:
+ with self.subTest(f_body=f_body, f_from=f_from, f_subject=f_subject):
+ MailRule.objects.all().delete()
+ rule = MailRule.objects.create(
+ name="testrule3",
+ account=account,
+ action=MailRule.MailAction.DELETE,
+ filter_subject=f_subject,
+ filter_body=f_body,
+ filter_from=f_from,
+ )
+ self.reset_bogus_mailbox()
+ self._queue_consumption_tasks_mock.reset_mock()
+
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
+ self.assertEqual(len(self.bogus_mailbox.messages), 3)
+
+ self.mail_account_handler.handle_mail_account(account)
+ self.apply_mail_actions()
+
+ self.assertEqual(
+ len(self.bogus_mailbox.messages),
+ 3 - expected_mail_count,
+ )
+ self.assertEqual(
+ self._queue_consumption_tasks_mock.call_count,
+ expected_mail_count,
+ )
def test_auth_plain_fallback(self):
"""
)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
- self.assertEqual(self.async_task.call_count, 0)
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
self.mail_account_handler.handle_mail_account(account)
+ self.apply_mail_actions()
- self.assertEqual(self.async_task.call_count, 2)
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
account,
)
+ def verify_queue_consumption_tasks_call_args(self, params):
+
+ self.assertEqual(
+ len(self._queue_consumption_tasks_mock.call_args_list),
+ len(params),
+ )
+
+ for (args, kwargs), param in zip(
+ self._queue_consumption_tasks_mock.call_args_list,
+ params,
+ ):
+
+ consume_tasks = kwargs["consume_tasks"]
+
+ self.assertEqual(len(consume_tasks), len(param))
+
+ for consume_task, p in zip(consume_tasks, param):
+ self.assertIsFile(consume_task.kwargs["path"])
+ for key, value in p.items():
+ self.assertIn(key, consume_task.kwargs)
+ self.assertEqual(consume_task.kwargs[key], value)
+
+ def apply_mail_actions(self):
+ for args, kwargs in self._queue_consumption_tasks_mock.call_args_list:
+ message = kwargs["message"]
+ rule = kwargs["rule"]
+ apply_mail_action([], rule.pk, message.uid, message.subject, message.date)
+
class TestManagementCommand(TestCase):
@mock.patch(