]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add a v2 secure cookie format.
authorBen Darnell <ben@bendarnell.com>
Mon, 5 May 2014 02:59:12 +0000 (22:59 -0400)
committerBen Darnell <ben@bendarnell.com>
Tue, 6 May 2014 01:58:06 +0000 (21:58 -0400)
This format fixes some weaknesses in the original format that would allow
characters to be shifted from the "value" field to the "name" or "timestamp"
fields.  It also upgrades the signature from HMAC-SHA1 to HMAC-SHA256,
adds an explicit version field, and adds an as-yet-unused field to
support key rotation in the future.

tornado/test/web_test.py
tornado/web.py

index 5029cd526039341542cf0a0013978a44ae18f70b..44b323da51aa673edf452b043bce55711deae1be 100644 (file)
@@ -9,7 +9,7 @@ from tornado.template import DictLoader
 from tornado.testing import AsyncHTTPTestCase, ExpectLog
 from tornado.test.util import unittest
 from tornado.util import u, bytes_type, ObjectDict, unicode_type
-from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value, ErrorHandler, UIModule, MissingArgumentError
+from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature_v1, create_signed_value, decode_signed_value, ErrorHandler, UIModule, MissingArgumentError
 
 import binascii
 import datetime
@@ -80,30 +80,33 @@ class CookieTestRequestHandler(RequestHandler):
         self._cookies[name] = value
 
 
-class SecureCookieTest(unittest.TestCase):
+# See SignedValueTest below for more.
+class SecureCookieV1Test(unittest.TestCase):
     def test_round_trip(self):
         handler = CookieTestRequestHandler()
-        handler.set_secure_cookie('foo', b'bar')
-        self.assertEqual(handler.get_secure_cookie('foo'), b'bar')
+        handler.set_secure_cookie('foo', b'bar', version=1)
+        self.assertEqual(handler.get_secure_cookie('foo', min_version=1),
+                         b'bar')
 
     def test_cookie_tampering_future_timestamp(self):
         handler = CookieTestRequestHandler()
         # this string base64-encodes to '12345678'
-        handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'))
+        handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
+                                  version=1)
         cookie = handler._cookies['foo']
         match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
         self.assertTrue(match)
         timestamp = match.group(1)
         sig = match.group(2)
         self.assertEqual(
-            _create_signature(handler.application.settings["cookie_secret"],
+            _create_signature_v1(handler.application.settings["cookie_secret"],
                               'foo', '12345678', timestamp),
             sig)
         # shifting digits from payload to timestamp doesn't alter signature
         # (this is not desirable behavior, just confirming that that's how it
         # works)
         self.assertEqual(
-            _create_signature(handler.application.settings["cookie_secret"],
+            _create_signature_v1(handler.application.settings["cookie_secret"],
                               'foo', '1234', b'5678' + timestamp),
             sig)
         # tamper with the cookie
@@ -111,14 +114,15 @@ class SecureCookieTest(unittest.TestCase):
             to_basestring(timestamp), to_basestring(sig)))
         # it gets rejected
         with ExpectLog(gen_log, "Cookie timestamp in future"):
-            self.assertTrue(handler.get_secure_cookie('foo') is None)
+            self.assertTrue(
+                handler.get_secure_cookie('foo', min_version=1) is None)
 
     def test_arbitrary_bytes(self):
         # Secure cookies accept arbitrary data (which is base64 encoded).
         # Note that normal cookies accept only a subset of ascii.
         handler = CookieTestRequestHandler()
-        handler.set_secure_cookie('foo', b'\xe9')
-        self.assertEqual(handler.get_secure_cookie('foo'), b'\xe9')
+        handler.set_secure_cookie('foo', b'\xe9', version=1)
+        self.assertEqual(handler.get_secure_cookie('foo', min_version=1), b'\xe9')
 
 
 class CookieTest(WebTestCase):
@@ -1793,3 +1797,110 @@ class HandlerByNameTest(WebTestCase):
         self.assertEqual(resp.body, b'hello')
         resp = self.fetch('/hello3')
         self.assertEqual(resp.body, b'hello')
