epoll_events |= select.EPOLLIN
if events & EVENT_WRITE:
epoll_events |= select.EPOLLOUT
- self._epoll.register(key.fd, epoll_events)
+ try:
+ self._epoll.register(key.fd, epoll_events)
+ except BaseException:
+ super().unregister(fileobj)
+ raise
return key
def unregister(self, fileobj):
def register(self, fileobj, events, data=None):
key = super().register(fileobj, events, data)
- if events & EVENT_READ:
- kev = select.kevent(key.fd, select.KQ_FILTER_READ,
- select.KQ_EV_ADD)
- self._kqueue.control([kev], 0, 0)
- if events & EVENT_WRITE:
- kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
- select.KQ_EV_ADD)
- self._kqueue.control([kev], 0, 0)
+ try:
+ if events & EVENT_READ:
+ kev = select.kevent(key.fd, select.KQ_FILTER_READ,
+ select.KQ_EV_ADD)
+ self._kqueue.control([kev], 0, 0)
+ if events & EVENT_WRITE:
+ kev = select.kevent(key.fd, select.KQ_FILTER_WRITE,
+ select.KQ_EV_ADD)
+ self._kqueue.control([kev], 0, 0)
+ except BaseException:
+ super().unregister(fileobj)
+ raise
return key
def unregister(self, fileobj):
import signal
import socket
import sys
+import tempfile
from test import support
from time import sleep
import unittest
SELECTOR = getattr(selectors, 'EpollSelector', None)
+ def test_register_file(self):
+ # epoll(7) returns EPERM when given a file to watch
+ s = self.SELECTOR()
+ with tempfile.NamedTemporaryFile() as f:
+ with self.assertRaises(IOError):
+ s.register(f, selectors.EVENT_READ)
+ # the SelectorKey has been removed
+ with self.assertRaises(KeyError):
+ s.get_key(f)
+
@unittest.skipUnless(hasattr(selectors, 'KqueueSelector'),
"Test needs selectors.KqueueSelector)")
SELECTOR = getattr(selectors, 'KqueueSelector', None)
+ def test_register_bad_fd(self):
+ # a file descriptor that's been closed should raise an OSError
+ # with EBADF
+ s = self.SELECTOR()
+ bad_f = support.make_bad_fd()
+ with self.assertRaises(OSError) as cm:
+ s.register(bad_f, selectors.EVENT_READ)
+ self.assertEqual(cm.exception.errno, errno.EBADF)
+ # the SelectorKey has been removed
+ with self.assertRaises(KeyError):
+ s.get_key(bad_f)
+
def test_main():
tests = [DefaultSelectorTestCase, SelectSelectorTestCase,