To use it, add the following to your twisted application:
-import tornado.twisted.reactor
-tornado.twisted.reactor.install()
+import tornado.platform.twistedreactor
+tornado.platform.twistedreactor.install()
from twisted.internet import reactor
"""
-import errno, functools, sys
+import functools
+import logging
+import sys
import time
from twisted.internet.base import DelayedCall
import tornado
import tornado.ioloop
+from tornado.stack_context import NullContext
from tornado.ioloop import IOLoop
class TornadoDelayedCall(object):
try:
self._func()
except:
- print "reactor.py _called caught exception: %s" % sys.exc_info()[0]
+ logging.error("_called caught exception", exc_info=True)
def getTime(self):
return self._time
self._writers = {}
self._fds = {} # a map of fd to a (reader, writer) tuple
self._delayedCalls = {}
- # self._waker = None
+ self._running = False
PosixReactorBase.__init__(self)
# IReactorTime
"""
Add a FileDescriptor for notification of data available to read.
"""
+ if reader in self._readers:
+ # Don't add the reader if it's already there
+ return
self._readers[reader] = True
fd = reader.fileno()
if fd in self._fds:
# update it for read events as well.
self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
else:
- self._fds[fd] = (reader, None)
- self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.READ)
+ with NullContext():
+ self._fds[fd] = (reader, None)
+ self._ioloop.add_handler(fd, self._invoke_callback,
+ IOLoop.READ)
def addWriter(self, writer):
"""
Add a FileDescriptor for notification of data available to write.
"""
+ if writer in self._writers:
+ return
self._writers[writer] = True
fd = writer.fileno()
if fd in self._fds:
# update it for write events as well.
self._ioloop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
else:
- self._fds[fd] = (None, writer)
- self._ioloop.add_handler(fd, self._invoke_callback, IOLoop.WRITE)
+ with NullContext():
+ self._fds[fd] = (None, writer)
+ self._ioloop.add_handler(fd, self._invoke_callback,
+ IOLoop.WRITE)
def removeReader(self, reader):
"""
"""
Implement L{IReactorCore.stop}.
"""
+ self._running = False
PosixReactorBase.stop(self)
self.runUntilCurrent()
- self._ioloop.stop()
+ try:
+ self._ioloop.stop()
+ self._ioloop.close()
+ except:
+ # Ignore any exceptions thrown by IOLoop
+ pass
def crash(self):
+ if not self._running:
+ return
+ self._running = False
PosixReactorBase.crash(self)
self.runUntilCurrent()
- self._ioloop.stop()
+ try:
+ self._ioloop.stop()
+ self._ioloop.close()
+ except:
+ # Ignore any exceptions thrown by IOLoop
+ pass
def doIteration(self, delay):
raise NotImplementedError("doIteration")
def mainLoop(self):
- self.running = True
+ self._running = True
self._ioloop.start()
+ def run(self):
+ PosixReactorBase.run(self, installSignalHandlers=False)
+
def install(ioloop=None):
"""
Install the Tornado reactor.
import os
import sys
import thread
+import threading
import unittest
-from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
-from twisted.python import log
-from tornado.platform.auto import Waker
+try:
+ import twisted
+ from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
+ from tornado.platform.twistedreactor import TornadoReactor
+ from zope.interface import implements
+except ImportError:
+ twisted = None
+ IReadDescriptor = IWriteDescriptor = None
+ def implements(f): pass
from tornado.ioloop import IOLoop
-from tornado.twisted.reactor import TornadoReactor
-from tornado.testing import AsyncTestCase, LogTrapTestCase
-
-from zope.interface import implements
-
-log.startLogging(sys.stdout)
+from tornado.platform.auto import set_close_exec
class ReactorWhenRunningTest(unittest.TestCase):
def setUp(self):
self.assertEqual(self._reactor.getDelayedCalls(), [dc])
self._reactor.run()
self.assertTrue(self._laterCalled)
- self.assertGreater(self._called - self._now, self._timeout)
+ self.assertTrue(self._called - self._now > self._timeout)
self.assertEqual(self._reactor.getDelayedCalls(), [])
def callLaterCallback(self):
self._reactor.run()
self.assertTrue(self._later1Called)
self.assertTrue(self._later2Called)
- self.assertGreater(self._called1 - self._now, self._timeout1)
- self.assertGreater(self._called2 - self._now, self._timeout2)
+ self.assertTrue(self._called1 - self._now > self._timeout1)
+ self.assertTrue(self._called2 - self._now > self._timeout2)
self.assertEqual(self._reactor.getDelayedCalls(), [])
def callLaterCallback1(self):
self._reactor = TornadoReactor(IOLoop())
self._mainThread = thread.get_ident()
- def _newThreadRun(self, a, b):
- self.assertEqual(self._thread, thread.get_ident())
+ def tearDown(self):
+ self._thread.join()
+
+ def _newThreadRun(self):
+ self.assertEqual(self._thread.ident, thread.get_ident())
self._reactor.callFromThread(self._fnCalledFromThread)
def _fnCalledFromThread(self):
self._reactor.stop()
def _whenRunningCallback(self):
- self._thread = thread.start_new_thread(self._newThreadRun, (None, None))
+ self._thread = threading.Thread(target=self._newThreadRun)
+ self._thread.start()
def testCallFromThread(self):
self._reactor.callWhenRunning(self._whenRunningCallback)
return self._fd.fileno()
def connectionLost(self, reason):
- return
+ self._fd.close()
def doRead(self):
self._callback(self._fd)
return self._fd.fileno()
def connectionLost(self, reason):
- return
+ self._fd.close()
def doWrite(self):
self._callback(self._fd)
class ReactorReaderWriterTest(unittest.TestCase):
- def _set_close_exec(self, fd):
- flags = fcntl.fcntl(fd, fcntl.F_GETFD)
- fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
-
def _set_nonblocking(self, fd):
flags = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
r, w = os.pipe()
self._set_nonblocking(r)
self._set_nonblocking(w)
- self._set_close_exec(r)
- self._set_close_exec(w)
+ set_close_exec(r)
+ set_close_exec(w)
self._p1 = os.fdopen(r, "rb", 0)
self._p2 = os.fdopen(w, "wb", 0)
In this test the writer writes an 'x' to its fd. The reader
reads it, check the value and ends the test.
"""
+ self.shouldWrite = True
def checkReadInput(fd):
- self.assertTrue(fd.read().startswith('x'))
+ self.assertEquals(fd.read(), 'x')
self._reactor.stop()
+ def writeOnce(fd):
+ if self.shouldWrite:
+ self.shouldWrite = False
+ fd.write('x')
self._reader = Reader(self._p1, checkReadInput)
- self._writer = Writer(self._p2, lambda fd: fd.write('x'))
+ self._writer = Writer(self._p2, writeOnce)
+
+ # Test that adding and removing the writer doesn't cause
+ # unintended effects.
self._reactor.addWriter(self._writer)
self._reactor.removeWriter(self._writer)
self._reactor.addWriter(self._writer)
- # Test the add/remove reader functionality
- self._reactor.addReader(self._writer)
- self._reactor.removeReader(self._writer)
+ # Test that adding and removing the reader doesn't cause
+ # unintended effects.
self._reactor.addReader(self._reader)
self._reactor.removeReader(self._reader)
self._reactor.addReader(self._reader)
- # Test the add/remove writer functionality
- self._reactor.addWriter(self._reader)
- self._reactor.removeWriter(self._reader)
def testReadWrite(self):
self._reactor.callWhenRunning(self._testReadWrite)
self._reactor.run()
+if twisted is None:
+ del ReactorWhenRunningTest
+ del ReactorCallLaterTest
+ del ReactorTwoCallLaterTest
+ del ReactorCallFromThreadTest
+ del ReactorCallInThread
+ del ReactorReaderWriterTest
+
if __name__ == "__main__":
unittest.main()