Add io.BytesIO.peek() method to read without advancing position.
Co-authored-by: blurb-it[bot] <43283697+blurb-it[bot]@users.noreply.github.com>
Co-authored-by: Erlend E. Aasland <erlend.aasland@protonmail.com>
Co-authored-by: Victor Stinner <vstinner@python.org>
Co-authored-by: Emma Smith <emma@emmatyping.dev>
Co-authored-by: Stan Ulbrych <stan@python.org>
Co-authored-by: Cody Maloney <cmaloney@users.noreply.github.com>
Return :class:`bytes` containing the entire contents of the buffer.
+ .. method:: peek(size=0, /)
+
+ Return a copy of the buffer from the current position onwards without
+ advancing the position.
+
+ If *size* is less than one or omitted, at most
+ :data:`DEFAULT_BUFFER_SIZE` bytes are returned.
+ Otherwise, at most *size* bytes are returned.
+ Return an empty :class:`bytes` object at EOF.
+
+ .. versionadded:: next
.. method:: read1(size=-1, /)
which is passed on to the constructor of the :class:`~gzip.GzipFile` class.
(Contributed by Marin Misur in :gh:`91372`.)
+io
+--
+
+* Add :meth:`io.BytesIO.peek` method to read without advancing position.
+ (Contributed by Marcel Martin in :gh:`90533`.)
+
logging
-------
raise ValueError("tell on closed file")
return self._pos
+ def peek(self, size=0):
+ if self.closed:
+ raise ValueError("peek on closed file")
+ if size < 1:
+ return self._buffer[self._pos:self._pos + io.DEFAULT_BUFFER_SIZE]
+ return self._buffer[self._pos:self._pos + size]
+
def truncate(self, pos=None):
if self.closed:
raise ValueError("truncate on closed file")
barrier.wait()
b.readinto(into)
+ def peek(barrier, b, *ignore):
+ barrier.wait()
+ b.peek()
+
def close(barrier, b, *ignore):
barrier.wait()
b.close()
self.check([truncate] + [readline] * 10, self.ioclass(b'0\n'*20480))
self.check([truncate] + [readlines] * 10, self.ioclass(b'0\n'*20480))
self.check([truncate] + [readinto] * 10, self.ioclass(b'0\n'*204800), bytearray(b'0\n'*204800))
+ self.check([truncate] + [peek] * 10, self.ioclass(b'0\n'*204800))
self.check([close] + [write] * 10, self.ioclass())
self.check([truncate] + [getvalue] * 10, self.ioclass(b'0\n'*204800))
self.check([truncate] + [getbuffer] * 10, self.ioclass(b'0\n'*204800))
buf = bytearray(2)
self.assertEqual(0, memio.readinto(buf))
+ def test_peek(self):
+ buf = self.buftype("1234567890")
+ with self.ioclass(buf) as memio:
+ self.assertEqual(memio.tell(), 0)
+ self.assertEqual(memio.peek(1), buf[:1])
+ self.assertEqual(memio.peek(1), buf[:1])
+ self.assertEqual(memio.peek(), buf)
+ self.assertEqual(memio.peek(3), buf[:3])
+ self.assertEqual(memio.peek(5), buf[:5])
+ self.assertEqual(memio.peek(0), buf)
+ self.assertEqual(memio.peek(len(buf) + 100), buf)
+ self.assertEqual(memio.peek(-1), buf)
+ self.assertEqual(memio.tell(), 0)
+
+ memio.read(1)
+ self.assertEqual(memio.tell(), 1)
+ self.assertEqual(memio.peek(1), buf[1:2])
+ self.assertEqual(memio.peek(), buf[1:])
+ self.assertEqual(memio.peek(3), buf[1:4])
+ self.assertEqual(memio.peek(5), buf[1:6])
+ self.assertEqual(memio.peek(0), buf[1:])
+ self.assertEqual(memio.peek(len(buf) + 100), buf[1:])
+ self.assertEqual(memio.peek(-1), buf[1:])
+ self.assertEqual(memio.tell(), 1)
+
+ memio.read()
+ self.assertEqual(memio.tell(), len(buf))
+ self.assertEqual(memio.peek(1), self.EOF)
+ self.assertEqual(memio.peek(3), self.EOF)
+ self.assertEqual(memio.peek(5), self.EOF)
+ self.assertEqual(memio.peek(0), b"")
+ self.assertEqual(memio.tell(), len(buf))
+
+ # Peeking works after writing
+ abc = self.buftype("abc")
+ memio.write(abc)
+ self.assertEqual(memio.peek(), self.EOF)
+ memio.seek(len(buf))
+ self.assertEqual(memio.peek(), abc)
+ self.assertEqual(memio.peek(-1), abc)
+ self.assertEqual(memio.peek(len(abc) + 100), abc)
+ self.assertEqual(memio.tell(), len(buf))
+
+ with self.ioclass(buf) as memio:
+ memio.seek(len(buf))
+ self.assertEqual(memio.peek(), self.EOF)
+
+ # Length greater than DEFAULT_BUFFER_SIZE
+ buf = self.buftype("1234567890" * io.DEFAULT_BUFFER_SIZE)
+ with self.ioclass(buf) as memio:
+ self.assertEqual(memio.peek(), buf[:io.DEFAULT_BUFFER_SIZE])
+ self.assertEqual(memio.peek(0), buf[:io.DEFAULT_BUFFER_SIZE])
+ self.assertEqual(memio.peek(-1), buf[:io.DEFAULT_BUFFER_SIZE])
+ self.assertEqual(memio.peek(io.DEFAULT_BUFFER_SIZE + 100),
+ buf[:io.DEFAULT_BUFFER_SIZE + 100])
+ self.assertEqual(memio.peek(io.DEFAULT_BUFFER_SIZE * 100), buf)
+
+ # Current position beyond buffer end
+ with self.ioclass(buf) as memio:
+ memio.seek(len(buf) + 100)
+ self.assertEqual(memio.peek(), self.EOF)
+ with self.ioclass(buf) as memio:
+ memio.read()
+ memio.truncate(0)
+ self.assertEqual(memio.tell(), len(buf))
+ self.assertEqual(memio.peek(), self.EOF)
+
+
+ # Peek after close raises
+ self.assertRaises(ValueError, memio.peek)
+
def test_unicode(self):
memio = self.ioclass()
--- /dev/null
+Add :meth:`io.BytesIO.peek` method to read without advancing position.
return PyLong_FromSsize_t(self->pos);
}
+/* Read without advancing position. */
static PyObject *
-read_bytes_lock_held(bytesio *self, Py_ssize_t size)
+peek_bytes_lock_held(bytesio *self, Py_ssize_t size)
{
_Py_CRITICAL_SECTION_ASSERT_OBJECT_LOCKED(self);
if (size > 1 &&
self->pos == 0 && size == PyBytes_GET_SIZE(self->buf) &&
FT_ATOMIC_LOAD_SSIZE_RELAXED(self->exports) == 0) {
- self->pos += size;
return Py_NewRef(self->buf);
}
}
output = PyBytes_AS_STRING(self->buf) + self->pos;
- self->pos += size;
return PyBytes_FromStringAndSize(output, size);
}
+static PyObject *
+read_bytes_lock_held(bytesio *self, Py_ssize_t size)
+{
+ PyObject *bytes = peek_bytes_lock_held(self, size);
+ if (bytes != NULL) {
+ assert(PyBytes_GET_SIZE(bytes) == size);
+ self->pos += size;
+ }
+ return bytes;
+}
+
/*[clinic input]
@critical_section
_io.BytesIO.read
return _io_BytesIO_read_impl(self, size);
}
+
+/*[clinic input]
+@critical_section
+_io.BytesIO.peek
+ size: Py_ssize_t = 0
+ /
+
+Return bytes from the stream without advancing the position.
+
+Return an empty bytes object at EOF.
+[clinic start generated code]*/
+
+static PyObject *
+_io_BytesIO_peek_impl(bytesio *self, Py_ssize_t size)
+/*[clinic end generated code: output=fa4d8ce28b35db9b input=2ce74234b10aec3e]*/
+{
+ CHECK_CLOSED(self);
+
+ if (size < 1) {
+ size = DEFAULT_BUFFER_SIZE;
+ }
+
+ /* adjust invalid sizes */
+ Py_ssize_t n = self->string_size - self->pos;
+ if (size > n) {
+ size = n;
+ /* n can be negative after truncate() or seek() */
+ if (size < 0) {
+ size = 0;
+ }
+ }
+ return peek_bytes_lock_held(self, size);
+}
+
+
/*[clinic input]
@critical_section
_io.BytesIO.readline
_IO_BYTESIO_READLINE_METHODDEF
_IO_BYTESIO_READLINES_METHODDEF
_IO_BYTESIO_READ_METHODDEF
+ _IO_BYTESIO_PEEK_METHODDEF
_IO_BYTESIO_GETBUFFER_METHODDEF
_IO_BYTESIO_GETVALUE_METHODDEF
_IO_BYTESIO_SEEK_METHODDEF
return return_value;
}
+PyDoc_STRVAR(_io_BytesIO_peek__doc__,
+"peek($self, size=0, /)\n"
+"--\n"
+"\n"
+"Return bytes from the stream without advancing the position.\n"
+"\n"
+"Return an empty bytes object at EOF.");
+
+#define _IO_BYTESIO_PEEK_METHODDEF \
+ {"peek", _PyCFunction_CAST(_io_BytesIO_peek), METH_FASTCALL, _io_BytesIO_peek__doc__},
+
+static PyObject *
+_io_BytesIO_peek_impl(bytesio *self, Py_ssize_t size);
+
+static PyObject *
+_io_BytesIO_peek(PyObject *self, PyObject *const *args, Py_ssize_t nargs)
+{
+ PyObject *return_value = NULL;
+ Py_ssize_t size = 0;
+
+ if (!_PyArg_CheckPositional("peek", nargs, 0, 1)) {
+ goto exit;
+ }
+ if (nargs < 1) {
+ goto skip_optional;
+ }
+ {
+ Py_ssize_t ival = -1;
+ PyObject *iobj = _PyNumber_Index(args[0]);
+ if (iobj != NULL) {
+ ival = PyLong_AsSsize_t(iobj);
+ Py_DECREF(iobj);
+ }
+ if (ival == -1 && PyErr_Occurred()) {
+ goto exit;
+ }
+ size = ival;
+ }
+skip_optional:
+ Py_BEGIN_CRITICAL_SECTION(self);
+ return_value = _io_BytesIO_peek_impl((bytesio *)self, size);
+ Py_END_CRITICAL_SECTION();
+
+exit:
+ return return_value;
+}
+
PyDoc_STRVAR(_io_BytesIO_readline__doc__,
"readline($self, size=-1, /)\n"
"--\n"
exit:
return return_value;
}
-/*[clinic end generated code: output=eac3911e207aaf45 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=b5e625e31b2a82f0 input=a9049054013a1b77]*/