]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-138013: Split TextIO tests from test_io.test_general (#139173)
authorCody Maloney <cmaloney@users.noreply.github.com>
Tue, 30 Sep 2025 15:37:15 +0000 (08:37 -0700)
committerGitHub <noreply@github.com>
Tue, 30 Sep 2025 15:37:15 +0000 (17:37 +0200)
gh-138013: Split TextIO tests from test_general

These tests take 1.3 seconds on my dev machine, match fairly closely
with testing `textio.c` implementation only.

Lib/test/test_io/test_general.py
Lib/test/test_io/test_textio.py [new file with mode: 0644]

index 5f645e3abbe230aceafe21a21e75c658ae17da2d..c4a26719374450bfba8fef35fd5f587bdd68b542 100644 (file)
@@ -6,7 +6,6 @@ New tests should go in more specific modules; see test_io/__init__.py
 import abc
 import array
 import errno
-import locale
 import os
 import pickle
 import random
@@ -28,23 +27,10 @@ from test.support import (
 from test.support.os_helper import FakePath
 from .utils import byteslike, CTestCase, PyTestCase
 
-import codecs
 import io  # C implementation of io
 import _pyio as pyio # Python implementation of io
 
 
-def _default_chunk_size():
-    """Get the default TextIOWrapper chunk size"""
-    with open(__file__, "r", encoding="latin-1") as f:
-        return f._CHUNK_SIZE
-
-
-
-class BadIndex:
-    def __index__(self):
-        1/0
-
-
 class IOTest:
 
     def setUp(self):
@@ -2393,1640 +2379,6 @@ class PyBufferedRandomTest(BufferedRandomTest, PyTestCase):
     tp = pyio.BufferedRandom
 
 
-# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
-# properties:
-#   - A single output character can correspond to many bytes of input.
-#   - The number of input bytes to complete the character can be
-#     undetermined until the last input byte is received.
-#   - The number of input bytes can vary depending on previous input.
-#   - A single input byte can correspond to many characters of output.
-#   - The number of output characters can be undetermined until the
-#     last input byte is received.
-#   - The number of output characters can vary depending on previous input.
-
-class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
-    """
-    For testing seek/tell behavior with a stateful, buffering decoder.
-
-    Input is a sequence of words.  Words may be fixed-length (length set
-    by input) or variable-length (period-terminated).  In variable-length
-    mode, extra periods are ignored.  Possible words are:
-      - 'i' followed by a number sets the input length, I (maximum 99).
-        When I is set to 0, words are space-terminated.
-      - 'o' followed by a number sets the output length, O (maximum 99).
-      - Any other word is converted into a word followed by a period on
-        the output.  The output word consists of the input word truncated
-        or padded out with hyphens to make its length equal to O.  If O
-        is 0, the word is output verbatim without truncating or padding.
-    I and O are initially set to 1.  When I changes, any buffered input is
-    re-scanned according to the new I.  EOF also terminates the last word.
-    """
-
-    def __init__(self, errors='strict'):
-        codecs.IncrementalDecoder.__init__(self, errors)
-        self.reset()
-
-    def __repr__(self):
-        return '<SID %x>' % id(self)
-
-    def reset(self):
-        self.i = 1
-        self.o = 1
-        self.buffer = bytearray()
-
-    def getstate(self):
-        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
-        return bytes(self.buffer), i*100 + o
-
-    def setstate(self, state):
-        buffer, io = state
-        self.buffer = bytearray(buffer)
-        i, o = divmod(io, 100)
-        self.i, self.o = i ^ 1, o ^ 1
-
-    def decode(self, input, final=False):
-        output = ''
-        for b in input:
-            if self.i == 0: # variable-length, terminated with period
-                if b == ord('.'):
-                    if self.buffer:
-                        output += self.process_word()
-                else:
-                    self.buffer.append(b)
-            else: # fixed-length, terminate after self.i bytes
-                self.buffer.append(b)
-                if len(self.buffer) == self.i:
-                    output += self.process_word()
-        if final and self.buffer: # EOF terminates the last word
-            output += self.process_word()
-        return output
-
-    def process_word(self):
-        output = ''
-        if self.buffer[0] == ord('i'):
-            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
-        elif self.buffer[0] == ord('o'):
-            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
-        else:
-            output = self.buffer.decode('ascii')
-            if len(output) < self.o:
-                output += '-'*self.o # pad out with hyphens
-            if self.o:
-                output = output[:self.o] # truncate to output length
-            output += '.'
-        self.buffer = bytearray()
-        return output
-
-    codecEnabled = False
-
-
-# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
-# when registering codecs and cleanup functions.
-def lookupTestDecoder(name):
-    if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
-        latin1 = codecs.lookup('latin-1')
-        return codecs.CodecInfo(
-            name='test_decoder', encode=latin1.encode, decode=None,
-            incrementalencoder=None,
-            streamreader=None, streamwriter=None,
-            incrementaldecoder=StatefulIncrementalDecoder)
-
-
-class StatefulIncrementalDecoderTest(unittest.TestCase):
-    """
-    Make sure the StatefulIncrementalDecoder actually works.
-    """
-
-    test_cases = [
-        # I=1, O=1 (fixed-length input == fixed-length output)
-        (b'abcd', False, 'a.b.c.d.'),
-        # I=0, O=0 (variable-length input, variable-length output)
-        (b'oiabcd', True, 'abcd.'),
-        # I=0, O=0 (should ignore extra periods)
-        (b'oi...abcd...', True, 'abcd.'),
-        # I=0, O=6 (variable-length input, fixed-length output)
-        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
-        # I=2, O=6 (fixed-length input < fixed-length output)
-        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
-        # I=6, O=3 (fixed-length input > fixed-length output)
-        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
-        # I=0, then 3; O=29, then 15 (with longer output)
-        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
-         'a----------------------------.' +
-         'b----------------------------.' +
-         'cde--------------------------.' +
-         'abcdefghijabcde.' +
-         'a.b------------.' +
-         '.c.------------.' +
-         'd.e------------.' +
-         'k--------------.' +
-         'l--------------.' +
-         'm--------------.')
-    ]
-
-    def test_decoder(self):
-        # Try a few one-shot test cases.
-        for input, eof, output in self.test_cases:
-            d = StatefulIncrementalDecoder()
-            self.assertEqual(d.decode(input, eof), output)
-
-        # Also test an unfinished decode, followed by forcing EOF.
-        d = StatefulIncrementalDecoder()
-        self.assertEqual(d.decode(b'oiabcd'), '')
-        self.assertEqual(d.decode(b'', 1), 'abcd.')
-
-class TextIOWrapperTest:
-
-    def setUp(self):
-        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
-        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
-        os_helper.unlink(os_helper.TESTFN)
-        codecs.register(lookupTestDecoder)
-        self.addCleanup(codecs.unregister, lookupTestDecoder)
-
-    def tearDown(self):
-        os_helper.unlink(os_helper.TESTFN)
-
-    def test_constructor(self):
-        r = self.BytesIO(b"\xc3\xa9\n\n")
-        b = self.BufferedReader(r, 1000)
-        t = self.TextIOWrapper(b, encoding="utf-8")
-        t.__init__(b, encoding="latin-1", newline="\r\n")
-        self.assertEqual(t.encoding, "latin-1")
-        self.assertEqual(t.line_buffering, False)
-        t.__init__(b, encoding="utf-8", line_buffering=True)
-        self.assertEqual(t.encoding, "utf-8")
-        self.assertEqual(t.line_buffering, True)
-        self.assertEqual("\xe9\n", t.readline())
-        invalid_type = TypeError if self.is_C else ValueError
-        with self.assertRaises(invalid_type):
-            t.__init__(b, encoding=42)
-        with self.assertRaises(UnicodeEncodeError):
-            t.__init__(b, encoding='\udcfe')
-        with self.assertRaises(ValueError):
-            t.__init__(b, encoding='utf-8\0')
-        with self.assertRaises(invalid_type):
-            t.__init__(b, encoding="utf-8", errors=42)
-        if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
-            with self.assertRaises(UnicodeEncodeError):
-                t.__init__(b, encoding="utf-8", errors='\udcfe')
-        if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
-            with self.assertRaises(ValueError):
-                t.__init__(b, encoding="utf-8", errors='replace\0')
-        with self.assertRaises(TypeError):
-            t.__init__(b, encoding="utf-8", newline=42)
-        with self.assertRaises(ValueError):
-            t.__init__(b, encoding="utf-8", newline='\udcfe')
-        with self.assertRaises(ValueError):
-            t.__init__(b, encoding="utf-8", newline='\n\0')
-        with self.assertRaises(ValueError):
-            t.__init__(b, encoding="utf-8", newline='xyzzy')
-
-    def test_uninitialized(self):
-        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
-        del t
-        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
-        self.assertRaises(Exception, repr, t)
-        self.assertRaisesRegex((ValueError, AttributeError),
-                               'uninitialized|has no attribute',
-                               t.read, 0)
-        t.__init__(self.MockRawIO(), encoding="utf-8")
-        self.assertEqual(t.read(0), '')
-
-    def test_non_text_encoding_codecs_are_rejected(self):
-        # Ensure the constructor complains if passed a codec that isn't
-        # marked as a text encoding
-        # http://bugs.python.org/issue20404
-        r = self.BytesIO()
-        b = self.BufferedWriter(r)
-        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
-            self.TextIOWrapper(b, encoding="hex")
-
-    def test_detach(self):
-        r = self.BytesIO()
-        b = self.BufferedWriter(r)
-        t = self.TextIOWrapper(b, encoding="ascii")
-        self.assertIs(t.detach(), b)
-
-        t = self.TextIOWrapper(b, encoding="ascii")
-        t.write("howdy")
-        self.assertFalse(r.getvalue())
-        t.detach()
-        self.assertEqual(r.getvalue(), b"howdy")
-        self.assertRaises(ValueError, t.detach)
-
-        # Operations independent of the detached stream should still work
-        repr(t)
-        self.assertEqual(t.encoding, "ascii")
-        self.assertEqual(t.errors, "strict")
-        self.assertFalse(t.line_buffering)
-        self.assertFalse(t.write_through)
-
-    def test_repr(self):
-        raw = self.BytesIO("hello".encode("utf-8"))
-        b = self.BufferedReader(raw)
-        t = self.TextIOWrapper(b, encoding="utf-8")
-        modname = self.TextIOWrapper.__module__
-        self.assertRegex(repr(t),
-                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
-        raw.name = "dummy"
-        self.assertRegex(repr(t),
-                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
-        t.mode = "r"
-        self.assertRegex(repr(t),
-                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
-        raw.name = b"dummy"
-        self.assertRegex(repr(t),
-                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
-
-        t.buffer.detach()
-        repr(t)  # Should not raise an exception
-
-    def test_recursive_repr(self):
-        # Issue #25455
-        raw = self.BytesIO()
-        t = self.TextIOWrapper(raw, encoding="utf-8")
-        with support.swap_attr(raw, 'name', t), support.infinite_recursion(25):
-            with self.assertRaises(RuntimeError):
-                repr(t)  # Should not crash
-
-    def test_subclass_repr(self):
-        class TestSubclass(self.TextIOWrapper):
-            pass
-
-        f = TestSubclass(self.StringIO())
-        self.assertIn(TestSubclass.__name__, repr(f))
-
-    def test_line_buffering(self):
-        r = self.BytesIO()
-        b = self.BufferedWriter(r, 1000)
-        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
-        t.write("X")
-        self.assertEqual(r.getvalue(), b"")  # No flush happened
-        t.write("Y\nZ")
-        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
-        t.write("A\rB")
-        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
-
-    def test_reconfigure_line_buffering(self):
-        r = self.BytesIO()
-        b = self.BufferedWriter(r, 1000)
-        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
-        t.write("AB\nC")
-        self.assertEqual(r.getvalue(), b"")
-
-        t.reconfigure(line_buffering=True)   # implicit flush
-        self.assertEqual(r.getvalue(), b"AB\nC")
-        t.write("DEF\nG")
-        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
-        t.write("H")
-        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
-        t.reconfigure(line_buffering=False)   # implicit flush
-        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
-        t.write("IJ")
-        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
-
-        # Keeping default value
-        t.reconfigure()
-        t.reconfigure(line_buffering=None)
-        self.assertEqual(t.line_buffering, False)
-        t.reconfigure(line_buffering=True)
-        t.reconfigure()
-        t.reconfigure(line_buffering=None)
-        self.assertEqual(t.line_buffering, True)
-
-    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
-    def test_default_encoding(self):
-        with os_helper.EnvironmentVarGuard() as env:
-            # try to get a user preferred encoding different than the current
-            # locale encoding to check that TextIOWrapper() uses the current
-            # locale encoding and not the user preferred encoding
-            env.unset('LC_ALL', 'LANG', 'LC_CTYPE')
-
-            current_locale_encoding = locale.getencoding()
-            b = self.BytesIO()
-            with warnings.catch_warnings():
-                warnings.simplefilter("ignore", EncodingWarning)
-                t = self.TextIOWrapper(b)
-            self.assertEqual(t.encoding, current_locale_encoding)
-
-    def test_encoding(self):
-        # Check the encoding attribute is always set, and valid
-        b = self.BytesIO()
-        t = self.TextIOWrapper(b, encoding="utf-8")
-        self.assertEqual(t.encoding, "utf-8")
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", EncodingWarning)
-            t = self.TextIOWrapper(b)
-        self.assertIsNotNone(t.encoding)
-        codecs.lookup(t.encoding)
-
-    def test_encoding_errors_reading(self):
-        # (1) default
-        b = self.BytesIO(b"abc\n\xff\n")
-        t = self.TextIOWrapper(b, encoding="ascii")
-        self.assertRaises(UnicodeError, t.read)
-        # (2) explicit strict
-        b = self.BytesIO(b"abc\n\xff\n")
-        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
-        self.assertRaises(UnicodeError, t.read)
-        # (3) ignore
-        b = self.BytesIO(b"abc\n\xff\n")
-        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
-        self.assertEqual(t.read(), "abc\n\n")
-        # (4) replace
-        b = self.BytesIO(b"abc\n\xff\n")
-        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
-        self.assertEqual(t.read(), "abc\n\ufffd\n")
-
-    def test_encoding_errors_writing(self):
-        # (1) default
-        b = self.BytesIO()
-        t = self.TextIOWrapper(b, encoding="ascii")
-        self.assertRaises(UnicodeError, t.write, "\xff")
-        # (2) explicit strict
-        b = self.BytesIO()
-        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
-        self.assertRaises(UnicodeError, t.write, "\xff")
-        # (3) ignore
-        b = self.BytesIO()
-        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
-                             newline="\n")
-        t.write("abc\xffdef\n")
-        t.flush()
-        self.assertEqual(b.getvalue(), b"abcdef\n")
-        # (4) replace
-        b = self.BytesIO()
-        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
-                             newline="\n")
-        t.write("abc\xffdef\n")
-        t.flush()
-        self.assertEqual(b.getvalue(), b"abc?def\n")
-
-    def test_newlines(self):
-        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
-
-        tests = [
-            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
-            [ '', input_lines ],
-            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
-            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
-            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
-        ]
-        encodings = (
-            'utf-8', 'latin-1',
-            'utf-16', 'utf-16-le', 'utf-16-be',
-            'utf-32', 'utf-32-le', 'utf-32-be',
-        )
-
-        # Try a range of buffer sizes to test the case where \r is the last
-        # character in TextIOWrapper._pending_line.
-        for encoding in encodings:
-            # XXX: str.encode() should return bytes
-            data = bytes(''.join(input_lines).encode(encoding))
-            for do_reads in (False, True):
-                for bufsize in range(1, 10):
-                    for newline, exp_lines in tests:
-                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
-                        textio = self.TextIOWrapper(bufio, newline=newline,
-                                                  encoding=encoding)
-                        if do_reads:
-                            got_lines = []
-                            while True:
-                                c2 = textio.read(2)
-                                if c2 == '':
-                                    break
-                                self.assertEqual(len(c2), 2)
-                                got_lines.append(c2 + textio.readline())
-                        else:
-                            got_lines = list(textio)
-
-                        for got_line, exp_line in zip(got_lines, exp_lines):
-                            self.assertEqual(got_line, exp_line)
-                        self.assertEqual(len(got_lines), len(exp_lines))
-
-    def test_newlines_input(self):
-        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
-        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
-        for newline, expected in [
-            (None, normalized.decode("ascii").splitlines(keepends=True)),
-            ("", testdata.decode("ascii").splitlines(keepends=True)),
-            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
-            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
-            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
-            ]:
-            buf = self.BytesIO(testdata)
-            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
-            self.assertEqual(txt.readlines(), expected)
-            txt.seek(0)
-            self.assertEqual(txt.read(), "".join(expected))
-
-    def test_newlines_output(self):
-        testdict = {
-            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
-            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
-            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
-            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
-            }
-        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
-        for newline, expected in tests:
-            buf = self.BytesIO()
-            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
-            txt.write("AAA\nB")
-            txt.write("BB\nCCC\n")
-            txt.write("X\rY\r\nZ")
-            txt.flush()
-            self.assertEqual(buf.closed, False)
-            self.assertEqual(buf.getvalue(), expected)
-
-    def test_destructor(self):
-        l = []
-        base = self.BytesIO
-        class MyBytesIO(base):
-            def close(self):
-                l.append(self.getvalue())
-                base.close(self)
-        b = MyBytesIO()
-        t = self.TextIOWrapper(b, encoding="ascii")
-        t.write("abc")
-        del t
-        support.gc_collect()
-        self.assertEqual([b"abc"], l)
-
-    def test_override_destructor(self):
-        record = []
-        class MyTextIO(self.TextIOWrapper):
-            def __del__(self):
-                record.append(1)
-                try:
-                    f = super().__del__
-                except AttributeError:
-                    pass
-                else:
-                    f()
-            def close(self):
-                record.append(2)
-                super().close()
-            def flush(self):
-                record.append(3)
-                super().flush()
-        b = self.BytesIO()
-        t = MyTextIO(b, encoding="ascii")
-        del t
-        support.gc_collect()
-        self.assertEqual(record, [1, 2, 3])
-
-    def test_error_through_destructor(self):
-        # Test that the exception state is not modified by a destructor,
-        # even if close() fails.
-        rawio = self.CloseFailureIO()
-        with support.catch_unraisable_exception() as cm:
-            with self.assertRaises(AttributeError):
-                self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
-
-            self.assertEqual(cm.unraisable.exc_type, OSError)
-
-    # Systematic tests of the text I/O API
-
-    def test_basic_io(self):
-        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
-            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
-                f = self.open(os_helper.TESTFN, "w+", encoding=enc)
-                f._CHUNK_SIZE = chunksize
-                self.assertEqual(f.write("abc"), 3)
-                f.close()
-                f = self.open(os_helper.TESTFN, "r+", encoding=enc)
-                f._CHUNK_SIZE = chunksize
-                self.assertEqual(f.tell(), 0)
-                self.assertEqual(f.read(), "abc")
-                cookie = f.tell()
-                self.assertEqual(f.seek(0), 0)
-                self.assertEqual(f.read(None), "abc")
-                f.seek(0)
-                self.assertEqual(f.read(2), "ab")
-                self.assertEqual(f.read(1), "c")
-                self.assertEqual(f.read(1), "")
-                self.assertEqual(f.read(), "")
-                self.assertEqual(f.tell(), cookie)
-                self.assertEqual(f.seek(0), 0)
-                self.assertEqual(f.seek(0, 2), cookie)
-                self.assertEqual(f.write("def"), 3)
-                self.assertEqual(f.seek(cookie), cookie)
-                self.assertEqual(f.read(), "def")
-                if enc.startswith("utf"):
-                    self.multi_line_test(f, enc)
-                f.close()
-
-    def multi_line_test(self, f, enc):
-        f.seek(0)
-        f.truncate()
-        sample = "s\xff\u0fff\uffff"
-        wlines = []
-        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
-            chars = []
-            for i in range(size):
-                chars.append(sample[i % len(sample)])
-            line = "".join(chars) + "\n"
-            wlines.append((f.tell(), line))
-            f.write(line)
-        f.seek(0)
-        rlines = []
-        while True:
-            pos = f.tell()
-            line = f.readline()
-            if not line:
-                break
-            rlines.append((pos, line))
-        self.assertEqual(rlines, wlines)
-
-    def test_telling(self):
-        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
-        p0 = f.tell()
-        f.write("\xff\n")
-        p1 = f.tell()
-        f.write("\xff\n")
-        p2 = f.tell()
-        f.seek(0)
-        self.assertEqual(f.tell(), p0)
-        self.assertEqual(f.readline(), "\xff\n")
-        self.assertEqual(f.tell(), p1)
-        self.assertEqual(f.readline(), "\xff\n")
-        self.assertEqual(f.tell(), p2)
-        f.seek(0)
-        for line in f:
-            self.assertEqual(line, "\xff\n")
-            self.assertRaises(OSError, f.tell)
-        self.assertEqual(f.tell(), p2)
-        f.close()
-
-    def test_seeking(self):
-        chunk_size = _default_chunk_size()
-        prefix_size = chunk_size - 2
-        u_prefix = "a" * prefix_size
-        prefix = bytes(u_prefix.encode("utf-8"))
-        self.assertEqual(len(u_prefix), len(prefix))
-        u_suffix = "\u8888\n"
-        suffix = bytes(u_suffix.encode("utf-8"))
-        line = prefix + suffix
-        with self.open(os_helper.TESTFN, "wb") as f:
-            f.write(line*2)
-        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
-            s = f.read(prefix_size)
-            self.assertEqual(s, str(prefix, "ascii"))
-            self.assertEqual(f.tell(), prefix_size)
-            self.assertEqual(f.readline(), u_suffix)
-
-    def test_seeking_too(self):
-        # Regression test for a specific bug
-        data = b'\xe0\xbf\xbf\n'
-        with self.open(os_helper.TESTFN, "wb") as f:
-            f.write(data)
-        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
-            f._CHUNK_SIZE  # Just test that it exists
-            f._CHUNK_SIZE = 2
-            f.readline()
-            f.tell()
-
-    def test_seek_and_tell(self):
-        #Test seek/tell using the StatefulIncrementalDecoder.
-        # Make test faster by doing smaller seeks
-        CHUNK_SIZE = 128
-
-        def test_seek_and_tell_with_data(data, min_pos=0):
-            """Tell/seek to various points within a data stream and ensure
-            that the decoded data returned by read() is consistent."""
-            f = self.open(os_helper.TESTFN, 'wb')
-            f.write(data)
-            f.close()
-            f = self.open(os_helper.TESTFN, encoding='test_decoder')
-            f._CHUNK_SIZE = CHUNK_SIZE
-            decoded = f.read()
-            f.close()
-
-            for i in range(min_pos, len(decoded) + 1): # seek positions
-                for j in [1, 5, len(decoded) - i]: # read lengths
-                    f = self.open(os_helper.TESTFN, encoding='test_decoder')
-                    self.assertEqual(f.read(i), decoded[:i])
-                    cookie = f.tell()
-                    self.assertEqual(f.read(j), decoded[i:i + j])
-                    f.seek(cookie)
-                    self.assertEqual(f.read(), decoded[i:])
-                    f.close()
-
-        # Enable the test decoder.
-        StatefulIncrementalDecoder.codecEnabled = 1
-
-        # Run the tests.
-        try:
-            # Try each test case.
-            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
-                test_seek_and_tell_with_data(input)
-
-            # Position each test case so that it crosses a chunk boundary.
-            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
-                offset = CHUNK_SIZE - len(input)//2
-                prefix = b'.'*offset
-                # Don't bother seeking into the prefix (takes too long).
-                min_pos = offset*2
-                test_seek_and_tell_with_data(prefix + input, min_pos)
-
-        # Ensure our test decoder won't interfere with subsequent tests.
-        finally:
-            StatefulIncrementalDecoder.codecEnabled = 0
-
-    def test_multibyte_seek_and_tell(self):
-        f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
-        f.write("AB\n\u3046\u3048\n")
-        f.close()
-
-        f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
-        self.assertEqual(f.readline(), "AB\n")
-        p0 = f.tell()
-        self.assertEqual(f.readline(), "\u3046\u3048\n")
-        p1 = f.tell()
-        f.seek(p0)
-        self.assertEqual(f.readline(), "\u3046\u3048\n")
-        self.assertEqual(f.tell(), p1)
-        f.close()
-
-    def test_seek_with_encoder_state(self):
-        f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
-        f.write("\u00e6\u0300")
-        p0 = f.tell()
-        f.write("\u00e6")
-        f.seek(p0)
-        f.write("\u0300")
-        f.close()
-
-        f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
-        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
-        f.close()
-
-    def test_encoded_writes(self):
-        data = "1234567890"
-        tests = ("utf-16",
-                 "utf-16-le",
-                 "utf-16-be",
-                 "utf-32",
-                 "utf-32-le",
-                 "utf-32-be")
-        for encoding in tests:
-            buf = self.BytesIO()
-            f = self.TextIOWrapper(buf, encoding=encoding)
-            # Check if the BOM is written only once (see issue1753).
-            f.write(data)
-            f.write(data)
-            f.seek(0)
-            self.assertEqual(f.read(), data * 2)
-            f.seek(0)
-            self.assertEqual(f.read(), data * 2)
-            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
-
-    def test_unreadable(self):
-        class UnReadable(self.BytesIO):
-            def readable(self):
-                return False
-        txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
-        self.assertRaises(OSError, txt.read)
-
-    def test_read_one_by_one(self):
-        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
-        reads = ""
-        while True:
-            c = txt.read(1)
-            if not c:
-                break
-            reads += c
-        self.assertEqual(reads, "AA\nBB")
-
-    def test_readlines(self):
-        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
-        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
-        txt.seek(0)
-        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
-        txt.seek(0)
-        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
-
-    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
-    def test_read_by_chunk(self):
-        # make sure "\r\n" straddles 128 char boundary.
-        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
-        reads = ""
-        while True:
-            c = txt.read(128)
-            if not c:
-                break
-            reads += c
-        self.assertEqual(reads, "A"*127+"\nB")
-
-    def test_writelines(self):
-        l = ['ab', 'cd', 'ef']
-        buf = self.BytesIO()
-        txt = self.TextIOWrapper(buf, encoding="utf-8")
-        txt.writelines(l)
-        txt.flush()
-        self.assertEqual(buf.getvalue(), b'abcdef')
-
-    def test_writelines_userlist(self):
-        l = UserList(['ab', 'cd', 'ef'])
-        buf = self.BytesIO()
-        txt = self.TextIOWrapper(buf, encoding="utf-8")
-        txt.writelines(l)
-        txt.flush()
-        self.assertEqual(buf.getvalue(), b'abcdef')
-
-    def test_writelines_error(self):
-        txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
-        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
-        self.assertRaises(TypeError, txt.writelines, None)
-        self.assertRaises(TypeError, txt.writelines, b'abc')
-
-    def test_issue1395_1(self):
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-
-        # read one char at a time
-        reads = ""
-        while True:
-            c = txt.read(1)
-            if not c:
-                break
-            reads += c
-        self.assertEqual(reads, self.normalized)
-
-    def test_issue1395_2(self):
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-        txt._CHUNK_SIZE = 4
-
-        reads = ""
-        while True:
-            c = txt.read(4)
-            if not c:
-                break
-            reads += c
-        self.assertEqual(reads, self.normalized)
-
-    def test_issue1395_3(self):
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-        txt._CHUNK_SIZE = 4
-
-        reads = txt.read(4)
-        reads += txt.read(4)
-        reads += txt.readline()
-        reads += txt.readline()
-        reads += txt.readline()
-        self.assertEqual(reads, self.normalized)
-
-    def test_issue1395_4(self):
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-        txt._CHUNK_SIZE = 4
-
-        reads = txt.read(4)
-        reads += txt.read()
-        self.assertEqual(reads, self.normalized)
-
-    def test_issue1395_5(self):
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-        txt._CHUNK_SIZE = 4
-
-        reads = txt.read(4)
-        pos = txt.tell()
-        txt.seek(0)
-        txt.seek(pos)
-        self.assertEqual(txt.read(4), "BBB\n")
-
-    def test_issue2282(self):
-        buffer = self.BytesIO(self.testdata)
-        txt = self.TextIOWrapper(buffer, encoding="ascii")
-
-        self.assertEqual(buffer.seekable(), txt.seekable())
-
-    def test_append_bom(self):
-        # The BOM is not written again when appending to a non-empty file
-        filename = os_helper.TESTFN
-        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
-            with self.open(filename, 'w', encoding=charset) as f:
-                f.write('aaa')
-                pos = f.tell()
-            with self.open(filename, 'rb') as f:
-                self.assertEqual(f.read(), 'aaa'.encode(charset))
-
-            with self.open(filename, 'a', encoding=charset) as f:
-                f.write('xxx')
-            with self.open(filename, 'rb') as f:
-                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
-
-    def test_seek_bom(self):
-        # Same test, but when seeking manually
-        filename = os_helper.TESTFN
-        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
-            with self.open(filename, 'w', encoding=charset) as f:
-                f.write('aaa')
-                pos = f.tell()
-            with self.open(filename, 'r+', encoding=charset) as f:
-                f.seek(pos)
-                f.write('zzz')
-                f.seek(0)
-                f.write('bbb')
-            with self.open(filename, 'rb') as f:
-                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
-
-    def test_seek_append_bom(self):
-        # Same test, but first seek to the start and then to the end
-        filename = os_helper.TESTFN
-        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
-            with self.open(filename, 'w', encoding=charset) as f:
-                f.write('aaa')
-            with self.open(filename, 'a', encoding=charset) as f:
-                f.seek(0)
-                f.seek(0, self.SEEK_END)
-                f.write('xxx')
-            with self.open(filename, 'rb') as f:
-                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
-
-    def test_errors_property(self):
-        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
-            self.assertEqual(f.errors, "strict")
-        with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
-            self.assertEqual(f.errors, "replace")
-
-    @support.no_tracing
-    @threading_helper.requires_working_threading()
-    def test_threads_write(self):
-        # Issue6750: concurrent writes could duplicate data
-        event = threading.Event()
-        with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
-            def run(n):
-                text = "Thread%03d\n" % n
-                event.wait()
-                f.write(text)
-            threads = [threading.Thread(target=run, args=(x,))
-                       for x in range(20)]
-            with threading_helper.start_threads(threads, event.set):
-                time.sleep(0.02)
-        with self.open(os_helper.TESTFN, encoding="utf-8") as f:
-            content = f.read()
-            for n in range(20):
-                self.assertEqual(content.count("Thread%03d\n" % n), 1)
-
-    def test_flush_error_on_close(self):
-        # Test that text file is closed despite failed flush
-        # and that flush() is called before file closed.
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-        closed = []
-        def bad_flush():
-            closed[:] = [txt.closed, txt.buffer.closed]
-            raise OSError()
-        txt.flush = bad_flush
-        self.assertRaises(OSError, txt.close) # exception not swallowed
-        self.assertTrue(txt.closed)
-        self.assertTrue(txt.buffer.closed)
-        self.assertTrue(closed)      # flush() called
-        self.assertFalse(closed[0])  # flush() called before file closed
-        self.assertFalse(closed[1])
-        txt.flush = lambda: None  # break reference loop
-
-    def test_close_error_on_close(self):
-        buffer = self.BytesIO(self.testdata)
-        def bad_flush():
-            raise OSError('flush')
-        def bad_close():
-            raise OSError('close')
-        buffer.close = bad_close
-        txt = self.TextIOWrapper(buffer, encoding="ascii")
-        txt.flush = bad_flush
-        with self.assertRaises(OSError) as err: # exception not swallowed
-            txt.close()
-        self.assertEqual(err.exception.args, ('close',))
-        self.assertIsInstance(err.exception.__context__, OSError)
-        self.assertEqual(err.exception.__context__.args, ('flush',))
-        self.assertFalse(txt.closed)
-
-        # Silence destructor error
-        buffer.close = lambda: None
-        txt.flush = lambda: None
-
-    def test_nonnormalized_close_error_on_close(self):
-        # Issue #21677
-        buffer = self.BytesIO(self.testdata)
-        def bad_flush():
-            raise non_existing_flush
-        def bad_close():
-            raise non_existing_close
-        buffer.close = bad_close
-        txt = self.TextIOWrapper(buffer, encoding="ascii")
-        txt.flush = bad_flush
-        with self.assertRaises(NameError) as err: # exception not swallowed
-            txt.close()
-        self.assertIn('non_existing_close', str(err.exception))
-        self.assertIsInstance(err.exception.__context__, NameError)
-        self.assertIn('non_existing_flush', str(err.exception.__context__))
-        self.assertFalse(txt.closed)
-
-        # Silence destructor error
-        buffer.close = lambda: None
-        txt.flush = lambda: None
-
-    def test_multi_close(self):
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-        txt.close()
-        txt.close()
-        txt.close()
-        self.assertRaises(ValueError, txt.flush)
-
-    def test_unseekable(self):
-        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
-        self.assertRaises(self.UnsupportedOperation, txt.tell)
-        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
-
-    def test_readonly_attributes(self):
-        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
-        buf = self.BytesIO(self.testdata)
-        with self.assertRaises(AttributeError):
-            txt.buffer = buf
-
-    def test_rawio(self):
-        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
-        # that subprocess.Popen() can have the required unbuffered
-        # semantics with universal_newlines=True.
-        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
-        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
-        # Reads
-        self.assertEqual(txt.read(4), 'abcd')
-        self.assertEqual(txt.readline(), 'efghi\n')
-        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
-
-    def test_rawio_write_through(self):
-        # Issue #12591: with write_through=True, writes don't need a flush
-        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
-        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
-                                 write_through=True)
-        txt.write('1')
-        txt.write('23\n4')
-        txt.write('5')
-        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
-
-    def test_bufio_write_through(self):
-        # Issue #21396: write_through=True doesn't force a flush()
-        # on the underlying binary buffered object.
-        flush_called, write_called = [], []
-        class BufferedWriter(self.BufferedWriter):
-            def flush(self, *args, **kwargs):
-                flush_called.append(True)
-                return super().flush(*args, **kwargs)
-            def write(self, *args, **kwargs):
-                write_called.append(True)
-                return super().write(*args, **kwargs)
-
-        rawio = self.BytesIO()
-        data = b"a"
-        bufio = BufferedWriter(rawio, len(data)*2)
-        textio = self.TextIOWrapper(bufio, encoding='ascii',
-                                    write_through=True)
-        # write to the buffered io but don't overflow the buffer
-        text = data.decode('ascii')
-        textio.write(text)
-
-        # buffer.flush is not called with write_through=True
-        self.assertFalse(flush_called)
-        # buffer.write *is* called with write_through=True
-        self.assertTrue(write_called)
-        self.assertEqual(rawio.getvalue(), b"") # no flush
-
-        write_called = [] # reset
-        textio.write(text * 10) # total content is larger than bufio buffer
-        self.assertTrue(write_called)
-        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
-
-    def test_reconfigure_write_through(self):
-        raw = self.MockRawIO([])
-        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
-        t.write('1')
-        t.reconfigure(write_through=True)  # implied flush
-        self.assertEqual(t.write_through, True)
-        self.assertEqual(b''.join(raw._write_stack), b'1')
-        t.write('23')
-        self.assertEqual(b''.join(raw._write_stack), b'123')
-        t.reconfigure(write_through=False)
-        self.assertEqual(t.write_through, False)
-        t.write('45')
-        t.flush()
-        self.assertEqual(b''.join(raw._write_stack), b'12345')
-        # Keeping default value
-        t.reconfigure()
-        t.reconfigure(write_through=None)
-        self.assertEqual(t.write_through, False)
-        t.reconfigure(write_through=True)
-        t.reconfigure()
-        t.reconfigure(write_through=None)
-        self.assertEqual(t.write_through, True)
-
-    def test_read_nonbytes(self):
-        # Issue #17106
-        # Crash when underlying read() returns non-bytes
-        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
-        self.assertRaises(TypeError, t.read, 1)
-        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
-        self.assertRaises(TypeError, t.readline)
-        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
-        self.assertRaises(TypeError, t.read)
-
-    def test_illegal_encoder(self):
-        # Issue 31271: Calling write() while the return value of encoder's
-        # encode() is invalid shouldn't cause an assertion failure.
-        rot13 = codecs.lookup("rot13")
-        with support.swap_attr(rot13, '_is_text_encoding', True):
-            t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
-        self.assertRaises(TypeError, t.write, 'bar')
-
-    def test_illegal_decoder(self):
-        # Issue #17106
-        # Bypass the early encoding check added in issue 20404
-        def _make_illegal_wrapper():
-            quopri = codecs.lookup("quopri")
-            quopri._is_text_encoding = True
-            try:
-                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
-                                       newline='\n', encoding="quopri")
-            finally:
-                quopri._is_text_encoding = False
-            return t
-        # Crash when decoder returns non-string
-        t = _make_illegal_wrapper()
-        self.assertRaises(TypeError, t.read, 1)
-        t = _make_illegal_wrapper()
-        self.assertRaises(TypeError, t.readline)
-        t = _make_illegal_wrapper()
-        self.assertRaises(TypeError, t.read)
-
-        # Issue 31243: calling read() while the return value of decoder's
-        # getstate() is invalid should neither crash the interpreter nor
-        # raise a SystemError.
-        def _make_very_illegal_wrapper(getstate_ret_val):
-            class BadDecoder:
-                def getstate(self):
-                    return getstate_ret_val
-            def _get_bad_decoder(dummy):
-                return BadDecoder()
-            quopri = codecs.lookup("quopri")
-            with support.swap_attr(quopri, 'incrementaldecoder',
-                                   _get_bad_decoder):
-                return _make_illegal_wrapper()
-        t = _make_very_illegal_wrapper(42)
-        self.assertRaises(TypeError, t.read, 42)
-        t = _make_very_illegal_wrapper(())
-        self.assertRaises(TypeError, t.read, 42)
-        t = _make_very_illegal_wrapper((1, 2))
-        self.assertRaises(TypeError, t.read, 42)
-
-    def _check_create_at_shutdown(self, **kwargs):
-        # Issue #20037: creating a TextIOWrapper at shutdown
-        # shouldn't crash the interpreter.
-        iomod = self.io.__name__
-        code = """if 1:
-            import codecs
-            import {iomod} as io
-
-            # Avoid looking up codecs at shutdown
-            codecs.lookup('utf-8')
-
-            class C:
-                def __del__(self):
-                    io.TextIOWrapper(io.BytesIO(), **{kwargs})
-                    print("ok")
-            c = C()
-            """.format(iomod=iomod, kwargs=kwargs)
-        return assert_python_ok("-c", code)
-
-    def test_create_at_shutdown_without_encoding(self):
-        rc, out, err = self._check_create_at_shutdown()
-        if err:
-            # Can error out with a RuntimeError if the module state
-            # isn't found.
-            self.assertIn(self.shutdown_error, err.decode())
-        else:
-            self.assertEqual("ok", out.decode().strip())
-
-    def test_create_at_shutdown_with_encoding(self):
-        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
-                                                      errors='strict')
-        self.assertFalse(err)
-        self.assertEqual("ok", out.decode().strip())
-
-    def test_read_byteslike(self):
-        r = MemviewBytesIO(b'Just some random string\n')
-        t = self.TextIOWrapper(r, 'utf-8')
-
-        # TextIOwrapper will not read the full string, because
-        # we truncate it to a multiple of the native int size
-        # so that we can construct a more complex memoryview.
-        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
-
-        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
-
-    def test_issue22849(self):
-        class F(object):
-            def readable(self): return True
-            def writable(self): return True
-            def seekable(self): return True
-
-        for i in range(10):
-            try:
-                self.TextIOWrapper(F(), encoding='utf-8')
-            except Exception:
-                pass
-
-        F.tell = lambda x: 0
-        t = self.TextIOWrapper(F(), encoding='utf-8')
-
-    def test_reconfigure_locale(self):
-        wrapper = self.TextIOWrapper(self.BytesIO(b"test"))
-        wrapper.reconfigure(encoding="locale")
-
-    def test_reconfigure_encoding_read(self):
-        # latin1 -> utf8
-        # (latin1 can decode utf-8 encoded string)
-        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
-        raw = self.BytesIO(data)
-        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
-        self.assertEqual(txt.readline(), 'abc\xe9\n')
-        with self.assertRaises(self.UnsupportedOperation):
-            txt.reconfigure(encoding='utf-8')
-        with self.assertRaises(self.UnsupportedOperation):
-            txt.reconfigure(newline=None)
-
-    def test_reconfigure_write_fromascii(self):
-        # ascii has a specific encodefunc in the C implementation,
-        # but utf-8-sig has not. Make sure that we get rid of the
-        # cached encodefunc when we switch encoders.
-        raw = self.BytesIO()
-        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
-        txt.write('foo\n')
-        txt.reconfigure(encoding='utf-8-sig')
-        txt.write('\xe9\n')
-        txt.flush()
-        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
-
-    def test_reconfigure_write(self):
-        # latin -> utf8
-        raw = self.BytesIO()
-        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
-        txt.write('abc\xe9\n')
-        txt.reconfigure(encoding='utf-8')
-        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
-        txt.write('d\xe9f\n')
-        txt.flush()
-        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
-
-        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
-        # the file
-        raw = self.BytesIO()
-        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
-        txt.write('abc\n')
-        txt.reconfigure(encoding='utf-8-sig')
-        txt.write('d\xe9f\n')
-        txt.flush()
-        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
-
-    def test_reconfigure_write_non_seekable(self):
-        raw = self.BytesIO()
-        raw.seekable = lambda: False
-        raw.seek = None
-        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
-        txt.write('abc\n')
-        txt.reconfigure(encoding='utf-8-sig')
-        txt.write('d\xe9f\n')
-        txt.flush()
-
-        # If the raw stream is not seekable, there'll be a BOM
-        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
-
-    def test_reconfigure_defaults(self):
-        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
-        txt.reconfigure(encoding=None)
-        self.assertEqual(txt.encoding, 'ascii')
-        self.assertEqual(txt.errors, 'replace')
-        txt.write('LF\n')
-
-        txt.reconfigure(newline='\r\n')
-        self.assertEqual(txt.encoding, 'ascii')
-        self.assertEqual(txt.errors, 'replace')
-
-        txt.reconfigure(errors='ignore')
-        self.assertEqual(txt.encoding, 'ascii')
-        self.assertEqual(txt.errors, 'ignore')
-        txt.write('CRLF\n')
-
-        txt.reconfigure(encoding='utf-8', newline=None)
-        self.assertEqual(txt.errors, 'strict')
-        txt.seek(0)
-        self.assertEqual(txt.read(), 'LF\nCRLF\n')
-
-        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
-
-    def test_reconfigure_errors(self):
-        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
-        with self.assertRaises(TypeError):  # there was a crash
-            txt.reconfigure(encoding=42)
-        if self.is_C:
-            with self.assertRaises(UnicodeEncodeError):
-                txt.reconfigure(encoding='\udcfe')
-            with self.assertRaises(LookupError):
-                txt.reconfigure(encoding='locale\0')
-        # TODO: txt.reconfigure(encoding='utf-8\0')
-        # TODO: txt.reconfigure(encoding='nonexisting')
-        with self.assertRaises(TypeError):
-            txt.reconfigure(errors=42)
-        if self.is_C:
-            with self.assertRaises(UnicodeEncodeError):
-                txt.reconfigure(errors='\udcfe')
-        # TODO: txt.reconfigure(errors='ignore\0')
-        # TODO: txt.reconfigure(errors='nonexisting')
-        with self.assertRaises(TypeError):
-            txt.reconfigure(newline=42)
-        with self.assertRaises(ValueError):
-            txt.reconfigure(newline='\udcfe')
-        with self.assertRaises(ValueError):
-            txt.reconfigure(newline='xyz')
-        if not self.is_C:
-            # TODO: Should fail in C too.
-            with self.assertRaises(ValueError):
-                txt.reconfigure(newline='\n\0')
-        if self.is_C:
-            # TODO: Use __bool__(), not __index__().
-            with self.assertRaises(ZeroDivisionError):
-                txt.reconfigure(line_buffering=BadIndex())
-            with self.assertRaises(OverflowError):
-                txt.reconfigure(line_buffering=2**1000)
-            with self.assertRaises(ZeroDivisionError):
-                txt.reconfigure(write_through=BadIndex())
-            with self.assertRaises(OverflowError):
-                txt.reconfigure(write_through=2**1000)
-            with self.assertRaises(ZeroDivisionError):  # there was a crash
-                txt.reconfigure(line_buffering=BadIndex(),
-                                write_through=BadIndex())
-        self.assertEqual(txt.encoding, 'ascii')
-        self.assertEqual(txt.errors, 'replace')
-        self.assertIs(txt.line_buffering, False)
-        self.assertIs(txt.write_through, False)
-
-        txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
-                        line_buffering=True, write_through=True)
-        self.assertEqual(txt.encoding, 'latin1')
-        self.assertEqual(txt.errors, 'ignore')
-        self.assertIs(txt.line_buffering, True)
-        self.assertIs(txt.write_through, True)
-
-    def test_reconfigure_newline(self):
-        raw = self.BytesIO(b'CR\rEOF')
-        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
-        txt.reconfigure(newline=None)
-        self.assertEqual(txt.readline(), 'CR\n')
-        raw = self.BytesIO(b'CR\rEOF')
-        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
-        txt.reconfigure(newline='')
-        self.assertEqual(txt.readline(), 'CR\r')
-        raw = self.BytesIO(b'CR\rLF\nEOF')
-        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
-        txt.reconfigure(newline='\n')
-        self.assertEqual(txt.readline(), 'CR\rLF\n')
-        raw = self.BytesIO(b'LF\nCR\rEOF')
-        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
-        txt.reconfigure(newline='\r')
-        self.assertEqual(txt.readline(), 'LF\nCR\r')
-        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
-        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
-        txt.reconfigure(newline='\r\n')
-        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
-
-        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
-        txt.reconfigure(newline=None)
-        txt.write('linesep\n')
-        txt.reconfigure(newline='')
-        txt.write('LF\n')
-        txt.reconfigure(newline='\n')
-        txt.write('LF\n')
-        txt.reconfigure(newline='\r')
-        txt.write('CR\n')
-        txt.reconfigure(newline='\r\n')
-        txt.write('CRLF\n')
-        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
-        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
-
-    def test_issue25862(self):
-        # Assertion failures occurred in tell() after read() and write().
-        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
-        t.read(1)
-        t.read()
-        t.tell()
-        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
-        t.read(1)
-        t.write('x')
-        t.tell()
-
-    def test_issue35928(self):
-        p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO())
-        f = self.TextIOWrapper(p)
-        res = f.readline()
-        self.assertEqual(res, 'foo\n')
-        f.write(res)
-        self.assertEqual(res + f.readline(), 'foo\nbar\n')
-
-    def test_pickling_subclass(self):
-        global MyTextIO
-        class MyTextIO(self.TextIOWrapper):
-            def __init__(self, raw, tag):
-                super().__init__(raw)
-                self.tag = tag
-            def __getstate__(self):
-                return self.tag, self.buffer.getvalue()
-            def __setstate__(slf, state):
-                tag, value = state
-                slf.__init__(self.BytesIO(value), tag)
-
-        raw = self.BytesIO(b'data')
-        txt = MyTextIO(raw, 'ham')
-        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
-            with self.subTest(protocol=proto):
-                pickled = pickle.dumps(txt, proto)
-                newtxt = pickle.loads(pickled)
-                self.assertEqual(newtxt.buffer.getvalue(), b'data')
-                self.assertEqual(newtxt.tag, 'ham')
-        del MyTextIO
-
-    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
-    def test_read_non_blocking(self):
-        import os
-        r, w = os.pipe()
-        try:
-            os.set_blocking(r, False)
-            with self.io.open(r, 'rt') as textfile:
-                r = None
-                # Nothing has been written so a non-blocking read raises a BlockingIOError exception.
-                with self.assertRaises(BlockingIOError):
-                    textfile.read()
-        finally:
-            if r is not None:
-                os.close(r)
-            os.close(w)
-
-
-class MemviewBytesIO(io.BytesIO):
-    '''A BytesIO object whose read method returns memoryviews
-       rather than bytes'''
-
-    def read1(self, len_):
-        return _to_memoryview(super().read1(len_))
-
-    def read(self, len_):
-        return _to_memoryview(super().read(len_))
-
-def _to_memoryview(buf):
-    '''Convert bytes-object *buf* to a non-trivial memoryview'''
-
-    arr = array.array('i')
-    idx = len(buf) - len(buf) % arr.itemsize
-    arr.frombytes(buf[:idx])
-    return memoryview(arr)
-
-
-class CTextIOWrapperTest(TextIOWrapperTest, CTestCase):
-    shutdown_error = "LookupError: unknown encoding: ascii"
-
-    def test_initialization(self):
-        r = self.BytesIO(b"\xc3\xa9\n\n")
-        b = self.BufferedReader(r, 1000)
-        t = self.TextIOWrapper(b, encoding="utf-8")
-        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
-        self.assertRaises(ValueError, t.read)
-
-        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
-        self.assertRaises(Exception, repr, t)
-
-    def test_garbage_collection(self):
-        # C TextIOWrapper objects are collected, and collecting them flushes
-        # all data to disk.
-        # The Python version has __del__, so it ends in gc.garbage instead.
-        with warnings.catch_warnings():
-            warnings.simplefilter("ignore", ResourceWarning)
-            rawio = self.FileIO(os_helper.TESTFN, "wb")
-            b = self.BufferedWriter(rawio)
-            t = self.TextIOWrapper(b, encoding="ascii")
-            t.write("456def")
-            t.x = t
-            wr = weakref.ref(t)
-            del t
-            support.gc_collect()
-        self.assertIsNone(wr(), wr)
-        with self.open(os_helper.TESTFN, "rb") as f:
-            self.assertEqual(f.read(), b"456def")
-
-    def test_rwpair_cleared_before_textio(self):
-        # Issue 13070: TextIOWrapper's finalization would crash when called
-        # after the reference to the underlying BufferedRWPair's writer got
-        # cleared by the GC.
-        for i in range(1000):
-            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
-            t1 = self.TextIOWrapper(b1, encoding="ascii")
-            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
-            t2 = self.TextIOWrapper(b2, encoding="ascii")
-            # circular references
-            t1.buddy = t2
-            t2.buddy = t1
-        support.gc_collect()
-
-    def test_del__CHUNK_SIZE_SystemError(self):
-        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
-        with self.assertRaises(AttributeError):
-            del t._CHUNK_SIZE
-
-    def test_internal_buffer_size(self):
-        # bpo-43260: TextIOWrapper's internal buffer should not store
-        # data larger than chunk size.
-        chunk_size = 8192  # default chunk size, updated later
-
-        class MockIO(self.MockRawIO):
-            def write(self, data):
-                if len(data) > chunk_size:
-                    raise RuntimeError
-                return super().write(data)
-
-        buf = MockIO()
-        t = self.TextIOWrapper(buf, encoding="ascii")
-        chunk_size = t._CHUNK_SIZE
-        t.write("abc")
-        t.write("def")
-        # default chunk size is 8192 bytes so t don't write data to buf.
-        self.assertEqual([], buf._write_stack)
-
-        with self.assertRaises(RuntimeError):
-            t.write("x"*(chunk_size+1))
-
-        self.assertEqual([b"abcdef"], buf._write_stack)
-        t.write("ghi")
-        t.write("x"*chunk_size)
-        self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
-
-    def test_issue119506(self):
-        chunk_size = 8192
-
-        class MockIO(self.MockRawIO):
-            written = False
-            def write(self, data):
-                if not self.written:
-                    self.written = True
-                    t.write("middle")
-                return super().write(data)
-
-        buf = MockIO()
-        t = self.TextIOWrapper(buf)
-        t.write("abc")
-        t.write("def")
-        # writing data which size >= chunk_size cause flushing buffer before write.
-        t.write("g" * chunk_size)
-        t.flush()
-
-        self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size],
-                         buf._write_stack)
-
-
-class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase):
-    shutdown_error = "LookupError: unknown encoding: ascii"
-
-
-class IncrementalNewlineDecoderTest:
-
-    def check_newline_decoding_utf8(self, decoder):
-        # UTF-8 specific tests for a newline decoder
-        def _check_decode(b, s, **kwargs):
-            # We exercise getstate() / setstate() as well as decode()
-            state = decoder.getstate()
-            self.assertEqual(decoder.decode(b, **kwargs), s)
-            decoder.setstate(state)
-            self.assertEqual(decoder.decode(b, **kwargs), s)
-
-        _check_decode(b'\xe8\xa2\x88', "\u8888")
-
-        _check_decode(b'\xe8', "")
-        _check_decode(b'\xa2', "")
-        _check_decode(b'\x88', "\u8888")
-
-        _check_decode(b'\xe8', "")
-        _check_decode(b'\xa2', "")
-        _check_decode(b'\x88', "\u8888")
-
-        _check_decode(b'\xe8', "")
-        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
-
-        decoder.reset()
-        _check_decode(b'\n', "\n")
-        _check_decode(b'\r', "")
-        _check_decode(b'', "\n", final=True)
-        _check_decode(b'\r', "\n", final=True)
-
-        _check_decode(b'\r', "")
-        _check_decode(b'a', "\na")
-
-        _check_decode(b'\r\r\n', "\n\n")
-        _check_decode(b'\r', "")
-        _check_decode(b'\r', "\n")
-        _check_decode(b'\na', "\na")
-
-        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
-        _check_decode(b'\xe8\xa2\x88', "\u8888")
-        _check_decode(b'\n', "\n")
-        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
-        _check_decode(b'\n', "\n")
-
-    def check_newline_decoding(self, decoder, encoding):
-        result = []
-        if encoding is not None:
-            encoder = codecs.getincrementalencoder(encoding)()
-            def _decode_bytewise(s):
-                # Decode one byte at a time
-                for b in encoder.encode(s):
-                    result.append(decoder.decode(bytes([b])))
-        else:
-            encoder = None
-            def _decode_bytewise(s):
-                # Decode one char at a time
-                for c in s:
-                    result.append(decoder.decode(c))
-        self.assertEqual(decoder.newlines, None)
-        _decode_bytewise("abc\n\r")
-        self.assertEqual(decoder.newlines, '\n')
-        _decode_bytewise("\nabc")
-        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
-        _decode_bytewise("abc\r")
-        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
-        _decode_bytewise("abc")
-        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
-        _decode_bytewise("abc\r")
-        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
-        decoder.reset()
-        input = "abc"
-        if encoder is not None:
-            encoder.reset()
-            input = encoder.encode(input)
-        self.assertEqual(decoder.decode(input), "abc")
-        self.assertEqual(decoder.newlines, None)
-
-    def test_newline_decoder(self):
-        encodings = (
-            # None meaning the IncrementalNewlineDecoder takes unicode input
-            # rather than bytes input
-            None, 'utf-8', 'latin-1',
-            'utf-16', 'utf-16-le', 'utf-16-be',
-            'utf-32', 'utf-32-le', 'utf-32-be',
-        )
-        for enc in encodings:
-            decoder = enc and codecs.getincrementaldecoder(enc)()
-            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
-            self.check_newline_decoding(decoder, enc)
-        decoder = codecs.getincrementaldecoder("utf-8")()
-        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
-        self.check_newline_decoding_utf8(decoder)
-        self.assertRaises(TypeError, decoder.setstate, 42)
-
-    def test_newline_bytes(self):
-        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
-        def _check(dec):
-            self.assertEqual(dec.newlines, None)
-            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
-            self.assertEqual(dec.newlines, None)
-            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
-            self.assertEqual(dec.newlines, None)
-        dec = self.IncrementalNewlineDecoder(None, translate=False)
-        _check(dec)
-        dec = self.IncrementalNewlineDecoder(None, translate=True)
-        _check(dec)
-
-    def test_translate(self):
-        # issue 35062
-        for translate in (-2, -1, 1, 2):
-            decoder = codecs.getincrementaldecoder("utf-8")()
-            decoder = self.IncrementalNewlineDecoder(decoder, translate)
-            self.check_newline_decoding_utf8(decoder)
-        decoder = codecs.getincrementaldecoder("utf-8")()
-        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
-        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
-
-class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
-    IncrementalNewlineDecoder = io.IncrementalNewlineDecoder
-
-    @support.cpython_only
-    def test_uninitialized(self):
-        uninitialized = self.IncrementalNewlineDecoder.__new__(
-            self.IncrementalNewlineDecoder)
-        self.assertRaises(ValueError, uninitialized.decode, b'bar')
-        self.assertRaises(ValueError, uninitialized.getstate)
-        self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
-        self.assertRaises(ValueError, uninitialized.reset)
-
-
-class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
-    IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder
-
-
 # XXX Tests for open()
 
 class MiscIOTest:
