]> git.ipfire.org Git - pakfire.git/commitdiff
log stream: Use the new buffer implementation
authorMichael Tremer <michael.tremer@ipfire.org>
Sat, 22 Mar 2025 18:04:22 +0000 (18:04 +0000)
committerMichael Tremer <michael.tremer@ipfire.org>
Sat, 22 Mar 2025 18:04:22 +0000 (18:04 +0000)
Signed-off-by: Michael Tremer <michael.tremer@ipfire.org>
src/pakfire/log_stream.c

index e46158eb448ebe98f6529ca1c4849db08f9b14fa..de507c9725d5cf274e7347f3cd88867685556d9b 100644 (file)
 
 #include <systemd/sd-event.h>
 
+#include <pakfire/buffer.h>
 #include <pakfire/ctx.h>
 #include <pakfire/log_stream.h>
 
 #define MAX_LINE_LENGTH 16384
 
+// Never let the buffer grow larger than this
+#define MAX_BUFFER_LENGTH 1024576 // 1 MiB
+
 struct pakfire_log_stream {
        struct pakfire_ctx* ctx;
        int nrefs;
@@ -37,8 +41,7 @@ struct pakfire_log_stream {
        int pipe[2];
 
        // Buffer
-       char buffer[64 * 1024];
-       size_t buffered;
+       struct pakfire_buffer buffer;
 
        // Event Source
        sd_event_source* event;
@@ -55,6 +58,9 @@ static void pakfire_log_stream_free(struct pakfire_log_stream* stream) {
        // Close the pipe
        pakfire_log_stream_close(stream);
 
+       // Free the buffer
+       pakfire_buffer_free(&stream->buffer);
+
        if (stream->ctx)
                pakfire_ctx_unref(stream->ctx);
        free(stream);
@@ -80,6 +86,9 @@ int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire
        s->callback = callback;
        s->data = data;
 
+       // Set the maximum buffer length
+       s->buffer.max_length = MAX_BUFFER_LENGTH;
+
        // Create a new pipe
        r = pipe2(s->pipe, O_CLOEXEC);
        if (r < 0) {
@@ -90,12 +99,10 @@ int pakfire_log_stream_create(struct pakfire_log_stream** stream, struct pakfire
 
        // Return the pointer
        *stream = s;
-
        return 0;
 
 ERROR:
-       if (s)
-               pakfire_log_stream_free(s);
+       pakfire_log_stream_free(s);
 
        return r;
 }
@@ -114,7 +121,6 @@ struct pakfire_log_stream* pakfire_log_stream_unref(struct pakfire_log_stream* s
        return NULL;
 }
 
-
 #define pakfire_log_stream_sanitize_line(self, line, buffer, buffer_length) \
        __pakfire_log_stream_sanitize_line(self, line, sizeof(line), buffer, buffer_length)
 
@@ -231,26 +237,9 @@ static int __pakfire_log_stream_sanitize_line(struct pakfire_log_stream* self,
        return cursor;
 }
 
-static int pakfire_log_stream_fill_buffer(struct pakfire_log_stream* stream, int fd) {
-       ssize_t bytes_read;
-
-       if (stream->buffered >= sizeof(stream->buffer))
-               return -EAGAIN;
-
-       // Read into the buffer
-       bytes_read = read(fd, stream->buffer + stream->buffered, sizeof(stream->buffer) - stream->buffered);
-       if (bytes_read < 0)
-               return bytes_read;
-
-       stream->buffered += bytes_read;
-
-       return 0;
-}
-
 static int pakfire_log_stream_drain_buffer(struct pakfire_log_stream* stream) {
        char line[MAX_LINE_LENGTH] = {};
-       const char* eol = NULL;
-       size_t length;
+       size_t length = 0;
        int r;
 
        // Log a message if we don't have a callback
@@ -261,20 +250,12 @@ static int pakfire_log_stream_drain_buffer(struct pakfire_log_stream* stream) {
 
        for (;;) {
                // Find the next line
-               eol = memchr(stream->buffer, '\n', stream->buffered);
-               if (!eol) {
-                       if (stream->buffered < sizeof(stream->buffer))
-                               return 0;
-               }
-
-               // Determine the length of the line
-               if (eol)
-                       length = eol - stream->buffer + 1;
-               else
-                       length = stream->buffered;
+               length = pakfire_buffer_find_line(&stream->buffer);
+               if (!length)
+                       break;
 
                // Sanitize the line
-               r = pakfire_log_stream_sanitize_line(stream, line, stream->buffer, length);
+               r = pakfire_log_stream_sanitize_line(stream, line, stream->buffer.data, length);
                if (r < 0)
                        return r;
 
@@ -283,9 +264,11 @@ static int pakfire_log_stream_drain_buffer(struct pakfire_log_stream* stream) {
                if (r < 0)
                        return r;
 
-               memmove(stream->buffer, stream->buffer + length, stream->buffered - length);
-               stream->buffered -= length;
+               // Consume the data
+               pakfire_buffer_consume(&stream->buffer, length);
        }
+
+       return 0;
 }
 
 static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, void* data) {
@@ -294,7 +277,7 @@ static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, voi
 
        if (events & EPOLLIN) {
                // Fill the buffer
-               r = pakfire_log_stream_fill_buffer(stream, fd);
+               r = pakfire_buffer_append_from_fd(&stream->buffer, fd);
                if (r < 0)
                        return r;
 
@@ -409,19 +392,10 @@ int pakfire_log_stream_pty(struct pakfire_ctx* ctx, void* data, const char* buff
        struct pakfire_log_stream* self = data;
        int r;
 
-       size_t free_space = sizeof(self->buffer) - self->buffered;
-
-       // Ask to be called again if we don't have any free space
-       if (free_space == 0)
-               return -EAGAIN;
-
-       // Limit to the total buffer size
-       if (free_space < length)
-               length = free_space;
-
-       // Copy the data into the buffer
-       memcpy(self->buffer + self->buffered, buffer, length);
-       self->buffered += length;
+       // Append the data to the buffer
+       r = pakfire_buffer_append(&self->buffer, buffer, length);
+       if (r < 0)
+               return r;
 
        // Drain the buffer
        r = pakfire_log_stream_drain_buffer(self);