#include <sys/prctl.h>
#include <unistd.h>
+#include "buffer.h"
#include "command.h"
#include "ctx.h"
+#include "file.h"
#include "time.h"
#include "util.h"
// By default, commands are being killed after 30 seconds
#define DEFAULT_TIMEOUT SEC_TO_USEC(30)
+typedef struct collecty_command_output {
+ // Pipes
+ int pipes[2];
+
+ // Buffer
+ collecty_buffer* buffer;
+} collecty_command_output;
+
struct collecty_command {
collecty_ctx* ctx;
int nrefs;
int pidfd;
// Standard Output/Error
- struct {
- // Pipes
- int pipes[2];
-
- // Stream
- FILE* f;
-
- // Buffer
- char* buffer;
- size_t length;
- } stdout, stderr;
+ collecty_command_output stdout;
+ collecty_command_output stderr;
// Callbacks
struct {
collecty_command_close_pipe(self->stdout.pipes);
collecty_command_close_pipe(self->stderr.pipes);
- // Close streams
- if (self->stdout.f)
- fclose(self->stdout.f);
- if (self->stderr.f)
- fclose(self->stderr.f);
-
// Free buffers
if (self->stdout.buffer)
- free(self->stdout.buffer);
+ collecty_buffer_unref(self->stdout.buffer);
if (self->stderr.buffer)
- free(self->stderr.buffer);
+ collecty_buffer_unref(self->stderr.buffer);
// Free events
if (self->events.timeout)
free(self);
}
+static int collecty_command_init_output(collecty_command* self, collecty_command_output* output) {
+ int r;
+
+ // Create pipes
+ r = pipe2(output->pipes, O_CLOEXEC);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to create pipes: %m\n");
+ return -errno;
+ }
+
+ // Create a buffer to write any data to
+ r = collecty_buffer_create(&output->buffer, self->ctx);
+ if (r < 0) {
+ ERROR(self->ctx, "Failed to create output buffer: %s\n", strerror(-r));
+ return r;
+ }
+
+ return 0;
+}
+
int collecty_command_create(collecty_command** command,
collecty_ctx* ctx, collecty_daemon* daemon) {
collecty_command* self = NULL;
self->pidfd = -EBADF;
// Initialize stdout
- r = pipe2(self->stdout.pipes, O_CLOEXEC);
- if (r < 0) {
- ERROR(self->ctx, "Failed to create stdout pipes: %m\n");
- r = -errno;
- goto ERROR;
- }
-
- // Open a memory stream for stdout
- self->stdout.f = open_memstream(&self->stdout.buffer, &self->stdout.length);
- if (!self->stdout.f) {
- ERROR(self->ctx, "Failed to open stream for stdout: %m\n");
- r = -errno;
+ r = collecty_command_init_output(self, &self->stdout);
+ if (r < 0)
goto ERROR;
- }
// Initialize stderr
- r = pipe2(self->stderr.pipes, O_CLOEXEC);
- if (r < 0) {
- ERROR(self->ctx, "Failed to create stderr pipes: %m\n");
- r = -errno;
- goto ERROR;
- }
-
- // Open a memory stream for stderr
- self->stderr.f = open_memstream(&self->stderr.buffer, &self->stderr.length);
- if (!self->stderr.f) {
- ERROR(self->ctx, "Failed to open stream for stderr: %m\n");
- r = -errno;
+ r = collecty_command_init_output(self, &self->stderr);
+ if (r < 0)
goto ERROR;
- }
// Return pointer
*command = self;
self->callbacks.on_success_data = data;
}
-static int collecty_command_output(collecty_command* self, int fd, unsigned int events, FILE* f) {
- ssize_t bytes_written = 0;
+static int collecty_command_write(collecty_command* self,
+ int fd, unsigned int events, collecty_command_output* output) {
ssize_t bytes_read = 0;
char buffer[4096];
+ int r;
for (;;) {
// Read a block from the pipe
if (bytes_read < 0) {
switch (errno) {
case EAGAIN:
- goto END;
+ return 0;
default:
return -errno;
}
}
+ // Terminate if we have read nothing
if (bytes_read == 0)
- goto END;
+ return 0;
- // Don't write if we don't have an output buffer
- if (unlikely(!f))
- continue;
+ // Write to the buffer
+ r = collecty_buffer_write(output->buffer, buffer, bytes_read);
+ if (r < 0)
+ return r;
- // Write the buffer to the output
- bytes_written = fwrite(buffer, 1, bytes_read, f);
- if (bytes_written < bytes_read) {
- ERROR(self->ctx, "Failed to write output: %m\n");
- return -errno;
- }
}
-END:
- // Flush after we have updated the buffer
- if (unlikely(f))
- fflush(f);
-
return 0;
}
static int collecty_command_stdout(sd_event_source* source, int fd, unsigned int events, void* data) {
collecty_command* self = data;
- return collecty_command_output(self, fd, events, self->stdout.f);
+ return collecty_command_write(self, fd, events, &self->stdout);
}
static int collecty_command_stderr(sd_event_source* source, int fd, unsigned int events, void* data) {
collecty_command* self = data;
- return collecty_command_output(self, fd, events, self->stderr.f);
+ return collecty_command_write(self, fd, events, &self->stderr);
}
static int collecty_command_get_pipe_to_read(collecty_command* self, int (*fds)[2]) {
return -EBADF;
}
-static int collecty_command_log_stderr(const char* line, const size_t length, void* data) {
+static int collecty_command_log_stderr(collecty_ctx* ctx, collecty_file* file,
+ unsigned long lineno, char* line, size_t length, void* data) {
collecty_command* self = data;
// Send the line to the logger
static int collecty_command_exited(sd_event_source* source, const siginfo_t* si, void* data) {
collecty_command* self = data;
+ collecty_file* stdout = NULL;
+ collecty_file* stderr = NULL;
int rc = 0;
int r = 0;
// Log stderr
if (rc) {
- r = collecty_fwalk(self->stderr.f, collecty_command_log_stderr, self);
+ // Open a file handle
+ r = collecty_file_open_buffer(&stderr, self->ctx, self->stderr.buffer);
+ if (r < 0)
+ goto ERROR;
+
+ // Log all lines
+ r = collecty_file_walk(stderr, collecty_command_log_stderr, self);
if (r < 0)
goto ERROR;
}
goto ERROR;
// Call the callback
- if (self->callbacks.on_success)
- r = self->callbacks.on_success(self->ctx, rc, self->stdout.buffer,
- self->stdout.length, self->callbacks.on_success_data);
+ if (self->callbacks.on_success) {
+ // Open stdout as a file
+ r = collecty_file_open_buffer(&stdout, self->ctx, self->stdout.buffer);
+ if (r < 0)
+ goto ERROR;
+
+ // Call the callback
+ r = self->callbacks.on_success(self->ctx,
+ rc, stdout, self->callbacks.on_success_data);
+ }
ERROR:
+ if (stderr)
+ collecty_file_unref(stderr);
+ if (stdout)
+ collecty_file_unref(stdout);
+
// Drop the extra reference
collecty_command_unref(self);
#include "../util.h"
#include "unbound.h"
-typedef struct collecty_unbound_stats {
- unsigned long queries;
- unsigned long cachehits;
- unsigned long cachemiss;
- unsigned long prefetch;
- unsigned long rec_replies;
- double rec_time_avg;
- unsigned long rec_time_median;
-} collecty_unbound_stats;
-
-static int unbound_parse(const char* line, const size_t length, void* data) {
- collecty_unbound_stats* stats = data;
- int r;
-
- // Parse queries
- r = sscanf(line, "total.num.queries=%lu", &stats->queries);
- if (r == 1)
- return 0;
-
- // Parse cachehits
- r = sscanf(line, "total.num.cachehits=%lu", &stats->cachehits);
- if (r == 1)
- return 0;
-
- // Parse cachemiss
- r = sscanf(line, "total.num.cachemiss=%lu", &stats->cachemiss);
- if (r == 1)
- return 0;
-
- // Parse prefetch
- r = sscanf(line, "total.num.prefetch=%lu", &stats->prefetch);
- if (r == 1)
- return 0;
-
- // Parse rec_replies
- r = sscanf(line, "total.num.recursivereplies=%lu", &stats->rec_replies);
- if (r == 1)
- return 0;
-
- // Parse rec_time_avg
- r = sscanf(line, "total.recursion.time.avg=%lf", &stats->rec_time_avg);
- if (r == 1)
- return 0;
-
- // Parse rec_time_median
- r = sscanf(line, "total.recursion.time.median=%lu", &stats->rec_time_median);
- if (r == 1)
- return 0;
-
- return 0;
-}
-
static int unbound_on_success(collecty_ctx* ctx,
- int rc, const char* output, const size_t length, void* data) {
+ int rc, collecty_file* stdout, void* data) {
collecty_source* source = data;
int r;
// Collect stats
- collecty_unbound_stats stats = {};
+ unsigned long queries = 0;
+ unsigned long cachehits = 0;
+ unsigned long cachemiss = 0;
+ unsigned long prefetch = 0;
+ unsigned long rec_replies = 0;
+ double rec_time_avg = 0;
+ unsigned long rec_time_median = 0;
+
+ collecty_file_parser parser[] = {
+ { "total.num.queries=%lu", &queries },
+ { "total.num.cachehits=%lu", &cachehits },
+ { "total.num.cachemiss=%lu", &cachemiss },
+ { "total.num.prefetch=%lu", &prefetch },
+ { "total.num.recursivereplies=%lu", &rec_replies },
+ { "total.recursion.time.avg=%lf", &rec_time_avg },
+ { "total.recursion.time.median=%lu", &rec_time_median },
+ { NULL },
+ };
// Parse the output
- r = collecty_fwalk_buffer(output, length, unbound_parse, &stats);
+ r = collecty_file_parse(stdout, parser);
if (r < 0)
return r;
// Submit values
return collecty_source_submit(source, NULL, "%lu:%lu:%lu:%lu:%lu:%lf:%lu",
- stats.queries, stats.cachehits, stats.cachemiss, stats.prefetch,
- stats.rec_replies, stats.rec_time_avg, stats.rec_time_median);
+ queries, cachehits, cachemiss, prefetch, rec_replies, rec_time_avg, rec_time_median);
}
static int unbound_heartbeat(collecty_ctx* ctx, collecty_source* source) {