--- /dev/null
+import os
+import sys
+import thread
+
+import tornado.twisted.reactor
+tornado.twisted.reactor.install()
+from twisted.internet import reactor
+
+from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
+
+from twisted.python import log
+
+from tornado.twisted.reactor import TornadoReactor
+from tornado.testing import AsyncTestCase, LogTrapTestCase
+import unittest
+
+from zope.interface import implements
+
+log.startLogging(sys.stdout)
+
+class ReactorWhenRunningTest(unittest.TestCase):
+ def setUp(self):
+ self._reactor = TornadoReactor()
+
+ def test_whenRunning(self):
+ self._whenRunningCalled = False
+ self._anotherWhenRunningCalled = False
+ self._reactor.callWhenRunning(self.whenRunningCallback)
+ self._reactor.run()
+ self.assertTrue(self._whenRunningCalled)
+ self.assertTrue(self._anotherWhenRunningCalled)
+
+ def whenRunningCallback(self):
+ self._whenRunningCalled = True
+ self._reactor.callWhenRunning(self.anotherWhenRunningCallback)
+ self._reactor.stop()
+
+ def anotherWhenRunningCallback(self):
+ self._anotherWhenRunningCalled = True
+
+class ReactorCallLaterTest(unittest.TestCase):
+ def setUp(self):
+ self._reactor = TornadoReactor()
+
+ def test_callLater(self):
+ self._laterCalled = False
+ self._now = self._reactor.seconds()
+ self._timeout = 0.001
+ dc = self._reactor.callLater(self._timeout, self.callLaterCallback)
+ self.assertEqual(self._reactor.getDelayedCalls(), [dc])
+ self._reactor.run()
+ self.assertTrue(self._laterCalled)
+ self.assertGreater(self._called - self._now, self._timeout)
+ self.assertEqual(self._reactor.getDelayedCalls(), [])
+
+ def callLaterCallback(self):
+ self._laterCalled = True
+ self._called = self._reactor.seconds()
+ self._reactor.stop()
+
+class ReactorTwoCallLaterTest(unittest.TestCase):
+ def setUp(self):
+ self._reactor = TornadoReactor()
+
+ def test_callLater(self):
+ self._later1Called = False
+ self._later2Called = False
+ self._now = self._reactor.seconds()
+ self._timeout1 = 0.0005
+ dc1 = self._reactor.callLater(self._timeout1, self.callLaterCallback1)
+ self._timeout2 = 0.001
+ dc2 = self._reactor.callLater(self._timeout2, self.callLaterCallback2)
+ self.assertTrue(self._reactor.getDelayedCalls() == [dc1, dc2] or
+ self._reactor.getDelayedCalls() == [dc2, dc1])
+ self._reactor.run()
+ self.assertTrue(self._later1Called)
+ self.assertTrue(self._later2Called)
+ self.assertGreater(self._called1 - self._now, self._timeout1)
+ self.assertGreater(self._called2 - self._now, self._timeout2)
+ self.assertEqual(self._reactor.getDelayedCalls(), [])
+
+ def callLaterCallback1(self):
+ self._later1Called = True
+ self._called1 = self._reactor.seconds()
+
+ def callLaterCallback2(self):
+ self._later2Called = True
+ self._called2 = self._reactor.seconds()
+ self._reactor.stop()
+
+class ReactorCallFromThreadTest(unittest.TestCase):
+ def setUp(self):
+ self._reactor = TornadoReactor()
+ self._mainThread = thread.get_ident()
+
+ def _newThreadRun(self, a, b):
+ self.assertEqual(self._thread, thread.get_ident())
+ self._reactor.callFromThread(self._fnCalledFromThread)
+
+ def _fnCalledFromThread(self):
+ self.assertEqual(self._mainThread, thread.get_ident())
+ self._reactor.stop()
+
+ def _whenRunningCallback(self):
+ self._thread = thread.start_new_thread(self._newThreadRun, (None, None))
+
+ def testCallFromThread(self):
+ self._reactor.callWhenRunning(self._whenRunningCallback)
+ self._reactor.run()
+
+class ReactorCallInThread(unittest.TestCase):
+ def setUp(self):
+ self._reactor = TornadoReactor()
+ self._mainThread = thread.get_ident()
+
+ def _fnCalledInThread(self, *args, **kwargs):
+ self.assertNotEqual(thread.get_ident(), self._mainThread)
+ self._reactor.callFromThread(lambda: self._reactor.stop())
+
+ def _whenRunningCallback(self):
+ self._reactor.callInThread(self._fnCalledInThread)
+
+ def testCallInThread(self):
+ self._reactor.callWhenRunning(self._whenRunningCallback)
+ self._reactor.run()
+
+class Reader:
+ implements(IReadDescriptor)
+
+ def __init__(self, fd, callback):
+ self._fd = fd
+ self._callback = callback
+
+ def logPrefix(self): return "Reader"
+
+ def fileno(self):
+ return self._fd.fileno()
+
+ def connectionLost(self, reason):
+ return
+
+ def doRead(self):
+ self._callback(self._fd)
+
+class Writer:
+ implements(IWriteDescriptor)
+
+ def __init__(self, fd, callback):
+ self._fd = fd
+ self._callback = callback
+
+ def logPrefix(self): return "Writer"
+
+ def fileno(self):
+ return self._fd.fileno()
+
+ def connectionLost(self, reason):
+ return
+
+ def doWrite(self):
+ self._callback(self._fd)
+
+class ReactorReaderWriterTest(unittest.TestCase):
+ def setUp(self):
+ self._reactor = TornadoReactor()
+ r, w = os.pipe()
+ self._reactor._ioloop._set_nonblocking(r)
+ self._reactor._ioloop._set_nonblocking(w)
+ self._reactor._ioloop._set_close_exec(r)
+ self._reactor._ioloop._set_close_exec(w)
+ self._p1 = os.fdopen(r, "rb", 0)
+ self._p2 = os.fdopen(w, "wb", 0)
+
+ def _testReadWrite(self):
+ """
+ In this test the writer writes an 'x' to its fd. The reader
+ reads it, check the value and ends the test.
+ """
+ def checkReadInput(fd):
+ self.assertEqual(fd.read(), 'x')
+ self._reactor.stop()
+ self._reader = Reader(self._p1, checkReadInput)
+ self._writer = Writer(self._p2, lambda fd: fd.write('x'))
+ self._reactor.addWriter(self._writer)
+ self._reactor.removeWriter(self._writer)
+ self._reactor.addWriter(self._writer)
+ # Test the add/remove reader functionality
+ self._reactor.addReader(self._writer)
+ self._reactor.removeReader(self._writer)
+
+ self._reactor.addReader(self._reader)
+ self._reactor.removeReader(self._reader)
+ self._reactor.addReader(self._reader)
+ # Test the add/remove writer functionality
+ self._reactor.addWriter(self._reader)
+ self._reactor.removeWriter(self._reader)
+
+ def testReadWrite(self):
+ self._reactor.callWhenRunning(self._testReadWrite)
+ self._reactor.run()
+
+if __name__ == "__main__":
+ unittest.main()
--- /dev/null
+# System imports
+import errno, functools, sys
+import time
+
+from twisted.internet.base import DelayedCall
+from twisted.internet.posixbase import PosixReactorBase
+from twisted.internet.interfaces import \
+ IReactorFDSet, IDelayedCall, IReactorTime
+
+from zope.interface import implements
+
+import tornado
+import tornado.ioloop
+from tornado.ioloop import IOLoop
+
+class TornadoDelayedCall(object):
+ implements(IDelayedCall)
+
+ def __init__(self, reactor, seconds, f, *args, **kw):
+ self._reactor = reactor
+ self._func = functools.partial(f, *args, **kw)
+ self._time = self._reactor.seconds() + seconds
+ self._timeout = self._reactor._ioloop.add_timeout(self._time,
+ self._called)
+ self._active = True
+
+ def _called(self):
+ self._active = False
+ self._reactor._removeDelayedCall(self)
+ try:
+ self._func()
+ except:
+ print "reactor.py _called caught exception: %s" % sys.exc_info()[0]
+
+ def getTime(self):
+ return self._time
+
+ def cancel(self):
+ self._active = False
+ self._reactor._ioloop.remove_timeout(self._timeout)
+ self._reactor._removeDelayedCall(self)
+
+ def delay(self, seconds):
+ self._reactor._ioloop.remove_timeout(self._timeout)
+ self._time += seconds
+ self._timeout = self._reactor._ioloop.add_timeout(self._time,
+ self._called)
+
+ def reset(self, seconds):
+ self._reactor._ioloop.remove_timeout(self._timeout)
+ self._time = self._reactor.seconds() + seconds
+ self._timeout = self._reactor._ioloop.add_timeout(self._time,
+ self._called)
+
+ def active(self):
+ return self._active
+
+class TornadoReactor(PosixReactorBase):
+ """
+ Twisted style reactor for Tornado.
+ """
+ implements(IReactorTime, IReactorFDSet)
+
+ def __init__(self, ioloop=tornado.ioloop.IOLoop.instance()):
+ self._ioloop = ioloop
+ self._readers = {}
+ self._writers = {}
+ self._fds = {} # a map of fd to a (reader, writer) tuple
+ self._delayedCalls = {}
+ # self._waker = None
+ PosixReactorBase.__init__(self)
+
+ # IReactorTime
+ def seconds(self):
+ return time.time()
+
+ def callLater(self, seconds, f, *args, **kw):
+ dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
+ self._delayedCalls[dc] = True
+ return dc
+
+ def getDelayedCalls(self):
+ return [x for x in self._delayedCalls if x._active]
+
+ def _removeDelayedCall(self, dc):
+ if dc in self._delayedCalls:
+ del self._delayedCalls[dc]
+
+ # IReactorThreads
+ def callFromThread(self, f, *args, **kw):
+ """
+ See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
+ """
+ assert callable(f), "%s is not callable" % f
+ p = functools.partial(f, *args, **kw)
+ self._ioloop.add_callback(p)
+
+ # We don't need the waker code from the super class, Tornado uses
+ # its own waker.
+ def installWaker(self):
+ pass
+
+ def wakeUp(self):
+ pass
+
+ # IReactorFDSet
+ def _invoke_callback(self, fd, events):
+ (reader, writer) = self._fds[fd]
+ if events | IOLoop.READ and reader:
+ reader.doRead()
+ if events | IOLoop.WRITE and writer:
+ writer.doWrite()
+
+ def addReader(self, reader):
+ """
+ Add a FileDescriptor for notification of data available to read.
+ """
+ self._readers[reader] = True
+ fd = reader.fileno()
+ if fd in self._fds:
+ (_, writer) = self._fds[fd]
+ self._fds[fd] = (reader, writer)
+ if writer:
+ # We already registered this fd for write events,
+ # update it for read events as well.
+ self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
+ else:
+ self._fds[fd] = (reader, None)
+ self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.READ)
+
+ def addWriter(self, writer):
+ """
+ Add a FileDescriptor for notification of data available to write.
+ """
+ self._writers[writer] = True
+ fd = writer.fileno()
+ if fd in self._fds:
+ (reader, _) = self._fds[fd]
+ self._fds[fd] = (reader, writer)
+ if reader:
+ # We already registered this fd for read events,
+ # update it for write events as well.
+ self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
+ else:
+ self._fds[fd] = (None, writer)
+ self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.WRITE)
+
+ def removeReader(self, reader):
+ """
+ Remove a Selectable for notification of data available to read.
+ """
+ fd = reader.fileno()
+ if reader in self._readers:
+ del self._readers[reader]
+ (_, writer) = self._fds[fd]
+ if writer:
+ # We have a writer so we need to update the IOLoop for
+ # write events only.
+ self._fds[fd] = (None, writer)
+ self._ioloop.update_handler(fd, IOLoop.WRITE)
+ else:
+ # Since we have no writer registered, we remove the
+ # entry from _fds and unregister the handler from the
+ # IOLoop
+ del self._fds[fd]
+ self._ioloop.remove_handler(fd)
+
+ def removeWriter(self, writer):
+ """
+ Remove a Selectable for notification of data available to write.
+ """
+ fd = writer.fileno()
+ if writer in self._writers:
+ del self._writers[writer]
+ (reader, _) = self._fds[fd]
+ if reader:
+ # We have a reader so we need to update the IOLoop for
+ # read events only.
+ self._fds[fd] = (reader, None)
+ self._ioloop.update_handler(fd, IOLoop.READ)
+ else:
+ # Since we have no reader registered, we remove the
+ # entry from the _fds and unregister the handler from
+ # the IOLoop.
+ del self._fds[fd]
+ self._ioloop.remove_handler(fd)
+
+ def removeAll(self):
+ return self._removeAll(self._readers, self._writers)
+
+ def getReaders(self):
+ return self._readers.keys()
+
+ def getWriters(self):
+ return self._writers.keys()
+
+ def stop(self):
+ """
+ Implement L{IReactorCore.stop}.
+ """
+ PosixReactorBase.stop(self)
+ self.runUntilCurrent()
+ self._ioloop.stop()
+
+ def crash(self):
+ PosixReactorBase.crash(self)
+ self.runUntilCurrent()
+ self._ioloop.stop()
+
+ def doIteration(self, delay):
+ raise NotImplementedError("doIteration")
+
+ def mainLoop(self):
+ self.running = True
+ self._ioloop.start()
+
+def install(ioloop=tornado.ioloop.IOLoop.instance()):
+ """
+ Install the Tornado reactor.
+ """
+ reactor = TornadoReactor(ioloop)
+ from twisted.internet.main import installReactor
+ installReactor(reactor)
+ return reactor