Add close_all_connections method to HTTPServer for testing cleanup.
On Python versions before 3.4, the GC has problems with generators,
so the previous approach of closing all the file descriptors and
leave the rest to the GC no longer works. (Only Python 3.3 prints
the uncollectable garbage warnings, but the problem is present
in earlier versions).
params = HTTP1ConnectionParameters()
self.params = params
self.context = context
+ self._serving_future = None
+
+ @gen.coroutine
+ def close(self):
+ self.stream.close()
+ # Block until the serving loop is done, but ignore any exceptions
+ # (start_serving is already responsible for logging them).
+ try:
+ yield self._serving_future
+ except Exception:
+ pass
def start_serving(self, delegate):
assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
+ self._serving_future = self._server_request_loop(delegate)
# Register the future on the IOLoop so its errors get logged.
- self.stream.io_loop.add_future(
- self._server_request_loop(delegate),
- lambda f: f.result())
+ self.stream.io_loop.add_future(self._serving_future,
+ lambda f: f.result())
@gen.coroutine
def _server_request_loop(self, delegate):
- while True:
- conn = HTTP1Connection(self.stream, False,
- self.params, self.context)
- request_delegate = delegate.start_request(conn)
- try:
- ret = yield conn.read_response(request_delegate)
- except iostream.StreamClosedError:
- return
- except Exception:
- # TODO: this is probably too broad; it would be better to
- # wrap all delegate calls in something that writes to app_log,
- # and then errors that reach this point can be gen_log.
- app_log.error("Uncaught exception", exc_info=True)
- conn.close()
- return
- if not ret:
- return
+ try:
+ while True:
+ conn = HTTP1Connection(self.stream, False,
+ self.params, self.context)
+ request_delegate = delegate.start_request(self, conn)
+ try:
+ ret = yield conn.read_response(request_delegate)
+ except iostream.StreamClosedError:
+ return
+ except Exception:
+ # TODO: this is probably too broad; it would be better to
+ # wrap all delegate calls in something that writes to app_log,
+ # and then errors that reach this point can be gen_log.
+ app_log.error("Uncaught exception", exc_info=True)
+ conn.close()
+ return
+ if not ret:
+ return
+ finally:
+ delegate.on_close(self)
import socket
from tornado.http1connection import HTTP1ServerConnection, HTTP1ConnectionParameters
+from tornado import gen
from tornado import httputil
from tornado import iostream
from tornado import netutil
TCPServer.__init__(self, io_loop=io_loop, ssl_options=ssl_options,
max_buffer_size=max_buffer_size,
read_chunk_size=chunk_size)
+ self._connections = set()
+
+ @gen.coroutine
+ def close_all_connections(self):
+ while self._connections:
+ # Peek at an arbitrary element of the set
+ conn = next(iter(self._connections))
+ yield conn.close()
def handle_stream(self, stream, address):
context = _HTTPRequestContext(stream, address,
self.conn_params.protocol)
conn = HTTP1ServerConnection(
stream, self.conn_params, context)
+ self._connections.add(conn)
conn.start_serving(self)
- def start_request(self, connection):
- return _ServerRequestAdapter(self, connection)
+ def start_request(self, server_conn, request_conn):
+ return _ServerRequestAdapter(self, request_conn)
+
+ def on_close(self, server_conn):
+ self._connections.remove(server_conn)
class _HTTPRequestContext(object):
class HTTPServerConnectionDelegate(object):
- def start_request(self, connection):
+ def start_request(self, server_conn, request_conn):
raise NotImplementedError()
+ def on_close(self, server_conn):
+ pass
+
class HTTPMessageDelegate(object):
def headers_received(self, start_line, headers):
def tearDown(self):
self.http_server.stop()
+ self.io_loop.run_sync(self.http_server.close_all_connections)
if (not IOLoop.initialized() or
self.http_client.io_loop is not IOLoop.instance()):
self.http_client.close()