import abc
import array
import codecs
import errno
import locale
import os
import pickle
import random
import sys
import threading
import time
import unittest
import warnings
from collections import UserList

from test import support
from test.support import os_helper
from test.support import threading_helper
from test.support.os_helper import FakePath

import io  # C implementation of io
import _pyio as pyio  # Python implementation of io

from .utils import byteslike, CTestCase, PyTestCase
-def _default_chunk_size():
- """Get the default TextIOWrapper chunk size"""
- with open(__file__, "r", encoding="latin-1") as f:
- return f._CHUNK_SIZE
-
-
-
class BadIndex:
    """Helper whose __index__ raises, for testing error propagation."""

    def __index__(self):
        # Deliberately blow up with ZeroDivisionError when coerced to int.
        1/0
-
-
class IOTest:
    # NOTE(review): this chunk appears truncated — only the first line of
    # setUp is visible here; the full IOTest class presumably continues
    # outside this view. TODO: confirm against the complete file.
    def setUp(self):
        # 'tp' is unused in the visible code; presumably used by statements
        # that follow in the full setUp — verify before removing.
        tp = pyio.BufferedRandom
# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
# properties:
#   - A single output character can correspond to many bytes of input.
#   - The number of input bytes to complete the character can be
#     undetermined until the last input byte is received.
#   - The number of input bytes can vary depending on previous input.
#   - A single input byte can correspond to many characters of output.
#   - The number of output characters can be undetermined until the
#     last input byte is received.
#   - The number of output characters can vary depending on previous input.

class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
    """
    For testing seek/tell behavior with a stateful, buffering decoder.

    Input is a sequence of words.  Words may be fixed-length (length set
    by input) or variable-length (period-terminated).  In variable-length
    mode, extra periods are ignored.  Possible words are:
      - 'i' followed by a number sets the input length, I (maximum 99).
        When I is set to 0, words are space-terminated.
      - 'o' followed by a number sets the output length, O (maximum 99).
      - Any other word is converted into a word followed by a period on
        the output.  The output word consists of the input word truncated
        or padded out with hyphens to make its length equal to O.  If O
        is 0, the word is output verbatim without truncating or padding.
    I and O are initially set to 1.  When I changes, any buffered input is
    re-scanned according to the new I.  EOF also terminates the last word.
    """

    # Flipped on by tests so lookupTestDecoder() serves the 'test_decoder'
    # codec; off by default so it cannot leak into unrelated tests.
    codecEnabled = False

    def __init__(self, errors='strict'):
        codecs.IncrementalDecoder.__init__(self, errors)
        self.reset()

    def __repr__(self):
        return '<SID %x>' % id(self)

    def reset(self):
        self.i = 1
        self.o = 1
        self.buffer = bytearray()

    def getstate(self):
        # XOR the lengths with 1 so a freshly reset decoder (i == o == 1)
        # reports flags == 0, as required by the TextIOWrapper protocol.
        flags = (self.i ^ 1) * 100 + (self.o ^ 1)
        return bytes(self.buffer), flags

    def setstate(self, state):
        pending, flags = state
        self.buffer = bytearray(pending)
        quot, rem = divmod(flags, 100)
        self.i = quot ^ 1
        self.o = rem ^ 1

    def decode(self, input, final=False):
        pieces = []
        for byte in input:
            if self.i == 0:
                # Variable-length mode: a period terminates the word;
                # extra periods (empty buffer) are ignored.
                if byte != ord('.'):
                    self.buffer.append(byte)
                elif self.buffer:
                    pieces.append(self.process_word())
            else:
                # Fixed-length mode: emit once self.i bytes are buffered.
                self.buffer.append(byte)
                if len(self.buffer) == self.i:
                    pieces.append(self.process_word())
        if final and self.buffer:
            # EOF terminates the last (possibly short) word.
            pieces.append(self.process_word())
        return ''.join(pieces)

    def process_word(self):
        """Consume the buffered word; return its decoded output ('' for
        the control words that set I or O)."""
        word = ''
        leader = self.buffer[0]
        if leader == ord('i'):
            self.i = min(99, int(self.buffer[1:] or 0))    # set input length
        elif leader == ord('o'):
            self.o = min(99, int(self.buffer[1:] or 0))    # set output length
        else:
            word = self.buffer.decode('ascii')
            if len(word) < self.o:
                word += '-' * self.o    # pad out with hyphens
            if self.o:
                word = word[:self.o]    # truncate to output length
            word += '.'
        self.buffer = bytearray()
        return word
-
-
# bpo-41919: kept as a module-level function (not a method of
# StatefulIncrementalDecoder) to avoid a resource leak when registering
# codecs and cleanup functions.
def lookupTestDecoder(name):
    """Codec search hook: serve 'test_decoder' while the SID is enabled.

    Returns None (no codec) for any other name or while disabled.
    """
    if name != 'test_decoder' or not StatefulIncrementalDecoder.codecEnabled:
        return None
    latin1 = codecs.lookup('latin-1')
    return codecs.CodecInfo(
        name='test_decoder',
        encode=latin1.encode,
        decode=None,
        incrementalencoder=None,
        incrementaldecoder=StatefulIncrementalDecoder,
        streamreader=None,
        streamwriter=None,
    )
-
-
-class StatefulIncrementalDecoderTest(unittest.TestCase):
- """
- Make sure the StatefulIncrementalDecoder actually works.
- """
-
- test_cases = [
- # I=1, O=1 (fixed-length input == fixed-length output)
- (b'abcd', False, 'a.b.c.d.'),
- # I=0, O=0 (variable-length input, variable-length output)
- (b'oiabcd', True, 'abcd.'),
- # I=0, O=0 (should ignore extra periods)
- (b'oi...abcd...', True, 'abcd.'),
- # I=0, O=6 (variable-length input, fixed-length output)
- (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
- # I=2, O=6 (fixed-length input < fixed-length output)
- (b'i.i2.o6xyz', True, 'xy----.z-----.'),
- # I=6, O=3 (fixed-length input > fixed-length output)
- (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
- # I=0, then 3; O=29, then 15 (with longer output)
- (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
- 'a----------------------------.' +
- 'b----------------------------.' +
- 'cde--------------------------.' +
- 'abcdefghijabcde.' +
- 'a.b------------.' +
- '.c.------------.' +
- 'd.e------------.' +
- 'k--------------.' +
- 'l--------------.' +
- 'm--------------.')
- ]
-
- def test_decoder(self):
- # Try a few one-shot test cases.
- for input, eof, output in self.test_cases:
- d = StatefulIncrementalDecoder()
- self.assertEqual(d.decode(input, eof), output)
-
- # Also test an unfinished decode, followed by forcing EOF.
- d = StatefulIncrementalDecoder()
- self.assertEqual(d.decode(b'oiabcd'), '')
- self.assertEqual(d.decode(b'', 1), 'abcd.')
-
-class TextIOWrapperTest:
-
    def setUp(self):
        """Create sample mixed-newline data and register the test codec."""
        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
        # Same data with every newline style normalized to "\n".
        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
        os_helper.unlink(os_helper.TESTFN)
        codecs.register(lookupTestDecoder)
        self.addCleanup(codecs.unregister, lookupTestDecoder)
-
    def tearDown(self):
        """Remove the scratch file left behind by tests."""
        os_helper.unlink(os_helper.TESTFN)
-
    def test_constructor(self):
        """__init__ (and re-__init__) validates encoding/errors/newline."""
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b, encoding="utf-8")
        # Re-initializing an existing wrapper replaces its configuration.
        t.__init__(b, encoding="latin-1", newline="\r\n")
        self.assertEqual(t.encoding, "latin-1")
        self.assertEqual(t.line_buffering, False)
        t.__init__(b, encoding="utf-8", line_buffering=True)
        self.assertEqual(t.encoding, "utf-8")
        self.assertEqual(t.line_buffering, True)
        self.assertEqual("\xe9\n", t.readline())
        # The C and Python implementations raise different exception types
        # for a non-string encoding argument.
        invalid_type = TypeError if self.is_C else ValueError
        with self.assertRaises(invalid_type):
            t.__init__(b, encoding=42)
        with self.assertRaises(UnicodeEncodeError):
            t.__init__(b, encoding='\udcfe')
        with self.assertRaises(ValueError):
            t.__init__(b, encoding='utf-8\0')
        with self.assertRaises(invalid_type):
            t.__init__(b, encoding="utf-8", errors=42)
        # Error-handler validation is only strict in debug/dev mode or in
        # the C implementation.
        if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
            with self.assertRaises(UnicodeEncodeError):
                t.__init__(b, encoding="utf-8", errors='\udcfe')
        if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
            with self.assertRaises(ValueError):
                t.__init__(b, encoding="utf-8", errors='replace\0')
        with self.assertRaises(TypeError):
            t.__init__(b, encoding="utf-8", newline=42)
        with self.assertRaises(ValueError):
            t.__init__(b, encoding="utf-8", newline='\udcfe')
        with self.assertRaises(ValueError):
            t.__init__(b, encoding="utf-8", newline='\n\0')
        with self.assertRaises(ValueError):
            t.__init__(b, encoding="utf-8", newline='xyzzy')
-
    def test_uninitialized(self):
        """A wrapper created via __new__ (no __init__) must not crash."""
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        del t
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)
        # read() before __init__ must raise, not misbehave.
        self.assertRaisesRegex((ValueError, AttributeError),
                               'uninitialized|has no attribute',
                               t.read, 0)
        # A late __init__ makes the object usable.
        t.__init__(self.MockRawIO(), encoding="utf-8")
        self.assertEqual(t.read(0), '')
-
- def test_non_text_encoding_codecs_are_rejected(self):
- # Ensure the constructor complains if passed a codec that isn't
- # marked as a text encoding
- # http://bugs.python.org/issue20404
- r = self.BytesIO()
- b = self.BufferedWriter(r)
- with self.assertRaisesRegex(LookupError, "is not a text encoding"):
- self.TextIOWrapper(b, encoding="hex")
-
    def test_detach(self):
        """detach() flushes, returns the buffer, and disables further use."""
        r = self.BytesIO()
        b = self.BufferedWriter(r)
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertIs(t.detach(), b)

        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("howdy")
        # Nothing reaches the raw stream until detach() flushes.
        self.assertFalse(r.getvalue())
        t.detach()
        self.assertEqual(r.getvalue(), b"howdy")
        # A second detach() on the same wrapper is an error.
        self.assertRaises(ValueError, t.detach)

        # Operations independent of the detached stream should still work
        repr(t)
        self.assertEqual(t.encoding, "ascii")
        self.assertEqual(t.errors, "strict")
        self.assertFalse(t.line_buffering)
        self.assertFalse(t.write_through)
-
    def test_repr(self):
        """repr() shows name/mode/encoding as they become available."""
        raw = self.BytesIO("hello".encode("utf-8"))
        b = self.BufferedReader(raw)
        t = self.TextIOWrapper(b, encoding="utf-8")
        modname = self.TextIOWrapper.__module__
        self.assertRegex(repr(t),
                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
        # The name is picked up from the underlying raw stream.
        raw.name = "dummy"
        self.assertRegex(repr(t),
                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
        t.mode = "r"
        self.assertRegex(repr(t),
                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
        # A bytes name is rendered with the b'' prefix.
        raw.name = b"dummy"
        self.assertRegex(repr(t),
                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)

        t.buffer.detach()
        repr(t)  # Should not raise an exception
-
    def test_recursive_repr(self):
        """repr() of a wrapper whose name refers back to itself must not
        crash; it raises RuntimeError instead (issue #25455)."""
        raw = self.BytesIO()
        t = self.TextIOWrapper(raw, encoding="utf-8")
        with support.swap_attr(raw, 'name', t), support.infinite_recursion(25):
            with self.assertRaises(RuntimeError):
                repr(t)  # Should not crash
-
- def test_subclass_repr(self):
- class TestSubclass(self.TextIOWrapper):
- pass
-
- f = TestSubclass(self.StringIO())
- self.assertIn(TestSubclass.__name__, repr(f))
-
    def test_line_buffering(self):
        """With line_buffering=True, output is flushed on '\\n' and '\\r'."""
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        # '\r' also triggers a flush.
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
-
    def test_reconfigure_line_buffering(self):
        """reconfigure(line_buffering=...) flushes and switches mode."""
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
        t.write("AB\nC")
        self.assertEqual(r.getvalue(), b"")

        t.reconfigure(line_buffering=True)   # implicit flush
        self.assertEqual(r.getvalue(), b"AB\nC")
        t.write("DEF\nG")
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
        t.write("H")
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
        t.reconfigure(line_buffering=False)   # implicit flush
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
        t.write("IJ")
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")

        # Keeping default value: no args, or None, leaves the mode alone.
        t.reconfigure()
        t.reconfigure(line_buffering=None)
        self.assertEqual(t.line_buffering, False)
        t.reconfigure(line_buffering=True)
        t.reconfigure()
        t.reconfigure(line_buffering=None)
        self.assertEqual(t.line_buffering, True)
-
    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
    def test_default_encoding(self):
        """Without an explicit encoding, the locale encoding is used."""
        with os_helper.EnvironmentVarGuard() as env:
            # try to get a user preferred encoding different than the current
            # locale encoding to check that TextIOWrapper() uses the current
            # locale encoding and not the user preferred encoding
            env.unset('LC_ALL', 'LANG', 'LC_CTYPE')

            current_locale_encoding = locale.getencoding()
            b = self.BytesIO()
            with warnings.catch_warnings():
                # Omitting the encoding emits an EncodingWarning; ignore it.
                warnings.simplefilter("ignore", EncodingWarning)
                t = self.TextIOWrapper(b)
            self.assertEqual(t.encoding, current_locale_encoding)
-
    def test_encoding(self):
        # Check the encoding attribute is always set, and valid
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="utf-8")
        self.assertEqual(t.encoding, "utf-8")
        with warnings.catch_warnings():
            # Omitting the encoding emits an EncodingWarning; ignore it.
            warnings.simplefilter("ignore", EncodingWarning)
            t = self.TextIOWrapper(b)
        self.assertIsNotNone(t.encoding)
        # The default must be resolvable by the codecs machinery.
        codecs.lookup(t.encoding)
-
    def test_encoding_errors_reading(self):
        """Each error handler behaves as documented on decode failure."""
        # (1) default
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.read)
        # (2) explicit strict
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.read)
        # (3) ignore
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
        self.assertEqual(t.read(), "abc\n\n")
        # (4) replace
        b = self.BytesIO(b"abc\n\xff\n")
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
        self.assertEqual(t.read(), "abc\n\ufffd\n")
