self.startedEvent.set()
self.fn(*self.args)
-def _doBlockingTest( block_func, block_args, trigger_func, trigger_args):
+def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
try:
finally:
# If we unblocked before our thread made the call, we failed!
if not t.startedEvent.isSet():
- raise TestFailed("blocking function '%r' appeared not to block" % (block_func,))
+ raise TestFailed("blocking function '%r' appeared not to block" %
+ block_func)
t.join(1) # make sure the thread terminates
if t.isAlive():
- raise TestFailed("trigger function '%r' appeared to not return" % (trigger_func,))
+ raise TestFailed("trigger function '%r' appeared to not return" %
+ trigger_func)
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
# I guess we better check things actually queue correctly a little :)
q.put(111)
q.put(222)
- verify(q.get()==111 and q.get()==222, "Didn't seem to queue the correct data!")
+ verify(q.get() == 111 and q.get() == 222,
+ "Didn't seem to queue the correct data!")
for i in range(queue_size-1):
q.put(i)
verify(not q.full(), "Queue should not be full")
except Queue.Empty:
pass
# Test a blocking get
- _doBlockingTest( q.get, (), q.put, ('empty',))
- _doBlockingTest( q.get, (True, 0.2), q.put, ('empty',))
+ _doBlockingTest(q.get, (), q.put, ('empty',))
+ _doBlockingTest(q.get, (True, 0.2), q.put, ('empty',))
def test():
q=Queue.Queue(queue_size)