From 486abfb8a3b2e40fcfb87c5179252748913fcfca Mon Sep 17 00:00:00 2001 From: "A. Jesse Jiryu Davis" Date: Sun, 15 Mar 2015 21:09:12 -0400 Subject: [PATCH] Cancel timeouts if Queue.get or put are resolved first. --- tornado/queues.py | 20 ++++++++++++-------- tornado/test/queues_test.py | 17 +++++++++++++++++ 2 files changed, 29 insertions(+), 8 deletions(-) diff --git a/tornado/queues.py b/tornado/queues.py index 1e4a31aba..5986ccc96 100644 --- a/tornado/queues.py +++ b/tornado/queues.py @@ -33,6 +33,16 @@ class QueueFull(Exception): pass +def _set_timeout(future, timeout): + if timeout: + def on_timeout(): + future.set_exception(gen.TimeoutError()) + io_loop = ioloop.IOLoop.current() + timeout_handle = io_loop.add_timeout(timeout, on_timeout) + future.add_done_callback( + lambda _: io_loop.remove_timeout(timeout_handle)) + + class Queue(object): """Coordinate producer and consumer coroutines. @@ -82,10 +92,7 @@ class Queue(object): except QueueFull: future = Future() self._putters.append((item, future)) - if timeout: - def on_timeout(): - future.set_exception(gen.TimeoutError()) - ioloop.IOLoop.current().add_timeout(timeout, on_timeout) + _set_timeout(future, timeout) return future else: return gen._null_future @@ -117,10 +124,7 @@ class Queue(object): future.set_result(self.get_nowait()) except QueueEmpty: self._getters.append(future) - if timeout: - def on_timeout(): - future.set_exception(gen.TimeoutError()) - ioloop.IOLoop.current().add_timeout(timeout, on_timeout) + _set_timeout(future, timeout) return future def get_nowait(self): diff --git a/tornado/test/queues_test.py b/tornado/test/queues_test.py index 34b611315..ac2118332 100644 --- a/tornado/test/queues_test.py +++ b/tornado/test/queues_test.py @@ -116,6 +116,14 @@ class QueueGetTest(AsyncTestCase): q.put_nowait(0) self.assertEqual(0, (yield get)) + @gen_test + def test_get_timeout_preempted(self): + q = queues.Queue() + get = q.get(timeout=timedelta(seconds=0.01)) + q.put(0) + yield gen.sleep(0.02) + self.assertEqual(0, (yield get)) + @gen_test def test_get_clears_timed_out_putters(self): q = queues.Queue(1) @@ -208,6 +216,15 @@ class QueuePutTest(AsyncTestCase): # Final get() unblocked this putter. yield put + @gen_test + def test_put_timeout_preempted(self): + q = queues.Queue(1) + q.put_nowait(0) + put = q.put(1, timeout=timedelta(seconds=0.01)) + q.get() + yield gen.sleep(0.02) + yield put # No TimeoutError. + @gen_test def test_put_clears_timed_out_putters(self): q = queues.Queue(1) -- 2.47.2