From 4968a28ea6c3c5f9f8359e6662fe976e06896966 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Wolfgang=20St=C3=B6ggl?= Date: Thu, 11 Jul 2019 11:32:10 +0200 Subject: [PATCH] Indent rrd_daemon.c - indent src/rrd_daemon.c using GNU indent 2.2.12 before further changes to the code --- src/rrd_daemon.c | 8133 +++++++++++++++++++++++----------------------- 1 file changed, 4062 insertions(+), 4071 deletions(-) diff --git a/src/rrd_daemon.c b/src/rrd_daemon.c index 54a4ec04..10385799 100644 --- a/src/rrd_daemon.c +++ b/src/rrd_daemon.c @@ -57,7 +57,7 @@ # undef _GNU_SOURCE #endif /* }}} */ -#endif /* 0 */ +#endif /* 0 */ /* * Now for some includes.. @@ -108,7 +108,7 @@ #ifdef HAVE_LIBWRAP #include -#endif /* HAVE_LIBWRAP */ +#endif /* HAVE_LIBWRAP */ #include "rrd_strtod.h" #include @@ -131,96 +131,95 @@ */ typedef enum { RESP_ERR = -1, RESP_OK = 0, RESP_OK_BIN = 1 } response_code; -struct listen_socket_s -{ - int fd; - char *addr; - int family; +struct listen_socket_s { + int fd; + char *addr; + int family; - /* state for BATCH processing */ - time_t batch_start; - int batch_cmd; + /* state for BATCH processing */ + time_t batch_start; + int batch_cmd; - /* buffered IO */ - char *rbuf; - off_t next_cmd; - off_t next_read; + /* buffered IO */ + char *rbuf; + off_t next_cmd; + off_t next_read; - char *wbuf_data; - size_t wbuf_size; - size_t wbuf_capacity; + char *wbuf_data; + size_t wbuf_size; + size_t wbuf_capacity; - uint32_t permissions; + uint32_t permissions; - gid_t socket_group; - mode_t socket_permissions; + gid_t socket_group; + mode_t socket_permissions; }; typedef struct listen_socket_s listen_socket_t; struct command_s; typedef struct command_s command_t; + /* note: guard against "unused" warnings in the handlers */ -#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\ - time_t UNUSED(now),\ - char UNUSED(*buffer),\ - size_t UNUSED(buffer_size) +#define DISPATCH_PROTO listen_socket_t UNUSED(*sock),\ + time_t UNUSED(now),\ + char UNUSED(*buffer),\ + size_t UNUSED(buffer_size) -#define HANDLER_PROTO command_t UNUSED(*cmd),\ - DISPATCH_PROTO +#define HANDLER_PROTO command_t UNUSED(*cmd),\ + DISPATCH_PROTO struct command_s { - char *cmd; - int (*handler)(HANDLER_PROTO); - - char context; /* where we expect to see it */ -#define CMD_CONTEXT_CLIENT (1<<0) -#define CMD_CONTEXT_BATCH (1<<1) -#define CMD_CONTEXT_JOURNAL (1<<2) -#define CMD_CONTEXT_ANY (0x7f) - - char *syntax; - char *help; + char *cmd; + int ( + *handler) ( + HANDLER_PROTO); + + char context; /* where we expect to see it */ +#define CMD_CONTEXT_CLIENT (1<<0) +#define CMD_CONTEXT_BATCH (1<<1) +#define CMD_CONTEXT_JOURNAL (1<<2) +#define CMD_CONTEXT_ANY (0x7f) + + char *syntax; + char *help; }; struct cache_item_s; typedef struct cache_item_s cache_item_t; -struct cache_item_s -{ - char *file; - char **values; - size_t values_num; /* number of valid pointers */ - size_t values_alloc; /* number of allocated pointers */ - time_t last_flush_time; - double last_update_stamp; +struct cache_item_s { + char *file; + char **values; + size_t values_num; /* number of valid pointers */ + size_t values_alloc; /* number of allocated pointers */ + time_t last_flush_time; + double last_update_stamp; #define CI_FLAGS_IN_TREE (1<<0) #define CI_FLAGS_IN_QUEUE (1<<1) #define CI_FLAGS_SUSPENDED (1<<2) - int flags; - pthread_cond_t flushed; - cache_item_t *prev; - cache_item_t *next; + int flags; + pthread_cond_t flushed; + cache_item_t *prev; + cache_item_t *next; }; -struct callback_flush_data_s -{ - time_t now; - time_t abs_timeout; - char **keys; - size_t keys_num; +struct callback_flush_data_s { + time_t now; + time_t abs_timeout; + char **keys; + size_t keys_num; }; typedef struct callback_flush_data_s callback_flush_data_t; -enum queue_side_e -{ - HEAD, - TAIL +enum queue_side_e { + HEAD, + TAIL }; typedef enum queue_side_e queue_side_t; /* describe a set of journal files */ typedef struct { - char **files; - size_t files_num; + char **files; + size_t files_num; } journal_set; #define RBUF_SIZE (RRD_CMD_MAX*2) @@ -238,9 +237,9 @@ static size_t listen_fds_num = 0; static listen_socket_t default_socket; enum { - RUNNING, /* normal operation */ - FLUSHING, /* flushing remaining values */ - SHUTDOWN /* shutting down */ + RUNNING, /* normal operation */ + FLUSHING, /* flushing remaining values */ + SHUTDOWN /* shutting down */ } state = RUNNING; static pthread_t *queue_threads; @@ -251,21 +250,21 @@ static pthread_t flush_thread; static pthread_cond_t flush_cond = PTHREAD_COND_INITIALIZER; static pthread_mutex_t connection_threads_lock = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER; +static pthread_cond_t connection_threads_done = PTHREAD_COND_INITIALIZER; static int connection_threads_num = 0; static FILE *log_fh = NULL; static pthread_mutex_t log_lock = PTHREAD_MUTEX_INITIALIZER; /* Cache stuff */ -static GTree *cache_tree = NULL; -static cache_item_t *cache_queue_head = NULL; -static cache_item_t *cache_queue_tail = NULL; +static GTree *cache_tree = NULL; +static cache_item_t *cache_queue_head = NULL; +static cache_item_t *cache_queue_tail = NULL; static pthread_mutex_t cache_lock = PTHREAD_MUTEX_INITIALIZER; static sigset_t signal_set; static int config_write_interval = 300; -static int config_write_jitter = 0; +static int config_write_jitter = 0; static int config_flush_interval = 3600; static int config_flush_at_shutdown = 0; static char *config_pid_file = NULL; @@ -288,7 +287,7 @@ static uint64_t stats_journal_rotate = 0; static pthread_mutex_t stats_lock = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t rrdfilecreate_lock = PTHREAD_MUTEX_INITIALIZER; -static int opt_no_overwrite = 0; /* default for the daemon */ +static int opt_no_overwrite = 0; /* default for the daemon */ static int opt_log_level = LOG_ERR; /* don't pollute syslog */ @@ -298,3631 +297,3659 @@ static int opt_log_level = LOG_ERR; /* don't pollute syslog */ static journal_set *journal_cur = NULL; static journal_set *journal_old = NULL; static char *journal_dir = NULL; -static FILE *journal_fh = NULL; /* current journal file handle */ -static long journal_size = 0; /* current journal size */ +static FILE *journal_fh = NULL; /* current journal file handle */ +static long journal_size = 0; /* current journal size */ + #define JOURNAL_MAX (1 * 1024 * 1024 * 1024) static pthread_mutex_t journal_lock = PTHREAD_MUTEX_INITIALIZER; -static int journal_write(char *cmd, char *args); -static void journal_done(void); -static void journal_rotate(void); +static int journal_write( + char *cmd, + char *args); +static void journal_done( + void); +static void journal_rotate( + void); /* prototypes for forward references */ -static int handle_request_help (HANDLER_PROTO); -static int handle_request_ping (HANDLER_PROTO); +static int handle_request_help( + HANDLER_PROTO); +static int handle_request_ping( + HANDLER_PROTO); /* * Functions */ -static void do_log (int priority, const char *format, ...) +static void do_log( + int priority, + const char *format, + ...) { - va_list args; + va_list args; - if (stay_foreground) - { - pthread_mutex_lock(&log_lock); + if (stay_foreground) { + pthread_mutex_lock(&log_lock); + va_start(args, format); + vfprintf(stderr, format, args); + fprintf(stderr, "\n"); + va_end(args); + pthread_mutex_unlock(&log_lock); + } + va_start(args, format); - vfprintf(stderr, format, args); - fprintf(stderr, "\n"); + if (log_fh) { + char buffer[32]; + + pthread_mutex_lock(&log_lock); + time_t now = time(NULL); + + strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", gmtime(&now)); + fprintf(log_fh, "%s [%d] ", buffer, priority); + vfprintf(log_fh, format, args); + fprintf(log_fh, "\n"); + fflush(log_fh); + pthread_mutex_unlock(&log_lock); + } else + vsyslog(priority, format, args); va_end(args); - pthread_mutex_unlock(&log_lock); - } - - va_start(args, format); - if (log_fh) - { - char buffer[32]; - pthread_mutex_lock(&log_lock); - time_t now = time(NULL); - strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", gmtime(&now)); - fprintf(log_fh, "%s [%d] ", buffer, priority); - vfprintf(log_fh, format, args); - fprintf(log_fh, "\n"); - fflush(log_fh); - pthread_mutex_unlock(&log_lock); - } - else - vsyslog(priority, format, args); - va_end(args); } -static void sig_common (const char *sig) /* {{{ */ -{ - RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); +static void sig_common( + const char *sig) +{ /* {{{ */ + RRDD_LOG(LOG_NOTICE, "caught SIG%s", sig); - int status = pthread_mutex_lock(&cache_lock); + int status = pthread_mutex_lock(&cache_lock); - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Lock failed.", status); - abort(); - } + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Lock failed.", status); + abort(); + } - if (state == RUNNING) { - state = FLUSHING; - } - pthread_cond_broadcast(&flush_cond); - status = pthread_mutex_unlock(&cache_lock); + if (state == RUNNING) { + state = FLUSHING; + } + pthread_cond_broadcast(&flush_cond); + status = pthread_mutex_unlock(&cache_lock); - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Unlock failed.", status); - abort(); - } + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Unlock failed.", status); + abort(); + } - pthread_cond_broadcast(&queue_cond); -} /* }}} void sig_common */ + pthread_cond_broadcast(&queue_cond); +} /* }}} void sig_common */ -static void* signal_receiver (void UNUSED(*args)) +static void *signal_receiver( + void UNUSED(*args)) { - int status; + int status; - while (1) - { + while (1) { #if defined(HAVE_SIGWAITINFO) - siginfo_t signal_info; - status = sigwaitinfo(&signal_set, &signal_info); + siginfo_t signal_info; + + status = sigwaitinfo(&signal_set, &signal_info); #elif defined(HAVE_SIGWAIT) - status = -1; - if (sigwait(&signal_set, &status) < 0 ){ - status = -1; - } + status = -1; + if (sigwait(&signal_set, &status) < 0) { + status = -1; + } #else #error "we need sigwaitinfo or sigwait to compile rrd_daemon" #endif - switch(status) - { - case -1: - RRDD_LOG(LOG_NOTICE, "%s: %s (May be OK if occurring while attaching/attached to strace, gdb, etc)\nerrno: %d", __func__, "Signal wait failed.", errno); - break; + switch (status) { + case -1: + RRDD_LOG(LOG_NOTICE, + "%s: %s (May be OK if occurring while attaching/attached to strace, gdb, etc)\nerrno: %d", + __func__, "Signal wait failed.", errno); + break; - case SIGINT: - sig_common("INT"); - break; + case SIGINT: + sig_common("INT"); + break; - case SIGTERM: - sig_common("TERM"); - break; + case SIGTERM: + sig_common("TERM"); + break; - case SIGUSR1: - status = pthread_mutex_lock(&cache_lock); + case SIGUSR1: + status = pthread_mutex_lock(&cache_lock); - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Lock failed.", status); - abort(); - } + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Lock failed.", status); + abort(); + } - config_flush_at_shutdown = 1; - status = pthread_mutex_unlock(&cache_lock); + config_flush_at_shutdown = 1; + status = pthread_mutex_unlock(&cache_lock); - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Unlock failed.", status); - abort(); - } + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Unlock failed.", status); + abort(); + } - sig_common("USR1"); - break; + sig_common("USR1"); + break; - case SIGUSR2: - status = pthread_mutex_lock(&cache_lock); + case SIGUSR2: + status = pthread_mutex_lock(&cache_lock); - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Lock failed.", status); - abort(); - } + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Lock failed.", status); + abort(); + } - config_flush_at_shutdown = 0; - status = pthread_mutex_unlock(&cache_lock); + config_flush_at_shutdown = 0; + status = pthread_mutex_unlock(&cache_lock); - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Unlock failed.", status); - abort(); - } + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Unlock failed.", status); + abort(); + } - sig_common("USR2"); - break; + sig_common("USR2"); + break; - default: + default: #if defined(HAVE_SIGWAITINFO) - RRDD_LOG(LOG_NOTICE, - "%s: Signal %d was received from process %u.\n", - __func__, - status, - signal_info.si_pid); + RRDD_LOG(LOG_NOTICE, + "%s: Signal %d was received from process %u.\n", + __func__, status, signal_info.si_pid); #else - RRDD_LOG(LOG_NOTICE, - "%s: Signal %d was received.\n", - __func__, - status); + RRDD_LOG(LOG_NOTICE, + "%s: Signal %d was received.\n", __func__, status); #endif + } } - } - return NULL; + return NULL; } -static void install_signal_receiver(void) +static void install_signal_receiver( + void) { - pthread_t receiver; - int status = sigfillset(&signal_set); - - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nerrno: %d", "Signal set could not be initialized.", errno); - abort(); - } - - /* Block all signals in the initial thread. */ - status = pthread_sigmask(SIG_SETMASK, &signal_set, NULL); - - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Signal mask could not be set.", status); - abort(); - } - - status = pthread_create(&receiver, NULL, signal_receiver, NULL); - - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "A thread could not be created.", status); - abort(); - } - - status = pthread_detach(receiver); - - if (status) - { - RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "A thread could not be detached.", status); - abort(); - } + pthread_t receiver; + int status = sigfillset(&signal_set); + + if (status) { + RRDD_LOG(LOG_ERR, "%s\nerrno: %d", + "Signal set could not be initialized.", errno); + abort(); + } + + /* Block all signals in the initial thread. */ + status = pthread_sigmask(SIG_SETMASK, &signal_set, NULL); + + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "Signal mask could not be set.", + status); + abort(); + } + + status = pthread_create(&receiver, NULL, signal_receiver, NULL); + + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "A thread could not be created.", + status); + abort(); + } + + status = pthread_detach(receiver); + + if (status) { + RRDD_LOG(LOG_ERR, "%s\nstatus: %d", "A thread could not be detached.", + status); + abort(); + } } -static int open_pidfile(char *action, int oflag) /* {{{ */ -{ - int fd; - const char *file; - char *file_copy, *dir; - - file = (config_pid_file != NULL) - ? config_pid_file - : LOCALSTATEDIR "/run/rrdcached.pid"; - - /* dirname may modify its argument */ - file_copy = strdup(file); - if (file_copy == NULL) - { - fprintf(stderr, "rrdcached: strdup(): %s\n", - rrd_strerror(errno)); - return -1; - } +static int open_pidfile( + char *action, + int oflag) +{ /* {{{ */ + int fd; + const char *file; + char *file_copy, *dir; + + file = (config_pid_file != NULL) + ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; + + /* dirname may modify its argument */ + file_copy = strdup(file); + if (file_copy == NULL) { + fprintf(stderr, "rrdcached: strdup(): %s\n", rrd_strerror(errno)); + return -1; + } + + dir = strdup(dirname(file_copy)); + if (dir == NULL) { + fprintf(stderr, "rrdcached: strdup(): %s\n", rrd_strerror(errno)); + free(file_copy); + return -1; + } + if (rrd_mkdir_p(dir, 0777) != 0) { + fprintf(stderr, "Failed to create pidfile directory '%s': %s\n", + dir, rrd_strerror(errno)); + free(dir); + free(file_copy); + return -1; + } - dir = strdup(dirname(file_copy)); - if (dir == NULL) - { - fprintf(stderr, "rrdcached: strdup(): %s\n", - rrd_strerror(errno)); - free(file_copy); - return -1; - } - if (rrd_mkdir_p(dir, 0777) != 0) - { - fprintf(stderr, "Failed to create pidfile directory '%s': %s\n", - dir, rrd_strerror(errno)); free(dir); free(file_copy); - return -1; - } - - free(dir); - free(file_copy); - fd = open(file, oflag, S_IWUSR|S_IRUSR|S_IRGRP|S_IROTH); - if (fd < 0) - fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", - action, file, rrd_strerror(errno)); + fd = open(file, oflag, S_IWUSR | S_IRUSR | S_IRGRP | S_IROTH); + if (fd < 0) + fprintf(stderr, "rrdcached: can't %s pid file '%s' (%s)\n", + action, file, rrd_strerror(errno)); - return(fd); -} /* }}} static int open_pidfile */ + return (fd); +} /* }}} static int open_pidfile */ /* check existing pid file to see whether a daemon is running */ -static int check_pidfile(void) +static int check_pidfile( + void) { - int pid_fd; - pid_t pid; - char pid_str[16]; + int pid_fd; + pid_t pid; + char pid_str[16]; + + pid_fd = open_pidfile("open", O_RDWR); + if (pid_fd < 0) { + fprintf(stderr, "FATAL: Fail to create/open PID file \n"); + return pid_fd; + } - pid_fd = open_pidfile("open", O_RDWR); - if (pid_fd < 0){ - fprintf(stderr,"FATAL: Fail to create/open PID file \n"); - return pid_fd; - } + if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0) { + fprintf(stderr, "FATAL: Empty PID file exist\n"); + close(pid_fd); + return -1; + } - if (read(pid_fd, pid_str, sizeof(pid_str)) <= 0) { - fprintf(stderr,"FATAL: Empty PID file exist\n"); - close(pid_fd); - return -1; - } + pid = atoi(pid_str); + if (pid <= 0) { + fprintf(stderr, "FATAL: PID file is corrupted\n"); - pid = atoi(pid_str); - if (pid <= 0) { - fprintf(stderr,"FATAL: PID file is corrupted\n"); + close(pid_fd); + return -1; + } - close(pid_fd); - return -1; - } + /* another running process that we can signal COULD be + * a competing rrdcached */ + if (pid != getpid() && kill(pid, 0) == 0) { + fprintf(stderr, + "FATAL: Another rrdcached daemon is running?? (pid %d)\n", + pid); + close(pid_fd); + return -1; + } - /* another running process that we can signal COULD be - * a competing rrdcached */ - if (pid != getpid() && kill(pid, 0) == 0) - { - fprintf(stderr, - "FATAL: Another rrdcached daemon is running?? (pid %d)\n", pid); - close(pid_fd); - return -1; - } + lseek(pid_fd, 0, SEEK_SET); + if (ftruncate(pid_fd, 0) == -1) { + fprintf(stderr, + "FATAL: Failed to truncate stale PID file. (pid %d)\n", pid); + close(pid_fd); + return -1; + } - lseek(pid_fd, 0, SEEK_SET); - if (ftruncate(pid_fd, 0) == -1) - { fprintf(stderr, - "FATAL: Failed to truncate stale PID file. (pid %d)\n", pid); - close(pid_fd); - return -1; - } - - fprintf(stderr, - "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" - "rrdcached: starting normally.\n", pid); - - return pid_fd; -} /* }}} static int check_pidfile */ + "rrdcached: removed stale PID file (no rrdcached on pid %d)\n" + "rrdcached: starting normally.\n", pid); -static int write_pidfile (int fd) /* {{{ */ -{ - pid_t pid; - FILE *fh; - - pid = getpid (); - - fh = fdopen (fd, "w"); - if (fh == NULL) - { - RRDD_LOG (LOG_ERR, "write_pidfile: fdopen() failed."); - close(fd); - return (-1); - } + return pid_fd; +} /* }}} static int check_pidfile */ - fprintf (fh, "%i\n", (int) pid); - fclose (fh); +static int write_pidfile( + int fd) +{ /* {{{ */ + pid_t pid; + FILE *fh; - return (0); -} /* }}} int write_pidfile */ + pid = getpid(); -static int remove_pidfile (void) /* {{{ */ -{ - char *file; - int status; + fh = fdopen(fd, "w"); + if (fh == NULL) { + RRDD_LOG(LOG_ERR, "write_pidfile: fdopen() failed."); + close(fd); + return (-1); + } - file = (config_pid_file != NULL) - ? config_pid_file - : LOCALSTATEDIR "/run/rrdcached.pid"; + fprintf(fh, "%i\n", (int) pid); + fclose(fh); - status = unlink (file); - if (status == 0) return (0); - return (errno); -} /* }}} int remove_pidfile */ +} /* }}} int write_pidfile */ -static char *next_cmd (listen_socket_t *sock, ssize_t *len) /* {{{ */ -{ - char *eol; - - eol = memchr(sock->rbuf + sock->next_cmd, '\n', - sock->next_read - sock->next_cmd); - - if (eol == NULL) - { - /* no commands left, move remainder back to front of rbuf */ - memmove(sock->rbuf, sock->rbuf + sock->next_cmd, - sock->next_read - sock->next_cmd); - sock->next_read -= sock->next_cmd; - sock->next_cmd = 0; - *len = 0; - return NULL; - } - else - { - char *cmd = sock->rbuf + sock->next_cmd; - *eol = '\0'; +static int remove_pidfile( + void) +{ /* {{{ */ + char *file; + int status; - sock->next_cmd = eol - sock->rbuf + 1; + file = (config_pid_file != NULL) + ? config_pid_file : LOCALSTATEDIR "/run/rrdcached.pid"; - if (eol > sock->rbuf && *(eol-1) == '\r') - *(--eol) = '\0'; /* handle "\r\n" EOL */ + status = unlink(file); + if (status == 0) + return (0); + return (errno); +} /* }}} int remove_pidfile */ + +static char *next_cmd( + listen_socket_t *sock, + ssize_t *len) +{ /* {{{ */ + char *eol; + + eol = memchr(sock->rbuf + sock->next_cmd, '\n', + sock->next_read - sock->next_cmd); + + if (eol == NULL) { + /* no commands left, move remainder back to front of rbuf */ + memmove(sock->rbuf, sock->rbuf + sock->next_cmd, + sock->next_read - sock->next_cmd); + sock->next_read -= sock->next_cmd; + sock->next_cmd = 0; + *len = 0; + return NULL; + } else { + char *cmd = sock->rbuf + sock->next_cmd; - *len = eol - cmd; + *eol = '\0'; - return cmd; - } + sock->next_cmd = eol - sock->rbuf + 1; - /* NOTREACHED */ - assert(1==0); -} /* }}} char *next_cmd */ + if (eol > sock->rbuf && *(eol - 1) == '\r') + *(--eol) = '\0'; /* handle "\r\n" EOL */ -static char *wbuf_data(listen_socket_t *sock) /* {{{ */ -{ - assert(sock != NULL); - return sock->wbuf_data; -} /* }}} static char *wbuf_data */ + *len = eol - cmd; -static size_t wbuf_size(listen_socket_t *sock) /* {{{ */ -{ - assert(sock != NULL); - return sock->wbuf_size; -} /* }}} static size_t wbuf_data */ + return cmd; + } -static void wbuf_free(listen_socket_t *sock) /* {{{ */ -{ - assert(sock != NULL); - free(sock->wbuf_data); - sock->wbuf_data = NULL; - sock->wbuf_size = 0; - sock->wbuf_capacity = 0; -} /* }}} static void wbuf_free */ + /* NOTREACHED */ + assert(1 == 0); +} /* }}} char *next_cmd */ + +static char *wbuf_data( + listen_socket_t *sock) +{ /* {{{ */ + assert(sock != NULL); + return sock->wbuf_data; +} /* }}} static char *wbuf_data */ + +static size_t wbuf_size( + listen_socket_t *sock) +{ /* {{{ */ + assert(sock != NULL); + return sock->wbuf_size; +} /* }}} static size_t wbuf_data */ + +static void wbuf_free( + listen_socket_t *sock) +{ /* {{{ */ + assert(sock != NULL); + free(sock->wbuf_data); + sock->wbuf_data = NULL; + sock->wbuf_size = 0; + sock->wbuf_capacity = 0; +} /* }}} static void wbuf_free */ /* add the characters directly to the write buffer */ -static int wbuf_append(listen_socket_t *sock, char *str, size_t len) /* {{{ */ -{ - char *new_data; - size_t new_capacity; - - assert(sock != NULL); - - new_capacity = sock->wbuf_capacity == 0 ? 4096 : sock->wbuf_capacity; - while (new_capacity <= sock->wbuf_size + len) - { - new_capacity *= 2; - } +static int wbuf_append( + listen_socket_t *sock, + char *str, + size_t len) +{ /* {{{ */ + char *new_data; + size_t new_capacity; + + assert(sock != NULL); + + new_capacity = sock->wbuf_capacity == 0 ? 4096 : sock->wbuf_capacity; + while (new_capacity <= sock->wbuf_size + len) { + new_capacity *= 2; + } - if (new_capacity != sock->wbuf_capacity) - { - new_data = rrd_realloc(sock->wbuf_data, new_capacity); - if (new_data == NULL) - { - RRDD_LOG(LOG_ERR, "wbuf_append: realloc failed"); - return -1; + if (new_capacity != sock->wbuf_capacity) { + new_data = rrd_realloc(sock->wbuf_data, new_capacity); + if (new_data == NULL) { + RRDD_LOG(LOG_ERR, "wbuf_append: realloc failed"); + return -1; + } + sock->wbuf_data = new_data; + sock->wbuf_capacity = new_capacity; } - sock->wbuf_data = new_data; - sock->wbuf_capacity = new_capacity; - } - memcpy(&sock->wbuf_data[sock->wbuf_size], str, len); - sock->wbuf_data[sock->wbuf_size + len] = '\0'; - sock->wbuf_size += len; + memcpy(&sock->wbuf_data[sock->wbuf_size], str, len); + sock->wbuf_data[sock->wbuf_size + len] = '\0'; + sock->wbuf_size += len; - return 0; -} /* }}} static int wbuf_append */ + return 0; +} /* }}} static int wbuf_append */ /* add the text to the "extra" info that's sent after the status line */ -static int add_response_info(listen_socket_t *sock, char *fmt, ...) /* {{{ */ -{ - va_list argp; - char buffer[RRD_CMD_MAX]; - int len; - - if (JOURNAL_REPLAY(sock)) return 0; - if (sock->batch_start) return 0; /* no extra info returned when in BATCH */ +static int add_response_info( + listen_socket_t *sock, + char *fmt, + ...) +{ /* {{{ */ + va_list argp; + char buffer[RRD_CMD_MAX]; + int len; + + if (JOURNAL_REPLAY(sock)) + return 0; + if (sock->batch_start) + return 0; /* no extra info returned when in BATCH */ - va_start(argp, fmt); + va_start(argp, fmt); #ifdef HAVE_VSNPRINTF - len = vsnprintf(buffer, sizeof(buffer), fmt, argp); + len = vsnprintf(buffer, sizeof(buffer), fmt, argp); #else - len = vsprintf(buffer, fmt, argp); + len = vsprintf(buffer, fmt, argp); #endif - va_end(argp); - if (len < 0) - { - RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed"); - return -1; - } + va_end(argp); + if (len < 0) { + RRDD_LOG(LOG_ERR, "add_response_info: vnsprintf failed"); + return -1; + } - return wbuf_append(sock, buffer, len); -} /* }}} static int add_response_info */ + return wbuf_append(sock, buffer, len); +} /* }}} static int add_response_info */ /* add the binary data to the "extra" info that's sent after the status line */ -static int add_binary_response_info(listen_socket_t *sock, - char *prefix, char *name, - void* data, int records, int rsize - ) /* {{{ */ -{ - int res; - res = add_response_info (sock, - "%s%s: BinaryData %i %i %s\n", - prefix, name, records, rsize, +static int add_binary_response_info( + listen_socket_t *sock, + char *prefix, + char *name, + void *data, + int records, + int rsize) +{ /* {{{ */ + int res; + + res = add_response_info(sock, + "%s%s: BinaryData %i %i %s\n", + prefix, name, records, rsize, #ifdef WORDS_BIGENDIAN - "BIG" + "BIG" #else - "LITTLE" + "LITTLE" #endif - ); - if (res) - return res; - /* and add it to the buffer */ - res = wbuf_append(sock, (char*) data, records * rsize); - if (res) - return res; - /* and add a newline */ - return wbuf_append(sock, "\n", 1); -} /* }}} static int add_binary_response_info */ - -static int count_lines(char *str) /* {{{ */ -{ - int lines = 0; - - if (str != NULL) - { - while ((str = strchr(str, '\n')) != NULL) - { - ++lines; - ++str; + ); + if (res) + return res; + /* and add it to the buffer */ + res = wbuf_append(sock, (char *) data, records * rsize); + if (res) + return res; + /* and add a newline */ + return wbuf_append(sock, "\n", 1); +} /* }}} static int add_binary_response_info */ + +static int count_lines( + char *str) +{ /* {{{ */ + int lines = 0; + + if (str != NULL) { + while ((str = strchr(str, '\n')) != NULL) { + ++lines; + ++str; + } } - } - return lines; -} /* }}} static int count_lines */ + return lines; +} /* }}} static int count_lines */ /* send the response back to the user. * returns 0 on success, -1 on error * write buffer is always zeroed after this call */ -static int send_response (listen_socket_t *sock, response_code rc, - char *fmt, ...) /* {{{ */ -{ - va_list argp; - char buffer[RRD_CMD_MAX]; - int lines; - size_t wrote; - int rclen, len; - - if (JOURNAL_REPLAY(sock)) return rc; - - if (sock->batch_start) - { - if (rc == RESP_OK) - return rc; /* no response on success during BATCH */ - lines = sock->batch_cmd; - } - else if (rc == RESP_OK) - lines = count_lines(wbuf_data(sock)); - else if (rc == RESP_OK_BIN) - lines = 1; - else - lines = -1; - - if (rc == RESP_OK_BIN) { - rclen = 0; - rc = RESP_OK; - } else { - rclen = snprintf(buffer, sizeof buffer, "%d ", lines); - } - - va_start(argp, fmt); +static int send_response( + listen_socket_t *sock, + response_code rc, + char *fmt, + ...) +{ /* {{{ */ + va_list argp; + char buffer[RRD_CMD_MAX]; + int lines; + size_t wrote; + int rclen, len; + + if (JOURNAL_REPLAY(sock)) + return rc; + + if (sock->batch_start) { + if (rc == RESP_OK) + return rc; /* no response on success during BATCH */ + lines = sock->batch_cmd; + } else if (rc == RESP_OK) + lines = count_lines(wbuf_data(sock)); + else if (rc == RESP_OK_BIN) + lines = 1; + else + lines = -1; + + if (rc == RESP_OK_BIN) { + rclen = 0; + rc = RESP_OK; + } else { + rclen = snprintf(buffer, sizeof buffer, "%d ", lines); + } + + va_start(argp, fmt); #ifdef HAVE_VSNPRINTF - len = vsnprintf(buffer+rclen, sizeof(buffer)-rclen, fmt, argp); + len = vsnprintf(buffer + rclen, sizeof(buffer) - rclen, fmt, argp); #else - len = vsprintf(buffer+rclen, fmt, argp); + len = vsprintf(buffer + rclen, fmt, argp); #endif - va_end(argp); - if (len < 0) - return -1; - - len += rclen; + va_end(argp); + if (len < 0) + return -1; - /* append the result to the wbuf, don't write to the user */ - if (sock->batch_start) - return wbuf_append(sock, buffer, len); + len += rclen; - /* first write must be complete */ - if (len != write(sock->fd, buffer, len)) - { - RRDD_LOG(LOG_INFO, "send_response: could not write status message"); - return -1; - } + /* append the result to the wbuf, don't write to the user */ + if (sock->batch_start) + return wbuf_append(sock, buffer, len); - if (wbuf_data(sock) != NULL && rc == RESP_OK) - { - wrote = 0; - while (wrote < wbuf_size(sock)) - { - ssize_t wb = write(sock->fd, wbuf_data(sock) + wrote, wbuf_size(sock) - wrote); - if (wb <= 0) - { - RRDD_LOG(LOG_INFO, "send_response: could not write results"); + /* first write must be complete */ + if (len != write(sock->fd, buffer, len)) { + RRDD_LOG(LOG_INFO, "send_response: could not write status message"); return -1; - } - wrote += wb; } - } - wbuf_free(sock); + if (wbuf_data(sock) != NULL && rc == RESP_OK) { + wrote = 0; + while (wrote < wbuf_size(sock)) { + ssize_t wb = write(sock->fd, wbuf_data(sock) + wrote, + wbuf_size(sock) - wrote); + + if (wb <= 0) { + RRDD_LOG(LOG_INFO, "send_response: could not write results"); + return -1; + } + wrote += wb; + } + } - return 0; -} /* }}} */ + wbuf_free(sock); + + return 0; +} /* }}} */ -static void wipe_ci_values(cache_item_t *ci, time_t when) +static void wipe_ci_values( + cache_item_t *ci, + time_t when) { - ci->values = NULL; - ci->values_num = 0; - ci->values_alloc = 0; + ci->values = NULL; + ci->values_num = 0; + ci->values_alloc = 0; - ci->last_flush_time = when; - if (config_write_jitter > 0) - ci->last_flush_time += (rrd_random() % config_write_jitter); + ci->last_flush_time = when; + if (config_write_jitter > 0) + ci->last_flush_time += (rrd_random() % config_write_jitter); } /* remove_from_queue * remove a "cache_item_t" item from the queue. * must hold 'cache_lock' when calling this */ -static void remove_from_queue(cache_item_t *ci) /* {{{ */ -{ - if (ci == NULL) return; - if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) return; /* not queued */ +static void remove_from_queue( + cache_item_t *ci) +{ /* {{{ */ + if (ci == NULL) + return; + if ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + return; /* not queued */ - if (ci->prev == NULL) - cache_queue_head = ci->next; /* reset head */ - else - ci->prev->next = ci->next; + if (ci->prev == NULL) + cache_queue_head = ci->next; /* reset head */ + else + ci->prev->next = ci->next; - if (ci->next == NULL) - cache_queue_tail = ci->prev; /* reset the tail */ - else - ci->next->prev = ci->prev; + if (ci->next == NULL) + cache_queue_tail = ci->prev; /* reset the tail */ + else + ci->next->prev = ci->prev; - ci->next = ci->prev = NULL; - ci->flags &= ~CI_FLAGS_IN_QUEUE; + ci->next = ci->prev = NULL; + ci->flags &= ~CI_FLAGS_IN_QUEUE; - pthread_mutex_lock (&stats_lock); - assert (stats_queue_length > 0); - stats_queue_length--; - pthread_mutex_unlock (&stats_lock); + pthread_mutex_lock(&stats_lock); + assert(stats_queue_length > 0); + stats_queue_length--; + pthread_mutex_unlock(&stats_lock); -} /* }}} static void remove_from_queue */ +} /* }}} static void remove_from_queue */ /* free the resources associated with the cache_item_t * must hold cache_lock when calling this function */ -static void *free_cache_item(cache_item_t *ci) /* {{{ */ -{ - if (ci == NULL) return NULL; +static void *free_cache_item( + cache_item_t *ci) +{ /* {{{ */ + if (ci == NULL) + return NULL; - remove_from_queue(ci); + remove_from_queue(ci); - for (size_t i=0; i < ci->values_num; i++) - free(ci->values[i]); + for (size_t i = 0; i < ci->values_num; i++) + free(ci->values[i]); - free (ci->values); - free (ci->file); + free(ci->values); + free(ci->file); - /* in case anyone is waiting */ - pthread_cond_broadcast(&ci->flushed); - pthread_cond_destroy(&ci->flushed); + /* in case anyone is waiting */ + pthread_cond_broadcast(&ci->flushed); + pthread_cond_destroy(&ci->flushed); - free (ci); + free(ci); - return NULL; -} /* }}} static void *free_cache_item */ + return NULL; +} /* }}} static void *free_cache_item */ /* * enqueue_cache_item: * `cache_lock' must be acquired before calling this function! */ -static int enqueue_cache_item (cache_item_t *ci, /* {{{ */ +static int enqueue_cache_item( + cache_item_t *ci, /* {{{ */ queue_side_t side) { - if (ci == NULL) - return (-1); + if (ci == NULL) + return (-1); - if (ci->values_num == 0) - return (0); + if (ci->values_num == 0) + return (0); - if (side == HEAD) - { - if (cache_queue_head == ci) - return 0; + if (side == HEAD) { + if (cache_queue_head == ci) + return 0; - /* remove if further down in queue */ - remove_from_queue(ci); + /* remove if further down in queue */ + remove_from_queue(ci); - ci->prev = NULL; - ci->next = cache_queue_head; - if (ci->next != NULL) - ci->next->prev = ci; - cache_queue_head = ci; - - if (cache_queue_tail == NULL) - cache_queue_tail = cache_queue_head; - } - else /* (side == TAIL) */ - { - /* We don't move values back in the list.. */ - if (ci->flags & CI_FLAGS_IN_QUEUE) - return (0); + ci->prev = NULL; + ci->next = cache_queue_head; + if (ci->next != NULL) + ci->next->prev = ci; + cache_queue_head = ci; - assert (ci->next == NULL); - assert (ci->prev == NULL); + if (cache_queue_tail == NULL) + cache_queue_tail = cache_queue_head; + } else { /* (side == TAIL) */ + /* We don't move values back in the list.. */ + if (ci->flags & CI_FLAGS_IN_QUEUE) + return (0); - ci->prev = cache_queue_tail; + assert(ci->next == NULL); + assert(ci->prev == NULL); - if (cache_queue_tail == NULL) - cache_queue_head = ci; - else - cache_queue_tail->next = ci; + ci->prev = cache_queue_tail; + + if (cache_queue_tail == NULL) + cache_queue_head = ci; + else + cache_queue_tail->next = ci; - cache_queue_tail = ci; - } + cache_queue_tail = ci; + } - ci->flags |= CI_FLAGS_IN_QUEUE; + ci->flags |= CI_FLAGS_IN_QUEUE; - pthread_cond_signal(&queue_cond); - pthread_mutex_lock (&stats_lock); - stats_queue_length++; - pthread_mutex_unlock (&stats_lock); + pthread_cond_signal(&queue_cond); + pthread_mutex_lock(&stats_lock); + stats_queue_length++; + pthread_mutex_unlock(&stats_lock); - return (0); -} /* }}} int enqueue_cache_item */ + return (0); +} /* }}} int enqueue_cache_item */ /* * tree_callback_flush: * Called via `g_tree_foreach' in `flush_thread_main'. `cache_lock' is held * while this is in progress. */ -static gboolean tree_callback_flush (gpointer key, gpointer value, /* {{{ */ +static gboolean tree_callback_flush( + gpointer key, + gpointer value, /* {{{ */ gpointer data) { - cache_item_t *ci; - callback_flush_data_t *cfd; - - ci = (cache_item_t *) value; - cfd = (callback_flush_data_t *) data; - - if (ci->flags & CI_FLAGS_IN_QUEUE) - return FALSE; - - if (ci->values_num > 0 - && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING) - && ((ci->flags & CI_FLAGS_SUSPENDED) == 0)) - { - enqueue_cache_item (ci, TAIL); - } - else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) - && (ci->values_num <= 0)) - { - assert ((char *) key == ci->file); - if (!rrd_add_ptr((void ***)&cfd->keys, &cfd->keys_num, (void *)key)) - { - RRDD_LOG (LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed."); - return (FALSE); + cache_item_t *ci; + callback_flush_data_t *cfd; + + ci = (cache_item_t *) value; + cfd = (callback_flush_data_t *) data; + + if (ci->flags & CI_FLAGS_IN_QUEUE) + return FALSE; + + if (ci->values_num > 0 + && (ci->last_flush_time <= cfd->abs_timeout || state != RUNNING) + && ((ci->flags & CI_FLAGS_SUSPENDED) == 0)) { + enqueue_cache_item(ci, TAIL); + } else if (((cfd->now - ci->last_flush_time) >= config_flush_interval) + && (ci->values_num <= 0)) { + assert((char *) key == ci->file); + if (!rrd_add_ptr((void ***) &cfd->keys, &cfd->keys_num, (void *) key)) { + RRDD_LOG(LOG_ERR, "tree_callback_flush: rrd_add_ptrs failed."); + return (FALSE); + } } - } - return (FALSE); -} /* }}} gboolean tree_callback_flush */ + return (FALSE); +} /* }}} gboolean tree_callback_flush */ -static int flush_old_values (int max_age) +static int flush_old_values( + int max_age) { - callback_flush_data_t cfd; - size_t k; - - memset (&cfd, 0, sizeof (cfd)); - /* Pass the current time as user data so that we don't need to call - * `time' for each node. */ - cfd.now = time (NULL); - cfd.keys = NULL; - cfd.keys_num = 0; - - if (max_age > 0) - cfd.abs_timeout = cfd.now - max_age; - else - cfd.abs_timeout = cfd.now + 2*config_write_jitter + 1; - - /* `tree_callback_flush' will return the keys of all values that haven't - * been touched in the last `config_flush_interval' seconds in `cfd'. - * The char*'s in this array point to the same memory as ci->file, so we - * don't need to free them separately. */ - g_tree_foreach (cache_tree, tree_callback_flush, (gpointer) &cfd); - - for (k = 0; k < cfd.keys_num; k++) - { - gboolean status = g_tree_remove(cache_tree, cfd.keys[k]); - /* should never fail, since we have held the cache_lock - * the entire time */ - assert(status == TRUE); - } - - if (cfd.keys != NULL) - { - free (cfd.keys); - cfd.keys = NULL; - } + callback_flush_data_t cfd; + size_t k; - return (0); -} /* int flush_old_values */ + memset(&cfd, 0, sizeof(cfd)); + /* Pass the current time as user data so that we don't need to call + * `time' for each node. */ + cfd.now = time(NULL); + cfd.keys = NULL; + cfd.keys_num = 0; -static void *flush_thread_main (void UNUSED(*args)) /* {{{ */ -{ - struct timeval now; - struct timespec next_flush; - int status; - - gettimeofday (&now, NULL); - next_flush.tv_sec = now.tv_sec + config_flush_interval; - next_flush.tv_nsec = 1000 * now.tv_usec; - - pthread_mutex_lock(&cache_lock); - - while (state == RUNNING) - { - gettimeofday (&now, NULL); - if ((now.tv_sec > next_flush.tv_sec) - || ((now.tv_sec == next_flush.tv_sec) - && ((1000 * now.tv_usec) > next_flush.tv_nsec))) - { - RRDD_LOG(LOG_DEBUG, "flushing old values"); + if (max_age > 0) + cfd.abs_timeout = cfd.now - max_age; + else + cfd.abs_timeout = cfd.now + 2 * config_write_jitter + 1; - /* Determine the time of the next cache flush. */ - next_flush.tv_sec = now.tv_sec + config_flush_interval; + /* `tree_callback_flush' will return the keys of all values that haven't + * been touched in the last `config_flush_interval' seconds in `cfd'. + * The char*'s in this array point to the same memory as ci->file, so we + * don't need to free them separately. */ + g_tree_foreach(cache_tree, tree_callback_flush, (gpointer) & cfd); - /* Flush all values that haven't been written in the last - * `config_write_interval' seconds. */ - flush_old_values (config_write_interval); + for (k = 0; k < cfd.keys_num; k++) { + gboolean status = g_tree_remove(cache_tree, cfd.keys[k]); - /* unlock the cache while we rotate so we don't block incoming - * updates if the fsync() blocks on disk I/O */ - pthread_mutex_unlock(&cache_lock); - journal_rotate(); - pthread_mutex_lock(&cache_lock); + /* should never fail, since we have held the cache_lock + * the entire time */ + assert(status == TRUE); } - status = pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush); - if (status != 0 && status != ETIMEDOUT) - { - RRDD_LOG (LOG_ERR, "flush_thread_main: " - "pthread_cond_timedwait returned %i.", status); + if (cfd.keys != NULL) { + free(cfd.keys); + cfd.keys = NULL; } - } - if (config_flush_at_shutdown) - flush_old_values (-1); /* flush everything */ + return (0); +} /* int flush_old_values */ - state = SHUTDOWN; +static void *flush_thread_main( + void UNUSED(*args)) +{ /* {{{ */ + struct timeval now; + struct timespec next_flush; + int status; - pthread_mutex_unlock(&cache_lock); + gettimeofday(&now, NULL); + next_flush.tv_sec = now.tv_sec + config_flush_interval; + next_flush.tv_nsec = 1000 * now.tv_usec; - return NULL; -} /* void *flush_thread_main */ + pthread_mutex_lock(&cache_lock); -static void *queue_thread_main (void UNUSED(*args)) /* {{{ */ -{ - pthread_mutex_lock (&cache_lock); + while (state == RUNNING) { + gettimeofday(&now, NULL); + if ((now.tv_sec > next_flush.tv_sec) + || ((now.tv_sec == next_flush.tv_sec) + && ((1000 * now.tv_usec) > next_flush.tv_nsec))) { + RRDD_LOG(LOG_DEBUG, "flushing old values"); + + /* Determine the time of the next cache flush. */ + next_flush.tv_sec = now.tv_sec + config_flush_interval; + + /* Flush all values that haven't been written in the last + * `config_write_interval' seconds. */ + flush_old_values(config_write_interval); + + /* unlock the cache while we rotate so we don't block incoming + * updates if the fsync() blocks on disk I/O */ + pthread_mutex_unlock(&cache_lock); + journal_rotate(); + pthread_mutex_lock(&cache_lock); + } - while (state != SHUTDOWN - || (cache_queue_head != NULL && config_flush_at_shutdown)) - { - cache_item_t *ci; - char *file; - char **values; - size_t values_num; - int status; - - /* Now, check if there's something to store away. If not, wait until - * something comes in. */ - if (cache_queue_head == NULL) - { - status = pthread_cond_wait (&queue_cond, &cache_lock); - if ((status != 0) && (status != ETIMEDOUT)) - { - RRDD_LOG (LOG_ERR, "queue_thread_main: " - "pthread_cond_wait returned %i.", status); - } + status = + pthread_cond_timedwait(&flush_cond, &cache_lock, &next_flush); + if (status != 0 && status != ETIMEDOUT) { + RRDD_LOG(LOG_ERR, "flush_thread_main: " + "pthread_cond_timedwait returned %i.", status); + } } - /* Check if a value has arrived. This may be NULL if we timed out or there - * was an interrupt such as a signal. */ - if (cache_queue_head == NULL) - continue; + if (config_flush_at_shutdown) + flush_old_values(-1); /* flush everything */ - ci = cache_queue_head; + state = SHUTDOWN; - /* copy the relevant parts */ - file = strdup (ci->file); - if (file == NULL) - { - RRDD_LOG (LOG_ERR, "queue_thread_main: strdup failed."); - continue; - } + pthread_mutex_unlock(&cache_lock); - assert(ci->values != NULL); - assert(ci->values_num > 0); + return NULL; +} /* void *flush_thread_main */ - values = ci->values; - values_num = ci->values_num; +static void *queue_thread_main( + void UNUSED(*args)) +{ /* {{{ */ + pthread_mutex_lock(&cache_lock); - wipe_ci_values(ci, time(NULL)); - remove_from_queue(ci); + while (state != SHUTDOWN + || (cache_queue_head != NULL && config_flush_at_shutdown)) { + cache_item_t *ci; + char *file; + char **values; + size_t values_num; + int status; + + /* Now, check if there's something to store away. If not, wait until + * something comes in. */ + if (cache_queue_head == NULL) { + status = pthread_cond_wait(&queue_cond, &cache_lock); + if ((status != 0) && (status != ETIMEDOUT)) { + RRDD_LOG(LOG_ERR, "queue_thread_main: " + "pthread_cond_wait returned %i.", status); + } + } - pthread_mutex_unlock (&cache_lock); + /* Check if a value has arrived. This may be NULL if we timed out or there + * was an interrupt such as a signal. */ + if (cache_queue_head == NULL) + continue; - rrd_clear_error (); - status = rrd_update_r (file, NULL, (int) values_num, (void *) values); - if (status != 0) - { - RRDD_LOG (LOG_NOTICE, "queue_thread_main: " - "rrd_update_r (%s) failed with status %i. (%s)", - file, status, rrd_get_error()); - } + ci = cache_queue_head; - journal_write("wrote", file); + /* copy the relevant parts */ + file = strdup(ci->file); + if (file == NULL) { + RRDD_LOG(LOG_ERR, "queue_thread_main: strdup failed."); + continue; + } - /* Search again in the tree. It's possible someone issued a "FORGET" - * while we were writing the update values. */ - pthread_mutex_lock(&cache_lock); - ci = (cache_item_t *) g_tree_lookup(cache_tree, file); - if (ci) - pthread_cond_broadcast(&ci->flushed); - pthread_mutex_unlock(&cache_lock); + assert(ci->values != NULL); + assert(ci->values_num > 0); - if (status == 0) - { - pthread_mutex_lock (&stats_lock); - stats_updates_written++; - stats_data_sets_written += values_num; - pthread_mutex_unlock (&stats_lock); - } + values = ci->values; + values_num = ci->values_num; - rrd_free_ptrs((void ***) &values, &values_num); - free(file); + wipe_ci_values(ci, time(NULL)); + remove_from_queue(ci); - pthread_mutex_lock (&cache_lock); - } - pthread_mutex_unlock (&cache_lock); + pthread_mutex_unlock(&cache_lock); - return (NULL); -} /* }}} void *queue_thread_main */ + rrd_clear_error(); + status = rrd_update_r(file, NULL, (int) values_num, (void *) values); + if (status != 0) { + RRDD_LOG(LOG_NOTICE, "queue_thread_main: " + "rrd_update_r (%s) failed with status %i. (%s)", + file, status, rrd_get_error()); + } -static int buffer_get_field (char **buffer_ret, /* {{{ */ - size_t *buffer_size_ret, char **field_ret) -{ - char *buffer; - size_t buffer_pos; - size_t buffer_size; - char *field; - size_t field_size; - int status; - - buffer = *buffer_ret; - buffer_pos = 0; - buffer_size = *buffer_size_ret; - field = *buffer_ret; - field_size = 0; - - if (buffer_size <= 0) - return (-1); + journal_write("wrote", file); + + /* Search again in the tree. It's possible someone issued a "FORGET" + * while we were writing the update values. */ + pthread_mutex_lock(&cache_lock); + ci = (cache_item_t *) g_tree_lookup(cache_tree, file); + if (ci) + pthread_cond_broadcast(&ci->flushed); + pthread_mutex_unlock(&cache_lock); + + if (status == 0) { + pthread_mutex_lock(&stats_lock); + stats_updates_written++; + stats_data_sets_written += values_num; + pthread_mutex_unlock(&stats_lock); + } - /* This is ensured by `handle_request'. */ - assert (buffer[buffer_size - 1] == '\0'); + rrd_free_ptrs((void ***) &values, &values_num); + free(file); - status = -1; - while (buffer_pos < buffer_size) - { - /* Check for end-of-field or end-of-buffer */ - if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0') - { - field[field_size] = 0; - field_size++; - buffer_pos++; - status = 0; - break; - } - /* Handle escaped characters. */ - else if (buffer[buffer_pos] == '\\') - { - if (buffer_pos >= (buffer_size - 1)) - break; - buffer_pos++; - field[field_size] = buffer[buffer_pos]; - field_size++; - buffer_pos++; - } - /* Normal operation */ - else - { - field[field_size] = buffer[buffer_pos]; - field_size++; - buffer_pos++; + pthread_mutex_lock(&cache_lock); } - } /* while (buffer_pos < buffer_size) */ + pthread_mutex_unlock(&cache_lock); - if (status != 0) - return (status); + return (NULL); +} /* }}} void *queue_thread_main */ + +static int buffer_get_field( + char **buffer_ret, /* {{{ */ + size_t *buffer_size_ret, + char **field_ret) +{ + char *buffer; + size_t buffer_pos; + size_t buffer_size; + char *field; + size_t field_size; + int status; + + buffer = *buffer_ret; + buffer_pos = 0; + buffer_size = *buffer_size_ret; + field = *buffer_ret; + field_size = 0; + + if (buffer_size <= 0) + return (-1); - *buffer_ret = buffer + buffer_pos; - *buffer_size_ret = buffer_size - buffer_pos; - *field_ret = field; + /* This is ensured by `handle_request'. */ + assert(buffer[buffer_size - 1] == '\0'); - return (0); -} /* }}} int buffer_get_field */ + status = -1; + while (buffer_pos < buffer_size) { + /* Check for end-of-field or end-of-buffer */ + if (buffer[buffer_pos] == ' ' || buffer[buffer_pos] == '\0') { + field[field_size] = 0; + field_size++; + buffer_pos++; + status = 0; + break; + } + /* Handle escaped characters. */ + else if (buffer[buffer_pos] == '\\') { + if (buffer_pos >= (buffer_size - 1)) + break; + buffer_pos++; + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + /* Normal operation */ + else { + field[field_size] = buffer[buffer_pos]; + field_size++; + buffer_pos++; + } + } /* while (buffer_pos < buffer_size) */ + + if (status != 0) + return (status); + + *buffer_ret = buffer + buffer_pos; + *buffer_size_ret = buffer_size - buffer_pos; + *field_ret = field; + + return (0); +} /* }}} int buffer_get_field */ /* if we're restricting writes to the base directory, * check whether the file falls within the dir * returns 1 if OK, otherwise 0 */ -static int check_file_access (const char *file, listen_socket_t *sock) /* {{{ */ -{ - assert(file != NULL); +static int check_file_access( + const char *file, + listen_socket_t *sock) +{ /* {{{ */ + assert(file != NULL); + + if (!config_write_base_only || JOURNAL_REPLAY(sock) + || config_base_dir == NULL) + return 1; + + if (strstr(file, "../") != NULL) + return 0; + + /* relative paths without "../" are ok */ + if (*file != '/') + return 1; + + /* file must be of the format base + "/" + <1+ char filename> */ + if (strlen(file) < _config_base_dir_len + 1) + return 0; + if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) + return 0; + if (*(file + _config_base_dir_len) != '/') + return 0; - if (!config_write_base_only - || JOURNAL_REPLAY(sock) - || config_base_dir == NULL) return 1; - - if (strstr(file, "../") != NULL) - return 0; - - /* relative paths without "../" are ok */ - if (*file != '/') return 1; - - /* file must be of the format base + "/" + <1+ char filename> */ - if (strlen(file) < _config_base_dir_len + 1) return 0; - if (strncmp(file, config_base_dir, _config_base_dir_len) != 0) return 0; - if (*(file + _config_base_dir_len) != '/') return 0; - - return 1; -} /* }}} static int check_file_access */ +} /* }}} static int check_file_access */ /* when using a base dir, convert relative paths to absolute paths. * The result must be free()'ed by the caller. */ -static char* get_abs_path(const char *filename) +static char *get_abs_path( + const char *filename) { - char *ret; - assert(filename != NULL); - - if (config_base_dir == NULL || *filename == '/') - return strdup(filename); + char *ret; - ret = malloc(strlen(config_base_dir) + 1 + strlen(filename) + 1); - if (ret == NULL) - RRDD_LOG (LOG_ERR, "get_abs_path: malloc failed."); - else - sprintf(ret, "%s/%s", config_base_dir, filename); + assert(filename != NULL); - return ret; -} /* }}} static int get_abs_path */ + if (config_base_dir == NULL || *filename == '/') + return strdup(filename); -static int flush_file (const char *filename) /* {{{ */ -{ - cache_item_t *ci; - - pthread_mutex_lock (&cache_lock); + ret = malloc(strlen(config_base_dir) + 1 + strlen(filename) + 1); + if (ret == NULL) + RRDD_LOG(LOG_ERR, "get_abs_path: malloc failed."); + else + sprintf(ret, "%s/%s", config_base_dir, filename); - ci = (cache_item_t *) g_tree_lookup (cache_tree, filename); - if (ci == NULL) - { - pthread_mutex_unlock (&cache_lock); - return (ENOENT); - } + return ret; +} /* }}} static int get_abs_path */ - if ((ci->values_num > 0) - && ((ci->flags & CI_FLAGS_SUSPENDED) == 0)) - { - /* Enqueue at head */ - enqueue_cache_item (ci, HEAD); - pthread_cond_wait(&ci->flushed, &cache_lock); - } +static int flush_file( + const char *filename) +{ /* {{{ */ + cache_item_t *ci; - /* DO NOT DO ANYTHING WITH ci HERE!! The entry - * may have been purged during our cond_wait() */ + pthread_mutex_lock(&cache_lock); - pthread_mutex_unlock(&cache_lock); + ci = (cache_item_t *) g_tree_lookup(cache_tree, filename); + if (ci == NULL) { + pthread_mutex_unlock(&cache_lock); + return (ENOENT); + } - return (0); -} /* }}} int flush_file */ + if ((ci->values_num > 0) + && ((ci->flags & CI_FLAGS_SUSPENDED) == 0)) { + /* Enqueue at head */ + enqueue_cache_item(ci, HEAD); + pthread_cond_wait(&ci->flushed, &cache_lock); + } -static int syntax_error(listen_socket_t *sock, command_t *cmd) /* {{{ */ -{ - char *err = "Syntax error.\n"; + /* DO NOT DO ANYTHING WITH ci HERE!! The entry + * may have been purged during our cond_wait() */ - if (cmd && cmd->syntax) - err = cmd->syntax; + pthread_mutex_unlock(&cache_lock); - return send_response(sock, RESP_ERR, "Usage: %s", err); -} /* }}} static int syntax_error() */ + return (0); +} /* }}} int flush_file */ + +static int syntax_error( + listen_socket_t *sock, + command_t *cmd) +{ /* {{{ */ + char *err = "Syntax error.\n"; + + if (cmd && cmd->syntax) + err = cmd->syntax; + + return send_response(sock, RESP_ERR, "Usage: %s", err); +} /* }}} static int syntax_error() */ + +static int handle_request_stats( + HANDLER_PROTO) +{ /* {{{ */ + uint64_t copy_queue_length; + uint64_t copy_updates_received; + uint64_t copy_flush_received; + uint64_t copy_updates_written; + uint64_t copy_data_sets_written; + uint64_t copy_journal_bytes; + uint64_t copy_journal_rotate; + + uint64_t tree_nodes_number; + uint64_t tree_depth; -static int handle_request_stats (HANDLER_PROTO) /* {{{ */ -{ - uint64_t copy_queue_length; - uint64_t copy_updates_received; - uint64_t copy_flush_received; - uint64_t copy_updates_written; - uint64_t copy_data_sets_written; - uint64_t copy_journal_bytes; - uint64_t copy_journal_rotate; - - uint64_t tree_nodes_number; - uint64_t tree_depth; - - pthread_mutex_lock (&stats_lock); - copy_queue_length = stats_queue_length; - copy_updates_received = stats_updates_received; - copy_flush_received = stats_flush_received; - copy_updates_written = stats_updates_written; - copy_data_sets_written = stats_data_sets_written; - copy_journal_bytes = stats_journal_bytes; - copy_journal_rotate = stats_journal_rotate; - pthread_mutex_unlock (&stats_lock); - - pthread_mutex_lock (&cache_lock); - tree_nodes_number = (uint64_t) g_tree_nnodes (cache_tree); - tree_depth = (uint64_t) g_tree_height (cache_tree); - pthread_mutex_unlock (&cache_lock); - - add_response_info(sock, - "QueueLength: %"PRIu64"\n", copy_queue_length); - add_response_info(sock, - "UpdatesReceived: %"PRIu64"\n", copy_updates_received); - add_response_info(sock, - "FlushesReceived: %"PRIu64"\n", copy_flush_received); - add_response_info(sock, - "UpdatesWritten: %"PRIu64"\n", copy_updates_written); - add_response_info(sock, - "DataSetsWritten: %"PRIu64"\n", copy_data_sets_written); - add_response_info(sock, "TreeNodesNumber: %"PRIu64"\n", tree_nodes_number); - add_response_info(sock, "TreeDepth: %"PRIu64"\n", tree_depth); - add_response_info(sock, "JournalBytes: %"PRIu64"\n", copy_journal_bytes); - add_response_info(sock, "JournalRotate: %"PRIu64"\n", copy_journal_rotate); - - send_response(sock, RESP_OK, "Statistics follow\n"); - - return (0); -} /* }}} int handle_request_stats */ - -static int handle_request_flush (HANDLER_PROTO) /* {{{ */ -{ - char *file=NULL, *pbuffile; - int status, rc; - - status = buffer_get_field (&buffer, &buffer_size, &pbuffile); - if (status != 0) - { - return syntax_error(sock,cmd); - } - else - { pthread_mutex_lock(&stats_lock); - stats_flush_received++; + copy_queue_length = stats_queue_length; + copy_updates_received = stats_updates_received; + copy_flush_received = stats_flush_received; + copy_updates_written = stats_updates_written; + copy_data_sets_written = stats_data_sets_written; + copy_journal_bytes = stats_journal_bytes; + copy_journal_rotate = stats_journal_rotate; pthread_mutex_unlock(&stats_lock); - file = get_abs_path(pbuffile); - if (file == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - if (!check_file_access(file, sock)) { - rc = send_response(sock, RESP_ERR, "%s: %s\n", file, rrd_strerror(EACCES)); - goto done; - } + pthread_mutex_lock(&cache_lock); + tree_nodes_number = (uint64_t) g_tree_nnodes(cache_tree); + tree_depth = (uint64_t) g_tree_height(cache_tree); + pthread_mutex_unlock(&cache_lock); - status = flush_file (file); - if (status == 0) - rc = send_response(sock, RESP_OK, "Successfully flushed %s.\n", file); - else if (status == ENOENT) - { - /* no file in our tree; see whether it exists at all */ - struct stat statbuf; + add_response_info(sock, "QueueLength: %" PRIu64 "\n", copy_queue_length); + add_response_info(sock, + "UpdatesReceived: %" PRIu64 "\n", + copy_updates_received); + add_response_info(sock, "FlushesReceived: %" PRIu64 "\n", + copy_flush_received); + add_response_info(sock, "UpdatesWritten: %" PRIu64 "\n", + copy_updates_written); + add_response_info(sock, "DataSetsWritten: %" PRIu64 "\n", + copy_data_sets_written); + add_response_info(sock, "TreeNodesNumber: %" PRIu64 "\n", + tree_nodes_number); + add_response_info(sock, "TreeDepth: %" PRIu64 "\n", tree_depth); + add_response_info(sock, "JournalBytes: %" PRIu64 "\n", + copy_journal_bytes); + add_response_info(sock, "JournalRotate: %" PRIu64 "\n", + copy_journal_rotate); + + send_response(sock, RESP_OK, "Statistics follow\n"); - memset(&statbuf, 0, sizeof(statbuf)); - if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode)) - rc = send_response(sock, RESP_OK, "Nothing to flush: %s.\n", file); - else - rc = send_response(sock, RESP_ERR, "No such file: %s.\n", file); - } - else if (status < 0) - rc = send_response(sock, RESP_ERR, "Internal error.\n"); - else - rc = send_response(sock, RESP_ERR, "Failed with status %i.\n", status); - } + return (0); +} /* }}} int handle_request_stats */ -done: - free(file); - return rc; -} /* }}} int handle_request_flush */ +static int handle_request_flush( + HANDLER_PROTO) +{ /* {{{ */ + char *file = NULL, *pbuffile; + int status, rc; -static int handle_request_flushall(HANDLER_PROTO) /* {{{ */ -{ - RRDD_LOG(LOG_DEBUG, "Received FLUSHALL"); + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) { + return syntax_error(sock, cmd); + } else { + pthread_mutex_lock(&stats_lock); + stats_flush_received++; + pthread_mutex_unlock(&stats_lock); + + file = get_abs_path(pbuffile); + if (file == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + goto done; + } + if (!check_file_access(file, sock)) { + rc = send_response(sock, RESP_ERR, "%s: %s\n", file, + rrd_strerror(EACCES)); + goto done; + } - pthread_mutex_lock(&cache_lock); - flush_old_values(-1); - pthread_mutex_unlock(&cache_lock); + status = flush_file(file); + if (status == 0) + rc = send_response(sock, RESP_OK, "Successfully flushed %s.\n", + file); + else if (status == ENOENT) { + /* no file in our tree; see whether it exists at all */ + struct stat statbuf; + + memset(&statbuf, 0, sizeof(statbuf)); + if (stat(file, &statbuf) == 0 && S_ISREG(statbuf.st_mode)) + rc = send_response(sock, RESP_OK, "Nothing to flush: %s.\n", + file); + else + rc = send_response(sock, RESP_ERR, "No such file: %s.\n", + file); + } else if (status < 0) + rc = send_response(sock, RESP_ERR, "Internal error.\n"); + else + rc = send_response(sock, RESP_ERR, "Failed with status %i.\n", + status); + } - return send_response(sock, RESP_OK, "Started flush.\n"); -} /* }}} static int handle_request_flushall */ + done: + free(file); + return rc; +} /* }}} int handle_request_flush */ -static int handle_request_pending(HANDLER_PROTO) /* {{{ */ -{ - int status; - char *file=NULL, *pbuffile; - cache_item_t *ci; - - status = buffer_get_field(&buffer, &buffer_size, &pbuffile); - if (status != 0) - return syntax_error(sock,cmd); - - file = get_abs_path(pbuffile); - if (file == NULL) - return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - - pthread_mutex_lock(&cache_lock); - ci = g_tree_lookup(cache_tree, file); - if (ci != NULL) { - for (size_t i=0; i < ci->values_num; i++) - add_response_info(sock, "%s\n", ci->values[i]); - } - - free(file); - pthread_mutex_unlock(&cache_lock); - return send_response(sock, RESP_OK, "updates pending\n"); -} /* }}} static int handle_request_pending */ - -static int handle_request_forget(HANDLER_PROTO) /* {{{ */ -{ - int status, rc; - gboolean found; - char *file=NULL, *pbuffile; - - status = buffer_get_field(&buffer, &buffer_size, &pbuffile); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - - file = get_abs_path(pbuffile); - if (file == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - if (!check_file_access(file, sock)) { - rc = send_response(sock, RESP_ERR, "%s: %s\n", file, rrd_strerror(EACCES)); - goto done; - } - - pthread_mutex_lock(&cache_lock); - found = g_tree_remove(cache_tree, file); - pthread_mutex_unlock(&cache_lock); - - if (found == TRUE) - { - if (!JOURNAL_REPLAY(sock)) - journal_write("forget", file); - - rc = send_response(sock, RESP_OK, "Gone!\n"); - } - else - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT)); +static int handle_request_flushall( + HANDLER_PROTO) +{ /* {{{ */ + RRDD_LOG(LOG_DEBUG, "Received FLUSHALL"); -done: - free(file); - return rc; -} /* }}} static int handle_request_forget */ + pthread_mutex_lock(&cache_lock); + flush_old_values(-1); + pthread_mutex_unlock(&cache_lock); -static int handle_request_queue (HANDLER_PROTO) /* {{{ */ -{ - cache_item_t *ci; + return send_response(sock, RESP_OK, "Started flush.\n"); +} /* }}} static int handle_request_flushall */ - pthread_mutex_lock(&cache_lock); +static int handle_request_pending( + HANDLER_PROTO) +{ /* {{{ */ + int status; + char *file = NULL, *pbuffile; + cache_item_t *ci; - ci = cache_queue_head; - while (ci != NULL) - { - add_response_info(sock, "%d %s\n", ci->values_num, ci->file); - ci = ci->next; - } + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) + return syntax_error(sock, cmd); - pthread_mutex_unlock(&cache_lock); + file = get_abs_path(pbuffile); + if (file == NULL) + return send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - return send_response(sock, RESP_OK, "in queue.\n"); -} /* }}} int handle_request_queue */ + pthread_mutex_lock(&cache_lock); + ci = g_tree_lookup(cache_tree, file); + if (ci != NULL) { + for (size_t i = 0; i < ci->values_num; i++) + add_response_info(sock, "%s\n", ci->values[i]); + } -static int handle_request_update (HANDLER_PROTO) /* {{{ */ -{ - char *file=NULL, *pbuffile; - int values_num = 0; - int status, rc; - char orig_buf[RRD_CMD_MAX]; - - cache_item_t *ci; - - /* save it for the journal later */ - if (!JOURNAL_REPLAY(sock)) { - strncpy(orig_buf, buffer, min(RRD_CMD_MAX,buffer_size)); - orig_buf[min(RRD_CMD_MAX,buffer_size) - 1] = '\0'; - } - - status = buffer_get_field (&buffer, &buffer_size, &pbuffile); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - - pthread_mutex_lock(&stats_lock); - stats_updates_received++; - pthread_mutex_unlock(&stats_lock); - - file = get_abs_path(pbuffile); - if (file == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - if (!check_file_access(file, sock)) { - rc = send_response(sock, RESP_ERR, "%s: %s\n", file, rrd_strerror(EACCES)); - goto done; - } - - pthread_mutex_lock (&cache_lock); - ci = g_tree_lookup (cache_tree, file); - - if (ci == NULL) /* {{{ */ - { - struct stat statbuf; - cache_item_t *tmp; - - /* don't hold the lock while we setup; stat(2) might block */ + free(file); pthread_mutex_unlock(&cache_lock); + return send_response(sock, RESP_OK, "updates pending\n"); +} /* }}} static int handle_request_pending */ - memset (&statbuf, 0, sizeof (statbuf)); - status = stat (file, &statbuf); - if (status != 0) - { - RRDD_LOG (LOG_NOTICE, "handle_request_update: stat (%s) failed.", file); +static int handle_request_forget( + HANDLER_PROTO) +{ /* {{{ */ + int status, rc; + gboolean found; + char *file = NULL, *pbuffile; - status = errno; - if (status == ENOENT) - rc = send_response(sock, RESP_ERR, "No such file: %s\n", file); - else - rc = send_response(sock, RESP_ERR, - "stat failed with error %i.\n", status); - goto done; - } - if (!S_ISREG (statbuf.st_mode)) { - rc = send_response(sock, RESP_ERR, "Not a regular file: %s\n", file); - goto done; + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; } - if (access(file, R_OK|W_OK) != 0) { - rc = send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n", - file, rrd_strerror(errno)); - goto done; + file = get_abs_path(pbuffile); + if (file == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + goto done; + } + if (!check_file_access(file, sock)) { + rc = send_response(sock, RESP_ERR, "%s: %s\n", file, + rrd_strerror(EACCES)); + goto done; } - ci = (cache_item_t *) malloc (sizeof (cache_item_t)); - if (ci == NULL) - { - RRDD_LOG (LOG_ERR, "handle_request_update: malloc failed."); + pthread_mutex_lock(&cache_lock); + found = g_tree_remove(cache_tree, file); + pthread_mutex_unlock(&cache_lock); - rc = send_response(sock, RESP_ERR, "malloc failed.\n"); - goto done; - } - memset (ci, 0, sizeof (cache_item_t)); + if (found == TRUE) { + if (!JOURNAL_REPLAY(sock)) + journal_write("forget", file); - ci->file = strdup (file); - if (ci->file == NULL) - { - free (ci); - RRDD_LOG (LOG_ERR, "handle_request_update: strdup failed."); + rc = send_response(sock, RESP_OK, "Gone!\n"); + } else + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOENT)); - rc = send_response(sock, RESP_ERR, "strdup failed.\n"); - goto done; - } + done: + free(file); + return rc; +} /* }}} static int handle_request_forget */ - time_t last_update_from_file; - rrd_file_t * rrd_file; - rrd_t rrd; +static int handle_request_queue( + HANDLER_PROTO) +{ /* {{{ */ + cache_item_t *ci; - rrd_clear_error(); - rrd_init(&rrd); - rrd_file = rrd_open(file, &rrd, RRD_READONLY | RRD_LOCK); - if (!rrd_file) - { - rrd_free(&rrd); - free (ci); - RRDD_LOG (LOG_ERR, "handle_request_update: Could not read RRD file."); + pthread_mutex_lock(&cache_lock); - rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); - goto done; + ci = cache_queue_head; + while (ci != NULL) { + add_response_info(sock, "%d %s\n", ci->values_num, ci->file); + ci = ci->next; } - last_update_from_file = rrd.live_head->last_up; - rrd_close(rrd_file); - rrd_free(&rrd); - ci->last_update_stamp = last_update_from_file; + pthread_mutex_unlock(&cache_lock); - if(ci->last_update_stamp<1) - { - free (ci); - RRDD_LOG (LOG_ERR, "handle_request_update: Invalid timestamp from RRD file."); + return send_response(sock, RESP_OK, "in queue.\n"); +} /* }}} int handle_request_queue */ - rc = send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n"); - goto done; - } +static int handle_request_update( + HANDLER_PROTO) +{ /* {{{ */ + char *file = NULL, *pbuffile; + int values_num = 0; + int status, rc; + char orig_buf[RRD_CMD_MAX]; - wipe_ci_values(ci, now); - ci->flags = CI_FLAGS_IN_TREE; - pthread_cond_init(&ci->flushed, NULL); + cache_item_t *ci; - pthread_mutex_lock(&cache_lock); + /* save it for the journal later */ + if (!JOURNAL_REPLAY(sock)) { + strncpy(orig_buf, buffer, min(RRD_CMD_MAX, buffer_size)); + orig_buf[min(RRD_CMD_MAX, buffer_size) - 1] = '\0'; + } - /* another UPDATE might have added this entry in the meantime */ - tmp = g_tree_lookup (cache_tree, file); - if (tmp == NULL) - g_tree_replace (cache_tree, (void *) ci->file, (void *) ci); - else - { - free_cache_item (ci); - ci = tmp; + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; } - /* state may have changed while we were unlocked */ - if (state == SHUTDOWN) { - pthread_mutex_unlock(&cache_lock); - rc = -1; - goto done; + pthread_mutex_lock(&stats_lock); + stats_updates_received++; + pthread_mutex_unlock(&stats_lock); + + file = get_abs_path(pbuffile); + if (file == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + goto done; + } + if (!check_file_access(file, sock)) { + rc = send_response(sock, RESP_ERR, "%s: %s\n", file, + rrd_strerror(EACCES)); + goto done; } - } /* }}} */ - assert (ci != NULL); - /* don't re-write updates in replay mode */ - if (!JOURNAL_REPLAY(sock)) - journal_write("update", orig_buf); + pthread_mutex_lock(&cache_lock); + ci = g_tree_lookup(cache_tree, file); + + if (ci == NULL) { /* {{{ */ + struct stat statbuf; + cache_item_t *tmp; + + /* don't hold the lock while we setup; stat(2) might block */ + pthread_mutex_unlock(&cache_lock); + + memset(&statbuf, 0, sizeof(statbuf)); + status = stat(file, &statbuf); + if (status != 0) { + RRDD_LOG(LOG_NOTICE, "handle_request_update: stat (%s) failed.", + file); + + status = errno; + if (status == ENOENT) + rc = send_response(sock, RESP_ERR, "No such file: %s\n", + file); + else + rc = send_response(sock, RESP_ERR, + "stat failed with error %i.\n", status); + goto done; + } + if (!S_ISREG(statbuf.st_mode)) { + rc = send_response(sock, RESP_ERR, "Not a regular file: %s\n", + file); + goto done; + } - while (buffer_size > 0) - { - char *value; - double stamp; - char *eostamp; + if (access(file, R_OK | W_OK) != 0) { + rc = send_response(sock, RESP_ERR, "Cannot read/write %s: %s\n", + file, rrd_strerror(errno)); + goto done; + } - status = buffer_get_field (&buffer, &buffer_size, &value); - if (status != 0) - { - RRDD_LOG (LOG_INFO, "handle_request_update: Error reading field."); - break; - } + ci = (cache_item_t *) malloc(sizeof(cache_item_t)); + if (ci == NULL) { + RRDD_LOG(LOG_ERR, "handle_request_update: malloc failed."); - /* make sure update time is always moving forward. We use double here since - update does support subsecond precision for timestamps ... */ - if ( ( rrd_strtodbl( value, &eostamp, &stamp, NULL) != 1 ) || *eostamp != ':') - { - pthread_mutex_unlock(&cache_lock); - rc = send_response(sock, RESP_ERR, - "Cannot find timestamp in '%s'!\n", value); - goto done; - } - else if (stamp <= ci->last_update_stamp) - { - pthread_mutex_unlock(&cache_lock); - rc = send_response(sock, RESP_ERR, - "illegal attempt to update using time %lf when last" - " update time is %lf (minimum one second step)\n", - stamp, ci->last_update_stamp); - goto done; - } - else - ci->last_update_stamp = stamp; + rc = send_response(sock, RESP_ERR, "malloc failed.\n"); + goto done; + } + memset(ci, 0, sizeof(cache_item_t)); - if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value, - &ci->values_alloc, config_alloc_chunk)) - { - RRDD_LOG (LOG_ERR, "handle_request_update: rrd_add_strdup failed."); - continue; - } + ci->file = strdup(file); + if (ci->file == NULL) { + free(ci); + RRDD_LOG(LOG_ERR, "handle_request_update: strdup failed."); - values_num++; - } + rc = send_response(sock, RESP_ERR, "strdup failed.\n"); + goto done; + } - if (((now - ci->last_flush_time) >= config_write_interval) - && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) - && ((ci->flags & CI_FLAGS_SUSPENDED) == 0) - && (ci->values_num > 0)) - { - enqueue_cache_item (ci, TAIL); - } + time_t last_update_from_file; + rrd_file_t *rrd_file; + rrd_t rrd; + + rrd_clear_error(); + rrd_init(&rrd); + rrd_file = rrd_open(file, &rrd, RRD_READONLY | RRD_LOCK); + if (!rrd_file) { + rrd_free(&rrd); + free(ci); + RRDD_LOG(LOG_ERR, + "handle_request_update: Could not read RRD file."); + + rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", + rrd_get_error()); + goto done; + } + last_update_from_file = rrd.live_head->last_up; + rrd_close(rrd_file); + rrd_free(&rrd); - pthread_mutex_unlock (&cache_lock); + ci->last_update_stamp = last_update_from_file; - if (values_num < 1) - rc = send_response(sock, RESP_ERR, "No values updated.\n"); - else - rc = send_response(sock, RESP_OK, - "errors, enqueued %i value(s).\n", values_num); + if (ci->last_update_stamp < 1) { + free(ci); + RRDD_LOG(LOG_ERR, + "handle_request_update: Invalid timestamp from RRD file."); -done: - free(file); - return rc; -} /* }}} int handle_request_update */ + rc = send_response(sock, RESP_ERR, + "Error: rrdcached: Invalid timestamp returned\n"); + goto done; + } -struct fetch_parsed{ - char *file; - char *cf; + wipe_ci_values(ci, now); + ci->flags = CI_FLAGS_IN_TREE; + pthread_cond_init(&ci->flushed, NULL); - time_t start_tm; - time_t end_tm; - unsigned long step; - unsigned long steps; + pthread_mutex_lock(&cache_lock); - unsigned long ds_cnt; - char **ds_namv; - rrd_value_t *data; + /* another UPDATE might have added this entry in the meantime */ + tmp = g_tree_lookup(cache_tree, file); + if (tmp == NULL) + g_tree_replace(cache_tree, (void *) ci->file, (void *) ci); + else { + free_cache_item(ci); + ci = tmp; + } - unsigned long field_cnt; - unsigned int *field_idx; -}; + /* state may have changed while we were unlocked */ + if (state == SHUTDOWN) { + pthread_mutex_unlock(&cache_lock); + rc = -1; + goto done; + } + } /* }}} */ + assert(ci != NULL); -static void free_fetch_parsed (struct fetch_parsed *parsed) /* {{{ */ -{ - unsigned int i; - rrd_freemem(parsed->file); - for (i = 0; i < parsed->ds_cnt; i++) - rrd_freemem(parsed->ds_namv[i]); - rrd_freemem(parsed->ds_namv); - rrd_freemem(parsed->data); -} + /* don't re-write updates in replay mode */ + if (!JOURNAL_REPLAY(sock)) + journal_write("update", orig_buf); -static int handle_request_fetch_parse (HANDLER_PROTO, - struct fetch_parsed *parsed) /* {{{ */ -{ - char *pbuffile; - char *start_str; - char *end_str; - - time_t t; - int status; - - parsed->file = NULL; - parsed->cf = NULL; - start_str = NULL; - end_str = NULL; - - /* Read the arguments */ - do /* while (0) */ - { - status = buffer_get_field (&buffer, &buffer_size, &pbuffile); - if (status != 0) - break; + while (buffer_size > 0) { + char *value; + double stamp; + char *eostamp; - status = buffer_get_field (&buffer, &buffer_size, &parsed->cf); - if (status != 0) - break; + status = buffer_get_field(&buffer, &buffer_size, &value); + if (status != 0) { + RRDD_LOG(LOG_INFO, "handle_request_update: Error reading field."); + break; + } - status = buffer_get_field (&buffer, &buffer_size, &start_str); - if (status != 0) - { - start_str = NULL; - status = 0; - break; + /* make sure update time is always moving forward. We use double here since + update does support subsecond precision for timestamps ... */ + if ((rrd_strtodbl(value, &eostamp, &stamp, NULL) != 1) + || *eostamp != ':') { + pthread_mutex_unlock(&cache_lock); + rc = send_response(sock, RESP_ERR, + "Cannot find timestamp in '%s'!\n", value); + goto done; + } else if (stamp <= ci->last_update_stamp) { + pthread_mutex_unlock(&cache_lock); + rc = send_response(sock, RESP_ERR, + "illegal attempt to update using time %lf when last" + " update time is %lf (minimum one second step)\n", + stamp, ci->last_update_stamp); + goto done; + } else + ci->last_update_stamp = stamp; + + if (!rrd_add_strdup_chunk(&ci->values, &ci->values_num, value, + &ci->values_alloc, config_alloc_chunk)) { + RRDD_LOG(LOG_ERR, + "handle_request_update: rrd_add_strdup failed."); + continue; + } + + values_num++; } - status = buffer_get_field (&buffer, &buffer_size, &end_str); - if (status != 0) - { - end_str = NULL; - status = 0; - break; + if (((now - ci->last_flush_time) >= config_write_interval) + && ((ci->flags & CI_FLAGS_IN_QUEUE) == 0) + && ((ci->flags & CI_FLAGS_SUSPENDED) == 0) + && (ci->values_num > 0)) { + enqueue_cache_item(ci, TAIL); } - } while (0); - if (status != 0) { - syntax_error(sock,cmd); - return -1; - } + pthread_mutex_unlock(&cache_lock); - parsed->file = get_abs_path(pbuffile); - if (parsed->file == NULL) { - send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - return -1; - } - if (!check_file_access(parsed->file, sock)) { - send_response(sock, RESP_ERR, "%s: %s\n", parsed->file, rrd_strerror(EACCES)); - return -1; /* failure */ - } - - status = flush_file (parsed->file); - if ((status != 0) && (status != ENOENT)) { - send_response (sock, RESP_ERR, - "flush_file (%s) failed with status %i.\n", - parsed->file, status); - return status; - } - - t = time (NULL); /* "now" */ - - /* Parse start time */ - if (start_str != NULL) - { - char *endptr; - long value; - - endptr = NULL; - errno = 0; - value = strtol (start_str, &endptr, /* base = */ 0); - if ((endptr == start_str) || (errno != 0)) { - send_response(sock, RESP_ERR, - "Cannot parse start time `%s': Only simple integers are allowed.\n", - start_str); - return -1; - } - - if (value > 0) - parsed->start_tm = (time_t) value; + if (values_num < 1) + rc = send_response(sock, RESP_ERR, "No values updated.\n"); else - parsed->start_tm = (time_t) (t + value); - } - else - { - parsed->start_tm = t - 86400; - } - - /* Parse end time */ - if (end_str != NULL) - { - char *endptr; - long value; - - endptr = NULL; - errno = 0; - value = strtol (end_str, &endptr, /* base = */ 0); - if ((endptr == end_str) || (errno != 0)) { - send_response(sock, RESP_ERR, - "Cannot parse end time `%s': Only simple integers are allowed.\n", - end_str); - return -1; - } - - if (value > 0) - parsed->end_tm = (time_t) value; - else - parsed->end_tm = (time_t) (t + value); - } - else - { - parsed->end_tm = t; - } - - parsed->step = -1; - parsed->ds_cnt = 0; - parsed->ds_namv = NULL; - parsed->data = NULL; - - status = rrd_fetch_r (parsed->file, parsed->cf, - &parsed->start_tm, &parsed->end_tm, &parsed->step, - &parsed->ds_cnt, &parsed->ds_namv, &parsed->data); - if (status != 0) { - send_response(sock, RESP_ERR, - "rrd_fetch_r failed: %s\n", rrd_get_error ()); - return -1; - } - - parsed->steps = (parsed->end_tm - parsed->start_tm) / parsed->step; - - /* prepare field index */ - { + rc = send_response(sock, RESP_OK, + "errors, enqueued %i value(s).\n", values_num); + + done: + free(file); + return rc; +} /* }}} int handle_request_update */ + +struct fetch_parsed { + char *file; + char *cf; + + time_t start_tm; + time_t end_tm; + unsigned long step; + unsigned long steps; + + unsigned long ds_cnt; + char **ds_namv; + rrd_value_t *data; + + unsigned long field_cnt; + unsigned int *field_idx; +}; + +static void free_fetch_parsed( + struct fetch_parsed *parsed) +{ /* {{{ */ unsigned int i; - char *field; - - parsed->field_cnt = 0; - parsed->field_idx = malloc(sizeof(*parsed->field_idx)*parsed->ds_cnt); - - /* now parse the extra names */ - while ( buffer_get_field (&buffer, &buffer_size, &field) == 0 ) { - /* check boundaries */ - if (parsed->field_cnt >= parsed->ds_cnt) { - free_fetch_parsed(parsed); - send_response(sock, RESP_ERR, - "too many fields given - duplicates!\n" - ); - return -1; - } - /* if the field is empty, then next */ - if (field[0]==0) - continue; - /* try to find the string */ - unsigned int found=parsed->ds_cnt; - for(i=0; i < parsed->ds_cnt; i++) { - if (strcmp(field,parsed->ds_namv[i])==0) { - found=i; - break; - } - } - if (found >= parsed->ds_cnt) { - free_fetch_parsed(parsed); - send_response(sock, RESP_ERR, - "field %s not found in %s\n", - field,parsed->file); - return -1; - } - for(i=0; i < parsed->field_cnt; i++) { - if (parsed->field_idx[i] == found) { - free_fetch_parsed(parsed); - send_response(sock, RESP_ERR, - "field %s already used\n", - field - ); - return -1; + + rrd_freemem(parsed->file); + for (i = 0; i < parsed->ds_cnt; i++) + rrd_freemem(parsed->ds_namv[i]); + rrd_freemem(parsed->ds_namv); + rrd_freemem(parsed->data); +} + +static int handle_request_fetch_parse( + HANDLER_PROTO, + struct fetch_parsed *parsed) +{ /* {{{ */ + char *pbuffile; + char *start_str; + char *end_str; + + time_t t; + int status; + + parsed->file = NULL; + parsed->cf = NULL; + start_str = NULL; + end_str = NULL; + + /* Read the arguments */ + do { /* while (0) */ + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) + break; + + status = buffer_get_field(&buffer, &buffer_size, &parsed->cf); + if (status != 0) + break; + + status = buffer_get_field(&buffer, &buffer_size, &start_str); + if (status != 0) { + start_str = NULL; + status = 0; + break; + } + + status = buffer_get_field(&buffer, &buffer_size, &end_str); + if (status != 0) { + end_str = NULL; + status = 0; + break; } - } - parsed->field_idx[parsed->field_cnt++]=found; + } while (0); + + if (status != 0) { + syntax_error(sock, cmd); + return -1; + } + + parsed->file = get_abs_path(pbuffile); + if (parsed->file == NULL) { + send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + return -1; } - if (parsed->field_cnt == 0) { - parsed->field_cnt = parsed->ds_cnt; - for(i=0; i < parsed->field_cnt; i++) { - parsed->field_idx[i] = i; - } + if (!check_file_access(parsed->file, sock)) { + send_response(sock, RESP_ERR, "%s: %s\n", parsed->file, + rrd_strerror(EACCES)); + return -1; /* failure */ } - } - return 0; -} -static int handle_request_fetch (HANDLER_PROTO) /* {{{ */ -{ - unsigned long i,j; - - time_t t; - int status; - - struct fetch_parsed parsed; - - status = handle_request_fetch_parse (cmd, sock, now, - buffer, buffer_size, - &parsed); - if (status != 0) - return 0; - - add_response_info (sock, "FlushVersion: %lu\n", 1); - add_response_info (sock, "Start: %lu\n", (unsigned long) parsed.start_tm); - add_response_info (sock, "End: %lu\n", (unsigned long) parsed.end_tm); - add_response_info (sock, "Step: %lu\n", parsed.step); - - /* Add list of DS names */ - add_response_info (sock, "DSCount: %lu\n", parsed.field_cnt); - add_response_info (sock, "DSName: "); - for (i = 0; i < parsed.field_cnt; i++) - { - add_response_info (sock, (i == 0 ? "%s" :" %s"), - parsed.ds_namv[parsed.field_idx[i]]); - } - add_response_info (sock, "\n"); - - /* Add the actual data */ - assert (parsed.step > 0); - for (t = parsed.start_tm + parsed.step, j=0; - t <= parsed.end_tm; - t += parsed.step,j++) - { - add_response_info (sock, "%10lu:", (unsigned long) t); - for (i = 0; i < parsed.field_cnt; i++) - { - unsigned int idx = j*parsed.ds_cnt+parsed.field_idx[i]; - add_response_info (sock, " %0.17e", parsed.data[idx]); + status = flush_file(parsed->file); + if ((status != 0) && (status != ENOENT)) { + send_response(sock, RESP_ERR, + "flush_file (%s) failed with status %i.\n", + parsed->file, status); + return status; } - add_response_info (sock, "\n"); - } /* for (t) */ - free_fetch_parsed(&parsed); - return (send_response (sock, RESP_OK, "Success\n")); -} /* }}} int handle_request_fetch */ + t = time(NULL); /* "now" */ + + /* Parse start time */ + if (start_str != NULL) { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol(start_str, &endptr, /* base = */ 0); + if ((endptr == start_str) || (errno != 0)) { + send_response(sock, RESP_ERR, + "Cannot parse start time `%s': Only simple integers are allowed.\n", + start_str); + return -1; + } -static int handle_request_fetchbin (HANDLER_PROTO) /* {{{ */ -{ - unsigned long i,j; - - time_t t; - int status; - - struct fetch_parsed parsed; - - double *dbuffer; - size_t dbuffer_size; - - status = handle_request_fetch_parse (cmd, sock, now, - buffer, buffer_size, - &parsed); - if (status != 0) - return 0; - - /* create a buffer for the full binary line */ - dbuffer_size = sizeof(double) * parsed.steps; - dbuffer=calloc(1,dbuffer_size); - if (!dbuffer) { - return (send_response (sock, RESP_ERR, - "Failed memory allocation\n")); - } - - assert (parsed.step > 0); - - add_response_info (sock, "FlushVersion: %lu\n", 1); - add_response_info (sock, "Start: %lu\n", (unsigned long) parsed.start_tm); - add_response_info (sock, "End: %lu\n", (unsigned long) parsed.end_tm); - add_response_info (sock, "Step: %lu\n", parsed.step); - add_response_info (sock, "DSCount: %lu\n", parsed.field_cnt); - - /* now iterate the parsed fields */ - for (i = 0; i < parsed.field_cnt; i++) - { - for (t = parsed.start_tm + parsed.step, j=0; - t <= parsed.end_tm; - t += parsed.step,j++) + if (value > 0) + parsed->start_tm = (time_t) value; + else + parsed->start_tm = (time_t) (t + value); + } else { + parsed->start_tm = t - 86400; + } + + /* Parse end time */ + if (end_str != NULL) { + char *endptr; + long value; + + endptr = NULL; + errno = 0; + value = strtol(end_str, &endptr, /* base = */ 0); + if ((endptr == end_str) || (errno != 0)) { + send_response(sock, RESP_ERR, + "Cannot parse end time `%s': Only simple integers are allowed.\n", + end_str); + return -1; + } + + if (value > 0) + parsed->end_tm = (time_t) value; + else + parsed->end_tm = (time_t) (t + value); + } else { + parsed->end_tm = t; + } + + parsed->step = -1; + parsed->ds_cnt = 0; + parsed->ds_namv = NULL; + parsed->data = NULL; + + status = rrd_fetch_r(parsed->file, parsed->cf, + &parsed->start_tm, &parsed->end_tm, &parsed->step, + &parsed->ds_cnt, &parsed->ds_namv, &parsed->data); + if (status != 0) { + send_response(sock, RESP_ERR, + "rrd_fetch_r failed: %s\n", rrd_get_error()); + return -1; + } + + parsed->steps = (parsed->end_tm - parsed->start_tm) / parsed->step; + + /* prepare field index */ { - unsigned int idx = j*parsed.ds_cnt+parsed.field_idx[i]; - dbuffer[j] = parsed.data[idx]; + unsigned int i; + char *field; + + parsed->field_cnt = 0; + parsed->field_idx = + malloc(sizeof(*parsed->field_idx) * parsed->ds_cnt); + + /* now parse the extra names */ + while (buffer_get_field(&buffer, &buffer_size, &field) == 0) { + /* check boundaries */ + if (parsed->field_cnt >= parsed->ds_cnt) { + free_fetch_parsed(parsed); + send_response(sock, RESP_ERR, + "too many fields given - duplicates!\n"); + return -1; + } + /* if the field is empty, then next */ + if (field[0] == 0) + continue; + /* try to find the string */ + unsigned int found = parsed->ds_cnt; + + for (i = 0; i < parsed->ds_cnt; i++) { + if (strcmp(field, parsed->ds_namv[i]) == 0) { + found = i; + break; + } + } + if (found >= parsed->ds_cnt) { + free_fetch_parsed(parsed); + send_response(sock, RESP_ERR, + "field %s not found in %s\n", + field, parsed->file); + return -1; + } + for (i = 0; i < parsed->field_cnt; i++) { + if (parsed->field_idx[i] == found) { + free_fetch_parsed(parsed); + send_response(sock, RESP_ERR, + "field %s already used\n", field); + return -1; + } + } + parsed->field_idx[parsed->field_cnt++] = found; + } + if (parsed->field_cnt == 0) { + parsed->field_cnt = parsed->ds_cnt; + for (i = 0; i < parsed->field_cnt; i++) { + parsed->field_idx[i] = i; + } + } } + return 0; +} + +static int handle_request_fetch( + HANDLER_PROTO) +{ /* {{{ */ + unsigned long i, j; + + time_t t; + int status; + + struct fetch_parsed parsed; + + status = handle_request_fetch_parse(cmd, sock, now, + buffer, buffer_size, &parsed); + if (status != 0) + return 0; + + add_response_info(sock, "FlushVersion: %lu\n", 1); + add_response_info(sock, "Start: %lu\n", (unsigned long) parsed.start_tm); + add_response_info(sock, "End: %lu\n", (unsigned long) parsed.end_tm); + add_response_info(sock, "Step: %lu\n", parsed.step); + + /* Add list of DS names */ + add_response_info(sock, "DSCount: %lu\n", parsed.field_cnt); + add_response_info(sock, "DSName: "); + for (i = 0; i < parsed.field_cnt; i++) { + add_response_info(sock, (i == 0 ? "%s" : " %s"), + parsed.ds_namv[parsed.field_idx[i]]); + } + add_response_info(sock, "\n"); + + /* Add the actual data */ + assert(parsed.step > 0); + for (t = parsed.start_tm + parsed.step, j = 0; + t <= parsed.end_tm; t += parsed.step, j++) { + add_response_info(sock, "%10lu:", (unsigned long) t); + for (i = 0; i < parsed.field_cnt; i++) { + unsigned int idx = j * parsed.ds_cnt + parsed.field_idx[i]; + + add_response_info(sock, " %0.17e", parsed.data[idx]); + } + add_response_info(sock, "\n"); + } /* for (t) */ + free_fetch_parsed(&parsed); + + return (send_response(sock, RESP_OK, "Success\n")); +} /* }}} int handle_request_fetch */ + +static int handle_request_fetchbin( + HANDLER_PROTO) +{ /* {{{ */ + unsigned long i, j; + + time_t t; + int status; - add_binary_response_info (sock, - "DSName-", - parsed.ds_namv[parsed.field_idx[i]], - dbuffer, - parsed.steps, - sizeof(double) - ); - } + struct fetch_parsed parsed; - free_fetch_parsed(&parsed); - free(dbuffer); + double *dbuffer; + size_t dbuffer_size; - return (send_response (sock, RESP_OK_BIN, "%i Success\n", - parsed.field_cnt+5)); -} /* }}} int handle_request_fetchbin */ + status = handle_request_fetch_parse(cmd, sock, now, + buffer, buffer_size, &parsed); + if (status != 0) + return 0; + + /* create a buffer for the full binary line */ + dbuffer_size = sizeof(double) * parsed.steps; + dbuffer = calloc(1, dbuffer_size); + if (!dbuffer) { + return (send_response(sock, RESP_ERR, "Failed memory allocation\n")); + } + + assert(parsed.step > 0); + + add_response_info(sock, "FlushVersion: %lu\n", 1); + add_response_info(sock, "Start: %lu\n", (unsigned long) parsed.start_tm); + add_response_info(sock, "End: %lu\n", (unsigned long) parsed.end_tm); + add_response_info(sock, "Step: %lu\n", parsed.step); + add_response_info(sock, "DSCount: %lu\n", parsed.field_cnt); + + /* now iterate the parsed fields */ + for (i = 0; i < parsed.field_cnt; i++) { + for (t = parsed.start_tm + parsed.step, j = 0; + t <= parsed.end_tm; t += parsed.step, j++) { + unsigned int idx = j * parsed.ds_cnt + parsed.field_idx[i]; + + dbuffer[j] = parsed.data[idx]; + } + + add_binary_response_info(sock, + "DSName-", + parsed.ds_namv[parsed.field_idx[i]], + dbuffer, parsed.steps, sizeof(double) + ); + } + + free_fetch_parsed(&parsed); + free(dbuffer); + + return (send_response(sock, RESP_OK_BIN, "%i Success\n", + parsed.field_cnt + 5)); +} /* }}} int handle_request_fetchbin */ /* we came across a "WROTE" entry during journal replay. * throw away any values that we have accumulated for this file */ -static int handle_request_wrote (HANDLER_PROTO) /* {{{ */ -{ - cache_item_t *ci; - const char *file = buffer; +static int handle_request_wrote( + HANDLER_PROTO) +{ /* {{{ */ + cache_item_t *ci; + const char *file = buffer; - pthread_mutex_lock(&cache_lock); + pthread_mutex_lock(&cache_lock); + + ci = g_tree_lookup(cache_tree, file); + if (ci == NULL) { + pthread_mutex_unlock(&cache_lock); + return (0); + } + + if (ci->values) + rrd_free_ptrs((void ***) &ci->values, &ci->values_num); + + wipe_ci_values(ci, now); + remove_from_queue(ci); - ci = g_tree_lookup(cache_tree, file); - if (ci == NULL) - { pthread_mutex_unlock(&cache_lock); return (0); - } +} /* }}} int handle_request_wrote */ - if (ci->values) - rrd_free_ptrs((void ***) &ci->values, &ci->values_num); +static int handle_request_info( + HANDLER_PROTO) +{ /* {{{ */ + char *file = NULL, *pbuffile; + int status, rc; + rrd_info_t *info = NULL; - wipe_ci_values(ci, now); - remove_from_queue(ci); + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) + return syntax_error(sock, cmd); + /* get full pathname */ + file = get_abs_path(pbuffile); + if (file == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + goto done; + } + if (!check_file_access(file, sock)) { + rc = send_response(sock, RESP_ERR, "%s: %s\n", file, + rrd_strerror(EACCES)); + goto done; + } + /* get data */ + rrd_clear_error(); + info = rrd_info_r(file); + if (!info) { + rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", + rrd_get_error()); + goto done; + } + for (rrd_info_t *data = info; data != NULL; data = data->next) { + switch (data->type) { + case RD_I_VAL: + if (isnan(data->value.u_val)) + add_response_info(sock, "%s %d NaN\n", data->key, data->type); + else + add_response_info(sock, "%s %d %0.10e\n", data->key, + data->type, data->value.u_val); + break; + case RD_I_CNT: + add_response_info(sock, "%s %d %lu\n", data->key, data->type, + data->value.u_cnt); + break; + case RD_I_INT: + add_response_info(sock, "%s %d %d\n", data->key, data->type, + data->value.u_int); + break; + case RD_I_STR: + add_response_info(sock, "%s %d %s\n", data->key, data->type, + data->value.u_str); + break; + case RD_I_BLO: + add_response_info(sock, "%s %d %lu\n", data->key, data->type, + data->value.u_blo.size); + break; + } + } - pthread_mutex_unlock(&cache_lock); - return (0); -} /* }}} int handle_request_wrote */ + rc = send_response(sock, RESP_OK, "Info for %s follows\n", file); -static int handle_request_info (HANDLER_PROTO) /* {{{ */ -{ - char *file=NULL, *pbuffile; - int status, rc; - rrd_info_t *info=NULL; - - /* obtain filename */ - status = buffer_get_field(&buffer, &buffer_size, &pbuffile); - if (status != 0) - return syntax_error(sock,cmd); - /* get full pathname */ - file = get_abs_path(pbuffile); - if (file == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - if (!check_file_access(file, sock)) { - rc = send_response(sock, RESP_ERR, "%s: %s\n", file, rrd_strerror(EACCES)); - goto done; - } - /* get data */ - rrd_clear_error (); - info = rrd_info_r(file); - if(!info) { - rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); - goto done; - } - for (rrd_info_t *data = info; data != NULL; data = data->next) { - switch (data->type) { - case RD_I_VAL: - if (isnan(data->value.u_val)) - add_response_info(sock,"%s %d NaN\n",data->key, data->type); - else - add_response_info(sock,"%s %d %0.10e\n", data->key, data->type, data->value.u_val); - break; - case RD_I_CNT: - add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_cnt); - break; - case RD_I_INT: - add_response_info(sock,"%s %d %d\n", data->key, data->type, data->value.u_int); - break; - case RD_I_STR: - add_response_info(sock,"%s %d %s\n", data->key, data->type, data->value.u_str); - break; - case RD_I_BLO: - add_response_info(sock,"%s %d %lu\n", data->key, data->type, data->value.u_blo.size); - break; - } - } - - rc = send_response(sock, RESP_OK, "Info for %s follows\n",file); - -done: - rrd_info_free(info); - free(file); - return rc; -} /* }}} static int handle_request_info */ - -static int handle_request_first (HANDLER_PROTO) /* {{{ */ -{ - char *i, *file=NULL, *pbuffile; - int status, rc; - int idx; - time_t t; - - /* obtain filename */ - status = buffer_get_field(&buffer, &buffer_size, &pbuffile); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - /* get full pathname */ - file = get_abs_path(pbuffile); - if (file == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - if (!check_file_access(file, sock)) { - rc = send_response(sock, RESP_ERR, "%s: %s\n", file, rrd_strerror(EACCES)); - goto done; - } - - status = buffer_get_field(&buffer, &buffer_size, &i); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - idx = atoi(i); - if(idx<0) { - rc = send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", idx); - goto done; - } - - /* get data */ - rrd_clear_error (); - t = rrd_first_r(file,idx); - if (t<1) { - rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); - goto done; - } - rc = send_response(sock, RESP_OK, "%lu\n",(unsigned)t); -done: - free(file); - return rc; -} /* }}} static int handle_request_first */ + done: + rrd_info_free(info); + free(file); + return rc; +} /* }}} static int handle_request_info */ + +static int handle_request_first( + HANDLER_PROTO) +{ /* {{{ */ + char *i, *file = NULL, *pbuffile; + int status, rc; + int idx; + time_t t; + + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + /* get full pathname */ + file = get_abs_path(pbuffile); + if (file == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + goto done; + } + if (!check_file_access(file, sock)) { + rc = send_response(sock, RESP_ERR, "%s: %s\n", file, + rrd_strerror(EACCES)); + goto done; + } + status = buffer_get_field(&buffer, &buffer_size, &i); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + idx = atoi(i); + if (idx < 0) { + rc = send_response(sock, RESP_ERR, "Invalid index specified (%d)\n", + idx); + goto done; + } -static int handle_request_last (HANDLER_PROTO) /* {{{ */ -{ - char *file=NULL, *pbuffile; - int status, rc; - time_t t, from_file, step; - rrd_file_t * rrd_file; - cache_item_t * ci; - rrd_t rrd; - - /* obtain filename */ - status = buffer_get_field(&buffer, &buffer_size, &pbuffile); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - /* get full pathname */ - file = get_abs_path(pbuffile); - if (file == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - if (!check_file_access(file, sock)) { - rc = send_response(sock, RESP_ERR, "%s: %s\n", file, rrd_strerror(EACCES)); - goto done; - } - rrd_clear_error(); - rrd_init(&rrd); - rrd_file = rrd_open(file, &rrd, RRD_READONLY | RRD_LOCK); - if(!rrd_file) { + /* get data */ + rrd_clear_error(); + t = rrd_first_r(file, idx); + if (t < 1) { + rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", + rrd_get_error()); + goto done; + } + rc = send_response(sock, RESP_OK, "%lu\n", (unsigned) t); + done: + free(file); + return rc; +} /* }}} static int handle_request_first */ + + +static int handle_request_last( + HANDLER_PROTO) +{ /* {{{ */ + char *file = NULL, *pbuffile; + int status, rc; + time_t t, from_file, step; + rrd_file_t *rrd_file; + cache_item_t *ci; + rrd_t rrd; + + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + /* get full pathname */ + file = get_abs_path(pbuffile); + if (file == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + goto done; + } + if (!check_file_access(file, sock)) { + rc = send_response(sock, RESP_ERR, "%s: %s\n", file, + rrd_strerror(EACCES)); + goto done; + } + rrd_clear_error(); + rrd_init(&rrd); + rrd_file = rrd_open(file, &rrd, RRD_READONLY | RRD_LOCK); + if (!rrd_file) { + rrd_free(&rrd); + rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", + rrd_get_error()); + goto done; + } + from_file = rrd.live_head->last_up; + step = rrd.stat_head->pdp_step; + rrd_close(rrd_file); + pthread_mutex_lock(&cache_lock); + ci = g_tree_lookup(cache_tree, file); + if (ci) + t = ci->last_update_stamp; + else + t = from_file; + pthread_mutex_unlock(&cache_lock); + t -= t % step; rrd_free(&rrd); - rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); - goto done; - } - from_file = rrd.live_head->last_up; - step = rrd.stat_head->pdp_step; - rrd_close(rrd_file); - pthread_mutex_lock(&cache_lock); - ci = g_tree_lookup(cache_tree, file); - if (ci) - t = ci->last_update_stamp; - else - t = from_file; - pthread_mutex_unlock(&cache_lock); - t -= t % step; - rrd_free(&rrd); - if(t<1) { - rc = send_response(sock, RESP_ERR, "Error: rrdcached: Invalid timestamp returned\n"); - goto done; - } - rc = send_response(sock, RESP_OK, "%lu\n",(unsigned)t); -done: - free(file); - return rc; -} /* }}} static int handle_request_last */ - -static int handle_request_create (HANDLER_PROTO) /* {{{ */ -{ - char *file = NULL, *pbuffile; - char *file_copy = NULL, *dir = NULL, *dir2 = NULL; - char *tok; - int ac = 0; - char *av[128]; - char **sources = NULL; - int sources_length = 0; - char *template = NULL; - int status; - unsigned long step = 0; - time_t last_up = -1; - int no_overwrite = opt_no_overwrite; - int rc = -1; - - /* obtain filename */ - status = buffer_get_field(&buffer, &buffer_size, &pbuffile); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - /* get full pathname */ - file = get_abs_path(pbuffile); - if (file == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - - file_copy = strdup(file); - if (file_copy == NULL) { - rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); - goto done; - } - if (!check_file_access(file, sock)) { - rc = send_response(sock, RESP_ERR, "%s: %s\n", file, rrd_strerror(EACCES)); - goto done; - } - RRDD_LOG(LOG_INFO, "rrdcreate request for %s",file); - - pthread_mutex_lock(&rrdfilecreate_lock); - dir = strdup(dirname(file_copy)); - dir2 = realpath(dir, NULL); - if (dir2 == NULL && errno == ENOENT) { - if (!config_allow_recursive_mkdir) { + if (t < 1) { rc = send_response(sock, RESP_ERR, - "No permission to recursively create: %s\nDid you pass -R to the daemon?\n", - dir); - pthread_mutex_unlock(&rrdfilecreate_lock); + "Error: rrdcached: Invalid timestamp returned\n"); goto done; } - if (rrd_mkdir_p(dir, 0755) != 0) { - rc = send_response(sock, RESP_ERR, "Cannot create %s: %s\n", - dir, strerror(errno)); - pthread_mutex_unlock(&rrdfilecreate_lock); + rc = send_response(sock, RESP_OK, "%lu\n", (unsigned) t); + done: + free(file); + return rc; +} /* }}} static int handle_request_last */ + +static int handle_request_create( + HANDLER_PROTO) +{ /* {{{ */ + char *file = NULL, *pbuffile; + char *file_copy = NULL, *dir = NULL, *dir2 = NULL; + char *tok; + int ac = 0; + char *av[128]; + char **sources = NULL; + int sources_length = 0; + char *template = NULL; + int status; + unsigned long step = 0; + time_t last_up = -1; + int no_overwrite = opt_no_overwrite; + int rc = -1; + + /* obtain filename */ + status = buffer_get_field(&buffer, &buffer_size, &pbuffile); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + /* get full pathname */ + file = get_abs_path(pbuffile); + if (file == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); goto done; } - } - pthread_mutex_unlock(&rrdfilecreate_lock); - - while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 && tok) { - if( ! strncmp(tok,"-b",2) ) { - status = buffer_get_field(&buffer, &buffer_size, &tok ); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - last_up = (time_t) atol(tok); - continue; - } - if( ! strncmp(tok,"-s",2) ) { - status = buffer_get_field(&buffer, &buffer_size, &tok ); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - step = atol(tok); - continue; - } - if( ! strncmp(tok,"-r",2) ) { - status = buffer_get_field(&buffer, &buffer_size, &tok ); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - sources = realloc(sources, sizeof(char*) * (sources_length + 2)); - if (sources == NULL) { - rc = send_response(sock, RESP_ERR, "Cannot allocate memory\n"); - goto done; - } - - flush_file(tok); - - sources[sources_length++] = tok; - sources[sources_length] = NULL; - - continue; - } - if( ! strncmp(tok,"-t",2) ) { - status = buffer_get_field(&buffer, &buffer_size, &tok ); - if (status != 0) { - rc = syntax_error(sock,cmd); - goto done; - } - flush_file(tok); - - template = tok; - continue; - } - if( ! strncmp(tok,"-O",2) ) { - no_overwrite = 1; - continue; - } - if( ! strncmp(tok,"DS:",3) ) { av[ac++]=tok; continue; } - if( ! strncmp(tok,"RRA:",4) ) { av[ac++]=tok; continue; } - rc = syntax_error(sock,cmd); - goto done; - } - if (last_up != -1 && last_up < 3600 * 24 * 365 * 10) { - rc = send_response(sock, RESP_ERR, "The first entry must be after 1980.\n"); - goto done; - } - - rrd_clear_error (); - pthread_mutex_lock(&rrdfilecreate_lock); - status = rrd_create_r2(file,step,last_up,no_overwrite, (const char**) sources, template, ac,(const char **)av); - pthread_mutex_unlock(&rrdfilecreate_lock); - - if(!status) { - rc = send_response(sock, RESP_OK, "RRD created OK\n"); - goto done; - } - rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); -done: - free(file); - free(sources); - free(file_copy); - if (dir) { - free(dir); - } - free(dir2); - return rc; -} /* }}} static int handle_request_create */ - -static int handle_request_list (HANDLER_PROTO) /* {{{ */ -{ - char *filename = NULL; - char *rec = NULL; - int recursive = 0; - char *list, *start_ptr, *end_ptr, *ptr; - char fullpath[PATH_MAX], current[PATH_MAX], absolute[PATH_MAX]; - char bwc[PATH_MAX], bwd[PATH_MAX]; - char *base = &config_base_dir[0]; - struct stat sc, sd; - ssize_t len; - int status; - - if (config_base_dir == NULL) { - return send_response(sock, RESP_ERR, "No base directory defined\n"); - } - - /* get 'RECURSIVE' option */ - status = buffer_get_field(&buffer, &buffer_size, &rec); - if (status == 0) { - /* as 'RECURSIVE' is optional, the first argument may be the filename */ - if (rec[0] != '/' && strcmp(rec, "RECURSIVE") != 0) { - return syntax_error(sock, cmd); - } - - if (rec[0] == '/') { - filename = rec; - - } else if (strcmp(rec, "RECURSIVE") == 0) { - recursive = 1; - } - } - - /* Get pathname if not done already */ - if (!filename) { - status = buffer_get_field(&buffer, &buffer_size, &filename); - - if (status != 0) - return syntax_error(sock,cmd); - } - - /* get full pathname */ - snprintf(fullpath, PATH_MAX, "%s%s%s", - config_base_dir, (filename[0] == '/') ? "" : "/", filename); - - if (!check_file_access(fullpath, sock)) { - return send_response(sock, RESP_ERR, "Cannot read: %s\n", fullpath); - } - - /* get real path of config_base_dir in case it's a symlink */ - if (lstat(config_base_dir, &sd) == -1) { - return send_response(sock, RESP_ERR, "stat %s: %s\n", - config_base_dir, rrd_strerror(errno)); - } - - if ((sd.st_mode & S_IFMT) == S_IFLNK) { - len = readlink(config_base_dir, bwd, sizeof(bwd) - 1); - if (len == -1) { - return send_response(sock, RESP_ERR, "readlink %s: %s\n", - config_base_dir, rrd_strerror(errno)); - } - bwd[len] = '\0'; - base = &bwd[0]; - } - - list = rrd_list_r(recursive, fullpath); - - if (list == NULL) { - /* Empty directory listing */ - if (errno == 0) { - goto out_send_response; - } - - return send_response(sock, RESP_ERR, - "List %s: %s\n", fullpath, rrd_strerror(errno)); - } - - /* Check list items returned by rrd_list_r; - * the returned string is newline-separated: '%s\n%s\n...%s\n' - */ - start_ptr = list; - end_ptr = list; - - while (*start_ptr != '\0') { - end_ptr = strchr(start_ptr, '\n'); - - if (end_ptr == NULL) { - end_ptr = start_ptr + strlen(start_ptr); - } - - if ((end_ptr - start_ptr + strlen(fullpath) + 1) >= PATH_MAX) { - /* Name too long: skip entry */ - goto loop_next; - } - strncpy(¤t[0], start_ptr, (end_ptr - start_ptr)); - current[end_ptr - start_ptr] = '\0'; - - /* if a single .rrd was asked for, absolute == fullpath */ - ptr = strstr(fullpath, ".rrd"); - - if (ptr != NULL && strlen(ptr) == 4) { - snprintf(&absolute[0], PATH_MAX, "%s", fullpath); - } else { - snprintf(&absolute[0], PATH_MAX, "%s/%s", fullpath, current); + file_copy = strdup(file); + if (file_copy == NULL) { + rc = send_response(sock, RESP_ERR, "%s\n", rrd_strerror(ENOMEM)); + goto done; + } + if (!check_file_access(file, sock)) { + rc = send_response(sock, RESP_ERR, "%s: %s\n", file, + rrd_strerror(EACCES)); + goto done; + } + RRDD_LOG(LOG_INFO, "rrdcreate request for %s", file); + + pthread_mutex_lock(&rrdfilecreate_lock); + dir = strdup(dirname(file_copy)); + dir2 = realpath(dir, NULL); + if (dir2 == NULL && errno == ENOENT) { + if (!config_allow_recursive_mkdir) { + rc = send_response(sock, RESP_ERR, + "No permission to recursively create: %s\nDid you pass -R to the daemon?\n", + dir); + pthread_mutex_unlock(&rrdfilecreate_lock); + goto done; + } + if (rrd_mkdir_p(dir, 0755) != 0) { + rc = send_response(sock, RESP_ERR, "Cannot create %s: %s\n", + dir, strerror(errno)); + pthread_mutex_unlock(&rrdfilecreate_lock); + goto done; + } + } + pthread_mutex_unlock(&rrdfilecreate_lock); + + while ((status = buffer_get_field(&buffer, &buffer_size, &tok)) == 0 + && tok) { + if (!strncmp(tok, "-b", 2)) { + status = buffer_get_field(&buffer, &buffer_size, &tok); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + last_up = (time_t) atol(tok); + continue; + } + if (!strncmp(tok, "-s", 2)) { + status = buffer_get_field(&buffer, &buffer_size, &tok); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + step = atol(tok); + continue; + } + if (!strncmp(tok, "-r", 2)) { + status = buffer_get_field(&buffer, &buffer_size, &tok); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + sources = realloc(sources, sizeof(char *) * (sources_length + 2)); + if (sources == NULL) { + rc = send_response(sock, RESP_ERR, + "Cannot allocate memory\n"); + goto done; + } + + flush_file(tok); + + sources[sources_length++] = tok; + sources[sources_length] = NULL; + + continue; + } + if (!strncmp(tok, "-t", 2)) { + status = buffer_get_field(&buffer, &buffer_size, &tok); + if (status != 0) { + rc = syntax_error(sock, cmd); + goto done; + } + flush_file(tok); + + template = tok; + continue; + } + if (!strncmp(tok, "-O", 2)) { + no_overwrite = 1; + continue; + } + if (!strncmp(tok, "DS:", 3)) { + av[ac++] = tok; + continue; + } + if (!strncmp(tok, "RRA:", 4)) { + av[ac++] = tok; + continue; + } + rc = syntax_error(sock, cmd); + goto done; + } + if (last_up != -1 && last_up < 3600 * 24 * 365 * 10) { + rc = send_response(sock, RESP_ERR, + "The first entry must be after 1980.\n"); + goto done; } - if (!check_file_access(absolute, sock)) { - /* Cannot access: skip entry */ - goto loop_next; + rrd_clear_error(); + pthread_mutex_lock(&rrdfilecreate_lock); + status = + rrd_create_r2(file, step, last_up, no_overwrite, + (const char **) sources, template, ac, + (const char **) av); + pthread_mutex_unlock(&rrdfilecreate_lock); + + if (!status) { + rc = send_response(sock, RESP_OK, "RRD created OK\n"); + goto done; + } + rc = send_response(sock, RESP_ERR, "RRD Error: %s\n", rrd_get_error()); + done: + free(file); + free(sources); + free(file_copy); + if (dir) { + free(dir); + } + free(dir2); + return rc; +} /* }}} static int handle_request_create */ + +static int handle_request_list( + HANDLER_PROTO) +{ /* {{{ */ + char *filename = NULL; + char *rec = NULL; + int recursive = 0; + char *list, *start_ptr, *end_ptr, *ptr; + char fullpath[PATH_MAX], current[PATH_MAX], absolute[PATH_MAX]; + char bwc[PATH_MAX], bwd[PATH_MAX]; + char *base = &config_base_dir[0]; + struct stat sc, sd; + ssize_t len; + int status; + + if (config_base_dir == NULL) { + return send_response(sock, RESP_ERR, "No base directory defined\n"); } - /* Make sure we aren't following a symlink pointing outside of base_dir */ - if (lstat(absolute, &sc) == -1) { - free(list); - return send_response(sock, RESP_ERR, - "stat %s: %s\n", absolute, rrd_strerror(errno)); - } + /* get 'RECURSIVE' option */ + status = buffer_get_field(&buffer, &buffer_size, &rec); + if (status == 0) { + /* as 'RECURSIVE' is optional, the first argument may be the filename */ + if (rec[0] != '/' && strcmp(rec, "RECURSIVE") != 0) { + return syntax_error(sock, cmd); + } - if ((sc.st_mode & S_IFMT) == S_IFLNK) { - len = readlink(absolute, bwc, sizeof(bwc) - 1); + if (rec[0] == '/') { + filename = rec; - if (len == -1) { - free(list); - return send_response(sock, RESP_ERR, "readlink %s: %s\n", - absolute, rrd_strerror(errno)); - } - bwc[len] = '\0'; - strncpy(&absolute[0], bwc, PATH_MAX - 1); - absolute[PATH_MAX - 1] = '\0'; + } else if (strcmp(rec, "RECURSIVE") == 0) { + recursive = 1; + } } - /* Absolute path MUST be starting with base_dir; if not skip the entry. */ - if (strlen(absolute) < strlen(base) || - memcmp(absolute, base, strlen(base)) != 0) { - goto loop_next; - } - add_response_info(sock, "%s\n", current); + /* Get pathname if not done already */ + if (!filename) { + status = buffer_get_field(&buffer, &buffer_size, &filename); -loop_next: - start_ptr = end_ptr + 1; + if (status != 0) + return syntax_error(sock, cmd); + } - } + /* get full pathname */ + snprintf(fullpath, PATH_MAX, "%s%s%s", + config_base_dir, (filename[0] == '/') ? "" : "/", filename); - free(list); + if (!check_file_access(fullpath, sock)) { + return send_response(sock, RESP_ERR, "Cannot read: %s\n", fullpath); + } -out_send_response: - send_response(sock, RESP_OK, "RRDs\n"); + /* get real path of config_base_dir in case it's a symlink */ + if (lstat(config_base_dir, &sd) == -1) { + return send_response(sock, RESP_ERR, "stat %s: %s\n", + config_base_dir, rrd_strerror(errno)); + } - return (0); -} /* }}} int handle_request_list */ + if ((sd.st_mode & S_IFMT) == S_IFLNK) { + len = readlink(config_base_dir, bwd, sizeof(bwd) - 1); + if (len == -1) { + return send_response(sock, RESP_ERR, "readlink %s: %s\n", + config_base_dir, rrd_strerror(errno)); + } + bwd[len] = '\0'; + base = &bwd[0]; + } -static cache_item_t *buffer_get_cache_item(listen_socket_t *sock, - command_t *cmd, char **buffer, - size_t *buffer_size, int *rc, - char **file_name) -{ - char *pbuffile; - cache_item_t *ci; - int status; - - /* obtain filename */ - status = buffer_get_field(buffer, buffer_size, &pbuffile); - if (status != 0) { - *rc = syntax_error(sock, cmd); - return NULL; - } - /* get full pathname */ - *file_name = get_abs_path(pbuffile); - if (file_name == NULL) { - *rc = send_response(sock, RESP_ERR, "%s + %s\n", *file_name, rrd_strerror(ENOMEM)); - return NULL; - } + list = rrd_list_r(recursive, fullpath); - ci = g_tree_lookup(cache_tree, *file_name); - if (ci == NULL) { - *rc = send_response(sock, RESP_ERR, "%s - %s\n", *file_name, rrd_strerror(ENOENT)); - return NULL; - } + if (list == NULL) { + /* Empty directory listing */ + if (errno == 0) { + goto out_send_response; + } - *rc = 0; - return ci; -} + return send_response(sock, RESP_ERR, + "List %s: %s\n", fullpath, rrd_strerror(errno)); + } -static int handle_request_suspend(HANDLER_PROTO) /* {{{ */ -{ - char *file_name = NULL; - int rc; - cache_item_t *ci = buffer_get_cache_item(sock, cmd, &buffer, &buffer_size, &rc, &file_name); - if (ci == NULL) - rc = -1; - else if ((ci->flags & CI_FLAGS_SUSPENDED) == CI_FLAGS_SUSPENDED) - rc = send_response(sock, RESP_OK, "%s already suspended\n", file_name); - else - { - ci->flags |= CI_FLAGS_SUSPENDED; - rc = send_response(sock, RESP_OK, "%s suspended\n", file_name); - } - free(file_name); - return rc; -} /* }}} static int handle_request_suspend */ - -static int handle_request_resume (HANDLER_PROTO) /* {{{ */ -{ - char *file_name = NULL; - int rc; - cache_item_t *ci = buffer_get_cache_item(sock, cmd, &buffer, &buffer_size, &rc, &file_name); - if (ci == NULL) - rc = -1; - else if ((ci->flags & CI_FLAGS_SUSPENDED) == 0) - rc = send_response(sock, RESP_OK, "%s not suspended\n", file_name); - else - { - ci->flags &= ~CI_FLAGS_SUSPENDED; - rc = send_response(sock, RESP_OK, "%s resumed\n", file_name); - } - free(file_name); - return rc; -} /* }}} static int handle_request_resume */ - -static gboolean tree_callback_suspend (gpointer UNUSED(key), /* {{{ */ - gpointer value, gpointer pointer) -{ - cache_item_t *ci = (cache_item_t *) value; - int *count = (int*) pointer; - if ((ci->flags & CI_FLAGS_SUSPENDED) == 0) { - ci->flags |= CI_FLAGS_SUSPENDED; - *count += 1; - } - return (FALSE); -} /* }}} gboolean tree_callback_suspend */ - -static int handle_request_suspendall(HANDLER_PROTO) /* {{{ */ -{ - int count = 0; - g_tree_foreach (cache_tree, tree_callback_suspend, (gpointer) &count); - return send_response(sock, RESP_OK, "%d rrds suspend\n", count); -} /* }}} static int handle_request_suspendall */ + /* Check list items returned by rrd_list_r; + * the returned string is newline-separated: '%s\n%s\n...%s\n' + */ + start_ptr = list; + end_ptr = list; -static gboolean tree_callback_resume (gpointer UNUSED(key), /* {{{ */ - gpointer value, gpointer pointer) -{ - cache_item_t *ci = (cache_item_t *) value; - int *count = (int*) pointer; - if ((ci->flags & CI_FLAGS_SUSPENDED) != 0) { - ci->flags &= ~CI_FLAGS_SUSPENDED; - *count += 1; - } - return (FALSE); -} /* }}} gboolean tree_callback_resume */ - -static int handle_request_resumeall(HANDLER_PROTO) /* {{{ */ -{ - int count = 0; - g_tree_foreach (cache_tree, tree_callback_resume, (gpointer) &count); - return send_response(sock, RESP_OK, "%d rrds resumed\n", count); -} /* }}} static int handle_request_resumeall */ + while (*start_ptr != '\0') { + end_ptr = strchr(start_ptr, '\n'); -/* start "BATCH" processing */ -static int batch_start (HANDLER_PROTO) /* {{{ */ -{ - int status; - if (sock->batch_start) - return send_response(sock, RESP_ERR, "Already in BATCH\n"); + if (end_ptr == NULL) { + end_ptr = start_ptr + strlen(start_ptr); + } - status = send_response(sock, RESP_OK, - "Go ahead. End with dot '.' on its own line.\n"); - sock->batch_start = time(NULL); - sock->batch_cmd = 0; + if ((end_ptr - start_ptr + strlen(fullpath) + 1) >= PATH_MAX) { + /* Name too long: skip entry */ + goto loop_next; + } + strncpy(¤t[0], start_ptr, (end_ptr - start_ptr)); + current[end_ptr - start_ptr] = '\0'; - return status; -} /* }}} static int batch_start */ + /* if a single .rrd was asked for, absolute == fullpath */ + ptr = strstr(fullpath, ".rrd"); -/* finish "BATCH" processing and return results to the client */ -static int batch_done (HANDLER_PROTO) /* {{{ */ -{ - assert(sock->batch_start); - sock->batch_start = 0; - sock->batch_cmd = 0; - return send_response(sock, RESP_OK, "errors\n"); -} /* }}} static int batch_done */ + if (ptr != NULL && strlen(ptr) == 4) { + snprintf(&absolute[0], PATH_MAX, "%s", fullpath); -static int handle_request_quit (HANDLER_PROTO) /* {{{ */ -{ - return -1; -} /* }}} static int handle_request_quit */ + } else { + snprintf(&absolute[0], PATH_MAX, "%s/%s", fullpath, current); + } -static command_t list_of_commands[] = { /* {{{ */ - { - "UPDATE", - handle_request_update, - CMD_CONTEXT_ANY, - "UPDATE [ ...]\n" - , - "Adds the given file to the internal cache if it is not yet known and\n" - "appends the given value(s) to the entry. See the rrdcached(1) manpage\n" - "for details.\n" - "\n" - "Each has the following form:\n" - " =