# Other useful tools
Sphinx==1.1.3
-autopep8==0.8.1
+autopep8==0.8.5
coverage==3.5.2
-pep8==1.3.3
+pep8==1.4.1
pyflakes==0.5.0
tox==1.4.2
virtualenv==1.8.2
args = {
"openid.ns": "http://specs.openid.net/auth/2.0",
"openid.claimed_id":
- "http://specs.openid.net/auth/2.0/identifier_select",
+ "http://specs.openid.net/auth/2.0/identifier_select",
"openid.identity":
- "http://specs.openid.net/auth/2.0/identifier_select",
+ "http://specs.openid.net/auth/2.0/identifier_select",
"openid.return_to": url,
"openid.realm": urlparse.urljoin(url, '/'),
"openid.mode": "checkid_setup",
required += ["firstname", "fullname", "lastname"]
args.update({
"openid.ax.type.firstname":
- "http://axschema.org/namePerson/first",
+ "http://axschema.org/namePerson/first",
"openid.ax.type.fullname":
- "http://axschema.org/namePerson",
+ "http://axschema.org/namePerson",
"openid.ax.type.lastname":
- "http://axschema.org/namePerson/last",
+ "http://axschema.org/namePerson/last",
})
known_attrs = {
"email": "http://axschema.org/contact/email",
if oauth_scope:
args.update({
"openid.ns.oauth":
- "http://specs.openid.net/extensions/oauth/1.0",
+ "http://specs.openid.net/extensions/oauth/1.0",
"openid.oauth.consumer": self.request.host.split(":")[0],
"openid.oauth.scope": oauth_scope,
})
ax_ns = None
for name in self.request.arguments.keys():
if name.startswith("openid.ns.") and \
- self.get_argument(name) == u("http://openid.net/srv/ax/1.0"):
+ self.get_argument(name) == u("http://openid.net/srv/ax/1.0"):
ax_ns = name[10:]
break
self.async_callback(
self._on_request_token,
self._OAUTH_AUTHORIZE_URL,
- callback_uri))
+ callback_uri))
else:
http_client.fetch(
self._oauth_request_token_url(),
access_token = _oauth_parse_response(response.body)
self._oauth_get_user(access_token, self.async_callback(
- self._on_oauth_get_user, access_token, callback))
+ self._on_oauth_get_user, access_token, callback))
def _oauth_get_user(self, access_token, callback):
raise NotImplementedError()
args.update(parameters)
if getattr(self, "_OAUTH_VERSION", "1.0a") == "1.0a":
signature = _oauth10a_signature(consumer_token, method, url, args,
- access_token)
+ access_token)
else:
signature = _oauth_signature(consumer_token, method, url, args,
access_token)
process.
"""
args = {
- "redirect_uri": redirect_uri,
- "client_id": client_id
+ "redirect_uri": redirect_uri,
+ "client_id": client_id
}
if extra_params:
args.update(extra_params)
self.redirect(
- url_concat(self._OAUTH_AUTHORIZE_URL, args))
+ url_concat(self._OAUTH_AUTHORIZE_URL, args))
def _oauth_request_token_url(self, redirect_uri=None, client_id=None,
client_secret=None, code=None,
code=code,
client_id=client_id,
client_secret=client_secret,
- )
+ )
if extra_params:
args.update(extra_params)
return url_concat(url, args)
self._on_request_token, self._OAUTH_AUTHENTICATE_URL, None))
def twitter_request(self, path, callback, access_token=None,
- post_args=None, **args):
+ post_args=None, **args):
"""Fetches the given API path, e.g., "/statuses/user_timeline/btaylor"
The path should not include the format (we automatically append
oauth_ns = ""
for name, values in self.request.arguments.iteritems():
if name.startswith("openid.ns.") and \
- values[-1] == u("http://specs.openid.net/extensions/oauth/1.0"):
+ values[-1] == u("http://specs.openid.net/extensions/oauth/1.0"):
oauth_ns = name[10:]
break
token = self.get_argument("openid." + oauth_ns + ".request_token", "")
_OAUTH_NO_CALLBACKS = False
def get_authenticated_user(self, redirect_uri, client_id, client_secret,
- code, callback, extra_fields=None):
+ code, callback, extra_fields=None):
"""Handles the login for the Facebook user, returning a user object.
Example usage::
"""
http = self.get_auth_http_client()
args = {
- "redirect_uri": redirect_uri,
- "code": code,
- "client_id": client_id,
- "client_secret": client_secret,
+ "redirect_uri": redirect_uri,
+ "code": code,
+ "client_id": client_id,
+ "client_secret": client_secret,
}
fields = set(['id', 'name', 'first_name', 'last_name',
fields.update(extra_fields)
http.fetch(self._oauth_request_token_url(**args),
- self.async_callback(self._on_access_token, redirect_uri, client_id,
- client_secret, callback, fields))
+ self.async_callback(self._on_access_token, redirect_uri, client_id,
+ client_secret, callback, fields))
def _on_access_token(self, redirect_uri, client_id, client_secret,
- callback, fields, response):
+ callback, fields, response):
if response.error:
gen_log.warning('Facebook auth error: %s' % str(response))
callback(None)
self._on_get_user_info, callback, session, fields),
access_token=session["access_token"],
fields=",".join(fields)
- )
+ )
def _on_get_user_info(self, callback, session, fields, user):
if user is None:
callback(fieldmap)
def facebook_request(self, path, callback, access_token=None,
- post_args=None, **args):
+ post_args=None, **args):
"""Fetches the given relative API path, e.g., "/btaylor/picture"
If the request is a POST, post_args should be provided. Query
_reload_attempted = False
_io_loops = weakref.WeakKeyDictionary()
+
def start(io_loop=None, check_time=500):
"""Restarts the process automatically when a module is modified.
# to ensure that the new process sees the same path we did.
path_prefix = '.' + os.pathsep
if (sys.path[0] == '' and
- not os.environ.get("PYTHONPATH", "").startswith(path_prefix)):
+ not os.environ.get("PYTHONPATH", "").startswith(path_prefix)):
os.environ["PYTHONPATH"] = (path_prefix +
os.environ.get("PYTHONPATH", ""))
if sys.platform == 'win32':
else:
Future = futures.Future
+
class DummyExecutor(object):
def submit(self, fn, *args, **kwargs):
future = Future()
dummy_executor = DummyExecutor()
+
def run_on_executor(fn):
@functools.wraps(fn)
def wrapper(self, *args, **kwargs):
return wrapper
# TODO: this needs a better name
+
+
def future_wrap(f):
@functools.wraps(f)
def wrapper(*args, **kwargs):
if kwargs.get('callback') is not None:
future.add_done_callback(kwargs.pop('callback'))
kwargs['callback'] = future.set_result
+
def handle_error(typ, value, tb):
future.set_exception(value)
return True
starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
total=curl.getinfo(pycurl.TOTAL_TIME),
redirect=curl.getinfo(pycurl.REDIRECT_TIME),
- )
+ )
try:
info["callback"](HTTPResponse(
request=info["request"], code=code, headers=info["headers"],
curl.setopt(pycurl.PROXYPORT, request.proxy_port)
if request.proxy_username:
credentials = '%s:%s' % (request.proxy_username,
- request.proxy_password)
+ request.proxy_password)
curl.setopt(pycurl.PROXYUSERPWD, credentials)
else:
curl.setopt(pycurl.PROXY, '')
# (no more slug, etc), so it really just provides a little
# extra indication of shortening.
url = url[:proto_len] + parts[0] + "/" + \
- parts[1][:8].split('?')[0].split('.')[0]
+ parts[1][:8].split('?')[0].split('.')[0]
if len(url) > max_len * 1.5: # still too long
url = url[:max_len]
def _async_clients(cls):
attr_name = '_async_client_dict_' + cls.__name__
if not hasattr(cls, attr_name):
- setattr(cls, attr_name, weakref.WeakKeyDictionary())
+ setattr(cls, attr_name, weakref.WeakKeyDictionary())
return getattr(cls, attr_name)
def __new__(cls, io_loop=None, force_instance=False, **kwargs):
self.response = response
Exception.__init__(self, "HTTP %d: %s" % (self.code, message))
+
class _RequestProxy(object):
"""Combines an object with a dictionary of defaults.
except ImportError:
import http.cookies as Cookie # py3
+
class HTTPServer(TCPServer):
r"""A non-blocking, single-threaded HTTP server.
# HTTPRequest wants an IP, not a full socket address
if getattr(self.stream.socket, 'family', socket.AF_INET) in (
- socket.AF_INET, socket.AF_INET6):
+ socket.AF_INET, socket.AF_INET6):
# Jython 2.5.2 doesn't have the socket.family attribute,
# so just assume IP in that case.
remote_ip = self.address[0]
except ImportError:
from urllib.parse import urlencode # py3
+
class HTTPHeaders(dict):
"""A dictionary that maintains Http-Header-Case for all keys.
self._as_list = {}
self._last_key = None
if (len(args) == 1 and len(kwargs) == 0 and
- isinstance(args[0], HTTPHeaders)):
+ isinstance(args[0], HTTPHeaders)):
# Copy constructor
for k, v in args[0].get_all():
self.add(k, v)
For use with set_blocking_signal_threshold.
"""
gen_log.warning('IOLoop blocked for %f seconds in\n%s',
- self._blocking_signal_threshold,
- ''.join(traceback.format_stack(frame)))
+ self._blocking_signal_threshold,
+ ''.join(traceback.format_stack(frame)))
def start(self):
"""Starts the I/O loop.
_FUTURE_TYPES = (futures.Future, DummyFuture)
else:
_FUTURE_TYPES = DummyFuture
+
def add_future(self, future, callback):
"""Schedules a callback on the IOLoop when the given future is finished.
app_log.error("Exception in callback %r", callback, exc_info=True)
-
class PollIOLoop(IOLoop):
"""Base class for IOLoops built around a select-like function.
def set_blocking_signal_threshold(self, seconds, action):
if not hasattr(signal, "setitimer"):
gen_log.error("set_blocking_signal_threshold requires a signal module "
- "with the setitimer method")
+ "with the setitimer method")
return
self._blocking_signal_threshold = seconds
if seconds is not None:
raise RuntimeError("IOLoop is closing")
list_empty = not self._callbacks
self._callbacks.append(functools.partial(
- stack_context.wrap(callback), *args, **kwargs))
+ stack_context.wrap(callback), *args, **kwargs))
if list_empty and thread.get_ident() != self._thread_ident:
# If we're in the IOLoop's thread, we know it's not currently
# polling. If we're not, and we added the first callback to an
# either the old or new version of self._callbacks,
# but either way will work.
self._callbacks.append(functools.partial(
- stack_context.wrap(callback), *args, **kwargs))
+ stack_context.wrap(callback), *args, **kwargs))
class _Timeout(object):
except ImportError:
_set_nonblocking = None
+
class StreamClosedError(IOError):
pass
+
class BaseIOStream(object):
"""A utility class to write to and read from a non-blocking file or socket.
def _maybe_run_close_callback(self):
if (self.closed() and self._close_callback and
- self._pending_callbacks == 0):
+ self._pending_callbacks == 0):
# if there are pending callbacks, don't run the close callback
# until they're done (see _maybe_add_error_handler)
cb = self._close_callback
return None
return chunk
+
class PipeIOStream(BaseIOStream):
"""Pipe-based IOStream implementation.
if days == 0:
format = _("%(time)s")
elif days == 1 and local_date.day == local_yesterday.day and \
- relative:
+ relative:
format = _("yesterday") if shorter else \
- _("yesterday at %(time)s")
+ _("yesterday at %(time)s")
elif days < 5:
format = _("%(weekday)s") if shorter else \
- _("%(weekday)s at %(time)s")
+ _("%(weekday)s at %(time)s")
elif days < 334: # 11mo, since confusing for same month last year
format = _("%(month_name)s %(day)s") if shorter else \
- _("%(month_name)s %(day)s at %(time)s")
+ _("%(month_name)s %(day)s at %(time)s")
if format is None:
format = _("%(month_name)s %(day)s, %(year)s") if shorter else \
- _("%(month_name)s %(day)s, %(year)s at %(time)s")
+ _("%(month_name)s %(day)s, %(year)s at %(time)s")
tfhour_clock = self.code not in ("en", "en_US", "zh_CN")
if tfhour_clock:
app_log = logging.getLogger("tornado.application")
gen_log = logging.getLogger("tornado.general")
+
def _stderr_supports_color():
color = False
if curses and sys.stderr.isatty():
fg_color = unicode_type(fg_color, "ascii")
self._colors = {
logging.DEBUG: unicode_type(curses.tparm(fg_color, 4), # Blue
- "ascii"),
+ "ascii"),
logging.INFO: unicode_type(curses.tparm(fg_color, 2), # Green
- "ascii"),
+ "ascii"),
logging.WARNING: unicode_type(curses.tparm(fg_color, 3), # Yellow
- "ascii"),
+ "ascii"),
logging.ERROR: unicode_type(curses.tparm(fg_color, 1), # Red
- "ascii"),
+ "ascii"),
}
self._normal = unicode_type(curses.tigetstr("sgr0"), "ascii")
formatted = formatted.rstrip() + "\n" + record.exc_text
return formatted.replace("\n", "\n ")
+
def enable_pretty_logging(options=None, logger=None):
"""Turns on formatted logging output as configured.
logger.addHandler(channel)
if (options.log_to_stderr or
- (options.log_to_stderr is None and not logger.handlers)):
+ (options.log_to_stderr is None and not logger.handlers)):
# Set up color if we are in a tty and curses is installed
channel = logging.StreamHandler()
channel.setFormatter(LogFormatter())
# late import to prevent cycle
from tornado.options import options
options.define("logging", default="info",
- help=("Set the Python log level. If 'none', tornado won't touch the "
- "logging configuration."),
- metavar="debug|info|warning|error|none")
+ help=("Set the Python log level. If 'none', tornado won't touch the "
+ "logging configuration."),
+ metavar="debug|info|warning|error|none")
options.define("log_to_stderr", type=bool, default=None,
- help=("Send log output to stderr (colorized if possible). "
- "By default use stderr if --log_file_prefix is not set and "
- "no other logging is configured."))
+ help=("Send log output to stderr (colorized if possible). "
+ "By default use stderr if --log_file_prefix is not set and "
+ "no other logging is configured."))
options.define("log_file_prefix", type=str, default=None, metavar="PATH",
- help=("Path prefix for log files. "
- "Note that if you are running multiple tornado processes, "
- "log_file_prefix must be different for each of them (e.g. "
- "include the port number)"))
+ help=("Path prefix for log files. "
+ "Note that if you are running multiple tornado processes, "
+ "log_file_prefix must be different for each of them (e.g. "
+ "include the port number)"))
options.define("log_file_max_size", type=int, default=100 * 1000 * 1000,
- help="max size of log files before rollover")
+ help="max size of log files before rollover")
options.define("log_file_num_backups", type=int, default=10,
- help="number of log files to keep")
+ help="number of log files to keep")
options.add_parse_callback(enable_pretty_logging)
if not os.path.exists(self.ssl_options['certfile']):
raise ValueError('certfile "%s" does not exist' %
- self.ssl_options['certfile'])
+ self.ssl_options['certfile'])
if ('keyfile' in self.ssl_options and
not os.path.exists(self.ssl_options['keyfile'])):
raise ValueError('keyfile "%s" does not exist' %
- self.ssl_options['keyfile'])
+ self.ssl_options['keyfile'])
def listen(self, port, address=""):
"""Starts accepting connections on the given port.
if flags is None:
flags = socket.AI_PASSIVE
for res in set(socket.getaddrinfo(address, port, family, socket.SOCK_STREAM,
- 0, flags)):
+ 0, flags)):
af, socktype, proto, canonname, sockaddr = res
sock = socket.socket(af, socktype, proto)
set_close_exec(sock.fileno())
"""
return _Mockable(self)
+
class _Mockable(object):
"""`mock.patch` compatible wrapper for `OptionParser`.
def __delattr__(self, name):
setattr(self._options, name, self._originals.pop(name))
+
class _Option(object):
def __init__(self, name, default=None, type=basestring_type, help=None,
metavar=None, multiple=False, file_name=None, group_name=None,
"""
return options.print_help(file)
+
def add_parse_callback(callback):
"""Adds a parse callback, to be invoked when option parsing is done.
break # success
except socket.error as detail:
if (not hasattr(errno, 'WSAEADDRINUSE') or
- detail[0] != errno.WSAEADDRINUSE):
+ detail[0] != errno.WSAEADDRINUSE):
# "Address already in use" is the only error
# I've seen on two WinXP Pro SP2 boxes, under
# Pythons 2.3.5 and 2.4.1.
from tornado.ioloop import PollIOLoop
+
class EPollIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs)
assert hasattr(select, 'kqueue'), 'kqueue not supported'
+
class _KQueue(object):
"""A kqueue-based event loop for BSD/Mac systems."""
def __init__(self):
kevents = []
if events & IOLoop.WRITE:
kevents.append(select.kevent(
- fd, filter=select.KQ_FILTER_WRITE, flags=flags))
+ fd, filter=select.KQ_FILTER_WRITE, flags=flags))
if events & IOLoop.READ or not kevents:
# Always read when there is not a write
kevents.append(select.kevent(
- fd, filter=select.KQ_FILTER_READ, flags=flags))
+ fd, filter=select.KQ_FILTER_READ, flags=flags))
# Even though control() takes a list, it seems to return EINVAL
# on Mac OS X (10.6) when there is more than one event in the list.
for kevent in kevents:
from tornado.ioloop import IOLoop, PollIOLoop
+
class _Select(object):
"""A simple, select()-based IOLoop implementation for non-Linux systems"""
def __init__(self):
events[fd] = events.get(fd, 0) | IOLoop.ERROR
return events.items()
+
class SelectIOLoop(PollIOLoop):
def initialize(self, **kwargs):
super(SelectIOLoop, self).initialize(impl=_Select(), **kwargs)
-
with NullContext():
self._fds[fd] = (reader, None)
self._io_loop.add_handler(fd, self._invoke_callback,
- IOLoop.READ)
+ IOLoop.READ)
def addWriter(self, writer):
"""Add a FileDescriptor for notification of data available to write."""
with NullContext():
self._fds[fd] = (None, writer)
self._io_loop.add_handler(fd, self._invoke_callback,
- IOLoop.WRITE)
+ IOLoop.WRITE)
def removeReader(self, reader):
"""Remove a Selectable for notification of data available to read."""
installReactor(reactor)
return reactor
+
class _FD(object):
def __init__(self, fd, handler):
self.fd = fd
return ''
_FD = implementer(IReadDescriptor, IWriteDescriptor)(_FD)
+
class TwistedIOLoop(tornado.ioloop.IOLoop):
"""IOLoop implementation that runs on Twisted.
except NameError:
long = int # py3
+
def cpu_count():
"""Returns the number of processors on this machine."""
if multiprocessing is not None:
global _task_id
return _task_id
+
class Subprocess(object):
"""Wraps ``subprocess.Popen`` with IOStream support.
if self.queue:
gen_log.debug("max_clients limit reached, request queued. "
"%d active, %d queued requests." % (
- len(self.active), len(self.queue)))
+ len(self.active), len(self.queue)))
def _process_queue(self):
with stack_context.NullContext():
self.start_time + self.request.request_timeout,
stack_context.wrap(self._on_timeout))
if (self.request.validate_cert and
- isinstance(self.stream, SSLIOStream)):
+ isinstance(self.stream, SSLIOStream)):
match_hostname(self.stream.socket.getpeercert(),
# ipv6 addresses are broken (in
# self.parsed.hostname) until 2.7, here is
# __init__
self.parsed_hostname)
if (self.request.method not in self._SUPPORTED_METHODS and
- not self.request.allow_nonstandard_methods):
+ not self.request.allow_nonstandard_methods):
raise KeyError("unknown method %s" % self.request.method)
for key in ('network_interface',
'proxy_host', 'proxy_port',
assert self.request.body is None
if self.request.body is not None:
self.request.headers["Content-Length"] = str(len(
- self.request.body))
+ self.request.body))
if (self.request.method == "POST" and
- "Content-Type" not in self.request.headers):
+ "Content-Type" not in self.request.headers):
self.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
if self.request.use_gzip:
self.request.headers["Accept-Encoding"] = "gzip"
req_path = ((self.parsed.path or '/') +
- (('?' + self.parsed.query) if self.parsed.query else ''))
+ (('?' + self.parsed.query) if self.parsed.query else ''))
request_lines = [utf8("%s %s HTTP/1.1" % (self.request.method,
req_path))]
for k, v in self.request.headers.get_all():
if self.final_callback:
gen_log.warning("uncaught exception", exc_info=(typ, value, tb))
self._run_callback(HTTPResponse(self.request, 599, error=value,
- request_time=self.io_loop.time() - self.start_time,
- ))
+ request_time=self.io_loop.time() - self.start_time,
+ ))
if hasattr(self, "stream"):
self.stream.close()
# These response codes never have bodies
# http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
if ("Transfer-Encoding" in self.headers or
- content_length not in (None, 0)):
+ content_length not in (None, 0)):
raise ValueError("Response with code %d should not have body" %
self.code)
self._on_body(b"")
return
if (self.request.use_gzip and
- self.headers.get("Content-Encoding") == "gzip"):
+ self.headers.get("Content-Encoding") == "gzip"):
self._decompressor = GzipDecompressor()
if self.headers.get("Transfer-Encoding") == "chunked":
self.chunks = []
self.request)
if (self.request.follow_redirects and
self.request.max_redirects > 0 and
- self.code in (301, 302, 303, 307)):
+ self.code in (301, 302, 303, 307)):
assert isinstance(self.request, _RequestProxy)
new_request = copy.copy(self.request.request)
new_request.url = urlparse.urljoin(self.request.url,
self._on_body(b''.join(self.chunks))
else:
self.stream.read_bytes(length + 2, # chunk ends with \r\n
- self._on_chunk_data)
+ self._on_chunk_data)
def _on_chunk_data(self, data):
assert data[-2:] == b"\r\n"
dnsnames.append(value)
if len(dnsnames) > 1:
raise CertificateError("hostname %r "
- "doesn't match either of %s"
- % (hostname, ', '.join(map(repr, dnsnames))))
+ "doesn't match either of %s"
+ % (hostname, ', '.join(map(repr, dnsnames))))
elif len(dnsnames) == 1:
raise CertificateError("hostname %r "
- "doesn't match %r"
- % (hostname, dnsnames[0]))
+ "doesn't match %r"
+ % (hostname, dnsnames[0]))
else:
raise CertificateError("no appropriate commonName or "
- "subjectAltName fields were found")
+ "subjectAltName fields were found")
if __name__ == "__main__":
AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
def resolve_path(self, name, parent_path=None):
if parent_path and not parent_path.startswith("<") and \
- not parent_path.startswith("/") and \
- not name.startswith("/"):
+ not parent_path.startswith("/") and \
+ not name.startswith("/"):
current_path = os.path.join(self.root, parent_path)
file_dir = os.path.dirname(os.path.abspath(current_path))
relative_path = os.path.abspath(os.path.join(file_dir, name))
def resolve_path(self, name, parent_path=None):
if parent_path and not parent_path.startswith("<") and \
- not parent_path.startswith("/") and \
- not name.startswith("/"):
+ not parent_path.startswith("/") and \
+ not name.startswith("/"):
file_dir = posixpath.dirname(parent_path)
name = posixpath.normpath(posixpath.join(file_dir, name))
return name
# innermost ones. This is useful when generating languages
# like latex where curlies are also meaningful
if (curly + 2 < reader.remaining() and
- reader[curly + 1] == '{' and reader[curly + 2] == '{'):
+ reader[curly + 1] == '{' and reader[curly + 2] == '{'):
curly += 1
continue
break
if allowed_parents is not None:
if not in_block:
raise ParseError("%s outside %s block" %
- (operator, allowed_parents))
+ (operator, allowed_parents))
if in_block not in allowed_parents:
raise ParseError("%s block cannot be attached to %s block" % (operator, in_block))
body.chunks.append(_IntermediateControlBlock(contents, line))
def get(self):
self.write('oauth_token=hjkl&oauth_token_secret=vbnm&screen_name=foo')
+
class TwitterServerShowUserHandler(RequestHandler):
def get(self, screen_name):
self.write(dict(screen_name=screen_name, name=screen_name.capitalize()))
+
class AuthTest(AsyncHTTPTestCase):
def get_app(self):
return Application(
('/twitter/server/access_token', TwitterServerAccessTokenHandler),
(r'/twitter/api/users/show/(.*)\.json', TwitterServerShowUserHandler),
- ],
+ ],
http_client=self.http_client,
twitter_consumer_key='test_twitter_consumer_key',
twitter_consumer_secret='test_twitter_consumer_secret')
parsed = json_decode(response.body)
self.assertEqual(parsed,
{u('access_token'): {u('key'): u('hjkl'),
- u('screen_name'): u('foo'),
- u('secret'): u('vbnm')},
+ u('screen_name'): u('foo'),
+ u('secret'): u('vbnm')},
u('name'): u('Foo'),
u('screen_name'): u('foo'),
u('username'): u('foo')})
from tornado.netutil import TCPServer
from tornado.testing import AsyncTestCase, LogTrapTestCase, get_unused_port
+
class CapServer(TCPServer):
def handle_stream(self, stream, address):
logging.info("handle_stream")
@gen.engine
def f():
with self.assertRaisesRegexp(CapError, "already capitalized"):
- yield self.client.capitalize("HELLO")
+ yield self.client.capitalize("HELLO")
self.stop()
f()
self.wait()
return client
CurlHTTPClientCommonTestCase = unittest.skipIf(pycurl is None,
"pycurl module not present")(
- CurlHTTPClientCommonTestCase)
+ CurlHTTPClientCommonTestCase)
class CurlHTTPClientTestCase(AsyncHTTPTestCase):
def test_prepare_curl_callback_stack_context(self):
exc_info = []
+
def error_handler(typ, value, tb):
exc_info.append((typ, value, tb))
self.stop()
self.assertIs(exc_info[0][0], ZeroDivisionError)
CurlHTTPClientTestCase = unittest.skipIf(pycurl is None,
"pycurl module not present")(
- CurlHTTPClientTestCase)
+ CurlHTTPClientTestCase)
u('<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>')),
("www.external-link.com and www.internal-link.com/blogs extra",
- {"extra_params": lambda href:'class="internal"' if href.startswith("http://www.internal-link.com") else 'rel="nofollow" class="external"'},
+ {"extra_params": lambda href: 'class="internal"' if href.startswith("http://www.internal-link.com") else 'rel="nofollow" class="external"'},
u('<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a> and <a href="http://www.internal-link.com/blogs" class="internal">www.internal-link.com/blogs</a> extra')),
("www.external-link.com",
- {"extra_params": lambda href:' rel="nofollow" class="external" '},
+ {"extra_params": lambda href: ' rel="nofollow" class="external" '},
u('<a href="http://www.external-link.com" rel="nofollow" class="external">www.external-link.com</a>')),
]
("<>&\"", "<>&""),
("&", "&amp;"),
- ]
+ ]
for unescaped, escaped in tests:
self.assertEqual(utf8(xhtml_escape(unescaped)), utf8(escaped))
self.assertEqual(utf8(unescaped), utf8(xhtml_unescape(escaped)))
# unicode strings become utf8
(u('\u00e9'), '%C3%A9'),
- ]
+ ]
for unescaped, escaped in tests:
self.assertEqual(url_escape(unescaped), escaped)
('%C3%A9', u('\u00e9'), 'utf8'),
('%C3%A9', u('\u00c3\u00a9'), 'latin1'),
('%C3%A9', utf8(u('\u00e9')), None),
- ]
+ ]
for escaped, unescaped, encoding in tests:
# input strings to url_unescape should only contain ascii
# characters, but make sure the function accepts both byte
callback(arg)
else:
self.io_loop.add_callback(functools.partial(
- self.delay_callback, iterations - 1, callback, arg))
+ self.delay_callback, iterations - 1, callback, arg))
def test_no_yield(self):
@gen.engine
responses = yield [
gen.Task(self.delay_callback, 3, arg="v1"),
gen.Task(self.delay_callback, 1, arg="v2"),
- ]
+ ]
self.assertEqual(responses, ["v1", "v2"])
self.stop()
self.run_gen(f)
class GenWebTest(AsyncHTTPTestCase):
def get_app(self):
return Application([
- ('/sequence', GenSequenceHandler),
- ('/task', GenTaskHandler),
- ('/exception', GenExceptionHandler),
- ('/yield_exception', GenYieldExceptionHandler),
- ])
+ ('/sequence', GenSequenceHandler),
+ ('/task', GenTaskHandler),
+ ('/exception', GenExceptionHandler),
+ ('/yield_exception', GenYieldExceptionHandler),
+ ])
def test_sequence_handler(self):
response = self.fetch('/sequence')
url("/echopost", EchoPostHandler),
url("/user_agent", UserAgentHandler),
url("/304_with_content_length", ContentLength304Handler),
- ], gzip=True)
+ ], gzip=True)
def test_hello_world(self):
response = self.fetch("/hello")
def test_streaming_stack_context(self):
chunks = []
exc_info = []
+
def error_handler(typ, value, tb):
exc_info.append((typ, value, tb))
return True
def test_header_callback_stack_context(self):
exc_info = []
+
def error_handler(typ, value, tb):
exc_info.append((typ, value, tb))
return True
# and streaming_callback), as errors there must be seen as errors
# by the http client so it can clean up the connection.
exc_info = []
+
def handle_callback_exception(callback):
exc_info.append(sys.exc_info())
self.stop()
self.wait()
self.assertEqual(exc_info[0][0], ZeroDivisionError)
+
class RequestProxyTest(unittest.TestCase):
def test_request_set(self):
proxy = _RequestProxy(HTTPRequest('http://example.com/',
return ssl.PROTOCOL_SSLv3
SSLv3Test = skipIfNoSSL(skipIfOldSSL(SSLv3Test))
+
class TLSv1Test(BaseSSLTest, SSLTestMixin):
def get_ssl_version(self):
return ssl.PROTOCOL_TLSv1
existing_certificate = os.path.join(module_dir, 'test.crt')
self.assertRaises(ValueError, HTTPServer, application, ssl_options={
- "certfile": "/__mising__.crt",
- })
+ "certfile": "/__mising__.crt",
+ })
self.assertRaises(ValueError, HTTPServer, application, ssl_options={
- "certfile": existing_certificate,
- "keyfile": "/__missing__.key"
- })
+ "certfile": existing_certificate,
+ "keyfile": "/__missing__.key"
+ })
# This actually works because both files exist
HTTPServer(application, ssl_options={
- "certfile": existing_certificate,
- "keyfile": existing_certificate
- })
+ "certfile": existing_certificate,
+ "keyfile": existing_certificate
+ })
class MultipartTestHandler(RequestHandler):
1024 * 1024)
conn.set_request(
b"\r\n".join(headers +
- [utf8("Content-Length: %d\r\n" % len(body))]) +
+ [utf8("Content-Length: %d\r\n" % len(body))]) +
b"\r\n" + body)
response = self.wait()
client.close()
# Encodings here are tricky: Headers are latin1, bodies can be
# anything (we use utf8 by default).
response = self.raw_fetch([
- b"POST /multipart HTTP/1.0",
- b"Content-Type: multipart/form-data; boundary=1234567890",
- b"X-Header-encoding-test: \xe9",
- ],
- b"\r\n".join([
- b"Content-Disposition: form-data; name=argument",
- b"",
- u("\u00e1").encode("utf-8"),
- b"--1234567890",
- u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
- b"",
- u("\u00fa").encode("utf-8"),
- b"--1234567890--",
- b"",
- ]))
+ b"POST /multipart HTTP/1.0",
+ b"Content-Type: multipart/form-data; boundary=1234567890",
+ b"X-Header-encoding-test: \xe9",
+ ],
+ b"\r\n".join([
+ b"Content-Disposition: form-data; name=argument",
+ b"",
+ u("\u00e1").encode("utf-8"),
+ b"--1234567890",
+ u('Content-Disposition: form-data; name="files"; filename="\u00f3"').encode("utf8"),
+ b"",
+ u("\u00fa").encode("utf-8"),
+ b"--1234567890--",
+ b"",
+ ]))
data = json_decode(response.body)
self.assertEqual(u("\u00e9"), data["header"])
self.assertEqual(u("\u00e1"), data["argument"])
stream.connect(("localhost", self.get_http_port()), callback=self.stop)
self.wait()
stream.write(b"\r\n".join([b"POST /hello HTTP/1.1",
- b"Content-Length: 1024",
- b"Expect: 100-continue",
- b"Connection: close",
- b"\r\n"]), callback=self.stop)
+ b"Content-Length: 1024",
+ b"Expect: 100-continue",
+ b"Connection: close",
+ b"\r\n"]), callback=self.stop)
self.wait()
stream.read_until(b"\r\n\r\n", self.stop)
data = self.wait()
('host', str),
('path', str),
('query', str),
- ]
+ ]
for field, expected_type in fields:
self.check_type(field, getattr(self.request, field), expected_type)
self.fetch_json("/", headers=invalid_host)["remote_ip"],
"127.0.0.1")
+
class ManualProtocolTest(HandlerBaseTestCase):
class Handler(RequestHandler):
def get(self):
not hasattr(socket, 'AF_UNIX') or sys.platform == 'cygwin',
"unix sockets not supported on this platform")
+
class KeepAliveTest(AsyncHTTPTestCase):
"""Tests various scenarios for HTTP 1.1 keep-alive support.
def test_url_concat_no_query_params(self):
url = url_concat(
- "https://localhost/path",
- [('y', 'y'), ('z', 'z')],
- )
+ "https://localhost/path",
+ [('y', 'y'), ('z', 'z')],
+ )
self.assertEqual(url, "https://localhost/path?y=y&z=z")
def test_url_concat_encode_args(self):
url = url_concat(
- "https://localhost/path",
- [('y', '/y'), ('z', 'z')],
- )
+ "https://localhost/path",
+ [('y', '/y'), ('z', 'z')],
+ )
self.assertEqual(url, "https://localhost/path?y=%2Fy&z=z")
def test_url_concat_trailing_q(self):
url = url_concat(
- "https://localhost/path?",
- [('y', 'y'), ('z', 'z')],
- )
+ "https://localhost/path?",
+ [('y', 'y'), ('z', 'z')],
+ )
self.assertEqual(url, "https://localhost/path?y=y&z=z")
def test_url_concat_q_with_no_trailing_amp(self):
url = url_concat(
- "https://localhost/path?x",
- [('y', 'y'), ('z', 'z')],
- )
+ "https://localhost/path?x",
+ [('y', 'y'), ('z', 'z')],
+ )
self.assertEqual(url, "https://localhost/path?x&y=y&z=z")
def test_url_concat_trailing_amp(self):
url = url_concat(
- "https://localhost/path?x&",
- [('y', 'y'), ('z', 'z')],
- )
+ "https://localhost/path?x&",
+ [('y', 'y'), ('z', 'z')],
+ )
self.assertEqual(url, "https://localhost/path?x&y=y&z=z")
def test_url_concat_mult_params(self):
url = url_concat(
- "https://localhost/path?a=1&b=2",
- [('y', 'y'), ('z', 'z')],
- )
+ "https://localhost/path?a=1&b=2",
+ [('y', 'y'), ('z', 'z')],
+ )
self.assertEqual(url, "https://localhost/path?a=1&b=2&y=y&z=z")
def test_url_concat_no_params(self):
url = url_concat(
"https://localhost/path?r=1&t=2",
[],
- )
+ )
self.assertEqual(url, "https://localhost/path?r=1&t=2")
# Issue #635: add_callback() should raise a clean exception
# if called while another thread is closing the IOLoop.
closing = threading.Event()
+
def target():
other_ioloop.add_callback(other_ioloop.stop)
other_ioloop.start()
def test_add_future_stack_context(self):
ready = threading.Event()
+
def task():
# we must wait for the ioloop callback to be scheduled before
# the task completes to ensure that add_future adds the callback
ready.wait(1)
assert ready.isSet(), "timed out"
raise Exception("worker")
+
def callback(future):
self.future = future
raise Exception("callback")
+
def handle_exception(typ, value, traceback):
self.exception = value
self.stop()
skipIfNoSSL = unittest.skipIf(ssl is None, "ssl module not present")
+
class HelloHandler(RequestHandler):
def get(self):
self.write("Hello")
# pypy's gc defeats moves objects, breaking the
# "frozen write buffer" assumption.
if (isinstance(server, SSLIOStream) and
- platform.python_implementation() == 'PyPy'):
+ platform.python_implementation() == 'PyPy'):
raise unittest.SkipTest(
"pypy gc causes problems with openssl")
except AttributeError:
server.read_bytes(1, lambda data: None)
client.write(b'a')
# Stub out read_from_fd to make it fail.
+
def fake_read_from_fd():
os.close(server.socket.fileno())
server.__class__.read_from_fd(server)
server.close()
client.close()
+
class TestIOStreamWebHTTP(TestIOStreamWebMixin, AsyncHTTPTestCase):
def _make_client_iostream(self):
return IOStream(socket.socket(), io_loop=self.io_loop)
ssl_options = dict(
certfile=os.path.join(os.path.dirname(__file__), 'test.crt'),
keyfile=os.path.join(os.path.dirname(__file__), 'test.key'),
- )
+ )
connection = ssl.wrap_socket(connection,
server_side=True,
do_handshake_on_connect=False,
return SSLIOStream(connection, io_loop=self.io_loop, **kwargs)
TestIOStreamSSL = skipIfNoSSL(TestIOStreamSSL)
+
class TestPipeIOStream(AsyncTestCase):
def test_pipe_iostream(self):
r, w = os.pipe()
from tornado.test.util import unittest
from tornado.util import u, bytes_type, basestring_type
+
@contextlib.contextmanager
def ignore_bytes_warning():
if not hasattr(warnings, 'catch_warnings'):
# encoding issues from the control characters)
self.formatter._colors = {
logging.ERROR: u("\u0001"),
- }
+ }
self.formatter._normal = u("\u0002")
self.formatter._color = True
# construct a Logger directly to bypass getLogger's caching
except ImportError:
futures = None
+
class _ResolverTestMixin(object):
def test_localhost(self):
self.resolver.getaddrinfo('localhost', 80, socket.AF_UNSPEC,
super(SyncResolverTest, self).setUp()
self.resolver = Resolver(self.io_loop)
+
class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin):
def setUp(self):
super(ThreadedResolverTest, self).setUp()
except ImportError:
mock = None
+
class OptionsTest(unittest.TestCase):
def test_parse_command_line(self):
options = OptionParser()
def test_parse_callbacks(self):
options = OptionParser()
self.called = False
+
def callback():
self.called = True
options.add_parse_callback(callback)
raise unittest.SkipTest("Process tests not compatible with TwistedIOLoop")
# Not using AsyncHTTPTestCase because we need control over the IOLoop.
+
+
class ProcessTest(unittest.TestCase):
def get_app(self):
class ProcessHandler(RequestHandler):
# Disabled because on the mac a process dying with a signal
# can trigger an "Application exited abnormally; send error
# report to Apple?" prompt.
- #fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
- #fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
- #int(fetch("/").body)
+ # fetch("/?signal=%d" % signal.SIGTERM, fail_ok=True)
+ # fetch("/?signal=%d" % signal.SIGABRT, fail_ok=True)
+ # int(fetch("/").body)
# Now kill them normally so they won't be restarted
fetch("/?exit=0", fail_ok=True)
def all():
return unittest.defaultTestLoader.loadTestsFromNames(TEST_MODULES)
+
class TornadoTextTestRunner(unittest.TextTestRunner):
def run(self, test):
result = super(TornadoTextTestRunner, self).run(test)
if result.skipped:
skip_reasons = set(reason for (test, reason) in result.skipped)
self.stream.write(textwrap.fill(
- "Some tests were skipped because: %s" %
- ", ".join(sorted(skip_reasons))))
+ "Some tests were skipped because: %s" %
+ ", ".join(sorted(skip_reasons))))
self.stream.write("\n")
return result
callback=AsyncHTTPClient.configure)
define('ioloop', type=str, default=None)
define('ioloop_time_monotonic', default=False)
+
def configure_ioloop():
kwargs = {}
if options.ioloop_time_monotonic:
url("/see_other_post", SeeOtherPostHandler),
url("/see_other_get", SeeOtherGetHandler),
url("/host_echo", HostEchoHandler),
- ], gzip=True)
+ ], gzip=True)
def test_singleton(self):
# Class "constructor" reuses objects on the same IOLoop
self.assertEqual(200, response.code)
self.assertTrue(response.request.url.endswith("/see_other_post"))
self.assertTrue(response.effective_url.endswith("/see_other_get"))
- #request is the original request, is a POST still
+ # request is the original request, is a POST still
self.assertEqual("POST", response.request.method)
def test_request_timeout(self):
callback = wrap(callback)
with StackContext(functools.partial(self.context, 'library')):
self.io_loop.add_callback(
- functools.partial(library_inner_callback, callback))
+ functools.partial(library_inner_callback, callback))
def library_inner_callback(callback):
self.assertEqual(self.active_contexts[-2:],
def test_include(self):
loader = DictLoader({
- "index.html": '{% include "header.html" %}\nbody text',
- "header.html": "header text",
- })
+ "index.html": '{% include "header.html" %}\nbody text',
+ "header.html": "header text",
+ })
self.assertEqual(loader.load("index.html").generate(),
b"header text\nbody text")
def test_extends(self):
loader = DictLoader({
- "base.html": """\
+ "base.html": """\
<title>{% block title %}default title{% end %}</title>
<body>{% block body %}default body{% end %}</body>
""",
- "page.html": """\
+ "page.html": """\
{% extends "base.html" %}
{% block title %}page title{% end %}
{% block body %}page body{% end %}
""",
- })
+ })
self.assertEqual(loader.load("page.html").generate(),
b"<title>page title</title>\n<body>page body</body>\n")
def test_relative_load(self):
loader = DictLoader({
- "a/1.html": "{% include '2.html' %}",
- "a/2.html": "{% include '../b/3.html' %}",
- "b/3.html": "ok",
- })
+ "a/1.html": "{% include '2.html' %}",
+ "a/2.html": "{% include '../b/3.html' %}",
+ "b/3.html": "ok",
+ })
self.assertEqual(loader.load("a/1.html").generate(),
b"ok")
def test_multi_includes(self):
loader = DictLoader({
- "a.html": "{% include 'b.html' %}",
- "b.html": "{% include 'c.html' %}",
- "c.html": "{{1/0}}",
- })
+ "a.html": "{% include 'b.html' %}",
+ "b.html": "{% include 'c.html' %}",
+ "c.html": "{{1/0}}",
+ })
try:
loader.load("a.html").generate()
except ZeroDivisionError:
{% autoescape xhtml_escape %}\
expr: {{ name }}
raw: {% raw name %}""",
- }
+ }
def test_default_off(self):
loader = DictLoader(self.templates, autoescape=None)
skipIfNoTwisted = unittest.skipUnless(have_twisted,
"twisted module not present")
+
def save_signal_handlers():
saved = {}
for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGCHLD]:
self._anotherWhenRunningCalled = True
ReactorWhenRunningTest = skipIfNoTwisted(ReactorWhenRunningTest)
+
class ReactorCallLaterTest(ReactorTestCase):
def test_callLater(self):
self._laterCalled = False
'twisted.internet.test.test_core.SystemEventTestsBuilder': [
'test_iterate', # deliberately not supported
'test_runAfterCrash', # fails because TwistedIOLoop uses the global reactor
- ] if issubclass(IOLoop.configured_class(), TwistedIOLoop) else [
+ ] if issubclass(IOLoop.configured_class(), TwistedIOLoop) else [
'test_iterate', # deliberately not supported
- ],
+ ],
'twisted.internet.test.test_fdset.ReactorFDSetTestsBuilder': [
"test_lostFileDescriptor", # incompatible with epoll and kqueue
- ],
+ ],
'twisted.internet.test.test_process.ProcessTestsBuilder': [
# Doesn't work on python 2.5
'test_systemCallUninterruptedByChildExit',
- ],
+ ],
# Process tests appear to work on OSX 10.7, but not 10.6
#'twisted.internet.test.test_process.PTYProcessTestsBuilder': [
# 'test_systemCallUninterruptedByChildExit',
# ],
'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
'test_badContext', # ssl-related; see also SSLClientTestsMixin
- ],
+ ],
'twisted.internet.test.test_tcp.TCPPortTestsBuilder': [
# These use link-local addresses and cause firewall prompts on mac
'test_buildProtocolIPv6AddressScopeID',
'test_portGetHostOnIPv6ScopeID',
'test_serverGetHostOnIPv6ScopeID',
'test_serverGetPeerOnIPv6ScopeID',
- ],
+ ],
'twisted.internet.test.test_tcp.TCPConnectionTestsBuilder': [],
'twisted.internet.test.test_tcp.WriteSequenceTests': [],
'twisted.internet.test.test_tcp.AbortConnectionTestCase': [],
'test_sendFileDescriptorTriggersPauseProducing',
'test_descriptorDeliveredBeforeBytes',
'test_avoidLeakingFileDescriptors',
- ],
+ ],
'twisted.internet.test.test_unix.UNIXDatagramTestsBuilder': [
'test_listenOnLinuxAbstractNamespace',
- ],
+ ],
'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
- }
+ }
for test_name, blacklist in twisted_tests.iteritems():
try:
test_class = import_object(test_name)
# leave it turned off, but while working on these tests you may want
# to uncomment one of the other lines instead.
log.defaultObserver.stop()
- #import sys; log.startLogging(sys.stderr, setStdout=0)
- #log.startLoggingWithObserver(log.PythonLoggingObserver().emit, setStdout=0)
+ # import sys; log.startLogging(sys.stderr, setStdout=0)
+ # log.startLoggingWithObserver(log.PythonLoggingObserver().emit, setStdout=0)
if __name__ == "__main__":
unittest.main()
except TwoArgException as e:
self.assertIs(e, exc_info[1])
+
class TestConfigurable(Configurable):
@classmethod
def configurable_base(cls):
def configurable_default(cls):
return TestConfig1
+
class TestConfig1(TestConfigurable):
def initialize(self, a=None):
self.a = a
+
class TestConfig2(TestConfigurable):
def initialize(self, b=None):
self.b = b
+
class ConfigurableTest(unittest.TestCase):
def setUp(self):
self.saved = TestConfigurable._save_configuration()
wsgi_safe = []
+
class WebTestCase(AsyncHTTPTestCase):
"""Base class for web tests that also supports WSGI mode.
def get_app_kwargs(self):
return {}
+
class SimpleHandlerTestCase(WebTestCase):
"""Simplified base class for tests that work with a single handler class.
sig)
# tamper with the cookie
handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
- to_basestring(timestamp), to_basestring(sig)))
+ to_basestring(timestamp), to_basestring(sig)))
# it gets rejected
with ExpectLog(gen_log, "Cookie timestamp in future"):
self.assertTrue(handler.get_secure_cookie('foo') is None)
def get_app_kwargs(self):
loader = DictLoader({
- "linkify.html": "{% module linkify(message) %}",
- "page.html": """\
+ "linkify.html": "{% module linkify(message) %}",
+ "page.html": """\
<html><head></head><body>
{% for e in entries %}
{% module Template("entry.html", entry=e) %}
{% end %}
</body></html>""",
- "entry.html": """\
+ "entry.html": """\
{{ set_resources(embedded_css=".entry { margin-bottom: 1em; }", embedded_javascript="js_embed()", css_files=["/base.css", "/foo.css"], javascript_files="/common.js", html_head="<meta>", html_body='<script src="/analytics.js"/>') }}
<div class="entry">...</div>""",
- })
+ })
return dict(template_loader=loader,
autoescape="xhtml_escape",
cookie_secret=self.COOKIE_SECRET)
url("/redirect", RedirectHandler),
url("/header_injection", HeaderInjectionHandler),
url("/get_argument", GetArgumentHandler),
- ]
+ ]
return urls
def fetch_json(self, *args, **kwargs):
self.assertEqual(b"", response.body)
wsgi_safe.append(ErrorResponseTest)
+
class StaticFileTest(WebTestCase):
def get_handlers(self):
class StaticUrlHandler(RequestHandler):
def test_static_304_if_modified_since(self):
response1 = self.fetch("/static/robots.txt")
response2 = self.fetch("/static/robots.txt", headers={
- 'If-Modified-Since': response1.headers['Last-Modified']})
+ 'If-Modified-Since': response1.headers['Last-Modified']})
self.assertEqual(response2.code, 304)
self.assertTrue('Content-Length' not in response2.headers)
self.assertTrue('Last-Modified' not in response2.headers)
def test_static_304_if_none_match(self):
response1 = self.fetch("/static/robots.txt")
response2 = self.fetch("/static/robots.txt", headers={
- 'If-None-Match': response1.headers['Etag']})
+ 'If-None-Match': response1.headers['Etag']})
self.assertEqual(response2.code, 304)
wsgi_safe.append(StaticFileTest)
+
class CustomStaticFileTest(WebTestCase):
def get_handlers(self):
class MyStaticFileHandler(StaticFileHandler):
def test_host_matching(self):
self.app.add_handlers("www.example.com",
- [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
+ [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
self.app.add_handlers(r"www\.example\.com",
- [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
+ [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
self.app.add_handlers("www.example.com",
- [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
+ [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
response = self.fetch("/foo")
self.assertEqual(response.body, b"wildcard")
self.assertEqual(response.headers["h2"], "bar")
wsgi_safe.append(ClearHeaderTest)
+
class Header304Test(SimpleHandlerTestCase):
class Handler(RequestHandler):
def get(self):
self.assertEqual(response1.headers["Content-Language"], "en_US")
response2 = self.fetch('/', headers={
- 'If-None-Match': response1.headers["Etag"]})
+ 'If-None-Match': response1.headers["Etag"]})
self.assertEqual(response2.code, 304)
self.assertTrue("Content-Length" not in response2.headers)
self.assertTrue("Content-Language" not in response2.headers)
self.assertEqual(response.headers['Vary'],
'Accept-Language, Accept-Encoding')
+
class PathArgsInPrepareTest(WebTestCase):
class Handler(RequestHandler):
def prepare(self):
# fits better in our async testing framework and the wsgiref
# validator should keep us honest
return WSGIContainer(validator(WSGIApplication([
- ("/", HelloHandler),
- ("/path/(.*)", PathQuotingHandler),
- ("/typecheck", TypeCheckHandler),
- ])))
+ ("/", HelloHandler),
+ ("/path/(.*)", PathQuotingHandler),
+ ("/typecheck", TypeCheckHandler),
+ ])))
def test_simple(self):
response = self.fetch("/")
def tearDown(self):
self.io_loop.clear_current()
if (not IOLoop.initialized() or
- self.io_loop is not IOLoop.instance()):
+ self.io_loop is not IOLoop.instance()):
# Try to clean up any file descriptors left open in the ioloop.
# This avoids leaks, especially when tests are run repeatedly
# in the same process with autoreload (because curl does not
def timeout_func():
try:
raise self.failureException(
- 'Async operation timed out after %s seconds' %
- timeout)
+ 'Async operation timed out after %s seconds' %
+ timeout)
except Exception:
self.__failure = sys.exc_info()
self.stop()
self.__running = True
self.io_loop.start()
if (self.__failure is not None or
- condition is None or condition()):
+ condition is None or condition()):
break
assert self.__stopped
self.__stopped = False
def tearDown(self):
self.http_server.stop()
if (not IOLoop.initialized() or
- self.http_client.io_loop is not IOLoop.instance()):
+ self.http_client.io_loop is not IOLoop.instance()):
self.http_client.close()
super(AsyncHTTPTestCase, self).tearDown()
# openssl req -new -keyout tornado/test/test.key -out tornado/test/test.crt -nodes -days 3650 -x509
module_dir = os.path.dirname(__file__)
return dict(
- certfile=os.path.join(module_dir, 'test', 'test.crt'),
- keyfile=os.path.join(module_dir, 'test', 'test.key'))
+ certfile=os.path.join(module_dir, 'test', 'test.crt'),
+ keyfile=os.path.join(module_dir, 'test', 'test.key'))
def get_protocol(self):
return 'https'
exec code in glob, loc
""")
+
class Configurable(object):
"""Base class for configurable interfaces.
base.__impl_class = cls.configurable_default()
return base.__impl_class
-
@classmethod
def _save_configuration(cls):
base = cls.configurable_base()
except ImportError:
from urllib.parse import urlencode # py3
+
class RequestHandler(object):
"""Subclass this class and define get() or post() to make a handler.
self.path_args = None
self.path_kwargs = None
self.ui = ObjectDict((n, self._ui_method(m)) for n, m in
- application.ui_methods.items())
+ application.ui_methods.items())
# UIModules are available as both `modules` and `_modules` in the
# template namespace. Historically only `modules` was available
# but could be clobbered by user additions to the namespace.
# The template {% module %} directive looks in `_modules` to avoid
# possible conflicts.
self.ui["_modules"] = ObjectDict((n, self._ui_module(n, m)) for n, m in
- application.ui_modules.items())
+ application.ui_modules.items())
self.ui["modules"] = self.ui["_modules"]
self.clear()
# Check since connection is not available in WSGI
for transform in self._transforms:
self._status_code, self._headers, chunk = \
transform.transform_first_chunk(
- self._status_code, self._headers, chunk, include_footers)
+ self._status_code, self._headers, chunk, include_footers)
headers = self._generate_headers()
else:
for transform in self._transforms:
if not self._headers_written:
if (self._status_code == 200 and
self.request.method in ("GET", "HEAD") and
- "Etag" not in self._headers):
+ "Etag" not in self._headers):
etag = self.compute_etag()
if etag is not None:
self.set_header("Etag", etag)
else:
self.finish("<html><title>%(code)d: %(message)s</title>"
"<body>%(code)d: %(message)s</body></html>" % {
- "code": status_code,
- "message": self._reason,
- })
+ "code": status_code,
+ "message": self._reason,
+ })
@property
def locale(self):
# If XSRF cookies are turned on, reject form submissions without
# the proper cookie
if self.request.method not in ("GET", "HEAD", "OPTIONS") and \
- self.application.settings.get("xsrf_cookies"):
+ self.application.settings.get("xsrf_cookies"):
self.check_xsrf_cookie()
self.prepare()
if not self._finished:
raise Exception("@asynchronous is not supported for WSGI apps")
self._auto_finish = False
with stack_context.ExceptionStackContext(
- self._stack_context_handle_exception):
+ self._stack_context_handle_exception):
return method(self, *args, **kwargs)
return wrapper
self.handlers.append((re.compile(host_pattern), handlers))
for spec in host_handlers:
- if type(spec) is type(()):
+ if isinstance(spec, type(())):
assert len(spec) in (2, 3)
pattern = spec[0]
handler = spec[1]
return matches or None
def _load_ui_methods(self, methods):
- if type(methods) is types.ModuleType:
+ if isinstance(methods, types.ModuleType):
self._load_ui_methods(dict((n, getattr(methods, n))
for n in dir(methods)))
elif isinstance(methods, list):
else:
for name, fn in methods.items():
if not name.startswith("_") and hasattr(fn, "__call__") \
- and name[0].lower() == name[0]:
+ and name[0].lower() == name[0]:
self.ui_methods[name] = fn
def _load_ui_modules(self, modules):
- if type(modules) is types.ModuleType:
+ if isinstance(modules, types.ModuleType):
self._load_ui_modules(dict((n, getattr(modules, n))
for n in dir(modules)))
elif isinstance(modules, list):
if cache_time > 0:
self.set_header("Expires", datetime.datetime.utcnow() +
- datetime.timedelta(seconds=cache_time))
+ datetime.timedelta(seconds=cache_time))
self.set_header("Cache-Control", "max-age=" + str(cache_time))
self.set_extra_headers(path)
def __repr__(self):
return '%s(%r, %s, kwargs=%r, name=%r)' % \
- (self.__class__.__name__, self.regex.pattern,
- self.handler_class, self.kwargs, self.name)
+ (self.__class__.__name__, self.regex.pattern,
+ self.handler_class, self.kwargs, self.name)
def _find_groups(self):
"""Returns a tuple (reverse string, group count) for a url.
if len(a) != len(b):
return False
result = 0
- if type(a[0]) is int: # python3 byte strings
+ if isinstance(a[0], int): # python3 byte strings
for x, y in zip(a, b):
result |= x ^ y
else: # python2
"Sec-WebSocket-Location: %(scheme)s://%(host)s%(uri)s\r\n"
"%(subprotocol)s"
"\r\n" % (dict(
- version=tornado.version,
- origin=self.request.headers["Origin"],
- scheme=scheme,
- host=self.request.host,
- uri=self.request.uri,
- subprotocol=subprotocol_header))))
+ version=tornado.version,
+ origin=self.request.headers["Origin"],
+ scheme=scheme,
+ host=self.request.host,
+ uri=self.request.uri,
+ subprotocol=subprotocol_header))))
self.stream.read_bytes(8, self._handle_challenge)
def challenge_response(self, challenge):
def _on_end_delimiter(self, frame):
if not self.client_terminated:
self.async_callback(self.handler.on_message)(
- frame[:-1].decode("utf-8", "replace"))
+ frame[:-1].decode("utf-8", "replace"))
if not self.client_terminated:
self._receive_message()
def _challenge_response(self):
sha1 = hashlib.sha1()
sha1.update(tornado.escape.utf8(
- self.request.headers.get("Sec-Websocket-Key")))
+ self.request.headers.get("Sec-Websocket-Key")))
sha1.update(b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11") # Magic value
return tornado.escape.native_str(base64.b64encode(sha1.digest()))