]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
log: Support multi-threaded eve output.
authorJeff Lucovsky <jeff@lucovsky.org>
Tue, 28 Jul 2020 13:03:05 +0000 (09:03 -0400)
committerVictor Julien <victor@inliniac.net>
Sun, 2 Aug 2020 12:21:12 +0000 (14:21 +0200)
src/util-logopenfile.c
src/util-logopenfile.h

index 621232b2c87cecd79b4e9ac4b96e2e7117d84a43..b73b7e4917c1017544a8fb08d3ca9b5fb7fef158 100644 (file)
@@ -1,5 +1,5 @@
 /* vi: set et ts=4: */
-/* Copyright (C) 2007-2014 Open Information Security Foundation
+/* Copyright (C) 2007-2020 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -41,6 +41,9 @@
 #include "util-log-redis.h"
 #endif /* HAVE_LIBHIREDIS */
 
+static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i);
+static bool SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count);
+
 #ifdef BUILD_WITH_UNIXSOCKET
 /** \brief connect to the indicated local stream socket, logging any errors
  *  \param path filesystem path to connect to
@@ -175,6 +178,40 @@ tryagain:
 }
 #endif /* BUILD_WITH_UNIXSOCKET */
 
+/**
+ * \brief Write buffer to log file.
+ * \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of
+ * characters successfully written).
+ */
+static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx *log_ctx)
+{
+    int ret = 0;
+
+    BUG_ON(log_ctx->is_sock);
+
+    /* Check for rotation. */
+    if (log_ctx->rotation_flag) {
+        log_ctx->rotation_flag = 0;
+        SCConfLogReopen(log_ctx);
+    }
+
+    if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) {
+        time_t now = time(NULL);
+        if (now >= log_ctx->rotate_time) {
+            SCConfLogReopen(log_ctx);
+            log_ctx->rotate_time = now + log_ctx->rotate_interval;
+        }
+    }
+
+    if (log_ctx->fp) {
+        clearerr(log_ctx->fp);
+        ret = SCFwriteUnlocked(buffer, buffer_len, 1, log_ctx->fp);
+        fflush(log_ctx->fp);
+    }
+
+    return ret;
+}
+
 /**
  * \brief Write buffer to log file.
  * \retval 0 on failure; otherwise, the return value of fwrite (number of
@@ -239,12 +276,64 @@ static char *SCLogFilenameFromPattern(const char *pattern)
     return filename;
 }
 
-static void SCLogFileClose(LogFileCtx *log_ctx)
+static void SCLogFileCloseNoLock(LogFileCtx *log_ctx)
 {
     if (log_ctx->fp)
         fclose(log_ctx->fp);
 }
 
+static void SCLogFileClose(LogFileCtx *log_ctx)
+{
+    SCMutexLock(&log_ctx->fp_mutex);
+    SCLogFileCloseNoLock(log_ctx);
+    SCMutexUnlock(&log_ctx->fp_mutex);
+}
+
+static bool
+SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
+{
+        parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx));
+        if (!parent_ctx->threads) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads container");
+            return false;
+        }
+        parent_ctx->threads->append = SCStrdup(append);
+        if (!parent_ctx->threads->append) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting");
+            goto error_exit;
+        }
+
+        parent_ctx->threads->slot_count = slot_count;
+        parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *));
+        if (!parent_ctx->threads->lf_slots) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate thread slots");
+            goto error_exit;
+        }
+        SCLogDebug("Allocated %d file context pointers for threaded array",
+                    parent_ctx->threads->slot_count);
+        int slot = 1;
+        for (; slot < parent_ctx->threads->slot_count; slot++) {
+            if (!LogFileNewThreadedCtx(parent_ctx, log_path, append, slot)) {
+                /* TODO: clear allocated entries [1, slot) */
+                goto error_exit;
+            }
+        }
+        SCMutexInit(&parent_ctx->threads->mutex, NULL);
+        return true;
+
+error_exit:
+
+        if (parent_ctx->threads->lf_slots) {
+            SCFree(parent_ctx->threads->lf_slots);
+        }
+        if (parent_ctx->threads->append) {
+            SCFree(parent_ctx->threads->append);
+        }
+        SCFree(parent_ctx->threads);
+        parent_ctx->threads = NULL;
+        return false;
+}
+
 /** \brief open the indicated file, logging any errors
  *  \param path filesystem path to open
  *  \param append_setting open file with O_APPEND: "yes" or "no"
@@ -430,10 +519,16 @@ SCConfLogOpenGeneric(ConfNode *conf,
 #endif
     } else if (strcasecmp(filetype, DEFAULT_LOG_FILETYPE) == 0 ||
                strcasecmp(filetype, "file") == 0) {
-        log_ctx->fp = SCLogOpenFileFp(log_path, append, log_ctx->filemode);
-        if (log_ctx->fp == NULL)
-            return -1; // Error already logged by Open...Fp routine
         log_ctx->is_regular = 1;
+        if (!log_ctx->threaded) {
+            log_ctx->fp = SCLogOpenFileFp(log_path, append, log_ctx->filemode);
+            if (log_ctx->fp == NULL)
+                return -1; // Error already logged by Open...Fp routine
+        } else {
+            if (!SCLogOpenThreadedFileFp(log_path, append, log_ctx, 1)) {
+                return -1;
+            }
+        }
         if (rotate) {
             OutputRegisterFileRotationFlag(&log_ctx->rotation_flag);
         }
@@ -514,24 +609,125 @@ int SCConfLogReopen(LogFileCtx *log_ctx)
 LogFileCtx *LogFileNewCtx(void)
 {
     LogFileCtx* lf_ctx;
-    lf_ctx = (LogFileCtx*)SCMalloc(sizeof(LogFileCtx));
+    lf_ctx = (LogFileCtx*)SCCalloc(1, sizeof(LogFileCtx));
 
     if (lf_ctx == NULL)
         return NULL;
-    memset(lf_ctx, 0, sizeof(LogFileCtx));
-
-    SCMutexInit(&lf_ctx->fp_mutex,NULL);
 
-    // Default Write and Close functions
     lf_ctx->Write = SCLogFileWrite;
     lf_ctx->Close = SCLogFileClose;
 
     return lf_ctx;
 }
 
+/** \brief LogFileEnsureExists() Ensure a log file context for the thread exists
+ * \param parent_ctx
+ * \param thread_id
+ * \retval LogFileCtx * pointer if successful; NULL otherwise
+ */
+LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id)
+{
+    /* threaded output disabled */
+    if (!parent_ctx->threaded)
+        return parent_ctx;
+
+    SCLogDebug("Adding reference %d to file ctx %p", thread_id, parent_ctx);
+    SCMutexLock(&parent_ctx->threads->mutex);
+    /* are there enough context slots already */
+    if (thread_id < parent_ctx->threads->slot_count) {
+        /* has it been opened yet? */
+        if (!parent_ctx->threads->lf_slots[thread_id]) {
+            SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx);
+            LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
+        }
+        SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx);
+        SCMutexUnlock(&parent_ctx->threads->mutex);
+        return parent_ctx->threads->lf_slots[thread_id];
+    }
+
+    /* ensure there's a slot for the caller */
+    int new_size = MAX(parent_ctx->threads->slot_count << 1, thread_id + 1);
+    SCLogDebug("Increasing slot count; current %d, trying %d",
+            parent_ctx->threads->slot_count, new_size);
+    LogFileCtx **new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
+    if (new_array == NULL) {
+        /* Try one more time */
+        SCLogDebug("Unable to increase file context array size to %d; trying %d",
+                new_size, thread_id + 1);
+        new_size = thread_id + 1;
+        new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
+    }
+
+    if (new_array == NULL) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size);
+        SCMutexUnlock(&parent_ctx->threads->mutex);
+        return NULL;
+    }
+
+    parent_ctx->threads->lf_slots = new_array;
+    /* initialize newly added slots */
+    for (int i = parent_ctx->threads->slot_count; i < new_size; i++) {
+        parent_ctx->threads->lf_slots[i] = NULL;
+    }
+    parent_ctx->threads->slot_count = new_size;
+    LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
+
+    SCMutexUnlock(&parent_ctx->threads->mutex);
+
+    return parent_ctx->threads->lf_slots[thread_id];
+}
+
+/** \brief LogFileNewThreadedCtx() Create file context for threaded output
+ * \param parent_ctx
+ * \param log_path
+ * \param append
+ * \param thread_id
+ */
+static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int thread_id)
+{
+    LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx));
+    if (!thread) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate thread file context slot %d", thread_id);
+        return false;
+    }
+
+    *thread = *parent_ctx;
+    char fname[NAME_MAX];
+    snprintf(fname, sizeof(fname), "%s.%d", log_path, thread_id);
+    SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path);
+    thread->fp = SCLogOpenFileFp(fname, append, thread->filemode);
+    if (thread->fp == NULL) {
+        goto error;
+    }
+    thread->filename = SCStrdup(fname);
+    if (!thread->filename) {
+        SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d", thread_id);
+        goto error;
+    }
+
+    thread->threaded = false;
+    thread->parent = parent_ctx;
+    thread->id = thread_id;
+    thread->Write = SCLogFileWriteNoLock;
+    thread->Close = SCLogFileCloseNoLock;
+
+    parent_ctx->threads->lf_slots[thread_id] = thread;
+    return true;
+
+error:
+    if (thread->fp) {
+        thread->Close(thread);
+    }
+    if (thread) {
+        SCFree(thread);
+    }
+    parent_ctx->threads->lf_slots[thread_id] = NULL;
+    return false;
+}
+
 /** \brief LogFileFreeCtx() Destroy a LogFileCtx (Close the file and free memory)
- *  \param motcx pointer to the OutputCtx
- *  \retval int 1 if succesful, 0 if error
+ *  \param lf_ctx pointer to the OutputCtx
+ *  \retval int 1 if successful, 0 if error
  *  */
 int LogFileFreeCtx(LogFileCtx *lf_ctx)
 {
@@ -539,14 +735,29 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
         SCReturnInt(0);
     }
 
