import pickle
import pickletools
import random
+import struct
import sys
import unittest
import weakref
data = 1 << (8 * size)
try:
for proto in protocols:
+ if proto < 2:
+ continue
with self.subTest(proto=proto):
- if proto < 2:
- continue
with self.assertRaises((ValueError, OverflowError)):
self.dumps(data, protocol=proto)
finally:
data = b"abcd" * (size // 4)
try:
for proto in protocols:
+ if proto < 3:
+ continue
with self.subTest(proto=proto):
- if proto < 3:
- continue
try:
pickled = self.dumps(data, protocol=proto)
- self.assertTrue(b"abcd" in pickled[:19])
- self.assertTrue(b"abcd" in pickled[-18:])
+ header = (pickle.BINBYTES +
+ struct.pack("<I", len(data)))
+ data_start = pickled.index(data)
+ self.assertEqual(
+ header,
+ pickled[data_start-len(header):data_start])
finally:
pickled = None
finally:
@bigmemtest(size=_4G, memuse=1 + 1, dry_run=False)
def test_huge_bytes_64b(self, size):
- data = b"a" * size
+ data = b"acbd" * (size // 4)
try:
for proto in protocols:
+ if proto < 3:
+ continue
with self.subTest(proto=proto):
- if proto < 3:
+ if proto == 3:
+ # Protocol 3 does not support large bytes objects.
+ # Verify that we do not crash when processing one.
+ with self.assertRaises((ValueError, OverflowError)):
+ self.dumps(data, protocol=proto)
continue
- with self.assertRaises((ValueError, OverflowError)):
- self.dumps(data, protocol=proto)
+ try:
+ pickled = self.dumps(data, protocol=proto)
+ header = (pickle.BINBYTES8 +
+ struct.pack("<Q", len(data)))
+ data_start = pickled.index(data)
+ self.assertEqual(
+ header,
+ pickled[data_start-len(header):data_start])
+ finally:
+ pickled = None
finally:
data = None
data = "abcd" * (size // 4)
try:
for proto in protocols:
+ if proto == 0:
+ continue
with self.subTest(proto=proto):
try:
pickled = self.dumps(data, protocol=proto)
- self.assertTrue(b"abcd" in pickled[:19])
- self.assertTrue(b"abcd" in pickled[-18:])
+ header = (pickle.BINUNICODE +
+ struct.pack("<I", len(data)))
+ data_start = pickled.index(b'abcd')
+ self.assertEqual(
+ header,
+ pickled[data_start-len(header):data_start])
+ self.assertEqual((pickled.rindex(b"abcd") + len(b"abcd") -
+ pickled.index(b"abcd")), len(data))
finally:
pickled = None
finally:
data = "abcd" * (size // 4)
try:
for proto in protocols:
+ if proto == 0:
+ continue
with self.subTest(proto=proto):
- if proto == 0:
- continue
if proto < 4:
with self.assertRaises((ValueError, OverflowError)):
self.dumps(data, protocol=proto)
- else:
- try:
- pickled = self.dumps(data, protocol=proto)
- self.assertTrue(b"abcd" in pickled[:19])
- self.assertTrue(b"abcd" in pickled[-18:])
- finally:
- pickled = None
+ continue
+ try:
+ pickled = self.dumps(data, protocol=proto)
+ header = (pickle.BINUNICODE8 +
+ struct.pack("<Q", len(data)))
+ data_start = pickled.index(b'abcd')
+ self.assertEqual(
+ header,
+ pickled[data_start-len(header):data_start])
+ self.assertEqual((pickled.rindex(b"abcd") + len(b"abcd") -
+ pickled.index(b"abcd")), len(data))
+ finally:
+ pickled = None
finally:
data = None