]> git.ipfire.org Git - thirdparty/paperless-ngx.git/commitdiff
changes
authorJonas Winkler <17569239+jonaswinkler@users.noreply.github.com>
Thu, 23 Feb 2023 21:02:38 +0000 (22:02 +0100)
committerJonas Winkler <17569239+jonaswinkler@users.noreply.github.com>
Thu, 23 Feb 2023 21:02:38 +0000 (22:02 +0100)
src/paperless_mail/mail.py
src/paperless_mail/tasks.py
src/paperless_mail/tests/test_mail.py

index 740a7cbce7890ec9c911b1ffdf00631dca8079a3..0b35c90043f3ceae3945b4e79a54bdafb1a6a713 100644 (file)
@@ -9,6 +9,7 @@ from datetime import timedelta
 from fnmatch import fnmatch
 from typing import Dict
 from typing import List
+from typing import Union
 
 import magic
 import pathvalidate
@@ -29,6 +30,7 @@ from imap_tools import MailMessage
 from imap_tools import MailMessageFlags
 from imap_tools import NOT
 from imap_tools.mailbox import MailBoxTls
+from imap_tools.query import LogicOperator
 from paperless_mail.models import MailAccount
 from paperless_mail.models import MailRule
 from paperless_mail.models import ProcessedMail
@@ -61,19 +63,43 @@ class MailError(Exception):
 
 
 class BaseMailAction:
-    def get_criteria(self) -> Dict:
+    """
+    Base class for mail actions. A mail action is performed on a mail after
+    consumption of the document is complete and is used to signal to the user
+    that this mail was processed by paperless via the mail client.
+
+    Furthermore, mail actions reduce the amount of mails to be analyzed by
+    excluding mails on which the action was already performed (i.e., excluding
+    read mails when the action is to mark mails as read).
+    """
+
+    def get_criteria(self) -> Union[Dict, LogicOperator]:
+        """
+        Returns filtering criteria/query for this mail action.
+        """
         return {}
 
-    def post_consume(self, M, message_uid, parameter):
-        pass  # pragma: nocover
+    def post_consume(self, M: MailBox, message_uid: str, parameter: str):
+        """
+        Perform mail action on the given mail uid in the mailbox.
+        """
+        raise NotImplementedError()
 
 
 class DeleteMailAction(BaseMailAction):
+    """
+    A mail action that deletes mails after processing.
+    """
+
     def post_consume(self, M, message_uid, parameter):
         M.delete(message_uid)
 
 
 class MarkReadMailAction(BaseMailAction):
+    """
+    A mail action that marks mails as read after processing.
+    """
+
     def get_criteria(self):
         return {"seen": False}
 
@@ -82,11 +108,19 @@ class MarkReadMailAction(BaseMailAction):
 
 
 class MoveMailAction(BaseMailAction):
+    """
+    A mail action that moves mails to a different folder after processing.
+    """
+
     def post_consume(self, M, message_uid, parameter):
         M.move(message_uid, parameter)
 
 
 class FlagMailAction(BaseMailAction):
+    """
+    A mail action that marks mails as important ("star") after processing.
+    """
+
     def get_criteria(self):
         return {"flagged": False}
 
@@ -95,6 +129,10 @@ class FlagMailAction(BaseMailAction):
 
 
 class TagMailAction(BaseMailAction):
+    """
+    A mail action that tags mails after processing.
+    """
+
     def __init__(self, parameter):
 
         # The custom tag should look like "apple:<color>"
@@ -117,8 +155,10 @@ class TagMailAction(BaseMailAction):
         # AppleMail: We only need to check if mails are \Flagged
         if self.color:
             return {"flagged": False}
-
-        return {"no_keyword": self.keyword, "gmail_label": self.keyword}
+        elif self.keyword:
+            return AND(NOT(gmail_label=self.keyword), no_keyword=self.keyword)
+        else:
+            raise ValueError("This should never happen.")
 
     def post_consume(self, M: MailBox, message_uid, parameter):
         if re.search(r"gmail\.com$|googlemail\.com$", M._host):
@@ -158,6 +198,12 @@ def apply_mail_action(
     message_subject: str,
     message_date: datetime.datetime,
 ):
+    """
+    This shared task applies the mail action of a particular mail rule to the
+    given mail. Creates a ProcessedMail object, so that the mail won't be
+    processed in the future.
+    """
+
     rule = MailRule.objects.get(pk=rule_id)
     account = MailAccount.objects.get(pk=rule.account.pk)
 
@@ -208,6 +254,10 @@ def error_callback(
     message_subject: str,
     message_date: datetime.datetime,
 ):