-    if (lf_ctx->fp != NULL) {
-        SCMutexLock(&lf_ctx->fp_mutex);
-        lf_ctx->Close(lf_ctx);
-        SCMutexUnlock(&lf_ctx->fp_mutex);
+    if (lf_ctx->threaded) {
+        SCMutexDestroy(&lf_ctx->threads->mutex);
+        for(int i = 0; i < lf_ctx->threads->slot_count; i++) {
+            if (lf_ctx->threads->lf_slots[i]) {
+                SCFree(lf_ctx->threads->lf_slots[i]->filename);
+                SCFree(lf_ctx->threads->lf_slots[i]);
+            }
+        }
+        SCFree(lf_ctx->threads->lf_slots);
+        SCFree(lf_ctx->threads->append);
+        SCFree(lf_ctx->threads);
+    } else {
+        if (lf_ctx->fp != NULL) {
+            lf_ctx->Close(lf_ctx);
+        }
+        if (lf_ctx->parent) {
+            SCMutexLock(&lf_ctx->parent->threads->mutex);
+            lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL;
+            SCMutexUnlock(&lf_ctx->parent->threads->mutex);
+        }
+        SCMutexDestroy(&lf_ctx->fp_mutex);
     }
 
-    SCMutexDestroy(&lf_ctx->fp_mutex);
-
     if (lf_ctx->prefix != NULL) {
         SCFree(lf_ctx->prefix);
         lf_ctx->prefix_len = 0;
index 40b4ed45b94b24cfeb29723ec44bb178559830f9..3b32a02d752975bd113427e84c2323b41af16932 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2014 Open Information Security Foundation
+/* Copyright (C) 2007-2020 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -46,12 +46,20 @@ typedef struct SyslogSetup_ {
     int alert_syslog_level;
 } SyslogSetup;
 
+struct LogFileCtx_;
+typedef struct LogThreadedFileCtx_ {
+    int slot_count;
+    SCMutex mutex;
+    struct LogFileCtx_ **lf_slots;
+    char *append;
+} LogThreadedFileCtx;
 
 /** Global structure for Output Context */
 typedef struct LogFileCtx_ {
     union {
         FILE *fp;
         PcieFile *pcie_fp;
+        LogThreadedFileCtx *threads;
 #ifdef HAVE_LIBHIREDIS
         void *redis;
 #endif
@@ -71,6 +79,11 @@ typedef struct LogFileCtx_ {
      * record cannot be written to the file in one call */
     SCMutex fp_mutex;
 
+    /** When threaded, track of the parent and thread id */
+    bool threaded;
+    struct LogFileCtx_ *parent;
+    int id;
+
     /** the type of file */
     enum LogFileType type;
 
@@ -144,6 +157,7 @@ LogFileCtx *LogFileNewCtx(void);
 int LogFileFreeCtx(LogFileCtx *);
 int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
 
+LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx, int thread_id);
 int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
 int SCConfLogReopen(LogFileCtx *);