#############################################################################*/
#include <errno.h>
+#include <fcntl.h>
#include <limits.h>
#include <linux/sched.h>
+#include <unistd.h>
#include <json.h>
#include <pakfire/string.h>
#include <pakfire/util.h>
+#define MAX_LINES 256
+
+struct log {
+ FILE* f;
+ int fds[2];
+ sd_event_source* event;
+
+ // Buffer any read lines
+ char* lines[MAX_LINES];
+ unsigned int num_lines;
+};
+
struct pakfire_job {
struct pakfire_ctx* ctx;
int nrefs;
// PID File Descriptor
int pidfd;
+ // Pipes to read the log output of the child process
+ struct {
+ struct log stdout;
+ struct log stderr;
+ } log;
+
+ // Events for reading logs from the child process
+ sd_event_source* stdout;
+ sd_event_source* stderr;
+
// Connection Timer and Holdoff Time
sd_event_source* connect_timer;
unsigned int reconnect_holdoff;
}
static void pakfire_job_free(struct pakfire_job* job) {
+ // Logging
+ if (job->log.stdout.event)
+ sd_event_source_unref(job->log.stdout.event);
+ if (job->log.stderr.event)
+ sd_event_source_unref(job->log.stderr.event);
+
if (job->client)
pakfire_httpclient_unref(job->client);
if (job->loop)
return 0;
}
+static int __pakfire_job_log_forward(struct pakfire_job* job, int priority, uint32_t events, struct log* log) {
+ char* line = NULL;
+ size_t l = 0;
+ int r;
+
+ // Read all lines
+ if (events & EPOLLIN) {
+ for (;;) {
+ r = getline(&line, &l, log->f);
+ if (r < 0)
+ break;
+
+ // XXX TODO
+ printf("LINE %s\n", line);
+ }
+ }
+
+ // Close the file descriptor
+ if (events & EPOLLHUP) {
+ close(log->fds[0]);
+ log->fds[0] = -1;
+ }
+
+ // Free the buffer
+ if (line)
+ free(line);
+
+ return 0;
+}
+
+static int pakfire_job_log_forward(sd_event_source* s, int fd, uint32_t events, void* data) {
+ struct pakfire_job* job = data;
+
+ // Standard output
+ if (job->log.stdout.event == s)
+ return __pakfire_job_log_forward(job, LOG_INFO, events, &job->log.stdout);
+
+ // Standard error
+ else if (job->log.stderr.event == s)
+ return __pakfire_job_log_forward(job, LOG_ERR, events, &job->log.stderr);
+
+ // Otherwise we fail
+ CTX_ERROR(job->ctx, "Called for an unknown event\n");
+ return -EINVAL;
+}
+
+static int pakfire_job_read_log(struct pakfire_job* job, struct log* log) {
+ int r;
+
+ // Close the write end of the log pipe
+ if (log->fds[1] >= 0) {
+ close(log->fds[1]);
+ log->fds[1] = -1;
+ }
+
+ // Register the file descriptor with the event loop
+ r = sd_event_add_io(job->loop, &log->event,
+ log->fds[0], EPOLLIN, pakfire_job_log_forward, job);
+ if (r < 0) {
+ CTX_ERROR(job->ctx, "Could not register fd %d: %s\n", log->fds[0], strerror(-r));
+ return r;
+ }
+
+ // Create a file handle to read the input line by line
+ log->f = fdopen(log->fds[0], "r");
+ if (!log->f) {
+ CTX_ERROR(job->ctx, "Could not open a file handle: %m\n");
+ return -errno;
+ }
+
+ return 0;
+}
+
/*
Called to initialize the parent process.
*/
return r;
}
+ // Read the standard output
+ r = pakfire_job_read_log(job, &job->log.stdout);
+ if (r < 0)
+ return r;
+
+ // Read the standard error
+ r = pakfire_job_read_log(job, &job->log.stderr);
+ if (r < 0)
+ return r;
+
return 0;
}
+static void pakfire_job_log(void* data, int priority, const char* file,
+ int line, const char* fn, const char* format, va_list args) {
+ struct pakfire_job* job = data;
+ int fd = -1;
+ int r;
+
+ // We only forward INFO & ERROR
+ switch (priority) {
+ case LOG_INFO:
+ fd = job->log.stdout.fds[1];
+ break;
+
+ case LOG_ERR:
+ fd = job->log.stderr.fds[1];
+ break;
+
+ default:
+ break;
+ }
+
+ // Forward the log message
+ if (fd >= 0) {
+ r = vdprintf(fd, format, args);
+ if (r < 0)
+ return;
+ }
+
+ // Pass everything on to syslog
+ pakfire_log_syslog(NULL, priority, file, line, fn, format, args);
+}
+
static int pakfire_job_child(struct pakfire_job* job) {
struct pakfire_ctx* ctx = NULL;
struct pakfire* pakfire = NULL;
// Format the job ID as string
uuid_unparse(job->job_id, job_id);
- // Create a new context
+ // Close the read ends of the log pipes
+ if (job->log.stdout.fds[0] >= 0) {
+ close(job->log.stdout.fds[0]);
+ job->log.stdout.fds[0] = -1;
+ }
+ if (job->log.stderr.fds[0] >= 0) {
+ close(job->log.stderr.fds[0]);
+ job->log.stderr.fds[0] = -1;
+ }
+
+ // Replace the context with a new one
r = pakfire_ctx_create(&ctx, NULL);
if (r < 0) {
- CTX_ERROR(job->ctx, "Could not create a new context: %s\n", strerror(-r));
+ CTX_ERROR(ctx, "Could not create a new context: %s\n", strerror(-r));
goto ERROR;
}
+ // Restore the log level
+ pakfire_ctx_set_log_level(ctx, pakfire_ctx_get_log_level(job->ctx));
+
+ // Setup logging
+ pakfire_ctx_set_log_callback(ctx, pakfire_job_log, job);
+
// Map the configuration
conf = fmemopen(job->conf, strlen(job->conf), "r");
if (!conf) {
// Update state
job->state = PAKFIRE_JOB_STATE_LAUNCHED;
+ // Setup standard output
+ r = pipe2(job->log.stdout.fds, O_CLOEXEC);
+ if (r < 0) {
+ CTX_ERROR(job->ctx, "Could not setup standard output: %m\n");
+ return -errno;
+ }
+
+ // Setup standard error
+ r = pipe2(job->log.stderr.fds, O_CLOEXEC);
+ if (r < 0) {
+ CTX_ERROR(job->ctx, "Could not setup standard error: %m\n");
+ return -errno;
+ }
+
// Configure child process
struct clone_args args = {
.flags = CLONE_PIDFD,
// Initialize the PID file descriptor
j->pidfd = -1;
+ // Setup the log pipe
+ j->log.stdout.fds[0] = -1;
+ j->log.stdout.fds[1] = -1;
+ j->log.stderr.fds[0] = -1;
+ j->log.stderr.fds[1] = -1;
+
// Reconnect after one second
j->reconnect_holdoff = 1000000;