]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
output/plugin: Support threaded output plugins
authorJeff Lucovsky <jeff@lucovsky.org>
Fri, 26 Mar 2021 12:43:11 +0000 (08:43 -0400)
committerVictor Julien <victor@inliniac.net>
Wed, 5 May 2021 18:12:34 +0000 (20:12 +0200)
src/output-json.c
src/util-logopenfile.c
src/util-logopenfile.h

index 610757c20dfae4764c7c953683597fec7dccebb7..82637ad83ae6998194453d322d392b617ce5575b 100644 (file)
@@ -1113,20 +1113,19 @@ OutputInitResult OutputJsonInitCtx(ConfNode *conf)
             json_ctx->file_ctx->prefix_len = strlen(prefix);
         }
 
+        /* Threaded file output */
+        const ConfNode *threaded = ConfNodeLookupChild(conf, "threaded");
+        if (threaded && threaded->val && ConfValIsTrue(threaded->val)) {
+            SCLogConfig("Threaded EVE logging configured");
+            json_ctx->file_ctx->threaded = true;
+        } else {
+            json_ctx->file_ctx->threaded = false;
+        }
+
         if (json_ctx->json_out == LOGFILE_TYPE_FILE ||
             json_ctx->json_out == LOGFILE_TYPE_UNIX_DGRAM ||
             json_ctx->json_out == LOGFILE_TYPE_UNIX_STREAM)
         {
-            if (json_ctx->json_out == LOGFILE_TYPE_FILE) {
-                /* Threaded file output */
-                const ConfNode *threaded = ConfNodeLookupChild(conf, "threaded");
-                if (threaded && threaded->val && ConfValIsTrue(threaded->val)) {
-                    SCLogConfig("Enabling threaded eve logging.");
-                    json_ctx->file_ctx->threaded = true;
-                } else {
-                    json_ctx->file_ctx->threaded = false;
-                }
-            }
 
             if (SCConfLogOpenGeneric(conf, json_ctx->file_ctx, DEFAULT_LOG_FILENAME, 1) < 0) {
                 LogFileFreeCtx(json_ctx->file_ctx);
@@ -1183,6 +1182,7 @@ OutputInitResult OutputJsonInitCtx(ConfNode *conf)
             }
 
             if (SCConfLogOpenRedis(redis_node, json_ctx->file_ctx) < 0) {
+                SCFree(json_ctx->file_ctx->sensor_name);
                 LogFileFreeCtx(json_ctx->file_ctx);
                 SCFree(json_ctx);
                 SCFree(output_ctx);
@@ -1190,20 +1190,39 @@ OutputInitResult OutputJsonInitCtx(ConfNode *conf)
             }
         }
 #endif
+#ifdef HAVE_PLUGINS
         else if (json_ctx->json_out == LOGFILE_TYPE_PLUGIN) {
-            ConfNode *plugin_conf = ConfNodeLookupChild(conf,
-                json_ctx->plugin->name);
-            void *plugin_data = NULL;
-            if (json_ctx->plugin->Init(plugin_conf, false, &plugin_data) < 0) {
+            if (json_ctx->file_ctx->threaded) {
+                /* Prepare for storing per-thread data */
+                if (!SCLogOpenThreadedFile(NULL, NULL, json_ctx->file_ctx, 1)) {
+                    SCFree(json_ctx);
+                    SCFree(output_ctx);
+                    return result;
+                }
+            }
+
+            void *init_data = NULL;
+            if (json_ctx->plugin->Init(conf, json_ctx->file_ctx->threaded, &init_data) < 0) {
                 LogFileFreeCtx(json_ctx->file_ctx);
                 SCFree(json_ctx);
                 SCFree(output_ctx);
                 return result;
-            } else {
-                json_ctx->file_ctx->plugin = json_ctx->plugin;
-                json_ctx->file_ctx->plugin_data = plugin_data;
             }
+
+            /* Now that initialization completed successfully, if threaded, make sure
+             * that ThreadInit and ThreadDeInit exist
+             */
+            if (json_ctx->file_ctx->threaded) {
+                if (!json_ctx->plugin->ThreadInit || !json_ctx->plugin->ThreadDeinit) {
+                    FatalError(SC_ERR_LOG_OUTPUT, "Output logger must supply ThreadInit and "
+                                                  "ThreadDeinit functions for threaded mode");
+                }
+            }
+
+            json_ctx->file_ctx->plugin.plugin = json_ctx->plugin;
+            json_ctx->file_ctx->plugin.init_data = init_data;
         }
+#endif
 
         const char *sensor_id_s = ConfNodeLookupChildValue(conf, "sensor-id");
         if (sensor_id_s != NULL) {
index 952018d0c22296f5ea22e717e8ccb151be9f95d9..2e324d9d1fa7dd3efde8403bdcd2b8dc5f81826e 100644 (file)
@@ -42,7 +42,6 @@
 #endif /* HAVE_LIBHIREDIS */
 
 static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i);
-static bool SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count);
 
 // Threaded eve.json identifier
 static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint32_t, eve_file_id, 1);
