]> git.ipfire.org Git - thirdparty/Python/cpython.git/commitdiff
gh-132775: Use _PyObject_GetXIData (With Fallback) (gh-134440)
authorEric Snow <ericsnowcurrently@gmail.com>
Thu, 22 May 2025 12:50:06 +0000 (06:50 -0600)
committerGitHub <noreply@github.com>
Thu, 22 May 2025 12:50:06 +0000 (06:50 -0600)
This change includes some semi-related refactoring of queues and channels.

Lib/concurrent/futures/interpreter.py
Lib/test/support/interpreters/channels.py
Lib/test/support/interpreters/queues.py
Lib/test/test__interpchannels.py
Lib/test/test_interpreters/test_channels.py
Lib/test/test_interpreters/test_queues.py
Modules/_interpchannelsmodule.c
Modules/_interpqueuesmodule.c
Modules/_interpreters_common.h
Python/crossinterp.c

index d17688dc9d7346269caf9245caa2ca608747be3d..a2c4fbfd3fb831caa791ea45364e91517938d9fd 100644 (file)
@@ -36,9 +36,6 @@ Uncaught in the interpreter:
                 """.strip())
 
 
-UNBOUND = 2  # error; this should not happen.
-
-
 class WorkerContext(_thread.WorkerContext):
 
     @classmethod
@@ -47,23 +44,13 @@ class WorkerContext(_thread.WorkerContext):
             if isinstance(fn, str):
                 # XXX Circle back to this later.
                 raise TypeError('scripts not supported')
-                if args or kwargs:
-                    raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
-                data = textwrap.dedent(fn)
-                kind = 'script'
-                # Make sure the script compiles.
-                # Ideally we wouldn't throw away the resulting code
-                # object.  However, there isn't much to be done until
-                # code objects are shareable and/or we do a better job
-                # of supporting code objects in _interpreters.exec().
-                compile(data, '<string>', 'exec')
             else:
                 # Functions defined in the __main__ module can't be pickled,
                 # so they can't be used here.  In the future, we could possibly
                 # borrow from multiprocessing to work around this.
-                data = pickle.dumps((fn, args, kwargs))
-                kind = 'function'
-            return (data, kind)
+                task = (fn, args, kwargs)
+                data = pickle.dumps(task)
+            return data
 
         if initializer is not None:
             try:
@@ -86,24 +73,20 @@ class WorkerContext(_thread.WorkerContext):
         except BaseException as exc:
             # Send the captured exception out on the results queue,
             # but still leave it unhandled for the interpreter to handle.
-            err = pickle.dumps(exc)
-            _interpqueues.put(resultsid, (None, err), 1, UNBOUND)
+            _interpqueues.put(resultsid, (None, exc))
             raise  # re-raise
 
     @classmethod
     def _send_script_result(cls, resultsid):
-        _interpqueues.put(resultsid, (None, None), 0, UNBOUND)
+        _interpqueues.put(resultsid, (None, None))
 
     @classmethod
     def _call(cls, func, args, kwargs, resultsid):
         with cls._capture_exc(resultsid):
             res = func(*args or (), **kwargs or {})
         # Send the result back.
-        try:
-            _interpqueues.put(resultsid, (res, None), 0, UNBOUND)
-        except _interpreters.NotShareableError:
-            res = pickle.dumps(res)
-            _interpqueues.put(resultsid, (res, None), 1, UNBOUND)
+        with cls._capture_exc(resultsid):
+            _interpqueues.put(resultsid, (res, None))
 
     @classmethod
     def _call_pickled(cls, pickled, resultsid):
@@ -134,8 +117,7 @@ class WorkerContext(_thread.WorkerContext):
             _interpreters.incref(self.interpid)
 
             maxsize = 0
-            fmt = 0
-            self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
+            self.resultsid = _interpqueues.create(maxsize)
 
             self._exec(f'from {__name__} import WorkerContext')
 
@@ -166,17 +148,8 @@ class WorkerContext(_thread.WorkerContext):
                 pass
 
     def run(self, task):
-        data, kind = task
-        if kind == 'script':
-            raise NotImplementedError('script kind disabled')
-            script = f"""
-with WorkerContext._capture_exc({self.resultsid}):
-{textwrap.indent(data, '    ')}
-WorkerContext._send_script_result({self.resultsid})"""
-        elif kind == 'function':
-            script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
-        else:
-            raise NotImplementedError(kind)
+        data = task
+        script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
 
         try:
             self._exec(script)
@@ -199,15 +172,13 @@ WorkerContext._send_script_result({self.resultsid})"""
                 continue
             else:
                 break
-        (res, excdata), pickled, unboundop = obj
+        (res, exc), unboundop = obj
         assert unboundop is None, unboundop
-        if excdata is not None:
+        if exc is not None:
             assert res is None, res
-            assert pickled
             assert exc_wrapper is not None
-            exc = pickle.loads(excdata)
             raise exc from exc_wrapper
-        return pickle.loads(res) if pickled else res
+        return res
 
 
 class BrokenInterpreterPool(_thread.BrokenThreadPool):
index d2bd93d77f71699ef1aae610f541d8fc0bb94deb..7a2bd7d63f808f090155b918571174283b370c29 100644 (file)
@@ -55,15 +55,23 @@ def create(*, unbounditems=UNBOUND):
     """
     unbound = _serialize_unbound(unbounditems)
     unboundop, = unbound
-    cid = _channels.create(unboundop)
-    recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
+    cid = _channels.create(unboundop, -1)
+    recv, send = RecvChannel(cid), SendChannel(cid)
+    send._set_unbound(unboundop, unbounditems)
     return recv, send
 
 
 def list_all():
     """Return a list of (recv, send) for all open channels."""
-    return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
-            for cid, unbound in _channels.list_all()]
+    channels = []
+    for cid, unboundop, _ in _channels.list_all():
+        chan = _, send = RecvChannel(cid), SendChannel(cid)
+        if not hasattr(send, '_unboundop'):
+            send._set_unbound(unboundop)
+        else:
+            assert send._unbound[0] == op
+        channels.append(chan)
+    return channels
 
 
 class _ChannelEnd:
@@ -175,16 +183,33 @@ class SendChannel(_ChannelEnd):
 
     _end = 'send'
 
-    def __new__(cls, cid, *, _unbound=None):
-        if _unbound is None:
-            try:
-                op = _channels.get_channel_defaults(cid)
-                _unbound = (op,)
-            except ChannelNotFoundError:
-                _unbound = _serialize_unbound(UNBOUND)
-        self = super().__new__(cls, cid)
-        self._unbound = _unbound
-        return self
+#    def __new__(cls, cid, *, _unbound=None):
+#        if _unbound is None:
+#            try:
+#                op = _channels.get_channel_defaults(cid)
+#                _unbound = (op,)
+#            except ChannelNotFoundError:
+#                _unbound = _serialize_unbound(UNBOUND)
+#        self = super().__new__(cls, cid)
+#        self._unbound = _unbound
+#        return self
+
+    def _set_unbound(self, op, items=None):
+        assert not hasattr(self, '_unbound')
+        if items is None:
+            items = _resolve_unbound(op)
+        unbound = (op, items)
+        self._unbound = unbound
+        return unbound
+
+    @property
+    def unbounditems(self):
+        try:
+            _, items = self._unbound
+        except AttributeError:
+            op, _ = _channels.get_queue_defaults(self._id)
+            _, items = self._set_unbound(op)
+        return items
 
     @property
     def is_closed(self):
@@ -192,61 +217,61 @@ class SendChannel(_ChannelEnd):
         return info.closed or info.closing
 
     def send(self, obj, timeout=None, *,
-             unbound=None,
+             unbounditems=None,
              ):
         """Send the object (i.e. its data) to the channel's receiving end.
 
         This blocks until the object is received.
         """
-        if unbound is None:
-            unboundop, = self._unbound
+        if unbounditems is None:
+            unboundop = -1
         else:
-            unboundop, = _serialize_unbound(unbound)
+            unboundop, = _serialize_unbound(unbounditems)
         _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
 
     def send_nowait(self, obj, *,
-                    unbound=None,
+                    unbounditems=None,
                     ):
         """Send the object to the channel's receiving end.
 
         If the object is immediately received then return True
         (else False).  Otherwise this is the same as send().
         """
-        if unbound is None:
-            unboundop, = self._unbound
+        if unbounditems is None:
+            unboundop = -1
         else:
-            unboundop, = _serialize_unbound(unbound)
+            unboundop, = _serialize_unbound(unbounditems)
         # XXX Note that at the moment channel_send() only ever returns
         # None.  This should be fixed when channel_send_wait() is added.
         # See bpo-32604 and gh-19829.
         return _channels.send(self._id, obj, unboundop, blocking=False)
 
     def send_buffer(self, obj, timeout=None, *,
-                    unbound=None,
+                    unbounditems=None,
                     ):
         """Send the object's buffer to the channel's receiving end.
 
         This blocks until the object is received.
         """
-        if unbound is None:
-            unboundop, = self._unbound
+        if unbounditems is None:
+            unboundop = -1
         else:
-            unboundop, = _serialize_unbound(unbound)
+            unboundop, = _serialize_unbound(unbounditems)
         _channels.send_buffer(self._id, obj, unboundop,
                               timeout=timeout, blocking=True)
 
     def send_buffer_nowait(self, obj, *,
-                           unbound=None,
+                           unbounditems=None,
                            ):
         """Send the object's buffer to the channel's receiving end.
 
         If the object is immediately received then return True
         (else False).  Otherwise this is the same as send().
         """
-        if unbound is None:
-            unboundop, = self._unbound
+        if unbounditems is None:
+            unboundop = -1
         else:
-            unboundop, = _serialize_unbound(unbound)
+            unboundop, = _serialize_unbound(unbounditems)
         return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
 
     def close(self):
index deb8e8613af731b476cdfa59bb9c41c2b75b9e0d..d6a3197d9e0e26d86315452f4dad82ca5c2f9bf3 100644 (file)
@@ -63,29 +63,34 @@ def _resolve_unbound(flag):
     return resolved
 
 
-def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
+def create(maxsize=0, *, unbounditems=UNBOUND):
     """Return a new cross-interpreter queue.
 
     The queue may be used to pass data safely between interpreters.
 
