from __future__ import absolute_import, division, print_function
-import functools
import socket
import sys
import twisted.internet.abstract # type: ignore
+import twisted.internet.asyncioreactor # type: ignore
from twisted.internet.defer import Deferred # type: ignore
-from twisted.internet.posixbase import PosixReactorBase # type: ignore
-from twisted.internet.interfaces import IReactorFDSet, IDelayedCall, IReactorTime # type: ignore
-from twisted.python import failure, log # type: ignore
-from twisted.internet import error # type: ignore
+from twisted.python import failure # type: ignore
import twisted.names.cache # type: ignore
import twisted.names.client # type: ignore
import twisted.names.hosts # type: ignore
import twisted.names.resolve # type: ignore
-from zope.interface import implementer # type: ignore
from tornado.concurrent import Future, future_set_exc_info
from tornado.escape import utf8
from tornado import gen
import tornado.ioloop
-from tornado.log import app_log
from tornado.netutil import Resolver
-from tornado.stack_context import NullContext
-from tornado.ioloop import IOLoop
-
-
-@implementer(IDelayedCall)
-class TornadoDelayedCall(object):
- """DelayedCall object for Tornado."""
- def __init__(self, reactor, seconds, f, *args, **kw):
- self._reactor = reactor
- self._func = functools.partial(f, *args, **kw)
- self._time = self._reactor.seconds() + seconds
- self._timeout = self._reactor._io_loop.add_timeout(self._time,
- self._called)
- self._active = True
-
- def _called(self):
- self._active = False
- self._reactor._removeDelayedCall(self)
- try:
- self._func()
- except:
- app_log.error("_called caught exception", exc_info=True)
-
- def getTime(self):
- return self._time
-
- def cancel(self):
- self._active = False
- self._reactor._io_loop.remove_timeout(self._timeout)
- self._reactor._removeDelayedCall(self)
-
- def delay(self, seconds):
- self._reactor._io_loop.remove_timeout(self._timeout)
- self._time += seconds
- self._timeout = self._reactor._io_loop.add_timeout(self._time,
- self._called)
-
- def reset(self, seconds):
- self._reactor._io_loop.remove_timeout(self._timeout)
- self._time = self._reactor.seconds() + seconds
- self._timeout = self._reactor._io_loop.add_timeout(self._time,
- self._called)
-
- def active(self):
- return self._active
-
-
-@implementer(IReactorTime, IReactorFDSet)
-class TornadoReactor(PosixReactorBase):
- """Twisted reactor built on the Tornado IOLoop.
-
- `TornadoReactor` implements the Twisted reactor interface on top of
- the Tornado IOLoop. To use it, simply call `install` at the beginning
- of the application::
-
- import tornado.platform.twisted
- tornado.platform.twisted.install()
- from twisted.internet import reactor
-
- When the app is ready to start, call ``IOLoop.current().start()``
- instead of ``reactor.run()``.
-
- It is also possible to create a non-global reactor by calling
- ``tornado.platform.twisted.TornadoReactor()``. However, if
- the `.IOLoop` and reactor are to be short-lived (such as those used in
- unit tests), additional cleanup may be required. Specifically, it is
- recommended to call::
-
- reactor.fireSystemEvent('shutdown')
- reactor.disconnectAll()
-
- before closing the `.IOLoop`.
-
- .. versionchanged:: 5.0
- The ``io_loop`` argument (deprecated since version 4.1) has been removed.
-
- .. deprecated:: 5.1
-
- This class will be removed in Tornado 6.0. Use
- ``twisted.internet.asyncioreactor.AsyncioSelectorReactor``
- instead.
-
- """
- def __init__(self):
- self._io_loop = tornado.ioloop.IOLoop.current()
- self._readers = {} # map of reader objects to fd
- self._writers = {} # map of writer objects to fd
- self._fds = {} # a map of fd to a (reader, writer) tuple
- self._delayedCalls = {}
- PosixReactorBase.__init__(self)
- self.addSystemEventTrigger('during', 'shutdown', self.crash)
-
- # IOLoop.start() bypasses some of the reactor initialization.
- # Fire off the necessary events if they weren't already triggered
- # by reactor.run().
- def start_if_necessary():
- if not self._started:
- self.fireSystemEvent('startup')
- self._io_loop.add_callback(start_if_necessary)
-
- # IReactorTime
- def seconds(self):
- return self._io_loop.time()
-
- def callLater(self, seconds, f, *args, **kw):
- dc = TornadoDelayedCall(self, seconds, f, *args, **kw)
- self._delayedCalls[dc] = True
- return dc
-
- def getDelayedCalls(self):
- return [x for x in self._delayedCalls if x._active]
-
- def _removeDelayedCall(self, dc):
- if dc in self._delayedCalls:
- del self._delayedCalls[dc]
-
- # IReactorThreads
- def callFromThread(self, f, *args, **kw):
- assert callable(f), "%s is not callable" % f
- with NullContext():
- # This NullContext is mainly for an edge case when running
- # TwistedIOLoop on top of a TornadoReactor.
- # TwistedIOLoop.add_callback uses reactor.callFromThread and
- # should not pick up additional StackContexts along the way.
- self._io_loop.add_callback(f, *args, **kw)
-
- # We don't need the waker code from the super class, Tornado uses
- # its own waker.
- def installWaker(self):
- pass
-
- def wakeUp(self):
- pass
-
- # IReactorFDSet
- def _invoke_callback(self, fd, events):
- if fd not in self._fds:
- return
- (reader, writer) = self._fds[fd]
- if reader:
- err = None
- if reader.fileno() == -1:
- err = error.ConnectionLost()
- elif events & IOLoop.READ:
- err = log.callWithLogger(reader, reader.doRead)
- if err is None and events & IOLoop.ERROR:
- err = error.ConnectionLost()
- if err is not None:
- self.removeReader(reader)
- reader.readConnectionLost(failure.Failure(err))
- if writer:
- err = None
- if writer.fileno() == -1:
- err = error.ConnectionLost()
- elif events & IOLoop.WRITE:
- err = log.callWithLogger(writer, writer.doWrite)
- if err is None and events & IOLoop.ERROR:
- err = error.ConnectionLost()
- if err is not None:
- self.removeWriter(writer)
- writer.writeConnectionLost(failure.Failure(err))
-
- def addReader(self, reader):
- if reader in self._readers:
- # Don't add the reader if it's already there
- return
- fd = reader.fileno()
- self._readers[reader] = fd
- if fd in self._fds:
- (_, writer) = self._fds[fd]
- self._fds[fd] = (reader, writer)
- if writer:
- # We already registered this fd for write events,
- # update it for read events as well.
- self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
- else:
- with NullContext():
- self._fds[fd] = (reader, None)
- self._io_loop.add_handler(fd, self._invoke_callback,
- IOLoop.READ)
-
- def addWriter(self, writer):
- if writer in self._writers:
- return
- fd = writer.fileno()
- self._writers[writer] = fd
- if fd in self._fds:
- (reader, _) = self._fds[fd]
- self._fds[fd] = (reader, writer)
- if reader:
- # We already registered this fd for read events,
- # update it for write events as well.
- self._io_loop.update_handler(fd, IOLoop.READ | IOLoop.WRITE)
- else:
- with NullContext():
- self._fds[fd] = (None, writer)
- self._io_loop.add_handler(fd, self._invoke_callback,
- IOLoop.WRITE)
-
- def removeReader(self, reader):
- if reader in self._readers:
- fd = self._readers.pop(reader)
- (_, writer) = self._fds[fd]
- if writer:
- # We have a writer so we need to update the IOLoop for
- # write events only.
- self._fds[fd] = (None, writer)
- self._io_loop.update_handler(fd, IOLoop.WRITE)
- else:
- # Since we have no writer registered, we remove the
- # entry from _fds and unregister the handler from the
- # IOLoop
- del self._fds[fd]
- self._io_loop.remove_handler(fd)
-
- def removeWriter(self, writer):
- if writer in self._writers:
- fd = self._writers.pop(writer)
- (reader, _) = self._fds[fd]
- if reader:
- # We have a reader so we need to update the IOLoop for
- # read events only.
- self._fds[fd] = (reader, None)
- self._io_loop.update_handler(fd, IOLoop.READ)
- else:
- # Since we have no reader registered, we remove the
- # entry from the _fds and unregister the handler from
- # the IOLoop.
- del self._fds[fd]
- self._io_loop.remove_handler(fd)
-
- def removeAll(self):
- return self._removeAll(self._readers, self._writers)
-
- def getReaders(self):
- return self._readers.keys()
-
- def getWriters(self):
- return self._writers.keys()
-
- # The following functions are mainly used in twisted-style test cases;
- # it is expected that most users of the TornadoReactor will call
- # IOLoop.start() instead of Reactor.run().
- def stop(self):
- PosixReactorBase.stop(self)
- fire_shutdown = functools.partial(self.fireSystemEvent, "shutdown")
- self._io_loop.add_callback(fire_shutdown)
-
- def crash(self):
- PosixReactorBase.crash(self)
- self._io_loop.stop()
-
- def doIteration(self, delay):
- raise NotImplementedError("doIteration")
-
- def mainLoop(self):
- # Since this class is intended to be used in applications
- # where the top-level event loop is ``io_loop.start()`` rather
- # than ``reactor.run()``, it is implemented a little
- # differently than other Twisted reactors. We override
- # ``mainLoop`` instead of ``doIteration`` and must implement
- # timed call functionality on top of `.IOLoop.add_timeout`
- # rather than using the implementation in
- # ``PosixReactorBase``.
- self._io_loop.start()
-
-
-class _TestReactor(TornadoReactor):
- """Subclass of TornadoReactor for use in unittests.
-
- This can't go in the test.py file because of import-order dependencies
- with the Twisted reactor test builder.
- """
- def __init__(self):
- # always use a new ioloop
- IOLoop.clear_current()
- IOLoop(make_current=True)
- super(_TestReactor, self).__init__()
- IOLoop.clear_current()
-
- def listenTCP(self, port, factory, backlog=50, interface=''):
- # default to localhost to avoid firewall prompts on the mac
- if not interface:
- interface = '127.0.0.1'
- return super(_TestReactor, self).listenTCP(
- port, factory, backlog=backlog, interface=interface)
-
- def listenUDP(self, port, protocol, interface='', maxPacketSize=8192):
- if not interface:
- interface = '127.0.0.1'
- return super(_TestReactor, self).listenUDP(
- port, protocol, interface=interface, maxPacketSize=maxPacketSize)
-
-
-def install():
- """Install this package as the default Twisted reactor.
-
- ``install()`` must be called very early in the startup process,
- before most other twisted-related imports. Conversely, because it
- initializes the `.IOLoop`, it cannot be called before
- `.fork_processes` or multi-process `~.TCPServer.start`. These
- conflicting requirements make it difficult to use `.TornadoReactor`
- in multi-process mode, and an external process manager such as
- ``supervisord`` is recommended instead.
-
- .. versionchanged:: 5.0
- The ``io_loop`` argument (deprecated since version 4.1) has been removed.
-
- .. deprecated:: 5.1
-
- This functio will be removed in Tornado 6.0. Use
- ``twisted.internet.asyncioreactor.install`` instead.
- """
- reactor = TornadoReactor()
- from twisted.internet.main import installReactor # type: ignore
- installReactor(reactor)
- return reactor
class TwistedResolver(Resolver):
def initialize(self):
# partial copy of twisted.names.client.createResolver, which doesn't
# allow for a reactor to be passed in.
- self.reactor = tornado.platform.twisted.TornadoReactor()
+ self.reactor = twisted.internet.asyncioreactor.AsyncioSelectorReactor()
host_resolver = twisted.names.hosts.Resolver('/etc/hosts')
cache_resolver = twisted.names.cache.CacheResolver(reactor=self.reactor)
from __future__ import absolute_import, division, print_function
import logging
-import os
-import shutil
import signal
import sys
-import tempfile
-import threading
import warnings
from tornado.escape import utf8
from tornado.httpclient import AsyncHTTPClient
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
-from tornado.platform.auto import set_close_exec
-from tornado.testing import bind_unused_port
+from tornado.testing import bind_unused_port, AsyncTestCase, gen_test
from tornado.test.util import unittest
-from tornado.util import import_object, PY3
from tornado.web import RequestHandler, Application
try:
- import fcntl
from twisted.internet.defer import Deferred, inlineCallbacks, returnValue # type: ignore
- from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor # type: ignore
from twisted.internet.protocol import Protocol # type: ignore
- from twisted.python import log # type: ignore
- from tornado.platform.twisted import TornadoReactor, TwistedIOLoop
- from zope.interface import implementer # type: ignore
+ from twisted.internet.asyncioreactor import AsyncioSelectorReactor
have_twisted = True
except ImportError:
have_twisted = False
except ImportError:
have_twisted_web = False
-if PY3:
- import _thread as thread
-else:
- import thread
- ResourceWarning = None
-
-try:
- import asyncio
-except ImportError:
- asyncio = None
-
skipIfNoTwisted = unittest.skipUnless(have_twisted,
"twisted module not present")
for sig in [signal.SIGINT, signal.SIGTERM, signal.SIGCHLD]:
saved[sig] = signal.getsignal(sig)
if "twisted" in repr(saved):
- if not issubclass(IOLoop.configured_class(), TwistedIOLoop):
- # when the global ioloop is twisted, we expect the signal
- # handlers to be installed. Otherwise, it means we're not
- # cleaning up after twisted properly.
- raise Exception("twisted signal handlers already installed")
+ # This indicates we're not cleaning up after ourselves properly.
+ raise Exception("twisted signal handlers already installed")
return saved
signal.signal(sig, handler)
-class ReactorTestCase(unittest.TestCase):
- def setUp(self):
- self._saved_signals = save_signal_handlers()
- IOLoop.clear_current()
- self._io_loop = IOLoop(make_current=True)
- self._reactor = TornadoReactor()
- IOLoop.clear_current()
-
- def tearDown(self):
- self._io_loop.close(all_fds=True)
- restore_signal_handlers(self._saved_signals)
-
-
-@skipIfNoTwisted
-class ReactorWhenRunningTest(ReactorTestCase):
- def test_whenRunning(self):
- self._whenRunningCalled = False
- self._anotherWhenRunningCalled = False
- self._reactor.callWhenRunning(self.whenRunningCallback)
- self._reactor.run()
- self.assertTrue(self._whenRunningCalled)
- self.assertTrue(self._anotherWhenRunningCalled)
-
- def whenRunningCallback(self):
- self._whenRunningCalled = True
- self._reactor.callWhenRunning(self.anotherWhenRunningCallback)
- self._reactor.stop()
-
- def anotherWhenRunningCallback(self):
- self._anotherWhenRunningCalled = True
-
-
-@skipIfNoTwisted
-class ReactorCallLaterTest(ReactorTestCase):
- def test_callLater(self):
- self._laterCalled = False
- self._now = self._reactor.seconds()
- self._timeout = 0.001
- dc = self._reactor.callLater(self._timeout, self.callLaterCallback)
- self.assertEqual(self._reactor.getDelayedCalls(), [dc])
- self._reactor.run()
- self.assertTrue(self._laterCalled)
- self.assertTrue(self._called - self._now > self._timeout)
- self.assertEqual(self._reactor.getDelayedCalls(), [])
-
- def callLaterCallback(self):
- self._laterCalled = True
- self._called = self._reactor.seconds()
- self._reactor.stop()
-
-
-@skipIfNoTwisted
-class ReactorTwoCallLaterTest(ReactorTestCase):
- def test_callLater(self):
- self._later1Called = False
- self._later2Called = False
- self._now = self._reactor.seconds()
- self._timeout1 = 0.0005
- dc1 = self._reactor.callLater(self._timeout1, self.callLaterCallback1)
- self._timeout2 = 0.001
- dc2 = self._reactor.callLater(self._timeout2, self.callLaterCallback2)
- self.assertTrue(self._reactor.getDelayedCalls() == [dc1, dc2] or
- self._reactor.getDelayedCalls() == [dc2, dc1])
- self._reactor.run()
- self.assertTrue(self._later1Called)
- self.assertTrue(self._later2Called)
- self.assertTrue(self._called1 - self._now > self._timeout1)
- self.assertTrue(self._called2 - self._now > self._timeout2)
- self.assertEqual(self._reactor.getDelayedCalls(), [])
-
- def callLaterCallback1(self):
- self._later1Called = True
- self._called1 = self._reactor.seconds()
-
- def callLaterCallback2(self):
- self._later2Called = True
- self._called2 = self._reactor.seconds()
- self._reactor.stop()
-
-
-@skipIfNoTwisted
-class ReactorCallFromThreadTest(ReactorTestCase):
- def setUp(self):
- super(ReactorCallFromThreadTest, self).setUp()
- self._mainThread = thread.get_ident()
-
- def tearDown(self):
- self._thread.join()
- super(ReactorCallFromThreadTest, self).tearDown()
-
- def _newThreadRun(self):
- self.assertNotEqual(self._mainThread, thread.get_ident())
- if hasattr(self._thread, 'ident'): # new in python 2.6
- self.assertEqual(self._thread.ident, thread.get_ident())
- self._reactor.callFromThread(self._fnCalledFromThread)
-
- def _fnCalledFromThread(self):
- self.assertEqual(self._mainThread, thread.get_ident())
- self._reactor.stop()
-
- def _whenRunningCallback(self):
- self._thread = threading.Thread(target=self._newThreadRun)
- self._thread.start()
-
- def testCallFromThread(self):
- self._reactor.callWhenRunning(self._whenRunningCallback)
- self._reactor.run()
-
-
-@skipIfNoTwisted
-class ReactorCallInThread(ReactorTestCase):
- def setUp(self):
- super(ReactorCallInThread, self).setUp()
- self._mainThread = thread.get_ident()
-
- def _fnCalledInThread(self, *args, **kwargs):
- self.assertNotEqual(thread.get_ident(), self._mainThread)
- self._reactor.callFromThread(lambda: self._reactor.stop())
-
- def _whenRunningCallback(self):
- self._reactor.callInThread(self._fnCalledInThread)
-
- def testCallInThread(self):
- self._reactor.callWhenRunning(self._whenRunningCallback)
- self._reactor.run()
-
-
-if have_twisted:
- @implementer(IReadDescriptor)
- class Reader(object):
- def __init__(self, fd, callback):
- self._fd = fd
- self._callback = callback
-
- def logPrefix(self):
- return "Reader"
-
- def close(self):
- self._fd.close()
-
- def fileno(self):
- return self._fd.fileno()
-
- def readConnectionLost(self, reason):
- self.close()
-
- def connectionLost(self, reason):
- self.close()
-
- def doRead(self):
- self._callback(self._fd)
-
- @implementer(IWriteDescriptor)
- class Writer(object):
- def __init__(self, fd, callback):
- self._fd = fd
- self._callback = callback
-
- def logPrefix(self):
- return "Writer"
-
- def close(self):
- self._fd.close()
-
- def fileno(self):
- return self._fd.fileno()
-
- def connectionLost(self, reason):
- self.close()
-
- def doWrite(self):
- self._callback(self._fd)
-
-
-@skipIfNoTwisted
-class ReactorReaderWriterTest(ReactorTestCase):
- def _set_nonblocking(self, fd):
- flags = fcntl.fcntl(fd, fcntl.F_GETFL)
- fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
-
- def setUp(self):
- super(ReactorReaderWriterTest, self).setUp()
- r, w = os.pipe()
- self._set_nonblocking(r)
- self._set_nonblocking(w)
- set_close_exec(r)
- set_close_exec(w)
- self._p1 = os.fdopen(r, "rb", 0)
- self._p2 = os.fdopen(w, "wb", 0)
-
- def tearDown(self):
- super(ReactorReaderWriterTest, self).tearDown()
- self._p1.close()
- self._p2.close()
-
- def _testReadWrite(self):
- """
- In this test the writer writes an 'x' to its fd. The reader
- reads it, check the value and ends the test.
- """
- self.shouldWrite = True
-
- def checkReadInput(fd):
- self.assertEquals(fd.read(1), b'x')
- self._reactor.stop()
-
- def writeOnce(fd):
- if self.shouldWrite:
- self.shouldWrite = False
- fd.write(b'x')
- self._reader = Reader(self._p1, checkReadInput)
- self._writer = Writer(self._p2, writeOnce)
-
- self._reactor.addWriter(self._writer)
-
- # Test that adding the reader twice adds it only once to
- # IOLoop.
- self._reactor.addReader(self._reader)
- self._reactor.addReader(self._reader)
-
- def testReadWrite(self):
- self._reactor.callWhenRunning(self._testReadWrite)
- self._reactor.run()
-
- def _testNoWriter(self):
- """
- In this test we have no writer. Make sure the reader doesn't
- read anything.
- """
- def checkReadInput(fd):
- self.fail("Must not be called.")
-
- def stopTest():
- # Close the writer here since the IOLoop doesn't know
- # about it.
- self._writer.close()
- self._reactor.stop()
- self._reader = Reader(self._p1, checkReadInput)
-
- # We create a writer, but it should never be invoked.
- self._writer = Writer(self._p2, lambda fd: fd.write('x'))
-
- # Test that adding and removing the writer leaves us with no writer.
- self._reactor.addWriter(self._writer)
- self._reactor.removeWriter(self._writer)
-
- # Test that adding and removing the reader doesn't cause
- # unintended effects.
- self._reactor.addReader(self._reader)
-
- # Wake up after a moment and stop the test
- self._reactor.callLater(0.001, stopTest)
-
- def testNoWriter(self):
- self._reactor.callWhenRunning(self._testNoWriter)
- self._reactor.run()
-
# Test various combinations of twisted and tornado http servers,
# http clients, and event loop interfaces.
self.saved_signals = save_signal_handlers()
self.io_loop = IOLoop()
self.io_loop.make_current()
- self.reactor = TornadoReactor()
+ self.reactor = AsyncioSelectorReactor()
def tearDown(self):
self.reactor.disconnectAll()
@skipIfNoTwisted
-class ConvertDeferredTest(unittest.TestCase):
+class ConvertDeferredTest(AsyncTestCase):
+ @gen_test
def test_success(self):
@inlineCallbacks
def fn():
# must have a yield even if it's unreachable.
yield
returnValue(42)
- f = gen.convert_yielded(fn())
- self.assertEqual(f.result(), 42)
+ res = yield fn()
+ self.assertEqual(res, 42)
+ @gen_test
def test_failure(self):
@inlineCallbacks
def fn():
if False:
yield
1 / 0
- f = gen.convert_yielded(fn())
with self.assertRaises(ZeroDivisionError):
- f.result()
-
-
-if have_twisted:
- # Import and run as much of twisted's test suite as possible.
- # This is unfortunately rather dependent on implementation details,
- # but there doesn't appear to be a clean all-in-one conformance test
- # suite for reactors.
- #
- # This is a list of all test suites using the ReactorBuilder
- # available in Twisted 11.0.0 and 11.1.0 (and a blacklist of
- # specific test methods to be disabled).
- twisted_tests = {
- 'twisted.internet.test.test_core.ObjectModelIntegrationTest': [],
- 'twisted.internet.test.test_core.SystemEventTestsBuilder': [
- 'test_iterate', # deliberately not supported
- # Fails on TwistedIOLoop and AsyncIOLoop.
- 'test_runAfterCrash',
- ],
- 'twisted.internet.test.test_fdset.ReactorFDSetTestsBuilder': [
- "test_lostFileDescriptor", # incompatible with epoll and kqueue
- ],
- 'twisted.internet.test.test_process.ProcessTestsBuilder': [
- # Only work as root. Twisted's "skip" functionality works
- # with py27+, but not unittest2 on py26.
- 'test_changeGID',
- 'test_changeUID',
- # This test sometimes fails with EPIPE on a call to
- # kqueue.control. Happens consistently for me with
- # trollius but not asyncio or other IOLoops.
- 'test_childConnectionLost',
- ],
- # Process tests appear to work on OSX 10.7, but not 10.6
- # 'twisted.internet.test.test_process.PTYProcessTestsBuilder': [
- # 'test_systemCallUninterruptedByChildExit',
- # ],
- 'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
- 'test_badContext', # ssl-related; see also SSLClientTestsMixin
- ],
- 'twisted.internet.test.test_tcp.TCPPortTestsBuilder': [
- # These use link-local addresses and cause firewall prompts on mac
- 'test_buildProtocolIPv6AddressScopeID',
- 'test_portGetHostOnIPv6ScopeID',
- 'test_serverGetHostOnIPv6ScopeID',
- 'test_serverGetPeerOnIPv6ScopeID',
- ],
- 'twisted.internet.test.test_tcp.TCPConnectionTestsBuilder': [],
- 'twisted.internet.test.test_tcp.WriteSequenceTests': [],
- 'twisted.internet.test.test_tcp.AbortConnectionTestCase': [],
- 'twisted.internet.test.test_threads.ThreadTestsBuilder': [],
- 'twisted.internet.test.test_time.TimeTestsBuilder': [],
- # Extra third-party dependencies (pyOpenSSL)
- # 'twisted.internet.test.test_tls.SSLClientTestsMixin': [],
- 'twisted.internet.test.test_udp.UDPServerTestsBuilder': [],
- 'twisted.internet.test.test_unix.UNIXTestsBuilder': [
- # Platform-specific. These tests would be skipped automatically
- # if we were running twisted's own test runner.
- 'test_connectToLinuxAbstractNamespace',
- 'test_listenOnLinuxAbstractNamespace',
- # These tests use twisted's sendmsg.c extension and sometimes
- # fail with what looks like uninitialized memory errors
- # (more common on pypy than cpython, but I've seen it on both)
- 'test_sendFileDescriptor',
- 'test_sendFileDescriptorTriggersPauseProducing',
- 'test_descriptorDeliveredBeforeBytes',
- 'test_avoidLeakingFileDescriptors',
- ],
- 'twisted.internet.test.test_unix.UNIXDatagramTestsBuilder': [
- 'test_listenOnLinuxAbstractNamespace',
- ],
- 'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
- }
- if sys.version_info >= (3,):
- # In Twisted 15.2.0 on Python 3.4, the process tests will try to run
- # but fail, due in part to interactions between Tornado's strict
- # warnings-as-errors policy and Twisted's own warning handling
- # (it was not obvious how to configure the warnings module to
- # reconcile the two), and partly due to what looks like a packaging
- # error (process_cli.py missing). For now, just skip it.
- del twisted_tests['twisted.internet.test.test_process.ProcessTestsBuilder']
- for test_name, blacklist in twisted_tests.items():
- try:
- test_class = import_object(test_name)
- except (ImportError, AttributeError):
- continue
- for test_func in blacklist: # type: ignore
- if hasattr(test_class, test_func):
- # The test_func may be defined in a mixin, so clobber
- # it instead of delattr()
- setattr(test_class, test_func, lambda self: None)
-
- def make_test_subclass(test_class):
- class TornadoTest(test_class): # type: ignore
- _reactors = ["tornado.platform.twisted._TestReactor"]
-
- def setUp(self):
- # Twisted's tests expect to be run from a temporary
- # directory; they create files in their working directory
- # and don't always clean up after themselves.
- self.__curdir = os.getcwd()
- self.__tempdir = tempfile.mkdtemp()
- os.chdir(self.__tempdir)
- super(TornadoTest, self).setUp() # type: ignore
-
- def tearDown(self):
- super(TornadoTest, self).tearDown() # type: ignore
- os.chdir(self.__curdir)
- shutil.rmtree(self.__tempdir)
-
- def flushWarnings(self, *args, **kwargs):
- # This is a hack because Twisted and Tornado have
- # differing approaches to warnings in tests.
- # Tornado sets up a global set of warnings filters
- # in runtests.py, while Twisted patches the filter
- # list in each test. The net effect is that
- # Twisted's tests run with Tornado's increased
- # strictness (BytesWarning and ResourceWarning are
- # enabled) but without our filter rules to ignore those
- # warnings from Twisted code.
- filtered = []
- for w in super(TornadoTest, self).flushWarnings( # type: ignore
- *args, **kwargs):
- if w['category'] in (BytesWarning, ResourceWarning):
- continue
- filtered.append(w)
- return filtered
-
- def buildReactor(self):
- self.__saved_signals = save_signal_handlers()
- return test_class.buildReactor(self)
-
- def unbuildReactor(self, reactor):
- test_class.unbuildReactor(self, reactor)
- # Clean up file descriptors (especially epoll/kqueue
- # objects) eagerly instead of leaving them for the
- # GC. Unfortunately we can't do this in reactor.stop
- # since twisted expects to be able to unregister
- # connections in a post-shutdown hook.
- reactor._io_loop.close(all_fds=True)
- restore_signal_handlers(self.__saved_signals)
-
- TornadoTest.__name__ = test_class.__name__
- return TornadoTest
- test_subclass = make_test_subclass(test_class)
- globals().update(test_subclass.makeTestCaseClasses())
-
- # Since we're not using twisted's test runner, it's tricky to get
- # logging set up well. Most of the time it's easiest to just
- # leave it turned off, but while working on these tests you may want
- # to uncomment one of the other lines instead.
- log.defaultObserver.stop()
- # import sys; log.startLogging(sys.stderr, setStdout=0)
- # log.startLoggingWithObserver(log.PythonLoggingObserver().emit, setStdout=0)
- # import logging; logging.getLogger('twisted').setLevel(logging.WARNING)
-
- # Twisted recently introduced a new logger; disable that one too.
- try:
- from twisted.logger import globalLogBeginner # type: ignore
- except ImportError:
- pass
- else:
- globalLogBeginner.beginLoggingTo([], redirectStandardIO=False)
+ yield fn()
+
if __name__ == "__main__":
unittest.main()