]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-76785: More Fixes for test.support.interpreters (gh-113012)
authorEric Snow <ericsnowcurrently@gmail.com>
Tue, 12 Dec 2023 17:43:30 +0000 (10:43 -0700)
committerGitHub <noreply@github.com>
Tue, 12 Dec 2023 17:43:30 +0000 (17:43 +0000)
This brings the module (along with the associated extension modules) mostly in sync with PEP 734.  There are only a few small things to wrap up.

13 files changed:
Lib/test/support/interpreters/queues.py
Lib/test/test_interpreters/test_queues.py
Modules/Setup
Modules/Setup.stdlib.in
Modules/_xxinterpchannelsmodule.c
Modules/_xxinterpqueuesmodule.c [new file with mode: 0644]
PC/config.c
PCbuild/pythoncore.vcxproj
PCbuild/pythoncore.vcxproj.filters
Tools/build/generate_stdlib_module_names.py
Tools/c-analyzer/cpython/ignored.tsv
configure
configure.ac

index ed6b0d551dd89095a929316ff51fd55fe29d3fd6..aead0c40ca9667e13840cf31387a16048c6c4698 100644 (file)
@@ -3,13 +3,11 @@
 import queue
 import time
 import weakref
-import _xxinterpchannels as _channels
-import _xxinterpchannels as _queues
+import _xxinterpqueues as _queues
 
 # aliases:
-from _xxinterpchannels import (
-    ChannelError as QueueError,
-    ChannelNotFoundError as QueueNotFoundError,
+from _xxinterpqueues import (
+    QueueError, QueueNotFoundError,
 )
 
 __all__ = [
@@ -19,14 +17,27 @@ __all__ = [
 ]
 
 
+class QueueEmpty(_queues.QueueEmpty, queue.Empty):
+    """Raised from get_nowait() when the queue is empty.
+
+    It is also raised from get() if it times out.
+    """
+
+
+class QueueFull(_queues.QueueFull, queue.Full):
+    """Raised from put_nowait() when the queue is full.
+
+    It is also raised from put() if it times out.
+    """
+
+
 def create(maxsize=0):
     """Return a new cross-interpreter queue.
 
     The queue may be used to pass data safely between interpreters.
     """
-    # XXX honor maxsize
-    qid = _queues.create()
-    return Queue._with_maxsize(qid, maxsize)
+    qid = _queues.create(maxsize)
+    return Queue(qid)
 
 
 def list_all():
@@ -35,53 +46,37 @@ def list_all():
             for qid in _queues.list_all()]
 
 
-class QueueEmpty(queue.Empty):
-    """Raised from get_nowait() when the queue is empty.
-
-    It is also raised from get() if it times out.
-    """
-
-
-class QueueFull(queue.Full):
-    """Raised from put_nowait() when the queue is full.
-
-    It is also raised from put() if it times out.
-    """
-
 
 _known_queues = weakref.WeakValueDictionary()
 
 class Queue:
     """A cross-interpreter queue."""
 
-    @classmethod
-    def _with_maxsize(cls, id, maxsize):
-        if not isinstance(maxsize, int):
-            raise TypeError(f'maxsize must be an int, got {maxsize!r}')
-        elif maxsize < 0:
-            maxsize = 0
-        else:
-            maxsize = int(maxsize)
-        self = cls(id)
-        self._maxsize = maxsize
-        return self
-
     def __new__(cls, id, /):
         # There is only one instance for any given ID.
         if isinstance(id, int):
-            id = _channels._channel_id(id, force=False)
-        elif not isinstance(id, _channels.ChannelID):
+            id = int(id)
+        else:
             raise TypeError(f'id must be an int, got {id!r}')
-        key = int(id)
         try:
-            self = _known_queues[key]
+            self = _known_queues[id]
         except KeyError:
             self = super().__new__(cls)
             self._id = id
-            self._maxsize = 0
-            _known_queues[key] = self
+            _known_queues[id] = self
+            _queues.bind(id)
         return self
 
+    def __del__(self):
+        try:
+            _queues.release(self._id)
+        except QueueNotFoundError:
+            pass
+        try:
+            del _known_queues[self._id]
+        except KeyError:
+            pass
+
     def __repr__(self):
         return f'{type(self).__name__}({self.id})'
 
@@ -90,39 +85,58 @@ class Queue:
 
     @property
     def id(self):
-        return int(self._id)
+        return self._id
 
     @property
     def maxsize(self):
-        return self._maxsize
-
-    @property
-    def _info(self):
-        return _channels.get_info(self._id)
+        try:
+            return self._maxsize
+        except AttributeError:
+            self._maxsize = _queues.get_maxsize(self._id)
+            return self._maxsize
 
     def empty(self):
-        return self._info.count == 0
+        return self.qsize() == 0
 
     def full(self):
-        if self._maxsize <= 0:
-            return False
-        return self._info.count >= self._maxsize
+        return _queues.is_full(self._id)
 
     def qsize(self):
-        return self._info.count
+        return _queues.get_count(self._id)
 
-    def put(self, obj, timeout=None):
-        # XXX block if full
-        _channels.send(self._id, obj, blocking=False)
+    def put(self, obj, timeout=None, *,
+            _delay=10 / 1000,  # 10 milliseconds
+            ):
+        """Add the object to the queue.
+
+        This blocks while the queue is full.
+        """
+        if timeout is not None:
+            timeout = int(timeout)
+            if timeout < 0:
+                raise ValueError(f'timeout value must be non-negative')
+            end = time.time() + timeout
+        while True:
+            try:
+                _queues.put(self._id, obj)
+            except _queues.QueueFull as exc:
+                if timeout is not None and time.time() >= end:
+                    exc.__class__ = QueueFull
+                    raise  # re-raise
+                time.sleep(_delay)
+            else:
+                break
 
     def put_nowait(self, obj):
-        # XXX raise QueueFull if full
-        return _channels.send(self._id, obj, blocking=False)
+        try:
+            return _queues.put(self._id, obj)
+        except _queues.QueueFull as exc:
+            exc.__class__ = QueueFull
+            raise  # re-raise
 
     def get(self, timeout=None, *,
-             _sentinel=object(),
-             _delay=10 / 1000,  # 10 milliseconds
-             ):
+            _delay=10 / 1000,  # 10 milliseconds
+            ):
         """Return the next object from the queue.
 
         This blocks while the queue is empty.
@@ -132,25 +146,27 @@ class Queue:
             if timeout < 0:
                 raise ValueError(f'timeout value must be non-negative')
             end = time.time() + timeout
-        obj = _channels.recv(self._id, _sentinel)
-        while obj is _sentinel:
-            time.sleep(_delay)
-            if timeout is not None and time.time() >= end:
-                raise QueueEmpty
-            obj = _channels.recv(self._id, _sentinel)
+        while True:
+            try:
+                return _queues.get(self._id)
+            except _queues.QueueEmpty as exc:
+                if timeout is not None and time.time() >= end:
+                    exc.__class__ = QueueEmpty
+                    raise  # re-raise
+                time.sleep(_delay)
         return obj
 
-    def get_nowait(self, *, _sentinel=object()):
+    def get_nowait(self):
         """Return the next object from the channel.
 
         If the queue is empty then raise QueueEmpty.  Otherwise this
         is the same as get().
         """
-        obj = _channels.recv(self._id, _sentinel)
-        if obj is _sentinel:
-            raise QueueEmpty
-        return obj
+        try:
+            return _queues.get(self._id)
+        except _queues.QueueEmpty as exc:
+            exc.__class__ = QueueEmpty
+            raise  # re-raise
 
 
-# XXX add this:
-#_channels._register_queue_type(Queue)
+_queues._register_queue_type(Queue)
index 2af90b14d3e3c435e15953ce99daff3060eca93d..2a8ca99c1f6e3f35db1389f8ceeff25ffc3c2e93 100644 (file)
@@ -5,13 +5,21 @@ import time
 
 from test.support import import_helper
 # Raise SkipTest if subinterpreters not supported.
-import_helper.import_module('_xxinterpchannels')
-#import_helper.import_module('_xxinterpqueues')
+_queues = import_helper.import_module('_xxinterpqueues')
 from test.support import interpreters
 from test.support.interpreters import queues
 from .utils import _run_output, TestBase
 
 
+class TestBase(TestBase):
+    def tearDown(self):
+        for qid in _queues.list_all():
+            try:
+                _queues.destroy(qid)
+            except Exception:
+                pass
+
+
 class QueueTests(TestBase):
 
     def test_create(self):
@@ -32,20 +40,47 @@ class QueueTests(TestBase):
             self.assertEqual(queue.maxsize, 0)
 
         with self.subTest('negative maxsize'):
-            queue = queues.create(-1)
-            self.assertEqual(queue.maxsize, 0)
+            queue = queues.create(-10)
+            self.assertEqual(queue.maxsize, -10)
 
         with self.subTest('bad maxsize'):
             with self.assertRaises(TypeError):
                 queues.create('1')
 
-    @unittest.expectedFailure
     def test_shareable(self):
         queue1 = queues.create()
-        queue2 = queues.create()
-        queue1.put(queue2)
-        queue3 = queue1.get()
-        self.assertIs(queue3, queue1)
+
+        interp = interpreters.create()
+        interp.exec_sync(dedent(f"""
+            from test.support.interpreters import queues
+            queue1 = queues.Queue({queue1.id})
+            """));
+
+        with self.subTest('same interpreter'):
+            queue2 = queues.create()
+            queue1.put(queue2)
+            queue3 = queue1.get()
+            self.assertIs(queue3, queue2)
+
+        with self.subTest('from current interpreter'):
+            queue4 = queues.create()
+            queue1.put(queue4)
+            out = _run_output(interp, dedent("""
+                queue4 = queue1.get()
+                print(queue4.id)
+                """))
+            qid = int(out)
+            self.assertEqual(qid, queue4.id)
+
+        with self.subTest('from subinterpreter'):
+            out = _run_output(interp, dedent("""
+                queue5 = queues.create()
+                queue1.put(queue5)
+                print(queue5.id)
+                """))
+            qid = int(out)
+            queue5 = queue1.get()
+            self.assertEqual(queue5.id, qid)
 
     def test_id_type(self):
         queue = queues.create()
@@ -137,7 +172,6 @@ class TestQueueOps(TestBase):
 
         self.assertEqual(actual, expected)
 
-    @unittest.expectedFailure
     def test_put_timeout(self):
         queue = queues.create(2)
         queue.put(None)
@@ -147,7 +181,6 @@ class TestQueueOps(TestBase):
         queue.get()
         queue.put(None)
 
-    @unittest.expectedFailure
     def test_put_nowait(self):
         queue = queues.create(2)
         queue.put_nowait(None)
@@ -179,31 +212,64 @@ class TestQueueOps(TestBase):
             assert obj is not orig, 'expected: obj is not orig'
             """))
 