-
    def test_encoding_errors_writing(self):
        """Each error handler behaves as documented on encode failure."""
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")
-
    def test_newlines(self):
        """Universal-newline translation across encodings and chunk sizes."""
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        # (newline argument, expected lines) pairs.
        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                    encoding=encoding)
                        if do_reads:
                            # Reassemble lines from fixed-size read(2) calls.
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))
-
    def test_newlines_input(self):
        """readlines()/read() split and translate per the newline argument."""
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(keepends=True)),
            ("", testdata.decode("ascii").splitlines(keepends=True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            # read() must see the same translated stream as readlines().
            self.assertEqual(txt.read(), "".join(expected))
-
    def test_newlines_output(self):
        """On write, '\\n' is translated per the newline argument; other
        characters pass through untouched."""
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None translates to os.linesep.
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)
-
    def test_destructor(self):
        """Garbage-collecting a wrapper flushes pending data and closes
        the underlying buffer."""
        l = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                # Capture the value at close time, before the buffer is freed.
                l.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], l)
-
    def test_override_destructor(self):
        """An overridden __del__ still triggers close() and flush(), in
        that order (recorded as 1, 2, 3)."""
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                try:
                    # The base class may or may not define __del__.
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
-
    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            with self.assertRaises(AttributeError):
                self.TextIOWrapper(rawio, encoding="utf-8").xyzzy

            # The failed close() surfaces as an unraisable OSError, not by
            # replacing the in-flight AttributeError.
            self.assertEqual(cm.unraisable.exc_type, OSError)
-
    # Systematic tests of the text I/O API
-
    def test_basic_io(self):
        """Round-trip write/read/seek/tell across chunk sizes and encodings."""
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
                f = self.open(os_helper.TESTFN, "w+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
                f.close()
                f = self.open(os_helper.TESTFN, "r+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                # read(None) is equivalent to read().
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
                f.close()
-
    def multi_line_test(self, f, enc):
        """Helper: write lines of varying length/width to *f* and verify the
        (tell, line) pairs recorded on write match those seen on read."""
        f.seek(0)
        f.truncate()
        # Sample mixes 1-, 2- and 3-byte UTF-8 characters.
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            chars = []
            for i in range(size):
                chars.append(sample[i % len(sample)])
            line = "".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)
-
    def test_telling(self):
        """tell() agrees with positions recorded during writing; it is
        forbidden while iterating the file."""
        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() during iteration raises OSError.
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
        f.close()
-
    def test_seeking(self):
        """Reading right up to a chunk boundary, then readline() across a
        multibyte character, yields the correct text."""
        chunk_size = _default_chunk_size()
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        with self.open(os_helper.TESTFN, "wb") as f:
            f.write(line*2)
        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, str(prefix, "ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)
-
    def test_seeking_too(self):
        # Regression test for a specific bug
        data = b'\xe0\xbf\xbf\n'
        with self.open(os_helper.TESTFN, "wb") as f:
            f.write(data)
        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            # A 3-byte character split across 2-byte chunks used to break
            # tell() after readline().
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()
-
    def test_seek_and_tell(self):
        #Test seek/tell using the StatefulIncrementalDecoder.
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def test_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            f = self.open(os_helper.TESTFN, 'wb')
            f.write(data)
            f.close()
            f = self.open(os_helper.TESTFN, encoding='test_decoder')
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()
            f.close()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    f = self.open(os_helper.TESTFN, encoding='test_decoder')
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    # Seeking back to the cookie must reproduce the tail.
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])
                    f.close()

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                test_seek_and_tell_with_data(input)

            # Position each test case so that it crosses a chunk boundary.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                test_seek_and_tell_with_data(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = 0
-
    def test_multibyte_seek_and_tell(self):
        """seek()/tell() round-trip across multibyte euc_jp text."""
        f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
        f.write("AB\n\u3046\u3048\n")
        f.close()

        f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
        f.close()
-
    def test_seek_with_encoder_state(self):
        """Seeking back mid-write restores the encoder's internal state
        (euc_jis_2004 combines \\u00e6 + \\u0300 into one sequence)."""
        f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")
        f.close()

        f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
        f.close()
-
    def test_encoded_writes(self):
        """For BOM-prefixed encodings, the BOM is emitted exactly once."""
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = self.BytesIO()
            f = self.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
-
- def test_unreadable(self):
- class UnReadable(self.BytesIO):
- def readable(self):
- return False
- txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
- self.assertRaises(OSError, txt.read)
-
- def test_read_one_by_one(self):
- txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
- reads = ""
- while True:
- c = txt.read(1)
- if not c:
- break
- reads += c
- self.assertEqual(reads, "AA\nBB")
-
- def test_readlines(self):
- txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
- self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
- txt.seek(0)
- self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
- txt.seek(0)
- self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
-
- # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
- def test_read_by_chunk(self):
- # make sure "\r\n" straddles 128 char boundary.
- txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
- reads = ""
- while True:
- c = txt.read(128)
- if not c:
- break
- reads += c
- self.assertEqual(reads, "A"*127+"\nB")
-
- def test_writelines(self):
- l = ['ab', 'cd', 'ef']
- buf = self.BytesIO()
- txt = self.TextIOWrapper(buf, encoding="utf-8")
- txt.writelines(l)
- txt.flush()
- self.assertEqual(buf.getvalue(), b'abcdef')
-
- def test_writelines_userlist(self):
- l = UserList(['ab', 'cd', 'ef'])
- buf = self.BytesIO()
- txt = self.TextIOWrapper(buf, encoding="utf-8")
- txt.writelines(l)
- txt.flush()
- self.assertEqual(buf.getvalue(), b'abcdef')
-
- def test_writelines_error(self):
- txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
- self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
- self.assertRaises(TypeError, txt.writelines, None)
- self.assertRaises(TypeError, txt.writelines, b'abc')
-
    def test_issue1395_1(self):
        """Issue #1395: char-at-a-time reads produce normalized newlines."""
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")

        # read one char at a time
        reads = ""
        while True:
            c = txt.read(1)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)
-
    def test_issue1395_2(self):
        """Issue #1395: fixed-size reads with a tiny chunk size still
        normalize newlines correctly."""
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = ""
        while True:
            c = txt.read(4)
            if not c:
                break
            reads += c
        self.assertEqual(reads, self.normalized)
-
    def test_issue1395_3(self):
        """Issue #1395: mixing read() and readline() with a tiny chunk size."""
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read(4)
        reads += txt.readline()
        reads += txt.readline()
        reads += txt.readline()
        self.assertEqual(reads, self.normalized)
-
    def test_issue1395_4(self):
        """Issue #1395: a partial read followed by read-to-EOF."""
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        reads += txt.read()
        self.assertEqual(reads, self.normalized)
-
    def test_issue1395_5(self):
        """Issue #1395: tell()/seek() round-trip after a partial read."""
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")
-
- def test_issue2282(self):
- buffer = self.BytesIO(self.testdata)
- txt = self.TextIOWrapper(buffer, encoding="ascii")
-
- self.assertEqual(buffer.seekable(), txt.seekable())
-
    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = os_helper.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                # One BOM only: appended bytes carry no second BOM.
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
-
    def test_seek_bom(self):
        # Same test, but when seeking manually
        filename = os_helper.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'r+', encoding=charset) as f:
                f.seek(pos)
                f.write('zzz')
                # Overwriting from position 0 must not duplicate the BOM.
                f.seek(0)
                f.write('bbb')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
-
    def test_seek_append_bom(self):
        # Same test, but first seek to the start and then to the end
        filename = os_helper.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
            with self.open(filename, 'a', encoding=charset) as f:
                f.seek(0)
                f.seek(0, self.SEEK_END)
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                # Seeking around in append mode must not re-emit the BOM.
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
-
    def test_errors_property(self):
        """The errors attribute reflects the handler passed to open()."""
        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
            self.assertEqual(f.errors, "strict")
        with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
            self.assertEqual(f.errors, "replace")
-
    @support.no_tracing
    @threading_helper.requires_working_threading()
    def test_threads_write(self):
        # Issue6750: concurrent writes could duplicate data
        event = threading.Event()
        with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
            def run(n):
                text = "Thread%03d\n" % n
                # Wait so all threads write at (roughly) the same time.
                event.wait()
                f.write(text)
            threads = [threading.Thread(target=run, args=(x,))
                       for x in range(20)]
            with threading_helper.start_threads(threads, event.set):
                time.sleep(0.02)
        with self.open(os_helper.TESTFN, encoding="utf-8") as f:
            content = f.read()
            # Each thread's line must appear exactly once — no duplication.
            for n in range(20):
                self.assertEqual(content.count("Thread%03d\n" % n), 1)
