# contexts from one request from leaking into the next.
self._header_callback = stack_context.wrap(self._on_headers)
self.stream.read_until(b("\r\n\r\n"), self._header_callback)
+ self._write_callback = None
- def write(self, chunk):
+ def write(self, chunk, callback=None):
"""Writes a chunk of output to the stream."""
assert self._request, "Request closed"
if not self.stream.closed():
+ self._write_callback = stack_context.wrap(callback)
self.stream.write(chunk, self._on_write_complete)
def finish(self):
self._finish_request()
def _on_write_complete(self):
+ if self._write_callback is not None:
+ callback = self._write_callback
+ self._write_callback = None
+ callback()
if self._request_finished:
self._finish_request()
"""Returns True if this request supports HTTP/1.1 semantics"""
return self.version == "HTTP/1.1"
- def write(self, chunk):
+ def write(self, chunk, callback=None):
"""Writes the given chunk to the response stream."""
assert isinstance(chunk, bytes_type)
- self.connection.write(chunk)
+ self.connection.write(chunk, callback=callback)
def finish(self):
"""Finishes this HTTP request on the open connection."""
def get(self, path):
self.write({"path": path})
+class FlowControlHandler(RequestHandler):
+ # These writes are too small to demonstrate real flow control,
+ # but at least it shows that the callbacks get run.
+ @asynchronous
+ def get(self):
+ self.write("1")
+ self.flush(callback=self.step2)
+
+ def step2(self):
+ self.write("2")
+ self.flush(callback=self.step3)
+
+ def step3(self):
+ self.write("3")
+ self.finish()
+
class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
loader = DictLoader({
url("/linkify", LinkifyHandler),
url("/uimodule_resources", UIModuleResourceHandler),
url("/optional_path/(.+)?", OptionalPathHandler),
+ url("/flow_control", FlowControlHandler),
]
return Application(urls,
template_loader=loader,
self.assertEqual(self.fetch_json("/optional_path/"),
{u"path": None})
+ def test_flow_control(self):
+ self.assertEqual(self.fetch("/flow_control").body, b("123"))
+
class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
def get_app(self):
return template.Loader(template_path, **kwargs)
- def flush(self, include_footers=False):
- """Flushes the current output buffer to the network."""
+ def flush(self, include_footers=False, callback=None):
+ """Flushes the current output buffer to the network.
+
+ The ``callback`` argument, if given, can be used for flow control:
+ it will be run when all flushed data has been written to the socket.
+ Note that only one flush callback can be outstanding at a time;
+ if another flush occurs before the previous flush's callback
+ has been run, the previous callback will be discarded.
+ """
if self.application._wsgi:
raise Exception("WSGI applications do not support flush()")
# Ignore the chunk and only write the headers for HEAD requests
if self.request.method == "HEAD":
- if headers: self.request.write(headers)
+ if headers: self.request.write(headers, callback=callback)
return
if headers or chunk:
- self.request.write(headers + chunk)
+ self.request.write(headers + chunk, callback=callback)
def finish(self, chunk=None):
"""Finishes this response, ending the HTTP request."""