--- /dev/null
+"""Common code between queues and channels."""
+
+
+class ItemInterpreterDestroyed(Exception):
+ """Raised when trying to get an item whose interpreter was destroyed."""
+
+
+class classonly:
+ """A non-data descriptor that makes a value only visible on the class.
+
+ This is like the "classmethod" builtin, but does not show up on
+ instances of the class. It may be used as a decorator.
+ """
+
+ def __init__(self, value):
+ self.value = value
+ self.getter = classmethod(value).__get__
+ self.name = None
+
+ def __set_name__(self, cls, name):
+ if self.name is not None:
+ raise TypeError('already used')
+ self.name = name
+
+ def __get__(self, obj, cls):
+ if obj is not None:
+ raise AttributeError(self.name)
+ # called on the class
+ return self.getter(None, cls)
+
+
+class UnboundItem:
+ """Represents a cross-interpreter item no longer bound to an interpreter.
+
+ An item is unbound when the interpreter that added it to the
+ cross-interpreter container is destroyed.
+ """
+
+ __slots__ = ()
+
+ @classonly
+ def singleton(cls, kind, module, name='UNBOUND'):
+ doc = cls.__doc__.replace('cross-interpreter container', kind)
+ doc = doc.replace('cross-interpreter', kind)
+ subclass = type(
+ f'Unbound{kind.capitalize()}Item',
+ (cls,),
+ dict(
+ _MODULE=module,
+ _NAME=name,
+ __doc__=doc,
+ ),
+ )
+ return object.__new__(subclass)
+
+ _MODULE = __name__
+ _NAME = 'UNBOUND'
+
+ def __new__(cls):
+ raise Exception(f'use {cls._MODULE}.{cls._NAME}')
+
+ def __repr__(self):
+ return f'{self._MODULE}.{self._NAME}'
+# return f'interpreters.queues.UNBOUND'
+
+
+UNBOUND = object.__new__(UnboundItem)
+UNBOUND_ERROR = object()
+UNBOUND_REMOVE = object()
+
+_UNBOUND_CONSTANT_TO_FLAG = {
+ UNBOUND_REMOVE: 1,
+ UNBOUND_ERROR: 2,
+ UNBOUND: 3,
+}
+_UNBOUND_FLAG_TO_CONSTANT = {v: k
+ for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
+
+
+def serialize_unbound(unbound):
+ op = unbound
+ try:
+ flag = _UNBOUND_CONSTANT_TO_FLAG[op]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
+ return flag,
+
+
+def resolve_unbound(flag, exctype_destroyed):
+ try:
+ op = _UNBOUND_FLAG_TO_CONSTANT[flag]
+ except KeyError:
+ raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
+ if op is UNBOUND_REMOVE:
+ # "remove" not possible here
+ raise NotImplementedError
+ elif op is UNBOUND_ERROR:
+ raise exctype_destroyed("item's original interpreter destroyed")
+ elif op is UNBOUND:
+ return UNBOUND
+ else:
+ raise NotImplementedError(repr(op))
import time
import _interpchannels as _channels
+from . import _crossinterp
# aliases:
from _interpchannels import (
ChannelError, ChannelNotFoundError, ChannelClosedError,
ChannelEmptyError, ChannelNotEmptyError,
)
+from ._crossinterp import (
+ UNBOUND_ERROR, UNBOUND_REMOVE,
+)
__all__ = [
+ 'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
'create', 'list_all',
'SendChannel', 'RecvChannel',
'ChannelError', 'ChannelNotFoundError', 'ChannelEmptyError',
+ 'ItemInterpreterDestroyed',
]
-def create():
+class ItemInterpreterDestroyed(ChannelError,
+ _crossinterp.ItemInterpreterDestroyed):
+ """Raised from get() and get_nowait()."""
+
+
+UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
+
+
+def _serialize_unbound(unbound):
+ if unbound is UNBOUND:
+ unbound = _crossinterp.UNBOUND
+ return _crossinterp.serialize_unbound(unbound)
+
+
+def _resolve_unbound(flag):
+ resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
+ if resolved is _crossinterp.UNBOUND:
+ resolved = UNBOUND
+ return resolved
+
+
+def create(*, unbounditems=UNBOUND):
"""Return (recv, send) for a new cross-interpreter channel.
The channel may be used to pass data safely between interpreters.
+
+ "unbounditems" sets the default for the send end of the channel.
+ See SendChannel.send() for supported values. The default value
+ is UNBOUND, which replaces the unbound item when received.
"""
- cid = _channels.create()
- recv, send = RecvChannel(cid), SendChannel(cid)
+ unbound = _serialize_unbound(unbounditems)
+ unboundop, = unbound
+ cid = _channels.create(unboundop)
+ recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
return recv, send
def list_all():
"""Return a list of (recv, send) for all open channels."""
- return [(RecvChannel(cid), SendChannel(cid))
- for cid in _channels.list_all()]
+ return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
+ for cid, unbound in _channels.list_all()]
class _ChannelEnd:
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
- obj = _channels.recv(self._id, _sentinel)
+ obj, unboundop = _channels.recv(self._id, _sentinel)
while obj is _sentinel:
time.sleep(_delay)
if timeout is not None and time.time() >= end:
raise TimeoutError
- obj = _channels.recv(self._id, _sentinel)
+ obj, unboundop = _channels.recv(self._id, _sentinel)
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
return obj
def recv_nowait(self, default=_NOT_SET):
is the same as recv().
"""
if default is _NOT_SET:
- return _channels.recv(self._id)
+ obj, unboundop = _channels.recv(self._id)
else:
- return _channels.recv(self._id, default)
+ obj, unboundop = _channels.recv(self._id, default)
+ if unboundop is not None:
+ assert obj is None, repr(obj)
+ return _resolve_unbound(unboundop)
+ return obj
def close(self):
_channels.close(self._id, recv=True)
_end = 'send'
+ def __new__(cls, cid, *, _unbound=None):
+ if _unbound is None:
+ try:
+ op = _channels.get_channel_defaults(cid)
+ _unbound = (op,)
+ except ChannelNotFoundError:
+ _unbound = _serialize_unbound(UNBOUND)
+ self = super().__new__(cls, cid)
+ self._unbound = _unbound
+ return self
+
@property
def is_closed(self):
info = self._info
return info.closed or info.closing
- def send(self, obj, timeout=None):
+ def send(self, obj, timeout=None, *,
+ unbound=None,
+ ):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send(self._id, obj, timeout=timeout, blocking=True)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ _channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
- def send_nowait(self, obj):
+ def send_nowait(self, obj, *,
+ unbound=None,
+ ):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
- return _channels.send(self._id, obj, blocking=False)
+ return _channels.send(self._id, obj, unboundop, blocking=False)
- def send_buffer(self, obj, timeout=None):
+ def send_buffer(self, obj, timeout=None, *,
+ unbound=None,
+ ):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
- _channels.send_buffer(self._id, obj, timeout=timeout, blocking=True)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ _channels.send_buffer(self._id, obj, unboundop,
+ timeout=timeout, blocking=True)
- def send_buffer_nowait(self, obj):
+ def send_buffer_nowait(self, obj, *,
+ unbound=None,
+ ):
"""Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
- return _channels.send_buffer(self._id, obj, blocking=False)
+ if unbound is None:
+ unboundop, = self._unbound
+ else:
+ unboundop, = _serialize_unbound(unbound)
+ return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
def close(self):
_channels.close(self._id, send=True)
import time
import weakref
import _interpqueues as _queues
+from . import _crossinterp
# aliases:
from _interpqueues import (
QueueError, QueueNotFoundError,
)
+from ._crossinterp import (
+ UNBOUND_ERROR, UNBOUND_REMOVE,
+)
__all__ = [
'UNBOUND', 'UNBOUND_ERROR', 'UNBOUND_REMOVE',
"""
-class ItemInterpreterDestroyed(QueueError):
+class ItemInterpreterDestroyed(QueueError,
+ _crossinterp.ItemInterpreterDestroyed):
"""Raised from get() and get_nowait()."""
_PICKLED = 1
-class UnboundItem:
- """Represents a Queue item no longer bound to an interpreter.
-
- An item is unbound when the interpreter that added it to the queue
- is destroyed.
- """
-
- __slots__ = ()
-
- def __new__(cls):
- return UNBOUND
-
- def __repr__(self):
- return f'interpreters.queues.UNBOUND'
-
-
-UNBOUND = object.__new__(UnboundItem)
-UNBOUND_ERROR = object()
-UNBOUND_REMOVE = object()
+UNBOUND = _crossinterp.UnboundItem.singleton('queue', __name__)
-_UNBOUND_CONSTANT_TO_FLAG = {
- UNBOUND_REMOVE: 1,
- UNBOUND_ERROR: 2,
- UNBOUND: 3,
-}
-_UNBOUND_FLAG_TO_CONSTANT = {v: k
- for k, v in _UNBOUND_CONSTANT_TO_FLAG.items()}
def _serialize_unbound(unbound):
- op = unbound
- try:
- flag = _UNBOUND_CONSTANT_TO_FLAG[op]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {op!r}')
- return flag,
+ if unbound is UNBOUND:
+ unbound = _crossinterp.UNBOUND
+ return _crossinterp.serialize_unbound(unbound)
def _resolve_unbound(flag):
- try:
- op = _UNBOUND_FLAG_TO_CONSTANT[flag]
- except KeyError:
- raise NotImplementedError(f'unsupported unbound replacement op {flag!r}')
- if op is UNBOUND_REMOVE:
- # "remove" not possible here
- raise NotImplementedError
- elif op is UNBOUND_ERROR:
- raise ItemInterpreterDestroyed("item's original interpreter destroyed")
- elif op is UNBOUND:
- return UNBOUND
- else:
- raise NotImplementedError(repr(op))
+ resolved = _crossinterp.resolve_unbound(flag, ItemInterpreterDestroyed)
+ if resolved is _crossinterp.UNBOUND:
+ resolved = UNBOUND
+ return resolved
def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
from test.support import import_helper
+_channels = import_helper.import_module('_interpchannels')
+from test.support.interpreters import _crossinterp
from test.test__interpreters import (
_interpreters,
_run_output,
)
-_channels = import_helper.import_module('_interpchannels')
+REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
# Additional tests are found in Lib/test/test_interpreters/test_channels.py.
def recv_wait(cid):
while True:
try:
- return _channels.recv(cid)
+ obj, unboundop = _channels.recv(cid)
except _channels.ChannelEmptyError:
time.sleep(0.1)
+ else:
+ assert unboundop is None, repr(unboundop)
+ return obj
+
+
+def recv_nowait(cid, *args, unbound=False):
+ obj, unboundop = _channels.recv(cid, *args)
+ assert (unboundop is None) != unbound, repr(unboundop)
+ return obj
+
#@contextmanager
#def run_threaded(id, source, **shared):
else:
raise Exception('expected ChannelEmptyError')
else:
- _channels.recv(cid)
+ recv_nowait(cid)
return state.decr()
else:
raise ValueError(end)
def clean_up_channels():
- for cid in _channels.list_all():
+ for cid, _ in _channels.list_all():
try:
_channels.destroy(cid)
except _channels.ChannelNotFoundError:
_channels._channel_id(10, send=False, recv=False)
def test_does_not_exist(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(_channels.ChannelNotFoundError):
_channels._channel_id(int(cid) + 1) # unforced
self.assertEqual(repr(cid), 'ChannelID(10)')
def test_equality(self):
- cid1 = _channels.create()
+ cid1 = _channels.create(REPLACE)
cid2 = _channels._channel_id(int(cid1))
- cid3 = _channels.create()
+ cid3 = _channels.create(REPLACE)
self.assertTrue(cid1 == cid1)
self.assertTrue(cid1 == cid2)
self.assertTrue(cid1 != cid3)
def test_shareable(self):
- chan = _channels.create()
+ chan = _channels.create(REPLACE)
- obj = _channels.create()
+ obj = _channels.create(REPLACE)
_channels.send(chan, obj, blocking=False)
- got = _channels.recv(chan)
+ got = recv_nowait(chan)
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
class ChannelTests(TestBase):
def test_create_cid(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
self.assertIsInstance(cid, _channels.ChannelID)
def test_sequential_ids(self):
- before = _channels.list_all()
- id1 = _channels.create()
- id2 = _channels.create()
- id3 = _channels.create()
- after = _channels.list_all()
+ before = [cid for cid, _ in _channels.list_all()]
+ id1 = _channels.create(REPLACE)
+ id2 = _channels.create(REPLACE)
+ id3 = _channels.create(REPLACE)
+ after = [cid for cid, _ in _channels.list_all()]
self.assertEqual(id2, int(id1) + 1)
self.assertEqual(id3, int(id2) + 1)
id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _interpchannels as _channels
- cid = _channels.create()
+ cid = _channels.create(3)
print(cid)
"""))
cid1 = int(out.strip())
id2 = _interpreters.create()
out = _run_output(id2, dedent("""
import _interpchannels as _channels
- cid = _channels.create()
+ cid = _channels.create(3)
print(cid)
"""))
cid2 = int(out.strip())
def test_channel_list_interpreters_none(self):
"""Test listing interpreters for a channel with no associations."""
# Test for channel with no associated _interpreters.
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
send_interps = _channels.list_interpreters(cid, send=True)
recv_interps = _channels.list_interpreters(cid, send=False)
self.assertEqual(send_interps, [])
def test_channel_list_interpreters_basic(self):
"""Test basic listing channel _interpreters."""
interp0, *_ = _interpreters.get_main()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False)
# Test for a channel that has one end associated to an interpreter.
send_interps = _channels.list_interpreters(cid, send=True)
interp1 = _interpreters.create()
_run_output(interp1, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
# Test for channel that has both ends associated to an interpreter.
send_interps = _channels.list_interpreters(cid, send=True)
interp1 = _interpreters.create()
interp2 = _interpreters.create()
interp3 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
"""))
_run_output(interp2, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
_run_output(interp3, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
send_interps = _channels.list_interpreters(cid, send=True)
recv_interps = _channels.list_interpreters(cid, send=False)
"""Test listing channel interpreters with a destroyed interpreter."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "send", blocking=False)
_run_output(interp1, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
# Should be one interpreter associated with each end.
send_interps = _channels.list_interpreters(cid, send=True)
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
interp2 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, "data", blocking=False)
_run_output(interp1, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
_channels.send(cid, "data", blocking=False)
_run_output(interp2, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ _channels.recv({cid})
"""))
# Check the setup.
send_interps = _channels.list_interpreters(cid, send=True)
"""Test listing channel interpreters with a closed channel."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
# Put something in the channel so that it's not empty.
_channels.send(cid, "send", blocking=False)
"""Test listing channel interpreters with a channel's send end closed."""
interp0, *_ = _interpreters.get_main()
interp1 = _interpreters.create()
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
# Put something in the channel so that it's not empty.
_channels.send(cid, "send", blocking=False)
_channels.list_interpreters(cid, send=False)
def test_allowed_types(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
objects = [
None,
'spam',
for obj in objects:
with self.subTest(obj):
_channels.send(cid, obj, blocking=False)
- got = _channels.recv(cid)
+ got = recv_nowait(cid)
self.assertEqual(got, obj)
self.assertIs(type(got), type(obj))
# XXX What about between interpreters?
def test_run_string_arg_unresolved(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.set___main___attrs(interp, dict(cid=cid.send))
print(cid.end)
_channels.send(cid, b'spam', blocking=False)
"""))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
# Note: this test caused crashes on some buildbots (bpo-33615).
@unittest.skip('disabled until high-level channels exist')
def test_run_string_arg_resolved(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
cid = _channels._channel_id(cid, _resolve=True)
interp = _interpreters.create()
_channels.send(chan.id, b'spam', blocking=False)
"""),
dict(chan=cid.send))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
self.assertEqual(out.strip(), 'send')
# send/recv
def test_send_recv_main(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
orig = b'spam'
_channels.send(cid, orig, blocking=False)
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, orig)
self.assertIsNot(obj, orig)
id1 = _interpreters.create()
out = _run_output(id1, dedent("""
import _interpchannels as _channels
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
orig = b'spam'
_channels.send(cid, orig, blocking=False)
- obj = _channels.recv(cid)
+ obj, _ = _channels.recv(cid)
assert obj is not orig
assert obj == orig
"""))
def test_send_recv_different_interpreters(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
out = _run_output(id1, dedent(f"""
import _interpchannels as _channels
_channels.send({cid}, b'spam', blocking=False)
"""))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
def test_send_recv_different_threads(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
obj = recv_wait(cid)
self.assertEqual(obj, b'spam')
def test_send_recv_different_interpreters_and_threads(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
out = None
import _interpchannels as _channels
while True:
try:
- obj = _channels.recv({cid})
+ obj, _ = _channels.recv({cid})
break
except _channels.ChannelEmptyError:
time.sleep(0.1)
_channels.recv(10)
def test_recv_empty(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(_channels.ChannelEmptyError):
_channels.recv(cid)
def test_recv_default(self):
default = object()
- cid = _channels.create()
- obj1 = _channels.recv(cid, default)
+ cid = _channels.create(REPLACE)
+ obj1 = recv_nowait(cid, default)
_channels.send(cid, None, blocking=False)
_channels.send(cid, 1, blocking=False)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'eggs', blocking=False)
- obj2 = _channels.recv(cid, default)
- obj3 = _channels.recv(cid, default)
- obj4 = _channels.recv(cid)
- obj5 = _channels.recv(cid, default)
- obj6 = _channels.recv(cid, default)
+ obj2 = recv_nowait(cid, default)
+ obj3 = recv_nowait(cid, default)
+ obj4 = recv_nowait(cid)
+ obj5 = recv_nowait(cid, default)
+ obj6 = recv_nowait(cid, default)
self.assertIs(obj1, default)
self.assertIs(obj2, None)
def test_recv_sending_interp_destroyed(self):
with self.subTest('closed'):
- cid1 = _channels.create()
+ cid1 = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
_channels.recv(cid1)
del cid1
with self.subTest('still open'):
- cid2 = _channels.create()
+ cid2 = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
_channels.send(cid2, b'eggs', blocking=False)
_interpreters.destroy(interp)
- _channels.recv(cid2)
+ recv_nowait(cid2, unbound=True)
+ recv_nowait(cid2, unbound=False)
with self.assertRaisesRegex(RuntimeError,
f'channel {cid2} is empty'):
_channels.recv(cid2)
def test_send_buffer(self):
buf = bytearray(b'spamspamspam')
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send_buffer(cid, buf, blocking=False)
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertIsNot(obj, buf)
self.assertIsInstance(obj, memoryview)
else:
send = _channels.send
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
try:
started = time.monotonic()
send(cid, obj, blocking=False)
stopped = time.monotonic()
- _channels.recv(cid)
+ recv_nowait(cid)
finally:
_channels.destroy(cid)
delay = stopped - started # seconds
received = None
obj = b'spam'
wait = self.build_send_waiter(obj)
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
wait()
received = None
obj = bytearray(b'spam')
wait = self.build_send_waiter(obj, buffer=True)
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
wait()
def test_send_blocking_no_wait(self):
received = None
obj = b'spam'
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
received = recv_wait(cid)
def test_send_buffer_blocking_no_wait(self):
received = None
obj = bytearray(b'spam')
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
nonlocal received
received = recv_wait(cid)
obj = b'spam'
with self.subTest('non-blocking with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(ValueError):
_channels.send(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(TimeoutError):
_channels.send(cid, obj, blocking=True, timeout=0.1)
with self.assertRaises(_channels.ChannelEmptyError):
- received = _channels.recv(cid)
+ received = recv_nowait(cid)
print(repr(received))
with self.subTest('timeout not hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
recv_wait(cid)
t = threading.Thread(target=f)
obj = bytearray(b'spam')
with self.subTest('non-blocking with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(ValueError):
_channels.send_buffer(cid, obj, blocking=False, timeout=0.1)
with self.subTest('timeout hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(TimeoutError):
_channels.send_buffer(cid, obj, blocking=True, timeout=0.1)
with self.assertRaises(_channels.ChannelEmptyError):
- received = _channels.recv(cid)
+ received = recv_nowait(cid)
print(repr(received))
with self.subTest('timeout not hit'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
recv_wait(cid)
t = threading.Thread(target=f)
wait = self.build_send_waiter(obj)
with self.subTest('without timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
t.join()
with self.subTest('with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
wait = self.build_send_waiter(obj, buffer=True)
with self.subTest('without timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
t.join()
with self.subTest('with timeout'):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
def f():
wait()
_channels.close(cid, force=True)
# close
def test_close_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_close_multiple_users(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f"""
self.assertEqual(excsnap.type.__name__, 'ChannelClosedError')
def test_close_multiple_times(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError):
]
for send, recv in tests:
with self.subTest((send, recv)):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid, send=send, recv=recv)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_close_defaults_with_unused_items(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False)
def test_close_recv_with_unused_items_unforced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid, recv=True)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False)
- _channels.recv(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
+ recv_nowait(cid)
_channels.close(cid, recv=True)
def test_close_send_with_unused_items_unforced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True)
with self.assertRaises(_channels.ChannelClosedError):
_channels.send(cid, b'eggs')
- _channels.recv(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
+ recv_nowait(cid)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_close_both_with_unused_items_unforced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
with self.assertRaises(_channels.ChannelNotEmptyError):
_channels.close(cid, recv=True, send=True)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'eggs', blocking=False)
- _channels.recv(cid)
- _channels.recv(cid)
+ recv_nowait(cid)
+ recv_nowait(cid)
_channels.close(cid, recv=True)
def test_close_recv_with_unused_items_forced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, recv=True, force=True)
_channels.recv(cid)
def test_close_send_with_unused_items_forced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True, force=True)
_channels.recv(cid)
def test_close_both_with_unused_items_forced(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.close(cid, send=True, recv=True, force=True)
_channels.recv(cid)
def test_close_never_used(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.close(cid)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_close_by_unassociated_interp(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
_channels.close(cid)
def test_close_used_multiple_times_by_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.close(cid, force=True)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_channel_list_interpreters_invalid_channel(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
# Test for invalid channel ID.
with self.assertRaises(_channels.ChannelNotFoundError):
_channels.list_interpreters(1000, send=True)
def test_channel_list_interpreters_invalid_args(self):
# Tests for invalid arguments passed to the API.
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
with self.assertRaises(TypeError):
_channels.list_interpreters(cid)
"""
def test_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_multiple_users(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
id1 = _interpreters.create()
id2 = _interpreters.create()
_interpreters.run_string(id1, dedent(f"""
"""))
out = _run_output(id2, dedent(f"""
import _interpchannels as _channels
- obj = _channels.recv({cid})
+ obj, _ = _channels.recv({cid})
_channels.release({cid})
print(repr(obj))
"""))
self.assertEqual(out.strip(), "b'spam'")
def test_no_kwargs(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_multiple_times(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError):
_channels.release(cid, send=True, recv=True)
def test_with_unused_items(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'ham', blocking=False)
_channels.release(cid, send=True, recv=True)
_channels.recv(cid)
def test_never_used(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError):
_channels.recv(cid)
def test_by_unassociated_interp(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
_channels.release({cid})
"""))
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
_channels.release(cid)
with self.assertRaises(_channels.ChannelClosedError):
def test_close_if_unassociated(self):
# XXX Something's not right with this test...
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
interp = _interpreters.create()
_interpreters.run_string(interp, dedent(f"""
import _interpchannels as _channels
def test_partially(self):
# XXX Is partial close too weird/confusing?
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, None, blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.send(cid, b'spam', blocking=False)
_channels.release(cid, send=True)
- obj = _channels.recv(cid)
+ obj = recv_nowait(cid)
self.assertEqual(obj, b'spam')
def test_used_multiple_times_by_single_user(self):
- cid = _channels.create()
+ cid = _channels.create(REPLACE)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
_channels.send(cid, b'spam', blocking=False)
- _channels.recv(cid)
+ recv_nowait(cid)
_channels.release(cid, send=True, recv=True)
with self.assertRaises(_channels.ChannelClosedError):
def _new_channel(self, creator):
if creator.name == 'main':
- return _channels.create()
+ return _channels.create(REPLACE)
else:
- ch = _channels.create()
+ ch = _channels.create(REPLACE)
run_interp(creator.id, f"""
import _interpreters
cid = _xxsubchannels.create()
_xxsubchannels.send({ch}, int(cid), blocking=False)
del _interpreters
""")
- self._cid = _channels.recv(ch)
+ self._cid = recv_nowait(ch)
return self._cid
def _get_interpreter(self, interp):
)
fix.record_action(action, result)
else:
- _cid = _channels.create()
+ _cid = _channels.create(REPLACE)
run_interp(interp.id, f"""
result = helpers.run_action(
{fix.cid},
_channels.send({_cid}, b'X' if result.closed else b'', blocking=False)
""")
result = ChannelState(
- pending=int.from_bytes(_channels.recv(_cid), 'little'),
- closed=bool(_channels.recv(_cid)),
+ pending=int.from_bytes(recv_nowait(_cid), 'little'),
+ closed=bool(recv_nowait(_cid)),
)
fix.record_action(action, result)
self.assertTrue(fix.state.closed)
for _ in range(fix.state.pending):
- _channels.recv(fix.cid)
+ recv_nowait(fix.cid)
self._assert_closed_in_interp(fix)
for interp in ('same', 'other'):
obj[4:8] = b'ham.'
self.assertEqual(obj, buf)
+ def test_send_cleared_with_subinterpreter(self):
+ def common(rch, sch, unbound=None, presize=0):
+ if not unbound:
+ extraargs = ''
+ elif unbound is channels.UNBOUND:
+ extraargs = ', unbound=channels.UNBOUND'
+ elif unbound is channels.UNBOUND_ERROR:
+ extraargs = ', unbound=channels.UNBOUND_ERROR'
+ elif unbound is channels.UNBOUND_REMOVE:
+ extraargs = ', unbound=channels.UNBOUND_REMOVE'
+ else:
+ raise NotImplementedError(repr(unbound))
+ interp = interpreters.create()
+
+ _run_output(interp, dedent(f"""
+ from test.support.interpreters import channels
+ sch = channels.SendChannel({sch.id})
+ obj1 = b'spam'
+ obj2 = b'eggs'
+ sch.send_nowait(obj1{extraargs})
+ sch.send_nowait(obj2{extraargs})
+ """))
+ self.assertEqual(
+ _channels.get_count(rch.id),
+ presize + 2,
+ )
+
+ if presize == 0:
+ obj1 = rch.recv()
+ self.assertEqual(obj1, b'spam')
+ self.assertEqual(
+ _channels.get_count(rch.id),
+ presize + 1,
+ )
+
+ return interp
+
+ with self.subTest('default'): # UNBOUND
+ rch, sch = channels.create()
+ interp = common(rch, sch)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ obj1 = rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ self.assertIs(obj1, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ with self.subTest('UNBOUND'):
+ rch, sch = channels.create()
+ interp = common(rch, sch, channels.UNBOUND)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ obj1 = rch.recv()
+ self.assertIs(obj1, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ with self.subTest('UNBOUND_ERROR'):
+ rch, sch = channels.create()
+ interp = common(rch, sch, channels.UNBOUND_ERROR)
+
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ with self.assertRaises(channels.ItemInterpreterDestroyed):
+ rch.recv()
+
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ with self.subTest('UNBOUND_REMOVE'):
+ rch, sch = channels.create()
+
+ interp = common(rch, sch, channels.UNBOUND_REMOVE)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
+ self.assertEqual(_channels.get_count(rch.id), 1)
+ interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
+ self.assertEqual(_channels.get_count(rch.id), 3)
+ sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
+ self.assertEqual(_channels.get_count(rch.id), 4)
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 2)
+ obj1 = rch.recv()
+ obj2 = rch.recv()
+ self.assertEqual(obj1, b'ham')
+ self.assertEqual(obj2, 42)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ with self.assertRaises(channels.ChannelEmptyError):
+ rch.recv_nowait()
+
+ def test_send_cleared_with_subinterpreter_mixed(self):
+ rch, sch = channels.create()
+ interp = interpreters.create()
+
+ # If we don't associate the main interpreter with the channel
+ # then the channel will be automatically closed when interp
+ # is destroyed.
+ sch.send_nowait(None)
+ rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 0)
+
+ _run_output(interp, dedent(f"""
+ from test.support.interpreters import channels
+ sch = channels.SendChannel({sch.id})
+ sch.send_nowait(1, unbound=channels.UNBOUND)
+ sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
+ sch.send_nowait(3)
+ sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(5, unbound=channels.UNBOUND)
+ """))
+ self.assertEqual(_channels.get_count(rch.id), 5)
+
+ del interp
+ self.assertEqual(_channels.get_count(rch.id), 4)
+
+ obj1 = rch.recv()
+ self.assertIs(obj1, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 3)
+
+ with self.assertRaises(channels.ItemInterpreterDestroyed):
+ rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 2)
+
+ obj2 = rch.recv()
+ self.assertIs(obj2, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 1)
+
+ obj3 = rch.recv()
+ self.assertIs(obj3, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+
+ def test_send_cleared_with_subinterpreter_multiple(self):
+ rch, sch = channels.create()
+ interp1 = interpreters.create()
+ interp2 = interpreters.create()
+
+ sch.send_nowait(1)
+ _run_output(interp1, dedent(f"""
+ from test.support.interpreters import channels
+ rch = channels.RecvChannel({rch.id})
+ sch = channels.SendChannel({sch.id})
+ obj1 = rch.recv()
+ sch.send_nowait(2, unbound=channels.UNBOUND)
+ sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
+ """))
+ _run_output(interp2, dedent(f"""
+ from test.support.interpreters import channels
+ rch = channels.RecvChannel({rch.id})
+ sch = channels.SendChannel({sch.id})
+ obj2 = rch.recv()
+ obj1 = rch.recv()
+ """))
+ self.assertEqual(_channels.get_count(rch.id), 0)
+ sch.send_nowait(3)
+ _run_output(interp1, dedent("""
+ sch.send_nowait(4, unbound=channels.UNBOUND)
+ # interp closed here
+ sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(6, unbound=channels.UNBOUND)
+ """))
+ _run_output(interp2, dedent("""
+ sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
+ # interp closed here
+ sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
+ sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(8, unbound=channels.UNBOUND)
+ """))
+ _run_output(interp1, dedent("""
+ sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(10, unbound=channels.UNBOUND)
+ """))
+ self.assertEqual(_channels.get_count(rch.id), 10)
+
+ obj3 = rch.recv()
+ self.assertEqual(obj3, 3)
+ self.assertEqual(_channels.get_count(rch.id), 9)
+
+ obj4 = rch.recv()
+ self.assertEqual(obj4, 4)
+ self.assertEqual(_channels.get_count(rch.id), 8)
+
+ del interp1
+ self.assertEqual(_channels.get_count(rch.id), 6)
+
+ # obj5 was removed
+
+ obj6 = rch.recv()
+ self.assertIs(obj6, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 5)
+
+ obj7 = rch.recv()
+ self.assertEqual(obj7, 7)
+ self.assertEqual(_channels.get_count(rch.id), 4)
+
+ del interp2
+ self.assertEqual(_channels.get_count(rch.id), 3)
+
+ # obj1
+ with self.assertRaises(channels.ItemInterpreterDestroyed):
+ rch.recv()
+ self.assertEqual(_channels.get_count(rch.id), 2)
+
+ # obj2 was removed
+
+ obj8 = rch.recv()
+ self.assertIs(obj8, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 1)
+
+ # obj9 was removed
+
+ obj10 = rch.recv()
+ self.assertIs(obj10, channels.UNBOUND)
+ self.assertEqual(_channels.get_count(rch.id), 0)
+
if __name__ == '__main__':
# Test needs to be a package, so we can do relative imports.
# Raise SkipTest if subinterpreters not supported.
_queues = import_helper.import_module('_interpqueues')
from test.support import interpreters
-from test.support.interpreters import queues
+from test.support.interpreters import queues, _crossinterp
from .utils import _run_output, TestBase as _TestBase
-REPLACE = queues._UNBOUND_CONSTANT_TO_FLAG[queues.UNBOUND]
+REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
def get_num_queues():
#endif
#define REGISTERS_HEAP_TYPES
+#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
+#undef HAS_UNBOUND_ITEMS
#undef REGISTERS_HEAP_TYPES
struct _channelitem;
typedef struct _channelitem {
+ /* The interpreter that added the item to the queue.
+ The actual bound interpid is found in item->data.
+ This is necessary because item->data might be NULL,
+ meaning the interpreter has been destroyed. */
+ int64_t interpid;
_PyCrossInterpreterData *data;
_waiting_t *waiting;
+ int unboundop;
struct _channelitem *next;
} _channelitem;
static void
_channelitem_init(_channelitem *item,
- _PyCrossInterpreterData *data, _waiting_t *waiting)
+ int64_t interpid, _PyCrossInterpreterData *data,
+ _waiting_t *waiting, int unboundop)
{
+ if (interpid < 0) {
+ interpid = _get_interpid(data);
+ }
+ else {
+ assert(data == NULL
+ || _PyCrossInterpreterData_INTERPID(data) < 0
+ || interpid == _PyCrossInterpreterData_INTERPID(data));
+ }
*item = (_channelitem){
+ .interpid = interpid,
.data = data,
.waiting = waiting,
+ .unboundop = unboundop,
};
if (waiting != NULL) {
waiting->itemid = _channelitem_ID(item);
}
static void
-_channelitem_clear(_channelitem *item)
+_channelitem_clear_data(_channelitem *item, int removed)
{
- item->next = NULL;
-
if (item->data != NULL) {
// It was allocated in channel_send().
(void)_release_xid_data(item->data, XID_IGNORE_EXC & XID_FREE);
item->data = NULL;
}
- if (item->waiting != NULL) {
+ if (item->waiting != NULL && removed) {
if (item->waiting->status == WAITING_ACQUIRED) {
_waiting_release(item->waiting, 0);
}
}
}
+static void
+_channelitem_clear(_channelitem *item)
+{
+ item->next = NULL;
+ _channelitem_clear_data(item, 1);
+}
+
static _channelitem *
-_channelitem_new(_PyCrossInterpreterData *data, _waiting_t *waiting)
+_channelitem_new(int64_t interpid, _PyCrossInterpreterData *data,
+ _waiting_t *waiting, int unboundop)
{
_channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
- _channelitem_init(item, data, waiting);
+ _channelitem_init(item, interpid, data, waiting, unboundop);
return item;
}
static void
_channelitem_popped(_channelitem *item,
- _PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
+ _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
+ int *p_unboundop)
{
assert(item->waiting == NULL || item->waiting->status == WAITING_ACQUIRED);
*p_data = item->data;
*p_waiting = item->waiting;
+ *p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _channelitem_clear().
item->data = NULL;
item->waiting = NULL;
_channelitem_free(item);
}
+static int
+_channelitem_clear_interpreter(_channelitem *item)
+{
+ assert(item->interpid >= 0);
+ if (item->data == NULL) {
+ // Its interpreter was already cleared (or it was never bound).
+ // For UNBOUND_REMOVE it should have been freed at that time.
+ assert(item->unboundop != UNBOUND_REMOVE);
+ return 0;
+ }
+ assert(_PyCrossInterpreterData_INTERPID(item->data) == item->interpid);
+
+ switch (item->unboundop) {
+ case UNBOUND_REMOVE:
+ // The caller must free/clear it.
+ return 1;
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ // We won't need the cross-interpreter data later
+ // so we completely throw it away.
+ _channelitem_clear_data(item, 0);
+ return 0;
+ default:
+ Py_FatalError("not reachable");
+ return -1;
+ }
+}
+
+
typedef struct _channelqueue {
int64_t count;
_channelitem *first;
static int
_channelqueue_put(_channelqueue *queue,
- _PyCrossInterpreterData *data, _waiting_t *waiting)
+ int64_t interpid, _PyCrossInterpreterData *data,
+ _waiting_t *waiting, int unboundop)
{
- _channelitem *item = _channelitem_new(data, waiting);
+ _channelitem *item = _channelitem_new(interpid, data, waiting, unboundop);
if (item == NULL) {
return -1;
}
static int
_channelqueue_get(_channelqueue *queue,
- _PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
+ _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
+ int *p_unboundop)
{
_channelitem *item = queue->first;
if (item == NULL) {
}
queue->count -= 1;
- _channelitem_popped(item, p_data, p_waiting);
+ _channelitem_popped(item, p_data, p_waiting, p_unboundop);
return 0;
}
}
queue->count -= 1;
- _channelitem_popped(item, p_data, p_waiting);
+ int unboundop;
+ _channelitem_popped(item, p_data, p_waiting, &unboundop);
}
static void
while (next != NULL) {
_channelitem *item = next;
next = item->next;
- if (_PyCrossInterpreterData_INTERPID(item->data) == interpid) {
+ int remove = (item->interpid == interpid)
+ ? _channelitem_clear_interpreter(item)
+ : 0;
+ if (remove) {
+ _channelitem_free(item);
if (prev == NULL) {
- queue->first = item->next;
+ queue->first = next;
}
else {
- prev->next = item->next;
+ prev->next = next;
}
- _channelitem_free(item);
queue->count -= 1;
}
else {
PyThread_type_lock mutex;
_channelqueue *queue;
_channelends *ends;
+ struct {
+ int unboundop;
+ } defaults;
int open;
struct _channel_closing *closing;
} _channel_state;
static _channel_state *
-_channel_new(PyThread_type_lock mutex)
+_channel_new(PyThread_type_lock mutex, int unboundop)
{
_channel_state *chan = GLOBAL_MALLOC(_channel_state);
if (chan == NULL) {
GLOBAL_FREE(chan);
return NULL;
}
+ chan->defaults.unboundop = unboundop;
chan->open = 1;
chan->closing = NULL;
return chan;
static int
_channel_add(_channel_state *chan, int64_t interpid,
- _PyCrossInterpreterData *data, _waiting_t *waiting)
+ _PyCrossInterpreterData *data, _waiting_t *waiting,
+ int unboundop)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
goto done;
}
- if (_channelqueue_put(chan->queue, data, waiting) != 0) {
+ if (_channelqueue_put(chan->queue, interpid, data, waiting, unboundop) != 0) {
goto done;
}
// Any errors past this point must cause a _waiting_release() call.
static int
_channel_next(_channel_state *chan, int64_t interpid,
- _PyCrossInterpreterData **p_data, _waiting_t **p_waiting)
+ _PyCrossInterpreterData **p_data, _waiting_t **p_waiting,
+ int *p_unboundop)
{
int err = 0;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
goto done;
}
- int empty = _channelqueue_get(chan->queue, p_data, p_waiting);
- assert(empty == 0 || empty == ERR_CHANNEL_EMPTY);
+ int empty = _channelqueue_get(chan->queue, p_data, p_waiting, p_unboundop);
assert(!PyErr_Occurred());
- if (empty && chan->closing != NULL) {
- chan->open = 0;
+ if (empty) {
+ assert(empty == ERR_CHANNEL_EMPTY);
+ if (chan->closing != NULL) {
+ chan->open = 0;
+ }
+ err = ERR_CHANNEL_EMPTY;
+ goto done;
}
done:
PyThread_release_lock(channels->mutex);
}
-static int64_t *
+struct channel_id_and_info {
+ int64_t id;
+ int unboundop;
+};
+
+static struct channel_id_and_info *
_channels_list_all(_channels *channels, int64_t *count)
{
- int64_t *cids = NULL;
+ struct channel_id_and_info *cids = NULL;
PyThread_acquire_lock(channels->mutex, WAIT_LOCK);
- int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(channels->numopen));
+ struct channel_id_and_info *ids =
+ PyMem_NEW(struct channel_id_and_info, (Py_ssize_t)(channels->numopen));
if (ids == NULL) {
goto done;
}
_channelref *ref = channels->head;
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
- ids[i] = ref->cid;
+ ids[i] = (struct channel_id_and_info){
+ .id = ref->cid,
+ .unboundop = ref->chan->defaults.unboundop,
+ };
}
*count = channels->numopen;
// Create a new channel.
static int64_t
-channel_create(_channels *channels)
+channel_create(_channels *channels, int unboundop)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_CHANNEL_MUTEX_INIT;
}
- _channel_state *chan = _channel_new(mutex);
+ _channel_state *chan = _channel_new(mutex, unboundop);
if (chan == NULL) {
PyThread_free_lock(mutex);
return -1;
// Optionally request to be notified when it is received.
static int
channel_send(_channels *channels, int64_t cid, PyObject *obj,
- _waiting_t *waiting)
+ _waiting_t *waiting, int unboundop)
{
PyInterpreterState *interp = _get_current_interp();
if (interp == NULL) {
}
// Add the data to the channel.
- int res = _channel_add(chan, interpid, data, waiting);
+ int res = _channel_add(chan, interpid, data, waiting, unboundop);
PyThread_release_lock(mutex);
if (res != 0) {
// We may chain an exception here:
// Like channel_send(), but strictly wait for the object to be received.
static int
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
- PY_TIMEOUT_T timeout)
+ int unboundop, PY_TIMEOUT_T timeout)
{
// We use a stack variable here, so we must ensure that &waiting
// is not held by any channel item at the point this function exits.
}
/* Queue up the object. */
- int res = channel_send(channels, cid, obj, &waiting);
+ int res = channel_send(channels, cid, obj, &waiting, unboundop);
if (res < 0) {
assert(waiting.status == WAITING_NO_STATUS);
goto finally;
// The current interpreter gets associated with the recv end of the channel.
// XXX Support a "wait" mutex?
static int
-channel_recv(_channels *channels, int64_t cid, PyObject **res)
+channel_recv(_channels *channels, int64_t cid, PyObject **res, int *p_unboundop)
{
int err;
*res = NULL;
// Pop off the next item from the channel.
_PyCrossInterpreterData *data = NULL;
_waiting_t *waiting = NULL;
- err = _channel_next(chan, interpid, &data, &waiting);
+ err = _channel_next(chan, interpid, &data, &waiting, p_unboundop);
PyThread_release_lock(mutex);
if (err != 0) {
return err;
}
else if (data == NULL) {
+ // The item was unbound.
assert(!PyErr_Occurred());
+ *res = NULL;
return 0;
}
return (end != NULL && end->open);
}
+static int
+_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
+{
+ PyThread_type_lock mutex = NULL;
+ _channel_state *chan = NULL;
+ int err = _channels_lookup(channels, cid, &mutex, &chan);
+ if (err != 0) {
+ return err;
+ }
+ assert(chan != NULL);
+ int64_t count = chan->queue->count;
+ PyThread_release_lock(mutex);
+
+ *p_count = (Py_ssize_t)count;
+ return 0;
+}
+
/* channel info */
static PyObject *
-channelsmod_create(PyObject *self, PyObject *Py_UNUSED(ignored))
+channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- int64_t cid = channel_create(&_globals.channels);
+ static char *kwlist[] = {"unboundop", NULL};
+ int unboundop;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist,
+ &unboundop))
+ {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
+ return NULL;
+ }
+
+ int64_t cid = channel_create(&_globals.channels, unboundop);
if (cid < 0) {
(void)handle_channel_error(-1, self, cid);
return NULL;
}
PyDoc_STRVAR(channelsmod_create_doc,
-"channel_create() -> cid\n\
+"channel_create(unboundop) -> cid\n\
\n\
Create a new cross-interpreter channel and return a unique generated ID.");
channelsmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
{
int64_t count = 0;
- int64_t *cids = _channels_list_all(&_globals.channels, &count);
+ struct channel_id_and_info *cids =
+ _channels_list_all(&_globals.channels, &count);
if (cids == NULL) {
if (count == 0) {
return PyList_New(0);
ids = NULL;
goto finally;
}
- int64_t *cur = cids;
+ struct channel_id_and_info *cur = cids;
for (int64_t i=0; i < count; cur++, i++) {
PyObject *cidobj = NULL;
- int err = newchannelid(state->ChannelIDType, *cur, 0,
+ int err = newchannelid(state->ChannelIDType, cur->id, 0,
&_globals.channels, 0, 0,
(channelid **)&cidobj);
- if (handle_channel_error(err, self, *cur)) {
+ if (handle_channel_error(err, self, cur->id)) {
assert(cidobj == NULL);
Py_SETREF(ids, NULL);
break;
}
assert(cidobj != NULL);
- PyList_SET_ITEM(ids, (Py_ssize_t)i, cidobj);
+
+ PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop);
+ Py_DECREF(cidobj);
+ if (item == NULL) {
+ Py_SETREF(ids, NULL);
+ break;
+ }
+ PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
}
finally:
static PyObject *
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
+ NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
+ int unboundop = UNBOUND_REPLACE;
int blocking = 1;
PyObject *timeout_obj = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|$pO:channel_send", kwlist,
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist,
channel_id_converter, &cid_data, &obj,
- &blocking, &timeout_obj)) {
+ &unboundop, &blocking, &timeout_obj))
+ {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
return NULL;
}
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = channel_send_wait(&_globals.channels, cid, obj, timeout);
+ err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout);
}
else {
- err = channel_send(&_globals.channels, cid, obj, NULL);
+ err = channel_send(&_globals.channels, cid, obj, NULL, unboundop);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
static PyObject *
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "blocking", "timeout", NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
+ NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
+ int unboundop = UNBOUND_REPLACE;
int blocking = 1;
PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&O|$pO:channel_send_buffer", kwlist,
+ "O&O|i$pO:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj,
- &blocking, &timeout_obj)) {
+ &unboundop, &blocking, &timeout_obj)) {
+ return NULL;
+ }
+ if (!check_unbound(unboundop)) {
+ PyErr_Format(PyExc_ValueError,
+ "unsupported unboundop %d", unboundop);
return NULL;
}
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = channel_send_wait(&_globals.channels, cid, tempobj, timeout);
+ err = channel_send_wait(
+ &_globals.channels, cid, tempobj, unboundop, timeout);
}
else {
- err = channel_send(&_globals.channels, cid, tempobj, NULL);
+ err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
cid = cid_data.cid;
PyObject *obj = NULL;
- int err = channel_recv(&_globals.channels, cid, &obj);
- if (handle_channel_error(err, self, cid)) {
- return NULL;
- }
- Py_XINCREF(dflt);
- if (obj == NULL) {
+ int unboundop = 0;
+ int err = channel_recv(&_globals.channels, cid, &obj, &unboundop);
+ if (err == ERR_CHANNEL_EMPTY && dflt != NULL) {
// Use the default.
- if (dflt == NULL) {
- (void)handle_channel_error(ERR_CHANNEL_EMPTY, self, cid);
- return NULL;
- }
obj = Py_NewRef(dflt);
+ err = 0;
}
- Py_XDECREF(dflt);
- return obj;
+ else if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ else if (obj == NULL) {
+ // The item was unbound.
+ return Py_BuildValue("Oi", Py_None, unboundop);
+ }
+
+ PyObject *res = Py_BuildValue("OO", obj, Py_None);
+ Py_DECREF(obj);
+ return res;
}
PyDoc_STRVAR(channelsmod_recv_doc,
-"channel_recv(cid, [default]) -> obj\n\
+"channel_recv(cid, [default]) -> (obj, unboundop)\n\
\n\
Return a new object from the data at the front of the channel's queue.\n\
\n\
(bool) may be used to indicate the ends to close. By default both\n\
ends are closed. Closing an already closed end is a noop.");
+static PyObject *
+channelsmod_get_count(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", NULL};
+ struct channel_id_converter_data cid_data = {
+ .module = self,
+ };
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&:get_count", kwlist,
+ channel_id_converter, &cid_data)) {
+ return NULL;
+ }
+ int64_t cid = cid_data.cid;
+
+ Py_ssize_t count = -1;
+ int err = _channel_get_count(&_globals.channels, cid, &count);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ assert(count >= 0);
+ return PyLong_FromSsize_t(count);
+}
+
+PyDoc_STRVAR(channelsmod_get_count_doc,
+"get_count(cid)\n\
+\n\
+Return the number of items in the channel.");
+
static PyObject *
channelsmod_get_info(PyObject *self, PyObject *args, PyObject *kwds)
{
\n\
Return details about the channel.");
+static PyObject *
+channelsmod_get_channel_defaults(PyObject *self, PyObject *args, PyObject *kwds)
+{
+ static char *kwlist[] = {"cid", NULL};
+ struct channel_id_converter_data cid_data = {
+ .module = self,
+ };
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&:get_channel_defaults", kwlist,
+ channel_id_converter, &cid_data)) {
+ return NULL;
+ }
+ int64_t cid = cid_data.cid;
+
+ PyThread_type_lock mutex = NULL;
+ _channel_state *channel = NULL;
+ int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ int unboundop = channel->defaults.unboundop;
+ PyThread_release_lock(mutex);
+
+ PyObject *defaults = Py_BuildValue("i", unboundop);
+ return defaults;
+}
+
+PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,
+"get_channel_defaults(cid)\n\
+\n\
+Return the channel's default values, set when it was created.");
+
static PyObject *
channelsmod__channel_id(PyObject *self, PyObject *args, PyObject *kwds)
{
}
static PyMethodDef module_functions[] = {
- {"create", channelsmod_create,
- METH_NOARGS, channelsmod_create_doc},
+ {"create", _PyCFunction_CAST(channelsmod_create),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_create_doc},
{"destroy", _PyCFunction_CAST(channelsmod_destroy),
METH_VARARGS | METH_KEYWORDS, channelsmod_destroy_doc},
{"list_all", channelsmod_list_all,
METH_VARARGS | METH_KEYWORDS, channelsmod_close_doc},
{"release", _PyCFunction_CAST(channelsmod_release),
METH_VARARGS | METH_KEYWORDS, channelsmod_release_doc},
+ {"get_count", _PyCFunction_CAST(channelsmod_get_count),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_get_count_doc},
{"get_info", _PyCFunction_CAST(channelsmod_get_info),
METH_VARARGS | METH_KEYWORDS, channelsmod_get_info_doc},
+ {"get_channel_defaults", _PyCFunction_CAST(channelsmod_get_channel_defaults),
+ METH_VARARGS | METH_KEYWORDS, channelsmod_get_channel_defaults_doc},
{"_channel_id", _PyCFunction_CAST(channelsmod__channel_id),
METH_VARARGS | METH_KEYWORDS, NULL},
{"_register_end_types", _PyCFunction_CAST(channelsmod__register_end_types),
#include "pycore_crossinterp.h" // struct _xid
#define REGISTERS_HEAP_TYPES
+#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
+#undef HAS_UNBOUND_ITEMS
#undef REGISTERS_HEAP_TYPES
return res;
}
-static inline int64_t
-_get_interpid(_PyCrossInterpreterData *data)
-{
- int64_t interpid;
- if (data != NULL) {
- interpid = _PyCrossInterpreterData_INTERPID(data);
- assert(!PyErr_Occurred());
- }
- else {
- interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
- }
- return interpid;
-}
-
static PyInterpreterState *
_get_current_interp(void)
{
}
-/* unbound items ************************************************************/
-
-#define UNBOUND_REMOVE 1
-#define UNBOUND_ERROR 2
-#define UNBOUND_REPLACE 3
-
-// It would also be possible to add UNBOUND_REPLACE where the replacement
-// value is user-provided. There would be some limitations there, though.
-// Another possibility would be something like UNBOUND_COPY, where the
-// object is released but the underlying data is copied (with the "raw"
-// allocator) and used when the item is popped off the queue.
-
-static int
-check_unbound(int unboundop)
-{
- switch (unboundop) {
- case UNBOUND_REMOVE:
- case UNBOUND_ERROR:
- case UNBOUND_REPLACE:
- return 1;
- default:
- return 0;
- }
-}
-
-
/* the basic queue **********************************************************/
struct _queueitem;
return _PyCrossInterpreterData_UnregisterClass(cls);
}
#endif
+
+
+static inline int64_t
+_get_interpid(_PyCrossInterpreterData *data)
+{
+ int64_t interpid;
+ if (data != NULL) {
+ interpid = _PyCrossInterpreterData_INTERPID(data);
+ assert(!PyErr_Occurred());
+ }
+ else {
+ interpid = PyInterpreterState_GetID(PyInterpreterState_Get());
+ }
+ return interpid;
+}
+
+
+/* unbound items ************************************************************/
+
+#ifdef HAS_UNBOUND_ITEMS
+
+#define UNBOUND_REMOVE 1
+#define UNBOUND_ERROR 2
+#define UNBOUND_REPLACE 3
+
+// It would also be possible to add UNBOUND_REPLACE where the replacement
+// value is user-provided. There would be some limitations there, though.
+// Another possibility would be something like UNBOUND_COPY, where the
+// object is released but the underlying data is copied (with the "raw"
+// allocator) and used when the item is popped off the queue.
+
+static int
+check_unbound(int unboundop)
+{
+ switch (unboundop) {
+ case UNBOUND_REMOVE:
+ case UNBOUND_ERROR:
+ case UNBOUND_REPLACE:
+ return 1;
+ default:
+ return 0;
+ }
+}
+
+#endif