-
    def test_flush_error_on_close(self):
        # Test that text file is closed despite failed flush
        # and that flush() is called before file closed.
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        closed = []
        def bad_flush():
            # Record the closed state at the moment flush() runs.
            closed[:] = [txt.closed, txt.buffer.closed]
            raise OSError()
        txt.flush = bad_flush
        self.assertRaises(OSError, txt.close) # exception not swallowed
        self.assertTrue(txt.closed)
        self.assertTrue(txt.buffer.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        txt.flush = lambda: None  # break reference loop
-
    def test_close_error_on_close(self):
        """If both flush() and buffer.close() fail, close() raises the
        close error with the flush error chained as __context__."""
        buffer = self.BytesIO(self.testdata)
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        buffer.close = bad_close
        txt = self.TextIOWrapper(buffer, encoding="ascii")
        txt.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            txt.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(txt.closed)

        # Silence destructor error
        buffer.close = lambda: None
        txt.flush = lambda: None
-
    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        buffer = self.BytesIO(self.testdata)
        def bad_flush():
            # NameError raised lazily: the names don't exist at all.
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        buffer.close = bad_close
        txt = self.TextIOWrapper(buffer, encoding="ascii")
        txt.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            txt.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(txt.closed)

        # Silence destructor error
        buffer.close = lambda: None
        txt.flush = lambda: None
-
- def test_multi_close(self):
- txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
- txt.close()
- txt.close()
- txt.close()
- self.assertRaises(ValueError, txt.flush)
-
    def test_unseekable(self):
        """tell()/seek() on an unseekable stream raise UnsupportedOperation."""
        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
        self.assertRaises(self.UnsupportedOperation, txt.tell)
        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
-
    def test_readonly_attributes(self):
        """The buffer attribute cannot be reassigned."""
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        buf = self.BytesIO(self.testdata)
        with self.assertRaises(AttributeError):
            txt.buffer = buf
-
    def test_rawio(self):
        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
        # that subprocess.Popen() can have the required unbuffered
        # semantics with universal_newlines=True.
        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
        # Reads
        self.assertEqual(txt.read(4), 'abcd')
        self.assertEqual(txt.readline(), 'efghi\n')
        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
-
    def test_rawio_write_through(self):
        # Issue #12591: with write_through=True, writes don't need a flush
        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
        txt.write('1')
        txt.write('23\n4')
        txt.write('5')
        # All writes reached the raw stream without any explicit flush().
        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
-
- def test_bufio_write_through(self):
- # Issue #21396: write_through=True doesn't force a flush()
- # on the underlying binary buffered object.
- flush_called, write_called = [], []
- class BufferedWriter(self.BufferedWriter):
- def flush(self, *args, **kwargs):
- flush_called.append(True)
- return super().flush(*args, **kwargs)
- def write(self, *args, **kwargs):
- write_called.append(True)
- return super().write(*args, **kwargs)
-
- rawio = self.BytesIO()
- data = b"a"
- bufio = BufferedWriter(rawio, len(data)*2)
- textio = self.TextIOWrapper(bufio, encoding='ascii',
- write_through=True)
- # write to the buffered io but don't overflow the buffer
- text = data.decode('ascii')
- textio.write(text)
-
- # buffer.flush is not called with write_through=True
- self.assertFalse(flush_called)
- # buffer.write *is* called with write_through=True
- self.assertTrue(write_called)
- self.assertEqual(rawio.getvalue(), b"") # no flush
-
- write_called = [] # reset
- textio.write(text * 10) # total content is larger than bufio buffer
- self.assertTrue(write_called)
- self.assertEqual(rawio.getvalue(), data * 11) # all flushed
-
- def test_reconfigure_write_through(self):
- raw = self.MockRawIO([])
- t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- t.write('1')
- t.reconfigure(write_through=True) # implied flush
- self.assertEqual(t.write_through, True)
- self.assertEqual(b''.join(raw._write_stack), b'1')
- t.write('23')
- self.assertEqual(b''.join(raw._write_stack), b'123')
- t.reconfigure(write_through=False)
- self.assertEqual(t.write_through, False)
- t.write('45')
- t.flush()
- self.assertEqual(b''.join(raw._write_stack), b'12345')
- # Keeping default value
- t.reconfigure()
- t.reconfigure(write_through=None)
- self.assertEqual(t.write_through, False)
- t.reconfigure(write_through=True)
- t.reconfigure()
- t.reconfigure(write_through=None)
- self.assertEqual(t.write_through, True)
-
- def test_read_nonbytes(self):
- # Issue #17106
- # Crash when underlying read() returns non-bytes
- t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
- self.assertRaises(TypeError, t.read, 1)
- t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
- self.assertRaises(TypeError, t.readline)
- t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
- self.assertRaises(TypeError, t.read)
-
- def test_illegal_encoder(self):
- # Issue 31271: Calling write() while the return value of encoder's
- # encode() is invalid shouldn't cause an assertion failure.
- rot13 = codecs.lookup("rot13")
- with support.swap_attr(rot13, '_is_text_encoding', True):
- t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
- self.assertRaises(TypeError, t.write, 'bar')
-
- def test_illegal_decoder(self):
- # Issue #17106
- # Bypass the early encoding check added in issue 20404
- def _make_illegal_wrapper():
- quopri = codecs.lookup("quopri")
- quopri._is_text_encoding = True
- try:
- t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
- newline='\n', encoding="quopri")
- finally:
- quopri._is_text_encoding = False
- return t
- # Crash when decoder returns non-string
- t = _make_illegal_wrapper()
- self.assertRaises(TypeError, t.read, 1)
- t = _make_illegal_wrapper()
- self.assertRaises(TypeError, t.readline)
- t = _make_illegal_wrapper()
- self.assertRaises(TypeError, t.read)
-
- # Issue 31243: calling read() while the return value of decoder's
- # getstate() is invalid should neither crash the interpreter nor
- # raise a SystemError.
- def _make_very_illegal_wrapper(getstate_ret_val):
- class BadDecoder:
- def getstate(self):
- return getstate_ret_val
- def _get_bad_decoder(dummy):
- return BadDecoder()
- quopri = codecs.lookup("quopri")
- with support.swap_attr(quopri, 'incrementaldecoder',
- _get_bad_decoder):
- return _make_illegal_wrapper()
- t = _make_very_illegal_wrapper(42)
- self.assertRaises(TypeError, t.read, 42)
- t = _make_very_illegal_wrapper(())
- self.assertRaises(TypeError, t.read, 42)
- t = _make_very_illegal_wrapper((1, 2))
- self.assertRaises(TypeError, t.read, 42)
-
- def _check_create_at_shutdown(self, **kwargs):
- # Issue #20037: creating a TextIOWrapper at shutdown
- # shouldn't crash the interpreter.
- iomod = self.io.__name__
- code = """if 1:
- import codecs
- import {iomod} as io
-
- # Avoid looking up codecs at shutdown
- codecs.lookup('utf-8')
-
- class C:
- def __del__(self):
- io.TextIOWrapper(io.BytesIO(), **{kwargs})
- print("ok")
- c = C()
- """.format(iomod=iomod, kwargs=kwargs)
- return assert_python_ok("-c", code)
-
- def test_create_at_shutdown_without_encoding(self):
- rc, out, err = self._check_create_at_shutdown()
- if err:
- # Can error out with a RuntimeError if the module state
- # isn't found.
- self.assertIn(self.shutdown_error, err.decode())
- else:
- self.assertEqual("ok", out.decode().strip())
-
- def test_create_at_shutdown_with_encoding(self):
- rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
- errors='strict')
- self.assertFalse(err)
- self.assertEqual("ok", out.decode().strip())
-
- def test_read_byteslike(self):
- r = MemviewBytesIO(b'Just some random string\n')
- t = self.TextIOWrapper(r, 'utf-8')
-
- # TextIOwrapper will not read the full string, because
- # we truncate it to a multiple of the native int size
- # so that we can construct a more complex memoryview.
- bytes_val = _to_memoryview(r.getvalue()).tobytes()
-
- self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
-
- def test_issue22849(self):
- class F(object):
- def readable(self): return True
- def writable(self): return True
- def seekable(self): return True
-
- for i in range(10):
- try:
- self.TextIOWrapper(F(), encoding='utf-8')
- except Exception:
- pass
-
- F.tell = lambda x: 0
- t = self.TextIOWrapper(F(), encoding='utf-8')
-
- def test_reconfigure_locale(self):
- wrapper = self.TextIOWrapper(self.BytesIO(b"test"))
- wrapper.reconfigure(encoding="locale")
-
- def test_reconfigure_encoding_read(self):
- # latin1 -> utf8
- # (latin1 can decode utf-8 encoded string)
- data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
- raw = self.BytesIO(data)
- txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
- self.assertEqual(txt.readline(), 'abc\xe9\n')
- with self.assertRaises(self.UnsupportedOperation):
- txt.reconfigure(encoding='utf-8')
- with self.assertRaises(self.UnsupportedOperation):
- txt.reconfigure(newline=None)
-
- def test_reconfigure_write_fromascii(self):
- # ascii has a specific encodefunc in the C implementation,
- # but utf-8-sig has not. Make sure that we get rid of the
- # cached encodefunc when we switch encoders.
- raw = self.BytesIO()
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- txt.write('foo\n')
- txt.reconfigure(encoding='utf-8-sig')
- txt.write('\xe9\n')
- txt.flush()
- self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
-
- def test_reconfigure_write(self):
- # latin -> utf8
- raw = self.BytesIO()
- txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
- txt.write('abc\xe9\n')
- txt.reconfigure(encoding='utf-8')
- self.assertEqual(raw.getvalue(), b'abc\xe9\n')
- txt.write('d\xe9f\n')
- txt.flush()
- self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
-
- # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
- # the file
- raw = self.BytesIO()
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- txt.write('abc\n')
- txt.reconfigure(encoding='utf-8-sig')
- txt.write('d\xe9f\n')
- txt.flush()
- self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
-
- def test_reconfigure_write_non_seekable(self):
- raw = self.BytesIO()
- raw.seekable = lambda: False
- raw.seek = None
- txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
- txt.write('abc\n')
- txt.reconfigure(encoding='utf-8-sig')
- txt.write('d\xe9f\n')
- txt.flush()
-
- # If the raw stream is not seekable, there'll be a BOM
- self.assertEqual(raw.getvalue(), b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
-
- def test_reconfigure_defaults(self):
- txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
- txt.reconfigure(encoding=None)
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'replace')
- txt.write('LF\n')
-
- txt.reconfigure(newline='\r\n')
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'replace')
-
- txt.reconfigure(errors='ignore')
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'ignore')
- txt.write('CRLF\n')
-
- txt.reconfigure(encoding='utf-8', newline=None)
- self.assertEqual(txt.errors, 'strict')
- txt.seek(0)
- self.assertEqual(txt.read(), 'LF\nCRLF\n')
-
- self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
-
- def test_reconfigure_errors(self):
- txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
- with self.assertRaises(TypeError): # there was a crash
- txt.reconfigure(encoding=42)
- if self.is_C:
- with self.assertRaises(UnicodeEncodeError):
- txt.reconfigure(encoding='\udcfe')
- with self.assertRaises(LookupError):
- txt.reconfigure(encoding='locale\0')
- # TODO: txt.reconfigure(encoding='utf-8\0')
- # TODO: txt.reconfigure(encoding='nonexisting')
- with self.assertRaises(TypeError):
- txt.reconfigure(errors=42)
- if self.is_C:
- with self.assertRaises(UnicodeEncodeError):
- txt.reconfigure(errors='\udcfe')
- # TODO: txt.reconfigure(errors='ignore\0')
- # TODO: txt.reconfigure(errors='nonexisting')
- with self.assertRaises(TypeError):
- txt.reconfigure(newline=42)
- with self.assertRaises(ValueError):
- txt.reconfigure(newline='\udcfe')
- with self.assertRaises(ValueError):
- txt.reconfigure(newline='xyz')
- if not self.is_C:
- # TODO: Should fail in C too.
- with self.assertRaises(ValueError):
- txt.reconfigure(newline='\n\0')
- if self.is_C:
- # TODO: Use __bool__(), not __index__().
- with self.assertRaises(ZeroDivisionError):
- txt.reconfigure(line_buffering=BadIndex())
- with self.assertRaises(OverflowError):
- txt.reconfigure(line_buffering=2**1000)
- with self.assertRaises(ZeroDivisionError):
- txt.reconfigure(write_through=BadIndex())
- with self.assertRaises(OverflowError):
- txt.reconfigure(write_through=2**1000)
- with self.assertRaises(ZeroDivisionError): # there was a crash
- txt.reconfigure(line_buffering=BadIndex(),
- write_through=BadIndex())
- self.assertEqual(txt.encoding, 'ascii')
- self.assertEqual(txt.errors, 'replace')
- self.assertIs(txt.line_buffering, False)
- self.assertIs(txt.write_through, False)
-
- txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
- line_buffering=True, write_through=True)
- self.assertEqual(txt.encoding, 'latin1')
- self.assertEqual(txt.errors, 'ignore')
- self.assertIs(txt.line_buffering, True)
- self.assertIs(txt.write_through, True)
-
- def test_reconfigure_newline(self):
- raw = self.BytesIO(b'CR\rEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
- txt.reconfigure(newline=None)
- self.assertEqual(txt.readline(), 'CR\n')
- raw = self.BytesIO(b'CR\rEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
- txt.reconfigure(newline='')
- self.assertEqual(txt.readline(), 'CR\r')
- raw = self.BytesIO(b'CR\rLF\nEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
- txt.reconfigure(newline='\n')
- self.assertEqual(txt.readline(), 'CR\rLF\n')
- raw = self.BytesIO(b'LF\nCR\rEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
- txt.reconfigure(newline='\r')
- self.assertEqual(txt.readline(), 'LF\nCR\r')
- raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
- txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
- txt.reconfigure(newline='\r\n')
- self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
-
- txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
- txt.reconfigure(newline=None)
- txt.write('linesep\n')
- txt.reconfigure(newline='')
- txt.write('LF\n')
- txt.reconfigure(newline='\n')
- txt.write('LF\n')
- txt.reconfigure(newline='\r')
- txt.write('CR\n')
- txt.reconfigure(newline='\r\n')
- txt.write('CRLF\n')
- expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
- self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
-
- def test_issue25862(self):
- # Assertion failures occurred in tell() after read() and write().
- t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
- t.read(1)
- t.read()
- t.tell()
- t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
- t.read(1)
- t.write('x')
- t.tell()
-
- def test_issue35928(self):
- p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO())
- f = self.TextIOWrapper(p)
- res = f.readline()
- self.assertEqual(res, 'foo\n')
- f.write(res)
- self.assertEqual(res + f.readline(), 'foo\nbar\n')
-
- def test_pickling_subclass(self):
- global MyTextIO
- class MyTextIO(self.TextIOWrapper):
- def __init__(self, raw, tag):
- super().__init__(raw)
- self.tag = tag
- def __getstate__(self):
- return self.tag, self.buffer.getvalue()
- def __setstate__(slf, state):
- tag, value = state
- slf.__init__(self.BytesIO(value), tag)
-
- raw = self.BytesIO(b'data')
- txt = MyTextIO(raw, 'ham')
- for proto in range(pickle.HIGHEST_PROTOCOL + 1):
- with self.subTest(protocol=proto):
- pickled = pickle.dumps(txt, proto)
- newtxt = pickle.loads(pickled)
- self.assertEqual(newtxt.buffer.getvalue(), b'data')
- self.assertEqual(newtxt.tag, 'ham')
- del MyTextIO
-
- @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
- def test_read_non_blocking(self):
- import os
- r, w = os.pipe()
- try:
- os.set_blocking(r, False)
- with self.io.open(r, 'rt') as textfile:
- r = None
- # Nothing has been written so a non-blocking read raises a BlockingIOError exception.
- with self.assertRaises(BlockingIOError):
- textfile.read()
- finally:
- if r is not None:
- os.close(r)
- os.close(w)
-
-
-class MemviewBytesIO(io.BytesIO):
- '''A BytesIO object whose read method returns memoryviews
- rather than bytes'''
-
- def read1(self, len_):
- return _to_memoryview(super().read1(len_))
-
- def read(self, len_):
- return _to_memoryview(super().read(len_))
-
-def _to_memoryview(buf):
- '''Convert bytes-object *buf* to a non-trivial memoryview'''
-
- arr = array.array('i')
- idx = len(buf) - len(buf) % arr.itemsize
- arr.frombytes(buf[:idx])
- return memoryview(arr)
-
-
-class CTextIOWrapperTest(TextIOWrapperTest, CTestCase):
- shutdown_error = "LookupError: unknown encoding: ascii"
-
- def test_initialization(self):
- r = self.BytesIO(b"\xc3\xa9\n\n")
- b = self.BufferedReader(r, 1000)
- t = self.TextIOWrapper(b, encoding="utf-8")
- self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
- self.assertRaises(ValueError, t.read)
-
- t = self.TextIOWrapper.__new__(self.TextIOWrapper)
- self.assertRaises(Exception, repr, t)
-
- def test_garbage_collection(self):
- # C TextIOWrapper objects are collected, and collecting them flushes
- # all data to disk.
- # The Python version has __del__, so it ends in gc.garbage instead.
- with warnings.catch_warnings():
- warnings.simplefilter("ignore", ResourceWarning)
- rawio = self.FileIO(os_helper.TESTFN, "wb")
- b = self.BufferedWriter(rawio)
- t = self.TextIOWrapper(b, encoding="ascii")
- t.write("456def")
- t.x = t
- wr = weakref.ref(t)
- del t
- support.gc_collect()
- self.assertIsNone(wr(), wr)
- with self.open(os_helper.TESTFN, "rb") as f:
- self.assertEqual(f.read(), b"456def")
-
- def test_rwpair_cleared_before_textio(self):
- # Issue 13070: TextIOWrapper's finalization would crash when called
- # after the reference to the underlying BufferedRWPair's writer got
- # cleared by the GC.
- for i in range(1000):
- b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
- t1 = self.TextIOWrapper(b1, encoding="ascii")
- b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
- t2 = self.TextIOWrapper(b2, encoding="ascii")
- # circular references
- t1.buddy = t2
- t2.buddy = t1
- support.gc_collect()
-
- def test_del__CHUNK_SIZE_SystemError(self):
- t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
- with self.assertRaises(AttributeError):
- del t._CHUNK_SIZE
-
- def test_internal_buffer_size(self):
- # bpo-43260: TextIOWrapper's internal buffer should not store
- # data larger than chunk size.
- chunk_size = 8192 # default chunk size, updated later
-
- class MockIO(self.MockRawIO):
- def write(self, data):
- if len(data) > chunk_size:
- raise RuntimeError
- return super().write(data)
-
- buf = MockIO()
- t = self.TextIOWrapper(buf, encoding="ascii")
- chunk_size = t._CHUNK_SIZE
- t.write("abc")
- t.write("def")
- # default chunk size is 8192 bytes so t don't write data to buf.
- self.assertEqual([], buf._write_stack)
-
- with self.assertRaises(RuntimeError):
- t.write("x"*(chunk_size+1))
-
- self.assertEqual([b"abcdef"], buf._write_stack)
- t.write("ghi")
- t.write("x"*chunk_size)
- self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
-
- def test_issue119506(self):
- chunk_size = 8192
-
- class MockIO(self.MockRawIO):
- written = False
- def write(self, data):
- if not self.written:
- self.written = True
- t.write("middle")
- return super().write(data)
-
- buf = MockIO()
- t = self.TextIOWrapper(buf)
- t.write("abc")
- t.write("def")
- # writing data which size >= chunk_size cause flushing buffer before write.
- t.write("g" * chunk_size)
- t.flush()
-
- self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size],
- buf._write_stack)
-
-
-class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase):
- shutdown_error = "LookupError: unknown encoding: ascii"
-
-
-class IncrementalNewlineDecoderTest:
-
- def check_newline_decoding_utf8(self, decoder):
- # UTF-8 specific tests for a newline decoder
- def _check_decode(b, s, **kwargs):
- # We exercise getstate() / setstate() as well as decode()
- state = decoder.getstate()
- self.assertEqual(decoder.decode(b, **kwargs), s)
- decoder.setstate(state)
- self.assertEqual(decoder.decode(b, **kwargs), s)
-
- _check_decode(b'\xe8\xa2\x88', "\u8888")
-
- _check_decode(b'\xe8', "")
- _check_decode(b'\xa2', "")
- _check_decode(b'\x88', "\u8888")
-
- _check_decode(b'\xe8', "")
- _check_decode(b'\xa2', "")
- _check_decode(b'\x88', "\u8888")
-
- _check_decode(b'\xe8', "")
- self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
-
- decoder.reset()
- _check_decode(b'\n', "\n")
- _check_decode(b'\r', "")
- _check_decode(b'', "\n", final=True)
- _check_decode(b'\r', "\n", final=True)
-
- _check_decode(b'\r', "")
- _check_decode(b'a', "\na")
-
- _check_decode(b'\r\r\n', "\n\n")
- _check_decode(b'\r', "")
- _check_decode(b'\r', "\n")
- _check_decode(b'\na', "\na")
-
- _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
- _check_decode(b'\xe8\xa2\x88', "\u8888")
- _check_decode(b'\n', "\n")
- _check_decode(b'\xe8\xa2\x88\r', "\u8888")
- _check_decode(b'\n', "\n")
-
- def check_newline_decoding(self, decoder, encoding):
- result = []
- if encoding is not None:
- encoder = codecs.getincrementalencoder(encoding)()
- def _decode_bytewise(s):
- # Decode one byte at a time
- for b in encoder.encode(s):
- result.append(decoder.decode(bytes([b])))
- else:
- encoder = None
- def _decode_bytewise(s):
- # Decode one char at a time
- for c in s:
- result.append(decoder.decode(c))
- self.assertEqual(decoder.newlines, None)
- _decode_bytewise("abc\n\r")
- self.assertEqual(decoder.newlines, '\n')
- _decode_bytewise("\nabc")
- self.assertEqual(decoder.newlines, ('\n', '\r\n'))
- _decode_bytewise("abc\r")
- self.assertEqual(decoder.newlines, ('\n', '\r\n'))
- _decode_bytewise("abc")
- self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
- _decode_bytewise("abc\r")
- self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
- decoder.reset()
- input = "abc"
- if encoder is not None:
- encoder.reset()
- input = encoder.encode(input)
- self.assertEqual(decoder.decode(input), "abc")
- self.assertEqual(decoder.newlines, None)
-
- def test_newline_decoder(self):
- encodings = (
- # None meaning the IncrementalNewlineDecoder takes unicode input
- # rather than bytes input
- None, 'utf-8', 'latin-1',
- 'utf-16', 'utf-16-le', 'utf-16-be',
- 'utf-32', 'utf-32-le', 'utf-32-be',
- )
- for enc in encodings:
- decoder = enc and codecs.getincrementaldecoder(enc)()
- decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
- self.check_newline_decoding(decoder, enc)
- decoder = codecs.getincrementaldecoder("utf-8")()
- decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
- self.check_newline_decoding_utf8(decoder)
- self.assertRaises(TypeError, decoder.setstate, 42)
-
- def test_newline_bytes(self):
- # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
- def _check(dec):
- self.assertEqual(dec.newlines, None)
- self.assertEqual(dec.decode("\u0D00"), "\u0D00")
- self.assertEqual(dec.newlines, None)
- self.assertEqual(dec.decode("\u0A00"), "\u0A00")
- self.assertEqual(dec.newlines, None)
- dec = self.IncrementalNewlineDecoder(None, translate=False)
- _check(dec)
- dec = self.IncrementalNewlineDecoder(None, translate=True)
- _check(dec)
-
- def test_translate(self):
- # issue 35062
- for translate in (-2, -1, 1, 2):
- decoder = codecs.getincrementaldecoder("utf-8")()
- decoder = self.IncrementalNewlineDecoder(decoder, translate)
- self.check_newline_decoding_utf8(decoder)
- decoder = codecs.getincrementaldecoder("utf-8")()
- decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
- self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
-
-class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
- IncrementalNewlineDecoder = io.IncrementalNewlineDecoder
-
- @support.cpython_only
- def test_uninitialized(self):
- uninitialized = self.IncrementalNewlineDecoder.__new__(
- self.IncrementalNewlineDecoder)
- self.assertRaises(ValueError, uninitialized.decode, b'bar')
- self.assertRaises(ValueError, uninitialized.getstate)
- self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
- self.assertRaises(ValueError, uninitialized.reset)
-
-
-class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
- IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder
-
-
# XXX Tests for open()
class MiscIOTest:
--- /dev/null
+import array
+import codecs
+import locale
+import os
+import pickle
+import sys
+import threading
+import time
+import unittest
+import warnings
+import weakref
+from collections import UserList
+from test import support
+from test.support import os_helper, threading_helper
+from test.support.script_helper import assert_python_ok
+from .utils import CTestCase, PyTestCase
+
+import io # C implementation of io
+import _pyio as pyio # Python implementation of io
+
+
+def _default_chunk_size():
+ """Get the default TextIOWrapper chunk size"""
+ with open(__file__, "r", encoding="latin-1") as f:
+ return f._CHUNK_SIZE
+
+
+class BadIndex:
+ def __index__(self):
+ 1/0
+
+
+# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
+# properties:
+# - A single output character can correspond to many bytes of input.
+# - The number of input bytes to complete the character can be
+# undetermined until the last input byte is received.
+# - The number of input bytes can vary depending on previous input.
+# - A single input byte can correspond to many characters of output.
+# - The number of output characters can be undetermined until the
+# last input byte is received.
+# - The number of output characters can vary depending on previous input.
+
+class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
+ """
+ For testing seek/tell behavior with a stateful, buffering decoder.
+
+ Input is a sequence of words. Words may be fixed-length (length set
+ by input) or variable-length (period-terminated). In variable-length
+ mode, extra periods are ignored. Possible words are:
+ - 'i' followed by a number sets the input length, I (maximum 99).
+ When I is set to 0, words are space-terminated.
+ - 'o' followed by a number sets the output length, O (maximum 99).
+ - Any other word is converted into a word followed by a period on
+ the output. The output word consists of the input word truncated
+ or padded out with hyphens to make its length equal to O. If O
+ is 0, the word is output verbatim without truncating or padding.
+ I and O are initially set to 1. When I changes, any buffered input is
+ re-scanned according to the new I. EOF also terminates the last word.
+ """
+
+ def __init__(self, errors='strict'):
+ codecs.IncrementalDecoder.__init__(self, errors)
+ self.reset()
+
+ def __repr__(self):
+ return '<SID %x>' % id(self)
+
+ def reset(self):
+ self.i = 1
+ self.o = 1
+ self.buffer = bytearray()
+
+ def getstate(self):
+ i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
+ return bytes(self.buffer), i*100 + o
+
+ def setstate(self, state):
+ buffer, io = state
+ self.buffer = bytearray(buffer)
+ i, o = divmod(io, 100)
+ self.i, self.o = i ^ 1, o ^ 1
+
+ def decode(self, input, final=False):
+ output = ''
+ for b in input:
+ if self.i == 0: # variable-length, terminated with period
+ if b == ord('.'):
+ if self.buffer:
+ output += self.process_word()
+ else:
+ self.buffer.append(b)
+ else: # fixed-length, terminate after self.i bytes
+ self.buffer.append(b)
+ if len(self.buffer) == self.i:
+ output += self.process_word()
+ if final and self.buffer: # EOF terminates the last word
+ output += self.process_word()
+ return output
+
+ def process_word(self):
+ output = ''
+ if self.buffer[0] == ord('i'):
+ self.i = min(99, int(self.buffer[1:] or 0)) # set input length
+ elif self.buffer[0] == ord('o'):
+ self.o = min(99, int(self.buffer[1:] or 0)) # set output length
+ else:
+ output = self.buffer.decode('ascii')
+ if len(output) < self.o:
+ output += '-'*self.o # pad out with hyphens
+ if self.o:
+ output = output[:self.o] # truncate to output length
+ output += '.'
+ self.buffer = bytearray()
+ return output
+
+ codecEnabled = False
+
+
+# bpo-41919: This function is kept separate from StatefulIncrementalDecoder to avoid a
+# resource leak when registering codecs and cleanup functions.
+def lookupTestDecoder(name):
+ if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
+ latin1 = codecs.lookup('latin-1')
+ return codecs.CodecInfo(
+ name='test_decoder', encode=latin1.encode, decode=None,
+ incrementalencoder=None,
+ streamreader=None, streamwriter=None,
+ incrementaldecoder=StatefulIncrementalDecoder)
+
+
+class StatefulIncrementalDecoderTest(unittest.TestCase):
+ """
+ Make sure the StatefulIncrementalDecoder actually works.
+ """
+
+ test_cases = [
+ # I=1, O=1 (fixed-length input == fixed-length output)
+ (b'abcd', False, 'a.b.c.d.'),
+ # I=0, O=0 (variable-length input, variable-length output)
+ (b'oiabcd', True, 'abcd.'),
+ # I=0, O=0 (should ignore extra periods)
+ (b'oi...abcd...', True, 'abcd.'),
+ # I=0, O=6 (variable-length input, fixed-length output)
+ (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
+ # I=2, O=6 (fixed-length input < fixed-length output)
+ (b'i.i2.o6xyz', True, 'xy----.z-----.'),
+ # I=6, O=3 (fixed-length input > fixed-length output)
+ (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
+ # I=0, then 3; O=29, then 15 (with longer output)
+ (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
+ 'a----------------------------.' +
+ 'b----------------------------.' +
+ 'cde--------------------------.' +
+ 'abcdefghijabcde.' +
+ 'a.b------------.' +
+ '.c.------------.' +
+ 'd.e------------.' +
+ 'k--------------.' +
+ 'l--------------.' +
+ 'm--------------.')
+ ]
+
+ def test_decoder(self):
+ # Try a few one-shot test cases.
+ for input, eof, output in self.test_cases:
+ d = StatefulIncrementalDecoder()
+ self.assertEqual(d.decode(input, eof), output)
+
+ # Also test an unfinished decode, followed by forcing EOF.
+ d = StatefulIncrementalDecoder()
+ self.assertEqual(d.decode(b'oiabcd'), '')
+ self.assertEqual(d.decode(b'', 1), 'abcd.')
+
+class TextIOWrapperTest:
+
+ def setUp(self):
+ self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
+ self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
+ os_helper.unlink(os_helper.TESTFN)
+ codecs.register(lookupTestDecoder)
+ self.addCleanup(codecs.unregister, lookupTestDecoder)
+
+ def tearDown(self):
+ os_helper.unlink(os_helper.TESTFN)
+
+ def test_constructor(self):
+ r = self.BytesIO(b"\xc3\xa9\n\n")
+ b = self.BufferedReader(r, 1000)
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ t.__init__(b, encoding="latin-1", newline="\r\n")
+ self.assertEqual(t.encoding, "latin-1")
+ self.assertEqual(t.line_buffering, False)
+ t.__init__(b, encoding="utf-8", line_buffering=True)
+ self.assertEqual(t.encoding, "utf-8")
+ self.assertEqual(t.line_buffering, True)
+ self.assertEqual("\xe9\n", t.readline())
+ invalid_type = TypeError if self.is_C else ValueError
+ with self.assertRaises(invalid_type):
+ t.__init__(b, encoding=42)
+ with self.assertRaises(UnicodeEncodeError):
+ t.__init__(b, encoding='\udcfe')
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding='utf-8\0')
+ with self.assertRaises(invalid_type):
+ t.__init__(b, encoding="utf-8", errors=42)
+ if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
+ with self.assertRaises(UnicodeEncodeError):
+ t.__init__(b, encoding="utf-8", errors='\udcfe')
+ if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", errors='replace\0')
+ with self.assertRaises(TypeError):
+ t.__init__(b, encoding="utf-8", newline=42)
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", newline='\udcfe')
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", newline='\n\0')
+ with self.assertRaises(ValueError):
+ t.__init__(b, encoding="utf-8", newline='xyzzy')
+
+ def test_uninitialized(self):
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ del t
+ t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+ self.assertRaises(Exception, repr, t)
+ self.assertRaisesRegex((ValueError, AttributeError),
+ 'uninitialized|has no attribute',
+ t.read, 0)
+ t.__init__(self.MockRawIO(), encoding="utf-8")
+ self.assertEqual(t.read(0), '')
+
+ def test_non_text_encoding_codecs_are_rejected(self):
+ # Ensure the constructor complains if passed a codec that isn't
+ # marked as a text encoding
+ # http://bugs.python.org/issue20404
+ r = self.BytesIO()
+ b = self.BufferedWriter(r)
+ with self.assertRaisesRegex(LookupError, "is not a text encoding"):
+ self.TextIOWrapper(b, encoding="hex")
+
+ def test_detach(self):
+ r = self.BytesIO()
+ b = self.BufferedWriter(r)
+ t = self.TextIOWrapper(b, encoding="ascii")
+ self.assertIs(t.detach(), b)
+
+ t = self.TextIOWrapper(b, encoding="ascii")
+ t.write("howdy")
+ self.assertFalse(r.getvalue())
+ t.detach()
+ self.assertEqual(r.getvalue(), b"howdy")
+ self.assertRaises(ValueError, t.detach)
+
+ # Operations independent of the detached stream should still work
+ repr(t)
+ self.assertEqual(t.encoding, "ascii")
+ self.assertEqual(t.errors, "strict")
+ self.assertFalse(t.line_buffering)
+ self.assertFalse(t.write_through)
+
+ def test_repr(self):
+ raw = self.BytesIO("hello".encode("utf-8"))
+ b = self.BufferedReader(raw)
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ modname = self.TextIOWrapper.__module__
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
+ raw.name = "dummy"
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
+ t.mode = "r"
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
+ raw.name = b"dummy"
+ self.assertRegex(repr(t),
+ r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
+
+ t.buffer.detach()
+ repr(t) # Should not raise an exception
+
    def test_recursive_repr(self):
        """A self-referential name attribute must not crash repr().

        Issue #25455: repr() recursing through raw.name raises RuntimeError
        instead of overflowing the stack.
        """
        raw = self.BytesIO()
        t = self.TextIOWrapper(raw, encoding="utf-8")
        with support.swap_attr(raw, 'name', t), support.infinite_recursion(25):
            with self.assertRaises(RuntimeError):
                repr(t)  # Should not crash
+
+ def test_subclass_repr(self):
+ class TestSubclass(self.TextIOWrapper):
+ pass
+
+ f = TestSubclass(self.StringIO())
+ self.assertIn(TestSubclass.__name__, repr(f))
+
    def test_line_buffering(self):
        """With line_buffering=True, a write containing '\\n' or '\\r'
        flushes everything buffered so far; other writes do not flush."""
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
        t.write("X")
        self.assertEqual(r.getvalue(), b"")  # No flush happened
        t.write("Y\nZ")
        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
        t.write("A\rB")
        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
+
    def test_reconfigure_line_buffering(self):
        """reconfigure(line_buffering=...) takes effect immediately, implies a
        flush, and None/omitted keeps the current setting."""
        r = self.BytesIO()
        b = self.BufferedWriter(r, 1000)
        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
        t.write("AB\nC")
        self.assertEqual(r.getvalue(), b"")

        t.reconfigure(line_buffering=True)   # implicit flush
        self.assertEqual(r.getvalue(), b"AB\nC")
        t.write("DEF\nG")
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
        t.write("H")
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
        t.reconfigure(line_buffering=False)   # implicit flush
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
        t.write("IJ")
        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")

        # Keeping default value
        t.reconfigure()
        t.reconfigure(line_buffering=None)
        self.assertEqual(t.line_buffering, False)
        t.reconfigure(line_buffering=True)
        t.reconfigure()
        t.reconfigure(line_buffering=None)
        self.assertEqual(t.line_buffering, True)
+
    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
    def test_default_encoding(self):
        """Without an explicit encoding, TextIOWrapper uses the current
        locale encoding (not the user's preferred encoding)."""
        with os_helper.EnvironmentVarGuard() as env:
            # try to get a user preferred encoding different than the current
            # locale encoding to check that TextIOWrapper() uses the current
            # locale encoding and not the user preferred encoding
            env.unset('LC_ALL', 'LANG', 'LC_CTYPE')

            current_locale_encoding = locale.getencoding()
            b = self.BytesIO()
            with warnings.catch_warnings():
                warnings.simplefilter("ignore", EncodingWarning)
                t = self.TextIOWrapper(b)
            self.assertEqual(t.encoding, current_locale_encoding)
+
+ def test_encoding(self):
+ # Check the encoding attribute is always set, and valid
+ b = self.BytesIO()
+ t = self.TextIOWrapper(b, encoding="utf-8")
+ self.assertEqual(t.encoding, "utf-8")
+ with warnings.catch_warnings():
+ warnings.simplefilter("ignore", EncodingWarning)
+ t = self.TextIOWrapper(b)
+ self.assertIsNotNone(t.encoding)
+ codecs.lookup(t.encoding)
+
+ def test_encoding_errors_reading(self):
+ # (1) default
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii")
+ self.assertRaises(UnicodeError, t.read)
+ # (2) explicit strict
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
+ self.assertRaises(UnicodeError, t.read)
+ # (3) ignore
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
+ self.assertEqual(t.read(), "abc\n\n")
+ # (4) replace
+ b = self.BytesIO(b"abc\n\xff\n")
+ t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
+ self.assertEqual(t.read(), "abc\n\ufffd\n")
+
    def test_encoding_errors_writing(self):
        """Encoding honours the errors argument; the default is strict."""
        # (1) default
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (2) explicit strict
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
        self.assertRaises(UnicodeError, t.write, "\xff")
        # (3) ignore
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abcdef\n")
        # (4) replace
        b = self.BytesIO()
        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
                               newline="\n")
        t.write("abc\xffdef\n")
        t.flush()
        self.assertEqual(b.getvalue(), b"abc?def\n")
+
    def test_newlines(self):
        """Exercise universal-newline decoding across encodings, buffer
        sizes, and both read(2)/readline() and iteration access patterns."""
        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]

        tests = [
            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
            [ '', input_lines ],
            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
        ]
        encodings = (
            'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )

        # Try a range of buffer sizes to test the case where \r is the last
        # character in TextIOWrapper._pending_line.
        for encoding in encodings:
            # XXX: str.encode() should return bytes
            data = bytes(''.join(input_lines).encode(encoding))
            for do_reads in (False, True):
                for bufsize in range(1, 10):
                    for newline, exp_lines in tests:
                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
                        textio = self.TextIOWrapper(bufio, newline=newline,
                                                  encoding=encoding)
                        if do_reads:
                            got_lines = []
                            while True:
                                c2 = textio.read(2)
                                if c2 == '':
                                    break
                                self.assertEqual(len(c2), 2)
                                got_lines.append(c2 + textio.readline())
                        else:
                            got_lines = list(textio)

                        for got_line, exp_line in zip(got_lines, exp_lines):
                            self.assertEqual(got_line, exp_line)
                        self.assertEqual(len(got_lines), len(exp_lines))
+
    def test_newlines_input(self):
        """readlines()/read() split input correctly for each newline mode,
        including embedded NUL bytes and mixed \\r / \\r\\n / \\n data."""
        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
        for newline, expected in [
            (None, normalized.decode("ascii").splitlines(keepends=True)),
            ("", testdata.decode("ascii").splitlines(keepends=True)),
            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
            ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
            ]:
            buf = self.BytesIO(testdata)
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            self.assertEqual(txt.readlines(), expected)
            txt.seek(0)
            self.assertEqual(txt.read(), "".join(expected))
+
    def test_newlines_output(self):
        """On writing, '\\n' is translated per the newline argument; other
        characters (a literal '\\r' in the text) pass through untouched."""
        testdict = {
            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
            }
        # newline=None translates '\n' to os.linesep
        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
        for newline, expected in tests:
            buf = self.BytesIO()
            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
            txt.write("AAA\nB")
            txt.write("BB\nCCC\n")
            txt.write("X\rY\r\nZ")
            txt.flush()
            self.assertEqual(buf.closed, False)
            self.assertEqual(buf.getvalue(), expected)
+
    def test_destructor(self):
        """Dropping the last reference flushes pending text and closes the
        underlying buffer (close() sees the written data)."""
        l = []
        base = self.BytesIO
        class MyBytesIO(base):
            def close(self):
                # capture the value at close time, before the buffer is freed
                l.append(self.getvalue())
                base.close(self)
        b = MyBytesIO()
        t = self.TextIOWrapper(b, encoding="ascii")
        t.write("abc")
        del t
        support.gc_collect()
        self.assertEqual([b"abc"], l)
+
    def test_override_destructor(self):
        """Finalization calls overridden __del__, close() and flush() in
        that order on a TextIOWrapper subclass."""
        record = []
        class MyTextIO(self.TextIOWrapper):
            def __del__(self):
                record.append(1)
                try:
                    f = super().__del__
                except AttributeError:
                    pass
                else:
                    f()
            def close(self):
                record.append(2)
                super().close()
            def flush(self):
                record.append(3)
                super().flush()
        b = self.BytesIO()
        t = MyTextIO(b, encoding="ascii")
        del t
        support.gc_collect()
        self.assertEqual(record, [1, 2, 3])
+
    def test_error_through_destructor(self):
        # Test that the exception state is not modified by a destructor,
        # even if close() fails.
        rawio = self.CloseFailureIO()
        with support.catch_unraisable_exception() as cm:
            with self.assertRaises(AttributeError):
                self.TextIOWrapper(rawio, encoding="utf-8").xyzzy

            # the failing close() surfaces as an unraisable OSError,
            # not by replacing the in-flight AttributeError
            self.assertEqual(cm.unraisable.exc_type, OSError)
+
    # Systematic tests of the text I/O API

    def test_basic_io(self):
        """Round-trip write/read/seek/tell over several chunk sizes and
        encodings; tell() results are opaque cookies that must round-trip."""
        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
                f = self.open(os_helper.TESTFN, "w+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.write("abc"), 3)
                f.close()
                f = self.open(os_helper.TESTFN, "r+", encoding=enc)
                f._CHUNK_SIZE = chunksize
                self.assertEqual(f.tell(), 0)
                self.assertEqual(f.read(), "abc")
                cookie = f.tell()
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.read(None), "abc")
                f.seek(0)
                self.assertEqual(f.read(2), "ab")
                self.assertEqual(f.read(1), "c")
                self.assertEqual(f.read(1), "")
                self.assertEqual(f.read(), "")
                self.assertEqual(f.tell(), cookie)
                self.assertEqual(f.seek(0), 0)
                self.assertEqual(f.seek(0, 2), cookie)
                self.assertEqual(f.write("def"), 3)
                self.assertEqual(f.seek(cookie), cookie)
                self.assertEqual(f.read(), "def")
                if enc.startswith("utf"):
                    self.multi_line_test(f, enc)
                f.close()
+
    def multi_line_test(self, f, enc):
        """Helper for test_basic_io: write lines of growing length built from
        multibyte sample chars, then verify readline() and tell() agree with
        the positions recorded while writing."""
        f.seek(0)
        f.truncate()
        sample = "s\xff\u0fff\uffff"
        wlines = []
        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
            chars = []
            for i in range(size):
                chars.append(sample[i % len(sample)])
            line = "".join(chars) + "\n"
            wlines.append((f.tell(), line))
            f.write(line)
        f.seek(0)
        rlines = []
        while True:
            pos = f.tell()
            line = f.readline()
            if not line:
                break
            rlines.append((pos, line))
        self.assertEqual(rlines, wlines)
+
    def test_telling(self):
        """tell() agrees with positions recorded while writing, and raises
        OSError in the middle of iterating (next() disallows tell)."""
        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
        p0 = f.tell()
        f.write("\xff\n")
        p1 = f.tell()
        f.write("\xff\n")
        p2 = f.tell()
        f.seek(0)
        self.assertEqual(f.tell(), p0)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p1)
        self.assertEqual(f.readline(), "\xff\n")
        self.assertEqual(f.tell(), p2)
        f.seek(0)
        for line in f:
            self.assertEqual(line, "\xff\n")
            # tell() is forbidden while iteration's read-ahead is active
            self.assertRaises(OSError, f.tell)
        self.assertEqual(f.tell(), p2)
        f.close()
