This change includes some semi-related refactoring of queues and channels.
""".strip())
-UNBOUND = 2 # error; this should not happen.
-
-
class WorkerContext(_thread.WorkerContext):
@classmethod
if isinstance(fn, str):
# XXX Circle back to this later.
raise TypeError('scripts not supported')
- if args or kwargs:
- raise ValueError(f'a script does not take args or kwargs, got {args!r} and {kwargs!r}')
- data = textwrap.dedent(fn)
- kind = 'script'
- # Make sure the script compiles.
- # Ideally we wouldn't throw away the resulting code
- # object. However, there isn't much to be done until
- # code objects are shareable and/or we do a better job
- # of supporting code objects in _interpreters.exec().
- compile(data, '<string>', 'exec')
else:
# Functions defined in the __main__ module can't be pickled,
# so they can't be used here. In the future, we could possibly
# borrow from multiprocessing to work around this.
- data = pickle.dumps((fn, args, kwargs))
- kind = 'function'
- return (data, kind)
+ task = (fn, args, kwargs)
+ data = pickle.dumps(task)
+ return data
if initializer is not None:
try:
except BaseException as exc:
# Send the captured exception out on the results queue,
# but still leave it unhandled for the interpreter to handle.
- err = pickle.dumps(exc)
- _interpqueues.put(resultsid, (None, err), 1, UNBOUND)
+ _interpqueues.put(resultsid, (None, exc))
raise # re-raise
@classmethod
def _send_script_result(cls, resultsid):
- _interpqueues.put(resultsid, (None, None), 0, UNBOUND)
+ _interpqueues.put(resultsid, (None, None))
@classmethod
def _call(cls, func, args, kwargs, resultsid):
with cls._capture_exc(resultsid):
res = func(*args or (), **kwargs or {})
# Send the result back.
- try:
- _interpqueues.put(resultsid, (res, None), 0, UNBOUND)
- except _interpreters.NotShareableError:
- res = pickle.dumps(res)
- _interpqueues.put(resultsid, (res, None), 1, UNBOUND)
+ with cls._capture_exc(resultsid):
+ _interpqueues.put(resultsid, (res, None))
@classmethod
def _call_pickled(cls, pickled, resultsid):
_interpreters.incref(self.interpid)
maxsize = 0
- fmt = 0
- self.resultsid = _interpqueues.create(maxsize, fmt, UNBOUND)
+ self.resultsid = _interpqueues.create(maxsize)
self._exec(f'from {__name__} import WorkerContext')
pass
def run(self, task):
- data, kind = task
- if kind == 'script':
- raise NotImplementedError('script kind disabled')
- script = f"""
-with WorkerContext._capture_exc({self.resultsid}):
-{textwrap.indent(data, ' ')}
-WorkerContext._send_script_result({self.resultsid})"""
- elif kind == 'function':
- script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
- else:
- raise NotImplementedError(kind)
+ data = task
+ script = f'WorkerContext._call_pickled({data!r}, {self.resultsid})'
try:
self._exec(script)
continue
else:
break
- (res, excdata), pickled, unboundop = obj
+ (res, exc), unboundop = obj
assert unboundop is None, unboundop
- if excdata is not None:
+ if exc is not None:
assert res is None, res
- assert pickled
assert exc_wrapper is not None
- exc = pickle.loads(excdata)
raise exc from exc_wrapper
- return pickle.loads(res) if pickled else res
+ return res
class BrokenInterpreterPool(_thread.BrokenThreadPool):
"""
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
- cid = _channels.create(unboundop)
- recv, send = RecvChannel(cid), SendChannel(cid, _unbound=unbound)
+ cid = _channels.create(unboundop, -1)
+ recv, send = RecvChannel(cid), SendChannel(cid)
+ send._set_unbound(unboundop, unbounditems)
return recv, send
def list_all():
"""Return a list of (recv, send) for all open channels."""
- return [(RecvChannel(cid), SendChannel(cid, _unbound=unbound))
- for cid, unbound in _channels.list_all()]
+ channels = []
+ for cid, unboundop, _ in _channels.list_all():
+ chan = _, send = RecvChannel(cid), SendChannel(cid)
+ if not hasattr(send, '_unboundop'):
+ send._set_unbound(unboundop)
+ else:
+ assert send._unbound[0] == op
+ channels.append(chan)
+ return channels
class _ChannelEnd:
_end = 'send'
- def __new__(cls, cid, *, _unbound=None):
- if _unbound is None:
- try:
- op = _channels.get_channel_defaults(cid)
- _unbound = (op,)
- except ChannelNotFoundError:
- _unbound = _serialize_unbound(UNBOUND)
- self = super().__new__(cls, cid)
- self._unbound = _unbound
- return self
+# def __new__(cls, cid, *, _unbound=None):
+# if _unbound is None:
+# try:
+# op = _channels.get_channel_defaults(cid)
+# _unbound = (op,)
+# except ChannelNotFoundError:
+# _unbound = _serialize_unbound(UNBOUND)
+# self = super().__new__(cls, cid)
+# self._unbound = _unbound
+# return self
+
+ def _set_unbound(self, op, items=None):
+ assert not hasattr(self, '_unbound')
+ if items is None:
+ items = _resolve_unbound(op)
+ unbound = (op, items)
+ self._unbound = unbound
+ return unbound
+
+ @property
+ def unbounditems(self):
+ try:
+ _, items = self._unbound
+ except AttributeError:
+ op, _ = _channels.get_queue_defaults(self._id)
+ _, items = self._set_unbound(op)
+ return items
@property
def is_closed(self):
return info.closed or info.closing
def send(self, obj, timeout=None, *,
- unbound=None,
+ unbounditems=None,
):
"""Send the object (i.e. its data) to the channel's receiving end.
This blocks until the object is received.
"""
- if unbound is None:
- unboundop, = self._unbound
+ if unbounditems is None:
+ unboundop = -1
else:
- unboundop, = _serialize_unbound(unbound)
+ unboundop, = _serialize_unbound(unbounditems)
_channels.send(self._id, obj, unboundop, timeout=timeout, blocking=True)
def send_nowait(self, obj, *,
- unbound=None,
+ unbounditems=None,
):
"""Send the object to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
- if unbound is None:
- unboundop, = self._unbound
+ if unbounditems is None:
+ unboundop = -1
else:
- unboundop, = _serialize_unbound(unbound)
+ unboundop, = _serialize_unbound(unbounditems)
# XXX Note that at the moment channel_send() only ever returns
# None. This should be fixed when channel_send_wait() is added.
# See bpo-32604 and gh-19829.
return _channels.send(self._id, obj, unboundop, blocking=False)
def send_buffer(self, obj, timeout=None, *,
- unbound=None,
+ unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.
This blocks until the object is received.
"""
- if unbound is None:
- unboundop, = self._unbound
+ if unbounditems is None:
+ unboundop = -1
else:
- unboundop, = _serialize_unbound(unbound)
+ unboundop, = _serialize_unbound(unbounditems)
_channels.send_buffer(self._id, obj, unboundop,
timeout=timeout, blocking=True)
def send_buffer_nowait(self, obj, *,
- unbound=None,
+ unbounditems=None,
):
"""Send the object's buffer to the channel's receiving end.
If the object is immediately received then return True
(else False). Otherwise this is the same as send().
"""
- if unbound is None:
- unboundop, = self._unbound
+ if unbounditems is None:
+ unboundop = -1
else:
- unboundop, = _serialize_unbound(unbound)
+ unboundop, = _serialize_unbound(unbounditems)
return _channels.send_buffer(self._id, obj, unboundop, blocking=False)
def close(self):
return resolved
-def create(maxsize=0, *, syncobj=False, unbounditems=UNBOUND):
+def create(maxsize=0, *, unbounditems=UNBOUND):
"""Return a new cross-interpreter queue.
The queue may be used to pass data safely between interpreters.
- "syncobj" sets the default for Queue.put()
- and Queue.put_nowait().
-
- "unbounditems" likewise sets the default. See Queue.put() for
+ "unbounditems" sets the default for Queue.put(); see that method for
supported values. The default value is UNBOUND, which replaces
the unbound item.
"""
- fmt = _SHARED_ONLY if syncobj else _PICKLED
unbound = _serialize_unbound(unbounditems)
unboundop, = unbound
- qid = _queues.create(maxsize, fmt, unboundop)
- return Queue(qid, _fmt=fmt, _unbound=unbound)
+ qid = _queues.create(maxsize, unboundop, -1)
+ self = Queue(qid)
+ self._set_unbound(unboundop, unbounditems)
+ return self
def list_all():
"""Return a list of all open queues."""
- return [Queue(qid, _fmt=fmt, _unbound=(unboundop,))
- for qid, fmt, unboundop in _queues.list_all()]
+ queues = []
+ for qid, unboundop, _ in _queues.list_all():
+ self = Queue(qid)
+ if not hasattr(self, '_unbound'):
+ self._set_unbound(unboundop)
+ else:
+ assert self._unbound[0] == unboundop
+ queues.append(self)
+ return queues
_known_queues = weakref.WeakValueDictionary()
class Queue:
"""A cross-interpreter queue."""
- def __new__(cls, id, /, *, _fmt=None, _unbound=None):
+ def __new__(cls, id, /):
# There is only one instance for any given ID.
if isinstance(id, int):
id = int(id)
else:
raise TypeError(f'id must be an int, got {id!r}')
- if _fmt is None:
- if _unbound is None:
- _fmt, op = _queues.get_queue_defaults(id)
- _unbound = (op,)
- else:
- _fmt, _ = _queues.get_queue_defaults(id)
- elif _unbound is None:
- _, op = _queues.get_queue_defaults(id)
- _unbound = (op,)
try:
self = _known_queues[id]
except KeyError:
self = super().__new__(cls)
self._id = id
- self._fmt = _fmt
- self._unbound = _unbound
_known_queues[id] = self
_queues.bind(id)
return self
def __getstate__(self):
return None
+ def _set_unbound(self, op, items=None):
+ assert not hasattr(self, '_unbound')
+ if items is None:
+ items = _resolve_unbound(op)
+ unbound = (op, items)
+ self._unbound = unbound
+ return unbound
+
@property
def id(self):
return self._id
+ @property
+ def unbounditems(self):
+ try:
+ _, items = self._unbound
+ except AttributeError:
+ op, _ = _queues.get_queue_defaults(self._id)
+ _, items = self._set_unbound(op)
+ return items
+
@property
def maxsize(self):
try:
return _queues.get_count(self._id)
def put(self, obj, timeout=None, *,
- syncobj=None,
- unbound=None,
+ unbounditems=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
This blocks while the queue is full.
- If "syncobj" is None (the default) then it uses the
- queue's default, set with create_queue().
-
- If "syncobj" is false then all objects are supported,
- at the expense of worse performance.
-
- If "syncobj" is true then the object must be "shareable".
- Examples of "shareable" objects include the builtin singletons,
- str, and memoryview. One benefit is that such objects are
- passed through the queue efficiently.
-
- The key difference, though, is conceptual: the corresponding
- object returned from Queue.get() will be strictly equivalent
- to the given obj. In other words, the two objects will be
- effectively indistinguishable from each other, even if the
- object is mutable. The received object may actually be the
- same object, or a copy (immutable values only), or a proxy.
- Regardless, the received object should be treated as though
- the original has been shared directly, whether or not it
- actually is. That's a slightly different and stronger promise
- than just (initial) equality, which is all "syncobj=False"
- can promise.
-
- "unbound" controls the behavior of Queue.get() for the given
+ For most objects, the object received through Queue.get() will
+ be a new one, equivalent to the original and not sharing any
+ actual underlying data. The notable exceptions include
+ cross-interpreter types (like Queue) and memoryview, where the
+ underlying data is actually shared. Furthermore, some types
+ can be sent through a queue more efficiently than others. This
+ group includes various immutable types like int, str, bytes, and
+ tuple (if the items are likewise efficiently shareable). See interpreters.is_shareable().
+
+ "unbounditems" controls the behavior of Queue.get() for the given
object if the current interpreter (calling put()) is later
destroyed.
- If "unbound" is None (the default) then it uses the
+ If "unbounditems" is None (the default) then it uses the
queue's default, set with create_queue(),
which is usually UNBOUND.
- If "unbound" is UNBOUND_ERROR then get() will raise an
+ If "unbounditems" is UNBOUND_ERROR then get() will raise an
ItemInterpreterDestroyed exception if the original interpreter
has been destroyed. This does not otherwise affect the queue;
the next call to put() will work like normal, returning the next
item in the queue.
- If "unbound" is UNBOUND_REMOVE then the item will be removed
+ If "unbounditems" is UNBOUND_REMOVE then the item will be removed
from the queue as soon as the original interpreter is destroyed.
Be aware that this will introduce an imbalance between put()
and get() calls.
- If "unbound" is UNBOUND then it is returned by get() in place
+ If "unbounditems" is UNBOUND then it is returned by get() in place
of the unbound item.
"""
- if syncobj is None:
- fmt = self._fmt
- else:
- fmt = _SHARED_ONLY if syncobj else _PICKLED
- if unbound is None:
- unboundop, = self._unbound
+ if unbounditems is None:
+ unboundop = -1
else:
- unboundop, = _serialize_unbound(unbound)
+ unboundop, = _serialize_unbound(unbounditems)
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
raise ValueError(f'timeout value must be non-negative')
end = time.time() + timeout
- if fmt is _PICKLED:
- obj = pickle.dumps(obj)
while True:
try:
- _queues.put(self._id, obj, fmt, unboundop)
+ _queues.put(self._id, obj, unboundop)
except QueueFull as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
else:
break
- def put_nowait(self, obj, *, syncobj=None, unbound=None):
- if syncobj is None:
- fmt = self._fmt
+ def put_nowait(self, obj, *, unbounditems=None):
+ if unbounditems is None:
+ unboundop = -1
else:
- fmt = _SHARED_ONLY if syncobj else _PICKLED
- if unbound is None:
- unboundop, = self._unbound
- else:
- unboundop, = _serialize_unbound(unbound)
- if fmt is _PICKLED:
- obj = pickle.dumps(obj)
- _queues.put(self._id, obj, fmt, unboundop)
+ unboundop, = _serialize_unbound(unbounditems)
+ _queues.put(self._id, obj, unboundop)
def get(self, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
If the next item's original interpreter has been destroyed
then the "next object" is determined by the value of the
- "unbound" argument to put().
+ "unbounditems" argument to put().
"""
if timeout is not None:
timeout = int(timeout)
end = time.time() + timeout
while True:
try:
- obj, fmt, unboundop = _queues.get(self._id)
+ obj, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
if timeout is not None and time.time() >= end:
raise # re-raise
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
- if fmt == _PICKLED:
- obj = pickle.loads(obj)
- else:
- assert fmt == _SHARED_ONLY
return obj
def get_nowait(self):
is the same as get().
"""
try:
- obj, fmt, unboundop = _queues.get(self._id)
+ obj, unboundop = _queues.get(self._id)
except QueueEmpty as exc:
raise # re-raise
if unboundop is not None:
assert obj is None, repr(obj)
return _resolve_unbound(unboundop)
- if fmt == _PICKLED:
- obj = pickle.loads(obj)
- else:
- assert fmt == _SHARED_ONLY
return obj
def clean_up_channels():
- for cid, _ in _channels.list_all():
+ for cid, _, _ in _channels.list_all():
try:
_channels.destroy(cid)
except _channels.ChannelNotFoundError:
self.assertIsInstance(cid, _channels.ChannelID)
def test_sequential_ids(self):
- before = [cid for cid, _ in _channels.list_all()]
+ before = [cid for cid, _, _ in _channels.list_all()]
id1 = _channels.create(REPLACE)
id2 = _channels.create(REPLACE)
id3 = _channels.create(REPLACE)
- after = [cid for cid, _ in _channels.list_all()]
+ after = [cid for cid, _, _ in _channels.list_all()]
self.assertEqual(id2, int(id1) + 1)
self.assertEqual(id3, int(id2) + 1)
if not unbound:
extraargs = ''
elif unbound is channels.UNBOUND:
- extraargs = ', unbound=channels.UNBOUND'
+ extraargs = ', unbounditems=channels.UNBOUND'
elif unbound is channels.UNBOUND_ERROR:
- extraargs = ', unbound=channels.UNBOUND_ERROR'
+ extraargs = ', unbounditems=channels.UNBOUND_ERROR'
elif unbound is channels.UNBOUND_REMOVE:
- extraargs = ', unbound=channels.UNBOUND_REMOVE'
+ extraargs = ', unbounditems=channels.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
with self.assertRaises(channels.ChannelEmptyError):
rch.recv_nowait()
- sch.send_nowait(b'ham', unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(b'ham', unbounditems=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 1)
interp = common(rch, sch, channels.UNBOUND_REMOVE, 1)
self.assertEqual(_channels.get_count(rch.id), 3)
- sch.send_nowait(42, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(42, unbounditems=channels.UNBOUND_REMOVE)
self.assertEqual(_channels.get_count(rch.id), 4)
del interp
self.assertEqual(_channels.get_count(rch.id), 2)
_run_output(interp, dedent(f"""
from test.support.interpreters import channels
sch = channels.SendChannel({sch.id})
- sch.send_nowait(1, unbound=channels.UNBOUND)
- sch.send_nowait(2, unbound=channels.UNBOUND_ERROR)
+ sch.send_nowait(1, unbounditems=channels.UNBOUND)
+ sch.send_nowait(2, unbounditems=channels.UNBOUND_ERROR)
sch.send_nowait(3)
- sch.send_nowait(4, unbound=channels.UNBOUND_REMOVE)
- sch.send_nowait(5, unbound=channels.UNBOUND)
+ sch.send_nowait(4, unbounditems=channels.UNBOUND_REMOVE)
+ sch.send_nowait(5, unbounditems=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 5)
rch = channels.RecvChannel({rch.id})
sch = channels.SendChannel({sch.id})
obj1 = rch.recv()
- sch.send_nowait(2, unbound=channels.UNBOUND)
- sch.send_nowait(obj1, unbound=channels.UNBOUND_REMOVE)
+ sch.send_nowait(2, unbounditems=channels.UNBOUND)
+ sch.send_nowait(obj1, unbounditems=channels.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import channels
self.assertEqual(_channels.get_count(rch.id), 0)
sch.send_nowait(3)
_run_output(interp1, dedent("""
- sch.send_nowait(4, unbound=channels.UNBOUND)
+ sch.send_nowait(4, unbounditems=channels.UNBOUND)
# interp closed here
- sch.send_nowait(5, unbound=channels.UNBOUND_REMOVE)
- sch.send_nowait(6, unbound=channels.UNBOUND)
+ sch.send_nowait(5, unbounditems=channels.UNBOUND_REMOVE)
+ sch.send_nowait(6, unbounditems=channels.UNBOUND)
"""))
_run_output(interp2, dedent("""
- sch.send_nowait(7, unbound=channels.UNBOUND_ERROR)
+ sch.send_nowait(7, unbounditems=channels.UNBOUND_ERROR)
# interp closed here
- sch.send_nowait(obj1, unbound=channels.UNBOUND_ERROR)
- sch.send_nowait(obj2, unbound=channels.UNBOUND_REMOVE)
- sch.send_nowait(8, unbound=channels.UNBOUND)
+ sch.send_nowait(obj1, unbounditems=channels.UNBOUND_ERROR)
+ sch.send_nowait(obj2, unbounditems=channels.UNBOUND_REMOVE)
+ sch.send_nowait(8, unbounditems=channels.UNBOUND)
"""))
_run_output(interp1, dedent("""
- sch.send_nowait(9, unbound=channels.UNBOUND_REMOVE)
- sch.send_nowait(10, unbound=channels.UNBOUND)
+ sch.send_nowait(9, unbounditems=channels.UNBOUND_REMOVE)
+ sch.send_nowait(10, unbounditems=channels.UNBOUND)
"""))
self.assertEqual(_channels.get_count(rch.id), 10)
_queues = import_helper.import_module('_interpqueues')
from test.support import interpreters
from test.support.interpreters import queues, _crossinterp
+import test._crossinterp_definitions as defs
from .utils import _run_output, TestBase as _TestBase
importlib.reload(queues)
def test_create_destroy(self):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.destroy(qid)
self.assertEqual(get_num_queues(), 0)
with self.assertRaises(queues.QueueNotFoundError):
'-c',
dedent(f"""
import {_queues.__name__} as _queues
- _queues.create(2, 0, {REPLACE})
+ _queues.create(2, {REPLACE}, -1)
"""),
)
self.assertEqual(stdout, '')
def test_bind_release(self):
with self.subTest('typical'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('bind too much'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('nested'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
_queues.bind(qid)
_queues.bind(qid)
_queues.release(qid)
self.assertEqual(get_num_queues(), 0)
with self.subTest('release without binding'):
- qid = _queues.create(2, 0, REPLACE)
+ qid = _queues.create(2, REPLACE, -1)
with self.assertRaises(queues.QueueError):
_queues.release(qid)
with self.subTest('same interpreter'):
queue2 = queues.create()
- queue1.put(queue2, syncobj=True)
+ queue1.put(queue2)
queue3 = queue1.get()
self.assertIs(queue3, queue2)
with self.subTest('from current interpreter'):
queue4 = queues.create()
- queue1.put(queue4, syncobj=True)
+ queue1.put(queue4)
out = _run_output(interp, dedent("""
queue4 = queue1.get()
print(queue4.id)
with self.subTest('from subinterpreter'):
out = _run_output(interp, dedent("""
queue5 = queues.create()
- queue1.put(queue5, syncobj=True)
+ queue1.put(queue5)
print(queue5.id)
"""))
qid = int(out)
def test_empty(self):
queue = queues.create()
before = queue.empty()
- queue.put(None, syncobj=True)
+ queue.put(None)
during = queue.empty()
queue.get()
after = queue.empty()
queue = queues.create(3)
for _ in range(3):
actual.append(queue.full())
- queue.put(None, syncobj=True)
+ queue.put(None)
actual.append(queue.full())
for _ in range(3):
queue.get()
queue = queues.create()
for _ in range(3):
actual.append(queue.qsize())
- queue.put(None, syncobj=True)
+ queue.put(None)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
- queue.put(None, syncobj=True)
+ queue.put(None)
actual.append(queue.qsize())
for _ in range(3):
queue.get()
actual.append(queue.qsize())
- queue.put(None, syncobj=True)
+ queue.put(None)
actual.append(queue.qsize())
queue.get()
actual.append(queue.qsize())
def test_put_get_main(self):
expected = list(range(20))
- for syncobj in (True, False):
- kwds = dict(syncobj=syncobj)
- with self.subTest(f'syncobj={syncobj}'):
- queue = queues.create()
- for i in range(20):
- queue.put(i, **kwds)
- actual = [queue.get() for _ in range(20)]
+ queue = queues.create()
+ for i in range(20):
+ queue.put(i)
+ actual = [queue.get() for _ in range(20)]
- self.assertEqual(actual, expected)
+ self.assertEqual(actual, expected)
def test_put_timeout(self):
- for syncobj in (True, False):
- kwds = dict(syncobj=syncobj)
- with self.subTest(f'syncobj={syncobj}'):
- queue = queues.create(2)
- queue.put(None, **kwds)
- queue.put(None, **kwds)
- with self.assertRaises(queues.QueueFull):
- queue.put(None, timeout=0.1, **kwds)
- queue.get()
- queue.put(None, **kwds)
+ queue = queues.create(2)
+ queue.put(None)
+ queue.put(None)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, timeout=0.1)
+ queue.get()
+ queue.put(None)
def test_put_nowait(self):
- for syncobj in (True, False):
- kwds = dict(syncobj=syncobj)
- with self.subTest(f'syncobj={syncobj}'):
- queue = queues.create(2)
- queue.put_nowait(None, **kwds)
- queue.put_nowait(None, **kwds)
- with self.assertRaises(queues.QueueFull):
- queue.put_nowait(None, **kwds)
- queue.get()
- queue.put_nowait(None, **kwds)
-
- def test_put_syncobj(self):
- for obj in [
- None,
- True,
- 10,
- 'spam',
- b'spam',
- (0, 'a'),
- ]:
- with self.subTest(repr(obj)):
- queue = queues.create()
-
- queue.put(obj, syncobj=True)
- obj2 = queue.get()
- self.assertEqual(obj2, obj)
-
- queue.put(obj, syncobj=True)
- obj2 = queue.get_nowait()
- self.assertEqual(obj2, obj)
-
- for obj in [
- [1, 2, 3],
- {'a': 13, 'b': 17},
- ]:
- with self.subTest(repr(obj)):
- queue = queues.create()
- with self.assertRaises(interpreters.NotShareableError):
- queue.put(obj, syncobj=True)
+ queue = queues.create(2)
+ queue.put_nowait(None)
+ queue.put_nowait(None)
+ with self.assertRaises(queues.QueueFull):
+ queue.put_nowait(None)
+ queue.get()
+ queue.put_nowait(None)
- def test_put_not_syncobj(self):
+ def test_put_full_fallback(self):
for obj in [
None,
True,
with self.subTest(repr(obj)):
queue = queues.create()
- queue.put(obj, syncobj=False)
+ queue.put(obj)
obj2 = queue.get()
self.assertEqual(obj2, obj)
- queue.put(obj, syncobj=False)
+ queue.put(obj)
obj2 = queue.get_nowait()
self.assertEqual(obj2, obj)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
- def test_put_get_default_syncobj(self):
+ def test_put_get_full_fallback(self):
expected = list(range(20))
- queue = queues.create(syncobj=True)
- for methname in ('get', 'get_nowait'):
- with self.subTest(f'{methname}()'):
- get = getattr(queue, methname)
- for i in range(20):
- queue.put(i)
- actual = [get() for _ in range(20)]
- self.assertEqual(actual, expected)
-
- obj = [1, 2, 3] # lists are not shareable
- with self.assertRaises(interpreters.NotShareableError):
- queue.put(obj)
-
- def test_put_get_default_not_syncobj(self):
- expected = list(range(20))
- queue = queues.create(syncobj=False)
+ queue = queues.create()
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
get = getattr(queue, methname)
with self.subTest(f'{methname}()'):
interp.exec(dedent(f"""
orig = b'spam'
- queue.put(orig, syncobj=True)
+ queue.put(orig)
obj = queue.{methname}()
assert obj == orig, 'expected: obj == orig'
assert obj is not orig, 'expected: obj is not orig'
for methname in ('get', 'get_nowait'):
with self.subTest(f'{methname}()'):
obj1 = b'spam'
- queue1.put(obj1, syncobj=True)
+ queue1.put(obj1)
out = _run_output(
interp,
obj2 = b'eggs'
print(id(obj2))
assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
- queue2.put(obj2, syncobj=True)
+ queue2.put(obj2)
assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
"""))
self.assertEqual(len(queues.list_all()), 2)
if not unbound:
extraargs = ''
elif unbound is queues.UNBOUND:
- extraargs = ', unbound=queues.UNBOUND'
+ extraargs = ', unbounditems=queues.UNBOUND'
elif unbound is queues.UNBOUND_ERROR:
- extraargs = ', unbound=queues.UNBOUND_ERROR'
+ extraargs = ', unbounditems=queues.UNBOUND_ERROR'
elif unbound is queues.UNBOUND_REMOVE:
- extraargs = ', unbound=queues.UNBOUND_REMOVE'
+ extraargs = ', unbounditems=queues.UNBOUND_REMOVE'
else:
raise NotImplementedError(repr(unbound))
interp = interpreters.create()
queue = queues.Queue({queue.id})
obj1 = b'spam'
obj2 = b'eggs'
- queue.put(obj1, syncobj=True{extraargs})
- queue.put(obj2, syncobj=True{extraargs})
+ queue.put(obj1{extraargs})
+ queue.put(obj2{extraargs})
"""))
self.assertEqual(queue.qsize(), presize + 2)
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
- queue.put(b'ham', unbound=queues.UNBOUND_REMOVE)
+ queue.put(b'ham', unbounditems=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 1)
interp = common(queue, queues.UNBOUND_REMOVE, 1)
self.assertEqual(queue.qsize(), 3)
- queue.put(42, unbound=queues.UNBOUND_REMOVE)
+ queue.put(42, unbounditems=queues.UNBOUND_REMOVE)
self.assertEqual(queue.qsize(), 4)
del interp
self.assertEqual(queue.qsize(), 2)
_run_output(interp, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
- queue.put(1, syncobj=True, unbound=queues.UNBOUND)
- queue.put(2, syncobj=True, unbound=queues.UNBOUND_ERROR)
- queue.put(3, syncobj=True)
- queue.put(4, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(5, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(1, unbounditems=queues.UNBOUND)
+ queue.put(2, unbounditems=queues.UNBOUND_ERROR)
+ queue.put(3)
+ queue.put(4, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(5, unbounditems=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 5)
interp1 = interpreters.create()
interp2 = interpreters.create()
- queue.put(1, syncobj=True)
+ queue.put(1)
_run_output(interp1, dedent(f"""
from test.support.interpreters import queues
queue = queues.Queue({queue.id})
obj1 = queue.get()
- queue.put(2, syncobj=True, unbound=queues.UNBOUND)
- queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_REMOVE)
+ queue.put(2, unbounditems=queues.UNBOUND)
+ queue.put(obj1, unbounditems=queues.UNBOUND_REMOVE)
"""))
_run_output(interp2, dedent(f"""
from test.support.interpreters import queues
self.assertEqual(queue.qsize(), 0)
queue.put(3)
_run_output(interp1, dedent("""
- queue.put(4, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(4, unbounditems=queues.UNBOUND)
# interp closed here
- queue.put(5, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(6, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(5, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(6, unbounditems=queues.UNBOUND)
"""))
_run_output(interp2, dedent("""
- queue.put(7, syncobj=True, unbound=queues.UNBOUND_ERROR)
+ queue.put(7, unbounditems=queues.UNBOUND_ERROR)
# interp closed here
- queue.put(obj1, syncobj=True, unbound=queues.UNBOUND_ERROR)
- queue.put(obj2, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(8, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(obj1, unbounditems=queues.UNBOUND_ERROR)
+ queue.put(obj2, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(8, unbounditems=queues.UNBOUND)
"""))
_run_output(interp1, dedent("""
- queue.put(9, syncobj=True, unbound=queues.UNBOUND_REMOVE)
- queue.put(10, syncobj=True, unbound=queues.UNBOUND)
+ queue.put(9, unbounditems=queues.UNBOUND_REMOVE)
+ queue.put(10, unbounditems=queues.UNBOUND)
"""))
self.assertEqual(queue.qsize(), 10)
break
except queues.QueueEmpty:
continue
- queue2.put(obj, syncobj=True)
+ queue2.put(obj)
t = threading.Thread(target=f)
t.start()
orig = b'spam'
- queue1.put(orig, syncobj=True)
+ queue1.put(orig)
obj = queue2.get()
t.join()
#endif
#define REGISTERS_HEAP_TYPES
+#define HAS_FALLBACK
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
+#undef HAS_FALLBACK
#undef REGISTERS_HEAP_TYPES
int64_t interpid;
_PyXIData_t *data;
_waiting_t *waiting;
- int unboundop;
+ unboundop_t unboundop;
struct _channelitem *next;
} _channelitem;
static void
_channelitem_init(_channelitem *item,
int64_t interpid, _PyXIData_t *data,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
static _channelitem *
_channelitem_new(int64_t interpid, _PyXIData_t *data,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop)
{
_channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) {
static int
_channelqueue_put(_channelqueue *queue,
int64_t interpid, _PyXIData_t *data,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop)
{
_channelitem *item = _channelitem_new(interpid, data, waiting, unboundop);
if (item == NULL) {
}
queue->count -= 1;
- int unboundop;
+ unboundop_t unboundop;
_channelitem_popped(item, p_data, p_waiting, &unboundop);
}
PyThread_type_lock mutex;
_channelqueue *queue;
_channelends *ends;
- struct {
- int unboundop;
+ struct _channeldefaults {
+ unboundop_t unboundop;
+ xidata_fallback_t fallback;
} defaults;
int open;
struct _channel_closing *closing;
} _channel_state;
static _channel_state *
-_channel_new(PyThread_type_lock mutex, int unboundop)
+_channel_new(PyThread_type_lock mutex, struct _channeldefaults defaults)
{
+ assert(check_unbound(defaults.unboundop));
_channel_state *chan = GLOBAL_MALLOC(_channel_state);
if (chan == NULL) {
return NULL;
GLOBAL_FREE(chan);
return NULL;
}
- chan->defaults.unboundop = unboundop;
+ chan->defaults = defaults;
chan->open = 1;
chan->closing = NULL;
return chan;
static int
_channel_add(_channel_state *chan, int64_t interpid,
- _PyXIData_t *data, _waiting_t *waiting, int unboundop)
+ _PyXIData_t *data, _waiting_t *waiting, unboundop_t unboundop)
{
int res = -1;
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
struct channel_id_and_info {
int64_t id;
- int unboundop;
+ struct _channeldefaults defaults;
};
static struct channel_id_and_info *
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i] = (struct channel_id_and_info){
.id = ref->cid,
- .unboundop = ref->chan->defaults.unboundop,
+ .defaults = ref->chan->defaults,
};
}
*count = channels->numopen;
// Create a new channel.
static int64_t
-channel_create(_channels *channels, int unboundop)
+channel_create(_channels *channels, struct _channeldefaults defaults)
{
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_CHANNEL_MUTEX_INIT;
}
- _channel_state *chan = _channel_new(mutex, unboundop);
+ _channel_state *chan = _channel_new(mutex, defaults);
if (chan == NULL) {
PyThread_free_lock(mutex);
return -1;
// Optionally request to be notified when it is received.
static int
channel_send(_channels *channels, int64_t cid, PyObject *obj,
- _waiting_t *waiting, int unboundop)
+ _waiting_t *waiting, unboundop_t unboundop, xidata_fallback_t fallback)
{
PyThreadState *tstate = _PyThreadState_GET();
PyInterpreterState *interp = tstate->interp;
PyThread_release_lock(mutex);
return -1;
}
- if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) {
+ if (_PyObject_GetXIData(tstate, obj, fallback, data) != 0) {
PyThread_release_lock(mutex);
GLOBAL_FREE(data);
return -1;
// Like channel_send(), but strictly wait for the object to be received.
static int
channel_send_wait(_channels *channels, int64_t cid, PyObject *obj,
- int unboundop, PY_TIMEOUT_T timeout)
+ unboundop_t unboundop, PY_TIMEOUT_T timeout,
+ xidata_fallback_t fallback)
{
// We use a stack variable here, so we must ensure that &waiting
// is not held by any channel item at the point this function exits.
}
/* Queue up the object. */
- int res = channel_send(channels, cid, obj, &waiting, unboundop);
+ int res = channel_send(channels, cid, obj, &waiting, unboundop, fallback);
if (res < 0) {
assert(waiting.status == WAITING_NO_STATUS);
goto finally;
return (end != NULL && end->open);
}
+static int
+channel_get_defaults(_channels *channels, int64_t cid, struct _channeldefaults *defaults)
+{
+ PyThread_type_lock mutex = NULL;
+ _channel_state *channel = NULL;
+ int err = _channels_lookup(channels, cid, &mutex, &channel);
+ if (err != 0) {
+ return err;
+ }
+ *defaults = channel->defaults;
+ PyThread_release_lock(mutex);
+ return 0;
+}
+
static int
_channel_get_count(_channels *channels, int64_t cid, Py_ssize_t *p_count)
{
static PyObject *
channelsmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"unboundop", NULL};
- int unboundop;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "i:create", kwlist,
- &unboundop))
+ static char *kwlist[] = {"unboundop", "fallback", NULL};
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "|ii:create", kwlist,
+ &unboundarg, &fallbackarg))
{
return NULL;
}
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ struct _channeldefaults defaults = {0};
+ if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
+ &defaults.unboundop) < 0)
+ {
+ return NULL;
+ }
+ if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
+ &defaults.fallback) < 0)
+ {
return NULL;
}
- int64_t cid = channel_create(&_globals.channels, unboundop);
+ int64_t cid = channel_create(&_globals.channels, defaults);
if (cid < 0) {
(void)handle_channel_error(-1, self, cid);
return NULL;
}
assert(cidobj != NULL);
- PyObject *item = Py_BuildValue("Oi", cidobj, cur->unboundop);
+ PyObject *item = Py_BuildValue("Oii", cidobj,
+ cur->defaults.unboundop,
+ cur->defaults.fallback);
Py_DECREF(cidobj);
if (item == NULL) {
Py_SETREF(ids, NULL);
static PyObject *
channelsmod_send(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
- NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
+ "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
- int unboundop = UNBOUND_REPLACE;
+ int unboundarg = -1;
+ int fallbackarg = -1;
int blocking = 1;
PyObject *timeout_obj = NULL;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|i$pO:channel_send", kwlist,
+ if (!PyArg_ParseTupleAndKeywords(args, kwds,
+ "O&O|ii$pO:channel_send", kwlist,
channel_id_converter, &cid_data, &obj,
- &unboundop, &blocking, &timeout_obj))
+ &unboundarg, &fallbackarg,
+ &blocking, &timeout_obj))
{
return NULL;
}
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
- return NULL;
- }
-
int64_t cid = cid_data.cid;
PY_TIMEOUT_T timeout;
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
+ struct _channeldefaults defaults = {-1, -1};
+ if (unboundarg < 0 || fallbackarg < 0) {
+ int err = channel_get_defaults(&_globals.channels, cid, &defaults);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ }
+ unboundop_t unboundop;
+ if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+ return NULL;
+ }
+ xidata_fallback_t fallback;
+ if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
+ return NULL;
+ }
/* Queue up the object. */
int err = 0;
if (blocking) {
- err = channel_send_wait(&_globals.channels, cid, obj, unboundop, timeout);
+ err = channel_send_wait(
+ &_globals.channels, cid, obj, unboundop, timeout, fallback);
}
else {
- err = channel_send(&_globals.channels, cid, obj, NULL, unboundop);
+ err = channel_send(
+ &_globals.channels, cid, obj, NULL, unboundop, fallback);
}
if (handle_channel_error(err, self, cid)) {
return NULL;
static PyObject *
channelsmod_send_buffer(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"cid", "obj", "unboundop", "blocking", "timeout",
- NULL};
+ static char *kwlist[] = {"cid", "obj", "unboundop", "fallback",
+ "blocking", "timeout", NULL};
struct channel_id_converter_data cid_data = {
.module = self,
};
PyObject *obj;
- int unboundop = UNBOUND_REPLACE;
- int blocking = 1;
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ int blocking = -1;
PyObject *timeout_obj = NULL;
if (!PyArg_ParseTupleAndKeywords(args, kwds,
- "O&O|i$pO:channel_send_buffer", kwlist,
+ "O&O|ii$pO:channel_send_buffer", kwlist,
channel_id_converter, &cid_data, &obj,
- &unboundop, &blocking, &timeout_obj)) {
- return NULL;
- }
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ &unboundarg, &fallbackarg,
+ &blocking, &timeout_obj))
+ {
return NULL;
}
-
int64_t cid = cid_data.cid;
PY_TIMEOUT_T timeout;
if (PyThread_ParseTimeoutArg(timeout_obj, blocking, &timeout) < 0) {
return NULL;
}
+ struct _channeldefaults defaults = {-1, -1};
+ if (unboundarg < 0 || fallbackarg < 0) {
+ int err = channel_get_defaults(&_globals.channels, cid, &defaults);
+ if (handle_channel_error(err, self, cid)) {
+ return NULL;
+ }
+ }
+ unboundop_t unboundop;
+ if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+ return NULL;
+ }
+ xidata_fallback_t fallback;
+ if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
+ return NULL;
+ }
PyObject *tempobj = PyMemoryView_FromObject(obj);
if (tempobj == NULL) {
int err = 0;
if (blocking) {
err = channel_send_wait(
- &_globals.channels, cid, tempobj, unboundop, timeout);
+ &_globals.channels, cid, tempobj, unboundop, timeout, fallback);
}
else {
- err = channel_send(&_globals.channels, cid, tempobj, NULL, unboundop);
+ err = channel_send(
+ &_globals.channels, cid, tempobj, NULL, unboundop, fallback);
}
Py_DECREF(tempobj);
if (handle_channel_error(err, self, cid)) {
cid = cid_data.cid;
PyObject *obj = NULL;
- int unboundop = 0;
+ unboundop_t unboundop = 0;
int err = channel_recv(&_globals.channels, cid, &obj, &unboundop);
if (err == ERR_CHANNEL_EMPTY && dflt != NULL) {
// Use the default.
}
int64_t cid = cid_data.cid;
- PyThread_type_lock mutex = NULL;
- _channel_state *channel = NULL;
- int err = _channels_lookup(&_globals.channels, cid, &mutex, &channel);
+ struct _channeldefaults defaults;
+ int err = channel_get_defaults(&_globals.channels, cid, &defaults);
if (handle_channel_error(err, self, cid)) {
return NULL;
}
- int unboundop = channel->defaults.unboundop;
- PyThread_release_lock(mutex);
- PyObject *defaults = Py_BuildValue("i", unboundop);
- return defaults;
+ PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
+ return res;
}
PyDoc_STRVAR(channelsmod_get_channel_defaults_doc,
#include "pycore_crossinterp.h" // _PyXIData_t
#define REGISTERS_HEAP_TYPES
+#define HAS_FALLBACK
#define HAS_UNBOUND_ITEMS
#include "_interpreters_common.h"
#undef HAS_UNBOUND_ITEMS
+#undef HAS_FALLBACK
#undef REGISTERS_HEAP_TYPES
meaning the interpreter has been destroyed. */
int64_t interpid;
_PyXIData_t *data;
- int fmt;
- int unboundop;
+ unboundop_t unboundop;
struct _queueitem *next;
} _queueitem;
static void
_queueitem_init(_queueitem *item,
- int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
+ int64_t interpid, _PyXIData_t *data, unboundop_t unboundop)
{
if (interpid < 0) {
interpid = _get_interpid(data);
*item = (_queueitem){
.interpid = interpid,
.data = data,
- .fmt = fmt,
.unboundop = unboundop,
};
}
}
static _queueitem *
-_queueitem_new(int64_t interpid, _PyXIData_t *data, int fmt, int unboundop)
+_queueitem_new(int64_t interpid, _PyXIData_t *data, int unboundop)
{
_queueitem *item = GLOBAL_MALLOC(_queueitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
}
- _queueitem_init(item, interpid, data, fmt, unboundop);
+ _queueitem_init(item, interpid, data, unboundop);
return item;
}
static void
_queueitem_popped(_queueitem *item,
- _PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
+ _PyXIData_t **p_data, unboundop_t *p_unboundop)
{
*p_data = item->data;
- *p_fmt = item->fmt;
*p_unboundop = item->unboundop;
// We clear them here, so they won't be released in _queueitem_clear().
item->data = NULL;
_queueitem *first;
_queueitem *last;
} items;
- struct {
- int fmt;
+ struct _queuedefaults {
+ xidata_fallback_t fallback;
int unboundop;
} defaults;
} _queue;
static int
-_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt, int unboundop)
+_queue_init(_queue *queue, Py_ssize_t maxsize, struct _queuedefaults defaults)
{
- assert(check_unbound(unboundop));
+ assert(check_unbound(defaults.unboundop));
PyThread_type_lock mutex = PyThread_allocate_lock();
if (mutex == NULL) {
return ERR_QUEUE_ALLOC;
.items = {
.maxsize = maxsize,
},
- .defaults = {
- .fmt = fmt,
- .unboundop = unboundop,
- },
+ .defaults = defaults,
};
return 0;
}
}
static int
-_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data,
- int fmt, int unboundop)
+_queue_add(_queue *queue, int64_t interpid, _PyXIData_t *data, int unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
return ERR_QUEUE_FULL;
}
- _queueitem *item = _queueitem_new(interpid, data, fmt, unboundop);
+ _queueitem *item = _queueitem_new(interpid, data, unboundop);
if (item == NULL) {
_queue_unlock(queue);
return -1;
}
static int
-_queue_next(_queue *queue,
- _PyXIData_t **p_data, int *p_fmt, int *p_unboundop)
+_queue_next(_queue *queue, _PyXIData_t **p_data, int *p_unboundop)
{
int err = _queue_lock(queue);
if (err < 0) {
}
queue->items.count -= 1;
- _queueitem_popped(item, p_data, p_fmt, p_unboundop);
+ _queueitem_popped(item, p_data, p_unboundop);
_queue_unlock(queue);
return 0;
struct queue_id_and_info {
int64_t id;
- int fmt;
- int unboundop;
+ struct _queuedefaults defaults;
};
static struct queue_id_and_info *
for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
ids[i].id = ref->qid;
assert(ref->queue != NULL);
- ids[i].fmt = ref->queue->defaults.fmt;
- ids[i].unboundop = ref->queue->defaults.unboundop;
+ ids[i].defaults = ref->queue->defaults;
}
*p_count = queues->count;
// Create a new queue.
static int64_t
-queue_create(_queues *queues, Py_ssize_t maxsize, int fmt, int unboundop)
+queue_create(_queues *queues, Py_ssize_t maxsize,
+ struct _queuedefaults defaults)
{
_queue *queue = GLOBAL_MALLOC(_queue);
if (queue == NULL) {
return ERR_QUEUE_ALLOC;
}
- int err = _queue_init(queue, maxsize, fmt, unboundop);
+ int err = _queue_init(queue, maxsize, defaults);
if (err < 0) {
GLOBAL_FREE(queue);
return (int64_t)err;
// Push an object onto the queue.
static int
-queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt, int unboundop)
+queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
+ xidata_fallback_t fallback)
{
PyThreadState *tstate = PyThreadState_Get();
assert(queue != NULL);
// Convert the object to cross-interpreter data.
- _PyXIData_t *data = _PyXIData_New();
- if (data == NULL) {
+ _PyXIData_t *xidata = _PyXIData_New();
+ if (xidata == NULL) {
_queue_unmark_waiter(queue, queues->mutex);
return -1;
}
- if (_PyObject_GetXIDataNoFallback(tstate, obj, data) != 0) {
+ if (_PyObject_GetXIData(tstate, obj, fallback, xidata) != 0) {
_queue_unmark_waiter(queue, queues->mutex);
- GLOBAL_FREE(data);
+ GLOBAL_FREE(xidata);
return -1;
}
- assert(_PyXIData_INTERPID(data) ==
+ assert(_PyXIData_INTERPID(xidata) ==
PyInterpreterState_GetID(tstate->interp));
// Add the data to the queue.
int64_t interpid = -1; // _queueitem_init() will set it.
- int res = _queue_add(queue, interpid, data, fmt, unboundop);
+ int res = _queue_add(queue, interpid, xidata, unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (res != 0) {
// We may chain an exception here:
- (void)_release_xid_data(data, 0);
- GLOBAL_FREE(data);
+ (void)_release_xid_data(xidata, 0);
+ GLOBAL_FREE(xidata);
return res;
}
// XXX Support a "wait" mutex?
static int
queue_get(_queues *queues, int64_t qid,
- PyObject **res, int *p_fmt, int *p_unboundop)
+ PyObject **res, int *p_unboundop)
{
int err;
*res = NULL;
// Pop off the next item from the queue.
_PyXIData_t *data = NULL;
- err = _queue_next(queue, &data, p_fmt, p_unboundop);
+ err = _queue_next(queue, &data, p_unboundop);
_queue_unmark_waiter(queue, queues->mutex);
if (err != 0) {
return err;
return 0;
}
+static int
+queue_get_defaults(_queues *queues, int64_t qid,
+ struct _queuedefaults *p_defaults)
+{
+ _queue *queue = NULL;
+ int err = _queues_lookup(queues, qid, &queue);
+ if (err != 0) {
+ return err;
+ }
+ *p_defaults = queue->defaults;
+ _queue_unmark_waiter(queue, queues->mutex);
+ return 0;
+}
+
static int
queue_get_maxsize(_queues *queues, int64_t qid, Py_ssize_t *p_maxsize)
{
static PyObject *
queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"maxsize", "fmt", "unboundop", NULL};
+ static char *kwlist[] = {"maxsize", "unboundop", "fallback", NULL};
Py_ssize_t maxsize;
- int fmt;
- int unboundop;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "nii:create", kwlist,
- &maxsize, &fmt, &unboundop))
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "n|ii:create", kwlist,
+ &maxsize, &unboundarg, &fallbackarg))
{
return NULL;
}
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ struct _queuedefaults defaults = {0};
+ if (resolve_unboundop(unboundarg, UNBOUND_REPLACE,
+ &defaults.unboundop) < 0)
+ {
+ return NULL;
+ }
+ if (resolve_fallback(fallbackarg, _PyXIDATA_FULL_FALLBACK,
+ &defaults.fallback) < 0)
+ {
return NULL;
}
- int64_t qid = queue_create(&_globals.queues, maxsize, fmt, unboundop);
+ int64_t qid = queue_create(&_globals.queues, maxsize, defaults);
if (qid < 0) {
(void)handle_queue_error((int)qid, self, qid);
return NULL;
}
PyDoc_STRVAR(queuesmod_create_doc,
-"create(maxsize, fmt, unboundop) -> qid\n\
+"create(maxsize, unboundop, fallback) -> qid\n\
\n\
Create a new cross-interpreter queue and return its unique generated ID.\n\
It is a new reference as though bind() had been called on the queue.\n\
}
struct queue_id_and_info *cur = qids;
for (int64_t i=0; i < count; cur++, i++) {
- PyObject *item = Py_BuildValue("Lii", cur->id, cur->fmt,
- cur->unboundop);
+ PyObject *item = Py_BuildValue("Lii", cur->id,
+ cur->defaults.unboundop,
+ cur->defaults.fallback);
if (item == NULL) {
Py_SETREF(ids, NULL);
break;
}
PyDoc_STRVAR(queuesmod_list_all_doc,
-"list_all() -> [(qid, fmt)]\n\
+"list_all() -> [(qid, unboundop, fallback)]\n\
\n\
Return the list of IDs for all queues.\n\
-Each corresponding default format is also included.");
+Each corresponding default unbound op and fallback is also included.");
static PyObject *
queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
{
- static char *kwlist[] = {"qid", "obj", "fmt", "unboundop", NULL};
+ static char *kwlist[] = {"qid", "obj", "unboundop", "fallback", NULL};
qidarg_converter_data qidarg = {0};
PyObject *obj;
- int fmt;
- int unboundop;
- if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oii:put", kwlist,
- qidarg_converter, &qidarg, &obj, &fmt,
- &unboundop))
+ int unboundarg = -1;
+ int fallbackarg = -1;
+ if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O|ii$p:put", kwlist,
+ qidarg_converter, &qidarg, &obj,
+ &unboundarg, &fallbackarg))
{
return NULL;
}
int64_t qid = qidarg.id;
- if (!check_unbound(unboundop)) {
- PyErr_Format(PyExc_ValueError,
- "unsupported unboundop %d", unboundop);
+ struct _queuedefaults defaults = {-1, -1};
+ if (unboundarg < 0 || fallbackarg < 0) {
+ int err = queue_get_defaults(&_globals.queues, qid, &defaults);
+ if (handle_queue_error(err, self, qid)) {
+ return NULL;
+ }
+ }
+ unboundop_t unboundop;
+ if (resolve_unboundop(unboundarg, defaults.unboundop, &unboundop) < 0) {
+ return NULL;
+ }
+ xidata_fallback_t fallback;
+ if (resolve_fallback(fallbackarg, defaults.fallback, &fallback) < 0) {
return NULL;
}
/* Queue up the object. */
- int err = queue_put(&_globals.queues, qid, obj, fmt, unboundop);
+ int err = queue_put(&_globals.queues, qid, obj, unboundop, fallback);
// This is the only place that raises QueueFull.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
PyDoc_STRVAR(queuesmod_put_doc,
-"put(qid, obj, fmt)\n\
+"put(qid, obj)\n\
\n\
Add the object's data to the queue.");
int64_t qid = qidarg.id;
PyObject *obj = NULL;
- int fmt = 0;
int unboundop = 0;
- int err = queue_get(&_globals.queues, qid, &obj, &fmt, &unboundop);
+ int err = queue_get(&_globals.queues, qid, &obj, &unboundop);
// This is the only place that raises QueueEmpty.
if (handle_queue_error(err, self, qid)) {
return NULL;
}
if (obj == NULL) {
- return Py_BuildValue("Oii", Py_None, fmt, unboundop);
+ return Py_BuildValue("Oi", Py_None, unboundop);
}
- PyObject *res = Py_BuildValue("OiO", obj, fmt, Py_None);
+ PyObject *res = Py_BuildValue("OO", obj, Py_None);
Py_DECREF(obj);
return res;
}
PyDoc_STRVAR(queuesmod_get_doc,
-"get(qid) -> (obj, fmt)\n\
+"get(qid) -> (obj, unboundop)\n\
\n\
Return a new object from the data at the front of the queue.\n\
-The object's format is also returned.\n\
+The unbound op is also returned.\n\
\n\
If there is nothing to receive then raise QueueEmpty.");
}
int64_t qid = qidarg.id;
- _queue *queue = NULL;
- int err = _queues_lookup(&_globals.queues, qid, &queue);
+ struct _queuedefaults defaults;
+ int err = queue_get_defaults(&_globals.queues, qid, &defaults);
if (handle_queue_error(err, self, qid)) {
return NULL;
}
- int fmt = queue->defaults.fmt;
- int unboundop = queue->defaults.unboundop;
- _queue_unmark_waiter(queue, _globals.queues.mutex);
- PyObject *defaults = Py_BuildValue("ii", fmt, unboundop);
- return defaults;
+ PyObject *res = Py_BuildValue("ii", defaults.unboundop, defaults.fallback);
+ return res;
}
PyDoc_STRVAR(queuesmod_get_queue_defaults_doc,
}
+#ifdef HAS_FALLBACK
+static int
+resolve_fallback(int arg, xidata_fallback_t dflt,
+ xidata_fallback_t *p_fallback)
+{
+ if (arg < 0) {
+ *p_fallback = dflt;
+ return 0;
+ }
+ xidata_fallback_t fallback;
+ if (arg == _PyXIDATA_XIDATA_ONLY) {
+ fallback =_PyXIDATA_XIDATA_ONLY;
+ }
+ else if (arg == _PyXIDATA_FULL_FALLBACK) {
+ fallback = _PyXIDATA_FULL_FALLBACK;
+ }
+ else {
+ PyErr_Format(PyExc_ValueError, "unsupported fallback %d", arg);
+ return -1;
+ }
+ *p_fallback = fallback;
+ return 0;
+}
+#endif
+
+
/* unbound items ************************************************************/
#ifdef HAS_UNBOUND_ITEMS
+typedef int unboundop_t;
#define UNBOUND_REMOVE 1
#define UNBOUND_ERROR 2
#define UNBOUND_REPLACE 3
// object is released but the underlying data is copied (with the "raw"
// allocator) and used when the item is popped off the queue.
+#ifndef NDEBUG
static int
check_unbound(int unboundop)
{
return 0;
}
}
+#endif
+
+static int
+resolve_unboundop(int arg, unboundop_t dflt, unboundop_t *p_unboundop)
+{
+ if (arg < 0) {
+ *p_unboundop = dflt;
+ return 0;
+ }
+ unboundop_t op;
+ if (arg == UNBOUND_REMOVE) {
+ op = UNBOUND_REMOVE;
+ }
+ else if (arg == UNBOUND_ERROR) {
+ op = UNBOUND_ERROR;
+ }
+ else if (arg == UNBOUND_REPLACE) {
+ op = UNBOUND_REPLACE;
+ }
+ else {
+ PyErr_Format(PyExc_ValueError, "unsupported unboundop %d", arg);
+ return -1;
+ }
+ *p_unboundop = op;
+ return 0;
+}
#endif
return -1;
}
PyThreadState *tstate = PyThreadState_Get();
+ // XXX Use _PyObject_GetXIDataWithFallback()?
if (_PyObject_GetXIDataNoFallback(tstate, value, item->xidata) != 0) {
PyMem_RawFree(item->xidata);
item->xidata = NULL;