From: Ben Darnell Date: Fri, 18 Oct 2013 19:46:55 +0000 (-0400) Subject: Initial version of asyncio/tulip-based IOLoop. X-Git-Tag: v3.2.0b1~67 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=9194631a5446bf717cdb9b1207c9766f9168b6b0;p=thirdparty%2Ftornado.git Initial version of asyncio/tulip-based IOLoop. Currently always creates a new asyncio event loop instead of using an existing one. --- diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py new file mode 100644 index 000000000..f59b11028 --- /dev/null +++ b/tornado/platform/asyncio.py @@ -0,0 +1,119 @@ +"""Bridges between the `asyncio` module and Tornado IOLoop. + +This is a work in progress and interfaces are subject to change. + +To test: python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop +""" +import asyncio +import datetime +import functools +import os + +from tornado.ioloop import IOLoop +from tornado import stack_context + +# This name is really unfortunate. +class AsyncIOLoop(IOLoop): + def initialize(self): + # TODO: we need a way to use the global get_event_loop. + self.asyncio_loop = asyncio.events.new_event_loop() + self.asyncio_loop.call_soon(self.make_current) + # Maps fd to handler function (as in IOLoop.add_handler) + self.handlers = {} + # Set of fds listening for reads/writes + self.readers = set() + self.writers = set() + self.closing = False + + def close(self, all_fds=False): + self.closing = True + for fd in list(self.handlers): + self.remove_handler(fd) + if all_fds: + os.close(fd) + self.asyncio_loop.close() + + def add_handler(self, fd, handler, events): + if fd in self.handlers: + raise ValueError("fd %d added twice" % fd) + self.handlers[fd] = stack_context.wrap(handler) + if events & IOLoop.READ: + self.asyncio_loop.add_reader( + fd, self._handle_events, fd, IOLoop.READ) + self.readers.add(fd) + if events & IOLoop.WRITE: + self.asyncio_loop.add_writer( + fd, self._handle_events, fd, IOLoop.WRITE) + self.writers.add(fd) + + def update_handler(self, fd, events): + if events & IOLoop.READ: + if fd not in self.readers: + self.asyncio_loop.add_reader( + fd, self._handle_events, fd, IOLoop.READ) + self.readers.add(fd) + else: + if fd in self.readers: + self.asyncio_loop.remove_reader(fd) + self.readers.remove(fd) + if events & IOLoop.WRITE: + if fd not in self.writers: + self.asyncio_loop.add_writer( + fd, self._handle_events, fd, IOLoop.WRITE) + self.writers.add(fd) + else: + if fd in self.writers: + self.asyncio_loop.remove_writer(fd) + self.writers.remove(fd) + + def remove_handler(self, fd): + if fd not in self.handlers: + return + if fd in self.readers: + self.asyncio_loop.remove_reader(fd) + self.readers.remove(fd) + if fd in self.writers: + self.asyncio_loop.remove_writer(fd) + self.writers.remove(fd) + del self.handlers[fd] + + def _handle_events(self, fd, events): + self.handlers[fd](fd, events) + + def start(self): + self.asyncio_loop.run_forever() + + def stop(self): + self.asyncio_loop.stop() + + def _run_callback(self, callback, *args, **kwargs): + try: + callback(*args, **kwargs) + except Exception: + self.handle_callback_exception(callback) + + def add_timeout(self, deadline, callback): + if isinstance(deadline, (int, float)): + delay = max(deadline - self.time(), 0) + elif isinstance(deadline, datetime.timedelta): + delay = deadline.total_seconds() + else: + raise TypeError("Unsupported deadline %r", deadline) + return self.asyncio_loop.call_later(delay, self._run_callback, + stack_context.wrap(callback)) + + def remove_timeout(self, timeout): + timeout.cancel() + + def add_callback(self, callback, *args, **kwargs): + if self.closing: + raise RuntimeError("IOLoop is closing") + if kwargs: + self.asyncio_loop.call_soon_threadsafe(functools.partial( + self._run_callback, stack_context.wrap(callback), + *args, **kwargs)) + else: + self.asyncio_loop.call_soon_threadsafe( + self._run_callback, stack_context.wrap(callback), *args) + + add_callback_from_signal = add_callback