#include <pakfire/ctx.h>
#include <pakfire/daemon.h>
#include <pakfire/httpclient.h>
+#include <pakfire/job.h>
#include <pakfire/util.h>
-#include <pakfire/worker.h>
-#define MAX_WORKERS 64
+#define MAX_JOBS 64
struct pakfire_daemon {
struct pakfire_ctx* ctx;
// Timer for submitting stats
sd_event_source* stats_timer;
- // Workers
- struct pakfire_worker* workers[MAX_WORKERS];
- unsigned int running_workers;
+ // Jobs
+ struct pakfire_job* jobs[MAX_JOBS];
+ unsigned int running_jobs;
};
/*
- Terminates all running workers
+ Terminates all running jobs
*/
-static int pakfire_daemon_terminate_workers(struct pakfire_daemon* daemon) {
+static int pakfire_daemon_terminate_jobs(struct pakfire_daemon* daemon) {
int r;
- for (unsigned int i = 0; i < MAX_WORKERS; i++) {
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
// Skip any empty slots
- if (!daemon->workers[i])
+ if (!daemon->jobs[i])
continue;
- // Terminate the worker
- r = pakfire_worker_terminate(daemon->workers[i], SIGTERM);
+ // Terminate the job
+ r = pakfire_job_terminate(daemon->jobs[i], SIGTERM);
if (r)
return r;
}
CTX_DEBUG(daemon->ctx, "Received signal to terminate...\n");
- // Terminate all workers
- pakfire_daemon_terminate_workers(daemon);
+ // Terminate all jobs
+ pakfire_daemon_terminate_jobs(daemon);
return sd_event_exit(sd_event_source_get_event(source), 0);
}
size_t length = 0;
int r;
- // If we have any workers running, we will submit our stats
+ // If we have any jobs running, we will submit our stats
// every five seconds, otherwise 30 seconds is enough.
- uint64_t next = (daemon->running_workers > 0) ? 5000000 : 30000000;
+ uint64_t next = (daemon->running_jobs > 0) ? 5000000 : 30000000;
// Reset the timer
r = sd_event_source_set_time_relative(daemon->stats_timer, next);
Called when a new job has been received
*/
static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
- struct pakfire_worker* worker = NULL;
+ struct pakfire_job* job = NULL;
struct json_object* data = NULL;
int r;
return -EINVAL;
}
- // Create a new worker
- r = pakfire_worker_create(&worker, daemon->ctx, daemon, data);
+ // Create a new job
+ r = pakfire_job_create(&job, daemon->ctx, daemon, data);
if (r) {
- CTX_ERROR(daemon->ctx, "Could not create a new worker: %s\n", strerror(-r));
+ CTX_ERROR(daemon->ctx, "Could not create a new job: %s\n", strerror(-r));
goto ERROR;
}
- // Store the worker
- for (unsigned int i = 0; i < MAX_WORKERS; i++) {
- if (!daemon->workers[i]) {
- daemon->workers[i] = pakfire_worker_ref(worker);
+ // Store the job
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ if (!daemon->jobs[i]) {
+ daemon->jobs[i] = pakfire_job_ref(job);
break;
}
}
- // Launch the worker
- r = pakfire_worker_launch(worker);
+ // Launch the job
+ r = pakfire_job_launch(job);
if (r < 0) {
- CTX_ERROR(daemon->ctx, "Could not launch worker: %s\n", strerror(-r));
+ CTX_ERROR(daemon->ctx, "Could not launch job: %s\n", strerror(-r));
goto ERROR;
}
- // Increment the number of running workers
- daemon->running_workers++;
+ // Increment the number of running jobs
+ daemon->running_jobs++;
- // XXX TODO We need to do something with the new worker
+ // XXX TODO We need to do something with the new job
ERROR:
- if (worker)
- pakfire_worker_unref(worker);
+ if (job)
+ pakfire_job_unref(job);
return r;
}
}
/*
- Called after a new worker has been launched to register with the daemon's event loop.
+ Called after a new job has been launched to register with the daemon's event loop.
*/
-int pakfire_daemon_worker_launched(struct pakfire_daemon* daemon,
- struct pakfire_worker* worker, int pidfd) {
+int pakfire_daemon_job_launched(struct pakfire_daemon* daemon,
+ struct pakfire_job* job, int pidfd) {
int r;
- CTX_DEBUG(daemon->ctx, "Registering worker %p with PIDFD %d\n", worker, pidfd);
+ CTX_DEBUG(daemon->ctx, "Registering job %p with PIDFD %d\n", job, pidfd);
r = sd_event_add_child_pidfd(daemon->loop, NULL, pidfd,
- WEXITED, pakfire_worker_exited, worker);
+ WEXITED, pakfire_job_exited, job);
if (r < 0) {
- CTX_DEBUG(daemon->ctx, "Could not register the worker with the event loop: %s\n", strerror(-r));
+ CTX_DEBUG(daemon->ctx, "Could not register the job with the event loop: %s\n", strerror(-r));
return r;
}
}
/*
- Called after a worker has exited.
+ Called after a job has exited.
*/
-int pakfire_daemon_worker_exited(struct pakfire_daemon* daemon, struct pakfire_worker* worker) {
- CTX_DEBUG(daemon->ctx, "Deregistering worker %p\n", worker);
+int pakfire_daemon_job_exited(struct pakfire_daemon* daemon, struct pakfire_job* job) {
+ CTX_DEBUG(daemon->ctx, "Deregistering job %p\n", job);
#if 0
- for (unsigned int i = 0; i < MAX_WORKERS; i++) {
- if (daemon->workers[i] != worker)
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ if (daemon->jobs[i] != job)
continue;
- // Dereference the worker and clear the slot
- pakfire_worker_unref(daemon->workers[i]);
- daemon->workers[i] = NULL;
+ // Dereference the job and clear the slot
+ pakfire_job_unref(daemon->jobs[i]);
+ daemon->jobs[i] = NULL;
break;
}
#include <pakfire/constants.h>
#include <pakfire/ctx.h>
#include <pakfire/daemon.h>
+#include <pakfire/job.h>
#include <pakfire/logging.h>
#include <pakfire/string.h>
-#include <pakfire/worker.h>
#include <pakfire/util.h>
-struct pakfire_worker {
+struct pakfire_job {
struct pakfire_ctx* ctx;
int nrefs;
char arch[ARCH_MAX];
// Flags
- enum pakfire_worker_flags {
- PAKFIRE_WORKER_TEST = (1 << 0),
- PAKFIRE_WORKER_CCACHE = (1 << 1),
+ enum {
+ pakfire_job_TEST = (1 << 0),
+ pakfire_job_CCACHE = (1 << 1),
} flags;
// Package URL
struct pakfire_xfer* control;
};
-static int pakfire_parse_job(struct pakfire_worker* worker, json_object* data) {
+static int pakfire_parse_job(struct pakfire_job* job, json_object* data) {
json_object* ccache = NULL;
json_object* o = NULL;
const char* s = NULL;
// Fetch the Job ID
if (!json_object_object_get_ex(data, "id", &o)) {
- CTX_ERROR(worker->ctx, "Job does not have an ID\n");
+ CTX_ERROR(job->ctx, "Job does not have an ID\n");
return -EINVAL;
}
s = json_object_get_string(o);
// Parse the Job ID
- r = uuid_parse(s, worker->job_id);
+ r = uuid_parse(s, job->job_id);
if (r) {
- CTX_ERROR(worker->ctx, "Could not parse the Job ID (%s)\n", s);
+ CTX_ERROR(job->ctx, "Could not parse the Job ID (%s)\n", s);
return -EINVAL;
}
// Fetch the name
if (!json_object_object_get_ex(data, "name", &o)) {
- CTX_ERROR(worker->ctx, "Job does not have a name\n");
+ CTX_ERROR(job->ctx, "Job does not have a name\n");
return -EINVAL;
}
// Store the name
- r = pakfire_string_set(worker->name, json_object_get_string(o));
+ r = pakfire_string_set(job->name, json_object_get_string(o));
if (r) {
- CTX_ERROR(worker->ctx, "Could not store name: %m\n");
+ CTX_ERROR(job->ctx, "Could not store name: %m\n");
return r;
}
// Fetch the arch
if (!json_object_object_get_ex(data, "arch", &o)) {
- CTX_ERROR(worker->ctx, "Job does not have an architecture\n");
+ CTX_ERROR(job->ctx, "Job does not have an architecture\n");
return -EINVAL;
}
// Store the arch
- r = pakfire_string_set(worker->arch, json_object_get_string(o));
+ r = pakfire_string_set(job->arch, json_object_get_string(o));
if (r) {
- CTX_ERROR(worker->ctx, "Could not store arch: %m\n");
+ CTX_ERROR(job->ctx, "Could not store arch: %m\n");
return r;
}
if (json_object_object_get_ex(data, "ccache", &ccache)) {
if (json_object_object_get_ex(ccache, "enabled", &o)) {
if (json_object_get_boolean(o)) {
- worker->flags |= PAKFIRE_WORKER_CCACHE;
+ job->flags |= pakfire_job_CCACHE;
}
}
// Path
if (json_object_object_get_ex(ccache, "path", &o)) {
- r = pakfire_string_set(worker->ccache_path, json_object_get_string(o));
+ r = pakfire_string_set(job->ccache_path, json_object_get_string(o));
if (r) {
- CTX_ERROR(worker->ctx, "Could not store the ccache path: %m\n");
+ CTX_ERROR(job->ctx, "Could not store the ccache path: %m\n");
return r;
}
// Check if this is a test job
if (json_object_object_get_ex(data, "test", &o)) {
if (json_object_get_boolean(o)) {
- worker->flags |= PAKFIRE_WORKER_TEST;
+ job->flags |= pakfire_job_TEST;
}
}
// Fetch the configuration
if (!json_object_object_get_ex(data, "conf", &o)) {
- CTX_ERROR(worker->ctx, "Job does not have a configuration\n");
+ CTX_ERROR(job->ctx, "Job does not have a configuration\n");
return -EINVAL;
}
// Store the configuration
- r = pakfire_string_set(worker->conf, json_object_get_string(o));
+ r = pakfire_string_set(job->conf, json_object_get_string(o));
if (r) {
- CTX_ERROR(worker->ctx, "Could not store the configuration: %m\n");
+ CTX_ERROR(job->ctx, "Could not store the configuration: %m\n");
return r;
}
// Fetch the package URL
if (!json_object_object_get_ex(data, "pkg", &o)) {
- CTX_ERROR(worker->ctx, "Job does not have a package URL\n");
+ CTX_ERROR(job->ctx, "Job does not have a package URL\n");
return -EINVAL;
}
// Store the package URL
- r = pakfire_string_set(worker->pkg, json_object_get_string(o));
+ r = pakfire_string_set(job->pkg, json_object_get_string(o));
if (r) {
- CTX_ERROR(worker->ctx, "Could not store the package URL: %m\n");
+ CTX_ERROR(job->ctx, "Could not store the package URL: %m\n");
return r;
}
- CTX_DEBUG(worker->ctx, "Job parsing completed\n");
+ CTX_DEBUG(job->ctx, "Job parsing completed\n");
// Format the Job ID as string
- uuid_unparse_lower(worker->job_id, job_id);
+ uuid_unparse_lower(job->job_id, job_id);
- CTX_INFO(worker->ctx, "Received a new job:\n");
- CTX_INFO(worker->ctx, " ID : %s\n", job_id);
- CTX_INFO(worker->ctx, " Name : %s\n", worker->name);
- CTX_INFO(worker->ctx, " Arch : %s\n", worker->arch);
+ CTX_INFO(job->ctx, "Received a new job:\n");
+ CTX_INFO(job->ctx, " ID : %s\n", job_id);
+ CTX_INFO(job->ctx, " Name : %s\n", job->name);
+ CTX_INFO(job->ctx, " Arch : %s\n", job->arch);
return 0;
}
-static void pakfire_worker_free(struct pakfire_worker* worker) {
- if (worker->client)
- pakfire_httpclient_unref(worker->client);
- if (worker->loop)
- sd_event_unref(worker->loop);
- if (worker->daemon)
- pakfire_daemon_unref(worker->daemon);
- if (worker->ctx)
- pakfire_ctx_unref(worker->ctx);
- free(worker);
+static void pakfire_job_free(struct pakfire_job* job) {
+ if (job->client)
+ pakfire_httpclient_unref(job->client);
+ if (job->loop)
+ sd_event_unref(job->loop);
+ if (job->daemon)
+ pakfire_daemon_unref(job->daemon);
+ if (job->ctx)
+ pakfire_ctx_unref(job->ctx);
+ free(job);
}
-static int pakfire_worker_recv(struct pakfire_xfer* xfer,
+static int pakfire_job_recv(struct pakfire_xfer* xfer,
const char* message, const size_t size, void* data) {
// XXX TODO
return 0;
}
-static int pakfire_worker_closed(struct pakfire_xfer* xfer, int code, void* data) {
- struct pakfire_worker* worker = data;
+static int pakfire_job_closed(struct pakfire_xfer* xfer, int code, void* data) {
+ struct pakfire_job* job = data;
int r;
// Drop the control connection
- if (worker->control) {
- pakfire_xfer_unref(worker->control);
- worker->control = NULL;
+ if (job->control) {
+ pakfire_xfer_unref(job->control);
+ job->control = NULL;
}
- CTX_INFO(worker->ctx, "Will attempt to reconnect in %u second(s)\n",
- worker->reconnect_holdoff / 1000000);
+ CTX_INFO(job->ctx, "Will attempt to reconnect in %u second(s)\n",
+ job->reconnect_holdoff / 1000000);
// Set the reconnection timer
- r = sd_event_source_set_time_relative(worker->connect_timer, worker->reconnect_holdoff);
+ r = sd_event_source_set_time_relative(job->connect_timer, job->reconnect_holdoff);
if (r < 0) {
- CTX_ERROR(worker->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
+ CTX_ERROR(job->ctx, "Could not set the reconnection timer: %s\n", strerror(-r));
return r;
}
// Activate the timer
- r = sd_event_source_set_enabled(worker->connect_timer, SD_EVENT_ONESHOT);
+ r = sd_event_source_set_enabled(job->connect_timer, SD_EVENT_ONESHOT);
if (r < 0) {
- CTX_ERROR(worker->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
+ CTX_ERROR(job->ctx, "Could not activate the connect timer: %s\n", strerror(-r));
return r;
}
// Double the holdoff time
- worker->reconnect_holdoff *= 2;
+ job->reconnect_holdoff *= 2;
// Cap reconnection attempts to every minute
- if (worker->reconnect_holdoff > 60000000)
- worker->reconnect_holdoff = 60000000;
+ if (job->reconnect_holdoff > 60000000)
+ job->reconnect_holdoff = 60000000;
return 0;
}
-static int pakfire_worker_connected(struct pakfire_xfer* xfer, void* data) {
- struct pakfire_worker* worker = data;
+static int pakfire_job_connected(struct pakfire_xfer* xfer, void* data) {
+ struct pakfire_job* job = data;
- CTX_DEBUG(worker->ctx, "Connected!\n");
+ CTX_DEBUG(job->ctx, "Connected!\n");
// Store a reference to the control connection
- worker->control = pakfire_xfer_ref(xfer);
+ job->control = pakfire_xfer_ref(xfer);
return 0;
}
-static int pakfire_worker_connect(sd_event_source* s, uint64_t usec, void* data) {
- struct pakfire_worker* worker = data;
+static int pakfire_job_connect(sd_event_source* s, uint64_t usec, void* data) {
+ struct pakfire_job* job = data;
struct pakfire_xfer* xfer = NULL;
char job_id[UUID_STR_LEN];
int r;
- CTX_INFO(worker->ctx, "Connecting...\n");
+ CTX_INFO(job->ctx, "Connecting...\n");
// Format the job ID as string
- uuid_unparse(worker->job_id, job_id);
+ uuid_unparse(job->job_id, job_id);
// Create a new xfer
- r = pakfire_httpclient_create_xfer(&xfer, worker->client, "/api/v1/jobs/%s", job_id);
+ r = pakfire_httpclient_create_xfer(&xfer, job->client, "/api/v1/jobs/%s", job_id);
if (r)
goto ERROR;
goto ERROR;
// Make this a WebSocket connection
- r = pakfire_xfer_socket(xfer, pakfire_worker_connected,
- pakfire_worker_recv, NULL, pakfire_worker_closed, worker);
+ r = pakfire_xfer_socket(xfer, pakfire_job_connected,
+ pakfire_job_recv, NULL, pakfire_job_closed, job);
if (r)
goto ERROR;
// Enqueue the transfer
- r = pakfire_httpclient_enqueue_xfer(worker->client, xfer);
+ r = pakfire_httpclient_enqueue_xfer(job->client, xfer);
if (r)
goto ERROR;
return r;
}
-int pakfire_worker_create(struct pakfire_worker** worker, struct pakfire_ctx* ctx,
+int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
struct pakfire_daemon* daemon, json_object* data) {
- struct pakfire_worker* w = NULL;
+ struct pakfire_job* j = NULL;
int r;
// Allocate a new object
- w = calloc(1, sizeof(*w));
- if (!w)
+ j = calloc(1, sizeof(*j));
+ if (!j)
return -errno;
// Reference the context
- w->ctx = pakfire_ctx_ref(ctx);
+ j->ctx = pakfire_ctx_ref(ctx);
// Initialize the reference counter
- w->nrefs = 1;
+ j->nrefs = 1;
// Store a reference to the daemon
- w->daemon = pakfire_daemon_ref(daemon);
+ j->daemon = pakfire_daemon_ref(daemon);
// Fetch a reference to the event loop
- w->loop = pakfire_daemon_loop(daemon);
- if (!w->loop) {
- CTX_ERROR(w->ctx, "Could not fetch the event loop: %m\n");
+ j->loop = pakfire_daemon_loop(daemon);
+ if (!j->loop) {
+ CTX_ERROR(j->ctx, "Could not fetch the event loop: %m\n");
r = -errno;
goto ERROR;
}
// Fetch a reference to the HTTP client
- w->client = pakfire_daemon_httpclient(daemon);
- if (!w->client) {
- CTX_ERROR(w->ctx, "Could not fetch the HTTP client: %m\n");
+ j->client = pakfire_daemon_httpclient(daemon);
+ if (!j->client) {
+ CTX_ERROR(j->ctx, "Could not fetch the HTTP client: %m\n");
r = -errno;
goto ERROR;
}
// Initialize the PID file descriptor
- w->pidfd = -1;
+ j->pidfd = -1;
// Reconnect after one second
- w->reconnect_holdoff = 1000000;
+ j->reconnect_holdoff = 1000000;
// Parse the job
- r = pakfire_parse_job(w, data);
+ r = pakfire_parse_job(j, data);
if (r)
goto ERROR;
// Setup the reconnection timer
- r = sd_event_add_time_relative(w->loop, &w->connect_timer,
- CLOCK_MONOTONIC, 0, 0, pakfire_worker_connect, w);
+ r = sd_event_add_time_relative(j->loop, &j->connect_timer,
+ CLOCK_MONOTONIC, 0, 0, pakfire_job_connect, j);
if (r < 0) {
- CTX_ERROR(w->ctx, "Could not register the connection timer: %s\n", strerror(-r));
+ CTX_ERROR(j->ctx, "Could not register the connection timer: %s\n", strerror(-r));
goto ERROR;
}
// Return the reference
- *worker = w;
+ *job = j;
return 0;
ERROR:
- pakfire_worker_free(w);
+ pakfire_job_free(j);
return r;
}
-struct pakfire_worker* pakfire_worker_ref(struct pakfire_worker* worker) {
- ++worker->nrefs;
+struct pakfire_job* pakfire_job_ref(struct pakfire_job* job) {
+ ++job->nrefs;
- return worker;
+ return job;
}
-struct pakfire_worker* pakfire_worker_unref(struct pakfire_worker* worker) {
- if (--worker->nrefs > 0)
- return worker;
+struct pakfire_job* pakfire_job_unref(struct pakfire_job* job) {
+ if (--job->nrefs > 0)
+ return job;
- pakfire_worker_free(worker);
+ pakfire_job_free(job);
return NULL;
}
/*
*/
-static int pakfire_worker_parent(struct pakfire_worker* worker) {
+static int pakfire_job_parent(struct pakfire_job* job) {
int r;
- // Tell the daemon process that the worker has been launched
- r = pakfire_daemon_worker_launched(worker->daemon, worker, worker->pidfd);
+ // Tell the daemon process that the job has been launched
+ r = pakfire_daemon_job_launched(job->daemon, job, job->pidfd);
if (r < 0)
return r;
return 0;
}
-static int pakfire_worker_child(struct pakfire_worker* worker) {
+static int pakfire_job_child(struct pakfire_job* job) {
// Fetch our PID
pid_t pid = getpid();
- CTX_DEBUG(worker->ctx, "Launched worker child as PID %d\n", pid);
+ CTX_DEBUG(job->ctx, "Launched job child as PID %d\n", pid);
// XXX TODO
return 1;
}
/*
- Launches the worker and returns the pidfd
+ Launches the job and returns the pidfd
*/
-int pakfire_worker_launch(struct pakfire_worker* worker) {
+int pakfire_job_launch(struct pakfire_job* job) {
char job_id[UUID_STR_LEN];
int pid;
int r;
// Format the job ID as string
- uuid_unparse(worker->job_id, job_id);
+ uuid_unparse(job->job_id, job_id);
- CTX_DEBUG(worker->ctx, "Launching worker %s\n", job_id);
+ CTX_DEBUG(job->ctx, "Launching job %s\n", job_id);
// Configure child process
struct clone_args args = {
.flags = CLONE_PIDFD,
.exit_signal = SIGCHLD,
- .pidfd = (long long unsigned int)&worker->pidfd,
+ .pidfd = (long long unsigned int)&job->pidfd,
};
// Fork this process
pid = clone3(&args, sizeof(args));
if (pid < 0) {
- CTX_ERROR(worker->ctx, "Could not clone: %m\n");
+ CTX_ERROR(job->ctx, "Could not clone: %m\n");
return -errno;
// Child process
} else if (pid == 0) {
- r = pakfire_worker_child(worker);
+ r = pakfire_job_child(job);
_exit(r);
}
- return pakfire_worker_parent(worker);
+ return pakfire_job_parent(job);
}
/*
- Terminates the worker (if it is still running)
+ Terminates the job (if it is still running)
*/
-int pakfire_worker_terminate(struct pakfire_worker* worker, int signal) {
+int pakfire_job_terminate(struct pakfire_job* job, int signal) {
int r;
// Fail if we don't have a PID file descriptor
- if (worker->pidfd < 0)
+ if (job->pidfd < 0)
return -ENOTSUP;
// Send a signal to the child process
- r = pidfd_send_signal(worker->pidfd, signal, NULL, 0);
+ r = pidfd_send_signal(job->pidfd, signal, NULL, 0);
if (r) {
- CTX_ERROR(worker->ctx, "Could not terminate worker: %m\n");
+ CTX_ERROR(job->ctx, "Could not terminate job: %m\n");
return r;
}
}
/*
- This method is triggered by SIGCHLD whenever the worker exits.
+ This method is triggered by SIGCHLD whenever the job exits.
*/
-int pakfire_worker_exited(sd_event_source* s, const siginfo_t* si, void* data) {
- struct pakfire_worker* worker = data;
+int pakfire_job_exited(sd_event_source* s, const siginfo_t* si, void* data) {
+ struct pakfire_job* job = data;
char job_id[UUID_STR_LEN];
int r;
// Format the job ID as string
- uuid_unparse(worker->job_id, job_id);
+ uuid_unparse(job->job_id, job_id);
switch (si->si_code) {
case CLD_EXITED:
- CTX_DEBUG(worker->ctx, "Worker %s has exited with status code %d\n",
+ CTX_DEBUG(job->ctx, "Job %s has exited with status code %d\n",
job_id, si->si_status);
break;
case CLD_KILLED:
- CTX_ERROR(worker->ctx, "Worker %s has been killed by signal %d\n",
+ CTX_ERROR(job->ctx, "Job %s has been killed by signal %d\n",
job_id, si->si_signo);
break;
}
- // Let the daemon know this worker has exited
- r = pakfire_daemon_worker_exited(worker->daemon, worker);
+ // Let the daemon know this job has exited
+ r = pakfire_daemon_job_exited(job->daemon, job);
if (r)
return r;