From: Michael Tremer Date: Fri, 4 Oct 2024 16:45:39 +0000 (+0000) Subject: daemon: Remove the worker's own HTTP client and event loop X-Git-Tag: 0.9.30~1173 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=de72ddf2d3d70b02f26e89c8867f942e55772824;p=pakfire.git daemon: Remove the worker's own HTTP client and event loop It makes no sense to duplicate so much code, so the control connection of the worker will have to run in the main process, too. Signed-off-by: Michael Tremer --- diff --git a/src/libpakfire/daemon.c b/src/libpakfire/daemon.c index 4de32f5ab..8d15f5b73 100644 --- a/src/libpakfire/daemon.c +++ b/src/libpakfire/daemon.c @@ -777,6 +777,7 @@ int pakfire_daemon_worker_launched(struct pakfire_daemon* daemon, int pakfire_daemon_worker_exited(struct pakfire_daemon* daemon, struct pakfire_worker* worker) { CTX_DEBUG(daemon->ctx, "Deregistering worker %p\n", worker); +#if 0 for (unsigned int i = 0; i < MAX_WORKERS; i++) { if (daemon->workers[i] != worker) continue; @@ -787,6 +788,7 @@ int pakfire_daemon_worker_exited(struct pakfire_daemon* daemon, struct pakfire_w break; } +#endif return 0; } diff --git a/src/libpakfire/worker.c b/src/libpakfire/worker.c index e77d8da6f..ca54070d2 100644 --- a/src/libpakfire/worker.c +++ b/src/libpakfire/worker.c @@ -70,20 +70,6 @@ struct pakfire_worker { // PID File Descriptor int pidfd; -}; - -struct pakfire_worker_ctx { - // The main context - struct pakfire_ctx* ctx; - - // The worker - struct pakfire_worker* worker; - - // Event Loop - sd_event* loop; - - // HTTP Client - struct pakfire_httpclient* client; // Connection Timer and Holdoff Time sd_event_source* connect_timer; @@ -93,15 +79,6 @@ struct pakfire_worker_ctx { struct pakfire_xfer* control; }; -static int __pakfire_worker_terminate(sd_event_source* source, - const struct signalfd_siginfo* si, void* data) { - struct pakfire_worker* worker = data; - - CTX_DEBUG(worker->ctx, "Received signal to terminate...\n"); - - return sd_event_exit(sd_event_source_get_event(source), 0); -} - static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) { json_object* ccache = NULL; json_object* o = NULL; @@ -245,68 +222,66 @@ static int pakfire_worker_recv(struct pakfire_xfer* xfer, } static int pakfire_worker_closed(struct pakfire_xfer* xfer, int code, void* data) { - struct pakfire_worker_ctx* ctx = data; + struct pakfire_worker* worker = data; int r; // Drop the control connection - if (ctx->control) { - pakfire_xfer_unref(ctx->control); - ctx->control = NULL; + if (worker->control) { + pakfire_xfer_unref(worker->control); + worker->control = NULL; } - CTX_INFO(ctx->ctx, "Will attempt to reconnect in %u second(s)\n", - ctx->reconnect_holdoff / 1000000); - - sleep(1); + CTX_INFO(worker->ctx, "Will attempt to reconnect in %u second(s)\n", + worker->reconnect_holdoff / 1000000); // Set the reconnection timer - r = sd_event_source_set_time_relative(ctx->connect_timer, ctx->reconnect_holdoff); + r = sd_event_source_set_time_relative(worker->connect_timer, worker->reconnect_holdoff); if (r < 0) { - CTX_ERROR(ctx->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); + CTX_ERROR(worker->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); return r; } // Activate the timer - r = sd_event_source_set_enabled(ctx->connect_timer, SD_EVENT_ONESHOT); + r = sd_event_source_set_enabled(worker->connect_timer, SD_EVENT_ONESHOT); if (r < 0) { - CTX_ERROR(ctx->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); + CTX_ERROR(worker->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); return r; } // Double the holdoff time - ctx->reconnect_holdoff *= 2; + worker->reconnect_holdoff *= 2; // Cap reconnection attempts to every minute - if (ctx->reconnect_holdoff > 60000000) - ctx->reconnect_holdoff = 60000000; + if (worker->reconnect_holdoff > 60000000) + worker->reconnect_holdoff = 60000000; return 0; } static int pakfire_worker_connected(struct pakfire_xfer* xfer, void* data) { - struct pakfire_worker_ctx* ctx = data; + struct pakfire_worker* worker = data; - CTX_DEBUG(ctx->ctx, "Connected!\n"); + CTX_DEBUG(worker->ctx, "Connected!\n"); // Store a reference to the control connection - ctx->control = pakfire_xfer_ref(xfer); + worker->control = pakfire_xfer_ref(xfer); return 0; } static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) { - struct pakfire_worker_ctx* ctx = data; + struct pakfire_worker* worker = data; struct pakfire_xfer* xfer = NULL; char job_id[UUID_STR_LEN]; int r; - CTX_INFO(ctx->ctx, "Connecting...\n"); + CTX_INFO(worker->ctx, "Connecting...\n"); // Format the job ID as string - uuid_unparse(ctx->worker->job_id, job_id); + uuid_unparse(worker->job_id, job_id); // Create a new xfer - r = pakfire_httpclient_create_xfer(&xfer, ctx->client, "/api/v1/jobs/%s", job_id); + r = pakfire_httpclient_create_xfer(&xfer, worker->client, "/api/v1/jobs/%s", job_id); if (r) goto ERROR; @@ -317,12 +292,12 @@ static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) // Make this a WebSocket connection r = pakfire_xfer_socket(xfer, pakfire_worker_connected, - pakfire_worker_recv, NULL, pakfire_worker_closed, ctx); + pakfire_worker_recv, NULL, pakfire_worker_closed, worker); if (r) goto ERROR; // Enqueue the transfer - r = pakfire_httpclient_enqueue_xfer(ctx->client, xfer); + r = pakfire_httpclient_enqueue_xfer(worker->client, xfer); if (r) goto ERROR; @@ -373,11 +348,22 @@ int pakfire_worker_create(struct pakfire_worker** worker, struct pakfire_ctx* ct // Initialize the PID file descriptor w->pidfd = -1; + // Reconnect after one second + w->reconnect_holdoff = 1000000; + // Parse the job r = pakfire_parse_job(w, data); if (r) goto ERROR; + // Setup the reconnection timer + r = sd_event_add_time_relative(w->loop, &w->connect_timer, + CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, w); + if (r < 0) { + CTX_ERROR(w->ctx, "Could not register the connection timer: %s\n", strerror(-r)); + goto ERROR; + } + // Return the reference *worker = w; @@ -417,121 +403,14 @@ static int pakfire_worker_parent(struct pakfire_worker* worker) { return 0; } -static int pakfire_worker_setup_loop(struct pakfire_worker_ctx* ctx) { - int r; - - // Create a new event loop - r = sd_event_new(&ctx->loop); - if (r < 0) { - CTX_ERROR(ctx->ctx, "Could not setup event loop: %s\n", strerror(-r)); - return r; - } - - // Listen for SIGTERM - r = sd_event_add_signal(ctx->loop, NULL, SIGTERM|SD_EVENT_SIGNAL_PROCMASK, - __pakfire_worker_terminate, ctx->worker); - if (r < 0) { - CTX_ERROR(ctx->ctx, "Could not register handling SIGTERM: %s\n", strerror(-r)); - return r; - } - - // Listen for SIGINT - r = sd_event_add_signal(ctx->loop, NULL, SIGINT|SD_EVENT_SIGNAL_PROCMASK, - __pakfire_worker_terminate, ctx->worker); - if (r < 0) { - CTX_ERROR(ctx->ctx, "Could not register handling SIGINT: %s\n", strerror(-r)); - return r; - } - - // Setup the reconnection timer - r = sd_event_add_time_relative(ctx->loop, &ctx->connect_timer, - CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, ctx); - if (r < 0) { - CTX_ERROR(ctx->ctx, "Could not register the connection timer: %s\n", strerror(-r)); - return r; - } - - return 0; -} - -static int pakfire_worker_setup_httpclient(struct pakfire_worker_ctx* ctx) { - struct pakfire_config* config = NULL; - int r; - - // Fetch the configuration - config = pakfire_ctx_get_config(ctx->ctx); - if (!config) { - CTX_ERROR(ctx->ctx, "Could not fetch configuration: %m\n"); - r = -errno; - goto ERROR; - } - - // Fetch the URL - const char* url = pakfire_config_get(config, "daemon", "url", "https://pakfire.ipfire.org"); - - // Create the HTTP client - r = pakfire_httpclient_create(&ctx->client, ctx->ctx, ctx->loop); - if (r) - goto ERROR; - - // Configure the base URL - r = pakfire_httpclient_set_baseurl(ctx->client, url); - if (r) { - CTX_ERROR(ctx->ctx, "Could not configure the URL\n"); - goto ERROR; - } - -ERROR: - if (config) - pakfire_config_unref(config); - - return r; -} - static int pakfire_worker_child(struct pakfire_worker* worker) { - struct pakfire_worker_ctx ctx = { - .ctx = worker->ctx, - .worker = worker, - - // Reconnect after one second - .reconnect_holdoff = 1000000, - }; - int r; - // Fetch our PID pid_t pid = getpid(); CTX_DEBUG(worker->ctx, "Launched worker child as PID %d\n", pid); - // Setup a new event loop - r = pakfire_worker_setup_loop(&ctx); - if (r) - goto ERROR; - - // Setup the HTTP client - r = pakfire_worker_setup_httpclient(&ctx); - if (r) - goto ERROR; - - // Run the main loop - r = sd_event_loop(ctx.loop); - if (r < 0) { - CTX_ERROR(worker->ctx, "Could not run the event loop: %s\n", strerror(-r)); - goto ERROR; - } - -ERROR: - // Cleanup - if (ctx.control) - pakfire_xfer_unref(ctx.control); - if (ctx.client) - pakfire_httpclient_unref(ctx.client); - if (ctx.connect_timer) - sd_event_source_unref(ctx.connect_timer); - if (ctx.loop) - sd_event_unref(ctx.loop); - - return r; + // XXX TODO + return 1; } /*