From: Michael Tremer Date: Tue, 4 Feb 2025 11:43:54 +0000 (+0000) Subject: daemon: Move log streaming into the main process X-Git-Tag: 0.9.30~156 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=b44b04db3d1df08f985c9449053c18143063db06;p=pakfire.git daemon: Move log streaming into the main process Now, we can get entirely rid of individual control connections for jobs. Signed-off-by: Michael Tremer --- diff --git a/src/pakfire/daemon.c b/src/pakfire/daemon.c index ea2e9c27..f8471d49 100644 --- a/src/pakfire/daemon.c +++ b/src/pakfire/daemon.c @@ -482,6 +482,9 @@ static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) { // Increment the number of running jobs daemon->running_jobs++; + // XXX For now, we always enabled log streaming + pakfire_job_launch_log_stream(job); + ERROR: if (job) pakfire_job_unref(job); @@ -575,6 +578,12 @@ static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) daemon->stats_timer = NULL; } + // Turn off log streaming for all jobs + for (int i = 0; i < MAX_JOBS; i++) { + if (daemon->jobs[i]) + pakfire_job_terminate_log_stream(daemon->jobs[i]); + } + return 0; } diff --git a/src/pakfire/job.c b/src/pakfire/job.c index 115e25f8..170e4ef8 100644 --- a/src/pakfire/job.c +++ b/src/pakfire/job.c @@ -752,8 +752,7 @@ int pakfire_job_launch(struct pakfire_job* job) { return pakfire_job_parent(job); } -#if 0 -int pakfire_job_send_log_line(struct pakfire_job* job, +static int pakfire_job_send_log_line(struct pakfire_job* job, int priority, const char* line, size_t length) { struct json_object* message = NULL; struct json_object* data = NULL; @@ -784,6 +783,11 @@ int pakfire_job_send_log_line(struct pakfire_job* job, goto ERROR; } + // Add the job ID + r = pakfire_json_add_string(message, "job_id", job->id); + if (r < 0) + goto ERROR; + // Set type r = pakfire_json_add_string(message, "type", "log"); if (r) @@ -794,13 +798,9 @@ int pakfire_job_send_log_line(struct pakfire_job* job, if (r) goto ERROR; - // Serialize to string - const char* m = json_object_to_json_string_length(message, - JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY, &length); - // Send the message - r = pakfire_xfer_send_message(job->control, m, length); - if (r) { + r = pakfire_daemon_send_message(job->daemon, message); + if (r < 0) { ERROR(job->ctx, "Could not send log message: %s\n", strerror(-r)); goto ERROR; } @@ -821,10 +821,6 @@ static int pakfire_job_stream(sd_event_source* s, void* data) { int priority; int r; - // Do nothing if we are not connected - if (!job->control) - return 0; - // Send as many log messages as possible for (;;) { // Try to dequeue a line from the log buffer @@ -848,17 +844,7 @@ static int pakfire_job_stream(sd_event_source* s, void* data) { return 0; } -static int pakfire_job_terminate_log_stream(struct pakfire_job* job) { - // Disable log streaming - if (job->log.stream) { - sd_event_source_unref(job->log.stream); - job->log.stream = NULL; - } - - return 0; -} - -static int pakfire_job_launch_log_stream(struct pakfire_job* job) { +int pakfire_job_launch_log_stream(struct pakfire_job* job) { int r; // Make sure the previous event is actually gone @@ -873,7 +859,16 @@ static int pakfire_job_launch_log_stream(struct pakfire_job* job) { return 0; } -#endif + +int pakfire_job_terminate_log_stream(struct pakfire_job* job) { + // Disable log streaming + if (job->log.stream) { + sd_event_source_unref(job->log.stream); + job->log.stream = NULL; + } + + return 0; +} int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, struct pakfire_daemon* daemon, json_object* data) { diff --git a/src/pakfire/job.h b/src/pakfire/job.h index 24a6c571..0e290476 100644 --- a/src/pakfire/job.h +++ b/src/pakfire/job.h @@ -40,5 +40,9 @@ int pakfire_job_launch(struct pakfire_job* job); // Terminate int pakfire_job_terminate(struct pakfire_job* worker, int signal); +// Log Stream +int pakfire_job_launch_log_stream(struct pakfire_job* job); +int pakfire_job_terminate_log_stream(struct pakfire_job* job); + #endif /* CURL_HAS_WEBSOCKETS */ #endif /* PAKFIRE_JOB_H */