from tornado.httputil import url_concat
from tornado.util import bytes_type, b
+
class OpenIdMixin(object):
"""Abstract implementation of OpenID and Attribute Exchange.
See GoogleMixin below for example implementations.
"""
def authenticate_redirect(self, callback_uri=None,
- ax_attrs=["name","email","language","username"]):
+ ax_attrs=["name", "email", "language", "username"]):
"""Returns the authentication URL for this service.
After authentication, the service will redirect back to the given
args = dict((k, v[-1]) for k, v in self.request.arguments.iteritems())
args["openid.mode"] = u"check_authentication"
url = self._OPENID_ENDPOINT
- if http_client is None: http_client = httpclient.AsyncHTTPClient()
+ if http_client is None:
+ http_client = httpclient.AsyncHTTPClient()
http_client.fetch(url, self.async_callback(
self._on_authentication_verified, callback),
method="POST", body=urllib.urlencode(args))
self.get_argument(name) == u"http://openid.net/srv/ax/1.0":
ax_ns = name[10:]
break
+
def get_ax_arg(uri):
- if not ax_ns: return u""
+ if not ax_ns:
+ return u""
prefix = "openid." + ax_ns + ".type."
ax_name = None
for name in self.request.arguments.iterkeys():
part = name[len(prefix):]
ax_name = "openid." + ax_ns + ".value." + part
break
- if not ax_name: return u""
+ if not ax_name:
+ return u""
return self.get_argument(ax_name, u"")
email = get_ax_arg("http://axschema.org/contact/email")
user["name"] = u" ".join(name_parts)
elif email:
user["name"] = email.split("@")[0]
- if email: user["email"] = email
- if locale: user["locale"] = locale
- if username: user["username"] = username
+ if email:
+ user["email"] = email
+ if locale:
+ user["locale"] = locale
+ if username:
+ user["username"] = username
callback(user)
self._on_request_token, self._OAUTH_AUTHORIZE_URL,
callback_uri))
-
def get_authenticated_user(self, callback, http_client=None):
"""Gets the OAuth authorized user and access token on callback.
http_client.fetch(self._oauth_access_token_url(token),
self.async_callback(self._on_access_token, callback))
- def _oauth_request_token_url(self, callback_uri= None, extra_params=None):
+ def _oauth_request_token_url(self, callback_uri=None, extra_params=None):
consumer_token = self._oauth_consumer_token()
url = self._OAUTH_REQUEST_TOKEN_URL
args = dict(
if callback_uri:
args["oauth_callback"] = urlparse.urljoin(
self.request.full_url(), callback_uri)
- if extra_params: args.update(extra_params)
+ if extra_params:
+ args.update(extra_params)
signature = _oauth10a_signature(consumer_token, "GET", url, args)
else:
signature = _oauth_signature(consumer_token, "GET", url, args)
oauth_version=getattr(self, "_OAUTH_VERSION", "1.0a"),
)
if "verifier" in request_token:
- args["oauth_verifier"]=request_token["verifier"]
+ args["oauth_verifier"] = request_token["verifier"]
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, "GET", url, args,
base_args["oauth_signature"] = signature
return base_args
+
class OAuth2Mixin(object):
"""Abstract implementation of OAuth v 2."""
def authorize_redirect(self, redirect_uri=None, client_id=None,
- client_secret=None, extra_params=None ):
+ client_secret=None, extra_params=None):
"""Redirects the user to obtain OAuth authorization for this service.
Some providers require that you register a Callback
"redirect_uri": redirect_uri,
"client_id": client_id
}
- if extra_params: args.update(extra_params)
+ if extra_params:
+ args.update(extra_params)
self.redirect(
url_concat(self._OAUTH_AUTHORIZE_URL, args))
- def _oauth_request_token_url(self, redirect_uri= None, client_id = None,
+ def _oauth_request_token_url(self, redirect_uri=None, client_id=None,
client_secret=None, code=None,
extra_params=None):
url = self._OAUTH_ACCESS_TOKEN_URL
client_id=client_id,
client_secret=client_secret,
)
- if extra_params: args.update(extra_params)
+ if extra_params:
+ args.update(extra_params)
return url_concat(url, args)
+
class TwitterMixin(OAuthMixin):
"""Twitter OAuth authentication.
_OAUTH_AUTHENTICATE_URL = "http://api.twitter.com/oauth/authenticate"
_OAUTH_NO_CALLBACKS = False
-
- def authenticate_redirect(self, callback_uri = None):
+ def authenticate_redirect(self, callback_uri=None):
"""Just like authorize_redirect(), but auto-redirects if authorized.
This is generally the right interface to use if you are using
Twitter for single-sign on.
"""
http = httpclient.AsyncHTTPClient()
- http.fetch(self._oauth_request_token_url(callback_uri = callback_uri), self.async_callback(
+ http.fetch(self._oauth_request_token_url(callback_uri=callback_uri), self.async_callback(
self._on_request_token, self._OAUTH_AUTHENTICATE_URL, None))
def twitter_request(self, path, callback, access_token=None,
oauth = self._oauth_request_parameters(
url, access_token, all_args, method=method)
args.update(oauth)
- if args: url += "?" + urllib.urlencode(args)
+ if args:
+ url += "?" + urllib.urlencode(args)
callback = self.async_callback(self._on_twitter_request, callback)
http = httpclient.AsyncHTTPClient()
if post_args is not None:
_OAUTH_NO_CALLBACKS = True
_OAUTH_VERSION = "1.0"
-
def friendfeed_request(self, path, callback, access_token=None,
post_args=None, **args):
"""Fetches the given relative API path, e.g., "/bret/friends"
oauth = self._oauth_request_parameters(
url, access_token, all_args, method=method)
args.update(oauth)
- if args: url += "?" + urllib.urlencode(args)
+ if args:
+ url += "?" + urllib.urlencode(args)
callback = self.async_callback(self._on_friendfeed_request, callback)
http = httpclient.AsyncHTTPClient()
if post_args is not None:
_OAUTH_ACCESS_TOKEN_URL = "https://www.google.com/accounts/OAuthGetAccessToken"
def authorize_redirect(self, oauth_scope, callback_uri=None,
- ax_attrs=["name","email","language","username"]):
+ ax_attrs=["name", "email", "language", "username"]):
"""Authenticates and authorizes for the given Google resource.
Some of the available resources are:
def _oauth_get_user(self, access_token, callback):
OpenIdMixin.get_authenticated_user(self, callback)
+
class FacebookMixin(object):
"""Facebook Connect authentication.
def _signature(self, args):
parts = ["%s=%s" % (n, args[n]) for n in sorted(args.keys())]
body = "".join(parts) + self.settings["facebook_secret"]
- if isinstance(body, unicode): body = body.encode("utf-8")
+ if isinstance(body, unicode):
+ body = body.encode("utf-8")
return hashlib.md5(body).hexdigest()
+
class FacebookGraphMixin(OAuth2Mixin):
"""Facebook authentication using the new Graph API and OAuth2."""
_OAUTH_ACCESS_TOKEN_URL = "https://graph.facebook.com/oauth/access_token?"
fields = set(['id', 'name', 'first_name', 'last_name',
'locale', 'picture', 'link'])
- if extra_fields: fields.update(extra_fields)
+ if extra_fields:
+ fields.update(extra_fields)
http.fetch(self._oauth_request_token_url(**args),
self.async_callback(self._on_access_token, redirect_uri, client_id,
fields=",".join(fields)
)
-
def _on_get_user_info(self, callback, session, fields, user):
if user is None:
callback(None)
all_args["access_token"] = access_token
all_args.update(args)
all_args.update(post_args or {})
- if all_args: url += "?" + urllib.urlencode(all_args)
+ if all_args:
+ url += "?" + urllib.urlencode(all_args)
callback = self.async_callback(self._on_facebook_request, callback)
http = httpclient.AsyncHTTPClient()
if post_args is not None:
return
callback(escape.json_decode(response.body))
+
def _oauth_signature(consumer_token, method, url, parameters={}, token=None):
"""Calculates the HMAC-SHA1 OAuth signature for the given request.
base_elems.append(normalized_url)
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
for k, v in sorted(parameters.items())))
- base_string = "&".join(_oauth_escape(e) for e in base_elems)
+ base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape.utf8(consumer_token["secret"])]
key_elems.append(escape.utf8(token["secret"] if token else ""))
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
+
def _oauth10a_signature(consumer_token, method, url, parameters={}, token=None):
"""Calculates the HMAC-SHA1 OAuth 1.0a signature for the given request.
base_elems.append("&".join("%s=%s" % (k, _oauth_escape(str(v)))
for k, v in sorted(parameters.items())))
- base_string = "&".join(_oauth_escape(e) for e in base_elems)
+ base_string = "&".join(_oauth_escape(e) for e in base_elems)
key_elems = [escape.utf8(urllib.quote(consumer_token["secret"], safe='~'))]
key_elems.append(escape.utf8(urllib.quote(token["secret"], safe='~') if token else ""))
key = b("&").join(key_elems)
hash = hmac.new(key, escape.utf8(base_string), hashlib.sha1)
return binascii.b2a_base64(hash.digest())[:-1]
+
def _oauth_escape(val):
if isinstance(val, unicode):
val = val.encode("utf-8")
special = (b("oauth_token"), b("oauth_token_secret"))
token.update((k, p[k][0]) for k in p if k not in special)
return token
-
-
except ImportError:
signal = None
+
def start(io_loop=None, check_time=500):
"""Restarts the process automatically when a module is modified.
scheduler = ioloop.PeriodicCallback(callback, check_time, io_loop=io_loop)
scheduler.start()
+
def wait():
"""Wait for a watched file to change, then restart the process.
_watched_files = set()
+
def watch(filename):
"""Add a file to the watch list.
_reload_hooks = []
+
def add_reload_hook(fn):
"""Add a function to be called before reloading the process.
"""
_reload_hooks.append(fn)
+
def _close_all_fds(io_loop):
for fd in io_loop._handlers.keys():
try:
_reload_attempted = False
+
def _reload_on_update(modify_times):
if _reload_attempted:
# We already tried to reload and it didn't work, so don't try again.
# in the standard library), and occasionally this can cause strange
# failures in getattr. Just ignore anything that's not an ordinary
# module.
- if not isinstance(module, types.ModuleType): continue
+ if not isinstance(module, types.ModuleType):
+ continue
path = getattr(module, "__file__", None)
- if not path: continue
+ if not path:
+ continue
if path.endswith(".pyc") or path.endswith(".pyo"):
path = path[:-1]
_check_file(modify_times, path)
for path in _watched_files:
_check_file(modify_times, path)
+
def _check_file(modify_times, path):
try:
modified = os.stat(path).st_mtime
logging.info("%s modified; restarting server", path)
_reload()
+
def _reload():
global _reload_attempted
_reload_attempted = True
python -m tornado.autoreload -m module.to.run [args...]
python -m tornado.autoreload path/to/script.py [args...]
"""
+
+
def main():
"""Command-line wrapper to re-run a script whenever its source changes.
-
+
Scripts may be specified by filename or module name::
python -m tornado.autoreload -m tornado.test.runtests
watch(pkgutil.get_loader(module).get_filename())
wait()
-
+
if __name__ == "__main__":
# If this module is run with "python -m tornado.autoreload", the current
from tornado.escape import utf8
from tornado.httpclient import HTTPRequest, HTTPResponse, HTTPError, AsyncHTTPClient, main
+
class CurlAsyncHTTPClient(AsyncHTTPClient):
def initialize(self, io_loop=None, max_clients=10,
max_simultaneous_connections=None):
if self._timeout is not None:
self.io_loop.remove_timeout(self._timeout)
self._timeout = self.io_loop.add_timeout(
- time.time() + msecs/1000.0, self._handle_timeout)
+ time.time() + msecs / 1000.0, self._handle_timeout)
def _handle_events(self, fd, events):
"""Called by IOLoop when there is activity on one of our
file descriptors.
"""
action = 0
- if events & ioloop.IOLoop.READ: action |= pycurl.CSELECT_IN
- if events & ioloop.IOLoop.WRITE: action |= pycurl.CSELECT_OUT
+ if events & ioloop.IOLoop.READ:
+ action |= pycurl.CSELECT_IN
+ if events & ioloop.IOLoop.WRITE:
+ action |= pycurl.CSELECT_OUT
while True:
try:
ret, num_handles = self._socket_action(fd, action)
except Exception:
self.handle_callback_exception(info["callback"])
-
def handle_callback_exception(self, callback):
self.io_loop.handle_callback_exception(callback)
# Handle curl's cryptic options for every individual HTTP method
if request.method in ("POST", "PUT"):
- request_buffer = cStringIO.StringIO(utf8(request.body))
+ request_buffer = cStringIO.StringIO(utf8(request.body))
curl.setopt(pycurl.READFUNCTION, request_buffer.read)
if request.method == "POST":
def ioctl(cmd):
return
headers.parse_line(header_line)
+
def _curl_debug(debug_type, debug_msg):
debug_types = ('I', '<', '>', '<', '>')
if debug_type == 0:
import logging
import time
+
class Connection(object):
"""A lightweight wrapper around MySQLdb DB-API connections.
UTF-8 on all connections to avoid time zone and encoding errors.
"""
def __init__(self, host, database, user=None, password=None,
- max_idle_time=7*3600):
+ max_idle_time=7 * 3600):
self.host = host
self.database = database
self.max_idle_time = max_idle_time
import urllib
# Python3 compatibility: On python2.5, introduce the bytes alias from 2.6
-try: bytes
-except Exception: bytes = str
+try:
+ bytes
+except Exception:
+ bytes = str
try:
from urlparse import parse_qs # Python 2.6+
_XHTML_ESCAPE_RE = re.compile('[&<>"]')
_XHTML_ESCAPE_DICT = {'&': '&', '<': '<', '>': '>', '"': '"'}
+
+
def xhtml_escape(value):
"""Escapes a string so it is valid within XML or XHTML."""
return _XHTML_ESCAPE_RE.sub(lambda match: _XHTML_ESCAPE_DICT[match.group(0)],
result = parse_qs(qs, keep_blank_values, strict_parsing,
encoding='latin1', errors='strict')
encoded = {}
- for k,v in result.iteritems():
+ for k, v in result.iteritems():
encoded[k] = [i.encode('latin1') for i in v]
return encoded
-
_UTF8_TYPES = (bytes, type(None))
+
+
def utf8(value):
"""Converts a string argument to a byte string.
return value.encode("utf-8")
_TO_UNICODE_TYPES = (unicode, type(None))
+
+
def to_unicode(value):
"""Converts a string argument to a unicode string.
native_str = utf8
_BASESTRING_TYPES = (basestring, type(None))
+
+
def to_basestring(value):
"""Converts a string argument to a subclass of basestring.
assert isinstance(value, bytes)
return value.decode("utf-8")
+
def recursive_unicode(obj):
"""Walks a simple data structure, converting byte strings to unicode.
Supports lists, tuples, and dictionaries.
"""
if isinstance(obj, dict):
- return dict((recursive_unicode(k), recursive_unicode(v)) for (k,v) in obj.iteritems())
+ return dict((recursive_unicode(k), recursive_unicode(v)) for (k, v) in obj.iteritems())
elif isinstance(obj, list):
return list(recursive_unicode(i) for i in obj)
elif isinstance(obj, tuple):
else:
return obj
-# I originally used the regex from
+# I originally used the regex from
# http://daringfireball.net/2010/07/improved_regex_for_matching_urls
# but it gets all exponential on certain patterns (such as too many trailing
# dots), causing the regex matcher to never return.
from tornado.stack_context import ExceptionStackContext
-class KeyReuseError(Exception): pass
-class UnknownKeyError(Exception): pass
-class LeakedCallbackError(Exception): pass
-class BadYieldError(Exception): pass
+
+class KeyReuseError(Exception):
+ pass
+
+
+class UnknownKeyError(Exception):
+ pass
+
+
+class LeakedCallbackError(Exception):
+ pass
+
+
+class BadYieldError(Exception):
+ pass
+
def engine(func):
"""Decorator for asynchronous generators.
@functools.wraps(func)
def wrapper(*args, **kwargs):
runner = None
+
def handle_exception(typ, value, tb):
# if the function throws an exception before its first "yield"
# (or is not a generator at all), the Runner won't exist yet.
# no yield, so we're done
return wrapper
+
class YieldPoint(object):
"""Base class for objects that may be yielded from the generator."""
def start(self, runner):
"""Called by the runner after the generator has yielded.
-
+
No other methods will be called on this object before ``start``.
"""
raise NotImplementedError()
def get_result(self):
"""Returns the value to use as the result of the yield expression.
-
+
This method will only be called once, and only after `is_ready`
has returned true.
"""
raise NotImplementedError()
+
class Callback(YieldPoint):
"""Returns a callable object that will allow a matching `Wait` to proceed.
def get_result(self):
return self.runner.result_callback(self.key)
+
class Wait(YieldPoint):
"""Returns the argument passed to the result of a previous `Callback`."""
def __init__(self, key):
def get_result(self):
return self.runner.pop_result(self.key)
+
class WaitAll(YieldPoint):
"""Returns the results of multiple previous `Callbacks`.
def is_ready(self):
return all(self.runner.is_ready(key) for key in self.keys)
-
+
def get_result(self):
return [self.runner.pop_result(key) for key in self.keys]
-
+
class Task(YieldPoint):
"""Runs a single asynchronous operation.
A `Task` is equivalent to a `Callback`/`Wait` pair (with a unique
key generated automatically)::
-
+
result = yield gen.Task(func, args)
-
+
func(args, callback=(yield gen.Callback(key)))
result = yield gen.Wait(key)
"""
runner.register_callback(self.key)
self.kwargs["callback"] = runner.result_callback(self.key)
self.func(*self.args, **self.kwargs)
-
+
def is_ready(self):
return self.runner.is_ready(self.key)
def get_result(self):
return self.runner.pop_result(self.key)
+
class Multi(YieldPoint):
"""Runs multiple asynchronous operations in parallel.
def __init__(self, children):
assert all(isinstance(i, YieldPoint) for i in children)
self.children = children
-
+
def start(self, runner):
for i in self.children:
i.start(runner)
def get_result(self):
return [i.get_result() for i in self.children]
+
class _NullYieldPoint(YieldPoint):
def start(self, runner):
pass
+
def is_ready(self):
return True
+
def get_result(self):
return None
+
class Runner(object):
"""Internal implementation of `tornado.gen.engine`.
return False
# in python 2.6+ this could be a collections.namedtuple
+
+
class Arguments(tuple):
"""The result of a yield expression whose callback had more than one
argument (or keyword arguments).
from tornado.ioloop import IOLoop
from tornado.util import import_object, bytes_type
+
class HTTPClient(object):
"""A blocking HTTP client.
def fetch(self, request, **kwargs):
"""Executes a request, returning an `HTTPResponse`.
-
+
The request may be either a string URL or an `HTTPRequest` object.
If it is a string, we construct an `HTTPRequest` using any additional
kwargs: ``HTTPRequest(request, **kwargs)``
response.rethrow()
return response
+
class AsyncHTTPClient(object):
"""An non-blocking HTTP client.
cls._async_client_dict = weakref.WeakKeyDictionary()
return cls._async_client_dict
- def __new__(cls, io_loop=None, max_clients=10, force_instance=False,
+ def __new__(cls, io_loop=None, max_clients=10, force_instance=False,
**kwargs):
io_loop = io_loop or IOLoop.instance()
if cls is AsyncHTTPClient:
AsyncHTTPClient._impl_class = impl
AsyncHTTPClient._impl_kwargs = kwargs
+
class HTTPRequest(object):
"""HTTP client request object."""
def __init__(self, url, method="GET", headers=None, body=None,
:arg bool use_gzip: Request gzip encoding from the server
:arg string network_interface: Network interface to use for request
:arg callable streaming_callback: If set, `streaming_callback` will
- be run with each chunk of data as it is received, and
- `~HTTPResponse.body` and `~HTTPResponse.buffer` will be empty in
+ be run with each chunk of data as it is received, and
+ `~HTTPResponse.body` and `~HTTPResponse.buffer` will be empty in
the final response.
:arg callable header_callback: If set, `header_callback` will
- be run with each header line as it is received, and
+ be run with each header line as it is received, and
`~HTTPResponse.headers` will be empty in the final response.
:arg callable prepare_curl_callback: If set, will be called with
a `pycurl.Curl` object to allow the application to make additional
`setopt` calls.
- :arg string proxy_host: HTTP proxy hostname. To use proxies,
- `proxy_host` and `proxy_port` must be set; `proxy_username` and
- `proxy_pass` are optional. Proxies are currently only support
+ :arg string proxy_host: HTTP proxy hostname. To use proxies,
+ `proxy_host` and `proxy_port` must be set; `proxy_username` and
+ `proxy_pass` are optional. Proxies are currently only support
with `curl_httpclient`.
:arg int proxy_port: HTTP proxy port
:arg string proxy_username: HTTP proxy username
:arg string proxy_password: HTTP proxy password
- :arg bool allow_nonstandard_methods: Allow unknown values for `method`
+ :arg bool allow_nonstandard_methods: Allow unknown values for `method`
argument?
:arg bool validate_cert: For HTTPS requests, validate the server's
certificate?
any request uses a custom `ca_certs` file, they all must (they
don't have to all use the same `ca_certs`, but it's not possible
to mix requests with ca_certs and requests that use the defaults.
- :arg bool allow_ipv6: Use IPv6 when available? Default is false in
+ :arg bool allow_ipv6: Use IPv6 when available? Default is false in
`simple_httpclient` and true in `curl_httpclient`
:arg string client_key: Filename for client SSL key, if any
:arg string client_cert: Filename for client SSL certificate, if any
from tornado.util import b, bytes_type
try:
- import ssl # Python 2.6+
+ import ssl # Python 2.6+
except ImportError:
ssl = None
+
class HTTPServer(TCPServer):
r"""A non-blocking, single-threaded HTTP server.
In many cases, `tornado.web.Application.listen` can be used to avoid
the need to explicitly create the `HTTPServer`.
- 2. `~tornado.netutil.TCPServer.bind`/`~tornado.netutil.TCPServer.start`:
+ 2. `~tornado.netutil.TCPServer.bind`/`~tornado.netutil.TCPServer.start`:
simple multi-process::
server = HTTPServer(app)
HTTPConnection(stream, address, self.request_callback,
self.no_keep_alive, self.xheaders)
+
class _BadRequestException(Exception):
"""Exception class for malformed HTTP requests."""
pass
+
class HTTPConnection(object):
"""Handles a connection to an HTTP client, executing HTTP requests.
if self._write_callback is not None:
callback = self._write_callback
self._write_callback = None
- callback()
+ callback()
# _on_write_complete is enqueued on the IOLoop whenever the
# IOStream's write buffer becomes empty, but it's possible for
# another callback that runs on the IOLoop before it to
GET/POST arguments are available in the arguments property, which
maps arguments names to lists of values (to support multiple values
for individual names). Names are of type `str`, while arguments
- are byte strings. Note that this is different from
- `RequestHandler.get_argument`, which returns argument values as
+ are byte strings. Note that this is different from
+ `RequestHandler.get_argument`, which returns argument values as
unicode strings.
.. attribute:: files
self.remote_ip = remote_ip
if protocol:
self.protocol = protocol
- elif connection and isinstance(connection.stream,
+ elif connection and isinstance(connection.stream,
iostream.SSLIOStream):
self.protocol = "https"
else:
self.arguments = {}
for name, values in arguments.iteritems():
values = [v for v in values if v]
- if values: self.arguments[name] = values
+ if values:
+ self.arguments[name] = values
def supports_http_1_1(self):
"""Returns True if this request supports HTTP/1.1 semantics"""
return False
raise
return True
-
from tornado.util import b, ObjectDict
+
class HTTPHeaders(dict):
"""A dictionary that maintains Http-Header-Case for all keys.
>>> url_concat("http://example.com/foo?a=b", dict(c="d"))
'http://example.com/foo?a=b&c=d'
"""
- if not args: return url
+ if not args:
+ return url
if url[-1] not in ('?', '&'):
url += '&' if ('?' in url) else '?'
return url + urllib.urlencode(args)
footer_length = len(boundary) + 4
parts = data[:-footer_length].split(b("--") + boundary + b("\r\n"))
for part in parts:
- if not part: continue
+ if not part:
+ continue
eoh = part.find(b("\r\n\r\n"))
if eoh == -1:
logging.warning("multipart/form-data missing headers")
yield f.strip()
s = s[end:]
+
def _parse_header(line):
"""Parse a Content-type like header.
i = p.find('=')
if i >= 0:
name = p[:i].strip().lower()
- value = p[i+1:].strip()
+ value = p[i + 1:].strip()
if len(value) >= 2 and value[0] == value[-1] == '"':
value = value[1:-1]
value = value.replace('\\\\', '\\').replace('\\"', '"')
@staticmethod
def timedelta_to_seconds(td):
"""Equivalent to td.total_seconds() (introduced in python 2.7)."""
- return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6) / float(10**6)
+ return (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10 ** 6) / float(10 ** 6)
# Comparison methods to sort by deadline, with object id as a tiebreaker
# to guarantee a consistent ordering. The heapq module uses __le__
self._timeout = None
def _run(self):
- if not self._running: return
+ if not self._running:
+ return
try:
self.callback()
except Exception:
pass
def register(self, fd, events):
- if events & IOLoop.READ: self.read_fds.add(fd)
- if events & IOLoop.WRITE: self.write_fds.add(fd)
+ if events & IOLoop.READ:
+ self.read_fds.add(fd)
+ if events & IOLoop.WRITE:
+ self.write_fds.add(fd)
if events & IOLoop.ERROR:
self.error_fds.add(fd)
# Closed connections are reported as errors by epoll and kqueue,
from tornado.util import b, bytes_type
try:
- import ssl # Python 2.6+
+ import ssl # Python 2.6+
except ImportError:
ssl = None
+
class IOStream(object):
r"""A utility class to write to and read from a non-blocking socket.
We support a non-blocking ``write()`` and a family of ``read_*()`` methods.
All of the methods take callbacks (since writing and reading are
- non-blocking and asynchronous).
+ non-blocking and asynchronous).
The socket parameter may either be connected or unconnected. For
server operations the socket is the result of calling socket.accept().
if self._read_to_buffer() == 0:
break
self._add_io_state(self.io_loop.READ)
-
+
def read_until(self, delimiter, callback):
"""Call callback when we read the given delimiter."""
assert not self._read_callback, "Already reading"
# until we've completed the SSL handshake (so certificates are
# available, etc).
-
def _read_from_socket(self):
if self._ssl_accepting:
# If the handshake hasn't finished yet, there can't be anything
return None
return chunk
+
def _merge_prefix(deque, size):
"""Replace the first entries in a deque of strings with a single
string of up to size bytes.
if not deque:
deque.appendleft(b(""))
+
def doctests():
import doctest
return doctest.DocTestSuite()
_supported_locales = frozenset([_default_locale])
_use_gettext = False
+
def get(*locale_codes):
"""Returns the closest match for the given locale codes.
global _supported_locales
_translations = {}
for path in os.listdir(directory):
- if not path.endswith(".csv"): continue
+ if not path.endswith(".csv"):
+ continue
locale, extension = path.split(".")
if not re.match("[a-z]+(_[A-Z]+)?$", locale):
logging.error("Unrecognized locale %r (path: %s)", locale,
f = open(os.path.join(directory, path), "r")
_translations[locale] = {}
for i, row in enumerate(csv.reader(f)):
- if not row or len(row) < 2: continue
+ if not row or len(row) < 2:
+ continue
row = [c.decode("utf-8").strip() for c in row]
english, translation = row[:2]
if len(row) > 2:
_supported_locales = frozenset(_translations.keys() + [_default_locale])
logging.info("Supported locales: %s", sorted(_supported_locales))
+
def load_gettext_translations(directory, domain):
"""Loads translations from gettext's locale tree
global _use_gettext
_translations = {}
for lang in os.listdir(directory):
- if lang.startswith('.'): continue # skip .svn, etc
- if os.path.isfile(os.path.join(directory, lang)): continue
+ if lang.startswith('.'):
+ continue # skip .svn, etc
+ if os.path.isfile(os.path.join(directory, lang)):
+ continue
try:
- os.stat(os.path.join(directory, lang, "LC_MESSAGES", domain+".mo"))
+ os.stat(os.path.join(directory, lang, "LC_MESSAGES", domain + ".mo"))
_translations[lang] = gettext.translation(domain, directory,
languages=[lang])
except Exception, e:
def get_closest(cls, *locale_codes):
"""Returns the closest match for the given locale code."""
for code in locale_codes:
- if not code: continue
+ if not code:
+ continue
code = code.replace("-", "_")
parts = code.split("_")
if len(parts) > 2:
if relative and days == 0:
if seconds < 50:
return _("1 second ago", "%(seconds)d seconds ago",
- seconds) % { "seconds": seconds }
+ seconds) % {"seconds": seconds}
if seconds < 50 * 60:
minutes = round(seconds / 60.0)
return _("1 minute ago", "%(minutes)d minutes ago",
- minutes) % { "minutes": minutes }
+ minutes) % {"minutes": minutes}
hours = round(seconds / (60.0 * 60))
return _("1 hour ago", "%(hours)d hours ago",
- hours) % { "hours": hours }
+ hours) % {"hours": hours}
if days == 0:
format = _("%(time)s")
of size 1.
"""
_ = self.translate
- if len(parts) == 0: return ""
- if len(parts) == 1: return parts[0]
+ if len(parts) == 0:
+ return ""
+ if len(parts) == 1:
+ return parts[0]
comma = u' \u0648 ' if self.code.startswith("fa") else u", "
return _("%(commas)s and %(last)s") % {
"commas": comma.join(parts[:-1]),
value = value[:-3]
return ",".join(reversed(parts))
+
class CSVLocale(Locale):
"""Locale implementation using tornado's CSV translation format."""
def translate(self, message, plural_message=None, count=None):
message_dict = self.translations.get("unknown", {})
return message_dict.get(message, message)
+
class GettextLocale(Locale):
"""Locale implementation using the gettext module."""
def translate(self, message, plural_message=None, count=None):
from tornado.platform.auto import set_close_exec
try:
- import ssl # Python 2.6+
+ import ssl # Python 2.6+
except ImportError:
ssl = None
+
class TCPServer(object):
r"""A non-blocking, single-threaded TCP server.
or socket.AF_INET6 to restrict to ipv4 or ipv6 addresses, otherwise
both will be used if available.
- The ``backlog`` argument has the same meaning as for
+ The ``backlog`` argument has the same meaning as for
``socket.listen()``.
"""
sockets = []
If any other file with that name exists, an exception will be
raised.
- Returns a socket object (not a list of socket objects like
+ Returns a socket object (not a list of socket objects like
`bind_sockets`)
"""
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.listen(backlog)
return sock
+
def add_accept_handler(sock, callback, io_loop=None):
"""Adds an ``IOLoop`` event handler to accept new connections on ``sock``.
"""
if io_loop is None:
io_loop = IOLoop.instance()
+
def accept_handler(fd, events):
while True:
try:
frame = sys._getframe(0)
options_file = frame.f_code.co_filename
file_name = frame.f_back.f_code.co_filename
- if file_name == options_file: file_name = ""
+ if file_name == options_file:
+ file_name = ""
if type is None:
if not multiple and default is not None:
type = default.__class__
We return all command line arguments that are not options as a list.
"""
- if args is None: args = sys.argv
+ if args is None:
+ args = sys.argv
remaining = []
for i in xrange(1, len(args)):
# All things after the last option are command line arguments
remaining = args[i:]
break
if args[i] == "--":
- remaining = args[i+1:]
+ remaining = args[i + 1:]
break
arg = args[i].lstrip("-")
name, equals, value = arg.partition("=")
by_group.setdefault(option.group_name, []).append(option)
for filename, o in sorted(by_group.items()):
- if filename: print >> file, filename
+ if filename:
+ print >> file, filename
o.sort(key=lambda option: option.name)
for option in o:
prefix = option.name
lo, _, hi = part.partition(":")
lo = _parse(lo)
hi = _parse(hi) if hi else lo
- self._value.extend(range(lo, hi+1))
+ self._value.extend(range(lo, hi + 1))
else:
self._value.append(_parse(part))
else:
def enable_pretty_logging():
"""Turns on formatted logging output as configured.
-
+
This is called automatically by `parse_command_line`.
"""
root_logger = logging.getLogger()
root_logger.addHandler(channel)
-
class _LogFormatter(logging.Formatter):
def __init__(self, color, *args, **kwargs):
logging.Formatter.__init__(self, *args, **kwargs)
if (3, 0) < sys.version_info < (3, 2, 3):
fg_color = unicode(fg_color, "ascii")
self._colors = {
- logging.DEBUG: unicode(curses.tparm(fg_color, 4), # Blue
+ logging.DEBUG: unicode(curses.tparm(fg_color, 4), # Blue
"ascii"),
- logging.INFO: unicode(curses.tparm(fg_color, 2), # Green
+ logging.INFO: unicode(curses.tparm(fg_color, 2), # Green
"ascii"),
- logging.WARNING: unicode(curses.tparm(fg_color, 3), # Yellow
+ logging.WARNING: unicode(curses.tparm(fg_color, 3), # Yellow
"ascii"),
- logging.ERROR: unicode(curses.tparm(fg_color, 1), # Red
+ logging.ERROR: unicode(curses.tparm(fg_color, 1), # Red
"ascii"),
}
self._normal = unicode(curses.tigetstr("sgr0"), "ascii")
from __future__ import absolute_import, division, with_statement
+
def set_close_exec(fd):
"""Sets the close-on-exec bit (``FD_CLOEXEC``)for a file descriptor."""
raise NotImplementedError()
+
class Waker(object):
"""A socket-like object that can wake another thread from ``select()``.
"""
def fileno(self):
"""Returns a file descriptor for this waker.
-
+
Must be suitable for use with ``select()`` or equivalent on the
local platform.
"""
def close(self):
"""Closes the waker's file descriptor(s)."""
raise NotImplementedError()
-
-
from tornado.platform import interface
from tornado.util import b
+
def set_close_exec(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFD)
fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
def _set_nonblocking(fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
+
+
class Waker(interface.Waker):
def __init__(self):
r, w = os.pipe()
try:
while True:
result = self.reader.read()
- if not result: break;
+ if not result:
+ break
except IOError:
pass
def active(self):
return self._active
+
class TornadoReactor(PosixReactorBase):
"""Twisted reactor built on the Tornado IOLoop.
self._io_loop = io_loop
self._readers = {} # map of reader objects to fd
self._writers = {} # map of writer objects to fd
- self._fds = {} # a map of fd to a (reader, writer) tuple
+ self._fds = {} # a map of fd to a (reader, writer) tuple
self._delayedCalls = {}
PosixReactorBase.__init__(self)
if self._stopped:
self.fireSystemEvent("shutdown")
+
class _TestReactor(TornadoReactor):
"""Subclass of TornadoReactor for use in unittests.
port, protocol, interface=interface, maxPacketSize=maxPacketSize)
-
def install(io_loop=None):
"""Install this package as the default Twisted reactor."""
if not io_loop:
try:
while True:
result = self.reader.recv(1024)
- if not result: break
+ if not result:
+ break
except IOError:
pass
from tornado import ioloop
try:
- import multiprocessing # Python 2.6+
+ import multiprocessing # Python 2.6+
except ImportError:
multiprocessing = None
+
def cpu_count():
"""Returns the number of processors on this machine."""
if multiprocessing is not None:
logging.error("Could not detect number of processors; assuming 1")
return 1
+
def _reseed_random():
if 'random' not in sys.modules:
return
_task_id = None
+
def fork_processes(num_processes, max_restarts=100):
"""Starts multiple worker processes.
"IOLoop.instance() before calling start_processes()")
logging.info("Starting %d processes", num_processes)
children = {}
+
def start_child(i):
pid = os.fork()
if pid == 0:
return None
for i in range(num_processes):
id = start_child(i)
- if id is not None: return id
+ if id is not None:
+ return id
num_restarts = 0
while children:
try:
if num_restarts > max_restarts:
raise RuntimeError("Too many child restarts, giving up")
new_id = start_child(id)
- if new_id is not None: return new_id
+ if new_id is not None:
+ return new_id
# All child processes exited cleanly, so exit the master process
# instead of just returning to right after the call to
# fork_processes (which will probably just start up another IOLoop
# unless the caller checks the return value).
sys.exit(0)
+
def task_id():
"""Returns the current task id, if any.
from cStringIO import StringIO as BytesIO # python 2
try:
- import ssl # python 2.6+
+ import ssl # python 2.6+
except ImportError:
ssl = None
_DEFAULT_CA_CERTS = os.path.dirname(__file__) + '/ca-certificates.crt'
+
class SimpleAsyncHTTPClient(AsyncHTTPClient):
"""Non-blocking HTTP client with no external dependencies.
self._process_queue()
-
class _HTTPConnection(object):
_SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE"])
# compatibility with servers configured for TLSv1 only,
# but nearly all servers support SSLv3:
# http://blog.ivanristic.com/2011/09/ssl-survey-protocol-support.html
- if sys.version_info >= (2,7):
+ if sys.version_info >= (2, 7):
ssl_options["ciphers"] = "DEFAULT:!SSLv2"
else:
# This is really only necessary for pre-1.0 versions
self.headers.get("Content-Encoding") == "gzip"):
# Magic parameter makes zlib module understand gzip header
# http://stackoverflow.com/questions/1838699/how-can-i-decompress-a-gzip-stream-with-zlib
- self._decompressor = zlib.decompressobj(16+zlib.MAX_WBITS)
+ self._decompressor = zlib.decompressobj(16 + zlib.MAX_WBITS)
if self.headers.get("Transfer-Encoding") == "chunked":
self.chunks = []
self.stream.read_until(b("\r\n"), self._on_chunk_length)
self.request.streaming_callback(data)
buffer = BytesIO()
else:
- buffer = BytesIO(data) # TODO: don't require one big string?
+ buffer = BytesIO(data) # TODO: don't require one big string?
response = HTTPResponse(original_request,
self.code, headers=self.headers,
request_time=time.time() - self.start_time,
class CertificateError(ValueError):
pass
+
def _dnsname_to_pat(dn):
pats = []
for frag in dn.split(r'.'):
pats.append(frag.replace(r'\*', '[^.]*'))
return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)
+
def match_hostname(cert, hostname):
"""Verify that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules
import sys
import threading
+
class _State(threading.local):
def __init__(self):
self.contexts = ()
_state = _State()
+
class StackContext(object):
'''Establishes the given context as a StackContext that will be transferred.
finally:
_state.contexts = self.old_contexts
+
class ExceptionStackContext(object):
'''Specialization of StackContext for exception handling.
finally:
_state.contexts = self.old_contexts
+
class NullContext(object):
'''Resets the StackContext.
def __exit__(self, type, value, traceback):
_state.contexts = self.old_contexts
+
class _StackContextWrapper(functools.partial):
pass
+
def wrap(fn):
'''Returns a callable object that will restore the current StackContext
when executed.
return fn
# functools.wraps doesn't appear to work on functools.partial objects
#@functools.wraps(fn)
+
def wrapped(callback, contexts, *args, **kwargs):
if contexts is _state.contexts or not contexts:
callback(*args, **kwargs)
for a, b in itertools.izip(_state.contexts, contexts))):
# contexts have been removed or changed, so start over
new_contexts = ([NullContext()] +
- [cls(arg) for (cls,arg) in contexts])
+ [cls(arg) for (cls, arg) in contexts])
else:
new_contexts = [cls(arg)
for (cls, arg) in contexts[len(_state.contexts):]]
else:
return _StackContextWrapper(fn)
+
@contextlib.contextmanager
def _nested(*managers):
"""Support multiple context managers in a single with-statement.
# the right information. Another exception may
# have been raised and caught by an exit method
raise exc[0], exc[1], exc[2]
-
``{% for *var* in *expr* %}...{% end %}``
Same as the python ``for`` statement.
-
+
``{% from *x* import *y* %}``
Same as the python ``import`` statement.
_DEFAULT_AUTOESCAPE = "xhtml_escape"
_UNSET = object()
+
class Template(object):
"""A compiled template.
# the module name used in __name__ below.
self.compiled = compile(
escape.to_unicode(self.code),
- "%s.generated.py" % self.name.replace('.','_'),
+ "%s.generated.py" % self.name.replace('.', '_'),
"exec")
except Exception:
formatted_code = _format_code(self.code).rstrip()
def _create_template(self, name):
raise NotImplementedError()
+
class Loader(BaseLoader):
"""A template loader that loads from a single root directory.
return (self.body,)
-
class _ChunkList(_Node):
def __init__(self, chunks):
self.chunks = chunks
writer.current_template.autoescape, self.line)
writer.write_line("_append(_tmp)", self.line)
+
class _Module(_Expression):
def __init__(self, expression, line):
super(_Module, self).__init__("_modules." + expression, line,
raw=True)
+
class _Text(_Node):
def __init__(self, value, line):
self.value = value
ancestors = ["%s:%d" % (tmpl.name, lineno)
for (tmpl, lineno) in self.include_stack]
line_comment += ' (via %s)' % ', '.join(reversed(ancestors))
- print >> self.file, " "*indent + line + line_comment
+ print >> self.file, " " * indent + line + line_comment
class _TemplateReader(object):
if type(key) is slice:
size = len(self)
start, stop, step = key.indices(size)
- if start is None: start = self.pos
- else: start += self.pos
- if stop is not None: stop += self.pos
+ if start is None:
+ start = self.pos
+ else:
+ start += self.pos
+ if stop is not None:
+ stop += self.pos
return self.text[slice(start, stop, step)]
elif key < 0:
return self.text[key]
block = _Statement(suffix, line)
elif operator == "autoescape":
fn = suffix.strip()
- if fn == "None": fn = None
+ if fn == "None":
+ fn = None
template.autoescape = fn
continue
elif operator == "raw":
from tornado.util import b
from tornado.web import RequestHandler, Application, asynchronous
+
class OpenIdClientLoginHandler(RequestHandler, OpenIdMixin):
def initialize(self, test):
self._OPENID_ENDPOINT = test.get_url('/openid/server/authenticate')
assert user is not None
self.finish(user)
+
class OpenIdServerAuthenticateHandler(RequestHandler):
def post(self):
assert self.get_argument('openid.mode') == 'check_authentication'
self.write('is_valid:true')
+
class OAuth1ClientLoginHandler(RequestHandler, OAuthMixin):
def initialize(self, test, version):
self._OAUTH_VERSION = version
assert access_token == dict(key=b('uiop'), secret=b('5678')), access_token
callback(dict(email='foo@example.com'))
+
class OAuth1ClientRequestParametersHandler(RequestHandler, OAuthMixin):
def initialize(self, version):
self._OAUTH_VERSION = version
'http://www.example.com/api/asdf',
dict(key='uiop', secret='5678'),
parameters=dict(foo='bar'))
- import urllib; urllib.urlencode(params)
+ import urllib
+ urllib.urlencode(params)
self.write(params)
+
class OAuth1ServerRequestTokenHandler(RequestHandler):
def get(self):
self.write('oauth_token=zxcv&oauth_token_secret=1234')
+
class OAuth1ServerAccessTokenHandler(RequestHandler):
def get(self):
self.write('oauth_token=uiop&oauth_token_secret=5678')
+
class OAuth2ClientLoginHandler(RequestHandler, OAuth2Mixin):
def initialize(self, test):
self._OAUTH_AUTHORIZE_URL = test.get_url('/oauth2/server/authorize')
def test_oauth10_get_user(self):
response = self.fetch(
'/oauth10/client/login?oauth_token=zxcv',
- headers={'Cookie':'_oauth_request_token=enhjdg==|MTIzNA=='})
+ headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
response.rethrow()
parsed = json_decode(response.body)
self.assertEqual(parsed['email'], 'foo@example.com')
def test_oauth10a_get_user(self):
response = self.fetch(
'/oauth10a/client/login?oauth_token=zxcv',
- headers={'Cookie':'_oauth_request_token=enhjdg==|MTIzNA=='})
+ headers={'Cookie': '_oauth_request_token=enhjdg==|MTIzNA=='})
response.rethrow()
parsed = json_decode(response.body)
self.assertEqual(parsed['email'], 'foo@example.com')
if pycurl is not None:
from tornado.curl_httpclient import CurlAsyncHTTPClient
+
class CurlHTTPClientCommonTestCase(HTTPClientCommonTestCase):
def get_http_client(self):
client = CurlAsyncHTTPClient(io_loop=self.io_loop)
("http://www.example.com/wpstyle/?p=364.", {},
u'<a href="http://www.example.com/wpstyle/?p=364">http://www.example.com/wpstyle/?p=364</a>.'),
- ("rdar://1234",
+ ("rdar://1234",
{"permitted_protocols": ["http", "rdar"]},
u'<a href="rdar://1234">rdar://1234</a>'),
- ("rdar:/1234",
+ ("rdar:/1234",
{"permitted_protocols": ["rdar"]},
u'<a href="rdar:/1234">rdar:/1234</a>'),
("Just a www.example.com link.", {},
u'Just a <a href="http://www.example.com">www.example.com</a> link.'),
- ("Just a www.example.com link.",
+ ("Just a www.example.com link.",
{"require_protocol": True},
u'Just a www.example.com link.'),
from tornado import gen
+
class GenTest(AsyncTestCase):
def run_gen(self, f):
f()
def test_exception_phase1(self):
@gen.engine
def f():
- 1/0
+ 1 / 0
self.assertRaises(ZeroDivisionError, self.run_gen, f)
def test_exception_phase2(self):
def f():
self.io_loop.add_callback((yield gen.Callback("k1")))
yield gen.Wait("k1")
- 1/0
+ 1 / 0
self.assertRaises(ZeroDivisionError, self.run_gen, f)
def test_exception_in_task_phase1(self):
def fail_task(callback):
- 1/0
+ 1 / 0
@gen.engine
def f():
def test_exception_in_task_phase2(self):
# This is the case that requires the use of stack_context in gen.engine
def fail_task(callback):
- self.io_loop.add_callback(lambda: 1/0)
+ self.io_loop.add_callback(lambda: 1 / 0)
@gen.engine
def f():
yield gen.Wait("k1")
self.finish("3")
+
class GenTaskHandler(RequestHandler):
@asynchronous
@gen.engine
response.rethrow()
self.finish(b("got response: ") + response.body)
+
class GenExceptionHandler(RequestHandler):
@asynchronous
@gen.engine
yield gen.Task(io_loop.add_callback)
raise Exception("oops")
+
class GenYieldExceptionHandler(RequestHandler):
@asynchronous
@gen.engine
def get(self):
io_loop = self.request.connection.stream.io_loop
# Test the interaction of the two stack_contexts.
+
def fail_task(callback):
- io_loop.add_callback(lambda: 1/0)
+ io_loop.add_callback(lambda: 1 / 0)
try:
yield gen.Task(fail_task)
raise Exception("did not get expected exception")
except ZeroDivisionError:
self.finish('ok')
+
class GenWebTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([
from tornado.util import b, bytes_type
from tornado.web import Application, RequestHandler, url
+
class HelloWorldHandler(RequestHandler):
def get(self):
name = self.get_argument("name", "world")
self.set_header("Content-Type", "text/plain")
self.finish("Hello %s!" % name)
+
class PostHandler(RequestHandler):
def post(self):
self.finish("Post arg1: %s, arg2: %s" % (
self.get_argument("arg1"), self.get_argument("arg2")))
+
class ChunkHandler(RequestHandler):
def get(self):
self.write("asdf")
self.flush()
self.write("qwer")
+
class AuthHandler(RequestHandler):
def get(self):
self.finish(self.request.headers["Authorization"])
+
class CountdownHandler(RequestHandler):
def get(self, count):
count = int(count)
else:
self.write("Zero")
+
class EchoPostHandler(RequestHandler):
def post(self):
self.write(self.request.body)
# These tests end up getting run redundantly: once here with the default
# HTTPClient implementation, and then again in each implementation's own
# test suite.
+
+
class HTTPClientCommonTestCase(AsyncHTTPTestCase, LogTrapTestCase):
def get_http_client(self):
"""Returns AsyncHTTPClient instance. May be overridden in subclass."""
0
""").replace(b("\n"), b("\r\n")), callback=stream.close)
+
def accept_callback(conn, address):
# fake an HTTP server using chunked encoding where the final chunks
# and connection close all happen at once
resp = self.wait()
resp.rethrow()
self.assertEqual(resp.body, b("12"))
-
def test_basic_auth(self):
self.assertEqual(self.fetch("/auth", auth_username="Aladdin",
except ImportError:
ssl = None
+
class HandlerBaseTestCase(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([('/', self.__class__.Handler)])
response.rethrow()
return json_decode(response.body)
+
class HelloWorldRequestHandler(RequestHandler):
def initialize(self, protocol="http"):
self.expected_protocol = protocol
def post(self):
self.finish("Got %d bytes in POST" % len(self.request.body))
+
class BaseSSLTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_ssl_version(self):
raise NotImplementedError()
force_instance=True)
def get_app(self):
- return Application([('/', HelloWorldRequestHandler,
+ return Application([('/', HelloWorldRequestHandler,
dict(protocol="https"))])
def get_httpserver_options(self):
**kwargs)
return self.wait()
+
class SSLTestMixin(object):
def test_ssl(self):
response = self.fetch('/')
def test_large_post(self):
response = self.fetch('/',
method='POST',
- body='A'*5000)
+ body='A' * 5000)
self.assertEqual(response.body, b("Got 5000 bytes in POST"))
def test_non_ssl_request(self):
# For example, SSLv3 and TLSv1 throw an exception if you try to read
# from the socket before the handshake is complete, but the default
# of SSLv23 allows it.
+
+
class SSLv23Test(BaseSSLTest, SSLTestMixin):
- def get_ssl_version(self): return ssl.PROTOCOL_SSLv23
+ def get_ssl_version(self):
+ return ssl.PROTOCOL_SSLv23
+
+
class SSLv3Test(BaseSSLTest, SSLTestMixin):
- def get_ssl_version(self): return ssl.PROTOCOL_SSLv3
+ def get_ssl_version(self):
+ return ssl.PROTOCOL_SSLv3
+
+
class TLSv1Test(BaseSSLTest, SSLTestMixin):
- def get_ssl_version(self): return ssl.PROTOCOL_TLSv1
+ def get_ssl_version(self):
+ return ssl.PROTOCOL_TLSv1
if hasattr(ssl, 'PROTOCOL_SSLv2'):
class SSLv2Test(BaseSSLTest):
- def get_ssl_version(self): return ssl.PROTOCOL_SSLv2
+ def get_ssl_version(self):
+ return ssl.PROTOCOL_SSLv2
def test_sslv2_fail(self):
# This is really more of a client test, but run it here since
del SSLv23Test
del SSLv3Test
del TLSv1Test
-elif getattr(ssl, 'OPENSSL_VERSION_INFO', (0,0)) < (1,0):
+elif getattr(ssl, 'OPENSSL_VERSION_INFO', (0, 0)) < (1, 0):
# In pre-1.0 versions of openssl, SSLv23 clients always send SSLv2
# ClientHello messages, which are rejected by SSLv3 and TLSv1
# servers. Note that while the OPENSSL_VERSION_INFO was formally
del SSLv3Test
del TLSv1Test
+
class MultipartTestHandler(RequestHandler):
def post(self):
self.finish({"header": self.request.headers["X-Header-Encoding-Test"],
"filebody": _unicode(self.request.files["files"][0]["body"]),
})
+
class RawRequestHTTPConnection(simple_httpclient._HTTPConnection):
def set_request(self, request):
self.__next_request = request
self.stream.read_until(b("\r\n\r\n"), self._on_headers)
# This test is also called from wsgi_test
+
+
class HTTPConnectionTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_handlers(self):
return [("/multipart", MultipartTestHandler),
conn = RawRequestHTTPConnection(self.io_loop, client,
httpclient.HTTPRequest(self.get_url("/")),
None, self.stop,
- 1024*1024)
+ 1024 * 1024)
conn.set_request(
b("\r\n").join(headers +
[utf8("Content-Length: %d\r\n" % len(body))]) +
self.assertEqual(body, b("Got 1024 bytes in POST"))
stream.close()
+
class EchoHandler(RequestHandler):
def get(self):
self.write(recursive_unicode(self.request.arguments))
+
class TypeCheckHandler(RequestHandler):
def prepare(self):
self.errors = {}
def check_type(self, name, obj, expected_type):
actual_type = type(obj)
if expected_type != actual_type:
- self.errors[name] = "expected %s, got %s" % (expected_type,
+ self.errors[name] = "expected %s, got %s" % (expected_type,
actual_type)
+
class HTTPServerTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([("/echo", EchoHandler),
data = json_decode(response.body)
self.assertEqual(data, {})
+
class XHeaderTest(HandlerBaseTestCase):
class Handler(RequestHandler):
def get(self):
def test_url_concat_no_query_params(self):
url = url_concat(
"https://localhost/path",
- [('y','y'), ('z','z')],
+ [('y', 'y'), ('z', 'z')],
)
self.assertEqual(url, "https://localhost/path?y=y&z=z")
def test_url_concat_encode_args(self):
url = url_concat(
"https://localhost/path",
- [('y','/y'), ('z','z')],
+ [('y', '/y'), ('z', 'z')],
)
self.assertEqual(url, "https://localhost/path?y=%2Fy&z=z")
def test_url_concat_trailing_q(self):
url = url_concat(
"https://localhost/path?",
- [('y','y'), ('z','z')],
+ [('y', 'y'), ('z', 'z')],
)
self.assertEqual(url, "https://localhost/path?y=y&z=z")
def test_url_concat_q_with_no_trailing_amp(self):
url = url_concat(
"https://localhost/path?x",
- [('y','y'), ('z','z')],
+ [('y', 'y'), ('z', 'z')],
)
self.assertEqual(url, "https://localhost/path?x&y=y&z=z")
def test_url_concat_trailing_amp(self):
url = url_concat(
"https://localhost/path?x&",
- [('y','y'), ('z','z')],
+ [('y', 'y'), ('z', 'z')],
)
self.assertEqual(url, "https://localhost/path?x&y=y&z=z")
def test_url_concat_mult_params(self):
url = url_concat(
"https://localhost/path?a=1&b=2",
- [('y','y'), ('z','z')],
+ [('y', 'y'), ('z', 'z')],
)
self.assertEqual(url, "https://localhost/path?a=1&b=2&y=y&z=z")
)
self.assertEqual(url, "https://localhost/path?r=1&t=2")
+
class MultipartFormDataTest(LogTrapTestCase):
def test_file_upload(self):
data = b("""\
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
self.assertEqual(file["body"], b("Foo"))
-
+
def test_unquoted_names(self):
# quotes are optional unless special characters are present
data = b("""\
file = files["files"][0]
self.assertEqual(file["filename"], "ab.txt")
self.assertEqual(file["body"], b("Foo"))
-
+
def test_special_filenames(self):
filenames = ['a;b.txt',
'a"b.txt',
from __future__ import absolute_import, division, with_statement
import unittest
+
class ImportTest(unittest.TestCase):
def test_import_everything(self):
# Some of our modules are not otherwise tested. Import them
from tornado.testing import AsyncTestCase, LogTrapTestCase
+
class TestIOLoop(AsyncTestCase, LogTrapTestCase):
def test_add_callback_wakeup(self):
# Make sure that add_callback from inside a running IOLoop
import socket
import time
+
class HelloHandler(RequestHandler):
def get(self):
self.write("Hello")
+
class TestIOStream(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([('/', HelloHandler)])
[listener] = netutil.bind_sockets(port, '127.0.0.1',
family=socket.AF_INET)
streams = [None, None]
+
def accept_callback(connection, address):
streams[0] = IOStream(connection, io_loop=self.io_loop, **kwargs)
self.stop()
+
def connect_callback():
streams[1] = client_stream
self.stop()
self.wait()
# As a side effect, the stream is now listening for connection
# close (if it wasn't already), but is not listening for writes
- self.assertEqual(server._state, IOLoop.READ|IOLoop.ERROR)
+ self.assertEqual(server._state, IOLoop.READ | IOLoop.ERROR)
server.close()
client.close()
port = get_unused_port()
stream = IOStream(socket.socket(), self.io_loop)
self.connect_called = False
+
def connect_callback():
self.connect_called = True
stream.set_close_callback(self.stop)
s.connect(("localhost", self.get_http_port()))
stream = IOStream(s, io_loop=self.io_loop)
stream.write(b("GET / HTTP/1.0\r\n\r\n"))
-
+
stream.read_until_close(self.stop)
data = self.wait()
self.assertTrue(data.startswith(b("HTTP/1.0 200")))
try:
chunks = []
final_called = []
+
def streaming_callback(data):
chunks.append(data)
self.stop()
+
def final_callback(data):
assert not data
final_called.append(True)
server, client = self.make_iostream_pair()
try:
chunks = []
+
def callback(data):
chunks.append(data)
self.stop()
client.set_close_callback(self.stop)
server.write(b("12"))
chunks = []
+
def callback1(data):
chunks.append(data)
client.read_bytes(1, callback2)
server.close()
+
def callback2(data):
chunks.append(data)
client.read_bytes(1, callback1)
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
# Logging is tricky here so you may want to replace LogTrapTestCase
# with unittest.TestCase when debugging.
+
+
class ProcessTest(LogTrapTestCase):
def get_app(self):
class ProcessHandler(RequestHandler):
def test_multi_process(self):
self.assertFalse(IOLoop.initialized())
port = get_unused_port()
+
def get_url(path):
return "http://127.0.0.1:%d%s" % (port, path)
sockets = bind_sockets(port, "127.0.0.1")
# finished with status 0
self.assertEqual(e.code, 0)
self.assertTrue(task_id() is None)
- for sock in sockets: sock.close()
+ for sock in sockets:
+ sock.close()
signal.alarm(0)
return
signal.alarm(5) # child process
elif id == 2:
signal.alarm(5)
self.assertEqual(id, task_id())
- for sock in sockets: sock.close()
+ for sock in sockets:
+ sock.close()
# Always use SimpleAsyncHTTPClient here; the curl
# version appears to get confused sometimes if the
# connection gets closed before it's had a chance to
except Exception:
logging.error("exception in child process %d", id, exc_info=True)
raise
-
+
if os.name != 'posix' or sys.platform == 'cygwin':
# All sorts of unixisms here
"pypy",
]
+
def exists_on_path(filename):
for dir in os.environ["PATH"].split(":"):
if os.path.exists(os.path.join(dir, filename)):
return True
return False
+
def main():
for interpreter in INTERPRETERS:
print "=================== %s =======================" % interpreter
'tornado.test.wsgi_test',
]
+
def all():
return unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULES)
from tornado.util import b
from tornado.web import RequestHandler, Application, asynchronous, url
+
class SimpleHTTPClientCommonTestCase(HTTPClientCommonTestCase):
def get_http_client(self):
client = SimpleAsyncHTTPClient(io_loop=self.io_loop,
# try to run it again.
del HTTPClientCommonTestCase
+
class TriggerHandler(RequestHandler):
def initialize(self, queue, wake_callback):
self.queue = queue
if self.get_argument("wake", "true") == "true":
self.wake_callback()
+
class HangHandler(RequestHandler):
@asynchronous
def get(self):
pass
+
class ContentLengthHandler(RequestHandler):
def get(self):
self.set_header("Content-Length", self.get_argument("value"))
self.write("ok")
+
class HeadHandler(RequestHandler):
def head(self):
self.set_header("Content-Length", "7")
+
class NoContentHandler(RequestHandler):
def get(self):
if self.get_argument("error", None):
self.set_header("Content-Length", "7")
self.set_status(204)
+
class SeeOther303PostHandler(RequestHandler):
def post(self):
assert self.request.body == b("blah")
self.set_header("Location", "/303_get")
self.set_status(303)
+
class SeeOther303GetHandler(RequestHandler):
def get(self):
assert not self.request.body
import logging
import unittest
+
class TestRequestHandler(RequestHandler):
def __init__(self, app, request, io_loop):
super(TestRequestHandler, self).__init__(app, request)
else:
return 'unexpected failure'
+
class HTTPStackContextTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([('/', TestRequestHandler,
self.response = response
self.stop()
+
class StackContextTest(AsyncTestCase, LogTrapTestCase):
def setUp(self):
super(StackContextTest, self).setUp()
with StackContext(functools.partial(self.context, 'library')):
self.io_loop.add_callback(
functools.partial(library_inner_callback, callback))
+
def library_inner_callback(callback):
self.assertEqual(self.active_contexts[-2:],
['application', 'library'])
callback()
+
def final_callback():
# implementation detail: the full context stack at this point
# is ['application', 'library', 'application']. The 'library'
from tornado.testing import LogTrapTestCase
from tornado.util import b, bytes_type, ObjectDict
+
class TemplateTest(LogTrapTestCase):
def test_simple(self):
template = Template("Hello {{ name }}!")
self.assertEqual(template.generate(), utf8(u"\u00e9"))
def test_custom_namespace(self):
- loader = DictLoader({"test.html": "{{ inc(5) }}"}, namespace={"inc": lambda x: x+1})
+ loader = DictLoader({"test.html": "{{ inc(5) }}"}, namespace={"inc": lambda x: x + 1})
self.assertEqual(loader.load("test.html").generate(), b("6"))
def test_apply(self):
- def upper(s): return s.upper()
+ def upper(s):
+ return s.upper()
template = Template(utf8("{% apply upper %}foo{% end %}"))
self.assertEqual(template.generate(upper=upper), b("FOO"))
template = Template(utf8("{% comment blah blah %}foo"))
self.assertEqual(template.generate(), b("foo"))
+
class StackTraceTest(LogTrapTestCase):
def test_error_line_number_expression(self):
loader = DictLoader({"test.html": """one
exc_stack = traceback.format_exc()
self.assertTrue("# base.html:1" in exc_stack)
-
def test_error_line_number_extends_sub_error(self):
loader = DictLoader({
"base.html": "{% block 'block' %}{% end %}",
expr: {{ name }}
raw: {% raw name %}""",
}
-
+
def test_default_off(self):
loader = DictLoader(self.templates, autoescape=None)
name = "Bobby <table>s"
b("escaped: Bobby <table>s\n"
"unescaped: Bobby <table>s\n"
"default: Bobby <table>s\n"))
-
+
def test_default_on(self):
loader = DictLoader(self.templates, autoescape="xhtml_escape")
name = "Bobby <table>s"
b("Bobby <table>s"))
self.assertEqual(loader.load("default.html").generate(name=name),
b("Bobby <table>s"))
-
+
self.assertEqual(loader.load("include.html").generate(name=name),
b("escaped: Bobby <table>s\n"
"unescaped: Bobby <table>s\n"
def test_extended_block(self):
loader = DictLoader(self.templates)
- def render(name): return loader.load(name).generate(name="<script>")
+
+ def render(name):
+ return loader.load(name).generate(name="<script>")
self.assertEqual(render("escaped_extends_unescaped.html"),
b("base: <script>"))
self.assertEqual(render("escaped_overrides_unescaped.html"),
def test_raw_expression(self):
loader = DictLoader(self.templates)
- def render(name): return loader.load(name).generate(name='<>&"')
+
+ def render(name):
+ return loader.load(name).generate(name='<>&"')
self.assertEqual(render("raw_expression.html"),
b("expr: <>&"\n"
"raw: <>&\""))
def test_custom_escape(self):
- loader = DictLoader({"foo.py":
+ loader = DictLoader({"foo.py":
"{% autoescape py_escape %}s = {{ name }}\n"})
+
def py_escape(s):
self.assertEqual(type(s), bytes_type)
return repr(native_str(s))
+
def render(template, name):
- return loader.load(template).generate(py_escape=py_escape,
+ return loader.load(template).generate(py_escape=py_escape,
name=name)
self.assertEqual(render("foo.py", "<html>"),
b("s = '<html>'\n"))
import unittest
from tornado.testing import AsyncTestCase, LogTrapTestCase
+
class AsyncTestCaseTest(AsyncTestCase, LogTrapTestCase):
def test_exception_in_callback(self):
- self.io_loop.add_callback(lambda: 1/0)
+ self.io_loop.add_callback(lambda: 1 / 0)
try:
self.wait()
self.fail("did not get expected exception")
except ZeroDivisionError:
pass
+
class SetUpTearDownTest(unittest.TestCase):
def test_set_up_tear_down(self):
"""
fcntl = None
twisted = None
IReadDescriptor = IWriteDescriptor = None
- def implements(f): pass
+
+ def implements(f):
+ pass
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
from tornado.util import import_object
from tornado.web import RequestHandler, Application
+
class ReactorTestCase(unittest.TestCase):
def setUp(self):
self._io_loop = IOLoop()
def tearDown(self):
self._io_loop.close(all_fds=True)
+
class ReactorWhenRunningTest(ReactorTestCase):
def test_whenRunning(self):
self._whenRunningCalled = False
def anotherWhenRunningCallback(self):
self._anotherWhenRunningCalled = True
+
class ReactorCallLaterTest(ReactorTestCase):
def test_callLater(self):
self._laterCalled = False
self._called = self._reactor.seconds()
self._reactor.stop()
+
class ReactorTwoCallLaterTest(ReactorTestCase):
def test_callLater(self):
self._later1Called = False
self._called2 = self._reactor.seconds()
self._reactor.stop()
+
class ReactorCallFromThreadTest(ReactorTestCase):
def setUp(self):
super(ReactorCallFromThreadTest, self).setUp()
self._reactor.callWhenRunning(self._whenRunningCallback)
self._reactor.run()
+
class ReactorCallInThread(ReactorTestCase):
def setUp(self):
super(ReactorCallInThread, self).setUp()
self._reactor.callWhenRunning(self._whenRunningCallback)
self._reactor.run()
+
class Reader:
implements(IReadDescriptor)
self._fd = fd
self._callback = callback
- def logPrefix(self): return "Reader"
+ def logPrefix(self):
+ return "Reader"
def close(self):
self._fd.close()
def doRead(self):
self._callback(self._fd)
+
class Writer:
implements(IWriteDescriptor)
self._fd = fd
self._callback = callback
- def logPrefix(self): return "Writer"
+ def logPrefix(self):
+ return "Writer"
def close(self):
self._fd.close()
def doWrite(self):
self._callback(self._fd)
+
class ReactorReaderWriterTest(ReactorTestCase):
def _set_nonblocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
reads it, check the value and ends the test.
"""
self.shouldWrite = True
+
def checkReadInput(fd):
self.assertEquals(fd.read(), 'x')
self._reactor.stop()
+
def writeOnce(fd):
if self.shouldWrite:
self.shouldWrite = False
# Test various combinations of twisted and tornado http servers,
# http clients, and event loop interfaces.
+
+
class CompatibilityTests(unittest.TestCase):
def setUp(self):
self.io_loop = IOLoop()
def start_twisted_server(self):
class HelloResource(Resource):
isLeaf = True
+
def render_GET(self, request):
return "Hello from twisted!"
site = Site(HelloResource())
def tornado_fetch(self, url, runner):
responses = []
client = AsyncHTTPClient(self.io_loop)
+
def callback(response):
responses.append(response)
self.stop_loop()
chunks = []
client = Agent(self.reactor)
d = client.request('GET', url)
+
class Accumulator(Protocol):
def __init__(self, finished):
self.finished = finished
+
def dataReceived(self, data):
chunks.append(data)
+
def connectionLost(self, reason):
self.finished.callback(None)
+
def callback(response):
finished = Deferred()
response.deliverBody(Accumulator(finished))
return finished
d.addCallback(callback)
+
def shutdown(ignored):
self.stop_loop()
d.addBoth(shutdown)
# The test_func may be defined in a mixin, so clobber
# it instead of delattr()
setattr(test_class, test_func, lambda self: None)
+
def make_test_subclass(test_class):
class TornadoTest(test_class):
_reactors = ["tornado.platform.twisted._TestReactor"]
+
def unbuildReactor(self, reactor):
test_class.unbuildReactor(self, reactor)
# Clean up file descriptors (especially epoll/kqueue
import socket
import sys
+
class CookieTestRequestHandler(RequestHandler):
# stub out enough methods to make the secure_cookie functions work
def __init__(self):
def set_cookie(self, name, value, expires_days=None):
self._cookies[name] = value
+
class SecureCookieTest(LogTrapTestCase):
def test_round_trip(self):
handler = CookieTestRequestHandler()
# it gets rejected
assert handler.get_secure_cookie('foo') is None
+
class CookieTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
class SetCookieHandler(RequestHandler):
self.set_cookie("semicolon", "a;b")
self.set_cookie("quote", 'a"b')
-
return Application([
("/set", SetCookieHandler),
("/get", GetCookieHandler),
response = self.fetch("/get", headers={"Cookie": header})
self.assertEqual(response.body, utf8(expected))
+
class AuthRedirectRequestHandler(RequestHandler):
def initialize(self, login_url):
self.login_url = login_url
# we'll never actually get here because the test doesn't follow redirects
self.send_error(500)
+
class AuthRedirectTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([('/relative', AuthRedirectRequestHandler,
def on_connection_close(self):
self.test.on_connection_close()
+
class ConnectionCloseTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([('/', ConnectionCloseHandler, dict(test=self))])
logging.info('connection closed')
self.stop()
+
class EchoHandler(RequestHandler):
def get(self, path):
# Type checks: web.py interfaces convert argument values to
self.write(dict(path=path,
args=recursive_unicode(self.request.arguments)))
+
class RequestEncodingTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return Application([("/(.*)", EchoHandler)])
def test_path_encoding(self):
# Path components and query arguments should be decoded the same way
self.assertEqual(json_decode(self.fetch('/%C3%A9?arg=%C3%A9').body),
- {u"path":u"\u00e9",
+ {u"path": u"\u00e9",
u"args": {u"arg": [u"\u00e9"]}})
+
class TypeCheckHandler(RequestHandler):
def prepare(self):
self.errors = {}
self.errors[name] = "expected %s, got %s" % (expected_type,
actual_type)
+
class DecodeArgHandler(RequestHandler):
def decode_argument(self, value, name=None):
assert type(value) == bytes_type, repr(value)
'query': describe(self.get_argument("foo")),
})
+
class LinkifyHandler(RequestHandler):
def get(self):
self.render("linkify.html", message="http://example.com")
+
class UIModuleResourceHandler(RequestHandler):
def get(self):
- self.render("page.html", entries=[1,2])
+ self.render("page.html", entries=[1, 2])
+
class OptionalPathHandler(RequestHandler):
def get(self, path):
self.write({"path": path})
+
class FlowControlHandler(RequestHandler):
# These writes are too small to demonstrate real flow control,
# but at least it shows that the callbacks get run.
self.write("3")
self.finish()
+
class MultiHeaderHandler(RequestHandler):
def get(self):
self.set_header("x-overwrite", "1")
self.add_header("x-multi", 3)
self.add_header("x-multi", "4")
+
class RedirectHandler(RequestHandler):
def get(self):
if self.get_argument('permanent', None) is not None:
def get(self):
if self.get_argument("status", None):
raise HTTPError(int(self.get_argument("status")))
- 1/0
+ 1 / 0
class WriteErrorHandler(RequestHandler):
def get(self):
if self.get_argument("status", None):
self.send_error(int(self.get_argument("status")))
else:
- 1/0
+ 1 / 0
def write_error(self, status_code, **kwargs):
self.set_header("Content-Type", "text/plain")
if self.get_argument("status", None):
self.send_error(int(self.get_argument("status")))
else:
- 1/0
+ 1 / 0
def get_error_html(self, status_code, **kwargs):
self.set_header("Content-Type", "text/plain")
class FailedWriteErrorHandler(RequestHandler):
def get(self):
- 1/0
+ 1 / 0
def write_error(self, status_code, **kwargs):
raise Exception("exception in write_error")
-
return Application([
url("/default", DefaultHandler),
url("/write_error", WriteErrorHandler),
self.assertEqual(response.code, 500)
self.assertEqual(b(""), response.body)
+
class StaticFileTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
class StaticUrlHandler(RequestHandler):
class AbsoluteStaticUrlHandler(RequestHandler):
include_host = True
+
def get(self, path):
self.write(self.static_url(path))
response = self.fetch(path % int(include_host))
self.assertEqual(response.body, utf8(str(True)))
+
class CustomStaticFileTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
class MyStaticFileHandler(StaticFileHandler):
from tornado.web import RequestHandler
from tornado.wsgi import WSGIApplication, WSGIContainer
+
class WSGIContainerTest(AsyncHTTPTestCase, LogTrapTestCase):
def wsgi_app(self, environ, start_response):
status = "200 OK"
response = self.fetch("/")
self.assertEqual(response.body, b("Hello world!"))
+
class WSGIApplicationTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
class HelloHandler(RequestHandler):
# repeated disassembly and reassembly.
from tornado.test.httpserver_test import HTTPConnectionTest
+
class WSGIConnectionTest(HTTPConnectionTest):
def get_app(self):
return WSGIContainer(validator(WSGIApplication(self.get_handlers())))
import unittest
_next_port = 10000
+
+
def get_unused_port():
"""Returns a (hopefully) unused port number."""
global _next_port
_next_port = _next_port + 1
return port
+
class AsyncTestCase(unittest.TestCase):
"""TestCase subclass for testing IOLoop-based asynchronous code.
self.http_client.close()
super(AsyncHTTPTestCase, self).tearDown()
+
class LogTrapTestCase(unittest.TestCase):
"""A test case that captures and discards all logging output
if the test passes.
finally:
handler.stream = old_stream
+
def main():
"""A simple test runner.
from __future__ import absolute_import, division, with_statement
+
class ObjectDict(dict):
"""Makes a dictionary behave like an object."""
def __getattr__(self, name):
return s
bytes_type = str
+
def doctests():
import doctest
return doctest.DocTestSuite()
except ImportError:
from cStringIO import StringIO as BytesIO # python 2
+
class RequestHandler(object):
"""Subclass this class and define get() or post() to make a handler.
"""Resets all headers and content for this response."""
# The performance cost of tornado.httputil.HTTPHeaders is significant
# (slowing down a benchmark with a trivial handler by more than 10%),
- # and its case-normalization is not generally necessary for
+ # and its case-normalization is not generally necessary for
# headers we generate on the server side, so use a plain dict
# and list instead.
self._headers = {
raise ValueError("Unsafe header value %r", value)
return value
-
_ARG_DEFAULT = []
+
def get_argument(self, name, default=_ARG_DEFAULT, strip=True):
"""Returns the value of the argument with the given name.
if path:
new_cookie[name]["path"] = path
for k, v in kwargs.iteritems():
- if k == 'max_age': k = 'max-age'
+ if k == 'max_age':
+ k = 'max-age'
new_cookie[name][k] = v
def clear_cookie(self, name, path="/", domain=None):
def get_secure_cookie(self, name, value=None, max_age_days=31):
"""Returns the given signed cookie if it validates, or None."""
self.require_setting("cookie_secret", "secure cookies")
- if value is None: value = self.get_cookie(name)
+ if value is None:
+ value = self.get_cookie(name)
return decode_signed_value(self.application.settings["cookie_secret"],
name, value, max_age_days=max_age_days)
html_bodies = []
for module in getattr(self, "_active_modules", {}).itervalues():
embed_part = module.embedded_javascript()
- if embed_part: js_embed.append(utf8(embed_part))
+ if embed_part:
+ js_embed.append(utf8(embed_part))
file_part = module.javascript_files()
if file_part:
if isinstance(file_part, (unicode, bytes_type)):
else:
js_files.extend(file_part)
embed_part = module.embedded_css()
- if embed_part: css_embed.append(utf8(embed_part))
+ if embed_part:
+ css_embed.append(utf8(embed_part))
file_part = module.css_files()
if file_part:
if isinstance(file_part, (unicode, bytes_type)):
else:
css_files.extend(file_part)
head_part = module.html_head()
- if head_part: html_heads.append(utf8(head_part))
+ if head_part:
+ html_heads.append(utf8(head_part))
body_part = module.html_body()
- if body_part: html_bodies.append(utf8(body_part))
+ if body_part:
+ html_bodies.append(utf8(body_part))
+
def is_absolute(path):
return any(path.startswith(x) for x in ["/", "http:", "https:"])
if js_files:
kwargs["autoescape"] = settings["autoescape"]
return template.Loader(template_path, **kwargs)
-
def flush(self, include_footers=False, callback=None):
"""Flushes the current output buffer to the network.
-
+
The ``callback`` argument, if given, can be used for flow control:
it will be run when all flushed data has been written to the socket.
Note that only one flush callback can be outstanding at a time;
# Ignore the chunk and only write the headers for HEAD requests
if self.request.method == "HEAD":
- if headers: self.request.write(headers, callback=callback)
+ if headers:
+ self.request.write(headers, callback=callback)
return
if headers or chunk:
"by using async operations without the "
"@asynchronous decorator.")
- if chunk is not None: self.write(chunk)
+ if chunk is not None:
+ self.write(chunk)
# Automatically support ETags and add the Content-Length header if
# we have not flushed any content yet.
self.write(line)
self.finish()
else:
- self.finish("<html><title>%(code)d: %(message)s</title>"
+ self.finish("<html><title>%(code)d: %(message)s</title>"
"<body>%(code)d: %(message)s</body></html>" % {
"code": status_code,
"message": httplib.responses[status_code],
return None
if args or kwargs:
callback = functools.partial(callback, *args, **kwargs)
+
def wrapper(*args, **kwargs):
try:
return callback(*args, **kwargs)
if not self._finished:
args = [self.decode_argument(arg) for arg in args]
kwargs = dict((k, self.decode_argument(v, name=k))
- for (k,v) in kwargs.iteritems())
+ for (k, v) in kwargs.iteritems())
getattr(self, self.request.method.lower())(*args, **kwargs)
if self._auto_finish and not self._finished:
self.finish()
lines = [utf8(self.request.version + " " +
str(self._status_code) +
" " + httplib.responses[self._status_code])]
- lines.extend([(utf8(n) + b(": ") + utf8(v)) for n, v in
+ lines.extend([(utf8(n) + b(": ") + utf8(v)) for n, v in
itertools.chain(self._headers.iteritems(), self._list_headers)])
for cookie_dict in getattr(self, "_new_cookies", []):
for cookie in cookie_dict.values():
if self.request.method in ("GET", "HEAD"):
uri = self.request.path.rstrip("/")
if uri: # don't try to redirect '/' to ''
- if self.request.query: uri += "?" + self.request.query
+ if self.request.query:
+ uri += "?" + self.request.query
self.redirect(uri)
return
else:
if not self.request.path.endswith("/"):
if self.request.method in ("GET", "HEAD"):
uri = self.request.path + "/"
- if self.request.query: uri += "?" + self.request.query
+ if self.request.query:
+ uri += "?" + self.request.query
self.redirect(uri)
return
raise HTTPError(404)
r"/(favicon\.ico)", r"/(robots\.txt)"]:
handlers.insert(0, (pattern, static_handler_class,
static_handler_args))
- if handlers: self.add_handlers(".*$", handlers)
+ if handlers:
+ self.add_handlers(".*$", handlers)
# Automatically reload modified modules
if self.settings.get("debug") and not wsgi:
self._load_ui_methods(dict((n, getattr(methods, n))
for n in dir(methods)))
elif isinstance(methods, list):
- for m in methods: self._load_ui_methods(m)
+ for m in methods:
+ self._load_ui_methods(m)
else:
for name, fn in methods.iteritems():
if not name.startswith("_") and hasattr(fn, "__call__") \
self._load_ui_modules(dict((n, getattr(modules, n))
for n in dir(modules)))
elif isinstance(modules, list):
- for m in modules: self._load_ui_modules(m)
+ for m in modules:
+ self._load_ui_modules(m)
else:
assert isinstance(modules, dict)
for name, cls in modules.iteritems():
# None-safe wrapper around url_unescape to handle
# unmatched optional groups correctly
def unquote(s):
- if s is None: return s
+ if s is None:
+ return s
return escape.url_unescape(s, encoding=None)
# Pass matched groups to the handler. Since
# match.groups() includes both named and unnamed groups,
handler._request_summary(), request_time)
-
class HTTPError(Exception):
"""An exception that will turn into an HTTP error response."""
def __init__(self, status_code, log_message=None, *args):
/static/images/myimage.png?v=xxx. Override ``get_cache_time`` method for
more fine-grained cache control.
"""
- CACHE_MAX_AGE = 86400*365*10 #10 years
+ CACHE_MAX_AGE = 86400 * 365 * 10 # 10 years
_static_hashes = {}
_lock = threading.Lock() # protects _static_hashes
This method may be overridden in subclasses (but note that it is
a class method rather than an instance method).
-
+
``settings`` is the `Application.settings` dictionary. ``path``
is the static path being requested. The url returned should be
relative to the current host.
See http://www.w3.org/Protocols/rfc2616/rfc2616-sec14.html#sec14.11
"""
CONTENT_TYPES = set([
- "text/plain", "text/html", "text/css", "text/xml", "application/javascript",
+ "text/plain", "text/html", "text/css", "text/xml", "application/javascript",
"application/x-javascript", "application/xml", "application/atom+xml",
"text/javascript", "application/json", "application/xhtml+xml"])
MIN_LENGTH = 5
"""Renders a template and returns it as a string."""
return self.handler.render_string(path, **kwargs)
+
class _linkify(UIModule):
def render(self, text, **kwargs):
return escape.linkify(text, **kwargs)
+
class _xsrf_form_html(UIModule):
def render(self):
return self.handler.xsrf_form_html()
+
class TemplateModule(UIModule):
"""UIModule that simply renders the given template.
inside the template and give it keyword arguments corresponding to
the methods on UIModule: {{ set_resources(js_files=static_url("my.js")) }}
Note that these resources are output once per template file, not once
- per instantiation of the template, so they must not depend on
+ per instantiation of the template, so they must not depend on
any arguments to the template.
"""
def __init__(self, handler):
return "".join(self._get_resources("html_body"))
-
class URLSpec(object):
"""Specifies mappings between URLs and handlers."""
def __init__(self, pattern, handler_class, kwargs={}, name=None):
return False
result = 0
if type(a[0]) is int: # python3 byte strings
- for x, y in zip(a,b):
+ for x, y in zip(a, b):
result |= x ^ y
else: # python2
for x, y in zip(a, b):
result |= ord(x) ^ ord(y)
return result == 0
+
def create_signed_value(secret, name, value):
timestamp = utf8(str(int(time.time())))
value = base64.b64encode(utf8(value))
value = b("|").join([value, timestamp, signature])
return value
+
def decode_signed_value(secret, name, value, max_age_days=31):
- if not value: return None
+ if not value:
+ return None
parts = utf8(value).split(b("|"))
- if len(parts) != 3: return None
+ if len(parts) != 3:
+ return None
signature = _create_signature(secret, name, parts[0], parts[1])
if not _time_independent_equals(parts[2], signature):
logging.warning("Invalid cookie signature %r", value)
except Exception:
return None
+
def _create_signature(secret, *parts):
hash = hmac.new(utf8(secret), digestmod=hashlib.sha1)
- for part in parts: hash.update(utf8(part))
+ for part in parts:
+ hash.update(utf8(part))
return utf8(hash.hexdigest())
from tornado.util import bytes_type, b
+
class WebSocketHandler(tornado.web.RequestHandler):
"""Subclass this class to create a basic WebSocket handler.
may wish to override this if they are using an SSL proxy
that does not provide the X-Scheme header as understood
by HTTPServer.
-
+
Note that this is only used by the draft76 protocol.
"""
return "wss" if self.request.protocol == "https" else "ws"
"""
if args or kwargs:
callback = functools.partial(callback, *args, **kwargs)
+
def wrapper(*args, **kwargs):
try:
return callback(*args, **kwargs)
sha1 = hashlib.sha1()
sha1.update(tornado.escape.utf8(
self.request.headers.get("Sec-Websocket-Key")))
- sha1.update(b("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) # Magic value
+ sha1.update(b("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")) # Magic value
return tornado.escape.native_str(base64.b64encode(sha1.digest()))
def _accept_connection(self):
self.stream.read_bytes(8, self._on_frame_length_64)
def _on_frame_length_16(self, data):
- self._frame_length = struct.unpack("!H", data)[0];
- self.stream.read_bytes(4, self._on_masking_key);
+ self._frame_length = struct.unpack("!H", data)[0]
+ self.stream.read_bytes(4, self._on_masking_key)
def _on_frame_length_64(self, data):
- self._frame_length = struct.unpack("!Q", data)[0];
- self.stream.read_bytes(4, self._on_masking_key);
+ self._frame_length = struct.unpack("!Q", data)[0]
+ self.stream.read_bytes(4, self._on_masking_key)
def _on_masking_key(self, data):
self._frame_mask = array.array("B", data)
if not self.client_terminated:
self._receive_frame()
-
def _handle_message(self, opcode, data):
- if self.client_terminated: return
+ if self.client_terminated:
+ return
if opcode == 0x1:
# UTF-8 data
between Tornado and other Python web frameworks and servers. This module
provides WSGI support in two ways:
-* `WSGIApplication` is a version of `tornado.web.Application` that can run
+* `WSGIApplication` is a version of `tornado.web.Application` that can run
inside a WSGI server. This is useful for running a Tornado app on another
HTTP server, such as Google App Engine. See the `WSGIApplication` class
documentation for limitations that apply.
except ImportError:
from cStringIO import StringIO as BytesIO # python 2
+
class WSGIApplication(web.Application):
"""A WSGI equivalent of `tornado.web.Application`.
Since no asynchronous methods are available for WSGI applications, the
httpclient and auth modules are both not available for WSGI applications.
We support the same interface, but handlers running in a WSGIApplication
- do not support flush() or asynchronous methods.
+ do not support flush() or asynchronous methods.
"""
def __init__(self, handlers=None, default_host="", **settings):
web.Application.__init__(self, handlers, default_host, transforms=[],
for cookie in cookie_dict.values():
headers.append(("Set-Cookie", cookie.OutputString(None)))
start_response(status,
- [(native_str(k), native_str(v)) for (k,v) in headers])
+ [(native_str(k), native_str(v)) for (k, v) in headers])
return handler._write_buffer
arguments = cgi.parse_qs(self.query)
for name, values in arguments.iteritems():
values = [v for v in values if v]
- if values: self.arguments[name] = values
+ if values:
+ self.arguments[name] = values
self.version = "HTTP/1.1"
self.headers = httputil.HTTPHeaders()
if environ.get("CONTENT_TYPE"):
self.arguments.setdefault(name, []).extend(values)
elif content_type.startswith("multipart/form-data"):
if 'boundary=' in content_type:
- boundary = content_type.split('boundary=',1)[1]
+ boundary = content_type.split('boundary=', 1)[1]
if boundary:
httputil.parse_multipart_form_data(
utf8(boundary), self.body, self.arguments, self.files)
def __call__(self, request):
data = {}
response = []
+
def start_response(status, response_headers, exc_info=None):
data["status"] = status
data["headers"] = response_headers
body = b("").join(response)
if hasattr(app_response, "close"):
app_response.close()
- if not data: raise Exception("WSGI app did not call start_response")
+ if not data:
+ raise Exception("WSGI app did not call start_response")
status_code = int(data["status"].split()[0])
headers = data["headers"]
- header_set = set(k.lower() for (k,v) in headers)
+ header_set = set(k.lower() for (k, v) in headers)
body = escape.utf8(body)
if "content-length" not in header_set:
headers.append(("Content-Length", str(len(body))))