/* other API */
typedef enum _framestate {
- FRAME_CREATED = -3,
- FRAME_SUSPENDED = -2,
- FRAME_SUSPENDED_YIELD_FROM = -1,
+ FRAME_CREATED = -4,
+ FRAME_SUSPENDED = -3,  /* lowest of the three consecutive suspended states tested by FRAME_STATE_SUSPENDED() */
+ FRAME_SUSPENDED_YIELD_FROM = -2,
+ FRAME_SUSPENDED_YIELD_FROM_LOCKED = -1,  /* free-threaded build: set by gen_getyieldfrom() while it reads the frame's stack, then reset to FRAME_SUSPENDED_YIELD_FROM */
FRAME_EXECUTING = 0,
FRAME_COMPLETED = 1,
FRAME_CLEARED = 4
} PyFrameState;
-#define FRAME_STATE_SUSPENDED(S) ((S) == FRAME_SUSPENDED || (S) == FRAME_SUSPENDED_YIELD_FROM)
+#define FRAME_STATE_SUSPENDED(S) ((S) >= FRAME_SUSPENDED && (S) <= FRAME_SUSPENDED_YIELD_FROM_LOCKED)  /* range test: relies on the suspended states having consecutive values; evaluates S twice */
#define FRAME_STATE_FINISHED(S) ((S) >= FRAME_COMPLETED)
#ifdef __cplusplus
// error messages) otherwise returns 0.
extern int _PyMutex_TryUnlock(PyMutex *m);
+// Yield the processor to other threads (e.g., sched_yield).
+extern void _Py_yield(void);
+
// PyEvent is a one-time event notification
typedef struct {
return unittest.skipUnless(can_start_thread, msg)
-def run_concurrently(worker_func, nthreads, args=(), kwargs={}):
+def run_concurrently(worker_func, nthreads=None, args=(), kwargs={}):
"""
- Run the worker function concurrently in multiple threads.
+ Run the worker function(s) concurrently in multiple threads.
+
+ If `worker_func` is a single callable, it is used for all threads.
+ If it is a list of callables, each callable is used for one thread.
"""
+ from collections.abc import Iterable
+
+ if nthreads is None:
+ nthreads = len(worker_func)
+ if not isinstance(worker_func, Iterable):
+ worker_func = [worker_func] * nthreads
+ assert len(worker_func) == nthreads
+
barrier = threading.Barrier(nthreads)
- def wrapper_func(*args, **kwargs):
+ def wrapper_func(func, *args, **kwargs):
# Wait for all threads to reach this point before proceeding.
barrier.wait()
- worker_func(*args, **kwargs)
+ func(*args, **kwargs)
with catch_threading_exception() as cm:
workers = [
- threading.Thread(target=wrapper_func, args=args, kwargs=kwargs)
- for _ in range(nthreads)
+ threading.Thread(target=wrapper_func, args=(func, *args), kwargs=kwargs)
+ for func in worker_func
]
with start_threads(workers):
pass
import concurrent.futures
+import itertools
+import threading
import unittest
from threading import Barrier
from unittest import TestCase
g = gen()
threading_helper.run_concurrently(drive_generator, self.NUM_THREADS, args=(g,))
+
+ def test_concurrent_gi_yieldfrom(self):
+ def gen_yield_from():
+ yield from itertools.count()
+
+ g = gen_yield_from()
+ next(g) # Put in FRAME_SUSPENDED_YIELD_FROM state
+
+ def read_yieldfrom(gen):
+ for _ in range(10000):
+ self.assertIsNotNone(gen.gi_yieldfrom)
+
+ threading_helper.run_concurrently(read_yieldfrom, self.NUM_THREADS, args=(g,))
+
+ def test_gi_yieldfrom_close_race(self):
+ def gen_yield_from():
+ yield from itertools.count()
+
+ g = gen_yield_from()
+ next(g)
+
+ done = threading.Event()
+
+ def reader():
+ while not done.is_set():
+ g.gi_yieldfrom
+
+ def closer():
+ try:
+ g.close()
+ except ValueError:
+ pass
+ done.set()
+
+ threading_helper.run_concurrently([reader, closer])
--- /dev/null
+Made ``gi_yieldfrom`` thread-safe in the free-threading build
+by using a lightweight lock on the frame state.
#include "pycore_gc.h" // _PyGC_CLEAR_FINALIZED()
#include "pycore_genobject.h" // _PyGen_SetStopIterationValue()
#include "pycore_interpframe.h" // _PyFrame_GetCode()
+#include "pycore_lock.h" // _Py_yield()
#include "pycore_modsupport.h" // _PyArg_CheckPositional()
#include "pycore_object.h" // _PyObject_GC_UNTRACK()
#include "pycore_opcode_utils.h" // RESUME_AFTER_YIELD_FROM
_Py_CAST(PyAsyncGenObject*, (op))
#ifdef Py_GIL_DISABLED
+// Make one attempt to move gen->gi_frame_state from *expected to state.
+// Returns true if the state was changed. On false the caller is expected
+// to re-examine *expected and retry.
+static bool
+gen_try_set_frame_state(PyGenObject *gen, int8_t *expected, int8_t state)
+{
+    if (*expected != FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
+        // Common case: a single compare-and-swap.
+        return _Py_atomic_compare_exchange_int8(&gen->gi_frame_state, expected, state);
+    }
+
+    // A gi_yieldfrom read is in progress: give up the processor, refresh
+    // the caller's snapshot and report failure.
+    _Py_yield();
+    *expected = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
+    return false;
+}
+
# define _Py_GEN_TRY_SET_FRAME_STATE(gen, expected, state) \
- _Py_atomic_compare_exchange_int8(&(gen)->gi_frame_state, &expected, (state))
+ gen_try_set_frame_state((gen), &(expected), (state))  /* Py_GIL_DISABLED build: single attempt through the helper; `expected` must be an lvalue because its address is taken */
#else
# define _Py_GEN_TRY_SET_FRAME_STATE(gen, expected, state) \
((gen)->gi_frame_state = (state), true)
return NULL;
}
- assert(frame_state == FRAME_SUSPENDED_YIELD_FROM ||
- frame_state == FRAME_SUSPENDED);
-
+ assert(FRAME_STATE_SUSPENDED(frame_state));
} while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_EXECUTING));
int err = 0;
gen_getyieldfrom(PyObject *self, void *Py_UNUSED(ignored))
{
PyGenObject *gen = _PyGen_CAST(self);
- int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(gen->gi_frame_state);
+#ifdef Py_GIL_DISABLED
+ int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);  // initial snapshot; refreshed through the macro's `expected` argument when an attempt fails
+ do {
+ if (frame_state != FRAME_SUSPENDED_YIELD_FROM &&
+ frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED)
+ {
+ Py_RETURN_NONE;  // any state other than the two yield-from states: return None
+ }
+ } while (!_Py_GEN_TRY_SET_FRAME_STATE(gen, frame_state, FRAME_SUSPENDED_YIELD_FROM_LOCKED));  // acquire: retried while the state is already ..._LOCKED or changes underneath us
+
+ PyObject *result = PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));  // frame stack is read only while the ..._LOCKED state is held
+ _Py_atomic_store_int8_release(&gen->gi_frame_state, FRAME_SUSPENDED_YIELD_FROM);  // release: restore the pre-lock state
+ return result;
+#else
+ int8_t frame_state = gen->gi_frame_state;
if (frame_state != FRAME_SUSPENDED_YIELD_FROM) {
Py_RETURN_NONE;
}
-// TODO: still not thread-safe with free threading
return PyStackRef_AsPyObjectNew(_PyFrame_StackPeek(&gen->gi_iframe));
+#endif
}
else if (PyCoro_CheckExact(iter)) {
PyCoroObject *coro = (PyCoroObject *)iter;
int8_t frame_state = FT_ATOMIC_LOAD_INT8_RELAXED(coro->cr_frame_state);
- if (frame_state == FRAME_SUSPENDED_YIELD_FROM) {
+ if (frame_state == FRAME_SUSPENDED_YIELD_FROM ||
+ frame_state == FRAME_SUSPENDED_YIELD_FROM_LOCKED)
+ {
/* `iter` is a coroutine object that is being awaited. */
Py_CLEAR(iter);
_PyErr_SetString(PyThreadState_GET(), PyExc_RuntimeError,
#ifdef Py_GIL_DISABLED
if (!_PyObject_IsUniquelyReferenced((PyObject *)gen)) {
int8_t frame_state = _Py_atomic_load_int8_relaxed(&gen->gi_frame_state);
- while (frame_state < FRAME_EXECUTING) {
+ while (frame_state < FRAME_SUSPENDED_YIELD_FROM_LOCKED) {
if (_Py_atomic_compare_exchange_int8(&gen->gi_frame_state,
&frame_state,
FRAME_EXECUTING)) {
return true;
}
}
+ // NB: We return false for FRAME_SUSPENDED_YIELD_FROM_LOCKED as well.
+ // That case is rare enough that we can just handle it in the deopt.
return false;
}
#endif
// Use faster non-atomic modifications in the GIL-enabled build and when
// the object is uniquely referenced in the free-threaded build.
if (gen->gi_frame_state < FRAME_EXECUTING) {
+ assert(gen->gi_frame_state != FRAME_SUSPENDED_YIELD_FROM_LOCKED);
gen->gi_frame_state = FRAME_EXECUTING;
return true;
}
int handed_off;
};
-static void
+void
_Py_yield(void)
{
#ifdef MS_WINDOWS