LimitOverrunError exception will be raised, and the data
will be left in the internal buffer, so it can be read again.
- The ``separator`` may also be an iterable of separators. In this
+ The ``separator`` may also be a tuple of separators. In this
case the return value will be the shortest possible that has any
separator as the suffix. For the purposes of LimitOverrunError,
the shortest possible separator is considered to be the one that
matched.
"""
- if isinstance(separator, bytes):
- separator = [separator]
- else:
- # Makes sure shortest matches wins, and supports arbitrary iterables
+ if isinstance(separator, tuple):
+ # Makes sure shortest matches wins
separator = sorted(separator, key=len)
+ else:
+ separator = [separator]
if not separator:
raise ValueError('Separator should contain at least one element')
min_seplen = len(separator[0])
with self.assertRaisesRegex(ValueError, 'Separator should be'):
self.loop.run_until_complete(stream.readuntil(separator=b''))
with self.assertRaisesRegex(ValueError, 'Separator should be'):
- self.loop.run_until_complete(stream.readuntil(separator=[b'']))
+ self.loop.run_until_complete(stream.readuntil(separator=(b'',)))
with self.assertRaisesRegex(ValueError, 'Separator should contain'):
- self.loop.run_until_complete(stream.readuntil(separator=[]))
+ self.loop.run_until_complete(stream.readuntil(separator=()))
def test_readuntil_multi_chunks(self):
stream = asyncio.StreamReader(loop=self.loop)
# Simple case
stream.feed_data(b'line 1\nline 2\r')
- data = self.loop.run_until_complete(stream.readuntil([b'\r', b'\n']))
+ data = self.loop.run_until_complete(stream.readuntil((b'\r', b'\n')))
self.assertEqual(b'line 1\n', data)
- data = self.loop.run_until_complete(stream.readuntil([b'\r', b'\n']))
+ data = self.loop.run_until_complete(stream.readuntil((b'\r', b'\n')))
self.assertEqual(b'line 2\r', data)
self.assertEqual(b'', stream._buffer)
# First end position matches, even if that's a longer match
stream.feed_data(b'ABCDEFG')
- data = self.loop.run_until_complete(stream.readuntil([b'DEF', b'BCDE']))
+ data = self.loop.run_until_complete(stream.readuntil((b'DEF', b'BCDE')))
self.assertEqual(b'ABCDE', data)
self.assertEqual(b'FG', stream._buffer)
with self.assertRaisesRegex(asyncio.LimitOverrunError,
'is found') as cm:
- self.loop.run_until_complete(stream.readuntil([b'A', b'ome dataA']))
+ self.loop.run_until_complete(stream.readuntil((b'A', b'ome dataA')))
self.assertEqual(b'some dataA', stream._buffer)
stream = asyncio.StreamReader(loop=self.loop)
stream.feed_data(b'data')
- readuntil_task = self.loop.create_task(stream.readuntil([b'A', b'long sep']))
+ readuntil_task = self.loop.create_task(stream.readuntil((b'A', b'long sep')))
self.loop.call_soon(stream.feed_data, b'Z')
self.loop.call_soon(stream.feed_data, b'Aaaa')
self.assertEqual(b'dataZA', data)
self.assertEqual(b'aaa', stream._buffer)
+ def test_readuntil_bytearray(self):
+ stream = asyncio.StreamReader(loop=self.loop)
+ stream.feed_data(b'some data\r\n')
+ data = self.loop.run_until_complete(stream.readuntil(bytearray(b'\r\n')))
+ self.assertEqual(b'some data\r\n', data)
+ self.assertEqual(b'', stream._buffer)
+
def test_readexactly_zero_or_less(self):
# Read exact number of bytes (zero or less).
stream = asyncio.StreamReader(loop=self.loop)