+
    def test_seeking(self):
        """Reading right up to a multibyte char near the chunk boundary keeps
        tell()/readline() consistent."""
        chunk_size = _default_chunk_size()
        prefix_size = chunk_size - 2
        u_prefix = "a" * prefix_size
        prefix = bytes(u_prefix.encode("utf-8"))
        self.assertEqual(len(u_prefix), len(prefix))
        u_suffix = "\u8888\n"
        suffix = bytes(u_suffix.encode("utf-8"))
        line = prefix + suffix
        with self.open(os_helper.TESTFN, "wb") as f:
            f.write(line*2)
        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
            s = f.read(prefix_size)
            self.assertEqual(s, str(prefix, "ascii"))
            self.assertEqual(f.tell(), prefix_size)
            self.assertEqual(f.readline(), u_suffix)
+
    def test_seeking_too(self):
        # Regression test for a specific bug
        # (a 3-byte UTF-8 char split across 2-byte chunks must not break
        # readline()/tell())
        data = b'\xe0\xbf\xbf\n'
        with self.open(os_helper.TESTFN, "wb") as f:
            f.write(data)
        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
            f._CHUNK_SIZE  # Just test that it exists
            f._CHUNK_SIZE = 2
            f.readline()
            f.tell()
+
    def test_seek_and_tell(self):
        """Seek/tell must round-trip even with a stateful buffering decoder
        (the 'test_decoder' codec provided by StatefulIncrementalDecoder)."""
        # Make test faster by doing smaller seeks
        CHUNK_SIZE = 128

        def test_seek_and_tell_with_data(data, min_pos=0):
            """Tell/seek to various points within a data stream and ensure
            that the decoded data returned by read() is consistent."""
            f = self.open(os_helper.TESTFN, 'wb')
            f.write(data)
            f.close()
            f = self.open(os_helper.TESTFN, encoding='test_decoder')
            f._CHUNK_SIZE = CHUNK_SIZE
            decoded = f.read()
            f.close()

            for i in range(min_pos, len(decoded) + 1): # seek positions
                for j in [1, 5, len(decoded) - i]: # read lengths
                    f = self.open(os_helper.TESTFN, encoding='test_decoder')
                    self.assertEqual(f.read(i), decoded[:i])
                    cookie = f.tell()
                    self.assertEqual(f.read(j), decoded[i:i + j])
                    f.seek(cookie)
                    self.assertEqual(f.read(), decoded[i:])
                    f.close()

        # Enable the test decoder.
        StatefulIncrementalDecoder.codecEnabled = 1

        # Run the tests.
        try:
            # Try each test case.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                test_seek_and_tell_with_data(input)

            # Position each test case so that it crosses a chunk boundary.
            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
                offset = CHUNK_SIZE - len(input)//2
                prefix = b'.'*offset
                # Don't bother seeking into the prefix (takes too long).
                min_pos = offset*2
                test_seek_and_tell_with_data(prefix + input, min_pos)

        # Ensure our test decoder won't interfere with subsequent tests.
        finally:
            StatefulIncrementalDecoder.codecEnabled = 0