-    "syncobj" sets the default for Queue.put()
-    and Queue.put_nowait().
-
-    "unbounditems" likewise sets the default.  See Queue.put() for
+    "unbounditems" sets the default for Queue.put(); see that method for
     supported values.  The default value is UNBOUND, which replaces
     the unbound item.
     """
-    fmt = _SHARED_ONLY if syncobj else _PICKLED
     unbound = _serialize_unbound(unbounditems)
     unboundop, = unbound
-    qid = _queues.create(maxsize, fmt, unboundop)
-    return Queue(qid, _fmt=fmt, _unbound=unbound)
+    qid = _queues.create(maxsize, unboundop, -1)
+    self = Queue(qid)
+    self._set_unbound(unboundop, unbounditems)
+    return self
 
 
 def list_all():
     """Return a list of all open queues."""
-    return [Queue(qid, _fmt=fmt, _unbound=(unboundop,))
-            for qid, fmt, unboundop in _queues.list_all()]
+    queues = []
+    for qid, unboundop, _ in _queues.list_all():
+        self = Queue(qid)
+        if not hasattr(self, '_unbound'):
+            self._set_unbound(unboundop)
+        else:
+            assert self._unbound[0] == unboundop
+        queues.append(self)
+    return queues
 
 
 _known_queues = weakref.WeakValueDictionary()
@@ -93,28 +98,17 @@ _known_queues = weakref.WeakValueDictionary()
 class Queue:
     """A cross-interpreter queue."""
 
-    def __new__(cls, id, /, *, _fmt=None, _unbound=None):
+    def __new__(cls, id, /):
         # There is only one instance for any given ID.
         if isinstance(id, int):
             id = int(id)
         else:
             raise TypeError(f'id must be an int, got {id!r}')
-        if _fmt is None:
-            if _unbound is None:
-                _fmt, op = _queues.get_queue_defaults(id)
-                _unbound = (op,)
-            else:
-                _fmt, _ = _queues.get_queue_defaults(id)
-        elif _unbound is None:
-            _, op = _queues.get_queue_defaults(id)
-            _unbound = (op,)
         try:
             self = _known_queues[id]
         except KeyError:
             self = super().__new__(cls)
             self._id = id
-            self._fmt = _fmt
-            self._unbound = _unbound
             _known_queues[id] = self
             _queues.bind(id)
         return self
@@ -143,10 +137,27 @@ class Queue:
     def __getstate__(self):
         return None
 
+    def _set_unbound(self, op, items=None):
+        assert not hasattr(self, '_unbound')
+        if items is None:
+            items = _resolve_unbound(op)
+        unbound = (op, items)
+        self._unbound = unbound
+        return unbound
+
     @property
     def id(self):
         return self._id
 
+    @property
+    def unbounditems(self):
+        try:
+            _, items = self._unbound
+        except AttributeError:
+            op, _ = _queues.get_queue_defaults(self._id)
+            _, items = self._set_unbound(op)
+        return items
+
     @property
     def maxsize(self):
         try:
@@ -165,77 +176,56 @@ class Queue:
         return _queues.get_count(self._id)
 
     def put(self, obj, timeout=None, *,
-            syncobj=None,
-            unbound=None,
+            unbounditems=None,
             _delay=10 / 1000,  # 10 milliseconds
             ):
         """Add the object to the queue.
 
         This blocks while the queue is full.
 
-        If "syncobj" is None (the default) then it uses the
-        queue's default, set with create_queue().
-
-        If "syncobj" is false then all objects are supported,
-        at the expense of worse performance.
-
-        If "syncobj" is true then the object must be "shareable".
-        Examples of "shareable" objects include the builtin singletons,
-        str, and memoryview.  One benefit is that such objects are
-        passed through the queue efficiently.
-
-        The key difference, though, is conceptual: the corresponding
-        object returned from Queue.get() will be strictly equivalent
-        to the given obj.  In other words, the two objects will be
-        effectively indistinguishable from each other, even if the
-        object is mutable.  The received object may actually be the
-        same object, or a copy (immutable values only), or a proxy.
-        Regardless, the received object should be treated as though
-        the original has been shared directly, whether or not it
-        actually is.  That's a slightly different and stronger promise
-        than just (initial) equality, which is all "syncobj=False"
-        can promise.
-
-        "unbound" controls the behavior of Queue.get() for the given
+        For most objects, the object received through Queue.get() will
+        be a new one, equivalent to the original and not sharing any
+        actual underlying data.  The notable exceptions include
+        cross-interpreter types (like Queue) and memoryview, where the
+        underlying data is actually shared.  Furthermore, some types
+        can be sent through a queue more efficiently than others.  This
+        group includes various immutable types like int, str, bytes, and
+        tuple (if the items are likewise efficiently shareable).  See interpreters.is_shareable().
+
+        "unbounditems" controls the behavior of Queue.get() for the given
         object if the current interpreter (calling put()) is later
         destroyed.
 
-        If "unbound" is None (the default) then it uses the
+        If "unbounditems" is None (the default) then it uses the
         queue's default, set with create_queue(),
         which is usually UNBOUND.
 
-        If "unbound" is UNBOUND_ERROR then get() will raise an
+        If "unbounditems" is UNBOUND_ERROR then get() will raise an
         ItemInterpreterDestroyed exception if the original interpreter
         has been destroyed.  This does not otherwise affect the queue;
         the next call to put() will work like normal, returning the next
         item in the queue.
 
-        If "unbound" is UNBOUND_REMOVE then the item will be removed
+        If "unbounditems" is UNBOUND_REMOVE then the item will be removed
         from the queue as soon as the original interpreter is destroyed.
         Be aware that this will introduce an imbalance between put()
         and get() calls.
 
-        If "unbound" is UNBOUND then it is returned by get() in place
+        If "unbounditems" is UNBOUND then it is returned by get() in place
         of the unbound item.
         """
-        if syncobj is None:
-            fmt = self._fmt
-        else:
-            fmt = _SHARED_ONLY if syncobj else _PICKLED
-        if unbound is None:
-            unboundop, = self._unbound
+        if unbounditems is None:
+            unboundop = -1
         else:
-            unboundop, = _serialize_unbound(unbound)
+            unboundop, = _serialize_unbound(unbounditems)
         if timeout is not None:
             timeout = int(timeout)
             if timeout < 0:
                 raise ValueError(f'timeout value must be non-negative')
             end = time.time() + timeout
-        if fmt is _PICKLED:
-            obj = pickle.dumps(obj)
         while True:
             try:
-                _queues.put(self._id, obj, fmt, unboundop)
+                _queues.put(self._id, obj, unboundop)
             except QueueFull as exc:
                 if timeout is not None and time.time() >= end:
                     raise  # re-raise
@@ -243,18 +233,12 @@ class Queue:
             else:
                 break
 
-    def put_nowait(self, obj, *, syncobj=None, unbound=None):
-        if syncobj is None:
-            fmt = self._fmt
+    def put_nowait(self, obj, *, unbounditems=None):
+        if unbounditems is None:
+            unboundop = -1
         else:
-            fmt = _SHARED_ONLY if syncobj else _PICKLED
-        if unbound is None:
-            unboundop, = self._unbound
-        else:
-            unboundop, = _serialize_unbound(unbound)
-        if fmt is _PICKLED:
-            obj = pickle.dumps(obj)
-        _queues.put(self._id, obj, fmt, unboundop)
+            unboundop, = _serialize_unbound(unbounditems)
+        _queues.put(self._id, obj, unboundop)
 
     def get(self, timeout=None, *,
             _delay=10 / 1000,  # 10 milliseconds
@@ -265,7 +249,7 @@ class Queue:
 
         If the next item's original interpreter has been destroyed
         then the "next object" is determined by the value of the
-        "unbound" argument to put().
+        "unbounditems" argument to put().
         """
         if timeout is not None:
             timeout = int(timeout)
