self._kqueue.close()
def register(self, fd, events):
+ if fd in self._active:
+ raise IOError("fd %d already registered" % fd)
self._control(fd, events, select.KQ_EV_ADD)
self._active[fd] = events
pass
def register(self, fd, events):
+ if fd in self.read_fds or fd in self.write_fds or fd in self.error_fds:
+ raise IOError("fd %d already registered" % fd)
if events & IOLoop.READ:
self.read_fds.add(fd)
if events & IOLoop.WRITE:
from __future__ import absolute_import, division, with_statement
import datetime
-import unittest
+import socket
import time
+import unittest
-from tornado.testing import AsyncTestCase, LogTrapTestCase
+from tornado.ioloop import IOLoop
+from tornado.netutil import bind_sockets
+from tornado.testing import AsyncTestCase, LogTrapTestCase, get_unused_port
class TestIOLoop(AsyncTestCase, LogTrapTestCase):
self.io_loop.add_timeout(datetime.timedelta(microseconds=1), self.stop)
self.wait()
+ def test_multiple_add(self):
+ [sock] = bind_sockets(get_unused_port(), '127.0.0.1',
+ family=socket.AF_INET)
+ try:
+ self.io_loop.add_handler(sock.fileno(), lambda fd, events: None,
+ IOLoop.READ)
+ # Attempting to add the same handler twice fails
+ # (with a platform-dependent exception)
+ self.assertRaises(Exception, self.io_loop.add_handler,
+ sock.fileno(), lambda fd, events: None,
+ IOLoop.READ)
+ finally:
+ sock.close()
+
+
if __name__ == "__main__":
unittest.main()