""" EINTR tests for the os module. """
def new_sleep_process(self):
- code = 'import time; time.sleep(%r)' % self.sleep_time
+ code = f'import time; time.sleep({self.sleep_time!r})'
return self.subprocess(code)
def _test_wait_multiple(self, wait_func):
def test_wait4(self):
self._test_wait_single(lambda pid: os.wait4(pid, 0))
- def test_read(self):
+ def _interrupted_reads(self):
+ """Make a fd which will force block on read of expected bytes."""
rd, wr = os.pipe()
self.addCleanup(os.close, rd)
# wr closed explicitly by parent
# the payload below are smaller than PIPE_BUF, hence the writes will be
# atomic
- datas = [b"hello", b"world", b"spam"]
+ data = [b"hello", b"world", b"spam"]
code = '\n'.join((
'import os, sys, time',
'',
'wr = int(sys.argv[1])',
- 'datas = %r' % datas,
- 'sleep_time = %r' % self.sleep_time,
+ f'data = {data!r}',
+ f'sleep_time = {self.sleep_time!r}',
'',
- 'for data in datas:',
+ 'for item in data:',
' # let the parent block on read()',
' time.sleep(sleep_time)',
- ' os.write(wr, data)',
+ ' os.write(wr, item)',
))
proc = self.subprocess(code, str(wr), pass_fds=[wr])
with kill_on_error(proc):
os.close(wr)
- for data in datas:
- self.assertEqual(data, os.read(rd, len(data)))
+ for datum in data:
+ yield rd, datum
self.assertEqual(proc.wait(), 0)
- def test_readinto(self):
- rd, wr = os.pipe()
- self.addCleanup(os.close, rd)
- # wr closed explicitly by parent
-
- # the payload below are smaller than PIPE_BUF, hence the writes will be
- # atomic
- datas = [b"hello", b"world", b"spam"]
-
- code = '\n'.join((
- 'import os, sys, time',
- '',
- 'wr = int(sys.argv[1])',
- 'datas = %r' % datas,
- 'sleep_time = %r' % self.sleep_time,
- '',
- 'for data in datas:',
- ' # let the parent block on read()',
- ' time.sleep(sleep_time)',
- ' os.write(wr, data)',
- ))
+ def test_read(self):
+ for fd, expected in self._interrupted_reads():
+ self.assertEqual(expected, os.read(fd, len(expected)))
- proc = self.subprocess(code, str(wr), pass_fds=[wr])
- with kill_on_error(proc):
- os.close(wr)
- for data in datas:
- buffer = bytearray(len(data))
- self.assertEqual(os.readinto(rd, buffer), len(data))
- self.assertEqual(buffer, data)
- self.assertEqual(proc.wait(), 0)
+ def test_readinto(self):
+ for fd, expected in self._interrupted_reads():
+ buffer = bytearray(len(expected))
+ self.assertEqual(os.readinto(fd, buffer), len(expected))
+ self.assertEqual(buffer, expected)
def test_write(self):
rd, wr = os.pipe()
'import io, os, sys, time',
'',
'rd = int(sys.argv[1])',
- 'sleep_time = %r' % self.sleep_time,
- 'data = b"x" * %s' % support.PIPE_MAX_SIZE,
+ f'sleep_time = {self.sleep_time!r}',
+ f'data = b"x" * {support.PIPE_MAX_SIZE}',
'data_len = len(data)',
'',
'# let the parent block on write()',
'',
'value = read_data.getvalue()',
'if value != data:',
- ' raise Exception("read error: %s vs %s bytes"',
- ' % (len(value), data_len))',
+ ' raise Exception(f"read error: {len(value)}'
+ ' vs {data_len} bytes")',
))
proc = self.subprocess(code, str(rd), pass_fds=[rd])
# wr closed explicitly by parent
# single-byte payload guard us against partial recv
- datas = [b"x", b"y", b"z"]
+ data = [b"x", b"y", b"z"]
code = '\n'.join((
'import os, socket, sys, time',
'',
'fd = int(sys.argv[1])',
- 'family = %s' % int(wr.family),
- 'sock_type = %s' % int(wr.type),
- 'datas = %r' % datas,
- 'sleep_time = %r' % self.sleep_time,
+ f'family = {int(wr.family)}',
+ f'sock_type = {int(wr.type)}',
+ f'data = {data!r}',
+ f'sleep_time = {self.sleep_time!r}',
'',
'wr = socket.fromfd(fd, family, sock_type)',
'os.close(fd)',
'',
'with wr:',
- ' for data in datas:',
+ ' for item in data:',
' # let the parent block on recv()',
' time.sleep(sleep_time)',
- ' wr.sendall(data)',
+ ' wr.sendall(item)',
))
fd = wr.fileno()
proc = self.subprocess(code, str(fd), pass_fds=[fd])
with kill_on_error(proc):
wr.close()
- for data in datas:
- self.assertEqual(data, recv_func(rd, len(data)))
+ for item in data:
+ self.assertEqual(item, recv_func(rd, len(item)))
self.assertEqual(proc.wait(), 0)
def test_recv(self):
'import os, socket, sys, time',
'',
'fd = int(sys.argv[1])',
- 'family = %s' % int(rd.family),
- 'sock_type = %s' % int(rd.type),
- 'sleep_time = %r' % self.sleep_time,
- 'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
+ f'family = {int(rd.family)}',
+ f'sock_type = {int(rd.type)}',
+ f'sleep_time = {self.sleep_time!r}',
+ f'data = b"xyz" * {support.SOCK_MAX_SIZE // 3}',
'data_len = len(data)',
'',
'rd = socket.fromfd(fd, family, sock_type)',
' n += rd.recv_into(memoryview(received_data)[n:])',
'',
'if received_data != data:',
- ' raise Exception("recv error: %s vs %s bytes"',
- ' % (len(received_data), data_len))',
+ ' raise Exception(f"recv error: {len(received_data)}'
+ ' vs {data_len} bytes")',
))
fd = rd.fileno()
code = '\n'.join((
'import socket, time',
'',
- 'host = %r' % socket_helper.HOST,
- 'port = %s' % port,
- 'sleep_time = %r' % self.sleep_time,
+ f'host = {socket_helper.HOST!r}',
+ f'port = {port}',
+ f'sleep_time = {self.sleep_time!r}',
'',
'# let parent block on accept()',
'time.sleep(sleep_time)',
os_helper.unlink(filename)
try:
os.mkfifo(filename)
- except PermissionError as e:
- self.skipTest('os.mkfifo(): %s' % e)
+ except PermissionError as exc:
+ self.skipTest(f'os.mkfifo(): {exc!r}')
self.addCleanup(os_helper.unlink, filename)
code = '\n'.join((
'import os, time',
'',
- 'path = %a' % filename,
- 'sleep_time = %r' % self.sleep_time,
+ f'path = {filename!a}',
+ f'sleep_time = {self.sleep_time!r}',
'',
'# let the parent block',
'time.sleep(sleep_time)',
def check_sigwait(self, wait_func):
signum = signal.SIGUSR1
- pid = os.getpid()
old_handler = signal.signal(signum, lambda *args: None)
self.addCleanup(signal.signal, signum, old_handler)
code = '\n'.join((
'import os, time',
- 'pid = %s' % os.getpid(),
- 'signum = %s' % int(signum),
- 'sleep_time = %r' % self.sleep_time,
+ f'pid = {os.getpid()}',
+ f'signum = {int(signum)}',
+ f'sleep_time = {self.sleep_time!r}',
'time.sleep(sleep_time)',
'os.kill(pid, signum)',
))
- old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum])
proc = self.subprocess(code)