The only remaining flake8 errors are for line length.
if callback is not None:
future.add_done_callback(
functools.partial(_auth_future_to_callback, callback))
+
def handle_exception(typ, value, tb):
if future.done():
return False
functools.partial(self._curl_header_callback,
headers, request.header_callback))
if request.streaming_callback:
- write_function = lambda chunk: self.io_loop.add_callback(
- request.streaming_callback, chunk)
+ def write_function(chunk):
+ self.io_loop.add_callback(request.streaming_callback, chunk)
else:
write_function = buffer.write
if bytes is str: # py2
% request.method)
request_buffer = BytesIO(utf8(request.body))
+
def ioctl(cmd):
if cmd == curl.IOCMD_RESTARTREAD:
request_buffer.seek(0)
which use ``self.finish()`` in place of a callback argument.
"""
func = _make_coroutine_wrapper(func, replace_callback=False)
+
@functools.wraps(func)
def wrapper(*args, **kwargs):
future = func(*args, **kwargs)
+
def final_callback(future):
if future.result() is not None:
raise ReturnValueIgnoredError(
yielded.
"""
future = Future()
+
def handle_exception(typ, value, tb):
if future.done():
return False
future.set_exc_info((typ, value, tb))
return True
+
def set_result(result):
if future.done():
return
future = Future()
if not children:
future.set_result({} if keys is not None else [])
+
def callback(f):
unfinished_children.remove(f)
if not unfinished_children:
chain_future(future, result)
if io_loop is None:
io_loop = IOLoop.current()
+
def error_callback(future):
try:
future.result()
if not isinstance(e, quiet_exceptions):
app_log.error("Exception in Future %r after timeout",
future, exc_info=True)
+
def timeout_callback():
result.set_exception(TimeoutError("Timeout"))
# In case the wrapped future goes on to fail, log it.
# YieldPoints are too closely coupled to the Runner to go
# through the generic convert_yielded mechanism.
self.future = TracebackFuture()
+
def start_yield_point():
try:
yielded.start(self)
with stack_context.ExceptionStackContext(
self.handle_exception) as deactivate:
self.stack_context_deactivate = deactivate
+
def cb():
start_yield_point()
self.run()
# If we had an "unwrap" counterpart to this method we would need
# to restore the original callback after our Future resolves
# so that repeated wrap/unwrap calls don't build up layers.
+
def close_callback():
if not future.done():
future.set_exception(ssl_stream.error or StreamClosedError())
arg = args[i].lstrip("-")
name, equals, value = arg.partition("=")
name = name.replace('-', '_')
- if not name in self._options:
+ if name not in self._options:
self.print_help()
raise Error('Unrecognized command line option: %r' % name)
option = self._options[name]
from tornado.platform.windows import set_close_exec
elif 'APPENGINE_RUNTIME' in os.environ:
from tornado.platform.common import Waker
+
def set_close_exec(fd):
pass
else:
# monotime monkey-patches the time module to have a monotonic function
# in versions of python before 3.3.
import monotime
+ # Silence pyflakes warning about this unused import
+ monotime
except ImportError:
pass
try:
from time import monotonic as monotonic_time
except ImportError:
monotonic_time = None
+
+__all__ = ['Waker', 'set_close_exec', 'monotonic_time']
@gen.convert_yielded.register(Deferred)
def _(d):
f = Future()
+
def errback(failure):
try:
failure.raiseException()
from tornado.testing import AsyncHTTPTestCase
from tornado.test import httpclient_test
from tornado.test.util import unittest
-from tornado.web import Application, RequestHandler, URLSpec
+from tornado.web import Application, RequestHandler
try:
@gen_test
def test_moment(self):
calls = []
+
@gen.coroutine
def f(name, yieldable):
for i in range(5):
+# flake8: noqa
# Dummy source file to allow creation of the initial .po file in the
# same way as a real project. I'm not entirely sure about the real
# workflow here, but this seems to work.
"""Reads an HTTP response from `stream` and runs callback with its
headers and body."""
chunks = []
+
class Delegate(HTTPMessageDelegate):
def headers_received(self, start_line, headers):
self.headers = headers
class HelloHandler(RequestHandler):
def get(self):
self.finish('Hello world')
+
def post(self):
self.finish('Hello world')
def test_chunked_compressed(self):
compressed = self.compress(self.BODY)
self.assertGreater(len(compressed), 20)
+
def body_producer(write):
write(compressed[:20])
write(compressed[20:])
+# flake8: noqa
from __future__ import absolute_import, division, print_function, with_statement
from tornado.test.util import unittest
# t2 should be cancelled by t1, even though it is already scheduled to
# be run before the ioloop even looks at it.
now = self.io_loop.time()
+
def t1():
calls[0] = True
self.io_loop.remove_timeout(t2_handle)
self.io_loop.add_timeout(now + 0.01, t1)
+
def t2():
calls[1] = True
t2_handle = self.io_loop.add_timeout(now + 0.02, t2)
"""The handler callback receives the same fd object it passed in."""
server_sock, port = bind_unused_port()
fds = []
+
def handle_connection(fd, events):
fds.append(fd)
conn, addr = server_sock.accept()
def test_mixed_fd_fileobj(self):
server_sock, port = bind_unused_port()
+
def f(fd, events):
pass
self.io_loop.add_handler(server_sock, f, IOLoop.READ)
"""Calling start() twice should raise an error, not deadlock."""
returned_from_start = [False]
got_exception = [False]
+
def callback():
try:
self.io_loop.start()
# After reading from one fd, remove the other from the IOLoop.
chunks = []
+
def handle_read(fd, events):
chunks.append(fd.recv(1024))
if fd is client:
def streaming_callback(data):
chunks.append(data)
self.stop()
+
def close_callback(data):
assert not data, data
closed[0] = True
def test_future_delayed_close_callback(self):
# Same as test_delayed_close_callback, but with the future interface.
server, client = self.make_iostream_pair()
+
# We can't call make_iostream_pair inside a gen_test function
# because the ioloop is not reentrant.
@gen_test
# and IOStream._maybe_add_error_listener.
server, client = self.make_iostream_pair()
closed = [False]
+
def close_callback():
closed[0] = True
self.stop()
def twisted_coroutine_fetch(self, url, runner):
body = [None]
+
@gen.coroutine
def f():
# This is simpler than the non-coroutine version, but it cheats
'test_changeUID',
],
# Process tests appear to work on OSX 10.7, but not 10.6
- #'twisted.internet.test.test_process.PTYProcessTestsBuilder': [
+ # 'twisted.internet.test.test_process.PTYProcessTestsBuilder': [
# 'test_systemCallUninterruptedByChildExit',
# ],
'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
'twisted.internet.test.test_threads.ThreadTestsBuilder': [],
'twisted.internet.test.test_time.TimeTestsBuilder': [],
# Extra third-party dependencies (pyOpenSSL)
- #'twisted.internet.test.test_tls.SSLClientTestsMixin': [],
+ # 'twisted.internet.test.test_tls.SSLClientTestsMixin': [],
'twisted.internet.test.test_udp.UDPServerTestsBuilder': [],
'twisted.internet.test.test_unix.UNIXTestsBuilder': [
# Platform-specific. These tests would be skipped automatically
def my_ui_method(handler, x):
return "In my_ui_method(%s) with handler value %s." % (
x, handler.value())
+
class MyModule(UIModule):
def render(self, x):
return "In MyModule(%s) with handler value %s." % (
def test_payload_tampering(self):
# These cookies are variants of the one in test_known_values.
sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
+
def validate(prefix):
return (b'value' ==
decode_signed_value(SignedValueTest.SECRET, "key",
def test_signature_tampering(self):
prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
+
def validate(sig):
return (b'value' ==
decode_signed_value(SignedValueTest.SECRET, "key",
from tornado.util import u
try:
- import tornado.websocket
+ import tornado.websocket # noqa
from tornado.util import _websocket_mask_python
except ImportError:
# The unittest module presents misleading errors on ImportError
# unicode_literals" have other problems (see PEP 414). u() can be applied
# to ascii strings that include \u escapes (but they must not contain
# literal non-ascii characters).
-if type('') is not type(b''):
+if not isinstance(b'', type('')):
def u(s):
return s
unicode_type = str
else:
def u(s):
return s.decode('unicode_escape')
- unicode_type = unicode
- basestring_type = basestring
+ # These names don't exist in py3, so use noqa comments to disable
+ # warnings in flake8.
+ unicode_type = unicode # noqa
+ basestring_type = basestring # noqa
def import_object(name):
"""
# Delay the IOLoop import because it's not available on app engine.
from tornado.ioloop import IOLoop
+
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
self._auto_finish = False