+
    def test_multibyte_seek_and_tell(self):
        """tell() cookies round-trip through seek() with a multibyte
        (euc_jp) encoding."""
        f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
        f.write("AB\n\u3046\u3048\n")
        f.close()

        f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
        self.assertEqual(f.readline(), "AB\n")
        p0 = f.tell()
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        p1 = f.tell()
        f.seek(p0)
        self.assertEqual(f.readline(), "\u3046\u3048\n")
        self.assertEqual(f.tell(), p1)
        f.close()
+
    def test_seek_with_encoder_state(self):
        """Seeking back restores the encoder state for a stateful encoding
        (euc_jis_2004 combines the base char with the following accent)."""
        f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
        f.write("\u00e6\u0300")
        p0 = f.tell()
        f.write("\u00e6")
        f.seek(p0)
        f.write("\u0300")
        f.close()

        f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
        f.close()
+
    def test_encoded_writes(self):
        """For BOM-prefixed encodings, the BOM is written exactly once even
        across multiple writes and seeks (issue 1753)."""
        data = "1234567890"
        tests = ("utf-16",
                 "utf-16-le",
                 "utf-16-be",
                 "utf-32",
                 "utf-32-le",
                 "utf-32-be")
        for encoding in tests:
            buf = self.BytesIO()
            f = self.TextIOWrapper(buf, encoding=encoding)
            # Check if the BOM is written only once (see issue1753).
            f.write(data)
            f.write(data)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            f.seek(0)
            self.assertEqual(f.read(), data * 2)
            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
