import unittest
from test import support
+from test.support import threading_helper
import gc
import io
import pickle
import sys
import weakref
+import threading
class IntLike:
def __init__(self, num):
for newline in (None, "", "\n", "\r", "\r\n"):
self.ioclass(newline=newline)
+ @unittest.skipUnless(support.Py_GIL_DISABLED, "only meaningful under free-threading")
+ @threading_helper.requires_working_threading()
+ def test_concurrent_use(self):
+ memio = self.ioclass("")
+
+ def use():
+ memio.write("x" * 10)
+ memio.readlines()
+
+ threads = [threading.Thread(target=use) for _ in range(8)]
+ with threading_helper.catch_threading_exception() as cm:
+ with threading_helper.start_threads(threads):
+ pass
+
+ self.assertIsNone(cm.exc_value)
+
class PyStringIOTest(MemoryTestMixin, MemorySeekTestMixin,
TextIOTestMixin, unittest.TestCase):
self.assertRaises(ValueError, memio.__setstate__, ("closed", "", 0, None))
+
class CStringIOPickleTest(PyStringIOPickleTest):
UnsupportedOperation = io.UnsupportedOperation
}
static PyObject *
-stringio_iternext(PyObject *op)
+stringio_iternext_lock_held(PyObject *op)
{
PyObject *line;
stringio *self = stringio_CAST(op);
return line;
}
+static PyObject *
+stringio_iternext(PyObject *op)
+{
+ PyObject *res;
+ Py_BEGIN_CRITICAL_SECTION(op);
+ res = stringio_iternext_lock_held(op);
+ Py_END_CRITICAL_SECTION();
+ return res;
+}
+
/*[clinic input]
@critical_section
_io.StringIO.truncate