result_state: Optional[str] = None
job_path: Optional[str] = None
+ loop: Any = None
+ completed_jobs: Dict[str, str] = {}
+ thread_exception: Optional[Exception] = None
- def _wait_for_job_completion_handler(loop: Any) -> Any:
- completed_jobs: Dict[str, str] = {}
-
- def event_hander(_job_id: Any, path: Any, _unit: Any, state: Any) -> None:
- nonlocal result_state
- nonlocal completed_jobs
-
- # save the current job as completed
- completed_jobs[path] = state
-
- if job_path is not None and job_path in completed_jobs:
- # if we've already seen the job
- result_state = completed_jobs[job_path]
- loop.quit()
+ def job_removed_event(_job_id: Any, path: Any, _unit: Any, state: Any) -> None:
+ nonlocal result_state
+ nonlocal completed_jobs
+ nonlocal loop
- # if we already have the job path we are looking for and it's not been seen yet,
- # it's safe to remove all previous completed job references as we don't care
- if job_path is not None:
- completed_jobs.clear()
+ # save the current job as completed
+ completed_jobs[path] = state
- return event_hander
+ if job_path is not None and job_path in completed_jobs:
+ # if we've already seen the job
+ result_state = completed_jobs[job_path]
+ loop.quit()
- loop: Any = None
+ # if we already have the job path we are looking for and it's not been seen yet,
+ # it's safe to remove all previous completed job references as we don't care
+ if job_path is not None:
+ completed_jobs.clear()
- def timeout_stop_loop():
+ def timeout_event():
nonlocal loop
nonlocal result_state
result_state = "timeout"
loop.quit()
- def event_loop_isolation_thread() -> None:
- nonlocal loop
- loop = GLib.MainLoop()
- GLib.timeout_add_seconds(timeout_sec, timeout_stop_loop)
- systemd.JobRemoved.connect(_wait_for_job_completion_handler(loop))
- loop.run()
+ def idle_event():
+ # start the systemd task
+ nonlocal job_path
+ job_path = job_creating_func()
+ # do not repeat the event
+ return False
- # first start the thread to watch for results to prevent race conditions
+ def event_loop_isolation_thread() -> None:
+ try:
+ nonlocal loop
+ loop = GLib.MainLoop()
+ GLib.timeout_add_seconds(timeout_sec, timeout_event)
+ systemd.JobRemoved.connect(job_removed_event)
+ GLib.idle_add(idle_event)
+ loop.run()
+ except Exception as e:
+ nonlocal thread_exception
+ thread_exception = e
+
+ # start the event loop isolation thread handling the work
thread = Thread(target=event_loop_isolation_thread, name="glib-loop-isolation-thread")
thread.start()
- # then create the job
- try:
- job_path = job_creating_func()
- except BaseException:
- loop.quit()
- thread.join()
- raise
-
# then wait for the results
thread.join()
- if result_state == "timeout":
+ # the thread sets `result_state` and `thread_exception` variables, which we check here
+ if thread_exception is not None:
+ raise thread_exception # pyright: reportGeneralTypeIssues=false
+ if result_state == "timeout": # pyright: reportUnnecessaryComparison=false
raise SubprocessControllerTimeoutException(f"systemd job '{job_path}' did not finish in {timeout_sec} seconds")
if result_state != "done":
raise SubprocessControllerException(