#include <systemd/sd-event.h>
+#include <pakfire/buffer.h>
#include <pakfire/ctx.h>
#include <pakfire/log_stream.h>
#define MAX_LINE_LENGTH 16384
+// Never let the buffer grow larger than this
+#define MAX_BUFFER_LENGTH 1024576 // 1 MiB
+
struct pakfire_log_stream {
struct pakfire_ctx* ctx;
int nrefs;
int pipe[2];
// Buffer
- char buffer[64 * 1024];
- size_t buffered;
+ struct pakfire_buffer buffer;
// Event Source
sd_event_source* event;
// Close the pipe
pakfire_log_stream_close(stream);
+ // Free the buffer
+ pakfire_buffer_free(&stream->buffer);
+
if (stream->ctx)
pakfire_ctx_unref(stream->ctx);
free(stream);
s->callback = callback;
s->data = data;
+ // Set the maximum buffer length
+ s->buffer.max_length = MAX_BUFFER_LENGTH;
+
// Create a new pipe
r = pipe2(s->pipe, O_CLOEXEC);
if (r < 0) {
// Return the pointer
*stream = s;
-
return 0;
ERROR:
- if (s)
- pakfire_log_stream_free(s);
+ pakfire_log_stream_free(s);
return r;
}
return NULL;
}
-
#define pakfire_log_stream_sanitize_line(self, line, buffer, buffer_length) \
__pakfire_log_stream_sanitize_line(self, line, sizeof(line), buffer, buffer_length)
return cursor;
}
-static int pakfire_log_stream_fill_buffer(struct pakfire_log_stream* stream, int fd) {
- ssize_t bytes_read;
-
- if (stream->buffered >= sizeof(stream->buffer))
- return -EAGAIN;
-
- // Read into the buffer
- bytes_read = read(fd, stream->buffer + stream->buffered, sizeof(stream->buffer) - stream->buffered);
- if (bytes_read < 0)
- return bytes_read;
-
- stream->buffered += bytes_read;
-
- return 0;
-}
-
static int pakfire_log_stream_drain_buffer(struct pakfire_log_stream* stream) {
char line[MAX_LINE_LENGTH] = {};
- const char* eol = NULL;
- size_t length;
+ size_t length = 0;
int r;
// Log a message if we don't have a callback
for (;;) {
// Find the next line
- eol = memchr(stream->buffer, '\n', stream->buffered);
- if (!eol) {
- if (stream->buffered < sizeof(stream->buffer))
- return 0;
- }
-
- // Determine the length of the line
- if (eol)
- length = eol - stream->buffer + 1;
- else
- length = stream->buffered;
+ length = pakfire_buffer_find_line(&stream->buffer);
+ if (!length)
+ break;
// Sanitize the line
- r = pakfire_log_stream_sanitize_line(stream, line, stream->buffer, length);
+ r = pakfire_log_stream_sanitize_line(stream, line, stream->buffer.data, length);
if (r < 0)
return r;
if (r < 0)
return r;
- memmove(stream->buffer, stream->buffer + length, stream->buffered - length);
- stream->buffered -= length;
+ // Consume the data
+ pakfire_buffer_consume(&stream->buffer, length);
}
+
+ return 0;
}
static int __pakfire_log_stream(sd_event_source* s, int fd, uint32_t events, void* data) {
if (events & EPOLLIN) {
// Fill the buffer
- r = pakfire_log_stream_fill_buffer(stream, fd);
+ r = pakfire_buffer_append_from_fd(&stream->buffer, fd);
if (r < 0)
return r;
struct pakfire_log_stream* self = data;
int r;
- size_t free_space = sizeof(self->buffer) - self->buffered;
-
- // Ask to be called again if we don't have any free space
- if (free_space == 0)
- return -EAGAIN;
-
- // Limit to the total buffer size
- if (free_space < length)
- length = free_space;
-
- // Copy the data into the buffer
- memcpy(self->buffer + self->buffered, buffer, length);
- self->buffered += length;
+ // Append the data to the buffer
+ r = pakfire_buffer_append(&self->buffer, buffer, length);
+ if (r < 0)
+ return r;
// Drain the buffer
r = pakfire_log_stream_drain_buffer(self);