@@ -274,7 +258,7 @@ class Queue:
             end = time.time() + timeout
         while True:
             try:
-                obj, fmt, unboundop = _queues.get(self._id)
+                obj, unboundop = _queues.get(self._id)
             except QueueEmpty as exc:
                 if timeout is not None and time.time() >= end:
                     raise  # re-raise
@@ -284,10 +268,6 @@ class Queue:
         if unboundop is not None:
             assert obj is None, repr(obj)
             return _resolve_unbound(unboundop)
-        if fmt == _PICKLED:
-            obj = pickle.loads(obj)
-        else:
-            assert fmt == _SHARED_ONLY
         return obj
 
     def get_nowait(self):
@@ -297,16 +277,12 @@ class Queue:
         is the same as get().
         """
         try:
-            obj, fmt, unboundop = _queues.get(self._id)
+            obj, unboundop = _queues.get(self._id)
         except QueueEmpty as exc:
             raise  # re-raise
         if unboundop is not None:
             assert obj is None, repr(obj)
             return _resolve_unbound(unboundop)
-        if fmt == _PICKLED:
-            obj = pickle.loads(obj)
-        else:
-            assert fmt == _SHARED_ONLY
         return obj
 
 
index e4c1ad854514ed7b5f1fc6ddeb2f6b53c54db65c..88eee03a3de93ad01912262263c4fd2dd09e540e 100644 (file)
@@ -247,7 +247,7 @@ def _run_action(cid, action, end, state):
 
 
 def clean_up_channels():
-    for cid, _ in _channels.list_all():
+    for cid, _, _ in _channels.list_all():
         try:
             _channels.destroy(cid)
         except _channels.ChannelNotFoundError:
@@ -373,11 +373,11 @@ class ChannelTests(TestBase):
         self.assertIsInstance(cid, _channels.ChannelID)
 
     def test_sequential_ids(self):
-        before = [cid for cid, _ in _channels.list_all()]
+        before = [cid for cid, _, _ in _channels.list_all()]
         id1 = _channels.create(REPLACE)
         id2 = _channels.create(REPLACE)
         id3 = _channels.create(REPLACE)
-        after = [cid for cid, _ in _channels.list_all()]
+        after = [cid for cid, _, _ in _channels.list_all()]
 
         self.assertEqual(id2, int(id1) + 1)
         self.assertEqual(id3, int(id2) + 1)
index eada18f99d04db011d509207291f9382f6dfc53b..0c027b17cea68c537dd4426ac75500202953b976 100644 (file)
@@ -377,11 +377,11 @@ class TestSendRecv(TestBase):
             if not unbound:
                 extraargs = ''
             elif unbound is channels.UNBOUND:
-                extraargs = ', unbound=channels.UNBOUND'
+                extraargs = ', unbounditems=channels.UNBOUND'
             elif unbound is channels.UNBOUND_ERROR:
-                extraargs = ', unbound=channels.UNBOUND_ERROR'
+                extraargs = ', unbounditems=channels.UNBOUND_ERROR'
             elif unbound is channels.UNBOUND_REMOVE:
-                extraargs = ', unbound=channels.UNBOUND_REMOVE'
+                extraargs = ', unbounditems=channels.UNBOUND_REMOVE'
             else:
                 raise NotImplementedError(repr(unbound))
             interp = interpreters.create()
@@ -454,11 +454,11 @@ class TestSendRecv(TestBase):
             with self.assertRaises(channels.ChannelEmptyError):
                 rch.recv_nowait()
 
-            sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
+            sch.send_nowait(b'ham', unbounditems=channels.UNBOUND_REMOVE)
             self.assertEqual(_channels.get_count(rch.id), 1)
             interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
             self.assertEqual(_channels.get_count(rch.id), 3)
-            sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
+            sch.send_nowait(42, unbounditems=channels.UNBOUND_REMOVE)
             self.assertEqual(_channels.get_count(rch.id), 4)
             del interp
             self.assertEqual(_channels.get_count(rch.id), 2)
@@ -484,11 +484,11 @@ class TestSendRecv(TestBase):
         _run_output(interp, dedent(f"""
             from test.support.interpreters import channels
             sch = channels.SendChannel({sch.id})
-            sch.send_nowait(1, unbound=channels.UNBOUND)
-            sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
+            sch.send_nowait(1, unbounditems=channels.UNBOUND)
+            sch.send_nowait(2, unbounditems=channels.UNBOUND_ERROR)
             sch.send_nowait(3)
