From: Jeff Lucovsky Date: Fri, 26 Mar 2021 12:43:11 +0000 (-0400) Subject: output/plugin: Support threaded output plugins X-Git-Tag: suricata-7.0.0-beta1~1637 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=1defca3c34f58b1bc220cebffcb2238d303cd9d2;p=thirdparty%2Fsuricata.git output/plugin: Support threaded output plugins --- diff --git a/src/output-json.c b/src/output-json.c index 610757c20d..82637ad83a 100644 --- a/src/output-json.c +++ b/src/output-json.c @@ -1113,20 +1113,19 @@ OutputInitResult OutputJsonInitCtx(ConfNode *conf) json_ctx->file_ctx->prefix_len = strlen(prefix); } + /* Threaded file output */ + const ConfNode *threaded = ConfNodeLookupChild(conf, "threaded"); + if (threaded && threaded->val && ConfValIsTrue(threaded->val)) { + SCLogConfig("Threaded EVE logging configured"); + json_ctx->file_ctx->threaded = true; + } else { + json_ctx->file_ctx->threaded = false; + } + if (json_ctx->json_out == LOGFILE_TYPE_FILE || json_ctx->json_out == LOGFILE_TYPE_UNIX_DGRAM || json_ctx->json_out == LOGFILE_TYPE_UNIX_STREAM) { - if (json_ctx->json_out == LOGFILE_TYPE_FILE) { - /* Threaded file output */ - const ConfNode *threaded = ConfNodeLookupChild(conf, "threaded"); - if (threaded && threaded->val && ConfValIsTrue(threaded->val)) { - SCLogConfig("Enabling threaded eve logging."); - json_ctx->file_ctx->threaded = true; - } else { - json_ctx->file_ctx->threaded = false; - } - } if (SCConfLogOpenGeneric(conf, json_ctx->file_ctx, DEFAULT_LOG_FILENAME, 1) < 0) { LogFileFreeCtx(json_ctx->file_ctx); @@ -1183,6 +1182,7 @@ OutputInitResult OutputJsonInitCtx(ConfNode *conf) } if (SCConfLogOpenRedis(redis_node, json_ctx->file_ctx) < 0) { + SCFree(json_ctx->file_ctx->sensor_name); LogFileFreeCtx(json_ctx->file_ctx); SCFree(json_ctx); SCFree(output_ctx); @@ -1190,20 +1190,39 @@ OutputInitResult OutputJsonInitCtx(ConfNode *conf) } } #endif +#ifdef HAVE_PLUGINS else if (json_ctx->json_out == LOGFILE_TYPE_PLUGIN) { - ConfNode *plugin_conf = ConfNodeLookupChild(conf, - json_ctx->plugin->name); - void *plugin_data = NULL; - if (json_ctx->plugin->Init(plugin_conf, false, &plugin_data) < 0) { + if (json_ctx->file_ctx->threaded) { + /* Prepare for storing per-thread data */ + if (!SCLogOpenThreadedFile(NULL, NULL, json_ctx->file_ctx, 1)) { + SCFree(json_ctx); + SCFree(output_ctx); + return result; + } + } + + void *init_data = NULL; + if (json_ctx->plugin->Init(conf, json_ctx->file_ctx->threaded, &init_data) < 0) { LogFileFreeCtx(json_ctx->file_ctx); SCFree(json_ctx); SCFree(output_ctx); return result; - } else { - json_ctx->file_ctx->plugin = json_ctx->plugin; - json_ctx->file_ctx->plugin_data = plugin_data; } + + /* Now that initialization completed successfully, if threaded, make sure + * that ThreadInit and ThreadDeInit exist + */ + if (json_ctx->file_ctx->threaded) { + if (!json_ctx->plugin->ThreadInit || !json_ctx->plugin->ThreadDeinit) { + FatalError(SC_ERR_LOG_OUTPUT, "Output logger must supply ThreadInit and " + "ThreadDeinit functions for threaded mode"); + } + } + + json_ctx->file_ctx->plugin.plugin = json_ctx->plugin; + json_ctx->file_ctx->plugin.init_data = init_data; } +#endif const char *sensor_id_s = ConfNodeLookupChildValue(conf, "sensor-id"); if (sensor_id_s != NULL) { diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 952018d0c2..2e324d9d1f 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -42,7 +42,6 @@ #endif /* HAVE_LIBHIREDIS */ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i); -static bool SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count); // Threaded eve.json identifier static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint32_t, eve_file_id, 1); @@ -321,18 +320,20 @@ static void SCLogFileClose(LogFileCtx *log_ctx) SCMutexUnlock(&log_ctx->fp_mutex); } -static bool -SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count) +bool SCLogOpenThreadedFile( + const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count) { parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx)); if (!parent_ctx->threads) { SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads container"); return false; } - parent_ctx->threads->append = SCStrdup(append); - if (!parent_ctx->threads->append) { - SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting"); - goto error_exit; + if (append) { + parent_ctx->threads->append = SCStrdup(append); + if (!parent_ctx->threads->append) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting"); + goto error_exit; + } } parent_ctx->threads->slot_count = slot_count; @@ -530,6 +531,13 @@ SCConfLogOpenGeneric(ConfNode *conf, log_ctx->json_flags &= ~(JSON_ESCAPE_SLASH); } +#ifdef BUILD_WITH_UNIXSOCKET + if (log_ctx->threaded) { + if (strcasecmp(filetype, "unix_stream") == 0 || strcasecmp(filetype, "unix_dgram") == 0) { + FatalError(SC_ERR_FATAL, "Socket file types do not support threaded output"); + } + } +#endif // Now, what have we been asked to open? if (strcasecmp(filetype, "unix_stream") == 0) { #ifdef BUILD_WITH_UNIXSOCKET @@ -557,22 +565,13 @@ SCConfLogOpenGeneric(ConfNode *conf, if (log_ctx->fp == NULL) return -1; // Error already logged by Open...Fp routine } else { - if (!SCLogOpenThreadedFileFp(log_path, append, log_ctx, 1)) { + if (!SCLogOpenThreadedFile(log_path, append, log_ctx, 1)) { return -1; } } if (rotate) { OutputRegisterFileRotationFlag(&log_ctx->rotation_flag); } -#ifdef HAVE_LIBHIREDIS - } else if (strcasecmp(filetype, "redis") == 0) { - ConfNode *redis_node = ConfNodeLookupChild(conf, "redis"); - if (SCConfLogOpenRedis(redis_node, log_ctx) < 0) { - SCLogError(SC_ERR_REDIS, "failed to open redis output"); - return -1; - } - log_ctx->type = LOGFILE_TYPE_REDIS; -#endif } else { SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry for " "%s.filetype. Expected \"regular\" (default), \"unix_stream\", " @@ -671,8 +670,8 @@ LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id) SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx); LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id); } - SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx); SCMutexUnlock(&parent_ctx->threads->mutex); + SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx); return parent_ctx->threads->lf_slots[thread_id]; } @@ -690,8 +689,8 @@ LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id) } if (new_array == NULL) { - SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size); SCMutexUnlock(&parent_ctx->threads->mutex); + SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size); return NULL; } @@ -767,37 +766,44 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, } *thread = *parent_ctx; - char fname[NAME_MAX]; - if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) { - SCLogError(SC_ERR_MEM_ALLOC, "Unable to create threaded filename for log"); - goto error; - } - SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path); - thread->fp = SCLogOpenFileFp(fname, append, thread->filemode); - if (thread->fp == NULL) { - goto error; - } - thread->filename = SCStrdup(fname); - if (!thread->filename) { - SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d", thread_id); - goto error; + if (parent_ctx->type == LOGFILE_TYPE_FILE) { + char fname[NAME_MAX]; + if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to create threaded filename for log"); + goto error; + } + SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path); + thread->fp = SCLogOpenFileFp(fname, append, thread->filemode); + if (thread->fp == NULL) { + goto error; + } + thread->filename = SCStrdup(fname); + if (!thread->filename) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d", + thread_id); + goto error; + } + thread->is_regular = true; + thread->Write = SCLogFileWriteNoLock; + thread->Close = SCLogFileCloseNoLock; + OutputRegisterFileRotationFlag(&thread->rotation_flag); + } else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) { + thread->plugin.plugin->ThreadInit( + thread->plugin.init_data, thread_id, &thread->plugin.thread_data); } - thread->threaded = false; thread->parent = parent_ctx; thread->id = thread_id; - thread->is_regular = true; - thread->Write = SCLogFileWriteNoLock; - thread->Close = SCLogFileCloseNoLock; - OutputRegisterFileRotationFlag(&thread->rotation_flag); parent_ctx->threads->lf_slots[thread_id] = thread; return true; error: - SC_ATOMIC_SUB(eve_file_id, 1); - if (thread->fp) { - thread->Close(thread); + if (parent_ctx->type == LOGFILE_TYPE_FILE) { + SC_ATOMIC_SUB(eve_file_id, 1); + if (thread->fp) { + thread->Close(thread); + } } if (thread) { SCFree(thread); @@ -817,34 +823,46 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) } if (lf_ctx->threaded) { + BUG_ON(lf_ctx->threads == NULL); SCMutexDestroy(&lf_ctx->threads->mutex); for(int i = 0; i < lf_ctx->threads->slot_count; i++) { - if (lf_ctx->threads->lf_slots[i]) { - OutputUnregisterFileRotationFlag(&lf_ctx->threads->lf_slots[i]->rotation_flag); - lf_ctx->threads->lf_slots[i]->Close(lf_ctx->threads->lf_slots[i]); - SCFree(lf_ctx->threads->lf_slots[i]->filename); - SCFree(lf_ctx->threads->lf_slots[i]); + if (!lf_ctx->threads->lf_slots[i]) { + continue; + } + LogFileCtx *this_ctx = lf_ctx->threads->lf_slots[i]; + + if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) { + OutputUnregisterFileRotationFlag(&this_ctx->rotation_flag); + this_ctx->Close(this_ctx); + } else { + lf_ctx->plugin.plugin->ThreadDeinit( + this_ctx->plugin.init_data, this_ctx->plugin.thread_data); } + SCFree(lf_ctx->threads->lf_slots[i]->filename); + SCFree(lf_ctx->threads->lf_slots[i]); } SCFree(lf_ctx->threads->lf_slots); - SCFree(lf_ctx->threads->append); + if (lf_ctx->threads->append) + SCFree(lf_ctx->threads->append); SCFree(lf_ctx->threads); } else { - if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) { - if (lf_ctx->plugin->Deinit != NULL) { - lf_ctx->plugin->Deinit(lf_ctx->plugin_data); + if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) { + if (lf_ctx->fp != NULL) { + lf_ctx->Close(lf_ctx); + } + if (lf_ctx->parent) { + SCMutexLock(&lf_ctx->parent->threads->mutex); + lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL; + SCMutexUnlock(&lf_ctx->parent->threads->mutex); } - } else if (lf_ctx->fp != NULL) { - lf_ctx->Close(lf_ctx); - } - if (lf_ctx->parent) { - SCMutexLock(&lf_ctx->parent->threads->mutex); - lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL; - SCMutexUnlock(&lf_ctx->parent->threads->mutex); } SCMutexDestroy(&lf_ctx->fp_mutex); } + if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) { + lf_ctx->plugin.plugin->Deinit(lf_ctx->plugin.init_data); + } + if (lf_ctx->prefix != NULL) { SCFree(lf_ctx->prefix); lf_ctx->prefix_len = 0; @@ -860,6 +878,7 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) OutputUnregisterFileRotationFlag(&lf_ctx->rotation_flag); } + memset(lf_ctx, 0, sizeof(*lf_ctx)); SCFree(lf_ctx); SCReturnInt(1); @@ -878,6 +897,9 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer) MemBufferWriteString(buffer, "\n"); file_ctx->Write((const char *)MEMBUFFER_BUFFER(buffer), MEMBUFFER_OFFSET(buffer), file_ctx); + } else if (file_ctx->type == LOGFILE_TYPE_PLUGIN) { + file_ctx->plugin.plugin->Write((const char *)MEMBUFFER_BUFFER(buffer), + MEMBUFFER_OFFSET(buffer), file_ctx->plugin.init_data, file_ctx->plugin.thread_data); } #ifdef HAVE_LIBHIREDIS else if (file_ctx->type == LOGFILE_TYPE_REDIS) { @@ -887,10 +909,6 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer) SCMutexUnlock(&file_ctx->fp_mutex); } #endif - else if (file_ctx->type == LOGFILE_TYPE_PLUGIN) { - file_ctx->plugin->Write((const char *)MEMBUFFER_BUFFER(buffer), - MEMBUFFER_OFFSET(buffer), file_ctx->plugin_data, NULL); - } return 0; } diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index 58d95d27a1..9b22c1f686 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2020 Open Information Security Foundation +/* Copyright (C) 2007-2021 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -52,16 +52,22 @@ typedef struct LogThreadedFileCtx_ { char *append; } LogThreadedFileCtx; +typedef struct LogFilePluginCtx_ { + SCPluginFileType *plugin; + void *init_data; + void *thread_data; +} LogFilePluginCtx; + /** Global structure for Output Context */ typedef struct LogFileCtx_ { union { FILE *fp; - LogThreadedFileCtx *threads; void *plugin_data; #ifdef HAVE_LIBHIREDIS void *redis; #endif }; + LogThreadedFileCtx *threads; union { SyslogSetup syslog_setup; @@ -73,7 +79,7 @@ typedef struct LogFileCtx_ { int (*Write)(const char *buffer, int buffer_len, struct LogFileCtx_ *fp); void (*Close)(struct LogFileCtx_ *fp); - SCPluginFileType *plugin; + LogFilePluginCtx plugin; /** It will be locked if the log/alert * record cannot be written to the file in one call */ @@ -162,5 +168,7 @@ int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx, int thread_id); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int); int SCConfLogReopen(LogFileCtx *); +bool SCLogOpenThreadedFile( + const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count); #endif /* __UTIL_LOGOPENFILE_H__ */