Reuse a single ThreadPoolExecutor for all ThreadedResolvers.
author    Ben Darnell <ben@bendarnell.com>
          Sat, 1 Jun 2013 21:30:33 +0000 (17:30 -0400)
committer Ben Darnell <ben@bendarnell.com>
          Sat, 1 Jun 2013 21:30:33 +0000 (17:30 -0400)
The primary motivation is that shutting down a ThreadPoolExecutor takes
100ms in the 2.x backported version of concurrent.futures.  It's also
generally unnecessary to create lots of DNS resolver threads just
because multiple resolver objects are used.

Document ExecutorResolver for public use.
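
As a rough illustration of the effect of this change (a sketch assuming Tornado 3.1
with this patch applied and `concurrent.futures` available), every ThreadedResolver
created in a process now resolves through one class-wide ThreadPoolExecutor:

    from tornado.ioloop import IOLoop
    from tornado.netutil import ThreadedResolver

    io_loop = IOLoop.current()
    a = ThreadedResolver(io_loop=io_loop, num_threads=10)
    b = ThreadedResolver(io_loop=io_loop)   # reuses the pool created for `a`
    assert a.executor is b.executor         # one shared ThreadPoolExecutor

    # Closing a resolver only drops its reference (close_executor is False
    # for ThreadedResolver), so `b` keeps working and no per-resolver
    # shutdown cost is paid.
    a.close()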

tornado/netutil.py
tornado/test/netutil_test.py

diff --git a/tornado/netutil.py b/tornado/netutil.py
index a7ba082f1c718aadd1e5f3b357b8629653894ea1..bbd11485a6bacffa33cefef8aace571f6c213dfb 100644
--- a/tornado/netutil.py
+++ b/tornado/netutil.py
@@ -217,12 +217,27 @@ class Resolver(Configurable):
 
 
 class ExecutorResolver(Resolver):
-    def initialize(self, io_loop=None, executor=None):
+    """Resolver implementation using a `concurrent.futures.Executor`.
+
+    Use this instead of `ThreadedResolver` when you require additional
+    control over the executor being used.
+
+    The executor will be shut down when the resolver is closed unless
+    ``close_executor=False``; use this if you want to reuse the same
+    executor elsewhere.
+    """
+    def initialize(self, io_loop=None, executor=None, close_executor=True):
         self.io_loop = io_loop or IOLoop.current()
-        self.executor = executor or dummy_executor
+        if executor is not None:
+            self.executor = executor
+            self.close_executor = close_executor
+        else:
+            self.executor = dummy_executor
+            self.close_executor = False
 
     def close(self):
-        self.executor.shutdown()
+        if self.close_executor:
+            self.executor.shutdown()
         self.executor = None
 
     @run_on_executor
@@ -260,11 +275,31 @@ class ThreadedResolver(ExecutorResolver):
 
         Resolver.configure('tornado.netutil.ThreadedResolver',
                            num_threads=10)
+
+    .. versionchanged:: 3.1
+       All ``ThreadedResolvers`` share a single thread pool, whose
+       size is set by the first one to be created.
     """
+    _threadpool = None
+    _threadpool_pid = None
+
     def initialize(self, io_loop=None, num_threads=10):
-        from concurrent.futures import ThreadPoolExecutor
+        threadpool = ThreadedResolver._create_threadpool(num_threads)
         super(ThreadedResolver, self).initialize(
-            io_loop=io_loop, executor=ThreadPoolExecutor(num_threads))
+            io_loop=io_loop, executor=threadpool, close_executor=False)
+
+    @classmethod
+    def _create_threadpool(cls, num_threads):
+        pid = os.getpid()
+        if cls._threadpool_pid != pid:
+            # Threads cannot survive after a fork, so if our pid isn't what it
+            # was when we created the pool then delete it.
+            cls._threadpool = None
+        if cls._threadpool is None:
+            from concurrent.futures import ThreadPoolExecutor
+            cls._threadpool = ThreadPoolExecutor(num_threads)
+            cls._threadpool_pid = pid
+        return cls._threadpool
 
 
 class OverrideResolver(Resolver):
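
The docstring added above makes ExecutorResolver part of the public API. A minimal
usage sketch (the four-worker pool and its reuse elsewhere are illustrative
assumptions) in which the caller owns the executor and keeps it alive past the
resolver's lifetime:

    from concurrent.futures import ThreadPoolExecutor
    from tornado.ioloop import IOLoop
    from tornado.netutil import ExecutorResolver

    pool = ThreadPoolExecutor(4)        # also used elsewhere in the application
    resolver = ExecutorResolver(io_loop=IOLoop.current(), executor=pool,
                                close_executor=False)
    # resolver.resolve(host, port, ...) now runs getaddrinfo on `pool`.
    resolver.close()                    # drops the reference; `pool` keeps running
    pool.shutdown()                     # the caller shuts the pool down itself
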
diff --git a/tornado/test/netutil_test.py b/tornado/test/netutil_test.py
index 245ef096fff3a92d87cdc509434bd3fd8492f1a4..cf587bcbd85f299cd3df4c2b34039ea79acdcdfe 100644
--- a/tornado/test/netutil_test.py
+++ b/tornado/test/netutil_test.py
@@ -53,7 +53,7 @@ class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin):
         self.resolver = ThreadedResolver(io_loop=self.io_loop)
 
     def tearDown(self):
-        self.resolver.executor.shutdown()
+        self.resolver.close()
         super(ThreadedResolverTest, self).tearDown()
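
On the test side, tearDown now goes through close() because the executor is no
longer owned by the instance; shutting it down directly would stop the pool shared
by every other ThreadedResolver in the test run. A rough standalone sketch of the
distinction (outside the test class):

    from tornado.ioloop import IOLoop
    from tornado.netutil import ThreadedResolver

    resolver = ThreadedResolver(io_loop=IOLoop.current())
    shared_pool = ThreadedResolver._threadpool   # the class-wide pool
    resolver.close()                             # close_executor is False: the
                                                 # shared pool keeps running
    # The old tearDown's resolver.executor.shutdown() would have stopped
    # shared_pool for every later test as well.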