+import fcntl
import os
import sys
import thread
+import unittest
import tornado.twisted.reactor
tornado.twisted.reactor.install()
from twisted.internet import reactor
from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor
-
from twisted.python import log
+from tornado.platform.auto import Waker
from tornado.twisted.reactor import TornadoReactor
from tornado.testing import AsyncTestCase, LogTrapTestCase
-import unittest
from zope.interface import implements
self._callback(self._fd)
class ReactorReaderWriterTest(unittest.TestCase):
+ def _set_close_exec(self, fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFD)
+ fcntl.fcntl(fd, fcntl.F_SETFD, flags | fcntl.FD_CLOEXEC)
+
+ def _set_nonblocking(self, fd):
+ flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+ fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
+
def setUp(self):
self._reactor = TornadoReactor()
r, w = os.pipe()
- self._reactor._ioloop._set_nonblocking(r)
- self._reactor._ioloop._set_nonblocking(w)
- self._reactor._ioloop._set_close_exec(r)
- self._reactor._ioloop._set_close_exec(w)
+ self._set_nonblocking(r)
+ self._set_nonblocking(w)
+ self._set_close_exec(r)
+ self._set_close_exec(w)
self._p1 = os.fdopen(r, "rb", 0)
self._p2 = os.fdopen(w, "wb", 0)