--- /dev/null
+/*#############################################################################
+# #
+# Pakfire - The IPFire package management system #
+# Copyright (C) 2024 Pakfire development team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+#############################################################################*/
+
+#ifndef PAKFIRE_LOG_STREAM_H
+#define PAKFIRE_LOG_STREAM_H
+
+#ifdef PAKFIRE_PRIVATE
+
+#include <stdarg.h>
+
+#include <systemd/sd-event.h>
+
+#include <pakfire/ctx.h>
+
+struct pakfire_log_stream;
+
+typedef int (*pakfire_log_stream_callback)(struct pakfire_log_stream* stream,
+ const char* line, size_t length, void* data);
+
+int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire_ctx* ctx,
+ pakfire_log_stream_callback callback, void* data);
+struct pakfire_log_stream* pakfire_log_stream_ref(struct pakfire_log_stream* stream);
+struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* stream);
+
+int pakfire_log_stream_in_parent(struct pakfire_log_stream* stream, sd_event* loop);
+int pakfire_log_stream_in_child(struct pakfire_log_stream* stream);
+
+int pakfire_log_stream_write(struct pakfire_log_stream* stream, const char* format, va_list args);
+
+#endif /* PAKFIRE_PRIVATE */
+#endif /* PAKFIRE_LOG_STREAM_H */
#include <pakfire/daemon.h>
#include <pakfire/job.h>
#include <pakfire/log_buffer.h>
+#include <pakfire/log_stream.h>
#include <pakfire/logging.h>
#include <pakfire/string.h>
#include <pakfire/util.h>
-struct log {
- FILE* f;
- int fds[2];
- sd_event_source* event;
-};
-
struct pakfire_job {
struct pakfire_ctx* ctx;
int nrefs;
// PID File Descriptor
int pidfd;
- // Pipes to read the log output of the child process
+ // Logging
struct {
- struct log stdout;
- struct log stderr;
-
+ // Buffer
struct pakfire_log_buffer* buffer;
- } log;
- // Events for reading logs from the child process
- sd_event_source* stdout;
- sd_event_source* stderr;
+ // Streams
+ struct pakfire_log_stream* stdout;
+ struct pakfire_log_stream* stderr;
+ } log;
// Connection Timer and Holdoff Time
sd_event_source* connect_timer;
// Logging
if (job->log.buffer)
pakfire_log_buffer_unref(job->log.buffer);
- if (job->log.stdout.event)
- sd_event_source_unref(job->log.stdout.event);
- if (job->log.stderr.event)
- sd_event_source_unref(job->log.stderr.event);
+ if (job->log.stdout)
+ pakfire_log_stream_unref(job->log.stdout);
+ if (job->log.stderr)
+ pakfire_log_stream_unref(job->log.stderr);
if (job->client)
pakfire_httpclient_unref(job->client);
return r;
}
-static int __pakfire_job_log_forward(struct pakfire_job* job, int priority, uint32_t events, struct log* log) {
- char* line = NULL;
- ssize_t length = 0;
- size_t l = 0;
- int r;
-
- // Read all lines
- if (events & EPOLLIN) {
- for (;;) {
- length = getline(&line, &l, log->f);
- if (length < 0)
- break;
-
- // Send the line to the build service
- r = pakfire_job_send_log(job, priority, line, length);
- if (r < 0)
- return r;
-
- // Enqueue the line in the log buffer
- r = pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length);
- if (r < 0) {
- switch (-r) {
- case ENOSPC:
- CTX_DEBUG(job->ctx, "Log buffer is full\n");
- break;
-
- default:
- CTX_ERROR(job->ctx, "Could not enqueue to the log buffer: %s\n", strerror(-r));
- return r;
- }
- }
- }
- }
-
- // Close the file descriptor
- if (events & EPOLLHUP) {
- close(log->fds[0]);
- log->fds[0] = -1;
- }
-
- // Free the buffer
- if (line)
- free(line);
-
- return 0;
-}
-
-static int pakfire_job_log_forward(sd_event_source* s, int fd, uint32_t events, void* data) {
+static int pakfire_job_stdout(struct pakfire_log_stream* stream,
+ const char* line, size_t length, void* data) {
struct pakfire_job* job = data;
- // Standard output
- if (job->log.stdout.event == s)
- return __pakfire_job_log_forward(job, LOG_INFO, events, &job->log.stdout);
-
- // Standard error
- else if (job->log.stderr.event == s)
- return __pakfire_job_log_forward(job, LOG_ERR, events, &job->log.stderr);
-
- // Otherwise we fail
- CTX_ERROR(job->ctx, "Called for an unknown event\n");
- return -EINVAL;
+ // Send the output to the build service
+ return pakfire_job_send_log(job, LOG_INFO, line, length);
}
-static int pakfire_job_read_log(struct pakfire_job* job, struct log* log) {
- int r;
-
- // Close the write end of the log pipe
- if (log->fds[1] >= 0) {
- close(log->fds[1]);
- log->fds[1] = -1;
- }
-
- // Register the file descriptor with the event loop
- r = sd_event_add_io(job->loop, &log->event,
- log->fds[0], EPOLLIN, pakfire_job_log_forward, job);
- if (r < 0) {
- CTX_ERROR(job->ctx, "Could not register fd %d: %s\n", log->fds[0], strerror(-r));
- return r;
- }
-
- // Create a file handle to read the input line by line
- log->f = fdopen(log->fds[0], "r");
- if (!log->f) {
- CTX_ERROR(job->ctx, "Could not open a file handle: %m\n");
- return -errno;
- }
+static int pakfire_job_stderr(struct pakfire_log_stream* stream,
+ const char* line, size_t length, void* data) {
+ struct pakfire_job* job = data;
- return 0;
+ // Send the output to the build service
+ return pakfire_job_send_log(job, LOG_ERR, line, length);
}
/*
}
// Read the standard output
- r = pakfire_job_read_log(job, &job->log.stdout);
+ r = pakfire_log_stream_in_parent(job->log.stdout, job->loop);
if (r < 0)
return r;
// Read the standard error
- r = pakfire_job_read_log(job, &job->log.stderr);
+ r = pakfire_log_stream_in_parent(job->log.stderr, job->loop);
if (r < 0)
return r;
static void pakfire_job_log(void* data, int priority, const char* file,
int line, const char* fn, const char* format, va_list args) {
struct pakfire_job* job = data;
- int fd = -1;
int r;
// We only forward INFO & ERROR
switch (priority) {
case LOG_INFO:
- fd = job->log.stdout.fds[1];
+ r = pakfire_log_stream_write(job->log.stdout, format, args);
+ if (r < 0)
+ return;
break;
case LOG_ERR:
- fd = job->log.stderr.fds[1];
+ r = pakfire_log_stream_write(job->log.stderr, format, args);
+ if (r < 0)
+ return;
break;
default:
break;
}
- // Forward the log message
- if (fd >= 0) {
- r = vdprintf(fd, format, args);
- if (r < 0)
- return;
- }
-
// Pass everything on to syslog
pakfire_log_syslog(NULL, priority, file, line, fn, format, args);
}
// Format the job ID as string
uuid_unparse(job->job_id, job_id);
- // Close the read ends of the log pipes
- if (job->log.stdout.fds[0] >= 0) {
- close(job->log.stdout.fds[0]);
- job->log.stdout.fds[0] = -1;
- }
- if (job->log.stderr.fds[0] >= 0) {
- close(job->log.stderr.fds[0]);
- job->log.stderr.fds[0] = -1;
- }
+ // Setup the standard output log stream
+ r = pakfire_log_stream_in_child(job->log.stdout);
+ if (r < 0)
+ goto ERROR;
+
+ // Setup the standard error log stream
+ r = pakfire_log_stream_in_child(job->log.stderr);
+ if (r < 0)
+ goto ERROR;
// Replace the context with a new one
r = pakfire_ctx_create(&ctx, NULL);
job->state = PAKFIRE_JOB_STATE_LAUNCHED;
// Setup standard output
- r = pipe2(job->log.stdout.fds, O_CLOEXEC);
+ r = pakfire_log_stream_create(&job->log.stdout, job->ctx, pakfire_job_stdout, job);
if (r < 0) {
- CTX_ERROR(job->ctx, "Could not setup standard output: %m\n");
+ CTX_ERROR(job->ctx, "Could not setup standard output: %s\n", strerror(-r));
return -errno;
}
// Setup standard error
- r = pipe2(job->log.stderr.fds, O_CLOEXEC);
+ r = pakfire_log_stream_create(&job->log.stderr, job->ctx, pakfire_job_stderr, job);
if (r < 0) {
- CTX_ERROR(job->ctx, "Could not setup standard error: %m\n");
+ CTX_ERROR(job->ctx, "Could not setup standard error: %s\n", strerror(-r));
return -errno;
}
if (r < 0)
goto ERROR;
- // Setup the log pipe
- j->log.stdout.fds[0] = -1;
- j->log.stdout.fds[1] = -1;
- j->log.stderr.fds[0] = -1;
- j->log.stderr.fds[1] = -1;
-
// Reconnect after one second
j->reconnect_holdoff = 1000000;
--- /dev/null
+/*#############################################################################
+# #
+# Pakfire - The IPFire package management system #
+# Copyright (C) 2024 Pakfire development team #
+# #
+# This program is free software: you can redistribute it and/or modify #
+# it under the terms of the GNU General Public License as published by #
+# the Free Software Foundation, either version 3 of the License, or #
+# (at your option) any later version. #
+# #
+# This program is distributed in the hope that it will be useful, #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
+# GNU General Public License for more details. #
+# #
+# You should have received a copy of the GNU General Public License #
+# along with this program. If not, see <http://www.gnu.org/licenses/>. #
+# #
+#############################################################################*/
+
+#include <errno.h>
+#include <fcntl.h>
+
+#include <systemd/sd-event.h>
+
+#include <pakfire/ctx.h>
+#include <pakfire/log_stream.h>
+
+struct pakfire_log_stream {
+ struct pakfire_ctx* ctx;
+ int nrefs;
+
+ // Pipe
+ int pipe[2];
+
+ // FILE Handle
+ FILE* f;
+
+ // Event Source
+ sd_event_source* event;
+
+ // Callback
+ pakfire_log_stream_callback callback;
+ void* data;
+};
+
+static void pakfire_log_stream_free(struct pakfire_log_stream* stream) {
+ if (stream->event)
+ sd_event_source_unref(stream->event);
+ if (stream->f)
+ fclose(stream->f);
+
+ // Close the pipe
+ if (stream->pipe[0] >= 0)
+ close(stream->pipe[0]);
+ if (stream->pipe[1] >= 0)
+ close(stream->pipe[1]);
+
+ if (stream->ctx)
+ pakfire_ctx_unref(stream->ctx);
+ free(stream);
+}
+
+int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire_ctx* ctx,
+ pakfire_log_stream_callback callback, void* data) {
+ struct pakfire_log_stream* s = NULL;
+ int r;
+
+ // Allocate a new object
+ s = calloc(1, sizeof(*s));
+ if (!s)
+ return -errno;
+
+ // Initialize the reference counter
+ s->nrefs = 1;
+
+ // Store a reference to the context
+ s->ctx = pakfire_ctx_ref(ctx);
+
+ // Store the callback & data
+ s->callback = callback;
+ s->data = data;
+
+ // Create a new pipe
+ r = pipe2(s->pipe, O_CLOEXEC);
+ if (r < 0) {
+ CTX_ERROR(s->ctx, "Could not create a pipe: %m\n");
+ r = -errno;
+ goto ERROR;
+ }
+
+ // Return the pointer
+ *stream = s;
+
+ return 0;
+
+ERROR:
+ if (s)
+ pakfire_log_stream_free(s);
+
+ return r;
+}
+
+struct pakfire_log_stream* pakfire_log_stream_ref(struct pakfire_log_stream* stream) {
+ ++stream->nrefs;
+
+ return stream;
+}
+
+struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* stream) {
+ if (--stream->nrefs > 0)
+ return stream;
+
+ pakfire_log_stream_free(stream);
+ return NULL;
+}
+
+static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, void* data) {
+ struct pakfire_log_stream* stream = data;
+
+ char* line = NULL;
+ size_t l = 0;
+ int r;
+
+ ssize_t bytes_read = 0;
+
+ // Read as many lines as possible
+ if (events & EPOLLIN) {
+ for (;;) {
+ bytes_read = getline(&line, &l, stream->f);
+ if (bytes_read < 0)
+ break;
+
+ // Log a message if we don't have a callback
+ if (!stream->callback) {
+ CTX_ERROR(stream->ctx, "Log stream has no callback set\n");
+ continue;
+ }
+
+ // Call the callback
+ r = stream->callback(stream, line, bytes_read, stream->data);
+ if (r)
+ return r;
+ }
+ }
+
+ // Handle if the child process has closed the file descriptor
+ if (events & EPOLLHUP) {
+ close(stream->pipe[0]);
+ stream->pipe[0] = -1;
+ }
+
+ return r;
+}
+
+/*
+ To be called from the parent process...
+*/
+int pakfire_log_stream_in_parent(struct pakfire_log_stream* stream, sd_event* loop) {
+ int r;
+
+ // Close the write end of the pipe
+ if (stream->pipe[1]) {
+ close(stream->pipe[1]);
+ stream->pipe[1] = -1;
+ }
+
+ // Register the file descriptor with the event loop
+ r = sd_event_add_io(loop, &stream->event, stream->pipe[0],
+ EPOLLIN|EPOLLHUP, __pakfire_log_stream, stream);
+ if (r < 0) {
+ CTX_ERROR(stream->ctx, "Could not register fd %d: %s\n", stream->pipe[0], strerror(-r));
+ return r;
+ }
+
+ // Create a file handle for easy line buffering
+ stream->f = fdopen(stream->pipe[0], "r");
+ if (!stream->f) {
+ CTX_ERROR(stream->ctx, "Could not open a file handle: %m\n");
+ return -errno;
+ }
+
+ return 0;
+}
+
+/*
+ To be called from the child process...
+*/
+int pakfire_log_stream_in_child(struct pakfire_log_stream* stream) {
+ // Close the read end of the pipe
+ if (stream->pipe[0]) {
+ close(stream->pipe[0]);
+ stream->pipe[0] = -1;
+ }
+
+ return 0;
+}
+
+int pakfire_log_stream_write(struct pakfire_log_stream* stream, const char* format, va_list args) {
+ // Fail if the pipe isn't open
+ if (stream->pipe[1] < 0)
+ return -EPIPE;
+
+ // Send the message into the pipe
+ return vdprintf(stream->pipe[1], format, args);
+}