unsigned int reconnect_holdoff;
// Timer for submitting stats
- sd_event_source* stat_timer;
+ sd_event_source* stats_timer;
// Workers
struct pakfire_worker* workers[MAX_WORKERS];
return sd_event_exit(sd_event_source_get_event(source), 0);
}
-/*
- Called when a new job has been received
-*/
-static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
- struct pakfire_worker* worker = NULL;
- struct json_object* data = NULL;
- int r;
-
- // Fetch the data from the message
- if (!json_object_object_get_ex(m, "data", &data)) {
- CTX_ERROR(daemon->ctx, "Job does not have any data\n");
-
- return -EINVAL;
- }
-
- // Create a new worker
- r = pakfire_worker_create(&worker, daemon->ctx, data);
- if (r) {
- CTX_ERROR(daemon->ctx, "Could not create a new worker: %s\n", strerror(-r));
-
- goto ERROR;
- }
-
- // Store the new worker
- for (unsigned int i = 0; i < MAX_WORKERS; i++) {
- if (!daemon->workers[i]) {
- daemon->workers[i] = pakfire_worker_ref(worker);
- break;
- }
- }
-
- // Increment the number of running workers
- daemon->running_workers++;
-
- // XXX TODO We need to do something with the new worker
-
-ERROR:
- if (worker)
- pakfire_worker_unref(worker);
-
- return r;
-}
-
-static int pakfire_daemon_recv(struct pakfire_xfer* xfer,
- const char* message, const size_t size, void* data) {
- struct pakfire_daemon* daemon = data;
- struct json_object* type = NULL;
- struct json_object* m = NULL;
- int r;
-
- // Reset the holdoff timer
- daemon->reconnect_holdoff = 1000000;
-
- // Parse the JSON message
- m = pakfire_json_parse(daemon->ctx, message, size);
- if (!m) {
- r = -errno;
- goto ERROR;
- }
-
- // Fetch the message type
- if (!json_object_object_get_ex(m, "type", &type)) {
- CTX_ERROR(daemon->ctx, "Invalid message with missing type\n");
-
- return -EINVAL;
- }
-
- // Fetch the type
- const char* t = json_object_get_string(type);
-
- // Handle new jobs
- if (strcmp(t, "job") == 0) {
- r = pakfire_daemon_job(daemon, m);
-
- } else {
- CTX_ERROR(daemon->ctx, "Unknown message. Ignoring:\n%s\n",
- json_object_to_json_string_ext(m, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY));
-
- r = 0;
- }
-
-ERROR:
- if (m)
- json_object_put(m);
-
- return r;
-}
-
-/*
- This function is called whenever the connection to the build service could not
- be established or was interrupted. It will try to reconnect.
-*/
-static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) {
- struct pakfire_daemon* daemon = data;
- int r;
-
- CTX_INFO(daemon->ctx, "Will attempt to reconnect in %u second(s)\n",
- daemon->reconnect_holdoff / 1000000);
-
- // Set the reconnection timer
- r = sd_event_source_set_time_relative(daemon->connect_timer, daemon->reconnect_holdoff);
- if (r < 0) {
- CTX_ERROR(daemon->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
- return r;
- }
-
- // Activate the timer
- r = sd_event_source_set_enabled(daemon->connect_timer, SD_EVENT_ONESHOT);
- if (r < 0) {
- CTX_ERROR(daemon->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
- return r;
- }
-
- // Double the holdoff time
- daemon->reconnect_holdoff *= 2;
-
- // Cap reconnection attempts to every minute
- if (daemon->reconnect_holdoff > 60000000)
- daemon->reconnect_holdoff = 60000000;
-
- return 0;
-}
-
-static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data) {
- struct pakfire_daemon* daemon = data;
- struct pakfire_xfer* xfer = NULL;
- int r;
-
- CTX_INFO(daemon->ctx, "Connecting...\n");
-
- // Drop any previous control connections
- if (daemon->control) {
- pakfire_xfer_unref(daemon->control);
- daemon->control = NULL;
- }
-
- // Create a new xfer
- r = pakfire_httpclient_create_xfer(&xfer, daemon->client, "/api/v1/builders/control");
- if (r)
- goto ERROR;
-
- // Enable authentication
- r = pakfire_xfer_auth(xfer);
- if (r)
- goto ERROR;
-
- // Make this a WebSocket connection
- r = pakfire_xfer_socket(xfer, pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon);
- if (r)
- goto ERROR;
-
- // Enqueue the transfer
- r = pakfire_httpclient_enqueue_xfer(daemon->client, xfer);
- if (r)
- goto ERROR;
-
- // If everything was successful up to here, we make this our new control connection
- daemon->control = pakfire_xfer_ref(xfer);
-
-ERROR:
- if (xfer)
- pakfire_xfer_unref(xfer);
-
- return r;
-}
-
static int pakfire_daemon_submit_stats(sd_event_source* s, uint64_t usec, void* data) {
struct pakfire_daemon* daemon = data;
struct pakfire_cpuinfo cpuinfo = {};
uint64_t next = (daemon->running_workers > 0) ? 5000000 : 30000000;
// Reset the timer
- r = sd_event_source_set_time_relative(daemon->stat_timer, next);
+ r = sd_event_source_set_time_relative(daemon->stats_timer, next);
if (r < 0) {
CTX_ERROR(daemon->ctx, "Could not set the stat timer: %s\n", strerror(-r));
return r;
return 0;
}
+ CTX_DEBUG(daemon->ctx, "Submitting stats...\n");
+
// Fetch the distro
const struct pakfire_distro* distro = pakfire_ctx_get_distro(daemon->ctx);
return r;
}
+/*
+ Called when a new job has been received
+*/
+static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
+ struct pakfire_worker* worker = NULL;
+ struct json_object* data = NULL;
+ int r;
+
+ // Fetch the data from the message
+ if (!json_object_object_get_ex(m, "data", &data)) {
+ CTX_ERROR(daemon->ctx, "Job does not have any data\n");
+
+ return -EINVAL;
+ }
+
+ // Create a new worker
+ r = pakfire_worker_create(&worker, daemon->ctx, data);
+ if (r) {
+ CTX_ERROR(daemon->ctx, "Could not create a new worker: %s\n", strerror(-r));
+
+ goto ERROR;
+ }
+
+ // Launch the worker
+ r = pakfire_worker_launch(worker, daemon->loop);
+ if (r < 0) {
+ CTX_ERROR(daemon->ctx, "Could not launch worker: %s\n", strerror(-r));
+
+ goto ERROR;
+ }
+
+ // Store the worker
+ for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+ if (!daemon->workers[i]) {
+ daemon->workers[i] = pakfire_worker_ref(worker);
+ break;
+ }
+ }
+
+ // Increment the number of running workers
+ daemon->running_workers++;
+
+ // XXX TODO We need to do something with the new worker
+
+ERROR:
+ if (worker)
+ pakfire_worker_unref(worker);
+
+ return r;
+}
+
+static int pakfire_daemon_recv(struct pakfire_xfer* xfer,
+ const char* message, const size_t size, void* data) {
+ struct pakfire_daemon* daemon = data;
+ struct json_object* type = NULL;
+ struct json_object* m = NULL;
+ int r;
+
+ // Reset the holdoff timer
+ daemon->reconnect_holdoff = 1000000;
+
+ // Parse the JSON message
+ m = pakfire_json_parse(daemon->ctx, message, size);
+ if (!m) {
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Fetch the message type
+ if (!json_object_object_get_ex(m, "type", &type)) {
+ CTX_ERROR(daemon->ctx, "Invalid message with missing type\n");
+
+ return -EINVAL;
+ }
+
+ // Fetch the type
+ const char* t = json_object_get_string(type);
+
+ // Handle new jobs
+ if (strcmp(t, "job") == 0) {
+ r = pakfire_daemon_job(daemon, m);
+
+ } else {
+ CTX_ERROR(daemon->ctx, "Unknown message. Ignoring:\n%s\n",
+ json_object_to_json_string_ext(m, JSON_C_TO_STRING_SPACED | JSON_C_TO_STRING_PRETTY));
+
+ r = 0;
+ }
+
+ERROR:
+ if (m)
+ json_object_put(m);
+
+ return r;
+}
+
+/*
+ This function is called whenever the connection to the build service could not
+ be established or was interrupted. It will try to reconnect.
+*/
+static int pakfire_daemon_close(struct pakfire_xfer* xfer, int code, void* data) {
+ struct pakfire_daemon* daemon = data;
+ int r;
+
+ // Drop the control connection
+ if (daemon->control) {
+ pakfire_xfer_unref(daemon->control);
+ daemon->control = NULL;
+ }
+
+ CTX_INFO(daemon->ctx, "Will attempt to reconnect in %u second(s)\n",
+ daemon->reconnect_holdoff / 1000000);
+
+ // Set the reconnection timer
+ r = sd_event_source_set_time_relative(daemon->connect_timer, daemon->reconnect_holdoff);
+ if (r < 0) {
+ CTX_ERROR(daemon->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
+ return r;
+ }
+
+ // Activate the timer
+ r = sd_event_source_set_enabled(daemon->connect_timer, SD_EVENT_ONESHOT);
+ if (r < 0) {
+ CTX_ERROR(daemon->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
+ return r;
+ }
+
+ // Double the holdoff time
+ daemon->reconnect_holdoff *= 2;
+
+ // Cap reconnection attempts to every minute
+ if (daemon->reconnect_holdoff > 60000000)
+ daemon->reconnect_holdoff = 60000000;
+
+ // Destroy the stats timer
+ if (daemon->stats_timer) {
+ sd_event_source_unref(daemon->stats_timer);
+ daemon->stats_timer = NULL;
+ }
+
+ return 0;
+}
+
+static int pakfire_daemon_connected(struct pakfire_xfer* xfer, void* data) {
+ struct pakfire_daemon* daemon = data;
+ int r;
+
+ CTX_DEBUG(daemon->ctx, "Connected!\n");
+
+ // Store a reference to the control connection
+ daemon->control = pakfire_xfer_ref(xfer);
+
+ // Submit stats
+ r = sd_event_add_time_relative(daemon->loop, &daemon->stats_timer,
+ CLOCK_MONOTONIC, 0, 0, pakfire_daemon_submit_stats, daemon);
+ if (r < 0) {
+ CTX_ERROR(daemon->ctx, "Could not register the stat timer: %s\n", strerror(-r));
+ return r;
+ }
+
+ // Submit stats continuously
+ r = sd_event_source_set_enabled(daemon->stats_timer, SD_EVENT_ON);
+ if (r < 0) {
+ CTX_ERROR(daemon->ctx, "Could not activate the stat timer: %s\n", strerror(-r));
+ return r;
+ }
+
+ return 0;
+}
+
+static int pakfire_daemon_connect(sd_event_source* s, uint64_t usec, void* data) {
+ struct pakfire_daemon* daemon = data;
+ struct pakfire_xfer* xfer = NULL;
+ int r;
+
+ CTX_INFO(daemon->ctx, "Connecting...\n");
+
+ // Create a new xfer
+ r = pakfire_httpclient_create_xfer(&xfer, daemon->client, "/api/v1/builders/control");
+ if (r)
+ goto ERROR;
+
+ // Enable authentication
+ r = pakfire_xfer_auth(xfer);
+ if (r)
+ goto ERROR;
+
+ // Make this a WebSocket connection
+ r = pakfire_xfer_socket(xfer, pakfire_daemon_connected,
+ pakfire_daemon_recv, NULL, pakfire_daemon_close, daemon);
+ if (r)
+ goto ERROR;
+
+ // Enqueue the transfer
+ r = pakfire_httpclient_enqueue_xfer(daemon->client, xfer);
+ if (r)
+ goto ERROR;
+
+ return 0;
+
+ERROR:
+ if (xfer)
+ pakfire_xfer_unref(xfer);
+
+ return r;
+}
+
static int pakfire_daemon_setup_loop(struct pakfire_daemon* daemon) {
int r;
return r;
}
- // Submit stats
- r = sd_event_add_time_relative(daemon->loop, &daemon->stat_timer,
- CLOCK_MONOTONIC, 0, 0, pakfire_daemon_submit_stats, daemon);
- if (r < 0) {
- CTX_ERROR(daemon->ctx, "Could not register the stat timer: %s\n", strerror(-r));
- return r;
- }
-
- // Submit stats continuously
- r = sd_event_source_set_enabled(daemon->stat_timer, SD_EVENT_ON);
- if (r < 0) {
- CTX_ERROR(daemon->ctx, "Could not activate the stat timer: %s\n", strerror(-r));
- return r;
- }
-
return 0;
}
static void pakfire_daemon_free(struct pakfire_daemon* daemon) {
if (daemon->connect_timer)
sd_event_source_unref(daemon->connect_timer);
- if (daemon->stat_timer)
- sd_event_source_unref(daemon->stat_timer);
+ if (daemon->stats_timer)
+ sd_event_source_unref(daemon->stats_timer);
if (daemon->client)
pakfire_httpclient_unref(daemon->client);
if (daemon->control)