-            sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
-            sch.send_nowait(5, unbound=channels.UNBOUND)
+            sch.send_nowait(4, unbounditems=channels.UNBOUND_REMOVE)
+            sch.send_nowait(5, unbounditems=channels.UNBOUND)
             """))
         self.assertEqual(_channels.get_count(rch.id), 5)
 
@@ -522,8 +522,8 @@ class TestSendRecv(TestBase):
             rch = channels.RecvChannel({rch.id})
             sch = channels.SendChannel({sch.id})
             obj1 = rch.recv()
-            sch.send_nowait(2, unbound=channels.UNBOUND)
-            sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
+            sch.send_nowait(2, unbounditems=channels.UNBOUND)
+            sch.send_nowait(obj1, unbounditems=channels.UNBOUND_REMOVE)
             """))
         _run_output(interp2, dedent(f"""
             from test.support.interpreters import channels
@@ -535,21 +535,21 @@ class TestSendRecv(TestBase):
         self.assertEqual(_channels.get_count(rch.id), 0)
         sch.send_nowait(3)
         _run_output(interp1, dedent("""
-            sch.send_nowait(4, unbound=channels.UNBOUND)
+            sch.send_nowait(4, unbounditems=channels.UNBOUND)
             # interp closed here
-            sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
-            sch.send_nowait(6, unbound=channels.UNBOUND)
+            sch.send_nowait(5, unbounditems=channels.UNBOUND_REMOVE)
+            sch.send_nowait(6, unbounditems=channels.UNBOUND)
             """))
         _run_output(interp2, dedent("""
-            sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
+            sch.send_nowait(7, unbounditems=channels.UNBOUND_ERROR)
             # interp closed here
-            sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
-            sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
-            sch.send_nowait(8, unbound=channels.UNBOUND)
+            sch.send_nowait(obj1, unbounditems=channels.UNBOUND_ERROR)
+            sch.send_nowait(obj2, unbounditems=channels.UNBOUND_REMOVE)
+            sch.send_nowait(8, unbounditems=channels.UNBOUND)
             """))
         _run_output(interp1, dedent("""
-            sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
-            sch.send_nowait(10, unbound=channels.UNBOUND)
+            sch.send_nowait(9, unbounditems=channels.UNBOUND_REMOVE)
+            sch.send_nowait(10, unbounditems=channels.UNBOUND)
             """))
         self.assertEqual(_channels.get_count(rch.id), 10)
 
index 18f83d097eb36042faa887dc983ffb7fdf06e7c6..64a2db1230d02314c0a22447c1b159f3245c62a3 100644 (file)
@@ -9,6 +9,7 @@ from test.support import import_helper, Py_DEBUG
 _queues = import_helper.import_module('_interpqueues')
 from test.support import interpreters
 from test.support.interpreters import queues, _crossinterp
+import test._crossinterp_definitions as defs
 from .utils import _run_output, TestBase as _TestBase
 
 
@@ -42,7 +43,7 @@ class LowLevelTests(TestBase):
         importlib.reload(queues)
 
     def test_create_destroy(self):
-        qid = _queues.create(2, 0, REPLACE)
+        qid = _queues.create(2, REPLACE, -1)
         _queues.destroy(qid)
         self.assertEqual(get_num_queues(), 0)
         with self.assertRaises(queues.QueueNotFoundError):
@@ -56,7 +57,7 @@ class LowLevelTests(TestBase):
             '-c',
             dedent(f"""
                 import {_queues.__name__} as _queues
-                _queues.create(2, 0, {REPLACE})
+                _queues.create(2, {REPLACE}, -1)
                 """),
         )
         self.assertEqual(stdout, '')
@@ -67,13 +68,13 @@ class LowLevelTests(TestBase):
 
     def test_bind_release(self):
         with self.subTest('typical'):
-            qid = _queues.create(2, 0, REPLACE)
+            qid = _queues.create(2, REPLACE, -1)
             _queues.bind(qid)
             _queues.release(qid)
             self.assertEqual(get_num_queues(), 0)
 
         with self.subTest('bind too much'):
-            qid = _queues.create(2, 0, REPLACE)
+            qid = _queues.create(2, REPLACE, -1)
             _queues.bind(qid)
             _queues.bind(qid)
             _queues.release(qid)
@@ -81,7 +82,7 @@ class LowLevelTests(TestBase):
             self.assertEqual(get_num_queues(), 0)
 
         with self.subTest('nested'):
-            qid = _queues.create(2, 0, REPLACE)
+            qid = _queues.create(2, REPLACE, -1)
             _queues.bind(qid)
             _queues.bind(qid)
             _queues.release(qid)
@@ -89,7 +90,7 @@ class LowLevelTests(TestBase):
             self.assertEqual(get_num_queues(), 0)
 
         with self.subTest('release without binding'):
-            qid = _queues.create(2, 0, REPLACE)
+            qid = _queues.create(2, REPLACE, -1)
             with self.assertRaises(queues.QueueError):
                 _queues.release(qid)
 
@@ -132,13 +133,13 @@ class QueueTests(TestBase):
 
         with self.subTest('same interpreter'):
             queue2 = queues.create()
-            queue1.put(queue2, syncobj=True)
+            queue1.put(queue2)
             queue3 = queue1.get()
             self.assertIs(queue3, queue2)
 
         with self.subTest('from current interpreter'):
             queue4 = queues.create()
-            queue1.put(queue4, syncobj=True)
+            queue1.put(queue4)
             out = _run_output(interp, dedent("""
                 queue4 = queue1.get()
                 print(queue4.id)
@@ -149,7 +150,7 @@ class QueueTests(TestBase):
         with self.subTest('from subinterpreter'):
             out = _run_output(interp, dedent("""
                 queue5 = queues.create()
-                queue1.put(queue5, syncobj=True)
+                queue1.put(queue5)
                 print(queue5.id)
                 """))
             qid = int(out)
@@ -198,7 +199,7 @@ class TestQueueOps(TestBase):
     def test_empty(self):
         queue = queues.create()
         before = queue.empty()
-        queue.put(None, syncobj=True)
+        queue.put(None)
         during = queue.empty()
         queue.get()
         after = queue.empty()
@@ -213,7 +214,7 @@ class TestQueueOps(TestBase):
         queue = queues.create(3)
         for _ in range(3):
             actual.append(queue.full())
-            queue.put(None, syncobj=True)
+            queue.put(None)
         actual.append(queue.full())
         for _ in range(3):
             queue.get()
@@ -227,16 +228,16 @@ class TestQueueOps(TestBase):
         queue = queues.create()
         for _ in range(3):
             actual.append(queue.qsize())
-            queue.put(None, syncobj=True)
+            queue.put(None)
         actual.append(queue.qsize())
         queue.get()
         actual.append(queue.qsize())
-        queue.put(None, syncobj=True)
+        queue.put(None)
         actual.append(queue.qsize())
         for _ in range(3):
             queue.get()
             actual.append(queue.qsize())
-        queue.put(None, syncobj=True)
+        queue.put(None)
         actual.append(queue.qsize())
         queue.get()
         actual.append(queue.qsize())
@@ -245,70 +246,32 @@ class TestQueueOps(TestBase):
 
     def test_put_get_main(self):
         expected = list(range(20))
-        for syncobj in (True, False):
-            kwds = dict(syncobj=syncobj)
-            with self.subTest(f'syncobj={syncobj}'):
-                queue = queues.create()
-                for i in range(20):
-                    queue.put(i, **kwds)
-                actual = [queue.get() for _ in range(20)]
+        queue = queues.create()
+        for i in range(20):
+            queue.put(i)
+        actual = [queue.get() for _ in range(20)]
 
-                self.assertEqual(actual, expected)
+        self.assertEqual(actual, expected)
 
     def test_put_timeout(self):
-        for syncobj in (True, False):
-            kwds = dict(syncobj=syncobj)
-            with self.subTest(f'syncobj={syncobj}'):
-                queue = queues.create(2)
-                queue.put(None, **kwds)
-                queue.put(None, **kwds)
-                with self.assertRaises(queues.QueueFull):
-                    queue.put(None, timeout=0.1, **kwds)
-                queue.get()
-                queue.put(None, **kwds)
+        queue = queues.create(2)
+        queue.put(None)
+        queue.put(None)
+        with self.assertRaises(queues.QueueFull):
+            queue.put(None, timeout=0.1)
+        queue.get()
+        queue.put(None)
 
     def test_put_nowait(self):
-        for syncobj in (True, False):
-            kwds = dict(syncobj=syncobj)
-            with self.subTest(f'syncobj={syncobj}'):
-                queue = queues.create(2)
-                queue.put_nowait(None, **kwds)
-                queue.put_nowait(None, **kwds)
-                with self.assertRaises(queues.QueueFull):
-                    queue.put_nowait(None, **kwds)
-                queue.get()
-                queue.put_nowait(None, **kwds)
-
-    def test_put_syncobj(self):
-        for obj in [
-            None,
-            True,
-            10,
-            'spam',
-            b'spam',
-            (0, 'a'),
-        ]:
-            with self.subTest(repr(obj)):
-                queue = queues.create()
-
-                queue.put(obj, syncobj=True)
-                obj2 = queue.get()
-                self.assertEqual(obj2, obj)
-
-                queue.put(obj, syncobj=True)
-                obj2 = queue.get_nowait()
-                self.assertEqual(obj2, obj)
-
-        for obj in [
-            [1, 2, 3],
-            {'a': 13, 'b': 17},
-        ]:
-            with self.subTest(repr(obj)):
-                queue = queues.create()
-                with self.assertRaises(interpreters.NotShareableError):
-                    queue.put(obj, syncobj=True)
+        queue = queues.create(2)
+        queue.put_nowait(None)
+        queue.put_nowait(None)
+        with self.assertRaises(queues.QueueFull):
+            queue.put_nowait(None)
+        queue.get()
+        queue.put_nowait(None)
 
-    def test_put_not_syncobj(self):
+    def test_put_full_fallback(self):
         for obj in [
             None,
             True,
@@ -323,11 +286,11 @@ class TestQueueOps(TestBase):
             with self.subTest(repr(obj)):
                 queue = queues.create()
 
-                queue.put(obj, syncobj=False)
+                queue.put(obj)
                 obj2 = queue.get()
                 self.assertEqual(obj2, obj)
 
-                queue.put(obj, syncobj=False)
+                queue.put(obj)
                 obj2 = queue.get_nowait()
                 self.assertEqual(obj2, obj)
 
@@ -341,24 +304,9 @@ class TestQueueOps(TestBase):
         with self.assertRaises(queues.QueueEmpty):
             queue.get_nowait()
 
-    def test_put_get_default_syncobj(self):
+    def test_put_get_full_fallback(self):
         expected = list(range(20))
-        queue = queues.create(syncobj=True)
-        for methname in ('get', 'get_nowait'):
-            with self.subTest(f'{methname}()'):
-                get = getattr(queue, methname)
-                for i in range(20):
-                    queue.put(i)
-                actual = [get() for _ in range(20)]
-                self.assertEqual(actual, expected)
-
-        obj = [1, 2, 3]  # lists are not shareable
-        with self.assertRaises(interpreters.NotShareableError):
-            queue.put(obj)
-
-    def test_put_get_default_not_syncobj(self):
-        expected = list(range(20))
-        queue = queues.create(syncobj=False)
+        queue = queues.create()
         for methname in ('get', 'get_nowait'):
             with self.subTest(f'{methname}()'):
                 get = getattr(queue, methname)
@@ -384,7 +332,7 @@ class TestQueueOps(TestBase):
             with self.subTest(f'{methname}()'):
                 interp.exec(dedent(f"""
                     orig = b'spam'
-                    queue.put(orig, syncobj=True)
+                    queue.put(orig)
                     obj = queue.{methname}()
                     assert obj == orig, 'expected: obj == orig'
                     assert obj is not orig, 'expected: obj is not orig'
@@ -399,7 +347,7 @@ class TestQueueOps(TestBase):
         for methname in ('get', 'get_nowait'):
             with self.subTest(f'{methname}()'):
                 obj1 = b'spam'
-                queue1.put(obj1, syncobj=True)
+                queue1.put(obj1)
 
                 out = _run_output(
                     interp,
@@ -416,7 +364,7 @@ class TestQueueOps(TestBase):
                         obj2 = b'eggs'
                         print(id(obj2))
                         assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
-                        queue2.put(obj2, syncobj=True)
+                        queue2.put(obj2)
                         assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
                         """))
                 self.assertEqual(len(queues.list_all()), 2)
@@ -433,11 +381,11 @@ class TestQueueOps(TestBase):
             if not unbound:
                 extraargs = ''
             elif unbound is queues.UNBOUND:
-                extraargs = ', unbound=queues.UNBOUND'
+                extraargs = ', unbounditems=queues.UNBOUND'
             elif unbound is queues.UNBOUND_ERROR:
-                extraargs = ', unbound=queues.UNBOUND_ERROR'
+                extraargs = ', unbounditems=queues.UNBOUND_ERROR'
             elif unbound is queues.UNBOUND_REMOVE:
-                extraargs = ', unbound=queues.UNBOUND_REMOVE'
+                extraargs = ', unbounditems=queues.UNBOUND_REMOVE'
             else:
                 raise NotImplementedError(repr(unbound))
             interp = interpreters.create()
@@ -447,8 +395,8 @@ class TestQueueOps(TestBase):
                 queue = queues.Queue({queue.id})
                 obj1 = b'spam'
                 obj2 = b'eggs'
-                queue.put(obj1, syncobj=True{extraargs})
-                queue.put(obj2, syncobj=True{extraargs})
+                queue.put(obj1{extraargs})
+                queue.put(obj2{extraargs})
                 """))
             self.assertEqual(queue.qsize(), presize + 2)
 
@@ -501,11 +449,11 @@ class TestQueueOps(TestBase):
             with self.assertRaises(queues.QueueEmpty):
                 queue.get_nowait()
 
-            queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
+            queue.put(b'ham', unbounditems=queues.UNBOUND_REMOVE)
             self.assertEqual(queue.qsize(), 1)
             interp = common(queue, queues.UNBOUND_REMOVE, 1)
             self.assertEqual(queue.qsize(), 3)
-            queue.put(42, unbound=queues.UNBOUND_REMOVE)
+            queue.put(42, unbounditems=queues.UNBOUND_REMOVE)
             self.assertEqual(queue.qsize(), 4)
             del interp
             self.assertEqual(queue.qsize(), 2)
@@ -523,11 +471,11 @@ class TestQueueOps(TestBase):
         _run_output(interp, dedent(f"""
             from test.support.interpreters import queues
             queue = queues.Queue({queue.id})
-            queue.put(1, syncobj=True, unbound=queues.UNBOUND)
-            queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
-            queue.put(3, syncobj=True)
-            queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
-            queue.put(5, syncobj=True, unbound=queues.UNBOUND)
+            queue.put(1, unbounditems=queues.UNBOUND)
+            queue.put(2, unbounditems=queues.UNBOUND_ERROR)
+            queue.put(3)
+            queue.put(4, unbounditems=queues.UNBOUND_REMOVE)
+            queue.put(5, unbounditems=queues.UNBOUND)
             """))
         self.assertEqual(queue.qsize(), 5)
 
@@ -555,13 +503,13 @@ class TestQueueOps(TestBase):
         interp1 = interpreters.create()
         interp2 = interpreters.create()
 
-        queue.put(1, syncobj=True)
+        queue.put(1)
         _run_output(interp1, dedent(f"""
             from test.support.interpreters import queues
             queue = queues.Queue({queue.id})
             obj1 = queue.get()
-            queue.put(2, syncobj=True, unbound=queues.UNBOUND)
-            queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+            queue.put(2, unbounditems=queues.UNBOUND)
+            queue.put(obj1, unbounditems=queues.UNBOUND_REMOVE)
             """))
         _run_output(interp2, dedent(f"""
             from test.support.interpreters import queues
@@ -572,21 +520,21 @@ class TestQueueOps(TestBase):
         self.assertEqual(queue.qsize(), 0)
         queue.put(3)
         _run_output(interp1, dedent("""
-            queue.put(4, syncobj=True, unbound=queues.UNBOUND)
+            queue.put(4, unbounditems=queues.UNBOUND)
             # interp closed here
-            queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
-            queue.put(6, syncobj=True, unbound=queues.UNBOUND)
+            queue.put(5, unbounditems=queues.UNBOUND_REMOVE)
+            queue.put(6, unbounditems=queues.UNBOUND)
             """))
         _run_output(interp2, dedent("""
-            queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
+            queue.put(7, unbounditems=queues.UNBOUND_ERROR)
             # interp closed here
-            queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
-            queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
-            queue.put(8, syncobj=True, unbound=queues.UNBOUND)
+            queue.put(obj1, unbounditems=queues.UNBOUND_ERROR)
+            queue.put(obj2, unbounditems=queues.UNBOUND_REMOVE)
+            queue.put(8, unbounditems=queues.UNBOUND)
             """))
         _run_output(interp1, dedent("""
-            queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
-            queue.put(10, syncobj=True, unbound=queues.UNBOUND)
+            queue.put(9, unbounditems=queues.UNBOUND_REMOVE)
+            queue.put(10, unbounditems=queues.UNBOUND)
             """))
         self.assertEqual(queue.qsize(), 10)
 
@@ -642,12 +590,12 @@ class TestQueueOps(TestBase):
                     break
                 except queues.QueueEmpty:
                     continue
-            queue2.put(obj, syncobj=True)
+            queue2.put(obj)
         t = threading.Thread(target=f)
         t.start()
 
         orig = b'spam'
-        queue1.put(orig, syncobj=True)
+        queue1.put(orig)
         obj = queue2.get()
         t.join()
 
index f9fa1dab29105667b01560cfc7b835bfb5ff008f..0ab553190001bd508ba2ce521b6df18abcbf1225 100644 (file)
 #endif
 
 #define REGISTERS_HEAP_TYPES
+#define HAS_FALLBACK
 #define HAS_UNBOUND_ITEMS
 #include "_interpreters_common.h"
 #undef HAS_UNBOUND_ITEMS
+#undef HAS_FALLBACK
 #undef REGISTERS_HEAP_TYPES
 
 
@@ -523,7 +525,7 @@ typedef struct _channelitem {
     int64_t interpid;
     _PyXIData_t *data;
     _waiting_t *waiting;
-    int unboundop;
+    unboundop_t unboundop;
     struct _channelitem *next;
 } _channelitem;
 
@@ -536,7 +538,7 @@ _channelitem_ID(_channelitem *item)
 static void
 _channelitem_init(_channelitem *item,
                   int64_t interpid, _PyXIData_t *data,
-                  _waiting_t *waiting, int unboundop)
+                  _waiting_t *waiting, unboundop_t unboundop)
 {
     if (interpid < 0) {
         interpid = _get_interpid(data);
@@ -583,7 +585,7 @@ _channelitem_clear(_channelitem *item)
 
 static _channelitem *
 _channelitem_new(int64_t interpid, _PyXIData_t *data,
-                 _waiting_t *waiting, int unboundop)
+                 _waiting_t *waiting, unboundop_t unboundop)
 {
     _channelitem *item = GLOBAL_MALLOC(_channelitem);
     if (item == NULL) {
@@ -694,7 +696,7 @@ _channelqueue_free(_channelqueue *queue)
 static int
 _channelqueue_put(_channelqueue *queue,
                   int64_t interpid, _PyXIData_t *data,
-                  _waiting_t *waiting, int unboundop)
+                  _waiting_t *waiting, unboundop_t unboundop)
 {
     _channelitem *item = _channelitem_new(interpid, data, waiting, unboundop);
     if (item == NULL) {
@@ -798,7 +800,7 @@ _channelqueue_remove(_channelqueue *queue, _channelitem_id_t itemid,
     }
     queue->count -= 1;
 
-    int unboundop;
+    unboundop_t unboundop;
     _channelitem_popped(item, p_data, p_waiting, &unboundop);
 }
 
@@ -1083,16 +1085,18 @@ typedef struct _channel {
     PyThread_type_lock mutex;
     _channelqueue *queue;
     _channelends *ends;
-    struct {
-        int unboundop;
+    struct _channeldefaults {
+        unboundop_t unboundop;
+        xidata_fallback_t fallback;
     } defaults;
     int open;
     struct _channel_closing *closing;
 } _channel_state;
 
 static _channel_state *
-_channel_new(PyThread_type_lock mutex, int unboundop)
+_channel_new(PyThread_type_lock mutex, struct _channeldefaults defaults)
 {
+    assert(check_unbound(defaults.unboundop));
     _channel_state *chan = GLOBAL_MALLOC(_channel_state);
     if (chan == NULL) {
         return NULL;
@@ -1109,7 +1113,7 @@ _channel_new(PyThread_type_lock mutex, int unboundop)
         GLOBAL_FREE(chan);
         return NULL;
     }
-    chan->defaults.unboundop = unboundop;
+    chan->defaults = defaults;
     chan->open = 1;
     chan->closing = NULL;
     return chan;
@@ -1130,7 +1134,7 @@ _channel_free(_channel_state *chan)
 
 static int
 _channel_add(_channel_state *chan, int64_t interpid,
-             _PyXIData_t *data, _waiting_t *waiting, int unboundop)
+             _PyXIData_t *data, _waiting_t *waiting, unboundop_t unboundop)
 {
     int res = -1;
     PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
@@ -1611,7 +1615,7 @@ done:
 
 struct channel_id_and_info {
     int64_t id;
-    int unboundop;
+    struct _channeldefaults defaults;
 };
 
 static struct channel_id_and_info *
@@ -1628,7 +1632,7 @@ _channels_list_all(_channels *channels, int64_t *count)
     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
         ids[i] = (struct channel_id_and_info){
             .id = ref->cid,
-            .unboundop = ref->chan->defaults.unboundop,
+            .defaults = ref->chan->defaults,
         };
     }
     *count = channels->numopen;
@@ -1714,13 +1718,13 @@ _channel_finish_closing(_channel_state *chan) {
 
 // Create a new channel.
 static int64_t
-channel_create(_channels *channels, int unboundop)
+channel_create(_channels *channels, struct _channeldefaults defaults)
 {
     PyThread_type_lock mutex = PyThread_allocate_lock();
     if (mutex == NULL) {
         return ERR_CHANNEL_MUTEX_INIT;
     }
-    _channel_state *chan = _channel_new(mutex, unboundop);
+    _channel_state *chan = _channel_new(mutex, defaults);
     if (chan == NULL) {
         PyThread_free_lock(mutex);
         return -1;
@@ -1752,7 +1756,7 @@ channel_destroy(_channels *channels, int64_t cid)
 // Optionally request to be notified when it is received.
 static int
 channel_send(_channels *channels, int64_t cid, PyObject *obj,
-             _waiting_t *waiting, int unboundop)
+             _waiting_t *waiting, unboundop_t unboundop, xidata_fallback_t fallback)
 {
     PyThreadState *tstate = _PyThreadState_GET();
     PyInterpreterState *interp = tstate->interp;
@@ -1779,7 +1783,7 @@ channel_send(_channels *channels, int64_t cid, PyObject *obj,
         PyThread_release_lock(mutex);
         return -1;
     }
-    if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) {
+    if (_PyObject_GetXIData(tstate, obj, fallback, data) != 0) {
         PyThread_release_lock(mutex);
         GLOBAL_FREE(data);
         return -1;
@@ -1823,7 +1827,8 @@ channel_clear_sent(_channels *channels, int64_t cid, _waiting_t *waiting)
 // Like channel_send(), but strictly wait for the object to be received.
 static int
 channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
-                  int unboundop, PY_TIMEOUT_T timeout)
+                  unboundop_t unboundop, PY_TIMEOUT_T timeout,
+                  xidata_fallback_t fallback)
 {
     // We use a stack variable here, so we must ensure that &waiting
     // is not held by any channel item at the point this function exits.
@@ -1834,7 +1839,7 @@ channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
     }
 
     /* Queue up the object. */
-    int res = channel_send(channels, cid, obj, &waiting, unboundop);
+    int res = channel_send(channels, cid, obj, &waiting, unboundop, fallback);
     if (res < 0) {
         assert(waiting.status == WAITING_NO_STATUS);
         goto finally;
@@ -2005,6 +2010,20 @@ channel_is_associated(_channels *channels, int64_t cid, int64_t interpid,
     return (end != NULL && end->open);
 }
 
+static int
+channel_get_defaults(_channels *channels, int64_t cid, struct _channeldefaults *defaults)
+{
+    PyThread_type_lock mutex = NULL;
+    _channel_state *channel = NULL;
+    int err = _channels_lookup(channels, cid, &mutex, &channel);
+    if (err != 0) {
+        return err;
+    }
+    *defaults = channel->defaults;
+    PyThread_release_lock(mutex);
+    return 0;
+}
+
 static int
 _channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
 {
@@ -2881,20 +2900,27 @@ clear_interpreter(void *data)
 static PyObject *
 channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
 {
-    static char *kwlist[] = {"unboundop", NULL};
-    int unboundop;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist,
-                                     &unboundop))
+    static char *kwlist[] = {"unboundop", "fallback", NULL};
+    int unboundarg = -1;
+    int fallbackarg = -1;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|ii:create", kwlist,
+                                     &unboundarg, &fallbackarg))
     {
         return NULL;
     }
-    if (!check_unbound(unboundop)) {
-        PyErr_Format(PyExc_ValueError,
-                     "unsupported unboundop %d", unboundop);
+    struct _channeldefaults defaults = {0};
+    if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
+                          &defaults.unboundop) < 0)
+    {
+        return NULL;
+    }
+    if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
+                         &defaults.fallback) < 0)
+    {
         return NULL;
     }
 
-    int64_t cid = channel_create(&_globals.channels, unboundop);
+    int64_t cid = channel_create(&_globals.channels, defaults);
     if (cid < 0) {
         (void)handle_channel_error(-1, self, cid);
         return NULL;
@@ -2987,7 +3013,9 @@ channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
         }
         assert(cidobj != NULL);
 
-        PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop);
+        PyObject *item = Py_BuildValue("Oii", cidobj,
+                                       cur->defaults.unboundop,
+                                       cur->defaults.fallback);
         Py_DECREF(cidobj);
         if (item == NULL) {
             Py_SETREF(ids, NULL);
@@ -3075,40 +3103,54 @@ receive end.");
 static PyObject *
 channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
 {
-    static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
-                             NULL};
+    static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
+                             "blocking", "timeout", NULL};
     struct channel_id_converter_data cid_data = {
         .module = self,
     };
     PyObject *obj;
-    int unboundop = UNBOUND_REPLACE;
+    int unboundarg = -1;
+    int fallbackarg = -1;
     int blocking = 1;
     PyObject *timeout_obj = NULL;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist,
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&O|ii$pO:channel_send", kwlist,
                                      channel_id_converter, &cid_data, &obj,
-                                     &unboundop, &blocking, &timeout_obj))
+                                     &unboundarg, &fallbackarg,
+                                     &blocking, &timeout_obj))
     {
         return NULL;
     }
