import os
import pickle
import select
+import selectors
import shutil
import signal
import socket
self.addCleanup(os.close, fd)
return fd
+ def read_count_signaled(self, fd):
+ # read 8 bytes
+ data = os.read(fd, 8)
+ return int.from_bytes(data, byteorder=sys.byteorder)
+
def test_timerfd_initval(self):
fd = self.timerfd_create(time.CLOCK_REALTIME)
self.assertAlmostEqual(next_expiration, initial_expiration, places=3)
def test_timerfd_non_blocking(self):
- size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)
# 0.1 second later
initial_expiration = 0.1
- _, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=0)
+ os.timerfd_settime(fd, initial=initial_expiration, interval=0)
# read() raises OSError with errno is EAGAIN for non-blocking timer.
with self.assertRaises(OSError) as ctx:
- _ = os.read(fd, size)
+ self.read_count_signaled(fd)
self.assertEqual(ctx.exception.errno, errno.EAGAIN)
# Wait more than 0.1 seconds
time.sleep(initial_expiration + 0.1)
# confirm if timerfd is readable and read() returns 1 as bytes.
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, 1)
+ self.assertEqual(self.read_count_signaled(fd), 1)
def test_timerfd_negative(self):
one_sec_in_nsec = 10**9
for flags in test_flags:
with self.subTest(flags=flags, initial=initial, interval=interval):
with self.assertRaises(OSError) as context:
- _, _ = os.timerfd_settime(fd, flags=flags, initial=initial, interval=interval)
+ os.timerfd_settime(fd, flags=flags, initial=initial, interval=interval)
self.assertEqual(context.exception.errno, errno.EINVAL)
with self.assertRaises(OSError) as context:
initial_ns = int( one_sec_in_nsec * initial )
interval_ns = int( one_sec_in_nsec * interval )
- _, _ = os.timerfd_settime_ns(fd, flags=flags, initial=initial_ns, interval=interval_ns)
+ os.timerfd_settime_ns(fd, flags=flags, initial=initial_ns, interval=interval_ns)
self.assertEqual(context.exception.errno, errno.EINVAL)
def test_timerfd_interval(self):
- size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME)
# 1 second
# 0.5 second
interval = 0.5
- _, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
+ os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
# timerfd_gettime
next_expiration, interval2 = os.timerfd_gettime(fd)
count = 3
t = time.perf_counter()
for _ in range(count):
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, 1)
+ self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter() - t
total_time = initial_expiration + interval * (count - 1)
# wait 3.5 time of interval
time.sleep( (count+0.5) * interval)
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, count)
+ self.assertEqual(self.read_count_signaled(fd), count)
def test_timerfd_TFD_TIMER_ABSTIME(self):
- size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME)
now = time.clock_gettime(time.CLOCK_REALTIME)
# not interval timer
interval = 0
- _, _ = os.timerfd_settime(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration, interval=interval)
+ os.timerfd_settime(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration, interval=interval)
# timerfd_gettime
# Note: timerfd_gettime returns relative values even if TFD_TIMER_ABSTIME is specified.
self.assertAlmostEqual(next_expiration, offset, places=3)
t = time.perf_counter()
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
+ count_signaled = self.read_count_signaled(fd)
t = time.perf_counter() - t
self.assertEqual(count_signaled, 1)
self.assertGreater(t, offset - self.CLOCK_RES)
def test_timerfd_select(self):
- size = 8 # read 8 bytes
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)
rfd, wfd, xfd = select.select([fd], [fd], [fd], 0)
# every 0.125 second
interval = 0.125
- _, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
+ os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
count = 3
t = time.perf_counter()
for _ in range(count):
rfd, wfd, xfd = select.select([fd], [fd], [fd], initial_expiration + interval)
self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, 1)
+ self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter() - t
total_time = initial_expiration + interval * (count - 1)
self.assertGreater(t, total_time)
- def test_timerfd_epoll(self):
- size = 8 # read 8 bytes
+ def check_timerfd_poll(self, nanoseconds):
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)
- ep = select.epoll()
- ep.register(fd, select.EPOLLIN)
- self.addCleanup(ep.close)
+ selector = selectors.DefaultSelector()
+ selector.register(fd, selectors.EVENT_READ)
+ self.addCleanup(selector.close)
+ sec_to_nsec = 10 ** 9
# 0.25 second
- initial_expiration = 0.25
+ initial_expiration_ns = sec_to_nsec // 4
# every 0.125 second
- interval = 0.125
+ interval_ns = sec_to_nsec // 8
- _, _ = os.timerfd_settime(fd, initial=initial_expiration, interval=interval)
+ if nanoseconds:
+ os.timerfd_settime_ns(fd,
+ initial=initial_expiration_ns,
+ interval=interval_ns)
+ else:
+ os.timerfd_settime(fd,
+ initial=initial_expiration_ns / sec_to_nsec,
+ interval=interval_ns / sec_to_nsec)
count = 3
- t = time.perf_counter()
+ if nanoseconds:
+ t = time.perf_counter_ns()
+ else:
+ t = time.perf_counter()
for i in range(count):
- timeout_margin = interval
+ timeout_margin_ns = interval_ns
if i == 0:
- timeout = initial_expiration + interval + timeout_margin
+ timeout_ns = initial_expiration_ns + interval_ns + timeout_margin_ns
else:
- timeout = interval + timeout_margin
- # epoll timeout is in seconds.
- events = ep.poll(timeout)
- self.assertEqual(events, [(fd, select.EPOLLIN)])
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, 1)
+ timeout_ns = interval_ns + timeout_margin_ns
- t = time.perf_counter() - t
+ ready = selector.select(timeout_ns / sec_to_nsec)
+ self.assertEqual(len(ready), 1, ready)
+ event = ready[0][1]
+ self.assertEqual(event, selectors.EVENT_READ)
- total_time = initial_expiration + interval * (count - 1)
- self.assertGreater(t, total_time)
- ep.unregister(fd)
+ self.assertEqual(self.read_count_signaled(fd), 1)
+
+ total_time = initial_expiration_ns + interval_ns * (count - 1)
+ if nanoseconds:
+ dt = time.perf_counter_ns() - t
+ self.assertGreater(dt, total_time)
+ else:
+ dt = time.perf_counter() - t
+ self.assertGreater(dt, total_time / sec_to_nsec)
+ selector.unregister(fd)
+
+ def test_timerfd_poll(self):
+ self.check_timerfd_poll(False)
+
+ def test_timerfd_ns_poll(self):
+ self.check_timerfd_poll(True)
def test_timerfd_ns_initval(self):
one_sec_in_nsec = 10**9
self.assertAlmostEqual(next_expiration_ns, initial_expiration_ns, delta=limit_error)
def test_timerfd_ns_interval(self):
- size = 8 # read 8 bytes
one_sec_in_nsec = 10**9
limit_error = one_sec_in_nsec // 10**3
fd = self.timerfd_create(time.CLOCK_REALTIME)
# every 0.5 second
interval_ns = one_sec_in_nsec // 2
- _, _ = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)
+ os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)
# timerfd_gettime
next_expiration_ns, interval_ns2 = os.timerfd_gettime_ns(fd)
count = 3
t = time.perf_counter_ns()
for _ in range(count):
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, 1)
+ self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter_ns() - t
total_time_ns = initial_expiration_ns + interval_ns * (count - 1)
# wait 3.5 time of interval
time.sleep( (count+0.5) * interval_ns / one_sec_in_nsec)
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, count)
+ self.assertEqual(self.read_count_signaled(fd), count)
def test_timerfd_ns_TFD_TIMER_ABSTIME(self):
- size = 8 # read 8 bytes
one_sec_in_nsec = 10**9
limit_error = one_sec_in_nsec // 10**3
fd = self.timerfd_create(time.CLOCK_REALTIME)
# not interval timer
interval_ns = 0
- _, _ = os.timerfd_settime_ns(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration_ns, interval=interval_ns)
+ os.timerfd_settime_ns(fd, flags=os.TFD_TIMER_ABSTIME, initial=initial_expiration_ns, interval=interval_ns)
# timerfd_gettime
# Note: timerfd_gettime returns relative values even if TFD_TIMER_ABSTIME is specified.
self.assertLess(abs(next_expiration_ns - offset_ns), limit_error)
t = time.perf_counter_ns()
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
+ count_signaled = self.read_count_signaled(fd)
t = time.perf_counter_ns() - t
self.assertEqual(count_signaled, 1)
self.assertGreater(t, offset_ns - self.CLOCK_RES_NS)
def test_timerfd_ns_select(self):
- size = 8 # read 8 bytes
one_sec_in_nsec = 10**9
fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)
# every 0.125 second
interval_ns = one_sec_in_nsec // 8
- _, _ = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)
+ os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)
count = 3
t = time.perf_counter_ns()
for _ in range(count):
rfd, wfd, xfd = select.select([fd], [fd], [fd], (initial_expiration_ns + interval_ns) / 1e9 )
self.assertEqual((rfd, wfd, xfd), ([fd], [], []))
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, 1)
+ self.assertEqual(self.read_count_signaled(fd), 1)
t = time.perf_counter_ns() - t
total_time_ns = initial_expiration_ns + interval_ns * (count - 1)
self.assertGreater(t, total_time_ns)
- def test_timerfd_ns_epoll(self):
- size = 8 # read 8 bytes
- one_sec_in_nsec = 10**9
- fd = self.timerfd_create(time.CLOCK_REALTIME, flags=os.TFD_NONBLOCK)
-
- ep = select.epoll()
- ep.register(fd, select.EPOLLIN)
- self.addCleanup(ep.close)
-
- # 0.25 second
- initial_expiration_ns = one_sec_in_nsec // 4
- # every 0.125 second
- interval_ns = one_sec_in_nsec // 8
-
- _, _ = os.timerfd_settime_ns(fd, initial=initial_expiration_ns, interval=interval_ns)
-
- count = 3
- t = time.perf_counter_ns()
- for i in range(count):
- timeout_margin_ns = interval_ns
- if i == 0:
- timeout_ns = initial_expiration_ns + interval_ns + timeout_margin_ns
- else:
- timeout_ns = interval_ns + timeout_margin_ns
-
- # epoll timeout is in seconds.
- events = ep.poll(timeout_ns / one_sec_in_nsec)
- self.assertEqual(events, [(fd, select.EPOLLIN)])
- n = os.read(fd, size)
- count_signaled = int.from_bytes(n, byteorder=sys.byteorder)
- self.assertEqual(count_signaled, 1)
-
- t = time.perf_counter_ns() - t
-
- total_time = initial_expiration_ns + interval_ns * (count - 1)
- self.assertGreater(t, total_time)
- ep.unregister(fd)
-
class OSErrorTests(unittest.TestCase):
def setUp(self):
class Str(str):