-    @unittest.expectedFailure
     def test_put_get_different_interpreters(self):
+        interp = interpreters.create()
         queue1 = queues.create()
         queue2 = queues.create()
+        self.assertEqual(len(queues.list_all()), 2)
+
         obj1 = b'spam'
         queue1.put(obj1)
+
         out = _run_output(
-            interpreters.create(),
+            interp,
             dedent(f"""
-                import test.support.interpreters.queue as queues
+                from test.support.interpreters import queues
                 queue1 = queues.Queue({queue1.id})
                 queue2 = queues.Queue({queue2.id})
+                assert queue1.qsize() == 1, 'expected: queue1.qsize() == 1'
                 obj = queue1.get()
+                assert queue1.qsize() == 0, 'expected: queue1.qsize() == 0'
                 assert obj == b'spam', 'expected: obj == obj1'
                 # When going to another interpreter we get a copy.
                 assert id(obj) != {id(obj1)}, 'expected: obj is not obj1'
                 obj2 = b'eggs'
                 print(id(obj2))
+                assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
                 queue2.put(obj2)
+                assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
                 """))
-        obj2 = queue2.get()
+        self.assertEqual(len(queues.list_all()), 2)
+        self.assertEqual(queue1.qsize(), 0)
+        self.assertEqual(queue2.qsize(), 1)
 
+        obj2 = queue2.get()
         self.assertEqual(obj2, b'eggs')
         self.assertNotEqual(id(obj2), int(out))
 
+    def test_put_cleared_with_subinterpreter(self):
+        interp = interpreters.create()
+        queue = queues.create()
+
+        out = _run_output(
+            interp,
+            dedent(f"""
+                from test.support.interpreters import queues
+                queue = queues.Queue({queue.id})
+                obj1 = b'spam'
+                obj2 = b'eggs'
+                queue.put(obj1)
+                queue.put(obj2)
+                """))
+        self.assertEqual(queue.qsize(), 2)
+
+        obj1 = queue.get()
+        self.assertEqual(obj1, b'spam')
+        self.assertEqual(queue.qsize(), 1)
+
+        del interp
+        self.assertEqual(queue.qsize(), 0)
+
     def test_put_get_different_threads(self):
         queue1 = queues.create()
         queue2 = queues.create()
index 1367f0ef4fa54a348de174aa4dfb67dbda5de1ec..8ad9a5aebbfcaadcf97509756e8a9c1de5761ec9 100644 (file)
@@ -273,6 +273,7 @@ PYTHONPATH=$(COREPYTHONPATH)
 
 #_xxsubinterpreters _xxsubinterpretersmodule.c
 #_xxinterpchannels _xxinterpchannelsmodule.c
+#_xxinterpqueues _xxinterpqueuesmodule.c
 #_xxtestfuzz _xxtestfuzz/_xxtestfuzz.c _xxtestfuzz/fuzzer.c
 #_testbuffer _testbuffer.c
 #_testinternalcapi _testinternalcapi.c
index 54650ea9c1d4ac11f8f0d346dcabf6fda7c22a06..8a65a9cffb1b9d87dbf15bcb617d96cd55e1346b 100644 (file)
 @MODULE__QUEUE_TRUE@_queue _queuemodule.c
 @MODULE__RANDOM_TRUE@_random _randommodule.c
 @MODULE__STRUCT_TRUE@_struct _struct.c
+
+# build supports subinterpreters
 @MODULE__XXSUBINTERPRETERS_TRUE@_xxsubinterpreters _xxsubinterpretersmodule.c
 @MODULE__XXINTERPCHANNELS_TRUE@_xxinterpchannels _xxinterpchannelsmodule.c
+@MODULE__XXINTERPQUEUES_TRUE@_xxinterpqueues _xxinterpqueuesmodule.c
 @MODULE__ZONEINFO_TRUE@_zoneinfo _zoneinfo.c
 
 # needs libm