+
+ def test_unreadable(self):
+ class UnReadable(self.BytesIO):
+ def readable(self):
+ return False
+ txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
+ self.assertRaises(OSError, txt.read)
+
+ def test_read_one_by_one(self):
+ txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
+ reads = ""
+ while True:
+ c = txt.read(1)
+ if not c:
+ break
+ reads += c
+ self.assertEqual(reads, "AA\nBB")
+
+ def test_readlines(self):
+ txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
+ self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
+ txt.seek(0)
+ self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
+ txt.seek(0)
+ self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
+
    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
    def test_read_by_chunk(self):
        """A '\\r\\n' pair straddling the 128-char chunk boundary must still
        be translated to a single '\\n'."""
        # make sure "\r\n" straddles 128 char boundary.
        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
        reads = ""
        while True:
            c = txt.read(128)
            if not c:
                break
            reads += c
        self.assertEqual(reads, "A"*127+"\nB")
+
+ def test_writelines(self):
+ l = ['ab', 'cd', 'ef']
+ buf = self.BytesIO()
+ txt = self.TextIOWrapper(buf, encoding="utf-8")
+ txt.writelines(l)
+ txt.flush()
+ self.assertEqual(buf.getvalue(), b'abcdef')
+
+ def test_writelines_userlist(self):
+ l = UserList(['ab', 'cd', 'ef'])
+ buf = self.BytesIO()
+ txt = self.TextIOWrapper(buf, encoding="utf-8")
+ txt.writelines(l)
+ txt.flush()
+ self.assertEqual(buf.getvalue(), b'abcdef')
+
+ def test_writelines_error(self):
+ txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
+ self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
+ self.assertRaises(TypeError, txt.writelines, None)
+ self.assertRaises(TypeError, txt.writelines, b'abc')
+
+ def test_issue1395_1(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+
+ # read one char at a time
+ reads = ""
+ while True:
+ c = txt.read(1)
+ if not c:
+ break
+ reads += c
+ self.assertEqual(reads, self.normalized)
+
+ def test_issue1395_2(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = ""
+ while True:
+ c = txt.read(4)
+ if not c:
+ break
+ reads += c
+ self.assertEqual(reads, self.normalized)
+
+ def test_issue1395_3(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read(4)
+ reads += txt.readline()
+ reads += txt.readline()
+ reads += txt.readline()
+ self.assertEqual(reads, self.normalized)
+
+ def test_issue1395_4(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt._CHUNK_SIZE = 4
+
+ reads = txt.read(4)
+ reads += txt.read()
+ self.assertEqual(reads, self.normalized)
+
    def test_issue1395_5(self):
        """Seeking back to a tell() cookie resumes reading at the same
        decoded position (issue 1395)."""
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        txt._CHUNK_SIZE = 4

        reads = txt.read(4)
        pos = txt.tell()
        txt.seek(0)
        txt.seek(pos)
        self.assertEqual(txt.read(4), "BBB\n")
+
+ def test_issue2282(self):
+ buffer = self.BytesIO(self.testdata)
+ txt = self.TextIOWrapper(buffer, encoding="ascii")
+
+ self.assertEqual(buffer.seekable(), txt.seekable())
+
    def test_append_bom(self):
        # The BOM is not written again when appending to a non-empty file
        filename = os_helper.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                # NOTE(review): pos is unused here; it mirrors the structure
                # of test_seek_bom below.
                pos = f.tell()
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaa'.encode(charset))

            with self.open(filename, 'a', encoding=charset) as f:
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
+
    def test_seek_bom(self):
        # Same test, but when seeking manually
        # (writing at position 0 of an r+ file must not duplicate the BOM)
        filename = os_helper.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
                pos = f.tell()
            with self.open(filename, 'r+', encoding=charset) as f:
                f.seek(pos)
                f.write('zzz')
                f.seek(0)
                f.write('bbb')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
+
    def test_seek_append_bom(self):
        # Same test, but first seek to the start and then to the end
        # (seeking in append mode must not re-arm BOM emission)
        filename = os_helper.TESTFN
        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
            with self.open(filename, 'w', encoding=charset) as f:
                f.write('aaa')
            with self.open(filename, 'a', encoding=charset) as f:
                f.seek(0)
                f.seek(0, self.SEEK_END)
                f.write('xxx')
            with self.open(filename, 'rb') as f:
                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
+
+ def test_errors_property(self):
+ with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
+ self.assertEqual(f.errors, "strict")
+ with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
+ self.assertEqual(f.errors, "replace")
+
    @support.no_tracing
    @threading_helper.requires_working_threading()
    def test_threads_write(self):
        # Issue6750: concurrent writes could duplicate data
        event = threading.Event()
        with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
            def run(n):
                text = "Thread%03d\n" % n
                event.wait()  # maximize contention: all threads write at once
                f.write(text)
            threads = [threading.Thread(target=run, args=(x,))
                       for x in range(20)]
            with threading_helper.start_threads(threads, event.set):
                time.sleep(0.02)
        with self.open(os_helper.TESTFN, encoding="utf-8") as f:
            content = f.read()
            # each thread's line must appear exactly once (no duplication)
            for n in range(20):
                self.assertEqual(content.count("Thread%03d\n" % n), 1)
+
    def test_flush_error_on_close(self):
        # Test that text file is closed despite failed flush
        # and that flush() is called before file closed.
        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
        closed = []
        def bad_flush():
            # record whether wrapper/buffer were already closed at flush time
            closed[:] = [txt.closed, txt.buffer.closed]
            raise OSError()
        txt.flush = bad_flush
        self.assertRaises(OSError, txt.close) # exception not swallowed
        self.assertTrue(txt.closed)
        self.assertTrue(txt.buffer.closed)
        self.assertTrue(closed)      # flush() called
        self.assertFalse(closed[0])  # flush() called before file closed
        self.assertFalse(closed[1])
        txt.flush = lambda: None  # break reference loop
+
    def test_close_error_on_close(self):
        """If both flush() and the buffer's close() fail, close() raises the
        close error with the flush error chained as __context__."""
        buffer = self.BytesIO(self.testdata)
        def bad_flush():
            raise OSError('flush')
        def bad_close():
            raise OSError('close')
        buffer.close = bad_close
        txt = self.TextIOWrapper(buffer, encoding="ascii")
        txt.flush = bad_flush
        with self.assertRaises(OSError) as err: # exception not swallowed
            txt.close()
        self.assertEqual(err.exception.args, ('close',))
        self.assertIsInstance(err.exception.__context__, OSError)
        self.assertEqual(err.exception.__context__.args, ('flush',))
        self.assertFalse(txt.closed)

        # Silence destructor error
        buffer.close = lambda: None
        txt.flush = lambda: None
+
    def test_nonnormalized_close_error_on_close(self):
        # Issue #21677
        # (same as test_close_error_on_close, but with exceptions raised via
        # NameError lookup failure, i.e. not pre-normalized instances)
        buffer = self.BytesIO(self.testdata)
        def bad_flush():
            raise non_existing_flush
        def bad_close():
            raise non_existing_close
        buffer.close = bad_close
        txt = self.TextIOWrapper(buffer, encoding="ascii")
        txt.flush = bad_flush
        with self.assertRaises(NameError) as err: # exception not swallowed
            txt.close()
        self.assertIn('non_existing_close', str(err.exception))
        self.assertIsInstance(err.exception.__context__, NameError)
        self.assertIn('non_existing_flush', str(err.exception.__context__))
        self.assertFalse(txt.closed)

        # Silence destructor error
        buffer.close = lambda: None
        txt.flush = lambda: None
+
+ def test_multi_close(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ txt.close()
+ txt.close()
+ txt.close()
+ self.assertRaises(ValueError, txt.flush)
+
+ def test_unseekable(self):
+ txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
+ self.assertRaises(self.UnsupportedOperation, txt.tell)
+ self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
+
+ def test_readonly_attributes(self):
+ txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+ buf = self.BytesIO(self.testdata)
+ with self.assertRaises(AttributeError):
+ txt.buffer = buf
+
    def test_rawio(self):
        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
        # that subprocess.Popen() can have the required unbuffered
        # semantics with universal_newlines=True.
        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
        # Reads
        self.assertEqual(txt.read(4), 'abcd')
        self.assertEqual(txt.readline(), 'efghi\n')
        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
+
    def test_rawio_write_through(self):
        # Issue #12591: with write_through=True, writes don't need a flush
        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
                                 write_through=True)
        txt.write('1')
        txt.write('23\n4')
        txt.write('5')
        # every write reached the raw stream without an explicit flush()
        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
+
    def test_bufio_write_through(self):
        # Issue #21396: write_through=True doesn't force a flush()
        # on the underlying binary buffered object.
        flush_called, write_called = [], []
        class BufferedWriter(self.BufferedWriter):
            def flush(self, *args, **kwargs):
                flush_called.append(True)
                return super().flush(*args, **kwargs)
            def write(self, *args, **kwargs):
                write_called.append(True)
                return super().write(*args, **kwargs)

        rawio = self.BytesIO()
        data = b"a"
        bufio = BufferedWriter(rawio, len(data)*2)
        textio = self.TextIOWrapper(bufio, encoding='ascii',
                                    write_through=True)
        # write to the buffered io but don't overflow the buffer
        text = data.decode('ascii')
        textio.write(text)

        # buffer.flush is not called with write_through=True
        self.assertFalse(flush_called)
        # buffer.write *is* called with write_through=True
        self.assertTrue(write_called)
        self.assertEqual(rawio.getvalue(), b"") # no flush

        write_called = [] # reset
        textio.write(text * 10) # total content is larger than bufio buffer
        self.assertTrue(write_called)
        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
+
    def test_reconfigure_write_through(self):
        """reconfigure(write_through=...) implies a flush and takes effect
        immediately; None/omitted keeps the current setting."""
        raw = self.MockRawIO([])
        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
        t.write('1')
        t.reconfigure(write_through=True)  # implied flush
        self.assertEqual(t.write_through, True)
        self.assertEqual(b''.join(raw._write_stack), b'1')
        t.write('23')
        self.assertEqual(b''.join(raw._write_stack), b'123')
        t.reconfigure(write_through=False)
        self.assertEqual(t.write_through, False)
        t.write('45')
        t.flush()
        self.assertEqual(b''.join(raw._write_stack), b'12345')
        # Keeping default value
        t.reconfigure()
        t.reconfigure(write_through=None)
        self.assertEqual(t.write_through, False)
        t.reconfigure(write_through=True)
        t.reconfigure()
        t.reconfigure(write_through=None)
        self.assertEqual(t.write_through, True)
+
+ def test_read_nonbytes(self):
+ # Issue #17106
+ # Crash when underlying read() returns non-bytes
+ t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+ self.assertRaises(TypeError, t.read, 1)
+ t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+ self.assertRaises(TypeError, t.readline)
+ t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+ self.assertRaises(TypeError, t.read)
+
    def test_illegal_encoder(self):
        # Issue 31271: Calling write() while the return value of encoder's
        # encode() is invalid shouldn't cause an assertion failure.
        # (rot13 encodes str->str, so its output is not valid bytes)
        rot13 = codecs.lookup("rot13")
        with support.swap_attr(rot13, '_is_text_encoding', True):
            t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
        self.assertRaises(TypeError, t.write, 'bar')
+
    def test_illegal_decoder(self):
        """Decoders returning non-str, or with broken getstate(), must raise
        TypeError instead of crashing (issues #17106, #31243)."""
        # Bypass the early encoding check added in issue 20404
        def _make_illegal_wrapper():
            # quopri decodes bytes->bytes; temporarily pretend it is a
            # text encoding to sneak it past the constructor check
            quopri = codecs.lookup("quopri")
            quopri._is_text_encoding = True
            try:
                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
                                       newline='\n', encoding="quopri")
            finally:
                quopri._is_text_encoding = False
            return t
        # Crash when decoder returns non-string
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, t.read, 1)
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, t.readline)
        t = _make_illegal_wrapper()
        self.assertRaises(TypeError, t.read)

        # Issue 31243: calling read() while the return value of decoder's
        # getstate() is invalid should neither crash the interpreter nor
        # raise a SystemError.
        def _make_very_illegal_wrapper(getstate_ret_val):
            class BadDecoder:
                def getstate(self):
                    return getstate_ret_val
            def _get_bad_decoder(dummy):
                return BadDecoder()
            quopri = codecs.lookup("quopri")
            with support.swap_attr(quopri, 'incrementaldecoder',
                                   _get_bad_decoder):
                return _make_illegal_wrapper()
        t = _make_very_illegal_wrapper(42)
        self.assertRaises(TypeError, t.read, 42)
        t = _make_very_illegal_wrapper(())
        self.assertRaises(TypeError, t.read, 42)
        t = _make_very_illegal_wrapper((1, 2))
        self.assertRaises(TypeError, t.read, 42)
+
    def _check_create_at_shutdown(self, **kwargs):
        # Issue #20037: creating a TextIOWrapper at shutdown
        # shouldn't crash the interpreter.
        # Runs a subprocess that constructs a TextIOWrapper from a __del__
        # during interpreter shutdown; kwargs are forwarded to the ctor.
        iomod = self.io.__name__
        code = """if 1:
            import codecs
            import {iomod} as io

            # Avoid looking up codecs at shutdown
            codecs.lookup('utf-8')

            class C:
                def __del__(self):
                    io.TextIOWrapper(io.BytesIO(), **{kwargs})
                    print("ok")
            c = C()
            """.format(iomod=iomod, kwargs=kwargs)
        return assert_python_ok("-c", code)
+
    def test_create_at_shutdown_without_encoding(self):
        """Creating a wrapper at shutdown without an encoding either works
        or fails with the implementation's known shutdown error."""
        rc, out, err = self._check_create_at_shutdown()
        if err:
            # Can error out with a RuntimeError if the module state
            # isn't found.
            self.assertIn(self.shutdown_error, err.decode())
        else:
            self.assertEqual("ok", out.decode().strip())
+
+ def test_create_at_shutdown_with_encoding(self):
+ rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
+ errors='strict')
+ self.assertFalse(err)
+ self.assertEqual("ok", out.decode().strip())
+
    def test_read_byteslike(self):
        """Reading works when the raw stream hands back memoryviews
        instead of bytes (MemviewBytesIO)."""
        r = MemviewBytesIO(b'Just some random string\n')
        t = self.TextIOWrapper(r, 'utf-8')

        # TextIOWrapper will not read the full string, because
        # we truncate it to a multiple of the native int size
        # so that we can construct a more complex memoryview.
        bytes_val = _to_memoryview(r.getvalue()).tobytes()

        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
