From: Michael Tremer Date: Sat, 22 Mar 2025 18:04:22 +0000 (+0000) Subject: log stream: Use the new buffer implementation X-Git-Url: http://git.ipfire.org/?a=commitdiff_plain;h=77695b7aa7f9d95bad77a01085511c5a34439183;p=pakfire.git log stream: Use the new buffer implementation Signed-off-by: Michael Tremer --- diff --git a/src/pakfire/log_stream.c b/src/pakfire/log_stream.c index e46158eb..de507c97 100644 --- a/src/pakfire/log_stream.c +++ b/src/pakfire/log_stream.c @@ -24,11 +24,15 @@ #include +#include #include #include #define MAX_LINE_LENGTH 16384 +// Never let the buffer grow larger than this +#define MAX_BUFFER_LENGTH 1024576 // 1 MiB + struct pakfire_log_stream { struct pakfire_ctx* ctx; int nrefs; @@ -37,8 +41,7 @@ struct pakfire_log_stream { int pipe[2]; // Buffer - char buffer[64 * 1024]; - size_t buffered; + struct pakfire_buffer buffer; // Event Source sd_event_source* event; @@ -55,6 +58,9 @@ static void pakfire_log_stream_free(struct pakfire_log_stream* stream) { // Close the pipe pakfire_log_stream_close(stream); + // Free the buffer + pakfire_buffer_free(&stream->buffer); + if (stream->ctx) pakfire_ctx_unref(stream->ctx); free(stream); @@ -80,6 +86,9 @@ int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire s->callback = callback; s->data = data; + // Set the maximum buffer length + s->buffer.max_length = MAX_BUFFER_LENGTH; + // Create a new pipe r = pipe2(s->pipe, O_CLOEXEC); if (r < 0) { @@ -90,12 +99,10 @@ int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire // Return the pointer *stream = s; - return 0; ERROR: - if (s) - pakfire_log_stream_free(s); + pakfire_log_stream_free(s); return r; } @@ -114,7 +121,6 @@ struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* s return NULL; } - #define pakfire_log_stream_sanitize_line(self, line, buffer, buffer_length) \ __pakfire_log_stream_sanitize_line(self, line, sizeof(line), buffer, buffer_length) @@ -231,26 +237,9 @@ static int __pakfire_log_stream_sanitize_line(struct pakfire_log_stream* self, return cursor; } -static int pakfire_log_stream_fill_buffer(struct pakfire_log_stream* stream, int fd) { - ssize_t bytes_read; - - if (stream->buffered >= sizeof(stream->buffer)) - return -EAGAIN; - - // Read into the buffer - bytes_read = read(fd, stream->buffer + stream->buffered, sizeof(stream->buffer) - stream->buffered); - if (bytes_read < 0) - return bytes_read; - - stream->buffered += bytes_read; - - return 0; -} - static int pakfire_log_stream_drain_buffer(struct pakfire_log_stream* stream) { char line[MAX_LINE_LENGTH] = {}; - const char* eol = NULL; - size_t length; + size_t length = 0; int r; // Log a message if we don't have a callback @@ -261,20 +250,12 @@ static int pakfire_log_stream_drain_buffer(struct pakfire_log_stream* stream) { for (;;) { // Find the next line - eol = memchr(stream->buffer, '\n', stream->buffered); - if (!eol) { - if (stream->buffered < sizeof(stream->buffer)) - return 0; - } - - // Determine the length of the line - if (eol) - length = eol - stream->buffer + 1; - else - length = stream->buffered; + length = pakfire_buffer_find_line(&stream->buffer); + if (!length) + break; // Sanitize the line - r = pakfire_log_stream_sanitize_line(stream, line, stream->buffer, length); + r = pakfire_log_stream_sanitize_line(stream, line, stream->buffer.data, length); if (r < 0) return r; @@ -283,9 +264,11 @@ static int pakfire_log_stream_drain_buffer(struct pakfire_log_stream* stream) { if (r < 0) return r; - memmove(stream->buffer, stream->buffer + length, stream->buffered - length); - stream->buffered -= length; + // Consume the data + pakfire_buffer_consume(&stream->buffer, length); } + + return 0; } static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, void* data) { @@ -294,7 +277,7 @@ static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, voi if (events & EPOLLIN) { // Fill the buffer - r = pakfire_log_stream_fill_buffer(stream, fd); + r = pakfire_buffer_append_from_fd(&stream->buffer, fd); if (r < 0) return r; @@ -409,19 +392,10 @@ int pakfire_log_stream_pty(struct pakfire_ctx* ctx, void* data, const char* buff struct pakfire_log_stream* self = data; int r; - size_t free_space = sizeof(self->buffer) - self->buffered; - - // Ask to be called again if we don't have any free space - if (free_space == 0) - return -EAGAIN; - - // Limit to the total buffer size - if (free_space < length) - length = free_space; - - // Copy the data into the buffer - memcpy(self->buffer + self->buffered, buffer, length); - self->buffered += length; + // Append the data to the buffer + r = pakfire_buffer_append(&self->buffer, buffer, length); + if (r < 0) + return r; // Drain the buffer r = pakfire_log_stream_drain_buffer(self);