index 97729ec269cb6252905159f5117afb5f5ad6c65a..4e9b8a82a3f630bde6e840082513cdb84512e9a6 100644 (file)
@@ -2629,10 +2629,11 @@ _get_current_channelend_type(int end)
         cls = state->recv_channel_type;
     }
     if (cls == NULL) {
-        PyObject *highlevel = PyImport_ImportModule("interpreters");
+        // Force the module to be loaded, to register the type.
+        PyObject *highlevel = PyImport_ImportModule("interpreters.channel");
         if (highlevel == NULL) {
             PyErr_Clear();
-            highlevel = PyImport_ImportModule("test.support.interpreters");
+            highlevel = PyImport_ImportModule("test.support.interpreters.channel");
             if (highlevel == NULL) {
                 return NULL;
             }
diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c
new file mode 100644 (file)
index 0000000..2cc3a2a
--- /dev/null
@@ -0,0 +1,1685 @@
+/* interpreters module */
+/* low-level access to interpreter primitives */
+
+#ifndef Py_BUILD_CORE_BUILTIN
+#  define Py_BUILD_CORE_MODULE 1
+#endif
+
+#include "Python.h"
+#include "pycore_crossinterp.h"   // struct _xid
+
+
+#define MODULE_NAME "_xxinterpqueues"
+
+
+#define GLOBAL_MALLOC(TYPE) \
+    PyMem_RawMalloc(sizeof(TYPE))
+#define GLOBAL_FREE(VAR) \
+    PyMem_RawFree(VAR)
+
+
+#define XID_IGNORE_EXC 1
+#define XID_FREE 2
+
+static int
+_release_xid_data(_PyCrossInterpreterData *data, int flags)
+{
+    int ignoreexc = flags & XID_IGNORE_EXC;
+    PyObject *exc;
+    if (ignoreexc) {
+        exc = PyErr_GetRaisedException();
+    }
+    int res;
+    if (flags & XID_FREE) {
+        res = _PyCrossInterpreterData_ReleaseAndRawFree(data);
+    }
+    else {
+        res = _PyCrossInterpreterData_Release(data);
+    }
+    if (res < 0) {
+        /* The owning interpreter is already destroyed. */
+        if (ignoreexc) {
+            // XXX Emit a warning?
+            PyErr_Clear();
+        }
+    }
+    if (flags & XID_FREE) {
+        /* Either way, we free the data. */
+    }
+    if (ignoreexc) {
+        PyErr_SetRaisedException(exc);
+    }
+    return res;
+}
+
+
+static PyInterpreterState *
+_get_current_interp(void)
+{
+    // PyInterpreterState_Get() aborts if lookup fails, so don't need
+    // to check the result for NULL.
+    return PyInterpreterState_Get();
+}
+
+static PyObject *
+_get_current_module(void)
+{
+    PyObject *name = PyUnicode_FromString(MODULE_NAME);
+    if (name == NULL) {
+        return NULL;
+    }
+    PyObject *mod = PyImport_GetModule(name);
+    Py_DECREF(name);
+    if (mod == NULL) {
+        return NULL;
+    }
+    assert(mod != Py_None);
+    return mod;
+}
+
+
+struct idarg_int64_converter_data {
+    // input:
+    const char *label;
+    // output:
+    int64_t id;
+};
+
+static int
+idarg_int64_converter(PyObject *arg, void *ptr)
+{
+    int64_t id;
+    struct idarg_int64_converter_data *data = ptr;
+
+    const char *label = data->label;
+    if (label == NULL) {
+        label = "ID";
+    }
+
+    if (PyIndex_Check(arg)) {
+        int overflow = 0;
+        id = PyLong_AsLongLongAndOverflow(arg, &overflow);
+        if (id == -1 && PyErr_Occurred()) {
+            return 0;
+        }
+        else if (id == -1 && overflow == 1) {
+            PyErr_Format(PyExc_OverflowError,
+                         "max %s is %lld, got %R", label, INT64_MAX, arg);
+            return 0;
+        }
+        else if (id < 0) {
+            PyErr_Format(PyExc_ValueError,
+                         "%s must be a non-negative int, got %R", label, arg);
+            return 0;
+        }
+    }
+    else {
+        PyErr_Format(PyExc_TypeError,
+                     "%s must be an int, got %.100s",
+                     label, Py_TYPE(arg)->tp_name);
+        return 0;
+    }
+    data->id = id;
+    return 1;
+}
+
+
+/* module state *************************************************************/
+
+typedef struct {
+    /* external types (added at runtime by interpreters module) */
+    PyTypeObject *queue_type;
+
+    /* QueueError (and its subclasses) */
+    PyObject *QueueError;
+    PyObject *QueueNotFoundError;
+    PyObject *QueueEmpty;
+    PyObject *QueueFull;
+} module_state;
+
+static inline module_state *
+get_module_state(PyObject *mod)
+{
+    assert(mod != NULL);
+    module_state *state = PyModule_GetState(mod);
+    assert(state != NULL);
+    return state;
+}
+
+static int
+traverse_module_state(module_state *state, visitproc visit, void *arg)
+{
+    /* external types */
+    Py_VISIT(state->queue_type);
+
+    /* QueueError */
+    Py_VISIT(state->QueueError);
+    Py_VISIT(state->QueueNotFoundError);
+    Py_VISIT(state->QueueEmpty);
+    Py_VISIT(state->QueueFull);
+
+    return 0;
+}
+
+static int
+clear_module_state(module_state *state)
+{
+    /* external types */
+    Py_CLEAR(state->queue_type);
+
+    /* QueueError */
+    Py_CLEAR(state->QueueError);
+    Py_CLEAR(state->QueueNotFoundError);
+    Py_CLEAR(state->QueueEmpty);
+    Py_CLEAR(state->QueueFull);
+
+    return 0;
+}
+
+
+/* error codes **************************************************************/
+
+#define ERR_EXCEPTION_RAISED (-1)
+// multi-queue errors
+#define ERR_QUEUES_ALLOC (-11)
+#define ERR_QUEUE_ALLOC (-12)
+#define ERR_NO_NEXT_QUEUE_ID (-13)
+#define ERR_QUEUE_NOT_FOUND (-14)
+// single-queue errors
+#define ERR_QUEUE_EMPTY (-21)
+#define ERR_QUEUE_FULL (-22)
+
+static int
+resolve_module_errcode(module_state *state, int errcode, int64_t qid,
+                       PyObject **p_exctype, PyObject **p_msgobj)
+{
+    PyObject *exctype = NULL;
+    PyObject *msg = NULL;
+    switch (errcode) {
+    case ERR_NO_NEXT_QUEUE_ID:
+        exctype = state->QueueError;
+        msg = PyUnicode_FromString("ran out of queue IDs");
+        break;
+    case ERR_QUEUE_NOT_FOUND:
+        exctype = state->QueueNotFoundError;
+        msg = PyUnicode_FromFormat("queue %" PRId64 " not found", qid);
+        break;
+    case ERR_QUEUE_EMPTY:
+        exctype = state->QueueEmpty;
+        msg = PyUnicode_FromFormat("queue %" PRId64 " is empty", qid);
+        break;
+    case ERR_QUEUE_FULL:
+        exctype = state->QueueFull;
+        msg = PyUnicode_FromFormat("queue %" PRId64 " is full", qid);
+        break;
+    default:
+        PyErr_Format(PyExc_ValueError,
+                     "unsupported error code %d", errcode);
+        return -1;
+    }
+
+    if (msg == NULL) {
+        assert(PyErr_Occurred());
+        return -1;
+    }
+    *p_exctype = exctype;
+    *p_msgobj = msg;
+    return 0;
+}
+
+
+/* QueueError ***************************************************************/
+
+static int
+add_exctype(PyObject *mod, PyObject **p_state_field,
+            const char *qualname, const char *doc, PyObject *base)
+{
+    const char *dot = strrchr(qualname, '.');
+    assert(dot != NULL);
+    const char *name = dot+1;
+    assert(*p_state_field == NULL);
+    assert(!PyObject_HasAttrStringWithError(mod, name));
+    PyObject *exctype = PyErr_NewExceptionWithDoc(qualname, doc, base, NULL);
+    if (exctype == NULL) {
+        return -1;
+    }
+    if (PyModule_AddType(mod, (PyTypeObject *)exctype) < 0) {
+        Py_DECREF(exctype);
+        return -1;
+    }
+    *p_state_field = exctype;
+    return 0;
+}
+
+static int
+add_QueueError(PyObject *mod)
+{
+    module_state *state = get_module_state(mod);
+
+#define PREFIX "test.support.interpreters."
+#define ADD_EXCTYPE(NAME, BASE, DOC)                                    \
+    if (add_exctype(mod, &state->NAME, PREFIX #NAME, DOC, BASE) < 0) {  \
+        return -1;                                                      \
+    }
+    ADD_EXCTYPE(QueueError, PyExc_RuntimeError,
+                "Indicates that a queue-related error happened.")
+    ADD_EXCTYPE(QueueNotFoundError, state->QueueError, NULL)
+    ADD_EXCTYPE(QueueEmpty, state->QueueError, NULL)
+    ADD_EXCTYPE(QueueFull, state->QueueError, NULL)
+#undef ADD_EXCTYPE
+#undef PREFIX
+
+    return 0;
+}
+
+static int
+handle_queue_error(int err, PyObject *mod, int64_t qid)
+{
+    if (err == 0) {
+        assert(!PyErr_Occurred());
+        return 0;
+    }
+    assert(err < 0);
+    assert((err == -1) == (PyErr_Occurred() != NULL));
+
+    module_state *state;
+    switch (err) {
+    case ERR_QUEUE_ALLOC:  // fall through
+    case ERR_QUEUES_ALLOC:
+        PyErr_NoMemory();
+        break;
+    default:
+        state = get_module_state(mod);
+        assert(state->QueueError != NULL);
+        PyObject *exctype = NULL;
+        PyObject *msg = NULL;
+        if (resolve_module_errcode(state, err, qid, &exctype, &msg) < 0) {
+            return -1;
+        }
+        PyObject *exc = PyObject_CallOneArg(exctype, msg);
+        Py_DECREF(msg);
+        if (exc == NULL) {
+            return -1;
+        }
+        PyErr_SetObject(exctype, exc);
+        Py_DECREF(exc);
+    }
+    return 1;
+}
+
+
+/* the basic queue **********************************************************/
+
+struct _queueitem;
+
+typedef struct _queueitem {
+    _PyCrossInterpreterData *data;
+    struct _queueitem *next;
+} _queueitem;
+
+static void
+_queueitem_init(_queueitem *item, _PyCrossInterpreterData *data)
+{
+    *item = (_queueitem){
+        .data = data,
+    };
+}
+
+static void
+_queueitem_clear(_queueitem *item)
+{
+    item->next = NULL;
+
+    if (item->data != NULL) {
+        // It was allocated in queue_put().
+        (void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
+        item->data = NULL;
+    }
+}
+
+static _queueitem *
+_queueitem_new(_PyCrossInterpreterData *data)
+{
+    _queueitem *item = GLOBAL_MALLOC(_queueitem);
+    if (item == NULL) {
+        PyErr_NoMemory();
+        return NULL;
+    }
+    _queueitem_init(item, data);
+    return item;
+}
+
+static void
+_queueitem_free(_queueitem *item)
+{
+    _queueitem_clear(item);
+    GLOBAL_FREE(item);
+}
+
+static void
+_queueitem_free_all(_queueitem *item)
+{
+    while (item != NULL) {
+        _queueitem *last = item;
+        item = item->next;
+        _queueitem_free(last);
+    }
+}
+
+static void
+_queueitem_popped(_queueitem *item, _PyCrossInterpreterData **p_data)
+{
+    *p_data = item->data;
+    // We clear them here, so they won't be released in _queueitem_clear().
+    item->data = NULL;
+    _queueitem_free(item);
+}
+
+
+/* the queue */
+typedef struct _queue {
+    Py_ssize_t num_waiters;  // protected by global lock
+    PyThread_type_lock mutex;
+    int alive;
+    struct _queueitems {
+        Py_ssize_t maxsize;
+        Py_ssize_t count;
+        _queueitem *first;
+        _queueitem *last;
+    } items;
+} _queue;
+
+static int
+_queue_init(_queue *queue, Py_ssize_t maxsize)
+{
+    PyThread_type_lock mutex = PyThread_allocate_lock();
+    if (mutex == NULL) {
+        return ERR_QUEUE_ALLOC;
+    }
+    *queue = (_queue){
+        .mutex = mutex,
+        .alive = 1,
+        .items = {
+            .maxsize = maxsize,
+        },
+    };
+    return 0;
+}
+
+static void
+_queue_clear(_queue *queue)
+{
+    assert(!queue->alive);
+    assert(queue->num_waiters == 0);
+    _queueitem_free_all(queue->items.first);
+    assert(queue->mutex != NULL);
+    PyThread_free_lock(queue->mutex);
+    *queue = (_queue){0};
+}
+
+static void
+_queue_kill_and_wait(_queue *queue)
+{
+    // Mark it as dead.
+    PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
+    assert(queue->alive);
+    queue->alive = 0;
+    PyThread_release_lock(queue->mutex);
+
+    // Wait for all waiters to fail.
+    while (queue->num_waiters > 0) {
+        PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
+        PyThread_release_lock(queue->mutex);
+    };
+}
+
+static void
+_queue_mark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
+{
+    if (parent_mutex != NULL) {
+        PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
+        queue->num_waiters += 1;
+        PyThread_release_lock(parent_mutex);
+    }
+    else {
+        // The caller must be holding the parent lock already.
+        queue->num_waiters += 1;
+    }
+}
+
+static void
+_queue_unmark_waiter(_queue *queue, PyThread_type_lock parent_mutex)
+{
+    if (parent_mutex != NULL) {
+        PyThread_acquire_lock(parent_mutex, WAIT_LOCK);
+        queue->num_waiters -= 1;
+        PyThread_release_lock(parent_mutex);
+    }
+    else {
+        // The caller must be holding the parent lock already.
+        queue->num_waiters -= 1;
+    }
+}
+
+static int
+_queue_lock(_queue *queue)
+{
+    // The queue must be marked as a waiter already.
+    PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
+    if (!queue->alive) {
+        PyThread_release_lock(queue->mutex);
+        return ERR_QUEUE_NOT_FOUND;
+    }
+    return 0;
+}
+
+static void
+_queue_unlock(_queue *queue)
+{
+    PyThread_release_lock(queue->mutex);
+}
+
+static int
+_queue_add(_queue *queue, _PyCrossInterpreterData *data)
+{
+    int err = _queue_lock(queue);
+    if (err < 0) {
+        return err;
+    }
+
+    Py_ssize_t maxsize = queue->items.maxsize;
+    if (maxsize <= 0) {
+        maxsize = PY_SSIZE_T_MAX;
+    }
+    if (queue->items.count >= maxsize) {
+        _queue_unlock(queue);
+        return ERR_QUEUE_FULL;
+    }
+
+    _queueitem *item = _queueitem_new(data);
+    if (item == NULL) {
+        _queue_unlock(queue);
+        return -1;
+    }
+
+    queue->items.count += 1;
+    if (queue->items.first == NULL) {
+        queue->items.first = item;
+    }
+    else {
+        queue->items.last->next = item;
+    }
+    queue->items.last = item;
+
+    _queue_unlock(queue);
+    return 0;
+}
+
+static int
+_queue_next(_queue *queue, _PyCrossInterpreterData **p_data)
+{
+    int err = _queue_lock(queue);
+    if (err < 0) {
+        return err;
+    }
+
+    assert(queue->items.count >= 0);
+    _queueitem *item = queue->items.first;
+    if (item == NULL) {
+        _queue_unlock(queue);
+        return ERR_QUEUE_EMPTY;
+    }
+    queue->items.first = item->next;
+    if (queue->items.last == item) {
+        queue->items.last = NULL;
+    }
+    queue->items.count -= 1;
+
+    _queueitem_popped(item, p_data);
+
+    _queue_unlock(queue);
+    return 0;
+}
+
+static int
+_queue_get_maxsize(_queue *queue, Py_ssize_t *p_maxsize)
+{
+    int err = _queue_lock(queue);
+    if (err < 0) {
+        return err;
+    }
+
+    *p_maxsize = queue->items.maxsize;
+
+    _queue_unlock(queue);
+    return 0;
+}
+
+static int
+_queue_is_full(_queue *queue, int *p_is_full)
+{
+    int err = _queue_lock(queue);
+    if (err < 0) {
+        return err;
+    }
+
+    assert(queue->items.count <= queue->items.maxsize);
+    *p_is_full = queue->items.count == queue->items.maxsize;
+
+    _queue_unlock(queue);
+    return 0;
+}
+
+static int
+_queue_get_count(_queue *queue, Py_ssize_t *p_count)
+{
+    int err = _queue_lock(queue);
+    if (err < 0) {
+        return err;
+    }
+
+    *p_count = queue->items.count;
+
+    _queue_unlock(queue);
+    return 0;
+}
+
+static void
+_queue_clear_interpreter(_queue *queue, int64_t interpid)
+{
+    int err = _queue_lock(queue);
+    if (err == ERR_QUEUE_NOT_FOUND) {
+        // The queue is already destroyed, so there's nothing to clear.
+        assert(!PyErr_Occurred());
+        return;
+    }
+    assert(err == 0);  // There should be no other errors.
+
+    _queueitem *prev = NULL;
+    _queueitem *next = queue->items.first;
+    while (next != NULL) {
+        _queueitem *item = next;
+        next = item->next;
+        if (item->data->interpid == interpid) {
+            if (prev == NULL) {
+                queue->items.first = item->next;
+            }
+            else {
+                prev->next = item->next;
+            }
+            _queueitem_free(item);
+            queue->items.count -= 1;
+        }
+        else {
+            prev = item;
+        }
+    }
+
+    _queue_unlock(queue);
+}
+
+
+/* external queue references ************************************************/
+
+struct _queueref;
+
+typedef struct _queueref {
+    struct _queueref *next;
+    int64_t qid;
+    Py_ssize_t refcount;
+    _queue *queue;
+} _queueref;
+
+static _queueref *
+_queuerefs_find(_queueref *first, int64_t qid, _queueref **pprev)
+{
+    _queueref *prev = NULL;
+    _queueref *ref = first;
+    while (ref != NULL) {
+        if (ref->qid == qid) {
+            break;
+        }
+        prev = ref;
+        ref = ref->next;
+    }
+    if (pprev != NULL) {
+        *pprev = prev;
+    }
+    return ref;
+}
+
+
+/* a collection of queues ***************************************************/
+
+typedef struct _queues {
+    PyThread_type_lock mutex;
+    _queueref *head;
+    int64_t count;
+    int64_t next_id;
+} _queues;
+
+static void
+_queues_init(_queues *queues, PyThread_type_lock mutex)
+{
+    queues->mutex = mutex;
+    queues->head = NULL;
+    queues->count = 0;
+    queues->next_id = 1;
+}
+
+static void
+_queues_fini(_queues *queues)
+{
+    assert(queues->count == 0);
+    assert(queues->head == NULL);
+    if (queues->mutex != NULL) {
+        PyThread_free_lock(queues->mutex);
+        queues->mutex = NULL;
+    }
+}
+
+static int64_t
+_queues_next_id(_queues *queues)  // needs lock
+{
+    int64_t qid = queues->next_id;
+    if (qid < 0) {
+        /* overflow */
+        return ERR_NO_NEXT_QUEUE_ID;
+    }
+    queues->next_id += 1;
+    return qid;
+}
+
+static int
+_queues_lookup(_queues *queues, int64_t qid, _queue **res)
+{
+    PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+
+    _queueref *ref = _queuerefs_find(queues->head, qid, NULL);
+    if (ref == NULL) {
+        PyThread_release_lock(queues->mutex);
+        return ERR_QUEUE_NOT_FOUND;
+    }
+    assert(ref->queue != NULL);
+    _queue *queue = ref->queue;
+    _queue_mark_waiter(queue, NULL);
+    // The caller must unmark it.
+
+    PyThread_release_lock(queues->mutex);
+
+    *res = queue;
+    return 0;
+}
+
+static int64_t
+_queues_add(_queues *queues, _queue *queue)
+{
+    int64_t qid = -1;
+    PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+
+    // Create a new ref.
+    int64_t _qid = _queues_next_id(queues);
+    if (_qid < 0) {
+        goto done;
+    }
+    _queueref *ref = GLOBAL_MALLOC(_queueref);
+    if (ref == NULL) {
+        qid = ERR_QUEUE_ALLOC;
+        goto done;
+    }
+    *ref = (_queueref){
+        .qid = _qid,
+        .queue = queue,
+    };
+
+    // Add it to the list.
+    // We assume that the queue is a new one (not already in the list).
+    ref->next = queues->head;
+    queues->head = ref;
+    queues->count += 1;
+
+    qid = _qid;
+done:
+    PyThread_release_lock(queues->mutex);
+    return qid;
+}
+
+static void
+_queues_remove_ref(_queues *queues, _queueref *ref, _queueref *prev,
+                   _queue **p_queue)
+{
+    assert(ref->queue != NULL);
+
+    if (ref == queues->head) {
+        queues->head = ref->next;
+    }
+    else {
+        prev->next = ref->next;
+    }
+    ref->next = NULL;
+    queues->count -= 1;
+
+    *p_queue = ref->queue;
+    ref->queue = NULL;
+    GLOBAL_FREE(ref);
+}
+
+static int
+_queues_remove(_queues *queues, int64_t qid, _queue **p_queue)
+{
+    PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+
+    _queueref *prev = NULL;
+    _queueref *ref = _queuerefs_find(queues->head, qid, &prev);
+    if (ref == NULL) {
+        PyThread_release_lock(queues->mutex);
+        return ERR_QUEUE_NOT_FOUND;
+    }
+
+    _queues_remove_ref(queues, ref, prev, p_queue);
+    PyThread_release_lock(queues->mutex);
+
+    return 0;
+}
+
+static int
+_queues_incref(_queues *queues, int64_t qid)
+{
+    // XXX Track interpreter IDs?
+    int res = -1;
+    PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+
+    _queueref *ref = _queuerefs_find(queues->head, qid, NULL);
+    if (ref == NULL) {
+        assert(!PyErr_Occurred());
+        res = ERR_QUEUE_NOT_FOUND;
+        goto done;
+    }
+    ref->refcount += 1;
+
+    res = 0;
+done:
+    PyThread_release_lock(queues->mutex);
+    return res;
+}
+
+static void _queue_free(_queue *);
+
+static void
+_queues_decref(_queues *queues, int64_t qid)
+{
+    PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+
+    _queueref *prev = NULL;
+    _queueref *ref = _queuerefs_find(queues->head, qid, &prev);
+    if (ref == NULL) {
+        assert(!PyErr_Occurred());
+        // Already destroyed.
+        // XXX Warn?
+        goto finally;
+    }
+    assert(ref->refcount > 0);
+    ref->refcount -= 1;
+
+    // Destroy if no longer used.
+    assert(ref->queue != NULL);
+    if (ref->refcount == 0) {
+        _queue *queue = NULL;
+        _queues_remove_ref(queues, ref, prev, &queue);
+        PyThread_release_lock(queues->mutex);
+
+        _queue_kill_and_wait(queue);
+        _queue_free(queue);
+        return;
+    }
+
+finally:
+    PyThread_release_lock(queues->mutex);
+}
+
+static int64_t *
+_queues_list_all(_queues *queues, int64_t *count)
+{
+    int64_t *qids = NULL;
+    PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+    int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(queues->count));
+    if (ids == NULL) {
+        goto done;
+    }
+    _queueref *ref = queues->head;
+    for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
+        ids[i] = ref->qid;
+    }
+    *count = queues->count;
+
+    qids = ids;
+done:
+    PyThread_release_lock(queues->mutex);
+    return qids;
+}
+
+static void
+_queues_clear_interpreter(_queues *queues, int64_t interpid)
+{
+    PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
+
+    _queueref *ref = queues->head;
+    for (; ref != NULL; ref = ref->next) {
+        assert(ref->queue != NULL);
+        _queue_clear_interpreter(ref->queue, interpid);
+    }
+
+    PyThread_release_lock(queues->mutex);
+}
+
+
+/* "high"-level queue-related functions *************************************/
+
+static void
+_queue_free(_queue *queue)
+{
+    _queue_clear(queue);
+    GLOBAL_FREE(queue);
+}
+
+// Create a new queue.
+static int64_t
+queue_create(_queues *queues, Py_ssize_t maxsize)
+{
+    _queue *queue = GLOBAL_MALLOC(_queue);
+    if (queue == NULL) {
+        return ERR_QUEUE_ALLOC;
+    }
+    int err = _queue_init(queue, maxsize);
+    if (err < 0) {
+        GLOBAL_FREE(queue);
+        return (int64_t)err;
+    }
+    int64_t qid = _queues_add(queues, queue);
+    if (qid < 0) {
+        _queue_clear(queue);
+        GLOBAL_FREE(queue);
+    }
+    return qid;
+}
+
+// Completely destroy the queue.
+static int
+queue_destroy(_queues *queues, int64_t qid)
+{
+    _queue *queue = NULL;
+    int err = _queues_remove(queues, qid, &queue);
+    if (err < 0) {
+        return err;
+    }
+    _queue_kill_and_wait(queue);
+    _queue_free(queue);
+    return 0;
+}
+
+// Push an object onto the queue.
+static int
+queue_put(_queues *queues, int64_t qid, PyObject *obj)
+{
+    // Look up the queue.
+    _queue *queue = NULL;
+    int err = _queues_lookup(queues, qid, &queue);
+    if (err != 0) {
+        return err;
+    }
+    assert(queue != NULL);
+
+    // Convert the object to cross-interpreter data.
+    _PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
+    if (data == NULL) {
+        _queue_unmark_waiter(queue, queues->mutex);
+        return -1;
+    }
+    if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
+        _queue_unmark_waiter(queue, queues->mutex);
+        GLOBAL_FREE(data);
+        return -1;
+    }
+
+    // Add the data to the queue.
+    int res = _queue_add(queue, data);
+    _queue_unmark_waiter(queue, queues->mutex);
+    if (res != 0) {
+        // We may chain an exception here:
+        (void)_release_xid_data(data, 0);
+        GLOBAL_FREE(data);
+        return res;
+    }
+
+    return 0;
+}
+
+// Pop the next object off the queue.  Fail if empty.
+// XXX Support a "wait" mutex?
+static int
+queue_get(_queues *queues, int64_t qid, PyObject **res)
+{
+    int err;
+    *res = NULL;
+
+    // Look up the queue.
+    _queue *queue = NULL;
+    err = _queues_lookup(queues, qid, &queue);
+    if (err != 0) {
+        return err;
+    }
+    // Past this point we are responsible for releasing the mutex.
+    assert(queue != NULL);
+
+    // Pop off the next item from the queue.
+    _PyCrossInterpreterData *data = NULL;
+    err = _queue_next(queue, &data);
+    _queue_unmark_waiter(queue, queues->mutex);
+    if (err != 0) {
+        return err;
+    }
+    else if (data == NULL) {
+        assert(!PyErr_Occurred());
+        return 0;
+    }
+
+    // Convert the data back to an object.
+    PyObject *obj = _PyCrossInterpreterData_NewObject(data);
+    if (obj == NULL) {
+        assert(PyErr_Occurred());
+        // It was allocated in queue_put(), so we free it.
+        (void)_release_xid_data(data, XID_IGNORE_EXC | XID_FREE);
+        return -1;
+    }
+    // It was allocated in queue_put(), so we free it.
+    int release_res = _release_xid_data(data, XID_FREE);
+    if (release_res < 0) {
+        // The source interpreter has been destroyed already.
+        assert(PyErr_Occurred());
+        Py_DECREF(obj);
+        return -1;
+    }
+
+    *res = obj;
+    return 0;
+}
+
+static int
+queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
+{
+    _queue *queue = NULL;
+    int err = _queues_lookup(queues, qid, &queue);
+    if (err < 0) {
+        return err;
+    }
+    err = _queue_get_maxsize(queue, p_maxsize);
+    _queue_unmark_waiter(queue, queues->mutex);
+    return err;
+}
+
+static int
+queue_is_full(_queues *queues, int64_t qid, int *p_is_full)
+{
+    _queue *queue = NULL;
+    int err = _queues_lookup(queues, qid, &queue);
+    if (err < 0) {
+        return err;
+    }
+    err = _queue_is_full(queue, p_is_full);
+    _queue_unmark_waiter(queue, queues->mutex);
+    return err;
+}
+
+static int
+queue_get_count(_queues *queues, int64_t qid, Py_ssize_t *p_count)
+{
+    _queue *queue = NULL;
+    int err = _queues_lookup(queues, qid, &queue);
+    if (err < 0) {
+        return err;
+    }
+    err = _queue_get_count(queue, p_count);
+    _queue_unmark_waiter(queue, queues->mutex);
+    return err;
+}
+
+
+/* external Queue objects ***************************************************/
+
+static int _queueobj_shared(PyThreadState *,
+                            PyObject *, _PyCrossInterpreterData *);
+
+static int
+set_external_queue_type(PyObject *module, PyTypeObject *queue_type)
+{
+    module_state *state = get_module_state(module);
+
+    if (state->queue_type != NULL) {
+        PyErr_SetString(PyExc_TypeError, "already registered");
+        return -1;
+    }
+    state->queue_type = (PyTypeObject *)Py_NewRef(queue_type);
+
+    if (_PyCrossInterpreterData_RegisterClass(queue_type, _queueobj_shared) < 0) {
+        return -1;
+    }
+
+    return 0;
+}
+
+static PyTypeObject *
+get_external_queue_type(PyObject *module)
+{
+    module_state *state = get_module_state(module);
+
+    PyTypeObject *cls = state->queue_type;
+    if (cls == NULL) {
+        // Force the module to be loaded, to register the type.
+        PyObject *highlevel = PyImport_ImportModule("interpreters.queue");
+        if (highlevel == NULL) {
+            PyErr_Clear();
+            highlevel = PyImport_ImportModule("test.support.interpreters.queue");
+            if (highlevel == NULL) {
+                return NULL;
+            }
+        }
+        Py_DECREF(highlevel);
+        cls = state->queue_type;
+        assert(cls != NULL);
+    }
+    return cls;
+}
+
+
+// XXX Use a new __xid__ protocol instead?
+
+struct _queueid_xid {
+    int64_t qid;
+};
+
+static _queues * _get_global_queues(void);
+
+static void *
+_queueid_xid_new(int64_t qid)
+{
+    _queues *queues = _get_global_queues();
+    if (_queues_incref(queues, qid) < 0) {
+        return NULL;
+    }
+
+    struct _queueid_xid *data = PyMem_RawMalloc(sizeof(struct _queueid_xid));
+    if (data == NULL) {
+        _queues_incref(queues, qid);
+        return NULL;
+    }
+    data->qid = qid;
+    return (void *)data;
+}
+
+static void
+_queueid_xid_free(void *data)
+{
+    int64_t qid = ((struct _queueid_xid *)data)->qid;
+    PyMem_RawFree(data);
+    _queues *queues = _get_global_queues();
+    _queues_decref(queues, qid);
+}
+
+static PyObject *
+_queueobj_from_xid(_PyCrossInterpreterData *data)
+{
+    int64_t qid = *(int64_t *)data->data;
+    PyObject *qidobj = PyLong_FromLongLong(qid);
+    if (qidobj == NULL) {
+        return NULL;
+    }
+
+    PyObject *mod = _get_current_module();
+    if (mod == NULL) {
+        // XXX import it?
+        PyErr_SetString(PyExc_RuntimeError,
+                        MODULE_NAME " module not imported yet");
+        return NULL;
+    }
+
+    PyTypeObject *cls = get_external_queue_type(mod);
+    Py_DECREF(mod);
+    if (cls == NULL) {
+        Py_DECREF(qidobj);
+        return NULL;
+    }
+    PyObject *obj = PyObject_CallOneArg((PyObject *)cls, (PyObject *)qidobj);
+    Py_DECREF(qidobj);
+    return obj;
+}
+
+static int
+_queueobj_shared(PyThreadState *tstate, PyObject *queueobj,
+                 _PyCrossInterpreterData *data)
+{
+    PyObject *qidobj = PyObject_GetAttrString(queueobj, "_id");
+    if (qidobj == NULL) {
+        return -1;
+    }
+    struct idarg_int64_converter_data converted = {
+        .label = "queue ID",
+    };
+    int res = idarg_int64_converter(qidobj, &converted);
+    Py_DECREF(qidobj);
+    if (!res) {
+        assert(PyErr_Occurred());
+        return -1;
+    }
+
+    void *raw = _queueid_xid_new(converted.id);
+    if (raw == NULL) {
+        Py_DECREF(qidobj);
+        return -1;
+    }
+    _PyCrossInterpreterData_Init(data, tstate->interp, raw, NULL,
+                                 _queueobj_from_xid);
+    Py_DECREF(qidobj);
+    data->free = _queueid_xid_free;
+    return 0;
+}
+
+
+/* module level code ********************************************************/
+
+/* globals is the process-global state for the module.  It holds all
+   the data that we need to share between interpreters, so it cannot
+   hold PyObject values. */
+static struct globals {
+    int module_count;
+    _queues queues;
+} _globals = {0};
+
+static int
+_globals_init(void)
+{
+    // XXX This isn't thread-safe.
+    _globals.module_count++;
+    if (_globals.module_count > 1) {
+        // Already initialized.
+        return 0;
+    }
+
+    assert(_globals.queues.mutex == NULL);
+    PyThread_type_lock mutex = PyThread_allocate_lock();
+    if (mutex == NULL) {
+        return ERR_QUEUES_ALLOC;
+    }
+    _queues_init(&_globals.queues, mutex);
+    return 0;
+}
+
+static void
+_globals_fini(void)
+{
+    // XXX This isn't thread-safe.
+    _globals.module_count--;
+    if (_globals.module_count > 0) {
+        return;
+    }
+
+    _queues_fini(&_globals.queues);
+}
+
+static _queues *
+_get_global_queues(void)
+{
+    return &_globals.queues;
+}
+
+
+static void
+clear_interpreter(void *data)
+{
+    if (_globals.module_count == 0) {
+        return;
+    }
+    PyInterpreterState *interp = (PyInterpreterState *)data;
+    assert(interp == _get_current_interp());
+    int64_t interpid = PyInterpreterState_GetID(interp);
+    _queues_clear_interpreter(&_globals.queues, interpid);
+}
+
+
+typedef struct idarg_int64_converter_data qidarg_converter_data;
+
+static int
+qidarg_converter(PyObject *arg, void *ptr)
+{
+    qidarg_converter_data *data = ptr;
+    if (data->label == NULL) {
+        data->label = "queue ID";
+    }
+    return idarg_int64_converter(arg, ptr);
+}
+
+
+static PyObject *
+queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"maxsize", NULL};
+    Py_ssize_t maxsize = -1;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n:create", kwlist,
+                                     &maxsize)) {
+        return NULL;
+    }
+
+    int64_t qid = queue_create(&_globals.queues, maxsize);
+    if (qid < 0) {
+        (void)handle_queue_error((int)qid, self, qid);
+        return NULL;
+    }
+
+    PyObject *qidobj = PyLong_FromLongLong(qid);
+    if (qidobj == NULL) {
+        PyObject *exc = PyErr_GetRaisedException();
+        int err = queue_destroy(&_globals.queues, qid);
+        if (handle_queue_error(err, self, qid)) {
+            // XXX issue a warning?
+            PyErr_Clear();
+        }
+        PyErr_SetRaisedException(exc);
+        return NULL;
+    }
+
+    return qidobj;
+}
+
+PyDoc_STRVAR(queuesmod_create_doc,
+"create() -> qid\n\
+\n\
+Create a new cross-interpreter queue and return its unique generated ID.\n\
+It is a new reference as though bind() had been called on the queue.");
+
+static PyObject *
+queuesmod_destroy(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", NULL};
+    qidarg_converter_data qidarg;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:destroy", kwlist,
+                                     qidarg_converter, &qidarg)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    int err = queue_destroy(&_globals.queues, qid);
+    if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(queuesmod_destroy_doc,
+"destroy(qid)\n\
+\n\
+Clear and destroy the queue.  Afterward attempts to use the queue\n\
+will behave as though it never existed.");
+
+static PyObject *
+queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
+{
+    int64_t count = 0;
+    int64_t *qids = _queues_list_all(&_globals.queues, &count);
+    if (qids == NULL) {
+        if (count == 0) {
+            return PyList_New(0);
+        }
+        return NULL;
+    }
+    PyObject *ids = PyList_New((Py_ssize_t)count);
+    if (ids == NULL) {
+        goto finally;
+    }
+    int64_t *cur = qids;
+    for (int64_t i=0; i < count; cur++, i++) {
+        PyObject *qidobj = PyLong_FromLongLong(*cur);
+        if (qidobj == NULL) {
+            Py_SETREF(ids, NULL);
+            break;
+        }
+        PyList_SET_ITEM(ids, (Py_ssize_t)i, qidobj);
+    }
+
+finally:
+    PyMem_Free(qids);
+    return ids;
+}
+
+PyDoc_STRVAR(queuesmod_list_all_doc,
+"list_all() -> [qid]\n\
+\n\
+Return the list of IDs for all queues.");
+
+static PyObject *
+queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", "obj", NULL};
+    qidarg_converter_data qidarg;
+    PyObject *obj;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:put", kwlist,
+                                     qidarg_converter, &qidarg, &obj)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    /* Queue up the object. */
+    int err = queue_put(&_globals.queues, qid, obj);
+    if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(queuesmod_put_doc,
+"put(qid, obj)\n\
+\n\
+Add the object's data to the queue.");
+
+static PyObject *
+queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", "default", NULL};
+    qidarg_converter_data qidarg;
+    PyObject *dflt = NULL;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&|O:get", kwlist,
+                                     qidarg_converter, &qidarg, &dflt)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    PyObject *obj = NULL;
+    int err = queue_get(&_globals.queues, qid, &obj);
+    if (err == ERR_QUEUE_EMPTY && dflt != NULL) {
+        assert(obj == NULL);
+        obj = Py_NewRef(dflt);
+    }
+    else if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+    return obj;
+}
+
+PyDoc_STRVAR(queuesmod_get_doc,
+"get(qid, [default]) -> obj\n\
+\n\
+Return a new object from the data at the front of the queue.\n\
+\n\
+If there is nothing to receive then raise QueueEmpty, unless\n\
+a default value is provided.  In that case return it.");
+
+static PyObject *
+queuesmod_bind(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", NULL};
+    qidarg_converter_data qidarg;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&:bind", kwlist,
+                                     qidarg_converter, &qidarg)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    // XXX Check module state if bound already.
+
+    int err = _queues_incref(&_globals.queues, qid);
+    if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+
+    // XXX Update module state.
+
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(queuesmod_bind_doc,
+"bind(qid)\n\
+\n\
+Take a reference to the identified queue.\n\
+The queue is not destroyed until there are no references left.");
+
+static PyObject *
+queuesmod_release(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    // Note that only the current interpreter is affected.
+    static char *kwlist[] = {"qid", NULL};
+    qidarg_converter_data qidarg;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&:release", kwlist,
+                                     qidarg_converter, &qidarg)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    // XXX Check module state if bound already.
+    // XXX Update module state.
+
+    _queues_decref(&_globals.queues, qid);
+
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(queuesmod_release_doc,
+"release(qid)\n\
+\n\
+Release a reference to the queue.\n\
+The queue is destroyed once there are no references left.");
+
+static PyObject *
+queuesmod_get_maxsize(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", NULL};
+    qidarg_converter_data qidarg;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&:get_maxsize", kwlist,
+                                     qidarg_converter, &qidarg)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    Py_ssize_t maxsize = -1;
+    int err = queue_get_maxsize(&_globals.queues, qid, &maxsize);
+    if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+    return PyLong_FromLongLong(maxsize);
+}
+
+PyDoc_STRVAR(queuesmod_get_maxsize_doc,
+"get_maxsize(qid)\n\
+\n\
+Return the maximum number of items in the queue.");
+
+static PyObject *
+queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", NULL};
+    qidarg_converter_data qidarg;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&:is_full", kwlist,
+                                     qidarg_converter, &qidarg)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    int is_full;
+    int err = queue_is_full(&_globals.queues, qid, &is_full);
+    if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+    if (is_full) {
+        Py_RETURN_TRUE;
+    }
+    Py_RETURN_FALSE;
+}
+
+PyDoc_STRVAR(queuesmod_is_full_doc,
+"is_full(qid)\n\
+\n\
+Return true if the queue has a maxsize and has reached it.");
+
+static PyObject *
+queuesmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", NULL};
+    qidarg_converter_data qidarg;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&:get_count", kwlist,
+                                     qidarg_converter, &qidarg)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    Py_ssize_t count = -1;
+    int err = queue_get_count(&_globals.queues, qid, &count);
+    if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+    assert(count >= 0);
+    return PyLong_FromSsize_t(count);
+}
+
+PyDoc_STRVAR(queuesmod_get_count_doc,
+"get_count(qid)\n\
+\n\
+Return the number of items in the queue.");
+
+static PyObject *
+queuesmod__register_queue_type(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"queuetype", NULL};
+    PyObject *queuetype;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O:_register_queue_type", kwlist,
+                                     &queuetype)) {
+        return NULL;
+    }
+    if (!PyType_Check(queuetype)) {
+        PyErr_SetString(PyExc_TypeError, "expected a type for 'queuetype'");
+        return NULL;
+    }
+    PyTypeObject *cls_queue = (PyTypeObject *)queuetype;
+
+    if (set_external_queue_type(self, cls_queue) < 0) {
+        return NULL;
+    }
+
+    Py_RETURN_NONE;
+}
+
+static PyMethodDef module_functions[] = {
+    {"create",                     _PyCFunction_CAST(queuesmod_create),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_create_doc},
+    {"destroy",                    _PyCFunction_CAST(queuesmod_destroy),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_destroy_doc},
+    {"list_all",                   queuesmod_list_all,
+     METH_NOARGS,                  queuesmod_list_all_doc},
+    {"put",                       _PyCFunction_CAST(queuesmod_put),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_put_doc},
+    {"get",                       _PyCFunction_CAST(queuesmod_get),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_get_doc},
+    {"bind",                      _PyCFunction_CAST(queuesmod_bind),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_bind_doc},
+    {"release",                    _PyCFunction_CAST(queuesmod_release),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
+    {"get_maxsize",                _PyCFunction_CAST(queuesmod_get_maxsize),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
+    {"is_full",                    _PyCFunction_CAST(queuesmod_is_full),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
+    {"get_count",                  _PyCFunction_CAST(queuesmod_get_count),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_get_count_doc},
+    {"_register_queue_type",       _PyCFunction_CAST(queuesmod__register_queue_type),
+     METH_VARARGS | METH_KEYWORDS, NULL},
+
+    {NULL,                        NULL}           /* sentinel */
+};
+
+
+/* initialization function */
+
+PyDoc_STRVAR(module_doc,
+"This module provides primitive operations to manage Python interpreters.\n\
+The 'interpreters' module provides a more convenient interface.");
+
+static int
+module_exec(PyObject *mod)
+{
+    if (_globals_init() != 0) {
+        return -1;
+    }
+
+    /* Add exception types */
+    if (add_QueueError(mod) < 0) {
+        goto error;
+    }
+
+    /* Make sure queues drop objects owned by this interpreter. */
+    PyInterpreterState *interp = _get_current_interp();
+    PyUnstable_AtExit(interp, clear_interpreter, (void *)interp);
+
+    return 0;
+
+error:
+    _globals_fini();
+    return -1;
+}
+
+static struct PyModuleDef_Slot module_slots[] = {
+    {Py_mod_exec, module_exec},
+    {Py_mod_multiple_interpreters, Py_MOD_PER_INTERPRETER_GIL_SUPPORTED},
+    {0, NULL},
+};
+
+static int
+module_traverse(PyObject *mod, visitproc visit, void *arg)
+{
+    module_state *state = get_module_state(mod);
+    traverse_module_state(state, visit, arg);
+    return 0;
+}
+
+static int
+module_clear(PyObject *mod)
+{
+    module_state *state = get_module_state(mod);
+
+    if (state->queue_type != NULL) {
+        (void)_PyCrossInterpreterData_UnregisterClass(state->queue_type);
+    }
+
+    // Now we clear the module state.
+    clear_module_state(state);
+    return 0;
+}
+
+static void
+module_free(void *mod)
+{
+    module_state *state = get_module_state(mod);
+
+    // Now we clear the module state.
+    clear_module_state(state);
+
+    _globals_fini();
+}
+
+static struct PyModuleDef moduledef = {
+    .m_base = PyModuleDef_HEAD_INIT,
+    .m_name = MODULE_NAME,
+    .m_doc = module_doc,
+    .m_size = sizeof(module_state),
+    .m_methods = module_functions,
+    .m_slots = module_slots,
+    .m_traverse = module_traverse,
+    .m_clear = module_clear,
+    .m_free = (freefunc)module_free,
+};
+
+PyMODINIT_FUNC
+PyInit__xxinterpqueues(void)
+{
+    return PyModuleDef_Init(&moduledef);
+}
index da2bde640961e0bb8aa69cfa24790b7947f7d97b..f754ce6d3b057b50ad37afb7a6f06da052c891bf 100644 (file)
@@ -37,6 +37,7 @@ extern PyObject* PyInit__weakref(void);
 extern PyObject* PyInit_xxsubtype(void);
 extern PyObject* PyInit__xxsubinterpreters(void);
 extern PyObject* PyInit__xxinterpchannels(void);
+extern PyObject* PyInit__xxinterpqueues(void);
 extern PyObject* PyInit__random(void);
 extern PyObject* PyInit_itertools(void);
 extern PyObject* PyInit__collections(void);
@@ -142,6 +143,7 @@ struct _inittab _PyImport_Inittab[] = {
     {"xxsubtype", PyInit_xxsubtype},
     {"_xxsubinterpreters", PyInit__xxsubinterpreters},
     {"_xxinterpchannels", PyInit__xxinterpchannels},
+    {"_xxinterpqueues", PyInit__xxinterpqueues},
 #ifdef _Py_HAVE_ZLIB
     {"zlib", PyInit_zlib},
 #endif
index 278f1f5622543c336048a3ff878003ea0e03cfb2..778fc834c0db9c1f88e34cc03165c1004c5c1cef 100644 (file)
     <ClCompile Include="..\Modules\xxsubtype.c" />
     <ClCompile Include="..\Modules\_xxsubinterpretersmodule.c" />
     <ClCompile Include="..\Modules\_xxinterpchannelsmodule.c" />
+    <ClCompile Include="..\Modules\_xxinterpqueuesmodule.c" />
     <ClCompile Include="..\Modules\_io\fileio.c" />
     <ClCompile Include="..\Modules\_io\bytesio.c" />
     <ClCompile Include="..\Modules\_io\stringio.c" />
index c9b34c64fbf75f7563d3edf11eb05caf6a59c5ea..a96ca24cf08b6672aedddc3caa771f86759a42f9 100644 (file)
     <ClCompile Include="..\Modules\_xxinterpchannelsmodule.c">
       <Filter>Modules</Filter>
     </ClCompile>
+    <ClCompile Include="..\Modules\_xxinterpqueuesmodule.c">
+      <Filter>Modules</Filter>
+    </ClCompile>
     <ClCompile Include="..\Parser\string_parser.c">
       <Filter>Parser</Filter>
     </ClCompile>
index 766a85d3d6f39eaeb9f2b19d5147f1e39a523b7b..5dce4e042d1eb445302f2409ab13cc88daea0807 100644 (file)
@@ -36,6 +36,7 @@ IGNORE = {
     '_testsinglephase',
     '_xxsubinterpreters',
     '_xxinterpchannels',
+    '_xxinterpqueues',
     '_xxtestfuzz',
     'idlelib.idle_test',
     'test',
index ff6e1ef4f993bae89805a4ab16e1e12675eb03a3..2f9e80d6ab67371bf8d51b2631792621edb8b038 100644 (file)
@@ -165,6 +165,7 @@ Python/pylifecycle.c        fatal_error     reentrant       -
 
 # explicitly protected, internal-only
 Modules/_xxinterpchannelsmodule.c      -       _globals        -
+Modules/_xxinterpqueuesmodule.c        -       _globals        -
 
 # set once during module init
 Modules/_decimal/_decimal.c    -       minalloc_is_set -
index cad3bce0c7de87e97cf173e62a599f9f5fbbc8fa..668a0efd77db0e6cb785b5a02152359e9f5be7ee 100755 (executable)
--- a/configure
+++ b/configure
@@ -769,6 +769,8 @@ MODULE__MULTIPROCESSING_FALSE
 MODULE__MULTIPROCESSING_TRUE
 MODULE__ZONEINFO_FALSE
 MODULE__ZONEINFO_TRUE
+MODULE__XXINTERPQUEUES_FALSE
+MODULE__XXINTERPQUEUES_TRUE
 MODULE__XXINTERPCHANNELS_FALSE
 MODULE__XXINTERPCHANNELS_TRUE
 MODULE__XXSUBINTERPRETERS_FALSE
@@ -28025,6 +28027,7 @@ case $ac_sys_system in #(
     py_cv_module__tkinter=n/a
     py_cv_module__xxsubinterpreters=n/a
     py_cv_module__xxinterpchannels=n/a
+    py_cv_module__xxinterpqueues=n/a
     py_cv_module_grp=n/a
     py_cv_module_pwd=n/a
     py_cv_module_resource=n/a
@@ -28524,6 +28527,28 @@ then :
 
 
 
+fi
+
+
+        if test "$py_cv_module__xxinterpqueues" != "n/a"
+then :
+  py_cv_module__xxinterpqueues=yes
+fi
+   if test "$py_cv_module__xxinterpqueues" = yes; then
+  MODULE__XXINTERPQUEUES_TRUE=
+  MODULE__XXINTERPQUEUES_FALSE='#'
+else
+  MODULE__XXINTERPQUEUES_TRUE='#'
+  MODULE__XXINTERPQUEUES_FALSE=
+fi
+
+  as_fn_append MODULE_BLOCK "MODULE__XXINTERPQUEUES_STATE=$py_cv_module__xxinterpqueues$as_nl"
+  if test "x$py_cv_module__xxinterpqueues" = xyes
+then :
+
+
+
+
 fi
 
 
@@ -30760,6 +30785,10 @@ if test -z "${MODULE__XXINTERPCHANNELS_TRUE}" && test -z "${MODULE__XXINTERPCHAN
   as_fn_error $? "conditional \"MODULE__XXINTERPCHANNELS\" was never defined.
 Usually this means the macro was only invoked conditionally." "$LINENO" 5
 fi
+if test -z "${MODULE__XXINTERPQUEUES_TRUE}" && test -z "${MODULE__XXINTERPQUEUES_FALSE}"; then
+  as_fn_error $? "conditional \"MODULE__XXINTERPQUEUES\" was never defined.
+Usually this means the macro was only invoked conditionally." "$LINENO" 5
+fi
 if test -z "${MODULE__ZONEINFO_TRUE}" && test -z "${MODULE__ZONEINFO_FALSE}"; then
   as_fn_error $? "conditional \"MODULE__ZONEINFO\" was never defined.
 Usually this means the macro was only invoked conditionally." "$LINENO" 5
index 7dda0b3fff95beec67229516789e86be8bb813e4..020553abd71b4fe8d1130f58769eb1e21e93f7bc 100644 (file)
@@ -7120,6 +7120,7 @@ AS_CASE([$ac_sys_system],
       [_tkinter],
       [_xxsubinterpreters],
       [_xxinterpchannels],
+      [_xxinterpqueues],
       [grp],
       [pwd],
       [resource],
@@ -7236,6 +7237,7 @@ PY_STDLIB_MOD_SIMPLE([_struct])
 PY_STDLIB_MOD_SIMPLE([_typing])
 PY_STDLIB_MOD_SIMPLE([_xxsubinterpreters])
 PY_STDLIB_MOD_SIMPLE([_xxinterpchannels])
+PY_STDLIB_MOD_SIMPLE([_xxinterpqueues])
 PY_STDLIB_MOD_SIMPLE([_zoneinfo])
 
 dnl multiprocessing modules