@@ -321,18 +320,20 @@ static void SCLogFileClose(LogFileCtx *log_ctx)
     SCMutexUnlock(&log_ctx->fp_mutex);
 }
 
-static bool
-SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
+bool SCLogOpenThreadedFile(
+        const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
 {
         parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx));
         if (!parent_ctx->threads) {
             SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads container");
             return false;
         }
-        parent_ctx->threads->append = SCStrdup(append);
-        if (!parent_ctx->threads->append) {
-            SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting");
-            goto error_exit;
+        if (append) {
+            parent_ctx->threads->append = SCStrdup(append);
+            if (!parent_ctx->threads->append) {
+                SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting");
+                goto error_exit;
+            }
         }
 
         parent_ctx->threads->slot_count = slot_count;
@@ -530,6 +531,13 @@ SCConfLogOpenGeneric(ConfNode *conf,
             log_ctx->json_flags &= ~(JSON_ESCAPE_SLASH);
     }
 
+#ifdef BUILD_WITH_UNIXSOCKET
+    if (log_ctx->threaded) {
+        if (strcasecmp(filetype, "unix_stream") == 0 || strcasecmp(filetype, "unix_dgram") == 0) {
+            FatalError(SC_ERR_FATAL, "Socket file types do not support threaded output");
+        }
+    }
+#endif
     // Now, what have we been asked to open?
     if (strcasecmp(filetype, "unix_stream") == 0) {
 #ifdef BUILD_WITH_UNIXSOCKET
@@ -557,22 +565,13 @@ SCConfLogOpenGeneric(ConfNode *conf,
             if (log_ctx->fp == NULL)
                 return -1; // Error already logged by Open...Fp routine
         } else {
-            if (!SCLogOpenThreadedFileFp(log_path, append, log_ctx, 1)) {
+            if (!SCLogOpenThreadedFile(log_path, append, log_ctx, 1)) {
                 return -1;
             }
         }
         if (rotate) {
             OutputRegisterFileRotationFlag(&log_ctx->rotation_flag);
         }
-#ifdef HAVE_LIBHIREDIS
-    } else if (strcasecmp(filetype, "redis") == 0) {
-        ConfNode *redis_node = ConfNodeLookupChild(conf, "redis");
-        if (SCConfLogOpenRedis(redis_node, log_ctx) < 0) {
-            SCLogError(SC_ERR_REDIS, "failed to open redis output");
-            return -1;
-        }
-        log_ctx->type = LOGFILE_TYPE_REDIS;
-#endif
     } else {
         SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry for "
                    "%s.filetype.  Expected \"regular\" (default), \"unix_stream\", "
@@ -671,8 +670,8 @@ LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id)
             SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx);
             LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
         }
-        SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx);
         SCMutexUnlock(&parent_ctx->threads->mutex);
+        SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx);
         return parent_ctx->threads->lf_slots[thread_id];
     }
 
@@ -690,8 +689,8 @@ LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id)
     }
 
     if (new_array == NULL) {
-        SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size);
         SCMutexUnlock(&parent_ctx->threads->mutex);
+        SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size);
         return NULL;
     }
 
