This protects against the BREACH attack.
def test_refresh_token(self):
token = self.xsrf_token
+ tokens_seen = set([token])
# A user's token is stable over time. Refreshing the page in one tab
# might update the cookie while an older tab still has the old cookie
# in its DOM. Simulate this scenario by passing a constant token
# in the body and re-querying for the token.
for i in range(5):
token = self.get_token(token)
- # Implementation detail: the same token is returned each time
- self.assertEqual(token, self.xsrf_token)
+ # Tokens are encoded uniquely each time
+ tokens_seen.add(token)
response = self.fetch(
"/", method="POST",
body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
headers=self.cookie_headers(token))
self.assertEqual(response.code, 200)
+ self.assertEqual(len(tokens_seen), 6)
try:
import tornado.websocket
+ from tornado.util import _websocket_mask_python
except ImportError:
# The unittest module presents misleading errors on ImportError
# (it acts as if websocket_test could not be found, hiding the underlying
traceback.print_exc()
raise
-from tornado.websocket import WebSocketHandler, websocket_connect, WebSocketError, _websocket_mask_python
+from tornado.websocket import WebSocketHandler, websocket_connect, WebSocketError
try:
from tornado import speedups
from __future__ import absolute_import, division, print_function, with_statement
+import array
import inspect
+import os
import sys
import zlib
+try:
+ xrange # py2
+except NameError:
+ xrange = range # py3
+
+
class ObjectDict(dict):
"""Makes a dictionary behave like an object, with attribute-style access.
"""
return old_value, args, kwargs
+def _websocket_mask_python(mask, data):
+ """Websocket masking function.
+
+ `mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length.
+ Returns a `bytes` object of the same length as `data` with the mask applied
+ as specified in section 5.3 of RFC 6455.
+
+ This pure-python implementation may be replaced by an optimized version when available.
+ """
+ mask = array.array("B", mask)
+ unmasked = array.array("B", data)
+ for i in xrange(len(data)):
+ unmasked[i] = unmasked[i] ^ mask[i % 4]
+ if hasattr(unmasked, 'tobytes'):
+ # tostring was deprecated in py32. It hasn't been removed,
+ # but since we turn on deprecation warnings in our tests
+ # we need to use the right one.
+ return unmasked.tobytes()
+ else:
+ return unmasked.tostring()
+
+if (os.environ.get('TORNADO_NO_EXTENSION') or
+ os.environ.get('TORNADO_EXTENSION') == '0'):
+ # These environment variables exist to make it easier to do performance
+ # comparisons; they are not guaranteed to remain supported in the future.
+ _websocket_mask = _websocket_mask_python
+else:
+ try:
+ from tornado.speedups import websocket_mask as _websocket_mask
+ except ImportError:
+ if os.environ.get('TORNADO_EXTENSION') == '1':
+ raise
+ _websocket_mask = _websocket_mask_python
+
+
def doctests():
import doctest
return doctest.DocTestSuite()
import tornado
import traceback
import types
-import uuid
from tornado.concurrent import Future
from tornado import escape
from tornado import stack_context
from tornado import template
from tornado.escape import utf8, _unicode
-from tornado.util import bytes_type, import_object, ObjectDict, raise_exc_info, unicode_type
+from tornado.util import bytes_type, import_object, ObjectDict, raise_exc_info, unicode_type, _websocket_mask
try:
from io import BytesIO # python 3
See http://en.wikipedia.org/wiki/Cross-site_request_forgery
"""
if not hasattr(self, "_xsrf_token"):
- token = self.get_cookie("_xsrf")
- if not token:
- token = binascii.b2a_hex(os.urandom(16))
+ version, token, timestamp = self._get_raw_xsrf_token()
+ mask = os.urandom(4)
+ self._xsrf_token = b"|".join([
+ b"2",
+ binascii.b2a_hex(mask),
+ binascii.b2a_hex(_websocket_mask(mask, token)),
+ utf8(str(int(timestamp)))])
+ if version is None or version != 2:
expires_days = 30 if self.current_user else None
- self.set_cookie("_xsrf", token, expires_days=expires_days)
- self._xsrf_token = token
+ self.set_cookie("_xsrf", self._xsrf_token,
+ expires_days=expires_days)
return self._xsrf_token
+ def _get_raw_xsrf_token(self):
+ if not hasattr(self, '_raw_xsrf_token'):
+ cookie = self.get_cookie("_xsrf")
+ if cookie:
+ version, token, timestamp = self._decode_xsrf_token(cookie)
+ else:
+ version, token, timestamp = None, None, None
+ if token is None:
+ version = None
+ token = os.urandom(16)
+ timestamp = time.time()
+ self._raw_xsrf_token = (version, token, timestamp)
+ return self._raw_xsrf_token
+
+ def _decode_xsrf_token(self, cookie):
+ m = _signed_value_version_re.match(utf8(cookie))
+ if m:
+ version = int(m.group(1))
+ if version == 2:
+ _, mask, masked_token, timestamp = cookie.split("|")
+ mask = binascii.a2b_hex(utf8(mask))
+ token = _websocket_mask(
+ mask, binascii.a2b_hex(utf8(masked_token)))
+ timestamp = int(timestamp)
+ return version, token, timestamp
+ else:
+ # Treat unknown versions as not present instead of failing.
+ return None, None, None
+ elif len(cookie) == 32:
+ version = 1
+ token = binascii.a2b_hex(cookie)
+ # We don't have a usable timestamp in older versions.
+ timestamp = int(time.time())
+ return (version, token, timestamp)
+ else:
+ return None, None, None
+
def check_xsrf_cookie(self):
"""Verifies that the ``_xsrf`` cookie matches the ``_xsrf`` argument.
self.request.headers.get("X-Csrftoken"))
if not token:
raise HTTPError(403, "'_xsrf' argument missing from POST")
- if not _time_independent_equals(utf8(self.xsrf_token), utf8(token)):
+ _, token, _ = self._decode_xsrf_token(token)
+ _, expected_token, _ = self._get_raw_xsrf_token()
+ if not _time_independent_equals(utf8(token), utf8(expected_token)):
raise HTTPError(403, "XSRF cookie does not match POST argument")
def xsrf_form_html(self):
from __future__ import absolute_import, division, print_function, with_statement
# Author: Jacob Kristhammar, 2010
-import array
import base64
import collections
import functools
from tornado.log import gen_log, app_log
from tornado.netutil import Resolver
from tornado import simple_httpclient
-from tornado.util import bytes_type, unicode_type
-
-try:
- xrange # py2
-except NameError:
- xrange = range # py3
+from tornado.util import bytes_type, unicode_type, _websocket_mask
class WebSocketError(Exception):
if callback is not None:
io_loop.add_future(conn.connect_future, callback)
return conn.connect_future
-
-
-def _websocket_mask_python(mask, data):
- """Websocket masking function.
-
- `mask` is a `bytes` object of length 4; `data` is a `bytes` object of any length.
- Returns a `bytes` object of the same length as `data` with the mask applied
- as specified in section 5.3 of RFC 6455.
-
- This pure-python implementation may be replaced by an optimized version when available.
- """
- mask = array.array("B", mask)
- unmasked = array.array("B", data)
- for i in xrange(len(data)):
- unmasked[i] = unmasked[i] ^ mask[i % 4]
- if hasattr(unmasked, 'tobytes'):
- # tostring was deprecated in py32. It hasn't been removed,
- # but since we turn on deprecation warnings in our tests
- # we need to use the right one.
- return unmasked.tobytes()
- else:
- return unmasked.tostring()
-
-if (os.environ.get('TORNADO_NO_EXTENSION') or
- os.environ.get('TORNADO_EXTENSION') == '0'):
- # These environment variables exist to make it easier to do performance
- # comparisons; they are not guaranteed to remain supported in the future.
- _websocket_mask = _websocket_mask_python
-else:
- try:
- from tornado.speedups import websocket_mask as _websocket_mask
- except ImportError:
- if os.environ.get('TORNADO_EXTENSION') == '1':
- raise
- _websocket_mask = _websocket_mask_python