From: Alek Storm Date: Mon, 12 Sep 2011 05:32:43 +0000 (+0000) Subject: Move `TCPServer` to `netutil`, change `handle_stream` callback to overridden method... X-Git-Tag: v2.1.0~10^2 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=437c477ac65191dd33f70e05610e5f22d7e21916;p=thirdparty%2Ftornado.git Move `TCPServer` to `netutil`, change `handle_stream` callback to overridden method hook, move IP address hack for unix domain sockets to `HTTPConnection` --- diff --git a/tornado/httpserver.py b/tornado/httpserver.py index f9cff0dbe..84337458b 100644 --- a/tornado/httpserver.py +++ b/tornado/httpserver.py @@ -35,7 +35,7 @@ from tornado.escape import utf8, native_str, parse_qs_bytes from tornado import httputil from tornado import ioloop from tornado import iostream -from tornado import netutil +from tornado.netutil import TCPServer from tornado import process from tornado import stack_context from tornado.util import b, bytes_type @@ -45,188 +45,6 @@ try: except ImportError: ssl = None - -class TCPServer(object): - r"""A non-blocking, single-threaded TCP server. - - `TCPServer` can serve SSL traffic with Python 2.6+ and OpenSSL. - To make this server serve SSL traffic, send the ssl_options dictionary - argument with the arguments required for the `ssl.wrap_socket` method, - including "certfile" and "keyfile":: - - TCPServer(applicaton, ssl_options={ - "certfile": os.path.join(data_dir, "mydomain.crt"), - "keyfile": os.path.join(data_dir, "mydomain.key"), - }) - - `TCPServer` initialization follows one of three patterns: - - 1. `listen`: simple single-process:: - - server = TCPServer(app) - server.listen(8888) - IOLoop.instance().start() - - 2. `bind`/`start`: simple multi-process:: - - server = TCPServer(app) - server.bind(8888) - server.start(0) # Forks multiple sub-processes - IOLoop.instance().start() - - When using this interface, an `IOLoop` must *not* be passed - to the `TCPServer` constructor. `start` will always start - the server on the default singleton `IOLoop`. - - 3. `add_sockets`: advanced multi-process:: - - sockets = tornado.netutil.bind_sockets(8888) - tornado.process.fork_processes(0) - server = TCPServer(app) - server.add_sockets(sockets) - IOLoop.instance().start() - - The `add_sockets` interface is more complicated, but it can be - used with `tornado.process.fork_processes` to give you more - flexibility in when the fork happens. `add_sockets` can - also be used in single-process servers if you want to create - your listening sockets in some way other than - `tornado.netutil.bind_sockets`. - """ - def __init__(self, handle_stream, io_loop=None, ssl_options=None): - self.handle_stream = handle_stream - self.io_loop = io_loop - self.ssl_options = ssl_options - self._sockets = {} # fd -> socket object - self._pending_sockets = [] - self._started = False - - def listen(self, port, address=""): - """Starts accepting connections on the given port. - - This method may be called more than once to listen on multiple ports. - `listen` takes effect immediately; it is not necessary to call - `TCPServer.start` afterwards. It is, however, necessary to start - the `IOLoop`. - """ - sockets = netutil.bind_sockets(port, address=address) - self.add_sockets(sockets) - - def add_sockets(self, sockets): - """Makes this server start accepting connections on the given sockets. - - The ``sockets`` parameter is a list of socket objects such as - those returned by `tornado.netutil.bind_sockets`. - `add_sockets` is typically used in combination with that - method and `tornado.process.fork_processes` to provide greater - control over the initialization of a multi-process server. - """ - if self.io_loop is None: - self.io_loop = ioloop.IOLoop.instance() - - for sock in sockets: - self._sockets[sock.fileno()] = sock - netutil.add_accept_handler(sock, self._handle_connection, - io_loop=self.io_loop) - - def add_socket(self, socket): - """Singular version of `add_sockets`. Takes a single socket object.""" - self.add_sockets([socket]) - - def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128): - """Binds this server to the given port on the given address. - - To start the server, call `start`. If you want to run this server - in a single process, you can call `listen` as a shortcut to the - sequence of `bind` and `start` calls. - - Address may be either an IP address or hostname. If it's a hostname, - the server will listen on all IP addresses associated with the - name. Address may be an empty string or None to listen on all - available interfaces. Family may be set to either ``socket.AF_INET`` - or ``socket.AF_INET6`` to restrict to ipv4 or ipv6 addresses, otherwise - both will be used if available. - - The ``backlog`` argument has the same meaning as for - `socket.listen`. - - This method may be called multiple times prior to `start` to listen - on multiple ports or interfaces. - """ - sockets = netutil.bind_sockets(port, address=address, - family=family, backlog=backlog) - if self._started: - self.add_sockets(sockets) - else: - self._pending_sockets.extend(sockets) - - def start(self, num_processes=1): - """Starts this server in the IOLoop. - - By default, we run the server in this process and do not fork any - additional child process. - - If num_processes is ``None`` or <= ``0``, we detect the number of cores - available on this machine and fork that number of child - processes. If num_processes is given and > ```1``, we fork that - specific number of sub-processes. - - Since we use processes and not threads, there is no shared memory - between any server code. - - Note that multiple processes are not compatible with the autoreload - module (or the ``debug=True`` option to `tornado.web.Application`). - When using multiple processes, no IOLoops can be created or - referenced until after the call to ``TCPServer.start(n)``. - """ - assert not self._started - self._started = True - if num_processes != 1: - process.fork_processes(num_processes) - sockets = self._pending_sockets - self._pending_sockets = [] - self.add_sockets(sockets) - - def stop(self): - """Stops listening for new connections. - - Requests currently in progress may still continue after the - server is stopped. - """ - for fd, sock in self._sockets.iteritems(): - self.io_loop.remove_handler(fd) - sock.close() - - def _handle_connection(self, connection, address): - if self.ssl_options is not None: - assert ssl, "Python 2.6+ and OpenSSL required for SSL" - try: - connection = ssl.wrap_socket(connection, - server_side=True, - do_handshake_on_connect=False, - **self.ssl_options) - except ssl.SSLError, err: - if err.args[0] == ssl.SSL_ERROR_EOF: - return connection.close() - else: - raise - except socket.error, err: - if err.args[0] == errno.ECONNABORTED: - return connection.close() - else: - raise - try: - if self.ssl_options is not None: - stream = iostream.SSLIOStream(connection, io_loop=self.io_loop) - else: - stream = iostream.IOStream(connection, io_loop=self.io_loop) - if connection.family not in (socket.AF_INET, socket.AF_INET6): - # Unix (or other) socket; fake the remote address - address = ('0.0.0.0', 0) - self.handle_stream(stream, address) - except Exception: - logging.error("Error in connection callback", exc_info=True) - class HTTPServer(TCPServer): """ A server is defined by a request callback that takes an HTTPRequest @@ -273,9 +91,9 @@ class HTTPServer(TCPServer): self.request_callback = request_callback self.no_keep_alive = no_keep_alive self.xheaders = xheaders - TCPServer.__init__(self, self._handle_stream, **kwargs) + TCPServer.__init__(self, **kwargs) - def _handle_stream(self, stream, address): + def handle_stream(self, stream, address): HTTPConnection(stream, address, self.request_callback, self.no_keep_alive, self.xheaders) @@ -292,6 +110,9 @@ class HTTPConnection(object): def __init__(self, stream, address, request_callback, no_keep_alive=False, xheaders=False): self.stream = stream + if self.stream.socket.family not in (socket.AF_INET, socket.AF_INET6): + # Unix (or other) socket; fake the remote address + address = ('0.0.0.0', 0) self.address = address self.request_callback = request_callback self.no_keep_alive = no_keep_alive diff --git a/tornado/netutil.py b/tornado/netutil.py index 557b227ab..d223fd84d 100644 --- a/tornado/netutil.py +++ b/tornado/netutil.py @@ -17,13 +17,203 @@ """Miscellaneous network utility code.""" import errno +import logging import os import socket import stat from tornado.ioloop import IOLoop +from tornado.iostream import IOStream, SSLIOStream from tornado.platform.auto import set_close_exec +try: + import ssl # Python 2.6+ +except ImportError: + ssl = None + +class TCPServer(object): + r"""A non-blocking, single-threaded server with additional methods for TCP + connections. + + `TCPServer` can serve SSL traffic with Python 2.6+ and OpenSSL. + To make this server serve SSL traffic, send the ssl_options dictionary + argument with the arguments required for the `ssl.wrap_socket` method, + including "certfile" and "keyfile":: + + TCPServer(applicaton, ssl_options={ + "certfile": os.path.join(data_dir, "mydomain.crt"), + "keyfile": os.path.join(data_dir, "mydomain.key"), + }) + + `TCPServer` initialization follows one of three patterns: + + 1. `listen`: simple single-process:: + + server = TCPServer(app) + server.listen(8888) + IOLoop.instance().start() + + 2. `bind`/`start`: simple multi-process:: + + server = TCPServer(app) + server.bind(8888) + server.start(0) # Forks multiple sub-processes + IOLoop.instance().start() + + When using this interface, an `IOLoop` must *not* be passed + to the `TCPServer` constructor. `start` will always start + the server on the default singleton `IOLoop`. + + 3. `add_sockets`: advanced multi-process:: + + sockets = bind_sockets(8888) + tornado.process.fork_processes(0) + server = TCPServer(app) + server.add_sockets(sockets) + IOLoop.instance().start() + + The `add_sockets` interface is more complicated, but it can be + used with `tornado.process.fork_processes` to give you more + flexibility in when the fork happens. `add_sockets` can + also be used in single-process servers if you want to create + your listening sockets in some way other than + `bind_sockets`. + """ + def __init__(self, io_loop=None, ssl_options=None): + self.io_loop = io_loop + self.ssl_options = ssl_options + self._sockets = {} # fd -> socket object + self._pending_sockets = [] + self._started = False + + def listen(self, port, address=""): + """Starts accepting connections on the given port. + + This method may be called more than once to listen on multiple ports. + `listen` takes effect immediately; it is not necessary to call + `TCPServer.start` afterwards. It is, however, necessary to start + the `IOLoop`. + """ + sockets = bind_sockets(port, address=address) + self.add_sockets(sockets) + + def add_sockets(self, sockets): + """Makes this server start accepting connections on the given sockets. + + The ``sockets`` parameter is a list of socket objects such as + those returned by `bind_sockets`. + `add_sockets` is typically used in combination with that + method and `tornado.process.fork_processes` to provide greater + control over the initialization of a multi-process server. + """ + if self.io_loop is None: + self.io_loop = IOLoop.instance() + + for sock in sockets: + self._sockets[sock.fileno()] = sock + add_accept_handler(sock, self._handle_connection, + io_loop=self.io_loop) + + def add_socket(self, socket): + """Singular version of `add_sockets`. Takes a single socket object.""" + self.add_sockets([socket]) + + def bind(self, port, address=None, family=socket.AF_UNSPEC, backlog=128): + """Binds this server to the given port on the given address. + + To start the server, call `start`. If you want to run this server + in a single process, you can call `listen` as a shortcut to the + sequence of `bind` and `start` calls. + + Address may be either an IP address or hostname. If it's a hostname, + the server will listen on all IP addresses associated with the + name. Address may be an empty string or None to listen on all + available interfaces. Family may be set to either ``socket.AF_INET`` + or ``socket.AF_INET6`` to restrict to ipv4 or ipv6 addresses, otherwise + both will be used if available. + + The ``backlog`` argument has the same meaning as for + `socket.listen`. + + This method may be called multiple times prior to `start` to listen + on multiple ports or interfaces. + """ + sockets = bind_sockets(port, address=address, family=family, + backlog=backlog) + if self._started: + self.add_sockets(sockets) + else: + self._pending_sockets.extend(sockets) + + def start(self, num_processes=1): + """Starts this server in the IOLoop. + + By default, we run the server in this process and do not fork any + additional child process. + + If num_processes is ``None`` or <= 0, we detect the number of cores + available on this machine and fork that number of child + processes. If num_processes is given and > 1, we fork that + specific number of sub-processes. + + Since we use processes and not threads, there is no shared memory + between any server code. + + Note that multiple processes are not compatible with the autoreload + module (or the ``debug=True`` option to `tornado.web.Application`). + When using multiple processes, no IOLoops can be created or + referenced until after the call to ``TCPServer.start(n)``. + """ + assert not self._started + self._started = True + if num_processes != 1: + process.fork_processes(num_processes) + sockets = self._pending_sockets + self._pending_sockets = [] + self.add_sockets(sockets) + + def stop(self): + """Stops listening for new connections. + + Requests currently in progress may still continue after the + server is stopped. + """ + for fd, sock in self._sockets.iteritems(): + self.io_loop.remove_handler(fd) + sock.close() + + def handle_stream(self, stream, address): + """Override to handle a new `IOStream` from an incoming connection.""" + raise NotImplementedError() + + def _handle_connection(self, connection, address): + if self.ssl_options is not None: + assert ssl, "Python 2.6+ and OpenSSL required for SSL" + try: + connection = ssl.wrap_socket(connection, + server_side=True, + do_handshake_on_connect=False, + **self.ssl_options) + except ssl.SSLError, err: + if err.args[0] == ssl.SSL_ERROR_EOF: + return connection.close() + else: + raise + except socket.error, err: + if err.args[0] == errno.ECONNABORTED: + return connection.close() + else: + raise + try: + if self.ssl_options is not None: + stream = SSLIOStream(connection, io_loop=self.io_loop) + else: + stream = IOStream(connection, io_loop=self.io_loop) + self.handle_stream(stream, address) + except Exception: + logging.error("Error in connection callback", exc_info=True) + + def bind_sockets(port, address=None, family=socket.AF_UNSPEC, backlog=128): """Creates listening sockets bound to the given port and address.