if (p->flow != NULL) {
int applayer = 0;
- FLOWLOCK_RDLOCK(p->flow);
applayer = StreamTcpAppLayerIsDisabled(p->flow);
CreateTimeString(&p->flow->startts, timebuf, sizeof(timebuf));
MemBufferWriteString(aft->buffer, "FLOW Start TS: %s\n", timebuf);
(p->flow->alproto != ALPROTO_UNKNOWN) ? "TRUE" : "FALSE", p->flow->alproto);
AlertDebugLogFlowVars(aft, p);
AlertDebugLogFlowBits(aft, (Packet *)p); /* < no const */
- FLOWLOCK_UNLOCK(p->flow);
}
AlertDebugLogPktVars(aft, p);
char buffer[XFF_MAXLEN];
int have_xff_ip = 0;
- FLOWLOCK_RDLOCK(p->flow);
if (FlowGetAppProtocol(p->flow) == ALPROTO_HTTP) {
have_xff_ip = HttpXFFGetIP(p, xff_cfg, buffer, XFF_MAXLEN);
}
- FLOWLOCK_UNLOCK(p->flow);
if (have_xff_ip) {
/** Be sure that we have a nice zeroed buffer */
char buffer[XFF_MAXLEN];
int have_xff_ip = 0;
- FLOWLOCK_RDLOCK(p->flow);
if (FlowGetAppProtocol(p->flow) == ALPROTO_HTTP) {
if (pa->flags & PACKET_ALERT_FLAG_TX) {
have_xff_ip = HttpXFFGetIPFromTx(p, pa->tx_id, xff_cfg, buffer, XFF_MAXLEN);
have_xff_ip = HttpXFFGetIP(p, xff_cfg, buffer, XFF_MAXLEN);
}
}
- FLOWLOCK_UNLOCK(p->flow);
if (have_xff_ip) {
memset(aun->xff_ip, 0, 4 * sizeof(uint32_t));
char buffer[XFF_MAXLEN];
int have_xff_ip = 0;
- FLOWLOCK_RDLOCK(p->flow);
if (FlowGetAppProtocol(p->flow) == ALPROTO_HTTP) {
if (pa->flags & PACKET_ALERT_FLAG_TX) {
have_xff_ip = HttpXFFGetIPFromTx(p, pa->tx_id, xff_cfg, buffer, XFF_MAXLEN);
have_xff_ip = HttpXFFGetIP(p, xff_cfg, buffer, XFF_MAXLEN);
}
}
- FLOWLOCK_UNLOCK(p->flow);
if (have_xff_ip) {
memset(aun->xff_ip, 0, 4 * sizeof(uint32_t));
* - Detection
*
* This all while holding the flow lock.
- *
- * TODO
- * - once we have a single entry point into the outputs they
- * will have to move into this as well.
- * - once outputs are here we can also call StreamTcpPrune here
- * instead of in the packet pool return code
*/
#include "suricata-common.h"
#include "stream-tcp.h"
#include "app-layer.h"
#include "detect-engine.h"
+#include "output.h"
#include "util-validate.h"
SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
-#if 0
- void *output_thread; // XXX multiple, not a single state
-#endif
+ void *output_thread; /* Output thread data. */
+
PacketQueue pq;
} FlowWorkerThreadData;
BUG_ON(DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK);
SC_ATOMIC_SET(fw->detect_thread, detect_thread);
}
-#if 0
- // setup OUTPUTS
-#endif
+
+ /* Setup outputs for this thread. */
+ OutputLoggerThreadInit(tv, initdata, &fw->output_thread);
/* setup pq for stream end pkts */
memset(&fw->pq, 0, sizeof(PacketQueue));
DetectEngineThreadCtxDeinit(tv, detect_thread);
SC_ATOMIC_SET(fw->detect_thread, NULL);
}
-#if 0
- // free OUTPUT
-#endif
+
+ /* Free output. */
+ OutputLoggerThreadDeinit(tv, fw->output_thread);
/* free pq */
BUG_ON(fw->pq.len);
Detect(tv, x, detect_thread, NULL, NULL);
FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
}
-#if 0
+
// Outputs
-#endif
+ OutputLoggerLog(tv, x, fw->output_thread, preq, unused);
+
/* put these packets in the preq queue so that they are
* by the other thread modules before packet 'p'. */
PacketEnqueue(preq, x);
Detect(tv, p, detect_thread, NULL, NULL);
FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
}
-#if 0
- // Outputs
- // StreamTcpPruneSession (from TmqhOutputPacketpool)
-#endif
+ // Outputs.
+ OutputLoggerLog(tv, p, fw->output_thread, preq, unused);
+
+ /* Release tcp segments. Done here after alerting can use them. */
+ if (p->flow != NULL && p->proto == IPPROTO_TCP) {
+ StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
+ STREAM_TOSERVER : STREAM_TOCLIENT);
+ }
if (p->flow) {
DEBUG_ASSERT_FLOW_LOCKED(p->flow);
return "error";
}
+static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data)
+{
+ FlowWorkerThreadData *fw = data;
+ OutputLoggerExitPrintStats(tv, fw->output_thread);
+}
void TmModuleFlowWorkerRegister (void)
{
tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
+ tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
}
if (p->flow != NULL) {
int ret = FALSE;
- FLOWLOCK_RDLOCK(p->flow);
if (p->flow->flags & FLOW_ACTION_DROP) {
if (PKT_IS_TOSERVER(p) && !(p->flow->flags & FLOW_TOSERVER_DROP_LOGGED))
ret = TRUE;
else if (PKT_IS_TOCLIENT(p) && !(p->flow->flags & FLOW_TOCLIENT_DROP_LOGGED))
ret = TRUE;
}
- FLOWLOCK_UNLOCK(p->flow);
return ret;
} else if (PACKET_TEST_ACTION(p, ACTION_DROP)) {
return TRUE;
return -1;
if (p->flow) {
- FLOWLOCK_RDLOCK(p->flow);
if (p->flow->flags & FLOW_ACTION_DROP) {
if (PKT_IS_TOSERVER(p) && !(p->flow->flags & FLOW_TOSERVER_DROP_LOGGED))
p->flow->flags |= FLOW_TOSERVER_DROP_LOGGED;
else if (PKT_IS_TOCLIENT(p) && !(p->flow->flags & FLOW_TOCLIENT_DROP_LOGGED))
p->flow->flags |= FLOW_TOCLIENT_DROP_LOGGED;
}
- FLOWLOCK_UNLOCK(p->flow);
}
return 0;
}
int file_close = (p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0;
int file_trunc = 0;
- FLOWLOCK_WRLOCK(f); // < need write lock for FilePrune below
file_trunc = StreamTcpReassembleDepthReached(p);
FileContainer *ffc = AppLayerParserGetFiles(p->proto, f->alproto,
FilePrune(ffc);
}
- FLOWLOCK_UNLOCK(f);
return TM_ECODE_OK;
}
int file_close = (p->flags & PKT_PSEUDO_STREAM_END) ? 1 : 0;
int file_trunc = 0;
- FLOWLOCK_WRLOCK(f); // < need write lock for FiledataPrune below
file_trunc = StreamTcpReassembleDepthReached(p);
FileContainer *ffc = AppLayerParserGetFiles(p->proto, f->alproto,
FilePrune(ffc);
}
- FLOWLOCK_UNLOCK(f);
return TM_ECODE_OK;
}
if (json_output_ctx->flags & LOG_JSON_HTTP) {
if (p->flow != NULL) {
- FLOWLOCK_RDLOCK(p->flow);
uint16_t proto = FlowGetAppProtocol(p->flow);
/* http alert */
if (hjs)
json_object_set_new(js, "http", hjs);
}
-
- FLOWLOCK_UNLOCK(p->flow);
}
}
if (json_output_ctx->flags & LOG_JSON_TLS) {
if (p->flow != NULL) {
- FLOWLOCK_RDLOCK(p->flow);
uint16_t proto = FlowGetAppProtocol(p->flow);
/* http alert */
if (proto == ALPROTO_TLS)
AlertJsonTls(p->flow, js);
-
- FLOWLOCK_UNLOCK(p->flow);
}
}
if (json_output_ctx->flags & LOG_JSON_SSH) {
if (p->flow != NULL) {
- FLOWLOCK_RDLOCK(p->flow);
uint16_t proto = FlowGetAppProtocol(p->flow);
/* http alert */
if (proto == ALPROTO_SSH)
AlertJsonSsh(p->flow, js);
-
- FLOWLOCK_UNLOCK(p->flow);
}
}
if (json_output_ctx->flags & LOG_JSON_SMTP) {
if (p->flow != NULL) {
- FLOWLOCK_RDLOCK(p->flow);
uint16_t proto = FlowGetAppProtocol(p->flow);
/* http alert */
if (hjs)
json_object_set_new(js, "email", hjs);
}
-
- FLOWLOCK_UNLOCK(p->flow);
}
}
int have_xff_ip = 0;
char buffer[XFF_MAXLEN];
- FLOWLOCK_RDLOCK(p->flow);
if (FlowGetAppProtocol(p->flow) == ALPROTO_HTTP) {
if (pa->flags & PACKET_ALERT_FLAG_TX) {
have_xff_ip = HttpXFFGetIPFromTx(p, pa->tx_id, xff_cfg, buffer, XFF_MAXLEN);
have_xff_ip = HttpXFFGetIP(p, xff_cfg, buffer, XFF_MAXLEN);
}
}
- FLOWLOCK_UNLOCK(p->flow);
if (have_xff_ip) {
if (xff_cfg->flags & XFF_EXTRADATA) {
return 0;
if (p->flow) {
- FLOWLOCK_RDLOCK(p->flow);
if (p->flow->flags & FLOW_ACTION_DROP) {
if (PKT_IS_TOSERVER(p) && !(p->flow->flags & FLOW_TOSERVER_DROP_LOGGED))
p->flow->flags |= FLOW_TOSERVER_DROP_LOGGED;
else if (PKT_IS_TOCLIENT(p) && !(p->flow->flags & FLOW_TOCLIENT_DROP_LOGGED))
p->flow->flags |= FLOW_TOCLIENT_DROP_LOGGED;
}
- FLOWLOCK_UNLOCK(p->flow);
}
return 0;
}
int ret = FALSE;
/* for a flow that will be dropped fully, log just once per direction */
- FLOWLOCK_RDLOCK(p->flow);
if (p->flow->flags & FLOW_ACTION_DROP) {
if (PKT_IS_TOSERVER(p) && !(p->flow->flags & FLOW_TOSERVER_DROP_LOGGED))
ret = TRUE;
else if (PKT_IS_TOCLIENT(p) && !(p->flow->flags & FLOW_TOCLIENT_DROP_LOGGED))
ret = TRUE;
}
- FLOWLOCK_UNLOCK(p->flow);
/* if drop is caused by signature, log anyway */
if (p->alerts.drop.action != 0)
}
/* check if we have SSH state or not */
- FLOWLOCK_WRLOCK(p->flow);
uint16_t proto = FlowGetAppProtocol(p->flow);
if (proto != ALPROTO_SSH)
goto end;
/* we only log the state once */
ssh_state->cli_hdr.flags |= SSH_FLAG_STATE_LOGGED;
end:
- FLOWLOCK_UNLOCK(p->flow);
return 0;
}
return FALSE;
}
- FLOWLOCK_RDLOCK(p->flow);
uint16_t proto = FlowGetAppProtocol(p->flow);
if (proto != ALPROTO_SSH)
goto dontlog;
/* todo: logic to log once */
- FLOWLOCK_UNLOCK(p->flow);
return TRUE;
dontlog:
- FLOWLOCK_UNLOCK(p->flow);
return FALSE;
}
}
SCMutexUnlock(&td->lua_ctx->m);
- FLOWLOCK_WRLOCK(p->flow);
SshState *ssh_state = (SshState *)FlowGetAppState(p->flow);
if (ssh_state != NULL)
ssh_state->cli_hdr.flags |= SSH_FLAG_STATE_LOGGED_LUA;
- FLOWLOCK_UNLOCK(p->flow);
SCReturnInt(0);
}
return FALSE;
}
- FLOWLOCK_RDLOCK(p->flow);
uint16_t proto = FlowGetAppProtocol(p->flow);
if (proto != ALPROTO_SSH)
goto dontlog;
if (ssh_state->cli_hdr.flags & SSH_FLAG_STATE_LOGGED_LUA)
goto dontlog;
- FLOWLOCK_UNLOCK(p->flow);
return TRUE;
dontlog:
- FLOWLOCK_UNLOCK(p->flow);
return FALSE;
}
else
flags |= OUTPUT_STREAMING_FLAG_TOSERVER;
- FLOWLOCK_WRLOCK(f);
-
if (op_thread_data->loggers & (1<<STREAMING_TCP_DATA)) {
TcpSession *ssn = f->protoctx;
if (ssn) {
}
}
- FLOWLOCK_UNLOCK(f);
return TM_ECODE_OK;
}
Flow * const f = p->flow;
- FLOWLOCK_WRLOCK(f); /* WRITE lock before we updated flow logged id */
AppProto alproto = f->alproto;
if (AppLayerParserProtocolIsTxAware(p->proto, alproto) == 0)
}
end:
- FLOWLOCK_UNLOCK(f);
return TM_ECODE_OK;
}
}
}
-static TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data,
+TmEcode OutputLoggerLog(ThreadVars *tv, Packet *p, void *thread_data,
PacketQueue *pq, PacketQueue *postpq)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
return TM_ECODE_OK;
}
-static TmEcode OutputLoggerThreadInit(ThreadVars *tv, void *initdata,
+TmEcode OutputLoggerThreadInit(ThreadVars *tv, void *initdata,
void **data)
{
LoggerThreadStore *thread_store = SCCalloc(1, sizeof(*thread_store));
return TM_ECODE_OK;
}
-static TmEcode OutputLoggerThreadDeinit(ThreadVars *tv, void *thread_data)
+TmEcode OutputLoggerThreadDeinit(ThreadVars *tv, void *thread_data)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
RootLogger *logger = TAILQ_FIRST(&RootLoggers);
return TM_ECODE_OK;
}
-static void OutputLoggerExitPrintStats(ThreadVars *tv, void *thread_data)
+void OutputLoggerExitPrintStats(ThreadVars *tv, void *thread_data)
{
LoggerThreadStore *thread_store = (LoggerThreadStore *)thread_data;
RootLogger *logger = TAILQ_FIRST(&RootLoggers);
void TmModuleLoggerRegister(void)
{
- tmm_modules[TMM_LOGGER].name = "__root_logger__";
- tmm_modules[TMM_LOGGER].ThreadInit = OutputLoggerThreadInit;
- tmm_modules[TMM_LOGGER].ThreadDeinit = OutputLoggerThreadDeinit;
- tmm_modules[TMM_LOGGER].ThreadExitPrintStats = OutputLoggerExitPrintStats;
- tmm_modules[TMM_LOGGER].Func = OutputLoggerLog;;
-
OutputRegisterRootLoggers();
OutputRegisterLoggers();
}
-void SetupOutputs(ThreadVars *tv)
-{
- TmSlotSetFuncAppend(tv, &tmm_modules[TMM_LOGGER], NULL);
-}
-
/**
* \brief Register all root loggers.
*/
typedef OutputCtx *(*OutputInitSubFunc)(ConfNode *, OutputCtx *);
typedef TmEcode (*OutputLogFunc)(ThreadVars *, Packet *, void *, PacketQueue *,
PacketQueue *);
-
typedef struct OutputModule_ {
LoggerId logger_id;
const char *name;
ThreadExitPrintStatsFunc ThreadExitPrintStats,
OutputLogFunc LogFunc);
void TmModuleLoggerRegister(void);
-void SetupOutputs(ThreadVars *);
+
+TmEcode OutputLoggerLog(ThreadVars *, Packet *, void *, PacketQueue *,
+ PacketQueue *);
+TmEcode OutputLoggerThreadInit(ThreadVars *, void *, void **);
+TmEcode OutputLoggerThreadDeinit(ThreadVars *, void *);
+void OutputLoggerExitPrintStats(ThreadVars *, void *);
#endif /* ! __OUTPUT_H__ */
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- SetupOutputs(tv);
-
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
TmThreadSetGroupName(tv_detect_ncpu, "Detect");
- /* Add logger. */
- SetupOutputs(tv_detect_ncpu);
-
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
printf("ERROR: TmThreadSpawn failed\n");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- SetupOutputs(tv);
-
TmThreadSetCPU(tv, WORKER_CPU_SET);
#ifndef AFLFUZZ_PCAP_RUNMODE
TmThreadSetGroupName(tv_detect_ncpu, "Detect");
- /* Add logger. */
- SetupOutputs(tv_detect_ncpu);
-
TmThreadSetCPU(tv_detect_ncpu, WORKER_CPU_SET);
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
CASE_CODE (TMM_DETECTLOADER);
CASE_CODE (TMM_RECEIVENETMAP);
CASE_CODE (TMM_DECODENETMAP);
- CASE_CODE (TMM_LOGGER);
CASE_CODE (TMM_SIZE);
}
TMM_UNIXMANAGER,
- TMM_LOGGER,
-
TMM_SIZE,
} TmmId;
SCEnter();
SCLogDebug("Packet %p, p->root %p, alloced %s", p, p->root, p->flags & PKT_ALLOC ? "true" : "false");
- /** \todo make this a callback
- * Release tcp segments. Done here after alerting can use them. */
- if (p->flow != NULL && p->proto == IPPROTO_TCP) {
- SCMutexLock(&p->flow->m);
- StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
- STREAM_TOSERVER : STREAM_TOCLIENT);
- SCMutexUnlock(&p->flow->m);
- }
-
if (IS_TUNNEL_PKT(p)) {
SCLogDebug("Packet %p is a tunnel packet: %s",
p,p->root ? "upper layer" : "tunnel root");
}
TmSlotSetFuncAppend(tv_detect_ncpu, tm_module, NULL);
- /* Add logger. */
- SetupOutputs(tv_detect_ncpu);
-
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
SCLogError(SC_ERR_RUNMODE, "TmThreadSpawn failed");
exit(EXIT_FAILURE);
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- SetupOutputs(tv);
-
TmThreadSetCPU(tv, WORKER_CPU_SET);
if (TmThreadSpawn(tv) != TM_ECODE_OK) {
TmThreadSetCPU(tv_detect_ncpu, WORKER_CPU_SET);
- SetupOutputs(tv_detect_ncpu);
-
TmThreadSetGroupName(tv_detect_ncpu, "Detect");
if (TmThreadSpawn(tv_detect_ncpu) != TM_ECODE_OK) {
}
TmSlotSetFuncAppend(tv, tm_module, NULL);
- SetupOutputs(tv);
-
TmThreadSetCPU(tv, WORKER_CPU_SET);
if (TmThreadSpawn(tv) != TM_ECODE_OK) {