#include <pakfire/client.h>
#include <pakfire/ctx.h>
#include <pakfire/builder.h>
+#include <pakfire/job.h>
#include <pakfire/json.h>
+#include <pakfire/string.h>
#include <pakfire/util.h>
#include <pakfire/xfer.h>
// Submit stats every 30 seconds
#define PAKFIRE_STATS_TIMER S_TO_US(30)
+#define MAX_JOBS 64
+
struct pakfire_builder {
// Context
struct pakfire_ctx* ctx;
// Reconnect Timer & Holdoff Time
sd_event_source* reconnect_timer;
uint64_t reconnect_holdoff;
+
+ // Jobs
+ struct pakfire_job* jobs[MAX_JOBS];
+ unsigned int running_jobs;
};
static void pakfire_builder_free(struct pakfire_builder* self) {
struct pakfire_builder* self = data;
int r;
+ // Log action
+ DEBUG(self->ctx, "Builder disconnected\n");
+
// Remove the connection from the client
r = pakfire_client_builder_disconnected(self->client, xfer);
if (r < 0)
return 0;
}
-static int pakfire_builder_handle_job_message(struct pakfire_builder* self, const char* job_id, struct json_object* message) {
- return -EINVAL; // XXX TODO
+// Jobs
+
+static struct pakfire_job* pakfire_builder_find_job(
+ struct pakfire_builder* self, const char* job_id) {
+ // Walk through all jobs
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ if (!self->jobs[i])
+ continue;
+
+ // Return the matching job
+ if (pakfire_job_has_id(self->jobs[i], job_id))
+ return pakfire_job_ref(self->jobs[i]);
+ }
+
+ return NULL;
+}
+
+/*
+ Terminates all running jobs
+*/
+int pakfire_builder_terminate_jobs(struct pakfire_builder* self) {
+ int r;
+
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ // Skip any empty slots
+ if (!self->jobs[i])
+ continue;
+
+ // Terminate the job
+ r = pakfire_job_terminate(self->jobs[i], SIGTERM);
+ if (r)
+ return r;
+ }
+
+ return 0;
+}
+
+/*
+ Called after a job has exited
+*/
+int pakfire_builder_job_finished(struct pakfire_builder* self, struct pakfire_job* job) {
+ int r;
+
+ DEBUG(self->ctx, "Removing job %p\n", job);
+
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ if (self->jobs[i] != job)
+ continue;
+
+ // Dereference the job and clear the slot
+ pakfire_job_unref(self->jobs[i]);
+ self->jobs[i] = NULL;
+
+ break;
+ }
+
+ // Now we have one less job running
+ self->running_jobs--;
+
+#if 0
+ // Release the shutdown inhibition if there are no more jobs running
+ if (!self->running_jobs) {
+ r = pakfire_daemon_release_inhibit_shutdown(daemon);
+ if (r < 0)
+ return r;
+ }
+#endif
+
+ return 0;
+}
+
+/*
+ Called when we have received a message for a specific job
+*/
+static int pakfire_builder_handle_job_message(
+ struct pakfire_builder* self, const char* job_id, struct json_object* message) {
+ struct pakfire_job* job = NULL;
+ int r;
+
+ // Find the job
+ job = pakfire_builder_find_job(self, job_id);
+ if (!job) {
+ WARN(self->ctx, "Received message for job %s which does not exist. Ignoring.\n", job_id);
+ return 0;
+ }
+
+ // Dispatch the message to the job
+ r = pakfire_job_handle_message(job, message);
+ if (r < 0)
+ goto ERROR;
+
+ERROR:
+ if (job)
+ pakfire_job_unref(job);
+
+ return r;
+}
+
+/*
+ Called when a new job has been received
+*/
+static int pakfire_builder_job(struct pakfire_builder* self, json_object* m) {
+ struct json_object* data = NULL;
+ struct pakfire_job* job = NULL;
+ int r;
+
+#if 0
+ // Inhibit shutdown
+ r = pakfire_daemon_inhibit_shutdown(daemon);
+ if (r < 0)
+ goto ERROR;
+#endif
+
+ // Fetch the data from the message
+ if (!json_object_object_get_ex(m, "data", &data)) {
+ ERROR(self->ctx, "Job does not have any data\n");
+ r = -EINVAL;
+ goto ERROR;
+ }
+
+ // Create a new job
+ r = pakfire_job_create(&job, self->ctx, self, data);
+ if (r) {
+ ERROR(self->ctx, "Could not create a new job: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ // Launch the job
+ r = pakfire_job_launch(job);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to launch the job: %s\n", strerror(-r));
+ goto ERROR;
+ }
+
+ // Store the job
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ if (!self->jobs[i]) {
+ self->jobs[i] = pakfire_job_ref(job);
+ break;
+ }
+ }
+
+ // Increment the number of running jobs
+ self->running_jobs++;
+
+ERROR:
+ if (job)
+ pakfire_job_unref(job);
+
+ return r;
}
+
static int pakfire_builder_handle_message(struct pakfire_builder* self, struct json_object* message) {
const char* job_id = NULL;
const char* type = NULL;
}
// Handle new jobs
-#if 0
if (pakfire_string_equals(type, "job"))
return pakfire_builder_job(self, message);
-#endif
// Log an error for any unknown messages
ERROR(self->ctx, "Unknown message. Ignoring:\n%s\n",
return r;
}
+int pakfire_builder_send(struct pakfire_xfer* xfer, void* data) {
+ struct pakfire_builder* self = data;
+
+ // Stream logs
+ return pakfire_builder_stream_logs(self);
+}
+
int pakfire_builder_send_message(struct pakfire_builder* self, struct json_object* message) {
const char* m = NULL;
size_t length = 0;
return 0;
}
+
+/*
+ Called when we are ready to stream the log.
+
+ This function calls each running job which will only send one line
+ at a time to ensure that one job will not take the entire bandwidth.
+*/
+int pakfire_builder_stream_logs(struct pakfire_builder* self) {
+ unsigned int lines;
+ int r;
+
+ // Bail if we don't have a control connection
+ if (!self->control)
+ return 0;
+
+ // Bail if the connection isn't ready to send
+ else if (!pakfire_xfer_is_ready_to_send(self->control))
+ return 0;
+
+ do {
+ // Reset lines
+ lines = 0;
+
+ // Have every job send one line
+ for (unsigned int i = 0; i < MAX_JOBS; i++) {
+ // Skip any empty slots
+ if (!self->jobs[i])
+ continue;
+
+ // Stream logs
+ r = pakfire_job_stream_logs(self->jobs[i]);
+ if (r < 0) {
+ switch (-r) {
+ case EAGAIN:
+ return 0;
+
+ default:
+ return r;
+ }
+ }
+
+ // Add up lines
+ lines += r;
+ }
+ } while (lines);
+
+ return 0;
+}
#include <pakfire/client.h>
#include <pakfire/ctx.h>
+#include <pakfire/job.h>
+#include <pakfire/xfer.h>
int pakfire_builder_create(struct pakfire_builder** builder,
struct pakfire_ctx* ctx, struct pakfire_client* client);
int pakfire_builder_connected(struct pakfire_xfer* xfer, void* data);
int pakfire_builder_close(struct pakfire_xfer* xfer, int code, void* data);
int pakfire_builder_recv(struct pakfire_xfer* xfer, const char* message, const size_t size, void* data);
+int pakfire_builder_send(struct pakfire_xfer* xfer, void* data);
int pakfire_builder_send_message(struct pakfire_builder* self, struct json_object* message);
+// Jobs
+
+int pakfire_builder_terminate_jobs(struct pakfire_builder* self);
+int pakfire_builder_job_finished(struct pakfire_builder* self, struct pakfire_job* job);
+
+// Stream Logs
+int pakfire_builder_stream_logs(struct pakfire_builder* self);
+
#endif /* PAKFIRE_BUILDER_H */
r = pakfire_xfer_socket(xfer,
pakfire_builder_connected,
pakfire_builder_recv,
- NULL,
+ pakfire_builder_send,
pakfire_builder_close,
builder);
if (r < 0)
#include <pakfire/client.h>
#include <pakfire/ctx.h>
#include <pakfire/daemon.h>
-#include <pakfire/job.h>
#include <pakfire/json.h>
#include <pakfire/proctitle.h>
#include <pakfire/string.h>
#include <pakfire/util.h>
-#define MAX_JOBS 64
-
struct pakfire_daemon {
struct pakfire_ctx* ctx;
int nrefs;
// cgroup
struct pakfire_cgroup* cgroup;
-
- // Jobs
- struct pakfire_job* jobs[MAX_JOBS];
- unsigned int running_jobs;
};
-static struct pakfire_job* pakfire_daemon_find_job(
- struct pakfire_daemon* self, const char* job_id) {
- // Walk through all jobs
- for (unsigned int i = 0; i < MAX_JOBS; i++) {
- if (!self->jobs[i])
- continue;
-
- // Return the matching job
- if (pakfire_job_has_id(self->jobs[i], job_id))
- return pakfire_job_ref(self->jobs[i]);
- }
-
- return NULL;
-}
-
-/*
- Terminates all running jobs
-*/
-static int pakfire_daemon_terminate_jobs(struct pakfire_daemon* daemon) {
- int r;
-
- for (unsigned int i = 0; i < MAX_JOBS; i++) {
- // Skip any empty slots
- if (!daemon->jobs[i])
- continue;
-
- // Terminate the job
- r = pakfire_job_terminate(daemon->jobs[i], SIGTERM);
- if (r)
- return r;
- }
-
- return 0;
-}
-
static int pakfire_daemon_terminate(sd_event_source* source,
const struct signalfd_siginfo* si, void* data) {
struct pakfire_daemon* daemon = data;
DEBUG(daemon->ctx, "Received signal to terminate...\n");
// Terminate all jobs
- pakfire_daemon_terminate_jobs(daemon);
+ pakfire_builder_terminate_jobs(daemon->builder);
return sd_event_exit(sd_event_source_get_event(source), 0);
}
return 0;
}
-/*
- Called when we have received a message for a specific job
-*/
-static int pakfire_daemon_handle_job_message(
- struct pakfire_daemon* self, const char* job_id, struct json_object* message) {
- struct pakfire_job* job = NULL;
- int r;
-
- // Find the job
- job = pakfire_daemon_find_job(self, job_id);
- if (!job) {
- WARN(self->ctx, "Received message for job %s which does not exist. Ignoring.\n", job_id);
- return 0;
- }
-
- // Dispatch the message to the job
- r = pakfire_job_handle_message(job, message);
- if (r < 0)
- goto ERROR;
-
-ERROR:
- if (job)
- pakfire_job_unref(job);
-
- return r;
-}
-
-/*
- Called when a new job has been received
-*/
-static int pakfire_daemon_job(struct pakfire_daemon* daemon, json_object* m) {
- struct pakfire_job* job = NULL;
- struct json_object* data = NULL;
- int r;
-
- // Inhibit shutdown
- r = pakfire_daemon_inhibit_shutdown(daemon);
- if (r < 0)
- goto ERROR;
-
- // Fetch the data from the message
- if (!json_object_object_get_ex(m, "data", &data)) {
- ERROR(daemon->ctx, "Job does not have any data\n");
- r = -EINVAL;
- goto ERROR;
- }
-
- // Create a new job
- r = pakfire_job_create(&job, daemon->ctx, daemon, data);
- if (r) {
- ERROR(daemon->ctx, "Could not create a new job: %s\n", strerror(-r));
- goto ERROR;
- }
-
- // Launch the job
- r = pakfire_job_launch(job);
- if (r < 0) {
- ERROR(daemon->ctx, "Failed to launch the job: %s\n", strerror(-r));
- goto ERROR;
- }
-
- // Store the job
- for (unsigned int i = 0; i < MAX_JOBS; i++) {
- if (!daemon->jobs[i]) {
- daemon->jobs[i] = pakfire_job_ref(job);
- break;
- }
- }
-
- // Increment the number of running jobs
- daemon->running_jobs++;
-
-ERROR:
- if (job)
- pakfire_job_unref(job);
-
- return r;
-}
-
-/*
- Called when we are ready to stream the log.
-
- This function calls each running job which will only send one line
- at a time to ensure that one job will not take the entire bandwidth.
-*/
-int pakfire_daemon_stream_logs(struct pakfire_daemon* self) {
- unsigned int lines;
- int r;
-
-#if 0
- // Bail if we don't have a control connection
- if (!self->control)
- return 0;
-
- // Bail if the connection isn't ready to send
- else if (!pakfire_xfer_is_ready_to_send(self->control))
- return 0;
-#endif
-
- do {
- // Reset lines
- lines = 0;
-
- // Have every job send one line
- for (unsigned int i = 0; i < MAX_JOBS; i++) {
- // Skip any empty slots
- if (!self->jobs[i])
- continue;
-
- // Stream logs
- r = pakfire_job_stream_logs(self->jobs[i]);
- if (r < 0) {
- switch (-r) {
- case EAGAIN:
- return 0;
-
- default:
- return r;
- }
- }
-
- // Add up lines
- lines += r;
- }
- } while (lines);
-
- return 0;
-}
-
-static int pakfire_daemon_send(struct pakfire_xfer* xfer, void* data) {
- struct pakfire_daemon* self = data;
- int r;
-
- // Stream logs
- r = pakfire_daemon_stream_logs(self);
- if (r < 0)
- return r;
-
- return 0;
-}
-
/*
Called when the client is ready and we can start making connections...
*/
return 1;
}
-
-/*
- Called after a job has exited
-*/
-int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job) {
- int r;
-
- DEBUG(daemon->ctx, "Removing job %p\n", job);
-
- for (unsigned int i = 0; i < MAX_JOBS; i++) {
- if (daemon->jobs[i] != job)
- continue;
-
- // Dereference the job and clear the slot
- pakfire_job_unref(daemon->jobs[i]);
- daemon->jobs[i] = NULL;
-
- break;
- }
-
- // Now we have one less job running
- daemon->running_jobs--;
-
- // Release the shutdown inhibition if there are no more jobs running
- if (!daemon->running_jobs) {
- r = pakfire_daemon_release_inhibit_shutdown(daemon);
- if (r < 0)
- return r;
- }
-
- return 0;
-}
int pakfire_daemon_main(struct pakfire_daemon* daemon);
-int pakfire_daemon_job_finished(struct pakfire_daemon* daemon, struct pakfire_job* job);
-
-// Stream Logs
-int pakfire_daemon_stream_logs(struct pakfire_daemon* self);
-
// Send message
int pakfire_daemon_send_message(struct pakfire_daemon* self, struct json_object* message);
#include <pakfire/base64.h>
#include <pakfire/build.h>
+#include <pakfire/builder.h>
#include <pakfire/client.h>
#include <pakfire/config.h>
#include <pakfire/constants.h>
#include <pakfire/ctx.h>
-#include <pakfire/daemon.h>
#include <pakfire/job.h>
#include <pakfire/json.h>
#include <pakfire/log_file.h>
struct pakfire_ctx* ctx;
int nrefs;
- // Daemon
- struct pakfire_daemon* daemon;
+ // Builder
+ struct pakfire_builder* builder;
// Client
struct pakfire_client* client;
pakfire_config_unref(job->config);
if (job->loop)
sd_event_unref(job->loop);
- if (job->daemon)
- pakfire_daemon_unref(job->daemon);
+ if (job->builder)
+ pakfire_builder_unref(job->builder);
if (job->ctx)
pakfire_ctx_unref(job->ctx);
free(job);
if (r < 0)
goto ERROR;
+#if 0
// Set the base URL
r = pakfire_xfer_set_baseurl(x, pakfire_daemon_url(job->daemon));
if (r < 0)
goto ERROR;
+#endif
// Success
*xfer = pakfire_xfer_ref(x);
break;
}
- // Let the daemon know this is finished
- return pakfire_daemon_job_finished(job->daemon, job);
+ // Let the builder know this is finished
+ return pakfire_builder_job_finished(job->builder, job);
}
static int pakfire_job_send_log(struct pakfire_job* job, int priority, const char* line, size_t length) {
if (r < 0)
return r;
- // Ask the daemon to send it
- return pakfire_daemon_stream_logs(job->daemon);
+ // Ask the builder to send it
+ return pakfire_builder_stream_logs(job->builder);
}
static int pakfire_job_stdout(struct pakfire_ctx* ctx,
goto ERROR;
// Send the message
- r = pakfire_daemon_send_message(job->daemon, message);
+ r = pakfire_builder_send_message(job->builder, message);
if (r < 0) {
ERROR(job->ctx, "Could not send log message: %s\n", strerror(-r));
goto ERROR;
}
int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
- struct pakfire_daemon* daemon, json_object* data) {
+ struct pakfire_builder* builder, json_object* data) {
struct pakfire_job* j = NULL;
int r;
// Initialize the reference counter
j->nrefs = 1;
- // Store a reference to the daemon
- j->daemon = pakfire_daemon_ref(daemon);
+ // Store a reference to the builder
+ j->builder = pakfire_builder_ref(builder);
// Fetch a reference to the event loop
r = pakfire_ctx_loop(j->ctx, &j->loop);
goto ERROR;
}
+#if 0
// Fetch a reference to the client
j->client = pakfire_daemon_client(daemon);
+#endif
// Initialize the PID file descriptor
j->pidfd = -EBADF;
#ifndef PAKFIRE_JOB_H
#define PAKFIRE_JOB_H
+#include <pakfire/builder.h>
#include <pakfire/ctx.h>
-#include <pakfire/daemon.h>
struct pakfire_job;
int pakfire_job_create(struct pakfire_job** worker,
- struct pakfire_ctx* ctx, struct pakfire_daemon* daemon, json_object* data);
+ struct pakfire_ctx* ctx, struct pakfire_builder* builder, json_object* data);
struct pakfire_job* pakfire_job_ref(struct pakfire_job* worker);
struct pakfire_job* pakfire_job_unref(struct pakfire_job* worker);