try:
wio = self.io.open(w, **fdopen_kwargs)
t.start()
- signal.alarm(1)
# Fill the pipe enough that the write will be blocking.
# It will be interrupted by the timer armed above. Since the
# other thread has read one byte, the low-level write will
# The buffered IO layer must check for pending signal
# handlers, which in this case will invoke alarm_interrupt().
try:
+ signal.alarm(1)
with self.assertRaises(ZeroDivisionError):
wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
finally:
+ signal.alarm(0)
t.join()
+
# We got one byte, get another one and check that it isn't a
# repeat of the first one.
read_results.append(os.read(r, 1))
if isinstance(exc, RuntimeError):
self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
finally:
+ signal.alarm(0)
wio.close()
os.close(r)
# - third raw read() returns b"bar"
self.assertEqual(decode(rio.read(6)), "foobar")
finally:
+ signal.alarm(0)
rio.close()
os.close(w)
os.close(r)
self.assertIsNone(error[0])
self.assertEqual(N, sum(len(x) for x in read_results))
finally:
+ signal.alarm(0)
write_finished = True
os.close(w)
os.close(r)
def setUp(self):
# isatty() and close() can hang on some platforms. Set an alarm
# before running the test to make sure we don't hang forever.
- self.old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
+ old_alarm = signal.signal(signal.SIGALRM, self.handle_sig)
+ self.addCleanup(signal.signal, signal.SIGALRM, old_alarm)
+ self.addCleanup(signal.alarm, 0)
signal.alarm(10)
- def tearDown(self):
- # remove alarm, restore old alarm handler
- signal.alarm(0)
- signal.signal(signal.SIGALRM, self.old_alarm)
-
def handle_sig(self, sig, frame):
self.fail("isatty hung")
else:
self.fail("pause returned of its own accord, and the signal"
" didn't arrive after another second.")
+ finally:
+ signal.alarm(0)
# Issue 3864. Unknown if this affects earlier versions of freebsd also.
@unittest.skipIf(sys.platform=='freebsd6',
import select
signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the sleep,
- # before select is called
- time.sleep(self.TIMEOUT_FULL)
- mid_time = time.time()
+ try:
+ before_time = time.time()
+ # We attempt to get a signal during the sleep,
+ # before select is called
+ time.sleep(self.TIMEOUT_FULL)
+ mid_time = time.time()
+ finally:
+ signal.alarm(0)
+
self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
select.select([self.read], [], [], self.TIMEOUT_FULL)
after_time = time.time()
import select
signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the select call
- self.assertRaises(select.error, select.select,
- [self.read], [], [], self.TIMEOUT_FULL)
- after_time = time.time()
+ try:
+ before_time = time.time()
+ # We attempt to get a signal during the select call
+ self.assertRaises(select.error, select.select,
+ [self.read], [], [], self.TIMEOUT_FULL)
+ after_time = time.time()
+ finally:
+ signal.alarm(0)
+
self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
def setUp(self):
raise Alarm
old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
try:
- signal.alarm(2) # POSIX allows alarm to be up to 1 second early
try:
+ signal.alarm(2) # POSIX allows alarm to be up to 1 second early
foo = self.serv.accept()
except socket.timeout:
self.fail("caught timeout instead of Alarm")
"""Test all socket servers."""
def setUp(self):
+ self.addCleanup(signal_alarm, 0)
signal_alarm(60) # Kill deadlocks after 60 seconds.
self.port_seed = 0
self.test_files = []
def tearDown(self):
- signal_alarm(0) # Didn't deadlock.
+ self.doCleanups()
reap_children()
for fn in self.test_files:
kw = {stream: subprocess.PIPE}
with subprocess.Popen(args, **kw) as process:
signal.alarm(1)
- # communicate() will be interrupted by SIGALRM
- process.communicate()
+ try:
+ # communicate() will be interrupted by SIGALRM
+ process.communicate()
+ finally:
+ signal.alarm(0)
@unittest.skipIf(mswindows, "POSIX specific tests")
# wait for it return.
if signal_blackboard[signal.SIGUSR1]['tripped'] == 0 \
or signal_blackboard[signal.SIGUSR2]['tripped'] == 0:
- signal.alarm(1)
- signal.pause()
- signal.alarm(0)
+ try:
+ signal.alarm(1)
+ signal.pause()
+ finally:
+ signal.alarm(0)
self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped'], 1)
self.assertEqual( signal_blackboard[signal.SIGUSR1]['tripped_by'],