json_ctx->file_ctx->prefix_len = strlen(prefix);
}
+ /* Threaded file output */
+ const ConfNode *threaded = ConfNodeLookupChild(conf, "threaded");
+ if (threaded && threaded->val && ConfValIsTrue(threaded->val)) {
+ SCLogConfig("Threaded EVE logging configured");
+ json_ctx->file_ctx->threaded = true;
+ } else {
+ json_ctx->file_ctx->threaded = false;
+ }
+
if (json_ctx->json_out == LOGFILE_TYPE_FILE ||
json_ctx->json_out == LOGFILE_TYPE_UNIX_DGRAM ||
json_ctx->json_out == LOGFILE_TYPE_UNIX_STREAM)
{
- if (json_ctx->json_out == LOGFILE_TYPE_FILE) {
- /* Threaded file output */
- const ConfNode *threaded = ConfNodeLookupChild(conf, "threaded");
- if (threaded && threaded->val && ConfValIsTrue(threaded->val)) {
- SCLogConfig("Enabling threaded eve logging.");
- json_ctx->file_ctx->threaded = true;
- } else {
- json_ctx->file_ctx->threaded = false;
- }
- }
if (SCConfLogOpenGeneric(conf, json_ctx->file_ctx, DEFAULT_LOG_FILENAME, 1) < 0) {
LogFileFreeCtx(json_ctx->file_ctx);
}
if (SCConfLogOpenRedis(redis_node, json_ctx->file_ctx) < 0) {
+ SCFree(json_ctx->file_ctx->sensor_name);
LogFileFreeCtx(json_ctx->file_ctx);
SCFree(json_ctx);
SCFree(output_ctx);
}
}
#endif
+#ifdef HAVE_PLUGINS
else if (json_ctx->json_out == LOGFILE_TYPE_PLUGIN) {
- ConfNode *plugin_conf = ConfNodeLookupChild(conf,
- json_ctx->plugin->name);
- void *plugin_data = NULL;
- if (json_ctx->plugin->Init(plugin_conf, false, &plugin_data) < 0) {
+ if (json_ctx->file_ctx->threaded) {
+ /* Prepare for storing per-thread data */
+ if (!SCLogOpenThreadedFile(NULL, NULL, json_ctx->file_ctx, 1)) {
+ SCFree(json_ctx);
+ SCFree(output_ctx);
+ return result;
+ }
+ }
+
+ void *init_data = NULL;
+ if (json_ctx->plugin->Init(conf, json_ctx->file_ctx->threaded, &init_data) < 0) {
LogFileFreeCtx(json_ctx->file_ctx);
SCFree(json_ctx);
SCFree(output_ctx);
return result;
- } else {
- json_ctx->file_ctx->plugin = json_ctx->plugin;
- json_ctx->file_ctx->plugin_data = plugin_data;
}
+
+ /* Now that initialization completed successfully, if threaded, make sure
+ * that ThreadInit and ThreadDeInit exist
+ */
+ if (json_ctx->file_ctx->threaded) {
+ if (!json_ctx->plugin->ThreadInit || !json_ctx->plugin->ThreadDeinit) {
+ FatalError(SC_ERR_LOG_OUTPUT, "Output logger must supply ThreadInit and "
+ "ThreadDeinit functions for threaded mode");
+ }
+ }
+
+ json_ctx->file_ctx->plugin.plugin = json_ctx->plugin;
+ json_ctx->file_ctx->plugin.init_data = init_data;
}
+#endif
const char *sensor_id_s = ConfNodeLookupChildValue(conf, "sensor-id");
if (sensor_id_s != NULL) {
#endif /* HAVE_LIBHIREDIS */
static bool LogFileNewThreadedCtx(LogFileCtx *parent_ctx, const char *log_path, const char *append, int i);
-static bool SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count);
// Threaded eve.json identifier
static SC_ATOMIC_DECL_AND_INIT_WITH_VAL(uint32_t, eve_file_id, 1);
SCMutexUnlock(&log_ctx->fp_mutex);
}
-static bool
-SCLogOpenThreadedFileFp(const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
+bool SCLogOpenThreadedFile(
+ const char *log_path, const char *append, LogFileCtx *parent_ctx, int slot_count)
{
parent_ctx->threads = SCCalloc(1, sizeof(LogThreadedFileCtx));
if (!parent_ctx->threads) {
SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads container");
return false;
}
- parent_ctx->threads->append = SCStrdup(append);
- if (!parent_ctx->threads->append) {
- SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting");
- goto error_exit;
+ if (append) {
+ parent_ctx->threads->append = SCStrdup(append);
+ if (!parent_ctx->threads->append) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Unable to allocate threads append setting");
+ goto error_exit;
+ }
}
parent_ctx->threads->slot_count = slot_count;
log_ctx->json_flags &= ~(JSON_ESCAPE_SLASH);
}
+#ifdef BUILD_WITH_UNIXSOCKET
+ if (log_ctx->threaded) {
+ if (strcasecmp(filetype, "unix_stream") == 0 || strcasecmp(filetype, "unix_dgram") == 0) {
+ FatalError(SC_ERR_FATAL, "Socket file types do not support threaded output");
+ }
+ }
+#endif
// Now, what have we been asked to open?
if (strcasecmp(filetype, "unix_stream") == 0) {
#ifdef BUILD_WITH_UNIXSOCKET
if (log_ctx->fp == NULL)
return -1; // Error already logged by Open...Fp routine
} else {
- if (!SCLogOpenThreadedFileFp(log_path, append, log_ctx, 1)) {
+ if (!SCLogOpenThreadedFile(log_path, append, log_ctx, 1)) {
return -1;
}
}
if (rotate) {
OutputRegisterFileRotationFlag(&log_ctx->rotation_flag);
}
-#ifdef HAVE_LIBHIREDIS
- } else if (strcasecmp(filetype, "redis") == 0) {
- ConfNode *redis_node = ConfNodeLookupChild(conf, "redis");
- if (SCConfLogOpenRedis(redis_node, log_ctx) < 0) {
- SCLogError(SC_ERR_REDIS, "failed to open redis output");
- return -1;
- }
- log_ctx->type = LOGFILE_TYPE_REDIS;
-#endif
} else {
SCLogError(SC_ERR_INVALID_YAML_CONF_ENTRY, "Invalid entry for "
"%s.filetype. Expected \"regular\" (default), \"unix_stream\", "
SCLogDebug("Opening new file for %d reference to file ctx %p", thread_id, parent_ctx);
LogFileNewThreadedCtx(parent_ctx, parent_ctx->filename, parent_ctx->threads->append, thread_id);
}
- SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx);
SCMutexUnlock(&parent_ctx->threads->mutex);
+ SCLogDebug("Existing file for %d reference to file ctx %p", thread_id, parent_ctx);
return parent_ctx->threads->lf_slots[thread_id];
}
}
if (new_array == NULL) {
- SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size);
SCMutexUnlock(&parent_ctx->threads->mutex);
+ SCLogError(SC_ERR_MEM_ALLOC, "Unable to increase file context array size to %d", new_size);
return NULL;
}
}
*thread = *parent_ctx;
- char fname[NAME_MAX];
- if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) {
- SCLogError(SC_ERR_MEM_ALLOC, "Unable to create threaded filename for log");
- goto error;
- }
- SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path);
- thread->fp = SCLogOpenFileFp(fname, append, thread->filemode);
- if (thread->fp == NULL) {
- goto error;
- }
- thread->filename = SCStrdup(fname);
- if (!thread->filename) {
- SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d", thread_id);
- goto error;
+ if (parent_ctx->type == LOGFILE_TYPE_FILE) {
+ char fname[NAME_MAX];
+ if (!LogFileThreadedName(log_path, fname, sizeof(fname), SC_ATOMIC_ADD(eve_file_id, 1))) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Unable to create threaded filename for log");
+ goto error;
+ }
+ SCLogDebug("Thread open -- using name %s [replaces %s]", fname, log_path);
+ thread->fp = SCLogOpenFileFp(fname, append, thread->filemode);
+ if (thread->fp == NULL) {
+ goto error;
+ }
+ thread->filename = SCStrdup(fname);
+ if (!thread->filename) {
+ SCLogError(SC_ERR_MEM_ALLOC, "Unable to duplicate filename for context slot %d",
+ thread_id);
+ goto error;
+ }
+ thread->is_regular = true;
+ thread->Write = SCLogFileWriteNoLock;
+ thread->Close = SCLogFileCloseNoLock;
+ OutputRegisterFileRotationFlag(&thread->rotation_flag);
+ } else if (parent_ctx->type == LOGFILE_TYPE_PLUGIN) {
+ thread->plugin.plugin->ThreadInit(
+ thread->plugin.init_data, thread_id, &thread->plugin.thread_data);
}
-
thread->threaded = false;
thread->parent = parent_ctx;
thread->id = thread_id;
- thread->is_regular = true;
- thread->Write = SCLogFileWriteNoLock;
- thread->Close = SCLogFileCloseNoLock;
- OutputRegisterFileRotationFlag(&thread->rotation_flag);
parent_ctx->threads->lf_slots[thread_id] = thread;
return true;
error:
- SC_ATOMIC_SUB(eve_file_id, 1);
- if (thread->fp) {
- thread->Close(thread);
+ if (parent_ctx->type == LOGFILE_TYPE_FILE) {
+ SC_ATOMIC_SUB(eve_file_id, 1);
+ if (thread->fp) {
+ thread->Close(thread);
+ }
}
if (thread) {
SCFree(thread);
}
if (lf_ctx->threaded) {
+ BUG_ON(lf_ctx->threads == NULL);
SCMutexDestroy(&lf_ctx->threads->mutex);
for(int i = 0; i < lf_ctx->threads->slot_count; i++) {
- if (lf_ctx->threads->lf_slots[i]) {
- OutputUnregisterFileRotationFlag(&lf_ctx->threads->lf_slots[i]->rotation_flag);
- lf_ctx->threads->lf_slots[i]->Close(lf_ctx->threads->lf_slots[i]);
- SCFree(lf_ctx->threads->lf_slots[i]->filename);
- SCFree(lf_ctx->threads->lf_slots[i]);
+ if (!lf_ctx->threads->lf_slots[i]) {
+ continue;
+ }
+ LogFileCtx *this_ctx = lf_ctx->threads->lf_slots[i];
+
+ if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) {
+ OutputUnregisterFileRotationFlag(&this_ctx->rotation_flag);
+ this_ctx->Close(this_ctx);
+ } else {
+ lf_ctx->plugin.plugin->ThreadDeinit(
+ this_ctx->plugin.init_data, this_ctx->plugin.thread_data);
}
+ SCFree(lf_ctx->threads->lf_slots[i]->filename);
+ SCFree(lf_ctx->threads->lf_slots[i]);
}
SCFree(lf_ctx->threads->lf_slots);
- SCFree(lf_ctx->threads->append);
+ if (lf_ctx->threads->append)
+ SCFree(lf_ctx->threads->append);
SCFree(lf_ctx->threads);
} else {
- if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
- if (lf_ctx->plugin->Deinit != NULL) {
- lf_ctx->plugin->Deinit(lf_ctx->plugin_data);
+ if (lf_ctx->type != LOGFILE_TYPE_PLUGIN) {
+ if (lf_ctx->fp != NULL) {
+ lf_ctx->Close(lf_ctx);
+ }
+ if (lf_ctx->parent) {
+ SCMutexLock(&lf_ctx->parent->threads->mutex);
+ lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL;
+ SCMutexUnlock(&lf_ctx->parent->threads->mutex);
}
- } else if (lf_ctx->fp != NULL) {
- lf_ctx->Close(lf_ctx);
- }
- if (lf_ctx->parent) {
- SCMutexLock(&lf_ctx->parent->threads->mutex);
- lf_ctx->parent->threads->lf_slots[lf_ctx->id] = NULL;
- SCMutexUnlock(&lf_ctx->parent->threads->mutex);
}
SCMutexDestroy(&lf_ctx->fp_mutex);
}
+ if (lf_ctx->type == LOGFILE_TYPE_PLUGIN) {
+ lf_ctx->plugin.plugin->Deinit(lf_ctx->plugin.init_data);
+ }
+
if (lf_ctx->prefix != NULL) {
SCFree(lf_ctx->prefix);
lf_ctx->prefix_len = 0;
OutputUnregisterFileRotationFlag(&lf_ctx->rotation_flag);
}
+ memset(lf_ctx, 0, sizeof(*lf_ctx));
SCFree(lf_ctx);
SCReturnInt(1);
MemBufferWriteString(buffer, "\n");
file_ctx->Write((const char *)MEMBUFFER_BUFFER(buffer),
MEMBUFFER_OFFSET(buffer), file_ctx);
+ } else if (file_ctx->type == LOGFILE_TYPE_PLUGIN) {
+ file_ctx->plugin.plugin->Write((const char *)MEMBUFFER_BUFFER(buffer),
+ MEMBUFFER_OFFSET(buffer), file_ctx->plugin.init_data, file_ctx->plugin.thread_data);
}
#ifdef HAVE_LIBHIREDIS
else if (file_ctx->type == LOGFILE_TYPE_REDIS) {
SCMutexUnlock(&file_ctx->fp_mutex);
}
#endif
- else if (file_ctx->type == LOGFILE_TYPE_PLUGIN) {
- file_ctx->plugin->Write((const char *)MEMBUFFER_BUFFER(buffer),
- MEMBUFFER_OFFSET(buffer), file_ctx->plugin_data, NULL);
- }
return 0;
}