From 3910bcd75c65d8a58989ae760c35ca79767a1983 Mon Sep 17 00:00:00 2001 From: Michael Tremer Date: Tue, 4 Feb 2025 11:20:33 +0000 Subject: [PATCH] jobs: Drop the control connect and explicitely launch them Signed-off-by: Michael Tremer --- src/pakfire/daemon.c | 8 ++- src/pakfire/job.c | 142 ++----------------------------------------- src/pakfire/job.h | 3 + 3 files changed, 13 insertions(+), 140 deletions(-) diff --git a/src/pakfire/daemon.c b/src/pakfire/daemon.c index b5f6eab4..3b55e4ec 100644 --- a/src/pakfire/daemon.c +++ b/src/pakfire/daemon.c @@ -452,7 +452,13 @@ static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) { r = pakfire_job_create(&job, daemon->ctx, daemon, data); if (r) { ERROR(daemon->ctx, "Could not create a new job: %s\n", strerror(-r)); + goto ERROR; + } + // Launch the job + r = pakfire_job_launch(job); + if (r < 0) { + ERROR(daemon->ctx, "Failed to launch the job: %s\n", strerror(-r)); goto ERROR; } @@ -467,8 +473,6 @@ static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) { // Increment the number of running jobs daemon->running_jobs++; - // XXX TODO We need to do something with the new job - ERROR: if (job) pakfire_job_unref(job); diff --git a/src/pakfire/job.c b/src/pakfire/job.c index 3335426a..115e25f8 100644 --- a/src/pakfire/job.c +++ b/src/pakfire/job.c @@ -100,13 +100,6 @@ struct pakfire_job { sd_event_source* stream; } log; - // Connection Timer and Holdoff Time - sd_event_source* connect_timer; - unsigned int reconnect_holdoff; - - // Control Connection - struct pakfire_xfer* control; - // Track the current state enum { PAKFIRE_JOB_STATE_INIT = 0, @@ -713,7 +706,7 @@ ERROR: /* Launches the job */ -static int pakfire_job_launch(struct pakfire_job* job) { +int pakfire_job_launch(struct pakfire_job* job) { int pid; int r; @@ -759,13 +752,8 @@ static int pakfire_job_launch(struct pakfire_job* job) { return pakfire_job_parent(job); } -static int pakfire_job_recv(struct pakfire_xfer* xfer, - const char* message, const size_t size, void* data) { - // XXX TODO - return 0; -} - -static int pakfire_job_send_log_line(struct pakfire_job* job, +#if 0 +int pakfire_job_send_log_line(struct pakfire_job* job, int priority, const char* line, size_t length) { struct json_object* message = NULL; struct json_object* data = NULL; @@ -885,118 +873,7 @@ static int pakfire_job_launch_log_stream(struct pakfire_job* job) { return 0; } - -static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) { - struct pakfire_job* job = data; - int r; - - // Drop the control connection - if (job->control) { - pakfire_xfer_unref(job->control); - job->control = NULL; - } - - // Turn off the log stream - pakfire_job_terminate_log_stream(job); - - INFO(job->ctx, "Will attempt to reconnect in %u second(s)\n", - job->reconnect_holdoff / 1000000); - - // Set the reconnection timer - r = sd_event_source_set_time_relative(job->connect_timer, job->reconnect_holdoff); - if (r < 0) { - ERROR(job->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); - return r; - } - - // Activate the timer - r = sd_event_source_set_enabled(job->connect_timer, SD_EVENT_ONESHOT); - if (r < 0) { - ERROR(job->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); - return r; - } - - // Double the holdoff time - job->reconnect_holdoff *= 2; - - // Cap reconnection attempts to every minute - if (job->reconnect_holdoff > 60000000) - job->reconnect_holdoff = 60000000; - - return 0; -} - -static int pakfire_job_connected(struct pakfire_xfer* xfer, void* data) { - struct pakfire_job* job = data; - int r; - - DEBUG(job->ctx, "Connected!\n"); - - // Store a reference to the control connection - job->control = pakfire_xfer_ref(xfer); - - // Launch the log stream - r = pakfire_job_launch_log_stream(job); - if (r < 0) - return r; - - switch (job->state) { - // If the job has connected for the first time, we launch it - case PAKFIRE_JOB_STATE_INIT: - return pakfire_job_launch(job); - - // Otherwise there is nothing to do - default: - break; - } - - return 0; -} - -static int pakfire_job_connect(sd_event_source* s, uint64_t usec, void* data) { - struct pakfire_job* job = data; - struct pakfire_httpclient* client = NULL; - struct pakfire_xfer* xfer = NULL; - int r; - - INFO(job->ctx, "Connecting to job %s...\n", job->id); - - // Fetch a reference to the HTTP client - client = pakfire_daemon_httpclient(job->daemon); - if (!client) { - r = -errno; - goto ERROR; - } - - // Create a new xfer - r = pakfire_job_xfer_create(&xfer, job, "/api/v1/jobs/%s", job->id); - if (r) - goto ERROR; - - // Enable authentication - r = pakfire_xfer_auth(xfer); - if (r) - goto ERROR; - - // Make this a WebSocket connection - r = pakfire_xfer_socket(xfer, pakfire_job_connected, - pakfire_job_recv, NULL, pakfire_job_closed, job); - if (r) - goto ERROR; - - // Enqueue the transfer - r = pakfire_httpclient_enqueue(client, xfer); - if (r) - goto ERROR; - -ERROR: - if (xfer) - pakfire_xfer_unref(xfer); - if (client) - pakfire_httpclient_unref(client); - - return r; -} +#endif int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, struct pakfire_daemon* daemon, json_object* data) { @@ -1041,22 +918,11 @@ int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, if (r < 0) goto ERROR; - // Reconnect after one second - j->reconnect_holdoff = 1000000; - // Parse the job r = pakfire_parse_job(j, data); if (r) goto ERROR; - // Setup the reconnection timer - r = sd_event_add_time_relative(j->loop, &j->connect_timer, - CLOCK_MONOTONIC, 0, 0, pakfire_job_connect, j); - if (r < 0) { - ERROR(j->ctx, "Could not register the connection timer: %s\n", strerror(-r)); - goto ERROR; - } - // Return the reference *job = j; diff --git a/src/pakfire/job.h b/src/pakfire/job.h index 0c9c8b8a..24a6c571 100644 --- a/src/pakfire/job.h +++ b/src/pakfire/job.h @@ -35,6 +35,9 @@ struct pakfire_job* pakfire_job_ref(struct pakfire_job* worker); struct pakfire_job* pakfire_job_unref(struct pakfire_job* worker); // Launch +int pakfire_job_launch(struct pakfire_job* job); + +// Terminate int pakfire_job_terminate(struct pakfire_job* worker, int signal); #endif /* CURL_HAS_WEBSOCKETS */ -- 2.39.5