import threading
except ImportError:
import dummy_threading as threading
+ self.maxsize = maxsize
self._init(maxsize)
# mutex must be held whenever the queue is mutating. All methods
# that acquire mutex must release it before returning. mutex
self.mutex.release()
return n
- def empty(self):
- """Return True if the queue is empty, False otherwise (not reliable!)."""
- self.mutex.acquire()
- n = self._empty()
- self.mutex.release()
- return n
-
- def full(self):
- """Return True if the queue is full, False otherwise (not reliable!)."""
- self.mutex.acquire()
- n = self._full()
- self.mutex.release()
- return n
-
def put(self, item, block=True, timeout=None):
"""Put an item into the queue.
self.not_full.acquire()
try:
if not block:
- if self._full():
+ if self.maxsize > 0 and self._qsize() == self.maxsize:
raise Full
elif timeout is None:
- while self._full():
- self.not_full.wait()
+ if self.maxsize > 0:
+ while self._qsize() == self.maxsize:
+ self.not_full.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
- while self._full():
- remaining = endtime - _time()
- if remaining <= 0.0:
- raise Full
- self.not_full.wait(remaining)
+ if self.maxsize > 0:
+ while self._qsize() == self.maxsize:
+ remaining = endtime - _time()
+ if remaining <= 0.0:
+ raise Full
+ self.not_full.wait(remaining)
self._put(item)
self.unfinished_tasks += 1
self.not_empty.notify()
self.not_empty.acquire()
try:
if not block:
- if self._empty():
+ if not self._qsize():
raise Empty
elif timeout is None:
- while self._empty():
+ while not self._qsize():
self.not_empty.wait()
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
endtime = _time() + timeout
- while self._empty():
+ while not self._qsize():
remaining = endtime - _time()
if remaining <= 0.0:
raise Empty
# Initialize the queue representation
def _init(self, maxsize):
- self.maxsize = maxsize
self.queue = deque()
def _qsize(self):
return len(self.queue)
- # Check whether the queue is empty
- def _empty(self):
- return not self.queue
-
- # Check whether the queue is full
- def _full(self):
- return self.maxsize > 0 and len(self.queue) == self.maxsize
-
# Put a new item in the queue
def _put(self, item):
self.queue.append(item)
QUEUE_SIZE = 5
+def qfull(q):
+ return q.maxsize > 0 and q.qsize() == q.maxsize
+
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
def __init__(self, fn, args):
return Queue.Queue._get(self)
def FailingQueueTest(q):
- if not q.empty():
+ if q.qsize():
raise RuntimeError("Call this function with an empty queue")
for i in range(QUEUE_SIZE-1):
q.put(i)
except FailingQueueException:
pass
q.put("last")
- verify(q.full(), "Queue should be full")
+ verify(qfull(q), "Queue should be full")
# Test a failing blocking put
q.fail_next_put = True
try:
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
- verify(q.full(), "Queue should be full")
+ verify(qfull(q), "Queue should be full")
q.get()
- verify(not q.full(), "Queue should not be full")
+ verify(not qfull(q), "Queue should not be full")
q.put("last")
- verify(q.full(), "Queue should be full")
+ verify(qfull(q), "Queue should be full")
# Test a blocking put
_doBlockingTest( q.put, ("full",), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
- verify(q.empty(), "Queue should be empty")
+ verify(not q.qsize(), "Queue should be empty")
q.put("first")
q.fail_next_get = True
try:
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
- verify(not q.empty(), "Queue should not be empty")
+ verify(q.qsize(), "Queue should not be empty")
q.fail_next_get = True
try:
q.get(timeout=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
- verify(not q.empty(), "Queue should not be empty")
+ verify(q.qsize(), "Queue should not be empty")
q.get()
- verify(q.empty(), "Queue should be empty")
+ verify(not q.qsize(), "Queue should be empty")
q.fail_next_get = True
try:
_doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
except FailingQueueException:
pass
# put succeeded, but get failed.
- verify(not q.empty(), "Queue should not be empty")
+ verify(q.qsize(), "Queue should not be empty")
q.get()
- verify(q.empty(), "Queue should be empty")
+ verify(not q.qsize(), "Queue should be empty")
def SimpleQueueTest(q):
- if not q.empty():
+ if q.qsize():
raise RuntimeError("Call this function with an empty queue")
# I guess we better check things actually queue correctly a little :)
q.put(111)
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
- verify(not q.empty(), "Queue should not be empty")
- verify(not q.full(), "Queue should not be full")
+ verify(q.qsize(), "Queue should not be empty")
+ verify(not qfull(q), "Queue should not be full")
q.put("last")
- verify(q.full(), "Queue should be full")
+ verify(qfull(q), "Queue should be full")
try:
q.put("full", block=0)
raise TestFailed("Didn't appear to block with a full queue")
# Empty it
for i in range(QUEUE_SIZE):
q.get()
- verify(q.empty(), "Queue should be empty")
+ verify(not q.qsize(), "Queue should be empty")
try:
q.get(block=0)
raise TestFailed("Didn't appear to block with an empty queue")