from tornado.testing import AsyncHTTPTestCase, ExpectLog
from tornado.test.util import unittest
from tornado.util import u, bytes_type, ObjectDict, unicode_type
-from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature, create_signed_value, ErrorHandler, UIModule, MissingArgumentError
+from tornado.web import RequestHandler, authenticated, Application, asynchronous, url, HTTPError, StaticFileHandler, _create_signature_v1, create_signed_value, decode_signed_value, ErrorHandler, UIModule, MissingArgumentError
import binascii
import datetime
self._cookies[name] = value
-class SecureCookieTest(unittest.TestCase):
+# See SignedValueTest below for more.
+class SecureCookieV1Test(unittest.TestCase):
def test_round_trip(self):
handler = CookieTestRequestHandler()
- handler.set_secure_cookie('foo', b'bar')
- self.assertEqual(handler.get_secure_cookie('foo'), b'bar')
+ handler.set_secure_cookie('foo', b'bar', version=1)
+ self.assertEqual(handler.get_secure_cookie('foo', min_version=1),
+ b'bar')
def test_cookie_tampering_future_timestamp(self):
handler = CookieTestRequestHandler()
# this string base64-encodes to '12345678'
- handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'))
+ handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
+ version=1)
cookie = handler._cookies['foo']
match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
self.assertTrue(match)
timestamp = match.group(1)
sig = match.group(2)
self.assertEqual(
- _create_signature(handler.application.settings["cookie_secret"],
+ _create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '12345678', timestamp),
sig)
# shifting digits from payload to timestamp doesn't alter signature
# (this is not desirable behavior, just confirming that that's how it
# works)
self.assertEqual(
- _create_signature(handler.application.settings["cookie_secret"],
+ _create_signature_v1(handler.application.settings["cookie_secret"],
'foo', '1234', b'5678' + timestamp),
sig)
# tamper with the cookie
to_basestring(timestamp), to_basestring(sig)))
# it gets rejected
with ExpectLog(gen_log, "Cookie timestamp in future"):
- self.assertTrue(handler.get_secure_cookie('foo') is None)
+ self.assertTrue(
+ handler.get_secure_cookie('foo', min_version=1) is None)
def test_arbitrary_bytes(self):
# Secure cookies accept arbitrary data (which is base64 encoded).
# Note that normal cookies accept only a subset of ascii.
handler = CookieTestRequestHandler()
- handler.set_secure_cookie('foo', b'\xe9')
- self.assertEqual(handler.get_secure_cookie('foo'), b'\xe9')
+ handler.set_secure_cookie('foo', b'\xe9', version=1)
+ self.assertEqual(handler.get_secure_cookie('foo', min_version=1), b'\xe9')
class CookieTest(WebTestCase):
self.assertEqual(resp.body, b'hello')
resp = self.fetch('/hello3')
self.assertEqual(resp.body, b'hello')
+
+
+class SignedValueTest(unittest.TestCase):
+ SECRET = "It's a secret to everybody"
+
+ def past(self):
+ return self.present() - 86400 * 32
+
+ def present(self):
+ return 1300000000
+
+ def test_known_values(self):
+ signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value",
+ version=1, clock=self.present)
+ self.assertEqual(
+ signed_v1,
+ b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f")
+
+ signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value",
+ version=2, clock=self.present)
+ self.assertEqual(
+ signed_v2,
+ b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
+ b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
+
+ signed_default = create_signed_value(SignedValueTest.SECRET,
+ "key", "value", clock=self.present)
+ self.assertEqual(signed_default, signed_v2)
+
+ decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key",
+ signed_v1, min_version=1,
+ clock=self.present)
+ self.assertEqual(decoded_v1, b"value")
+
+ decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key",
+ signed_v2, min_version=2,
+ clock=self.present)
+ self.assertEqual(decoded_v2, b"value")
+
+ def test_name_swap(self):
+ signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value",
+ clock=self.present)
+ signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value",
+ clock=self.present)
+ # Try decoding each string with the other's "name"
+ decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1,
+ clock=self.present)
+ self.assertIs(decoded1, None)
+ decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2,
+ clock=self.present)
+ self.assertIs(decoded2, None)
+
+ def test_expired(self):
+ signed = create_signed_value(SignedValueTest.SECRET, "key1", "value",
+ clock=self.past)
+ decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1",
+ signed, clock=self.past)
+ self.assertEqual(decoded_past, b"value")
+ decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1",
+ signed, clock=self.present)
+ self.assertIs(decoded_present, None)
+
+ def test_payload_tampering(self):
+ # These cookies are variants of the one in test_known_values.
+ sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
+ def validate(prefix):
+ return (b'value' ==
+ decode_signed_value(SignedValueTest.SECRET, "key",
+ prefix + sig, clock=self.present))
+ self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
+ # Change key version
+ self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
+ # length mismatch (field too short)
+ self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
+ # length mismatch (field too long)
+ self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
+
+ def test_signature_tampering(self):
+ prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
+ def validate(sig):
+ return (b'value' ==
+ decode_signed_value(SignedValueTest.SECRET, "key",
+ prefix + sig, clock=self.present))
+ self.assertTrue(validate(
+ "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
+ # All zeros
+ self.assertFalse(validate("0" * 32))
+ # Change one character
+ self.assertFalse(validate(
+ "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
+ # Change another character
+ self.assertFalse(validate(
+ "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153"))
+ # Truncate
+ self.assertFalse(validate(
+ "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15"))
+ # Lengthen
+ self.assertFalse(validate(
+ "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"))
+
+ def test_non_ascii(self):
+ value = b"\xe9"
+ signed = create_signed_value(SignedValueTest.SECRET, "key", value,
+ clock=self.present)
+ decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed,
+ clock=self.present)
+ self.assertEqual(value, decoded)
return result == 0
-def create_signed_value(secret, name, value, version=None):
+def create_signed_value(secret, name, value, version=None, clock=None):
if version is None:
- version = 1
+ version = 2
+ if clock is None:
+ clock = time.time
+ timestamp = utf8(str(int(clock())))
+ value = base64.b64encode(utf8(value))
if version == 1:
- timestamp = utf8(str(int(time.time())))
- value = base64.b64encode(utf8(value))
- signature = _create_signature(secret, name, value, timestamp)
+ signature = _create_signature_v1(secret, name, value, timestamp)
value = b"|".join([value, timestamp, signature])
return value
+ elif version == 2:
+ # The v2 format consists of a version number and a series of
+ # length-prefixed fields "%d:%s", the last of which is a
+ # signature, all separated by pipes. All numbers are in
+ # decimal format with no leading zeros. The signature is an
+ # HMAC-SHA256 of the whole string up to that point, including
+ # the final pipe.
+ #
+ # The fields are:
+ # - format version (i.e. 2; no length prefix)
+ # - key version (currently 0; reserved for future key rotation features)
+ # - timestamp (integer seconds since epoch)
+ # - name (not encoded; assumed to be ~alphanumeric)
+ # - value (base64-encoded)
+ # - signature (hex-encoded; no length prefix)
+ def format_field(s):
+ return utf8("%d:" % len(s)) + utf8(s)
+ to_sign = b"|".join([
+ b"2|1:0",
+ format_field(timestamp),
+ format_field(name),
+ format_field(value),
+ b''])
+ signature = _create_signature_v2(secret, to_sign)
+ return to_sign + signature
else:
raise ValueError("Unsupported version %d" % version)
# A leading version number in decimal with no leading zeros, followed by a pipe.
-_signed_value_version_re = re.compile(r"^([1-9][0-9]*)\|(.*)$")
+_signed_value_version_re = re.compile(br"^([1-9][0-9]*)\|(.*)$")
-def decode_signed_value(secret, name, value, max_age_days=31, min_version=None):
+def decode_signed_value(secret, name, value, max_age_days=31, clock=None,min_version=None):
+ if clock is None:
+ clock = time.time
if min_version is None:
min_version = 1
- if min_version > 1:
+ if min_version > 2:
raise ValueError("Unsupported min_version %d" % min_version)
if not value:
return None
# Figure out what version this is. Version 1 did not include an
# explicit version field and started with arbitrary base64 data,
# which makes this tricky.
+ value = utf8(value)
m = _signed_value_version_re.match(value)
if m is None:
version = 1
version = 1
if version < min_version:
- raise ValueError("Signed value version is too old (%d < %d)" %
- (version, min_version))
+ return None
if version == 1:
- return _decode_signed_value_v1(secret, name, value, max_age_days)
+ return _decode_signed_value_v1(secret, name, value, max_age_days, clock)
+ elif version == 2:
+ return _decode_signed_value_v2(secret, name, value, max_age_days, clock)
else:
- raise ValueError("Unsupported signed value version %r" % value)
+ return None
-def _decode_signed_value_v1(secret, name, value, max_age_days):
+def _decode_signed_value_v1(secret, name, value, max_age_days, clock):
parts = utf8(value).split(b"|")
if len(parts) != 3:
return None
- signature = _create_signature(secret, name, parts[0], parts[1])
+ signature = _create_signature_v1(secret, name, parts[0], parts[1])
if not _time_independent_equals(parts[2], signature):
gen_log.warning("Invalid cookie signature %r", value)
return None
timestamp = int(parts[1])
- if timestamp < time.time() - max_age_days * 86400:
+ if timestamp < clock() - max_age_days * 86400:
gen_log.warning("Expired cookie %r", value)
return None
- if timestamp > time.time() + 31 * 86400:
+ if timestamp > clock() + 31 * 86400:
# _cookie_signature does not hash a delimiter between the
# parts of the cookie, so an attacker could transfer trailing
# digits from the payload to the timestamp without altering the
return None
-def _create_signature(secret, *parts):
+def _decode_signed_value_v2(secret, name, value, max_age_days, clock):
+ def _consume_field(s):
+ length, _, rest = s.partition(b':')
+ n = int(length)
+ field_value = rest[:n]
+ # In python 3, indexing bytes returns small integers; we must
+ # use a slice to get a byte string as in python 2.
+ if rest[n:n+1] != b'|':
+ raise ValueError("malformed v2 signed value field")
+ rest = rest[n+1:]
+ return field_value, rest
+ rest = value[2:] # remove version number
+ try:
+ key_version, rest = _consume_field(rest)
+ timestamp, rest = _consume_field(rest)
+ name_field, rest = _consume_field(rest)
+ value_field, rest = _consume_field(rest)
+ except ValueError:
+ return None
+ passed_sig = rest
+ signed_string = value[:-len(passed_sig)]
+ expected_sig = _create_signature_v2(secret, signed_string)
+ if not _time_independent_equals(passed_sig, expected_sig):
+ return None
+ if name_field != utf8(name):
+ return None
+ timestamp = int(timestamp)
+ if timestamp < clock() - max_age_days * 86400:
+ # The signature has expired.
+ return None
+ try:
+ return base64.b64decode(value_field)
+ except Exception:
+ return None
+
+
+def _create_signature_v1(secret, *parts):
hash = hmac.new(utf8(secret), digestmod=hashlib.sha1)
for part in parts:
hash.update(utf8(part))
return utf8(hash.hexdigest())
+
+def _create_signature_v2(secret, s):
+ hash = hmac.new(utf8(secret), digestmod=hashlib.sha256)
+ hash.update(utf8(s))
+ return utf8(hash.hexdigest())