+    """
+    A shared task that is called whenever something goes wrong during
+    consumption of a file. See queue_consumption_tasks.
+    """
     rule = MailRule.objects.get(pk=rule_id)
 
     ProcessedMail.objects.create(
@@ -222,10 +272,16 @@ def error_callback(
 
 
 def queue_consumption_tasks(
+    *,
     consume_tasks: list[Signature],
     rule: MailRule,
     message: MailMessage,
 ):
+    """
+    Queue a list of consumption tasks (Signatures for the consume_file shared
+    task) with celery.
+    """
+
     mail_action_task = apply_mail_action.s(
         rule_id=rule.pk,
         message_uid=message.uid,
@@ -243,6 +299,10 @@ def queue_consumption_tasks(
 
 
 def get_rule_action(rule) -> BaseMailAction:
+    """
+    Returns a BaseMailAction instance for the given rule.
+    """
+
     if rule.action == MailRule.MailAction.FLAG:
         return FlagMailAction()
     elif rule.action == MailRule.MailAction.DELETE:
@@ -258,6 +318,10 @@ def get_rule_action(rule) -> BaseMailAction:
 
 
 def make_criterias(rule):
+    """
+    Returns criteria to be applied to MailBox.fetch for the given rule.
+    """
+
     maximum_age = date.today() - timedelta(days=rule.maximum_age)
     criterias = {}
     if rule.maximum_age > 0:
@@ -269,10 +333,18 @@ def make_criterias(rule):
     if rule.filter_body:
         criterias["body"] = rule.filter_body
 
-    return {**criterias, **get_rule_action(rule).get_criteria()}
+    rule_query = get_rule_action(rule).get_criteria()
+    if isinstance(rule_query, dict):
+        return AND(**rule_query, **criterias)
+    else:
+        return AND(rule_query, **criterias)
 
 
 def get_mailbox(server, port, security) -> MailBox:
+    """
+    Returns the correct MailBox instance for the given configuration.
+    """
+
     if security == MailAccount.ImapSecurity.NONE:
         mailbox = MailBoxUnencrypted(server, port)
     elif security == MailAccount.ImapSecurity.STARTTLS:
@@ -285,6 +357,16 @@ def get_mailbox(server, port, security) -> MailBox:
 
 
 class MailAccountHandler(LoggingMixin):
+    """
+    The main class that handles mail accounts.
+
+    * processes all rules for a given mail account
+    * for each mail rule, fetches relevant mails, and queues documents from
+      matching mails for consumption
+    * marks processed mails in the database, so that they won't be processed
+      again
+    * runs mail actions on the mail server, when consumption is completed
+    """
 
     logging_name = "paperless_mail"
 
@@ -295,7 +377,7 @@ class MailAccountHandler(LoggingMixin):
             self.log("error", f"Error while retrieving correspondent {name}: {e}")
             return None
 
-    def get_title(self, message, att, rule):
+    def _get_title(self, message, att, rule):
         if rule.assign_title_from == MailRule.TitleSource.FROM_SUBJECT:
             return message.subject
 
@@ -307,7 +389,7 @@ class MailAccountHandler(LoggingMixin):
                 "Unknown title selector.",
             )  # pragma: nocover
 
-    def get_correspondent(self, message: MailMessage, rule):
+    def _get_correspondent(self, message: MailMessage, rule):
         c_from = rule.assign_correspondent_from
 
         if c_from == MailRule.CorrespondentSource.FROM_NOTHING:
@@ -332,6 +414,9 @@ class MailAccountHandler(LoggingMixin):
             )  # pragma: nocover
 
     def handle_mail_account(self, account: MailAccount):
+        """
+        Main entry method to handle a specific mail account.
+        """
 
         self.renew_logging_group()
 
@@ -386,10 +471,9 @@ class MailAccountHandler(LoggingMixin):
 
                 for rule in account.rules.order_by("order"):
                     try:
-                        total_processed_files += self.handle_mail_rule(
+                        total_processed_files += self._handle_mail_rule(
                             M,
                             rule,
-                            supports_gmail_labels,
                         )
                     except Exception as e:
                         self.log(
@@ -408,11 +492,10 @@ class MailAccountHandler(LoggingMixin):
 
         return total_processed_files
 
-    def handle_mail_rule(
+    def _handle_mail_rule(
         self,
         M: MailBox,
         rule: MailRule,
-        supports_gmail_labels: bool = False,
     ):
 
         self.log("debug", f"Rule {rule}: Selecting folder {rule.folder}")
@@ -442,27 +525,14 @@ class MailAccountHandler(LoggingMixin):
 
         criterias = make_criterias(rule)
 
-        # Deal with the Gmail label extension
-        if "gmail_label" in criterias:
-
-            gmail_label = criterias["gmail_label"]
-            del criterias["gmail_label"]
-
-            if not supports_gmail_labels:
-                criterias_imap = AND(**criterias)
-            else:
-                criterias_imap = AND(NOT(gmail_label=gmail_label), **criterias)
-        else:
-            criterias_imap = AND(**criterias)
-
         self.log(
             "debug",
-            f"Rule {rule}: Searching folder with criteria " f"{str(criterias_imap)}",
+            f"Rule {rule}: Searching folder with criteria " f"{str(criterias)}",
         )
 
         try:
             messages = M.fetch(
-                criteria=criterias_imap,
+                criteria=criterias,
                 mark_seen=False,
                 charset=rule.account.character_set,
             )
@@ -484,7 +554,7 @@ class MailAccountHandler(LoggingMixin):
                 continue
 
             try:
-                processed_files = self.handle_message(message, rule)
+                processed_files = self._handle_message(message, rule)
 
                 total_processed_files += processed_files
                 mails_processed += 1
@@ -499,7 +569,7 @@ class MailAccountHandler(LoggingMixin):
 
         return total_processed_files
 
-    def handle_message(self, message, rule: MailRule) -> int:
+    def _handle_message(self, message, rule: MailRule) -> int:
         processed_elements = 0
 
         # Skip Message handling when only attachments are to be processed but
@@ -517,7 +587,7 @@ class MailAccountHandler(LoggingMixin):
             f"{len(message.attachments)} attachment(s)",
         )
 
-        correspondent = self.get_correspondent(message, rule)
+        correspondent = self._get_correspondent(message, rule)
         tag_ids = [tag.id for tag in rule.assign_tags.all()]
         doc_type = rule.assign_document_type
 
@@ -525,7 +595,7 @@ class MailAccountHandler(LoggingMixin):
             rule.consumption_scope == MailRule.ConsumptionScope.EML_ONLY
             or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING
         ):
-            processed_elements += self.process_eml(
+            processed_elements += self._process_eml(
                 message,
                 rule,
                 correspondent,
@@ -537,7 +607,7 @@ class MailAccountHandler(LoggingMixin):
             rule.consumption_scope == MailRule.ConsumptionScope.ATTACHMENTS_ONLY
             or rule.consumption_scope == MailRule.ConsumptionScope.EVERYTHING
         ):
-            processed_elements += self.process_attachments(
+            processed_elements += self._process_attachments(
                 message,
                 rule,
                 correspondent,
@@ -547,7 +617,7 @@ class MailAccountHandler(LoggingMixin):
 
         return processed_elements
 
-    def process_attachments(
+    def _process_attachments(
         self,
         message: MailMessage,
         rule: MailRule,
@@ -583,7 +653,7 @@ class MailAccountHandler(LoggingMixin):
                 ):
                     continue
 
-            title = self.get_title(message, att, rule)
+            title = self._get_title(message, att, rule)
 
             # don't trust the content type of the attachment. Could be
             # generic application/octet-stream.
@@ -632,11 +702,15 @@ class MailAccountHandler(LoggingMixin):
                     f"by paperless",
                 )
 
-        queue_consumption_tasks(consume_tasks, rule, message)
+        queue_consumption_tasks(
+            consume_tasks=consume_tasks,
+            rule=rule,
+            message=message,
+        )
 
         return processed_attachments
 
-    def process_eml(
+    def _process_eml(
         self,
         message: MailMessage,
         rule: MailRule,
@@ -690,7 +764,11 @@ class MailAccountHandler(LoggingMixin):
             override_owner_id=rule.owner.id if rule.owner else None,
         )
 
-        queue_consumption_tasks([consume_task], rule, message)
+        queue_consumption_tasks(
+            consume_tasks=[consume_task],
+            rule=rule,
+            message=message,
+        )
 
         processed_elements = 1
         return processed_elements
index ab013a41e85d730c4ce4acc7a73ffe1a5885ef28..5c92233de9125cd180d4cea794f1514f48e59edf 100644 (file)
@@ -1,7 +1,6 @@
 import logging
 
 from celery import shared_task
-
 from paperless_mail.mail import MailAccountHandler
 from paperless_mail.mail import MailError
 from paperless_mail.models import MailAccount
index fb3a8e071aac7cf7434fd0f779ac5b621a76495f..b63d11614a9d849225280f89e3680749f043cb23 100644 (file)
@@ -23,6 +23,7 @@ from imap_tools import MailMessage
 from imap_tools import MailMessageFlags
 from imap_tools import NOT
 from paperless_mail import tasks
+from paperless_mail.mail import apply_mail_action
 from paperless_mail.mail import MailAccountHandler
 from paperless_mail.mail import MailError
 from paperless_mail.mail import TagMailAction
@@ -168,69 +169,6 @@ class BogusMailBox(ContextManager):
             raise Exception()
 
 
-_used_uids = set()
-
-
-def create_message(
-    attachments: Union[int, List[_AttachmentDef]] = 1,
-    body: str = "",
-    subject: str = "the suject",
-    from_: str = "noone@mail.com",
-    seen: bool = False,
-    flagged: bool = False,
-    processed: bool = False,
-) -> MailMessage:
-    email_msg = email.message.EmailMessage()
-    # TODO: This does NOT set the UID
-    email_msg["Message-ID"] = str(uuid.uuid4())
-    email_msg["Subject"] = subject
-    email_msg["From"] = from_
-    email_msg.set_content(body)
-
-    # Either add some default number of attachments
-    # or the provided attachments
-    if isinstance(attachments, int):
-        for i in range(attachments):
-            attachment = _AttachmentDef(filename=f"file_{i}.pdf")
-            email_msg.add_attachment(
-                attachment.content,
-                maintype=attachment.maintype,
-                subtype=attachment.subtype,
-                disposition=attachment.disposition,
-                filename=attachment.filename,
-            )
-    else:
-        for attachment in attachments:
-            email_msg.add_attachment(
-                attachment.content,
-                maintype=attachment.maintype,
-                subtype=attachment.subtype,
-                disposition=attachment.disposition,
-                filename=attachment.filename,
-            )
-
-    # Convert the EmailMessage to an imap_tools MailMessage
-    imap_msg = MailMessage.from_bytes(email_msg.as_bytes())
-
-    # TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in,
-    #  based on how imap_tools uses regex to extract it.
-    #  This should be a large enough pool
-    uid = random.randint(1, 10000)
-    while uid in _used_uids:
-        uid = random.randint(1, 10000)
-    _used_uids.add(uid)
-
-    imap_msg._raw_uid_data = f"UID {uid}".encode()
-
-    imap_msg.seen = seen
-    imap_msg.flagged = flagged
-    if processed:
-        imap_msg._raw_flag_data.append(b"+FLAGS (processed)")
-        MailMessage.flags.fget.cache_clear()
-
-    return imap_msg
-
-
 def fake_magic_from_buffer(buffer, mime=False):
     if mime:
         if "PDF" in str(buffer):
@@ -244,14 +182,17 @@ def fake_magic_from_buffer(buffer, mime=False):
 @mock.patch("paperless_mail.mail.magic.from_buffer", fake_magic_from_buffer)
 class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
     def setUp(self):
+        self._used_uids = set()
+
+        self.bogus_mailbox = BogusMailBox()
+
         patcher = mock.patch("paperless_mail.mail.MailBox")
         m = patcher.start()
-        self.bogus_mailbox = BogusMailBox()
         m.return_value = self.bogus_mailbox
         self.addCleanup(patcher.stop)
 
-        patcher = mock.patch("paperless_mail.mail.consume_file.delay")
-        self.async_task = patcher.start()
+        patcher = mock.patch("paperless_mail.mail.queue_consumption_tasks")
+        self._queue_consumption_tasks_mock = patcher.start()
         self.addCleanup(patcher.stop)
 
         self.reset_bogus_mailbox()
@@ -259,11 +200,71 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         self.mail_account_handler = MailAccountHandler()
         super().setUp()
 
+    def create_message(
+        self,
+        attachments: Union[int, List[_AttachmentDef]] = 1,
+        body: str = "",
+        subject: str = "the suject",
+        from_: str = "noone@mail.com",
+        seen: bool = False,
+        flagged: bool = False,
+        processed: bool = False,
+    ) -> MailMessage:
+        email_msg = email.message.EmailMessage()
+        # TODO: This does NOT set the UID
+        email_msg["Message-ID"] = str(uuid.uuid4())
+        email_msg["Subject"] = subject
+        email_msg["From"] = from_
+        email_msg.set_content(body)
+
+        # Either add some default number of attachments
+        # or the provided attachments
+        if isinstance(attachments, int):
+            for i in range(attachments):
+                attachment = _AttachmentDef(filename=f"file_{i}.pdf")
+                email_msg.add_attachment(
+                    attachment.content,
+                    maintype=attachment.maintype,
+                    subtype=attachment.subtype,
+                    disposition=attachment.disposition,
+                    filename=attachment.filename,
+                )
+        else:
+            for attachment in attachments:
+                email_msg.add_attachment(
+                    attachment.content,
+                    maintype=attachment.maintype,
+                    subtype=attachment.subtype,
+                    disposition=attachment.disposition,
+                    filename=attachment.filename,
+                )
+
+        # Convert the EmailMessage to an imap_tools MailMessage
+        imap_msg = MailMessage.from_bytes(email_msg.as_bytes())
+
+        # TODO: Unsure how to add a uid to the actual EmailMessage. This hacks it in,
+        #  based on how imap_tools uses regex to extract it.
+        #  This should be a large enough pool
+        uid = random.randint(1, 10000)
+        while uid in self._used_uids:
+            uid = random.randint(1, 10000)
+        self._used_uids.add(uid)
+
+        imap_msg._raw_uid_data = f"UID {uid}".encode()
+
+        imap_msg.seen = seen
+        imap_msg.flagged = flagged
+        if processed:
+            imap_msg._raw_flag_data.append(b"+FLAGS (processed)")
+            MailMessage.flags.fget.cache_clear()
+
+        return imap_msg
+
     def reset_bogus_mailbox(self):
         self.bogus_mailbox.messages = []
         self.bogus_mailbox.messages_spam = []
         self.bogus_mailbox.messages.append(
-            create_message(
+            self.create_message(
                 subject="Invoice 1",
                 from_="amazon@amazon.de",
                 body="cables",
@@ -273,7 +274,7 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             ),
         )
         self.bogus_mailbox.messages.append(
-            create_message(
+            self.create_message(
                 subject="Invoice 2",
                 body="from my favorite electronic store",
                 seen=False,
@@ -282,7 +283,7 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             ),
         )
         self.bogus_mailbox.messages.append(
-            create_message(
+            self.create_message(
                 subject="Claim your $10M price now!",
                 from_="amazon@amazon-some-indian-site.org",
                 seen=False,
@@ -314,16 +315,16 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             name="a",
             assign_correspondent_from=MailRule.CorrespondentSource.FROM_NOTHING,
         )
-        self.assertIsNone(handler.get_correspondent(message, rule))
+        self.assertIsNone(handler._get_correspondent(message, rule))
 
         rule = MailRule(
             name="b",
             assign_correspondent_from=MailRule.CorrespondentSource.FROM_EMAIL,
         )
-        c = handler.get_correspondent(message, rule)
+        c = handler._get_correspondent(message, rule)
         self.assertIsNotNone(c)
         self.assertEqual(c.name, "someone@somewhere.com")
-        c = handler.get_correspondent(message2, rule)
+        c = handler._get_correspondent(message2, rule)
         self.assertIsNotNone(c)
         self.assertEqual(c.name, "me@localhost.com")
         self.assertEqual(c.id, me_localhost.id)
@@ -332,10 +333,10 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             name="c",
             assign_correspondent_from=MailRule.CorrespondentSource.FROM_NAME,
         )
-        c = handler.get_correspondent(message, rule)
+        c = handler._get_correspondent(message, rule)
         self.assertIsNotNone(c)
         self.assertEqual(c.name, "Someone!")
-        c = handler.get_correspondent(message2, rule)
+        c = handler._get_correspondent(message2, rule)
         self.assertIsNotNone(c)
         self.assertEqual(c.id, me_localhost.id)
 
@@ -344,7 +345,7 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             assign_correspondent_from=MailRule.CorrespondentSource.FROM_CUSTOM,
             assign_correspondent=someone_else,
         )
-        c = handler.get_correspondent(message, rule)
+        c = handler._get_correspondent(message, rule)
         self.assertEqual(c, someone_else)
 
     def test_get_title(self):
@@ -359,15 +360,15 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             name="a",
             assign_title_from=MailRule.TitleSource.FROM_FILENAME,
         )
-        self.assertEqual(handler.get_title(message, att, rule), "this_is_the_file")
+        self.assertEqual(handler._get_title(message, att, rule), "this_is_the_file")
         rule = MailRule(
             name="b",
             assign_title_from=MailRule.TitleSource.FROM_SUBJECT,
         )
-        self.assertEqual(handler.get_title(message, att, rule), "the message title")
+        self.assertEqual(handler._get_title(message, att, rule), "the message title")
 
     def test_handle_message(self):
-        message = create_message(
+        message = self.create_message(
             subject="the message title",
             from_="Myself",
             attachments=2,
@@ -381,24 +382,18 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
         rule.save()
 
-        result = self.mail_account_handler.handle_message(message, rule)
+        result = self.mail_account_handler._handle_message(message, rule)
 
         self.assertEqual(result, 2)
 
-        self.assertEqual(len(self.async_task.call_args_list), 2)
-
-        args1, kwargs1 = self.async_task.call_args_list[0]
-        args2, kwargs2 = self.async_task.call_args_list[1]
-
-        self.assertIsFile(kwargs1["path"])
-
-        self.assertEqual(kwargs1["override_title"], "file_0")
-        self.assertEqual(kwargs1["override_filename"], "file_0.pdf")
-
-        self.assertIsFile(kwargs2["path"])
-
-        self.assertEqual(kwargs2["override_title"], "file_1")
-        self.assertEqual(kwargs2["override_filename"], "file_1.pdf")
+        self.verify_queue_consumption_tasks_call_args(
+            [
+                [
+                    {"override_title": "file_0", "override_filename": "file_0.pdf"},
+                    {"override_title": "file_1", "override_filename": "file_1.pdf"},
+                ],
+            ],
+        )
 
     def test_handle_empty_message(self):
         message = namedtuple("MailMessage", [])
@@ -406,13 +401,13 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         message.attachments = []
         rule = MailRule()
 
-        result = self.mail_account_handler.handle_message(message, rule)
+        result = self.mail_account_handler._handle_message(message, rule)
 
-        self.assertFalse(self.async_task.called)
+        self.assertFalse(self._queue_consumption_tasks_mock.called)
         self.assertEqual(result, 0)
 
     def test_handle_unknown_mime_type(self):
-        message = create_message(
+        message = self.create_message(
             attachments=[
                 _AttachmentDef(filename="f1.pdf"),
                 _AttachmentDef(
@@ -430,17 +425,19 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
         rule.save()
 
-        result = self.mail_account_handler.handle_message(message, rule)
+        result = self.mail_account_handler._handle_message(message, rule)
 
         self.assertEqual(result, 1)
-        self.assertEqual(self.async_task.call_count, 1)
-
-        args, kwargs = self.async_task.call_args
-        self.assertIsFile(kwargs["path"])
-        self.assertEqual(kwargs["override_filename"], "f1.pdf")
+        self.verify_queue_consumption_tasks_call_args(
+            [
+                [
+                    {"override_filename": "f1.pdf"},
+                ],
+            ],
+        )
 
     def test_handle_disposition(self):
-        message = create_message(
+        message = self.create_message(
             attachments=[
                 _AttachmentDef(
                     filename="f1.pdf",
@@ -458,16 +455,18 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
         rule.save()
 
-        result = self.mail_account_handler.handle_message(message, rule)
-
+        result = self.mail_account_handler._handle_message(message, rule)
         self.assertEqual(result, 1)
-        self.assertEqual(self.async_task.call_count, 1)
-
-        args, kwargs = self.async_task.call_args
-        self.assertEqual(kwargs["override_filename"], "f2.pdf")
+        self.verify_queue_consumption_tasks_call_args(
+            [
+                [
+                    {"override_filename": "f2.pdf"},
+                ],
+            ],
+        )
 
     def test_handle_inline_files(self):
-        message = create_message(
+        message = self.create_message(
             attachments=[
                 _AttachmentDef(
                     filename="f1.pdf",
@@ -486,13 +485,19 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
         rule.save()
 
-        result = self.mail_account_handler.handle_message(message, rule)
-
+        result = self.mail_account_handler._handle_message(message, rule)
         self.assertEqual(result, 2)
-        self.assertEqual(self.async_task.call_count, 2)
+        self.verify_queue_consumption_tasks_call_args(
+            [
+                [
+                    {"override_filename": "f1.pdf"},
+                    {"override_filename": "f2.pdf"},
+                ],
+            ],
+        )
 
     def test_filename_filter(self):
-        message = create_message(
+        message = self.create_message(
             attachments=[
                 _AttachmentDef(filename="f1.pdf"),
                 _AttachmentDef(filename="f2.pdf"),
@@ -504,33 +509,33 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         tests = [
-            ("*.pdf", ["f1.pdf", "f1.Pdf", "f2.pdf", "f3.pdf", "file.PDf"]),
+            ("*.pdf", ["f1.pdf", "f2.pdf", "f3.pdf", "file.PDf", "f1.Pdf"]),
             ("f1.pdf", ["f1.pdf", "f1.Pdf"]),
             ("f1", []),
-            ("*", ["f1.pdf", "f2.pdf", "f3.pdf", "f2.png", "f1.Pdf", "file.PDf"]),
+            ("*", ["f1.pdf", "f2.pdf", "f3.pdf", "f2.png", "file.PDf", "f1.Pdf"]),
             ("*.png", ["f2.png"]),
         ]
 
         for (pattern, matches) in tests:
-            matches.sort()
-            self.async_task.reset_mock()
-            account = MailAccount(name=str(uuid.uuid4()))
-            account.save()
-            rule = MailRule(
-                name=str(uuid.uuid4()),
-                assign_title_from=MailRule.TitleSource.FROM_FILENAME,
-                account=account,
-                filter_attachment_filename=pattern,
-            )
-            rule.save()
-
-            result = self.mail_account_handler.handle_message(message, rule)
-
-            self.assertEqual(result, len(matches), f"Error with pattern: {pattern}")
-            filenames = sorted(
-                a[1]["override_filename"] for a in self.async_task.call_args_list
-            )
-            self.assertListEqual(filenames, matches)
+            with self.subTest(msg=pattern):
+                print(f"PATTERN {pattern}")
+                self._queue_consumption_tasks_mock.reset_mock()
+                account = MailAccount(name=str(uuid.uuid4()))
+                account.save()
+                rule = MailRule(
+                    name=str(uuid.uuid4()),
+                    assign_title_from=MailRule.TitleSource.FROM_FILENAME,
+                    account=account,
+                    filter_attachment_filename=pattern,
+                )
+                rule.save()
+
+                self.mail_account_handler._handle_message(message, rule)
+                self.verify_queue_consumption_tasks_call_args(
+                    [
+                        [{"override_filename": m} for m in matches],
+                    ],
+                )
 
     def test_handle_mail_account_mark_read(self):
 
@@ -548,10 +553,11 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.assertEqual(self.async_task.call_count, 0)
         self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
+
         self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(self.async_task.call_count, 2)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
 
@@ -571,10 +577,11 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             filter_subject="Invoice",
         )
 
-        self.assertEqual(self.async_task.call_count, 0)
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
+
         self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(self.async_task.call_count, 2)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.messages), 1)
 
     def test_handle_mail_account_flag(self):
@@ -593,10 +600,11 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.assertEqual(self.async_task.call_count, 0)
         self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)
+
         self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(self.async_task.call_count, 1)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 1)
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
 
@@ -616,13 +624,12 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             filter_subject="Claim",
         )
 
-        self.assertEqual(self.async_task.call_count, 0)
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
         self.assertEqual(len(self.bogus_mailbox.messages_spam), 0)
 
         self.mail_account_handler.handle_mail_account(account)
+        self.apply_mail_actions()
 
-        self.assertEqual(self.async_task.call_count, 1)
         self.assertEqual(len(self.bogus_mailbox.messages), 2)
         self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
 
@@ -642,12 +649,13 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.assertEqual(self.async_task.call_count, 0)
         self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 2)
+
         self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(self.async_task.call_count, 2)
-        self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
+        self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
 
     def test_handle_mail_account_tag_gmail(self):
         self.bogus_mailbox._host = "imap.gmail.com"
@@ -668,11 +676,12 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.assertEqual(self.async_task.call_count, 0)
         criteria = NOT(gmail_label="processed")
         self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 2)
+
         self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(self.async_task.call_count, 2)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.fetch(criteria, False)), 0)
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
 
@@ -702,10 +711,11 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.assertEqual(self.async_task.call_count, 0)
         self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 2)
+
         self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(self.async_task.call_count, 2)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.fetch("UNFLAGGED", False)), 0)
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
 
@@ -717,11 +727,11 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             password="wrong",
         )
 
-        with self.assertRaises(MailError) as context:
+        with self.assertRaisesRegex(
+            MailError,
+            "Error while authenticating account",
+        ) as context:
             self.mail_account_handler.handle_mail_account(account)
-            self.assertTrue(
-                str(context).startswith("Error while authenticating account"),
-            )
 
     def test_error_skip_account(self):
         _ = MailAccount.objects.create(
@@ -746,7 +756,8 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         tasks.process_mail_accounts()
-        self.assertEqual(self.async_task.call_count, 1)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.messages), 2)
         self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
 
@@ -777,7 +788,8 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(self.async_task.call_count, 1)
+        self.apply_mail_actions()
+
         self.assertEqual(len(self.bogus_mailbox.messages), 2)
         self.assertEqual(len(self.bogus_mailbox.messages_spam), 1)
 
@@ -812,7 +824,7 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         self.mail_account_handler.handle_mail_account(account)
 
         self.bogus_mailbox.folder.list.assert_called_once()
-        self.assertEqual(self.async_task.call_count, 0)
+        self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
 
     def test_error_folder_set_error_listing(self):
         """
@@ -845,9 +857,9 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         self.mail_account_handler.handle_mail_account(account)
 
         self.bogus_mailbox.folder.list.assert_called_once()
-        self.assertEqual(self.async_task.call_count, 0)
+        self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
 
-    @mock.patch("paperless_mail.mail.MailAccountHandler.get_correspondent")
+    @mock.patch("paperless_mail.mail.MailAccountHandler._get_correspondent")
     def test_error_skip_mail(self, m):
         def get_correspondent_fake(message, rule):
             if message.from_ == "amazon@amazon.de":
@@ -871,9 +883,10 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.mail_account_handler.handle_mail_account(account)
+        self.apply_mail_actions()
 
         # test that we still consume mail even if some mails throw errors.
-        self.assertEqual(self.async_task.call_count, 2)
+        self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
 
         # faulty mail still in inbox, untouched
         self.assertEqual(len(self.bogus_mailbox.messages), 1)
@@ -898,7 +911,7 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
 
         self.mail_account_handler.handle_mail_account(account)
 
-        self.async_task.assert_called_once()
+        self._queue_consumption_tasks_mock.assert_called_once()
         args, kwargs = self.async_task.call_args
 
         c = Correspondent.objects.get(name="amazon@amazon.de")
@@ -925,50 +938,40 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             username="admin",
             password="secret",
         )
-        rule = MailRule.objects.create(
-            name="testrule3",
-            account=account,
-            action=MailRule.MailAction.DELETE,
-            filter_subject="Claim",
-        )
-
-        self.assertEqual(self.async_task.call_count, 0)
-
-        self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(len(self.bogus_mailbox.messages), 2)
-        self.assertEqual(self.async_task.call_count, 1)
-
-        self.reset_bogus_mailbox()
-
-        rule.filter_subject = None
-        rule.filter_body = "electronic"
-        rule.save()
-        self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(len(self.bogus_mailbox.messages), 2)
-        self.assertEqual(self.async_task.call_count, 2)
 
-        self.reset_bogus_mailbox()
-
-        rule.filter_from = "amazon"
-        rule.filter_body = None
-        rule.save()
-        self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(len(self.bogus_mailbox.messages), 1)
-        self.assertEqual(self.async_task.call_count, 4)
-
-        self.reset_bogus_mailbox()
-
-        rule.filter_from = "amazon"
-        rule.filter_body = "cables"
-        rule.filter_subject = "Invoice"
-        rule.save()
-        self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.mail_account_handler.handle_mail_account(account)
-        self.assertEqual(len(self.bogus_mailbox.messages), 2)
-        self.assertEqual(self.async_task.call_count, 5)
+        for (f_body, f_from, f_subject, expected_mail_count) in [
+            (None, None, "Claim", 1),
+            ("electronic", None, None, 1),
+            (None, "amazon", None, 2),
+            ("cables", "amazon", "Invoice", 1),
+        ]:
+            with self.subTest(f_body=f_body, f_from=f_from, f_subject=f_subject):
+                MailRule.objects.all().delete()
+                rule = MailRule.objects.create(
+                    name="testrule3",
+                    account=account,
+                    action=MailRule.MailAction.DELETE,
+                    filter_subject=f_subject,
+                    filter_body=f_body,
+                    filter_from=f_from,
+                )
+                self.reset_bogus_mailbox()
+                self._queue_consumption_tasks_mock.reset_mock()
+
+                self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
+                self.assertEqual(len(self.bogus_mailbox.messages), 3)
+
+                self.mail_account_handler.handle_mail_account(account)
+                self.apply_mail_actions()
+
+                self.assertEqual(
+                    len(self.bogus_mailbox.messages),
+                    3 - expected_mail_count,
+                )
+                self.assertEqual(
+                    self._queue_consumption_tasks_mock.call_count,
+                    expected_mail_count,
+                )
 
     def test_auth_plain_fallback(self):
         """
@@ -992,12 +995,13 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
         )
 
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
-        self.assertEqual(self.async_task.call_count, 0)
+        self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
         self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
 
         self.mail_account_handler.handle_mail_account(account)
+        self.apply_mail_actions()
 
-        self.assertEqual(self.async_task.call_count, 2)
+        self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
         self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
         self.assertEqual(len(self.bogus_mailbox.messages), 3)
 
@@ -1030,6 +1034,34 @@ class TestMail(DirectoriesMixin, FileSystemAssertsMixin, TestCase):
             account,
         )
 
+    def verify_queue_consumption_tasks_call_args(self, params):
+
+        self.assertEqual(
+            len(self._queue_consumption_tasks_mock.call_args_list),
+            len(params),
+        )
+
+        for (args, kwargs), param in zip(
+            self._queue_consumption_tasks_mock.call_args_list,
+            params,
+        ):
+
+            consume_tasks = kwargs["consume_tasks"]
+
+            self.assertEqual(len(consume_tasks), len(param))
+
+            for consume_task, p in zip(consume_tasks, param):
+                self.assertIsFile(consume_task.kwargs["path"])
+                for key, value in p.items():
+                    self.assertIn(key, consume_task.kwargs)
+                    self.assertEqual(consume_task.kwargs[key], value)
+
+    def apply_mail_actions(self):
+        for args, kwargs in self._queue_consumption_tasks_mock.call_args_list:
+            message = kwargs["message"]
+            rule = kwargs["rule"]
+            apply_mail_action([], rule.pk, message.uid, message.subject, message.date)
+
 
 class TestManagementCommand(TestCase):
     @mock.patch(