-    if (!check_unbound(unboundop)) {
-        PyErr_Format(PyExc_ValueError,
-                     "unsupported unboundop %d", unboundop);
-        return NULL;
-    }
-
     int64_t cid = cid_data.cid;
     PY_TIMEOUT_T timeout;
     if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
         return NULL;
     }
+    struct _channeldefaults defaults = {-1, -1};
+    if (unboundarg < 0 || fallbackarg < 0) {
+        int err = channel_get_defaults(&_globals.channels, cid, &defaults);
+        if (handle_channel_error(err, self, cid)) {
+            return NULL;
+        }
+    }
+    unboundop_t unboundop;
+    if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+        return NULL;
+    }
+    xidata_fallback_t fallback;
+    if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
+        return NULL;
+    }
 
     /* Queue up the object. */
     int err = 0;
     if (blocking) {
-        err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout);
+        err = channel_send_wait(
+                &_globals.channels, cid, obj, unboundop, timeout, fallback);
     }
     else {
-        err = channel_send(&_globals.channels, cid, obj, NULL, unboundop);
+        err = channel_send(
+                &_globals.channels, cid, obj, NULL, unboundop, fallback);
     }
     if (handle_channel_error(err, self, cid)) {
         return NULL;
@@ -3126,32 +3168,44 @@ By default this waits for the object to be received.");
 static PyObject *
 channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
 {
-    static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
-                             NULL};
+    static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
+                             "blocking", "timeout", NULL};
     struct channel_id_converter_data cid_data = {
         .module = self,
     };
     PyObject *obj;
