]> git.ipfire.org Git - pakfire.git/commitdiff
daemon: Enable stat submission only when we have a control connection
authorMichael Tremer <michael.tremer@ipfire.org>
Sun, 15 Sep 2024 04:38:33 +0000 (04:38 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sun, 15 Sep 2024 04:38:33 +0000 (04:38 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/libpakfire/daemon.c
src/libpakfire/include/pakfire/xfer.h
src/libpakfire/xfer.c

index 0233a4827bca7f7d877eaad2181578d155f7ac5e..964b29fcc9f6f98c0a526e90af220e45a32cba7f 100644 (file)
@@ -51,7 +51,7 @@ struct pakfire_daemon {
        unsigned int reconnect_holdoff;
 
        // Timer for submitting stats
-       sd_event_source* stat_timer;
+       sd_event_source* stats_timer;
 
        // Workers
        struct pakfire_worker* workers[MAX_WORKERS];
@@ -67,172 +67,6 @@ static int pakfire_daemon_terminate(sd_event_source* source,
        return sd_event_exit(sd_event_source_get_event(source), 0);
 }
 
-/*
-       Called when a new job has been received
-*/
-static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
-       struct pakfire_worker* worker = NULL;
-       struct json_object* data = NULL;
-       int r;
-
-       // Fetch the data from the message
-       if (!json_object_object_get_ex(m, "data", &data)) {
-               CTX_ERROR(daemon->ctx, "Job does not have any data\n");
-
-               return -EINVAL;
-       }
-
-       // Create a new worker
-       r = pakfire_worker_create(&worker, daemon->ctx, data);
-       if (r) {
-               CTX_ERROR(daemon->ctx, "Could not create a new worker: %s\n", strerror(-r));
-
-               goto ERROR;
-       }
-
-       // Store the new worker
-       for (unsigned int i = 0; i < MAX_WORKERS; i++) {
-               if (!daemon->workers[i]) {
-                       daemon->workers[i] = pakfire_worker_ref(worker);
-                       break;
-               }
-       }
-
-       // Increment the number of running workers
-       daemon->running_workers++;
-
-       // XXX TODO We need to do something with the new worker
-
-ERROR:
-       if (worker)
-               pakfire_worker_unref(worker);
-
-       return r;
-}
-
-static int pakfire_daemon_recv(struct pakfire_xfer* xfer,
-               const char* message, const size_t size, void* data) {
-       struct pakfire_daemon* daemon = data;
-       struct json_object* type = NULL;
-       struct json_object* m = NULL;
-       int r;
-
-       // Reset the holdoff timer
-       daemon->reconnect_holdoff = 1000000;
-
-       // Parse the JSON message
-       m = pakfire_json_parse(daemon->ctx, message, size);
-       if (!m) {
-               r = -errno;
-               goto ERROR;
-       }
-
-       // Fetch the message type
-       if (!json_object_object_get_ex(m, "type", &type)) {
-               CTX_ERROR(daemon->ctx, "Invalid message with missing type\n");
-
-               return -EINVAL;
-       }
-
-       // Fetch the type
-       const char* t = json_object_get_string(type);
-
-       // Handle new jobs
-       if (strcmp(t, "job") == 0) {
-               r = pakfire_daemon_job(daemon, m);
-
-       } else {
-               CTX_ERROR(daemon->ctx, "Unknown message. Ignoring:\n%s\n",
-                       json_object_to_json_string_ext(m, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY));
-
-               r = 0;
-       }
-
-ERROR:
-       if (m)
-               json_object_put(m);
-
-       return r;
-}
-
-/*
-       This function is called whenever the connection to the build service could not
-       be established or was interrupted. It will try to reconnect.
-*/
-static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) {
-       struct pakfire_daemon* daemon = data;
-       int r;
-
-       CTX_INFO(daemon->ctx, "Will attempt to reconnect in %u second(s)\n",
-               daemon->reconnect_holdoff / 1000000);
-
-       // Set the reconnection timer
-       r = sd_event_source_set_time_relative(daemon->connect_timer, daemon->reconnect_holdoff);
-       if (r < 0) {
-               CTX_ERROR(daemon->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
-               return r;
-       }
-
-       // Activate the timer
-       r = sd_event_source_set_enabled(daemon->connect_timer, SD_EVENT_ONESHOT);
-       if (r < 0) {
-               CTX_ERROR(daemon->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
-               return r;
-       }
-
-       // Double the holdoff time
-       daemon->reconnect_holdoff *= 2;
-
-       // Cap reconnection attempts to every minute
-       if (daemon->reconnect_holdoff > 60000000)
-               daemon->reconnect_holdoff = 60000000;
-
-       return 0;
-}
-
-static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data) {
-       struct pakfire_daemon* daemon = data;
-       struct pakfire_xfer* xfer = NULL;
-       int r;
-
-       CTX_INFO(daemon->ctx, "Connecting...\n");
-
-       // Drop any previous control connections
-       if (daemon->control) {
-               pakfire_xfer_unref(daemon->control);
-               daemon->control = NULL;
-       }
-
-       // Create a new xfer
-       r = pakfire_httpclient_create_xfer(&xfer, daemon->client, "/api/v1/builders/control");
-       if (r)
-               goto ERROR;
-
-       // Enable authentication
-       r = pakfire_xfer_auth(xfer);
-       if (r)
-               goto ERROR;
-
-       // Make this a WebSocket connection
-       r = pakfire_xfer_socket(xfer, pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon);
-       if (r)
-               goto ERROR;
-
-       // Enqueue the transfer
-       r = pakfire_httpclient_enqueue_xfer(daemon->client, xfer);
-       if (r)
-               goto ERROR;
-
-       // If everything was successful up to here, we make this our new control connection
-       daemon->control = pakfire_xfer_ref(xfer);
-
-ERROR:
-       if (xfer)
-               pakfire_xfer_unref(xfer);
-
-       return r;
-}
-
 static int pakfire_daemon_submit_stats(sd_event_source* s, uint64_t usec, void* data) {
        struct pakfire_daemon* daemon = data;
        struct pakfire_cpuinfo cpuinfo = {};
@@ -249,7 +83,7 @@ static int pakfire_daemon_submit_stats(sd_event_source* s, uint64_t usec, void*
        uint64_t next = (daemon->running_workers > 0) ? 5000000 : 30000000;
 
        // Reset the timer
-       r = sd_event_source_set_time_relative(daemon->stat_timer, next);
+       r = sd_event_source_set_time_relative(daemon->stats_timer, next);
        if (r < 0) {
                CTX_ERROR(daemon->ctx, "Could not set the stat timer: %s\n", strerror(-r));
                return r;
@@ -261,6 +95,8 @@ static int pakfire_daemon_submit_stats(sd_event_source* s, uint64_t usec, void*
                return 0;
        }
 
+       CTX_DEBUG(daemon->ctx, "Submitting stats...\n");
+
        // Fetch the distro
        const struct pakfire_distro* distro = pakfire_ctx_get_distro(daemon->ctx);
 
@@ -469,6 +305,213 @@ ERROR:
        return r;
 }
 
+/*
+       Called when a new job has been received
+*/
+static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
+       struct pakfire_worker* worker = NULL;
+       struct json_object* data = NULL;
+       int r;
+
+       // Fetch the data from the message
+       if (!json_object_object_get_ex(m, "data", &data)) {
+               CTX_ERROR(daemon->ctx, "Job does not have any data\n");
+
+               return -EINVAL;
+       }
+
+       // Create a new worker
+       r = pakfire_worker_create(&worker, daemon->ctx, data);
+       if (r) {
+               CTX_ERROR(daemon->ctx, "Could not create a new worker: %s\n", strerror(-r));
+
+               goto ERROR;
+       }
+
+       // Launch the worker
+       r = pakfire_worker_launch(worker, daemon->loop);
+       if (r < 0) {
+               CTX_ERROR(daemon->ctx, "Could not launch worker: %s\n", strerror(-r));
+
+               goto ERROR;
+       }
+
+       // Store the worker
+       for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+               if (!daemon->workers[i]) {
+                       daemon->workers[i] = pakfire_worker_ref(worker);
+                       break;
+               }
+       }
+
+       // Increment the number of running workers
+       daemon->running_workers++;
+
+       // XXX TODO We need to do something with the new worker
+
+ERROR:
+       if (worker)
+               pakfire_worker_unref(worker);
+
+       return r;
+}
+
+static int pakfire_daemon_recv(struct pakfire_xfer* xfer,
+               const char* message, const size_t size, void* data) {
+       struct pakfire_daemon* daemon = data;
+       struct json_object* type = NULL;
+       struct json_object* m = NULL;
+       int r;
+
+       // Reset the holdoff timer
+       daemon->reconnect_holdoff = 1000000;
+
+       // Parse the JSON message
+       m = pakfire_json_parse(daemon->ctx, message, size);
+       if (!m) {
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Fetch the message type
+       if (!json_object_object_get_ex(m, "type", &type)) {
+               CTX_ERROR(daemon->ctx, "Invalid message with missing type\n");
+
+               return -EINVAL;
+       }
+
+       // Fetch the type
+       const char* t = json_object_get_string(type);
+
+       // Handle new jobs
+       if (strcmp(t, "job") == 0) {
+               r = pakfire_daemon_job(daemon, m);
+
+       } else {
+               CTX_ERROR(daemon->ctx, "Unknown message. Ignoring:\n%s\n",
+                       json_object_to_json_string_ext(m, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY));
+
+               r = 0;
+       }
+
+ERROR:
+       if (m)
+               json_object_put(m);
+
+       return r;
+}
+
+/*
+       This function is called whenever the connection to the build service could not
+       be established or was interrupted. It will try to reconnect.
+*/
+static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) {
+       struct pakfire_daemon* daemon = data;
+       int r;
+
+       // Drop the control connection
+       if (daemon->control) {
+               pakfire_xfer_unref(daemon->control);
+               daemon->control = NULL;
+       }
+
+       CTX_INFO(daemon->ctx, "Will attempt to reconnect in %u second(s)\n",
+               daemon->reconnect_holdoff / 1000000);
+
+       // Set the reconnection timer
+       r = sd_event_source_set_time_relative(daemon->connect_timer, daemon->reconnect_holdoff);
+       if (r < 0) {
+               CTX_ERROR(daemon->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
+               return r;
+       }
+
+       // Activate the timer
+       r = sd_event_source_set_enabled(daemon->connect_timer, SD_EVENT_ONESHOT);
+       if (r < 0) {
+               CTX_ERROR(daemon->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
+               return r;
+       }
+
+       // Double the holdoff time
+       daemon->reconnect_holdoff *= 2;
+
+       // Cap reconnection attempts to every minute
+       if (daemon->reconnect_holdoff > 60000000)
+               daemon->reconnect_holdoff = 60000000;
+
+       // Destroy the stats timer
+       if (daemon->stats_timer) {
+               sd_event_source_unref(daemon->stats_timer);
+               daemon->stats_timer = NULL;
+       }
+
+       return 0;
+}
+
+static int pakfire_daemon_connected(struct pakfire_xfer* xfer, void* data) {
+       struct pakfire_daemon* daemon = data;
+       int r;
+
+       CTX_DEBUG(daemon->ctx, "Connected!\n");
+
+       // Store a reference to the control connection
+       daemon->control = pakfire_xfer_ref(xfer);
+
+       // Submit stats
+       r = sd_event_add_time_relative(daemon->loop, &daemon->stats_timer,
+               CLOCK_MONOTONIC, 0, 0, pakfire_daemon_submit_stats, daemon);
+       if (r < 0) {
+               CTX_ERROR(daemon->ctx, "Could not register the stat timer: %s\n", strerror(-r));
+               return r;
+       }
+
+       // Submit stats continuously
+       r = sd_event_source_set_enabled(daemon->stats_timer, SD_EVENT_ON);
+       if (r < 0) {
+               CTX_ERROR(daemon->ctx, "Could not activate the stat timer: %s\n", strerror(-r));
+               return r;
+       }
+
+       return 0;
+}
+
+static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data) {
+       struct pakfire_daemon* daemon = data;
+       struct pakfire_xfer* xfer = NULL;
+       int r;
+
+       CTX_INFO(daemon->ctx, "Connecting...\n");
+
+       // Create a new xfer
+       r = pakfire_httpclient_create_xfer(&xfer, daemon->client, "/api/v1/builders/control");
+       if (r)
+               goto ERROR;
+
+       // Enable authentication
+       r = pakfire_xfer_auth(xfer);
+       if (r)
+               goto ERROR;
+
+       // Make this a WebSocket connection
+       r = pakfire_xfer_socket(xfer, pakfire_daemon_connected,
+               pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon);
+       if (r)
+               goto ERROR;
+
+       // Enqueue the transfer
+       r = pakfire_httpclient_enqueue_xfer(daemon->client, xfer);
+       if (r)
+               goto ERROR;
+
+       return 0;
+
+ERROR:
+       if (xfer)
+               pakfire_xfer_unref(xfer);
+
+       return r;
+}
+
 static int pakfire_daemon_setup_loop(struct pakfire_daemon* daemon) {
        int r;
 
@@ -510,21 +553,6 @@ static int pakfire_daemon_setup_loop(struct pakfire_daemon* daemon) {
                return r;
        }
 
-       // Submit stats
-       r = sd_event_add_time_relative(daemon->loop, &daemon->stat_timer,
-               CLOCK_MONOTONIC, 0, 0, pakfire_daemon_submit_stats, daemon);
-       if (r < 0) {
-               CTX_ERROR(daemon->ctx, "Could not register the stat timer: %s\n", strerror(-r));
-               return r;
-       }
-
-       // Submit stats continuously
-       r = sd_event_source_set_enabled(daemon->stat_timer, SD_EVENT_ON);
-       if (r < 0) {
-               CTX_ERROR(daemon->ctx, "Could not activate the stat timer: %s\n", strerror(-r));
-               return r;
-       }
-
        return 0;
 }
 
@@ -565,8 +593,8 @@ ERROR:
 static void pakfire_daemon_free(struct pakfire_daemon* daemon) {
        if (daemon->connect_timer)
                sd_event_source_unref(daemon->connect_timer);
-       if (daemon->stat_timer)
-               sd_event_source_unref(daemon->stat_timer);
+       if (daemon->stats_timer)
+               sd_event_source_unref(daemon->stats_timer);
        if (daemon->client)
                pakfire_httpclient_unref(daemon->client);
        if (daemon->control)
index 57250ad888b6f1a388abc27b4dcf78b5b581b07b..dfc7eaac981917816a8c2fa57e5aaa17ba4378c3 100644 (file)
@@ -127,12 +127,13 @@ pakfire_xfer_error_code_t pakfire_xfer_run_api_request(
        struct pakfire_xfer* xfer, struct json_object** response);
 
 // WebSocket
+typedef int (*pakfire_xfer_open_callback)(struct pakfire_xfer* xfer, void* data);
 typedef int (*pakfire_xfer_recv_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data);
 typedef int (*pakfire_xfer_send_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data);
 typedef int (*pakfire_xfer_close_callback)(struct pakfire_xfer* xfer, int code, void* data);
 
-int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv,
-       pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data);
+int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback open,
+       pakfire_xfer_recv_callback recv, pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data);
 
 int pakfire_xfer_send_message(struct pakfire_xfer* xfer, const char* message, const size_t length);
 
index 43e69e7aabd711381e87243e8b29323936f7bd41..bf892a09c90eb811cc83ee5eac8d451247d332be 100644 (file)
@@ -110,6 +110,7 @@ struct pakfire_xfer {
 
        // Callbacks
        struct pakfire_xfer_callbacks {
+               pakfire_xfer_open_callback open;
                pakfire_xfer_recv_callback recv;
                pakfire_xfer_send_callback send;
                pakfire_xfer_close_callback close;
@@ -1033,6 +1034,13 @@ static pakfire_xfer_error_code_t pakfire_xfer_done_socket(struct pakfire_xfer* x
 
        CTX_DEBUG(xfer->ctx, "WebSocket registered with event loop\n");
 
+       // Call the open callback
+       if (xfer->callbacks.open) {
+               r = xfer->callbacks.open(xfer, xfer->callbacks.data);
+               if (r)
+                       goto ERROR;
+       }
+
 ERROR:
        if (loop)
                sd_event_unref(loop);
@@ -1504,11 +1512,13 @@ int pakfire_xfer_prepare(struct pakfire_xfer* xfer, struct pakfire_progress* pro
        return 0;
 }
 
-int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv,
-               pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data) {
+int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback open,
+               pakfire_xfer_recv_callback recv, pakfire_xfer_send_callback send,
+               pakfire_xfer_close_callback close, void* data) {
        xfer->direction = PAKFIRE_XFER_SOCKET;
 
        // Store the callbacks
+       xfer->callbacks.open  = open;
        xfer->callbacks.recv  = recv;
        xfer->callbacks.send  = send;
        xfer->callbacks.close = close;