def tell(self) -> int:
"""Return current stream position."""
+ self._checkClosed()
return self.seek(0, 1)
def truncate(self, pos: int = None) -> int:
This is not implemented for read-only and non-blocking streams.
"""
# XXX Should this return the number of bytes written???
+ if self.__closed:
+ raise ValueError("I/O operation on closed file.")
__closed = False
lines will be read if the total size (in bytes/characters) of all
lines so far exceeds hint.
"""
+ self._checkClosed()
if hint is None or hint <= 0:
return list(self)
n = 0
Returns an empty bytes object on EOF, or None if the object is
set not to block and has no data to read.
"""
+ self._checkClosed()
if n is None:
n = -1
if n < 0:
def readall(self):
"""Read until EOF, using multiple read() call."""
+ self._checkClosed()
res = bytearray()
while True:
data = self.read(DEFAULT_BUFFER_SIZE)
data at the moment.
"""
# XXX This ought to work with anything that supports the buffer API
+ self._checkClosed()
data = self.read(len(b))
n = len(data)
try:
def getvalue(self):
"""Return the bytes value (contents) of the buffer
"""
- if self.closed:
- raise ValueError("getvalue on closed file")
+ self._checkClosed()
return bytes(self._buffer)
def read(self, n=None):
- if self.closed:
- raise ValueError("read from closed file")
+ self._checkClosed()
if n is None:
n = -1
if n < 0:
return self.read(n)
def write(self, b):
- if self.closed:
- raise ValueError("write to closed file")
+ self._checkClosed()
if isinstance(b, str):
raise TypeError("can't write str to binary stream")
n = len(b)
return n
def seek(self, pos, whence=0):
- if self.closed:
- raise ValueError("seek on closed file")
+ self._checkClosed()
try:
pos = pos.__index__()
except AttributeError as err:
return self._pos
def tell(self):
- if self.closed:
- raise ValueError("tell on closed file")
+ self._checkClosed()
return self._pos
def truncate(self, pos=None):
- if self.closed:
- raise ValueError("truncate on closed file")
+ self._checkClosed()
if pos is None:
pos = self._pos
elif pos < 0:
mode. If n is negative, read until EOF or until read() would
block.
"""
+ self._checkClosed()
with self._read_lock:
return self._read_unlocked(n)
do at most one raw read to satisfy it. We never return more
than self.buffer_size.
"""
+ self._checkClosed()
with self._read_lock:
return self._peek_unlocked(n)
"""Reads up to n bytes, with at most one read() system call."""
# Returns up to n bytes. If at least one byte is buffered, we
# only return buffered bytes. Otherwise, we do one raw read.
+ self._checkClosed()
if n <= 0:
return b""
with self._read_lock:
min(n, len(self._read_buf) - self._read_pos))
def tell(self):
+ self._checkClosed()
return self.raw.tell() - len(self._read_buf) + self._read_pos
def seek(self, pos, whence=0):
+ self._checkClosed()
with self._read_lock:
if whence == 1:
pos -= len(self._read_buf) - self._read_pos
self._write_lock = Lock()
def write(self, b):
- if self.closed:
- raise ValueError("write to closed file")
+ self._checkClosed()
if isinstance(b, str):
raise TypeError("can't write str to binary stream")
with self._write_lock:
return written
def truncate(self, pos=None):
+ self._checkClosed()
with self._write_lock:
self._flush_unlocked()
if pos is None:
return self.raw.truncate(pos)
def flush(self):
+ self._checkClosed()
with self._write_lock:
self._flush_unlocked()
def _flush_unlocked(self):
- if self.closed:
- raise ValueError("flush of closed file")
written = 0
try:
while self._write_buf:
raise BlockingIOError(e.errno, e.strerror, written)
def tell(self):
+ self._checkClosed()
return self.raw.tell() + len(self._write_buf)
def seek(self, pos, whence=0):
+ self._checkClosed()
with self._write_lock:
self._flush_unlocked()
return self.raw.seek(pos, whence)
return pos
def tell(self):
+ self._checkClosed()
if self._write_buf:
return self.raw.tell() + len(self._write_buf)
else:
return BufferedReader.read1(self, n)
def write(self, b):
+ self._checkClosed()
if self._read_buf:
# Undo readahead
with self._read_lock:
return self.buffer.isatty()
def write(self, s: str):
- if self.closed:
- raise ValueError("write to closed file")
+ self._checkClosed()
if not isinstance(s, str):
raise TypeError("can't write %s to text stream" %
s.__class__.__name__)
return position, dec_flags, bytes_to_feed, need_eof, chars_to_skip
def tell(self):
+ self._checkClosed()
if not self._seekable:
raise IOError("underlying stream is not seekable")
if not self._telling:
return self.buffer.truncate()
def seek(self, cookie, whence=0):
- if self.closed:
- raise ValueError("tell on closed file")
+ self._checkClosed()
if not self._seekable:
raise IOError("underlying stream is not seekable")
if whence == 1: # seek relative to current position
return cookie
def read(self, n=None):
+ self._checkClosed()
if n is None:
n = -1
decoder = self._decoder or self._get_decoder()
return result
def __next__(self):
+ self._checkClosed()
self._telling = False
line = self.readline()
if not line:
return line
def readline(self, limit=None):
- if self.closed:
- raise ValueError("read from closed file")
+ self._checkClosed()
if limit is None:
limit = -1
def getvalue(self) -> str:
"""Retrieve the entire contents of the object."""
- if self.closed:
- raise ValueError("read on closed file")
+ self._checkClosed()
return self._getvalue()
def write(self, s: str) -> int:
Returns the number of characters written.
"""
- if self.closed:
- raise ValueError("write to closed file")
+ self._checkClosed()
if not isinstance(s, str):
raise TypeError("can't write %s to text stream" %
s.__class__.__name__)
If the argument is negative or omitted, read until EOF
is reached. Return an empty string at EOF.
"""
- if self.closed:
- raise ValueError("read to closed file")
+ self._checkClosed()
if n is None:
n = -1
res = self._pending
def tell(self) -> int:
"""Tell the current file position."""
- if self.closed:
- raise ValueError("tell from closed file")
+ self._checkClosed()
if self._pending:
return self._tell() - len(self._pending)
else:
2 End of stream - pos must be 0.
Returns the new absolute position.
"""
- if self.closed:
- raise ValueError("seek from closed file")
+ self._checkClosed()
self._pending = ""
return self._seek(pos, whence)
returned by tell(). Imply an absolute seek to pos.
Returns the new absolute position.
"""
- if self.closed:
- raise ValueError("truncate from closed file")
+ self._checkClosed()
self._pending = ""
return self._truncate(pos)
def readline(self, limit: int = None) -> str:
- if self.closed:
- raise ValueError("read from closed file")
+ self._checkClosed()
if limit is None:
limit = -1
if limit >= 0:
f.close()
g.close()
+ def test_io_after_close(self):
+ for kwargs in [
+ {"mode": "w"},
+ {"mode": "wb"},
+ {"mode": "w", "buffering": 1},
+ {"mode": "w", "buffering": 2},
+ {"mode": "wb", "buffering": 0},
+ {"mode": "r"},
+ {"mode": "rb"},
+ {"mode": "r", "buffering": 1},
+ {"mode": "r", "buffering": 2},
+ {"mode": "rb", "buffering": 0},
+ {"mode": "w+"},
+ {"mode": "w+b"},
+ {"mode": "w+", "buffering": 1},
+ {"mode": "w+", "buffering": 2},
+ {"mode": "w+b", "buffering": 0},
+ ]:
+ f = io.open(support.TESTFN, **kwargs)
+ f.close()
+ self.assertRaises(ValueError, f.flush)
+ self.assertRaises(ValueError, f.fileno)
+ self.assertRaises(ValueError, f.isatty)
+ self.assertRaises(ValueError, f.__iter__)
+ if hasattr(f, "peek"):
+ self.assertRaises(ValueError, f.peek, 1)
+ self.assertRaises(ValueError, f.read)
+ if hasattr(f, "read1"):
+ self.assertRaises(ValueError, f.read1, 1024)
+ if hasattr(f, "readinto"):
+ self.assertRaises(ValueError, f.readinto, bytearray(1024))
+ self.assertRaises(ValueError, f.readline)
+ self.assertRaises(ValueError, f.readlines)
+ self.assertRaises(ValueError, f.seek, 0)
+ self.assertRaises(ValueError, f.tell)
+ self.assertRaises(ValueError, f.truncate)
+ self.assertRaises(ValueError, f.write, "")
+ self.assertRaises(ValueError, f.writelines, [])
+
def test_main():
support.run_unittest(IOTest, BytesIOTest, StringIOTest,