import textwrap
import time
import platform
+import weakref
from tornado.concurrent import return_future
from tornado.escape import url_escape
@skipNotCPython
def test_task_refcounting(self):
- # Task with arguments on CPython should be released immediately after
- # engine stop. And should not be delayed to garbage collection.
- class Task(gen.Task):
- "Task class which count self instances release."
- released = 0
- def __del__(self):
- type(self).released += 1
-
+ # On CPython, tasks and their arguments should be released immediately
+ # without waiting for garbage collection.
@gen.engine
def f():
- yield Task(self.io_loop.add_callback, arg=Task(None))
+ class Foo(object): pass
+ arg = Foo()
+ self.arg_ref = weakref.ref(arg)
+ task = gen.Task(self.io_loop.add_callback, arg=arg)
+ self.task_ref = weakref.ref(task)
+ yield task
self.stop()
self.run_gen(f)
- self.assertEqual(Task.released, 2)
+ self.assertIs(self.arg_ref(), None)
+ self.assertIs(self.task_ref(), None)
class GenCoroutineTest(AsyncTestCase):