import unittest
from test import support
-from test.support import threading_helper
import gc
import io
import pickle
import sys
import weakref
-import threading
class IntLike:
def __init__(self, num):
for newline in (None, "", "\n", "\r", "\r\n"):
self.ioclass(newline=newline)
- @unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful under free-threading")
- @threading_helper.requires_working_threading()
- def test_concurrent_use(self):
- memio = self.ioclass("")
-
- def use():
- memio.write("x" * 10)
- memio.readlines()
-
- threads = [threading.Thread(target=use) for _ in range(8)]
- with threading_helper.catch_threading_exception() as cm:
- with threading_helper.start_threads(threads):
- pass
-
- self.assertIsNone(cm.exc_value)
-
class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin,
TextIOTestMixin, unittest.TestCase):
self.assertRaises(ValueError, memio.__setstate__, ("closed", "", 0, None))
-
class CStringIOPickleTest(PyStringIOPickleTest):
UnsupportedOperation = io.UnsupportedOperation
}
static PyObject *
-stringio_iternext_lock_held(PyObject *op)
+stringio_iternext(PyObject *op)
{
PyObject *line;
stringio *self = stringio_CAST(op);
return line;
}
-static PyObject *
-stringio_iternext(PyObject *op)
-{
- PyObject *res;
- Py_BEGIN_CRITICAL_SECTION(op);
- res = stringio_iternext_lock_held(op);
- Py_END_CRITICAL_SECTION();
- return res;
-}
-
/*[clinic input]
@critical_section
_io.StringIO.truncate