def qsize(self):
return _queues.get_count(self._id)
- def put(self, obj, timeout=None, *,
+ def put(self, obj, block=True, timeout=None, *,
unbounditems=None,
_delay=10 / 1000, # 10 milliseconds
):
"""Add the object to the queue.
- This blocks while the queue is full.
+ If "block" is true, this blocks while the queue is full.
For most objects, the object received through Queue.get() will
be a new one, equivalent to the original and not sharing any
If "unbounditems" is UNBOUND then it is returned by get() in place
of the unbound item.
"""
+ if not block:
+ return self.put_nowait(obj, unbounditems=unbounditems)
if unbounditems is None:
unboundop = -1
else:
unboundop, = _serialize_unbound(unbounditems)
_queues.put(self._id, obj, unboundop)
- def get(self, timeout=None, *,
+ def get(self, block=True, timeout=None, *,
_delay=10 / 1000, # 10 milliseconds
):
"""Return the next object from the queue.
- This blocks while the queue is empty.
+ If "block" is true, this blocks while the queue is empty.
If the next item's original interpreter has been destroyed
then the "next object" is determined by the value of the
"unbounditems" argument to put().
"""
+ if not block:
+ return self.get_nowait()
if timeout is not None:
timeout = int(timeout)
if timeout < 0:
import test._crossinterp_definitions as defs
from .utils import _run_output, TestBase as _TestBase
-
+HUGE_TIMEOUT = 3600
REPLACE = _crossinterp._UNBOUND_CONSTANT_TO_FLAG[_crossinterp.UNBOUND]
queue.put(None)
with self.assertRaises(queues.QueueFull):
queue.put(None, timeout=0.1)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, HUGE_TIMEOUT, 0.1)
queue.get()
queue.put(None)
queue.put_nowait(None)
with self.assertRaises(queues.QueueFull):
queue.put_nowait(None)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, False)
+ with self.assertRaises(queues.QueueFull):
+ queue.put(None, False, timeout=HUGE_TIMEOUT)
queue.get()
queue.put_nowait(None)
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get(timeout=0.1)
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get(HUGE_TIMEOUT, 0.1)
def test_get_nowait(self):
queue = queues.create()
with self.assertRaises(queues.QueueEmpty):
queue.get_nowait()
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get(False)
+ with self.assertRaises(queues.QueueEmpty):
+ queue.get(False, timeout=HUGE_TIMEOUT)
def test_put_get_full_fallback(self):
expected = list(range(20))