--- /dev/null
+"""Bridges between the `asyncio` module and Tornado IOLoop.
+
+This is a work in progress and interfaces are subject to change.
+
+To test: python3.4 -m tornado.test.runtests --ioloop=tornado.platform.asyncio.AsyncIOLoop
+"""
+import asyncio
+import datetime
+import functools
+import os
+
+from tornado.ioloop import IOLoop
+from tornado import stack_context
+
+# This name is really unfortunate.
+class AsyncIOLoop(IOLoop):
+ def initialize(self):
+ # TODO: we need a way to use the global get_event_loop.
+ self.asyncio_loop = asyncio.events.new_event_loop()
+ self.asyncio_loop.call_soon(self.make_current)
+ # Maps fd to handler function (as in IOLoop.add_handler)
+ self.handlers = {}
+ # Set of fds listening for reads/writes
+ self.readers = set()
+ self.writers = set()
+ self.closing = False
+
+ def close(self, all_fds=False):
+ self.closing = True
+ for fd in list(self.handlers):
+ self.remove_handler(fd)
+ if all_fds:
+ os.close(fd)
+ self.asyncio_loop.close()
+
+ def add_handler(self, fd, handler, events):
+ if fd in self.handlers:
+ raise ValueError("fd %d added twice" % fd)
+ self.handlers[fd] = stack_context.wrap(handler)
+ if events & IOLoop.READ:
+ self.asyncio_loop.add_reader(
+ fd, self._handle_events, fd, IOLoop.READ)
+ self.readers.add(fd)
+ if events & IOLoop.WRITE:
+ self.asyncio_loop.add_writer(
+ fd, self._handle_events, fd, IOLoop.WRITE)
+ self.writers.add(fd)
+
+ def update_handler(self, fd, events):
+ if events & IOLoop.READ:
+ if fd not in self.readers:
+ self.asyncio_loop.add_reader(
+ fd, self._handle_events, fd, IOLoop.READ)
+ self.readers.add(fd)
+ else:
+ if fd in self.readers:
+ self.asyncio_loop.remove_reader(fd)
+ self.readers.remove(fd)
+ if events & IOLoop.WRITE:
+ if fd not in self.writers:
+ self.asyncio_loop.add_writer(
+ fd, self._handle_events, fd, IOLoop.WRITE)
+ self.writers.add(fd)
+ else:
+ if fd in self.writers:
+ self.asyncio_loop.remove_writer(fd)
+ self.writers.remove(fd)
+
+ def remove_handler(self, fd):
+ if fd not in self.handlers:
+ return
+ if fd in self.readers:
+ self.asyncio_loop.remove_reader(fd)
+ self.readers.remove(fd)
+ if fd in self.writers:
+ self.asyncio_loop.remove_writer(fd)
+ self.writers.remove(fd)
+ del self.handlers[fd]
+
+ def _handle_events(self, fd, events):
+ self.handlers[fd](fd, events)
+
+ def start(self):
+ self.asyncio_loop.run_forever()
+
+ def stop(self):
+ self.asyncio_loop.stop()
+
+ def _run_callback(self, callback, *args, **kwargs):
+ try:
+ callback(*args, **kwargs)
+ except Exception:
+ self.handle_callback_exception(callback)
+
+ def add_timeout(self, deadline, callback):
+ if isinstance(deadline, (int, float)):
+ delay = max(deadline - self.time(), 0)
+ elif isinstance(deadline, datetime.timedelta):
+ delay = deadline.total_seconds()
+ else:
+ raise TypeError("Unsupported deadline %r", deadline)
+ return self.asyncio_loop.call_later(delay, self._run_callback,
+ stack_context.wrap(callback))
+
+ def remove_timeout(self, timeout):
+ timeout.cancel()
+
+ def add_callback(self, callback, *args, **kwargs):
+ if self.closing:
+ raise RuntimeError("IOLoop is closing")
+ if kwargs:
+ self.asyncio_loop.call_soon_threadsafe(functools.partial(
+ self._run_callback, stack_context.wrap(callback),
+ *args, **kwargs))
+ else:
+ self.asyncio_loop.call_soon_threadsafe(
+ self._run_callback, stack_context.wrap(callback), *args)
+
+ add_callback_from_signal = add_callback