class BogusClient:
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ pass
+
+ def __init__(self, messages):
+ self.messages: List[MailMessage] = messages
+
def authenticate(self, mechanism, authobject):
# authobject must be a callable object
auth_bytes = authobject(None)
if auth_bytes != b"\x00admin\x00w57\xc3\xa4\xc3\xb6\xc3\xbcw4b6huwb6nhu":
raise MailboxLoginError("BAD", "OK")
+ def uid(self, command, *args):
+ if command == "STORE":
+ for message in self.messages:
+ if message.uid == args[0]:
+ flag = args[2]
+ if flag == "processed":
+ message._raw_flag_data.append(f"+FLAGS (processed)".encode())
+ MailMessage.flags.fget.cache_clear()
+
class BogusMailBox(ContextManager):
def __enter__(self):
self.messages: List[MailMessage] = []
self.messages_spam: List[MailMessage] = []
self.folder = BogusFolderManager()
- self.client = BogusClient()
+ self.client = BogusClient(self.messages)
self._host = ""
+ def updateClient(self):
+ self.client = BogusClient(self.messages)
+
def login(self, username, password):
# This will raise a UnicodeEncodeError if the password is not ASCII only
password.encode("ascii")
seen=False,
),
)
+ self.bogus_mailbox.updateClient()
def test_get_correspondent(self):
message = namedtuple("MailMessage", [])
self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ def test_handle_mail_mail_account_tag_gmail(self):
+ self.bogus_mailbox._host = "imap.gmail.com"
+
+ account = MailAccount.objects.create(
+ name="test",
+ imap_server="",
+ username="admin",
+ password="secret",
+ )
+
+ _ = MailRule.objects.create(
+ name="testrule",
+ account=account,
+ action=MailRule.MailAction.TAG,
+ action_parameter="processed",
+ )
+
+ self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(self.async_task.call_count, 0)
+ self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 2)
+ self.mail_account_handler.handle_mail_account(account)
+ self.assertEqual(self.async_task.call_count, 2)
+ self.assertEqual(len(self.bogus_mailbox.fetch("UNKEYWORD processed", False)), 0)
+ self.assertEqual(len(self.bogus_mailbox.messages), 3)
+
def test_error_login(self):
account = MailAccount.objects.create(
name="test",