#define LOGFILE_NAME_MAX 255
-static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i);
+static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append,
+ ThreadLogFileHashEntry *entry);
// Threaded eve.json identifier
static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint32_t, eve_file_id, 1);
SCMutexUnlock(&log_ctx->fp_mutex);
}
-static char ThreadSlotHashCompareFunc(
+static char ThreadLogFileHashCompareFunc(
void *data1, uint16_t datalen1, void *data2, uint16_t datalen2)
{
- ThreadSlotHashEntry *p1 = (ThreadSlotHashEntry *)data1;
- ThreadSlotHashEntry *p2 = (ThreadSlotHashEntry *)data2;
+ ThreadLogFileHashEntry *p1 = (ThreadLogFileHashEntry *)data1;
+ ThreadLogFileHashEntry *p2 = (ThreadLogFileHashEntry *)data2;
if (p1 == NULL || p2 == NULL)
return 0;
return p1->thread_id == p2->thread_id;
}
-static uint32_t ThreadSlotHashFunc(HashTable *ht, void *data, uint16_t datalen)
+static uint32_t ThreadLogFileHashFunc(HashTable *ht, void *data, uint16_t datalen)
{
- const ThreadSlotHashEntry *ent = (ThreadSlotHashEntry *)data;
+ const ThreadLogFileHashEntry *ent = (ThreadLogFileHashEntry *)data;
return ent->thread_id % ht->array_size;
}
-static void ThreadSlotHashFreeFunc(void *data)
+static void ThreadLogFileHashFreeFunc(void *data)
{
- ThreadSlotHashEntry *thread_ent = (ThreadSlotHashEntry *)data;
+ BUG_ON(data == NULL);
+ ThreadLogFileHashEntry *thread_ent = (ThreadLogFileHashEntry *)data;
if (thread_ent) {
+ LogFileCtx *lf_ctx = thread_ent->ctx;
+ /* Free the leaf log file entries */
+ if (!lf_ctx->threaded) {
+ LogFileFreeCtx(lf_ctx);
+ }
SCFree(thread_ent);
}
}
-bool SCLogOpenThreadedFile(
- const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
+bool SCLogOpenThreadedFile(const char *log_path, const char *append, LogFileCtx *parent_ctx)
{
parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx));
if (!parent_ctx->threads) {
return false;
}
- parent_ctx->threads->ht = HashTableInit(
- 255, ThreadSlotHashFunc, ThreadSlotHashCompareFunc, ThreadSlotHashFreeFunc);
+ parent_ctx->threads->ht = HashTableInit(255, ThreadLogFileHashFunc,
+ ThreadLogFileHashCompareFunc, ThreadLogFileHashFreeFunc);
if (!parent_ctx->threads->ht) {
- FatalError("Unable to initialize thread/slot table");
+ FatalError("Unable to initialize thread/entry hash table");
}
parent_ctx->threads->append = SCStrdup(append == NULL ? DEFAULT_LOG_MODE_APPEND : append);
goto error_exit;
}
- parent_ctx->threads->slot_count = slot_count;
- parent_ctx->threads->last_slot = 0;
- parent_ctx->threads->lf_slots = SCCalloc(slot_count, sizeof(LogFileCtx *));
- if (!parent_ctx->threads->lf_slots) {
- SCLogError("Unable to allocate thread slots");
- goto error_exit;
- }
- SCLogDebug("Allocated %d file context pointers for threaded array",
- parent_ctx->threads->slot_count);
- for (int slot = 1; slot < parent_ctx->threads->slot_count; slot++) {
- if (!LogFileNewThreadedCtx(parent_ctx, log_path, parent_ctx->threads->append, slot)) {
- /* TODO: clear allocated entries [1, slot) */
- goto error_exit;
- }
- }
SCMutexInit(&parent_ctx->threads->mutex, NULL);
return true;
error_exit:
- if (parent_ctx->threads->lf_slots) {
- SCFree(parent_ctx->threads->lf_slots);
- }
if (parent_ctx->threads->append) {
SCFree(parent_ctx->threads->append);
}
if (log_ctx->fp == NULL)
return -1; // Error already logged by Open...Fp routine
} else {
- if (!SCLogOpenThreadedFile(log_path, append, log_ctx, 1)) {
+ if (!SCLogOpenThreadedFile(log_path, append, log_ctx)) {
return -1;
}
}
return lf_ctx;
}
-/** \brief LogFileThread2Slot() Return a file slot
- * \retval int file slot for caller
+/** \brief LogFileThread2Slot() Return a file entry
+ * \retval ThreadLogFileHashEntry * file entry for caller
*
- * This function returns the file slot for the calling thread.
+ * This function returns the file entry for the calling thread.
* Each thread -- identified by its operating system thread-id -- has its
- * own slot that includes a file pointer.
+ * own file entry that includes a file pointer.
*/
-static int LogFileThread2Slot(LogThreadedFileCtx *parent)
+static ThreadLogFileHashEntry *LogFileThread2Slot(LogThreadedFileCtx *parent)
{
- ThreadSlotHashEntry thread_hash_entry;
+ ThreadLogFileHashEntry thread_hash_entry;
/* Check hash table for thread id*/
thread_hash_entry.thread_id = SCGetThreadIdLong();
- ThreadSlotHashEntry *ent =
+ ThreadLogFileHashEntry *ent =
HashTableLookup(parent->ht, &thread_hash_entry, sizeof(thread_hash_entry));
- if (ent) {
- return ent->slot;
- }
-
- ent = SCCalloc(1, sizeof(*ent));
if (!ent) {
- FatalError("Unable to allocate thread/slot entry");
- }
- ent->thread_id = thread_hash_entry.thread_id;
- ent->slot = ++parent->last_slot;
- SCLogDebug("Trying to add thread %ld to slot %d", ent->thread_id, ent->slot);
- if (0 != HashTableAdd(parent->ht, ent, 0)) {
- FatalError("Unable to add thread/slot mapping");
+ ent = SCCalloc(1, sizeof(*ent));
+ if (!ent) {
+ FatalError("Unable to allocate thread/entry entry");
+ }
+ ent->thread_id = thread_hash_entry.thread_id;
+ SCLogDebug("Trying to add thread %ld to entry %d", ent->thread_id, ent->slot_number);
+ if (0 != HashTableAdd(parent->ht, ent, 0)) {
+ FatalError("Unable to add thread/entry mapping");
+ }
}
- return ent->slot;
+ return ent;
}
/** \brief LogFileEnsureExists() Ensure a log file context for the thread exists
return parent_ctx;
SCMutexLock(&parent_ctx->threads->mutex);
- /* Find this thread's slot */
- int slot = LogFileThread2Slot(parent_ctx->threads);
+ /* Find this thread's entry */
+ ThreadLogFileHashEntry *entry = LogFileThread2Slot(parent_ctx->threads);
SCLogDebug("Adding reference for thread %ld [slot %d] to file %s [ctx %p]", SCGetThreadIdLong(),
- slot, parent_ctx->filename, parent_ctx);
-
- /* Add slots if necessary */
- if (slot >= parent_ctx->threads->slot_count) {
- /* ensure there's a slot for the caller */
- int new_size = MAX(parent_ctx->threads->slot_count << 1, slot + 1);
- SCLogDebug("Increasing slot count; current %d, trying %d", parent_ctx->threads->slot_count,
- new_size);
- LogFileCtx **new_array =
- SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
- if (new_array == NULL) {
- /* Try one more time */
- SCLogDebug("Unable to increase file context array size to %d; trying %d", new_size,
- slot + 1);
- new_size = slot + 1;
- new_array = SCRealloc(parent_ctx->threads->lf_slots, new_size * sizeof(LogFileCtx *));
- }
-
- if (new_array == NULL) {
- SCMutexUnlock(&parent_ctx->threads->mutex);
- SCLogError("Unable to increase file context array size to %d", new_size);
- return NULL;
- }
-
- parent_ctx->threads->lf_slots = new_array;
- /* initialize newly added slots */
- for (int i = parent_ctx->threads->slot_count; i < new_size; i++) {
- parent_ctx->threads->lf_slots[i] = NULL;
- }
- parent_ctx->threads->slot_count = new_size;
- }
+ entry->slot_number, parent_ctx->filename, parent_ctx);
+ bool new = entry->isopen;
/* has it been opened yet? */
- if (!parent_ctx->threads->lf_slots[slot]) {
- SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", slot,
+ if (!entry->isopen) {
+ SCLogDebug("Opening new file for thread/slot %d to file %s [ctx %p]", entry->slot_number,
parent_ctx->filename, parent_ctx);
- if (!LogFileNewThreadedCtx(
- parent_ctx, parent_ctx->filename, parent_ctx->threads->append, slot))
- BUG_ON(parent_ctx->threads->lf_slots[slot] != NULL);
+ if (LogFileNewThreadedCtx(
+ parent_ctx, parent_ctx->filename, parent_ctx->threads->append, entry)) {
+ entry->isopen = true;
+ } else {
+ SCLogError(
+ "Unable to open slot %d for file %s", entry->slot_number, parent_ctx->filename);
+ (void)HashTableRemove(parent_ctx->threads->ht, entry, 0);
+ }
}
SCMutexUnlock(&parent_ctx->threads->mutex);
if (sc_log_global_log_level >= SC_LOG_DEBUG) {
- if (parent_ctx->threads->lf_slots[slot])
- SCLogDebug("Existing file for thread/slot %d reference to file %s [ctx %p]", slot,
+ if (new) {
+ SCLogDebug("Existing file for thread/entry %p reference to file %s [ctx %p]", entry,
parent_ctx->filename, parent_ctx);
+ }
}
- return parent_ctx->threads->lf_slots[slot];
+ return entry->ctx;
}
/** \brief LogFileThreadedName() Create file name for threaded EVE storage
* \param parent_ctx
* \param log_path
* \param append
- * \param slot
+ * \param entry
*/
-static bool LogFileNewThreadedCtx(
- LogFileCtx *parent_ctx, const char *log_path, const char *append, int slot)
+static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append,
+ ThreadLogFileHashEntry *entry)
{
LogFileCtx *thread = SCCalloc(1, sizeof(LogFileCtx));
if (!thread) {
- SCLogError("Unable to allocate thread file context slot %d", slot);
+ SCLogError("Unable to allocate thread file context entry %p", entry);
return false;
}
}
thread->filename = SCStrdup(fname);
if (!thread->filename) {
- SCLogError("Unable to duplicate filename for context slot %d", slot);
+ SCLogError("Unable to duplicate filename for context entry %p", entry);
goto error;
}
thread->is_regular = true;
thread->Close = SCLogFileCloseNoLock;
OutputRegisterFileRotationFlag(&thread->rotation_flag);
} else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) {
+ entry->slot_number = SC_ATOMIC_ADD(eve_file_id, 1);
thread->plugin.plugin->ThreadInit(
- thread->plugin.init_data, slot, &thread->plugin.thread_data);
+ thread->plugin.init_data, entry->slot_number, &thread->plugin.thread_data);
}
thread->threaded = false;
thread->parent = parent_ctx;
- thread->slot = slot;
+ thread->entry = entry;
+ entry->ctx = thread;
- parent_ctx->threads->lf_slots[slot] = thread;
return true;
error:
thread->Close(thread);
}
}
+
if (thread) {
SCFree(thread);
}
- parent_ctx->threads->lf_slots[slot] = NULL;
return false;
}
SCReturnInt(0);
}
+ if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
+ lf_ctx->plugin.plugin->Deinit(lf_ctx->plugin.init_data);
+ }
+
if (lf_ctx->threaded) {
BUG_ON(lf_ctx->threads == NULL);
SCMutexDestroy(&lf_ctx->threads->mutex);
- for(int i = 0; i < lf_ctx->threads->slot_count; i++) {
- if (!lf_ctx->threads->lf_slots[i]) {
- continue;
- }
- LogFileCtx *this_ctx = lf_ctx->threads->lf_slots[i];
-
- if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) {
- OutputUnregisterFileRotationFlag(&this_ctx->rotation_flag);
- this_ctx->Close(this_ctx);
- } else {
- lf_ctx->plugin.plugin->ThreadDeinit(
- this_ctx->plugin.init_data, this_ctx->plugin.thread_data);
- }
- SCFree(lf_ctx->threads->lf_slots[i]->filename);
- SCFree(lf_ctx->threads->lf_slots[i]);
- }
- SCFree(lf_ctx->threads->lf_slots);
if (lf_ctx->threads->append)
SCFree(lf_ctx->threads->append);
if (lf_ctx->threads->ht) {
if (lf_ctx->fp != NULL) {
lf_ctx->Close(lf_ctx);
}
- if (lf_ctx->parent) {
- SCMutexLock(&lf_ctx->parent->threads->mutex);
- lf_ctx->parent->threads->lf_slots[lf_ctx->slot] = NULL;
- SCMutexUnlock(&lf_ctx->parent->threads->mutex);
- }
}
SCMutexDestroy(&lf_ctx->fp_mutex);
}
- if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
- lf_ctx->plugin.plugin->Deinit(lf_ctx->plugin.init_data);
- }
-
if (lf_ctx->prefix != NULL) {
SCFree(lf_ctx->prefix);
lf_ctx->prefix_len = 0;