git.ipfire.org Git - thirdparty/knot-resolver.git/commitdiff
manager: systemd integration: fixed race condition while waiting for systemd job...
author Vasek Sraier <git@vakabus.cz>
Sun, 17 Apr 2022 14:04:43 +0000 (16:04 +0200)
committer Vasek Sraier <git@vakabus.cz>
Sun, 17 Apr 2022 14:56:12 +0000 (16:56 +0200)
We were starting a result-monitoring thread, but we did not wait for it to start properly. Therefore,
the systemd job could finish before we actually started checking for its completion.

manager/knot_resolver_manager/kresd_controller/systemd/dbus_api.py

index adb7b70ee50ab61fb147f9318782b35c6bf392cd..e38f4ce4d519b6c220ef2d547a9563fdf090e18b 100644 (file)
@@ -69,60 +69,64 @@ def _wait_for_job_completion(systemd: Any, job_creating_func: Callable[[], str],
 
     result_state: Optional[str] = None
     job_path: Optional[str] = None
+    loop: Any = None
+    completed_jobs: Dict[str, str] = {}
+    thread_exception: Optional[Exception] = None
 
-    def _wait_for_job_completion_handler(loop: Any) -> Any:
-        completed_jobs: Dict[str, str] = {}
-
-        def event_hander(_job_id: Any, path: Any, _unit: Any, state: Any) -> None:
-            nonlocal result_state
-            nonlocal completed_jobs
-
-            # save the current job as completed
-            completed_jobs[path] = state
-
-            if job_path is not None and job_path in completed_jobs:
-                # if we've already seen the job
-                result_state = completed_jobs[job_path]
-                loop.quit()
+    def job_removed_event(_job_id: Any, path: Any, _unit: Any, state: Any) -> None:
+        nonlocal result_state
+        nonlocal completed_jobs
+        nonlocal loop
 
-            # if we already have the job path we are looking for and it's not been seen yet,
-            # it's safe to remove all previous completed job references as we don't care
-            if job_path is not None:
-                completed_jobs.clear()
+        # save the current job as completed
+        completed_jobs[path] = state
 
-        return event_hander
+        if job_path is not None and job_path in completed_jobs:
+            # if we've already seen the job
+            result_state = completed_jobs[job_path]
+            loop.quit()
 
-    loop: Any = None
+        # if we already have the job path we are looking for and it's not been seen yet,
+        # it's safe to remove all previous completed job references as we don't care
+        if job_path is not None:
+            completed_jobs.clear()
 
-    def timeout_stop_loop():
+    def timeout_event():
         nonlocal loop
         nonlocal result_state
         result_state = "timeout"
         loop.quit()
 
-    def event_loop_isolation_thread() -> None:
-        nonlocal loop
-        loop = GLib.MainLoop()
-        GLib.timeout_add_seconds(timeout_sec, timeout_stop_loop)
-        systemd.JobRemoved.connect(_wait_for_job_completion_handler(loop))
-        loop.run()
+    def idle_event():
+        # start the systemd task
+        nonlocal job_path
+        job_path = job_creating_func()
+        # do not repeat the event
+        return False
 
-    # first start the thread to watch for results to prevent race conditions
+    def event_loop_isolation_thread() -> None:
+        try:
+            nonlocal loop
+            loop = GLib.MainLoop()
+            GLib.timeout_add_seconds(timeout_sec, timeout_event)
+            systemd.JobRemoved.connect(job_removed_event)
+            GLib.idle_add(idle_event)
+            loop.run()
+        except Exception as e:
+            nonlocal thread_exception
+            thread_exception = e
+
+    # start the event loop isolation thread handling the work
     thread = Thread(target=event_loop_isolation_thread, name="glib-loop-isolation-thread")
     thread.start()
 
-    # then create the job
-    try:
-        job_path = job_creating_func()
-    except BaseException:
-        loop.quit()
-        thread.join()
-        raise
-
     # then wait for the results
     thread.join()
 
-    if result_state == "timeout":
+    # the thread sets `result_state` and `thread_exception` variables, which we check here
+    if thread_exception is not None:
+        raise thread_exception  # pyright: reportGeneralTypeIssues=false
+    if result_state == "timeout":  # pyright: reportUnnecessaryComparison=false
         raise SubprocessControllerTimeoutException(f"systemd job '{job_path}' did not finish in {timeout_sec} seconds")
     if result_state != "done":
         raise SubprocessControllerException(