diff --git a/Lib/test/test_io/test_textio.py b/Lib/test/test_io/test_textio.py
new file mode 100644 (file)
index 0000000..d8d0928
--- /dev/null
@@ -0,0 +1,1664 @@
+import array
+import codecs
+import locale
+import os
+import pickle
+import sys
+import threading
+import time
+import unittest
+import warnings
+import weakref
+from collections import UserList
+from test import support
+from test.support import os_helper, threading_helper
+from test.support.script_helper import assert_python_ok
+from .utils import CTestCase, PyTestCase
+
+import io  # C implementation of io
+import _pyio as pyio # Python implementation of io
+
+
+def _default_chunk_size():
+    """Get the default TextIOWrapper chunk size"""
+    with open(__file__, "r", encoding="latin-1") as f:
+        return f._CHUNK_SIZE
+
+
+class BadIndex:
+    def __index__(self):
+        1/0
+
+
+# To fully exercise seek/tell, the StatefulIncrementalDecoder has these
+# properties:
+#   - A single output character can correspond to many bytes of input.
+#   - The number of input bytes to complete the character can be
+#     undetermined until the last input byte is received.
+#   - The number of input bytes can vary depending on previous input.
+#   - A single input byte can correspond to many characters of output.
+#   - The number of output characters can be undetermined until the
+#     last input byte is received.
+#   - The number of output characters can vary depending on previous input.
+
+class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
+    """
+    For testing seek/tell behavior with a stateful, buffering decoder.
+
+    Input is a sequence of words.  Words may be fixed-length (length set
+    by input) or variable-length (period-terminated).  In variable-length
+    mode, extra periods are ignored.  Possible words are:
+      - 'i' followed by a number sets the input length, I (maximum 99).
+        When I is set to 0, words are space-terminated.
+      - 'o' followed by a number sets the output length, O (maximum 99).
+      - Any other word is converted into a word followed by a period on
+        the output.  The output word consists of the input word truncated
+        or padded out with hyphens to make its length equal to O.  If O
+        is 0, the word is output verbatim without truncating or padding.
+    I and O are initially set to 1.  When I changes, any buffered input is
+    re-scanned according to the new I.  EOF also terminates the last word.
+    """
+
+    def __init__(self, errors='strict'):
+        codecs.IncrementalDecoder.__init__(self, errors)
+        self.reset()
+
+    def __repr__(self):
+        return '<SID %x>' % id(self)
+
+    def reset(self):
+        self.i = 1
+        self.o = 1
+        self.buffer = bytearray()
+
+    def getstate(self):
+        i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
+        return bytes(self.buffer), i*100 + o
+
+    def setstate(self, state):
+        buffer, io = state
+        self.buffer = bytearray(buffer)
+        i, o = divmod(io, 100)
+        self.i, self.o = i ^ 1, o ^ 1
+
+    def decode(self, input, final=False):
+        output = ''
+        for b in input:
+            if self.i == 0: # variable-length, terminated with period
+                if b == ord('.'):
+                    if self.buffer:
+                        output += self.process_word()
+                else:
+                    self.buffer.append(b)
+            else: # fixed-length, terminate after self.i bytes
+                self.buffer.append(b)
+                if len(self.buffer) == self.i:
+                    output += self.process_word()
+        if final and self.buffer: # EOF terminates the last word
+            output += self.process_word()
+        return output
+
+    def process_word(self):
+        output = ''
+        if self.buffer[0] == ord('i'):
+            self.i = min(99, int(self.buffer[1:] or 0)) # set input length
+        elif self.buffer[0] == ord('o'):
+            self.o = min(99, int(self.buffer[1:] or 0)) # set output length
+        else:
+            output = self.buffer.decode('ascii')
+            if len(output) < self.o:
+                output += '-'*self.o # pad out with hyphens
+            if self.o:
+                output = output[:self.o] # truncate to output length
+            output += '.'
+        self.buffer = bytearray()
+        return output
+
+    codecEnabled = False
+
+
+# bpo-41919: This method is separated from StatefulIncrementalDecoder to avoid a resource leak
+# when registering codecs and cleanup functions.
+def lookupTestDecoder(name):
+    if StatefulIncrementalDecoder.codecEnabled and name == 'test_decoder':
+        latin1 = codecs.lookup('latin-1')
+        return codecs.CodecInfo(
+            name='test_decoder', encode=latin1.encode, decode=None,
+            incrementalencoder=None,
+            streamreader=None, streamwriter=None,
+            incrementaldecoder=StatefulIncrementalDecoder)
+
+
+class StatefulIncrementalDecoderTest(unittest.TestCase):
+    """
+    Make sure the StatefulIncrementalDecoder actually works.
+    """
+
+    test_cases = [
+        # I=1, O=1 (fixed-length input == fixed-length output)
+        (b'abcd', False, 'a.b.c.d.'),
+        # I=0, O=0 (variable-length input, variable-length output)
+        (b'oiabcd', True, 'abcd.'),
+        # I=0, O=0 (should ignore extra periods)
+        (b'oi...abcd...', True, 'abcd.'),
+        # I=0, O=6 (variable-length input, fixed-length output)
+        (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
+        # I=2, O=6 (fixed-length input < fixed-length output)
+        (b'i.i2.o6xyz', True, 'xy----.z-----.'),
+        # I=6, O=3 (fixed-length input > fixed-length output)
+        (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
+        # I=0, then 3; O=29, then 15 (with longer output)
+        (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
+         'a----------------------------.' +
+         'b----------------------------.' +
+         'cde--------------------------.' +
+         'abcdefghijabcde.' +
+         'a.b------------.' +
+         '.c.------------.' +
+         'd.e------------.' +
+         'k--------------.' +
+         'l--------------.' +
+         'm--------------.')
+    ]
+
+    def test_decoder(self):
+        # Try a few one-shot test cases.
+        for input, eof, output in self.test_cases:
+            d = StatefulIncrementalDecoder()
+            self.assertEqual(d.decode(input, eof), output)
+
+        # Also test an unfinished decode, followed by forcing EOF.
+        d = StatefulIncrementalDecoder()
+        self.assertEqual(d.decode(b'oiabcd'), '')
+        self.assertEqual(d.decode(b'', 1), 'abcd.')
+
+class TextIOWrapperTest:
+
+    def setUp(self):
+        self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
+        self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
+        os_helper.unlink(os_helper.TESTFN)
+        codecs.register(lookupTestDecoder)
+        self.addCleanup(codecs.unregister, lookupTestDecoder)
+
+    def tearDown(self):
+        os_helper.unlink(os_helper.TESTFN)
+
+    def test_constructor(self):
+        r = self.BytesIO(b"\xc3\xa9\n\n")
+        b = self.BufferedReader(r, 1000)
+        t = self.TextIOWrapper(b, encoding="utf-8")
+        t.__init__(b, encoding="latin-1", newline="\r\n")
+        self.assertEqual(t.encoding, "latin-1")
+        self.assertEqual(t.line_buffering, False)
+        t.__init__(b, encoding="utf-8", line_buffering=True)
+        self.assertEqual(t.encoding, "utf-8")
+        self.assertEqual(t.line_buffering, True)
+        self.assertEqual("\xe9\n", t.readline())
+        invalid_type = TypeError if self.is_C else ValueError
+        with self.assertRaises(invalid_type):
+            t.__init__(b, encoding=42)
+        with self.assertRaises(UnicodeEncodeError):
+            t.__init__(b, encoding='\udcfe')
+        with self.assertRaises(ValueError):
+            t.__init__(b, encoding='utf-8\0')
+        with self.assertRaises(invalid_type):
+            t.__init__(b, encoding="utf-8", errors=42)
+        if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
+            with self.assertRaises(UnicodeEncodeError):
+                t.__init__(b, encoding="utf-8", errors='\udcfe')
+        if support.Py_DEBUG or sys.flags.dev_mode or self.is_C:
+            with self.assertRaises(ValueError):
+                t.__init__(b, encoding="utf-8", errors='replace\0')
+        with self.assertRaises(TypeError):
+            t.__init__(b, encoding="utf-8", newline=42)
+        with self.assertRaises(ValueError):
+            t.__init__(b, encoding="utf-8", newline='\udcfe')
+        with self.assertRaises(ValueError):
+            t.__init__(b, encoding="utf-8", newline='\n\0')
+        with self.assertRaises(ValueError):
+            t.__init__(b, encoding="utf-8", newline='xyzzy')
+
+    def test_uninitialized(self):
+        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+        del t
+        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+        self.assertRaises(Exception, repr, t)
+        self.assertRaisesRegex((ValueError, AttributeError),
+                               'uninitialized|has no attribute',
+                               t.read, 0)
+        t.__init__(self.MockRawIO(), encoding="utf-8")
+        self.assertEqual(t.read(0), '')
+
+    def test_non_text_encoding_codecs_are_rejected(self):
+        # Ensure the constructor complains if passed a codec that isn't
+        # marked as a text encoding
+        # http://bugs.python.org/issue20404
+        r = self.BytesIO()
+        b = self.BufferedWriter(r)
+        with self.assertRaisesRegex(LookupError, "is not a text encoding"):
+            self.TextIOWrapper(b, encoding="hex")
+
+    def test_detach(self):
+        r = self.BytesIO()
+        b = self.BufferedWriter(r)
+        t = self.TextIOWrapper(b, encoding="ascii")
+        self.assertIs(t.detach(), b)
+
+        t = self.TextIOWrapper(b, encoding="ascii")
+        t.write("howdy")
+        self.assertFalse(r.getvalue())
+        t.detach()
+        self.assertEqual(r.getvalue(), b"howdy")
+        self.assertRaises(ValueError, t.detach)
+
+        # Operations independent of the detached stream should still work
+        repr(t)
+        self.assertEqual(t.encoding, "ascii")
+        self.assertEqual(t.errors, "strict")
+        self.assertFalse(t.line_buffering)
+        self.assertFalse(t.write_through)
+
+    def test_repr(self):
+        raw = self.BytesIO("hello".encode("utf-8"))
+        b = self.BufferedReader(raw)
+        t = self.TextIOWrapper(b, encoding="utf-8")
+        modname = self.TextIOWrapper.__module__
+        self.assertRegex(repr(t),
+                         r"<(%s\.)?TextIOWrapper encoding='utf-8'>" % modname)
+        raw.name = "dummy"
+        self.assertRegex(repr(t),
+                         r"<(%s\.)?TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
+        t.mode = "r"
+        self.assertRegex(repr(t),
+                         r"<(%s\.)?TextIOWrapper name='dummy' mode='r' encoding='utf-8'>" % modname)
+        raw.name = b"dummy"
+        self.assertRegex(repr(t),
+                         r"<(%s\.)?TextIOWrapper name=b'dummy' mode='r' encoding='utf-8'>" % modname)
+
+        t.buffer.detach()
+        repr(t)  # Should not raise an exception
+
+    def test_recursive_repr(self):
+        # Issue #25455
+        raw = self.BytesIO()
+        t = self.TextIOWrapper(raw, encoding="utf-8")
+        with support.swap_attr(raw, 'name', t), support.infinite_recursion(25):
+            with self.assertRaises(RuntimeError):
+                repr(t)  # Should not crash
+
+    def test_subclass_repr(self):
+        class TestSubclass(self.TextIOWrapper):
+            pass
+
+        f = TestSubclass(self.StringIO())
+        self.assertIn(TestSubclass.__name__, repr(f))
+
+    def test_line_buffering(self):
+        r = self.BytesIO()
+        b = self.BufferedWriter(r, 1000)
+        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=True)
+        t.write("X")
+        self.assertEqual(r.getvalue(), b"")  # No flush happened
+        t.write("Y\nZ")
+        self.assertEqual(r.getvalue(), b"XY\nZ")  # All got flushed
+        t.write("A\rB")
+        self.assertEqual(r.getvalue(), b"XY\nZA\rB")
+
+    def test_reconfigure_line_buffering(self):
+        r = self.BytesIO()
+        b = self.BufferedWriter(r, 1000)
+        t = self.TextIOWrapper(b, encoding="utf-8", newline="\n", line_buffering=False)
+        t.write("AB\nC")
+        self.assertEqual(r.getvalue(), b"")
+
+        t.reconfigure(line_buffering=True)   # implicit flush
+        self.assertEqual(r.getvalue(), b"AB\nC")
+        t.write("DEF\nG")
+        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
+        t.write("H")
+        self.assertEqual(r.getvalue(), b"AB\nCDEF\nG")
+        t.reconfigure(line_buffering=False)   # implicit flush
+        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
+        t.write("IJ")
+        self.assertEqual(r.getvalue(), b"AB\nCDEF\nGH")
+
+        # Keeping default value
+        t.reconfigure()
+        t.reconfigure(line_buffering=None)
+        self.assertEqual(t.line_buffering, False)
+        t.reconfigure(line_buffering=True)
+        t.reconfigure()
+        t.reconfigure(line_buffering=None)
+        self.assertEqual(t.line_buffering, True)
+
+    @unittest.skipIf(sys.flags.utf8_mode, "utf-8 mode is enabled")
+    def test_default_encoding(self):
+        with os_helper.EnvironmentVarGuard() as env:
+            # try to get a user preferred encoding different than the current
+            # locale encoding to check that TextIOWrapper() uses the current
+            # locale encoding and not the user preferred encoding
+            env.unset('LC_ALL', 'LANG', 'LC_CTYPE')
+
+            current_locale_encoding = locale.getencoding()
+            b = self.BytesIO()
+            with warnings.catch_warnings():
+                warnings.simplefilter("ignore", EncodingWarning)
+                t = self.TextIOWrapper(b)
+            self.assertEqual(t.encoding, current_locale_encoding)
+
+    def test_encoding(self):
+        # Check the encoding attribute is always set, and valid
+        b = self.BytesIO()
+        t = self.TextIOWrapper(b, encoding="utf-8")
+        self.assertEqual(t.encoding, "utf-8")
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", EncodingWarning)
+            t = self.TextIOWrapper(b)
+        self.assertIsNotNone(t.encoding)
+        codecs.lookup(t.encoding)
+
+    def test_encoding_errors_reading(self):
+        # (1) default
+        b = self.BytesIO(b"abc\n\xff\n")
+        t = self.TextIOWrapper(b, encoding="ascii")
+        self.assertRaises(UnicodeError, t.read)
+        # (2) explicit strict
+        b = self.BytesIO(b"abc\n\xff\n")
+        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
+        self.assertRaises(UnicodeError, t.read)
+        # (3) ignore
+        b = self.BytesIO(b"abc\n\xff\n")
+        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
+        self.assertEqual(t.read(), "abc\n\n")
+        # (4) replace
+        b = self.BytesIO(b"abc\n\xff\n")
+        t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
+        self.assertEqual(t.read(), "abc\n\ufffd\n")
+
+    def test_encoding_errors_writing(self):
+        # (1) default
+        b = self.BytesIO()
+        t = self.TextIOWrapper(b, encoding="ascii")
+        self.assertRaises(UnicodeError, t.write, "\xff")
+        # (2) explicit strict
+        b = self.BytesIO()
+        t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
+        self.assertRaises(UnicodeError, t.write, "\xff")
+        # (3) ignore
+        b = self.BytesIO()
+        t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
+                             newline="\n")
+        t.write("abc\xffdef\n")
+        t.flush()
+        self.assertEqual(b.getvalue(), b"abcdef\n")
+        # (4) replace
+        b = self.BytesIO()
+        t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
+                             newline="\n")
+        t.write("abc\xffdef\n")
+        t.flush()
+        self.assertEqual(b.getvalue(), b"abc?def\n")
+
+    def test_newlines(self):
+        input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
+
+        tests = [
+            [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
+            [ '', input_lines ],
+            [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
+            [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
+            [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
+        ]
+        encodings = (
+            'utf-8', 'latin-1',
+            'utf-16', 'utf-16-le', 'utf-16-be',
+            'utf-32', 'utf-32-le', 'utf-32-be',
+        )
+
+        # Try a range of buffer sizes to test the case where \r is the last
+        # character in TextIOWrapper._pending_line.
+        for encoding in encodings:
+            # XXX: str.encode() should return bytes
+            data = bytes(''.join(input_lines).encode(encoding))
+            for do_reads in (False, True):
+                for bufsize in range(1, 10):
+                    for newline, exp_lines in tests:
+                        bufio = self.BufferedReader(self.BytesIO(data), bufsize)
+                        textio = self.TextIOWrapper(bufio, newline=newline,
+                                                  encoding=encoding)
+                        if do_reads:
+                            got_lines = []
+                            while True:
+                                c2 = textio.read(2)
+                                if c2 == '':
+                                    break
+                                self.assertEqual(len(c2), 2)
+                                got_lines.append(c2 + textio.readline())
+                        else:
+                            got_lines = list(textio)
+
+                        for got_line, exp_line in zip(got_lines, exp_lines):
+                            self.assertEqual(got_line, exp_line)
+                        self.assertEqual(len(got_lines), len(exp_lines))
+
+    def test_newlines_input(self):
+        testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
+        normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
+        for newline, expected in [
+            (None, normalized.decode("ascii").splitlines(keepends=True)),
+            ("", testdata.decode("ascii").splitlines(keepends=True)),
+            ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
+            ("\r",  ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
+            ]:
+            buf = self.BytesIO(testdata)
+            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
+            self.assertEqual(txt.readlines(), expected)
+            txt.seek(0)
+            self.assertEqual(txt.read(), "".join(expected))
+
+    def test_newlines_output(self):
+        testdict = {
+            "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+            "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
+            "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
+            "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
+            }
+        tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
+        for newline, expected in tests:
+            buf = self.BytesIO()
+            txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
+            txt.write("AAA\nB")
+            txt.write("BB\nCCC\n")
+            txt.write("X\rY\r\nZ")
+            txt.flush()
+            self.assertEqual(buf.closed, False)
+            self.assertEqual(buf.getvalue(), expected)
+
+    def test_destructor(self):
+        l = []
+        base = self.BytesIO
+        class MyBytesIO(base):
+            def close(self):
+                l.append(self.getvalue())
+                base.close(self)
+        b = MyBytesIO()
+        t = self.TextIOWrapper(b, encoding="ascii")
+        t.write("abc")
+        del t
+        support.gc_collect()
+        self.assertEqual([b"abc"], l)
+
+    def test_override_destructor(self):
+        record = []
+        class MyTextIO(self.TextIOWrapper):
+            def __del__(self):
+                record.append(1)
+                try:
+                    f = super().__del__
+                except AttributeError:
+                    pass
+                else:
+                    f()
+            def close(self):
+                record.append(2)
+                super().close()
+            def flush(self):
+                record.append(3)
+                super().flush()
+        b = self.BytesIO()
+        t = MyTextIO(b, encoding="ascii")
+        del t
+        support.gc_collect()
+        self.assertEqual(record, [1, 2, 3])
+
+    def test_error_through_destructor(self):
+        # Test that the exception state is not modified by a destructor,
+        # even if close() fails.
+        rawio = self.CloseFailureIO()
+        with support.catch_unraisable_exception() as cm:
+            with self.assertRaises(AttributeError):
+                self.TextIOWrapper(rawio, encoding="utf-8").xyzzy
+
+            self.assertEqual(cm.unraisable.exc_type, OSError)
+
+    # Systematic tests of the text I/O API
+
+    def test_basic_io(self):
+        for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
+            for enc in "ascii", "latin-1", "utf-8" :# , "utf-16-be", "utf-16-le":
+                f = self.open(os_helper.TESTFN, "w+", encoding=enc)
+                f._CHUNK_SIZE = chunksize
+                self.assertEqual(f.write("abc"), 3)
+                f.close()
+                f = self.open(os_helper.TESTFN, "r+", encoding=enc)
+                f._CHUNK_SIZE = chunksize
+                self.assertEqual(f.tell(), 0)
+                self.assertEqual(f.read(), "abc")
+                cookie = f.tell()
+                self.assertEqual(f.seek(0), 0)
+                self.assertEqual(f.read(None), "abc")
+                f.seek(0)
+                self.assertEqual(f.read(2), "ab")
+                self.assertEqual(f.read(1), "c")
+                self.assertEqual(f.read(1), "")
+                self.assertEqual(f.read(), "")
+                self.assertEqual(f.tell(), cookie)
+                self.assertEqual(f.seek(0), 0)
+                self.assertEqual(f.seek(0, 2), cookie)
+                self.assertEqual(f.write("def"), 3)
+                self.assertEqual(f.seek(cookie), cookie)
+                self.assertEqual(f.read(), "def")
+                if enc.startswith("utf"):
+                    self.multi_line_test(f, enc)
+                f.close()
+
+    def multi_line_test(self, f, enc):
+        f.seek(0)
+        f.truncate()
+        sample = "s\xff\u0fff\uffff"
+        wlines = []
+        for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
+            chars = []
+            for i in range(size):
+                chars.append(sample[i % len(sample)])
+            line = "".join(chars) + "\n"
+            wlines.append((f.tell(), line))
+            f.write(line)
+        f.seek(0)
+        rlines = []
+        while True:
+            pos = f.tell()
+            line = f.readline()
+            if not line:
+                break
+            rlines.append((pos, line))
+        self.assertEqual(rlines, wlines)
+
+    def test_telling(self):
+        f = self.open(os_helper.TESTFN, "w+", encoding="utf-8")
+        p0 = f.tell()
+        f.write("\xff\n")
+        p1 = f.tell()
+        f.write("\xff\n")
+        p2 = f.tell()
+        f.seek(0)
+        self.assertEqual(f.tell(), p0)
+        self.assertEqual(f.readline(), "\xff\n")
+        self.assertEqual(f.tell(), p1)
+        self.assertEqual(f.readline(), "\xff\n")
+        self.assertEqual(f.tell(), p2)
+        f.seek(0)
+        for line in f:
+            self.assertEqual(line, "\xff\n")
+            self.assertRaises(OSError, f.tell)
+        self.assertEqual(f.tell(), p2)
+        f.close()
+
+    def test_seeking(self):
+        chunk_size = _default_chunk_size()
+        prefix_size = chunk_size - 2
+        u_prefix = "a" * prefix_size
+        prefix = bytes(u_prefix.encode("utf-8"))
+        self.assertEqual(len(u_prefix), len(prefix))
+        u_suffix = "\u8888\n"
+        suffix = bytes(u_suffix.encode("utf-8"))
+        line = prefix + suffix
+        with self.open(os_helper.TESTFN, "wb") as f:
+            f.write(line*2)
+        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
+            s = f.read(prefix_size)
+            self.assertEqual(s, str(prefix, "ascii"))
+            self.assertEqual(f.tell(), prefix_size)
+            self.assertEqual(f.readline(), u_suffix)
+
+    def test_seeking_too(self):
+        # Regression test for a specific bug
+        data = b'\xe0\xbf\xbf\n'
+        with self.open(os_helper.TESTFN, "wb") as f:
+            f.write(data)
+        with self.open(os_helper.TESTFN, "r", encoding="utf-8") as f:
+            f._CHUNK_SIZE  # Just test that it exists
+            f._CHUNK_SIZE = 2
+            f.readline()
+            f.tell()
+
+    def test_seek_and_tell(self):
+        #Test seek/tell using the StatefulIncrementalDecoder.
+        # Make test faster by doing smaller seeks
+        CHUNK_SIZE = 128
+
+        def test_seek_and_tell_with_data(data, min_pos=0):
+            """Tell/seek to various points within a data stream and ensure
+            that the decoded data returned by read() is consistent."""
+            f = self.open(os_helper.TESTFN, 'wb')
+            f.write(data)
+            f.close()
+            f = self.open(os_helper.TESTFN, encoding='test_decoder')
+            f._CHUNK_SIZE = CHUNK_SIZE
+            decoded = f.read()
+            f.close()
+
+            for i in range(min_pos, len(decoded) + 1): # seek positions
+                for j in [1, 5, len(decoded) - i]: # read lengths
+                    f = self.open(os_helper.TESTFN, encoding='test_decoder')
+                    self.assertEqual(f.read(i), decoded[:i])
+                    cookie = f.tell()
+                    self.assertEqual(f.read(j), decoded[i:i + j])
+                    f.seek(cookie)
+                    self.assertEqual(f.read(), decoded[i:])
+                    f.close()
+
+        # Enable the test decoder.
+        StatefulIncrementalDecoder.codecEnabled = 1
+
+        # Run the tests.
+        try:
+            # Try each test case.
+            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
+                test_seek_and_tell_with_data(input)
+
+            # Position each test case so that it crosses a chunk boundary.
+            for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
+                offset = CHUNK_SIZE - len(input)//2
+                prefix = b'.'*offset
+                # Don't bother seeking into the prefix (takes too long).
+                min_pos = offset*2
+                test_seek_and_tell_with_data(prefix + input, min_pos)
+
+        # Ensure our test decoder won't interfere with subsequent tests.
+        finally:
+            StatefulIncrementalDecoder.codecEnabled = 0
+
+    def test_multibyte_seek_and_tell(self):
+        f = self.open(os_helper.TESTFN, "w", encoding="euc_jp")
+        f.write("AB\n\u3046\u3048\n")
+        f.close()
+
+        f = self.open(os_helper.TESTFN, "r", encoding="euc_jp")
+        self.assertEqual(f.readline(), "AB\n")
+        p0 = f.tell()
+        self.assertEqual(f.readline(), "\u3046\u3048\n")
+        p1 = f.tell()
+        f.seek(p0)
+        self.assertEqual(f.readline(), "\u3046\u3048\n")
+        self.assertEqual(f.tell(), p1)
+        f.close()
+
+    def test_seek_with_encoder_state(self):
+        f = self.open(os_helper.TESTFN, "w", encoding="euc_jis_2004")
+        f.write("\u00e6\u0300")
+        p0 = f.tell()
+        f.write("\u00e6")
+        f.seek(p0)
+        f.write("\u0300")
+        f.close()
+
+        f = self.open(os_helper.TESTFN, "r", encoding="euc_jis_2004")
+        self.assertEqual(f.readline(), "\u00e6\u0300\u0300")
+        f.close()
+
+    def test_encoded_writes(self):
+        data = "1234567890"
+        tests = ("utf-16",
+                 "utf-16-le",
+                 "utf-16-be",
+                 "utf-32",
+                 "utf-32-le",
+                 "utf-32-be")
+        for encoding in tests:
+            buf = self.BytesIO()
+            f = self.TextIOWrapper(buf, encoding=encoding)
+            # Check if the BOM is written only once (see issue1753).
+            f.write(data)
+            f.write(data)
+            f.seek(0)
+            self.assertEqual(f.read(), data * 2)
+            f.seek(0)
+            self.assertEqual(f.read(), data * 2)
+            self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
+
+    def test_unreadable(self):
+        class UnReadable(self.BytesIO):
+            def readable(self):
+                return False
+        txt = self.TextIOWrapper(UnReadable(), encoding="utf-8")
+        self.assertRaises(OSError, txt.read)
+
+    def test_read_one_by_one(self):
+        txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"), encoding="utf-8")
+        reads = ""
+        while True:
+            c = txt.read(1)
+            if not c:
+                break
+            reads += c
+        self.assertEqual(reads, "AA\nBB")
+
+    def test_readlines(self):
+        txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"), encoding="utf-8")
+        self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
+        txt.seek(0)
+        self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
+        txt.seek(0)
+        self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
+
+    # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
+    def test_read_by_chunk(self):
+        # make sure "\r\n" straddles 128 char boundary.
+        txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"), encoding="utf-8")
+        reads = ""
+        while True:
+            c = txt.read(128)
+            if not c:
+                break
+            reads += c
+        self.assertEqual(reads, "A"*127+"\nB")
+
+    def test_writelines(self):
+        l = ['ab', 'cd', 'ef']
+        buf = self.BytesIO()
+        txt = self.TextIOWrapper(buf, encoding="utf-8")
+        txt.writelines(l)
+        txt.flush()
+        self.assertEqual(buf.getvalue(), b'abcdef')
+
+    def test_writelines_userlist(self):
+        l = UserList(['ab', 'cd', 'ef'])
+        buf = self.BytesIO()
+        txt = self.TextIOWrapper(buf, encoding="utf-8")
+        txt.writelines(l)
+        txt.flush()
+        self.assertEqual(buf.getvalue(), b'abcdef')
+
+    def test_writelines_error(self):
+        txt = self.TextIOWrapper(self.BytesIO(), encoding="utf-8")
+        self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
+        self.assertRaises(TypeError, txt.writelines, None)
+        self.assertRaises(TypeError, txt.writelines, b'abc')
+
+    def test_issue1395_1(self):
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+
+        # read one char at a time
+        reads = ""
+        while True:
+            c = txt.read(1)
+            if not c:
+                break
+            reads += c
+        self.assertEqual(reads, self.normalized)
+
+    def test_issue1395_2(self):
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+        txt._CHUNK_SIZE = 4
+
+        reads = ""
+        while True:
+            c = txt.read(4)
+            if not c:
+                break
+            reads += c
+        self.assertEqual(reads, self.normalized)
+
+    def test_issue1395_3(self):
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+        txt._CHUNK_SIZE = 4
+
+        reads = txt.read(4)
+        reads += txt.read(4)
+        reads += txt.readline()
+        reads += txt.readline()
+        reads += txt.readline()
+        self.assertEqual(reads, self.normalized)
+
+    def test_issue1395_4(self):
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+        txt._CHUNK_SIZE = 4
+
+        reads = txt.read(4)
+        reads += txt.read()
+        self.assertEqual(reads, self.normalized)
+
+    def test_issue1395_5(self):
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+        txt._CHUNK_SIZE = 4
+
+        reads = txt.read(4)
+        pos = txt.tell()
+        txt.seek(0)
+        txt.seek(pos)
+        self.assertEqual(txt.read(4), "BBB\n")
+
+    def test_issue2282(self):
+        buffer = self.BytesIO(self.testdata)
+        txt = self.TextIOWrapper(buffer, encoding="ascii")
+
+        self.assertEqual(buffer.seekable(), txt.seekable())
+
+    def test_append_bom(self):
+        # The BOM is not written again when appending to a non-empty file
+        filename = os_helper.TESTFN
+        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+            with self.open(filename, 'w', encoding=charset) as f:
+                f.write('aaa')
+                pos = f.tell()
+            with self.open(filename, 'rb') as f:
+                self.assertEqual(f.read(), 'aaa'.encode(charset))
+
+            with self.open(filename, 'a', encoding=charset) as f:
+                f.write('xxx')
+            with self.open(filename, 'rb') as f:
+                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
+
+    def test_seek_bom(self):
+        # Same test, but when seeking manually
+        filename = os_helper.TESTFN
+        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+            with self.open(filename, 'w', encoding=charset) as f:
+                f.write('aaa')
+                pos = f.tell()
+            with self.open(filename, 'r+', encoding=charset) as f:
+                f.seek(pos)
+                f.write('zzz')
+                f.seek(0)
+                f.write('bbb')
+            with self.open(filename, 'rb') as f:
+                self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
+
+    def test_seek_append_bom(self):
+        # Same test, but first seek to the start and then to the end
+        filename = os_helper.TESTFN
+        for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
+            with self.open(filename, 'w', encoding=charset) as f:
+                f.write('aaa')
+            with self.open(filename, 'a', encoding=charset) as f:
+                f.seek(0)
+                f.seek(0, self.SEEK_END)
+                f.write('xxx')
+            with self.open(filename, 'rb') as f:
+                self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
+
+    def test_errors_property(self):
+        with self.open(os_helper.TESTFN, "w", encoding="utf-8") as f:
+            self.assertEqual(f.errors, "strict")
+        with self.open(os_helper.TESTFN, "w", encoding="utf-8", errors="replace") as f:
+            self.assertEqual(f.errors, "replace")
+
+    @support.no_tracing
+    @threading_helper.requires_working_threading()
+    def test_threads_write(self):
+        # Issue6750: concurrent writes could duplicate data
+        event = threading.Event()
+        with self.open(os_helper.TESTFN, "w", encoding="utf-8", buffering=1) as f:
+            def run(n):
+                text = "Thread%03d\n" % n
+                event.wait()
+                f.write(text)
+            threads = [threading.Thread(target=run, args=(x,))
+                       for x in range(20)]
+            with threading_helper.start_threads(threads, event.set):
+                time.sleep(0.02)
+        with self.open(os_helper.TESTFN, encoding="utf-8") as f:
+            content = f.read()
+            for n in range(20):
+                self.assertEqual(content.count("Thread%03d\n" % n), 1)
+
+    def test_flush_error_on_close(self):
+        # Test that text file is closed despite failed flush
+        # and that flush() is called before file closed.
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+        closed = []
+        def bad_flush():
+            closed[:] = [txt.closed, txt.buffer.closed]
+            raise OSError()
+        txt.flush = bad_flush
+        self.assertRaises(OSError, txt.close) # exception not swallowed
+        self.assertTrue(txt.closed)
+        self.assertTrue(txt.buffer.closed)
+        self.assertTrue(closed)      # flush() called
+        self.assertFalse(closed[0])  # flush() called before file closed
+        self.assertFalse(closed[1])
+        txt.flush = lambda: None  # break reference loop
+
+    def test_close_error_on_close(self):
+        buffer = self.BytesIO(self.testdata)
+        def bad_flush():
+            raise OSError('flush')
+        def bad_close():
+            raise OSError('close')
+        buffer.close = bad_close
+        txt = self.TextIOWrapper(buffer, encoding="ascii")
+        txt.flush = bad_flush
+        with self.assertRaises(OSError) as err: # exception not swallowed
+            txt.close()
+        self.assertEqual(err.exception.args, ('close',))
+        self.assertIsInstance(err.exception.__context__, OSError)
+        self.assertEqual(err.exception.__context__.args, ('flush',))
+        self.assertFalse(txt.closed)
+
+        # Silence destructor error
+        buffer.close = lambda: None
+        txt.flush = lambda: None
+
+    def test_nonnormalized_close_error_on_close(self):
+        # Issue #21677
+        buffer = self.BytesIO(self.testdata)
+        def bad_flush():
+            raise non_existing_flush
+        def bad_close():
+            raise non_existing_close
+        buffer.close = bad_close
+        txt = self.TextIOWrapper(buffer, encoding="ascii")
+        txt.flush = bad_flush
+        with self.assertRaises(NameError) as err: # exception not swallowed
+            txt.close()
+        self.assertIn('non_existing_close', str(err.exception))
+        self.assertIsInstance(err.exception.__context__, NameError)
+        self.assertIn('non_existing_flush', str(err.exception.__context__))
+        self.assertFalse(txt.closed)
+
+        # Silence destructor error
+        buffer.close = lambda: None
+        txt.flush = lambda: None
+
+    def test_multi_close(self):
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+        txt.close()
+        txt.close()
+        txt.close()
+        self.assertRaises(ValueError, txt.flush)
+
+    def test_unseekable(self):
+        txt = self.TextIOWrapper(self.MockUnseekableIO(self.testdata), encoding="utf-8")
+        self.assertRaises(self.UnsupportedOperation, txt.tell)
+        self.assertRaises(self.UnsupportedOperation, txt.seek, 0)
+
+    def test_readonly_attributes(self):
+        txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
+        buf = self.BytesIO(self.testdata)
+        with self.assertRaises(AttributeError):
+            txt.buffer = buf
+
+    def test_rawio(self):
+        # Issue #12591: TextIOWrapper must work with raw I/O objects, so
+        # that subprocess.Popen() can have the required unbuffered
+        # semantics with universal_newlines=True.
+        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
+        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+        # Reads
+        self.assertEqual(txt.read(4), 'abcd')
+        self.assertEqual(txt.readline(), 'efghi\n')
+        self.assertEqual(list(txt), ['jkl\n', 'opq\n'])
+
+    def test_rawio_write_through(self):
+        # Issue #12591: with write_through=True, writes don't need a flush
+        raw = self.MockRawIO([b'abc', b'def', b'ghi\njkl\nopq\n'])
+        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n',
+                                 write_through=True)
+        txt.write('1')
+        txt.write('23\n4')
+        txt.write('5')
+        self.assertEqual(b''.join(raw._write_stack), b'123\n45')
+
+    def test_bufio_write_through(self):
+        # Issue #21396: write_through=True doesn't force a flush()
+        # on the underlying binary buffered object.
+        flush_called, write_called = [], []
+        class BufferedWriter(self.BufferedWriter):
+            def flush(self, *args, **kwargs):
+                flush_called.append(True)
+                return super().flush(*args, **kwargs)
+            def write(self, *args, **kwargs):
+                write_called.append(True)
+                return super().write(*args, **kwargs)
+
+        rawio = self.BytesIO()
+        data = b"a"
+        bufio = BufferedWriter(rawio, len(data)*2)
+        textio = self.TextIOWrapper(bufio, encoding='ascii',
+                                    write_through=True)
+        # write to the buffered io but don't overflow the buffer
+        text = data.decode('ascii')
+        textio.write(text)
+
+        # buffer.flush is not called with write_through=True
+        self.assertFalse(flush_called)
+        # buffer.write *is* called with write_through=True
+        self.assertTrue(write_called)
+        self.assertEqual(rawio.getvalue(), b"") # no flush
+
+        write_called = [] # reset
+        textio.write(text * 10) # total content is larger than bufio buffer
+        self.assertTrue(write_called)
+        self.assertEqual(rawio.getvalue(), data * 11) # all flushed
+
+    def test_reconfigure_write_through(self):
+        raw = self.MockRawIO([])
+        t = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+        t.write('1')
+        t.reconfigure(write_through=True)  # implied flush
+        self.assertEqual(t.write_through, True)
+        self.assertEqual(b''.join(raw._write_stack), b'1')
+        t.write('23')
+        self.assertEqual(b''.join(raw._write_stack), b'123')
+        t.reconfigure(write_through=False)
+        self.assertEqual(t.write_through, False)
+        t.write('45')
+        t.flush()
+        self.assertEqual(b''.join(raw._write_stack), b'12345')
+        # Keeping default value
+        t.reconfigure()
+        t.reconfigure(write_through=None)
+        self.assertEqual(t.write_through, False)
+        t.reconfigure(write_through=True)
+        t.reconfigure()
+        t.reconfigure(write_through=None)
+        self.assertEqual(t.write_through, True)
+
+    def test_read_nonbytes(self):
+        # Issue #17106
+        # Crash when underlying read() returns non-bytes
+        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+        self.assertRaises(TypeError, t.read, 1)
+        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+        self.assertRaises(TypeError, t.readline)
+        t = self.TextIOWrapper(self.StringIO('a'), encoding="utf-8")
+        self.assertRaises(TypeError, t.read)
+
+    def test_illegal_encoder(self):
+        # Issue 31271: Calling write() while the return value of encoder's
+        # encode() is invalid shouldn't cause an assertion failure.
+        rot13 = codecs.lookup("rot13")
+        with support.swap_attr(rot13, '_is_text_encoding', True):
+            t = self.TextIOWrapper(self.BytesIO(b'foo'), encoding="rot13")
+        self.assertRaises(TypeError, t.write, 'bar')
+
+    def test_illegal_decoder(self):
+        # Issue #17106
+        # Bypass the early encoding check added in issue 20404
+        def _make_illegal_wrapper():
+            quopri = codecs.lookup("quopri")
+            quopri._is_text_encoding = True
+            try:
+                t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
+                                       newline='\n', encoding="quopri")
+            finally:
+                quopri._is_text_encoding = False
+            return t
+        # Crash when decoder returns non-string
+        t = _make_illegal_wrapper()
+        self.assertRaises(TypeError, t.read, 1)
+        t = _make_illegal_wrapper()
+        self.assertRaises(TypeError, t.readline)
+        t = _make_illegal_wrapper()
+        self.assertRaises(TypeError, t.read)
+
+        # Issue 31243: calling read() while the return value of decoder's
+        # getstate() is invalid should neither crash the interpreter nor
+        # raise a SystemError.
+        def _make_very_illegal_wrapper(getstate_ret_val):
+            class BadDecoder:
+                def getstate(self):
+                    return getstate_ret_val
+            def _get_bad_decoder(dummy):
+                return BadDecoder()
+            quopri = codecs.lookup("quopri")
+            with support.swap_attr(quopri, 'incrementaldecoder',
+                                   _get_bad_decoder):
+                return _make_illegal_wrapper()
+        t = _make_very_illegal_wrapper(42)
+        self.assertRaises(TypeError, t.read, 42)
+        t = _make_very_illegal_wrapper(())
+        self.assertRaises(TypeError, t.read, 42)
+        t = _make_very_illegal_wrapper((1, 2))
+        self.assertRaises(TypeError, t.read, 42)
+
+    def _check_create_at_shutdown(self, **kwargs):
+        # Issue #20037: creating a TextIOWrapper at shutdown
+        # shouldn't crash the interpreter.
+        iomod = self.io.__name__
+        code = """if 1:
+            import codecs
+            import {iomod} as io
+
+            # Avoid looking up codecs at shutdown
+            codecs.lookup('utf-8')
+
+            class C:
+                def __del__(self):
+                    io.TextIOWrapper(io.BytesIO(), **{kwargs})
+                    print("ok")
+            c = C()
+            """.format(iomod=iomod, kwargs=kwargs)
+        return assert_python_ok("-c", code)
+
+    def test_create_at_shutdown_without_encoding(self):
+        rc, out, err = self._check_create_at_shutdown()
+        if err:
+            # Can error out with a RuntimeError if the module state
+            # isn't found.
+            self.assertIn(self.shutdown_error, err.decode())
+        else:
+            self.assertEqual("ok", out.decode().strip())
+
+    def test_create_at_shutdown_with_encoding(self):
+        rc, out, err = self._check_create_at_shutdown(encoding='utf-8',
+                                                      errors='strict')
+        self.assertFalse(err)
+        self.assertEqual("ok", out.decode().strip())
+
+    def test_read_byteslike(self):
+        r = MemviewBytesIO(b'Just some random string\n')
+        t = self.TextIOWrapper(r, 'utf-8')
+
+        # TextIOwrapper will not read the full string, because
+        # we truncate it to a multiple of the native int size
+        # so that we can construct a more complex memoryview.
+        bytes_val =  _to_memoryview(r.getvalue()).tobytes()
+
+        self.assertEqual(t.read(200), bytes_val.decode('utf-8'))
+
+    def test_issue22849(self):
+        class F(object):
+            def readable(self): return True
+            def writable(self): return True
+            def seekable(self): return True
+
+        for i in range(10):
+            try:
+                self.TextIOWrapper(F(), encoding='utf-8')
+            except Exception:
+                pass
+
+        F.tell = lambda x: 0
+        t = self.TextIOWrapper(F(), encoding='utf-8')
+
+    def test_reconfigure_locale(self):
+        wrapper = self.TextIOWrapper(self.BytesIO(b"test"))
+        wrapper.reconfigure(encoding="locale")
+
+    def test_reconfigure_encoding_read(self):
+        # latin1 -> utf8
+        # (latin1 can decode utf-8 encoded string)
+        data = 'abc\xe9\n'.encode('latin1') + 'd\xe9f\n'.encode('utf8')
+        raw = self.BytesIO(data)
+        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+        self.assertEqual(txt.readline(), 'abc\xe9\n')
+        with self.assertRaises(self.UnsupportedOperation):
+            txt.reconfigure(encoding='utf-8')
+        with self.assertRaises(self.UnsupportedOperation):
+            txt.reconfigure(newline=None)
+
+    def test_reconfigure_write_fromascii(self):
+        # ascii has a specific encodefunc in the C implementation,
+        # but utf-8-sig has not. Make sure that we get rid of the
+        # cached encodefunc when we switch encoders.
+        raw = self.BytesIO()
+        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+        txt.write('foo\n')
+        txt.reconfigure(encoding='utf-8-sig')
+        txt.write('\xe9\n')
+        txt.flush()
+        self.assertEqual(raw.getvalue(), b'foo\n\xc3\xa9\n')
+
+    def test_reconfigure_write(self):
+        # latin -> utf8
+        raw = self.BytesIO()
+        txt = self.TextIOWrapper(raw, encoding='latin1', newline='\n')
+        txt.write('abc\xe9\n')
+        txt.reconfigure(encoding='utf-8')
+        self.assertEqual(raw.getvalue(), b'abc\xe9\n')
+        txt.write('d\xe9f\n')
+        txt.flush()
+        self.assertEqual(raw.getvalue(), b'abc\xe9\nd\xc3\xa9f\n')
+
+        # ascii -> utf-8-sig: ensure that no BOM is written in the middle of
+        # the file
+        raw = self.BytesIO()
+        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+        txt.write('abc\n')
+        txt.reconfigure(encoding='utf-8-sig')
+        txt.write('d\xe9f\n')
+        txt.flush()
+        self.assertEqual(raw.getvalue(), b'abc\nd\xc3\xa9f\n')
+
+    def test_reconfigure_write_non_seekable(self):
+        raw = self.BytesIO()
+        raw.seekable = lambda: False
+        raw.seek = None
+        txt = self.TextIOWrapper(raw, encoding='ascii', newline='\n')
+        txt.write('abc\n')
+        txt.reconfigure(encoding='utf-8-sig')
+        txt.write('d\xe9f\n')
+        txt.flush()
+
+        # If the raw stream is not seekable, there'll be a BOM
+        self.assertEqual(raw.getvalue(),  b'abc\n\xef\xbb\xbfd\xc3\xa9f\n')
+
+    def test_reconfigure_defaults(self):
+        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\n')
+        txt.reconfigure(encoding=None)
+        self.assertEqual(txt.encoding, 'ascii')
+        self.assertEqual(txt.errors, 'replace')
+        txt.write('LF\n')
+
+        txt.reconfigure(newline='\r\n')
+        self.assertEqual(txt.encoding, 'ascii')
+        self.assertEqual(txt.errors, 'replace')
+
+        txt.reconfigure(errors='ignore')
+        self.assertEqual(txt.encoding, 'ascii')
+        self.assertEqual(txt.errors, 'ignore')
+        txt.write('CRLF\n')
+
+        txt.reconfigure(encoding='utf-8', newline=None)
+        self.assertEqual(txt.errors, 'strict')
+        txt.seek(0)
+        self.assertEqual(txt.read(), 'LF\nCRLF\n')
+
+        self.assertEqual(txt.detach().getvalue(), b'LF\nCRLF\r\n')
+
+    def test_reconfigure_errors(self):
+        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', 'replace', '\r')
+        with self.assertRaises(TypeError):  # there was a crash
+            txt.reconfigure(encoding=42)
+        if self.is_C:
+            with self.assertRaises(UnicodeEncodeError):
+                txt.reconfigure(encoding='\udcfe')
+            with self.assertRaises(LookupError):
+                txt.reconfigure(encoding='locale\0')
+        # TODO: txt.reconfigure(encoding='utf-8\0')
+        # TODO: txt.reconfigure(encoding='nonexisting')
+        with self.assertRaises(TypeError):
+            txt.reconfigure(errors=42)
+        if self.is_C:
+            with self.assertRaises(UnicodeEncodeError):
+                txt.reconfigure(errors='\udcfe')
+        # TODO: txt.reconfigure(errors='ignore\0')
+        # TODO: txt.reconfigure(errors='nonexisting')
+        with self.assertRaises(TypeError):
+            txt.reconfigure(newline=42)
+        with self.assertRaises(ValueError):
+            txt.reconfigure(newline='\udcfe')
+        with self.assertRaises(ValueError):
+            txt.reconfigure(newline='xyz')
+        if not self.is_C:
+            # TODO: Should fail in C too.
+            with self.assertRaises(ValueError):
+                txt.reconfigure(newline='\n\0')
+        if self.is_C:
+            # TODO: Use __bool__(), not __index__().
+            with self.assertRaises(ZeroDivisionError):
+                txt.reconfigure(line_buffering=BadIndex())
+            with self.assertRaises(OverflowError):
+                txt.reconfigure(line_buffering=2**1000)
+            with self.assertRaises(ZeroDivisionError):
+                txt.reconfigure(write_through=BadIndex())
+            with self.assertRaises(OverflowError):
+                txt.reconfigure(write_through=2**1000)
+            with self.assertRaises(ZeroDivisionError):  # there was a crash
+                txt.reconfigure(line_buffering=BadIndex(),
+                                write_through=BadIndex())
+        self.assertEqual(txt.encoding, 'ascii')
+        self.assertEqual(txt.errors, 'replace')
+        self.assertIs(txt.line_buffering, False)
+        self.assertIs(txt.write_through, False)
+
+        txt.reconfigure(encoding='latin1', errors='ignore', newline='\r\n',
+                        line_buffering=True, write_through=True)
+        self.assertEqual(txt.encoding, 'latin1')
+        self.assertEqual(txt.errors, 'ignore')
+        self.assertIs(txt.line_buffering, True)
+        self.assertIs(txt.write_through, True)
+
+    def test_reconfigure_newline(self):
+        raw = self.BytesIO(b'CR\rEOF')
+        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+        txt.reconfigure(newline=None)
+        self.assertEqual(txt.readline(), 'CR\n')
+        raw = self.BytesIO(b'CR\rEOF')
+        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+        txt.reconfigure(newline='')
+        self.assertEqual(txt.readline(), 'CR\r')
+        raw = self.BytesIO(b'CR\rLF\nEOF')
+        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+        txt.reconfigure(newline='\n')
+        self.assertEqual(txt.readline(), 'CR\rLF\n')
+        raw = self.BytesIO(b'LF\nCR\rEOF')
+        txt = self.TextIOWrapper(raw, 'ascii', newline='\n')
+        txt.reconfigure(newline='\r')
+        self.assertEqual(txt.readline(), 'LF\nCR\r')
+        raw = self.BytesIO(b'CR\rCRLF\r\nEOF')
+        txt = self.TextIOWrapper(raw, 'ascii', newline='\r')
+        txt.reconfigure(newline='\r\n')
+        self.assertEqual(txt.readline(), 'CR\rCRLF\r\n')
+
+        txt = self.TextIOWrapper(self.BytesIO(), 'ascii', newline='\r')
+        txt.reconfigure(newline=None)
+        txt.write('linesep\n')
+        txt.reconfigure(newline='')
+        txt.write('LF\n')
+        txt.reconfigure(newline='\n')
+        txt.write('LF\n')
+        txt.reconfigure(newline='\r')
+        txt.write('CR\n')
+        txt.reconfigure(newline='\r\n')
+        txt.write('CRLF\n')
+        expected = 'linesep' + os.linesep + 'LF\nLF\nCR\rCRLF\r\n'
+        self.assertEqual(txt.detach().getvalue().decode('ascii'), expected)
+
+    def test_issue25862(self):
+        # Assertion failures occurred in tell() after read() and write().
+        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
+        t.read(1)
+        t.read()
+        t.tell()
+        t = self.TextIOWrapper(self.BytesIO(b'test'), encoding='ascii')
+        t.read(1)
+        t.write('x')
+        t.tell()
+
+    def test_issue35928(self):
+        p = self.BufferedRWPair(self.BytesIO(b'foo\nbar\n'), self.BytesIO())
+        f = self.TextIOWrapper(p)
+        res = f.readline()
+        self.assertEqual(res, 'foo\n')
+        f.write(res)
+        self.assertEqual(res + f.readline(), 'foo\nbar\n')
+
+    def test_pickling_subclass(self):
+        global MyTextIO
+        class MyTextIO(self.TextIOWrapper):
+            def __init__(self, raw, tag):
+                super().__init__(raw)
+                self.tag = tag
+            def __getstate__(self):
+                return self.tag, self.buffer.getvalue()
+            def __setstate__(slf, state):
+                tag, value = state
+                slf.__init__(self.BytesIO(value), tag)
+
+        raw = self.BytesIO(b'data')
+        txt = MyTextIO(raw, 'ham')
+        for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+            with self.subTest(protocol=proto):
+                pickled = pickle.dumps(txt, proto)
+                newtxt = pickle.loads(pickled)
+                self.assertEqual(newtxt.buffer.getvalue(), b'data')
+                self.assertEqual(newtxt.tag, 'ham')
+        del MyTextIO
+
+    @unittest.skipUnless(hasattr(os, "pipe"), "requires os.pipe()")
+    def test_read_non_blocking(self):
+        import os
+        r, w = os.pipe()
+        try:
+            os.set_blocking(r, False)
+            with self.io.open(r, 'rt') as textfile:
+                r = None
+                # Nothing has been written so a non-blocking read raises a BlockingIOError exception.
+                with self.assertRaises(BlockingIOError):
+                    textfile.read()
+        finally:
+            if r is not None:
+                os.close(r)
+            os.close(w)
+
+
+class MemviewBytesIO(io.BytesIO):
+    '''A BytesIO object whose read method returns memoryviews
+       rather than bytes'''
+
+    def read1(self, len_):
+        return _to_memoryview(super().read1(len_))
+
+    def read(self, len_):
+        return _to_memoryview(super().read(len_))
+
+def _to_memoryview(buf):
+    '''Convert bytes-object *buf* to a non-trivial memoryview'''
+
+    arr = array.array('i')
+    idx = len(buf) - len(buf) % arr.itemsize
+    arr.frombytes(buf[:idx])
+    return memoryview(arr)
+
+
+class CTextIOWrapperTest(TextIOWrapperTest, CTestCase):
+    shutdown_error = "LookupError: unknown encoding: ascii"
+
+    def test_initialization(self):
+        r = self.BytesIO(b"\xc3\xa9\n\n")
+        b = self.BufferedReader(r, 1000)
+        t = self.TextIOWrapper(b, encoding="utf-8")
+        self.assertRaises(ValueError, t.__init__, b, encoding="utf-8", newline='xyzzy')
+        self.assertRaises(ValueError, t.read)
+
+        t = self.TextIOWrapper.__new__(self.TextIOWrapper)
+        self.assertRaises(Exception, repr, t)
+
+    def test_garbage_collection(self):
+        # C TextIOWrapper objects are collected, and collecting them flushes
+        # all data to disk.
+        # The Python version has __del__, so it ends in gc.garbage instead.
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", ResourceWarning)
+            rawio = self.FileIO(os_helper.TESTFN, "wb")
+            b = self.BufferedWriter(rawio)
+            t = self.TextIOWrapper(b, encoding="ascii")
+            t.write("456def")
+            t.x = t
+            wr = weakref.ref(t)
+            del t
+            support.gc_collect()
+        self.assertIsNone(wr(), wr)
+        with self.open(os_helper.TESTFN, "rb") as f:
+            self.assertEqual(f.read(), b"456def")
+
+    def test_rwpair_cleared_before_textio(self):
+        # Issue 13070: TextIOWrapper's finalization would crash when called
+        # after the reference to the underlying BufferedRWPair's writer got
+        # cleared by the GC.
+        for i in range(1000):
+            b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
+            t1 = self.TextIOWrapper(b1, encoding="ascii")
+            b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
+            t2 = self.TextIOWrapper(b2, encoding="ascii")
+            # circular references
+            t1.buddy = t2
+            t2.buddy = t1
+        support.gc_collect()
+
+    def test_del__CHUNK_SIZE_SystemError(self):
+        t = self.TextIOWrapper(self.BytesIO(), encoding='ascii')
+        with self.assertRaises(AttributeError):
+            del t._CHUNK_SIZE
+
+    def test_internal_buffer_size(self):
+        # bpo-43260: TextIOWrapper's internal buffer should not store
+        # data larger than chunk size.
+        chunk_size = 8192  # default chunk size, updated later
+
+        class MockIO(self.MockRawIO):
+            def write(self, data):
+                if len(data) > chunk_size:
+                    raise RuntimeError
+                return super().write(data)
+
+        buf = MockIO()
+        t = self.TextIOWrapper(buf, encoding="ascii")
+        chunk_size = t._CHUNK_SIZE
+        t.write("abc")
+        t.write("def")
+        # default chunk size is 8192 bytes so t don't write data to buf.
+        self.assertEqual([], buf._write_stack)
+
+        with self.assertRaises(RuntimeError):
+            t.write("x"*(chunk_size+1))
+
+        self.assertEqual([b"abcdef"], buf._write_stack)
+        t.write("ghi")
+        t.write("x"*chunk_size)
+        self.assertEqual([b"abcdef", b"ghi", b"x"*chunk_size], buf._write_stack)
+
+    def test_issue119506(self):
+        chunk_size = 8192
+
+        class MockIO(self.MockRawIO):
+            written = False
+            def write(self, data):
+                if not self.written:
+                    self.written = True
+                    t.write("middle")
+                return super().write(data)
+
+        buf = MockIO()
+        t = self.TextIOWrapper(buf)
+        t.write("abc")
+        t.write("def")
+        # writing data which size >= chunk_size cause flushing buffer before write.
+        t.write("g" * chunk_size)
+        t.flush()
+
+        self.assertEqual([b"abcdef", b"middle", b"g"*chunk_size],
+                         buf._write_stack)
+
+
+class PyTextIOWrapperTest(TextIOWrapperTest, PyTestCase):
+    shutdown_error = "LookupError: unknown encoding: ascii"
+
+
+class IncrementalNewlineDecoderTest:
+
+    def check_newline_decoding_utf8(self, decoder):
+        # UTF-8 specific tests for a newline decoder
+        def _check_decode(b, s, **kwargs):
+            # We exercise getstate() / setstate() as well as decode()
+            state = decoder.getstate()
+            self.assertEqual(decoder.decode(b, **kwargs), s)
+            decoder.setstate(state)
+            self.assertEqual(decoder.decode(b, **kwargs), s)
+
+        _check_decode(b'\xe8\xa2\x88', "\u8888")
+
+        _check_decode(b'\xe8', "")
+        _check_decode(b'\xa2', "")
+        _check_decode(b'\x88', "\u8888")
+
+        _check_decode(b'\xe8', "")
+        _check_decode(b'\xa2', "")
+        _check_decode(b'\x88', "\u8888")
+
+        _check_decode(b'\xe8', "")
+        self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
+
+        decoder.reset()
+        _check_decode(b'\n', "\n")
+        _check_decode(b'\r', "")
+        _check_decode(b'', "\n", final=True)
+        _check_decode(b'\r', "\n", final=True)
+
+        _check_decode(b'\r', "")
+        _check_decode(b'a', "\na")
+
+        _check_decode(b'\r\r\n', "\n\n")
+        _check_decode(b'\r', "")
+        _check_decode(b'\r', "\n")
+        _check_decode(b'\na', "\na")
+
+        _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
+        _check_decode(b'\xe8\xa2\x88', "\u8888")
+        _check_decode(b'\n', "\n")
+        _check_decode(b'\xe8\xa2\x88\r', "\u8888")
+        _check_decode(b'\n', "\n")
+
+    def check_newline_decoding(self, decoder, encoding):
+        result = []
+        if encoding is not None:
+            encoder = codecs.getincrementalencoder(encoding)()
+            def _decode_bytewise(s):
+                # Decode one byte at a time
+                for b in encoder.encode(s):
+                    result.append(decoder.decode(bytes([b])))
+        else:
+            encoder = None
+            def _decode_bytewise(s):
+                # Decode one char at a time
+                for c in s:
+                    result.append(decoder.decode(c))
+        self.assertEqual(decoder.newlines, None)
+        _decode_bytewise("abc\n\r")
+        self.assertEqual(decoder.newlines, '\n')
+        _decode_bytewise("\nabc")
+        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
+        _decode_bytewise("abc\r")
+        self.assertEqual(decoder.newlines, ('\n', '\r\n'))
+        _decode_bytewise("abc")
+        self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
+        _decode_bytewise("abc\r")
+        self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
+        decoder.reset()
+        input = "abc"
+        if encoder is not None:
+            encoder.reset()
+            input = encoder.encode(input)
+        self.assertEqual(decoder.decode(input), "abc")
+        self.assertEqual(decoder.newlines, None)
+
+    def test_newline_decoder(self):
+        encodings = (
+            # None meaning the IncrementalNewlineDecoder takes unicode input
+            # rather than bytes input
+            None, 'utf-8', 'latin-1',
+            'utf-16', 'utf-16-le', 'utf-16-be',
+            'utf-32', 'utf-32-le', 'utf-32-be',
+        )
+        for enc in encodings:
+            decoder = enc and codecs.getincrementaldecoder(enc)()
+            decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
+            self.check_newline_decoding(decoder, enc)
+        decoder = codecs.getincrementaldecoder("utf-8")()
+        decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
+        self.check_newline_decoding_utf8(decoder)
+        self.assertRaises(TypeError, decoder.setstate, 42)
+
+    def test_newline_bytes(self):
+        # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
+        def _check(dec):
+            self.assertEqual(dec.newlines, None)
+            self.assertEqual(dec.decode("\u0D00"), "\u0D00")
+            self.assertEqual(dec.newlines, None)
+            self.assertEqual(dec.decode("\u0A00"), "\u0A00")
+            self.assertEqual(dec.newlines, None)
+        dec = self.IncrementalNewlineDecoder(None, translate=False)
+        _check(dec)
+        dec = self.IncrementalNewlineDecoder(None, translate=True)
+        _check(dec)
+
+    def test_translate(self):
+        # issue 35062
+        for translate in (-2, -1, 1, 2):
+            decoder = codecs.getincrementaldecoder("utf-8")()
+            decoder = self.IncrementalNewlineDecoder(decoder, translate)
+            self.check_newline_decoding_utf8(decoder)
+        decoder = codecs.getincrementaldecoder("utf-8")()
+        decoder = self.IncrementalNewlineDecoder(decoder, translate=0)
+        self.assertEqual(decoder.decode(b"\r\r\n"), "\r\r\n")
+
+class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
+    IncrementalNewlineDecoder = io.IncrementalNewlineDecoder
+
+    @support.cpython_only
+    def test_uninitialized(self):
+        uninitialized = self.IncrementalNewlineDecoder.__new__(
+            self.IncrementalNewlineDecoder)
+        self.assertRaises(ValueError, uninitialized.decode, b'bar')
+        self.assertRaises(ValueError, uninitialized.getstate)
+        self.assertRaises(ValueError, uninitialized.setstate, (b'foo', 0))
+        self.assertRaises(ValueError, uninitialized.reset)
+
+
+class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest, unittest.TestCase):
+    IncrementalNewlineDecoder = pyio.IncrementalNewlineDecoder