import types
import unittest
import weakref
+import __main__
from textwrap import dedent
from http.cookies import SimpleCookie
from test import support
from test.support import os_helper
from test.support import (
- TestFailed, run_with_locales, no_tracing,
+ run_with_locales, no_tracing,
_2G, _4G, bigmemtest
)
from test.support.import_helper import forget
from test.support.os_helper import TESTFN
from test.support import threading_helper
from test.support.warnings_helper import save_restore_warnings_filters
+from test import picklecommon
+from test.picklecommon import *
from pickle import bytes_types
if pair is not None:
copyreg.add_extension(pair[0], pair[1], code)
-class C:
- def __eq__(self, other):
- return self.__dict__ == other.__dict__
-
-class D(C):
- def __init__(self, arg):
- pass
-
-class E(C):
- def __getinitargs__(self):
- return ()
-
-import __main__
-__main__.C = C
-C.__module__ = "__main__"
-__main__.D = D
-D.__module__ = "__main__"
-__main__.E = E
-E.__module__ = "__main__"
-
-# Simple mutable object.
-class Object:
- pass
-
-# Hashable immutable key object containing unheshable mutable data.
-class K:
- def __init__(self, value):
- self.value = value
-
- def __reduce__(self):
- # Shouldn't support the recursion itself
- return K, (self.value,)
-
-class myint(int):
- def __init__(self, x):
- self.str = str(x)
-
-class initarg(C):
-
- def __init__(self, a, b):
- self.a = a
- self.b = b
-
- def __getinitargs__(self):
- return self.a, self.b
-
-class metaclass(type):
- pass
-
-class use_metaclass(object, metaclass=metaclass):
- pass
-
class pickling_metaclass(type):
def __eq__(self, other):
return (type(self) == type(other) and
return result
-class ZeroCopyBytes(bytes):
- readonly = True
- c_contiguous = True
- f_contiguous = True
- zero_copy_reconstruct = True
-
- def __reduce_ex__(self, protocol):
- if protocol >= 5:
- return type(self)._reconstruct, (pickle.PickleBuffer(self),), None
- else:
- return type(self)._reconstruct, (bytes(self),)
-
- def __repr__(self):
- return "{}({!r})".format(self.__class__.__name__, bytes(self))
-
- __str__ = __repr__
-
- @classmethod
- def _reconstruct(cls, obj):
- with memoryview(obj) as m:
- obj = m.obj
- if type(obj) is cls:
- # Zero-copy
- return obj
- else:
- return cls(obj)
-
-
-class ZeroCopyBytearray(bytearray):
- readonly = False
- c_contiguous = True
- f_contiguous = True
- zero_copy_reconstruct = True
-
- def __reduce_ex__(self, protocol):
- if protocol >= 5:
- return type(self)._reconstruct, (pickle.PickleBuffer(self),), None
- else:
- return type(self)._reconstruct, (bytes(self),)
-
- def __repr__(self):
- return "{}({!r})".format(self.__class__.__name__, bytes(self))
-
- __str__ = __repr__
-
- @classmethod
- def _reconstruct(cls, obj):
- with memoryview(obj) as m:
- obj = m.obj
- if type(obj) is cls:
- # Zero-copy
- return obj
- else:
- return cls(obj)
-
-
if _testbuffer is not None:
class PicklableNDArray:
return not (self == other)
def __repr__(self):
- return (f"{type(self)}(shape={self.array.shape},"
- f"strides={self.array.strides}, "
- f"bytes={self.array.tobytes()})")
+ return ("{name}(shape={array.shape},"
+ "strides={array.strides}, "
+ "bytes={array.tobytes()})").format(
+ name=type(self).__name__, array=self.array.shape)
def __reduce_ex__(self, protocol):
if not self.array.contiguous:
self.loads(b'\x82\x01.')
check(None, ValueError)
check((), ValueError)
- check((__name__,), (TypeError, ValueError))
- check((__name__, "MyList", "x"), (TypeError, ValueError))
- check((__name__, None), (TypeError, ValueError))
+ check((MyList.__module__,), (TypeError, ValueError))
+ check((MyList.__module__, "MyList", "x"), (TypeError, ValueError))
+ check((MyList.__module__, None), (TypeError, ValueError))
check((None, "MyList"), (TypeError, ValueError))
def test_bad_reduce(self):
self.assertEqual(str(cm.exception),
'__reduce__ must return a string or tuple, not list')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
obj = REX((print,))
for proto in protocols:
self.assertEqual(str(cm.exception),
'tuple returned by __reduce__ must contain 2 through 6 elements')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
obj = REX((print, (), None, None, None, None, None))
for proto in protocols:
self.assertEqual(str(cm.exception),
'tuple returned by __reduce__ must contain 2 through 6 elements')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_bad_reconstructor(self):
obj = REX((42, ()))
'first item of the tuple returned by __reduce__ '
'must be callable, not int')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_reconstructor(self):
obj = REX((UnpickleableCallable(), ()))
with self.assertRaises(CustomError) as cm:
self.dumps(obj, proto)
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX reconstructor',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX reconstructor',
+ f'when serializing {REX.__module__}.REX object'])
def test_bad_reconstructor_args(self):
obj = REX((print, []))
'second item of the tuple returned by __reduce__ '
'must be a tuple, not list')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_reconstructor_args(self):
obj = REX((print, (1, 2, UNPICKLEABLE)))
self.dumps(obj, proto)
self.assertEqual(cm.exception.__notes__, [
'when serializing tuple item 2',
- 'when serializing test.pickletester.REX reconstructor arguments',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX reconstructor arguments',
+ f'when serializing {REX.__module__}.REX object'])
def test_bad_newobj_args(self):
obj = REX((copyreg.__newobj__, ()))
'tuple index out of range',
'__newobj__ expected at least 1 argument, got 0'})
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
obj = REX((copyreg.__newobj__, [REX]))
for proto in protocols[2:]:
'second item of the tuple returned by __reduce__ '
'must be a tuple, not list')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_bad_newobj_class(self):
obj = REX((copyreg.__newobj__, (NoNew(),)))
self.dumps(obj, proto)
self.assertIn(str(cm.exception), {
'first argument to __newobj__() has no __new__',
- f'first argument to __newobj__() must be a class, not {__name__}.NoNew'})
+ f'first argument to __newobj__() must be a class, not {NoNew.__module__}.NoNew'})
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_wrong_newobj_class(self):
obj = REX((copyreg.__newobj__, (str,)))
self.assertEqual(str(cm.exception),
f'first argument to __newobj__() must be {REX!r}, not {str!r}')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_newobj_class(self):
class LocalREX(REX): pass
if proto >= 2:
self.assertEqual(cm.exception.__notes__, [
'when serializing tuple item 2',
- 'when serializing test.pickletester.REX __new__ arguments',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX __new__ arguments',
+ f'when serializing {REX.__module__}.REX object'])
else:
self.assertEqual(cm.exception.__notes__, [
'when serializing tuple item 3',
- 'when serializing test.pickletester.REX reconstructor arguments',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX reconstructor arguments',
+ f'when serializing {REX.__module__}.REX object'])
def test_bad_newobj_ex_args(self):
obj = REX((copyreg.__newobj_ex__, ()))
'not enough values to unpack (expected 3, got 0)',
'__newobj_ex__ expected 3 arguments, got 0'})
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
obj = REX((copyreg.__newobj_ex__, 42))
for proto in protocols[2:]:
'second item of the tuple returned by __reduce__ '
'must be a tuple, not int')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
obj = REX((copyreg.__newobj_ex__, (REX, 42, {})))
if self.pickler is pickle._Pickler:
self.assertEqual(str(cm.exception),
'Value after * must be an iterable, not int')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
else:
for proto in protocols[2:]:
with self.subTest(proto=proto):
self.assertEqual(str(cm.exception),
'second argument to __newobj_ex__() must be a tuple, not int')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
obj = REX((copyreg.__newobj_ex__, (REX, (), [])))
if self.pickler is pickle._Pickler:
self.assertEqual(str(cm.exception),
'Value after ** must be a mapping, not list')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
else:
for proto in protocols[2:]:
with self.subTest(proto=proto):
self.assertEqual(str(cm.exception),
'third argument to __newobj_ex__() must be a dict, not list')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_bad_newobj_ex__class(self):
obj = REX((copyreg.__newobj_ex__, (NoNew(), (), {})))
self.dumps(obj, proto)
self.assertIn(str(cm.exception), {
'first argument to __newobj_ex__() has no __new__',
- f'first argument to __newobj_ex__() must be a class, not {__name__}.NoNew'})
+ f'first argument to __newobj_ex__() must be a class, not {NoNew.__module__}.NoNew'})
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_wrong_newobj_ex_class(self):
if self.pickler is not pickle._Pickler:
self.assertEqual(str(cm.exception),
f'first argument to __newobj_ex__() must be {REX}, not {str}')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_newobj_ex_class(self):
class LocalREX(REX): pass
if proto >= 4:
self.assertEqual(cm.exception.__notes__, [
'when serializing tuple item 2',
- 'when serializing test.pickletester.REX __new__ arguments',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX __new__ arguments',
+ f'when serializing {REX.__module__}.REX object'])
elif proto >= 2:
self.assertEqual(cm.exception.__notes__, [
'when serializing tuple item 3',
'when serializing tuple item 1',
'when serializing functools.partial state',
'when serializing functools.partial object',
- 'when serializing test.pickletester.REX reconstructor',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX reconstructor',
+ f'when serializing {REX.__module__}.REX object'])
else:
self.assertEqual(cm.exception.__notes__, [
'when serializing tuple item 2',
'when serializing tuple item 1',
- 'when serializing test.pickletester.REX reconstructor arguments',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX reconstructor arguments',
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_newobj_ex_kwargs(self):
obj = REX((copyreg.__newobj_ex__, (REX, (), {'a': UNPICKLEABLE})))
if proto >= 4:
self.assertEqual(cm.exception.__notes__, [
"when serializing dict item 'a'",
- 'when serializing test.pickletester.REX __new__ arguments',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX __new__ arguments',
+ f'when serializing {REX.__module__}.REX object'])
elif proto >= 2:
self.assertEqual(cm.exception.__notes__, [
"when serializing dict item 'a'",
'when serializing tuple item 2',
'when serializing functools.partial state',
'when serializing functools.partial object',
- 'when serializing test.pickletester.REX reconstructor',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX reconstructor',
+ f'when serializing {REX.__module__}.REX object'])
else:
self.assertEqual(cm.exception.__notes__, [
"when serializing dict item 'a'",
'when serializing tuple item 2',
- 'when serializing test.pickletester.REX reconstructor arguments',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX reconstructor arguments',
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_state(self):
obj = REX_state(UNPICKLEABLE)
with self.assertRaises(CustomError) as cm:
self.dumps(obj, proto)
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX_state state',
- 'when serializing test.pickletester.REX_state object'])
+ f'when serializing {REX_state.__module__}.REX_state state',
+ f'when serializing {REX_state.__module__}.REX_state object'])
def test_bad_state_setter(self):
if self.pickler is pickle._Pickler:
'sixth item of the tuple returned by __reduce__ '
'must be callable, not int')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_state_setter(self):
obj = REX((print, (), 'state', None, None, UnpickleableCallable()))
with self.assertRaises(CustomError) as cm:
self.dumps(obj, proto)
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX state setter',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX state setter',
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_state_with_state_setter(self):
obj = REX((print, (), UNPICKLEABLE, None, None, print))
with self.assertRaises(CustomError) as cm:
self.dumps(obj, proto)
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX state',
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX state',
+ f'when serializing {REX.__module__}.REX object'])
def test_bad_object_list_items(self):
# Issue4176: crash when 4th and 5th items of __reduce__()
'fourth item of the tuple returned by __reduce__ '
'must be an iterator, not int'})
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
if self.pickler is not pickle._Pickler:
# Python implementation is less strict and also accepts iterables.
'fourth item of the tuple returned by __reduce__ '
'must be an iterator, not int')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_object_list_items(self):
obj = REX_six([1, 2, UNPICKLEABLE])
with self.assertRaises(CustomError) as cm:
self.dumps(obj, proto)
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX_six item 2',
- 'when serializing test.pickletester.REX_six object'])
+ f'when serializing {REX_six.__module__}.REX_six item 2',
+ f'when serializing {REX_six.__module__}.REX_six object'])
def test_bad_object_dict_items(self):
# Issue4176: crash when 4th and 5th items of __reduce__()
'fifth item of the tuple returned by __reduce__ '
'must be an iterator, not int'})
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
for proto in protocols:
obj = REX((dict, (), None, None, iter([('a',)])))
'not enough values to unpack (expected 2, got 1)',
'dict items iterator must return 2-tuples'})
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
if self.pickler is not pickle._Pickler:
# Python implementation is less strict and also accepts iterables.
self.assertEqual(str(cm.exception),
'dict items iterator must return 2-tuples')
self.assertEqual(cm.exception.__notes__, [
- 'when serializing test.pickletester.REX object'])
+ f'when serializing {REX.__module__}.REX object'])
def test_unpickleable_object_dict_items(self):
obj = REX_seven({'a': UNPICKLEABLE})
with self.assertRaises(CustomError) as cm:
self.dumps(obj, proto)
self.assertEqual(cm.exception.__notes__, [
- "when serializing test.pickletester.REX_seven item 'a'",
- 'when serializing test.pickletester.REX_seven object'])
+ f"when serializing {REX_seven.__module__}.REX_seven item 'a'",
+ f'when serializing {REX_seven.__module__}.REX_seven object'])
def test_unpickleable_list_items(self):
obj = [1, [2, 3, UNPICKLEABLE]]
def test_global_lookup_error(self):
# Global name does not exist
obj = REX('spam')
- obj.__module__ = __name__
+ obj.__module__ = 'test.picklecommon'
for proto in protocols:
with self.subTest(proto=proto):
with self.assertRaises(pickle.PicklingError) as cm:
self.dumps(obj, proto)
self.assertEqual(str(cm.exception),
- f"Can't pickle {obj!r}: it's not found as {__name__}.spam")
+ f"Can't pickle {obj!r}: it's not found as test.picklecommon.spam")
self.assertEqual(str(cm.exception.__context__),
- f"module '{__name__}' has no attribute 'spam'")
+ "module 'test.picklecommon' has no attribute 'spam'")
obj.__module__ = 'nonexisting'
for proto in protocols:
def test_bad_ext_code(self):
# This should never happen in normal circumstances, because the type
# and the value of the extension code is checked in copyreg.add_extension().
- key = (__name__, 'MyList')
+ key = (MyList.__module__, 'MyList')
def check(code, exc):
assert key not in copyreg._extension_registry
assert code not in copyreg._inverted_registry
class AbstractPickleTests:
# Subclass must define self.dumps, self.loads.
+ py_version = sys.version_info # for test_xpickle
optimized = False
_testdata = AbstractUnpickleTests._testdata
def test_misc(self):
# test various datatypes not tested by testdata
for proto in protocols:
- x = myint(4)
- s = self.dumps(x, proto)
- y = self.loads(s)
- self.assert_is_copy(x, y)
+ with self.subTest('myint', proto=proto):
+ if self.py_version < (3, 0) and proto < 2:
+ self.skipTest('int subclasses are not interoperable with Python 2')
+ x = myint(4)
+ s = self.dumps(x, proto)
+ y = self.loads(s)
+ self.assert_is_copy(x, y)
- x = (1, ())
- s = self.dumps(x, proto)
- y = self.loads(s)
- self.assert_is_copy(x, y)
+ with self.subTest('tuple', proto=proto):
+ x = (1, ())
+ s = self.dumps(x, proto)
+ y = self.loads(s)
+ self.assert_is_copy(x, y)
- x = initarg(1, x)
- s = self.dumps(x, proto)
- y = self.loads(s)
- self.assert_is_copy(x, y)
+ with self.subTest('initarg', proto=proto):
+ if self.py_version < (3, 0):
+ self.skipTest('"classic" classes are not interoperable with Python 2')
+ x = initarg(1, x)
+ s = self.dumps(x, proto)
+ y = self.loads(s)
+ self.assert_is_copy(x, y)
# XXX test __reduce__ protocol?
def test_roundtrip_equality(self):
+ if self.py_version < (3, 0):
+ self.skipTest('"classic" classes are not interoperable with Python 2')
expected = self._testdata
for proto in protocols:
s = self.dumps(expected, proto)
self._test_recursive_tuple_and_dict_key(REX_seven, asdict=lambda x: x.table)
def test_recursive_set(self):
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
# Set containing an immutable object containing the original set.
y = set()
y.add(K(y))
def test_recursive_inst(self):
# Mutable object containing itself.
+ if self.py_version < (3, 0):
+ self.skipTest('"classic" classes are not interoperable with Python 2')
i = Object()
i.attr = i
for proto in protocols:
self.assertIs(x.attr, x)
def test_recursive_multi(self):
+ if self.py_version < (3, 0):
+ self.skipTest('"classic" classes are not interoperable with Python 2')
l = []
d = {1:l}
i = Object()
self.assertEqual(list(x[0].attr.keys()), [1])
self.assertIs(x[0].attr[1], x)
- def _test_recursive_collection_and_inst(self, factory):
+ def _test_recursive_collection_and_inst(self, factory, oldminproto=None):
+ if self.py_version < (3, 0):
+ self.skipTest('"classic" classes are not interoperable with Python 2')
# Mutable object containing a collection containing the original
# object.
o = Object()
o.attr = factory([o])
t = type(o.attr)
- for proto in protocols:
- s = self.dumps(o, proto)
- x = self.loads(s)
- self.assertIsInstance(x.attr, t)
- self.assertEqual(len(x.attr), 1)
- self.assertIsInstance(list(x.attr)[0], Object)
- self.assertIs(list(x.attr)[0], x)
+ with self.subTest('obj -> {t.__name__} -> obj'):
+ for proto in protocols:
+ with self.subTest(proto=proto):
+ s = self.dumps(o, proto)
+ x = self.loads(s)
+ self.assertIsInstance(x.attr, t)
+ self.assertEqual(len(x.attr), 1)
+ self.assertIsInstance(list(x.attr)[0], Object)
+ self.assertIs(list(x.attr)[0], x)
# Collection containing a mutable object containing the original
# collection.
o = o.attr
- for proto in protocols:
- s = self.dumps(o, proto)
- x = self.loads(s)
- self.assertIsInstance(x, t)
- self.assertEqual(len(x), 1)
- self.assertIsInstance(list(x)[0], Object)
- self.assertIs(list(x)[0].attr, x)
+ with self.subTest(f'{t.__name__} -> obj -> {t.__name__}'):
+ if self.py_version < (3, 4) and oldminproto is None:
+ self.skipTest('not supported in Python < 3.4')
+ for proto in protocols:
+ with self.subTest(proto=proto):
+ if self.py_version < (3, 4) and proto < oldminproto:
+ self.skipTest(f'requires protocol {oldminproto} in Python < 3.4')
+ s = self.dumps(o, proto)
+ x = self.loads(s)
+ self.assertIsInstance(x, t)
+ self.assertEqual(len(x), 1)
+ self.assertIsInstance(list(x)[0], Object)
+ self.assertIs(list(x)[0].attr, x)
def test_recursive_list_and_inst(self):
- self._test_recursive_collection_and_inst(list)
+ self._test_recursive_collection_and_inst(list, oldminproto=0)
def test_recursive_tuple_and_inst(self):
- self._test_recursive_collection_and_inst(tuple)
+ self._test_recursive_collection_and_inst(tuple, oldminproto=0)
def test_recursive_dict_and_inst(self):
- self._test_recursive_collection_and_inst(dict.fromkeys)
+ self._test_recursive_collection_and_inst(dict.fromkeys, oldminproto=0)
def test_recursive_set_and_inst(self):
self._test_recursive_collection_and_inst(set)
self._test_recursive_collection_and_inst(frozenset)
def test_recursive_list_subclass_and_inst(self):
- self._test_recursive_collection_and_inst(MyList)
+ self._test_recursive_collection_and_inst(MyList, oldminproto=2)
def test_recursive_tuple_subclass_and_inst(self):
self._test_recursive_collection_and_inst(MyTuple)
def test_recursive_dict_subclass_and_inst(self):
- self._test_recursive_collection_and_inst(MyDict.fromkeys)
+ self._test_recursive_collection_and_inst(MyDict.fromkeys, oldminproto=2)
def test_recursive_set_subclass_and_inst(self):
self._test_recursive_collection_and_inst(MySet)
def test_unicode_memoization(self):
# Repeated str is re-used (even when escapes added).
+ if self.py_version < (3, 0):
+ self.skipTest('not supported in Python < 3.0')
for proto in protocols:
for s in '', 'xyz', 'xyz\n', 'x\\yz', 'x\xa1yz\r':
p = self.dumps((s, s), proto)
self.assert_is_copy(s, self.loads(p))
def test_bytes_memoization(self):
+ array_types = [bytes]
+ if self.py_version >= (3, 4):
+ array_types += [ZeroCopyBytes]
for proto in protocols:
- for array_type in [bytes, ZeroCopyBytes]:
+ for array_type in array_types:
for s in b'', b'xyz', b'xyz'*100:
+ b = array_type(s)
+ expected = (b, b) if self.py_version >= (3, 0) else (b.decode(),)*2
with self.subTest(proto=proto, array_type=array_type, s=s, independent=False):
- b = array_type(s)
p = self.dumps((b, b), proto)
x, y = self.loads(p)
self.assertIs(x, y)
- self.assert_is_copy((b, b), (x, y))
+ self.assert_is_copy(expected, (x, y))
+ b2 = array_type(s)
with self.subTest(proto=proto, array_type=array_type, s=s, independent=True):
- b1, b2 = array_type(s), array_type(s)
- p = self.dumps((b1, b2), proto)
- # Note that (b1, b2) = self.loads(p) might have identical
- # components, i.e., b1 is b2, but this is not always the
+ p = self.dumps((b, b2), proto)
+ # Note that (b, b2) = self.loads(p) might have identical
+ # components, i.e., b is b2, but this is not always the
# case if the content is large (equality still holds).
- self.assert_is_copy((b1, b2), self.loads(p))
+ self.assert_is_copy(expected, self.loads(p))
def test_bytearray(self):
for proto in protocols:
self.assertTrue(opcode_in_pickle(pickle.BYTEARRAY8, p))
def test_bytearray_memoization(self):
+ array_types = [bytearray]
+ if self.py_version >= (3, 4):
+ array_types += [ZeroCopyBytearray]
for proto in protocols:
- for array_type in [bytearray, ZeroCopyBytearray]:
+ for array_type in array_types:
for s in b'', b'xyz', b'xyz'*100:
with self.subTest(proto=proto, array_type=array_type, s=s, independent=False):
b = array_type(s)
def test_reduce(self):
for proto in protocols:
- inst = AAA()
- dumped = self.dumps(inst, proto)
- loaded = self.loads(dumped)
- self.assertEqual(loaded, REDUCE_A)
+ with self.subTest(proto=proto):
+ if self.py_version < (3, 4) and proto < 3:
+ self.skipTest('str is not interoperable with Python < 3.4')
+ inst = AAA()
+ dumped = self.dumps(inst, proto)
+ loaded = self.loads(dumped)
+ self.assertEqual(loaded, REDUCE_A)
def test_getinitargs(self):
+ if self.py_version < (3, 0):
+ self.skipTest('"classic" classes are not interoperable with Python 2')
for proto in protocols:
inst = initarg(1, 2)
dumped = self.dumps(inst, proto)
self.assert_is_copy(inst, loaded)
def test_metaclass(self):
+ self.assertEqual(type(use_metaclass), metaclass)
a = use_metaclass()
for proto in protocols:
s = self.dumps(a, proto)
s = self.dumps(t, proto)
u = self.loads(s)
self.assert_is_copy(t, u)
+ if self.py_version < (3, 4):
+ # module 'os' has no attributes '_make_stat_result' and
+ # '_make_statvfs_result'
+ continue
t = os.stat(os.curdir)
s = self.dumps(t, proto)
u = self.loads(s)
self.assert_is_copy(t, u)
def test_ellipsis(self):
+ if self.py_version < (3, 3):
+ self.skipTest('not supported in Python < 3.3')
for proto in protocols:
- s = self.dumps(..., proto)
- u = self.loads(s)
- self.assertIs(..., u)
+ with self.subTest(proto=proto):
+ s = self.dumps(..., proto)
+ u = self.loads(s)
+ self.assertIs(..., u)
def test_notimplemented(self):
+ if self.py_version < (3, 3):
+ self.skipTest('not supported in Python < 3.3')
for proto in protocols:
- s = self.dumps(NotImplemented, proto)
- u = self.loads(s)
- self.assertIs(NotImplemented, u)
+ with self.subTest(proto=proto):
+ s = self.dumps(NotImplemented, proto)
+ u = self.loads(s)
+ self.assertIs(NotImplemented, u)
def test_singleton_types(self):
# Issue #6477: Test that types of built-in singletons can be pickled.
+ if self.py_version < (3, 3):
+ self.skipTest('not supported in Python < 3.3')
singletons = [None, ..., NotImplemented]
for singleton in singletons:
+ t = type(singleton)
for proto in protocols:
- s = self.dumps(type(singleton), proto)
- u = self.loads(s)
- self.assertIs(type(singleton), u)
+ with self.subTest(name=t.__name__, proto=proto):
+ s = self.dumps(t, proto)
+ u = self.loads(s)
+ self.assertIs(t, u)
def test_builtin_types(self):
+ new_names = {
+ 'bytes': (3, 0),
+ 'BuiltinImporter': (3, 3),
+ 'str': (3, 4), # not interoperable with Python < 3.4
+ }
for t in builtins.__dict__.values():
if isinstance(t, type) and not issubclass(t, BaseException):
+ if t.__name__ in new_names and self.py_version < new_names[t.__name__]:
+ continue
for proto in protocols:
- s = self.dumps(t, proto)
- self.assertIs(self.loads(s), t)
+ with self.subTest(name=t.__name__, proto=proto):
+ s = self.dumps(t, proto)
+ self.assertIs(self.loads(s), t)
def test_builtin_exceptions(self):
+ new_names = {
+ 'BlockingIOError': (3, 3),
+ 'BrokenPipeError': (3, 3),
+ 'ChildProcessError': (3, 3),
+ 'ConnectionError': (3, 3),
+ 'ConnectionAbortedError': (3, 3),
+ 'ConnectionRefusedError': (3, 3),
+ 'ConnectionResetError': (3, 3),
+ 'FileExistsError': (3, 3),
+ 'FileNotFoundError': (3, 3),
+ 'InterruptedError': (3, 3),
+ 'IsADirectoryError': (3, 3),
+ 'NotADirectoryError': (3, 3),
+ 'PermissionError': (3, 3),
+ 'ProcessLookupError': (3, 3),
+ 'TimeoutError': (3, 3),
+ 'RecursionError': (3, 5),
+ 'StopAsyncIteration': (3, 5),
+ 'ModuleNotFoundError': (3, 6),
+ 'EncodingWarning': (3, 10),
+ 'BaseExceptionGroup': (3, 11),
+ 'ExceptionGroup': (3, 11),
+ '_IncompleteInputError': (3, 13),
+ 'PythonFinalizationError': (3, 13),
+ }
for t in builtins.__dict__.values():
if isinstance(t, type) and issubclass(t, BaseException):
+ if t.__name__ in new_names and self.py_version < new_names[t.__name__]:
+ continue
for proto in protocols:
- s = self.dumps(t, proto)
- u = self.loads(s)
- if proto <= 2 and issubclass(t, OSError) and t is not BlockingIOError:
- self.assertIs(u, OSError)
- elif proto <= 2 and issubclass(t, ImportError):
- self.assertIs(u, ImportError)
- else:
- self.assertIs(u, t)
+ with self.subTest(name=t.__name__, proto=proto):
+ if self.py_version < (3, 3) and proto < 3:
+ self.skipTest('exception classes are not interoperable with Python < 3.3')
+ s = self.dumps(t, proto)
+ u = self.loads(s)
+ if proto <= 2 and issubclass(t, OSError) and t is not BlockingIOError:
+ self.assertIs(u, OSError)
+ elif proto <= 2 and issubclass(t, ImportError):
+ self.assertIs(u, ImportError)
+ else:
+ self.assertIs(u, t)
def test_builtin_functions(self):
+ new_names = {
+ '__build_class__': (3, 0),
+ 'ascii': (3, 0),
+ 'exec': (3, 0),
+ 'breakpoint': (3, 7),
+ 'aiter': (3, 10),
+ 'anext': (3, 10),
+ }
for t in builtins.__dict__.values():
if isinstance(t, types.BuiltinFunctionType):
+ if t.__name__ in new_names and self.py_version < new_names[t.__name__]:
+ continue
for proto in protocols:
- s = self.dumps(t, proto)
- self.assertIs(self.loads(s), t)
+ with self.subTest(name=t.__name__, proto=proto):
+ s = self.dumps(t, proto)
+ self.assertIs(self.loads(s), t)
# Tests for protocol 2
else:
self.assertEqual(count_opcode(pickle.PROTO, pickled), 0)
+ def test_bad_proto(self):
+ if self.py_version < (3, 8):
+ self.skipTest('no protocol validation in Python < 3.8')
oob = protocols[-1] + 1 # a future protocol
build_none = pickle.NONE + pickle.STOP
badpickle = pickle.PROTO + bytes([oob]) + build_none
def test_newobj_generic(self):
for proto in protocols:
for C in myclasses:
- B = C.__base__
- x = C(C.sample)
- x.foo = 42
- s = self.dumps(x, proto)
- y = self.loads(s)
- detail = (proto, C, B, x, y, type(y))
- self.assert_is_copy(x, y) # XXX revisit
- self.assertEqual(B(x), B(y), detail)
- self.assertEqual(x.__dict__, y.__dict__, detail)
+ with self.subTest(proto=proto, C=C):
+ if self.py_version < (3, 0) and proto < 2 and C in (MyInt, MyStr):
+ self.skipTest('int and str subclasses are not interoperable with Python 2')
+ if (3, 0) <= self.py_version < (3, 4) and proto < 2 and C in (MyStr, MyUnicode):
+ self.skipTest('str subclasses are not interoperable with Python < 3.4')
+ B = C.__base__
+ x = C(C.sample)
+ x.foo = 42
+ s = self.dumps(x, proto)
+ y = self.loads(s)
+ detail = (proto, C, B, x, y, type(y))
+ self.assert_is_copy(x, y) # XXX revisit
+ self.assertEqual(B(x), B(y), detail)
+ self.assertEqual(x.__dict__, y.__dict__, detail)
def test_newobj_proxies(self):
# NEWOBJ should use the __class__ rather than the raw type
classes = myclasses[:]
# Cannot create weakproxies to these classes
- for c in (MyInt, MyTuple):
+ for c in (MyInt, MyLong, MyTuple):
classes.remove(c)
for proto in protocols:
for C in classes:
- B = C.__base__
- x = C(C.sample)
- x.foo = 42
- p = weakref.proxy(x)
- s = self.dumps(p, proto)
- y = self.loads(s)
- self.assertEqual(type(y), type(x)) # rather than type(p)
- detail = (proto, C, B, x, y, type(y))
- self.assertEqual(B(x), B(y), detail)
- self.assertEqual(x.__dict__, y.__dict__, detail)
+ with self.subTest(proto=proto, C=C):
+ if self.py_version < (3, 4) and proto < 3 and C in (MyStr, MyUnicode):
+ self.skipTest('str subclasses are not interoperable with Python < 3.4')
+ B = C.__base__
+ x = C(C.sample)
+ x.foo = 42
+ p = weakref.proxy(x)
+ s = self.dumps(p, proto)
+ y = self.loads(s)
+ self.assertEqual(type(y), type(x)) # rather than type(p)
+ detail = (proto, C, B, x, y, type(y))
+ self.assertEqual(B(x), B(y), detail)
+ self.assertEqual(x.__dict__, y.__dict__, detail)
def test_newobj_overridden_new(self):
# Test that Python class with C implemented __new__ is pickleable
for proto in protocols:
- x = MyIntWithNew2(1)
- x.foo = 42
- s = self.dumps(x, proto)
- y = self.loads(s)
- self.assertIs(type(y), MyIntWithNew2)
- self.assertEqual(int(y), 1)
- self.assertEqual(y.foo, 42)
+ with self.subTest(proto=proto):
+ if self.py_version < (3, 0) and proto < 2:
+ self.skipTest('int subclasses are not interoperable with Python 2')
+ x = MyIntWithNew2(1)
+ x.foo = 42
+ s = self.dumps(x, proto)
+ y = self.loads(s)
+ self.assertIs(type(y), MyIntWithNew2)
+ self.assertEqual(int(y), 1)
+ self.assertEqual(y.foo, 42)
def test_newobj_not_class(self):
# Issue 24552
- global SimpleNewObj
- save = SimpleNewObj
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
o = SimpleNewObj.__new__(SimpleNewObj)
b = self.dumps(o, 4)
- try:
- SimpleNewObj = 42
+ with support.swap_attr(picklecommon, 'SimpleNewObj', 42):
self.assertRaises((TypeError, pickle.UnpicklingError), self.loads, b)
- finally:
- SimpleNewObj = save
# Register a type with copyreg, with extension code extcode. Pickle
# an object of that type. Check that the resulting pickle uses opcode
def produce_global_ext(self, extcode, opcode):
e = ExtensionSaver(extcode)
try:
- copyreg.add_extension(__name__, "MyList", extcode)
+ copyreg.add_extension(MyList.__module__, "MyList", extcode)
x = MyList([1, 2, 3])
x.foo = 42
x.bar = "hello"
# Dump using protocol 1 for comparison.
s1 = self.dumps(x, 1)
- self.assertIn(__name__.encode("utf-8"), s1)
+ self.assertIn(MyList.__module__.encode(), s1)
self.assertIn(b"MyList", s1)
self.assertFalse(opcode_in_pickle(opcode, s1))
# Dump using protocol 2 for test.
s2 = self.dumps(x, 2)
- self.assertNotIn(__name__.encode("utf-8"), s2)
+ self.assertNotIn(MyList.__module__.encode(), s2)
self.assertNotIn(b"MyList", s2)
self.assertEqual(opcode_in_pickle(opcode, s2), True, repr(s2))
x.abc = 666
for proto in protocols:
with self.subTest(proto=proto):
+ if self.py_version < (3, 0) and proto < 2:
+ self.skipTest('int subclasses are not interoperable with Python 2')
s = self.dumps(x, proto)
if proto < 1:
- self.assertIn(b'\nI64206', s) # INT
+ if self.py_version >= (3, 7):
+ self.assertIn(b'\nI64206', s) # INT
+ else: # for test_xpickle
+ self.assertIn(b'64206', s) # INT or LONG
else:
self.assertIn(b'M\xce\xfa', s) # BININT2
- self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s),
- 2 <= proto)
- self.assertFalse(opcode_in_pickle(pickle.NEWOBJ_EX, s))
+ if not (self.py_version < (3, 5) and proto == 4):
+ self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s),
+ 2 <= proto)
+ self.assertFalse(opcode_in_pickle(pickle.NEWOBJ_EX, s))
y = self.loads(s) # will raise TypeError if __init__ called
self.assert_is_copy(x, y)
x.abc = 666
for proto in protocols:
with self.subTest(proto=proto):
+ if self.py_version < (3, 0) and proto < 2:
+ self.skipTest('int subclasses are not interoperable with Python 2')
s = self.dumps(x, proto)
if proto < 1:
- self.assertIn(b'\nI64206', s) # INT
+ if self.py_version >= (3, 7):
+ self.assertIn(b'\nI64206', s) # INT
+ else: # for test_xpickle
+ self.assertIn(b'64206', s) # INT or LONG
elif proto < 2:
self.assertIn(b'M\xce\xfa', s) # BININT2
elif proto < 4:
- self.assertIn(b'X\x04\x00\x00\x00FACE', s) # BINUNICODE
+ if self.py_version >= (3, 0):
+ self.assertIn(b'X\x04\x00\x00\x00FACE', s) # BINUNICODE
+ else: # for test_xpickle
+ self.assertIn(b'U\x04FACE', s) # SHORT_BINSTRING
else:
self.assertIn(b'\x8c\x04FACE', s) # SHORT_BINUNICODE
- self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s),
- 2 <= proto)
- self.assertFalse(opcode_in_pickle(pickle.NEWOBJ_EX, s))
+ if not (self.py_version < (3, 5) and proto == 4):
+ self.assertEqual(opcode_in_pickle(pickle.NEWOBJ, s),
+ 2 <= proto)
+ self.assertFalse(opcode_in_pickle(pickle.NEWOBJ_EX, s))
y = self.loads(s) # will raise TypeError if __init__ called
self.assert_is_copy(x, y)
def test_complex_newobj_ex(self):
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
x = ComplexNewObjEx.__new__(ComplexNewObjEx, 0xface) # avoid __init__
x.abc = 666
for proto in protocols:
with self.subTest(proto=proto):
+ if self.py_version < (3, 6) and proto < 4:
+ self.skipTest('requires protocol 4 in Python < 3.6')
s = self.dumps(x, proto)
if proto < 1:
- self.assertIn(b'\nI64206', s) # INT
+ if self.py_version >= (3, 7):
+ self.assertIn(b'\nI64206', s) # INT
+ else: # for test_xpickle
+ self.assertIn(b'64206', s) # INT or LONG
elif proto < 2:
self.assertIn(b'M\xce\xfa', s) # BININT2
elif proto < 4:
def test_attribute_name_interning(self):
# Test that attribute names of pickled objects are interned when
# unpickling.
+ if self.py_version < (3, 0):
+ self.skipTest('"classic" classes are not interoperable with Python 2')
for proto in protocols:
x = C()
x.foo = 42
dumped = self.dumps(data, proto)
loaded = self.loads(dumped)
self.assertEqual(len(loaded), len(data))
+ if self.py_version < (3, 0):
+ data = (1, min, 'xy' * (30 * 1024), len)
self.assertEqual(loaded, data)
def test_int_pickling_efficiency(self):
# Test compacity of int representation (see issue #12744)
+ if self.py_version < (3, 3):
+ self.skipTest('not supported in Python < 3.3')
for proto in protocols:
with self.subTest(proto=proto):
pickles = [self.dumps(2**n, proto) for n in range(70)]
# Issue #17720
obj = REX_six([1, 2, 3])
for proto in protocols:
- if proto == 0:
- self._check_pickling_with_opcode(obj, pickle.APPEND, proto)
- else:
- self._check_pickling_with_opcode(obj, pickle.APPENDS, proto)
+ with self.subTest(proto=proto):
+ if proto == 0:
+ self._check_pickling_with_opcode(obj, pickle.APPEND, proto)
+ else:
+ if self.py_version < (3, 0):
+ self.skipTest('not supported in Python 2')
+ self._check_pickling_with_opcode(obj, pickle.APPENDS, proto)
def test_setitems_on_non_dicts(self):
obj = REX_seven({1: -1, 2: -2, 3: -3})
@support.skip_if_pgo_task
@support.requires_resource('cpu')
def test_framing_many_objects(self):
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
obj = list(range(10**5))
for proto in range(4, pickle.HIGHEST_PROTOCOL + 1):
with self.subTest(proto=proto):
self.check_frame_opcodes(pickled)
def test_framing_large_objects(self):
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
N = 1024 * 1024
small_items = [[i] for i in range(10)]
obj = [b'x' * N, *small_items, b'y' * N, 'z' * N]
[len(x) for x in unpickled])
# Perform full equality check if the lengths match.
self.assertEqual(obj, unpickled)
- n_frames = count_opcode(pickle.FRAME, pickled)
- # A single frame for small objects between
- # first two large objects.
- self.assertEqual(n_frames, 1)
- self.check_frame_opcodes(pickled)
+ if self.py_version >= (3, 7):
+ n_frames = count_opcode(pickle.FRAME, pickled)
+ # A single frame for small objects between
+ # first two large objects.
+ self.assertEqual(n_frames, 1)
+ self.check_frame_opcodes(pickled)
def test_optional_frames(self):
- if pickle.HIGHEST_PROTOCOL < 4:
- return
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
def remove_frames(pickled, keep_frame=None):
"""Remove frame opcodes from the given pickle."""
@support.skip_if_pgo_task
def test_framed_write_sizes_with_delayed_writer(self):
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
+
class ChunkAccumulator:
"""Accumulate pickler output in a list of raw chunks."""
def __init__(self):
chunk_sizes)
def test_nested_names(self):
- global Nested
- class Nested:
- class A:
- class B:
- class C:
- pass
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
+ # required protocol 4 in Python 3.4
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ if self.py_version < (3, 5) and proto < 4:
+ continue
for obj in [Nested.A, Nested.A.B, Nested.A.B.C]:
with self.subTest(proto=proto, obj=obj):
unpickled = self.loads(self.dumps(obj, proto))
del Recursive.ref # break reference loop
def test_py_methods(self):
- global PyMethodsTest
- class PyMethodsTest:
- @staticmethod
- def cheese():
- return "cheese"
- @classmethod
- def wine(cls):
- assert cls is PyMethodsTest
- return "wine"
- def biscuits(self):
- assert isinstance(self, PyMethodsTest)
- return "biscuits"
- class Nested:
- "Nested class"
- @staticmethod
- def ketchup():
- return "ketchup"
- @classmethod
- def maple(cls):
- assert cls is PyMethodsTest.Nested
- return "maple"
- def pie(self):
- assert isinstance(self, PyMethodsTest.Nested)
- return "pie"
-
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
py_methods = (
- PyMethodsTest.cheese,
PyMethodsTest.wine,
PyMethodsTest().biscuits,
+ )
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ for method in py_methods:
+ with self.subTest(proto=proto, method=method):
+ unpickled = self.loads(self.dumps(method, proto))
+ self.assertEqual(method(), unpickled())
+
+ # required protocol 4 in Python 3.4
+ py_methods = (
+ PyMethodsTest.cheese,
PyMethodsTest.Nested.ketchup,
PyMethodsTest.Nested.maple,
PyMethodsTest.Nested().pie
(PyMethodsTest.Nested.pie, PyMethodsTest.Nested)
)
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ if self.py_version < (3, 5) and proto < 4:
+ continue
for method in py_methods:
with self.subTest(proto=proto, method=method):
unpickled = self.loads(self.dumps(method, proto))
self.assertRaises(TypeError, self.dumps, descr, proto)
def test_c_methods(self):
- global Subclass
- class Subclass(tuple):
- class Nested(str):
- pass
-
+ if self.py_version < (3, 4):
+ self.skipTest('not supported in Python < 3.4')
c_methods = (
# bound built-in method
("abcd".index, ("c",)),
# subclass methods
(Subclass([1,2,2]).count, (2,)),
(Subclass.count, (Subclass([1,2,2]), 2)),
- (Subclass.Nested("sweet").count, ("e",)),
(Subclass.Nested.count, (Subclass.Nested("sweet"), "e")),
)
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
unpickled = self.loads(self.dumps(method, proto))
self.assertEqual(method(*args), unpickled(*args))
+ # required protocol 4 in Python 3.4
+ c_methods = (
+ (Subclass.Nested("sweet").count, ("e",)),
+ )
+ for proto in range(pickle.HIGHEST_PROTOCOL + 1):
+ if self.py_version < (3, 5) and proto < 4:
+ continue
+ for method, args in c_methods:
+ with self.subTest(proto=proto, method=method):
+ unpickled = self.loads(self.dumps(method, proto))
+ self.assertEqual(method(*args), unpickled(*args))
+
descriptors = (
bytearray.__dict__['maketrans'], # built-in static method descriptor
dict.__dict__['fromkeys'], # built-in class method descriptor
self.assertRaises(TypeError, self.dumps, descr, proto)
def test_compat_pickle(self):
+ if self.py_version < (3, 4):
+ self.skipTest("doesn't work in Python < 3.4'")
tests = [
(range(1, 7), '__builtin__', 'xrange'),
(map(int, '123'), 'itertools', 'imap'),
data = None
-# Test classes for reduce_ex
-
-class R:
- def __init__(self, reduce=None):
- self.reduce = reduce
- def __reduce__(self, proto):
- return self.reduce
-
-class REX:
- def __init__(self, reduce_ex=None):
- self.reduce_ex = reduce_ex
- def __reduce_ex__(self, proto):
- return self.reduce_ex
-
-class REX_one(object):
- """No __reduce_ex__ here, but inheriting it from object"""
- _reduce_called = 0
- def __reduce__(self):
- self._reduce_called = 1
- return REX_one, ()
-
-class REX_two(object):
- """No __reduce__ here, but inheriting it from object"""
- _proto = None
- def __reduce_ex__(self, proto):
- self._proto = proto
- return REX_two, ()
-
-class REX_three(object):
- _proto = None
- def __reduce_ex__(self, proto):
- self._proto = proto
- return REX_two, ()
- def __reduce__(self):
- raise TestFailed("This __reduce__ shouldn't be called")
-
-class REX_four(object):
- """Calling base class method should succeed"""
- _proto = None
- def __reduce_ex__(self, proto):
- self._proto = proto
- return object.__reduce_ex__(self, proto)
-
-class REX_five(object):
- """This one used to fail with infinite recursion"""
- _reduce_called = 0
- def __reduce__(self):
- self._reduce_called = 1
- return object.__reduce__(self)
-
-class REX_six(object):
- """This class is used to check the 4th argument (list iterator) of
- the reduce protocol.
- """
- def __init__(self, items=None):
- self.items = items if items is not None else []
- def __eq__(self, other):
- return type(self) is type(other) and self.items == other.items
- def append(self, item):
- self.items.append(item)
- def __reduce__(self):
- return type(self), (), None, iter(self.items), None
-
-class REX_seven(object):
- """This class is used to check the 5th argument (dict iterator) of
- the reduce protocol.
- """
- def __init__(self, table=None):
- self.table = table if table is not None else {}
- def __eq__(self, other):
- return type(self) is type(other) and self.table == other.table
- def __setitem__(self, key, value):
- self.table[key] = value
- def __reduce__(self):
- return type(self), (), None, None, iter(self.table.items())
-
-class REX_state(object):
- """This class is used to check the 3th argument (state) of
- the reduce protocol.
- """
- def __init__(self, state=None):
- self.state = state
- def __eq__(self, other):
- return type(self) is type(other) and self.state == other.state
- def __setstate__(self, state):
- self.state = state
- def __reduce__(self):
- return type(self), (), self.state
-
-class REX_None:
- """ Setting __reduce_ex__ to None should fail """
- __reduce_ex__ = None
-
-class R_None:
- """ Setting __reduce__ to None should fail """
- __reduce__ = None
-
-class C_None_setstate:
- """ Setting __setstate__ to None should fail """
- def __getstate__(self):
- return 1
-
- __setstate__ = None
-
class CustomError(Exception):
pass
UNPICKLEABLE = Unpickleable()
+# For test_unpickleable_reconstructor and test_unpickleable_state_setter
class UnpickleableCallable(Unpickleable):
def __call__(self, *args, **kwargs):
pass
-
-# Test classes for newobj
-
-class MyInt(int):
- sample = 1
-
-class MyFloat(float):
- sample = 1.0
-
-class MyComplex(complex):
- sample = 1.0 + 0.0j
-
-class MyStr(str):
- sample = "hello"
-
-class MyUnicode(str):
- sample = "hello \u1234"
-
-class MyTuple(tuple):
- sample = (1, 2, 3)
-
-class MyList(list):
- sample = [1, 2, 3]
-
-class MyDict(dict):
- sample = {"a": 1, "b": 2}
-
-class MySet(set):
- sample = {"a", "b"}
-
-class MyFrozenSet(frozenset):
- sample = frozenset({"a", "b"})
-
-myclasses = [MyInt, MyFloat,
- MyComplex,
- MyStr, MyUnicode,
- MyTuple, MyList, MyDict, MySet, MyFrozenSet]
-
-class MyIntWithNew(int):
- def __new__(cls, value):
- raise AssertionError
-
-class MyIntWithNew2(MyIntWithNew):
- __new__ = int.__new__
-
-
-class SlotList(MyList):
- __slots__ = ["foo"]
-
-# Ruff "redefined while unused" false positive here due to `global` variables
-# being assigned (and then restored) from within test methods earlier in the file
-class SimpleNewObj(int): # noqa: F811
- def __init__(self, *args, **kwargs):
- # raise an error, to make sure this isn't called
- raise TypeError("SimpleNewObj.__init__() didn't expect to get called")
- def __eq__(self, other):
- return int(self) == int(other) and self.__dict__ == other.__dict__
-
-class ComplexNewObj(SimpleNewObj):
- def __getnewargs__(self):
- return ('%X' % self, 16)
-
-class ComplexNewObjEx(SimpleNewObj):
- def __getnewargs_ex__(self):
- return ('%X' % self,), {'base': 16}
-
+# For test_bad_getattr
class BadGetattr:
def __getattr__(self, key):
self.foo
+# For test_bad_newobj_class and test_bad_newobj_ex__class
class NoNew:
def __getattribute__(self, name):
if name == '__new__':