if self._read_from_buffer():
return
self._check_closed()
- while True:
- if self._read_to_buffer() == 0:
- break
- self._check_closed()
+ try:
+ # See comments in _handle_read about incrementing _pending_callbacks
+ self._pending_callbacks += 1
+ while True:
+ if self._read_to_buffer() == 0:
+ break
+ self._check_closed()
+ finally:
+ self._pending_callbacks -= 1
if self._read_from_buffer():
return
self._add_io_state(self.io_loop.READ)
from __future__ import absolute_import, division, with_statement
+from tornado import gen
from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str
from tornado.iostream import IOStream
from tornado.template import DictLoader
else:
raise Exception("didn't get permanent or status arguments")
+class EmptyFlushCallbackHandler(RequestHandler):
+ @gen.engine
+ @asynchronous
+ def get(self):
+ # Ensure that the flush callback is run whether or not there
+ # was any output.
+ yield gen.Task(self.flush) # "empty" flush, but writes headers
+ yield gen.Task(self.flush) # empty flush
+ self.write("o")
+ yield gen.Task(self.flush) # flushes the "o"
+ yield gen.Task(self.flush) # empty flush
+ self.finish("k")
+
class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
url("/flow_control", FlowControlHandler),
url("/multi_header", MultiHeaderHandler),
url("/redirect", RedirectHandler),
+ url("/empty_flush", EmptyFlushCallbackHandler),
]
return Application(urls,
template_loader=loader,
response = self.fetch("/redirect?status=307", follow_redirects=False)
self.assertEqual(response.code, 307)
+ def test_empty_flush(self):
+ response = self.fetch("/empty_flush")
+ self.assertEqual(response.body, b("ok"))
+
class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
self.request.write(headers, callback=callback)
return
- if headers or chunk:
- self.request.write(headers + chunk, callback=callback)
+ self.request.write(headers + chunk, callback=callback)
def finish(self, chunk=None):
"""Finishes this response, ending the HTTP request."""