]> git.ipfire.org Git - pakfire.git/commitdiff
daemon: Move log streaming into the main process
authorMichael Tremer <michael.tremer@ipfire.org>
Tue, 4 Feb 2025 11:43:54 +0000 (11:43 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Tue, 4 Feb 2025 11:43:54 +0000 (11:43 +0000)
Now, we can get entirely rid of individual control connections for jobs.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/daemon.c
src/pakfire/job.c
src/pakfire/job.h

index ea2e9c273880ebab88a03dc2f0c96ac7cfa0ec70..f8471d4945440d0e8accc99a209d3b2deaeac8b2 100644 (file)
@@ -482,6 +482,9 @@ static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
        // Increment the number of running jobs
        daemon->running_jobs++;
 
+       // XXX For now, we always enabled log streaming
+       pakfire_job_launch_log_stream(job);
+
 ERROR:
        if (job)
                pakfire_job_unref(job);
@@ -575,6 +578,12 @@ static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data)
                daemon->stats_timer = NULL;
        }
 
+       // Turn off log streaming for all jobs
+       for (int i = 0; i < MAX_JOBS; i++) {
+               if (daemon->jobs[i])
+                       pakfire_job_terminate_log_stream(daemon->jobs[i]);
+       }
+
        return 0;
 }
 
index 115e25f8ca5b6d2a5ab7ed9a7679a763911d888b..170e4ef84f38ced2824d84d03939e816510c0b83 100644 (file)
@@ -752,8 +752,7 @@ int pakfire_job_launch(struct pakfire_job* job) {
        return pakfire_job_parent(job);
 }
 
-#if 0
-int pakfire_job_send_log_line(struct pakfire_job* job,
+static int pakfire_job_send_log_line(struct pakfire_job* job,
                int priority, const char* line, size_t length) {
        struct json_object* message = NULL;
        struct json_object* data = NULL;
@@ -784,6 +783,11 @@ int pakfire_job_send_log_line(struct pakfire_job* job,
                goto ERROR;
        }
 
+       // Add the job ID
+       r = pakfire_json_add_string(message, "job_id", job->id);
+       if (r < 0)
+               goto ERROR;
+
        // Set type
        r = pakfire_json_add_string(message, "type", "log");
        if (r)
@@ -794,13 +798,9 @@ int pakfire_job_send_log_line(struct pakfire_job* job,
        if (r)
                goto ERROR;
 
-       // Serialize to string
-       const char* m = json_object_to_json_string_length(message,
-               JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY, &length);
-
        // Send the message
-       r = pakfire_xfer_send_message(job->control, m, length);
-       if (r) {
+       r = pakfire_daemon_send_message(job->daemon, message);
+       if (r < 0) {
                ERROR(job->ctx, "Could not send log message: %s\n", strerror(-r));
                goto ERROR;
        }
@@ -821,10 +821,6 @@ static int pakfire_job_stream(sd_event_source* s, void* data) {
        int priority;
        int r;
 
-       // Do nothing if we are not connected
-       if (!job->control)
-               return 0;
-
        // Send as many log messages as possible
        for (;;) {
                // Try to dequeue a line from the log buffer
@@ -848,17 +844,7 @@ static int pakfire_job_stream(sd_event_source* s, void* data) {
        return 0;
 }
 
-static int pakfire_job_terminate_log_stream(struct pakfire_job* job) {
-       // Disable log streaming
-       if (job->log.stream) {
-               sd_event_source_unref(job->log.stream);
-               job->log.stream = NULL;
-       }
-
-       return 0;
-}
-
-static int pakfire_job_launch_log_stream(struct pakfire_job* job) {
+int pakfire_job_launch_log_stream(struct pakfire_job* job) {
        int r;
 
        // Make sure the previous event is actually gone
@@ -873,7 +859,16 @@ static int pakfire_job_launch_log_stream(struct pakfire_job* job) {
 
        return 0;
 }
-#endif
+
+int pakfire_job_terminate_log_stream(struct pakfire_job* job) {
+       // Disable log streaming
+       if (job->log.stream) {
+               sd_event_source_unref(job->log.stream);
+               job->log.stream = NULL;
+       }
+
+       return 0;
+}
 
 int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
                struct pakfire_daemon* daemon, json_object* data) {
index 24a6c571dd9b5eecfde082e5aa7f17bda949cee2..0e2904768877adeb599a4da54058167f4dbb1647 100644 (file)
@@ -40,5 +40,9 @@ int pakfire_job_launch(struct pakfire_job* job);
 // Terminate
 int pakfire_job_terminate(struct pakfire_job* worker, int signal);
 
+// Log Stream
+int pakfire_job_launch_log_stream(struct pakfire_job* job);
+int pakfire_job_terminate_log_stream(struct pakfire_job* job);
+
 #endif /* CURL_HAS_WEBSOCKETS */
 #endif /* PAKFIRE_JOB_H */