]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
#25446: Fix regression in smtplib's AUTH LOGIN support.
authorR David Murray <rdmurray@bitdance.com>
Sun, 8 Nov 2015 06:03:52 +0000 (01:03 -0500)
committerR David Murray <rdmurray@bitdance.com>
Sun, 8 Nov 2015 06:03:52 +0000 (01:03 -0500)
The auth method tests simply weren't adequate because of the fact that
smtpd doesn't support authentication.  I borrowed some of Milan's
code for that from issue #21935 and added it to the smtplib tests.
Also discovered that the direct test for the 'auth' method wasn't actually
testing anything and fixed it.

The fix makes the new authobject mechanism work the way it is
documented...the problem was that wasn't checking for a 334 return code
if an initial-response was provided, which works fine for auth plain
and cram-md5, but not for auth login.

Lib/smtplib.py
Lib/test/test_smtplib.py
Misc/NEWS

index 7e348757f3ec3b7c17cad38a25c72fdb83825aec..47569738f0d6a00a10a28886b730b8fbfa57c173 100755 (executable)
@@ -630,12 +630,12 @@ class SMTP:
             (code, resp) = self.docmd("AUTH", mechanism + " " + response)
         else:
             (code, resp) = self.docmd("AUTH", mechanism)
-            # Server replies with 334 (challenge) or 535 (not supported)
-            if code == 334:
-                challenge = base64.decodebytes(resp)
-                response = encode_base64(
-                    authobject(challenge).encode('ascii'), eol='')
-                (code, resp) = self.docmd(response)
+        # If server responds with a challenge, send the response.
+        if code == 334:
+            challenge = base64.decodebytes(resp)
+            response = encode_base64(
+                authobject(challenge).encode('ascii'), eol='')
+            (code, resp) = self.docmd(response)
         if code in (235, 503):
             return (code, resp)
         raise SMTPAuthenticationError(code, resp)
@@ -657,11 +657,10 @@ class SMTP:
     def auth_login(self, challenge=None):
         """ Authobject to use with LOGIN authentication. Requires self.user and
         self.password to be set."""
-        (code, resp) = self.docmd(
-            encode_base64(self.user.encode('ascii'), eol=''))
-        if code == 334:
+        if challenge is None:
+            return self.user
+        else:
             return self.password
