From: Ben Darnell Date: Thu, 14 Jul 2011 03:09:02 +0000 (-0700) Subject: Add a callback for flow control to RequestHandler.flush X-Git-Tag: v2.1.0~78^2~6 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=f0438e5d9f296086ae6a131e1fd3063095b77071;p=thirdparty%2Ftornado.git Add a callback for flow control to RequestHandler.flush --- diff --git a/tornado/httpserver.py b/tornado/httpserver.py index deb8dcbc0..19bbb388b 100644 --- a/tornado/httpserver.py +++ b/tornado/httpserver.py @@ -292,11 +292,13 @@ class HTTPConnection(object): # contexts from one request from leaking into the next. self._header_callback = stack_context.wrap(self._on_headers) self.stream.read_until(b("\r\n\r\n"), self._header_callback) + self._write_callback = None - def write(self, chunk): + def write(self, chunk, callback=None): """Writes a chunk of output to the stream.""" assert self._request, "Request closed" if not self.stream.closed(): + self._write_callback = stack_context.wrap(callback) self.stream.write(chunk, self._on_write_complete) def finish(self): @@ -307,6 +309,10 @@ class HTTPConnection(object): self._finish_request() def _on_write_complete(self): + if self._write_callback is not None: + callback = self._write_callback + self._write_callback = None + callback() if self._request_finished: self._finish_request() @@ -503,10 +509,10 @@ class HTTPRequest(object): """Returns True if this request supports HTTP/1.1 semantics""" return self.version == "HTTP/1.1" - def write(self, chunk): + def write(self, chunk, callback=None): """Writes the given chunk to the response stream.""" assert isinstance(chunk, bytes_type) - self.connection.write(chunk) + self.connection.write(chunk, callback=callback) def finish(self): """Finishes this HTTP request on the open connection.""" diff --git a/tornado/test/web_test.py b/tornado/test/web_test.py index 4baeaa142..137516fb9 100644 --- a/tornado/test/web_test.py +++ b/tornado/test/web_test.py @@ -297,6 +297,22 @@ class OptionalPathHandler(RequestHandler): def get(self, path): self.write({"path": path}) +class FlowControlHandler(RequestHandler): + # These writes are too small to demonstrate real flow control, + # but at least it shows that the callbacks get run. + @asynchronous + def get(self): + self.write("1") + self.flush(callback=self.step2) + + def step2(self): + self.write("2") + self.flush(callback=self.step3) + + def step3(self): + self.write("3") + self.finish() + class WebTest(AsyncHTTPTestCase, LogTrapTestCase): def get_app(self): loader = DictLoader({ @@ -318,6 +334,7 @@ class WebTest(AsyncHTTPTestCase, LogTrapTestCase): url("/linkify", LinkifyHandler), url("/uimodule_resources", UIModuleResourceHandler), url("/optional_path/(.+)?", OptionalPathHandler), + url("/flow_control", FlowControlHandler), ] return Application(urls, template_loader=loader, @@ -395,6 +412,9 @@ js_embed() self.assertEqual(self.fetch_json("/optional_path/"), {u"path": None}) + def test_flow_control(self): + self.assertEqual(self.fetch("/flow_control").body, b("123")) + class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase): def get_app(self): diff --git a/tornado/web.py b/tornado/web.py index 56bc34678..bdb1d2188 100644 --- a/tornado/web.py +++ b/tornado/web.py @@ -581,8 +581,15 @@ class RequestHandler(object): return template.Loader(template_path, **kwargs) - def flush(self, include_footers=False): - """Flushes the current output buffer to the network.""" + def flush(self, include_footers=False, callback=None): + """Flushes the current output buffer to the network. + + The ``callback`` argument, if given, can be used for flow control: + it will be run when all flushed data has been written to the socket. + Note that only one flush callback can be outstanding at a time; + if another flush occurs before the previous flush's callback + has been run, the previous callback will be discarded. + """ if self.application._wsgi: raise Exception("WSGI applications do not support flush()") @@ -601,11 +608,11 @@ class RequestHandler(object): # Ignore the chunk and only write the headers for HEAD requests if self.request.method == "HEAD": - if headers: self.request.write(headers) + if headers: self.request.write(headers, callback=callback) return if headers or chunk: - self.request.write(headers + chunk) + self.request.write(headers + chunk, callback=callback) def finish(self, chunk=None): """Finishes this response, ending the HTTP request."""