]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Add a callback for flow control to RequestHandler.flush
authorBen Darnell <ben@bendarnell.com>
Thu, 14 Jul 2011 03:09:02 +0000 (20:09 -0700)
committerBen Darnell <ben@bendarnell.com>
Thu, 14 Jul 2011 03:09:02 +0000 (20:09 -0700)
tornado/httpserver.py
tornado/test/web_test.py
tornado/web.py

index deb8dcbc0aff730326c437503a8ea41dfdd8a8e8..19bbb388bad8beb6b16bf830b58101b73bdecf55 100644 (file)
@@ -292,11 +292,13 @@ class HTTPConnection(object):
         # contexts from one request from leaking into the next.
         self._header_callback = stack_context.wrap(self._on_headers)
         self.stream.read_until(b("\r\n\r\n"), self._header_callback)
+        self._write_callback = None
 
-    def write(self, chunk):
+    def write(self, chunk, callback=None):
         """Writes a chunk of output to the stream."""
         assert self._request, "Request closed"
         if not self.stream.closed():
+            self._write_callback = stack_context.wrap(callback)
             self.stream.write(chunk, self._on_write_complete)
 
     def finish(self):
@@ -307,6 +309,10 @@ class HTTPConnection(object):
             self._finish_request()
 
     def _on_write_complete(self):
+        if self._write_callback is not None:
+            callback = self._write_callback
+            self._write_callback = None
+            callback()            
         if self._request_finished:
             self._finish_request()
 
@@ -503,10 +509,10 @@ class HTTPRequest(object):
         """Returns True if this request supports HTTP/1.1 semantics"""
         return self.version == "HTTP/1.1"
 
-    def write(self, chunk):
+    def write(self, chunk, callback=None):
         """Writes the given chunk to the response stream."""
         assert isinstance(chunk, bytes_type)
-        self.connection.write(chunk)
+        self.connection.write(chunk, callback=callback)
 
     def finish(self):
         """Finishes this HTTP request on the open connection."""
index 4baeaa142f3abb42d7dc3e18d628fefa9b179a4b..137516fb99023461f1167ad5f9d98a13df903e2f 100644 (file)
@@ -297,6 +297,22 @@ class OptionalPathHandler(RequestHandler):
     def get(self, path):
         self.write({"path": path})
 
+class FlowControlHandler(RequestHandler):
+    # These writes are too small to demonstrate real flow control,
+    # but at least it shows that the callbacks get run.
+    @asynchronous
+    def get(self):
+        self.write("1")
+        self.flush(callback=self.step2)
+
+    def step2(self):
+        self.write("2")
+        self.flush(callback=self.step3)
+
+    def step3(self):
+        self.write("3")
+        self.finish()
+
 class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
     def get_app(self):
         loader = DictLoader({
@@ -318,6 +334,7 @@ class WebTest(AsyncHTTPTestCase, LogTrapTestCase):
             url("/linkify", LinkifyHandler),
             url("/uimodule_resources", UIModuleResourceHandler),
             url("/optional_path/(.+)?", OptionalPathHandler),
+            url("/flow_control", FlowControlHandler),
             ]
         return Application(urls,
                            template_loader=loader,
@@ -395,6 +412,9 @@ js_embed()
         self.assertEqual(self.fetch_json("/optional_path/"),
                          {u"path": None})
 
+    def test_flow_control(self):
+        self.assertEqual(self.fetch("/flow_control").body, b("123"))
+
 
 class ErrorResponseTest(AsyncHTTPTestCase, LogTrapTestCase):
     def get_app(self):
index 56bc34678a3e4dcae761256f65aa5026d3e5693c..bdb1d2188587117dfa65422456f32d9e266216b8 100644 (file)
@@ -581,8 +581,15 @@ class RequestHandler(object):
         return template.Loader(template_path, **kwargs)
 
 
-    def flush(self, include_footers=False):
-        """Flushes the current output buffer to the network."""
+    def flush(self, include_footers=False, callback=None):
+        """Flushes the current output buffer to the network.
+        
+        The ``callback`` argument, if given, can be used for flow control:
+        it will be run when all flushed data has been written to the socket.
+        Note that only one flush callback can be outstanding at a time;
+        if another flush occurs before the previous flush's callback
+        has been run, the previous callback will be discarded.
+        """
         if self.application._wsgi:
             raise Exception("WSGI applications do not support flush()")
 
@@ -601,11 +608,11 @@ class RequestHandler(object):
 
         # Ignore the chunk and only write the headers for HEAD requests
         if self.request.method == "HEAD":
-            if headers: self.request.write(headers)
+            if headers: self.request.write(headers, callback=callback)
             return
 
         if headers or chunk:
-            self.request.write(headers + chunk)
+            self.request.write(headers + chunk, callback=callback)
 
     def finish(self, chunk=None):
         """Finishes this response, ending the HTTP request."""