]> git.ipfire.org Git - pakfire.git/commitdiff
worker: Initialize the event loop and cURL in the child process
authorMichael Tremer <michael.tremer@ipfire.org>
Fri, 4 Oct 2024 14:20:37 +0000 (14:20 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Fri, 4 Oct 2024 14:20:37 +0000 (14:20 +0000)
cURL is not multi-threaded and so to avoid any problems, we will
initialize everything in the forked child process.

Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/libpakfire/worker.c

index 6100056baa583bbad7dad1076561797f0cac9d99..0ec7a954854f3f727d6255bf26ce65854a885bdf 100644 (file)
@@ -64,6 +64,14 @@ struct pakfire_worker {
 
        // PID File Descriptor
        int pidfd;
+};
+
+struct pakfire_worker_ctx {
+       // The main context
+       struct pakfire_ctx* ctx;
+
+       // The worker
+       struct pakfire_worker* worker;
 
        // Event Loop
        sd_event* loop;
@@ -204,14 +212,6 @@ static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) {
 }
 
 static void pakfire_worker_free(struct pakfire_worker* worker) {
-       if (worker->control)
-               pakfire_xfer_unref(worker->control);
-       if (worker->client)
-               pakfire_httpclient_unref(worker->client);
-       if (worker->connect_timer)
-               sd_event_source_unref(worker->connect_timer);
-       if (worker->loop)
-               sd_event_unref(worker->loop);
        if (worker->daemon)
                pakfire_daemon_unref(worker->daemon);
        if (worker->ctx)
@@ -226,67 +226,68 @@ static int pakfire_worker_recv(struct pakfire_xfer* xfer,
 }
 
 static int pakfire_worker_closed(struct pakfire_xfer* xfer, int code, void* data) {
-       struct pakfire_worker* worker = data;
+       struct pakfire_worker_ctx* ctx = data;
        int r;
 
        // Drop the control connection
-       if (worker->control) {
-               pakfire_xfer_unref(worker->control);
-               worker->control = NULL;
+       if (ctx->control) {
+               pakfire_xfer_unref(ctx->control);
+               ctx->control = NULL;
        }
 
-       CTX_INFO(worker->ctx, "Will attempt to reconnect in %u second(s)\n",
-               worker->reconnect_holdoff / 1000000);
+       CTX_INFO(ctx->ctx, "Will attempt to reconnect in %u second(s)\n",
+               ctx->reconnect_holdoff / 1000000);
+
+       sleep(1);
 
        // Set the reconnection timer
-       r = sd_event_source_set_time_relative(worker->connect_timer, worker->reconnect_holdoff);
+       r = sd_event_source_set_time_relative(ctx->connect_timer, ctx->reconnect_holdoff);
        if (r < 0) {
-               CTX_ERROR(worker->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
+               CTX_ERROR(ctx->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
                return r;
        }
 
        // Activate the timer
-       r = sd_event_source_set_enabled(worker->connect_timer, SD_EVENT_ONESHOT);
+       r = sd_event_source_set_enabled(ctx->connect_timer, SD_EVENT_ONESHOT);
        if (r < 0) {
-               CTX_ERROR(worker->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
+               CTX_ERROR(ctx->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
                return r;
        }
 
        // Double the holdoff time
-       worker->reconnect_holdoff *= 2;
+       ctx->reconnect_holdoff *= 2;
 
        // Cap reconnection attempts to every minute
-       if (worker->reconnect_holdoff > 60000000)
-               worker->reconnect_holdoff = 60000000;
+       if (ctx->reconnect_holdoff > 60000000)
+               ctx->reconnect_holdoff = 60000000;
 
        return 0;
 }
 
 static int pakfire_worker_connected(struct pakfire_xfer* xfer, void* data) {
-       struct pakfire_worker* worker = data;
-       int r;
+       struct pakfire_worker_ctx* ctx = data;
 
-       CTX_DEBUG(worker->ctx, "Connected!\n");
+       CTX_DEBUG(ctx->ctx, "Connected!\n");
 
        // Store a reference to the control connection
-       worker->control = pakfire_xfer_ref(xfer);
+       ctx->control = pakfire_xfer_ref(xfer);
 
        return 0;
 }
 
 static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) {
-       struct pakfire_worker* worker = data;
+       struct pakfire_worker_ctx* ctx = data;
        struct pakfire_xfer* xfer = NULL;
        char job_id[UUID_STR_LEN];
        int r;
 
-       CTX_INFO(worker->ctx, "Connecting...\n");
+       CTX_INFO(ctx->ctx, "Connecting...\n");
 
        // Format the job ID as string
-       uuid_unparse(worker->job_id, job_id);
+       uuid_unparse(ctx->worker->job_id, job_id);
 
        // Create a new xfer
-       r = pakfire_httpclient_create_xfer(&xfer, worker->client, "/api/v1/jobs/%s", job_id);
+       r = pakfire_httpclient_create_xfer(&xfer, ctx->client, "/api/v1/jobs/%s", job_id);
        if (r)
                goto ERROR;
 
@@ -297,12 +298,12 @@ static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data)
 
        // Make this a WebSocket connection
        r = pakfire_xfer_socket(xfer, pakfire_worker_connected,
-               pakfire_worker_recv, NULL, pakfire_worker_closed, daemon);
+               pakfire_worker_recv, NULL, pakfire_worker_closed, ctx);
        if (r)
                goto ERROR;
 
        // Enqueue the transfer
-       r = pakfire_httpclient_enqueue_xfer(worker->client, xfer);
+       r = pakfire_httpclient_enqueue_xfer(ctx->client, xfer);
        if (r)
                goto ERROR;
 
@@ -315,61 +316,6 @@ ERROR:
        return r;
 }
 
-static int pakfire_worker_setup_loop(struct pakfire_worker* worker) {
-       int r;
-
-       // Create a new event loop
-       r = sd_event_new(&worker->loop);
-       if (r < 0) {
-               CTX_ERROR(worker->ctx, "Could not setup event loop: %s\n", strerror(-r));
-               return r;
-       }
-
-       // Setup the reconnection timer
-       r = sd_event_add_time_relative(worker->loop, &worker->connect_timer,
-               CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, worker);
-       if (r < 0) {
-               CTX_ERROR(worker->ctx, "Could not register the connection timer: %s\n", strerror(-r));
-               return r;
-       }
-
-       return 0;
-}
-
-static int pakfire_worker_setup_httpclient(struct pakfire_worker* worker) {
-       struct pakfire_config* config = NULL;
-       int r;
-
-       // Fetch the configuration
-       config = pakfire_ctx_get_config(worker->ctx);
-       if (!config) {
-               CTX_ERROR(worker->ctx, "Could not fetch configuration: %m\n");
-               r = -errno;
-               goto ERROR;
-       }
-
-       // Fetch the URL
-       const char* url = pakfire_config_get(config, "daemon", "url", "https://pakfire.ipfire.org");
-
-       // Create the HTTP client
-       r = pakfire_httpclient_create(&worker->client, worker->ctx, worker->loop);
-       if (r)
-               goto ERROR;
-
-       // Configure the base URL
-       r = pakfire_httpclient_set_baseurl(worker->client, url);
-       if (r) {
-               CTX_ERROR(worker->ctx, "Could not configure the URL\n");
-               goto ERROR;
-       }
-
-ERROR:
-       if (config)
-               pakfire_config_unref(config);
-
-       return r;
-}
-
 int pakfire_worker_create(struct pakfire_worker** worker, struct pakfire_ctx* ctx,
                struct pakfire_daemon* daemon, json_object* data) {
        struct pakfire_worker* w = NULL;
@@ -389,15 +335,6 @@ int pakfire_worker_create(struct pakfire_worker** worker, struct pakfire_ctx* ct
        // Store a reference to the daemon
        w->daemon = pakfire_daemon_ref(daemon);
 
-       // Setup a new event loop
-       r = pakfire_worker_setup_loop(w);
-       if (r)
-               goto ERROR;
-
-       // Setup the HTTP client
-       r = pakfire_worker_setup_httpclient(w);
-       if (r)
-               goto ERROR;
 
        // Parse the job
        r = pakfire_parse_job(w, data);
@@ -443,7 +380,69 @@ static int pakfire_worker_parent(struct pakfire_worker* worker) {
        return 0;
 }
 
+static int pakfire_worker_setup_loop(struct pakfire_worker_ctx* ctx) {
+       int r;
+
+       // Create a new event loop
+       r = sd_event_new(&ctx->loop);
+       if (r < 0) {
+               CTX_ERROR(ctx->ctx, "Could not setup event loop: %s\n", strerror(-r));
+               return r;
+       }
+
+       // Setup the reconnection timer
+       r = sd_event_add_time_relative(ctx->loop, &ctx->connect_timer,
+               CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, ctx);
+       if (r < 0) {
+               CTX_ERROR(ctx->ctx, "Could not register the connection timer: %s\n", strerror(-r));
+               return r;
+       }
+
+       return 0;
+}
+
+static int pakfire_worker_setup_httpclient(struct pakfire_worker_ctx* ctx) {
+       struct pakfire_config* config = NULL;
+       int r;
+
+       // Fetch the configuration
+       config = pakfire_ctx_get_config(ctx->ctx);
+       if (!config) {
+               CTX_ERROR(ctx->ctx, "Could not fetch configuration: %m\n");
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Fetch the URL
+       const char* url = pakfire_config_get(config, "daemon", "url", "https://pakfire.ipfire.org");
+
+       // Create the HTTP client
+       r = pakfire_httpclient_create(&ctx->client, ctx->ctx, ctx->loop);
+       if (r)
+               goto ERROR;
+
+       // Configure the base URL
+       r = pakfire_httpclient_set_baseurl(ctx->client, url);
+       if (r) {
+               CTX_ERROR(ctx->ctx, "Could not configure the URL\n");
+               goto ERROR;
+       }
+
+ERROR:
+       if (config)
+               pakfire_config_unref(config);
+
+       return r;
+}
+
 static int pakfire_worker_child(struct pakfire_worker* worker) {
+       struct pakfire_worker_ctx ctx = {
+               .ctx    = worker->ctx,
+               .worker = worker,
+
+               // Reconnect after one second
+               .reconnect_holdoff = 1000000,
+       };
        int r;
 
        // Fetch our PID
@@ -451,14 +450,34 @@ static int pakfire_worker_child(struct pakfire_worker* worker) {
 
        CTX_DEBUG(worker->ctx, "Launched worker child as PID %d\n", pid);
 
+       // Setup a new event loop
+       r = pakfire_worker_setup_loop(&ctx);
+       if (r)
+               goto ERROR;
+
+       // Setup the HTTP client
+       r = pakfire_worker_setup_httpclient(&ctx);
+       if (r)
+               goto ERROR;
+
        // Run the main loop
-       r = sd_event_loop(worker->loop);
+       r = sd_event_loop(ctx.loop);
        if (r < 0) {
                CTX_ERROR(worker->ctx, "Could not run the event loop: %s\n", strerror(-r));
                goto ERROR;
        }
 
 ERROR:
+       // Cleanup
+       if (ctx.control)
+               pakfire_xfer_unref(ctx.control);
+       if (ctx.client)
+               pakfire_httpclient_unref(ctx.client);
+       if (ctx.connect_timer)
+               sd_event_source_unref(ctx.connect_timer);
+       if (ctx.loop)
+               sd_event_unref(ctx.loop);
+
        return r;
 }