+
    def test_issue22849(self):
        """Repeated failed constructions over a stream missing tell() must
        not corrupt state; adding tell() then makes construction succeed."""
        class F(object):
            def readable(self): return True
            def writable(self): return True
            def seekable(self): return True

        for i in range(10):
            try:
                self.TextIOWrapper(F(), encoding='utf-8')
            except Exception:
                pass

        F.tell = lambda x: 0
        t = self.TextIOWrapper(F(), encoding='utf-8')
+
+ def test_reconfigure_locale(self):
+ wrapper = self.TextIOWrapper(self.BytesIO(b"test"))
+ wrapper.reconfigure(encoding="locale")
+
    def test_reconfigure_encoding_read(self):
        """Changing encoding or universal-newline mode after reading has
        started is unsupported."""
        # latin1 -> utf8
        # (latin1 can decode utf-8 encoded string)
        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
        raw = self.BytesIO(data)
        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
        self.assertEqual(txt.readline(), 'abc\xe9\n')
        with self.assertRaises(self.UnsupportedOperation):
            txt.reconfigure(encoding='utf-8')
        with self.assertRaises(self.UnsupportedOperation):
            txt.reconfigure(newline=None)
+
    def test_reconfigure_write_fromascii(self):
        # ascii has a specific encodefunc in the C implementation,
        # but utf-8-sig has not. Make sure that we get rid of the
        # cached encodefunc when we switch encoders.
        raw = self.BytesIO()
        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
        txt.write('foo\n')
        txt.reconfigure(encoding='utf-8-sig')
        txt.write('\xe9\n')
        txt.flush()
        # no BOM mid-file; the non-ascii char is utf-8 encoded
        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
+
    def test_reconfigure_write(self):
        """reconfigure(encoding=...) before further writes flushes pending
        text with the old encoding and encodes new text with the new one."""
        # latin -> utf8
        raw = self.BytesIO()
        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
        txt.write('abc\xe9\n')
        txt.reconfigure(encoding='utf-8')
        # the pre-reconfigure text was flushed as latin-1
        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
        txt.write('d\xe9f\n')
        txt.flush()
        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')

        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
        # the file
        raw = self.BytesIO()
        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
        txt.write('abc\n')
        txt.reconfigure(encoding='utf-8-sig')
        txt.write('d\xe9f\n')
        txt.flush()
        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
+
    def test_reconfigure_write_non_seekable(self):
        """On a non-seekable stream the wrapper cannot detect it is mid-file,
        so switching to utf-8-sig emits a BOM."""
        raw = self.BytesIO()
        raw.seekable = lambda: False
        raw.seek = None
        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
        txt.write('abc\n')
        txt.reconfigure(encoding='utf-8-sig')
        txt.write('d\xe9f\n')
        txt.flush()

        # If the raw stream is not seekable, there'll be a BOM
        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
+
    def test_reconfigure_defaults(self):
        """Omitted/None arguments keep the current settings, except that
        changing the encoding resets errors to 'strict'."""
        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
        txt.reconfigure(encoding=None)
        self.assertEqual(txt.encoding, 'ascii')
        self.assertEqual(txt.errors, 'replace')
        txt.write('LF\n')

        txt.reconfigure(newline='\r\n')
        self.assertEqual(txt.encoding, 'ascii')
        self.assertEqual(txt.errors, 'replace')

        txt.reconfigure(errors='ignore')
        self.assertEqual(txt.encoding, 'ascii')
        self.assertEqual(txt.errors, 'ignore')
        txt.write('CRLF\n')

        # setting an encoding without errors resets errors to 'strict'
        txt.reconfigure(encoding='utf-8', newline=None)
        self.assertEqual(txt.errors, 'strict')
        txt.seek(0)
        self.assertEqual(txt.read(), 'LF\nCRLF\n')

        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
+
    def test_reconfigure_errors(self):
        """Invalid reconfigure() arguments raise and leave the wrapper's
        settings unchanged; a fully valid call then applies everything."""
        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
        with self.assertRaises(TypeError):  # there was a crash
            txt.reconfigure(encoding=42)
        if self.is_C:
            with self.assertRaises(UnicodeEncodeError):
                txt.reconfigure(encoding='\udcfe')
            with self.assertRaises(LookupError):
                txt.reconfigure(encoding='locale\0')
        # TODO: txt.reconfigure(encoding='utf-8\0')
        # TODO: txt.reconfigure(encoding='nonexisting')
        with self.assertRaises(TypeError):
            txt.reconfigure(errors=42)
        if self.is_C:
            with self.assertRaises(UnicodeEncodeError):
                txt.reconfigure(errors='\udcfe')
        # TODO: txt.reconfigure(errors='ignore\0')
        # TODO: txt.reconfigure(errors='nonexisting')
        with self.assertRaises(TypeError):
            txt.reconfigure(newline=42)
        with self.assertRaises(ValueError):
            txt.reconfigure(newline='\udcfe')
        with self.assertRaises(ValueError):
            txt.reconfigure(newline='xyz')
        if not self.is_C:
            # TODO: Should fail in C too.
            with self.assertRaises(ValueError):
                txt.reconfigure(newline='\n\0')
        if self.is_C:
            # TODO: Use __bool__(), not __index__().
            with self.assertRaises(ZeroDivisionError):
                txt.reconfigure(line_buffering=BadIndex())
            with self.assertRaises(OverflowError):
                txt.reconfigure(line_buffering=2**1000)
            with self.assertRaises(ZeroDivisionError):
                txt.reconfigure(write_through=BadIndex())
            with self.assertRaises(OverflowError):
                txt.reconfigure(write_through=2**1000)
            with self.assertRaises(ZeroDivisionError):  # there was a crash
                txt.reconfigure(line_buffering=BadIndex(),
                                write_through=BadIndex())
        # all failed calls above left the settings untouched
        self.assertEqual(txt.encoding, 'ascii')
        self.assertEqual(txt.errors, 'replace')
        self.assertIs(txt.line_buffering, False)
        self.assertIs(txt.write_through, False)

        txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
                        line_buffering=True, write_through=True)
        self.assertEqual(txt.encoding, 'latin1')
        self.assertEqual(txt.errors, 'ignore')
        self.assertIs(txt.line_buffering, True)
        self.assertIs(txt.write_through, True)