-        raise SMTPAuthenticationError(code, resp)
 
     def login(self, user, password, *, initial_response_ok=True):
         """Log in on an SMTP server that requires authentication.
index 8e362414288463b85d4a6b454350fbd1cb8555f7..28539f360f5974ec2d65e0782d980b74a57d73af 100644 (file)
@@ -1,8 +1,10 @@
 import asyncore
+import base64
 import email.mime.text
 from email.message import EmailMessage
 from email.base64mime import body_encode as encode_base64
 import email.utils
+import hmac
 import socket
 import smtpd
 import smtplib
@@ -623,20 +625,12 @@ sim_users = {'Mr.A@somewhere.com':'John A',
 sim_auth = ('Mr.A@somewhere.com', 'somepassword')
 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
                           'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
-sim_auth_credentials = {
-    'login': 'TXIuQUBzb21ld2hlcmUuY29t',
-    'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
-    'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
-                 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
-    }
-sim_auth_login_user = 'TXIUQUBZB21LD2HLCMUUY29T'
-sim_auth_plain = 'AE1YLKFAC29TZXDOZXJLLMNVBQBZB21LCGFZC3DVCMQ='
-
 sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
              'list-2':['Ms.B@xn--fo-fka.com',],
             }
 
 # Simulated SMTP channel & server
+class ResponseException(Exception): pass
 class SimSMTPChannel(smtpd.SMTPChannel):
 
     quit_response = None
@@ -646,12 +640,109 @@ class SimSMTPChannel(smtpd.SMTPChannel):
     rcpt_count = 0
     rset_count = 0
     disconnect = 0
+    AUTH = 99    # Add protocol state to enable auth testing.
+    authenticated_user = None
 
     def __init__(self, extra_features, *args, **kw):
         self._extrafeatures = ''.join(
             [ "250-{0}\r\n".format(x) for x in extra_features ])
         super(SimSMTPChannel, self).__init__(*args, **kw)
 
+    # AUTH related stuff.  It would be nice if support for this were in smtpd.
+    def found_terminator(self):
+        if self.smtp_state == self.AUTH:
+            line = self._emptystring.join(self.received_lines)
+            print('Data:', repr(line), file=smtpd.DEBUGSTREAM)
+            self.received_lines = []
+            try:
+                self.auth_object(line)
+            except ResponseException as e:
+                self.smtp_state = self.COMMAND
+                self.push('%s %s' % (e.smtp_code, e.smtp_error))
+                return
+        super().found_terminator()
+
+
+    def smtp_AUTH(self, arg):
+        if not self.seen_greeting:
+            self.push('503 Error: send EHLO first')
+            return
+        if not self.extended_smtp or 'AUTH' not in self._extrafeatures:
+            self.push('500 Error: command "AUTH" not recognized')
+            return
+        if self.authenticated_user is not None:
+            self.push(
+                '503 Bad sequence of commands: already authenticated')
+            return
+        args = arg.split()
+        if len(args) not in [1, 2]:
+            self.push('501 Syntax: AUTH <mechanism> [initial-response]')
+            return
+        auth_object_name = '_auth_%s' % args[0].lower().replace('-', '_')
+        try:
+            self.auth_object = getattr(self, auth_object_name)
+        except AttributeError:
+            self.push('504 Command parameter not implemented: unsupported '
+                      ' authentication mechanism {!r}'.format(auth_object_name))
+            return
+        self.smtp_state = self.AUTH
+        self.auth_object(args[1] if len(args) == 2 else None)
+
+    def _authenticated(self, user, valid):
+        if valid:
+            self.authenticated_user = user
+            self.push('235 Authentication Succeeded')
+        else:
+            self.push('535 Authentication credentials invalid')
+        self.smtp_state = self.COMMAND
+
+    def _decode_base64(self, string):
+        return base64.decodebytes(string.encode('ascii')).decode('utf-8')
+
+    def _auth_plain(self, arg=None):
+        if arg is None:
+            self.push('334 ')
+        else:
+            logpass = self._decode_base64(arg)
+            try:
+                *_, user, password = logpass.split('\0')
+            except ValueError as e:
+                self.push('535 Splitting response {!r} into user and password'
+                          ' failed: {}'.format(logpass, e))
+                return
+            self._authenticated(user, password == sim_auth[1])
+
+    def _auth_login(self, arg=None):
+        if arg is None:
+            # base64 encoded 'Username:'
+            self.push('334 VXNlcm5hbWU6')
+        elif not hasattr(self, '_auth_login_user'):
+            self._auth_login_user = self._decode_base64(arg)
+            # base64 encoded 'Password:'
+            self.push('334 UGFzc3dvcmQ6')
+        else:
+            password = self._decode_base64(arg)
+            self._authenticated(self._auth_login_user, password == sim_auth[1])
+            del self._auth_login_user
+
+    def _auth_cram_md5(self, arg=None):
+        if arg is None:
+            self.push('334 {}'.format(sim_cram_md5_challenge))
+        else:
+            logpass = self._decode_base64(arg)
+            try:
+                user, hashed_pass = logpass.split()
+            except ValueError as e:
+                self.push('535 Splitting response {!r} into user and password'
+                          'failed: {}'.format(logpass, e))
+                return False
+            valid_hashed_pass = hmac.HMAC(
+                sim_auth[1].encode('ascii'),
+                self._decode_base64(sim_cram_md5_challenge).encode('ascii'),
+                'md5').hexdigest()
+            self._authenticated(user, hashed_pass == valid_hashed_pass)
+    # end AUTH related stuff.
+
     def smtp_EHLO(self, arg):
         resp = ('250-testhost\r\n'
                 '250-EXPN\r\n'
@@ -683,20 +774,6 @@ class SimSMTPChannel(smtpd.SMTPChannel):
         else:
             self.push('550 No access for you!')
 
-    def smtp_AUTH(self, arg):
-        mech = arg.strip().lower()
-        if mech=='cram-md5':
-            self.push('334 {}'.format(sim_cram_md5_challenge))
-        elif mech not in sim_auth_credentials:
-            self.push('504 auth type unimplemented')
-            return
-        elif mech=='plain':
-            self.push('334 ')
-        elif mech=='login':
-            self.push('334 ')
-        else:
-            self.push('550 No access for you!')
-
     def smtp_QUIT(self, arg):
         if self.quit_response is None:
             super(SimSMTPChannel, self).smtp_QUIT(arg)
@@ -841,63 +918,49 @@ class SMTPSimTests(unittest.TestCase):
         self.assertEqual(smtp.expn(u), expected_unknown)
         smtp.quit()
 
-    # SimSMTPChannel doesn't fully support AUTH because it requires a
-    # synchronous read to obtain the credentials...so instead smtpd
-    # sees the credential sent by smtplib's login method as an unknown command,
-    # which results in smtplib raising an auth error.  Fortunately the error
-    # message contains the encoded credential, so we can partially check that it
-    # was generated correctly (partially, because the 'word' is uppercased in
-    # the error message).
-
     def testAUTH_PLAIN(self):
         self.serv.add_feature("AUTH PLAIN")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1], initial_response_ok=False)
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_plain, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_LOGIN(self):
         self.serv.add_feature("AUTH LOGIN")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_login_user, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_CRAM_MD5(self):
         self.serv.add_feature("AUTH CRAM-MD5")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_credentials['cram-md5'], str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def testAUTH_multiple(self):
         # Test that multiple authentication methods are tried.
         self.serv.add_feature("AUTH BOGUS PLAIN LOGIN CRAM-MD5")
         smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
-        try: smtp.login(sim_auth[0], sim_auth[1])
-        except smtplib.SMTPAuthenticationError as err:
-            self.assertIn(sim_auth_login_user, str(err))
+        resp = smtp.login(sim_auth[0], sim_auth[1])
+        self.assertEqual(resp, (235, b'Authentication Succeeded'))
         smtp.close()
 
     def test_auth_function(self):
-        smtp = smtplib.SMTP(HOST, self.port,
-                            local_hostname='localhost', timeout=15)
-        self.serv.add_feature("AUTH CRAM-MD5")
-        smtp.user, smtp.password = sim_auth[0], sim_auth[1]
-        supported = {'CRAM-MD5': smtp.auth_cram_md5,
-                     'PLAIN': smtp.auth_plain,
-                     'LOGIN': smtp.auth_login,
-                    }
-        for mechanism, method in supported.items():
-            try: smtp.auth(mechanism, method, initial_response_ok=False)
-            except smtplib.SMTPAuthenticationError as err:
-                self.assertIn(sim_auth_credentials[mechanism.lower()].upper(),
-                              str(err))
-        smtp.close()
+        supported = {'CRAM-MD5', 'PLAIN', 'LOGIN'}
+        for mechanism in supported:
+            self.serv.add_feature("AUTH {}".format(mechanism))
+        for mechanism in supported:
+            with self.subTest(mechanism=mechanism):
+                smtp = smtplib.SMTP(HOST, self.port,
+                                    local_hostname='localhost', timeout=15)
+                smtp.ehlo('foo')
+                smtp.user, smtp.password = sim_auth[0], sim_auth[1]
+                method = 'auth_' + mechanism.lower().replace('-', '_')
+                resp = smtp.auth(mechanism, getattr(smtp, method))
+                self.assertEqual(resp, (235, b'Authentication Succeeded'))
+                smtp.close()
 
     def test_quit_resets_greeting(self):
         smtp = smtplib.SMTP(HOST, self.port,
index f08b4ce8430adf481d42e32c1812d5aff483d279..fef42a0ed4fb7cc0dc74eb3e6f3468d1266faebc 100644 (file)
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -58,6 +58,8 @@ Core and Builtins
 Library
 -------
 
+- Issue #25446: Fix regression in smtplib's AUTH LOGIN support.
+
 - Issue #18010: Fix the pydoc web server's module search function to handle
   exceptions from importing packages.