From: Michael Tremer Date: Fri, 4 Oct 2024 14:20:37 +0000 (+0000) Subject: worker: Initialize the event loop and cURL in the child process X-Git-Tag: 0.9.30~1177 X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=24fa8dcc1464444b52e5b4187a7e333cfd0fa46b;p=pakfire.git worker: Initialize the event loop and cURL in the child process cURL is not multi-threaded and so to avoid any problems, we will initialize everything in the forked child process. Signed-off-by: Michael Tremer --- diff --git a/src/libpakfire/worker.c b/src/libpakfire/worker.c index 6100056ba..0ec7a9548 100644 --- a/src/libpakfire/worker.c +++ b/src/libpakfire/worker.c @@ -64,6 +64,14 @@ struct pakfire_worker { // PID File Descriptor int pidfd; +}; + +struct pakfire_worker_ctx { + // The main context + struct pakfire_ctx* ctx; + + // The worker + struct pakfire_worker* worker; // Event Loop sd_event* loop; @@ -204,14 +212,6 @@ static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) { } static void pakfire_worker_free(struct pakfire_worker* worker) { - if (worker->control) - pakfire_xfer_unref(worker->control); - if (worker->client) - pakfire_httpclient_unref(worker->client); - if (worker->connect_timer) - sd_event_source_unref(worker->connect_timer); - if (worker->loop) - sd_event_unref(worker->loop); if (worker->daemon) pakfire_daemon_unref(worker->daemon); if (worker->ctx) @@ -226,67 +226,68 @@ static int pakfire_worker_recv(struct pakfire_xfer* xfer, } static int pakfire_worker_closed(struct pakfire_xfer* xfer, int code, void* data) { - struct pakfire_worker* worker = data; + struct pakfire_worker_ctx* ctx = data; int r; // Drop the control connection - if (worker->control) { - pakfire_xfer_unref(worker->control); - worker->control = NULL; + if (ctx->control) { + pakfire_xfer_unref(ctx->control); + ctx->control = NULL; } - CTX_INFO(worker->ctx, "Will attempt to reconnect in %u second(s)\n", - worker->reconnect_holdoff / 1000000); + CTX_INFO(ctx->ctx, "Will attempt to reconnect in %u second(s)\n", + ctx->reconnect_holdoff / 1000000); + + sleep(1); // Set the reconnection timer - r = sd_event_source_set_time_relative(worker->connect_timer, worker->reconnect_holdoff); + r = sd_event_source_set_time_relative(ctx->connect_timer, ctx->reconnect_holdoff); if (r < 0) { - CTX_ERROR(worker->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); + CTX_ERROR(ctx->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); return r; } // Activate the timer - r = sd_event_source_set_enabled(worker->connect_timer, SD_EVENT_ONESHOT); + r = sd_event_source_set_enabled(ctx->connect_timer, SD_EVENT_ONESHOT); if (r < 0) { - CTX_ERROR(worker->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); + CTX_ERROR(ctx->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); return r; } // Double the holdoff time - worker->reconnect_holdoff *= 2; + ctx->reconnect_holdoff *= 2; // Cap reconnection attempts to every minute - if (worker->reconnect_holdoff > 60000000) - worker->reconnect_holdoff = 60000000; + if (ctx->reconnect_holdoff > 60000000) + ctx->reconnect_holdoff = 60000000; return 0; } static int pakfire_worker_connected(struct pakfire_xfer* xfer, void* data) { - struct pakfire_worker* worker = data; - int r; + struct pakfire_worker_ctx* ctx = data; - CTX_DEBUG(worker->ctx, "Connected!\n"); + CTX_DEBUG(ctx->ctx, "Connected!\n"); // Store a reference to the control connection - worker->control = pakfire_xfer_ref(xfer); + ctx->control = pakfire_xfer_ref(xfer); return 0; } static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) { - struct pakfire_worker* worker = data; + struct pakfire_worker_ctx* ctx = data; struct pakfire_xfer* xfer = NULL; char job_id[UUID_STR_LEN]; int r; - CTX_INFO(worker->ctx, "Connecting...\n"); + CTX_INFO(ctx->ctx, "Connecting...\n"); // Format the job ID as string - uuid_unparse(worker->job_id, job_id); + uuid_unparse(ctx->worker->job_id, job_id); // Create a new xfer - r = pakfire_httpclient_create_xfer(&xfer, worker->client, "/api/v1/jobs/%s", job_id); + r = pakfire_httpclient_create_xfer(&xfer, ctx->client, "/api/v1/jobs/%s", job_id); if (r) goto ERROR; @@ -297,12 +298,12 @@ static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) // Make this a WebSocket connection r = pakfire_xfer_socket(xfer, pakfire_worker_connected, - pakfire_worker_recv, NULL, pakfire_worker_closed, daemon); + pakfire_worker_recv, NULL, pakfire_worker_closed, ctx); if (r) goto ERROR; // Enqueue the transfer - r = pakfire_httpclient_enqueue_xfer(worker->client, xfer); + r = pakfire_httpclient_enqueue_xfer(ctx->client, xfer); if (r) goto ERROR; @@ -315,61 +316,6 @@ ERROR: return r; } -static int pakfire_worker_setup_loop(struct pakfire_worker* worker) { - int r; - - // Create a new event loop - r = sd_event_new(&worker->loop); - if (r < 0) { - CTX_ERROR(worker->ctx, "Could not setup event loop: %s\n", strerror(-r)); - return r; - } - - // Setup the reconnection timer - r = sd_event_add_time_relative(worker->loop, &worker->connect_timer, - CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, worker); - if (r < 0) { - CTX_ERROR(worker->ctx, "Could not register the connection timer: %s\n", strerror(-r)); - return r; - } - - return 0; -} - -static int pakfire_worker_setup_httpclient(struct pakfire_worker* worker) { - struct pakfire_config* config = NULL; - int r; - - // Fetch the configuration - config = pakfire_ctx_get_config(worker->ctx); - if (!config) { - CTX_ERROR(worker->ctx, "Could not fetch configuration: %m\n"); - r = -errno; - goto ERROR; - } - - // Fetch the URL - const char* url = pakfire_config_get(config, "daemon", "url", "https://pakfire.ipfire.org"); - - // Create the HTTP client - r = pakfire_httpclient_create(&worker->client, worker->ctx, worker->loop); - if (r) - goto ERROR; - - // Configure the base URL - r = pakfire_httpclient_set_baseurl(worker->client, url); - if (r) { - CTX_ERROR(worker->ctx, "Could not configure the URL\n"); - goto ERROR; - } - -ERROR: - if (config) - pakfire_config_unref(config); - - return r; -} - int pakfire_worker_create(struct pakfire_worker** worker, struct pakfire_ctx* ctx, struct pakfire_daemon* daemon, json_object* data) { struct pakfire_worker* w = NULL; @@ -389,15 +335,6 @@ int pakfire_worker_create(struct pakfire_worker** worker, struct pakfire_ctx* ct // Store a reference to the daemon w->daemon = pakfire_daemon_ref(daemon); - // Setup a new event loop - r = pakfire_worker_setup_loop(w); - if (r) - goto ERROR; - - // Setup the HTTP client - r = pakfire_worker_setup_httpclient(w); - if (r) - goto ERROR; // Parse the job r = pakfire_parse_job(w, data); @@ -443,7 +380,69 @@ static int pakfire_worker_parent(struct pakfire_worker* worker) { return 0; } +static int pakfire_worker_setup_loop(struct pakfire_worker_ctx* ctx) { + int r; + + // Create a new event loop + r = sd_event_new(&ctx->loop); + if (r < 0) { + CTX_ERROR(ctx->ctx, "Could not setup event loop: %s\n", strerror(-r)); + return r; + } + + // Setup the reconnection timer + r = sd_event_add_time_relative(ctx->loop, &ctx->connect_timer, + CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, ctx); + if (r < 0) { + CTX_ERROR(ctx->ctx, "Could not register the connection timer: %s\n", strerror(-r)); + return r; + } + + return 0; +} + +static int pakfire_worker_setup_httpclient(struct pakfire_worker_ctx* ctx) { + struct pakfire_config* config = NULL; + int r; + + // Fetch the configuration + config = pakfire_ctx_get_config(ctx->ctx); + if (!config) { + CTX_ERROR(ctx->ctx, "Could not fetch configuration: %m\n"); + r = -errno; + goto ERROR; + } + + // Fetch the URL + const char* url = pakfire_config_get(config, "daemon", "url", "https://pakfire.ipfire.org"); + + // Create the HTTP client + r = pakfire_httpclient_create(&ctx->client, ctx->ctx, ctx->loop); + if (r) + goto ERROR; + + // Configure the base URL + r = pakfire_httpclient_set_baseurl(ctx->client, url); + if (r) { + CTX_ERROR(ctx->ctx, "Could not configure the URL\n"); + goto ERROR; + } + +ERROR: + if (config) + pakfire_config_unref(config); + + return r; +} + static int pakfire_worker_child(struct pakfire_worker* worker) { + struct pakfire_worker_ctx ctx = { + .ctx = worker->ctx, + .worker = worker, + + // Reconnect after one second + .reconnect_holdoff = 1000000, + }; int r; // Fetch our PID @@ -451,14 +450,34 @@ static int pakfire_worker_child(struct pakfire_worker* worker) { CTX_DEBUG(worker->ctx, "Launched worker child as PID %d\n", pid); + // Setup a new event loop + r = pakfire_worker_setup_loop(&ctx); + if (r) + goto ERROR; + + // Setup the HTTP client + r = pakfire_worker_setup_httpclient(&ctx); + if (r) + goto ERROR; + // Run the main loop - r = sd_event_loop(worker->loop); + r = sd_event_loop(ctx.loop); if (r < 0) { CTX_ERROR(worker->ctx, "Could not run the event loop: %s\n", strerror(-r)); goto ERROR; } ERROR: + // Cleanup + if (ctx.control) + pakfire_xfer_unref(ctx.control); + if (ctx.client) + pakfire_httpclient_unref(ctx.client); + if (ctx.connect_timer) + sd_event_source_unref(ctx.connect_timer); + if (ctx.loop) + sd_event_unref(ctx.loop); + return r; }