self.run_with_own_gil(script)
-class MiscTests(unittest.TestCase):
- def test_atomic_write_should_notice_incomplete_writes(self):
+class PatchAtomicWrites:
+ def __init__(self, truncate_at_length, never_complete=False):
+ self.truncate_at_length = truncate_at_length
+ self.never_complete = never_complete
+ self.seen_write = False
+ self._children = []
+
+ def __enter__(self):
import _pyio
oldwrite = os.write
- seen_write = False
-
- truncate_at_length = 100
# Emulate an os.write that only writes partial data.
def write(fd, data):
- nonlocal seen_write
- seen_write = True
- return oldwrite(fd, data[:truncate_at_length])
+ if self.seen_write and self.never_complete:
+ return None
+ self.seen_write = True
+ return oldwrite(fd, data[:self.truncate_at_length])
# Need to patch _io to be _pyio, so that io.FileIO is affected by the
# os.write patch.
- with (support.swap_attr(_bootstrap_external, '_io', _pyio),
- support.swap_attr(os, 'write', write)):
- with self.assertRaises(OSError):
- # Make sure we write something longer than the point where we
- # truncate.
- content = b'x' * (truncate_at_length * 2)
- _bootstrap_external._write_atomic(os_helper.TESTFN, content)
- assert seen_write
+ self.children = [
+ support.swap_attr(_bootstrap_external, '_io', _pyio),
+ support.swap_attr(os, 'write', write)
+ ]
+ for child in self.children:
+ child.__enter__()
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ for child in self.children:
+ child.__exit__(exc_type, exc_val, exc_tb)
+
+
+class MiscTests(unittest.TestCase):
+
+ def test_atomic_write_retries_incomplete_writes(self):
+ truncate_at_length = 100
+ length = truncate_at_length * 2
+
+ with PatchAtomicWrites(truncate_at_length=truncate_at_length) as cm:
+ # Make sure we write something longer than the point where we
+ # truncate.
+ content = b'x' * length
+ _bootstrap_external._write_atomic(os_helper.TESTFN, content)
+ self.assertTrue(cm.seen_write)
+
+ self.assertEqual(os.stat(support.os_helper.TESTFN).st_size, length)
+ os.unlink(support.os_helper.TESTFN)
+
+ def test_atomic_write_errors_if_unable_to_complete(self):
+ truncate_at_length = 100
+
+ with (
+ PatchAtomicWrites(
+ truncate_at_length=truncate_at_length, never_complete=True,
+ ) as cm,
+ self.assertRaises(OSError)
+ ):
+ # Make sure we write something longer than the point where we
+ # truncate.
+ content = b'x' * (truncate_at_length * 2)
+ _bootstrap_external._write_atomic(os_helper.TESTFN, content)
+ self.assertTrue(cm.seen_write)
with self.assertRaises(OSError):
os.stat(support.os_helper.TESTFN) # Check that the file did not get written.