+import array
+import pickle
+import random
+import sys
+import threading
+import time
import unittest
-from test.support import os_helper
+import warnings
+import weakref
+from collections import deque, UserList
+from itertools import cycle, count
+from test import support
+from test.support import os_helper, threading_helper
+from .utils import byteslike, CTestCase, PyTestCase
+
import io # C implementation.
import _pyio as pyio # Python implementation.
+
+class CommonBufferedTests:
+ # Tests common to BufferedReader, BufferedWriter and BufferedRandom
+
+ def test_detach(self):
+ raw = self.MockRawIO()
+ buf = self.tp(raw)
+ self.assertIs(buf.detach(), raw)
+ self.assertRaises(ValueError, buf.detach)
+
+ repr(buf) # Should still work
+
+ def test_fileno(self):
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio)
+
+ self.assertEqual(42, bufio.fileno())
+
+ def test_invalid_args(self):
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio)
+ # Invalid whence
+ self.assertRaises(ValueError, bufio.seek, 0, -1)
+ self.assertRaises(ValueError, bufio.seek, 0, 9)
+
+ def test_override_destructor(self):
+ tp = self.tp
+ record = []
+ class MyBufferedIO(tp):
+ def __del__(self):
+ record.append(1)
+ try:
+ f = super().__del__
+ except AttributeError:
+ pass
+ else:
+ f()
+ def close(self):
+ record.append(2)
+ super().close()
+ def flush(self):
+ record.append(3)
+ super().flush()
+ rawio = self.MockRawIO()
+ bufio = MyBufferedIO(rawio)
+ del bufio
+ support.gc_collect()
+ self.assertEqual(record, [1, 2, 3])
+
+ def test_context_manager(self):
+ # Test usability as a context manager
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio)
+ def _with():
+ with bufio:
+ pass
+ _with()
+ # bufio should now be closed, and using it a second time should raise
+ # a ValueError.
+ self.assertRaises(ValueError, _with)
+
+ def test_error_through_destructor(self):
+ # Test that the exception state is not modified by a destructor,
+ # even if close() fails.
+ rawio = self.CloseFailureIO()
+ with support.catch_unraisable_exception() as cm:
+ with self.assertRaises(AttributeError):
+ self.tp(rawio).xyzzy
+
+ self.assertEqual(cm.unraisable.exc_type, OSError)
+
+ def test_repr(self):
+ raw = self.MockRawIO()
+ b = self.tp(raw)
+ clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
+ self.assertRegex(repr(b), "<%s>" % clsname)
+ raw.name = "dummy"
+ self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
+ raw.name = b"dummy"
+ self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
+
+ def test_recursive_repr(self):
+ # Issue #25455
+ raw = self.MockRawIO()
+ b = self.tp(raw)
+ with support.swap_attr(raw, 'name', b), support.infinite_recursion(25):
+ with self.assertRaises(RuntimeError):
+ repr(b) # Should not crash
+
+ def test_flush_error_on_close(self):
+ # Test that buffered file is closed despite failed flush
+ # and that flush() is called before file closed.
+ raw = self.MockRawIO()
+ closed = []
+ def bad_flush():
+ closed[:] = [b.closed, raw.closed]
+ raise OSError()
+ raw.flush = bad_flush
+ b = self.tp(raw)
+ self.assertRaises(OSError, b.close) # exception not swallowed
+ self.assertTrue(b.closed)
+ self.assertTrue(raw.closed)
+ self.assertTrue(closed) # flush() called
+ self.assertFalse(closed[0]) # flush() called before file closed
+ self.assertFalse(closed[1])
+ raw.flush = lambda: None # break reference loop
+
+ def test_close_error_on_close(self):
+ raw = self.MockRawIO()
+ def bad_flush():
+ raise OSError('flush')
+ def bad_close():
+ raise OSError('close')
+ raw.close = bad_close
+ b = self.tp(raw)
+ b.flush = bad_flush
+ with self.assertRaises(OSError) as err: # exception not swallowed
+ b.close()
+ self.assertEqual(err.exception.args, ('close',))
+ self.assertIsInstance(err.exception.__context__, OSError)
+ self.assertEqual(err.exception.__context__.args, ('flush',))
+ self.assertFalse(b.closed)
+
+ # Silence destructor error
+ raw.close = lambda: None
+ b.flush = lambda: None
+
+ def test_nonnormalized_close_error_on_close(self):
+ # Issue #21677
+ raw = self.MockRawIO()
+ def bad_flush():
+ raise non_existing_flush
+ def bad_close():
+ raise non_existing_close
+ raw.close = bad_close
+ b = self.tp(raw)
+ b.flush = bad_flush
+ with self.assertRaises(NameError) as err: # exception not swallowed
+ b.close()
+ self.assertIn('non_existing_close', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('non_existing_flush', str(err.exception.__context__))
+ self.assertFalse(b.closed)
+
+ # Silence destructor error
+ b.flush = lambda: None
+ raw.close = lambda: None
+
+ def test_multi_close(self):
+ raw = self.MockRawIO()
+ b = self.tp(raw)
+ b.close()
+ b.close()
+ b.close()
+ self.assertRaises(ValueError, b.flush)
+
+ def test_unseekable(self):
+ bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
+ self.assertRaises(self.UnsupportedOperation, bufio.tell)
+ self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
+
+ def test_readonly_attributes(self):
+ raw = self.MockRawIO()
+ buf = self.tp(raw)
+ x = self.MockRawIO()
+ with self.assertRaises(AttributeError):
+ buf.raw = x
+
+ def test_pickling_subclass(self):
+ global MyBufferedIO
+ class MyBufferedIO(self.tp):
+ def __init__(self, raw, tag):
+ super().__init__(raw)
+ self.tag = tag
+ def __getstate__(self):
+ return self.tag, self.raw.getvalue()
+ def __setstate__(slf, state):
+ tag, value = state
+ slf.__init__(self.BytesIO(value), tag)
+
+ raw = self.BytesIO(b'data')
+ buf = MyBufferedIO(raw, tag='ham')
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ with self.subTest(protocol=proto):
+ pickled = pickle.dumps(buf, proto)
+ newbuf = pickle.loads(pickled)
+ self.assertEqual(newbuf.raw.getvalue(), b'data')
+ self.assertEqual(newbuf.tag, 'ham')
+ del MyBufferedIO
+
+
+class SizeofTest:
+
+ @support.cpython_only
+ def test_sizeof(self):
+ bufsize1 = 4096
+ bufsize2 = 8192
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize1)
+ size = sys.getsizeof(bufio) - bufsize1
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize2)
+ self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
+
+ @support.cpython_only
+ def test_buffer_freeing(self) :
+ bufsize = 4096
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio, buffer_size=bufsize)
+ size = sys.getsizeof(bufio) - bufsize
+ bufio.close()
+ self.assertEqual(sys.getsizeof(bufio), size)
+
+class BufferedReaderTest(CommonBufferedTests):
+ read_mode = "rb"
+
+ def test_constructor(self):
+ rawio = self.MockRawIO([b"abc"])
+ bufio = self.tp(rawio)
+ bufio.__init__(rawio)
+ bufio.__init__(rawio, buffer_size=1024)
+ bufio.__init__(rawio, buffer_size=16)
+ self.assertEqual(b"abc", bufio.read())
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ rawio = self.MockRawIO([b"abc"])
+ bufio.__init__(rawio)
+ self.assertEqual(b"abc", bufio.read())
+
+ def test_uninitialized(self):
+ bufio = self.tp.__new__(self.tp)
+ del bufio
+ bufio = self.tp.__new__(self.tp)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ bufio.read, 0)
+ bufio.__init__(self.MockRawIO())
+ self.assertEqual(bufio.read(0), b'')
+
+ def test_read(self):
+ for arg in (None, 7):
+ rawio = self.MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ self.assertEqual(b"abcdefg", bufio.read(arg))
+ # Invalid args
+ self.assertRaises(ValueError, bufio.read, -2)
+
+ def test_read1(self):
+ rawio = self.MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ self.assertEqual(b"a", bufio.read(1))
+ self.assertEqual(b"b", bufio.read1(1))
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(b"", bufio.read1(0))
+ self.assertEqual(b"c", bufio.read1(100))
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(b"d", bufio.read1(100))
+ self.assertEqual(rawio._reads, 2)
+ self.assertEqual(b"efg", bufio.read1(100))
+ self.assertEqual(rawio._reads, 3)
+ self.assertEqual(b"", bufio.read1(100))
+ self.assertEqual(rawio._reads, 4)
+
+ def test_read1_arbitrary(self):
+ rawio = self.MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ self.assertEqual(b"a", bufio.read(1))
+ self.assertEqual(b"bc", bufio.read1())
+ self.assertEqual(b"d", bufio.read1())
+ self.assertEqual(b"efg", bufio.read1(-1))
+ self.assertEqual(rawio._reads, 3)
+ self.assertEqual(b"", bufio.read1())
+ self.assertEqual(rawio._reads, 4)
+
+ def test_readinto(self):
+ rawio = self.MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ b = bytearray(2)
+ self.assertEqual(bufio.readinto(b), 2)
+ self.assertEqual(b, b"ab")
+ self.assertEqual(bufio.readinto(b), 2)
+ self.assertEqual(b, b"cd")
+ self.assertEqual(bufio.readinto(b), 2)
+ self.assertEqual(b, b"ef")
+ self.assertEqual(bufio.readinto(b), 1)
+ self.assertEqual(b, b"gf")
+ self.assertEqual(bufio.readinto(b), 0)
+ self.assertEqual(b, b"gf")
+ rawio = self.MockRawIO((b"abc", None))
+ bufio = self.tp(rawio)
+ self.assertEqual(bufio.readinto(b), 2)
+ self.assertEqual(b, b"ab")
+ self.assertEqual(bufio.readinto(b), 1)
+ self.assertEqual(b, b"cb")
+
+ def test_readinto1(self):
+ buffer_size = 10
+ rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+ b = bytearray(2)
+ self.assertEqual(bufio.peek(3), b'abc')
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 2)
+ self.assertEqual(b, b"ab")
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 1)
+ self.assertEqual(b[:1], b"c")
+ self.assertEqual(rawio._reads, 1)
+ self.assertEqual(bufio.readinto1(b), 2)
+ self.assertEqual(b, b"de")
+ self.assertEqual(rawio._reads, 2)
+ b = bytearray(2*buffer_size)
+ self.assertEqual(bufio.peek(3), b'fgh')
+ self.assertEqual(rawio._reads, 3)
+ self.assertEqual(bufio.readinto1(b), 6)
+ self.assertEqual(b[:6], b"fghjkl")
+ self.assertEqual(rawio._reads, 4)
+
+ def test_readinto_array(self):
+ buffer_size = 60
+ data = b"a" * 26
+ rawio = self.MockRawIO((data,))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+
+ # Create an array with element size > 1 byte
+ b = array.array('i', b'x' * 32)
+ assert len(b) != 16
+
+ # Read into it. We should get as many *bytes* as we can fit into b
+ # (which is more than the number of elements)
+ n = bufio.readinto(b)
+ self.assertGreater(n, len(b))
+
+ # Check that old contents of b are preserved
+ bm = memoryview(b).cast('B')
+ self.assertLess(n, len(bm))
+ self.assertEqual(bm[:n], data[:n])
+ self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
+
+ def test_readinto1_array(self):
+ buffer_size = 60
+ data = b"a" * 26
+ rawio = self.MockRawIO((data,))
+ bufio = self.tp(rawio, buffer_size=buffer_size)
+
+ # Create an array with element size > 1 byte
+ b = array.array('i', b'x' * 32)
+ assert len(b) != 16
+
+ # Read into it. We should get as many *bytes* as we can fit into b
+ # (which is more than the number of elements)
+ n = bufio.readinto1(b)
+ self.assertGreater(n, len(b))
+
+ # Check that old contents of b are preserved
+ bm = memoryview(b).cast('B')
+ self.assertLess(n, len(bm))
+ self.assertEqual(bm[:n], data[:n])
+ self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
+
+ def test_readlines(self):
+ def bufio():
+ rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
+ return self.tp(rawio)
+ self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
+ self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
+ self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
+
+ def test_buffering(self):
+ data = b"abcdefghi"
+ dlen = len(data)
+
+ tests = [
+ [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
+ [ 100, [ 3, 3, 3], [ dlen ] ],
+ [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
+ ]
+
+ for bufsize, buf_read_sizes, raw_read_sizes in tests:
+ rawio = self.MockFileIO(data)
+ bufio = self.tp(rawio, buffer_size=bufsize)
+ pos = 0
+ for nbytes in buf_read_sizes:
+ self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
+ pos += nbytes
+ # this is mildly implementation-dependent
+ self.assertEqual(rawio.read_history, raw_read_sizes)
+
+ def test_read_non_blocking(self):
+ # Inject some None's in there to simulate EWOULDBLOCK
+ rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
+ bufio = self.tp(rawio)
+ self.assertEqual(b"abcd", bufio.read(6))
+ self.assertEqual(b"e", bufio.read(1))
+ self.assertEqual(b"fg", bufio.read())
+ self.assertEqual(b"", bufio.peek(1))
+ self.assertIsNone(bufio.read())
+ self.assertEqual(b"", bufio.read())
+
+ rawio = self.MockRawIO((b"a", None, None))
+ self.assertEqual(b"a", rawio.readall())
+ self.assertIsNone(rawio.readall())
+
+ def test_read_past_eof(self):
+ rawio = self.MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+
+ self.assertEqual(b"abcdefg", bufio.read(9000))
+
+ def test_read_all(self):
+ rawio = self.MockRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+
+ self.assertEqual(b"abcdefg", bufio.read())
+
+ @threading_helper.requires_working_threading()
+ @support.requires_resource('cpu')
+ def test_threads(self):
+ try:
+ # Write out many bytes with exactly the same number of 0's,
+ # 1's... 255's. This will help us check that concurrent reading
+ # doesn't duplicate or forget contents.
+ N = 1000
+ l = list(range(256)) * N
+ random.shuffle(l)
+ s = bytes(bytearray(l))
+ with self.open(os_helper.TESTFN, "wb") as f:
+ f.write(s)
+ with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
+ bufio = self.tp(raw, 8)
+ errors = []
+ results = []
+ def f():
+ try:
+ # Intra-buffer read then buffer-flushing read
+ for n in cycle([1, 19]):
+ s = bufio.read(n)
+ if not s:
+ break
+ # list.append() is atomic
+ results.append(s)
+ except Exception as e:
+ errors.append(e)
+ raise
+ threads = [threading.Thread(target=f) for x in range(20)]
+ with threading_helper.start_threads(threads):
+ time.sleep(0.02) # yield
+ self.assertFalse(errors,
+ "the following exceptions were caught: %r" % errors)
+ s = b''.join(results)
+ for i in range(256):
+ c = bytes(bytearray([i]))
+ self.assertEqual(s.count(c), N)
+ finally:
+ os_helper.unlink(os_helper.TESTFN)
+
+ def test_unseekable(self):
+ bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
+ self.assertRaises(self.UnsupportedOperation, bufio.tell)
+ self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
+ bufio.read(1)
+ self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
+ self.assertRaises(self.UnsupportedOperation, bufio.tell)
+
+ def test_misbehaved_io(self):
+ rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
+
+ # Silence destructor error
+ bufio.close = lambda: None
+
+ def test_no_extraneous_read(self):
+ # Issue #9550; when the raw IO object has satisfied the read request,
+ # we should not issue any additional reads, otherwise it may block
+ # (e.g. socket).
+ bufsize = 16
+ for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
+ rawio = self.MockRawIO([b"x" * n])
+ bufio = self.tp(rawio, bufsize)
+ self.assertEqual(bufio.read(n), b"x" * n)
+ # Simple case: one raw read is enough to satisfy the request.
+ self.assertEqual(rawio._extraneous_reads, 0,
+ "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
+ # A more complex case where two raw reads are needed to satisfy
+ # the request.
+ rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
+ bufio = self.tp(rawio, bufsize)
+ self.assertEqual(bufio.read(n), b"x" * n)
+ self.assertEqual(rawio._extraneous_reads, 0,
+ "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
+
+ def test_read_on_closed(self):
+ # Issue #23796
+ b = self.BufferedReader(self.BytesIO(b"12"))
+ b.read(1)
+ b.close()
+ with self.subTest('peek'):
+ self.assertRaises(ValueError, b.peek)
+ with self.subTest('read1'):
+ self.assertRaises(ValueError, b.read1, 1)
+ with self.subTest('read'):
+ self.assertRaises(ValueError, b.read)
+ with self.subTest('readinto'):
+ self.assertRaises(ValueError, b.readinto, bytearray())
+ with self.subTest('readinto1'):
+ self.assertRaises(ValueError, b.readinto1, bytearray())
+ with self.subTest('flush'):
+ self.assertRaises(ValueError, b.flush)
+ with self.subTest('truncate'):
+ self.assertRaises(ValueError, b.truncate)
+ with self.subTest('seek'):
+ self.assertRaises(ValueError, b.seek, 0)
+
+ def test_truncate_on_read_only(self):
+ rawio = self.MockFileIO(b"abc")
+ bufio = self.tp(rawio)
+ self.assertFalse(bufio.writable())
+ self.assertRaises(self.UnsupportedOperation, bufio.truncate)
+ self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
+
+ def test_tell_character_device_file(self):
+ # GH-95782
+ # For the (former) bug in BufferedIO to manifest, the wrapped IO obj
+ # must be able to produce at least 2 bytes.
+ raw = self.MockCharPseudoDevFileIO(b"12")
+ buf = self.tp(raw)
+ self.assertEqual(buf.tell(), 0)
+ self.assertEqual(buf.read(1), b"1")
+ self.assertEqual(buf.tell(), 0)
+
+ def test_seek_character_device_file(self):
+ raw = self.MockCharPseudoDevFileIO(b"12")
+ buf = self.tp(raw)
+ self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
+ self.assertEqual(buf.seek(1, io.SEEK_SET), 0)
+ self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
+ self.assertEqual(buf.read(1), b"1")
+
+ # In the C implementation, tell() sets the BufferedIO's abs_pos to 0,
+ # which means that the next seek() could return a negative offset if it
+ # does not sanity-check:
+ self.assertEqual(buf.tell(), 0)
+ self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
+
+
+class CBufferedReaderTest(BufferedReaderTest, SizeofTest, CTestCase):
+ tp = io.BufferedReader
+
+ def test_initialization(self):
+ rawio = self.MockRawIO([b"abc"])
+ bufio = self.tp(rawio)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.read)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.read)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ self.assertRaises(ValueError, bufio.read)
+
+ def test_misbehaved_io_read(self):
+ rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
+ bufio = self.tp(rawio)
+ # _pyio.BufferedReader seems to implement reading different, so that
+ # checking this is not so easy.
+ self.assertRaises(OSError, bufio.read, 10)
+
+ def test_garbage_collection(self):
+ # C BufferedReader objects are collected.
+ # The Python version has __del__, so it ends into gc.garbage instead
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ # Note that using warnings_helper.check_warnings() will keep the
+ # file alive due to the `source` argument to warn(). So, use
+ # catch_warnings() instead.
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", ResourceWarning)
+ rawio = self.FileIO(os_helper.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.f = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
+ self.assertIsNone(wr(), wr)
+
+ def test_args_error(self):
+ # Issue #17275
+ with self.assertRaisesRegex(TypeError, "BufferedReader"):
+ self.tp(self.BytesIO(), 1024, 1024, 1024)
+
+ def test_bad_readinto_value(self):
+ rawio = self.tp(self.BytesIO(b"12"))
+ rawio.readinto = lambda buf: -1
+ bufio = self.tp(rawio)
+ with self.assertRaises(OSError) as cm:
+ bufio.readline()
+ self.assertIsNone(cm.exception.__cause__)
+
+ def test_bad_readinto_type(self):
+ rawio = self.tp(self.BytesIO(b"12"))
+ rawio.readinto = lambda buf: b''
+ bufio = self.tp(rawio)
+ with self.assertRaises(OSError) as cm:
+ bufio.readline()
+ self.assertIsInstance(cm.exception.__cause__, TypeError)
+
+
+class PyBufferedReaderTest(BufferedReaderTest, PyTestCase):
+ tp = pyio.BufferedReader
+
+
+class BufferedWriterTest(CommonBufferedTests):
+ write_mode = "wb"
+
+ def test_constructor(self):
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio)
+ bufio.__init__(rawio)
+ bufio.__init__(rawio, buffer_size=1024)
+ bufio.__init__(rawio, buffer_size=16)
+ self.assertEqual(3, bufio.write(b"abc"))
+ bufio.flush()
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ bufio.__init__(rawio)
+ self.assertEqual(3, bufio.write(b"ghi"))
+ bufio.flush()
+ self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
+
+ def test_uninitialized(self):
+ bufio = self.tp.__new__(self.tp)
+ del bufio
+ bufio = self.tp.__new__(self.tp)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ bufio.write, b'')
+ bufio.__init__(self.MockRawIO())
+ self.assertEqual(bufio.write(b''), 0)
+
+ def test_detach_flush(self):
+ raw = self.MockRawIO()
+ buf = self.tp(raw)
+ buf.write(b"howdy!")
+ self.assertFalse(raw._write_stack)
+ buf.detach()
+ self.assertEqual(raw._write_stack, [b"howdy!"])
+
+ def test_write(self):
+ # Write to the buffered IO but don't overflow the buffer.
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.write(b"abc")
+ self.assertFalse(writer._write_stack)
+ buffer = bytearray(b"def")
+ bufio.write(buffer)
+ buffer[:] = b"***" # Overwrite our copy of the data
+ bufio.flush()
+ self.assertEqual(b"".join(writer._write_stack), b"abcdef")
+
+ def test_write_overflow(self):
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ contents = b"abcdefghijklmnop"
+ for n in range(0, len(contents), 3):
+ bufio.write(contents[n:n+3])
+ flushed = b"".join(writer._write_stack)
+ # At least (total - 8) bytes were implicitly flushed, perhaps more
+ # depending on the implementation.
+ self.assertStartsWith(flushed, contents[:-8])
+
+ def check_writes(self, intermediate_func):
+ # Lots of writes, test the flushed output is as expected.
+ contents = bytes(range(256)) * 1000
+ n = 0
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 13)
+ # Generator of write sizes: repeat each N 15 times then proceed to N+1
+ def gen_sizes():
+ for size in count(1):
+ for i in range(15):
+ yield size
+ sizes = gen_sizes()
+ while n < len(contents):
+ size = min(next(sizes), len(contents) - n)
+ self.assertEqual(bufio.write(contents[n:n+size]), size)
+ intermediate_func(bufio)
+ n += size
+ bufio.flush()
+ self.assertEqual(contents, b"".join(writer._write_stack))
+
+ def test_writes(self):
+ self.check_writes(lambda bufio: None)
+
+ def test_writes_and_flushes(self):
+ self.check_writes(lambda bufio: bufio.flush())
+
+ def test_writes_and_seeks(self):
+ def _seekabs(bufio):
+ pos = bufio.tell()
+ bufio.seek(pos + 1, 0)
+ bufio.seek(pos - 1, 0)
+ bufio.seek(pos, 0)
+ self.check_writes(_seekabs)
+ def _seekrel(bufio):
+ pos = bufio.seek(0, 1)
+ bufio.seek(+1, 1)
+ bufio.seek(-1, 1)
+ bufio.seek(pos, 0)
+ self.check_writes(_seekrel)
+
+ def test_writes_and_truncates(self):
+ self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
+
+ def test_write_non_blocking(self):
+ raw = self.MockNonBlockWriterIO()
+ bufio = self.tp(raw, 8)
+
+ self.assertEqual(bufio.write(b"abcd"), 4)
+ self.assertEqual(bufio.write(b"efghi"), 5)
+ # 1 byte will be written, the rest will be buffered
+ raw.block_on(b"k")
+ self.assertEqual(bufio.write(b"jklmn"), 5)
+
+ # 8 bytes will be written, 8 will be buffered and the rest will be lost
+ raw.block_on(b"0")
+ try:
+ bufio.write(b"opqrwxyz0123456789")
+ except self.BlockingIOError as e:
+ written = e.characters_written
+ else:
+ self.fail("BlockingIOError should have been raised")
+ self.assertEqual(written, 16)
+ self.assertEqual(raw.pop_written(),
+ b"abcdefghijklmnopqrwxyz")
+
+ self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
+ s = raw.pop_written()
+ # Previously buffered bytes were flushed
+ self.assertStartsWith(s, b"01234567A")
+
+ def test_write_and_rewind(self):
+ raw = self.BytesIO()
+ bufio = self.tp(raw, 4)
+ self.assertEqual(bufio.write(b"abcdef"), 6)
+ self.assertEqual(bufio.tell(), 6)
+ bufio.seek(0, 0)
+ self.assertEqual(bufio.write(b"XY"), 2)
+ bufio.seek(6, 0)
+ self.assertEqual(raw.getvalue(), b"XYcdef")
+ self.assertEqual(bufio.write(b"123456"), 6)
+ bufio.flush()
+ self.assertEqual(raw.getvalue(), b"XYcdef123456")
+
+ def test_flush(self):
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.write(b"abc")
+ bufio.flush()
+ self.assertEqual(b"abc", writer._write_stack[0])
+
+ def test_writelines(self):
+ l = [b'ab', b'cd', b'ef']
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.writelines(l)
+ bufio.flush()
+ self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+ def test_writelines_userlist(self):
+ l = UserList([b'ab', b'cd', b'ef'])
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.writelines(l)
+ bufio.flush()
+ self.assertEqual(b''.join(writer._write_stack), b'abcdef')
+
+ def test_writelines_error(self):
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
+ self.assertRaises(TypeError, bufio.writelines, None)
+ self.assertRaises(TypeError, bufio.writelines, 'abc')
+
+ def test_destructor(self):
+ writer = self.MockRawIO()
+ bufio = self.tp(writer, 8)
+ bufio.write(b"abc")
+ del bufio
+ support.gc_collect()
+ self.assertEqual(b"abc", writer._write_stack[0])
+
+ def test_truncate(self):
+ # Truncate implicitly flushes the buffer.
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
+ bufio = self.tp(raw, 8)
+ bufio.write(b"abcdef")
+ self.assertEqual(bufio.truncate(3), 3)
+ self.assertEqual(bufio.tell(), 6)
+ with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
+ self.assertEqual(f.read(), b"abc")
+
+ def test_truncate_after_write(self):
+ # Ensure that truncate preserves the file position after
+ # writes longer than the buffer size.
+ # Issue: https://bugs.python.org/issue32228
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ with self.open(os_helper.TESTFN, "wb") as f:
+ # Fill with some buffer
+ f.write(b'\x00' * 10000)
+ buffer_sizes = [8192, 4096, 200]
+ for buffer_size in buffer_sizes:
+ with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
+ f.write(b'\x00' * (buffer_size + 1))
+ # After write write_pos and write_end are set to 0
+ f.read(1)
+ # read operation makes sure that pos != raw_pos
+ f.truncate()
+ self.assertEqual(f.tell(), buffer_size + 2)
+
+ @threading_helper.requires_working_threading()
+ @support.requires_resource('cpu')
+ def test_threads(self):
+ try:
+ # Write out many bytes from many threads and test they were
+ # all flushed.
+ N = 1000
+ contents = bytes(range(256)) * N
+ sizes = cycle([1, 19])
+ n = 0
+ queue = deque()
+ while n < len(contents):
+ size = next(sizes)
+ queue.append(contents[n:n+size])
+ n += size
+ del contents
+ # We use a real file object because it allows us to
+ # exercise situations where the GIL is released before
+ # writing the buffer to the raw streams. This is in addition
+ # to concurrency issues due to switching threads in the middle
+ # of Python code.
+ with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
+ bufio = self.tp(raw, 8)
+ errors = []
+ def f():
+ try:
+ while True:
+ try:
+ s = queue.popleft()
+ except IndexError:
+ return
+ bufio.write(s)
+ except Exception as e:
+ errors.append(e)
+ raise
+ threads = [threading.Thread(target=f) for x in range(20)]
+ with threading_helper.start_threads(threads):
+ time.sleep(0.02) # yield
+ self.assertFalse(errors,
+ "the following exceptions were caught: %r" % errors)
+ bufio.close()
+ with self.open(os_helper.TESTFN, "rb") as f:
+ s = f.read()
+ for i in range(256):
+ self.assertEqual(s.count(bytes([i])), N)
+ finally:
+ os_helper.unlink(os_helper.TESTFN)
+
+ def test_misbehaved_io(self):
+ rawio = self.MisbehavedRawIO()
+ bufio = self.tp(rawio, 5)
+ self.assertRaises(OSError, bufio.seek, 0)
+ self.assertRaises(OSError, bufio.tell)
+ self.assertRaises(OSError, bufio.write, b"abcdef")
+
+ # Silence destructor error
+ bufio.close = lambda: None
+
+ def test_max_buffer_size_removal(self):
+ with self.assertRaises(TypeError):
+ self.tp(self.MockRawIO(), 8, 12)
+
+ def test_write_error_on_close(self):
+ raw = self.MockRawIO()
+ def bad_write(b):
+ raise OSError()
+ raw.write = bad_write
+ b = self.tp(raw)
+ b.write(b'spam')
+ self.assertRaises(OSError, b.close) # exception not swallowed
+ self.assertTrue(b.closed)
+
+ @threading_helper.requires_working_threading()
+ def test_slow_close_from_thread(self):
+ # Issue #31976
+ rawio = self.SlowFlushRawIO()
+ bufio = self.tp(rawio, 8)
+ t = threading.Thread(target=bufio.close)
+ t.start()
+ rawio.in_flush.wait()
+ self.assertRaises(ValueError, bufio.write, b'spam')
+ self.assertTrue(bufio.closed)
+ t.join()
+
+
+class CBufferedWriterTest(BufferedWriterTest, SizeofTest, CTestCase):
+ tp = io.BufferedWriter
+
+ def test_initialization(self):
+ rawio = self.MockRawIO()
+ bufio = self.tp(rawio)
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
+ self.assertRaises(ValueError, bufio.write, b"def")
+ self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
+ self.assertRaises(ValueError, bufio.write, b"def")
+
+ def test_garbage_collection(self):
+ # C BufferedWriter objects are collected, and collecting them flushes
+ # all data to disk.
+ # The Python version has __del__, so it ends into gc.garbage instead
+ self.addCleanup(os_helper.unlink, os_helper.TESTFN)
+ # Note that using warnings_helper.check_warnings() will keep the
+ # file alive due to the `source` argument to warn(). So, use
+ # catch_warnings() instead.
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", ResourceWarning)
+ rawio = self.FileIO(os_helper.TESTFN, "w+b")
+ f = self.tp(rawio)
+ f.write(b"123xxx")
+ f.x = f
+ wr = weakref.ref(f)
+ del f
+ support.gc_collect()
+ self.assertIsNone(wr(), wr)
+ with self.open(os_helper.TESTFN, "rb") as f:
+ self.assertEqual(f.read(), b"123xxx")
+
+ def test_args_error(self):
+ # Issue #17275
+ with self.assertRaisesRegex(TypeError, "BufferedWriter"):
+ self.tp(self.BytesIO(), 1024, 1024, 1024)
+
+
+class PyBufferedWriterTest(BufferedWriterTest, PyTestCase):
+ tp = pyio.BufferedWriter
+
+class BufferedRWPairTest:
+
+ def test_constructor(self):
+ pair = self.tp(self.MockRawIO(), self.MockRawIO())
+ self.assertFalse(pair.closed)
+
+ def test_uninitialized(self):
+ pair = self.tp.__new__(self.tp)
+ del pair
+ pair = self.tp.__new__(self.tp)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ pair.read, 0)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ pair.write, b'')
+ pair.__init__(self.MockRawIO(), self.MockRawIO())
+ self.assertEqual(pair.read(0), b'')
+ self.assertEqual(pair.write(b''), 0)
+
+ def test_detach(self):
+ pair = self.tp(self.MockRawIO(), self.MockRawIO())
+ self.assertRaises(self.UnsupportedOperation, pair.detach)
+
+ def test_constructor_max_buffer_size_removal(self):
+ with self.assertRaises(TypeError):
+ self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
+
+ def test_constructor_with_not_readable(self):
+ class NotReadable(self.MockRawIO):
+ def readable(self):
+ return False
+
+ self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
+
+ def test_constructor_with_not_writeable(self):
+ class NotWriteable(self.MockRawIO):
+ def writable(self):
+ return False
+
+ self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
+
+ def test_read(self):
+ pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
+
+ self.assertEqual(pair.read(3), b"abc")
+ self.assertEqual(pair.read(1), b"d")
+ self.assertEqual(pair.read(), b"ef")
+ pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
+ self.assertEqual(pair.read(None), b"abc")
+
+ def test_readlines(self):
+ pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
+ self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
+ self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
+ self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
+
+ def test_read1(self):
+ # .read1() is delegated to the underlying reader object, so this test
+ # can be shallow.
+ pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
+
+ self.assertEqual(pair.read1(3), b"abc")
+ self.assertEqual(pair.read1(), b"def")
+
+ def test_readinto(self):
+ for method in ("readinto", "readinto1"):
+ with self.subTest(method):
+ pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
+
+ data = byteslike(b'\0' * 5)
+ self.assertEqual(getattr(pair, method)(data), 5)
+ self.assertEqual(bytes(data), b"abcde")
+
+ # gh-138720: C BufferedRWPair would destruct in a bad order resulting in
+ # an unraisable exception.
+ support.gc_collect()
+
+ def test_write(self):
+ w = self.MockRawIO()
+ pair = self.tp(self.MockRawIO(), w)
+
+ pair.write(b"abc")
+ pair.flush()
+ buffer = bytearray(b"def")
+ pair.write(buffer)
+ buffer[:] = b"***" # Overwrite our copy of the data
+ pair.flush()
+ self.assertEqual(w._write_stack, [b"abc", b"def"])
+
+ def test_peek(self):
+ pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
+
+ self.assertStartsWith(pair.peek(3), b"abc")
+ self.assertEqual(pair.read(3), b"abc")
+
+ def test_readable(self):
+ pair = self.tp(self.MockRawIO(), self.MockRawIO())
+ self.assertTrue(pair.readable())
+
+ def test_writeable(self):
+ pair = self.tp(self.MockRawIO(), self.MockRawIO())
+ self.assertTrue(pair.writable())
+
+ def test_seekable(self):
+ # BufferedRWPairs are never seekable, even if their readers and writers
+ # are.
+ pair = self.tp(self.MockRawIO(), self.MockRawIO())
+ self.assertFalse(pair.seekable())
+
+ # .flush() is delegated to the underlying writer object and has been
+ # tested in the test_write method.
+
+ def test_close_and_closed(self):
+ pair = self.tp(self.MockRawIO(), self.MockRawIO())
+ self.assertFalse(pair.closed)
+ pair.close()
+ self.assertTrue(pair.closed)
+
+ def test_reader_close_error_on_close(self):
+ def reader_close():
+ reader_non_existing
+ reader = self.MockRawIO()
+ reader.close = reader_close
+ writer = self.MockRawIO()
+ pair = self.tp(reader, writer)
+ with self.assertRaises(NameError) as err:
+ pair.close()
+ self.assertIn('reader_non_existing', str(err.exception))
+ self.assertTrue(pair.closed)
+ self.assertFalse(reader.closed)
+ self.assertTrue(writer.closed)
+
+ # Silence destructor error
+ reader.close = lambda: None
+
+ def test_writer_close_error_on_close(self):
+ def writer_close():
+ writer_non_existing
+ reader = self.MockRawIO()
+ writer = self.MockRawIO()
+ writer.close = writer_close
+ pair = self.tp(reader, writer)
+ with self.assertRaises(NameError) as err:
+ pair.close()
+ self.assertIn('writer_non_existing', str(err.exception))
+ self.assertFalse(pair.closed)
+ self.assertTrue(reader.closed)
+ self.assertFalse(writer.closed)
+
+ # Silence destructor error
+ writer.close = lambda: None
+ writer = None
+
+ # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
+ with support.catch_unraisable_exception():
+ # Ignore BufferedRWPair unraisable exception
+ with support.catch_unraisable_exception():
+ pair = None
+ support.gc_collect()
+ support.gc_collect()
+
+ def test_reader_writer_close_error_on_close(self):
+ def reader_close():
+ reader_non_existing
+ def writer_close():
+ writer_non_existing
+ reader = self.MockRawIO()
+ reader.close = reader_close
+ writer = self.MockRawIO()
+ writer.close = writer_close
+ pair = self.tp(reader, writer)
+ with self.assertRaises(NameError) as err:
+ pair.close()
+ self.assertIn('reader_non_existing', str(err.exception))
+ self.assertIsInstance(err.exception.__context__, NameError)
+ self.assertIn('writer_non_existing', str(err.exception.__context__))
+ self.assertFalse(pair.closed)
+ self.assertFalse(reader.closed)
+ self.assertFalse(writer.closed)
+
+ # Silence destructor error
+ reader.close = lambda: None
+ writer.close = lambda: None
+
+ def test_isatty(self):
+ class SelectableIsAtty(self.MockRawIO):
+ def __init__(self, isatty):
+ super().__init__()
+ self._isatty = isatty
+
+ def isatty(self):
+ return self._isatty
+
+ pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
+ self.assertFalse(pair.isatty())
+
+ pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
+ self.assertTrue(pair.isatty())
+
+ pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
+ self.assertTrue(pair.isatty())
+
+ pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
+ self.assertTrue(pair.isatty())
+
+ def test_weakref_clearing(self):
+ brw = self.tp(self.MockRawIO(), self.MockRawIO())
+ ref = weakref.ref(brw)
+ brw = None
+ ref = None # Shouldn't segfault.
+
+class CBufferedRWPairTest(BufferedRWPairTest, CTestCase):
+ tp = io.BufferedRWPair
+
+class PyBufferedRWPairTest(BufferedRWPairTest, PyTestCase):
+ tp = pyio.BufferedRWPair
+
+
+class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
+ read_mode = "rb+"
+ write_mode = "wb+"
+
+ def test_constructor(self):
+ BufferedReaderTest.test_constructor(self)
+ BufferedWriterTest.test_constructor(self)
+
+ def test_uninitialized(self):
+ BufferedReaderTest.test_uninitialized(self)
+ BufferedWriterTest.test_uninitialized(self)
+
+ def test_read_and_write(self):
+ raw = self.MockRawIO((b"asdf", b"ghjk"))
+ rw = self.tp(raw, 8)
+
+ self.assertEqual(b"as", rw.read(2))
+ rw.write(b"ddd")
+ rw.write(b"eee")
+ self.assertFalse(raw._write_stack) # Buffer writes
+ self.assertEqual(b"ghjk", rw.read())
+ self.assertEqual(b"dddeee", raw._write_stack[0])
+
+ def test_seek_and_tell(self):
+ raw = self.BytesIO(b"asdfghjkl")
+ rw = self.tp(raw)
+
+ self.assertEqual(b"as", rw.read(2))
+ self.assertEqual(2, rw.tell())
+ rw.seek(0, 0)
+ self.assertEqual(b"asdf", rw.read(4))
+
+ rw.write(b"123f")
+ rw.seek(0, 0)
+ self.assertEqual(b"asdf123fl", rw.read())
+ self.assertEqual(9, rw.tell())
+ rw.seek(-4, 2)
+ self.assertEqual(5, rw.tell())
+ rw.seek(2, 1)
+ self.assertEqual(7, rw.tell())
+ self.assertEqual(b"fl", rw.read(11))
+ rw.flush()
+ self.assertEqual(b"asdf123fl", raw.getvalue())
+
+ self.assertRaises(TypeError, rw.seek, 0.0)
+
+ def check_flush_and_read(self, read_func):
+ raw = self.BytesIO(b"abcdefghi")
+ bufio = self.tp(raw)
+
+ self.assertEqual(b"ab", read_func(bufio, 2))
+ bufio.write(b"12")
+ self.assertEqual(b"ef", read_func(bufio, 2))
+ self.assertEqual(6, bufio.tell())
+ bufio.flush()
+ self.assertEqual(6, bufio.tell())
+ self.assertEqual(b"ghi", read_func(bufio))
+ raw.seek(0, 0)
+ raw.write(b"XYZ")
+ # flush() resets the read buffer
+ bufio.flush()
+ bufio.seek(0, 0)
+ self.assertEqual(b"XYZ", read_func(bufio, 3))
+
+ def test_flush_and_read(self):
+ self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
+
+ def test_flush_and_readinto(self):
+ def _readinto(bufio, n=-1):
+ b = bytearray(n if n >= 0 else 9999)
+ n = bufio.readinto(b)
+ return bytes(b[:n])
+ self.check_flush_and_read(_readinto)
+
+ def test_flush_and_peek(self):
+ def _peek(bufio, n=-1):
+ # This relies on the fact that the buffer can contain the whole
+ # raw stream, otherwise peek() can return less.
+ b = bufio.peek(n)
+ if n != -1:
+ b = b[:n]
+ bufio.seek(len(b), 1)
+ return b
+ self.check_flush_and_read(_peek)
+
+ def test_flush_and_write(self):
+ raw = self.BytesIO(b"abcdefghi")
+ bufio = self.tp(raw)
+
+ bufio.write(b"123")
+ bufio.flush()
+ bufio.write(b"45")
+ bufio.flush()
+ bufio.seek(0, 0)
+ self.assertEqual(b"12345fghi", raw.getvalue())
+ self.assertEqual(b"12345fghi", bufio.read())
+
+ def test_threads(self):
+ BufferedReaderTest.test_threads(self)
+ BufferedWriterTest.test_threads(self)
+
+ def test_writes_and_peek(self):
+ def _peek(bufio):
+ bufio.peek(1)
+ self.check_writes(_peek)
+ def _peek(bufio):
+ pos = bufio.tell()
+ bufio.seek(-1, 1)
+ bufio.peek(1)
+ bufio.seek(pos, 0)
+ self.check_writes(_peek)
+
+ def test_writes_and_reads(self):
+ def _read(bufio):
+ bufio.seek(-1, 1)
+ bufio.read(1)
+ self.check_writes(_read)
+
+ def test_writes_and_read1s(self):
+ def _read1(bufio):
+ bufio.seek(-1, 1)
+ bufio.read1(1)
+ self.check_writes(_read1)
+
+ def test_writes_and_readintos(self):
+ def _read(bufio):
+ bufio.seek(-1, 1)
+ bufio.readinto(bytearray(1))
+ self.check_writes(_read)
+
+ def test_write_after_readahead(self):
+ # Issue #6629: writing after the buffer was filled by readahead should
+ # first rewind the raw stream.
+ for overwrite_size in [1, 5]:
+ raw = self.BytesIO(b"A" * 10)
+ bufio = self.tp(raw, 4)
+ # Trigger readahead
+ self.assertEqual(bufio.read(1), b"A")
+ self.assertEqual(bufio.tell(), 1)
+ # Overwriting should rewind the raw stream if it needs so
+ bufio.write(b"B" * overwrite_size)
+ self.assertEqual(bufio.tell(), overwrite_size + 1)
+ # If the write size was smaller than the buffer size, flush() and
+ # check that rewind happens.
+ bufio.flush()
+ self.assertEqual(bufio.tell(), overwrite_size + 1)
+ s = raw.getvalue()
+ self.assertEqual(s,
+ b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
+
+ def test_write_rewind_write(self):
+ # Various combinations of reading / writing / seeking backwards / writing again
+ def mutate(bufio, pos1, pos2):
+ assert pos2 >= pos1
+ # Fill the buffer
+ bufio.seek(pos1)
+ bufio.read(pos2 - pos1)
+ bufio.write(b'\x02')
+ # This writes earlier than the previous write, but still inside
+ # the buffer.
+ bufio.seek(pos1)
+ bufio.write(b'\x01')
+
+ b = b"\x80\x81\x82\x83\x84"
+ for i in range(0, len(b)):
+ for j in range(i, len(b)):
+ raw = self.BytesIO(b)
+ bufio = self.tp(raw, 100)
+ mutate(bufio, i, j)
+ bufio.flush()
+ expected = bytearray(b)
+ expected[j] = 2
+ expected[i] = 1
+ self.assertEqual(raw.getvalue(), expected,
+ "failed result for i=%d, j=%d" % (i, j))
+
+ def test_truncate_after_read_or_write(self):
+ raw = self.BytesIO(b"A" * 10)
+ bufio = self.tp(raw, 100)
+ self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
+ self.assertEqual(bufio.truncate(), 2)
+ self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
+ self.assertEqual(bufio.truncate(), 4)
+
+ def test_misbehaved_io(self):
+ BufferedReaderTest.test_misbehaved_io(self)
+ BufferedWriterTest.test_misbehaved_io(self)
+
+ def test_interleaved_read_write(self):
+ # Test for issue #12213
+ with self.BytesIO(b'abcdefgh') as raw:
+ with self.tp(raw, 100) as f:
+ f.write(b"1")
+ self.assertEqual(f.read(1), b'b')
+ f.write(b'2')
+ self.assertEqual(f.read1(1), b'd')
+ f.write(b'3')
+ buf = bytearray(1)
+ f.readinto(buf)
+ self.assertEqual(buf, b'f')
+ f.write(b'4')
+ self.assertEqual(f.peek(1), b'h')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'1b2d3f4h')
+
+ with self.BytesIO(b'abc') as raw:
+ with self.tp(raw, 100) as f:
+ self.assertEqual(f.read(1), b'a')
+ f.write(b"2")
+ self.assertEqual(f.read(1), b'c')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'a2c')
+
+ def test_read1_after_write(self):
+ with self.BytesIO(b'abcdef') as raw:
+ with self.tp(raw, 3) as f:
+ f.write(b"1")
+ self.assertEqual(f.read1(1), b'b')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'1bcdef')
+ with self.BytesIO(b'abcdef') as raw:
+ with self.tp(raw, 3) as f:
+ f.write(b"1")
+ self.assertEqual(f.read1(), b'bcd')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'1bcdef')
+ with self.BytesIO(b'abcdef') as raw:
+ with self.tp(raw, 3) as f:
+ f.write(b"1")
+ # XXX: read(100) returns different numbers of bytes
+ # in Python and C implementations.
+ self.assertEqual(f.read1(100)[:3], b'bcd')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'1bcdef')
+
+ def test_interleaved_readline_write(self):
+ with self.BytesIO(b'ab\ncdef\ng\n') as raw:
+ with self.tp(raw) as f:
+ f.write(b'1')
+ self.assertEqual(f.readline(), b'b\n')
+ f.write(b'2')
+ self.assertEqual(f.readline(), b'def\n')
+ f.write(b'3')
+ self.assertEqual(f.readline(), b'\n')
+ f.flush()
+ self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
+
+ # You can't construct a BufferedRandom over a non-seekable stream.
+ test_unseekable = None
+
+ # writable() returns True, so there's no point to test it over
+ # a writable stream.
+ test_truncate_on_read_only = None
+
+
+class CBufferedRandomTest(BufferedRandomTest, SizeofTest, CTestCase):
+ tp = io.BufferedRandom
+
+ def test_garbage_collection(self):
+ CBufferedReaderTest.test_garbage_collection(self)
+ CBufferedWriterTest.test_garbage_collection(self)
+
+ def test_args_error(self):
+ # Issue #17275
+ with self.assertRaisesRegex(TypeError, "BufferedRandom"):
+ self.tp(self.BytesIO(), 1024, 1024, 1024)
+
+
+class PyBufferedRandomTest(BufferedRandomTest, PyTestCase):
+ tp = pyio.BufferedRandom
+
+
# Simple test to ensure that optimizations in the IO library deliver the
# expected results. For best testing, run this under a debug-build Python too
# (to exercise asserts in the C code).
import errno
import os
import pickle
-import random
import sys
import textwrap
import threading
-import time
import unittest
import warnings
import weakref
-from collections import deque, UserList
-from itertools import cycle, count
from test import support
from test.support.script_helper import (
assert_python_ok, assert_python_failure, run_python_until_end)
self.assertEqual(mismatch, set(), msg='C RawIOBase does not have all Python RawIOBase methods')
-class CommonBufferedTests:
- # Tests common to BufferedReader, BufferedWriter and BufferedRandom
-
- def test_detach(self):
- raw = self.MockRawIO()
- buf = self.tp(raw)
- self.assertIs(buf.detach(), raw)
- self.assertRaises(ValueError, buf.detach)
-
- repr(buf) # Should still work
-
- def test_fileno(self):
- rawio = self.MockRawIO()
- bufio = self.tp(rawio)
-
- self.assertEqual(42, bufio.fileno())
-
- def test_invalid_args(self):
- rawio = self.MockRawIO()
- bufio = self.tp(rawio)
- # Invalid whence
- self.assertRaises(ValueError, bufio.seek, 0, -1)
- self.assertRaises(ValueError, bufio.seek, 0, 9)
-
- def test_override_destructor(self):
- tp = self.tp
- record = []
- class MyBufferedIO(tp):
- def __del__(self):
- record.append(1)
- try:
- f = super().__del__
- except AttributeError:
- pass
- else:
- f()
- def close(self):
- record.append(2)
- super().close()
- def flush(self):
- record.append(3)
- super().flush()
- rawio = self.MockRawIO()
- bufio = MyBufferedIO(rawio)
- del bufio
- support.gc_collect()
- self.assertEqual(record, [1, 2, 3])
-
- def test_context_manager(self):
- # Test usability as a context manager
- rawio = self.MockRawIO()
- bufio = self.tp(rawio)
- def _with():
- with bufio:
- pass
- _with()
- # bufio should now be closed, and using it a second time should raise
- # a ValueError.
- self.assertRaises(ValueError, _with)
-
- def test_error_through_destructor(self):
- # Test that the exception state is not modified by a destructor,
- # even if close() fails.
- rawio = self.CloseFailureIO()
- with support.catch_unraisable_exception() as cm:
- with self.assertRaises(AttributeError):
- self.tp(rawio).xyzzy
-
- self.assertEqual(cm.unraisable.exc_type, OSError)
-
- def test_repr(self):
- raw = self.MockRawIO()
- b = self.tp(raw)
- clsname = r"(%s\.)?%s" % (self.tp.__module__, self.tp.__qualname__)
- self.assertRegex(repr(b), "<%s>" % clsname)
- raw.name = "dummy"
- self.assertRegex(repr(b), "<%s name='dummy'>" % clsname)
- raw.name = b"dummy"
- self.assertRegex(repr(b), "<%s name=b'dummy'>" % clsname)
-
- def test_recursive_repr(self):
- # Issue #25455
- raw = self.MockRawIO()
- b = self.tp(raw)
- with support.swap_attr(raw, 'name', b), support.infinite_recursion(25):
- with self.assertRaises(RuntimeError):
- repr(b) # Should not crash
-
- def test_flush_error_on_close(self):
- # Test that buffered file is closed despite failed flush
- # and that flush() is called before file closed.
- raw = self.MockRawIO()
- closed = []
- def bad_flush():
- closed[:] = [b.closed, raw.closed]
- raise OSError()
- raw.flush = bad_flush
- b = self.tp(raw)
- self.assertRaises(OSError, b.close) # exception not swallowed
- self.assertTrue(b.closed)
- self.assertTrue(raw.closed)
- self.assertTrue(closed) # flush() called
- self.assertFalse(closed[0]) # flush() called before file closed
- self.assertFalse(closed[1])
- raw.flush = lambda: None # break reference loop
-
- def test_close_error_on_close(self):
- raw = self.MockRawIO()
- def bad_flush():
- raise OSError('flush')
- def bad_close():
- raise OSError('close')
- raw.close = bad_close
- b = self.tp(raw)
- b.flush = bad_flush
- with self.assertRaises(OSError) as err: # exception not swallowed
- b.close()
- self.assertEqual(err.exception.args, ('close',))
- self.assertIsInstance(err.exception.__context__, OSError)
- self.assertEqual(err.exception.__context__.args, ('flush',))
- self.assertFalse(b.closed)
-
- # Silence destructor error
- raw.close = lambda: None
- b.flush = lambda: None
-
- def test_nonnormalized_close_error_on_close(self):
- # Issue #21677
- raw = self.MockRawIO()
- def bad_flush():
- raise non_existing_flush
- def bad_close():
- raise non_existing_close
- raw.close = bad_close
- b = self.tp(raw)
- b.flush = bad_flush
- with self.assertRaises(NameError) as err: # exception not swallowed
- b.close()
- self.assertIn('non_existing_close', str(err.exception))
- self.assertIsInstance(err.exception.__context__, NameError)
- self.assertIn('non_existing_flush', str(err.exception.__context__))
- self.assertFalse(b.closed)
-
- # Silence destructor error
- b.flush = lambda: None
- raw.close = lambda: None
-
- def test_multi_close(self):
- raw = self.MockRawIO()
- b = self.tp(raw)
- b.close()
- b.close()
- b.close()
- self.assertRaises(ValueError, b.flush)
-
- def test_unseekable(self):
- bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
- self.assertRaises(self.UnsupportedOperation, bufio.tell)
- self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
-
- def test_readonly_attributes(self):
- raw = self.MockRawIO()
- buf = self.tp(raw)
- x = self.MockRawIO()
- with self.assertRaises(AttributeError):
- buf.raw = x
-
- def test_pickling_subclass(self):
- global MyBufferedIO
- class MyBufferedIO(self.tp):
- def __init__(self, raw, tag):
- super().__init__(raw)
- self.tag = tag
- def __getstate__(self):
- return self.tag, self.raw.getvalue()
- def __setstate__(slf, state):
- tag, value = state
- slf.__init__(self.BytesIO(value), tag)
-
- raw = self.BytesIO(b'data')
- buf = MyBufferedIO(raw, tag='ham')
- for proto in range(pickle.HIGHEST_PROTOCOL + 1):
- with self.subTest(protocol=proto):
- pickled = pickle.dumps(buf, proto)
- newbuf = pickle.loads(pickled)
- self.assertEqual(newbuf.raw.getvalue(), b'data')
- self.assertEqual(newbuf.tag, 'ham')
- del MyBufferedIO
-
-
-class SizeofTest:
-
- @support.cpython_only
- def test_sizeof(self):
- bufsize1 = 4096
- bufsize2 = 8192
- rawio = self.MockRawIO()
- bufio = self.tp(rawio, buffer_size=bufsize1)
- size = sys.getsizeof(bufio) - bufsize1
- rawio = self.MockRawIO()
- bufio = self.tp(rawio, buffer_size=bufsize2)
- self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
-
- @support.cpython_only
- def test_buffer_freeing(self) :
- bufsize = 4096
- rawio = self.MockRawIO()
- bufio = self.tp(rawio, buffer_size=bufsize)
- size = sys.getsizeof(bufio) - bufsize
- bufio.close()
- self.assertEqual(sys.getsizeof(bufio), size)
-
-class BufferedReaderTest(CommonBufferedTests):
- read_mode = "rb"
-
- def test_constructor(self):
- rawio = self.MockRawIO([b"abc"])
- bufio = self.tp(rawio)
- bufio.__init__(rawio)
- bufio.__init__(rawio, buffer_size=1024)
- bufio.__init__(rawio, buffer_size=16)
- self.assertEqual(b"abc", bufio.read())
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
- rawio = self.MockRawIO([b"abc"])
- bufio.__init__(rawio)
- self.assertEqual(b"abc", bufio.read())
-
- def test_uninitialized(self):
- bufio = self.tp.__new__(self.tp)
- del bufio
- bufio = self.tp.__new__(self.tp)
- self.assertRaisesRegex((ValueError, AttributeError),
- 'uninitialized|has no attribute',
- bufio.read, 0)
- bufio.__init__(self.MockRawIO())
- self.assertEqual(bufio.read(0), b'')
-
- def test_read(self):
- for arg in (None, 7):
- rawio = self.MockRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
- self.assertEqual(b"abcdefg", bufio.read(arg))
- # Invalid args
- self.assertRaises(ValueError, bufio.read, -2)
-
- def test_read1(self):
- rawio = self.MockRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
- self.assertEqual(b"a", bufio.read(1))
- self.assertEqual(b"b", bufio.read1(1))
- self.assertEqual(rawio._reads, 1)
- self.assertEqual(b"", bufio.read1(0))
- self.assertEqual(b"c", bufio.read1(100))
- self.assertEqual(rawio._reads, 1)
- self.assertEqual(b"d", bufio.read1(100))
- self.assertEqual(rawio._reads, 2)
- self.assertEqual(b"efg", bufio.read1(100))
- self.assertEqual(rawio._reads, 3)
- self.assertEqual(b"", bufio.read1(100))
- self.assertEqual(rawio._reads, 4)
-
- def test_read1_arbitrary(self):
- rawio = self.MockRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
- self.assertEqual(b"a", bufio.read(1))
- self.assertEqual(b"bc", bufio.read1())
- self.assertEqual(b"d", bufio.read1())
- self.assertEqual(b"efg", bufio.read1(-1))
- self.assertEqual(rawio._reads, 3)
- self.assertEqual(b"", bufio.read1())
- self.assertEqual(rawio._reads, 4)
-
- def test_readinto(self):
- rawio = self.MockRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
- b = bytearray(2)
- self.assertEqual(bufio.readinto(b), 2)
- self.assertEqual(b, b"ab")
- self.assertEqual(bufio.readinto(b), 2)
- self.assertEqual(b, b"cd")
- self.assertEqual(bufio.readinto(b), 2)
- self.assertEqual(b, b"ef")
- self.assertEqual(bufio.readinto(b), 1)
- self.assertEqual(b, b"gf")
- self.assertEqual(bufio.readinto(b), 0)
- self.assertEqual(b, b"gf")
- rawio = self.MockRawIO((b"abc", None))
- bufio = self.tp(rawio)
- self.assertEqual(bufio.readinto(b), 2)
- self.assertEqual(b, b"ab")
- self.assertEqual(bufio.readinto(b), 1)
- self.assertEqual(b, b"cb")
-
- def test_readinto1(self):
- buffer_size = 10
- rawio = self.MockRawIO((b"abc", b"de", b"fgh", b"jkl"))
- bufio = self.tp(rawio, buffer_size=buffer_size)
- b = bytearray(2)
- self.assertEqual(bufio.peek(3), b'abc')
- self.assertEqual(rawio._reads, 1)
- self.assertEqual(bufio.readinto1(b), 2)
- self.assertEqual(b, b"ab")
- self.assertEqual(rawio._reads, 1)
- self.assertEqual(bufio.readinto1(b), 1)
- self.assertEqual(b[:1], b"c")
- self.assertEqual(rawio._reads, 1)
- self.assertEqual(bufio.readinto1(b), 2)
- self.assertEqual(b, b"de")
- self.assertEqual(rawio._reads, 2)
- b = bytearray(2*buffer_size)
- self.assertEqual(bufio.peek(3), b'fgh')
- self.assertEqual(rawio._reads, 3)
- self.assertEqual(bufio.readinto1(b), 6)
- self.assertEqual(b[:6], b"fghjkl")
- self.assertEqual(rawio._reads, 4)
-
- def test_readinto_array(self):
- buffer_size = 60
- data = b"a" * 26
- rawio = self.MockRawIO((data,))
- bufio = self.tp(rawio, buffer_size=buffer_size)
-
- # Create an array with element size > 1 byte
- b = array.array('i', b'x' * 32)
- assert len(b) != 16
-
- # Read into it. We should get as many *bytes* as we can fit into b
- # (which is more than the number of elements)
- n = bufio.readinto(b)
- self.assertGreater(n, len(b))
-
- # Check that old contents of b are preserved
- bm = memoryview(b).cast('B')
- self.assertLess(n, len(bm))
- self.assertEqual(bm[:n], data[:n])
- self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
-
- def test_readinto1_array(self):
- buffer_size = 60
- data = b"a" * 26
- rawio = self.MockRawIO((data,))
- bufio = self.tp(rawio, buffer_size=buffer_size)
-
- # Create an array with element size > 1 byte
- b = array.array('i', b'x' * 32)
- assert len(b) != 16
-
- # Read into it. We should get as many *bytes* as we can fit into b
- # (which is more than the number of elements)
- n = bufio.readinto1(b)
- self.assertGreater(n, len(b))
-
- # Check that old contents of b are preserved
- bm = memoryview(b).cast('B')
- self.assertLess(n, len(bm))
- self.assertEqual(bm[:n], data[:n])
- self.assertEqual(bm[n:], b'x' * (len(bm[n:])))
-
- def test_readlines(self):
- def bufio():
- rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
- return self.tp(rawio)
- self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
- self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
- self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
-
- def test_buffering(self):
- data = b"abcdefghi"
- dlen = len(data)
-
- tests = [
- [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
- [ 100, [ 3, 3, 3], [ dlen ] ],
- [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
- ]
-
- for bufsize, buf_read_sizes, raw_read_sizes in tests:
- rawio = self.MockFileIO(data)
- bufio = self.tp(rawio, buffer_size=bufsize)
- pos = 0
- for nbytes in buf_read_sizes:
- self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
- pos += nbytes
- # this is mildly implementation-dependent
- self.assertEqual(rawio.read_history, raw_read_sizes)
-
- def test_read_non_blocking(self):
- # Inject some None's in there to simulate EWOULDBLOCK
- rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
- bufio = self.tp(rawio)
- self.assertEqual(b"abcd", bufio.read(6))
- self.assertEqual(b"e", bufio.read(1))
- self.assertEqual(b"fg", bufio.read())
- self.assertEqual(b"", bufio.peek(1))
- self.assertIsNone(bufio.read())
- self.assertEqual(b"", bufio.read())
-
- rawio = self.MockRawIO((b"a", None, None))
- self.assertEqual(b"a", rawio.readall())
- self.assertIsNone(rawio.readall())
-
- def test_read_past_eof(self):
- rawio = self.MockRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
-
- self.assertEqual(b"abcdefg", bufio.read(9000))
-
- def test_read_all(self):
- rawio = self.MockRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
-
- self.assertEqual(b"abcdefg", bufio.read())
-
- @threading_helper.requires_working_threading()
- @support.requires_resource('cpu')
- def test_threads(self):
- try:
- # Write out many bytes with exactly the same number of 0's,
- # 1's... 255's. This will help us check that concurrent reading
- # doesn't duplicate or forget contents.
- N = 1000
- l = list(range(256)) * N
- random.shuffle(l)
- s = bytes(bytearray(l))
- with self.open(os_helper.TESTFN, "wb") as f:
- f.write(s)
- with self.open(os_helper.TESTFN, self.read_mode, buffering=0) as raw:
- bufio = self.tp(raw, 8)
- errors = []
- results = []
- def f():
- try:
- # Intra-buffer read then buffer-flushing read
- for n in cycle([1, 19]):
- s = bufio.read(n)
- if not s:
- break
- # list.append() is atomic
- results.append(s)
- except Exception as e:
- errors.append(e)
- raise
- threads = [threading.Thread(target=f) for x in range(20)]
- with threading_helper.start_threads(threads):
- time.sleep(0.02) # yield
- self.assertFalse(errors,
- "the following exceptions were caught: %r" % errors)
- s = b''.join(results)
- for i in range(256):
- c = bytes(bytearray([i]))
- self.assertEqual(s.count(c), N)
- finally:
- os_helper.unlink(os_helper.TESTFN)
-
- def test_unseekable(self):
- bufio = self.tp(self.MockUnseekableIO(b"A" * 10))
- self.assertRaises(self.UnsupportedOperation, bufio.tell)
- self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
- bufio.read(1)
- self.assertRaises(self.UnsupportedOperation, bufio.seek, 0)
- self.assertRaises(self.UnsupportedOperation, bufio.tell)
-
- def test_misbehaved_io(self):
- rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
- self.assertRaises(OSError, bufio.seek, 0)
- self.assertRaises(OSError, bufio.tell)
-
- # Silence destructor error
- bufio.close = lambda: None
-
- def test_no_extraneous_read(self):
- # Issue #9550; when the raw IO object has satisfied the read request,
- # we should not issue any additional reads, otherwise it may block
- # (e.g. socket).
- bufsize = 16
- for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
- rawio = self.MockRawIO([b"x" * n])
- bufio = self.tp(rawio, bufsize)
- self.assertEqual(bufio.read(n), b"x" * n)
- # Simple case: one raw read is enough to satisfy the request.
- self.assertEqual(rawio._extraneous_reads, 0,
- "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
- # A more complex case where two raw reads are needed to satisfy
- # the request.
- rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
- bufio = self.tp(rawio, bufsize)
- self.assertEqual(bufio.read(n), b"x" * n)
- self.assertEqual(rawio._extraneous_reads, 0,
- "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
-
- def test_read_on_closed(self):
- # Issue #23796
- b = self.BufferedReader(self.BytesIO(b"12"))
- b.read(1)
- b.close()
- with self.subTest('peek'):
- self.assertRaises(ValueError, b.peek)
- with self.subTest('read1'):
- self.assertRaises(ValueError, b.read1, 1)
- with self.subTest('read'):
- self.assertRaises(ValueError, b.read)
- with self.subTest('readinto'):
- self.assertRaises(ValueError, b.readinto, bytearray())
- with self.subTest('readinto1'):
- self.assertRaises(ValueError, b.readinto1, bytearray())
- with self.subTest('flush'):
- self.assertRaises(ValueError, b.flush)
- with self.subTest('truncate'):
- self.assertRaises(ValueError, b.truncate)
- with self.subTest('seek'):
- self.assertRaises(ValueError, b.seek, 0)
-
- def test_truncate_on_read_only(self):
- rawio = self.MockFileIO(b"abc")
- bufio = self.tp(rawio)
- self.assertFalse(bufio.writable())
- self.assertRaises(self.UnsupportedOperation, bufio.truncate)
- self.assertRaises(self.UnsupportedOperation, bufio.truncate, 0)
-
- def test_tell_character_device_file(self):
- # GH-95782
- # For the (former) bug in BufferedIO to manifest, the wrapped IO obj
- # must be able to produce at least 2 bytes.
- raw = self.MockCharPseudoDevFileIO(b"12")
- buf = self.tp(raw)
- self.assertEqual(buf.tell(), 0)
- self.assertEqual(buf.read(1), b"1")
- self.assertEqual(buf.tell(), 0)
-
- def test_seek_character_device_file(self):
- raw = self.MockCharPseudoDevFileIO(b"12")
- buf = self.tp(raw)
- self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
- self.assertEqual(buf.seek(1, io.SEEK_SET), 0)
- self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
- self.assertEqual(buf.read(1), b"1")
-
- # In the C implementation, tell() sets the BufferedIO's abs_pos to 0,
- # which means that the next seek() could return a negative offset if it
- # does not sanity-check:
- self.assertEqual(buf.tell(), 0)
- self.assertEqual(buf.seek(0, io.SEEK_CUR), 0)
-
-
-class CBufferedReaderTest(BufferedReaderTest, SizeofTest, CTestCase):
- tp = io.BufferedReader
-
- def test_initialization(self):
- rawio = self.MockRawIO([b"abc"])
- bufio = self.tp(rawio)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
- self.assertRaises(ValueError, bufio.read)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
- self.assertRaises(ValueError, bufio.read)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
- self.assertRaises(ValueError, bufio.read)
-
- def test_misbehaved_io_read(self):
- rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
- bufio = self.tp(rawio)
- # _pyio.BufferedReader seems to implement reading different, so that
- # checking this is not so easy.
- self.assertRaises(OSError, bufio.read, 10)
-
- def test_garbage_collection(self):
- # C BufferedReader objects are collected.
- # The Python version has __del__, so it ends into gc.garbage instead
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- # Note that using warnings_helper.check_warnings() will keep the
- # file alive due to the `source` argument to warn(). So, use
- # catch_warnings() instead.
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", ResourceWarning)
- rawio = self.FileIO(os_helper.TESTFN, "w+b")
- f = self.tp(rawio)
- f.f = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
- self.assertIsNone(wr(), wr)
-
- def test_args_error(self):
- # Issue #17275
- with self.assertRaisesRegex(TypeError, "BufferedReader"):
- self.tp(self.BytesIO(), 1024, 1024, 1024)
-
- def test_bad_readinto_value(self):
- rawio = self.tp(self.BytesIO(b"12"))
- rawio.readinto = lambda buf: -1
- bufio = self.tp(rawio)
- with self.assertRaises(OSError) as cm:
- bufio.readline()
- self.assertIsNone(cm.exception.__cause__)
-
- def test_bad_readinto_type(self):
- rawio = self.tp(self.BytesIO(b"12"))
- rawio.readinto = lambda buf: b''
- bufio = self.tp(rawio)
- with self.assertRaises(OSError) as cm:
- bufio.readline()
- self.assertIsInstance(cm.exception.__cause__, TypeError)
-
-
-class PyBufferedReaderTest(BufferedReaderTest, PyTestCase):
- tp = pyio.BufferedReader
-
-
-class BufferedWriterTest(CommonBufferedTests):
- write_mode = "wb"
-
- def test_constructor(self):
- rawio = self.MockRawIO()
- bufio = self.tp(rawio)
- bufio.__init__(rawio)
- bufio.__init__(rawio, buffer_size=1024)
- bufio.__init__(rawio, buffer_size=16)
- self.assertEqual(3, bufio.write(b"abc"))
- bufio.flush()
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
- bufio.__init__(rawio)
- self.assertEqual(3, bufio.write(b"ghi"))
- bufio.flush()
- self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
-
- def test_uninitialized(self):
- bufio = self.tp.__new__(self.tp)
- del bufio
- bufio = self.tp.__new__(self.tp)
- self.assertRaisesRegex((ValueError, AttributeError),
- 'uninitialized|has no attribute',
- bufio.write, b'')
- bufio.__init__(self.MockRawIO())
- self.assertEqual(bufio.write(b''), 0)
-
- def test_detach_flush(self):
- raw = self.MockRawIO()
- buf = self.tp(raw)
- buf.write(b"howdy!")
- self.assertFalse(raw._write_stack)
- buf.detach()
- self.assertEqual(raw._write_stack, [b"howdy!"])
-
- def test_write(self):
- # Write to the buffered IO but don't overflow the buffer.
- writer = self.MockRawIO()
- bufio = self.tp(writer, 8)
- bufio.write(b"abc")
- self.assertFalse(writer._write_stack)
- buffer = bytearray(b"def")
- bufio.write(buffer)
- buffer[:] = b"***" # Overwrite our copy of the data
- bufio.flush()
- self.assertEqual(b"".join(writer._write_stack), b"abcdef")
-
- def test_write_overflow(self):
- writer = self.MockRawIO()
- bufio = self.tp(writer, 8)
- contents = b"abcdefghijklmnop"
- for n in range(0, len(contents), 3):
- bufio.write(contents[n:n+3])
- flushed = b"".join(writer._write_stack)
- # At least (total - 8) bytes were implicitly flushed, perhaps more
- # depending on the implementation.
- self.assertStartsWith(flushed, contents[:-8])
-
- def check_writes(self, intermediate_func):
- # Lots of writes, test the flushed output is as expected.
- contents = bytes(range(256)) * 1000
- n = 0
- writer = self.MockRawIO()
- bufio = self.tp(writer, 13)
- # Generator of write sizes: repeat each N 15 times then proceed to N+1
- def gen_sizes():
- for size in count(1):
- for i in range(15):
- yield size
- sizes = gen_sizes()
- while n < len(contents):
- size = min(next(sizes), len(contents) - n)
- self.assertEqual(bufio.write(contents[n:n+size]), size)
- intermediate_func(bufio)
- n += size
- bufio.flush()
- self.assertEqual(contents, b"".join(writer._write_stack))
-
- def test_writes(self):
- self.check_writes(lambda bufio: None)
-
- def test_writes_and_flushes(self):
- self.check_writes(lambda bufio: bufio.flush())
-
- def test_writes_and_seeks(self):
- def _seekabs(bufio):
- pos = bufio.tell()
- bufio.seek(pos + 1, 0)
- bufio.seek(pos - 1, 0)
- bufio.seek(pos, 0)
- self.check_writes(_seekabs)
- def _seekrel(bufio):
- pos = bufio.seek(0, 1)
- bufio.seek(+1, 1)
- bufio.seek(-1, 1)
- bufio.seek(pos, 0)
- self.check_writes(_seekrel)
-
- def test_writes_and_truncates(self):
- self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
-
- def test_write_non_blocking(self):
- raw = self.MockNonBlockWriterIO()
- bufio = self.tp(raw, 8)
-
- self.assertEqual(bufio.write(b"abcd"), 4)
- self.assertEqual(bufio.write(b"efghi"), 5)
- # 1 byte will be written, the rest will be buffered
- raw.block_on(b"k")
- self.assertEqual(bufio.write(b"jklmn"), 5)
-
- # 8 bytes will be written, 8 will be buffered and the rest will be lost
- raw.block_on(b"0")
- try:
- bufio.write(b"opqrwxyz0123456789")
- except self.BlockingIOError as e:
- written = e.characters_written
- else:
- self.fail("BlockingIOError should have been raised")
- self.assertEqual(written, 16)
- self.assertEqual(raw.pop_written(),
- b"abcdefghijklmnopqrwxyz")
-
- self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
- s = raw.pop_written()
- # Previously buffered bytes were flushed
- self.assertStartsWith(s, b"01234567A")
-
- def test_write_and_rewind(self):
- raw = self.BytesIO()
- bufio = self.tp(raw, 4)
- self.assertEqual(bufio.write(b"abcdef"), 6)
- self.assertEqual(bufio.tell(), 6)
- bufio.seek(0, 0)
- self.assertEqual(bufio.write(b"XY"), 2)
- bufio.seek(6, 0)
- self.assertEqual(raw.getvalue(), b"XYcdef")
- self.assertEqual(bufio.write(b"123456"), 6)
- bufio.flush()
- self.assertEqual(raw.getvalue(), b"XYcdef123456")
-
- def test_flush(self):
- writer = self.MockRawIO()
- bufio = self.tp(writer, 8)
- bufio.write(b"abc")
- bufio.flush()
- self.assertEqual(b"abc", writer._write_stack[0])
-
- def test_writelines(self):
- l = [b'ab', b'cd', b'ef']
- writer = self.MockRawIO()
- bufio = self.tp(writer, 8)
- bufio.writelines(l)
- bufio.flush()
- self.assertEqual(b''.join(writer._write_stack), b'abcdef')
-
- def test_writelines_userlist(self):
- l = UserList([b'ab', b'cd', b'ef'])
- writer = self.MockRawIO()
- bufio = self.tp(writer, 8)
- bufio.writelines(l)
- bufio.flush()
- self.assertEqual(b''.join(writer._write_stack), b'abcdef')
-
- def test_writelines_error(self):
- writer = self.MockRawIO()
- bufio = self.tp(writer, 8)
- self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
- self.assertRaises(TypeError, bufio.writelines, None)
- self.assertRaises(TypeError, bufio.writelines, 'abc')
-
- def test_destructor(self):
- writer = self.MockRawIO()
- bufio = self.tp(writer, 8)
- bufio.write(b"abc")
- del bufio
- support.gc_collect()
- self.assertEqual(b"abc", writer._write_stack[0])
-
- def test_truncate(self):
- # Truncate implicitly flushes the buffer.
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
- bufio = self.tp(raw, 8)
- bufio.write(b"abcdef")
- self.assertEqual(bufio.truncate(3), 3)
- self.assertEqual(bufio.tell(), 6)
- with self.open(os_helper.TESTFN, "rb", buffering=0) as f:
- self.assertEqual(f.read(), b"abc")
-
- def test_truncate_after_write(self):
- # Ensure that truncate preserves the file position after
- # writes longer than the buffer size.
- # Issue: https://bugs.python.org/issue32228
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- with self.open(os_helper.TESTFN, "wb") as f:
- # Fill with some buffer
- f.write(b'\x00' * 10000)
- buffer_sizes = [8192, 4096, 200]
- for buffer_size in buffer_sizes:
- with self.open(os_helper.TESTFN, "r+b", buffering=buffer_size) as f:
- f.write(b'\x00' * (buffer_size + 1))
- # After write write_pos and write_end are set to 0
- f.read(1)
- # read operation makes sure that pos != raw_pos
- f.truncate()
- self.assertEqual(f.tell(), buffer_size + 2)
-
- @threading_helper.requires_working_threading()
- @support.requires_resource('cpu')
- def test_threads(self):
- try:
- # Write out many bytes from many threads and test they were
- # all flushed.
- N = 1000
- contents = bytes(range(256)) * N
- sizes = cycle([1, 19])
- n = 0
- queue = deque()
- while n < len(contents):
- size = next(sizes)
- queue.append(contents[n:n+size])
- n += size
- del contents
- # We use a real file object because it allows us to
- # exercise situations where the GIL is released before
- # writing the buffer to the raw streams. This is in addition
- # to concurrency issues due to switching threads in the middle
- # of Python code.
- with self.open(os_helper.TESTFN, self.write_mode, buffering=0) as raw:
- bufio = self.tp(raw, 8)
- errors = []
- def f():
- try:
- while True:
- try:
- s = queue.popleft()
- except IndexError:
- return
- bufio.write(s)
- except Exception as e:
- errors.append(e)
- raise
- threads = [threading.Thread(target=f) for x in range(20)]
- with threading_helper.start_threads(threads):
- time.sleep(0.02) # yield
- self.assertFalse(errors,
- "the following exceptions were caught: %r" % errors)
- bufio.close()
- with self.open(os_helper.TESTFN, "rb") as f:
- s = f.read()
- for i in range(256):
- self.assertEqual(s.count(bytes([i])), N)
- finally:
- os_helper.unlink(os_helper.TESTFN)
-
- def test_misbehaved_io(self):
- rawio = self.MisbehavedRawIO()
- bufio = self.tp(rawio, 5)
- self.assertRaises(OSError, bufio.seek, 0)
- self.assertRaises(OSError, bufio.tell)
- self.assertRaises(OSError, bufio.write, b"abcdef")
-
- # Silence destructor error
- bufio.close = lambda: None
-
- def test_max_buffer_size_removal(self):
- with self.assertRaises(TypeError):
- self.tp(self.MockRawIO(), 8, 12)
-
- def test_write_error_on_close(self):
- raw = self.MockRawIO()
- def bad_write(b):
- raise OSError()
- raw.write = bad_write
- b = self.tp(raw)
- b.write(b'spam')
- self.assertRaises(OSError, b.close) # exception not swallowed
- self.assertTrue(b.closed)
-
- @threading_helper.requires_working_threading()
- def test_slow_close_from_thread(self):
- # Issue #31976
- rawio = self.SlowFlushRawIO()
- bufio = self.tp(rawio, 8)
- t = threading.Thread(target=bufio.close)
- t.start()
- rawio.in_flush.wait()
- self.assertRaises(ValueError, bufio.write, b'spam')
- self.assertTrue(bufio.closed)
- t.join()
-
-
-class CBufferedWriterTest(BufferedWriterTest, SizeofTest, CTestCase):
- tp = io.BufferedWriter
-
- def test_initialization(self):
- rawio = self.MockRawIO()
- bufio = self.tp(rawio)
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
- self.assertRaises(ValueError, bufio.write, b"def")
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
- self.assertRaises(ValueError, bufio.write, b"def")
- self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
- self.assertRaises(ValueError, bufio.write, b"def")
-
- def test_garbage_collection(self):
- # C BufferedWriter objects are collected, and collecting them flushes
- # all data to disk.
- # The Python version has __del__, so it ends into gc.garbage instead
- self.addCleanup(os_helper.unlink, os_helper.TESTFN)
- # Note that using warnings_helper.check_warnings() will keep the
- # file alive due to the `source` argument to warn(). So, use
- # catch_warnings() instead.
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", ResourceWarning)
- rawio = self.FileIO(os_helper.TESTFN, "w+b")
- f = self.tp(rawio)
- f.write(b"123xxx")
- f.x = f
- wr = weakref.ref(f)
- del f
- support.gc_collect()
- self.assertIsNone(wr(), wr)
- with self.open(os_helper.TESTFN, "rb") as f:
- self.assertEqual(f.read(), b"123xxx")
-
- def test_args_error(self):
- # Issue #17275
- with self.assertRaisesRegex(TypeError, "BufferedWriter"):
- self.tp(self.BytesIO(), 1024, 1024, 1024)
-
-
-class PyBufferedWriterTest(BufferedWriterTest, PyTestCase):
- tp = pyio.BufferedWriter
-
-class BufferedRWPairTest:
-
- def test_constructor(self):
- pair = self.tp(self.MockRawIO(), self.MockRawIO())
- self.assertFalse(pair.closed)
-
- def test_uninitialized(self):
- pair = self.tp.__new__(self.tp)
- del pair
- pair = self.tp.__new__(self.tp)
- self.assertRaisesRegex((ValueError, AttributeError),
- 'uninitialized|has no attribute',
- pair.read, 0)
- self.assertRaisesRegex((ValueError, AttributeError),
- 'uninitialized|has no attribute',
- pair.write, b'')
- pair.__init__(self.MockRawIO(), self.MockRawIO())
- self.assertEqual(pair.read(0), b'')
- self.assertEqual(pair.write(b''), 0)
-
- def test_detach(self):
- pair = self.tp(self.MockRawIO(), self.MockRawIO())
- self.assertRaises(self.UnsupportedOperation, pair.detach)
-
- def test_constructor_max_buffer_size_removal(self):
- with self.assertRaises(TypeError):
- self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
-
- def test_constructor_with_not_readable(self):
- class NotReadable(self.MockRawIO):
- def readable(self):
- return False
-
- self.assertRaises(OSError, self.tp, NotReadable(), self.MockRawIO())
-
- def test_constructor_with_not_writeable(self):
- class NotWriteable(self.MockRawIO):
- def writable(self):
- return False
-
- self.assertRaises(OSError, self.tp, self.MockRawIO(), NotWriteable())
-
- def test_read(self):
- pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
-
- self.assertEqual(pair.read(3), b"abc")
- self.assertEqual(pair.read(1), b"d")
- self.assertEqual(pair.read(), b"ef")
- pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
- self.assertEqual(pair.read(None), b"abc")
-
- def test_readlines(self):
- pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
- self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
- self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
- self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
-
- def test_read1(self):
- # .read1() is delegated to the underlying reader object, so this test
- # can be shallow.
- pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
-
- self.assertEqual(pair.read1(3), b"abc")
- self.assertEqual(pair.read1(), b"def")
-
- def test_readinto(self):
- for method in ("readinto", "readinto1"):
- with self.subTest(method):
- pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
-
- data = byteslike(b'\0' * 5)
- self.assertEqual(getattr(pair, method)(data), 5)
- self.assertEqual(bytes(data), b"abcde")
-
- # gh-138720: C BufferedRWPair would destruct in a bad order resulting in
- # an unraisable exception.
- support.gc_collect()
-
- def test_write(self):
- w = self.MockRawIO()
- pair = self.tp(self.MockRawIO(), w)
-
- pair.write(b"abc")
- pair.flush()
- buffer = bytearray(b"def")
- pair.write(buffer)
- buffer[:] = b"***" # Overwrite our copy of the data
- pair.flush()
- self.assertEqual(w._write_stack, [b"abc", b"def"])
-
- def test_peek(self):
- pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
-
- self.assertStartsWith(pair.peek(3), b"abc")
- self.assertEqual(pair.read(3), b"abc")
-
- def test_readable(self):
- pair = self.tp(self.MockRawIO(), self.MockRawIO())
- self.assertTrue(pair.readable())
-
- def test_writeable(self):
- pair = self.tp(self.MockRawIO(), self.MockRawIO())
- self.assertTrue(pair.writable())
-
- def test_seekable(self):
- # BufferedRWPairs are never seekable, even if their readers and writers
- # are.
- pair = self.tp(self.MockRawIO(), self.MockRawIO())
- self.assertFalse(pair.seekable())
-
- # .flush() is delegated to the underlying writer object and has been
- # tested in the test_write method.
-
- def test_close_and_closed(self):
- pair = self.tp(self.MockRawIO(), self.MockRawIO())
- self.assertFalse(pair.closed)
- pair.close()
- self.assertTrue(pair.closed)
-
- def test_reader_close_error_on_close(self):
- def reader_close():
- reader_non_existing
- reader = self.MockRawIO()
- reader.close = reader_close
- writer = self.MockRawIO()
- pair = self.tp(reader, writer)
- with self.assertRaises(NameError) as err:
- pair.close()
- self.assertIn('reader_non_existing', str(err.exception))
- self.assertTrue(pair.closed)
- self.assertFalse(reader.closed)
- self.assertTrue(writer.closed)
-
- # Silence destructor error
- reader.close = lambda: None
-
- def test_writer_close_error_on_close(self):
- def writer_close():
- writer_non_existing
- reader = self.MockRawIO()
- writer = self.MockRawIO()
- writer.close = writer_close
- pair = self.tp(reader, writer)
- with self.assertRaises(NameError) as err:
- pair.close()
- self.assertIn('writer_non_existing', str(err.exception))
- self.assertFalse(pair.closed)
- self.assertTrue(reader.closed)
- self.assertFalse(writer.closed)
-
- # Silence destructor error
- writer.close = lambda: None
- writer = None
-
- # Ignore BufferedWriter (of the BufferedRWPair) unraisable exception
- with support.catch_unraisable_exception():
- # Ignore BufferedRWPair unraisable exception
- with support.catch_unraisable_exception():
- pair = None
- support.gc_collect()
- support.gc_collect()
-
- def test_reader_writer_close_error_on_close(self):
- def reader_close():
- reader_non_existing
- def writer_close():
- writer_non_existing
- reader = self.MockRawIO()
- reader.close = reader_close
- writer = self.MockRawIO()
- writer.close = writer_close
- pair = self.tp(reader, writer)
- with self.assertRaises(NameError) as err:
- pair.close()
- self.assertIn('reader_non_existing', str(err.exception))
- self.assertIsInstance(err.exception.__context__, NameError)
- self.assertIn('writer_non_existing', str(err.exception.__context__))
- self.assertFalse(pair.closed)
- self.assertFalse(reader.closed)
- self.assertFalse(writer.closed)
-
- # Silence destructor error
- reader.close = lambda: None
- writer.close = lambda: None
-
- def test_isatty(self):
- class SelectableIsAtty(self.MockRawIO):
- def __init__(self, isatty):
- super().__init__()
- self._isatty = isatty
-
- def isatty(self):
- return self._isatty
-
- pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
- self.assertFalse(pair.isatty())
-
- pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
- self.assertTrue(pair.isatty())
-
- pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
- self.assertTrue(pair.isatty())
-
- pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
- self.assertTrue(pair.isatty())
-
- def test_weakref_clearing(self):
- brw = self.tp(self.MockRawIO(), self.MockRawIO())
- ref = weakref.ref(brw)
- brw = None
- ref = None # Shouldn't segfault.
-
-class CBufferedRWPairTest(BufferedRWPairTest, CTestCase):
- tp = io.BufferedRWPair
-
-class PyBufferedRWPairTest(BufferedRWPairTest, PyTestCase):
- tp = pyio.BufferedRWPair
-
-
-class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
- read_mode = "rb+"
- write_mode = "wb+"
-
- def test_constructor(self):
- BufferedReaderTest.test_constructor(self)
- BufferedWriterTest.test_constructor(self)
-
- def test_uninitialized(self):
- BufferedReaderTest.test_uninitialized(self)
- BufferedWriterTest.test_uninitialized(self)
-
- def test_read_and_write(self):
- raw = self.MockRawIO((b"asdf", b"ghjk"))
- rw = self.tp(raw, 8)
-
- self.assertEqual(b"as", rw.read(2))
- rw.write(b"ddd")
- rw.write(b"eee")
- self.assertFalse(raw._write_stack) # Buffer writes
- self.assertEqual(b"ghjk", rw.read())
- self.assertEqual(b"dddeee", raw._write_stack[0])
-
- def test_seek_and_tell(self):
- raw = self.BytesIO(b"asdfghjkl")
- rw = self.tp(raw)
-
- self.assertEqual(b"as", rw.read(2))
- self.assertEqual(2, rw.tell())
- rw.seek(0, 0)
- self.assertEqual(b"asdf", rw.read(4))
-
- rw.write(b"123f")
- rw.seek(0, 0)
- self.assertEqual(b"asdf123fl", rw.read())
- self.assertEqual(9, rw.tell())
- rw.seek(-4, 2)
- self.assertEqual(5, rw.tell())
- rw.seek(2, 1)
- self.assertEqual(7, rw.tell())
- self.assertEqual(b"fl", rw.read(11))
- rw.flush()
- self.assertEqual(b"asdf123fl", raw.getvalue())
-
- self.assertRaises(TypeError, rw.seek, 0.0)
-
- def check_flush_and_read(self, read_func):
- raw = self.BytesIO(b"abcdefghi")
- bufio = self.tp(raw)
-
- self.assertEqual(b"ab", read_func(bufio, 2))
- bufio.write(b"12")
- self.assertEqual(b"ef", read_func(bufio, 2))
- self.assertEqual(6, bufio.tell())
- bufio.flush()
- self.assertEqual(6, bufio.tell())
- self.assertEqual(b"ghi", read_func(bufio))
- raw.seek(0, 0)
- raw.write(b"XYZ")
- # flush() resets the read buffer
- bufio.flush()
- bufio.seek(0, 0)
- self.assertEqual(b"XYZ", read_func(bufio, 3))
-
- def test_flush_and_read(self):
- self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
-
- def test_flush_and_readinto(self):
- def _readinto(bufio, n=-1):
- b = bytearray(n if n >= 0 else 9999)
- n = bufio.readinto(b)
- return bytes(b[:n])
- self.check_flush_and_read(_readinto)
-
- def test_flush_and_peek(self):
- def _peek(bufio, n=-1):
- # This relies on the fact that the buffer can contain the whole
- # raw stream, otherwise peek() can return less.
- b = bufio.peek(n)
- if n != -1:
- b = b[:n]
- bufio.seek(len(b), 1)
- return b
- self.check_flush_and_read(_peek)
-
- def test_flush_and_write(self):
- raw = self.BytesIO(b"abcdefghi")
- bufio = self.tp(raw)
-
- bufio.write(b"123")
- bufio.flush()
- bufio.write(b"45")
- bufio.flush()
- bufio.seek(0, 0)
- self.assertEqual(b"12345fghi", raw.getvalue())
- self.assertEqual(b"12345fghi", bufio.read())
-
- def test_threads(self):
- BufferedReaderTest.test_threads(self)
- BufferedWriterTest.test_threads(self)
-
- def test_writes_and_peek(self):
- def _peek(bufio):
- bufio.peek(1)
- self.check_writes(_peek)
- def _peek(bufio):
- pos = bufio.tell()
- bufio.seek(-1, 1)
- bufio.peek(1)
- bufio.seek(pos, 0)
- self.check_writes(_peek)
-
- def test_writes_and_reads(self):
- def _read(bufio):
- bufio.seek(-1, 1)
- bufio.read(1)
- self.check_writes(_read)
-
- def test_writes_and_read1s(self):
- def _read1(bufio):
- bufio.seek(-1, 1)
- bufio.read1(1)
- self.check_writes(_read1)
-
- def test_writes_and_readintos(self):
- def _read(bufio):
- bufio.seek(-1, 1)
- bufio.readinto(bytearray(1))
- self.check_writes(_read)
-
- def test_write_after_readahead(self):
- # Issue #6629: writing after the buffer was filled by readahead should
- # first rewind the raw stream.
- for overwrite_size in [1, 5]:
- raw = self.BytesIO(b"A" * 10)
- bufio = self.tp(raw, 4)
- # Trigger readahead
- self.assertEqual(bufio.read(1), b"A")
- self.assertEqual(bufio.tell(), 1)
- # Overwriting should rewind the raw stream if it needs so
- bufio.write(b"B" * overwrite_size)
- self.assertEqual(bufio.tell(), overwrite_size + 1)
- # If the write size was smaller than the buffer size, flush() and
- # check that rewind happens.
- bufio.flush()
- self.assertEqual(bufio.tell(), overwrite_size + 1)
- s = raw.getvalue()
- self.assertEqual(s,
- b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
-
- def test_write_rewind_write(self):
- # Various combinations of reading / writing / seeking backwards / writing again
- def mutate(bufio, pos1, pos2):
- assert pos2 >= pos1
- # Fill the buffer
- bufio.seek(pos1)
- bufio.read(pos2 - pos1)
- bufio.write(b'\x02')
- # This writes earlier than the previous write, but still inside
- # the buffer.
- bufio.seek(pos1)
- bufio.write(b'\x01')
-
- b = b"\x80\x81\x82\x83\x84"
- for i in range(0, len(b)):
- for j in range(i, len(b)):
- raw = self.BytesIO(b)
- bufio = self.tp(raw, 100)
- mutate(bufio, i, j)
- bufio.flush()
- expected = bytearray(b)
- expected[j] = 2
- expected[i] = 1
- self.assertEqual(raw.getvalue(), expected,
- "failed result for i=%d, j=%d" % (i, j))
-
- def test_truncate_after_read_or_write(self):
- raw = self.BytesIO(b"A" * 10)
- bufio = self.tp(raw, 100)
- self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
- self.assertEqual(bufio.truncate(), 2)
- self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
- self.assertEqual(bufio.truncate(), 4)
-
- def test_misbehaved_io(self):
- BufferedReaderTest.test_misbehaved_io(self)
- BufferedWriterTest.test_misbehaved_io(self)
-
- def test_interleaved_read_write(self):
- # Test for issue #12213
- with self.BytesIO(b'abcdefgh') as raw:
- with self.tp(raw, 100) as f:
- f.write(b"1")
- self.assertEqual(f.read(1), b'b')
- f.write(b'2')
- self.assertEqual(f.read1(1), b'd')
- f.write(b'3')
- buf = bytearray(1)
- f.readinto(buf)
- self.assertEqual(buf, b'f')
- f.write(b'4')
- self.assertEqual(f.peek(1), b'h')
- f.flush()
- self.assertEqual(raw.getvalue(), b'1b2d3f4h')
-
- with self.BytesIO(b'abc') as raw:
- with self.tp(raw, 100) as f:
- self.assertEqual(f.read(1), b'a')
- f.write(b"2")
- self.assertEqual(f.read(1), b'c')
- f.flush()
- self.assertEqual(raw.getvalue(), b'a2c')
-
- def test_read1_after_write(self):
- with self.BytesIO(b'abcdef') as raw:
- with self.tp(raw, 3) as f:
- f.write(b"1")
- self.assertEqual(f.read1(1), b'b')
- f.flush()
- self.assertEqual(raw.getvalue(), b'1bcdef')
- with self.BytesIO(b'abcdef') as raw:
- with self.tp(raw, 3) as f:
- f.write(b"1")
- self.assertEqual(f.read1(), b'bcd')
- f.flush()
- self.assertEqual(raw.getvalue(), b'1bcdef')
- with self.BytesIO(b'abcdef') as raw:
- with self.tp(raw, 3) as f:
- f.write(b"1")
- # XXX: read(100) returns different numbers of bytes
- # in Python and C implementations.
- self.assertEqual(f.read1(100)[:3], b'bcd')
- f.flush()
- self.assertEqual(raw.getvalue(), b'1bcdef')
-
- def test_interleaved_readline_write(self):
- with self.BytesIO(b'ab\ncdef\ng\n') as raw:
- with self.tp(raw) as f:
- f.write(b'1')
- self.assertEqual(f.readline(), b'b\n')
- f.write(b'2')
- self.assertEqual(f.readline(), b'def\n')
- f.write(b'3')
- self.assertEqual(f.readline(), b'\n')
- f.flush()
- self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
-
- # You can't construct a BufferedRandom over a non-seekable stream.
- test_unseekable = None
-
- # writable() returns True, so there's no point to test it over
- # a writable stream.
- test_truncate_on_read_only = None
-
-
-class CBufferedRandomTest(BufferedRandomTest, SizeofTest, CTestCase):
- tp = io.BufferedRandom
-
- def test_garbage_collection(self):
- CBufferedReaderTest.test_garbage_collection(self)
- CBufferedWriterTest.test_garbage_collection(self)
-
- def test_args_error(self):
- # Issue #17275
- with self.assertRaisesRegex(TypeError, "BufferedRandom"):
- self.tp(self.BytesIO(), 1024, 1024, 1024)
-
-
-class PyBufferedRandomTest(BufferedRandomTest, PyTestCase):
- tp = pyio.BufferedRandom
-
-
# XXX Tests for open()
class MiscIOTest: