]> git.ipfire.org Git - thirdparty/suricata.git/commitdiff
log: Use hash table for slot maintenance
authorJeff Lucovsky <jeff@lucovsky.org>
Fri, 8 Apr 2022 13:45:35 +0000 (09:45 -0400)
committerJeff Lucovsky <jlucovsky@oisf.net>
Mon, 9 Jan 2023 15:20:33 +0000 (10:20 -0500)
Issue: 5198

This commit modifies the threaded logging support to use the hash table
for handling thread/slot mappings. As a result, it's no longer necessary
to provide the thread id when ensuring the log output exists.

src/output-json-common.c
src/output-json-stats.c
src/util-logopenfile.c
src/util-logopenfile.h

index 60295ea2be029a823b5f9c6c05832436bd0ce2cb..42595c3dde25fe30de2a5669e0101d9f82767b14 100644 (file)
@@ -38,7 +38,7 @@ OutputJsonThreadCtx *CreateEveThreadCtx(ThreadVars *t, OutputJsonCtx *ctx)
         goto error;
     }
 
-    thread->file_ctx = LogFileEnsureExists(ctx->file_ctx, t->id);
+    thread->file_ctx = LogFileEnsureExists(ctx->file_ctx);
     if (!thread->file_ctx) {
         goto error;
     }
@@ -104,7 +104,7 @@ TmEcode JsonLogThreadInit(ThreadVars *t, const void *initdata, void **data)
     }
 
     thread->ctx = ((OutputCtx *)initdata)->data;
-    thread->file_ctx = LogFileEnsureExists(thread->ctx->file_ctx, t->id);
+    thread->file_ctx = LogFileEnsureExists(thread->ctx->file_ctx);
     if (!thread->file_ctx) {
         goto error_exit;
     }
index 3a504874822f4061b19d9fec02e389b790e1dad5..c4564909044278ea38069ce01bec73b743100e11 100644 (file)
@@ -344,7 +344,7 @@ static TmEcode JsonStatsLogThreadInit(ThreadVars *t, const void *initdata, void
     /* Use the Output Context (file pointer and mutex) */
     aft->statslog_ctx = ((OutputCtx *)initdata)->data;
 
-    aft->file_ctx = LogFileEnsureExists(aft->statslog_ctx->file_ctx, t->id);
+    aft->file_ctx = LogFileEnsureExists(aft->statslog_ctx->file_ctx);
     if (!aft->file_ctx) {
         goto error_exit;
     }
index b6d3732cb33d3d79a6e2f576f3c387e5a553bec9..c80dc8ce12f0d2121fd72588b5d1554e664235f0 100644 (file)
@@ -372,6 +372,7 @@ bool SCLogOpenThreadedFile(
         }
 
         parent_ctx->threads->slot_count = slot_count;
+        parent_ctx->threads->last_slot = 0;
         parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *));
         if (!parent_ctx->threads->lf_slots) {
             SCLogError("Unable to allocate thread slots");
@@ -682,61 +683,102 @@ LogFileCtx *LogFileNewCtx(void)
     return lf_ctx;
 }
 
+/** \brief LogFileThread2Slot() Return a file slot
+ * \retval int file slot for caller
+ *
+ * This function returns the file slot for the calling thread.
+ * Each thread -- identified by its operating system thread-id -- has its
+ * own slot that includes a file pointer.
+ */
+static int LogFileThread2Slot(LogThreadedFileCtx *parent)
+{
+    ThreadSlotHashEntry thread_hash_entry;
+
+    /* Check hash table for thread id*/
+    thread_hash_entry.thread_id = SCGetThreadIdLong();
+    ThreadSlotHashEntry *ent =
+            HashTableLookup(parent->ht, &thread_hash_entry, sizeof(thread_hash_entry));
+
+    if (ent) {
+        return ent->slot;
+    }
+
+    ent = SCCalloc(1, sizeof(*ent));
+    if (!ent) {
+        FatalError("Unable to allocate thread/slot entry");
+    }
+    ent->thread_id = thread_hash_entry.thread_id;
+    ent->slot = ++parent->last_slot;
+    SCLogDebug("Trying to add thread %ld to slot %d", ent->thread_id, ent->slot);
+    if (0 != HashTableAdd(parent->ht, ent, 0)) {
+        FatalError("Unable to add thread/slot mapping");
+    }
+    return ent->slot;
+}
+
 /** \brief LogFileEnsureExists() Ensure a log file context for the thread exists
  * \param parent_ctx
- * \param thread_id
  * \retval LogFileCtx * pointer if successful; NULL otherwise
  */
-LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx, int thread_id)
+LogFileCtx *LogFileEnsureExists(LogFileCtx *parent_ctx)
 {
     /* threaded output disabled */
     if (!parent_ctx->threaded)
         return parent_ctx;
 
-    SCLogDebug("Adding reference %d to file ctx %p", thread_id, parent_ctx);
     SCMutexLock(&parent_ctx->threads->mutex);
-    /* are there enough context slots already */
-    if (thread_id < parent_ctx->threads->slot_count) {
-        /* has it been opened yet? */
-        if (!parent_ctx->threads->lf_slots[thread_id]) {
-            SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx);
-            LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
-        }
-        SCMutexUnlock(&parent_ctx->threads->mutex);
-        SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx);
-        return parent_ctx->threads->lf_slots[thread_id];
-    }
-
-    /* ensure there's a slot for the caller */
-    int new_size = MAX(parent_ctx->threads->slot_count << 1, thread_id + 1);
-    SCLogDebug("Increasing slot count; current %d, trying %d",
-            parent_ctx->threads->slot_count, new_size);
-    LogFileCtx **new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
-    if (new_array == NULL) {
-        /* Try one more time */
-        SCLogDebug("Unable to increase file context array size to %d; trying %d",
-                new_size, thread_id + 1);
-        new_size = thread_id + 1;
-        new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
-    }
-
-    if (new_array == NULL) {
-        SCMutexUnlock(&parent_ctx->threads->mutex);
-        SCLogError("Unable to increase file context array size to %d", new_size);
-        return NULL;
-    }
+    /* Find this thread's slot */
+    int slot = LogFileThread2Slot(parent_ctx->threads);
+    SCLogDebug("Adding reference for thread %ld [slot %d] to file %s [ctx %p]", SCGetThreadIdLong(),
+            slot, parent_ctx->filename, parent_ctx);
+
+    /* Add slots if necessary */
+    if (slot >= parent_ctx->threads->slot_count) {
+        /* ensure there's a slot for the caller */
+        int new_size = MAX(parent_ctx->threads->slot_count << 1, slot + 1);
+        SCLogDebug("Increasing slot count; current %d, trying %d", parent_ctx->threads->slot_count,
+                new_size);
+        LogFileCtx **new_array =
+                SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
+        if (new_array == NULL) {
+            /* Try one more time */
+            SCLogDebug("Unable to increase file context array size to %d; trying %d", new_size,
+                    slot + 1);
+            new_size = slot + 1;
+            new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
+        }
+
+        if (new_array == NULL) {
+            SCMutexUnlock(&parent_ctx->threads->mutex);
+            SCLogError("Unable to increase file context array size to %d", new_size);
+            return NULL;
+        }
 
-    parent_ctx->threads->lf_slots = new_array;
-    /* initialize newly added slots */
-    for (int i = parent_ctx->threads->slot_count; i < new_size; i++) {
-        parent_ctx->threads->lf_slots[i] = NULL;
+        parent_ctx->threads->lf_slots = new_array;
+        /* initialize newly added slots */
+        for (int i = parent_ctx->threads->slot_count; i < new_size; i++) {
+            parent_ctx->threads->lf_slots[i] = NULL;
+        }
+        parent_ctx->threads->slot_count = new_size;
     }
-    parent_ctx->threads->slot_count = new_size;
-    LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
 
+    /* has it been opened yet? */
+    if (!parent_ctx->threads->lf_slots[slot]) {
+        SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", slot,
+                parent_ctx->filename, parent_ctx);
+        if (!LogFileNewThreadedCtx(
+                    parent_ctx, parent_ctx->filename, parent_ctx->threads->append, slot))
+            BUG_ON(parent_ctx->threads->lf_slots[slot] != NULL);
+    }
     SCMutexUnlock(&parent_ctx->threads->mutex);
 
-    return parent_ctx->threads->lf_slots[thread_id];
+    if (sc_log_global_log_level >= SC_LOG_DEBUG) {
+        if (parent_ctx->threads->lf_slots[slot])
+            SCLogDebug("Existing file for thread/slot %d reference to file %s [ctx %p]", slot,
+                    parent_ctx->filename, parent_ctx);
+    }
+
+    return parent_ctx->threads->lf_slots[slot];
 }
 
 /** \brief LogFileThreadedName() Create file name for threaded EVE storage
@@ -793,13 +835,14 @@ static bool LogFileThreadedName(
  * \param parent_ctx
  * \param log_path
  * \param append
- * \param thread_id
+ * \param slot
  */
-static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int thread_id)
+static bool LogFileNewThreadedCtx(
+        LogFileCtx *parent_ctx, const char *log_path, const char *append, int slot)
 {
     LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx));
     if (!thread) {
-        SCLogError("Unable to allocate thread file context slot %d", thread_id);
+        SCLogError("Unable to allocate thread file context slot %d", slot);
         return false;
     }
 
@@ -817,7 +860,7 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
         }
         thread->filename = SCStrdup(fname);
         if (!thread->filename) {
-            SCLogError("Unable to duplicate filename for context slot %d", thread_id);
+            SCLogError("Unable to duplicate filename for context slot %d", slot);
             goto error;
         }
         thread->is_regular = true;
@@ -826,13 +869,13 @@ static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path,
         OutputRegisterFileRotationFlag(&thread->rotation_flag);
     } else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) {
         thread->plugin.plugin->ThreadInit(
-                thread->plugin.init_data, thread_id, &thread->plugin.thread_data);
+                thread->plugin.init_data, slot, &thread->plugin.thread_data);
     }
     thread->threaded = false;
     thread->parent = parent_ctx;
-    thread->id = thread_id;
+    thread->slot = slot;
 
-    parent_ctx->threads->lf_slots[thread_id] = thread;
+    parent_ctx->threads->lf_slots[slot] = thread;
     return true;
 
 error:
@@ -845,7 +888,7 @@ error:
     if (thread) {
         SCFree(thread);
     }
-    parent_ctx->threads->lf_slots[thread_id] = NULL;
+    parent_ctx->threads->lf_slots[slot] = NULL;
     return false;
 }
 
@@ -892,7 +935,7 @@ int LogFileFreeCtx(LogFileCtx *lf_ctx)
             }
             if (lf_ctx->parent) {
                 SCMutexLock(&lf_ctx->parent->threads->mutex);
-                lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL;
+                lf_ctx->parent->threads->lf_slots[lf_ctx->slot] = NULL;
                 SCMutexUnlock(&lf_ctx->parent->threads->mutex);
             }
         }
index 153e3ccdf09f3e84aeaef19cce64bc3223f1b654..344972fca900628c6bf6dbb4d9e26cf279d4b5a9 100644 (file)
@@ -98,7 +98,7 @@ typedef struct LogFileCtx_ {
     /** When threaded, track of the parent and thread id */
     bool threaded;
     struct LogFileCtx_ *parent;
-    int id;
+    int slot;
 
     /** the type of file */
     enum LogFileType type;
@@ -175,7 +175,7 @@ LogFileCtx *LogFileNewCtx(void);
 int LogFileFreeCtx(LogFileCtx *);
 int LogFileWrite(LogFileCtx *file_ctx, MemBuffer *buffer);
 
-LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx, int thread_id);
+LogFileCtx *LogFileEnsureExists(LogFileCtx *lf_ctx);
 int SCConfLogOpenGeneric(ConfNode *conf, LogFileCtx *, const char *, int);
 int SCConfLogReopen(LogFileCtx *);
 bool SCLogOpenThreadedFile(