@@ -767,37 +766,44 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
     }
 
     *thread = *parent_ctx;
-    char fname[NAME_MAX];
-    if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) {
-        SCLogError(SC_ERR_MEM_ALLOC, "Unable to create threaded filename for log");
-        goto error;
-    }
-    SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path);
-    thread->fp = SCLogOpenFileFp(fname, append, thread->filemode);
-    if (thread->fp == NULL) {
-        goto error;
-    }
-    thread->filename = SCStrdup(fname);
-    if (!thread->filename) {
-        SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d", thread_id);
-        goto error;
+    if (parent_ctx->type == LOGFILE_TYPE_FILE) {
+        char fname[NAME_MAX];
+        if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Unable to create threaded filename for log");
+            goto error;
+        }
+        SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path);
+        thread->fp = SCLogOpenFileFp(fname, append, thread->filemode);
+        if (thread->fp == NULL) {
+            goto error;
+        }
+        thread->filename = SCStrdup(fname);
+        if (!thread->filename) {
+            SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d",
+                    thread_id);
+            goto error;
+        }
+        thread->is_regular = true;
+        thread->Write = SCLogFileWriteNoLock;
+        thread->Close = SCLogFileCloseNoLock;
+        OutputRegisterFileRotationFlag(&thread->rotation_flag);
+    } else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) {
+        thread->plugin.plugin->ThreadInit(
+                thread->plugin.init_data, thread_id, &thread->plugin.thread_data);
     }
-
     thread->threaded = false;
     thread->parent = parent_ctx;
     thread->id = thread_id;
-    thread->is_regular = true;
-    thread->Write = SCLogFileWriteNoLock;
-    thread->Close = SCLogFileCloseNoLock;
-    OutputRegisterFileRotationFlag(&thread->rotation_flag);
 
     parent_ctx->threads->lf_slots[thread_id] = thread;
     return true;
 
 error:
-    SC_ATOMIC_SUB(eve_file_id, 1);
-    if (thread->fp) {
-        thread->Close(thread);
+    if (parent_ctx->type == LOGFILE_TYPE_FILE) {
+        SC_ATOMIC_SUB(eve_file_id, 1);
+        if (thread->fp) {
+            thread->Close(thread);
+        }
     }
     if (thread) {
         SCFree(thread);
@@ -817,34 +823,46 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
     }
 
     if (lf_ctx->threaded) {
+        BUG_ON(lf_ctx->threads == NULL);
         SCMutexDestroy(&lf_ctx->threads->mutex);
         for(int i = 0; i < lf_ctx->threads->slot_count; i++) {
-            if (lf_ctx->threads->lf_slots[i]) {
-                OutputUnregisterFileRotationFlag(&lf_ctx->threads->lf_slots[i]->rotation_flag);
-                lf_ctx->threads->lf_slots[i]->Close(lf_ctx->threads->lf_slots[i]);
-                SCFree(lf_ctx->threads->lf_slots[i]->filename);
-                SCFree(lf_ctx->threads->lf_slots[i]);
+            if (!lf_ctx->threads->lf_slots[i]) {
+                continue;
+            }
+            LogFileCtx *this_ctx = lf_ctx->threads->lf_slots[i];
+
+            if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) {
+                OutputUnregisterFileRotationFlag(&this_ctx->rotation_flag);
+                this_ctx->Close(this_ctx);
+            } else {
+                lf_ctx->plugin.plugin->ThreadDeinit(
+                        this_ctx->plugin.init_data, this_ctx->plugin.thread_data);
             }
+            SCFree(lf_ctx->threads->lf_slots[i]->filename);
+            SCFree(lf_ctx->threads->lf_slots[i]);
         }
         SCFree(lf_ctx->threads->lf_slots);
-        SCFree(lf_ctx->threads->append);
+        if (lf_ctx->threads->append)
+            SCFree(lf_ctx->threads->append);
         SCFree(lf_ctx->threads);
     } else {
-        if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
-            if (lf_ctx->plugin->Deinit != NULL) {
-                lf_ctx->plugin->Deinit(lf_ctx->plugin_data);
+        if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) {
+            if (lf_ctx->fp != NULL) {
+                lf_ctx->Close(lf_ctx);
+            }
+            if (lf_ctx->parent) {
+                SCMutexLock(&lf_ctx->parent->threads->mutex);
+                lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL;
+                SCMutexUnlock(&lf_ctx->parent->threads->mutex);
             }
-        } else if (lf_ctx->fp != NULL) {
-            lf_ctx->Close(lf_ctx);
-        }
-        if (lf_ctx->parent) {
-            SCMutexLock(&lf_ctx->parent->threads->mutex);
-            lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL;
-            SCMutexUnlock(&lf_ctx->parent->threads->mutex);
         }
         SCMutexDestroy(&lf_ctx->fp_mutex);
     }
 
+    if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
+        lf_ctx->plugin.plugin->Deinit(lf_ctx->plugin.init_data);
+    }
+
     if (lf_ctx->prefix != NULL) {
         SCFree(lf_ctx->prefix);
         lf_ctx->prefix_len = 0;
@@ -860,6 +878,7 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
         OutputUnregisterFileRotationFlag(&lf_ctx->rotation_flag);
     }
 
