]> git.ipfire.org Git - pakfire.git/commitdiff
logging: Create an abstraction to stream log lines from a child process
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 5 Oct 2024 16:31:32 +0000 (16:31 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 5 Oct 2024 16:31:32 +0000 (16:31 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
Makefile.am
src/libpakfire/include/pakfire/log_stream.h [new file with mode: 0644]
src/libpakfire/job.c
src/libpakfire/log_stream.c [new file with mode: 0644]

index fab9f4e8af3c731854960f5f59352a53626222fb..9c42056817aea16f55c7ad215d0f7820c81ac191 100644 (file)
@@ -220,6 +220,7 @@ libpakfire_la_SOURCES = \
        src/libpakfire/key.c \
        src/libpakfire/linter.c \
        src/libpakfire/log_buffer.c \
+       src/libpakfire/log_stream.c \
        src/libpakfire/logging.c \
        src/libpakfire/mirror.c \
        src/libpakfire/mirrorlist.c \
@@ -270,6 +271,7 @@ pkginclude_HEADERS += \
        src/libpakfire/include/pakfire/key.h \
        src/libpakfire/include/pakfire/linter.h \
        src/libpakfire/include/pakfire/log_buffer.h \
+       src/libpakfire/include/pakfire/log_stream.h \
        src/libpakfire/include/pakfire/logging.h \
        src/libpakfire/include/pakfire/mirror.h \
        src/libpakfire/include/pakfire/mirrorlist.h \
diff --git a/src/libpakfire/include/pakfire/log_stream.h b/src/libpakfire/include/pakfire/log_stream.h
new file mode 100644 (file)
index 0000000..b98ea08
--- /dev/null
@@ -0,0 +1,48 @@
+/*#############################################################################
+#                                                                             #
+# Pakfire - The IPFire package management system                              #
+# Copyright (C) 2024 Pakfire development team                                 #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#ifndef PAKFIRE_LOG_STREAM_H
+#define PAKFIRE_LOG_STREAM_H
+
+#ifdef PAKFIRE_PRIVATE
+
+#include <stdarg.h>
+
+#include <systemd/sd-event.h>
+
+#include <pakfire/ctx.h>
+
+struct pakfire_log_stream;
+
+typedef int (*pakfire_log_stream_callback)(struct pakfire_log_stream* stream,
+       const char* line, size_t length, void* data);
+
+int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire_ctx* ctx,
+       pakfire_log_stream_callback callback, void* data);
+struct pakfire_log_stream* pakfire_log_stream_ref(struct pakfire_log_stream* stream);
+struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* stream);
+
+int pakfire_log_stream_in_parent(struct pakfire_log_stream* stream, sd_event* loop);
+int pakfire_log_stream_in_child(struct pakfire_log_stream* stream);
+
+int pakfire_log_stream_write(struct pakfire_log_stream* stream, const char* format, va_list args);
+
+#endif /* PAKFIRE_PRIVATE */
+#endif /* PAKFIRE_LOG_STREAM_H */
index 100a00635698e22dc553ef366a5eac5eb11dd25b..daa259ea60a59c5a90ea1bfc9f8fe3b550281851 100644 (file)
 #include <pakfire/daemon.h>
 #include <pakfire/job.h>
 #include <pakfire/log_buffer.h>
+#include <pakfire/log_stream.h>
 #include <pakfire/logging.h>
 #include <pakfire/string.h>
 #include <pakfire/util.h>
 
-struct log {
-       FILE* f;
-       int fds[2];
-       sd_event_source* event;
-};
-
 struct pakfire_job {
        struct pakfire_ctx* ctx;
        int nrefs;
@@ -81,17 +76,15 @@ struct pakfire_job {
        // PID File Descriptor
        int pidfd;
 
-       // Pipes to read the log output of the child process
+       // Logging
        struct {
-               struct log stdout;
-               struct log stderr;
-
+               // Buffer
                struct pakfire_log_buffer* buffer;
-       } log;
 
-       // Events for reading logs from the child process
-       sd_event_source* stdout;
-       sd_event_source* stderr;
+               // Streams
+               struct pakfire_log_stream* stdout;
+               struct pakfire_log_stream* stderr;
+       } log;
 
        // Connection Timer and Holdoff Time
        sd_event_source* connect_timer;
@@ -237,10 +230,10 @@ static void pakfire_job_free(struct pakfire_job* job) {
        // Logging
        if (job->log.buffer)
                pakfire_log_buffer_unref(job->log.buffer);
-       if (job->log.stdout.event)
-               sd_event_source_unref(job->log.stdout.event);
-       if (job->log.stderr.event)
-               sd_event_source_unref(job->log.stderr.event);
+       if (job->log.stdout)
+               pakfire_log_stream_unref(job->log.stdout);
+       if (job->log.stderr)
+               pakfire_log_stream_unref(job->log.stderr);
 
        if (job->client)
                pakfire_httpclient_unref(job->client);
@@ -353,94 +346,20 @@ ERROR:
        return r;
 }
 
-static int __pakfire_job_log_forward(struct pakfire_job* job, int priority, uint32_t events, struct log* log) {
-       char* line = NULL;
-       ssize_t length = 0;
-       size_t l = 0;
-       int r;
-
-       // Read all lines
-       if (events & EPOLLIN) {
-               for (;;) {
-                       length = getline(&line, &l, log->f);
-                       if (length < 0)
-                               break;
-
-                       // Send the line to the build service
-                       r = pakfire_job_send_log(job, priority, line, length);
-                       if (r < 0)
-                               return r;
-
-                       // Enqueue the line in the log buffer
-                       r = pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length);
-                       if (r < 0) {
-                               switch (-r) {
-                                       case ENOSPC:
-                                               CTX_DEBUG(job->ctx, "Log buffer is full\n");
-                                               break;
-
-                                       default:
-                                               CTX_ERROR(job->ctx, "Could not enqueue to the log buffer: %s\n", strerror(-r));
-                                               return r;
-                               }
-                       }
-               }
-       }
-
-       // Close the file descriptor
-       if (events & EPOLLHUP) {
-               close(log->fds[0]);
-               log->fds[0] = -1;
-       }
-
-       // Free the buffer
-       if (line)
-               free(line);
-
-       return 0;
-}
-
-static int pakfire_job_log_forward(sd_event_source* s, int fd, uint32_t events, void* data) {
+static int pakfire_job_stdout(struct pakfire_log_stream* stream,
+               const char* line, size_t length, void* data) {
        struct pakfire_job* job = data;
 
-       // Standard output
-       if (job->log.stdout.event == s)
-               return __pakfire_job_log_forward(job, LOG_INFO, events, &job->log.stdout);
-
-       // Standard error
-       else if (job->log.stderr.event == s)
-               return __pakfire_job_log_forward(job, LOG_ERR, events, &job->log.stderr);
-
-       // Otherwise we fail
-       CTX_ERROR(job->ctx, "Called for an unknown event\n");
-       return -EINVAL;
+       // Send the output to the build service
+       return pakfire_job_send_log(job, LOG_INFO, line, length);
 }
 
-static int pakfire_job_read_log(struct pakfire_job* job, struct log* log) {
-       int r;
-
-       // Close the write end of the log pipe
-       if (log->fds[1] >= 0) {
-               close(log->fds[1]);
-               log->fds[1] = -1;
-       }
-
-       // Register the file descriptor with the event loop
-       r = sd_event_add_io(job->loop, &log->event,
-                       log->fds[0], EPOLLIN, pakfire_job_log_forward, job);
-       if (r < 0) {
-               CTX_ERROR(job->ctx, "Could not register fd %d: %s\n", log->fds[0], strerror(-r));
-               return r;
-       }
-
-       // Create a file handle to read the input line by line
-       log->f = fdopen(log->fds[0], "r");
-       if (!log->f) {
-               CTX_ERROR(job->ctx, "Could not open a file handle: %m\n");
-               return -errno;
-       }
+static int pakfire_job_stderr(struct pakfire_log_stream* stream,
+               const char* line, size_t length, void* data) {
+       struct pakfire_job* job = data;
 
-       return 0;
+       // Send the output to the build service
+       return pakfire_job_send_log(job, LOG_ERR, line, length);
 }
 
 /*
@@ -458,12 +377,12 @@ static int pakfire_job_parent(struct pakfire_job* job) {
        }
 
        // Read the standard output
-       r = pakfire_job_read_log(job, &job->log.stdout);
+       r = pakfire_log_stream_in_parent(job->log.stdout, job->loop);
        if (r < 0)
                return r;
 
        // Read the standard error
-       r = pakfire_job_read_log(job, &job->log.stderr);
+       r = pakfire_log_stream_in_parent(job->log.stderr, job->loop);
        if (r < 0)
                return r;
 
@@ -473,30 +392,26 @@ static int pakfire_job_parent(struct pakfire_job* job) {
 static void pakfire_job_log(void* data, int priority, const char* file,
                int line, const char* fn, const char* format, va_list args) {
        struct pakfire_job* job = data;
-       int fd = -1;
        int r;
 
        // We only forward INFO & ERROR
        switch (priority) {
                case LOG_INFO:
-                       fd = job->log.stdout.fds[1];
+                       r = pakfire_log_stream_write(job->log.stdout, format, args);
+                       if (r < 0)
+                               return;
                        break;
 
                case LOG_ERR:
-                       fd = job->log.stderr.fds[1];
+                       r = pakfire_log_stream_write(job->log.stderr, format, args);
+                       if (r < 0)
+                               return;
                        break;
 
                default:
                        break;
        }
 
-       // Forward the log message
-       if (fd >= 0) {
-               r = vdprintf(fd, format, args);
-               if (r < 0)
-                       return;
-       }
-
        // Pass everything on to syslog
        pakfire_log_syslog(NULL, priority, file, line, fn, format, args);
 }
@@ -517,15 +432,15 @@ static int pakfire_job_child(struct pakfire_job* job) {
        // Format the job ID as string
        uuid_unparse(job->job_id, job_id);
 
-       // Close the read ends of the log pipes
-       if (job->log.stdout.fds[0] >= 0) {
-               close(job->log.stdout.fds[0]);
-               job->log.stdout.fds[0] = -1;
-       }
-       if (job->log.stderr.fds[0] >= 0) {
-               close(job->log.stderr.fds[0]);
-               job->log.stderr.fds[0] = -1;
-       }
+       // Setup the standard output log stream
+       r = pakfire_log_stream_in_child(job->log.stdout);
+       if (r < 0)
+               goto ERROR;
+
+       // Setup the standard error log stream
+       r = pakfire_log_stream_in_child(job->log.stderr);
+       if (r < 0)
+               goto ERROR;
 
        // Replace the context with a new one
        r = pakfire_ctx_create(&ctx, NULL);
@@ -616,16 +531,16 @@ static int pakfire_job_launch(struct pakfire_job* job) {
        job->state = PAKFIRE_JOB_STATE_LAUNCHED;
 
        // Setup standard output
-       r = pipe2(job->log.stdout.fds, O_CLOEXEC);
+       r = pakfire_log_stream_create(&job->log.stdout, job->ctx, pakfire_job_stdout, job);
        if (r < 0) {
-               CTX_ERROR(job->ctx, "Could not setup standard output: %m\n");
+               CTX_ERROR(job->ctx, "Could not setup standard output: %s\n", strerror(-r));
                return -errno;
        }
 
        // Setup standard error
-       r = pipe2(job->log.stderr.fds, O_CLOEXEC);
+       r = pakfire_log_stream_create(&job->log.stderr, job->ctx, pakfire_job_stderr, job);
        if (r < 0) {
-               CTX_ERROR(job->ctx, "Could not setup standard error: %m\n");
+               CTX_ERROR(job->ctx, "Could not setup standard error: %s\n", strerror(-r));
                return -errno;
        }
 
@@ -800,12 +715,6 @@ int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx,
        if (r < 0)
                goto ERROR;
 
-       // Setup the log pipe
-       j->log.stdout.fds[0] = -1;
-       j->log.stdout.fds[1] = -1;
-       j->log.stderr.fds[0] = -1;
-       j->log.stderr.fds[1] = -1;
-
        // Reconnect after one second
        j->reconnect_holdoff = 1000000;
 
diff --git a/src/libpakfire/log_stream.c b/src/libpakfire/log_stream.c
new file mode 100644 (file)
index 0000000..77b6841
--- /dev/null
@@ -0,0 +1,206 @@
+/*#############################################################################
+#                                                                             #
+# Pakfire - The IPFire package management system                              #
+# Copyright (C) 2024 Pakfire development team                                 #
+#                                                                             #
+# This program is free software: you can redistribute it and/or modify        #
+# it under the terms of the GNU General Public License as published by        #
+# the Free Software Foundation, either version 3 of the License, or           #
+# (at your option) any later version.                                         #
+#                                                                             #
+# This program is distributed in the hope that it will be useful,             #
+# but WITHOUT ANY WARRANTY; without even the implied warranty of              #
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the               #
+# GNU General Public License for more details.                                #
+#                                                                             #
+# You should have received a copy of the GNU General Public License           #
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.       #
+#                                                                             #
+#############################################################################*/
+
+#include <errno.h>
+#include <fcntl.h>
+
+#include <systemd/sd-event.h>
+
+#include <pakfire/ctx.h>
+#include <pakfire/log_stream.h>
+
+struct pakfire_log_stream {
+       struct pakfire_ctx* ctx;
+       int nrefs;
+
+       // Pipe
+       int pipe[2];
+
+       // FILE Handle
+       FILE* f;
+
+       // Event Source
+       sd_event_source* event;
+
+       // Callback
+       pakfire_log_stream_callback callback;
+       void* data;
+};
+
+static void pakfire_log_stream_free(struct pakfire_log_stream* stream) {
+       if (stream->event)
+               sd_event_source_unref(stream->event);
+       if (stream->f)
+               fclose(stream->f);
+
+       // Close the pipe
+       if (stream->pipe[0] >= 0)
+               close(stream->pipe[0]);
+       if (stream->pipe[1] >= 0)
+               close(stream->pipe[1]);
+
+       if (stream->ctx)
+               pakfire_ctx_unref(stream->ctx);
+       free(stream);
+}
+
+int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire_ctx* ctx,
+               pakfire_log_stream_callback callback, void* data) {
+       struct pakfire_log_stream* s = NULL;
+       int r;
+
+       // Allocate a new object
+       s = calloc(1, sizeof(*s));
+       if (!s)
+               return -errno;
+
+       // Initialize the reference counter
+       s->nrefs = 1;
+
+       // Store a reference to the context
+       s->ctx = pakfire_ctx_ref(ctx);
+
+       // Store the callback & data
+       s->callback = callback;
+       s->data = data;
+
+       // Create a new pipe
+       r = pipe2(s->pipe, O_CLOEXEC);
+       if (r < 0) {
+               CTX_ERROR(s->ctx, "Could not create a pipe: %m\n");
+               r = -errno;
+               goto ERROR;
+       }
+
+       // Return the pointer
+       *stream = s;
+
+       return 0;
+
+ERROR:
+       if (s)
+               pakfire_log_stream_free(s);
+
+       return r;
+}
+
+struct pakfire_log_stream* pakfire_log_stream_ref(struct pakfire_log_stream* stream) {
+       ++stream->nrefs;
+
+       return stream;
+}
+
+struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* stream) {
+       if (--stream->nrefs > 0)
+               return stream;
+
+       pakfire_log_stream_free(stream);
+       return NULL;
+}
+
+static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, void* data) {
+       struct pakfire_log_stream* stream = data;
+
+       char* line = NULL;
+       size_t l = 0;
+       int r;
+
+       ssize_t bytes_read = 0;
+
+       // Read as many lines as possible
+       if (events & EPOLLIN) {
+               for (;;) {
+                       bytes_read = getline(&line, &l, stream->f);
+                       if (bytes_read < 0)
+                               break;
+
+                       // Log a message if we don't have a callback
+                       if (!stream->callback) {
+                               CTX_ERROR(stream->ctx, "Log stream has no callback set\n");
+                               continue;
+                       }
+
+                       // Call the callback
+                       r = stream->callback(stream, line, bytes_read, stream->data);
+                       if (r)
+                               return r;
+               }
+       }
+
+       // Handle if the child process has closed the file descriptor
+       if (events & EPOLLHUP) {
+               close(stream->pipe[0]);
+               stream->pipe[0] = -1;
+       }
+
+       return r;
+}
+
+/*
+       To be called from the parent process...
+*/
+int pakfire_log_stream_in_parent(struct pakfire_log_stream* stream, sd_event* loop) {
+       int r;
+
+       // Close the write end of the pipe
+       if (stream->pipe[1]) {
+               close(stream->pipe[1]);
+               stream->pipe[1] = -1;
+       }
+
+       // Register the file descriptor with the event loop
+       r = sd_event_add_io(loop, &stream->event, stream->pipe[0],
+                       EPOLLIN|EPOLLHUP, __pakfire_log_stream, stream);
+       if (r < 0) {
+               CTX_ERROR(stream->ctx, "Could not register fd %d: %s\n", stream->pipe[0], strerror(-r));
+               return r;
+       }
+
+       // Create a file handle for easy line buffering
+       stream->f = fdopen(stream->pipe[0], "r");
+       if (!stream->f) {
+               CTX_ERROR(stream->ctx, "Could not open a file handle: %m\n");
+               return -errno;
+       }
+
+       return 0;
+}
+
+/*
+       To be called from the child process...
+*/
+int pakfire_log_stream_in_child(struct pakfire_log_stream* stream) {
+       // Close the read end of the pipe
+       if (stream->pipe[0]) {
+               close(stream->pipe[0]);
+               stream->pipe[0] = -1;
+       }
+
+       return 0;
+}
+
+int pakfire_log_stream_write(struct pakfire_log_stream* stream, const char* format, va_list args) {
+       // Fail if the pipe isn't open
+       if (stream->pipe[1] < 0)
+               return -EPIPE;
+
+       // Send the message into the pipe
+       return vdprintf(stream->pipe[1], format, args);
+}