[mypy-tornado.*,tornado.platform.*]
disallow_untyped_defs = True
-[mypy-tornado.auth,tornado.autoreload,tornado.curl_httpclient,tornado.httpclient,tornado.locks,tornado.process,tornado.queues,tornado.routing,tornado.simple_httpclient,tornado.template,tornado.web,tornado.websocket,tornado.wsgi]
+[mypy-tornado.auth,tornado.curl_httpclient,tornado.httpclient,tornado.locks,tornado.queues,tornado.routing,tornado.simple_httpclient,tornado.template,tornado.web,tornado.websocket,tornado.wsgi]
disallow_untyped_defs = False
# It's generally too tedious to require type annotations in tests, but
except ImportError:
signal = None # type: ignore
+import typing
+from typing import Callable, Dict
+if typing.TYPE_CHECKING:
+ from typing import List, Optional, Union # noqa: F401
+
# os.execv is broken on Windows and can't properly parse command line
# arguments and executable name if they contain whitespaces. subprocess
# fixes that behavior.
_reload_attempted = False
_io_loops = weakref.WeakKeyDictionary() # type: ignore
_autoreload_is_main = False
-_original_argv = None
+_original_argv = None # type: Optional[List[str]]
_original_spec = None
-def start(check_time=500):
+def start(check_time: int=500) -> None:
"""Begins watching source files for changes.
.. versionchanged:: 5.0
_io_loops[io_loop] = True
if len(_io_loops) > 1:
gen_log.warning("tornado.autoreload started more than once in the same process")
- modify_times = {}
+ modify_times = {} # type: Dict[str, float]
callback = functools.partial(_reload_on_update, modify_times)
scheduler = ioloop.PeriodicCallback(callback, check_time)
scheduler.start()
-def wait():
+def wait() -> None:
"""Wait for a watched file to change, then restart the process.
Intended to be used at the end of scripts like unit test runners,
io_loop.start()
-def watch(filename):
+def watch(filename: str) -> None:
"""Add a file to the watch list.
All imported modules are watched by default.
_watched_files.add(filename)
-def add_reload_hook(fn):
+def add_reload_hook(fn: Callable[[], None]) -> None:
"""Add a function to be called before reloading the process.
Note that for open file and socket handles it is generally
_reload_hooks.append(fn)
-def _reload_on_update(modify_times):
+def _reload_on_update(modify_times: Dict[str, float]) -> None:
if _reload_attempted:
# We already tried to reload and it didn't work, so don't try again.
return
_check_file(modify_times, path)
-def _check_file(modify_times, path):
+def _check_file(modify_times: Dict[str, float], path: str) -> None:
try:
modified = os.stat(path).st_mtime
except Exception:
_reload()
-def _reload():
+def _reload() -> None:
global _reload_attempted
_reload_attempted = True
for fn in _reload_hooks:
# sys.path[0] is an empty string and add the current directory to
# $PYTHONPATH.
if _autoreload_is_main:
+ assert _original_argv is not None
spec = _original_spec
argv = _original_argv
else:
# Unfortunately the errno returned in this case does not
# appear to be consistent, so we can't easily check for
# this error specifically.
- os.spawnv(os.P_NOWAIT, sys.executable, [sys.executable] + argv)
+ os.spawnv(os.P_NOWAIT, sys.executable, [sys.executable] + argv) # type: ignore
# At this point the IOLoop has been closed and finally
# blocks will experience errors if we allow the stack to
# unwind, so just exit uncleanly.
"""
-def main():
+def main() -> None:
"""Command-line wrapper to re-run a script whenever its source changes.
Scripts may be specified by filename or module name::
# no longer in sys.modules. Figure out where it is and watch it.
loader = pkgutil.get_loader(module)
if loader is not None:
- watch(loader.get_filename())
+ watch(loader.get_filename()) # type: ignore
wait()
from tornado.platform.auto import set_close_exec
from tornado.util import errno_from_exception
+import typing
+from typing import Tuple, Optional, Any, Callable
+if typing.TYPE_CHECKING:
+ from typing import List # noqa: F401
+
# Re-export this exception for convenience.
CalledProcessError = subprocess.CalledProcessError
-def cpu_count():
+def cpu_count() -> int:
"""Returns the number of processors on this machine."""
if multiprocessing is None:
return 1
return 1
-def _reseed_random():
+def _reseed_random() -> None:
if 'random' not in sys.modules:
return
import random
random.seed(seed)
-def _pipe_cloexec():
+def _pipe_cloexec() -> Tuple[int, int]:
r, w = os.pipe()
set_close_exec(r)
set_close_exec(w)
_task_id = None
-def fork_processes(num_processes, max_restarts=100):
+def fork_processes(num_processes: Optional[int], max_restarts: int=100) -> int:
"""Starts multiple worker processes.
If ``num_processes`` is None or <= 0, we detect the number of cores
gen_log.info("Starting %d processes", num_processes)
children = {}
- def start_child(i):
+ def start_child(i: int) -> Optional[int]:
pid = os.fork()
if pid == 0:
# child process
sys.exit(0)
-def task_id():
+def task_id() -> Optional[int]:
"""Returns the current task id, if any.
Returns None if this process was not created by `fork_processes`.
_initialized = False
_waiting = {} # type: ignore
+ _old_sigchld = None
- def __init__(self, *args, **kwargs):
+ def __init__(self, *args: Any, **kwargs: Any) -> None:
self.io_loop = ioloop.IOLoop.current()
# All FDs we create should be closed on error; those in to_close
# should be closed in the parent process on success.
- pipe_fds = []
- to_close = []
+ pipe_fds = [] # type: List[int]
+ to_close = [] # type: List[int]
if kwargs.get('stdin') is Subprocess.STREAM:
in_r, in_w = _pipe_cloexec()
kwargs['stdin'] = in_r
raise
for fd in to_close:
os.close(fd)
- for attr in ['stdin', 'stdout', 'stderr', 'pid']:
+ self.pid = self.proc.pid
+ for attr in ['stdin', 'stdout', 'stderr']:
if not hasattr(self, attr): # don't clobber streams set above
setattr(self, attr, getattr(self.proc, attr))
- self._exit_callback = None
- self.returncode = None
+ self._exit_callback = None # type: Optional[Callable[[int], None]]
+ self.returncode = None # type: Optional[int]
- def set_exit_callback(self, callback):
+ def set_exit_callback(self, callback: Callable[[int], None]) -> None:
"""Runs ``callback`` when this process exits.
The callback takes one argument, the return code of the process.
Subprocess._waiting[self.pid] = self
Subprocess._try_cleanup_process(self.pid)
- def wait_for_exit(self, raise_error=True):
+ def wait_for_exit(self, raise_error: bool=True) -> 'Future[int]':
"""Returns a `.Future` which resolves when the process exits.
Usage::
.. versionadded:: 4.2
"""
- future = Future()
+ future = Future() # type: Future[int]
- def callback(ret):
+ def callback(ret: int) -> None:
if ret != 0 and raise_error:
# Unfortunately we don't have the original args any more.
- future.set_exception(CalledProcessError(ret, None))
+ future.set_exception(CalledProcessError(ret, 'unknown'))
else:
future_set_result_unless_cancelled(future, ret)
self.set_exit_callback(callback)
return future
@classmethod
- def initialize(cls):
+ def initialize(cls) -> None:
"""Initializes the ``SIGCHLD`` handler.
The signal handler is run on an `.IOLoop` to avoid locking issues.
cls._initialized = True
@classmethod
- def uninitialize(cls):
+ def uninitialize(cls) -> None:
"""Removes the ``SIGCHLD`` handler."""
if not cls._initialized:
return
cls._initialized = False
@classmethod
- def _cleanup(cls):
+ def _cleanup(cls) -> None:
for pid in list(cls._waiting.keys()): # make a copy
cls._try_cleanup_process(pid)
@classmethod
- def _try_cleanup_process(cls, pid):
+ def _try_cleanup_process(cls, pid: int) -> None:
try:
ret_pid, status = os.waitpid(pid, os.WNOHANG)
except OSError as e:
subproc.io_loop.add_callback_from_signal(
subproc._set_returncode, status)
- def _set_returncode(self, status):
+ def _set_returncode(self, status: int) -> None:
if os.WIFSIGNALED(status):
self.returncode = -os.WTERMSIG(status)
else:
@skipIfNonUnix
class SubprocessTest(AsyncTestCase):
+ def term_and_wait(self, subproc):
+ subproc.proc.terminate()
+ subproc.proc.wait()
+
@gen_test
def test_subprocess(self):
if IOLoop.configured_class().__name__.endswith('LayeredTwistedIOLoop'):
subproc = Subprocess([sys.executable, '-u', '-i'],
stdin=Subprocess.STREAM,
stdout=Subprocess.STREAM, stderr=subprocess.STDOUT)
- self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
+ self.addCleanup(lambda: self.term_and_wait(subproc))
self.addCleanup(subproc.stdout.close)
self.addCleanup(subproc.stdin.close)
yield subproc.stdout.read_until(b'>>> ')
subproc = Subprocess([sys.executable, '-u', '-i'],
stdin=Subprocess.STREAM,
stdout=Subprocess.STREAM, stderr=subprocess.STDOUT)
- self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
+ self.addCleanup(lambda: self.term_and_wait(subproc))
yield subproc.stdout.read_until(b'>>> ')
subproc.stdin.close()
data = yield subproc.stdout.read_until_close()
subproc = Subprocess([sys.executable, '-u', '-c',
r"import sys; sys.stderr.write('hello\n')"],
stderr=Subprocess.STREAM)
- self.addCleanup(lambda: (subproc.proc.terminate(), subproc.proc.wait()))
+ self.addCleanup(lambda: self.term_and_wait(subproc))
data = yield subproc.stderr.read_until(b'\n')
self.assertEqual(data, b'hello\n')
# More mysterious EBADF: This fails if done with self.addCleanup instead of here.
stdout=Subprocess.STREAM)
self.addCleanup(subproc.stdout.close)
subproc.set_exit_callback(self.stop)
- os.kill(subproc.pid, signal.SIGTERM) # type: ignore
+ os.kill(subproc.pid, signal.SIGTERM)
try:
ret = self.wait(timeout=1.0)
except AssertionError:
# (indicating that the problem is in the parent process's
# signal handling) or did the child process somehow fail
# to terminate?
- subproc.stdout.read_until_close(callback=self.stop)
+ fut = subproc.stdout.read_until_close()
+ fut.add_done_callback(lambda f: self.stop()) # type: ignore
try:
self.wait(timeout=1.0)
except AssertionError: