import struct
import threading
import gc
+import warnings
+
+def pickle_deprecated(testfunc):
+    """ Run the test three times.
+    First, verify that a DeprecationWarning is raised.
+    Second, run normally but with DeprecationWarnings temporarily disabled.
+    Third, run with warnings promoted to errors.
+    """
+    def inner(self):
+        with self.assertWarns(DeprecationWarning):
+            testfunc(self)
+        with warnings.catch_warnings():
+            warnings.simplefilter("ignore", category=DeprecationWarning)
+            testfunc(self)
+        with warnings.catch_warnings():
+            warnings.simplefilter("error", category=DeprecationWarning)
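+            # With warnings promoted to errors the test is expected to fail:
+            # either with the DeprecationWarning itself, or with an
+            # AssertionError/SystemError it triggers further down the test,
+            # so any of the three is accepted below.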
+            with self.assertRaises((DeprecationWarning, AssertionError, SystemError)):
+                testfunc(self)
+
+    return inner
maxsize = support.MAX_Py_ssize_t
minsize = -maxsize-1
c = expand(compare[took:])
self.assertEqual(a, c);
+ @pickle_deprecated
def test_accumulate(self):
self.assertEqual(list(accumulate(range(10))), # one positional arg
[0, 1, 3, 6, 10, 15, 21, 28, 36, 45])
self.assertRaises(TypeError, list, chain.from_iterable([2, 3]))
self.assertEqual(list(islice(chain.from_iterable(repeat(range(5))), 2)), [0, 1])
+ @pickle_deprecated
def test_chain_reducible(self):
for oper in [copy.deepcopy] + picklecopiers:
it = chain('abc', 'def')
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
self.pickletest(proto, chain('abc', 'def'), compare=list('abcdef'))
+ @pickle_deprecated
def test_chain_setstate(self):
self.assertRaises(TypeError, chain().__setstate__, ())
self.assertRaises(TypeError, chain().__setstate__, [])
it.__setstate__((iter(['abc', 'def']), iter(['ghi'])))
self.assertEqual(list(it), ['ghi', 'a', 'b', 'c', 'd', 'e', 'f'])
+ @pickle_deprecated
def test_combinations(self):
self.assertRaises(TypeError, combinations, 'abc') # missing r argument
self.assertRaises(TypeError, combinations, 'abc', 2, 1) # too many arguments
self.assertEqual(list(op(testIntermediate)),
[(0,1,3), (0,2,3), (1,2,3)])
-
def combinations1(iterable, r):
'Pure python version shown in the docs'
pool = tuple(iterable)
self.assertEqual(len(set(map(id, combinations('abcde', 3)))), 1)
self.assertNotEqual(len(set(map(id, list(combinations('abcde', 3))))), 1)
+ @pickle_deprecated
def test_combinations_with_replacement(self):
cwr = combinations_with_replacement
self.assertRaises(TypeError, cwr, 'abc') # missing r argument
self.assertEqual(len(set(map(id, cwr('abcde', 3)))), 1)
self.assertNotEqual(len(set(map(id, list(cwr('abcde', 3))))), 1)
+ @pickle_deprecated
def test_permutations(self):
self.assertRaises(TypeError, permutations) # too few arguments
self.assertRaises(TypeError, permutations, 'abc', 2, 1) # too many arguments
self.assertEqual(comb, list(filter(set(perm).__contains__, cwr))) # comb: cwr that is a perm
self.assertEqual(comb, sorted(set(cwr) & set(perm))) # comb: both a cwr and a perm
+ @pickle_deprecated
def test_compress(self):
self.assertEqual(list(compress(data='ABCDEF', selectors=[1,0,1,0,1,1])), list('ACEF'))
self.assertEqual(list(compress('ABCDEF', [1,0,1,0,1,1])), list('ACEF'))
next(testIntermediate)
self.assertEqual(list(op(testIntermediate)), list(result2))
-
+ @pickle_deprecated
def test_count(self):
self.assertEqual(lzip('abc',count()), [('a', 0), ('b', 1), ('c', 2)])
self.assertEqual(lzip('abc',count(3)), [('a', 3), ('b', 4), ('c', 5)])
        # check proper internal error handling for large "step" sizes
count(1, maxsize+5); sys.exc_info()
+ @pickle_deprecated
def test_count_with_stride(self):
self.assertEqual(lzip('abc',count(2,3)), [('a', 2), ('b', 5), ('c', 8)])
self.assertEqual(lzip('abc',count(start=2,step=3)),
self.assertRaises(TypeError, cycle, 5)
self.assertEqual(list(islice(cycle(gen3()),10)), [0,1,2,0,1,2,0,1,2,0])
+ @pickle_deprecated
def test_cycle_copy_pickle(self):
# check copy, deepcopy, pickle
c = cycle('abc')
d = pickle.loads(p) # rebuild the cycle object
self.assertEqual(take(20, d), list('cdeabcdeabcdeabcdeab'))
+ @pickle_deprecated
def test_cycle_unpickle_compat(self):
testcases = [
b'citertools\ncycle\n(c__builtin__\niter\n((lI1\naI2\naI3\natRI1\nbtR((lI1\naI0\ntb.',
it = pickle.loads(t)
self.assertEqual(take(10, it), [2, 3, 1, 2, 3, 1, 2, 3, 1, 2])
+ @pickle_deprecated
def test_cycle_setstate(self):
# Verify both modes for restoring state
self.assertRaises(TypeError, cycle('').__setstate__, ())
self.assertRaises(TypeError, cycle('').__setstate__, ([],))
+ @pickle_deprecated
def test_groupby(self):
# Check whether it accepts arguments correctly
self.assertEqual([], list(groupby([])))
c = filter(isEven, range(6))
self.pickletest(proto, c)
+ @pickle_deprecated
def test_filterfalse(self):
self.assertEqual(list(filterfalse(isEven, range(6))), [1,3,5])
self.assertEqual(list(filterfalse(None, [0,1,0,2,0])), [0,0,0])
lzip('abc', 'def'))
@support.impl_detail("tuple reuse is specific to CPython")
+ @pickle_deprecated
def test_zip_tuple_reuse(self):
ids = list(map(id, zip('abc', 'def')))
self.assertEqual(min(ids), max(ids))
ids = list(map(id, list(zip_longest('abc', 'def'))))
self.assertEqual(len(dict.fromkeys(ids)), len(ids))
+ @pickle_deprecated
def test_zip_longest_pickling(self):
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
self.pickletest(proto, zip_longest("abc", "def"))
self.assertEqual(len(set(map(id, product('abc', 'def')))), 1)
self.assertNotEqual(len(set(map(id, list(product('abc', 'def'))))), 1)
+ @pickle_deprecated
def test_product_pickling(self):
# check copy, deepcopy, pickle
for args, result in [
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
self.pickletest(proto, product(*args))
+ @pickle_deprecated
def test_product_issue_25021(self):
# test that indices are properly clamped to the length of the tuples
p = product((1, 2),(3,))
p.__setstate__((0, 0, 0x1000)) # will access tuple element 1 if not clamped
self.assertRaises(StopIteration, next, p)
+ @pickle_deprecated
def test_repeat(self):
self.assertEqual(list(repeat(object='a', times=3)), ['a', 'a', 'a'])
self.assertEqual(lzip(range(3),repeat('a')),
self.assertEqual(repr(repeat('a', times=-1)), "repeat('a', 0)")
self.assertEqual(repr(repeat('a', times=-2)), "repeat('a', 0)")
+ @pickle_deprecated
def test_map(self):
self.assertEqual(list(map(operator.pow, range(3), range(1,7))),
[0**1, 1**2, 2**3])
c = map(tupleize, 'abc', count())
self.pickletest(proto, c)
+ @pickle_deprecated
def test_starmap(self):
self.assertEqual(list(starmap(operator.pow, zip(range(3), range(1,7)))),
[0**1, 1**2, 2**3])
c = starmap(operator.pow, zip(range(3), range(1,7)))
self.pickletest(proto, c)
+ @pickle_deprecated
def test_islice(self):
for args in [ # islice(args) should agree with range(args)
(10, 20, 3),
self.assertEqual(list(islice(range(100), IntLike(10), IntLike(50), IntLike(5))),
list(range(10,50,5)))
+ @pickle_deprecated
def test_takewhile(self):
data = [1, 3, 5, 20, 2, 4, 6, 8]
self.assertEqual(list(takewhile(underten, data)), [1, 3, 5])
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
self.pickletest(proto, takewhile(underten, data))
+ @pickle_deprecated
def test_dropwhile(self):
data = [1, 3, 5, 20, 2, 4, 6, 8]
self.assertEqual(list(dropwhile(underten, data)), [20, 2, 4, 6, 8])
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
self.pickletest(proto, dropwhile(underten, data))
+ @pickle_deprecated
def test_tee(self):
n = 200
def test_accumulate(self):
self.assertEqual(list(accumulate([1,2,3,4,5])), [1, 3, 6, 10, 15])
+ @pickle_deprecated
def test_accumulate_reducible(self):
# check copy, deepcopy, pickle
data = [1, 2, 3, 4, 5]
self.assertEqual(list(copy.deepcopy(it)), accumulated[1:])
self.assertEqual(list(copy.copy(it)), accumulated[1:])
+ @pickle_deprecated
def test_accumulate_reducible_none(self):
# Issue #25718: total is None
it = accumulate([None, None, None], operator.is_)
#undef clinic_state_by_cls
#undef clinic_state
+/* Deprecation of pickle support: GH-101588 *********************************/
+
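+/* Invoked at the top of every __reduce__ and __setstate__ below, so that
+   pickling, copying, and deepcopying an itertools iterator emits the
+   warning before doing any other work. */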
+#define ITERTOOL_PICKLE_DEPRECATION \
+    if (PyErr_WarnEx( \
+            PyExc_DeprecationWarning, \
+            "Itertool pickle/copy/deepcopy support " \
+            "will be removed in Python 3.14.", 1) < 0) { \
+        return NULL; \
+    }
+
/* batched object ************************************************************/
/* Note: The built-in zip() function includes a "strict" argument
/* reduce as a 'new' call with an optional 'setstate' if groupby
* has started
*/
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *value;
if (lz->tgtkey && lz->currkey && lz->currvalue)
value = Py_BuildValue("O(OO)(OOO)", Py_TYPE(lz),
static PyObject *
groupby_setstate(groupbyobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *currkey, *currvalue, *tgtkey;
if (!PyTuple_Check(state)) {
PyErr_SetString(PyExc_TypeError, "state is not a tuple");
static PyObject *
_grouper_reduce(_grouperobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
if (((groupbyobject *)lz->parent)->currgrouper != lz) {
return Py_BuildValue("N(())", _PyEval_GetBuiltin(&_Py_ID(iter)));
}
static PyObject *
teedataobject_reduce(teedataobject *tdo, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
int i;
/* create a temporary list of already iterated values */
PyObject *values = PyList_New(tdo->numread);
static PyObject *
tee_reduce(teeobject *to, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
return Py_BuildValue("O(())(Oi)", Py_TYPE(to), to->dataobj, to->index);
}
static PyObject *
tee_setstate(teeobject *to, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
teedataobject *tdo;
int index;
if (!PyTuple_Check(state)) {
static PyObject *
cycle_reduce(cycleobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
/* Create a new cycle with the iterator tuple, then set the saved state */
if (lz->it == NULL) {
PyObject *it = PyObject_GetIter(lz->saved);
static PyObject *
cycle_setstate(cycleobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *saved=NULL;
int firstpass;
if (!PyTuple_Check(state)) {
static PyObject *
dropwhile_reduce(dropwhileobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
return Py_BuildValue("O(OO)l", Py_TYPE(lz), lz->func, lz->it, lz->start);
}
static PyObject *
dropwhile_setstate(dropwhileobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
int start = PyObject_IsTrue(state);
if (start < 0)
return NULL;
static PyObject *
takewhile_reduce(takewhileobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
return Py_BuildValue("O(OO)l", Py_TYPE(lz), lz->func, lz->it, lz->stop);
}
static PyObject *
takewhile_reduce_setstate(takewhileobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
int stop = PyObject_IsTrue(state);
if (stop < 0)
static PyObject *
islice_reduce(isliceobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
/* When unpickled, generate a new object with the same bounds,
* then 'setstate' with the next and count
*/
static PyObject *
islice_setstate(isliceobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
Py_ssize_t cnt = PyLong_AsSsize_t(state);
if (cnt == -1 && PyErr_Occurred())
static PyObject *
starmap_reduce(starmapobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
/* Just pickle the iterator */
return Py_BuildValue("O(OO)", Py_TYPE(lz), lz->func, lz->it);
}
static PyObject *
chain_reduce(chainobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
if (lz->source) {
/* we can't pickle function objects (itertools.from_iterable) so
* we must use setstate to replace the iterable. One day we
static PyObject *
chain_setstate(chainobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *source, *active=NULL;
if (!PyTuple_Check(state)) {
static PyObject *
product_reduce(productobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
if (lz->stopped) {
return Py_BuildValue("O(())", Py_TYPE(lz));
} else if (lz->result == NULL) {
static PyObject *
product_setstate(productobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *result;
Py_ssize_t n, i;
static PyObject *
combinations_reduce(combinationsobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
if (lz->result == NULL) {
return Py_BuildValue("O(On)", Py_TYPE(lz), lz->pool, lz->r);
} else if (lz->stopped) {
static PyObject *
combinations_setstate(combinationsobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *result;
Py_ssize_t i;
Py_ssize_t n = PyTuple_GET_SIZE(lz->pool);
static PyObject *
cwr_reduce(cwrobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
if (lz->result == NULL) {
return Py_BuildValue("O(On)", Py_TYPE(lz), lz->pool, lz->r);
} else if (lz->stopped) {
static PyObject *
cwr_setstate(cwrobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *result;
Py_ssize_t n, i;
static PyObject *
permutations_reduce(permutationsobject *po, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
if (po->result == NULL) {
return Py_BuildValue("O(On)", Py_TYPE(po), po->pool, po->r);
} else if (po->stopped) {
static PyObject *
permutations_setstate(permutationsobject *po, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
PyObject *indices, *cycles, *result;
Py_ssize_t n, i;
static PyObject *
accumulate_reduce(accumulateobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
itertools_state *state = lz->state;
if (lz->initial != Py_None) {
static PyObject *
accumulate_setstate(accumulateobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
Py_INCREF(state);
Py_XSETREF(lz->total, state);
Py_RETURN_NONE;
static PyObject *
compress_reduce(compressobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
return Py_BuildValue("O(OO)", Py_TYPE(lz),
lz->data, lz->selectors);
}
static PyObject *
filterfalse_reduce(filterfalseobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
return Py_BuildValue("O(OO)", Py_TYPE(lz), lz->func, lz->it);
}
static PyObject *
count_reduce(countobject *lz, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
if (lz->cnt == PY_SSIZE_T_MAX)
return Py_BuildValue("O(OO)", Py_TYPE(lz), lz->long_cnt, lz->long_step);
return Py_BuildValue("O(n)", Py_TYPE(lz), lz->cnt);
static PyObject *
repeat_reduce(repeatobject *ro, PyObject *Py_UNUSED(ignored))
{
+ ITERTOOL_PICKLE_DEPRECATION;
/* unpickle this so that a new repeat iterator is constructed with an
* object, then call __setstate__ on it to set cnt
*/
static PyObject *
zip_longest_reduce(ziplongestobject *lz, PyObject *Py_UNUSED(ignored))
{
-
+ ITERTOOL_PICKLE_DEPRECATION;
/* Create a new tuple with empty sequences where appropriate to pickle.
* Then use setstate to set the fillvalue
*/
static PyObject *
zip_longest_setstate(ziplongestobject *lz, PyObject *state)
{
+ ITERTOOL_PICKLE_DEPRECATION;
Py_INCREF(state);
Py_XSETREF(lz->fillvalue, state);
Py_RETURN_NONE;