+
+
+class SignedValueTest(unittest.TestCase):
+    SECRET = "It's a secret to everybody"
+
+    def past(self):
+        return self.present() - 86400 * 32
+
+    def present(self):
+        return 1300000000
+
+    def test_known_values(self):
+        signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value",
+                                        version=1, clock=self.present)
+        self.assertEqual(
+            signed_v1,
+            b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f")
+
+        signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value",
+                                        version=2, clock=self.present)
+        self.assertEqual(
+            signed_v2,
+            b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
+            b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
+
+        signed_default = create_signed_value(SignedValueTest.SECRET,
+                                             "key", "value", clock=self.present)
+        self.assertEqual(signed_default, signed_v2)
+
+        decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key",
+                                         signed_v1, min_version=1,
+                                         clock=self.present)
+        self.assertEqual(decoded_v1, b"value")
+
+        decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key",
+                                         signed_v2, min_version=2,
+                                         clock=self.present)
+        self.assertEqual(decoded_v2, b"value")
+
+    def test_name_swap(self):
+        signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value",
+                                      clock=self.present)
+        signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value",
+                                      clock=self.present)
+        # Try decoding each string with the other's "name"
+        decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1,
+                                       clock=self.present)
+        self.assertIs(decoded1, None)
+        decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2,
+                                       clock=self.present)
+        self.assertIs(decoded2, None)
+
+    def test_expired(self):
+        signed = create_signed_value(SignedValueTest.SECRET, "key1", "value",
+                                     clock=self.past)
+        decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1",
+                                           signed, clock=self.past)
+        self.assertEqual(decoded_past, b"value")
+        decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1",
+                                              signed, clock=self.present)
+        self.assertIs(decoded_present, None)
+
+    def test_payload_tampering(self):
+        # These cookies are variants of the one in test_known_values.
+        sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
+        def validate(prefix):
+            return (b'value' ==
+                    decode_signed_value(SignedValueTest.SECRET, "key",
+                                        prefix + sig, clock=self.present))
+        self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
+        # Change key version
+        self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
+        # length mismatch (field too short)
+        self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
+        # length mismatch (field too long)
+        self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
+
+    def test_signature_tampering(self):
+        prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
+        def validate(sig):
+            return (b'value' ==
+                    decode_signed_value(SignedValueTest.SECRET, "key",
+                                        prefix + sig, clock=self.present))
+        self.assertTrue(validate(
+            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
+        # All zeros
+        self.assertFalse(validate("0" * 32))
+        # Change one character
+        self.assertFalse(validate(
+            "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
+        # Change another character
+        self.assertFalse(validate(
+            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153"))
+        # Truncate
+        self.assertFalse(validate(
+            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15"))
+        # Lengthen
+        self.assertFalse(validate(
+            "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"))
+
+    def test_non_ascii(self):
+        value = b"\xe9"
+        signed = create_signed_value(SignedValueTest.SECRET, "key", value,
+                                     clock=self.present)
+        decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed,
+                                      clock=self.present)
+        self.assertEqual(value, decoded)
index 5a3474f2db309d398033fde8d90884058d5ad514..ffd79b0338fdf4375a60f0f1215c9772ca8c3ae9 100644 (file)
@@ -2644,25 +2644,54 @@ else:
         return result == 0
 
 
-def create_signed_value(secret, name, value, version=None):
+def create_signed_value(secret, name, value, version=None, clock=None):
     if version is None:
-        version = 1
+        version = 2
+    if clock is None:
+        clock = time.time
+    timestamp = utf8(str(int(clock())))
+    value = base64.b64encode(utf8(value))
     if version == 1:
-        timestamp = utf8(str(int(time.time())))
-        value = base64.b64encode(utf8(value))
-        signature = _create_signature(secret, name, value, timestamp)
+        signature = _create_signature_v1(secret, name, value, timestamp)
         value = b"|".join([value, timestamp, signature])
         return value
+    elif version == 2:
+        # The v2 format consists of a version number and a series of
+        # length-prefixed fields "%d:%s", the last of which is a
+        # signature, all separated by pipes.  All numbers are in
+        # decimal format with no leading zeros.  The signature is an
+        # HMAC-SHA256 of the whole string up to that point, including
+        # the final pipe.
+        #
+        # The fields are:
+        # - format version (i.e. 2; no length prefix)
+        # - key version (currently 0; reserved for future key rotation features)
+        # - timestamp (integer seconds since epoch)
+        # - name (not encoded; assumed to be ~alphanumeric)
+        # - value (base64-encoded)
+        # - signature (hex-encoded; no length prefix)
+        def format_field(s):
+            return utf8("%d:" % len(s)) + utf8(s)
+        to_sign = b"|".join([
+            b"2|1:0",
+            format_field(timestamp),
+            format_field(name),
+            format_field(value),
+            b''])
+        signature = _create_signature_v2(secret, to_sign)
+        return to_sign + signature
     else:
         raise ValueError("Unsupported version %d" % version)
 
 # A leading version number in decimal with no leading zeros, followed by a pipe.
-_signed_value_version_re = re.compile(r"^([1-9][0-9]*)\|(.*)$")
+_signed_value_version_re = re.compile(br"^([1-9][0-9]*)\|(.*)$")
 
-def decode_signed_value(secret, name, value, max_age_days=31, min_version=None):
+def decode_signed_value(secret, name, value, max_age_days=31, clock=None,min_version=None):
+    if clock is None:
+        clock = time.time
     if min_version is None:
         min_version = 1
-    if min_version > 1:
+    if min_version > 2:
         raise ValueError("Unsupported min_version %d" % min_version)
     if not value:
         return None
@@ -2670,6 +2699,7 @@ def decode_signed_value(secret, name, value, max_age_days=31, min_version=None):
     # Figure out what version this is.  Version 1 did not include an
     # explicit version field and started with arbitrary base64 data,
     # which makes this tricky.
+    value = utf8(value)
     m = _signed_value_version_re.match(value)
     if m is None:
         version = 1
@@ -2688,26 +2718,27 @@ def decode_signed_value(secret, name, value, max_age_days=31, min_version=None):
             version = 1
 
     if version < min_version:
-        raise ValueError("Signed value version is too old (%d < %d)" %
-                         (version, min_version))
+        return None
     if version == 1:
-        return _decode_signed_value_v1(secret, name, value, max_age_days)
+        return _decode_signed_value_v1(secret, name, value, max_age_days, clock)
+    elif version == 2:
+        return _decode_signed_value_v2(secret, name, value, max_age_days, clock)
     else:
-        raise ValueError("Unsupported signed value version %r" % value)
+        return None
 
-def _decode_signed_value_v1(secret, name, value, max_age_days):
+def _decode_signed_value_v1(secret, name, value, max_age_days, clock):
     parts = utf8(value).split(b"|")
     if len(parts) != 3:
         return None
-    signature = _create_signature(secret, name, parts[0], parts[1])
+    signature = _create_signature_v1(secret, name, parts[0], parts[1])
     if not _time_independent_equals(parts[2], signature):
         gen_log.warning("Invalid cookie signature %r", value)
         return None
     timestamp = int(parts[1])
-    if timestamp < time.time() - max_age_days * 86400:
+    if timestamp < clock() - max_age_days * 86400:
         gen_log.warning("Expired cookie %r", value)
         return None
-    if timestamp > time.time() + 31 * 86400:
+    if timestamp > clock() + 31 * 86400:
         # _cookie_signature does not hash a delimiter between the
         # parts of the cookie, so an attacker could transfer trailing
         # digits from the payload to the timestamp without altering the
@@ -2724,8 +2755,49 @@ def _decode_signed_value_v1(secret, name, value, max_age_days):
         return None
 
 
-def _create_signature(secret, *parts):
+def _decode_signed_value_v2(secret, name, value, max_age_days, clock):
+    def _consume_field(s):
+        length, _, rest = s.partition(b':')
+        n = int(length)
+        field_value = rest[:n]
+        # In python 3, indexing bytes returns small integers; we must
+        # use a slice to get a byte string as in python 2.
+        if rest[n:n+1] != b'|':
+            raise ValueError("malformed v2 signed value field")
+        rest = rest[n+1:]
+        return field_value, rest
+    rest = value[2:]  # remove version number
+    try:
+        key_version, rest = _consume_field(rest)
+        timestamp, rest = _consume_field(rest)
+        name_field, rest = _consume_field(rest)
+        value_field, rest = _consume_field(rest)
+    except ValueError:
+        return None
+    passed_sig = rest
+    signed_string = value[:-len(passed_sig)]
+    expected_sig = _create_signature_v2(secret, signed_string)
+    if not _time_independent_equals(passed_sig, expected_sig):
+        return None
+    if name_field != utf8(name):
+        return None
+    timestamp = int(timestamp)
+    if timestamp < clock() - max_age_days * 86400:
+        # The signature has expired.
+        return None
+    try:
+        return base64.b64decode(value_field)
+    except Exception:
+        return None
+
+
+def _create_signature_v1(secret, *parts):
     hash = hmac.new(utf8(secret), digestmod=hashlib.sha1)
     for part in parts:
         hash.update(utf8(part))
     return utf8(hash.hexdigest())
+
+def _create_signature_v2(secret, s):
+    hash = hmac.new(utf8(secret), digestmod=hashlib.sha256)
+    hash.update(utf8(s))
+    return utf8(hash.hexdigest())