From: Ben Darnell Date: Sat, 1 Jun 2013 21:30:33 +0000 (-0400) Subject: Reuse a single ThreadPoolExecutor for all ThreadedResolvers. X-Git-Tag: v3.1.0~17 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=be529443b047752b3c75282a8760d4697963777c;p=thirdparty%2Ftornado.git Reuse a single ThreadPoolExecutor for all ThreadedResolvers. The primary motivation is that shutting down a ThreadPoolExecutor takes 100ms in the 2.x backported version of concurrent.futures. It's also generally unnecessary to create lots of DNS resolver threads just because multiple resolver objects are used. Document ExecutorResolver for public use. --- diff --git a/tornado/netutil.py b/tornado/netutil.py index a7ba082f1..bbd11485a 100644 --- a/tornado/netutil.py +++ b/tornado/netutil.py @@ -217,12 +217,27 @@ class Resolver(Configurable): class ExecutorResolver(Resolver): - def initialize(self, io_loop=None, executor=None): + """Resolver implementation using a `concurrent.futures.Executor`. + + Use this instead of `ThreadedResolver` when you require additional + control over the executor being used. + + The executor will be shut down when the resolver is closed unless + ``close_resolver=False``; use this if you want to reuse the same + executor elsewhere. + """ + def initialize(self, io_loop=None, executor=None, close_executor=True): self.io_loop = io_loop or IOLoop.current() - self.executor = executor or dummy_executor + if executor is not None: + self.executor = executor + self.close_executor = close_executor + else: + self.executor = dummy_executor + self.close_executor = False def close(self): - self.executor.shutdown() + if self.close_executor: + self.executor.shutdown() self.executor = None @run_on_executor @@ -260,11 +275,31 @@ class ThreadedResolver(ExecutorResolver): Resolver.configure('tornado.netutil.ThreadedResolver', num_threads=10) + + .. versionchanged:: 3.1 + All ``ThreadedResolvers`` share a single thread pool, whose + size is set by the first one to be created. """ + _threadpool = None + _threadpool_pid = None + def initialize(self, io_loop=None, num_threads=10): - from concurrent.futures import ThreadPoolExecutor + threadpool = ThreadedResolver._create_threadpool(num_threads) super(ThreadedResolver, self).initialize( - io_loop=io_loop, executor=ThreadPoolExecutor(num_threads)) + io_loop=io_loop, executor=threadpool, close_executor=False) + + @classmethod + def _create_threadpool(cls, num_threads): + pid = os.getpid() + if cls._threadpool_pid != pid: + # Threads cannot survive after a fork, so if our pid isn't what it + # was when we created the pool then delete it. + cls._threadpool = None + if cls._threadpool is None: + from concurrent.futures import ThreadPoolExecutor + cls._threadpool = ThreadPoolExecutor(num_threads) + cls._threadpool_pid = pid + return cls._threadpool class OverrideResolver(Resolver): diff --git a/tornado/test/netutil_test.py b/tornado/test/netutil_test.py index 245ef096f..cf587bcbd 100644 --- a/tornado/test/netutil_test.py +++ b/tornado/test/netutil_test.py @@ -53,7 +53,7 @@ class ThreadedResolverTest(AsyncTestCase, _ResolverTestMixin): self.resolver = ThreadedResolver(io_loop=self.io_loop) def tearDown(self): - self.resolver.executor.shutdown() + self.resolver.close() super(ThreadedResolverTest, self).tearDown()