-    int unboundop = UNBOUND_REPLACE;
-    int blocking = 1;
+    int unboundarg = -1;
+    int fallbackarg = -1;
+    int blocking = -1;
     PyObject *timeout_obj = NULL;
     if (!PyArg_ParseTupleAndKeywords(args, kwds,
-                                     "O&O|i$pO:channel_send_buffer", kwlist,
+                                     "O&O|ii$pO:channel_send_buffer", kwlist,
                                      channel_id_converter, &cid_data, &obj,
-                                     &unboundop, &blocking, &timeout_obj)) {
-        return NULL;
-    }
-    if (!check_unbound(unboundop)) {
-        PyErr_Format(PyExc_ValueError,
-                     "unsupported unboundop %d", unboundop);
+                                     &unboundarg, &fallbackarg,
+                                     &blocking, &timeout_obj))
+    {
         return NULL;
     }
-
     int64_t cid = cid_data.cid;
     PY_TIMEOUT_T timeout;
     if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
         return NULL;
     }
+    struct _channeldefaults defaults = {-1, -1};
+    if (unboundarg < 0 || fallbackarg < 0) {
+        int err = channel_get_defaults(&_globals.channels, cid, &defaults);
+        if (handle_channel_error(err, self, cid)) {
+            return NULL;
+        }
+    }
+    unboundop_t unboundop;
+    if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+        return NULL;
+    }
+    xidata_fallback_t fallback;
+    if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
+        return NULL;
+    }
 
     PyObject *tempobj = PyMemoryView_FromObject(obj);
     if (tempobj == NULL) {
@@ -3162,10 +3216,11 @@ channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
     int err = 0;
     if (blocking) {
         err = channel_send_wait(
-                &_globals.channels, cid, tempobj, unboundop, timeout);
+                &_globals.channels, cid, tempobj, unboundop, timeout, fallback);
     }
     else {
-        err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop);
+        err = channel_send(
+                &_globals.channels, cid, tempobj, NULL, unboundop, fallback);
     }
     Py_DECREF(tempobj);
     if (handle_channel_error(err, self, cid)) {
@@ -3197,7 +3252,7 @@ channelsmod_recv(PyObject *self, PyObject *args, PyObject *kwds)
     cid = cid_data.cid;
 
     PyObject *obj = NULL;
-    int unboundop = 0;
+    unboundop_t unboundop = 0;
     int err = channel_recv(&_globals.channels, cid, &obj, &unboundop);
     if (err == ERR_CHANNEL_EMPTY && dflt != NULL) {
         // Use the default.
@@ -3388,17 +3443,14 @@ channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds)
     }
     int64_t cid = cid_data.cid;
 