+    memset(lf_ctx, 0, sizeof(*lf_ctx));
     SCFree(lf_ctx);
 
     SCReturnInt(1);
@@ -878,6 +897,9 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
         MemBufferWriteString(buffer, "\n");
         file_ctx->Write((const char *)MEMBUFFER_BUFFER(buffer),
                         MEMBUFFER_OFFSET(buffer), file_ctx);
+    } else if (file_ctx->type == LOGFILE_TYPE_PLUGIN) {
+        file_ctx->plugin.plugin->Write((const char *)MEMBUFFER_BUFFER(buffer),
+                MEMBUFFER_OFFSET(buffer), file_ctx->plugin.init_data, file_ctx->plugin.thread_data);
     }
 #ifdef HAVE_LIBHIREDIS
     else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
@@ -887,10 +909,6 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer)
         SCMutexUnlock(&file_ctx->fp_mutex);
     }
 #endif
-    else if (file_ctx->type == LOGFILE_TYPE_PLUGIN) {
-        file_ctx->plugin->Write((const char *)MEMBUFFER_BUFFER(buffer),
-                        MEMBUFFER_OFFSET(buffer), file_ctx->plugin_data, NULL);
-    }
 
     return 0;
 }
index 58d95d27a13caa042f2667573de2659f2f0316d1..9b22c1f686ddbd8bf9de0158342fdb21f4655748 100644 (file)
@@ -1,4 +1,4 @@
-/* Copyright (C) 2007-2020 Open Information Security Foundation
+/* Copyright (C) 2007-2021 Open Information Security Foundation
  *
  * You can copy, redistribute or modify this Program under the terms of
  * the GNU General Public License version 2 as published by the Free
@@ -52,16 +52,22 @@ typedef struct LogThreadedFileCtx_ {
     char *append;
 } LogThreadedFileCtx;
 
+typedef struct LogFilePluginCtx_ {
+    SCPluginFileType *plugin;
+    void *init_data;
+    void *thread_data;
+} LogFilePluginCtx;
+
 /** Global structure for Output Context */
 typedef struct LogFileCtx_ {
     union {
         FILE *fp;
-        LogThreadedFileCtx *threads;
         void *plugin_data;
 #ifdef HAVE_LIBHIREDIS
         void *redis;
 #endif
     };
+    LogThreadedFileCtx *threads;
 
     union {
         SyslogSetup syslog_setup;
@@ -73,7 +79,7 @@ typedef struct LogFileCtx_ {
     int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp);
     void (*Close)(struct LogFileCtx_ *fp);
 
-    SCPluginFileType *plugin;
+    LogFilePluginCtx plugin;
 
     /** It will be locked if the log/alert
      * record cannot be written to the file in one call */
@@ -162,5 +168,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
 LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx, int thread_id);
 int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
 int SCConfLogReopen(LogFileCtx *);
+bool SCLogOpenThreadedFile(
+        const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count);
 
 #endif /* __UTIL_LOGOPENFILE_H__ */