_Py_atomic_store_ushort_relaxed(&value, new_value)
#define FT_ATOMIC_LOAD_USHORT_RELAXED(value) \
_Py_atomic_load_ushort_relaxed(&value)
+#define FT_ATOMIC_LOAD_INT(value) \
+ _Py_atomic_load_int(&value)
+#define FT_ATOMIC_STORE_INT(value, new_value) \
+ _Py_atomic_store_int(&value, new_value)
#define FT_ATOMIC_STORE_INT_RELAXED(value, new_value) \
_Py_atomic_store_int_relaxed(&value, new_value)
#define FT_ATOMIC_LOAD_INT_RELAXED(value) \
#define FT_ATOMIC_STORE_SHORT_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_LOAD_USHORT_RELAXED(value) value
#define FT_ATOMIC_STORE_USHORT_RELAXED(value, new_value) value = new_value
+#define FT_ATOMIC_LOAD_INT(value) value
+#define FT_ATOMIC_STORE_INT(value, new_value) value = new_value
#define FT_ATOMIC_LOAD_INT_RELAXED(value) value
#define FT_ATOMIC_STORE_INT_RELAXED(value, new_value) value = new_value
#define FT_ATOMIC_LOAD_UINT_RELAXED(value) value
--- /dev/null
+import re
+import unittest
+
+from test.support import threading_helper
+from test.support.threading_helper import run_concurrently
+
+
+NTHREADS = 10
+
+
+@threading_helper.requires_working_threading()
+class TestRe(unittest.TestCase):
+ def test_pattern_sub(self):
+ """Pattern substitution should work across threads"""
+ pattern = re.compile(r"\w+@\w+\.\w+")
+ text = "e-mail: test@python.org or user@pycon.org. " * 5
+ results = []
+
+ def worker():
+ substituted = pattern.sub("(redacted)", text)
+ results.append(substituted.count("(redacted)"))
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ self.assertEqual(results, [2 * 5] * NTHREADS)
+
+ def test_pattern_search(self):
+ """Pattern search should work across threads."""
+ emails = ["alice@python.org", "bob@pycon.org"] * 10
+ pattern = re.compile(r"\w+@\w+\.\w+")
+ results = []
+
+ def worker():
+ matches = [pattern.search(e).group() for e in emails]
+ results.append(len(matches))
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ self.assertEqual(results, [2 * 10] * NTHREADS)
+
+ def test_scanner_concurrent_access(self):
+ """Shared scanner should reject concurrent access."""
+ pattern = re.compile(r"\w+")
+ scanner = pattern.scanner("word " * 10)
+
+ def worker():
+ for _ in range(100):
+ try:
+ scanner.search()
+ except ValueError as e:
+ if "already executing" in str(e):
+ pass
+ else:
+ raise
+
+ run_concurrently(worker_func=worker, nthreads=NTHREADS)
+ # This test has no assertions. Its purpose is to catch crashes and
+ # enable thread sanitizer to detect race conditions. While "already
+ # executing" errors are very likely, they're not guaranteed due to
+ # non-deterministic thread scheduling, so we can't assert errors > 0.
+
+
+if __name__ == "__main__":
+ unittest.main()
static int
scanner_begin(ScannerObject* self)
{
- if (self->executing) {
+#ifdef Py_GIL_DISABLED
+ int was_executing = _Py_atomic_exchange_int(&self->executing, 1);
+#else
+ int was_executing = self->executing;
+ self->executing = 1;
+#endif
+ if (was_executing) {
PyErr_SetString(PyExc_ValueError,
"regular expression scanner already executing");
return 0;
}
- self->executing = 1;
return 1;
}
static void
scanner_end(ScannerObject* self)
{
- assert(self->executing);
- self->executing = 0;
+ assert(FT_ATOMIC_LOAD_INT_RELAXED(self->executing));
+ FT_ATOMIC_STORE_INT(self->executing, 0);
}
/*[clinic input]