From: Ovidiu Predescu Date: Tue, 12 Jul 2011 18:28:24 +0000 (-0700) Subject: Added twisted-style reactor for Tornado. X-Git-Tag: v2.1.0~90 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a45f673573cdc95177e7d17022c786156084671b;p=thirdparty%2Ftornado.git Added twisted-style reactor for Tornado. --- diff --git a/tornado/test/twistedreactor_test.py b/tornado/test/twistedreactor_test.py new file mode 100644 index 000000000..de87456a7 --- /dev/null +++ b/tornado/test/twistedreactor_test.py @@ -0,0 +1,203 @@ +import os +import sys +import thread + +import tornado.twisted.reactor +tornado.twisted.reactor.install() +from twisted.internet import reactor + +from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor + +from twisted.python import log + +from tornado.twisted.reactor import TornadoReactor +from tornado.testing import AsyncTestCase, LogTrapTestCase +import unittest + +from zope.interface import implements + +log.startLogging(sys.stdout) + +class ReactorWhenRunningTest(unittest.TestCase): + def setUp(self): + self._reactor = TornadoReactor() + + def test_whenRunning(self): + self._whenRunningCalled = False + self._anotherWhenRunningCalled = False + self._reactor.callWhenRunning(self.whenRunningCallback) + self._reactor.run() + self.assertTrue(self._whenRunningCalled) + self.assertTrue(self._anotherWhenRunningCalled) + + def whenRunningCallback(self): + self._whenRunningCalled = True + self._reactor.callWhenRunning(self.anotherWhenRunningCallback) + self._reactor.stop() + + def anotherWhenRunningCallback(self): + self._anotherWhenRunningCalled = True + +class ReactorCallLaterTest(unittest.TestCase): + def setUp(self): + self._reactor = TornadoReactor() + + def test_callLater(self): + self._laterCalled = False + self._now = self._reactor.seconds() + self._timeout = 0.001 + dc = self._reactor.callLater(self._timeout, self.callLaterCallback) + self.assertEqual(self._reactor.getDelayedCalls(), [dc]) + self._reactor.run() + self.assertTrue(self._laterCalled) + self.assertGreater(self._called - self._now, self._timeout) + self.assertEqual(self._reactor.getDelayedCalls(), []) + + def callLaterCallback(self): + self._laterCalled = True + self._called = self._reactor.seconds() + self._reactor.stop() + +class ReactorTwoCallLaterTest(unittest.TestCase): + def setUp(self): + self._reactor = TornadoReactor() + + def test_callLater(self): + self._later1Called = False + self._later2Called = False + self._now = self._reactor.seconds() + self._timeout1 = 0.0005 + dc1 = self._reactor.callLater(self._timeout1, self.callLaterCallback1) + self._timeout2 = 0.001 + dc2 = self._reactor.callLater(self._timeout2, self.callLaterCallback2) + self.assertTrue(self._reactor.getDelayedCalls() == [dc1, dc2] or + self._reactor.getDelayedCalls() == [dc2, dc1]) + self._reactor.run() + self.assertTrue(self._later1Called) + self.assertTrue(self._later2Called) + self.assertGreater(self._called1 - self._now, self._timeout1) + self.assertGreater(self._called2 - self._now, self._timeout2) + self.assertEqual(self._reactor.getDelayedCalls(), []) + + def callLaterCallback1(self): + self._later1Called = True + self._called1 = self._reactor.seconds() + + def callLaterCallback2(self): + self._later2Called = True + self._called2 = self._reactor.seconds() + self._reactor.stop() + +class ReactorCallFromThreadTest(unittest.TestCase): + def setUp(self): + self._reactor = TornadoReactor() + self._mainThread = thread.get_ident() + + def _newThreadRun(self, a, b): + self.assertEqual(self._thread, thread.get_ident()) + self._reactor.callFromThread(self._fnCalledFromThread) + + def _fnCalledFromThread(self): + self.assertEqual(self._mainThread, thread.get_ident()) + self._reactor.stop() + + def _whenRunningCallback(self): + self._thread = thread.start_new_thread(self._newThreadRun, (None, None)) + + def testCallFromThread(self): + self._reactor.callWhenRunning(self._whenRunningCallback) + self._reactor.run() + +class ReactorCallInThread(unittest.TestCase): + def setUp(self): + self._reactor = TornadoReactor() + self._mainThread = thread.get_ident() + + def _fnCalledInThread(self, *args, **kwargs): + self.assertNotEqual(thread.get_ident(), self._mainThread) + self._reactor.callFromThread(lambda: self._reactor.stop()) + + def _whenRunningCallback(self): + self._reactor.callInThread(self._fnCalledInThread) + + def testCallInThread(self): + self._reactor.callWhenRunning(self._whenRunningCallback) + self._reactor.run() + +class Reader: + implements(IReadDescriptor) + + def __init__(self, fd, callback): + self._fd = fd + self._callback = callback + + def logPrefix(self): return "Reader" + + def fileno(self): + return self._fd.fileno() + + def connectionLost(self, reason): + return + + def doRead(self): + self._callback(self._fd) + +class Writer: + implements(IWriteDescriptor) + + def __init__(self, fd, callback): + self._fd = fd + self._callback = callback + + def logPrefix(self): return "Writer" + + def fileno(self): + return self._fd.fileno() + + def connectionLost(self, reason): + return + + def doWrite(self): + self._callback(self._fd) + +class ReactorReaderWriterTest(unittest.TestCase): + def setUp(self): + self._reactor = TornadoReactor() + r, w = os.pipe() + self._reactor._ioloop._set_nonblocking(r) + self._reactor._ioloop._set_nonblocking(w) + self._reactor._ioloop._set_close_exec(r) + self._reactor._ioloop._set_close_exec(w) + self._p1 = os.fdopen(r, "rb", 0) + self._p2 = os.fdopen(w, "wb", 0) + + def _testReadWrite(self): + """ + In this test the writer writes an 'x' to its fd. The reader + reads it, check the value and ends the test. + """ + def checkReadInput(fd): + self.assertEqual(fd.read(), 'x') + self._reactor.stop() + self._reader = Reader(self._p1, checkReadInput) + self._writer = Writer(self._p2, lambda fd: fd.write('x')) + self._reactor.addWriter(self._writer) + self._reactor.removeWriter(self._writer) + self._reactor.addWriter(self._writer) + # Test the add/remove reader functionality + self._reactor.addReader(self._writer) + self._reactor.removeReader(self._writer) + + self._reactor.addReader(self._reader) + self._reactor.removeReader(self._reader) + self._reactor.addReader(self._reader) + # Test the add/remove writer functionality + self._reactor.addWriter(self._reader) + self._reactor.removeWriter(self._reader) + + def testReadWrite(self): + self._reactor.callWhenRunning(self._testReadWrite) + self._reactor.run() + +if __name__ == "__main__": + unittest.main() diff --git a/tornado/twisted/__init__.py b/tornado/twisted/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tornado/twisted/reactor.py b/tornado/twisted/reactor.py new file mode 100644 index 000000000..e5f7444a0 --- /dev/null +++ b/tornado/twisted/reactor.py @@ -0,0 +1,224 @@ +# System imports +import errno, functools, sys +import time + +from twisted.internet.base import DelayedCall +from twisted.internet.posixbase import PosixReactorBase +from twisted.internet.interfaces import \ + IReactorFDSet, IDelayedCall, IReactorTime + +from zope.interface import implements + +import tornado +import tornado.ioloop +from tornado.ioloop import IOLoop + +class TornadoDelayedCall(object): + implements(IDelayedCall) + + def __init__(self, reactor, seconds, f, *args, **kw): + self._reactor = reactor + self._func = functools.partial(f, *args, **kw) + self._time = self._reactor.seconds() + seconds + self._timeout = self._reactor._ioloop.add_timeout(self._time, + self._called) + self._active = True + + def _called(self): + self._active = False + self._reactor._removeDelayedCall(self) + try: + self._func() + except: + print "reactor.py _called caught exception: %s" % sys.exc_info()[0] + + def getTime(self): + return self._time + + def cancel(self): + self._active = False + self._reactor._ioloop.remove_timeout(self._timeout) + self._reactor._removeDelayedCall(self) + + def delay(self, seconds): + self._reactor._ioloop.remove_timeout(self._timeout) + self._time += seconds + self._timeout = self._reactor._ioloop.add_timeout(self._time, + self._called) + + def reset(self, seconds): + self._reactor._ioloop.remove_timeout(self._timeout) + self._time = self._reactor.seconds() + seconds + self._timeout = self._reactor._ioloop.add_timeout(self._time, + self._called) + + def active(self): + return self._active + +class TornadoReactor(PosixReactorBase): + """ + Twisted style reactor for Tornado. + """ + implements(IReactorTime, IReactorFDSet) + + def __init__(self, ioloop=tornado.ioloop.IOLoop.instance()): + self._ioloop = ioloop + self._readers = {} + self._writers = {} + self._fds = {} # a map of fd to a (reader, writer) tuple + self._delayedCalls = {} + # self._waker = None + PosixReactorBase.__init__(self) + + # IReactorTime + def seconds(self): + return time.time() + + def callLater(self, seconds, f, *args, **kw): + dc = TornadoDelayedCall(self, seconds, f, *args, **kw) + self._delayedCalls[dc] = True + return dc + + def getDelayedCalls(self): + return [x for x in self._delayedCalls if x._active] + + def _removeDelayedCall(self, dc): + if dc in self._delayedCalls: + del self._delayedCalls[dc] + + # IReactorThreads + def callFromThread(self, f, *args, **kw): + """ + See L{twisted.internet.interfaces.IReactorThreads.callFromThread}. + """ + assert callable(f), "%s is not callable" % f + p = functools.partial(f, *args, **kw) + self._ioloop.add_callback(p) + + # We don't need the waker code from the super class, Tornado uses + # its own waker. + def installWaker(self): + pass + + def wakeUp(self): + pass + + # IReactorFDSet + def _invoke_callback(self, fd, events): + (reader, writer) = self._fds[fd] + if events | IOLoop.READ and reader: + reader.doRead() + if events | IOLoop.WRITE and writer: + writer.doWrite() + + def addReader(self, reader): + """ + Add a FileDescriptor for notification of data available to read. + """ + self._readers[reader] = True + fd = reader.fileno() + if fd in self._fds: + (_, writer) = self._fds[fd] + self._fds[fd] = (reader, writer) + if writer: + # We already registered this fd for write events, + # update it for read events as well. + self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE) + else: + self._fds[fd] = (reader, None) + self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.READ) + + def addWriter(self, writer): + """ + Add a FileDescriptor for notification of data available to write. + """ + self._writers[writer] = True + fd = writer.fileno() + if fd in self._fds: + (reader, _) = self._fds[fd] + self._fds[fd] = (reader, writer) + if reader: + # We already registered this fd for read events, + # update it for write events as well. + self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE) + else: + self._fds[fd] = (None, writer) + self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.WRITE) + + def removeReader(self, reader): + """ + Remove a Selectable for notification of data available to read. + """ + fd = reader.fileno() + if reader in self._readers: + del self._readers[reader] + (_, writer) = self._fds[fd] + if writer: + # We have a writer so we need to update the IOLoop for + # write events only. + self._fds[fd] = (None, writer) + self._ioloop.update_handler(fd, IOLoop.WRITE) + else: + # Since we have no writer registered, we remove the + # entry from _fds and unregister the handler from the + # IOLoop + del self._fds[fd] + self._ioloop.remove_handler(fd) + + def removeWriter(self, writer): + """ + Remove a Selectable for notification of data available to write. + """ + fd = writer.fileno() + if writer in self._writers: + del self._writers[writer] + (reader, _) = self._fds[fd] + if reader: + # We have a reader so we need to update the IOLoop for + # read events only. + self._fds[fd] = (reader, None) + self._ioloop.update_handler(fd, IOLoop.READ) + else: + # Since we have no reader registered, we remove the + # entry from the _fds and unregister the handler from + # the IOLoop. + del self._fds[fd] + self._ioloop.remove_handler(fd) + + def removeAll(self): + return self._removeAll(self._readers, self._writers) + + def getReaders(self): + return self._readers.keys() + + def getWriters(self): + return self._writers.keys() + + def stop(self): + """ + Implement L{IReactorCore.stop}. + """ + PosixReactorBase.stop(self) + self.runUntilCurrent() + self._ioloop.stop() + + def crash(self): + PosixReactorBase.crash(self) + self.runUntilCurrent() + self._ioloop.stop() + + def doIteration(self, delay): + raise NotImplementedError("doIteration") + + def mainLoop(self): + self.running = True + self._ioloop.start() + +def install(ioloop=tornado.ioloop.IOLoop.instance()): + """ + Install the Tornado reactor. + """ + reactor = TornadoReactor(ioloop) + from twisted.internet.main import installReactor + installReactor(reactor) + return reactor