From: Ben Darnell
Date: Sun, 16 Sep 2018 21:35:34 +0000 (-0400)
Subject: autoreload,process: Add type annotations
X-Git-Tag: v6.0.0b1~28^2~18
X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=7f5799744a3e10900facef6161f42c7d508c4da8;p=thirdparty%2Ftornado.git

autoreload,process: Add type annotations
---
diff --git a/setup.cfg b/setup.cfg
index c0f590ea8..91eb964ef 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -7,7 +7,7 @@ python_version = 3.5
 [mypy-tornado.*,tornado.platform.*]
 disallow_untyped_defs = True

-[mypy-tornado.auth,tornado.autoreload,tornado.curl_httpclient,tornado.httpclient,tornado.locks,tornado.process,tornado.queues,tornado.routing,tornado.simple_httpclient,tornado.template,tornado.web,tornado.websocket,tornado.wsgi]
+[mypy-tornado.auth,tornado.curl_httpclient,tornado.httpclient,tornado.locks,tornado.queues,tornado.routing,tornado.simple_httpclient,tornado.template,tornado.web,tornado.websocket,tornado.wsgi]
 disallow_untyped_defs = False

 # It's generally too tedious to require type annotations in tests, but
diff --git a/tornado/autoreload.py b/tornado/autoreload.py
index c973fdd65..339c60c55 100644
--- a/tornado/autoreload.py
+++ b/tornado/autoreload.py
@@ -95,6 +95,11 @@ try:
 except ImportError:
     signal = None  # type: ignore

+import typing
+from typing import Callable, Dict
+if typing.TYPE_CHECKING:
+    from typing import List, Optional, Union  # noqa: F401
+
 # os.execv is broken on Windows and can't properly parse command line
 # arguments and executable name if they contain whitespaces. subprocess
 # fixes that behavior.
@@ -105,11 +110,11 @@ _reload_hooks = []
 _reload_attempted = False
 _io_loops = weakref.WeakKeyDictionary()  # type: ignore
 _autoreload_is_main = False
-_original_argv = None
+_original_argv = None  # type: Optional[List[str]]
 _original_spec = None


-def start(check_time=500):
+def start(check_time: int=500) -> None:
     """Begins watching source files for changes.

     .. versionchanged:: 5.0
@@ -121,13 +126,13 @@ def start(check_time=500):
     _io_loops[io_loop] = True
     if len(_io_loops) > 1:
         gen_log.warning("tornado.autoreload started more than once in the same process")
-    modify_times = {}
+    modify_times = {}  # type: Dict[str, float]
     callback = functools.partial(_reload_on_update, modify_times)
     scheduler = ioloop.PeriodicCallback(callback, check_time)
     scheduler.start()


-def wait():
+def wait() -> None:
     """Wait for a watched file to change, then restart the process.

     Intended to be used at the end of scripts like unit test runners,
@@ -139,7 +144,7 @@ def wait():
     io_loop.start()


-def watch(filename):
+def watch(filename: str) -> None:
     """Add a file to the watch list.

     All imported modules are watched by default.
@@ -147,7 +152,7 @@ def watch(filename):
     _watched_files.add(filename)


-def add_reload_hook(fn):
+def add_reload_hook(fn: Callable[[], None]) -> None:
     """Add a function to be called before reloading the process.

     Note that for open file and socket handles it is generally
@@ -158,7 +163,7 @@ def add_reload_hook(fn):
     _reload_hooks.append(fn)


-def _reload_on_update(modify_times):
+def _reload_on_update(modify_times: Dict[str, float]) -> None:
     if _reload_attempted:
         # We already tried to reload and it didn't work, so don't try again.
         return
@@ -184,7 +189,7 @@ def _reload_on_update(modify_times):
         _check_file(modify_times, path)


-def _check_file(modify_times, path):
+def _check_file(modify_times: Dict[str, float], path: str) -> None:
     try:
         modified = os.stat(path).st_mtime
     except Exception:
@@ -197,7 +202,7 @@ def _check_file(modify_times, path):
         _reload()


-def _reload():
+def _reload() -> None:
     global _reload_attempted
     _reload_attempted = True
     for fn in _reload_hooks:
@@ -215,6 +220,7 @@ def _reload():
     # sys.path[0] is an empty string and add the current directory to
     # $PYTHONPATH.
     if _autoreload_is_main:
+        assert _original_argv is not None
         spec = _original_spec
         argv = _original_argv
     else:
@@ -246,7 +252,7 @@ def _reload():
             # Unfortunately the errno returned in this case does not
             # appear to be consistent, so we can't easily check for
             # this error specifically.
-            os.spawnv(os.P_NOWAIT, sys.executable, [sys.executable] + argv)
+            os.spawnv(os.P_NOWAIT, sys.executable, [sys.executable] + argv)  # type: ignore
             # At this point the IOLoop has been closed and finally
             # blocks will experience errors if we allow the stack to
             # unwind, so just exit uncleanly.
@@ -260,7 +266,7 @@ Usage:
 """


-def main():
+def main() -> None:
     """Command-line wrapper to re-run a script whenever its source changes.

     Scripts may be specified by filename or module name::
@@ -342,7 +348,7 @@ def main():
         # no longer in sys.modules. Figure out where it is and watch it.
         loader = pkgutil.get_loader(module)
         if loader is not None:
-            watch(loader.get_filename())
+            watch(loader.get_filename())  # type: ignore

     wait()

diff --git a/tornado/process.py b/tornado/process.py
index 8f8dd3e07..5df6aa7ad 100644
--- a/tornado/process.py
+++ b/tornado/process.py
@@ -34,11 +34,16 @@ from tornado.log import gen_log
 from tornado.platform.auto import set_close_exec
 from tornado.util import errno_from_exception

+import typing
+from typing import Tuple, Optional, Any, Callable
+if typing.TYPE_CHECKING:
+    from typing import List  # noqa: F401
+
 # Re-export this exception for convenience.
 CalledProcessError = subprocess.CalledProcessError


-def cpu_count():
+def cpu_count() -> int:
     """Returns the number of processors on this machine."""
     if multiprocessing is None:
         return 1
@@ -54,7 +59,7 @@ def cpu_count():
     return 1


-def _reseed_random():
+def _reseed_random() -> None:
     if 'random' not in sys.modules:
         return
     import random
@@ -68,7 +73,7 @@ def _reseed_random():
     random.seed(seed)


-def _pipe_cloexec():
+def _pipe_cloexec() -> Tuple[int, int]:
     r, w = os.pipe()
     set_close_exec(r)
     set_close_exec(w)
@@ -78,7 +83,7 @@ def _pipe_cloexec():
 _task_id = None


-def fork_processes(num_processes, max_restarts=100):
+def fork_processes(num_processes: Optional[int], max_restarts: int=100) -> int:
     """Starts multiple worker processes.

     If ``num_processes`` is None or <= 0, we detect the number of cores
@@ -110,7 +115,7 @@ def fork_processes(num_processes, max_restarts=100):
     gen_log.info("Starting %d processes", num_processes)
     children = {}

-    def start_child(i):
+    def start_child(i: int) -> Optional[int]:
         pid = os.fork()
         if pid == 0:
             # child process
@@ -159,7 +164,7 @@ def fork_processes(num_processes, max_restarts=100):
     sys.exit(0)


-def task_id():
+def task_id() -> Optional[int]:
     """Returns the current task id, if any.

     Returns None if this process was not created by `fork_processes`.
@@ -193,13 +198,14 @@ class Subprocess(object):

     _initialized = False
     _waiting = {}  # type: ignore
+    _old_sigchld = None

-    def __init__(self, *args, **kwargs):
+    def __init__(self, *args: Any, **kwargs: Any) -> None:
         self.io_loop = ioloop.IOLoop.current()
         # All FDs we create should be closed on error; those in to_close
         # should be closed in the parent process on success.
-        pipe_fds = []
-        to_close = []
+        pipe_fds = []  # type: List[int]
+        to_close = []  # type: List[int]
         if kwargs.get('stdin') is Subprocess.STREAM:
             in_r, in_w = _pipe_cloexec()
             kwargs['stdin'] = in_r
@@ -226,13 +232,14 @@ class Subprocess(object):
             raise
         for fd in to_close:
             os.close(fd)
-        for attr in ['stdin', 'stdout', 'stderr', 'pid']:
+        self.pid = self.proc.pid
+        for attr in ['stdin', 'stdout', 'stderr']:
             if not hasattr(self, attr):  # don't clobber streams set above
                 setattr(self, attr, getattr(self.proc, attr))
-        self._exit_callback = None
-        self.returncode = None
+        self._exit_callback = None  # type: Optional[Callable[[int], None]]
+        self.returncode = None  # type: Optional[int]

-    def set_exit_callback(self, callback):
+    def set_exit_callback(self, callback: Callable[[int], None]) -> None:
         """Runs ``callback`` when this process exits.

         The callback takes one argument, the return code of the process.
@@ -252,7 +259,7 @@ class Subprocess(object):
         Subprocess._waiting[self.pid] = self
         Subprocess._try_cleanup_process(self.pid)

-    def wait_for_exit(self, raise_error=True):
+    def wait_for_exit(self, raise_error: bool=True) -> 'Future[int]':
         """Returns a `.Future` which resolves when the process exits.

         Usage::
@@ -268,19 +275,19 @@ class Subprocess(object):

         .. versionadded:: 4.2
         """
-        future = Future()
+        future = Future()  # type: Future[int]

-        def callback(ret):
+        def callback(ret: int) -> None:
             if ret != 0 and raise_error:
                 # Unfortunately we don't have the original args any more.
-                future.set_exception(CalledProcessError(ret, None))
+                future.set_exception(CalledProcessError(ret, 'unknown'))
             else:
                 future_set_result_unless_cancelled(future, ret)
         self.set_exit_callback(callback)
         return future

     @classmethod
-    def initialize(cls):
+    def initialize(cls) -> None:
         """Initializes the ``SIGCHLD`` handler.

         The signal handler is run on an `.IOLoop` to avoid locking issues.
@@ -301,7 +308,7 @@ class Subprocess(object):
         cls._initialized = True

     @classmethod
-    def uninitialize(cls):
+    def uninitialize(cls) -> None:
         """Removes the ``SIGCHLD`` handler."""
         if not cls._initialized:
             return
@@ -309,12 +316,12 @@ class Subprocess(object):
         cls._initialized = False

     @classmethod
-    def _cleanup(cls):
+    def _cleanup(cls) -> None:
         for pid in list(cls._waiting.keys()):  # make a copy
             cls._try_cleanup_process(pid)

     @classmethod
-    def _try_cleanup_process(cls, pid):
+    def _try_cleanup_process(cls, pid: int) -> None:
         try:
             ret_pid, status = os.waitpid(pid, os.WNOHANG)
         except OSError as e:
@@ -327,7 +334,7 @@ class Subprocess(object):
             subproc.io_loop.add_callback_from_signal(
                 subproc._set_returncode, status)

-    def _set_returncode(self, status):
+    def _set_returncode(self, status: int) -> None:
         if os.WIFSIGNALED(status):
             self.returncode = -os.WTERMSIG(status)
         else:
diff --git a/tornado/test/process_test.py b/tornado/test/process_test.py
index c9a13f2e3..58e16f896 100644
--- a/tornado/test/process_test.py
+++ b/tornado/test/process_test.py
@@ -131,6 +131,10 @@ class ProcessTest(unittest.TestCase):

 @skipIfNonUnix
 class SubprocessTest(AsyncTestCase):
+    def term_and_wait(self, subproc):
+        subproc.proc.terminate()
+        subproc.proc.wait()
+
     @gen_test
     def test_subprocess(self):
         if IOLoop.configured_class().__name__.endswith('LayeredTwistedIOLoop'):
@@ -144,7 +148,7 @@ class SubprocessTest(AsyncTestCase):
         subproc = Subprocess([sys.executable, '-u', '-i'],
                              stdin=Subprocess.STREAM, stdout=Subprocess.STREAM,
                              stderr=subprocess.STDOUT)
-        self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
+        self.addCleanup(lambda: self.term_and_wait(subproc))
         self.addCleanup(subproc.stdout.close)
         self.addCleanup(subproc.stdin.close)
         yield subproc.stdout.read_until(b'>>> ')
@@ -163,7 +167,7 @@ class SubprocessTest(AsyncTestCase):
         subproc = Subprocess([sys.executable, '-u', '-i'],
                              stdin=Subprocess.STREAM, stdout=Subprocess.STREAM,
                              stderr=subprocess.STDOUT)
-        self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
+        self.addCleanup(lambda: self.term_and_wait(subproc))
         yield subproc.stdout.read_until(b'>>> ')
         subproc.stdin.close()
         data = yield subproc.stdout.read_until_close()
@@ -176,7 +180,7 @@ class SubprocessTest(AsyncTestCase):
         subproc = Subprocess([sys.executable, '-u', '-c',
                               r"import sys; sys.stderr.write('hello\n')"],
                              stderr=Subprocess.STREAM)
-        self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
+        self.addCleanup(lambda: self.term_and_wait(subproc))
         data = yield subproc.stderr.read_until(b'\n')
         self.assertEqual(data, b'hello\n')
         # More mysterious EBADF: This fails if done with self.addCleanup instead of here.
@@ -208,7 +212,7 @@ class SubprocessTest(AsyncTestCase):
                              stdout=Subprocess.STREAM)
         self.addCleanup(subproc.stdout.close)
         subproc.set_exit_callback(self.stop)
-        os.kill(subproc.pid, signal.SIGTERM)  # type: ignore
+        os.kill(subproc.pid, signal.SIGTERM)
         try:
             ret = self.wait(timeout=1.0)
         except AssertionError:
@@ -218,7 +222,8 @@ class SubprocessTest(AsyncTestCase):
             # (indicating that the problem is in the parent process's
             # signal handling) or did the child process somehow fail
             # to terminate?
-            subproc.stdout.read_until_close(callback=self.stop)
+            fut = subproc.stdout.read_until_close()
+            fut.add_done_callback(lambda f: self.stop())  # type: ignore
             try:
                 self.wait(timeout=1.0)
             except AssertionError:
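
The annotation style used throughout this commit keeps the modules importable on Python 3.5: function signatures get inline annotations, module- and instance-level variables get ``# type:`` comments, names needed only by the type checker are imported under ``typing.TYPE_CHECKING``, and the ``'Future[int]'`` return type is quoted so it is never evaluated at runtime. A minimal sketch of the same pattern, with hypothetical names that are not part of the diff::

    import typing
    from typing import Dict, Optional

    if typing.TYPE_CHECKING:
        # Seen only by mypy; the runtime import graph is unchanged.
        from typing import List  # noqa: F401

    _names = None  # type: Optional[List[str]]


    def record(counts: Dict[str, float], key: str, value: float=1.0) -> None:
        """Hypothetical helper showing the fully annotated def style."""
        counts[key] = value

With the modules annotated this way, setup.cfg drops ``tornado.autoreload`` and ``tornado.process`` from the ``disallow_untyped_defs = False`` exemption list, so mypy now requires annotated definitions in both.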