-    PyThread_type_lock mutex = NULL;
-    _channel_state *channel = NULL;
-    int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel);
+    struct _channeldefaults defaults;
+    int err = channel_get_defaults(&_globals.channels, cid, &defaults);
     if (handle_channel_error(err, self, cid)) {
         return NULL;
     }
-    int unboundop = channel->defaults.unboundop;
-    PyThread_release_lock(mutex);
 
-    PyObject *defaults = Py_BuildValue("i", unboundop);
-    return defaults;
+    PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
+    return res;
 }
 
 PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,
index 209fcdfd0cd01e671596d663b73e1d6b0c23c11e..816285c9eff44ab4443ba140d6c93cc19862c1de 100644 (file)
@@ -9,9 +9,11 @@
 #include "pycore_crossinterp.h"   // _PyXIData_t
 
 #define REGISTERS_HEAP_TYPES
+#define HAS_FALLBACK
 #define HAS_UNBOUND_ITEMS
 #include "_interpreters_common.h"
 #undef HAS_UNBOUND_ITEMS
+#undef HAS_FALLBACK
 #undef REGISTERS_HEAP_TYPES
 
 
@@ -401,14 +403,13 @@ typedef struct _queueitem {
        meaning the interpreter has been destroyed. */
     int64_t interpid;
     _PyXIData_t *data;
-    int fmt;
-    int unboundop;
+    unboundop_t unboundop;
     struct _queueitem *next;
 } _queueitem;
 
 static void
 _queueitem_init(_queueitem *item,
-                int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
+                int64_t interpid, _PyXIData_t *data, unboundop_t unboundop)
 {
     if (interpid < 0) {
         interpid = _get_interpid(data);
@@ -422,7 +423,6 @@ _queueitem_init(_queueitem *item,
     *item = (_queueitem){
         .interpid = interpid,
         .data = data,
-        .fmt = fmt,
         .unboundop = unboundop,
     };
 }
@@ -446,14 +446,14 @@ _queueitem_clear(_queueitem *item)
 }
 
 static _queueitem *
-_queueitem_new(int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
+_queueitem_new(int64_t interpid, _PyXIData_t *data, int unboundop)
 {
     _queueitem *item = GLOBAL_MALLOC(_queueitem);
     if (item == NULL) {
         PyErr_NoMemory();
         return NULL;
     }
-    _queueitem_init(item, interpid, data, fmt, unboundop);
+    _queueitem_init(item, interpid, data, unboundop);
     return item;
 }
 
@@ -476,10 +476,9 @@ _queueitem_free_all(_queueitem *item)
 
 static void
 _queueitem_popped(_queueitem *item,
-                  _PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
+                  _PyXIData_t **p_data, unboundop_t *p_unboundop)
 {
     *p_data = item->data;
-    *p_fmt = item->fmt;
     *p_unboundop = item->unboundop;
     // We clear them here, so they won't be released in _queueitem_clear().
     item->data = NULL;
@@ -527,16 +526,16 @@ typedef struct _queue {
         _queueitem *first;
         _queueitem *last;
     } items;
-    struct {
-        int fmt;
+    struct _queuedefaults {
+        xidata_fallback_t fallback;
         int unboundop;
     } defaults;
 } _queue;
 
 static int
-_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
+_queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
 {
-    assert(check_unbound(unboundop));
+    assert(check_unbound(defaults.unboundop));
     PyThread_type_lock mutex = PyThread_allocate_lock();
     if (mutex == NULL) {
         return ERR_QUEUE_ALLOC;
@@ -547,10 +546,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
         .items = {
             .maxsize = maxsize,
         },
-        .defaults = {
-            .fmt = fmt,
-            .unboundop = unboundop,
-        },
+        .defaults = defaults,
     };
     return 0;
 }
@@ -631,8 +627,7 @@ _queue_unlock(_queue *queue)
 }
 
 static int
-_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
-           int fmt, int unboundop)
+_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
 {
     int err = _queue_lock(queue);
     if (err < 0) {
@@ -648,7 +643,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
         return ERR_QUEUE_FULL;
     }
 
-    _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
+    _queueitem *item = _queueitem_new(interpid, data, unboundop);
     if (item == NULL) {
         _queue_unlock(queue);
         return -1;
@@ -668,8 +663,7 @@ _queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
 }
 
 static int
-_queue_next(_queue *queue,
-            _PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
+_queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
 {
     int err = _queue_lock(queue);
     if (err < 0) {
@@ -688,7 +682,7 @@ _queue_next(_queue *queue,
     }
     queue->items.count -= 1;
 
-    _queueitem_popped(item, p_data, p_fmt, p_unboundop);
+    _queueitem_popped(item, p_data, p_unboundop);
 
     _queue_unlock(queue);
     return 0;
@@ -1035,8 +1029,7 @@ finally:
 
 struct queue_id_and_info {
     int64_t id;
-    int fmt;
-    int unboundop;
+    struct _queuedefaults defaults;
 };
 
 static struct queue_id_and_info *
@@ -1053,8 +1046,7 @@ _queues_list_all(_queues *queues, int64_t *p_count)
     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
         ids[i].id = ref->qid;
         assert(ref->queue != NULL);
-        ids[i].fmt = ref->queue->defaults.fmt;
-        ids[i].unboundop = ref->queue->defaults.unboundop;
+        ids[i].defaults = ref->queue->defaults;
     }
     *p_count = queues->count;
 
@@ -1090,13 +1082,14 @@ _queue_free(_queue *queue)
 
 // Create a new queue.
 static int64_t
-queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
+queue_create(_queues *queues, Py_ssize_t maxsize,
+             struct _queuedefaults defaults)
 {
     _queue *queue = GLOBAL_MALLOC(_queue);
     if (queue == NULL) {
         return ERR_QUEUE_ALLOC;
     }
-    int err = _queue_init(queue, maxsize, fmt, unboundop);
+    int err = _queue_init(queue, maxsize, defaults);
     if (err < 0) {
         GLOBAL_FREE(queue);
         return (int64_t)err;
@@ -1125,7 +1118,8 @@ queue_destroy(_queues *queues, int64_t qid)
 
 // Push an object onto the queue.
 static int
-queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
+queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
+          xidata_fallback_t fallback)
 {
     PyThreadState *tstate = PyThreadState_Get();
 
@@ -1138,27 +1132,27 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
     assert(queue != NULL);
 
     // Convert the object to cross-interpreter data.
-    _PyXIData_t *data = _PyXIData_New();
-    if (data == NULL) {
+    _PyXIData_t *xidata = _PyXIData_New();
+    if (xidata == NULL) {
         _queue_unmark_waiter(queue, queues->mutex);
         return -1;
     }
-    if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) {
+    if (_PyObject_GetXIData(tstate, obj, fallback, xidata) != 0) {
         _queue_unmark_waiter(queue, queues->mutex);
-        GLOBAL_FREE(data);
+        GLOBAL_FREE(xidata);
         return -1;
     }
-    assert(_PyXIData_INTERPID(data) ==
+    assert(_PyXIData_INTERPID(xidata) ==
             PyInterpreterState_GetID(tstate->interp));
 
     // Add the data to the queue.
     int64_t interpid = -1;  // _queueitem_init() will set it.
-    int res = _queue_add(queue, interpid, data, fmt, unboundop);
+    int res = _queue_add(queue, interpid, xidata, unboundop);
     _queue_unmark_waiter(queue, queues->mutex);
     if (res != 0) {
         // We may chain an exception here:
-        (void)_release_xid_data(data, 0);
-        GLOBAL_FREE(data);
+        (void)_release_xid_data(xidata, 0);
+        GLOBAL_FREE(xidata);
         return res;
     }
 
@@ -1169,7 +1163,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
 // XXX Support a "wait" mutex?
 static int
 queue_get(_queues *queues, int64_t qid,
-          PyObject **res, int *p_fmt, int *p_unboundop)
+          PyObject **res, int *p_unboundop)
 {
     int err;
     *res = NULL;
@@ -1185,7 +1179,7 @@ queue_get(_queues *queues, int64_t qid,
 
     // Pop off the next item from the queue.
     _PyXIData_t *data = NULL;
-    err = _queue_next(queue, &data, p_fmt, p_unboundop);
+    err = _queue_next(queue, &data, p_unboundop);
     _queue_unmark_waiter(queue, queues->mutex);
     if (err != 0) {
         return err;
@@ -1216,6 +1210,20 @@ queue_get(_queues *queues, int64_t qid,
     return 0;
 }
 
+static int
+queue_get_defaults(_queues *queues, int64_t qid,
+                   struct _queuedefaults *p_defaults)
+{
+    _queue *queue = NULL;
+    int err = _queues_lookup(queues, qid, &queue);
+    if (err != 0) {
+        return err;
+    }
+    *p_defaults = queue->defaults;
+    _queue_unmark_waiter(queue, queues->mutex);
+    return 0;
+}
+
 static int
 queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
 {
@@ -1474,22 +1482,28 @@ qidarg_converter(PyObject *arg, void *ptr)
 static PyObject *
 queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
 {
-    static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
+    static char *kwlist[] = {"maxsize", "unboundop", "fallback", NULL};
     Py_ssize_t maxsize;
-    int fmt;
-    int unboundop;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
-                                     &maxsize, &fmt, &unboundop))
+    int unboundarg = -1;
+    int fallbackarg = -1;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "n|ii:create", kwlist,
+                                     &maxsize, &unboundarg, &fallbackarg))
     {
         return NULL;
     }
-    if (!check_unbound(unboundop)) {
-        PyErr_Format(PyExc_ValueError,
-                     "unsupported unboundop %d", unboundop);
+    struct _queuedefaults defaults = {0};
+    if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
+                          &defaults.unboundop) < 0)
+    {
+        return NULL;
+    }
+    if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
+                         &defaults.fallback) < 0)
+    {
         return NULL;
     }
 
-    int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
+    int64_t qid = queue_create(&_globals.queues, maxsize, defaults);
     if (qid < 0) {
         (void)handle_queue_error((int)qid, self, qid);
         return NULL;
@@ -1511,7 +1525,7 @@ queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
 }
 
 PyDoc_STRVAR(queuesmod_create_doc,
-"create(maxsize, fmt, unboundop) -> qid\n\
+"create(maxsize, unboundop, fallback) -> qid\n\
 \n\
 Create a new cross-interpreter queue and return its unique generated ID.\n\
 It is a new reference as though bind() had been called on the queue.\n\
@@ -1560,8 +1574,9 @@ queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
     }
     struct queue_id_and_info *cur = qids;
     for (int64_t i=0; i < count; cur++, i++) {
-        PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
-                                       cur->unboundop);
+        PyObject *item = Py_BuildValue("Lii", cur->id,
+                                       cur->defaults.unboundop,
+                                       cur->defaults.fallback);
         if (item == NULL) {
             Py_SETREF(ids, NULL);
             break;
@@ -1575,34 +1590,44 @@ finally:
 }
 
 PyDoc_STRVAR(queuesmod_list_all_doc,
-"list_all() -> [(qid, fmt)]\n\
+"list_all() -> [(qid, unboundop, fallback)]\n\
 \n\
 Return the list of IDs for all queues.\n\
-Each corresponding default format is also included.");
+Each corresponding default unbound op and fallback is also included.");
 
 static PyObject *
 queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
 {
-    static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
+    static char *kwlist[] = {"qid", "obj", "unboundop", "fallback", NULL};
     qidarg_converter_data qidarg = {0};
     PyObject *obj;
-    int fmt;
-    int unboundop;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
-                                     qidarg_converter, &qidarg, &obj, &fmt,
-                                     &unboundop))
+    int unboundarg = -1;
+    int fallbackarg = -1;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|ii$p:put", kwlist,
+                                     qidarg_converter, &qidarg, &obj,
+                                     &unboundarg, &fallbackarg))
     {
         return NULL;
     }
     int64_t qid = qidarg.id;
-    if (!check_unbound(unboundop)) {
-        PyErr_Format(PyExc_ValueError,
-                     "unsupported unboundop %d", unboundop);
+    struct _queuedefaults defaults = {-1, -1};
+    if (unboundarg < 0 || fallbackarg < 0) {
+        int err = queue_get_defaults(&_globals.queues, qid, &defaults);
+        if (handle_queue_error(err, self, qid)) {
+            return NULL;
+        }
+    }
+    unboundop_t unboundop;
+    if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+        return NULL;
+    }
+    xidata_fallback_t fallback;
+    if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
         return NULL;
     }
 
     /* Queue up the object. */
-    int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
+    int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
     // This is the only place that raises QueueFull.
     if (handle_queue_error(err, self, qid)) {
         return NULL;
@@ -1612,7 +1637,7 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
 }
 
 PyDoc_STRVAR(queuesmod_put_doc,
-"put(qid, obj, fmt)\n\
+"put(qid, obj)\n\
 \n\
 Add the object's data to the queue.");
 
@@ -1628,27 +1653,26 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject *kwds)
     int64_t qid = qidarg.id;
 
     PyObject *obj = NULL;
-    int fmt = 0;
     int unboundop = 0;
-    int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
+    int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
     // This is the only place that raises QueueEmpty.
     if (handle_queue_error(err, self, qid)) {
         return NULL;
     }
 
     if (obj == NULL) {
-        return Py_BuildValue("Oii", Py_None, fmt, unboundop);
+        return Py_BuildValue("Oi", Py_None, unboundop);
     }
-    PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
+    PyObject *res = Py_BuildValue("OO", obj, Py_None);
     Py_DECREF(obj);
     return res;
 }
 
 PyDoc_STRVAR(queuesmod_get_doc,
-"get(qid) -> (obj, fmt)\n\
+"get(qid) -> (obj, unboundop)\n\
 \n\
 Return a new object from the data at the front of the queue.\n\
-The object's format is also returned.\n\
+The unbound op is also returned.\n\
 \n\
 If there is nothing to receive then raise QueueEmpty.");
 
@@ -1748,17 +1772,14 @@ queuesmod_get_queue_defaults(PyObject *self, PyObject *args, PyObject *kwds)
     }
     int64_t qid = qidarg.id;
 
-    _queue *queue = NULL;
-    int err = _queues_lookup(&_globals.queues, qid, &queue);
+    struct _queuedefaults defaults;
+    int err = queue_get_defaults(&_globals.queues, qid, &defaults);
     if (handle_queue_error(err, self, qid)) {
         return NULL;
     }
-    int fmt = queue->defaults.fmt;
-    int unboundop = queue->defaults.unboundop;
-    _queue_unmark_waiter(queue, _globals.queues.mutex);
 
-    PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
-    return defaults;
+    PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
+    return res;
 }
 
 PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
