From: Michael Tremer Date: Sun, 15 Sep 2024 04:38:33 +0000 (+0000) Subject: daemon: Enable stat submission only when we have a control connection X-Git-Tag: 0.9.30~1185 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=a9196413dfb0c8776d559d866a08ad21f52fdf96;p=pakfire.git daemon: Enable stat submission only when we have a control connection Signed-off-by: Michael Tremer --- diff --git a/src/libpakfire/daemon.c b/src/libpakfire/daemon.c index 0233a4827..964b29fcc 100644 --- a/src/libpakfire/daemon.c +++ b/src/libpakfire/daemon.c @@ -51,7 +51,7 @@ struct pakfire_daemon { unsigned int reconnect_holdoff; // Timer for submitting stats - sd_event_source* stat_timer; + sd_event_source* stats_timer; // Workers struct pakfire_worker* workers[MAX_WORKERS]; @@ -67,172 +67,6 @@ static int pakfire_daemon_terminate(sd_event_source* source, return sd_event_exit(sd_event_source_get_event(source), 0); } -/* - Called when a new job has been received -*/ -static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) { - struct pakfire_worker* worker = NULL; - struct json_object* data = NULL; - int r; - - // Fetch the data from the message - if (!json_object_object_get_ex(m, "data", &data)) { - CTX_ERROR(daemon->ctx, "Job does not have any data\n"); - - return -EINVAL; - } - - // Create a new worker - r = pakfire_worker_create(&worker, daemon->ctx, data); - if (r) { - CTX_ERROR(daemon->ctx, "Could not create a new worker: %s\n", strerror(-r)); - - goto ERROR; - } - - // Store the new worker - for (unsigned int i = 0; i < MAX_WORKERS; i++) { - if (!daemon->workers[i]) { - daemon->workers[i] = pakfire_worker_ref(worker); - break; - } - } - - // Increment the number of running workers - daemon->running_workers++; - - // XXX TODO We need to do something with the new worker - -ERROR: - if (worker) - pakfire_worker_unref(worker); - - return r; -} - -static int pakfire_daemon_recv(struct pakfire_xfer* xfer, - const char* message, const size_t size, void* data) { - struct pakfire_daemon* daemon = data; - struct json_object* type = NULL; - struct json_object* m = NULL; - int r; - - // Reset the holdoff timer - daemon->reconnect_holdoff = 1000000; - - // Parse the JSON message - m = pakfire_json_parse(daemon->ctx, message, size); - if (!m) { - r = -errno; - goto ERROR; - } - - // Fetch the message type - if (!json_object_object_get_ex(m, "type", &type)) { - CTX_ERROR(daemon->ctx, "Invalid message with missing type\n"); - - return -EINVAL; - } - - // Fetch the type - const char* t = json_object_get_string(type); - - // Handle new jobs - if (strcmp(t, "job") == 0) { - r = pakfire_daemon_job(daemon, m); - - } else { - CTX_ERROR(daemon->ctx, "Unknown message. Ignoring:\n%s\n", - json_object_to_json_string_ext(m, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY)); - - r = 0; - } - -ERROR: - if (m) - json_object_put(m); - - return r; -} - -/* - This function is called whenever the connection to the build service could not - be established or was interrupted. It will try to reconnect. -*/ -static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) { - struct pakfire_daemon* daemon = data; - int r; - - CTX_INFO(daemon->ctx, "Will attempt to reconnect in %u second(s)\n", - daemon->reconnect_holdoff / 1000000); - - // Set the reconnection timer - r = sd_event_source_set_time_relative(daemon->connect_timer, daemon->reconnect_holdoff); - if (r < 0) { - CTX_ERROR(daemon->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); - return r; - } - - // Activate the timer - r = sd_event_source_set_enabled(daemon->connect_timer, SD_EVENT_ONESHOT); - if (r < 0) { - CTX_ERROR(daemon->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); - return r; - } - - // Double the holdoff time - daemon->reconnect_holdoff *= 2; - - // Cap reconnection attempts to every minute - if (daemon->reconnect_holdoff > 60000000) - daemon->reconnect_holdoff = 60000000; - - return 0; -} - -static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data) { - struct pakfire_daemon* daemon = data; - struct pakfire_xfer* xfer = NULL; - int r; - - CTX_INFO(daemon->ctx, "Connecting...\n"); - - // Drop any previous control connections - if (daemon->control) { - pakfire_xfer_unref(daemon->control); - daemon->control = NULL; - } - - // Create a new xfer - r = pakfire_httpclient_create_xfer(&xfer, daemon->client, "/api/v1/builders/control"); - if (r) - goto ERROR; - - // Enable authentication - r = pakfire_xfer_auth(xfer); - if (r) - goto ERROR; - - // Make this a WebSocket connection - r = pakfire_xfer_socket(xfer, pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon); - if (r) - goto ERROR; - - // Enqueue the transfer - r = pakfire_httpclient_enqueue_xfer(daemon->client, xfer); - if (r) - goto ERROR; - - // If everything was successful up to here, we make this our new control connection - daemon->control = pakfire_xfer_ref(xfer); - -ERROR: - if (xfer) - pakfire_xfer_unref(xfer); - - return r; -} - static int pakfire_daemon_submit_stats(sd_event_source* s, uint64_t usec, void* data) { struct pakfire_daemon* daemon = data; struct pakfire_cpuinfo cpuinfo = {}; @@ -249,7 +83,7 @@ static int pakfire_daemon_submit_stats(sd_event_source* s, uint64_t usec, void* uint64_t next = (daemon->running_workers > 0) ? 5000000 : 30000000; // Reset the timer - r = sd_event_source_set_time_relative(daemon->stat_timer, next); + r = sd_event_source_set_time_relative(daemon->stats_timer, next); if (r < 0) { CTX_ERROR(daemon->ctx, "Could not set the stat timer: %s\n", strerror(-r)); return r; @@ -261,6 +95,8 @@ static int pakfire_daemon_submit_stats(sd_event_source* s, uint64_t usec, void* return 0; } + CTX_DEBUG(daemon->ctx, "Submitting stats...\n"); + // Fetch the distro const struct pakfire_distro* distro = pakfire_ctx_get_distro(daemon->ctx); @@ -469,6 +305,213 @@ ERROR: return r; } +/* + Called when a new job has been received +*/ +static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) { + struct pakfire_worker* worker = NULL; + struct json_object* data = NULL; + int r; + + // Fetch the data from the message + if (!json_object_object_get_ex(m, "data", &data)) { + CTX_ERROR(daemon->ctx, "Job does not have any data\n"); + + return -EINVAL; + } + + // Create a new worker + r = pakfire_worker_create(&worker, daemon->ctx, data); + if (r) { + CTX_ERROR(daemon->ctx, "Could not create a new worker: %s\n", strerror(-r)); + + goto ERROR; + } + + // Launch the worker + r = pakfire_worker_launch(worker, daemon->loop); + if (r < 0) { + CTX_ERROR(daemon->ctx, "Could not launch worker: %s\n", strerror(-r)); + + goto ERROR; + } + + // Store the worker + for (unsigned int i = 0; i < MAX_WORKERS; i++) { + if (!daemon->workers[i]) { + daemon->workers[i] = pakfire_worker_ref(worker); + break; + } + } + + // Increment the number of running workers + daemon->running_workers++; + + // XXX TODO We need to do something with the new worker + +ERROR: + if (worker) + pakfire_worker_unref(worker); + + return r; +} + +static int pakfire_daemon_recv(struct pakfire_xfer* xfer, + const char* message, const size_t size, void* data) { + struct pakfire_daemon* daemon = data; + struct json_object* type = NULL; + struct json_object* m = NULL; + int r; + + // Reset the holdoff timer + daemon->reconnect_holdoff = 1000000; + + // Parse the JSON message + m = pakfire_json_parse(daemon->ctx, message, size); + if (!m) { + r = -errno; + goto ERROR; + } + + // Fetch the message type + if (!json_object_object_get_ex(m, "type", &type)) { + CTX_ERROR(daemon->ctx, "Invalid message with missing type\n"); + + return -EINVAL; + } + + // Fetch the type + const char* t = json_object_get_string(type); + + // Handle new jobs + if (strcmp(t, "job") == 0) { + r = pakfire_daemon_job(daemon, m); + + } else { + CTX_ERROR(daemon->ctx, "Unknown message. Ignoring:\n%s\n", + json_object_to_json_string_ext(m, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY)); + + r = 0; + } + +ERROR: + if (m) + json_object_put(m); + + return r; +} + +/* + This function is called whenever the connection to the build service could not + be established or was interrupted. It will try to reconnect. +*/ +static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) { + struct pakfire_daemon* daemon = data; + int r; + + // Drop the control connection + if (daemon->control) { + pakfire_xfer_unref(daemon->control); + daemon->control = NULL; + } + + CTX_INFO(daemon->ctx, "Will attempt to reconnect in %u second(s)\n", + daemon->reconnect_holdoff / 1000000); + + // Set the reconnection timer + r = sd_event_source_set_time_relative(daemon->connect_timer, daemon->reconnect_holdoff); + if (r < 0) { + CTX_ERROR(daemon->ctx, "Could not set the reconnection timer: %s\n", strerror(-r)); + return r; + } + + // Activate the timer + r = sd_event_source_set_enabled(daemon->connect_timer, SD_EVENT_ONESHOT); + if (r < 0) { + CTX_ERROR(daemon->ctx, "Could not activate the connect timer: %s\n", strerror(-r)); + return r; + } + + // Double the holdoff time + daemon->reconnect_holdoff *= 2; + + // Cap reconnection attempts to every minute + if (daemon->reconnect_holdoff > 60000000) + daemon->reconnect_holdoff = 60000000; + + // Destroy the stats timer + if (daemon->stats_timer) { + sd_event_source_unref(daemon->stats_timer); + daemon->stats_timer = NULL; + } + + return 0; +} + +static int pakfire_daemon_connected(struct pakfire_xfer* xfer, void* data) { + struct pakfire_daemon* daemon = data; + int r; + + CTX_DEBUG(daemon->ctx, "Connected!\n"); + + // Store a reference to the control connection + daemon->control = pakfire_xfer_ref(xfer); + + // Submit stats + r = sd_event_add_time_relative(daemon->loop, &daemon->stats_timer, + CLOCK_MONOTONIC, 0, 0, pakfire_daemon_submit_stats, daemon); + if (r < 0) { + CTX_ERROR(daemon->ctx, "Could not register the stat timer: %s\n", strerror(-r)); + return r; + } + + // Submit stats continuously + r = sd_event_source_set_enabled(daemon->stats_timer, SD_EVENT_ON); + if (r < 0) { + CTX_ERROR(daemon->ctx, "Could not activate the stat timer: %s\n", strerror(-r)); + return r; + } + + return 0; +} + +static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data) { + struct pakfire_daemon* daemon = data; + struct pakfire_xfer* xfer = NULL; + int r; + + CTX_INFO(daemon->ctx, "Connecting...\n"); + + // Create a new xfer + r = pakfire_httpclient_create_xfer(&xfer, daemon->client, "/api/v1/builders/control"); + if (r) + goto ERROR; + + // Enable authentication + r = pakfire_xfer_auth(xfer); + if (r) + goto ERROR; + + // Make this a WebSocket connection + r = pakfire_xfer_socket(xfer, pakfire_daemon_connected, + pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon); + if (r) + goto ERROR; + + // Enqueue the transfer + r = pakfire_httpclient_enqueue_xfer(daemon->client, xfer); + if (r) + goto ERROR; + + return 0; + +ERROR: + if (xfer) + pakfire_xfer_unref(xfer); + + return r; +} + static int pakfire_daemon_setup_loop(struct pakfire_daemon* daemon) { int r; @@ -510,21 +553,6 @@ static int pakfire_daemon_setup_loop(struct pakfire_daemon* daemon) { return r; } - // Submit stats - r = sd_event_add_time_relative(daemon->loop, &daemon->stat_timer, - CLOCK_MONOTONIC, 0, 0, pakfire_daemon_submit_stats, daemon); - if (r < 0) { - CTX_ERROR(daemon->ctx, "Could not register the stat timer: %s\n", strerror(-r)); - return r; - } - - // Submit stats continuously - r = sd_event_source_set_enabled(daemon->stat_timer, SD_EVENT_ON); - if (r < 0) { - CTX_ERROR(daemon->ctx, "Could not activate the stat timer: %s\n", strerror(-r)); - return r; - } - return 0; } @@ -565,8 +593,8 @@ ERROR: static void pakfire_daemon_free(struct pakfire_daemon* daemon) { if (daemon->connect_timer) sd_event_source_unref(daemon->connect_timer); - if (daemon->stat_timer) - sd_event_source_unref(daemon->stat_timer); + if (daemon->stats_timer) + sd_event_source_unref(daemon->stats_timer); if (daemon->client) pakfire_httpclient_unref(daemon->client); if (daemon->control) diff --git a/src/libpakfire/include/pakfire/xfer.h b/src/libpakfire/include/pakfire/xfer.h index 57250ad88..dfc7eaac9 100644 --- a/src/libpakfire/include/pakfire/xfer.h +++ b/src/libpakfire/include/pakfire/xfer.h @@ -127,12 +127,13 @@ pakfire_xfer_error_code_t pakfire_xfer_run_api_request( struct pakfire_xfer* xfer, struct json_object** response); // WebSocket +typedef int (*pakfire_xfer_open_callback)(struct pakfire_xfer* xfer, void* data); typedef int (*pakfire_xfer_recv_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data); typedef int (*pakfire_xfer_send_callback)(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data); typedef int (*pakfire_xfer_close_callback)(struct pakfire_xfer* xfer, int code, void* data); -int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv, - pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data); +int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback open, + pakfire_xfer_recv_callback recv, pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data); int pakfire_xfer_send_message(struct pakfire_xfer* xfer, const char* message, const size_t length); diff --git a/src/libpakfire/xfer.c b/src/libpakfire/xfer.c index 43e69e7aa..bf892a09c 100644 --- a/src/libpakfire/xfer.c +++ b/src/libpakfire/xfer.c @@ -110,6 +110,7 @@ struct pakfire_xfer { // Callbacks struct pakfire_xfer_callbacks { + pakfire_xfer_open_callback open; pakfire_xfer_recv_callback recv; pakfire_xfer_send_callback send; pakfire_xfer_close_callback close; @@ -1033,6 +1034,13 @@ static pakfire_xfer_error_code_t pakfire_xfer_done_socket(struct pakfire_xfer* x CTX_DEBUG(xfer->ctx, "WebSocket registered with event loop\n"); + // Call the open callback + if (xfer->callbacks.open) { + r = xfer->callbacks.open(xfer, xfer->callbacks.data); + if (r) + goto ERROR; + } + ERROR: if (loop) sd_event_unref(loop); @@ -1504,11 +1512,13 @@ int pakfire_xfer_prepare(struct pakfire_xfer* xfer, struct pakfire_progress* pro return 0; } -int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_recv_callback recv, - pakfire_xfer_send_callback send, pakfire_xfer_close_callback close, void* data) { +int pakfire_xfer_socket(struct pakfire_xfer* xfer, pakfire_xfer_open_callback open, + pakfire_xfer_recv_callback recv, pakfire_xfer_send_callback send, + pakfire_xfer_close_callback close, void* data) { xfer->direction = PAKFIRE_XFER_SOCKET; // Store the callbacks + xfer->callbacks.open = open; xfer->callbacks.recv = recv; xfer->callbacks.send = send; xfer->callbacks.close = close;