+
+ def test_reconfigure_newline(self):
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline=None)
+ self.assertEqual(txt.readline(), 'CR\n')
+ raw = self.BytesIO(b'CR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='')
+ self.assertEqual(txt.readline(), 'CR\r')
+ raw = self.BytesIO(b'CR\rLF\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\n')
+ self.assertEqual(txt.readline(), 'CR\rLF\n')
+ raw = self.BytesIO(b'LF\nCR\rEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+ txt.reconfigure(newline='\r')
+ self.assertEqual(txt.readline(), 'LF\nCR\r')
+ raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
+ txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+ txt.reconfigure(newline='\r\n')
+ self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
+
+ txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
+ txt.reconfigure(newline=None)
+ txt.write('linesep\n')
+ txt.reconfigure(newline='')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\n')
+ txt.write('LF\n')
+ txt.reconfigure(newline='\r')
+ txt.write('CR\n')
+ txt.reconfigure(newline='\r\n')
+ txt.write('CRLF\n')
+ expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
+ self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
+
+ def test_issue25862(self):
+ # Assertion failures occurred in tell() after read() and write().
+ t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
+ t.read(1)
+ t.read()
+ t.tell()
+ t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
+ t.read(1)
+ t.write('x')
+ t.tell()
+
+ def test_issue35928(self):
+ p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO())
+ f = self.TextIOWrapper(p)
+ res = f.readline()
+ self.assertEqual(res, 'foo\n')
+ f.write(res)
+ self.assertEqual(res + f.readline(), 'foo\nbar\n')
+
    def test_pickling_subclass(self):
        """A TextIOWrapper subclass with custom __getstate__/__setstate__
        round-trips through every pickle protocol."""
        # pickle must be able to find the class by name at module level,
        # hence the temporary global (removed again at the end).
        global MyTextIO
        class MyTextIO(self.TextIOWrapper):
            def __init__(self, raw, tag):
                super().__init__(raw)
                self.tag = tag
            def __getstate__(self):
                return self.tag, self.buffer.getvalue()
            # First parameter is named 'slf' (not 'self') so that the
            # enclosing test method's 'self' stays visible below and
            # self.BytesIO resolves to the test fixture.
            def __setstate__(slf, state):
                tag, value = state
                slf.__init__(self.BytesIO(value), tag)

        raw = self.BytesIO(b'data')
        txt = MyTextIO(raw, 'ham')
        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
            with self.subTest(protocol=proto):
                pickled = pickle.dumps(txt, proto)
                newtxt = pickle.loads(pickled)
                self.assertEqual(newtxt.buffer.getvalue(), b'data')
                self.assertEqual(newtxt.tag, 'ham')
        del MyTextIO
+
+ @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
+ def test_read_non_blocking(self):
+ import os
+ r, w = os.pipe()
+ try:
+ os.set_blocking(r, False)
+ with self.io.open(r, 'rt') as textfile:
+ r = None
+ # Nothing has been written so a non-blocking read raises a BlockingIOError exception.
+ with self.assertRaises(BlockingIOError):
+ textfile.read()
+ finally:
+ if r is not None:
+ os.close(r)
+ os.close(w)
+
+
class MemviewBytesIO(io.BytesIO):
    """BytesIO variant whose read methods hand back memoryview objects
    instead of plain bytes."""

    def read(self, len_):
        data = super().read(len_)
        return _to_memoryview(data)

    def read1(self, len_):
        data = super().read1(len_)
        return _to_memoryview(data)
+
+def _to_memoryview(buf):
+ '''Convert bytes-object *buf* to a non-trivial memoryview'''
+
+ arr = array.array('i')
+ idx = len(buf) - len(buf) % arr.itemsize
+ arr.frombytes(buf[:idx])
+ return memoryview(arr)
+
+
class CTextIOWrapperTest(TextIOWrapperTest, CTestCase):
    """Run TextIOWrapperTest against the C implementation in io."""
    # Message fragment expected when codec lookup fails at interpreter
    # shutdown; consumed by the shared TextIOWrapperTest machinery.
    shutdown_error = "LookupError: unknown encoding: ascii"

    def test_initialization(self):
        # An invalid 'newline' passed to __init__ must raise ValueError and
        # leave the wrapper unusable (read() keeps failing afterwards).
        r = self.BytesIO(b"\xc3\xa9\n\n")
        b = self.BufferedReader(r, 1000)
        t = self.TextIOWrapper(b, encoding="utf-8")
        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
        self.assertRaises(ValueError, t.read)

        # repr() of a never-initialized instance must not crash.
        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
        self.assertRaises(Exception, repr, t)

    def test_garbage_collection(self):
        # C TextIOWrapper objects are collected, and collecting them flushes
        # all data to disk.
        # The Python version has __del__, so it ends in gc.garbage instead.
        with warnings.catch_warnings():
            warnings.simplefilter("ignore", ResourceWarning)
            rawio = self.FileIO(os_helper.TESTFN, "wb")
            b = self.BufferedWriter(rawio)
            t = self.TextIOWrapper(b, encoding="ascii")
            t.write("456def")
            t.x = t  # self-reference so only the cyclic GC can reclaim it
            wr = weakref.ref(t)
            del t
            support.gc_collect()
        self.assertIsNone(wr(), wr)
        with self.open(os_helper.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"456def")

    def test_rwpair_cleared_before_textio(self):
        # Issue 13070: TextIOWrapper's finalization would crash when called
        # after the reference to the underlying BufferedRWPair's writer got
        # cleared by the GC.
        for i in range(1000):
            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t1 = self.TextIOWrapper(b1, encoding="ascii")
            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
            t2 = self.TextIOWrapper(b2, encoding="ascii")
            # circular references
            t1.buddy = t2
            t2.buddy = t1
            support.gc_collect()

    def test_del__CHUNK_SIZE_SystemError(self):
        # Deleting _CHUNK_SIZE must raise AttributeError (it used to raise
        # SystemError in the C implementation).
        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
        with self.assertRaises(AttributeError):
            del t._CHUNK_SIZE

    def test_internal_buffer_size(self):
        # bpo-43260: TextIOWrapper's internal buffer should not store
        # data larger than chunk size.
        chunk_size = 8192  # default chunk size, updated later

        class MockIO(self.MockRawIO):
            # Reject any single write larger than the current chunk size,
            # proving the wrapper never passes oversized buffers down.
            def write(self, data):
                if len(data) > chunk_size:
                    raise RuntimeError
                return super().write(data)

        buf = MockIO()
        t = self.TextIOWrapper(buf, encoding="ascii")
        chunk_size = t._CHUNK_SIZE
        t.write("abc")
        t.write("def")
        # Small writes stay in the internal buffer; nothing reaches buf yet.
        self.assertEqual([], buf._write_stack)

        with self.assertRaises(RuntimeError):
            t.write("x"*(chunk_size+1))

        # The failed oversized write still flushed the buffered "abcdef".
        self.assertEqual([b"abcdef"], buf._write_stack)
        t.write("ghi")
        t.write("x"*chunk_size)
        self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)

    def test_issue119506(self):
        # gh-119506: a write() performed re-entrantly from inside the
        # underlying stream's write() must not corrupt buffered data.
        chunk_size = 8192

        class MockIO(self.MockRawIO):
            written = False
            def write(self, data):
                if not self.written:
                    self.written = True
                    # Re-enter the TextIOWrapper while it is mid-flush.
                    t.write("middle")
                return super().write(data)

        buf = MockIO()
        t = self.TextIOWrapper(buf)
        t.write("abc")
        t.write("def")
        # Writing data whose size is >= chunk_size flushes the buffer first.
        t.write("g" * chunk_size)
        t.flush()

        self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size],
                         buf._write_stack)
+
+
class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase):
    """Run TextIOWrapperTest against the pure-Python implementation (_pyio)."""
    # Message fragment expected when codec lookup fails at interpreter
    # shutdown; consumed by the shared TextIOWrapperTest machinery.
    shutdown_error = "LookupError: unknown encoding: ascii"
+
+
class IncrementalNewlineDecoderTest:
    """Shared tests for IncrementalNewlineDecoder.

    Subclasses supply the implementation under test via the
    ``IncrementalNewlineDecoder`` class attribute.
    """

    def check_newline_decoding_utf8(self, decoder):
        # UTF-8 specific tests for a newline decoder
        def _check_decode(b, s, **kwargs):
            # We exercise getstate() / setstate() as well as decode():
            # decoding the same input from a restored state must give the
            # same output.
            state = decoder.getstate()
            self.assertEqual(decoder.decode(b, **kwargs), s)
            decoder.setstate(state)
            self.assertEqual(decoder.decode(b, **kwargs), s)

        _check_decode(b'\xe8\xa2\x88', "\u8888")

        # A multi-byte sequence fed one byte at a time produces output only
        # once the final byte arrives.
        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        _check_decode(b'\xe8', "")
        _check_decode(b'\xa2', "")
        _check_decode(b'\x88', "\u8888")

        # A truncated sequence must fail when decode() is finalized.
        _check_decode(b'\xe8', "")
        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)

        decoder.reset()
        _check_decode(b'\n', "\n")
        # A lone '\r' is withheld until we know whether a '\n' follows;
        # final=True forces it out as a translated '\n'.
        _check_decode(b'\r', "")
        _check_decode(b'', "\n", final=True)
        _check_decode(b'\r', "\n", final=True)

        _check_decode(b'\r', "")
        _check_decode(b'a', "\na")

        _check_decode(b'\r\r\n', "\n\n")
        _check_decode(b'\r', "")
        _check_decode(b'\r', "\n")
        _check_decode(b'\na', "\na")

        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
        _check_decode(b'\xe8\xa2\x88', "\u8888")
        _check_decode(b'\n', "\n")
        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
        _check_decode(b'\n', "\n")

    def check_newline_decoding(self, decoder, encoding):
        result = []
        if encoding is not None:
            encoder = codecs.getincrementalencoder(encoding)()
            def _decode_bytewise(s):
                # Decode one byte at a time
                for b in encoder.encode(s):
                    result.append(decoder.decode(bytes([b])))
        else:
            encoder = None
            def _decode_bytewise(s):
                # Decode one char at a time
                for c in s:
                    result.append(decoder.decode(c))
        # 'newlines' accumulates the kinds of line endings seen so far.
        self.assertEqual(decoder.newlines, None)
        _decode_bytewise("abc\n\r")
        self.assertEqual(decoder.newlines, '\n')
        _decode_bytewise("\nabc")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
        _decode_bytewise("abc")
        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
        _decode_bytewise("abc\r")
        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
        # reset() clears both pending state and the newline statistics.
        decoder.reset()
        input = "abc"
        if encoder is not None:
            encoder.reset()
            input = encoder.encode(input)
        self.assertEqual(decoder.decode(input), "abc")
        self.assertEqual(decoder.newlines, None)

    def test_newline_decoder(self):
        encodings = (
            # None meaning the IncrementalNewlineDecoder takes unicode input
            # rather than bytes input
            None, 'utf-8', 'latin-1',
            'utf-16', 'utf-16-le', 'utf-16-be',
            'utf-32', 'utf-32-le', 'utf-32-be',
        )
        for enc in encodings:
            decoder = enc and codecs.getincrementaldecoder(enc)()
            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
            self.check_newline_decoding(decoder, enc)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
        self.check_newline_decoding_utf8(decoder)
        # setstate() must reject non-tuple state.
        self.assertRaises(TypeError, decoder.setstate, 42)

    def test_newline_bytes(self):
        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
        # Characters that merely *contain* 0x0A/0x0D code units must not be
        # mistaken for newlines.
        def _check(dec):
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
            self.assertEqual(dec.newlines, None)
            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
            self.assertEqual(dec.newlines, None)
        dec = self.IncrementalNewlineDecoder(None, translate=False)
        _check(dec)
        dec = self.IncrementalNewlineDecoder(None, translate=True)
        _check(dec)

    def test_translate(self):
        # issue 35062: any truthy 'translate' value enables translation;
        # a falsy value disables it.
        for translate in (-2, -1, 1, 2):
            decoder = codecs.getincrementaldecoder("utf-8")()
            decoder = self.IncrementalNewlineDecoder(decoder, translate)
            self.check_newline_decoding_utf8(decoder)
        decoder = codecs.getincrementaldecoder("utf-8")()
        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
+
class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
    """Run the newline-decoder tests against the C implementation."""
    IncrementalNewlineDecoder = io.IncrementalNewlineDecoder

    @support.cpython_only
    def test_uninitialized(self):
        # An instance created without running __init__ must refuse to
        # operate instead of crashing.
        dec = self.IncrementalNewlineDecoder.__new__(
            self.IncrementalNewlineDecoder)
        self.assertRaises(ValueError, dec.decode, b'bar')
        self.assertRaises(ValueError, dec.getstate)
        self.assertRaises(ValueError, dec.setstate, (b'foo', 0))
        self.assertRaises(ValueError, dec.reset)
+
+
class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
    """Run the newline-decoder tests against the pure-Python implementation."""
    IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder