From: Jeff Lucovsky Date: Mon, 16 Oct 2023 14:43:27 +0000 (-0400) Subject: output/plugin: Use Suri thread-id for plugins X-Git-Tag: suricata-8.0.0-beta1~1643 X-Git-Url: http://git.ipfire.org/cgi-bin/gitweb.cgi?a=commitdiff_plain;h=85d321a689753ff7c1647159089d9c2134bbbe1c;p=thirdparty%2Fsuricata.git output/plugin: Use Suri thread-id for plugins Issue: 6408 Use the Suricata thread id for plugin thread initialization to give the plugin a better correlating factor to the actual Suricata threads. --- diff --git a/src/output-eve-null.c b/src/output-eve-null.c index 1b62b96b36..298c3527a7 100644 --- a/src/output-eve-null.c +++ b/src/output-eve-null.c @@ -47,7 +47,7 @@ static int NullLogWrite(const char *buffer, int buffer_len, void *init_data, voi return 0; } -static int NullLogThreadInit(void *init_data, int thread_id, void **thread_data) +static int NullLogThreadInit(void *init_data, ThreadId thread_id, void **thread_data) { *thread_data = NULL; return 0; diff --git a/src/output-json-common.c b/src/output-json-common.c index 42595c3dde..5723e05a38 100644 --- a/src/output-json-common.c +++ b/src/output-json-common.c @@ -38,7 +38,7 @@ OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx) goto error; } - thread->file_ctx = LogFileEnsureExists(ctx->file_ctx); + thread->file_ctx = LogFileEnsureExists(t->id, ctx->file_ctx); if (!thread->file_ctx) { goto error; } @@ -104,7 +104,7 @@ TmEcode JsonLogThreadInit(ThreadVars *t, const void *initdata, void **data) } thread->ctx = ((OutputCtx *)initdata)->data; - thread->file_ctx = LogFileEnsureExists(thread->ctx->file_ctx); + thread->file_ctx = LogFileEnsureExists(t->id, thread->ctx->file_ctx); if (!thread->file_ctx) { goto error_exit; } diff --git a/src/output-json-stats.c b/src/output-json-stats.c index 7cc880727d..ecc7dce4b0 100644 --- a/src/output-json-stats.c +++ b/src/output-json-stats.c @@ -375,7 +375,7 @@ static TmEcode JsonStatsLogThreadInit(ThreadVars *t, const void *initdata, void /* Use the Output Context (file pointer and mutex) */ aft->statslog_ctx = ((OutputCtx *)initdata)->data; - aft->file_ctx = LogFileEnsureExists(aft->statslog_ctx->file_ctx); + aft->file_ctx = LogFileEnsureExists(t->id, aft->statslog_ctx->file_ctx); if (!aft->file_ctx) { goto error_exit; } @@ -471,8 +471,8 @@ static OutputInitResult OutputStatsLogInitSub(ConfNode *conf, OutputCtx *parent_ } SCLogDebug("Preparing file context for stats submodule logger"); - /* Share output slot with thread 1 */ - stats_ctx->file_ctx = LogFileEnsureExists(ajt->file_ctx); + /* prepared by suricata-main */ + stats_ctx->file_ctx = LogFileEnsureExists(0, ajt->file_ctx); if (!stats_ctx->file_ctx) { SCFree(stats_ctx); SCFree(output_ctx); diff --git a/src/suricata-plugin.h b/src/suricata-plugin.h index cb2c667eae..4ba5697691 100644 --- a/src/suricata-plugin.h +++ b/src/suricata-plugin.h @@ -40,6 +40,7 @@ typedef struct SCPlugin_ { } SCPlugin; typedef SCPlugin *(*SCPluginRegisterFunc)(void); +typedef uint32_t ThreadId; /** * Structure used to define an Eve output file type plugin. @@ -54,8 +55,9 @@ typedef struct SCEveFileType_ { int (*Write)(const char *buffer, int buffer_len, void *init_data, void *thread_data); /* Close - Called on final close */ void (*Deinit)(void *init_data); - /* ThreadInit - Called for each thread using file object*/ - int (*ThreadInit)(void *init_data, int thread_id, void **thread_data); + /* ThreadInit - Called for each thread using file object; non-zero thread_ids correlate + * to Suricata's worker threads; 0 correlates to the Suricata main thread */ + int (*ThreadInit)(void *init_data, ThreadId thread_id, void **thread_data); /* ThreadDeinit - Called for each thread using file object */ int (*ThreadDeinit)(void *init_data, void *thread_data); TAILQ_ENTRY(SCEveFileType_) entries; diff --git a/src/util-logopenfile.c b/src/util-logopenfile.c index 1b19864906..24dfcc4ff7 100644 --- a/src/util-logopenfile.c +++ b/src/util-logopenfile.c @@ -50,7 +50,7 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, ThreadLogFileHashEntry *entry); // Threaded eve.json identifier -static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint32_t, eve_file_id, 1); +static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint16_t, eve_file_id, 1); #ifdef BUILD_WITH_UNIXSOCKET /** \brief connect to the indicated local stream socket, logging any errors @@ -677,7 +677,7 @@ LogFileCtx *LogFileNewCtx(void) * Each thread -- identified by its operating system thread-id -- has its * own file entry that includes a file pointer. */ -static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent) +static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent, ThreadId thread_id) { ThreadLogFileHashEntry thread_hash_entry; @@ -689,12 +689,14 @@ static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent) if (!ent) { ent = SCCalloc(1, sizeof(*ent)); if (!ent) { - FatalError("Unable to allocate thread/entry entry"); + FatalError("Unable to allocate thread/hash-entry entry"); } ent->thread_id = thread_hash_entry.thread_id; - SCLogDebug("Trying to add thread %ld to entry %d", ent->thread_id, ent->slot_number); + ent->internal_thread_id = thread_id; + SCLogDebug( + "Trying to add thread %" PRIi64 " to entry %d", ent->thread_id, ent->slot_number); if (0 != HashTableAdd(parent->ht, ent, 0)) { - FatalError("Unable to add thread/entry mapping"); + FatalError("Unable to add thread/hash-entry mapping"); } } return ent; @@ -704,7 +706,7 @@ static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent) * \param parent_ctx * \retval LogFileCtx * pointer if successful; NULL otherwise */ -LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx) +LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *parent_ctx) { /* threaded output disabled */ if (!parent_ctx->threaded) @@ -712,15 +714,16 @@ LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx) SCMutexLock(&parent_ctx->threads->mutex); /* Find this thread's entry */ - ThreadLogFileHashEntry *entry = LogFileThread2Slot(parent_ctx->threads); - SCLogDebug("Adding reference for thread %ld [slot %d] to file %s [ctx %p]", SCGetThreadIdLong(), - entry->slot_number, parent_ctx->filename, parent_ctx); + ThreadLogFileHashEntry *entry = LogFileThread2Slot(parent_ctx->threads, thread_id); + SCLogDebug("%s: Adding reference for thread %" PRIi64 + " (local thread id %d) to file %s [ctx %p]", + t_thread_name, SCGetThreadIdLong(), thread_id, parent_ctx->filename, parent_ctx); bool new = entry->isopen; /* has it been opened yet? */ if (!entry->isopen) { - SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", entry->slot_number, - parent_ctx->filename, parent_ctx); + SCLogDebug("%s: Opening new file for thread/id %d to file %s [ctx %p]", t_thread_name, + thread_id, parent_ctx->filename, parent_ctx); if (LogFileNewThreadedCtx( parent_ctx, parent_ctx->filename, parent_ctx->threads->append, entry)) { entry->isopen = true; @@ -810,11 +813,13 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, *thread = *parent_ctx; if (parent_ctx->type == LOGFILE_TYPE_FILE) { char fname[LOGFILE_NAME_MAX]; - if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) { + entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1); + if (!LogFileThreadedName(log_path, fname, sizeof(fname), entry->slot_number)) { SCLogError("Unable to create threaded filename for log"); goto error; } - SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path); + SCLogDebug("%s: thread open -- using name %s [replaces %s] - thread %d [slot %d]", + t_thread_name, fname, log_path, entry->internal_thread_id, entry->slot_number); thread->fp = SCLogOpenFileFp(fname, append, thread->filemode); if (thread->fp == NULL) { goto error; @@ -830,8 +835,10 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, OutputRegisterFileRotationFlag(&thread->rotation_flag); } else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) { entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1); + SCLogDebug("%s - thread %d [slot %d]", log_path, entry->internal_thread_id, + entry->slot_number); thread->plugin.plugin->ThreadInit( - thread->plugin.init_data, entry->slot_number, &thread->plugin.thread_data); + thread->plugin.init_data, entry->internal_thread_id, &thread->plugin.thread_data); } thread->threaded = false; thread->parent = parent_ctx; diff --git a/src/util-logopenfile.h b/src/util-logopenfile.h index e225be4890..7b78c9cc45 100644 --- a/src/util-logopenfile.h +++ b/src/util-logopenfile.h @@ -49,10 +49,13 @@ typedef struct SyslogSetup_ { } SyslogSetup; typedef struct ThreadLogFileHashEntry { - uint64_t thread_id; - int slot_number; /* slot identifier -- for plugins */ - bool isopen; struct LogFileCtx_ *ctx; + + uint64_t thread_id; /* OS thread identifier */ + ThreadId internal_thread_id; /* Suri internal thread id; to assist output plugins correlating + usage */ + uint16_t slot_number; /* Slot identifier - used when forming per-thread output names*/ + bool isopen; } ThreadLogFileHashEntry; struct LogFileCtx_; @@ -168,7 +171,7 @@ LogFileCtx *LogFileNewCtx(void); int LogFileFreeCtx(LogFileCtx *); int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer); -LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx); +LogFileCtx *LogFileEnsureExists(ThreadId thread_id, LogFileCtx *lf_ctx); int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int); int SCConfLogReopen(LogFileCtx *); bool SCLogOpenThreadedFile(const char *log_path, const char *append, LogFileCtx *parent_ctx);