try:
- mailbox.login(account.username, account.password)
+ if account.is_token:
+ mailbox.xoauth2(account.username, account.password)
+ else:
+ try:
+ _ = account.password.encode("ascii")
+ use_ascii_login = True
+ except UnicodeEncodeError:
+ use_ascii_login = False
- except UnicodeEncodeError:
- logger.debug("Falling back to AUTH=PLAIN")
+ if use_ascii_login:
+ mailbox.login(account.username, account.password)
+ else:
+ logger.debug("Falling back to AUTH=PLAIN")
+ mailbox.login_utf8(account.username, account.password)
- try:
- mailbox.login_utf8(account.username, account.password)
- except Exception as e:
- logger.error(
- "Unable to authenticate with mail server using AUTH=PLAIN",
- )
- raise MailError(
- f"Error while authenticating account {account}",
- ) from e
except Exception as e:
logger.error(
f"Error while authenticating account {account}: {e}",
ASCII_PASSWORD: str = "secret"
# Note the non-ascii characters here
UTF_PASSWORD: str = "w57äöüw4b6huwb6nhu"
+ # A dummy access token
+ ACCESS_TOKEN = "ea7e075cd3acf2c54c48e600398d5d5a"
def __init__(self):
self.messages: List[MailMessage] = []
if username != self.USERNAME or password != self.UTF_PASSWORD:
raise MailboxLoginError("BAD", "OK")
+ def xoauth2(self, username: str, access_token: str):
+ if username != self.USERNAME or access_token != self.ACCESS_TOKEN:
+ raise MailboxLoginError("BAD", "OK")
+
def fetch(self, criteria, mark_seen, charset=""):
msg = self.messages
self.assertEqual(len(self.bogus_mailbox.messages), 3)
def test_error_login(self):
+ """
+ GIVEN:
+ - Account configured with incorrect password
+ WHEN:
+ - Account tried to login
+ THEN:
+ - MailError with correct message raised
+ """
account = MailAccount.objects.create(
name="test",
imap_server="",
"""
GIVEN:
- Mail account with password containing non-ASCII characters
+ WHEN:
+ - Mail account is handled
THEN:
- Should still authenticate to the mail account
"""
GIVEN:
- Mail account with password containing non-ASCII characters
- Incorrect password value
+ WHEN:
+ - Mail account is handled
THEN:
- Should raise a MailError for the account
"""
account,
)
+ def test_auth_with_valid_token(self):
+ """
+ GIVEN:
+ - Mail account configured with access token
+ WHEN:
+ - Mail account is handled
+ THEN:
+ - Should still authenticate to the mail account
+ """
+ account = MailAccount.objects.create(
+ name="test",
+ imap_server="",
+ username=BogusMailBox.USERNAME,
+ # Note the non-ascii characters here
+ password=BogusMailBox.ACCESS_TOKEN,
+ is_token=True,
+ )
+
+ _ = MailRule.objects.create(
+ name="testrule",
+ account=account,
+ action=MailRule.MailAction.MARK_READ,
+ )
+
+ self.assertEqual(len(self.bogus_mailbox.messages), 3)
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 0)
+ self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 2)
+
+ self.mail_account_handler.handle_mail_account(account)
+ self.apply_mail_actions()
+
+ self.assertEqual(self._queue_consumption_tasks_mock.call_count, 2)
+ self.assertEqual(len(self.bogus_mailbox.fetch("UNSEEN", False)), 0)
+ self.assertEqual(len(self.bogus_mailbox.messages), 3)
+
def assert_queue_consumption_tasks_call_args(self, expected_call_args: List):
"""
Verifies that queue_consumption_tasks has been called with the expected arguments.