index d73cbca36359c7a2ecc1831451f712dc14f7c10b..40fd51d752e324b88a3fc6ec86544d93a6fcb96f 100644 (file)
@@ -39,10 +39,37 @@ _get_interpid(_PyXIData_t *data)
 }
 
 
+#ifdef HAS_FALLBACK
+static int
+resolve_fallback(int arg, xidata_fallback_t dflt,
+                 xidata_fallback_t *p_fallback)
+{
+    if (arg < 0) {
+        *p_fallback = dflt;
+        return 0;
+    }
+    xidata_fallback_t fallback;
+    if (arg == _PyXIDATA_XIDATA_ONLY) {
+        fallback =_PyXIDATA_XIDATA_ONLY;
+    }
+    else if (arg == _PyXIDATA_FULL_FALLBACK) {
+        fallback = _PyXIDATA_FULL_FALLBACK;
+    }
+    else {
+        PyErr_Format(PyExc_ValueError, "unsupported fallback %d", arg);
+        return -1;
+    }
+    *p_fallback = fallback;
+    return 0;
+}
+#endif
+
+
 /* unbound items ************************************************************/
 
 #ifdef HAS_UNBOUND_ITEMS
 
+typedef int unboundop_t;
 #define UNBOUND_REMOVE 1
 #define UNBOUND_ERROR 2
 #define UNBOUND_REPLACE 3
@@ -53,6 +80,7 @@ _get_interpid(_PyXIData_t *data)
 // object is released but the underlying data is copied (with the "raw"
 // allocator) and used when the item is popped off the queue.
 
+#ifndef NDEBUG
 static int
 check_unbound(int unboundop)
 {
@@ -65,5 +93,31 @@ check_unbound(int unboundop)
         return 0;
     }
 }
+#endif
+
+static int
+resolve_unboundop(int arg, unboundop_t dflt, unboundop_t *p_unboundop)
+{
+    if (arg < 0) {
+        *p_unboundop = dflt;
+        return 0;
+    }
+    unboundop_t op;
+    if (arg == UNBOUND_REMOVE) {
+        op = UNBOUND_REMOVE;
+    }
+    else if (arg == UNBOUND_ERROR) {
+        op = UNBOUND_ERROR;
+    }
+    else if (arg == UNBOUND_REPLACE) {
+        op = UNBOUND_REPLACE;
+    }
+    else {
+        PyErr_Format(PyExc_ValueError, "unsupported unboundop %d", arg);
+        return -1;
+    }
+    *p_unboundop = op;
+    return 0;
+}
 
 #endif
index dc67de4a40849d94dc538304a0419519c704d9e6..6681b969183925face0ca725ac502b5b12004233 100644 (file)
@@ -1839,6 +1839,7 @@ _sharednsitem_set_value(_PyXI_namespace_item *item, PyObject *value)
         return -1;
     }
     PyThreadState *tstate = PyThreadState_Get();
+    // XXX Use _PyObject_GetXIDataWithFallback()?
     if (_PyObject_GetXIDataNoFallback(tstate, value, item->xidata) != 0) {
         PyMem_RawFree(item->xidata);
         item->xidata = NULL;