const char *stmt, unsigned int len);
};
+enum {
+ RING_NO_QUERY,
+ RING_QUERY_READY,
+};
+
+struct db_stmt_ring {
+ /* Ring buffer: 1 status byte + string */
+ char *ring; /* pointer to the ring */
+ uint32_t size; /* size of ring buffer in element */
+ int length; /* length of one ring buffer element */
+ uint32_t wr_item; /* write item in ring buffer */
+ uint32_t rd_item; /* read item in ring buffer */
+ char *wr_place;
+ pthread_cond_t cond;
+ pthread_mutex_t mutex;
+ int full;
+};
+
struct db_stmt {
char *stmt;
int len;
time_t reconnect;
int (*interp)(struct ulogd_pluginstance *upi);
struct db_driver *driver;
+ /* DB ring buffer */
+ struct db_stmt_ring ring;
+ pthread_t db_thread_id;
+ /* Backlog system */
unsigned int backlog_memcap;
unsigned int backlog_memusage;
unsigned int backlog_oneshot;
#define TIME_ERR ((time_t)-1) /* Be paranoid */
#define RECONNECT_DEFAULT 2
#define MAX_ONESHOT_REQUEST 10
+#define RING_BUFFER_DEFAULT_SIZE 10
#define DB_CES \
{ \
.key = "backlog_oneshot_requests", \
.type = CONFIG_TYPE_INT, \
.u.value = MAX_ONESHOT_REQUEST, \
+ }, \
+ { \
+ .key = "ring_buffer_size", \
+ .type = CONFIG_TYPE_INT, \
+ .u.value = RING_BUFFER_DEFAULT_SIZE, \
}
-#define DB_CE_NUM 6
+#define DB_CE_NUM 7
#define table_ce(x) (x->ces[0])
#define reconnect_ce(x) (x->ces[1])
#define timeout_ce(x) (x->ces[2])
#define procedure_ce(x) (x->ces[3])
#define backlog_memcap_ce(x) (x->ces[4])
#define backlog_oneshot_ce(x) (x->ces[5])
+#define ringsize_ce(x) (x->ces[6])
void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal);
int ulogd_db_start(struct ulogd_pluginstance *upi);
* Portions (C) 2001 Alex Janssen <alex@ynfonatic.de>,
* (C) 2005 Sven Schuster <schuster.sven@gmx.de>,
* (C) 2005 Jozsef Kadlecsik <kadlec@blackhole.kfki.hu>
- * (C) 2008 Eric Leblond <eric@inl.fr>
+ * (C) 2008,2013 Eric Leblond <eric@regit.org>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License version 2
#include <arpa/inet.h>
#include <time.h>
#include <inttypes.h>
+#include <pthread.h>
#include <ulogd/ulogd.h>
#include <ulogd/db.h>
ulogd_log(ULOGD_ERROR, "OOM!\n");
return -ENOMEM;
}
+ mi->ring.length = size + 1;
if (strncasecmp(procedure,"INSERT", strlen("INSERT")) == 0 &&
(procedure[strlen("INSERT")] == '\0' ||
static int _init_db(struct ulogd_pluginstance *upi);
+static void *__inject_thread(void *gdi);
+
int ulogd_db_configure(struct ulogd_pluginstance *upi,
struct ulogd_pluginstance_stack *stack)
{
di->backlog_full = 0;
}
+ /* check ring option */
+ di->ring.size = ringsize_ce(upi->config_kset).u.value;
+
return ret;
}
{
struct db_instance *di = (struct db_instance *) upi->private;
int ret;
+ unsigned int i;
ulogd_log(ULOGD_NOTICE, "starting\n");
ret = sql_createstmt(upi);
if (ret < 0)
- di->driver->close_db(upi);
+ goto db_error;
+
+ if (di->ring.size > 0) {
+ /* allocate */
+ di->ring.ring = calloc(di->ring.size, sizeof(char) * di->ring.length);
+ if (di->ring.ring == NULL) {
+ ret = -1;
+ goto db_error;
+ }
+ di->ring.wr_place = di->ring.ring;
+ ulogd_log(ULOGD_NOTICE,
+ "Allocating %d elements of size %d for ring\n",
+ di->ring.size, di->ring.length);
+ /* init start of query for each element */
+ for(i = 0; i < di->ring.size; i++) {
+ strncpy(di->ring.ring + di->ring.length * i + 1,
+ di->stmt,
+ strlen(di->stmt));
+ }
+ /* init cond & mutex */
+ ret = pthread_cond_init(&di->ring.cond, NULL);
+ if (ret != 0)
+ goto alloc_error;
+ ret = pthread_mutex_init(&di->ring.mutex, NULL);
+ if (ret != 0)
+ goto cond_error;
+ /* create thread */
+ ret = pthread_create(&di->db_thread_id, NULL, __inject_thread, upi);
+ if (ret != 0)
+ goto mutex_error;
+ }
di->interp = &_init_db;
return ret;
+
+mutex_error:
+ pthread_mutex_destroy(&di->ring.mutex);
+cond_error:
+ pthread_cond_destroy(&di->ring.cond);
+alloc_error:
+ free(di->ring.ring);
+db_error:
+ di->driver->close_db(upi);
+ return ret;
}
static int ulogd_db_instance_stop(struct ulogd_pluginstance *upi)
free(di->stmt);
di->stmt = NULL;
}
-
+ if (di->ring.size > 0) {
+ pthread_cancel(di->db_thread_id);
+ free(di->ring.ring);
+ pthread_cond_destroy(&di->ring.cond);
+ pthread_mutex_destroy(&di->ring.mutex);
+ di->ring.ring = NULL;
+ }
return 0;
}
return 0;
}
-static void __format_query_db(struct ulogd_pluginstance *upi)
+static void __format_query_db(struct ulogd_pluginstance *upi, char *start)
{
struct db_instance *di = (struct db_instance *) &upi->private;
unsigned int i;
- char *stmt_ins = di->stmt + di->stmt_offset;
+ char *stmt_ins = start + di->stmt_offset;
for (i = 0; i < upi->input.num_keys; i++) {
struct ulogd_key *res = upi->input.keys[i].u.source;
if (!res)
ulogd_log(ULOGD_NOTICE, "no source for `%s' ?!?\n",
upi->input.keys[i].name);
-
+
if (!res || !IS_VALID(*res)) {
/* no result, we have to fake something */
stmt_ins += sprintf(stmt_ins, "NULL,");
continue;
}
-
+
switch (res->type) {
case ULOGD_RET_INT8:
sprintf(stmt_ins, "%d,", res->u.value.i8);
res->type, upi->input.keys[i].name);
break;
}
- stmt_ins = di->stmt + strlen(di->stmt);
+ stmt_ins = start + strlen(start);
}
*(stmt_ins - 1) = ')';
}
if (di->reconnect && di->reconnect > time(NULL)) {
/* store entry to backlog if it is active */
if (di->backlog_memcap && !di->backlog_full) {
- __format_query_db(upi);
+ __format_query_db(upi, di->stmt);
__add_to_backlog(upi, di->stmt,
strlen(di->stmt));
}
if (di->driver->open_db(upi)) {
ulogd_log(ULOGD_ERROR, "can't establish database connection\n");
if (di->backlog_memcap && !di->backlog_full) {
- __format_query_db(upi);
+ __format_query_db(upi, di->stmt);
__add_to_backlog(upi, di->stmt, strlen(di->stmt));
}
return _init_reconnect(upi);
return 0;
}
+static int __add_to_ring(struct ulogd_pluginstance *upi, struct db_instance *di)
+{
+ if (*di->ring.wr_place == RING_QUERY_READY) {
+ if (di->ring.full == 0) {
+ ulogd_log(ULOGD_ERROR, "No place left in ring\n");
+ di->ring.full = 1;
+ }
+ return ULOGD_IRET_OK;
+ } else if (di->ring.full) {
+ ulogd_log(ULOGD_NOTICE, "Recovered some place in ring\n");
+ di->ring.full = 0;
+ }
+ __format_query_db(upi, di->ring.wr_place + 1);
+ *di->ring.wr_place = RING_QUERY_READY;
+ pthread_cond_signal(&di->ring.cond);
+ di->ring.wr_item ++;
+ di->ring.wr_place += di->ring.length;
+ if (di->ring.wr_item == di->ring.size) {
+ di->ring.wr_item = 0;
+ di->ring.wr_place = di->ring.ring;
+ }
+ return ULOGD_IRET_OK;
+}
+
/* our main output function, called by ulogd */
static int __interp_db(struct ulogd_pluginstance *upi)
{
struct db_instance *di = (struct db_instance *) &upi->private;
+ if (di->ring.size)
+ return __add_to_ring(upi, di);
- __format_query_db(upi);
- /* now we have created our statement, insert it */
+ __format_query_db(upi, di->stmt);
/* if backup log is not empty we add current query to it */
if (!llist_empty(&di->backlog)) {
return 0;
}
+static int __loop_reconnect_db(struct ulogd_pluginstance * upi) {
+ struct db_instance *di = (struct db_instance *) &upi->private;
+
+ di->driver->close_db(upi);
+ while (1) {
+ if (di->driver->open_db(upi)) {
+ sleep(1);
+ } else {
+ return 0;
+ }
+ }
+ return 0;
+}
+
+static void *__inject_thread(void *gdi)
+{
+ struct ulogd_pluginstance *upi = (struct ulogd_pluginstance *) gdi;
+ struct db_instance *di = (struct db_instance *) &upi->private;
+ char *wr_place;
+
+ wr_place = di->ring.ring;
+ pthread_mutex_lock(&di->ring.mutex);
+ while(1) {
+ /* wait cond */
+ pthread_cond_wait(&di->ring.cond, &di->ring.mutex);
+ while (*wr_place == RING_QUERY_READY) {
+ if (di->driver->execute(upi, wr_place + 1,
+ strlen(wr_place + 1)) < 0) {
+ if (__loop_reconnect_db(upi) != 0) {
+ /* loop has failed on unrecoverable error */
+ ulogd_log(ULOGD_ERROR,
+ "permanently disabling plugin\n");
+ di->interp = &disabled_interp_db;
+ return NULL;
+ }
+ }
+ *wr_place = RING_NO_QUERY;
+ di->ring.rd_item++;
+ if (di->ring.rd_item == di->ring.size) {
+ di->ring.rd_item = 0;
+ wr_place = di->ring.ring;
+ } else
+ wr_place += di->ring.length;
+ }
+ }
+
+ return NULL;
+}
+
+
void ulogd_db_signal(struct ulogd_pluginstance *upi, int signal)
{
switch (signal) {