]> git.ipfire.org Git - thirdparty/tornado.git/commitdiff
Added twisted-style reactor for Tornado.
authorOvidiu Predescu <ovidiu@gmail.com>
Tue, 12 Jul 2011 18:28:24 +0000 (11:28 -0700)
committerOvidiu Predescu <ovidiu@gmail.com>
Tue, 12 Jul 2011 18:28:24 +0000 (11:28 -0700)
tornado/test/twistedreactor_test.py [new file with mode: 0644]
tornado/twisted/__init__.py [new file with mode: 0644]
tornado/twisted/reactor.py [new file with mode: 0644]

diff --git a/tornado/test/twistedreactor_test.py b/tornado/test/twistedreactor_test.py
new file mode 100644 (file)
index 0000000..de87456
--- /dev/null
@@ -0,0 +1,203 @@
+import os
+import sys
+import thread
+
+import tornado.twisted.reactor
+tornado.twisted.reactor.install()
+from twisted.internet import reactor
+
+from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
+
+from twisted.python import log
+
+from tornado.twisted.reactor import TornadoReactor
+from tornado.testing import AsyncTestCase, LogTrapTestCase
+import unittest
+
+from zope.interface import implements
+
+log.startLogging(sys.stdout)
+
+class ReactorWhenRunningTest(unittest.TestCase):
+    def setUp(self):
+        self._reactor = TornadoReactor()
+
+    def test_whenRunning(self):
+        self._whenRunningCalled = False
+        self._anotherWhenRunningCalled = False
+        self._reactor.callWhenRunning(self.whenRunningCallback)
+        self._reactor.run()
+        self.assertTrue(self._whenRunningCalled)
+        self.assertTrue(self._anotherWhenRunningCalled)
+
+    def whenRunningCallback(self):
+        self._whenRunningCalled = True
+        self._reactor.callWhenRunning(self.anotherWhenRunningCallback)
+        self._reactor.stop()
+
+    def anotherWhenRunningCallback(self):
+        self._anotherWhenRunningCalled = True
+
+class ReactorCallLaterTest(unittest.TestCase):
+    def setUp(self):
+        self._reactor = TornadoReactor()
+
+    def test_callLater(self):
+        self._laterCalled = False
+        self._now = self._reactor.seconds()
+        self._timeout = 0.001
+        dc = self._reactor.callLater(self._timeout, self.callLaterCallback)
+        self.assertEqual(self._reactor.getDelayedCalls(), [dc])
+        self._reactor.run()
+        self.assertTrue(self._laterCalled)
+        self.assertGreater(self._called - self._now, self._timeout)
+        self.assertEqual(self._reactor.getDelayedCalls(), [])
+
+    def callLaterCallback(self):
+        self._laterCalled = True
+        self._called = self._reactor.seconds()
+        self._reactor.stop()
+
+class ReactorTwoCallLaterTest(unittest.TestCase):
+    def setUp(self):
+        self._reactor = TornadoReactor()
+
+    def test_callLater(self):
+        self._later1Called = False
+        self._later2Called = False
+        self._now = self._reactor.seconds()
+        self._timeout1 = 0.0005
+        dc1 = self._reactor.callLater(self._timeout1, self.callLaterCallback1)
+        self._timeout2 = 0.001
+        dc2 = self._reactor.callLater(self._timeout2, self.callLaterCallback2)
+        self.assertTrue(self._reactor.getDelayedCalls() == [dc1, dc2] or
+                        self._reactor.getDelayedCalls() == [dc2, dc1])
+        self._reactor.run()
+        self.assertTrue(self._later1Called)
+        self.assertTrue(self._later2Called)
+        self.assertGreater(self._called1 - self._now, self._timeout1)
+        self.assertGreater(self._called2 - self._now, self._timeout2)
+        self.assertEqual(self._reactor.getDelayedCalls(), [])
+
+    def callLaterCallback1(self):
+        self._later1Called = True
+        self._called1 = self._reactor.seconds()
+
+    def callLaterCallback2(self):
+        self._later2Called = True
+        self._called2 = self._reactor.seconds()
+        self._reactor.stop()
+
+class ReactorCallFromThreadTest(unittest.TestCase):
+    def setUp(self):
+        self._reactor = TornadoReactor()
+        self._mainThread = thread.get_ident()
+
+    def _newThreadRun(self, a, b):
+        self.assertEqual(self._thread, thread.get_ident())
+        self._reactor.callFromThread(self._fnCalledFromThread)
+
+    def _fnCalledFromThread(self):
+        self.assertEqual(self._mainThread, thread.get_ident())
+        self._reactor.stop()
+
+    def _whenRunningCallback(self):
+        self._thread = thread.start_new_thread(self._newThreadRun, (None, None))
+
+    def testCallFromThread(self):
+        self._reactor.callWhenRunning(self._whenRunningCallback)
+        self._reactor.run()
+
+class ReactorCallInThread(unittest.TestCase):
+    def setUp(self):
+        self._reactor = TornadoReactor()
+        self._mainThread = thread.get_ident()
+
+    def _fnCalledInThread(self, *args, **kwargs):
+        self.assertNotEqual(thread.get_ident(), self._mainThread)
+        self._reactor.callFromThread(lambda: self._reactor.stop())
+
+    def _whenRunningCallback(self):
+        self._reactor.callInThread(self._fnCalledInThread)
+
+    def testCallInThread(self):
+        self._reactor.callWhenRunning(self._whenRunningCallback)
+        self._reactor.run()
+
+class Reader:
+    implements(IReadDescriptor)
+
+    def __init__(self, fd, callback):
+        self._fd = fd
+        self._callback = callback
+
+    def logPrefix(self): return "Reader"
+
+    def fileno(self):
+        return self._fd.fileno()
+
+    def connectionLost(self, reason):
+        return
+
+    def doRead(self):
+        self._callback(self._fd)
+
+class Writer:
+    implements(IWriteDescriptor)
+
+    def __init__(self, fd, callback):
+        self._fd = fd
+        self._callback = callback
+
+    def logPrefix(self): return "Writer"
+
+    def fileno(self):
+        return self._fd.fileno()
+
+    def connectionLost(self, reason):
+        return
+
+    def doWrite(self):
+        self._callback(self._fd)
+
+class ReactorReaderWriterTest(unittest.TestCase):
+    def setUp(self):
+        self._reactor = TornadoReactor()
+        r, w = os.pipe()
+        self._reactor._ioloop._set_nonblocking(r)
+        self._reactor._ioloop._set_nonblocking(w)
+        self._reactor._ioloop._set_close_exec(r)
+        self._reactor._ioloop._set_close_exec(w)
+        self._p1 = os.fdopen(r, "rb", 0)
+        self._p2 = os.fdopen(w, "wb", 0)
+
+    def _testReadWrite(self):
+        """
+        In this test the writer writes an 'x' to its fd. The reader
+        reads it, check the value and ends the test.
+        """
+        def checkReadInput(fd):
+            self.assertEqual(fd.read(), 'x')
+            self._reactor.stop()
+        self._reader = Reader(self._p1, checkReadInput)
+        self._writer = Writer(self._p2, lambda fd: fd.write('x'))
+        self._reactor.addWriter(self._writer)
+        self._reactor.removeWriter(self._writer)
+        self._reactor.addWriter(self._writer)
+        # Test the add/remove reader functionality
+        self._reactor.addReader(self._writer)
+        self._reactor.removeReader(self._writer)
+
+        self._reactor.addReader(self._reader)
+        self._reactor.removeReader(self._reader)
+        self._reactor.addReader(self._reader)
+        # Test the add/remove writer functionality
+        self._reactor.addWriter(self._reader)
+        self._reactor.removeWriter(self._reader)
+
+    def testReadWrite(self):
+        self._reactor.callWhenRunning(self._testReadWrite)
+        self._reactor.run()
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tornado/twisted/__init__.py b/tornado/twisted/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tornado/twisted/reactor.py b/tornado/twisted/reactor.py
new file mode 100644 (file)
index 0000000..e5f7444
--- /dev/null
@@ -0,0 +1,224 @@
+# System imports
+import errno, functools, sys
+import time
+
+from twisted.internet.base import DelayedCall
+from twisted.internet.posixbase import PosixReactorBase
+from twisted.internet.interfaces import \
+    IReactorFDSet, IDelayedCall, IReactorTime
+
+from zope.interface import implements
+
+import tornado
+import tornado.ioloop
+from tornado.ioloop import IOLoop
+
+class TornadoDelayedCall(object):
+    implements(IDelayedCall)
+
+    def __init__(self, reactor, seconds, f, *args, **kw):
+        self._reactor = reactor
+        self._func = functools.partial(f, *args, **kw)
+        self._time = self._reactor.seconds() + seconds
+        self._timeout = self._reactor._ioloop.add_timeout(self._time,
+                                                          self._called)
+        self._active = True
+
+    def _called(self):
+        self._active = False
+        self._reactor._removeDelayedCall(self)
+        try:
+            self._func()
+        except:
+            print "reactor.py _called caught exception: %s" % sys.exc_info()[0]
+
+    def getTime(self):
+        return self._time
+
+    def cancel(self):
+        self._active = False
+        self._reactor._ioloop.remove_timeout(self._timeout)
+        self._reactor._removeDelayedCall(self)
+
+    def delay(self, seconds):
+        self._reactor._ioloop.remove_timeout(self._timeout)
+        self._time += seconds
+        self._timeout = self._reactor._ioloop.add_timeout(self._time,
+                                                          self._called)
+
+    def reset(self, seconds):
+        self._reactor._ioloop.remove_timeout(self._timeout)
+        self._time = self._reactor.seconds() + seconds
+        self._timeout = self._reactor._ioloop.add_timeout(self._time,
+                                                          self._called)
+
+    def active(self):
+        return self._active
+
+class TornadoReactor(PosixReactorBase):
+    """
+    Twisted style reactor for Tornado.
+    """
+    implements(IReactorTime, IReactorFDSet)
+
+    def __init__(self, ioloop=tornado.ioloop.IOLoop.instance()):
+        self._ioloop = ioloop
+        self._readers = {}
+        self._writers = {}
+        self._fds = {} # a map of fd to a (reader, writer) tuple
+        self._delayedCalls = {}
+        # self._waker = None
+        PosixReactorBase.__init__(self)
+
+    # IReactorTime
+    def seconds(self):
+        return time.time()
+
+    def callLater(self, seconds, f, *args, **kw):
+        dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
+        self._delayedCalls[dc] = True
+        return dc
+
+    def getDelayedCalls(self):
+        return [x for x in self._delayedCalls if x._active]
+
+    def _removeDelayedCall(self, dc):
+        if dc in self._delayedCalls:
+            del self._delayedCalls[dc]
+
+    # IReactorThreads
+    def callFromThread(self, f, *args, **kw):
+        """
+        See L{twisted.internet.interfaces.IReactorThreads.callFromThread}.
+        """
+        assert callable(f), "%s is not callable" % f
+        p = functools.partial(f, *args, **kw)
+        self._ioloop.add_callback(p)
+
+    # We don't need the waker code from the super class, Tornado uses
+    # its own waker.
+    def installWaker(self):
+        pass
+
+    def wakeUp(self):
+        pass
+
+    # IReactorFDSet
+    def _invoke_callback(self, fd, events):
+        (reader, writer) = self._fds[fd]
+        if events | IOLoop.READ and reader:
+            reader.doRead()
+        if events | IOLoop.WRITE and writer:
+            writer.doWrite()
+
+    def addReader(self, reader):
+        """
+        Add a FileDescriptor for notification of data available to read.
+        """
+        self._readers[reader] = True
+        fd = reader.fileno()
+        if fd in self._fds:
+            (_, writer) = self._fds[fd]
+            self._fds[fd] = (reader, writer)
+            if writer:
+                # We already registered this fd for write events,
+                # update it for read events as well.
+                self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
+        else:
+            self._fds[fd] = (reader, None)
+            self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.READ)
+
+    def addWriter(self, writer):
+        """
+        Add a FileDescriptor for notification of data available to write.
+        """
+        self._writers[writer] = True
+        fd = writer.fileno()
+        if fd in self._fds:
+            (reader, _) = self._fds[fd]
+            self._fds[fd] = (reader, writer)
+            if reader:
+                # We already registered this fd for read events,
+                # update it for write events as well.
+                self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
+        else:
+            self._fds[fd] = (None, writer)
+            self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.WRITE)
+
+    def removeReader(self, reader):
+        """
+        Remove a Selectable for notification of data available to read.
+        """
+        fd = reader.fileno()
+        if reader in self._readers:
+            del self._readers[reader]
+            (_, writer) = self._fds[fd]
+            if writer:
+                # We have a writer so we need to update the IOLoop for
+                # write events only.
+                self._fds[fd] = (None, writer)
+                self._ioloop.update_handler(fd, IOLoop.WRITE)
+            else:
+                # Since we have no writer registered, we remove the
+                # entry from _fds and unregister the handler from the
+                # IOLoop
+                del self._fds[fd]
+                self._ioloop.remove_handler(fd)
+
+    def removeWriter(self, writer):
+        """
+        Remove a Selectable for notification of data available to write.
+        """
+        fd = writer.fileno()
+        if writer in self._writers:
+            del self._writers[writer]
+            (reader, _) = self._fds[fd]
+            if reader:
+                # We have a reader so we need to update the IOLoop for
+                # read events only.
+                self._fds[fd] = (reader, None)
+                self._ioloop.update_handler(fd, IOLoop.READ)
+            else:
+                # Since we have no reader registered, we remove the
+                # entry from the _fds and unregister the handler from
+                # the IOLoop.
+                del self._fds[fd]
+                self._ioloop.remove_handler(fd)
+
+    def removeAll(self):
+        return self._removeAll(self._readers, self._writers)
+
+    def getReaders(self):
+        return self._readers.keys()
+
+    def getWriters(self):
+        return self._writers.keys()
+
+    def stop(self):
+        """
+        Implement L{IReactorCore.stop}.
+        """
+        PosixReactorBase.stop(self)
+        self.runUntilCurrent()
+        self._ioloop.stop()
+
+    def crash(self):
+        PosixReactorBase.crash(self)
+        self.runUntilCurrent()
+        self._ioloop.stop()
+
+    def doIteration(self, delay):
+        raise NotImplementedError("doIteration")
+
+    def mainLoop(self):
+        self.running = True
+        self._ioloop.start()
+
+def install(ioloop=tornado.ioloop.IOLoop.instance()):
+    """
+    Install the Tornado reactor.
+    """
+    reactor = TornadoReactor(ioloop)
+    from twisted.internet.main import installReactor
+    installReactor(reactor)
+    return reactor