super().__init__()
+_NOT_RUNNING = "<not running>"
+
+
class WorkerThread(threading.Thread):
def __init__(self, worker_id: int, runner: "RunWorkers") -> None:
super().__init__()
self.output = runner.output
self.timeout = runner.worker_timeout
self.log = runner.log
- self.test_name: TestName | None = None
- self.start_time: float | None = None
+ self.test_name = _NOT_RUNNING
+ self.start_time = time.monotonic()
self._popen: subprocess.Popen[str] | None = None
self._killed = False
self._stopped = False
popen = self._popen
if popen is not None:
dt = time.monotonic() - self.start_time
- info.extend((f'pid={self._popen.pid}',
+ info.extend((f'pid={popen.pid}',
f'time={format_duration(dt)}'))
return '<%s>' % ' '.join(info)
except WorkerError as exc:
mp_result = exc.mp_result
finally:
- self.test_name = None
+ self.test_name = _NOT_RUNNING
mp_result.result.duration = time.monotonic() - self.start_time
self.output.put((False, mp_result))
def _wait_completed(self) -> None:
popen = self._popen
+ # only needed for mypy:
+ if popen is None:
+ raise ValueError("Should never access `._popen` before calling `.run()`")
try:
popen.wait(WAIT_COMPLETED_TIMEOUT)
self.worker_timeout: float | None = min(self.timeout * 1.5, self.timeout + 5 * 60)
else:
self.worker_timeout = None
- self.workers: list[WorkerThread] | None = None
+ self.workers: list[WorkerThread] = []
jobs = self.runtests.get_jobs()
if jobs is not None:
processes = plural(nworkers, "process", "processes")
msg = (f"Run {tests} in parallel using "
f"{nworkers} worker {processes}")
- if self.timeout:
+ if self.timeout and self.worker_timeout is not None:
msg += (" (timeout: %s, worker timeout: %s)"
% (format_duration(self.timeout),
format_duration(self.worker_timeout)))
if mp_result.err_msg:
# WORKER_BUG
text += ' (%s)' % mp_result.err_msg
- elif (result.duration >= PROGRESS_MIN_TIME and not pgo):
+ elif (result.duration and result.duration >= PROGRESS_MIN_TIME and not pgo):
text += ' (%s)' % format_duration(result.duration)
if not pgo:
running = get_running(self.workers)