from twisted.python import failure, log
from twisted.internet import error
-from zope.interface import implements
+from zope.interface import implementer
import tornado
import tornado.ioloop
class TornadoDelayedCall(object):
"""DelayedCall object for Tornado."""
- # Note that zope.interface.implements is deprecated in
- # zope.interface 4.0, because it cannot work in python 3. The
- # replacement is a class decorator, which cannot work on python
- # 2.5. So when twisted supports python 3, we'll need to drop 2.5
- # support on this module to make it work.
- implements(IDelayedCall)
-
def __init__(self, reactor, seconds, f, *args, **kw):
self._reactor = reactor
self._func = functools.partial(f, *args, **kw)
def active(self):
return self._active
+# Fake class decorator for python 2.5 compatibility
+TornadoDelayedCall = implementer(IDelayedCall)(TornadoDelayedCall)
class TornadoReactor(PosixReactorBase):
timed call functionality on top of `IOLoop.add_timeout` rather than
using the implementation in `PosixReactorBase`.
"""
- implements(IReactorTime, IReactorFDSet)
-
def __init__(self, io_loop=None):
if not io_loop:
io_loop = tornado.ioloop.IOLoop.instance()
self._io_loop.start()
if self._stopped:
self.fireSystemEvent("shutdown")
+TornadoReactor = implementer(IReactorTime, IReactorFDSet)(TornadoReactor)
class _TestReactor(TornadoReactor):
warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("error", category=DeprecationWarning,
module=r"tornado\..*")
- # tornado.platform.twisted uses a deprecated function from
- # zope.interface in order to maintain compatibility with
- # python 2.5
- warnings.filterwarnings("ignore", category=DeprecationWarning,
- module=r"tornado\.platform\.twisted")
- warnings.filterwarnings("ignore", category=DeprecationWarning,
- module=r"tornado\.test\.twisted_test")
import tornado.testing
tornado.testing.main()
try:
import fcntl
- import twisted
from twisted.internet.defer import Deferred
from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
from twisted.internet.protocol import Protocol
from twisted.web.server import Site
from twisted.python import log
from tornado.platform.twisted import TornadoReactor
- from zope.interface import implements
+ from zope.interface import implementer
+ have_twisted = True
except ImportError:
- fcntl = None
- twisted = None
- IReadDescriptor = IWriteDescriptor = None
-
- def implements(f):
- pass
+ have_twisted = False
from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
self._reactor.run()
-class Reader:
- implements(IReadDescriptor)
-
+class Reader(object):
def __init__(self, fd, callback):
self._fd = fd
self._callback = callback
def doRead(self):
self._callback(self._fd)
+if have_twisted:
+ Reader = implementer(IReadDescriptor)(Reader)
-class Writer:
- implements(IWriteDescriptor)
-
+class Writer(object):
def __init__(self, fd, callback):
self._fd = fd
self._callback = callback
def doWrite(self):
self._callback(self._fd)
-
+if have_twisted:
+ Writer = implementer(IWriteDescriptor)(Writer)
class ReactorReaderWriterTest(ReactorTestCase):
def _set_nonblocking(self, fd):
self.assertEqual(response, 'Hello from tornado!')
-if twisted is None:
+if not have_twisted:
del ReactorWhenRunningTest
del ReactorCallLaterTest
del ReactorTwoCallLaterTest