From: Jeff Lucovsky Date: Tue, 28 Jul 2020 13:03:05 +0000 (-0400) Subject: log: Support multi-threaded eve output. X-Git-Tag: suricata-6.0.0-beta1~64 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=aa20770277da4044ccdfa1ca4019c4d4c6dc78a7;p=thirdparty%2Fsuricata.git log: Support multi-threaded eve output. --- diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 621232b2c8..b73b7e4917 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -1,5 +1,5 @@ /* vi: set et ts=4: */ -/* Copyright (C) 2007-2014 Open Information Security Foundation +/* Copyright (C) 2007-2020 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -41,6 +41,9 @@ #include "util-log-redis.h" #endif /* HAVE_LIBHIREDIS */ +static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i); +static bool SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count); + #ifdef BUILD_WITH_UNIXSOCKET /** \brief connect to the indicated local stream socket, logging any errors * \param path filesystem path to connect to @@ -175,6 +178,40 @@ tryagain: } #endif /* BUILD_WITH_UNIXSOCKET */ +/** + * \brief Write buffer to log file. + * \retval 0 on failure; otherwise, the return value of fwrite_unlocked (number of + * characters successfully written). + */ +static int SCLogFileWriteNoLock(const char *buffer, int buffer_len, LogFileCtx *log_ctx) +{ + int ret = 0; + + BUG_ON(log_ctx->is_sock); + + /* Check for rotation. */ + if (log_ctx->rotation_flag) { + log_ctx->rotation_flag = 0; + SCConfLogReopen(log_ctx); + } + + if (log_ctx->flags & LOGFILE_ROTATE_INTERVAL) { + time_t now = time(NULL); + if (now >= log_ctx->rotate_time) { + SCConfLogReopen(log_ctx); + log_ctx->rotate_time = now + log_ctx->rotate_interval; + } + } + + if (log_ctx->fp) { + clearerr(log_ctx->fp); + ret = SCFwriteUnlocked(buffer, buffer_len, 1, log_ctx->fp); + fflush(log_ctx->fp); + } + + return ret; +} + /** * \brief Write buffer to log file. * \retval 0 on failure; otherwise, the return value of fwrite (number of @@ -239,12 +276,64 @@ static char *SCLogFilenameFromPattern(const char *pattern) return filename; } -static void SCLogFileClose(LogFileCtx *log_ctx) +static void SCLogFileCloseNoLock(LogFileCtx *log_ctx) { if (log_ctx->fp) fclose(log_ctx->fp); } +static void SCLogFileClose(LogFileCtx *log_ctx) +{ + SCMutexLock(&log_ctx->fp_mutex); + SCLogFileCloseNoLock(log_ctx); + SCMutexUnlock(&log_ctx->fp_mutex); +} + +static bool +SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count) +{ + parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx)); + if (!parent_ctx->threads) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads container"); + return false; + } + parent_ctx->threads->append = SCStrdup(append); + if (!parent_ctx->threads->append) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting"); + goto error_exit; + } + + parent_ctx->threads->slot_count = slot_count; + parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *)); + if (!parent_ctx->threads->lf_slots) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate thread slots"); + goto error_exit; + } + SCLogDebug("Allocated %d file context pointers for threaded array", + parent_ctx->threads->slot_count); + int slot = 1; + for (; slot < parent_ctx->threads->slot_count; slot++) { + if (!LogFileNewThreadedCtx(parent_ctx, log_path, append, slot)) { + /* TODO: clear allocated entries [1, slot) */ + goto error_exit; + } + } + SCMutexInit(&parent_ctx->threads->mutex, NULL); + return true; + +error_exit: + + if (parent_ctx->threads->lf_slots) { + SCFree(parent_ctx->threads->lf_slots); + } + if (parent_ctx->threads->append) { + SCFree(parent_ctx->threads->append); + } + SCFree(parent_ctx->threads); + parent_ctx->threads = NULL; + return false; +} + /** \brief open the indicated file, logging any errors * \param path filesystem path to open * \param append_setting open file with O_APPEND: "yes" or "no" @@ -430,10 +519,16 @@ SCConfLogOpenGeneric(ConfNode *conf, #endif } else if (strcasecmp(filetype, DEFAULT_LOG_FILETYPE) == 0 || strcasecmp(filetype, "file") == 0) { - log_ctx->fp = SCLogOpenFileFp(log_path, append, log_ctx->filemode); - if (log_ctx->fp == NULL) - return -1; // Error already logged by Open...Fp routine log_ctx->is_regular = 1; + if (!log_ctx->threaded) { + log_ctx->fp = SCLogOpenFileFp(log_path, append, log_ctx->filemode); + if (log_ctx->fp == NULL) + return -1; // Error already logged by Open...Fp routine + } else { + if (!SCLogOpenThreadedFileFp(log_path, append, log_ctx, 1)) { + return -1; + } + } if (rotate) { OutputRegisterFileRotationFlag(&log_ctx->rotation_flag); } @@ -514,24 +609,125 @@ int SCConfLogReopen(LogFileCtx *log_ctx) LogFileCtx *LogFileNewCtx(void) { LogFileCtx* lf_ctx; - lf_ctx = (LogFileCtx*)SCMalloc(sizeof(LogFileCtx)); + lf_ctx = (LogFileCtx*)SCCalloc(1, sizeof(LogFileCtx)); if (lf_ctx == NULL) return NULL; - memset(lf_ctx, 0, sizeof(LogFileCtx)); - - SCMutexInit(&lf_ctx->fp_mutex,NULL); - // Default Write and Close functions lf_ctx->Write = SCLogFileWrite; lf_ctx->Close = SCLogFileClose; return lf_ctx; } +/** \brief LogFileEnsureExists() Ensure a log file context for the thread exists + * \param parent_ctx + * \param thread_id + * \retval LogFileCtx * pointer if successful; NULL otherwise + */ +LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id) +{ + /* threaded output disabled */ + if (!parent_ctx->threaded) + return parent_ctx; + + SCLogDebug("Adding reference %d to file ctx %p", thread_id, parent_ctx); + SCMutexLock(&parent_ctx->threads->mutex); + /* are there enough context slots already */ + if (thread_id < parent_ctx->threads->slot_count) { + /* has it been opened yet? */ + if (!parent_ctx->threads->lf_slots[thread_id]) { + SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx); + LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id); + } + SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx); + SCMutexUnlock(&parent_ctx->threads->mutex); + return parent_ctx->threads->lf_slots[thread_id]; + } + + /* ensure there's a slot for the caller */ + int new_size = MAX(parent_ctx->threads->slot_count << 1, thread_id + 1); + SCLogDebug("Increasing slot count; current %d, trying %d", + parent_ctx->threads->slot_count, new_size); + LogFileCtx **new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *)); + if (new_array == NULL) { + /* Try one more time */ + SCLogDebug("Unable to increase file context array size to %d; trying %d", + new_size, thread_id + 1); + new_size = thread_id + 1; + new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *)); + } + + if (new_array == NULL) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size); + SCMutexUnlock(&parent_ctx->threads->mutex); + return NULL; + } + + parent_ctx->threads->lf_slots = new_array; + /* initialize newly added slots */ + for (int i = parent_ctx->threads->slot_count; i < new_size; i++) { + parent_ctx->threads->lf_slots[i] = NULL; + } + parent_ctx->threads->slot_count = new_size; + LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id); + + SCMutexUnlock(&parent_ctx->threads->mutex); + + return parent_ctx->threads->lf_slots[thread_id]; +} + +/** \brief LogFileNewThreadedCtx() Create file context for threaded output + * \param parent_ctx + * \param log_path + * \param append + * \param thread_id + */ +static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int thread_id) +{ + LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx)); + if (!thread) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate thread file context slot %d", thread_id); + return false; + } + + *thread = *parent_ctx; + char fname[NAME_MAX]; + snprintf(fname, sizeof(fname), "%s.%d", log_path, thread_id); + SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path); + thread->fp = SCLogOpenFileFp(fname, append, thread->filemode); + if (thread->fp == NULL) { + goto error; + } + thread->filename = SCStrdup(fname); + if (!thread->filename) { + SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d", thread_id); + goto error; + } + + thread->threaded = false; + thread->parent = parent_ctx; + thread->id = thread_id; + thread->Write = SCLogFileWriteNoLock; + thread->Close = SCLogFileCloseNoLock; + + parent_ctx->threads->lf_slots[thread_id] = thread; + return true; + +error: + if (thread->fp) { + thread->Close(thread); + } + if (thread) { + SCFree(thread); + } + parent_ctx->threads->lf_slots[thread_id] = NULL; + return false; +} + /** \brief LogFileFreeCtx() Destroy a LogFileCtx (Close the file and free memory) - * \param motcx pointer to the OutputCtx - * \retval int 1 if succesful, 0 if error + * \param lf_ctx pointer to the OutputCtx + * \retval int 1 if successful, 0 if error * */ int LogFileFreeCtx(LogFileCtx *lf_ctx) { @@ -539,14 +735,29 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx) SCReturnInt(0); } - if (lf_ctx->fp != NULL) { - SCMutexLock(&lf_ctx->fp_mutex); - lf_ctx->Close(lf_ctx); - SCMutexUnlock(&lf_ctx->fp_mutex); + if (lf_ctx->threaded) { + SCMutexDestroy(&lf_ctx->threads->mutex); + for(int i = 0; i < lf_ctx->threads->slot_count; i++) { + if (lf_ctx->threads->lf_slots[i]) { + SCFree(lf_ctx->threads->lf_slots[i]->filename); + SCFree(lf_ctx->threads->lf_slots[i]); + } + } + SCFree(lf_ctx->threads->lf_slots); + SCFree(lf_ctx->threads->append); + SCFree(lf_ctx->threads); + } else { + if (lf_ctx->fp != NULL) { + lf_ctx->Close(lf_ctx); + } + if (lf_ctx->parent) { + SCMutexLock(&lf_ctx->parent->threads->mutex); + lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL; + SCMutexUnlock(&lf_ctx->parent->threads->mutex); + } + SCMutexDestroy(&lf_ctx->fp_mutex); } - SCMutexDestroy(&lf_ctx->fp_mutex); - if (lf_ctx->prefix != NULL) { SCFree(lf_ctx->prefix); lf_ctx->prefix_len = 0; diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index 40b4ed45b9..3b32a02d75 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -1,4 +1,4 @@ -/* Copyright (C) 2007-2014 Open Information Security Foundation +/* Copyright (C) 2007-2020 Open Information Security Foundation * * You can copy, redistribute or modify this Program under the terms of * the GNU General Public License version 2 as published by the Free @@ -46,12 +46,20 @@ typedef struct SyslogSetup_ { int alert_syslog_level; } SyslogSetup; +struct LogFileCtx_; +typedef struct LogThreadedFileCtx_ { + int slot_count; + SCMutex mutex; + struct LogFileCtx_ **lf_slots; + char *append; +} LogThreadedFileCtx; /** Global structure for Output Context */ typedef struct LogFileCtx_ { union { FILE *fp; PcieFile *pcie_fp; + LogThreadedFileCtx *threads; #ifdef HAVE_LIBHIREDIS void *redis; #endif @@ -71,6 +79,11 @@ typedef struct LogFileCtx_ { * record cannot be written to the file in one call */ SCMutex fp_mutex; + /** When threaded, track of the parent and thread id */ + bool threaded; + struct LogFileCtx_ *parent; + int id; + /** the type of file */ enum LogFileType type; @@ -144,6 +157,7 @@ LogFileCtx *LogFileNewCtx(void); int LogFileFreeCtx(LogFileCtx *); int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); +LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx, int thread_id); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int); int SCConfLogReopen(LogFileCtx *);