From: Michael Tremer Date: Sat, 5 Oct 2024 16:31:32 +0000 (+0000) Subject: logging: Create an abstraction to stream log lines from a child process X-Git-Tag: 0.9.30~1148 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=0e6935ccd5a85775e80ee91199bcebc36d10806c;p=pakfire.git logging: Create an abstraction to stream log lines from a child process Signed-off-by: Michael Tremer --- diff --git a/Makefile.am b/Makefile.am index fab9f4e8a..9c4205681 100644 --- a/Makefile.am +++ b/Makefile.am @@ -220,6 +220,7 @@ libpakfire_la_SOURCES = \ src/libpakfire/key.c \ src/libpakfire/linter.c \ src/libpakfire/log_buffer.c \ + src/libpakfire/log_stream.c \ src/libpakfire/logging.c \ src/libpakfire/mirror.c \ src/libpakfire/mirrorlist.c \ @@ -270,6 +271,7 @@ pkginclude_HEADERS += \ src/libpakfire/include/pakfire/key.h \ src/libpakfire/include/pakfire/linter.h \ src/libpakfire/include/pakfire/log_buffer.h \ + src/libpakfire/include/pakfire/log_stream.h \ src/libpakfire/include/pakfire/logging.h \ src/libpakfire/include/pakfire/mirror.h \ src/libpakfire/include/pakfire/mirrorlist.h \ diff --git a/src/libpakfire/include/pakfire/log_stream.h b/src/libpakfire/include/pakfire/log_stream.h new file mode 100644 index 000000000..b98ea08c7 --- /dev/null +++ b/src/libpakfire/include/pakfire/log_stream.h @@ -0,0 +1,48 @@ +/*############################################################################# +# # +# Pakfire - The IPFire package management system # +# Copyright (C) 2024 Pakfire development team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +#############################################################################*/ + +#ifndef PAKFIRE_LOG_STREAM_H +#define PAKFIRE_LOG_STREAM_H + +#ifdef PAKFIRE_PRIVATE + +#include + +#include + +#include + +struct pakfire_log_stream; + +typedef int (*pakfire_log_stream_callback)(struct pakfire_log_stream* stream, + const char* line, size_t length, void* data); + +int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire_ctx* ctx, + pakfire_log_stream_callback callback, void* data); +struct pakfire_log_stream* pakfire_log_stream_ref(struct pakfire_log_stream* stream); +struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* stream); + +int pakfire_log_stream_in_parent(struct pakfire_log_stream* stream, sd_event* loop); +int pakfire_log_stream_in_child(struct pakfire_log_stream* stream); + +int pakfire_log_stream_write(struct pakfire_log_stream* stream, const char* format, va_list args); + +#endif /* PAKFIRE_PRIVATE */ +#endif /* PAKFIRE_LOG_STREAM_H */ diff --git a/src/libpakfire/job.c b/src/libpakfire/job.c index 100a00635..daa259ea6 100644 --- a/src/libpakfire/job.c +++ b/src/libpakfire/job.c @@ -36,16 +36,11 @@ #include #include #include +#include #include #include #include -struct log { - FILE* f; - int fds[2]; - sd_event_source* event; -}; - struct pakfire_job { struct pakfire_ctx* ctx; int nrefs; @@ -81,17 +76,15 @@ struct pakfire_job { // PID File Descriptor int pidfd; - // Pipes to read the log output of the child process + // Logging struct { - struct log stdout; - struct log stderr; - + // Buffer struct pakfire_log_buffer* buffer; - } log; - // Events for reading logs from the child process - sd_event_source* stdout; - sd_event_source* stderr; + // Streams + struct pakfire_log_stream* stdout; + struct pakfire_log_stream* stderr; + } log; // Connection Timer and Holdoff Time sd_event_source* connect_timer; @@ -237,10 +230,10 @@ static void pakfire_job_free(struct pakfire_job* job) { // Logging if (job->log.buffer) pakfire_log_buffer_unref(job->log.buffer); - if (job->log.stdout.event) - sd_event_source_unref(job->log.stdout.event); - if (job->log.stderr.event) - sd_event_source_unref(job->log.stderr.event); + if (job->log.stdout) + pakfire_log_stream_unref(job->log.stdout); + if (job->log.stderr) + pakfire_log_stream_unref(job->log.stderr); if (job->client) pakfire_httpclient_unref(job->client); @@ -353,94 +346,20 @@ ERROR: return r; } -static int __pakfire_job_log_forward(struct pakfire_job* job, int priority, uint32_t events, struct log* log) { - char* line = NULL; - ssize_t length = 0; - size_t l = 0; - int r; - - // Read all lines - if (events & EPOLLIN) { - for (;;) { - length = getline(&line, &l, log->f); - if (length < 0) - break; - - // Send the line to the build service - r = pakfire_job_send_log(job, priority, line, length); - if (r < 0) - return r; - - // Enqueue the line in the log buffer - r = pakfire_log_buffer_enqueue(job->log.buffer, priority, line, length); - if (r < 0) { - switch (-r) { - case ENOSPC: - CTX_DEBUG(job->ctx, "Log buffer is full\n"); - break; - - default: - CTX_ERROR(job->ctx, "Could not enqueue to the log buffer: %s\n", strerror(-r)); - return r; - } - } - } - } - - // Close the file descriptor - if (events & EPOLLHUP) { - close(log->fds[0]); - log->fds[0] = -1; - } - - // Free the buffer - if (line) - free(line); - - return 0; -} - -static int pakfire_job_log_forward(sd_event_source* s, int fd, uint32_t events, void* data) { +static int pakfire_job_stdout(struct pakfire_log_stream* stream, + const char* line, size_t length, void* data) { struct pakfire_job* job = data; - // Standard output - if (job->log.stdout.event == s) - return __pakfire_job_log_forward(job, LOG_INFO, events, &job->log.stdout); - - // Standard error - else if (job->log.stderr.event == s) - return __pakfire_job_log_forward(job, LOG_ERR, events, &job->log.stderr); - - // Otherwise we fail - CTX_ERROR(job->ctx, "Called for an unknown event\n"); - return -EINVAL; + // Send the output to the build service + return pakfire_job_send_log(job, LOG_INFO, line, length); } -static int pakfire_job_read_log(struct pakfire_job* job, struct log* log) { - int r; - - // Close the write end of the log pipe - if (log->fds[1] >= 0) { - close(log->fds[1]); - log->fds[1] = -1; - } - - // Register the file descriptor with the event loop - r = sd_event_add_io(job->loop, &log->event, - log->fds[0], EPOLLIN, pakfire_job_log_forward, job); - if (r < 0) { - CTX_ERROR(job->ctx, "Could not register fd %d: %s\n", log->fds[0], strerror(-r)); - return r; - } - - // Create a file handle to read the input line by line - log->f = fdopen(log->fds[0], "r"); - if (!log->f) { - CTX_ERROR(job->ctx, "Could not open a file handle: %m\n"); - return -errno; - } +static int pakfire_job_stderr(struct pakfire_log_stream* stream, + const char* line, size_t length, void* data) { + struct pakfire_job* job = data; - return 0; + // Send the output to the build service + return pakfire_job_send_log(job, LOG_ERR, line, length); } /* @@ -458,12 +377,12 @@ static int pakfire_job_parent(struct pakfire_job* job) { } // Read the standard output - r = pakfire_job_read_log(job, &job->log.stdout); + r = pakfire_log_stream_in_parent(job->log.stdout, job->loop); if (r < 0) return r; // Read the standard error - r = pakfire_job_read_log(job, &job->log.stderr); + r = pakfire_log_stream_in_parent(job->log.stderr, job->loop); if (r < 0) return r; @@ -473,30 +392,26 @@ static int pakfire_job_parent(struct pakfire_job* job) { static void pakfire_job_log(void* data, int priority, const char* file, int line, const char* fn, const char* format, va_list args) { struct pakfire_job* job = data; - int fd = -1; int r; // We only forward INFO & ERROR switch (priority) { case LOG_INFO: - fd = job->log.stdout.fds[1]; + r = pakfire_log_stream_write(job->log.stdout, format, args); + if (r < 0) + return; break; case LOG_ERR: - fd = job->log.stderr.fds[1]; + r = pakfire_log_stream_write(job->log.stderr, format, args); + if (r < 0) + return; break; default: break; } - // Forward the log message - if (fd >= 0) { - r = vdprintf(fd, format, args); - if (r < 0) - return; - } - // Pass everything on to syslog pakfire_log_syslog(NULL, priority, file, line, fn, format, args); } @@ -517,15 +432,15 @@ static int pakfire_job_child(struct pakfire_job* job) { // Format the job ID as string uuid_unparse(job->job_id, job_id); - // Close the read ends of the log pipes - if (job->log.stdout.fds[0] >= 0) { - close(job->log.stdout.fds[0]); - job->log.stdout.fds[0] = -1; - } - if (job->log.stderr.fds[0] >= 0) { - close(job->log.stderr.fds[0]); - job->log.stderr.fds[0] = -1; - } + // Setup the standard output log stream + r = pakfire_log_stream_in_child(job->log.stdout); + if (r < 0) + goto ERROR; + + // Setup the standard error log stream + r = pakfire_log_stream_in_child(job->log.stderr); + if (r < 0) + goto ERROR; // Replace the context with a new one r = pakfire_ctx_create(&ctx, NULL); @@ -616,16 +531,16 @@ static int pakfire_job_launch(struct pakfire_job* job) { job->state = PAKFIRE_JOB_STATE_LAUNCHED; // Setup standard output - r = pipe2(job->log.stdout.fds, O_CLOEXEC); + r = pakfire_log_stream_create(&job->log.stdout, job->ctx, pakfire_job_stdout, job); if (r < 0) { - CTX_ERROR(job->ctx, "Could not setup standard output: %m\n"); + CTX_ERROR(job->ctx, "Could not setup standard output: %s\n", strerror(-r)); return -errno; } // Setup standard error - r = pipe2(job->log.stderr.fds, O_CLOEXEC); + r = pakfire_log_stream_create(&job->log.stderr, job->ctx, pakfire_job_stderr, job); if (r < 0) { - CTX_ERROR(job->ctx, "Could not setup standard error: %m\n"); + CTX_ERROR(job->ctx, "Could not setup standard error: %s\n", strerror(-r)); return -errno; } @@ -800,12 +715,6 @@ int pakfire_job_create(struct pakfire_job** job, struct pakfire_ctx* ctx, if (r < 0) goto ERROR; - // Setup the log pipe - j->log.stdout.fds[0] = -1; - j->log.stdout.fds[1] = -1; - j->log.stderr.fds[0] = -1; - j->log.stderr.fds[1] = -1; - // Reconnect after one second j->reconnect_holdoff = 1000000; diff --git a/src/libpakfire/log_stream.c b/src/libpakfire/log_stream.c new file mode 100644 index 000000000..77b684196 --- /dev/null +++ b/src/libpakfire/log_stream.c @@ -0,0 +1,206 @@ +/*############################################################################# +# # +# Pakfire - The IPFire package management system # +# Copyright (C) 2024 Pakfire development team # +# # +# This program is free software: you can redistribute it and/or modify # +# it under the terms of the GNU General Public License as published by # +# the Free Software Foundation, either version 3 of the License, or # +# (at your option) any later version. # +# # +# This program is distributed in the hope that it will be useful, # +# but WITHOUT ANY WARRANTY; without even the implied warranty of # +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # +# GNU General Public License for more details. # +# # +# You should have received a copy of the GNU General Public License # +# along with this program. If not, see . # +# # +#############################################################################*/ + +#include +#include + +#include + +#include +#include + +struct pakfire_log_stream { + struct pakfire_ctx* ctx; + int nrefs; + + // Pipe + int pipe[2]; + + // FILE Handle + FILE* f; + + // Event Source + sd_event_source* event; + + // Callback + pakfire_log_stream_callback callback; + void* data; +}; + +static void pakfire_log_stream_free(struct pakfire_log_stream* stream) { + if (stream->event) + sd_event_source_unref(stream->event); + if (stream->f) + fclose(stream->f); + + // Close the pipe + if (stream->pipe[0] >= 0) + close(stream->pipe[0]); + if (stream->pipe[1] >= 0) + close(stream->pipe[1]); + + if (stream->ctx) + pakfire_ctx_unref(stream->ctx); + free(stream); +} + +int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire_ctx* ctx, + pakfire_log_stream_callback callback, void* data) { + struct pakfire_log_stream* s = NULL; + int r; + + // Allocate a new object + s = calloc(1, sizeof(*s)); + if (!s) + return -errno; + + // Initialize the reference counter + s->nrefs = 1; + + // Store a reference to the context + s->ctx = pakfire_ctx_ref(ctx); + + // Store the callback & data + s->callback = callback; + s->data = data; + + // Create a new pipe + r = pipe2(s->pipe, O_CLOEXEC); + if (r < 0) { + CTX_ERROR(s->ctx, "Could not create a pipe: %m\n"); + r = -errno; + goto ERROR; + } + + // Return the pointer + *stream = s; + + return 0; + +ERROR: + if (s) + pakfire_log_stream_free(s); + + return r; +} + +struct pakfire_log_stream* pakfire_log_stream_ref(struct pakfire_log_stream* stream) { + ++stream->nrefs; + + return stream; +} + +struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* stream) { + if (--stream->nrefs > 0) + return stream; + + pakfire_log_stream_free(stream); + return NULL; +} + +static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, void* data) { + struct pakfire_log_stream* stream = data; + + char* line = NULL; + size_t l = 0; + int r; + + ssize_t bytes_read = 0; + + // Read as many lines as possible + if (events & EPOLLIN) { + for (;;) { + bytes_read = getline(&line, &l, stream->f); + if (bytes_read < 0) + break; + + // Log a message if we don't have a callback + if (!stream->callback) { + CTX_ERROR(stream->ctx, "Log stream has no callback set\n"); + continue; + } + + // Call the callback + r = stream->callback(stream, line, bytes_read, stream->data); + if (r) + return r; + } + } + + // Handle if the child process has closed the file descriptor + if (events & EPOLLHUP) { + close(stream->pipe[0]); + stream->pipe[0] = -1; + } + + return r; +} + +/* + To be called from the parent process... +*/ +int pakfire_log_stream_in_parent(struct pakfire_log_stream* stream, sd_event* loop) { + int r; + + // Close the write end of the pipe + if (stream->pipe[1]) { + close(stream->pipe[1]); + stream->pipe[1] = -1; + } + + // Register the file descriptor with the event loop + r = sd_event_add_io(loop, &stream->event, stream->pipe[0], + EPOLLIN|EPOLLHUP, __pakfire_log_stream, stream); + if (r < 0) { + CTX_ERROR(stream->ctx, "Could not register fd %d: %s\n", stream->pipe[0], strerror(-r)); + return r; + } + + // Create a file handle for easy line buffering + stream->f = fdopen(stream->pipe[0], "r"); + if (!stream->f) { + CTX_ERROR(stream->ctx, "Could not open a file handle: %m\n"); + return -errno; + } + + return 0; +} + +/* + To be called from the child process... +*/ +int pakfire_log_stream_in_child(struct pakfire_log_stream* stream) { + // Close the read end of the pipe + if (stream->pipe[0]) { + close(stream->pipe[0]); + stream->pipe[0] = -1; + } + + return 0; +} + +int pakfire_log_stream_write(struct pakfire_log_stream* stream, const char* format, va_list args) { + // Fail if the pipe isn't open + if (stream->pipe[1] < 0) + return -EPIPE; + + // Send the message into the pipe + return vdprintf(stream->pipe[1], format, args); +}